1from __future__ import annotations
2
3from collections.abc import AsyncIterable, Iterable, Mapping, Sequence
4from io import BytesIO
5from os import PathLike
6from subprocess import PIPE, CalledProcessError, CompletedProcess
7from typing import IO, Any, TypeAlias, cast
8
9from ..abc import Process
10from ._eventloop import get_async_backend
11from ._tasks import create_task_group
12
13StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes]
14
15
16async def run_process(
17 command: StrOrBytesPath | Sequence[StrOrBytesPath],
18 *,
19 input: bytes | None = None,
20 stdin: int | IO[Any] | None = None,
21 stdout: int | IO[Any] | None = PIPE,
22 stderr: int | IO[Any] | None = PIPE,
23 check: bool = True,
24 cwd: StrOrBytesPath | None = None,
25 env: Mapping[str, str] | None = None,
26 startupinfo: Any = None,
27 creationflags: int = 0,
28 start_new_session: bool = False,
29 pass_fds: Sequence[int] = (),
30 user: str | int | None = None,
31 group: str | int | None = None,
32 extra_groups: Iterable[str | int] | None = None,
33 umask: int = -1,
34) -> CompletedProcess[bytes]:
35 """
36 Run an external command in a subprocess and wait until it completes.
37
38 .. seealso:: :func:`subprocess.run`
39
40 :param command: either a string to pass to the shell, or an iterable of strings
41 containing the executable name or path and its arguments
42 :param input: bytes passed to the standard input of the subprocess
43 :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
44 a file-like object, or `None`; ``input`` overrides this
45 :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
46 a file-like object, or `None`
47 :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
48 :data:`subprocess.STDOUT`, a file-like object, or `None`
49 :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the
50 process terminates with a return code other than 0
51 :param cwd: If not ``None``, change the working directory to this before running the
52 command
53 :param env: if not ``None``, this mapping replaces the inherited environment
54 variables from the parent process
55 :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
56 to specify process startup parameters (Windows only)
57 :param creationflags: flags that can be used to control the creation of the
58 subprocess (see :class:`subprocess.Popen` for the specifics)
59 :param start_new_session: if ``true`` the setsid() system call will be made in the
60 child process prior to the execution of the subprocess. (POSIX only)
61 :param pass_fds: sequence of file descriptors to keep open between the parent and
62 child processes. (POSIX only)
63 :param user: effective user to run the process as (Python >= 3.9, POSIX only)
64 :param group: effective group to run the process as (Python >= 3.9, POSIX only)
65 :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9,
66 POSIX only)
67 :param umask: if not negative, this umask is applied in the child process before
68 running the given command (Python >= 3.9, POSIX only)
69 :return: an object representing the completed process
70 :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
71 exits with a nonzero return code
72
73 """
74
75 async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
76 buffer = BytesIO()
77 async for chunk in stream:
78 buffer.write(chunk)
79
80 stream_contents[index] = buffer.getvalue()
81
82 if stdin is not None and input is not None:
83 raise ValueError("only one of stdin and input is allowed")
84
85 async with await open_process(
86 command,
87 stdin=PIPE if input else stdin,
88 stdout=stdout,
89 stderr=stderr,
90 cwd=cwd,
91 env=env,
92 startupinfo=startupinfo,
93 creationflags=creationflags,
94 start_new_session=start_new_session,
95 pass_fds=pass_fds,
96 user=user,
97 group=group,
98 extra_groups=extra_groups,
99 umask=umask,
100 ) as process:
101 stream_contents: list[bytes | None] = [None, None]
102 async with create_task_group() as tg:
103 if process.stdout:
104 tg.start_soon(drain_stream, process.stdout, 0)
105
106 if process.stderr:
107 tg.start_soon(drain_stream, process.stderr, 1)
108
109 if process.stdin and input:
110 await process.stdin.send(input)
111 await process.stdin.aclose()
112
113 await process.wait()
114
115 output, errors = stream_contents
116 if check and process.returncode != 0:
117 raise CalledProcessError(cast(int, process.returncode), command, output, errors)
118
119 return CompletedProcess(command, cast(int, process.returncode), output, errors)
120
121
122async def open_process(
123 command: StrOrBytesPath | Sequence[StrOrBytesPath],
124 *,
125 stdin: int | IO[Any] | None = PIPE,
126 stdout: int | IO[Any] | None = PIPE,
127 stderr: int | IO[Any] | None = PIPE,
128 cwd: StrOrBytesPath | None = None,
129 env: Mapping[str, str] | None = None,
130 startupinfo: Any = None,
131 creationflags: int = 0,
132 start_new_session: bool = False,
133 pass_fds: Sequence[int] = (),
134 user: str | int | None = None,
135 group: str | int | None = None,
136 extra_groups: Iterable[str | int] | None = None,
137 umask: int = -1,
138) -> Process:
139 """
140 Start an external command in a subprocess.
141
142 .. seealso:: :class:`subprocess.Popen`
143
144 :param command: either a string to pass to the shell, or an iterable of strings
145 containing the executable name or path and its arguments
146 :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
147 file-like object, or ``None``
148 :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
149 a file-like object, or ``None``
150 :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
151 :data:`subprocess.STDOUT`, a file-like object, or ``None``
152 :param cwd: If not ``None``, the working directory is changed before executing
153 :param env: If env is not ``None``, it must be a mapping that defines the
154 environment variables for the new process
155 :param creationflags: flags that can be used to control the creation of the
156 subprocess (see :class:`subprocess.Popen` for the specifics)
157 :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
158 to specify process startup parameters (Windows only)
159 :param start_new_session: if ``true`` the setsid() system call will be made in the
160 child process prior to the execution of the subprocess. (POSIX only)
161 :param pass_fds: sequence of file descriptors to keep open between the parent and
162 child processes. (POSIX only)
163 :param user: effective user to run the process as (POSIX only)
164 :param group: effective group to run the process as (POSIX only)
165 :param extra_groups: supplementary groups to set in the subprocess (POSIX only)
166 :param umask: if not negative, this umask is applied in the child process before
167 running the given command (POSIX only)
168 :return: an asynchronous process object
169
170 """
171 kwargs: dict[str, Any] = {}
172 if user is not None:
173 kwargs["user"] = user
174
175 if group is not None:
176 kwargs["group"] = group
177
178 if extra_groups is not None:
179 kwargs["extra_groups"] = group
180
181 if umask >= 0:
182 kwargs["umask"] = umask
183
184 return await get_async_backend().open_process(
185 command,
186 stdin=stdin,
187 stdout=stdout,
188 stderr=stderr,
189 cwd=cwd,
190 env=env,
191 startupinfo=startupinfo,
192 creationflags=creationflags,
193 start_new_session=start_new_session,
194 pass_fds=pass_fds,
195 **kwargs,
196 )