Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/testing.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

263 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import contextlib 

5import io 

6import os 

7import shlex 

8import sys 

9import tempfile 

10import typing as t 

11from types import TracebackType 

12 

13from . import _compat 

14from . import formatting 

15from . import termui 

16from . import utils 

17from ._compat import _find_binary_reader 

18 

19if t.TYPE_CHECKING: 

20 from _typeshed import ReadableBuffer 

21 

22 from .core import Command 

23 

24 

25class EchoingStdin: 

26 def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None: 

27 self._input = input 

28 self._output = output 

29 self._paused = False 

30 

31 def __getattr__(self, x: str) -> t.Any: 

32 return getattr(self._input, x) 

33 

34 def _echo(self, rv: bytes) -> bytes: 

35 if not self._paused: 

36 self._output.write(rv) 

37 

38 return rv 

39 

40 def read(self, n: int = -1) -> bytes: 

41 return self._echo(self._input.read(n)) 

42 

43 def read1(self, n: int = -1) -> bytes: 

44 return self._echo(self._input.read1(n)) # type: ignore 

45 

46 def readline(self, n: int = -1) -> bytes: 

47 return self._echo(self._input.readline(n)) 

48 

49 def readlines(self) -> list[bytes]: 

50 return [self._echo(x) for x in self._input.readlines()] 

51 

52 def __iter__(self) -> cabc.Iterator[bytes]: 

53 return iter(self._echo(x) for x in self._input) 

54 

55 def __repr__(self) -> str: 

56 return repr(self._input) 

57 

58 

59@contextlib.contextmanager 

60def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]: 

61 if stream is None: 

62 yield 

63 else: 

64 stream._paused = True 

65 yield 

66 stream._paused = False 

67 

68 

69class BytesIOCopy(io.BytesIO): 

70 """Patch ``io.BytesIO`` to let the written stream be copied to another. 

71 

72 .. versionadded:: 8.2 

73 """ 

74 

75 def __init__(self, copy_to: io.BytesIO) -> None: 

76 super().__init__() 

77 self.copy_to = copy_to 

78 

79 def flush(self) -> None: 

80 super().flush() 

81 self.copy_to.flush() 

82 

83 def write(self, b: ReadableBuffer) -> int: 

84 self.copy_to.write(b) 

85 return super().write(b) 

86 

87 

88class StreamMixer: 

89 """Mixes `<stdout>` and `<stderr>` streams. 

90 

91 The result is available in the ``output`` attribute. 

92 

93 .. versionadded:: 8.2 

94 """ 

95 

96 def __init__(self) -> None: 

97 self.output: io.BytesIO = io.BytesIO() 

98 self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output) 

99 self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output) 

100 

101 def __del__(self) -> None: 

102 """ 

103 Guarantee that embedded file-like objects are closed in a 

104 predictable order, protecting against races between 

105 self.output being closed and other streams being flushed on close 

106 

107 .. versionadded:: 8.2.2 

108 """ 

109 self.stderr.close() 

110 self.stdout.close() 

111 self.output.close() 

112 

113 

114class _NamedTextIOWrapper(io.TextIOWrapper): 

115 def __init__( 

116 self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any 

117 ) -> None: 

118 super().__init__(buffer, **kwargs) 

119 self._name = name 

120 self._mode = mode 

121 

122 @property 

123 def name(self) -> str: 

124 return self._name 

125 

126 @property 

127 def mode(self) -> str: 

128 return self._mode 

129 

130 

131def make_input_stream( 

132 input: str | bytes | t.IO[t.Any] | None, charset: str 

133) -> t.BinaryIO: 

134 # Is already an input stream. 

135 if hasattr(input, "read"): 

136 rv = _find_binary_reader(t.cast("t.IO[t.Any]", input)) 

137 

138 if rv is not None: 

139 return rv 

140 

141 raise TypeError("Could not find binary reader for input stream.") 

142 

143 if input is None: 

144 input = b"" 

145 elif isinstance(input, str): 

146 input = input.encode(charset) 

147 

148 return io.BytesIO(input) 

149 

150 

151class Result: 

152 """Holds the captured result of an invoked CLI script. 

153 

154 :param runner: The runner that created the result 

155 :param stdout_bytes: The standard output as bytes. 

156 :param stderr_bytes: The standard error as bytes. 

157 :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the 

158 user would see it in its terminal. 

159 :param return_value: The value returned from the invoked command. 

160 :param exit_code: The exit code as integer. 

161 :param exception: The exception that happened if one did. 

162 :param exc_info: Exception information (exception type, exception instance, 

163 traceback type). 

164 

165 .. versionchanged:: 8.2 

166 ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and 

167 ``mix_stderr`` has been removed. 

168 

169 .. versionadded:: 8.0 

170 Added ``return_value``. 

171 """ 

