Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/testing.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

282 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import contextlib 

5import io 

6import os 

7import pdb 

8import shlex 

9import sys 

10import tempfile 

11import typing as t 

12from types import TracebackType 

13 

14from . import _compat 

15from . import formatting 

16from . import termui 

17from . import utils 

18from ._compat import _find_binary_reader 

19 

20if t.TYPE_CHECKING: 

21 from _typeshed import ReadableBuffer 

22 

23 from .core import Command 

24 

25 

26class EchoingStdin: 

27 def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None: 

28 self._input = input 

29 self._output = output 

30 self._paused = False 

31 

32 def __getattr__(self, x: str) -> t.Any: 

33 return getattr(self._input, x) 

34 

35 def _echo(self, rv: bytes) -> bytes: 

36 if not self._paused: 

37 self._output.write(rv) 

38 

39 return rv 

40 

41 def read(self, n: int = -1) -> bytes: 

42 return self._echo(self._input.read(n)) 

43 

44 def read1(self, n: int = -1) -> bytes: 

45 return self._echo(self._input.read1(n)) # type: ignore 

46 

47 def readline(self, n: int = -1) -> bytes: 

48 return self._echo(self._input.readline(n)) 

49 

50 def readlines(self) -> list[bytes]: 

51 return [self._echo(x) for x in self._input.readlines()] 

52 

53 def __iter__(self) -> cabc.Iterator[bytes]: 

54 return iter(self._echo(x) for x in self._input) 

55 

56 def __repr__(self) -> str: 

57 return repr(self._input) 

58 

59 

60@contextlib.contextmanager 

61def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]: 

62 if stream is None: 

63 yield 

64 else: 

65 stream._paused = True 

66 yield 

67 stream._paused = False 

68 

69 

70class BytesIOCopy(io.BytesIO): 

71 """Patch ``io.BytesIO`` to let the written stream be copied to another. 

72 

73 .. versionadded:: 8.2 

74 """ 

75 

76 def __init__(self, copy_to: io.BytesIO) -> None: 

77 super().__init__() 

78 self.copy_to = copy_to 

79 

80 def flush(self) -> None: 

81 super().flush() 

82 self.copy_to.flush() 

83 

84 def write(self, b: ReadableBuffer) -> int: 

85 self.copy_to.write(b) 

86 return super().write(b) 

87 

88 

89class StreamMixer: 

90 """Mixes `<stdout>` and `<stderr>` streams. 

91 

92 The result is available in the ``output`` attribute. 

93 

94 .. versionadded:: 8.2 

95 """ 

96 

97 def __init__(self) -> None: 

98 self.output: io.BytesIO = io.BytesIO() 

99 self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output) 

100 self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output) 

101 

102 

103class _NamedTextIOWrapper(io.TextIOWrapper): 

104 """A :class:`~io.TextIOWrapper` with custom ``name`` and ``mode`` 

105 that does not close its underlying buffer. 

106 

107 An optional ``original_fd`` preserves the file descriptor of the 

108 stream being replaced, so that C-level consumers that call 

109 :meth:`fileno` (``faulthandler``, ``subprocess``, ...) still work. 

110 Inspired by pytest's ``capsys``/``capfd`` split: see :doc:`/testing` 

111 for details. 

112 

113 .. versionchanged:: 8.3.3 

114 Added ``original_fd`` parameter and :meth:`fileno` override. 

115 """ 

116 

117 def __init__( 

118 self, 

119 buffer: t.BinaryIO, 

120 name: str, 

121 mode: str, 

122 *, 

123 original_fd: int = -1, 

124 **kwargs: t.Any, 

125 ) -> None: 

126 super().__init__(buffer, **kwargs) 

127 self._name = name 

128 self._mode = mode 

129 self._original_fd = original_fd 

130 

131 def close(self) -> None: 

