Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_subprocesses.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

54 statements  

1from __future__ import annotations 

2 

3import sys 

4from collections.abc import AsyncIterable, Iterable, Mapping, Sequence 

5from io import BytesIO 

6from os import PathLike 

7from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess 

8from typing import IO, Any, Union, cast 

9 

10from ..abc import Process 

11from ._eventloop import get_async_backend 

12from ._tasks import create_task_group 

13 

14if sys.version_info >= (3, 10): 

15 from typing import TypeAlias 

16else: 

17 from typing_extensions import TypeAlias 

18 

19StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] 

20 

21 

22async def run_process( 

23 command: StrOrBytesPath | Sequence[StrOrBytesPath], 

24 *, 

25 input: bytes | None = None, 

26 stdout: int | IO[Any] | None = PIPE, 

27 stderr: int | IO[Any] | None = PIPE, 

28 check: bool = True, 

29 cwd: StrOrBytesPath | None = None, 

30 env: Mapping[str, str] | None = None, 

31 startupinfo: Any = None, 

32 creationflags: int = 0, 

33 start_new_session: bool = False, 

34 pass_fds: Sequence[int] = (), 

35 user: str | int | None = None, 

36 group: str | int | None = None, 

37 extra_groups: Iterable[str | int] | None = None, 

38 umask: int = -1, 

39) -> CompletedProcess[bytes]: 

40 """ 

41 Run an external command in a subprocess and wait until it completes. 

42 

43 .. seealso:: :func:`subprocess.run` 

44 

45 :param command: either a string to pass to the shell, or an iterable of strings 

46 containing the executable name or path and its arguments 

47 :param input: bytes passed to the standard input of the subprocess 

48 :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, 

49 a file-like object, or `None` 

50 :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, 

51 :data:`subprocess.STDOUT`, a file-like object, or `None` 

52 :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the 

53 process terminates with a return code other than 0 

54 :param cwd: If not ``None``, change the working directory to this before running the 

55 command 

56 :param env: if not ``None``, this mapping replaces the inherited environment 

57 variables from the parent process 

58 :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used 

59 to specify process startup parameters (Windows only) 

60 :param creationflags: flags that can be used to control the creation of the 

61 subprocess (see :class:`subprocess.Popen` for the specifics) 

62 :param start_new_session: if ``true`` the setsid() system call will be made in the 

63 child process prior to the execution of the subprocess. (POSIX only) 

64 :param pass_fds: sequence of file descriptors to keep open between the parent and 

65 child processes. (POSIX only) 

66 :param user: effective user to run the process as (Python >= 3.9, POSIX only) 

67 :param group: effective group to run the process as (Python >= 3.9, POSIX only) 

68 :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9, 

69 POSIX only) 

70 :param umask: if not negative, this umask is applied in the child process before 

71 running the given command (Python >= 3.9, POSIX only) 

72 :return: an object representing the completed process 

73 :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process 

74 exits with a nonzero return code 

75 

76 """ 

77 

78 async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None: 

79 buffer = BytesIO() 

80 async for chunk in stream: 

81 buffer.write(chunk) 

82 

83 stream_contents[index] = buffer.getvalue() 

84 

85 async with await open_process( 

86 command, 

87 stdin=PIPE if input else DEVNULL, 

88 stdout=stdout, 

89 stderr=stderr, 

90 cwd=cwd, 

91 env=env, 

92 startupinfo=startupinfo, 

93 creationflags=creationflags, 

94 start_new_session=start_new_session, 

95 pass_fds=pass_fds, 

96 user=user, 

97 group=group, 

98 extra_groups=extra_groups, 

99 umask=umask, 

100 ) as process: 

101 stream_contents: list[bytes | None] = [None, None] 

102 async with create_task_group() as tg: 

103 if process.stdout: 

104 tg.start_soon(drain_stream, process.stdout, 0) 

105 

106 if process.stderr: 

107 tg.start_soon(drain_stream, process.stderr, 1) 

