Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/testing.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import collections.abc as cabc
4import contextlib
5import io
6import os
7import pdb
8import shlex
9import sys
10import tempfile
11import typing as t
12from types import TracebackType
14from . import _compat
15from . import formatting
16from . import termui
17from . import utils
18from ._compat import _find_binary_reader
20if t.TYPE_CHECKING:
21 from _typeshed import ReadableBuffer
23 from .core import Command
class EchoingStdin:
    """Binary input proxy that copies everything read from it to an output.

    Reads are served by ``input``; each chunk that is read is also written
    to ``output`` unless echoing is paused (``_paused`` is ``True``). Any
    attribute that is not defined on this class is looked up on the wrapped
    input stream.

    :param input: The binary stream that reads are delegated to.
    :param output: The binary stream that receives a copy of the data read.
    """

    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
        self._input = input
        self._output = output
        self._paused = False

    def __getattr__(self, x: str) -> t.Any:
        # ``__getattr__`` only runs after normal lookup failed. If ``_input``
        # itself is missing (instance created without ``__init__``, e.g. by
        # ``copy`` or ``pickle``), delegating would re-enter this method
        # until ``RecursionError``; raise a plain ``AttributeError`` instead.
        if x == "_input":
            raise AttributeError(x)

        return getattr(self._input, x)

    def _echo(self, rv: bytes) -> bytes:
        """Write ``rv`` to the output unless paused, then return it unchanged."""
        if not self._paused:
            self._output.write(rv)

        return rv

    def read(self, n: int = -1) -> bytes:
        """Read up to ``n`` bytes from the input and echo them."""
        return self._echo(self._input.read(n))

    def read1(self, n: int = -1) -> bytes:
        """Like :meth:`read`, but delegates to the input's ``read1``."""
        return self._echo(self._input.read1(n))  # type: ignore

    def readline(self, n: int = -1) -> bytes:
        """Read one line (at most ``n`` bytes) from the input and echo it."""
        return self._echo(self._input.readline(n))

    def readlines(self, hint: int = -1) -> list[bytes]:
        """Read lines from the input, echoing each one.

        :param hint: Forwarded to the input's ``readlines``. The default
            ``-1`` reads all remaining lines, as before.
        """
        return [self._echo(line) for line in self._input.readlines(hint)]

    def __iter__(self) -> cabc.Iterator[bytes]:
        # A generator expression already is an iterator; lines are echoed
        # lazily, one at a time, as the caller consumes them.
        return (self._echo(line) for line in self._input)

    def __repr__(self) -> str:
        return repr(self._input)
@contextlib.contextmanager
def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]:
    """Suspend echoing on ``stream`` while the managed body runs.

    Sets ``stream._paused`` to ``True`` on entry and back to ``False`` on
    exit. The reset happens in a ``finally`` clause so that an exception
    raised by the body (for example ``EOFError`` from a prompt that ran out
    of input) does not leave the stream paused for good.

    :param stream: The echoing stream to pause, or ``None`` to do nothing.
    """
    if stream is None:
        yield
        return

    stream._paused = True

    try:
        yield
    finally:
        stream._paused = False
class BytesIOCopy(io.BytesIO):
    """An in-memory byte stream that mirrors every write into a second one.

    Data written here is first written to ``copy_to`` and then stored in
    this stream; flushing flushes both.

    :param copy_to: The stream that receives a copy of everything written.

    .. versionadded:: 8.2
    """

    def __init__(self, copy_to: io.BytesIO) -> None:
        super().__init__()
        self.copy_to = copy_to

    def flush(self) -> None:
        super().flush()
        mirror = self.copy_to
        mirror.flush()

    def write(self, b: ReadableBuffer) -> int:
        # Mirror first, then store locally; the return value is the count
        # reported by this stream's own write.
        mirror = self.copy_to
        mirror.write(b)
        written = super().write(b)
        return written
