Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/testing.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import collections.abc as cabc
4import contextlib
5import io
6import os
7import pdb
8import shlex
9import sys
10import tempfile
11import typing as t
12from types import TracebackType
14from . import _compat
15from . import formatting
16from . import termui
17from . import utils
18from ._compat import _find_binary_reader
20if t.TYPE_CHECKING:
21 from _typeshed import ReadableBuffer
23 from .core import Command
25CaptureMode = t.Literal["sys", "fd"]
class EchoingStdin:
    """Binary input proxy that copies everything it reads to another stream.

    Every ``read*`` call and every iteration step is served by ``input``;
    the bytes obtained are also written to ``output`` unless echoing is
    currently paused via the ``_paused`` flag. Attributes that are not
    defined on the proxy are looked up on the wrapped input stream.

    :param input: the binary stream data is read from.
    :param output: the binary stream that receives a copy of the data.
    """

    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
        self._input = input
        self._output = output
        # Flipped by ``_pause_echo`` to temporarily suppress the copy.
        self._paused = False

    def __getattr__(self, x: str) -> t.Any:
        # Only consulted for names the proxy itself does not define.
        return getattr(self._input, x)

    def _echo(self, rv: bytes) -> bytes:
        """Write ``rv`` to the output stream unless paused; return ``rv``."""
        if self._paused:
            return rv

        self._output.write(rv)
        return rv

    def read(self, n: int = -1) -> bytes:
        data = self._input.read(n)
        return self._echo(data)

    def read1(self, n: int = -1) -> bytes:
        data = self._input.read1(n)  # type: ignore
        return self._echo(data)

    def readline(self, n: int = -1) -> bytes:
        line = self._input.readline(n)
        return self._echo(line)

    def readlines(self) -> list[bytes]:
        echoed = []
        for line in self._input.readlines():
            echoed.append(self._echo(line))
        return echoed

    def __iter__(self) -> cabc.Iterator[bytes]:
        # Lazy: each line is echoed only when the consumer pulls it.
        return (self._echo(line) for line in self._input)

    def __repr__(self) -> str:
        wrapped = self._input
        return repr(wrapped)
62@contextlib.contextmanager
63def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]:
64 if stream is None:
65 yield
66 else:
67 stream._paused = True
68 yield
69 stream._paused = False
72class _FDCapture:
73 """Redirect a file descriptor to a temporary file for capture.
75 Saves the current target of *targetfd* via :func:`os.dup`, then
76 redirects it to a temporary file via :func:`os.dup2`. On
77 :meth:`stop`, restores the original ``fd`` and returns the captured
78 bytes. Inspired by Pytest's ``FDCapture``.
80 .. versionadded:: 8.4.0
81 """
83 def __init__(self, targetfd: int) -> None:
84 self._targetfd = targetfd
85 self.saved_fd: int = -1
86 self._tmpfile: t.BinaryIO | None = None
88 def start(self) -> None:
89 self.saved_fd = os.dup(self._targetfd)
90 self._tmpfile = tempfile.TemporaryFile(buffering=0)
91 os.dup2(self._tmpfile.fileno(), self._targetfd)
93 def stop(self) -> bytes:
94 assert self._tmpfile is not None, "_FDCapture.start() was not called"
95 os.dup2(self.saved_fd, self._targetfd)
96 os.close(self.saved_fd)
97 self.saved_fd = -1
98 self._tmpfile.seek(0)
99 data = self._tmpfile.read()
100 self._tmpfile.close()
101 self._tmpfile = None
102 return data
class BytesIOCopy(io.BytesIO):
    """An in-memory byte stream that mirrors each write into a second one.

    Data written here is stored in this buffer and also forwarded to
    ``copy_to``; flushing is forwarded as well.

    .. versionadded:: 8.2
    """

    def __init__(self, copy_to: io.BytesIO) -> None:
        super().__init__()
        self.copy_to = copy_to

    def write(self, b: ReadableBuffer) -> int:
        # The mirror receives the data first, then this buffer stores it.
        mirror = self.copy_to
        mirror.write(b)
        written = super().write(b)
        return written

    def flush(self) -> None:
        super().flush()
        mirror = self.copy_to
        mirror.flush()
class StreamMixer:
    """Mixes `<stdout>` and `<stderr>` streams.

    ``stdout`` and ``stderr`` are separate buffers that both mirror their
    writes into the shared ``output`` buffer, so ``output`` holds the
    interleaved data in the order it was written.

    .. versionadded:: 8.2
    """

    def __init__(self) -> None:
        combined = io.BytesIO()
        self.output: io.BytesIO = combined
        self.stdout: io.BytesIO = BytesIOCopy(copy_to=combined)
        self.stderr: io.BytesIO = BytesIOCopy(copy_to=combined)