132 """The buffer this object contains belongs to some other object, 

133 so prevent the default ``__del__`` implementation from closing 

134 that buffer. 

135 

136 .. versionadded:: 8.3.2 

137 """ 

138 

139 def fileno(self) -> int: 

140 """Return the file descriptor of the original stream, if one was 

141 provided at construction time. 

142 

143 This allows C-level consumers (``faulthandler``, ``subprocess``, 

144 signal handlers, ...) to obtain a valid fd without crashing, even 

145 though the Python-level writes are redirected to an in-memory 

146 buffer. 

147 

148 .. versionadded:: 8.3.3 

149 """ 

150 if self._original_fd >= 0: 

151 return self._original_fd 

152 return super().fileno() 

153 

154 @property 

155 def name(self) -> str: 

156 return self._name 

157 

158 @property 

159 def mode(self) -> str: 

160 return self._mode 

161 

162 

163def make_input_stream( 

164 input: str | bytes | t.IO[t.Any] | None, charset: str 

165) -> t.BinaryIO: 

166 # Is already an input stream. 

167 if hasattr(input, "read"): 

168 rv = _find_binary_reader(t.cast("t.IO[t.Any]", input)) 

169 

170 if rv is not None: 

171 return rv 

172 

173 raise TypeError("Could not find binary reader for input stream.") 

174 

175 if input is None: 

176 input = b"" 

177 elif isinstance(input, str): 

178 input = input.encode(charset) 

179 

180 return io.BytesIO(input) 

181 

182 

183class Result: 

184 """Holds the captured result of an invoked CLI script. 

185 

186 :param runner: The runner that created the result 

187 :param stdout_bytes: The standard output as bytes. 

188 :param stderr_bytes: The standard error as bytes. 

189 :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the 

190 user would see it in its terminal. 

191 :param return_value: The value returned from the invoked command. 

192 :param exit_code: The exit code as integer. 

193 :param exception: The exception that happened if one did. 

194 :param exc_info: Exception information (exception type, exception instance, 

195 traceback type). 

196 

197 .. versionchanged:: 8.2 

198 ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and 

199 ``mix_stderr`` has been removed. 

200 

201 .. versionadded:: 8.0 

202 Added ``return_value``. 

203 """ 

204 

205 def __init__( 

206 self, 

207 runner: CliRunner, 

208 stdout_bytes: bytes, 

209 stderr_bytes: bytes, 

210 output_bytes: bytes, 

211 return_value: t.Any, 

212 exit_code: int, 

213 exception: BaseException | None, 

214 exc_info: tuple[type[BaseException], BaseException, TracebackType] 

215 | None = None, 

216 ): 

217 self.runner = runner 

218 self.stdout_bytes = stdout_bytes 

219 self.stderr_bytes = stderr_bytes 

220 self.output_bytes = output_bytes 

221 self.return_value = return_value 

222 self.exit_code = exit_code 

223 self.exception = exception 

224 self.exc_info = exc_info 

225 

226 @property 

227 def output(self) -> str: 

228 """The terminal output as unicode string, as the user would see it. 

229 

230 .. versionchanged:: 8.2 

231 No longer a proxy for ``self.stdout``. Now has its own independent stream 

232 that is mixing `<stdout>` and `<stderr>`, in the order they were written. 

233 """ 

234 return self.output_bytes.decode(self.runner.charset, "replace").replace( 

235 "\r\n", "\n" 

236 ) 

237 

238 @property 

239 def stdout(self) -> str: 

240 """The standard output as unicode string.""" 

241 return self.stdout_bytes.decode(self.runner.charset, "replace").replace( 

242 "\r\n", "\n" 

243 ) 

244 

245 @property 

246 def stderr(self) -> str: 

247 """The standard error as unicode string. 

248 

249 .. versionchanged:: 8.2 

250 No longer raise an exception, always returns the `<stderr>` string. 

251 """ 

252 return self.stderr_bytes.decode(self.runner.charset, "replace").replace( 

253 "\r\n", "\n" 

254 ) 

