Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/testing.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

259 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import contextlib 

5import io 

6import os 

7import shlex 

8import shutil 

9import sys 

10import tempfile 

11import typing as t 

12from types import TracebackType 

13 

14from . import _compat 

15from . import formatting 

16from . import termui 

17from . import utils 

18from ._compat import _find_binary_reader 

19 

20if t.TYPE_CHECKING: 

21 from _typeshed import ReadableBuffer 

22 

23 from .core import Command 

24 

25 

class EchoingStdin:
    """Binary input wrapper that copies everything it reads to ``output``.

    Each read-style call is forwarded to ``input``; the bytes that come back
    are also written to ``output`` unless echoing is currently paused. Any
    attribute not defined on this class is looked up on ``input``.

    :param input: The binary stream data is read from.
    :param output: The binary stream that read data is echoed to.
    """

    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
        self._input = input
        self._output = output
        # Flipped by ``_pause_echo`` while a prompt handles its own echo.
        self._paused = False

    def __getattr__(self, x: str) -> t.Any:
        # ``__getattr__`` only runs for names missing from the instance. If
        # ``_input`` itself is missing (instance created without
        # ``__init__``, e.g. while being copied), delegating would recurse
        # without bound, so fail with a regular ``AttributeError`` instead.
        if x == "_input":
            raise AttributeError(x)

        return getattr(self._input, x)

    def _echo(self, rv: bytes) -> bytes:
        """Write ``rv`` to the output stream unless paused, then return it."""
        if not self._paused:
            self._output.write(rv)

        return rv

    def read(self, n: int = -1) -> bytes:
        return self._echo(self._input.read(n))

    def read1(self, n: int = -1) -> bytes:
        return self._echo(self._input.read1(n))  # type: ignore

    def readline(self, n: int = -1) -> bytes:
        return self._echo(self._input.readline(n))

    def readlines(self, hint: int = -1) -> list[bytes]:
        """Read and echo lines from the input.

        :param hint: Forwarded to the wrapped stream's ``readlines``; the
            default of ``-1`` is the wrapped stream's own default and reads
            all remaining lines.
        """
        return [self._echo(x) for x in self._input.readlines(hint)]

    def __iter__(self) -> cabc.Iterator[bytes]:
        return iter(self._echo(x) for x in self._input)

    def __repr__(self) -> str:
        return repr(self._input)

58 

59 

@contextlib.contextmanager
def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]:
    """Suspend echoing on ``stream`` for the duration of the ``with`` body.

    :param stream: The echoing stream to pause, or ``None`` to do nothing.

    ``_paused`` is reset to ``False`` even when the body raises, so an error
    inside the body cannot leave the stream silenced.
    """
    if stream is None:
        yield
        return

    stream._paused = True

    try:
        yield
    finally:
        stream._paused = False

68 

69 

class BytesIOCopy(io.BytesIO):
    """Patch ``io.BytesIO`` to let the written stream be copied to another.

    Everything written to this buffer is also written to ``copy_to``, and
    flushing this buffer flushes ``copy_to`` as well.

    :param copy_to: The stream that receives a copy of every write.

    .. versionadded:: 8.2
    """

    def __init__(self, copy_to: io.BytesIO) -> None:
        super().__init__()
        self.copy_to = copy_to

    def flush(self) -> None:
        super().flush()
        self.copy_to.flush()

    def write(self, b: ReadableBuffer) -> int:
        self.copy_to.write(b)
        return super().write(b)

    def writelines(self, lines: cabc.Iterable[ReadableBuffer]) -> None:
        # The inherited ``writelines`` does not go through the overridden
        # ``write``, so the copy would miss this data. Route every item
        # through ``write`` explicitly.
        for line in lines:
            self.write(line)

87 

88 

class StreamMixer:
    """Combine what is written to `<stdout>` and `<stderr>` into one stream.

    ``stdout`` and ``stderr`` are separate buffers that each also copy their
    writes into the shared ``output`` buffer.

    .. versionadded:: 8.2
    """

    def __init__(self) -> None:
        combined = io.BytesIO()
        # Both copies target the same buffer, so it receives writes from
        # either stream in the order they are made.
        self.output: io.BytesIO = combined
        self.stdout: io.BytesIO = BytesIOCopy(copy_to=combined)
        self.stderr: io.BytesIO = BytesIOCopy(copy_to=combined)

101 

102 

class _NamedTextIOWrapper(io.TextIOWrapper):
    """``io.TextIOWrapper`` that reports a caller-supplied name and mode.

    :param buffer: The binary stream to wrap.
    :param name: Value returned by the ``name`` property.
    :param mode: Value returned by the ``mode`` property.
    :param kwargs: Passed through to ``io.TextIOWrapper``.
    """

    def __init__(
        self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
    ) -> None:
        super().__init__(buffer, **kwargs)
        self._name, self._mode = name, mode

    @property
    def name(self) -> str:
        return self._name

    @property
    def mode(self) -> str:
        return self._mode

    def __next__(self) -> str:  # type: ignore
        # Running out of lines is reported as ``EOFError`` instead of
        # ``StopIteration``.
        try:
            return super().__next__()
        except StopIteration as exhausted:
            raise EOFError() from exhausted