class StreamMixer:
    """Holds separate `<stdout>` and `<stderr>` buffers plus a combined one.

    ``stdout`` and ``stderr`` are :class:`BytesIOCopy` streams that both
    copy their writes into ``output``, so ``output`` contains the two
    streams interleaved in the order they were written.

    .. versionadded:: 8.2
    """

    def __init__(self) -> None:
        combined = io.BytesIO()
        self.output: io.BytesIO = combined
        self.stdout: io.BytesIO = BytesIOCopy(copy_to=combined)
        self.stderr: io.BytesIO = BytesIOCopy(copy_to=combined)
class _NamedTextIOWrapper(io.TextIOWrapper):
    """A :class:`~io.TextIOWrapper` that reports a custom ``name`` and
    ``mode`` and never closes the buffer it wraps.

    If ``original_fd`` is given (non-negative), :meth:`fileno` returns it,
    so that C-level consumers that call :meth:`fileno` (``faulthandler``,
    ``subprocess``, ...) still get the descriptor of the stream being
    replaced. Inspired by pytest's ``capsys``/``capfd`` split: see
    :doc:`/testing` for details.

    :param buffer: The binary stream to wrap.
    :param name: Value reported by the ``name`` property.
    :param mode: Value reported by the ``mode`` property.
    :param original_fd: File descriptor to report from :meth:`fileno`;
        ``-1`` (the default) means "none".
    :param kwargs: Passed through to :class:`~io.TextIOWrapper`.

    .. versionchanged:: 8.3.3
        Added ``original_fd`` parameter and :meth:`fileno` override.
    """

    def __init__(
        self,
        buffer: t.BinaryIO,
        name: str,
        mode: str,
        *,
        original_fd: int = -1,
        **kwargs: t.Any,
    ) -> None:
        super().__init__(buffer, **kwargs)
        self._original_fd = original_fd
        self._mode = mode
        self._name = name

    def close(self) -> None:
        """Do nothing.

        The wrapped buffer is owned by some other object, so neither an
        explicit ``close()`` nor the default ``__del__`` implementation
        may close it.

        .. versionadded:: 8.3.2
        """

    def fileno(self) -> int:
        """Return the original stream's file descriptor when one was given
        at construction time, otherwise defer to the base class.

        This lets C-level consumers (``faulthandler``, ``subprocess``,
        signal handlers, ...) obtain a valid fd without crashing, even
        though Python-level writes go to an in-memory buffer.

        .. versionadded:: 8.3.3
        """
        fd = self._original_fd

        if fd < 0:
            return super().fileno()

        return fd

    @property
    def name(self) -> str:
        return self._name

    @property
    def mode(self) -> str:
        return self._mode
def make_input_stream(
    input: str | bytes | t.IO[t.Any] | None, charset: str
) -> t.BinaryIO:
    """Turn the ``input`` given to the runner into a binary stream.

    :param input: An object with a ``read`` attribute, ``bytes``, a ``str``
        (encoded with ``charset``), or ``None`` (treated as empty input).
    :param charset: Encoding used when ``input`` is a ``str``.
    :return: For stream input, whatever ``_find_binary_reader`` returns for
        it; otherwise a new :class:`io.BytesIO` holding the data.
    :raises TypeError: If ``input`` has a ``read`` attribute but
        ``_find_binary_reader`` returns ``None`` for it.
    """
    # Anything with a ``read`` attribute is treated as a stream.
    if hasattr(input, "read"):
        reader = _find_binary_reader(t.cast("t.IO[t.Any]", input))

        if reader is None:
            raise TypeError("Could not find binary reader for input stream.")

        return reader

    if isinstance(input, str):
        payload = input.encode(charset)
    elif input is None:
        payload = b""
    else:
        payload = input

    return io.BytesIO(payload)