108 

109 if process.stdin and input: 

110 await process.stdin.send(input) 

111 await process.stdin.aclose() 

112 

113 await process.wait() 

114 

115 output, errors = stream_contents 

116 if check and process.returncode != 0: 

117 raise CalledProcessError(cast(int, process.returncode), command, output, errors) 

118 

119 return CompletedProcess(command, cast(int, process.returncode), output, errors) 

120 

121 

122async def open_process( 

123 command: StrOrBytesPath | Sequence[StrOrBytesPath], 

124 *, 

125 stdin: int | IO[Any] | None = PIPE, 

126 stdout: int | IO[Any] | None = PIPE, 

127 stderr: int | IO[Any] | None = PIPE, 

128 cwd: StrOrBytesPath | None = None, 

129 env: Mapping[str, str] | None = None, 

130 startupinfo: Any = None, 

131 creationflags: int = 0, 

132 start_new_session: bool = False, 

133 pass_fds: Sequence[int] = (), 

134 user: str | int | None = None, 

135 group: str | int | None = None, 

136 extra_groups: Iterable[str | int] | None = None, 

137 umask: int = -1, 

138) -> Process: 

139 """ 

140 Start an external command in a subprocess. 

141 

142 .. seealso:: :class:`subprocess.Popen` 

143 

144 :param command: either a string to pass to the shell, or an iterable of strings 

145 containing the executable name or path and its arguments 

146 :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a 

147 file-like object, or ``None`` 

148 :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, 

149 a file-like object, or ``None`` 

150 :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, 

151 :data:`subprocess.STDOUT`, a file-like object, or ``None`` 

152 :param cwd: If not ``None``, the working directory is changed before executing 

153 :param env: If env is not ``None``, it must be a mapping that defines the 

154 environment variables for the new process 

155 :param creationflags: flags that can be used to control the creation of the 

156 subprocess (see :class:`subprocess.Popen` for the specifics) 

157 :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used 

158 to specify process startup parameters (Windows only) 

159 :param start_new_session: if ``true`` the setsid() system call will be made in the 

160 child process prior to the execution of the subprocess. (POSIX only) 

161 :param pass_fds: sequence of file descriptors to keep open between the parent and 

162 child processes. (POSIX only) 

163 :param user: effective user to run the process as (Python >= 3.9; POSIX only) 

164 :param group: effective group to run the process as (Python >= 3.9; POSIX only) 

165 :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9; 

166 POSIX only) 

167 :param umask: if not negative, this umask is applied in the child process before 

168 running the given command (Python >= 3.9; POSIX only) 

169 :return: an asynchronous process object 

170 

171 """ 

172 kwargs: dict[str, Any] = {} 

173 if user is not None: 

174 if sys.version_info < (3, 9): 

175 raise TypeError("the 'user' argument requires Python 3.9 or later") 

176 

177 kwargs["user"] = user 

178 

179 if group is not None: 

180 if sys.version_info < (3, 9): 

181 raise TypeError("the 'group' argument requires Python 3.9 or later") 

182 

183 kwargs["group"] = group 

184 

185 if extra_groups is not None: 

186 if sys.version_info < (3, 9): 

187 raise TypeError("the 'extra_groups' argument requires Python 3.9 or later") 

188 

189 kwargs["extra_groups"] = group 

190 

191 if umask >= 0: 

192 if sys.version_info < (3, 9): 

193 raise TypeError("the 'umask' argument requires Python 3.9 or later") 

194 

195 kwargs["umask"] = umask 

196 

197 return await get_async_backend().open_process( 

198 command, 

199 stdin=stdin, 

200 stdout=stdout, 

201 stderr=stderr, 

202 cwd=cwd, 

203 env=env, 

204 startupinfo=startupinfo, 

205 creationflags=creationflags, 

206 start_new_session=start_new_session, 

207 pass_fds=pass_fds, 

208 **kwargs, 

209 )