125 

126 

def make_input_stream(
    input: str | bytes | t.IO[t.Any] | None, charset: str
) -> t.BinaryIO:
    """Turn ``input`` into a binary stream.

    :param input: An object with a ``read`` attribute, a string, bytes, or
        ``None``.
    :param charset: Encoding applied when ``input`` is a string.
    :raises TypeError: If ``input`` has a ``read`` attribute but
        ``_find_binary_reader`` returns ``None`` for it.
    """
    if not hasattr(input, "read"):
        # Plain data: normalize to bytes and wrap it in a fresh buffer.
        if isinstance(input, str):
            payload: t.Any = input.encode(charset)
        elif input is None:
            payload = b""
        else:
            payload = input

        return io.BytesIO(payload)

    # Is already an input stream.
    reader = _find_binary_reader(t.cast("t.IO[t.Any]", input))

    if reader is None:
        raise TypeError("Could not find binary reader for input stream.")

    return reader

145 

146 

class Result:
    """Holds the captured result of an invoked CLI script.

    :param runner: The runner that created the result
    :param stdout_bytes: The standard output as bytes.
    :param stderr_bytes: The standard error as bytes.
    :param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the
        user would see it in its terminal.
    :param return_value: The value returned from the invoked command.
    :param exit_code: The exit code as integer.
    :param exception: The exception that happened if one did.
    :param exc_info: Exception information (exception type, exception instance,
        traceback type).

    .. versionchanged:: 8.2
        ``stderr_bytes`` no longer optional, ``output_bytes`` introduced and
        ``mix_stderr`` has been removed.

    .. versionadded:: 8.0
        Added ``return_value``.
    """

    def __init__(
        self,
        runner: CliRunner,
        stdout_bytes: bytes,
        stderr_bytes: bytes,
        output_bytes: bytes,
        return_value: t.Any,
        exit_code: int,
        exception: BaseException | None,
        exc_info: tuple[type[BaseException], BaseException, TracebackType]
        | None = None,
    ):
        self.runner = runner
        self.stdout_bytes = stdout_bytes
        self.stderr_bytes = stderr_bytes
        self.output_bytes = output_bytes
        self.return_value = return_value
        self.exit_code = exit_code
        self.exception = exception
        self.exc_info = exc_info

    @property
    def output(self) -> str:
        """The terminal output as unicode string, as the user would see it.

        .. versionchanged:: 8.2
            No longer a proxy for ``self.stdout``. Now has its own independent stream
            that is mixing `<stdout>` and `<stderr>`, in the order they were written.
        """
        # Undecodable bytes become replacement characters; CRLF becomes LF.
        text = self.output_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stdout(self) -> str:
        """The standard output as unicode string."""
        text = self.stdout_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stderr(self) -> str:
        """The standard error as unicode string.

        .. versionchanged:: 8.2
            No longer raise an exception, always returns the `<stderr>` string.
        """
        text = self.stderr_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    def __repr__(self) -> str:
        if self.exception:
            status = repr(self.exception)
        else:
            status = "okay"

        return f"<{type(self).__name__} {status}>"

223 

224 

