Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/testing.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import collections.abc as cabc
4import contextlib
5import io
6import os
7import shlex
8import shutil
9import sys
10import tempfile
11import typing as t
12from types import TracebackType
14from . import _compat
15from . import formatting
16from . import termui
17from . import utils
18from ._compat import _find_binary_reader
20if t.TYPE_CHECKING:
21 from _typeshed import ReadableBuffer
23 from .core import Command
class EchoingStdin:
    """Binary input wrapper that copies everything it reads to an output.

    Every read-style call is forwarded to ``input``; the bytes that come
    back are also written to ``output`` unless ``_paused`` is set. Any
    attribute not defined on this class is looked up on the wrapped input
    stream.

    :param input: the binary stream to read from.
    :param output: the binary stream that receives a copy of the data read.
    """

    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
        self._input = input
        self._output = output
        self._paused = False

    def __getattr__(self, x: str) -> t.Any:
        # ``__getattr__`` only runs after normal lookup failed. If ``_input``
        # itself is the missing attribute (instance created without running
        # ``__init__``), delegating to ``self._input`` would re-enter this
        # method forever, so report the attribute as missing instead.
        if x == "_input":
            raise AttributeError(x)
        return getattr(self._input, x)

    def _echo(self, rv: bytes) -> bytes:
        """Write ``rv`` to the output unless paused, then return it as is."""
        if not self._paused:
            self._output.write(rv)

        return rv

    def read(self, n: int = -1) -> bytes:
        return self._echo(self._input.read(n))

    def read1(self, n: int = -1) -> bytes:
        return self._echo(self._input.read1(n))  # type: ignore

    def readline(self, n: int = -1) -> bytes:
        return self._echo(self._input.readline(n))

    def readlines(self) -> list[bytes]:
        return [self._echo(x) for x in self._input.readlines()]

    def __iter__(self) -> cabc.Iterator[bytes]:
        # A generator expression already is an iterator; no ``iter()`` needed.
        return (self._echo(x) for x in self._input)

    def __repr__(self) -> str:
        return repr(self._input)
@contextlib.contextmanager
def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]:
    """Suspend echoing on ``stream`` for the duration of the ``with`` block.

    :param stream: the echoing stream to pause, or ``None`` to do nothing.

    ``stream._paused`` is set to ``True`` on entry and reset to ``False`` on
    exit, including when the managed block raises, so that a failing prompt
    does not leave echoing switched off.
    """
    if stream is None:
        yield
        return

    stream._paused = True
    try:
        yield
    finally:
        stream._paused = False
class BytesIOCopy(io.BytesIO):
    """An ``io.BytesIO`` that mirrors every write into a second stream.

    Data written here is first written to ``copy_to`` and then stored in
    this buffer; flushing this buffer also flushes ``copy_to``.

    :param copy_to: the stream that receives a copy of everything written.

    .. versionadded:: 8.2
    """

    def __init__(self, copy_to: io.BytesIO) -> None:
        super().__init__()
        self.copy_to = copy_to

    def flush(self) -> None:
        """Flush this buffer, then the mirror stream."""
        super().flush()
        mirror = self.copy_to
        mirror.flush()

    def write(self, b: ReadableBuffer) -> int:
        """Write ``b`` to the mirror stream, then to this buffer.

        :return: the count reported by this buffer's own ``write``.
        """
        mirror = self.copy_to
        mirror.write(b)
        written = super().write(b)
        return written
class StreamMixer:
    """Provides `<stdout>` and `<stderr>` buffers plus a combined view.

    ``stdout`` and ``stderr`` are separate buffers; whatever is written to
    either one is also copied into the shared ``output`` buffer.

    .. versionadded:: 8.2
    """

    def __init__(self) -> None:
        combined = io.BytesIO()
        # Both per-stream buffers mirror their writes into the same object.
        self.output: io.BytesIO = combined
        self.stdout: io.BytesIO = BytesIOCopy(copy_to=combined)
        self.stderr: io.BytesIO = BytesIOCopy(copy_to=combined)