138class _NamedTextIOWrapper(io.TextIOWrapper):
139 """A :class:`~io.TextIOWrapper` with custom ``name`` and ``mode``
140 that does not close its underlying buffer.
142 When ``CliRunner`` runs in ``fd`` mode, ``_original_fd`` is patched to
143 point at the saved (pre-redirection) ``fd``, so C-level consumers that call
144 :meth:`fileno` (like ``faulthandler`` or ``subprocess``) keep working. In
145 the default ``sys`` mode ``_original_fd`` stays at ``-1`` and
146 :meth:`fileno` raises :exc:`io.UnsupportedOperation`, matching the
147 pre-``8.3.3`` behavior.
148 """
150 def __init__(
151 self,
152 buffer: t.BinaryIO,
153 name: str,
154 mode: str,
155 **kwargs: t.Any,
156 ) -> None:
157 super().__init__(buffer, **kwargs)
158 self._name = name
159 self._mode = mode
160 self._original_fd: int = -1
162 def close(self) -> None:
163 """The buffer this object contains belongs to some other object,
164 so prevent the default ``__del__`` implementation from closing
165 that buffer.
167 .. versionadded:: 8.3.2
168 """
170 def fileno(self) -> int:
171 """Return the file descriptor of the saved original stream when
172 ``CliRunner`` runs in ``fd`` mode. Otherwise delegate to
173 :class:`~io.TextIOWrapper`, which raises
174 :exc:`io.UnsupportedOperation` for a ``BytesIO``-backed buffer.
175 """
176 if self._original_fd >= 0:
177 return self._original_fd
178 return super().fileno()
180 @property
181 def name(self) -> str:
182 return self._name
184 @property
185 def mode(self) -> str:
186 return self._mode
def make_input_stream(
    input: str | bytes | t.IO[t.Any] | None, charset: str
) -> t.BinaryIO:
    """Turn the given test input into a binary stream.

    :param input: a stream-like object (anything with a ``read``
        attribute), text, bytes, or ``None`` for empty input.
    :param charset: the encoding applied when ``input`` is text.
    :return: the binary reader found for a stream-like ``input``,
        otherwise a new :class:`io.BytesIO` holding the input bytes.
    :raises TypeError: if ``input`` is stream-like but no binary reader
        can be found for it.
    """
    # Stream-like input: hand back its binary reader instead of copying.
    if hasattr(input, "read"):
        reader = _find_binary_reader(t.cast("t.IO[t.Any]", input))

        if reader is None:
            raise TypeError("Could not find binary reader for input stream.")

        return reader

    if isinstance(input, str):
        payload = input.encode(charset)
    elif input is None:
        payload = b""
    else:
        payload = input

    return io.BytesIO(payload)
class Result:
    """Holds the captured result of an invoked CLI script.

    :param runner: The runner that created the result
    :param stdout_bytes: The standard output as bytes.
    :param stderr_bytes: The standard error as bytes.
    :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the
        user would see it in its terminal.
    :param return_value: The value returned from the invoked command.
    :param exit_code: The exit code as integer.
    :param exception: The exception that happened if one did.
    :param exc_info: Exception information (exception type, exception instance,
        traceback type).

    .. versionchanged:: 8.2
        ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and
        ``mix_stderr`` has been removed.

    .. versionadded:: 8.0
        Added ``return_value``.
    """

    def __init__(
        self,
        runner: CliRunner,
        stdout_bytes: bytes,
        stderr_bytes: bytes,
        output_bytes: bytes,
        return_value: t.Any,
        exit_code: int,
        exception: BaseException | None,
        exc_info: tuple[type[BaseException], BaseException, TracebackType]
        | None = None,
    ):
        self.runner = runner
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.output_bytes = output_bytes
        self.return_value = return_value
        self.exit_code = exit_code
        self.exception = exception
        self.exc_info = exc_info

    def _decode(self, data: bytes) -> str:
        """Decode captured bytes with the runner's charset.

        Undecodable bytes are replaced rather than raising, and Windows
        line endings are normalized to ``"\\n"``.
        """
        text = data.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def output(self) -> str:
        """The terminal output as unicode string, as the user would see it.

        .. versionchanged:: 8.2
            No longer a proxy for ``self.stdout``. Now has its own independent stream
            that is mixing `<stdout>` and `<stderr>`, in the order they were written.
        """
        return self._decode(self.output_bytes)

    @property
    def stdout(self) -> str:
        """The standard output as unicode string."""
        return self._decode(self.stdout_bytes)

    @property
    def stderr(self) -> str:
        """The standard error as unicode string.

        .. versionchanged:: 8.2
            No longer raise an exception, always returns the `<stderr>` string.
        """
        return self._decode(self.stderr_bytes)

    def __repr__(self) -> str:
        status = "okay" if not self.exception else repr(self.exception)
        cls_name = type(self).__name__
        return f"<{cls_name} {status}>"