255 

256 def __repr__(self) -> str: 

257 exc_str = repr(self.exception) if self.exception else "okay" 

258 return f"<{type(self).__name__} {exc_str}>" 

259 

260 

261class CliRunner: 

262 """The CLI runner provides functionality to invoke a Click command line 

263 script for unittesting purposes in a isolated environment. This only 

264 works in single-threaded systems without any concurrency as it changes the 

265 global interpreter state. 

266 

267 :param charset: the character set for the input and output data. 

268 :param env: a dictionary with environment variables for overriding. 

269 :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes 

270 to `<stdout>`. This is useful for showing examples in 

271 some circumstances. Note that regular prompts 

272 will automatically echo the input. 

273 :param catch_exceptions: Whether to catch any exceptions other than 

274 ``SystemExit`` when running :meth:`~CliRunner.invoke`. 

275 

276 .. versionchanged:: 8.2 

277 Added the ``catch_exceptions`` parameter. 

278 

279 .. versionchanged:: 8.2 

280 ``mix_stderr`` parameter has been removed. 

281 """ 

282 

283 def __init__( 

284 self, 

285 charset: str = "utf-8", 

286 env: cabc.Mapping[str, str | None] | None = None, 

287 echo_stdin: bool = False, 

288 catch_exceptions: bool = True, 

289 ) -> None: 

290 self.charset = charset 

291 self.env: cabc.Mapping[str, str | None] = env or {} 

292 self.echo_stdin = echo_stdin 

293 self.catch_exceptions = catch_exceptions 

294 

295 def get_default_prog_name(self, cli: Command) -> str: 

296 """Given a command object it will return the default program name 

297 for it. The default is the `name` attribute or ``"root"`` if not 

298 set. 

299 """ 

300 return cli.name or "root" 

301 

302 def make_env( 

303 self, overrides: cabc.Mapping[str, str | None] | None = None 

304 ) -> cabc.Mapping[str, str | None]: 

305 """Returns the environment overrides for invoking a script.""" 

306 rv = dict(self.env) 

307 if overrides: 

308 rv.update(overrides) 

309 return rv 

310 

311 @contextlib.contextmanager 

312 def isolation( 

313 self, 

314 input: str | bytes | t.IO[t.Any] | None = None, 

315 env: cabc.Mapping[str, str | None] | None = None, 

316 color: bool = False, 

317 ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]: 

318 """A context manager that sets up the isolation for invoking of a 

319 command line tool. This sets up `<stdin>` with the given input data 

320 and `os.environ` with the overrides from the given dictionary. 

321 This also rebinds some internals in Click to be mocked (like the 

322 prompt functionality). 

323 

324 This is automatically done in the :meth:`invoke` method. 

325 

326 :param input: the input stream to put into `sys.stdin`. 

327 :param env: the environment overrides as dictionary. 

328 :param color: whether the output should contain color codes. The 

329 application can still override this explicitly. 

330 

331 .. versionadded:: 8.2 

332 An additional output stream is returned, which is a mix of 

333 `<stdout>` and `<stderr>` streams. 

334 

335 .. versionchanged:: 8.2 

336 Always returns the `<stderr>` stream. 

337 

338 .. versionchanged:: 8.0 

339 `<stderr>` is opened with ``errors="backslashreplace"`` 

340 instead of the default ``"strict"``. 

341 

342 .. versionchanged:: 4.0 

343 Added the ``color`` parameter. 

344 """ 

345 bytes_input = make_input_stream(input, self.charset) 

346 echo_input = None 

347 

348 old_stdin = sys.stdin 

349 old_stdout = sys.stdout 

350 old_stderr = sys.stderr 

351 old_forced_width = formatting.FORCED_WIDTH 

352 formatting.FORCED_WIDTH = 80 

353 

354 env = self.make_env(env) 

355 

