1from __future__ import annotations
2
3import sys
4from collections.abc import AsyncIterable, Iterable, Mapping, Sequence
5from io import BytesIO
6from os import PathLike
7from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess
8from typing import IO, Any, Union, cast
9
10from ..abc import Process
11from ._eventloop import get_async_backend
12from ._tasks import create_task_group
13
14if sys.version_info >= (3, 10):
15 from typing import TypeAlias
16else:
17 from typing_extensions import TypeAlias
18
19StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
20
21
22async def run_process(
23 command: StrOrBytesPath | Sequence[StrOrBytesPath],
24 *,
25 input: bytes | None = None,
26 stdout: int | IO[Any] | None = PIPE,
27 stderr: int | IO[Any] | None = PIPE,
28 check: bool = True,
29 cwd: StrOrBytesPath | None = None,
30 env: Mapping[str, str] | None = None,
31 startupinfo: Any = None,
32 creationflags: int = 0,
33 start_new_session: bool = False,
34 pass_fds: Sequence[int] = (),
35 user: str | int | None = None,
36 group: str | int | None = None,
37 extra_groups: Iterable[str | int] | None = None,
38 umask: int = -1,
39) -> CompletedProcess[bytes]:
40 """
41 Run an external command in a subprocess and wait until it completes.
42
43 .. seealso:: :func:`subprocess.run`
44
45 :param command: either a string to pass to the shell, or an iterable of strings
46 containing the executable name or path and its arguments
47 :param input: bytes passed to the standard input of the subprocess
48 :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
49 a file-like object, or `None`
50 :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
51 :data:`subprocess.STDOUT`, a file-like object, or `None`
52 :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the
53 process terminates with a return code other than 0
54 :param cwd: If not ``None``, change the working directory to this before running the
55 command
56 :param env: if not ``None``, this mapping replaces the inherited environment
57 variables from the parent process
58 :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
59 to specify process startup parameters (Windows only)
60 :param creationflags: flags that can be used to control the creation of the
61 subprocess (see :class:`subprocess.Popen` for the specifics)
62 :param start_new_session: if ``true`` the setsid() system call will be made in the
63 child process prior to the execution of the subprocess. (POSIX only)
64 :param pass_fds: sequence of file descriptors to keep open between the parent and
65 child processes. (POSIX only)
66 :param user: effective user to run the process as (Python >= 3.9, POSIX only)
67 :param group: effective group to run the process as (Python >= 3.9, POSIX only)
68 :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9,
69 POSIX only)
70 :param umask: if not negative, this umask is applied in the child process before
71 running the given command (Python >= 3.9, POSIX only)
72 :return: an object representing the completed process
73 :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
74 exits with a nonzero return code
75
76 """
77
78 async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
79 buffer = BytesIO()
80 async for chunk in stream:
81 buffer.write(chunk)
82
83 stream_contents[index] = buffer.getvalue()
84
85 async with await open_process(
86 command,
87 stdin=PIPE if input else DEVNULL,
88 stdout=stdout,
89 stderr=stderr,
90 cwd=cwd,
91 env=env,
92 startupinfo=startupinfo,
93 creationflags=creationflags,
94 start_new_session=start_new_session,
95 pass_fds=pass_fds,
96 user=user,
97 group=group,
98 extra_groups=extra_groups,
99 umask=umask,
100 ) as process:
101 stream_contents: list[bytes | None] = [None, None]
102 async with create_task_group() as tg:
103 if process.stdout:
104 tg.start_soon(drain_stream, process.stdout, 0)
105
106 if process.stderr:
107 tg.start_soon(drain_stream, process.stderr, 1)
108
109 if process.stdin and input:
110 await process.stdin.send(input)
111 await process.stdin.aclose()
112
113 await process.wait()
114
115 output, errors = stream_contents
116 if check and process.returncode != 0:
117 raise CalledProcessError(cast(int, process.returncode), command, output, errors)
118
119 return CompletedProcess(command, cast(int, process.returncode), output, errors)
120
121
122async def open_process(
123 command: StrOrBytesPath | Sequence[StrOrBytesPath],
124 *,
125 stdin: int | IO[Any] | None = PIPE,
126 stdout: int | IO[Any] | None = PIPE,
127 stderr: int | IO[Any] | None = PIPE,
128 cwd: StrOrBytesPath | None = None,
129 env: Mapping[str, str] | None = None,
130 startupinfo: Any = None,
131 creationflags: int = 0,
132 start_new_session: bool = False,
133 pass_fds: Sequence[int] = (),
134 user: str | int | None = None,
135 group: str | int | None = None,
136 extra_groups: Iterable[str | int] | None = None,
137 umask: int = -1,
138) -> Process:
139 """
140 Start an external command in a subprocess.
141
142 .. seealso:: :class:`subprocess.Popen`
143
144 :param command: either a string to pass to the shell, or an iterable of strings
145 containing the executable name or path and its arguments
146 :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
147 file-like object, or ``None``
148 :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
149 a file-like object, or ``None``
150 :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
151 :data:`subprocess.STDOUT`, a file-like object, or ``None``
152 :param cwd: If not ``None``, the working directory is changed before executing
153 :param env: If env is not ``None``, it must be a mapping that defines the
154 environment variables for the new process
155 :param creationflags: flags that can be used to control the creation of the
156 subprocess (see :class:`subprocess.Popen` for the specifics)
157 :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
158 to specify process startup parameters (Windows only)
159 :param start_new_session: if ``true`` the setsid() system call will be made in the
160 child process prior to the execution of the subprocess. (POSIX only)
161 :param pass_fds: sequence of file descriptors to keep open between the parent and
162 child processes. (POSIX only)
163 :param user: effective user to run the process as (Python >= 3.9; POSIX only)
164 :param group: effective group to run the process as (Python >= 3.9; POSIX only)
165 :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9;
166 POSIX only)
167 :param umask: if not negative, this umask is applied in the child process before
168 running the given command (Python >= 3.9; POSIX only)
169 :return: an asynchronous process object
170
171 """
172 kwargs: dict[str, Any] = {}
173 if user is not None:
174 if sys.version_info < (3, 9):
175 raise TypeError("the 'user' argument requires Python 3.9 or later")
176
177 kwargs["user"] = user
178
179 if group is not None:
180 if sys.version_info < (3, 9):
181 raise TypeError("the 'group' argument requires Python 3.9 or later")
182
183 kwargs["group"] = group
184
185 if extra_groups is not None:
186 if sys.version_info < (3, 9):
187 raise TypeError("the 'extra_groups' argument requires Python 3.9 or later")
188
189 kwargs["extra_groups"] = group
190
191 if umask >= 0:
192 if sys.version_info < (3, 9):
193 raise TypeError("the 'umask' argument requires Python 3.9 or later")
194
195 kwargs["umask"] = umask
196
197 return await get_async_backend().open_process(
198 command,
199 stdin=stdin,
200 stdout=stdout,
201 stderr=stderr,
202 cwd=cwd,
203 env=env,
204 startupinfo=startupinfo,
205 creationflags=creationflags,
206 start_new_session=start_new_session,
207 pass_fds=pass_fds,
208 **kwargs,
209 )