172 

173 def __init__( 

174 self, 

175 runner: CliRunner, 

176 stdout_bytes: bytes, 

177 stderr_bytes: bytes, 

178 output_bytes: bytes, 

179 return_value: t.Any, 

180 exit_code: int, 

181 exception: BaseException | None, 

182 exc_info: tuple[type[BaseException], BaseException, TracebackType] 

183 | None = None, 

184 ): 

185 self.runner = runner 

186 self.stdout_bytes = stdout_bytes 

187 self.stderr_bytes = stderr_bytes 

188 self.output_bytes = output_bytes 

189 self.return_value = return_value 

190 self.exit_code = exit_code 

191 self.exception = exception 

192 self.exc_info = exc_info 

193 

194 @property 

195 def output(self) -> str: 

196 """The terminal output as unicode string, as the user would see it. 

197 

198 .. versionchanged:: 8.2 

199 No longer a proxy for ``self.stdout``. Now has its own independent stream 

200 that is mixing `<stdout>` and `<stderr>`, in the order they were written. 

201 """ 

202 return self.output_bytes.decode(self.runner.charset, "replace").replace( 

203 "\r\n", "\n" 

204 ) 

205 

206 @property 

207 def stdout(self) -> str: 

208 """The standard output as unicode string.""" 

209 return self.stdout_bytes.decode(self.runner.charset, "replace").replace( 

210 "\r\n", "\n" 

211 ) 

212 

213 @property 

214 def stderr(self) -> str: 

215 """The standard error as unicode string. 

216 

217 .. versionchanged:: 8.2 

218 No longer raise an exception, always returns the `<stderr>` string. 

219 """ 

220 return self.stderr_bytes.decode(self.runner.charset, "replace").replace( 

221 "\r\n", "\n" 

222 ) 

223 

224 def __repr__(self) -> str: 

225 exc_str = repr(self.exception) if self.exception else "okay" 

226 return f"<{type(self).__name__} {exc_str}>" 

227 

228 

229class CliRunner: 

230 """The CLI runner provides functionality to invoke a Click command line 

231 script for unittesting purposes in a isolated environment. This only 

232 works in single-threaded systems without any concurrency as it changes the 

233 global interpreter state. 

234 

235 :param charset: the character set for the input and output data. 

236 :param env: a dictionary with environment variables for overriding. 

237 :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes 

238 to `<stdout>`. This is useful for showing examples in 

239 some circumstances. Note that regular prompts 

240 will automatically echo the input. 

241 :param catch_exceptions: Whether to catch any exceptions other than 

242 ``SystemExit`` when running :meth:`~CliRunner.invoke`. 

243 

244 .. versionchanged:: 8.2 

245 Added the ``catch_exceptions`` parameter. 

246 

247 .. versionchanged:: 8.2 

248 ``mix_stderr`` parameter has been removed. 

249 """ 

250 

251 def __init__( 

252 self, 

253 charset: str = "utf-8", 

254 env: cabc.Mapping[str, str | None] | None = None, 

255 echo_stdin: bool = False, 

256 catch_exceptions: bool = True, 

257 ) -> None: 

258 self.charset = charset 

259 self.env: cabc.Mapping[str, str | None] = env or {} 

260 self.echo_stdin = echo_stdin 

261 self.catch_exceptions = catch_exceptions 

262 

263 def get_default_prog_name(self, cli: Command) -> str: 

264 """Given a command object it will return the default program name 

265 for it. The default is the `name` attribute or ``"root"`` if not 

266 set. 

267 """ 

268 return cli.name or "root" 

269 

270 def make_env( 

271 self, overrides: cabc.Mapping[str, str | None] | None = None 

272 ) -> cabc.Mapping[str, str | None]: 

273 """Returns the environment overrides for invoking a script.""" 

274 rv = dict(self.env) 

275 if overrides: 

276 rv.update(overrides) 

277 return rv 

278 

279 @contextlib.contextmanager 

280 def isolation( 

281 self, 

282 input: str | bytes | t.IO[t.Any] | None = None, 

283 env: cabc.Mapping[str, str | None] | None = None, 

284 color: bool = False, 

285 ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]: 

286 """A context manager that sets up the isolation for invoking of a 

287 command line tool. This sets up `<stdin>` with the given input data 

288 and `os.environ` with the overrides from the given dictionary. 

289 This also rebinds some internals in Click to be mocked (like the 

290 prompt functionality). 

291 

292 This is automatically done in the :meth:`invoke` method. 

293 

294 :param input: the input stream to put into `sys.stdin`. 

295 :param env: the environment overrides as dictionary. 

296 :param color: whether the output should contain color codes. The 

297 application can still override this explicitly. 

298 

299 .. versionadded:: 8.2 

300 An additional output stream is returned, which is a mix of 

301 `<stdout>` and `<stderr>` streams. 

302 

303 .. versionchanged:: 8.2 

304 Always returns the `<stderr>` stream. 

305 

306 .. versionchanged:: 8.0 

307 `<stderr>` is opened with ``errors="backslashreplace"`` 

308 instead of the default ``"strict"``. 

309 

310 .. versionchanged:: 4.0 

311 Added the ``color`` parameter. 

312 """ 

