1from __future__ import annotations
2
3import sys
4from collections.abc import AsyncIterable, Iterable, Mapping, Sequence
5from io import BytesIO
6from os import PathLike
7from subprocess import PIPE, CalledProcessError, CompletedProcess
8from typing import IO, Any, Union, cast
9
10from ..abc import Process
11from ._eventloop import get_async_backend
12from ._tasks import create_task_group
13
14if sys.version_info >= (3, 10):
15 from typing import TypeAlias
16else:
17 from typing_extensions import TypeAlias
18
# A path-like value given either as text or as bytes: a plain ``str``/``bytes``
# or an ``os.PathLike`` wrapping one of them. Used below to annotate the
# ``command`` and ``cwd`` parameters.
StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
20
21
async def run_process(
    command: StrOrBytesPath | Sequence[StrOrBytesPath],
    *,
    input: bytes | None = None,
    stdin: int | IO[Any] | None = None,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    check: bool = True,
    cwd: StrOrBytesPath | None = None,
    env: Mapping[str, str] | None = None,
    startupinfo: Any = None,
    creationflags: int = 0,
    start_new_session: bool = False,
    pass_fds: Sequence[int] = (),
    user: str | int | None = None,
    group: str | int | None = None,
    extra_groups: Iterable[str | int] | None = None,
    umask: int = -1,
) -> CompletedProcess[bytes]:
    """
    Run an external command in a subprocess and wait until it completes.

    .. seealso:: :func:`subprocess.run`

    :param command: either a string to pass to the shell, or an iterable of strings
        containing the executable name or path and its arguments
    :param input: bytes passed to the standard input of the subprocess; an empty
        bytes object gives the subprocess an empty (immediately closed) standard
        input
    :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``; must be left as ``None`` when ``input`` is
        given
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``
    :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the
        process terminates with a return code other than 0
    :param cwd: If not ``None``, change the working directory to this before running the
        command
    :param env: if not ``None``, this mapping replaces the inherited environment
        variables from the parent process
    :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
        to specify process startup parameters (Windows only)
    :param creationflags: flags that can be used to control the creation of the
        subprocess (see :class:`subprocess.Popen` for the specifics)
    :param start_new_session: if ``True`` the setsid() system call will be made in the
        child process prior to the execution of the subprocess. (POSIX only)
    :param pass_fds: sequence of file descriptors to keep open between the parent and
        child processes. (POSIX only)
    :param user: effective user to run the process as (Python >= 3.9, POSIX only)
    :param group: effective group to run the process as (Python >= 3.9, POSIX only)
    :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9,
        POSIX only)
    :param umask: if not negative, this umask is applied in the child process before
        running the given command (Python >= 3.9, POSIX only)
    :return: an object representing the completed process
    :raises ValueError: if both ``stdin`` and ``input`` are given
    :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
        exits with a nonzero return code

    """
    if stdin is not None and input is not None:
        raise ValueError("only one of stdin and input is allowed")

    # Slot 0 receives the collected stdout, slot 1 the collected stderr. A slot
    # stays None when the process object exposes no such stream to read from.
    stream_contents: list[bytes | None] = [None, None]

    async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
        # Read the stream until it ends and store everything in the given slot.
        buffer = BytesIO()
        async for chunk in stream:
            buffer.write(chunk)

        stream_contents[index] = buffer.getvalue()

    async with await open_process(
        command,
        # Test against None rather than truthiness: ``input=b""`` must still give
        # the child its own (empty) stdin instead of inheriting the parent's.
        stdin=PIPE if input is not None else stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        startupinfo=startupinfo,
        creationflags=creationflags,
        start_new_session=start_new_session,
        pass_fds=pass_fds,
        user=user,
        group=group,
        extra_groups=extra_groups,
        umask=umask,
    ) as process:
        # The output streams are drained concurrently with feeding stdin and
        # waiting for the process to exit.
        async with create_task_group() as tg:
            if process.stdout:
                tg.start_soon(drain_stream, process.stdout, 0)

            if process.stderr:
                tg.start_soon(drain_stream, process.stderr, 1)

            if process.stdin and input is not None:
                if input:
                    await process.stdin.send(input)

                # Always close stdin once the input (even an empty one) has been
                # handed over, so the child sees end-of-file.
                await process.stdin.aclose()

            await process.wait()

    output, errors = stream_contents
    if check and process.returncode != 0:
        raise CalledProcessError(cast(int, process.returncode), command, output, errors)

    return CompletedProcess(command, cast(int, process.returncode), output, errors)
126
127
async def open_process(
    command: StrOrBytesPath | Sequence[StrOrBytesPath],
    *,
    stdin: int | IO[Any] | None = PIPE,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    cwd: StrOrBytesPath | None = None,
    env: Mapping[str, str] | None = None,
    startupinfo: Any = None,
    creationflags: int = 0,
    start_new_session: bool = False,
    pass_fds: Sequence[int] = (),
    user: str | int | None = None,
    group: str | int | None = None,
    extra_groups: Iterable[str | int] | None = None,
    umask: int = -1,
) -> Process:
    """
    Start an external command in a subprocess.

    .. seealso:: :class:`subprocess.Popen`

    :param command: either a string to pass to the shell, or an iterable of strings
        containing the executable name or path and its arguments
    :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
        file-like object, or ``None``
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``
    :param cwd: If not ``None``, the working directory is changed before executing
    :param env: If env is not ``None``, it must be a mapping that defines the
        environment variables for the new process
    :param creationflags: flags that can be used to control the creation of the
        subprocess (see :class:`subprocess.Popen` for the specifics)
    :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
        to specify process startup parameters (Windows only)
    :param start_new_session: if ``True`` the setsid() system call will be made in the
        child process prior to the execution of the subprocess. (POSIX only)
    :param pass_fds: sequence of file descriptors to keep open between the parent and
        child processes. (POSIX only)
    :param user: effective user to run the process as (POSIX only)
    :param group: effective group to run the process as (POSIX only)
    :param extra_groups: supplementary groups to set in the subprocess (POSIX only)
    :param umask: if not negative, this umask is applied in the child process before
        running the given command (POSIX only)
    :return: an asynchronous process object

    """
    # These options are forwarded to the backend only when the caller actually
    # set them; otherwise the keyword is left out of the call entirely.
    kwargs: dict[str, Any] = {}
    if user is not None:
        kwargs["user"] = user

    if group is not None:
        kwargs["group"] = group

    if extra_groups is not None:
        # Forward the supplementary groups themselves (not ``group``).
        kwargs["extra_groups"] = extra_groups

    if umask >= 0:
        kwargs["umask"] = umask

    return await get_async_backend().open_process(
        command,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        startupinfo=startupinfo,
        creationflags=creationflags,
        start_new_session=start_new_session,
        pass_fds=pass_fds,
        **kwargs,
    )