1"""Run subprocesses with ``subprocess.run()`` and ``subprocess.Popen()``."""
2
3import errno
4import logging
5import os
6import subprocess
7import sys
8import typing
9
10from .. import _compat
11
# Names exported by a star-import of this module.
__all__ = ['run_check', 'ExecutableNotFound', 'CalledProcessError']


# Module-level logger; run_check() logs every command at DEBUG level.
log = logging.getLogger(__name__)


# Type of the ``input_lines`` argument of run_check():
# an iterator yielding bytes or an iterator yielding str.
BytesOrStrIterator = typing.Union[typing.Iterator[bytes],
                                  typing.Iterator[str]]
20
21
@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[typing.Iterator[bytes]] = ...,
              encoding: None = ...,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept bytes input_lines with default ``encoding=None``."""
29
30
@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[typing.Iterator[str]] = ...,
              encoding: str,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept string input_lines when ``encoding`` is given as a string."""
38
39
@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[BytesOrStrIterator] = ...,
              encoding: typing.Optional[str] = ...,
              capture_output: bool = ...,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept bytes or string input_lines depending on ``encoding``.

    Most general signature; the only overload that names
    ``capture_output`` explicitly instead of leaving it to ``**kwargs``.
    """
48
49
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[BytesOrStrIterator] = None,
              encoding: typing.Optional[str] = None,
              quiet: bool = False,
              **kwargs) -> subprocess.CompletedProcess:
    """Run ``cmd`` with ``check=True`` and return its completed process.

    Args:
        cmd: Command line as sequence of the program and its arguments.
        input_lines: Optional iterator whose items are written
            to the stdin of the subprocess one after another.
            Must be its own iterator (``iter(input_lines) is input_lines``).
        encoding: Passed on as ``encoding`` to :mod:`subprocess`
            unless it is ``None``.
        quiet: If true, do not echo a non-empty captured stderr
            of the subprocess to ``sys.stderr``.
        **kwargs: Passed on to ``subprocess.run()``, or to
            ``subprocess.Popen()`` when ``input_lines`` is given.
            ``check`` must be true or omitted.
            ``startupinfo`` defaults to ``_compat.get_startupinfo()``.
            Together with ``input_lines``, a true ``capture_output``
            is translated into ``stdout=PIPE, stderr=PIPE``
            and ``input`` must not be given.

    Returns:
        The :class:`subprocess.CompletedProcess` of the finished subprocess.

    Raises:
        ExecutableNotFound: If starting the subprocess fails
            with an :exc:`OSError` whose ``errno`` is ``ENOENT``.
        CalledProcessError: If the returncode of the subprocess is non-zero.
    """
    log.debug('run %r', cmd)
    if not kwargs.pop('check', True):  # pragma: no cover
        raise NotImplementedError('check must be True or omited')

    if encoding is not None:
        kwargs['encoding'] = encoding

    kwargs.setdefault('startupinfo', _compat.get_startupinfo())

    try:
        if input_lines is not None:
            assert kwargs.get('input') is None
            assert iter(input_lines) is input_lines
            # Popen() has no ``capture_output`` parameter: always remove it
            # (it is optional, so default to False instead of KeyError).
            if kwargs.pop('capture_output', False):
                kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
            proc = _run_input_lines(cmd, input_lines, kwargs=kwargs)
        else:
            proc = subprocess.run(cmd, **kwargs)
    except OSError as e:
        if e.errno == errno.ENOENT:
            raise ExecutableNotFound(cmd) from e
        raise

    # Echo before checking the returncode so the diagnostics of a failing
    # subprocess are shown even though an exception follows.
    if not quiet and proc.stderr:
        _write_stderr(proc.stderr)

    try:
        proc.check_returncode()
    except subprocess.CalledProcessError as e:
        # Re-raise as the subclass from this module
        # (its __str__() includes the captured stderr).
        raise CalledProcessError(*e.args) from e

    return proc
93
94
def _run_input_lines(cmd, input_lines, *, kwargs):
    """Run ``cmd`` feeding the items of ``input_lines`` to its stdin.

    Args:
        cmd: Command line passed on to ``subprocess.Popen()``.
        input_lines: Iterable of items to write to the stdin pipe
            (bytes or str, matching the mode the pipe is opened in).
        kwargs: Dict of further keyword arguments for ``subprocess.Popen()``
            (must not contain ``stdin``).

    Returns:
        A :class:`subprocess.CompletedProcess` with the args, returncode,
        and the stdout/stderr returned by ``Popen.communicate()``.

    Note:
        All input is written before any output is read.
    """
    # The context manager closes the pipes and waits for the subprocess,
    # so no child is left behind when feeding the input fails.
    with subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs) as popen:
        try:
            stdin_write = popen.stdin.write
            for line in input_lines:
                stdin_write(line)

            stdout, stderr = popen.communicate()
        except BaseException:  # same cleanup as subprocess.run()
            popen.kill()
            raise

    return subprocess.CompletedProcess(popen.args, popen.returncode,
                                       stdout=stdout, stderr=stderr)
105
106
def _write_stderr(stderr) -> None:
    """Write the captured ``stderr`` of a subprocess to ``sys.stderr``.

    Args:
        stderr: Captured output as bytes or str. Bytes are decoded with
            the encoding of ``sys.stderr``, falling back to
            ``sys.getdefaultencoding()`` if it has none.
    """
    if isinstance(stderr, bytes):
        stderr_encoding = (getattr(sys.stderr, 'encoding', None)
                           or sys.getdefaultencoding())
        # The subprocess need not use our encoding: escape undecodable bytes
        # instead of raising UnicodeDecodeError, which would hide the
        # outcome of the subprocess from the caller.
        stderr = stderr.decode(stderr_encoding, errors='backslashreplace')

    sys.stderr.write(stderr)
    sys.stderr.flush()
116
117
class ExecutableNotFound(RuntimeError):
    """:exc:`RuntimeError` raised if the Graphviz executable is not found.

    Instantiated with the command line that failed to execute;
    its first item is shown in the message.
    """

    _msg = ('failed to execute {!r}, '
            'make sure the Graphviz executables are on your systems\' PATH')

    def __init__(self, args) -> None:
        # Only the first item of ``args`` fills the single format field.
        message = self._msg.format(*args)
        super().__init__(message)
126
127
class CalledProcessError(subprocess.CalledProcessError):
    """:exc:`~subprocess.CalledProcessError` raised if a subprocess ``returncode`` is not ``0``.

    Its string form appends the ``repr()`` of the captured stderr.
    """  # noqa: E501

    def __str__(self) -> 'str':
        base_message = super().__str__()
        return f'{base_message} [stderr: {self.stderr!r}]'