1from __future__ import annotations
2
3from collections.abc import AsyncIterable, Mapping, Sequence
4from io import BytesIO
5from os import PathLike
6from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess
7from typing import IO, Any, cast
8
9from ..abc import Process
10from ._eventloop import get_async_backend
11from ._tasks import create_task_group
12
13
async def run_process(
    command: str | bytes | Sequence[str | bytes],
    *,
    input: bytes | None = None,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    check: bool = True,
    cwd: str | bytes | PathLike[str] | None = None,
    env: Mapping[str, str] | None = None,
    start_new_session: bool = False,
) -> CompletedProcess[bytes]:
    """
    Launch an external command in a subprocess, wait for it to exit and collect
    whatever it wrote to its captured output streams.

    .. seealso:: :func:`subprocess.run`

    :param command: either a single string (or bytes) command line to hand to the
        shell, or a sequence holding the executable name or path followed by its
        arguments
    :param input: bytes to write to the standard input of the subprocess; when this
        is ``None`` or empty, standard input is connected to
        :data:`subprocess.DEVNULL` instead of a pipe
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``
    :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` when the
        process finishes with a return code other than 0
    :param cwd: if not ``None``, the working directory to switch to before running
        the command
    :param env: if not ``None``, a mapping that replaces the environment variables
        inherited from the parent process
    :param start_new_session: if ``True`` the setsid() system call will be made in
        the child process prior to the execution of the subprocess. (POSIX only)
    :return: an object describing the finished process; its ``stdout`` and
        ``stderr`` hold the collected bytes, or ``None`` for a stream the process
        object did not expose
    :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
        exits with a nonzero return code

    """
    # Slot 0 receives the collected stdout, slot 1 the collected stderr. A slot stays
    # None when no reader task was started for the corresponding stream.
    captured: list[bytes | None] = [None, None]

    async def collect(source: AsyncIterable[bytes], slot: int) -> None:
        # Read the stream until it is exhausted, then publish the joined result
        pieces = [piece async for piece in source]
        captured[slot] = b"".join(pieces)

    # Only ask for a stdin pipe when there is actually something to feed into it
    stdin_target = PIPE if input else DEVNULL
    async with await open_process(
        command,
        stdin=stdin_target,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        start_new_session=start_new_session,
    ) as process:
        async with create_task_group() as tg:
            # Drain both output streams in background tasks while the input is
            # being written and the process is being waited on
            for slot, source in enumerate((process.stdout, process.stderr)):
                if source:
                    tg.start_soon(collect, source, slot)

            if process.stdin and input:
                await process.stdin.send(input)
                # Close stdin so the child sees end-of-file on its input
                await process.stdin.aclose()

            await process.wait()

    returncode = cast(int, process.returncode)
    collected_out, collected_err = captured
    if check and returncode != 0:
        raise CalledProcessError(
            returncode, command, output=collected_out, stderr=collected_err
        )

    return CompletedProcess(
        command, returncode, stdout=collected_out, stderr=collected_err
    )
86
87
async def open_process(
    command: str | bytes | Sequence[str | bytes],
    *,
    stdin: int | IO[Any] | None = PIPE,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    cwd: str | bytes | PathLike[str] | None = None,
    env: Mapping[str, str] | None = None,
    start_new_session: bool = False,
) -> Process:
    """
    Spawn an external command in a subprocess via the current async backend.

    .. seealso:: :class:`subprocess.Popen`

    :param command: either a single string (or bytes) command line, which is started
        with ``shell=True``, or a sequence holding the executable name or path
        followed by its arguments, which is started with ``shell=False``
    :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
        file-like object, or ``None``
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``
    :param cwd: if not ``None``, the working directory to change to before executing
    :param env: if not ``None``, a mapping that defines the environment variables
        for the new process
    :param start_new_session: if ``True`` the setsid() system call will be made in
        the child process prior to the execution of the subprocess. (POSIX only)
    :return: an asynchronous process object

    """
    # A bare str/bytes is treated as a shell command line; anything else is taken
    # to be an argument vector and is started without a shell
    use_shell = isinstance(command, (str, bytes))
    backend = get_async_backend()
    return await backend.open_process(
        command,
        shell=use_shell,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        start_new_session=start_new_session,
    )