313 bytes_input = make_input_stream(input, self.charset) 

314 echo_input = None 

315 

316 old_stdin = sys.stdin 

317 old_stdout = sys.stdout 

318 old_stderr = sys.stderr 

319 old_forced_width = formatting.FORCED_WIDTH 

320 formatting.FORCED_WIDTH = 80 

321 

322 env = self.make_env(env) 

323 

324 stream_mixer = StreamMixer() 

325 

326 if self.echo_stdin: 

327 bytes_input = echo_input = t.cast( 

328 t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout) 

329 ) 

330 

331 sys.stdin = text_input = _NamedTextIOWrapper( 

332 bytes_input, encoding=self.charset, name="<stdin>", mode="r" 

333 ) 

334 

335 if self.echo_stdin: 

336 # Force unbuffered reads, otherwise TextIOWrapper reads a 

337 # large chunk which is echoed early. 

338 text_input._CHUNK_SIZE = 1 # type: ignore 

339 

340 sys.stdout = _NamedTextIOWrapper( 

341 stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w" 

342 ) 

343 

344 sys.stderr = _NamedTextIOWrapper( 

345 stream_mixer.stderr, 

346 encoding=self.charset, 

347 name="<stderr>", 

348 mode="w", 

349 errors="backslashreplace", 

350 ) 

351 

352 @_pause_echo(echo_input) # type: ignore 

353 def visible_input(prompt: str | None = None) -> str: 

354 sys.stdout.write(prompt or "") 

355 try: 

356 val = next(text_input).rstrip("\r\n") 

357 except StopIteration as e: 

358 raise EOFError() from e 

359 sys.stdout.write(f"{val}\n") 

360 sys.stdout.flush() 

361 return val 

362 

363 @_pause_echo(echo_input) # type: ignore 

364 def hidden_input(prompt: str | None = None) -> str: 

365 sys.stdout.write(f"{prompt or ''}\n") 

366 sys.stdout.flush() 

367 try: 

368 return next(text_input).rstrip("\r\n") 

369 except StopIteration as e: 

370 raise EOFError() from e 

371 

372 @_pause_echo(echo_input) # type: ignore 

373 def _getchar(echo: bool) -> str: 

374 char = sys.stdin.read(1) 

375 

376 if echo: 

377 sys.stdout.write(char) 

378 

379 sys.stdout.flush() 

380 return char 

381 

382 default_color = color 

383 

384 def should_strip_ansi( 

385 stream: t.IO[t.Any] | None = None, color: bool | None = None 

386 ) -> bool: 

387 if color is None: 

388 return not default_color 

389 return not color 

390 

391 old_visible_prompt_func = termui.visible_prompt_func 

392 old_hidden_prompt_func = termui.hidden_prompt_func 

393 old__getchar_func = termui._getchar 

394 old_should_strip_ansi = utils.should_strip_ansi # type: ignore 

395 old__compat_should_strip_ansi = _compat.should_strip_ansi 

396 termui.visible_prompt_func = visible_input 

397 termui.hidden_prompt_func = hidden_input 

398 termui._getchar = _getchar 

399 utils.should_strip_ansi = should_strip_ansi # type: ignore 

400 _compat.should_strip_ansi = should_strip_ansi 

401 

402 old_env = {} 

403 try: 

404 for key, value in env.items(): 

405 old_env[key] = os.environ.get(key) 

406 if value is None: 

407 try: 

408 del os.environ[key] 

409 except Exception: 

410 pass 

411 else: 

412 os.environ[key] = value 

413 yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output) 

414 finally: 

415 for key, value in old_env.items(): 

416 if value is None: 

417 try: 

418 del os.environ[key] 

419 except Exception: 

420 pass 

421 else: 

422 os.environ[key] = value 

423 sys.stdout = old_stdout 

424 sys.stderr = old_stderr 

425 sys.stdin = old_stdin 

426 termui.visible_prompt_func = old_visible_prompt_func 

427 termui.hidden_prompt_func = old_hidden_prompt_func 

428 termui._getchar = old__getchar_func 

429 utils.should_strip_ansi = old_should_strip_ansi # type: ignore 

430 _compat.should_strip_ansi = old__compat_should_strip_ansi 

431 formatting.FORCED_WIDTH = old_forced_width 

432 