356 stream_mixer = StreamMixer() 

357 

358 # Preserve the original file descriptors so that C-level 

359 # consumers (faulthandler, subprocess, etc.) can still obtain a 

360 # valid fd from the redirected streams. The original streams 

361 # may themselves lack a fileno() (e.g. when CliRunner is used 

362 # inside pytest's capsys), so we fall back to -1. 

363 def _safe_fileno(stream: t.IO[t.Any]) -> int: 

364 try: 

365 return stream.fileno() 

366 except (AttributeError, io.UnsupportedOperation): 

367 return -1 

368 

369 old_stdout_fd = _safe_fileno(old_stdout) 

370 old_stderr_fd = _safe_fileno(old_stderr) 

371 

372 if self.echo_stdin: 

373 bytes_input = echo_input = t.cast( 

374 t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout) 

375 ) 

376 

377 sys.stdin = text_input = _NamedTextIOWrapper( 

378 bytes_input, encoding=self.charset, name="<stdin>", mode="r" 

379 ) 

380 

381 if self.echo_stdin: 

382 # Force unbuffered reads, otherwise TextIOWrapper reads a 

383 # large chunk which is echoed early. 

384 text_input._CHUNK_SIZE = 1 # type: ignore 

385 

386 sys.stdout = _NamedTextIOWrapper( 

387 stream_mixer.stdout, 

388 encoding=self.charset, 

389 name="<stdout>", 

390 mode="w", 

391 original_fd=old_stdout_fd, 

392 ) 

393 

394 sys.stderr = _NamedTextIOWrapper( 

395 stream_mixer.stderr, 

396 encoding=self.charset, 

397 name="<stderr>", 

398 mode="w", 

399 errors="backslashreplace", 

400 original_fd=old_stderr_fd, 

401 ) 

402 

403 @_pause_echo(echo_input) # type: ignore 

404 def visible_input(prompt: str | None = None) -> str: 

405 sys.stdout.write(prompt or "") 

406 try: 

407 val = next(text_input).rstrip("\r\n") 

408 except StopIteration as e: 

409 raise EOFError() from e 

410 sys.stdout.write(f"{val}\n") 

411 sys.stdout.flush() 

412 return val 

413 

414 @_pause_echo(echo_input) # type: ignore 

415 def hidden_input(prompt: str | None = None) -> str: 

416 sys.stdout.write(f"{prompt or ''}\n") 

417 sys.stdout.flush() 

418 try: 

419 return next(text_input).rstrip("\r\n") 

420 except StopIteration as e: 

421 raise EOFError() from e 

422 

423 @_pause_echo(echo_input) # type: ignore 

424 def _getchar(echo: bool) -> str: 

425 char = sys.stdin.read(1) 

426 

427 if echo: 

428 sys.stdout.write(char) 

429 

430 sys.stdout.flush() 

431 return char 

432 

433 default_color = color 

434 

435 def should_strip_ansi( 

436 stream: t.IO[t.Any] | None = None, color: bool | None = None 

437 ) -> bool: 

438 if color is None: 

439 return not default_color 

440 return not color 

441 

442 old_visible_prompt_func = termui.visible_prompt_func 

443 old_hidden_prompt_func = termui.hidden_prompt_func 

444 old__getchar_func = termui._getchar 

445 old_should_strip_ansi = utils.should_strip_ansi # type: ignore 

446 old__compat_should_strip_ansi = _compat.should_strip_ansi 

447 old_pdb_init = pdb.Pdb.__init__ 

448 termui.visible_prompt_func = visible_input 

449 termui.hidden_prompt_func = hidden_input 

450 termui._getchar = _getchar 

451 utils.should_strip_ansi = should_strip_ansi # type: ignore 

452 _compat.should_strip_ansi = should_strip_ansi 

453 