class CliRunner:
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment. This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes
                       to `<stdout>`.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param catch_exceptions: Whether to catch any exceptions other than
                             ``SystemExit`` when running :meth:`~CliRunner.invoke`.
    :param capture: Selects the output capture strategy. ``sys`` (default)
        captures Python-level writes only and leaves
        :meth:`sys.stdout.fileno` raising :exc:`io.UnsupportedOperation`, so
        user code that calls :func:`os.dup2` on ``sys.stdout.fileno()`` cannot
        clobber the host runner's stdout. ``fd`` redirects file descriptors
        ``1`` and ``2`` via :func:`os.dup2` to a temporary file, also catching
        output from stale stream references, C extensions, and subprocesses.
        ``fd`` is not supported on Windows.

    .. versionchanged:: 8.4.0
        Added the ``capture`` parameter. The default ``sys`` mode no longer
        exposes the original fd through :meth:`fileno`, reverting the change
        introduced in ``8.3.3`` that broke Pytest's ``fd``-level capture
        teardown. Use ``capture="fd"`` to restore that behavior with proper
        isolation. :issue:`3384`

    .. versionchanged:: 8.2
        Added the ``catch_exceptions`` parameter.

    .. versionchanged:: 8.2
        ``mix_stderr`` parameter has been removed.
    """

    def __init__(
        self,
        charset: str = "utf-8",
        env: cabc.Mapping[str, str | None] | None = None,
        echo_stdin: bool = False,
        catch_exceptions: bool = True,
        capture: CaptureMode = "sys",
    ) -> None:
        if capture not in {"sys", "fd"}:
            raise ValueError(
                f"capture={capture!r} is not valid. Choose from 'sys' or 'fd'."
            )
        if capture == "fd" and sys.platform == "win32":
            raise ValueError(
                f"capture={capture!r} is not supported on Windows. Use 'sys'."
            )
        self.charset = charset
        self.env: cabc.Mapping[str, str | None] = env or {}
        self.echo_stdin = echo_stdin
        self.catch_exceptions = catch_exceptions
        self.capture: CaptureMode = capture

    def get_default_prog_name(self, cli: Command) -> str:
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(
        self, overrides: cabc.Mapping[str, str | None] | None = None
    ) -> cabc.Mapping[str, str | None]:
        """Returns the environment overrides for invoking a script.

        The runner's own ``env`` is copied and then updated with
        ``overrides``, so per-call values win.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(
        self,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        color: bool = False,
    ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]:
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up `<stdin>` with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        All replaced global state is restored on exit, including when
        applying the patches or the managed block raises.

        :param input: the input stream to put into `sys.stdin`.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        :return: yields the ``(stdout, stderr, output)`` byte streams, where
            ``output`` is the mix of the other two.

        .. versionadded:: 8.2
            An additional output stream is returned, which is a mix of
            `<stdout>` and `<stderr>` streams.

        .. versionchanged:: 8.2
            Always returns the `<stderr>` stream.

        .. versionchanged:: 8.0
            `<stderr>` is opened with ``errors="backslashreplace"``
            instead of the default ``"strict"``.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.
        """
        # Build every replacement object first. Nothing global is touched
        # until the ``try`` below, so a failure here (for example an unknown
        # charset) leaves the interpreter state exactly as it was.
        bytes_input = make_input_stream(input, self.charset)
        echo_input = None

        env = self.make_env(env)

        stream_mixer = StreamMixer()

        if self.echo_stdin:
            bytes_input = echo_input = t.cast(
                t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout)
            )

        text_input = _NamedTextIOWrapper(
            bytes_input, encoding=self.charset, name="<stdin>", mode="r"
        )

        if self.echo_stdin:
            # Force unbuffered reads, otherwise TextIOWrapper reads a
            # large chunk which is echoed early.
            text_input._CHUNK_SIZE = 1  # type: ignore

        text_stdout = _NamedTextIOWrapper(
            stream_mixer.stdout,
            encoding=self.charset,
            name="<stdout>",
            mode="w",
        )

        text_stderr = _NamedTextIOWrapper(
            stream_mixer.stderr,
            encoding=self.charset,
            name="<stderr>",
            mode="w",
            errors="backslashreplace",
        )

        # The prompt replacements look up ``sys.stdout`` / ``sys.stdin`` at
        # call time, i.e. while the isolation is active.
        @_pause_echo(echo_input)  # type: ignore
        def visible_input(prompt: str | None = None) -> str:
            sys.stdout.write(prompt or "")
            try:
                val = next(text_input).rstrip("\r\n")
            except StopIteration as e:
                raise EOFError() from e
            sys.stdout.write(f"{val}\n")
            sys.stdout.flush()
            return val

        @_pause_echo(echo_input)  # type: ignore
        def hidden_input(prompt: str | None = None) -> str:
            sys.stdout.write(f"{prompt or ''}\n")
            sys.stdout.flush()
            try:
                return next(text_input).rstrip("\r\n")
            except StopIteration as e:
                raise EOFError() from e

        @_pause_echo(echo_input)  # type: ignore
        def _getchar(echo: bool) -> str:
            char = sys.stdin.read(1)

            if echo:
                sys.stdout.write(char)

            sys.stdout.flush()
            return char

        default_color = color

        def should_strip_ansi(
            stream: t.IO[t.Any] | None = None, color: bool | None = None
        ) -> bool:
            if color is None:
                return not default_color
            return not color

        # Snapshot everything that is about to be replaced.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore
        old__compat_should_strip_ansi = _compat.should_strip_ansi
        old_pdb_init = pdb.Pdb.__init__

        def _patched_pdb_init(
            self: pdb.Pdb,
            completekey: str = "tab",
            stdin: t.IO[str] | None = None,
            stdout: t.IO[str] | None = None,
            **kwargs: t.Any,
        ) -> None:
            """Default ``pdb.Pdb`` to real terminal streams during
            ``CliRunner`` isolation.

            Without this patch, ``pdb.Pdb.__init__`` inherits from
            ``cmd.Cmd`` which falls back to ``sys.stdin``/``sys.stdout``
            when no explicit streams are provided. During isolation
            those are ``BytesIO``-backed wrappers, so the debugger
            reads from an empty buffer and writes to captured output,
            making interactive debugging impossible.

            By defaulting to ``sys.__stdin__``/``sys.__stdout__`` (the
            original terminal streams Python preserves regardless of
            redirection), debuggers can interact with the user while
            ``click.echo`` output is still captured normally.

            This covers ``pdb.set_trace()``, ``breakpoint()``,
            ``pdb.post_mortem()``, and debuggers that subclass
            ``pdb.Pdb`` (ipdb, pdbpp). Explicit ``stdin``/``stdout``
            arguments are honored and not overridden. Debuggers that
            do not subclass ``pdb.Pdb`` (pudb, debugpy) are not
            covered.
            """
            if stdin is None:
                stdin = sys.__stdin__
            if stdout is None:
                stdout = sys.__stdout__
            old_pdb_init(
                self, completekey=completekey, stdin=stdin, stdout=stdout, **kwargs
            )

        old_env: dict[str, str | None] = {}
        try:
            sys.stdin = text_input
            sys.stdout = text_stdout
            sys.stderr = text_stderr
            formatting.FORCED_WIDTH = 80
            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = should_strip_ansi
            pdb.Pdb.__init__ = _patched_pdb_init  # type: ignore[assignment]

            for key, value in env.items():
                # Remember the previous value (``None`` means "was unset").
                old_env[key] = os.environ.get(key)
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
            yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output)
        finally:
            for key, value in old_env.items():
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = old__compat_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width
            pdb.Pdb.__init__ = old_pdb_init  # type: ignore[method-assign]

    @staticmethod
    def _start_fd_capture() -> tuple[_FDCapture | None, _FDCapture | None]:
        """Start capturing fds ``1`` and ``2``, all or nothing.

        Returns the two active captures, or ``(None, None)`` when either
        could not be started. A capture that did start is rolled back in
        that case so no descriptor is left redirected.
        """
        cap_out = _FDCapture(1)
        cap_err = _FDCapture(2)

        try:
            cap_out.start()
        except OSError:
            return None, None

        try:
            cap_err.start()
        except OSError:
            # fd 1 is already redirected at this point; put it back.
            cap_out.stop()
            return None, None

        return cap_out, cap_err

    def invoke(
        self,
        cli: Command,
        args: str | cabc.Sequence[str] | None = None,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        catch_exceptions: bool | None = None,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``. If :data:`None`, the value
                                 from :class:`CliRunner` is used.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionadded:: 8.2
            The result object has the ``output_bytes`` attribute with
            the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
            see it in its terminal.

        .. versionchanged:: 8.2
            The result object always returns the ``stderr_bytes`` stream.

        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.

        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.

        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        if catch_exceptions is None:
            catch_exceptions = self.catch_exceptions

        # Set up fd capture before isolation replaces sys.stdout and sys.stderr.
        cap_out: _FDCapture | None = None
        cap_err: _FDCapture | None = None

        if self.capture == "fd":
            cap_out, cap_err = self._start_fd_capture()

        try:
            with self.isolation(input=input, env=env, color=color) as outstreams:
                # Point the captured streams' fileno() at the saved (original)
                # fd so that C-level consumers like faulthandler keep working
                # while fd 1/2 are redirected to the capture tmpfile.
                if cap_out is not None and cap_err is not None:
                    sys.stdout._original_fd = cap_out.saved_fd  # type: ignore[union-attr]
                    sys.stderr._original_fd = cap_err.saved_fd  # type: ignore[union-attr]

                return_value = None
                exception: BaseException | None = None
                exit_code = 0

                if isinstance(args, str):
                    args = shlex.split(args)

                try:
                    prog_name = extra.pop("prog_name")
                except KeyError:
                    prog_name = self.get_default_prog_name(cli)

                try:
                    return_value = cli.main(
                        args=args or (), prog_name=prog_name, **extra
                    )
                except SystemExit as e:
                    exc_info = sys.exc_info()
                    e_code = t.cast("int | t.Any | None", e.code)

                    if e_code is None:
                        e_code = 0

                    if e_code != 0:
                        exception = e

                    if not isinstance(e_code, int):
                        # A non-integer exit code is printed and reported
                        # as exit code 1.
                        sys.stdout.write(str(e_code))
                        sys.stdout.write("\n")
                        e_code = 1

                    exit_code = e_code

                except Exception as e:
                    if not catch_exceptions:
                        raise
                    exception = e
                    exit_code = 1
                    exc_info = sys.exc_info()
                finally:
                    sys.stdout.flush()
                    sys.stderr.flush()

                    # Stop fd capture and merge the captured bytes into
                    # the stdout/stderr BytesIO streams. BytesIOCopy mirrors
                    # those writes into outstreams[2] automatically.
                    if cap_out is not None and cap_err is not None:
                        fd_out = cap_out.stop()
                        fd_err = cap_err.stop()
                        if fd_out:
                            outstreams[0].write(fd_out)
                        if fd_err:
                            outstreams[1].write(fd_err)

                    stdout = outstreams[0].getvalue()
                    stderr = outstreams[1].getvalue()
                    output = outstreams[2].getvalue()
        finally:
            # Safety net: if anything failed before the captures were
            # stopped above (isolation setup, argument splitting, a failing
            # flush), fds 1 and 2 would stay redirected for the rest of the
            # process. ``saved_fd`` is ``-1`` once a capture is stopped.
            for cap in (cap_out, cap_err):
                if cap is not None and cap.saved_fd >= 0:
                    cap.stop()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            output_bytes=output,
            return_value=return_value,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,  # type: ignore
        )

    @contextlib.contextmanager
    def isolated_filesystem(
        self, temp_dir: str | os.PathLike[str] | None = None
    ) -> cabc.Iterator[str]:
        """A context manager that creates a temporary directory and
        changes the current working directory to it. This isolates tests
        that affect the contents of the CWD to prevent them from
        interfering with each other.

        On exit the previous working directory is restored.

        :param temp_dir: Create the temporary directory under this
            directory. If given, the created directory is not removed
            when exiting.

        .. versionchanged:: 8.0
            Added the ``temp_dir`` parameter.
        """
        cwd = os.getcwd()
        dt = tempfile.mkdtemp(dir=temp_dir)

        try:
            # Inside the ``try`` so the directory is still cleaned up if
            # changing into it fails.
            os.chdir(dt)
            yield dt
        finally:
            os.chdir(cwd)

            if temp_dir is None:
                import shutil

                try:
                    shutil.rmtree(dt)
                except OSError:
                    # Best-effort cleanup; a leftover temp dir is not fatal.
                    pass