433 def invoke( 

434 self, 

435 cli: Command, 

436 args: str | cabc.Sequence[str] | None = None, 

437 input: str | bytes | t.IO[t.Any] | None = None, 

438 env: cabc.Mapping[str, str | None] | None = None, 

439 catch_exceptions: bool | None = None, 

440 color: bool = False, 

441 **extra: t.Any, 

442 ) -> Result: 

443 """Invokes a command in an isolated environment. The arguments are 

444 forwarded directly to the command line script, the `extra` keyword 

445 arguments are passed to the :meth:`~clickpkg.Command.main` function of 

446 the command. 

447 

448 This returns a :class:`Result` object. 

449 

450 :param cli: the command to invoke 

451 :param args: the arguments to invoke. It may be given as an iterable 

452 or a string. When given as string it will be interpreted 

453 as a Unix shell command. More details at 

454 :func:`shlex.split`. 

455 :param input: the input data for `sys.stdin`. 

456 :param env: the environment overrides. 

457 :param catch_exceptions: Whether to catch any other exceptions than 

458 ``SystemExit``. If :data:`None`, the value 

459 from :class:`CliRunner` is used. 

460 :param extra: the keyword arguments to pass to :meth:`main`. 

461 :param color: whether the output should contain color codes. The 

462 application can still override this explicitly. 

463 

464 .. versionadded:: 8.2 

465 The result object has the ``output_bytes`` attribute with 

466 the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would 

467 see it in its terminal. 

468 

469 .. versionchanged:: 8.2 

470 The result object always returns the ``stderr_bytes`` stream. 

471 

472 .. versionchanged:: 8.0 

473 The result object has the ``return_value`` attribute with 

474 the value returned from the invoked command. 

475 

476 .. versionchanged:: 4.0 

477 Added the ``color`` parameter. 

478 

479 .. versionchanged:: 3.0 

480 Added the ``catch_exceptions`` parameter. 

481 

482 .. versionchanged:: 3.0 

483 The result object has the ``exc_info`` attribute with the 

484 traceback if available. 

485 """ 

486 exc_info = None 

487 if catch_exceptions is None: 

488 catch_exceptions = self.catch_exceptions 

489 

490 with self.isolation(input=input, env=env, color=color) as outstreams: 

491 return_value = None 

492 exception: BaseException | None = None 

493 exit_code = 0 

494 

495 if isinstance(args, str): 

496 args = shlex.split(args) 

497 

498 try: 

499 prog_name = extra.pop("prog_name") 

500 except KeyError: 

501 prog_name = self.get_default_prog_name(cli) 

502 

503 try: 

504 return_value = cli.main(args=args or (), prog_name=prog_name, **extra) 

505 except SystemExit as e: 

506 exc_info = sys.exc_info() 

507 e_code = t.cast("int | t.Any | None", e.code) 

508 

509 if e_code is None: 

510 e_code = 0 

511 

512 if e_code != 0: 

513 exception = e 

514 

515 if not isinstance(e_code, int): 

516 sys.stdout.write(str(e_code)) 

517 sys.stdout.write("\n") 

518 e_code = 1 

519 

520 exit_code = e_code 

521 

522 except Exception as e: 

523 if not catch_exceptions: 

524 raise 

525 exception = e 

526 exit_code = 1 

527 exc_info = sys.exc_info() 

528 finally: 

529 sys.stdout.flush() 

530 sys.stderr.flush() 

531 stdout = outstreams[0].getvalue() 

532 stderr = outstreams[1].getvalue() 

533 output = outstreams[2].getvalue() 

534 

535 return Result( 

536 runner=self, 

537 stdout_bytes=stdout, 

538 stderr_bytes=stderr, 

539 output_bytes=output, 

540 return_value=return_value, 

541 exit_code=exit_code, 

542 exception=exception, 

543 exc_info=exc_info, # type: ignore 

544 ) 

545 

546 @contextlib.contextmanager 

547 def isolated_filesystem( 

548 self, temp_dir: str | os.PathLike[str] | None = None 

549 ) -> cabc.Iterator[str]: 

550 """A context manager that creates a temporary directory and 

551 changes the current working directory to it. This isolates tests 

552 that affect the contents of the CWD to prevent them from 

553 interfering with each other. 

554 

555 :param temp_dir: Create the temporary directory under this 

556 directory. If given, the created directory is not removed 

557 when exiting. 

558 

559 .. versionchanged:: 8.0 

560 Added the ``temp_dir`` parameter. 

561 """ 

562 cwd = os.getcwd() 

563 dt = tempfile.mkdtemp(dir=temp_dir) 

564 os.chdir(dt) 

565 

566 try: 

567 yield dt 

568 finally: 

569 os.chdir(cwd) 

570 

571 if temp_dir is None: 

572 import shutil 

573 

574 try: 

575 shutil.rmtree(dt) 

576 except OSError: 

577 pass