454 def _patched_pdb_init( 

455 self: pdb.Pdb, 

456 completekey: str = "tab", 

457 stdin: t.IO[str] | None = None, 

458 stdout: t.IO[str] | None = None, 

459 **kwargs: t.Any, 

460 ) -> None: 

461 """Default ``pdb.Pdb`` to real terminal streams during 

462 ``CliRunner`` isolation. 

463 

464 Without this patch, ``pdb.Pdb.__init__`` inherits from 

465 ``cmd.Cmd`` which falls back to ``sys.stdin``/``sys.stdout`` 

466 when no explicit streams are provided. During isolation 

467 those are ``BytesIO``-backed wrappers, so the debugger 

468 reads from an empty buffer and writes to captured output, 

469 making interactive debugging impossible. 

470 

471 By defaulting to ``sys.__stdin__``/``sys.__stdout__`` (the 

472 original terminal streams Python preserves regardless of 

473 redirection), debuggers can interact with the user while 

474 ``click.echo`` output is still captured normally. 

475 

476 This covers ``pdb.set_trace()``, ``breakpoint()``, 

477 ``pdb.post_mortem()``, and debuggers that subclass 

478 ``pdb.Pdb`` (ipdb, pdbpp). Explicit ``stdin``/``stdout`` 

479 arguments are honored and not overridden. Debuggers that 

480 do not subclass ``pdb.Pdb`` (pudb, debugpy) are not 

481 covered. 

482 """ 

483 if stdin is None: 

484 stdin = sys.__stdin__ 

485 if stdout is None: 

486 stdout = sys.__stdout__ 

487 old_pdb_init( 

488 self, completekey=completekey, stdin=stdin, stdout=stdout, **kwargs 

489 ) 

490 

491 pdb.Pdb.__init__ = _patched_pdb_init # type: ignore[assignment] 

492 

493 old_env = {} 

494 try: 

495 for key, value in env.items(): 

496 old_env[key] = os.environ.get(key) 

497 if value is None: 

498 try: 

499 del os.environ[key] 

500 except Exception: 

501 pass 

502 else: 

503 os.environ[key] = value 

504 yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output) 

505 finally: 

506 for key, value in old_env.items(): 

507 if value is None: 

508 try: 

509 del os.environ[key] 

510 except Exception: 

511 pass 

512 else: 

513 os.environ[key] = value 

514 sys.stdout = old_stdout 

515 sys.stderr = old_stderr 

516 sys.stdin = old_stdin 

517 termui.visible_prompt_func = old_visible_prompt_func 

518 termui.hidden_prompt_func = old_hidden_prompt_func 

519 termui._getchar = old__getchar_func 

520 utils.should_strip_ansi = old_should_strip_ansi # type: ignore 

521 _compat.should_strip_ansi = old__compat_should_strip_ansi 

522 formatting.FORCED_WIDTH = old_forced_width 

523 pdb.Pdb.__init__ = old_pdb_init # type: ignore[method-assign] 

524 

525 def invoke( 

526 self, 

527 cli: Command, 

528 args: str | cabc.Sequence[str] | None = None, 

529 input: str | bytes | t.IO[t.Any] | None = None, 

530 env: cabc.Mapping[str, str | None] | None = None, 

531 catch_exceptions: bool | None = None, 

532 color: bool = False, 

533 **extra: t.Any, 

534 ) -> Result: 