class Result:
    """The captured outcome of one CLI invocation.

    :param runner: The runner that created the result
    :param stdout_bytes: The standard output as bytes.
    :param stderr_bytes: The standard error as bytes.
    :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the
        user would see it in its terminal.
    :param return_value: The value returned from the invoked command.
    :param exit_code: The exit code as integer.
    :param exception: The exception that happened if one did.
    :param exc_info: Exception information (exception type, exception instance,
        traceback type).

    .. versionchanged:: 8.2
        ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and
        ``mix_stderr`` has been removed.

    .. versionadded:: 8.0
        Added ``return_value``.
    """

    def __init__(
        self,
        runner: CliRunner,
        stdout_bytes: bytes,
        stderr_bytes: bytes,
        output_bytes: bytes,
        return_value: t.Any,
        exit_code: int,
        exception: BaseException | None,
        exc_info: tuple[type[BaseException], BaseException, TracebackType]
        | None = None,
    ):
        self.runner = runner
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.output_bytes = output_bytes
        self.return_value = return_value
        self.exit_code = exit_code
        self.exception = exception
        self.exc_info = exc_info

    @property
    def output(self) -> str:
        """The terminal output as unicode string, as the user would see it.

        Decoded with the runner's charset (undecodable bytes are replaced)
        and with ``\\r\\n`` normalized to ``\\n``.

        .. versionchanged:: 8.2
            No longer a proxy for ``self.stdout``. Now has its own independent stream
            that is mixing `<stdout>` and `<stderr>`, in the order they were written.
        """
        text = self.output_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stdout(self) -> str:
        """The standard output as unicode string, decoded with the runner's
        charset and with ``\\r\\n`` normalized to ``\\n``.
        """
        text = self.stdout_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stderr(self) -> str:
        """The standard error as unicode string, decoded with the runner's
        charset and with ``\\r\\n`` normalized to ``\\n``.

        .. versionchanged:: 8.2
            No longer raise an exception, always returns the `<stderr>` string.
        """
        text = self.stderr_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    def __repr__(self) -> str:
        if self.exception:
            exc_str = repr(self.exception)
        else:
            exc_str = "okay"

        return f"<{type(self).__name__} {exc_str}>"
