1"""Run subprocesses with ``subprocess.run()`` and ``subprocess.Popen()``."""
2
3from collections.abc import Iterator, Sequence
4import errno
5import io
6import logging
7import os
8import subprocess
9import sys
10from typing import TypeAlias, overload
11
12from .. import _compat
13
14__all__ = ['run_check', 'ExecutableNotFound', 'CalledProcessError']
15
16
17log = logging.getLogger(__name__)
18
19
20BytesOrStrIterator: TypeAlias = Iterator[bytes] | Iterator[str]
21
22
23@overload
24def run_check(cmd: Sequence[os.PathLike[str] | str], *,
25 input_lines: Iterator[bytes] | None = ...,
26 encoding: None = ...,
27 quiet: bool = ...,
28 **kwargs) -> subprocess.CompletedProcess:
29 """Accept bytes input_lines with default ``encoding=None```."""
30
31
32@overload
33def run_check(cmd: Sequence[os.PathLike[str] | str], *,
34 input_lines: Iterator[str] | None = ...,
35 encoding: str,
36 quiet: bool = ...,
37 **kwargs) -> subprocess.CompletedProcess:
38 """Accept string input_lines when given ``encoding``."""
39
40
41@overload
42def run_check(cmd: Sequence[os.PathLike[str] | str], *,
43 input_lines: BytesOrStrIterator | None = ...,
44 encoding: str | None = ...,
45 capture_output: bool = ...,
46 quiet: bool = ...,
47 **kwargs) -> subprocess.CompletedProcess:
48 """Accept bytes or string input_lines depending on ``encoding``."""
49
50
51def run_check(cmd: Sequence[os.PathLike[str] | str], *,
52 input_lines: BytesOrStrIterator | None = None,
53 encoding: str | None = None,
54 quiet: bool = False,
55 **kwargs) -> subprocess.CompletedProcess:
56 """Run the command described by ``cmd``
57 with ``check=True`` and return its completed process.
58
59 Raises:
60 CalledProcessError: if the returncode of the subprocess is non-zero.
61 """
62 log.debug('run %r', cmd)
63 if not kwargs.pop('check', True): # pragma: no cover
64 raise NotImplementedError('check must be True or omited')
65
66 if encoding is not None:
67 kwargs['encoding'] = encoding
68
69 kwargs.setdefault('startupinfo', _compat.get_startupinfo()) # type: ignore[func-returns-value]
70
71 try:
72 if input_lines is not None:
73 assert kwargs.get('input') is None
74 assert iter(input_lines) is input_lines
75 if kwargs.pop('capture_output'):
76 kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
77 proc = _run_input_lines(cmd, input_lines, kwargs=kwargs)
78 else:
79 proc = subprocess.run(cmd, **kwargs)
80 except OSError as e:
81 if e.errno == errno.ENOENT:
82 raise ExecutableNotFound(cmd) from e
83 raise
84
85 if not quiet and proc.stderr:
86 _write_stderr(proc.stderr)
87
88 try:
89 proc.check_returncode()
90 except subprocess.CalledProcessError as e:
91 raise CalledProcessError(*e.args)
92
93 return proc
94
95
96def _run_input_lines(cmd, input_lines, *, kwargs):
97 popen = subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs)
98
99 assert isinstance(popen.stdin, io.IOBase)
100 stdin_write = popen.stdin.write
101 for line in input_lines:
102 stdin_write(line)
103
104 stdout, stderr = popen.communicate()
105 return subprocess.CompletedProcess(popen.args, popen.returncode,
106 stdout=stdout, stderr=stderr)
107
108
109def _write_stderr(stderr) -> None:
110 if isinstance(stderr, bytes):
111 stderr_encoding = (getattr(sys.stderr, 'encoding', None)
112 or sys.getdefaultencoding())
113 stderr = stderr.decode(stderr_encoding)
114
115 sys.stderr.write(stderr)
116 sys.stderr.flush()
117 return None
118
119
120class ExecutableNotFound(RuntimeError):
121 """:exc:`RuntimeError` raised if the Graphviz executable is not found."""
122
123 _msg = ('failed to execute {!r}, '
124 'make sure the Graphviz executables are on your systems\' PATH')
125
126 def __init__(self, args) -> None:
127 super().__init__(self._msg.format(*args))
128
129
130class CalledProcessError(subprocess.CalledProcessError):
131 """:exc:`~subprocess.CalledProcessError` raised if a subprocess ``returncode`` is not ``0``.""" # noqa: E501
132
133 def __str__(self) -> str:
134 return f'{super().__str__()} [stderr: {self.stderr!r}]'