535 """Invokes a command in an isolated environment. The arguments are 

536 forwarded directly to the command line script, the `extra` keyword 

537 arguments are passed to the :meth:`~clickpkg.Command.main` function of 

538 the command. 

539 

540 This returns a :class:`Result` object. 

541 

542 :param cli: the command to invoke 

543 :param args: the arguments to invoke. It may be given as an iterable 

544 or a string. When given as string it will be interpreted 

545 as a Unix shell command. More details at 

546 :func:`shlex.split`. 

547 :param input: the input data for `sys.stdin`. 

548 :param env: the environment overrides. 

549 :param catch_exceptions: Whether to catch any other exceptions than 

550 ``SystemExit``. If :data:`None`, the value 

551 from :class:`CliRunner` is used. 

552 :param extra: the keyword arguments to pass to :meth:`main`. 

553 :param color: whether the output should contain color codes. The 

554 application can still override this explicitly. 

555 

556 .. versionadded:: 8.2 

557 The result object has the ``output_bytes`` attribute with 

558 the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would 

559 see it in its terminal. 

560 

561 .. versionchanged:: 8.2 

562 The result object always returns the ``stderr_bytes`` stream. 

563 

564 .. versionchanged:: 8.0 

565 The result object has the ``return_value`` attribute with 

566 the value returned from the invoked command. 

567 

568 .. versionchanged:: 4.0 

569 Added the ``color`` parameter. 

570 

571 .. versionchanged:: 3.0 

572 Added the ``catch_exceptions`` parameter. 

573 

574 .. versionchanged:: 3.0 

575 The result object has the ``exc_info`` attribute with the 

576 traceback if available. 

577 """ 

578 exc_info = None 

579 if catch_exceptions is None: 

580 catch_exceptions = self.catch_exceptions 

581 

582 with self.isolation(input=input, env=env, color=color) as outstreams: 

583 return_value = None 

584 exception: BaseException | None = None 

585 exit_code = 0 

586 

587 if isinstance(args, str): 

588 args = shlex.split(args) 

589 

590 try: 

591 prog_name = extra.pop("prog_name") 

592 except KeyError: 

593 prog_name = self.get_default_prog_name(cli) 

594 

595 try: 

596 return_value = cli.main(args=args or (), prog_name=prog_name, **extra) 

597 except SystemExit as e: 

598 exc_info = sys.exc_info() 

599 e_code = t.cast("int | t.Any | None", e.code) 

600 

601 if e_code is None: 

602 e_code = 0 

603 

604 if e_code != 0: 

605 exception = e 

606 

607 if not isinstance(e_code, int): 

608 sys.stdout.write(str(e_code)) 

609 sys.stdout.write("\n") 

610 e_code = 1 

611 

612 exit_code = e_code 

613 

614 except Exception as e: 

615 if not catch_exceptions: 

616 raise 

617 exception = e 

618 exit_code = 1 

619 exc_info = sys.exc_info() 

620 finally: 

621 sys.stdout.flush() 

622 sys.stderr.flush() 

623 stdout = outstreams[0].getvalue() 

624 stderr = outstreams[1].getvalue() 

625 output = outstreams[2].getvalue() 

626 

627 return Result( 

628 runner=self, 

629 stdout_bytes=stdout, 

630 stderr_bytes=stderr, 

631 output_bytes=output, 

632 return_value=return_value, 

633 exit_code=exit_code, 

634 exception=exception, 

635 exc_info=exc_info, # type: ignore 

636 ) 

637 

638 @contextlib.contextmanager 

639 def isolated_filesystem( 

640 self, temp_dir: str | os.PathLike[str] | None = None 

641 ) -> cabc.Iterator[str]: 

642 """A context manager that creates a temporary directory and 

643 changes the current working directory to it. This isolates tests 

644 that affect the contents of the CWD to prevent them from 

645 interfering with each other. 

646 

647 :param temp_dir: Create the temporary directory under this 

648 directory. If given, the created directory is not removed 

649 when exiting. 

650 

651 .. versionchanged:: 8.0 

652 Added the ``temp_dir`` parameter. 

653 """ 

654 cwd = os.getcwd() 

655 dt = tempfile.mkdtemp(dir=temp_dir) 

656 os.chdir(dt) 

657 

658 try: 

659 yield dt 

660 finally: 

661 os.chdir(cwd) 

662 

663 if temp_dir is None: 

664 import shutil 

665 

666 try: 

667 shutil.rmtree(dt) 

668 except OSError: 

669 pass