class CliRunner:
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment. This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes
                       to `<stdout>`.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param catch_exceptions: Whether to catch any exceptions other than
                             ``SystemExit`` when running :meth:`~CliRunner.invoke`.

    .. versionchanged:: 8.2
        Added the ``catch_exceptions`` parameter.

    .. versionchanged:: 8.2
        ``mix_stderr`` parameter has been removed.
    """

    def __init__(
        self,
        charset: str = "utf-8",
        env: cabc.Mapping[str, str | None] | None = None,
        echo_stdin: bool = False,
        catch_exceptions: bool = True,
    ) -> None:
        self.charset = charset
        self.env: cabc.Mapping[str, str | None] = env or {}
        self.echo_stdin = echo_stdin
        self.catch_exceptions = catch_exceptions

    def get_default_prog_name(self, cli: Command) -> str:
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(
        self, overrides: cabc.Mapping[str, str | None] | None = None
    ) -> cabc.Mapping[str, str | None]:
        """Returns the environment overrides for invoking a script.

        The result is a new dict built from the runner's ``env`` and then
        updated with ``overrides``; a value of ``None`` marks a variable
        that :meth:`isolation` removes from ``os.environ``.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(
        self,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        color: bool = False,
    ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]:
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up `<stdin>` with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        All replaced global state (the ``sys`` streams, ``os.environ``
        entries, the Click prompt/ANSI hooks, ``formatting.FORCED_WIDTH``
        and ``pdb.Pdb.__init__``) is restored on exit, including when
        applying the patches themselves fails part-way.

        This is automatically done in the :meth:`invoke` method.

        :param input: the input stream to put into `sys.stdin`.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        :return: yields a ``(stdout, stderr, output)`` tuple of byte
                 streams, where ``output`` mixes the other two.

        .. versionadded:: 8.2
            An additional output stream is returned, which is a mix of
            `<stdout>` and `<stderr>` streams.

        .. versionchanged:: 8.2
            Always returns the `<stderr>` stream.

        .. versionchanged:: 8.0
            `<stderr>` is opened with ``errors="backslashreplace"``
            instead of the default ``"strict"``.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.
        """
        bytes_input = make_input_stream(input, self.charset)
        echo_input = None

        env = self.make_env(env)
        stream_mixer = StreamMixer()

        # Remember everything that is about to be replaced. Nothing global
        # is modified until the ``try`` block below, so a failure while
        # preparing the streams (e.g. an unknown charset) leaves the
        # interpreter state untouched.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore
        old__compat_should_strip_ansi = _compat.should_strip_ansi
        old_pdb_init = pdb.Pdb.__init__

        # Preserve the original file descriptors so that C-level
        # consumers (faulthandler, subprocess, etc.) can still obtain a
        # valid fd from the redirected streams.  The original streams
        # may themselves lack a fileno() (e.g. when CliRunner is used
        # inside pytest's capsys), so we fall back to -1.
        def _safe_fileno(stream: t.IO[t.Any]) -> int:
            try:
                return stream.fileno()
            except (AttributeError, io.UnsupportedOperation):
                return -1

        old_stdout_fd = _safe_fileno(old_stdout)
        old_stderr_fd = _safe_fileno(old_stderr)

        if self.echo_stdin:
            bytes_input = echo_input = t.cast(
                t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout)
            )

        text_input = _NamedTextIOWrapper(
            bytes_input, encoding=self.charset, name="<stdin>", mode="r"
        )

        if self.echo_stdin:
            # Force unbuffered reads, otherwise TextIOWrapper reads a
            # large chunk which is echoed early.
            text_input._CHUNK_SIZE = 1  # type: ignore

        text_stdout = _NamedTextIOWrapper(
            stream_mixer.stdout,
            encoding=self.charset,
            name="<stdout>",
            mode="w",
            original_fd=old_stdout_fd,
        )

        text_stderr = _NamedTextIOWrapper(
            stream_mixer.stderr,
            encoding=self.charset,
            name="<stderr>",
            mode="w",
            errors="backslashreplace",
            original_fd=old_stderr_fd,
        )

        # The replacement prompt helpers look up ``sys.stdout`` and
        # ``sys.stdin`` at call time, so they use whatever is installed
        # when the command actually prompts.
        @_pause_echo(echo_input)  # type: ignore
        def visible_input(prompt: str | None = None) -> str:
            sys.stdout.write(prompt or "")
            try:
                val = next(text_input).rstrip("\r\n")
            except StopIteration as e:
                raise EOFError() from e
            sys.stdout.write(f"{val}\n")
            sys.stdout.flush()
            return val

        @_pause_echo(echo_input)  # type: ignore
        def hidden_input(prompt: str | None = None) -> str:
            sys.stdout.write(f"{prompt or ''}\n")
            sys.stdout.flush()
            try:
                return next(text_input).rstrip("\r\n")
            except StopIteration as e:
                raise EOFError() from e

        @_pause_echo(echo_input)  # type: ignore
        def _getchar(echo: bool) -> str:
            char = sys.stdin.read(1)

            if echo:
                sys.stdout.write(char)

            sys.stdout.flush()
            return char

        default_color = color

        def should_strip_ansi(
            stream: t.IO[t.Any] | None = None, color: bool | None = None
        ) -> bool:
            if color is None:
                return not default_color
            return not color

        def _patched_pdb_init(
            self: pdb.Pdb,
            completekey: str = "tab",
            stdin: t.IO[str] | None = None,
            stdout: t.IO[str] | None = None,
            **kwargs: t.Any,
        ) -> None:
            """Default ``pdb.Pdb`` to real terminal streams during
            ``CliRunner`` isolation.

            Without this patch, ``pdb.Pdb.__init__`` inherits from
            ``cmd.Cmd`` which falls back to ``sys.stdin``/``sys.stdout``
            when no explicit streams are provided. During isolation
            those are ``BytesIO``-backed wrappers, so the debugger
            reads from an empty buffer and writes to captured output,
            making interactive debugging impossible.

            By defaulting to ``sys.__stdin__``/``sys.__stdout__`` (the
            original terminal streams Python preserves regardless of
            redirection), debuggers can interact with the user while
            ``click.echo`` output is still captured normally.

            This covers ``pdb.set_trace()``, ``breakpoint()``,
            ``pdb.post_mortem()``, and debuggers that subclass
            ``pdb.Pdb`` (ipdb, pdbpp). Explicit ``stdin``/``stdout``
            arguments are honored and not overridden. Debuggers that
            do not subclass ``pdb.Pdb`` (pudb, debugpy) are not
            covered.
            """
            if stdin is None:
                stdin = sys.__stdin__
            if stdout is None:
                stdout = sys.__stdout__
            old_pdb_init(
                self, completekey=completekey, stdin=stdin, stdout=stdout, **kwargs
            )

        old_env: dict[str, str | None] = {}

        try:
            formatting.FORCED_WIDTH = 80
            sys.stdin = text_input
            sys.stdout = text_stdout
            sys.stderr = text_stderr
            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = should_strip_ansi
            pdb.Pdb.__init__ = _patched_pdb_init  # type: ignore[assignment]

            for key, value in env.items():
                old_env[key] = os.environ.get(key)
                if value is None:
                    # A ``None`` override means "unset"; the variable may
                    # not exist, which is fine.
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value

            yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output)
        finally:
            for key, value in old_env.items():
                if value is None:
                    # The variable did not exist before; remove it again.
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = old__compat_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width
            pdb.Pdb.__init__ = old_pdb_init  # type: ignore[method-assign]

    def invoke(
        self,
        cli: Command,
        args: str | cabc.Sequence[str] | None = None,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        catch_exceptions: bool | None = None,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``. If :data:`None`, the value
                                 from :class:`CliRunner` is used.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionadded:: 8.2
            The result object has the ``output_bytes`` attribute with
            the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
            see it in its terminal.

        .. versionchanged:: 8.2
            The result object always returns the ``stderr_bytes`` stream.

        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.

        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.

        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        if catch_exceptions is None:
            catch_exceptions = self.catch_exceptions

        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: BaseException | None = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            # An explicitly passed ``prog_name`` (even ``None``) wins over
            # the default derived from the command.
            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast("int | t.Any | None", e.code)

                if e_code is None:
                    e_code = 0

                if e_code != 0:
                    exception = e

                # A non-integer exit code is written to stdout and
                # reported as exit code 1.
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1

                exit_code = e_code

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush the text wrappers so the byte buffers are complete
                # before they are read.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                stderr = outstreams[1].getvalue()
                output = outstreams[2].getvalue()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            output_bytes=output,
            return_value=return_value,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,  # type: ignore
        )

    @contextlib.contextmanager
    def isolated_filesystem(
        self, temp_dir: str | os.PathLike[str] | None = None
    ) -> cabc.Iterator[str]:
        """A context manager that creates a temporary directory and
        changes the current working directory to it. This isolates tests
        that affect the contents of the CWD to prevent them from
        interfering with each other.

        On exit the previous working directory is restored and, unless
        ``temp_dir`` was given, the temporary directory is removed.

        :param temp_dir: Create the temporary directory under this
            directory. If given, the created directory is not removed
            when exiting.

        .. versionchanged:: 8.0
            Added the ``temp_dir`` parameter.
        """
        cwd = os.getcwd()
        dt = tempfile.mkdtemp(dir=temp_dir)

        try:
            # Inside the ``try`` so that a failing ``chdir`` still cleans
            # up the directory that was just created.
            os.chdir(dt)
            yield dt
        finally:
            os.chdir(cwd)

            if temp_dir is None:
                import shutil

                # Removal is best effort; a leftover temp dir must not
                # turn a finished test into an error.
                with contextlib.suppress(OSError):
                    shutil.rmtree(dt)
669 pass