class CliRunner:
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment. This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes
                       to `<stdout>`.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param catch_exceptions: Whether to catch any exceptions other than
                             ``SystemExit`` when running :meth:`~CliRunner.invoke`.

    .. versionchanged:: 8.2
        Added the ``catch_exceptions`` parameter.

    .. versionchanged:: 8.2
        ``mix_stderr`` parameter has been removed.
    """

    def __init__(
        self,
        charset: str = "utf-8",
        env: cabc.Mapping[str, str | None] | None = None,
        echo_stdin: bool = False,
        catch_exceptions: bool = True,
    ) -> None:
        self.charset = charset
        self.env: cabc.Mapping[str, str | None] = env or {}
        self.echo_stdin = echo_stdin
        self.catch_exceptions = catch_exceptions

    def get_default_prog_name(self, cli: Command) -> str:
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(
        self, overrides: cabc.Mapping[str, str | None] | None = None
    ) -> cabc.Mapping[str, str | None]:
        """Returns the environment overrides for invoking a script.

        :param overrides: Extra entries layered on top of the runner's own
            ``env``. The runner's mapping is copied, not modified.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(
        self,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        color: bool = False,
    ) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]:
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up `<stdin>` with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        Yields a ``(stdout, stderr, output)`` tuple of byte buffers, where
        ``output`` is the mix of the other two.

        :param input: the input stream to put into `sys.stdin`.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionadded:: 8.2
            An additional output stream is returned, which is a mix of
            `<stdout>` and `<stderr>` streams.

        .. versionchanged:: 8.2
            Always returns the `<stderr>` stream.

        .. versionchanged:: 8.0
            `<stderr>` is opened with ``errors="backslashreplace"``
            instead of the default ``"strict"``.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.
        """
        bytes_input = make_input_stream(input, self.charset)
        echo_input = None
        env = self.make_env(env)
        stream_mixer = StreamMixer()

        # Snapshot every piece of global state that is about to be replaced.
        # All mutation happens inside the ``try`` below so that a failure
        # during setup (for example an unknown charset) is rolled back too.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore
        old__compat_should_strip_ansi = _compat.should_strip_ansi
        old_env: dict[str, str | None] = {}

        try:
            formatting.FORCED_WIDTH = 80

            if self.echo_stdin:
                bytes_input = echo_input = t.cast(
                    t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout)
                )

            sys.stdin = text_input = _NamedTextIOWrapper(
                bytes_input, encoding=self.charset, name="<stdin>", mode="r"
            )

            if self.echo_stdin:
                # Force unbuffered reads, otherwise TextIOWrapper reads a
                # large chunk which is echoed early.
                text_input._CHUNK_SIZE = 1  # type: ignore

            sys.stdout = _NamedTextIOWrapper(
                stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w"
            )

            sys.stderr = _NamedTextIOWrapper(
                stream_mixer.stderr,
                encoding=self.charset,
                name="<stderr>",
                mode="w",
                errors="backslashreplace",
            )

            # The prompt replacements write the typed value themselves, so
            # stdin echo is paused while they run.
            @_pause_echo(echo_input)  # type: ignore
            def visible_input(prompt: str | None = None) -> str:
                sys.stdout.write(prompt or "")
                val = next(text_input).rstrip("\r\n")
                sys.stdout.write(f"{val}\n")
                sys.stdout.flush()
                return val

            @_pause_echo(echo_input)  # type: ignore
            def hidden_input(prompt: str | None = None) -> str:
                sys.stdout.write(f"{prompt or ''}\n")
                sys.stdout.flush()
                return next(text_input).rstrip("\r\n")

            @_pause_echo(echo_input)  # type: ignore
            def _getchar(echo: bool) -> str:
                char = sys.stdin.read(1)

                if echo:
                    sys.stdout.write(char)

                sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(
                stream: t.IO[t.Any] | None = None, color: bool | None = None
            ) -> bool:
                if color is None:
                    return not default_color
                return not color

            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = should_strip_ansi

            for key, value in env.items():
                old_env[key] = os.environ.get(key)

                if value is None:
                    # ``None`` means "unset"; a variable that is already
                    # absent is not an error.
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value

            yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output)
        finally:
            for key, value in old_env.items():
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value

            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore
            _compat.should_strip_ansi = old__compat_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli: Command,
        args: str | cabc.Sequence[str] | None = None,
        input: str | bytes | t.IO[t.Any] | None = None,
        env: cabc.Mapping[str, str | None] | None = None,
        catch_exceptions: bool | None = None,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``. If :data:`None`, the value
                                 from :class:`CliRunner` is used.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionadded:: 8.2
            The result object has the ``output_bytes`` attribute with
            the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would
            see it in its terminal.

        .. versionchanged:: 8.2
            The result object always returns the ``stderr_bytes`` stream.

        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.

        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.

        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        if catch_exceptions is None:
            catch_exceptions = self.catch_exceptions

        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: BaseException | None = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            # An explicitly passed ``prog_name`` (even ``None``) wins over
            # the default derived from the command.
            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast("int | t.Any | None", e.code)

                if e_code is None:
                    e_code = 0

                if e_code != 0:
                    exception = e

                # A non-integer exit code is written to stdout and reported
                # as exit code 1.
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1

                exit_code = e_code

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush the text wrappers so the byte buffers are complete
                # before they are read.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                stderr = outstreams[1].getvalue()
                output = outstreams[2].getvalue()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            output_bytes=output,
            return_value=return_value,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,  # type: ignore
        )

    @contextlib.contextmanager
    def isolated_filesystem(
        self, temp_dir: str | os.PathLike[str] | None = None
    ) -> cabc.Iterator[str]:
        """A context manager that creates a temporary directory and
        changes the current working directory to it. This isolates tests
        that affect the contents of the CWD to prevent them from
        interfering with each other.

        Yields the path of the created directory.

        :param temp_dir: Create the temporary directory under this
            directory. If given, the created directory is not removed
            when exiting.

        .. versionchanged:: 8.0
            Added the ``temp_dir`` parameter.
        """
        cwd = os.getcwd()
        dt = tempfile.mkdtemp(dir=temp_dir)

        try:
            # Inside the ``try`` so a failed ``chdir`` still cleans up the
            # directory that was just created.
            os.chdir(dt)
            yield dt
        finally:
            os.chdir(cwd)

            if temp_dir is None:
                # Removal is best effort.
                try:
                    shutil.rmtree(dt)
                except OSError:
                    pass