Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/testing.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import collections.abc as cabc
4import contextlib
5import io
6import os
7import shlex
8import sys
9import tempfile
10import typing as t
11from types import TracebackType
13from . import _compat
14from . import formatting
15from . import termui
16from . import utils
17from ._compat import _find_binary_reader
19if t.TYPE_CHECKING:
20 from _typeshed import ReadableBuffer
22 from .core import Command
25class EchoingStdin:
26 def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
27 self._input = input
28 self._output = output
29 self._paused = False
31 def __getattr__(self, x: str) -> t.Any:
32 return getattr(self._input, x)
34 def _echo(self, rv: bytes) -> bytes:
35 if not self._paused:
36 self._output.write(rv)
38 return rv
40 def read(self, n: int = -1) -> bytes:
41 return self._echo(self._input.read(n))
43 def read1(self, n: int = -1) -> bytes:
44 return self._echo(self._input.read1(n)) # type: ignore
46 def readline(self, n: int = -1) -> bytes:
47 return self._echo(self._input.readline(n))
49 def readlines(self) -> list[bytes]:
50 return [self._echo(x) for x in self._input.readlines()]
52 def __iter__(self) -> cabc.Iterator[bytes]:
53 return iter(self._echo(x) for x in self._input)
55 def __repr__(self) -> str:
56 return repr(self._input)
59@contextlib.contextmanager
60def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]:
61 if stream is None:
62 yield
63 else:
64 stream._paused = True
65 yield
66 stream._paused = False
69class BytesIOCopy(io.BytesIO):
70 """Patch ``io.BytesIO`` to let the written stream be copied to another.
72 .. versionadded:: 8.2
73 """
75 def __init__(self, copy_to: io.BytesIO) -> None:
76 super().__init__()
77 self.copy_to = copy_to
79 def flush(self) -> None:
80 super().flush()
81 self.copy_to.flush()
83 def write(self, b: ReadableBuffer) -> int:
84 self.copy_to.write(b)
85 return super().write(b)
88class StreamMixer:
89 """Mixes `<stdout>` and `<stderr>` streams.
91 The result is available in the ``output`` attribute.
93 .. versionadded:: 8.2
94 """
96 def __init__(self) -> None:
97 self.output: io.BytesIO = io.BytesIO()
98 self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output)
99 self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output)
101 def __del__(self) -> None:
102 """
103 Guarantee that embedded file-like objects are closed in a
104 predictable order, protecting against races between
105 self.output being closed and other streams being flushed on close
107 .. versionadded:: 8.2.2
108 """
109 self.stderr.close()
110 self.stdout.close()
111 self.output.close()
114class _NamedTextIOWrapper(io.TextIOWrapper):
115 def __init__(
116 self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
117 ) -> None:
118 super().__init__(buffer, **kwargs)
119 self._name = name
120 self._mode = mode
122 @property
123 def name(self) -> str:
124 return self._name
126 @property
127 def mode(self) -> str:
128 return self._mode
131def make_input_stream(
132 input: str | bytes | t.IO[t.Any] | None, charset: str
133) -> t.BinaryIO:
134 # Is already an input stream.
135 if hasattr(input, "read"):
136 rv = _find_binary_reader(t.cast("t.IO[t.Any]", input))
138 if rv is not None:
139 return rv
141 raise TypeError("Could not find binary reader for input stream.")
143 if input is None:
144 input = b""
145 elif isinstance(input, str):
146 input = input.encode(charset)
148 return io.BytesIO(input)
151class Result:
152 """Holds the captured result of an invoked CLI script.
154 :param runner: The runner that created the result
155 :param stdout_bytes: The standard output as bytes.
156 :param stderr_bytes: The standard error as bytes.
157 :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the
158 user would see it in its terminal.
159 :param return_value: The value returned from the invoked command.
160 :param exit_code: The exit code as integer.
161 :param exception: The exception that happened if one did.
162 :param exc_info: Exception information (exception type, exception instance,
163 traceback type).
165 .. versionchanged:: 8.2
166 ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and
167 ``mix_stderr`` has been removed.
169 .. versionadded:: 8.0
170 Added ``return_value``.
171 """
173 def __init__(
174 self,
175 runner: CliRunner,
176 stdout_bytes: bytes,
177 stderr_bytes: bytes,
178 output_bytes: bytes,
179 return_value: t.Any,
180 exit_code: int,
181 exception: BaseException | None,
182 exc_info: tuple[type[BaseException], BaseException, TracebackType]
183 | None = None,
184 ):
185 self.runner = runner
186 self.stdout_bytes = stdout_bytes
187 self.stderr_bytes = stderr_bytes
188 self.output_bytes = output_bytes
189 self.return_value = return_value
190 self.exit_code = exit_code
191 self.exception = exception
192 self.exc_info = exc_info
194 @property
195 def output(self) -> str:
196 """The terminal output as unicode string, as the user would see it.
198 .. versionchanged:: 8.2
199 No longer a proxy for ``self.stdout``. Now has its own independent stream
200 that is mixing `<stdout>` and `<stderr>`, in the order they were written.
201 """
202 return self.output_bytes.decode(self.runner.charset, "replace").replace(
203 "\r\n", "\n"
204 )
206 @property
207 def stdout(self) -> str:
208 """The standard output as unicode string."""
209 return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
210 "\r\n", "\n"
211 )
213 @property
214 def stderr(self) -> str:
215 """The standard error as unicode string.
217 .. versionchanged:: 8.2
218 No longer raise an exception, always returns the `<stderr>` string.
219 """
220 return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
221 "\r\n", "\n"
222 )
224 def __repr__(self) -> str:
225 exc_str = repr(self.exception) if self.exception else "okay"
226 return f"<{type(self).__name__} {exc_str}>"
229class CliRunner:
230 """The CLI runner provides functionality to invoke a Click command line
231 script for unittesting purposes in a isolated environment. This only
232 works in single-threaded systems without any concurrency as it changes the
233 global interpreter state.
235 :param charset: the character set for the input and output data.
236 :param env: a dictionary with environment variables for overriding.
237 :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes
238 to `<stdout>`. This is useful for showing examples in
239 some circumstances. Note that regular prompts
240 will automatically echo the input.
241 :param catch_exceptions: Whether to catch any exceptions other than
242 ``SystemExit`` when running :meth:`~CliRunner.invoke`.
244 .. versionchanged:: 8.2
245 Added the ``catch_exceptions`` parameter.
247 .. versionchanged:: 8.2
248 ``mix_stderr`` parameter has been removed.
249 """
251 def __init__(
252 self,
253 charset: str = "utf-8",
254 env: cabc.Mapping[str, str | None] | None = None,
255 echo_stdin: bool = False,
256 catch_exceptions: bool = True,
257 ) -> None:
258 self.charset = charset
259 self.env: cabc.Mapping[str, str | None] = env or {}
260 self.echo_stdin = echo_stdin
261 self.catch_exceptions = catch_exceptions
263 def get_default_prog_name(self, cli: Command) -> str:
264 """Given a command object it will return the default program name
265 for it. The default is the `name` attribute or ``"root"`` if not
266 set.
267 """
268 return cli.name or "root"
270 def make_env(
271 self, overrides: cabc.Mapping[str, str | None] | None = None
272 ) -> cabc.Mapping[str, str | None]:
273 """Returns the environment overrides for invoking a script."""
274 rv = dict(self.env)
275 if overrides:
276 rv.update(overrides)
277 return rv
279 @contextlib.contextmanager
280 def isolation(
281 self,
282 input: str | bytes | t.IO[t.Any] | None = None,
283 env: cabc.Mapping[str, str | None] | None = None,
284 color: bool = False,
285 ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]:
286 """A context manager that sets up the isolation for invoking of a
287 command line tool. This sets up `<stdin>` with the given input data
288 and `os.environ` with the overrides from the given dictionary.
289 This also rebinds some internals in Click to be mocked (like the
290 prompt functionality).
292 This is automatically done in the :meth:`invoke` method.
294 :param input: the input stream to put into `sys.stdin`.
295 :param env: the environment overrides as dictionary.
296 :param color: whether the output should contain color codes. The
297 application can still override this explicitly.
299 .. versionadded:: 8.2
300 An additional output stream is returned, which is a mix of
301 `<stdout>` and `<stderr>` streams.
303 .. versionchanged:: 8.2
304 Always returns the `<stderr>` stream.
306 .. versionchanged:: 8.0
307 `<stderr>` is opened with ``errors="backslashreplace"``
308 instead of the default ``"strict"``.
310 .. versionchanged:: 4.0
311 Added the ``color`` parameter.
312 """
313 bytes_input = make_input_stream(input, self.charset)
314 echo_input = None
316 old_stdin = sys.stdin
317 old_stdout = sys.stdout
318 old_stderr = sys.stderr
319 old_forced_width = formatting.FORCED_WIDTH
320 formatting.FORCED_WIDTH = 80
322 env = self.make_env(env)
324 stream_mixer = StreamMixer()
326 if self.echo_stdin:
327 bytes_input = echo_input = t.cast(
328 t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout)
329 )
331 sys.stdin = text_input = _NamedTextIOWrapper(
332 bytes_input, encoding=self.charset, name="<stdin>", mode="r"
333 )
335 if self.echo_stdin:
336 # Force unbuffered reads, otherwise TextIOWrapper reads a
337 # large chunk which is echoed early.
338 text_input._CHUNK_SIZE = 1 # type: ignore
340 sys.stdout = _NamedTextIOWrapper(
341 stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w"
342 )
344 sys.stderr = _NamedTextIOWrapper(
345 stream_mixer.stderr,
346 encoding=self.charset,
347 name="<stderr>",
348 mode="w",
349 errors="backslashreplace",
350 )
352 @_pause_echo(echo_input) # type: ignore
353 def visible_input(prompt: str | None = None) -> str:
354 sys.stdout.write(prompt or "")
355 try:
356 val = next(text_input).rstrip("\r\n")
357 except StopIteration as e:
358 raise EOFError() from e
359 sys.stdout.write(f"{val}\n")
360 sys.stdout.flush()
361 return val
363 @_pause_echo(echo_input) # type: ignore
364 def hidden_input(prompt: str | None = None) -> str:
365 sys.stdout.write(f"{prompt or ''}\n")
366 sys.stdout.flush()
367 try:
368 return next(text_input).rstrip("\r\n")
369 except StopIteration as e:
370 raise EOFError() from e
372 @_pause_echo(echo_input) # type: ignore
373 def _getchar(echo: bool) -> str:
374 char = sys.stdin.read(1)
376 if echo:
377 sys.stdout.write(char)
379 sys.stdout.flush()
380 return char
382 default_color = color
384 def should_strip_ansi(
385 stream: t.IO[t.Any] | None = None, color: bool | None = None
386 ) -> bool:
387 if color is None:
388 return not default_color
389 return not color
391 old_visible_prompt_func = termui.visible_prompt_func
392 old_hidden_prompt_func = termui.hidden_prompt_func
393 old__getchar_func = termui._getchar
394 old_should_strip_ansi = utils.should_strip_ansi # type: ignore
395 old__compat_should_strip_ansi = _compat.should_strip_ansi
396 termui.visible_prompt_func = visible_input
397 termui.hidden_prompt_func = hidden_input
398 termui._getchar = _getchar
399 utils.should_strip_ansi = should_strip_ansi # type: ignore
400 _compat.should_strip_ansi = should_strip_ansi
402 old_env = {}
403 try:
404 for key, value in env.items():
405 old_env[key] = os.environ.get(key)
406 if value is None:
407 try:
408 del os.environ[key]
409 except Exception:
410 pass
411 else:
412 os.environ[key] = value
413 yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output)
414 finally:
415 for key, value in old_env.items():
416 if value is None:
417 try:
418 del os.environ[key]
419 except Exception:
420 pass
421 else:
422 os.environ[key] = value
423 sys.stdout = old_stdout
424 sys.stderr = old_stderr
425 sys.stdin = old_stdin
426 termui.visible_prompt_func = old_visible_prompt_func
427 termui.hidden_prompt_func = old_hidden_prompt_func
428 termui._getchar = old__getchar_func
429 utils.should_strip_ansi = old_should_strip_ansi # type: ignore
430 _compat.should_strip_ansi = old__compat_should_strip_ansi
431 formatting.FORCED_WIDTH = old_forced_width
433 def invoke(
434 self,
435 cli: Command,
436 args: str | cabc.Sequence[str] | None = None,
437 input: str | bytes | t.IO[t.Any] | None = None,
438 env: cabc.Mapping[str, str | None] | None = None,
439 catch_exceptions: bool | None = None,
440 color: bool = False,
441 **extra: t.Any,
442 ) -> Result:
443 """Invokes a command in an isolated environment. The arguments are
444 forwarded directly to the command line script, the `extra` keyword
445 arguments are passed to the :meth:`~clickpkg.Command.main` function of
446 the command.
448 This returns a :class:`Result` object.
450 :param cli: the command to invoke
451 :param args: the arguments to invoke. It may be given as an iterable
452 or a string. When given as string it will be interpreted
453 as a Unix shell command. More details at
454 :func:`shlex.split`.
455 :param input: the input data for `sys.stdin`.
456 :param env: the environment overrides.
457 :param catch_exceptions: Whether to catch any other exceptions than
458 ``SystemExit``. If :data:`None`, the value
459 from :class:`CliRunner` is used.
460 :param extra: the keyword arguments to pass to :meth:`main`.
461 :param color: whether the output should contain color codes. The
462 application can still override this explicitly.
464 .. versionadded:: 8.2
465 The result object has the ``output_bytes`` attribute with
466 the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
467 see it in its terminal.
469 .. versionchanged:: 8.2
470 The result object always returns the ``stderr_bytes`` stream.
472 .. versionchanged:: 8.0
473 The result object has the ``return_value`` attribute with
474 the value returned from the invoked command.
476 .. versionchanged:: 4.0
477 Added the ``color`` parameter.
479 .. versionchanged:: 3.0
480 Added the ``catch_exceptions`` parameter.
482 .. versionchanged:: 3.0
483 The result object has the ``exc_info`` attribute with the
484 traceback if available.
485 """
486 exc_info = None
487 if catch_exceptions is None:
488 catch_exceptions = self.catch_exceptions
490 with self.isolation(input=input, env=env, color=color) as outstreams:
491 return_value = None
492 exception: BaseException | None = None
493 exit_code = 0
495 if isinstance(args, str):
496 args = shlex.split(args)
498 try:
499 prog_name = extra.pop("prog_name")
500 except KeyError:
501 prog_name = self.get_default_prog_name(cli)
503 try:
504 return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
505 except SystemExit as e:
506 exc_info = sys.exc_info()
507 e_code = t.cast("int | t.Any | None", e.code)
509 if e_code is None:
510 e_code = 0
512 if e_code != 0:
513 exception = e
515 if not isinstance(e_code, int):
516 sys.stdout.write(str(e_code))
517 sys.stdout.write("\n")
518 e_code = 1
520 exit_code = e_code
522 except Exception as e:
523 if not catch_exceptions:
524 raise
525 exception = e
526 exit_code = 1
527 exc_info = sys.exc_info()
528 finally:
529 sys.stdout.flush()
530 sys.stderr.flush()
531 stdout = outstreams[0].getvalue()
532 stderr = outstreams[1].getvalue()
533 output = outstreams[2].getvalue()
535 return Result(
536 runner=self,
537 stdout_bytes=stdout,
538 stderr_bytes=stderr,
539 output_bytes=output,
540 return_value=return_value,
541 exit_code=exit_code,
542 exception=exception,
543 exc_info=exc_info, # type: ignore
544 )
546 @contextlib.contextmanager
547 def isolated_filesystem(
548 self, temp_dir: str | os.PathLike[str] | None = None
549 ) -> cabc.Iterator[str]:
550 """A context manager that creates a temporary directory and
551 changes the current working directory to it. This isolates tests
552 that affect the contents of the CWD to prevent them from
553 interfering with each other.
555 :param temp_dir: Create the temporary directory under this
556 directory. If given, the created directory is not removed
557 when exiting.
559 .. versionchanged:: 8.0
560 Added the ``temp_dir`` parameter.
561 """
562 cwd = os.getcwd()
563 dt = tempfile.mkdtemp(dir=temp_dir)
564 os.chdir(dt)
566 try:
567 yield dt
568 finally:
569 os.chdir(cwd)
571 if temp_dir is None:
572 import shutil
574 try:
575 shutil.rmtree(dt)
576 except OSError:
577 pass