1"""Run subprocesses with ``subprocess.run()`` and ``subprocess.Popen()``.""" 
    2 
    3from collections.abc import Iterator, Sequence 
    4import errno 
    5import io 
    6import logging 
    7import os 
    8import subprocess 
    9import sys 
    10from typing import TypeAlias, overload 
    11 
    12from .. import _compat 
    13 
    14__all__ = ['run_check', 'ExecutableNotFound', 'CalledProcessError'] 
    15 
    16 
    17log = logging.getLogger(__name__) 
    18 
    19 
    20BytesOrStrIterator: TypeAlias = Iterator[bytes] | Iterator[str] 
    21 
    22 
    23@overload 
    24def run_check(cmd: Sequence[os.PathLike[str] | str], *, 
    25              input_lines: Iterator[bytes] | None = ..., 
    26              encoding: None = ..., 
    27              quiet: bool = ..., 
    28              **kwargs) -> subprocess.CompletedProcess: 
    29    """Accept bytes input_lines with default ``encoding=None```.""" 
    30 
    31 
    32@overload 
    33def run_check(cmd: Sequence[os.PathLike[str] | str], *, 
    34              input_lines: Iterator[str] | None = ..., 
    35              encoding: str, 
    36              quiet: bool = ..., 
    37              **kwargs) -> subprocess.CompletedProcess: 
    38    """Accept string input_lines when given ``encoding``.""" 
    39 
    40 
    41@overload 
    42def run_check(cmd: Sequence[os.PathLike[str] | str], *, 
    43              input_lines: BytesOrStrIterator | None = ..., 
    44              encoding: str | None = ..., 
    45              capture_output: bool = ..., 
    46              quiet: bool = ..., 
    47              **kwargs) -> subprocess.CompletedProcess: 
    48    """Accept bytes or string input_lines depending on ``encoding``.""" 
    49 
    50 
    51def run_check(cmd: Sequence[os.PathLike[str] | str], *, 
    52              input_lines: BytesOrStrIterator | None = None, 
    53              encoding: str | None = None, 
    54              quiet: bool = False, 
    55              **kwargs) -> subprocess.CompletedProcess: 
    56    """Run the command described by ``cmd`` 
    57        with ``check=True`` and return its completed process. 
    58 
    59    Raises: 
    60        CalledProcessError: if the returncode of the subprocess is non-zero. 
    61    """ 
    62    log.debug('run %r', cmd) 
    63    if not kwargs.pop('check', True):  # pragma: no cover 
    64        raise NotImplementedError('check must be True or omited') 
    65 
    66    if encoding is not None: 
    67        kwargs['encoding'] = encoding 
    68 
    69    kwargs.setdefault('startupinfo', _compat.get_startupinfo())  # type: ignore[func-returns-value] 
    70 
    71    try: 
    72        if input_lines is not None: 
    73            assert kwargs.get('input') is None 
    74            assert iter(input_lines) is input_lines 
    75            if kwargs.pop('capture_output'): 
    76                kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE 
    77            proc = _run_input_lines(cmd, input_lines, kwargs=kwargs) 
    78        else: 
    79            proc = subprocess.run(cmd, **kwargs) 
    80    except OSError as e: 
    81        if e.errno == errno.ENOENT: 
    82            raise ExecutableNotFound(cmd) from e 
    83        raise 
    84 
    85    if not quiet and proc.stderr: 
    86        _write_stderr(proc.stderr) 
    87 
    88    try: 
    89        proc.check_returncode() 
    90    except subprocess.CalledProcessError as e: 
    91        raise CalledProcessError(*e.args) 
    92 
    93    return proc 
    94 
    95 
    96def _run_input_lines(cmd, input_lines, *, kwargs): 
    97    popen = subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs) 
    98 
    99    assert isinstance(popen.stdin, io.IOBase) 
    100    stdin_write = popen.stdin.write 
    101    for line in input_lines: 
    102        stdin_write(line) 
    103 
    104    stdout, stderr = popen.communicate() 
    105    return subprocess.CompletedProcess(popen.args, popen.returncode, 
    106                                       stdout=stdout, stderr=stderr) 
    107 
    108 
    109def _write_stderr(stderr) -> None: 
    110    if isinstance(stderr, bytes): 
    111        stderr_encoding = (getattr(sys.stderr, 'encoding', None) 
    112                           or sys.getdefaultencoding()) 
    113        stderr = stderr.decode(stderr_encoding) 
    114 
    115    sys.stderr.write(stderr) 
    116    sys.stderr.flush() 
    117    return None 
    118 
    119 
    120class ExecutableNotFound(RuntimeError): 
    121    """:exc:`RuntimeError` raised if the Graphviz executable is not found.""" 
    122 
    123    _msg = ('failed to execute {!r}, ' 
    124            'make sure the Graphviz executables are on your systems\' PATH') 
    125 
    126    def __init__(self, args) -> None: 
    127        super().__init__(self._msg.format(*args)) 
    128 
    129 
    130class CalledProcessError(subprocess.CalledProcessError): 
    131    """:exc:`~subprocess.CalledProcessError` raised if a subprocess ``returncode`` is not ``0``."""  # noqa: E501 
    132 
    133    def __str__(self) -> str: 
    134        return f'{super().__str__()} [stderr: {self.stderr!r}]'