class _NamedTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` that reports a caller-supplied name and mode.

    :param buffer: the binary stream to wrap.
    :param name: value returned by the ``name`` property.
    :param mode: value returned by the ``mode`` property.
    :param kwargs: forwarded unchanged to ``io.TextIOWrapper``.
    """

    def __init__(
        self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
    ) -> None:
        super().__init__(buffer, **kwargs)
        self._name = name
        self._mode = mode

    @property
    def name(self) -> str:
        return self._name

    @property
    def mode(self) -> str:
        return self._mode

    def __next__(self) -> str:  # type: ignore
        # Running out of lines is reported as ``EOFError`` rather than
        # ``StopIteration``; the original exception is kept as the cause.
        try:
            return super().__next__()
        except StopIteration as exhausted:
            raise EOFError() from exhausted
def make_input_stream(
    input: str | bytes | t.IO[t.Any] | None, charset: str
) -> t.BinaryIO:
    """Turn the given test input into a binary stream.

    :param input: an object with a ``read`` attribute, a ``str``, ``bytes``
        or ``None``.
    :param charset: the encoding used when ``input`` is a ``str``.
    :return: for stream input, whatever ``_find_binary_reader`` returns;
        otherwise a new ``io.BytesIO`` holding the data (empty for ``None``).
    :raises TypeError: if ``input`` has a ``read`` attribute but
        ``_find_binary_reader`` returns ``None`` for it.
    """
    # Anything exposing ``read`` is treated as an existing input stream.
    if hasattr(input, "read"):
        reader = _find_binary_reader(t.cast("t.IO[t.Any]", input))

        if reader is None:
            raise TypeError("Could not find binary reader for input stream.")

        return reader

    if input is None:
        return io.BytesIO(b"")

    if isinstance(input, str):
        return io.BytesIO(input.encode(charset))

    return io.BytesIO(input)
class Result:
    """Holds the captured result of an invoked CLI script.

    :param runner: The runner that created the result
    :param stdout_bytes: The standard output as bytes.
    :param stderr_bytes: The standard error as bytes.
    :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the
        user would see it in its terminal.
    :param return_value: The value returned from the invoked command.
    :param exit_code: The exit code as integer.
    :param exception: The exception that happened if one did.
    :param exc_info: Exception information (exception type, exception instance,
        traceback type).

    .. versionchanged:: 8.2
        ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and
        ``mix_stderr`` has been removed.

    .. versionadded:: 8.0
        Added ``return_value``.
    """

    def __init__(
        self,
        runner: CliRunner,
        stdout_bytes: bytes,
        stderr_bytes: bytes,
        output_bytes: bytes,
        return_value: t.Any,
        exit_code: int,
        exception: BaseException | None,
        exc_info: tuple[type[BaseException], BaseException, TracebackType]
        | None = None,
    ):
        self.runner = runner

        # Raw captured streams.
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.output_bytes = output_bytes

        # Outcome of the invocation.
        self.return_value = return_value
        self.exit_code = exit_code
        self.exception = exception
        self.exc_info = exc_info

    @property
    def output(self) -> str:
        """The terminal output as unicode string, as the user would see it.

        Decoded with the runner's charset using ``"replace"`` error handling,
        with ``"\\r\\n"`` normalized to ``"\\n"``.

        .. versionchanged:: 8.2
            No longer a proxy for ``self.stdout``. Now has its own independent stream
            that is mixing `<stdout>` and `<stderr>`, in the order they were written.
        """
        text = self.output_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stdout(self) -> str:
        """The standard output as unicode string.

        Decoded with the runner's charset using ``"replace"`` error handling,
        with ``"\\r\\n"`` normalized to ``"\\n"``.
        """
        text = self.stdout_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stderr(self) -> str:
        """The standard error as unicode string.

        Decoded with the runner's charset using ``"replace"`` error handling,
        with ``"\\r\\n"`` normalized to ``"\\n"``.

        .. versionchanged:: 8.2
            No longer raise an exception, always returns the `<stderr>` string.
        """
        text = self.stderr_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    def __repr__(self) -> str:
        if self.exception:
            status = repr(self.exception)
        else:
            status = "okay"

        return f"<{type(self).__name__} {status}>"
class CliRunner:
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes
                       to `<stdout>`.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param catch_exceptions: Whether to catch any exceptions other than
                             ``SystemExit`` when running :meth:`~CliRunner.invoke`.

    .. versionchanged:: 8.2
        Added the ``catch_exceptions`` parameter.

    .. versionchanged:: 8.2
        ``mix_stderr`` parameter has been removed.
    """

    def __init__(
        self,
        charset: str = "utf-8",
        env: cabc.Mapping[str, str | None] | None = None,
        echo_stdin: bool = False,
        catch_exceptions: bool = True,
    ) -> None:
        self.charset = charset
        self.env: cabc.Mapping[str, str | None] = env or {}
        self.echo_stdin = echo_stdin
        self.catch_exceptions = catch_exceptions

    def get_default_prog_name(self, cli: Command) -> str:
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(
        self, overrides: cabc.Mapping[str, str | None] | None = None
    ) -> cabc.Mapping[str, str | None]:
        """Returns the environment overrides for invoking a script.

        The result is a new dict built from the runner's ``env`` with
        ``overrides`` applied on top.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(
        self,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        color: bool = False,
    ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]:
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up `<stdin>` with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        All replaced global state is restored on exit, including when the
        setup itself fails part-way through.

        :param input: the input stream to put into `sys.stdin`.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        :return: yields a ``(stdout, stderr, output)`` tuple of byte buffers.

        .. versionadded:: 8.2
            An additional output stream is returned, which is a mix of
            `<stdout>` and `<stderr>` streams.

        .. versionchanged:: 8.2
            Always returns the `<stderr>` stream.

        .. versionchanged:: 8.0
            `<stderr>` is opened with ``errors="backslashreplace"``
            instead of the default ``"strict"``.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.
        """
        bytes_input = make_input_stream(input, self.charset)
        echo_input = None
        env = self.make_env(env)
        stream_mixer = StreamMixer()

        # Snapshot every piece of global state *before* changing any of it,
        # so the ``finally`` below can always put things back.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore
        old__compat_should_strip_ansi = _compat.should_strip_ansi
        old_env: dict[str, str | None] = {}

        try:
            formatting.FORCED_WIDTH = 80

            if self.echo_stdin:
                bytes_input = echo_input = t.cast(
                    t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout)
                )

            sys.stdin = text_input = _NamedTextIOWrapper(
                bytes_input, encoding=self.charset, name="<stdin>", mode="r"
            )

            if self.echo_stdin:
                # Force unbuffered reads, otherwise TextIOWrapper reads a
                # large chunk which is echoed early.
                text_input._CHUNK_SIZE = 1  # type: ignore

            sys.stdout = _NamedTextIOWrapper(
                stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w"
            )

            sys.stderr = _NamedTextIOWrapper(
                stream_mixer.stderr,
                encoding=self.charset,
                name="<stderr>",
                mode="w",
                errors="backslashreplace",
            )

            # Replacement prompt helpers: they read from the fake stdin and
            # write to the fake stdout, with stdin echoing paused meanwhile.
            @_pause_echo(echo_input)  # type: ignore
            def visible_input(prompt: str | None = None) -> str:
                sys.stdout.write(prompt or "")
                val = next(text_input).rstrip("\r\n")
                sys.stdout.write(f"{val}\n")
                sys.stdout.flush()
                return val

            @_pause_echo(echo_input)  # type: ignore
            def hidden_input(prompt: str | None = None) -> str:
                sys.stdout.write(f"{prompt or ''}\n")
                sys.stdout.flush()
                return next(text_input).rstrip("\r\n")

            @_pause_echo(echo_input)  # type: ignore
            def _getchar(echo: bool) -> str:
                char = sys.stdin.read(1)

                if echo:
                    sys.stdout.write(char)

                sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(
                stream: t.IO[t.Any] | None = None, color: bool | None = None
            ) -> bool:
                # An explicit ``color`` wins; otherwise fall back to the
                # ``color`` value given to ``isolation``.
                if color is None:
                    return not default_color
                return not color

            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = should_strip_ansi

            for key, value in env.items():
                # Remember the previous value (``None`` meaning "unset") so
                # it can be restored afterwards.
                old_env[key] = os.environ.get(key)
                if value is None:
                    try:
                        del os.environ[key]
                    except Exception:
                        # Best effort: the variable may simply not be set.
                        pass
                else:
                    os.environ[key] = value

            yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output)
        finally:
            for key, value in old_env.items():
                if value is None:
                    try:
                        del os.environ[key]
                    except Exception:
                        # Best effort: never let cleanup mask the real error.
                        pass
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = old__compat_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli: Command,
        args: str | cabc.Sequence[str] | None = None,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        catch_exceptions: bool | None = None,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``. If :data:`None`, the value
                                 from :class:`CliRunner` is used.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionadded:: 8.2
            The result object has the ``output_bytes`` attribute with
            the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
            see it in its terminal.

        .. versionchanged:: 8.2
            The result object always returns the ``stderr_bytes`` stream.

        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.

        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.

        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        if catch_exceptions is None:
            catch_exceptions = self.catch_exceptions

        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: BaseException | None = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            # An explicit ``prog_name`` in ``extra`` wins over the default.
            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast("int | t.Any | None", e.code)

                if e_code is None:
                    e_code = 0

                if e_code != 0:
                    exception = e

                # A non-integer exit code is written to stdout and the exit
                # code becomes 1.
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1

                exit_code = e_code

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush the text wrappers so the byte buffers are complete
                # before their contents are read.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                stderr = outstreams[1].getvalue()
                output = outstreams[2].getvalue()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            output_bytes=output,
            return_value=return_value,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,  # type: ignore
        )

    @contextlib.contextmanager
    def isolated_filesystem(
        self, temp_dir: str | os.PathLike[str] | None = None
    ) -> cabc.Iterator[str]:
        """A context manager that creates a temporary directory and
        changes the current working directory to it. This isolates tests
        that affect the contents of the CWD to prevent them from
        interfering with each other.

        :param temp_dir: Create the temporary directory under this
            directory. If given, the created directory is not removed
            when exiting.
        :return: yields the path of the created directory.

        .. versionchanged:: 8.0
            Added the ``temp_dir`` parameter.
        """
        cwd = os.getcwd()
        dt = tempfile.mkdtemp(dir=temp_dir)

        try:
            # Inside the ``try`` so that a failing ``chdir`` still triggers
            # the cleanup of the directory that was just created.
            os.chdir(dt)
            yield dt
        finally:
            os.chdir(cwd)

            if temp_dir is None:
                try:
                    shutil.rmtree(dt)
                except OSError:
                    pass