Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/_compat.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

320 statements  

1from __future__ import annotations 

2 

3import codecs 

4import collections.abc as cabc 

5import io 

6import os 

7import re 

8import sys 

9import typing as t 

10from types import TracebackType 

11from weakref import WeakKeyDictionary 

12 

# Platform flags, computed once at import time from ``sys.platform``.
CYGWIN = sys.platform.startswith("cygwin")
WIN = sys.platform.startswith("win")
MAC = sys.platform == "darwin"
# Placeholder binding; the Windows-only branch further down this module
# rebinds this name to a real wrapper function.
auto_wrap_for_ansi: t.Callable[[t.TextIO], t.TextIO] | None = None
# Matches ESC ``[`` followed by any run of ``;``, ``?`` or digits and one
# terminating ASCII letter.
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")

18 

19 

def _make_text_stream(
    stream: t.BinaryIO,
    encoding: str | None,
    errors: str | None,
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Wrap a binary stream in a line-buffered text wrapper.

    :param stream: the binary stream to wrap.
    :param encoding: text encoding; when ``None`` it is chosen by
        :func:`get_best_encoding`.
    :param errors: error handler; when ``None``, ``"replace"`` is used.
    :param force_readable: passed through to the wrapper.
    :param force_writable: passed through to the wrapper.
    """
    resolved_encoding = get_best_encoding(stream) if encoding is None else encoding
    resolved_errors = "replace" if errors is None else errors

    return _NonClosingTextIOWrapper(
        stream,
        resolved_encoding,
        resolved_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable,
    )

39 

40 

def is_ascii_encoding(encoding: str) -> bool:
    """Return ``True`` if *encoding* resolves to the ASCII codec.

    Names the codec registry does not know yield ``False``.
    """
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        return False

    return codec_info.name == "ascii"

47 

48 

def get_best_encoding(stream: t.IO[t.Any]) -> str:
    """Pick an encoding for *stream*.

    Uses the stream's ``encoding`` attribute when it is set and truthy,
    otherwise ``sys.getdefaultencoding()``. If the chosen encoding is
    ASCII, ``"utf-8"`` is returned instead.
    """
    candidate = getattr(stream, "encoding", None)

    if not candidate:
        candidate = sys.getdefaultencoding()

    return "utf-8" if is_ascii_encoding(candidate) else candidate

55 

56 

class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` that detaches from its buffer when finalized.

    The wrapped binary stream is first passed through ``_FixupStream``.
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        encoding: str | None,
        errors: str | None,
        force_readable: bool = False,
        force_writable: bool = False,
        **extra: t.Any,
    ) -> None:
        fixed = _FixupStream(stream, force_readable, force_writable)
        # Keep a reference so ``isatty`` can ask the fixed-up stream directly.
        self._stream = t.cast(t.BinaryIO, fixed)
        super().__init__(self._stream, encoding, errors, **extra)

    def __del__(self) -> None:
        # Detach on finalization; any failure while doing so is ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self) -> bool:
        # https://bitbucket.org/pypy/pypy/issue/1803
        wrapped = self._stream
        return wrapped.isatty()

81 

82 

class _FixupStream:
    """The new io interface needs more from streams than streams
    traditionally implement. As such, this fix-up code is necessary in
    some circumstances.

    The forcing of readable and writable flags is there because some tools
    put badly patched objects on sys (one such offender is certain versions
    of jupyter notebook).

    Any attribute not defined here is looked up on the wrapped stream.
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        force_readable: bool = False,
        force_writable: bool = False,
    ):
        self._stream = stream
        self._force_readable = force_readable
        self._force_writable = force_writable

    def __getattr__(self, name: str) -> t.Any:
        # Only called for names not found on this object; delegate them.
        return getattr(self._stream, name)

    def read1(self, size: int) -> bytes:
        """Read via the stream's ``read1`` if it has one, else ``read``."""
        f = getattr(self._stream, "read1", None)

        if f is not None:
            return t.cast(bytes, f(size))

        return self._stream.read(size)

    def readable(self) -> bool:
        """Report readability, honoring ``force_readable``.

        Falls back to probing with ``read(0)`` when the wrapped stream has
        no ``readable`` method.
        """
        if self._force_readable:
            return True
        x = getattr(self._stream, "readable", None)
        if x is not None:
            return t.cast(bool, x())
        try:
            self._stream.read(0)
        except Exception:
            return False
        return True

    def writable(self) -> bool:
        """Report writability, honoring ``force_writable``.

        When the wrapped stream has no ``writable`` method, probe it with
        an empty bytes write and, if that fails, an empty text write.
        """
        if self._force_writable:
            return True
        x = getattr(self._stream, "writable", None)
        if x is not None:
            return t.cast(bool, x())
        try:
            self._stream.write(b"")
        except Exception:
            # The bytes probe failed. Retrying the very same call could
            # never succeed, so probe with an empty ``str`` instead.
            try:
                self._stream.write("")  # type: ignore[arg-type]
            except Exception:
                return False
        return True

    def seekable(self) -> bool:
        """Report seekability.

        Falls back to seeking to the current position when the wrapped
        stream has no ``seekable`` method.
        """
        x = getattr(self._stream, "seekable", None)
        if x is not None:
            return t.cast(bool, x())
        try:
            self._stream.seek(self._stream.tell())
        except Exception:
            return False
        return True

150 

151 

def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
    """Tell whether reading from *stream* yields ``bytes``.

    If the zero-length probe read raises, *default* is returned.
    """
    try:
        probe = stream.read(0)
    except Exception:
        # This happens in some cases where the stream was already
        # closed. In this case, we assume the default.
        return default

    return isinstance(probe, bytes)

159 

160 

def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
    """Tell whether *stream* accepts ``bytes`` writes.

    Probes with an empty bytes write, then an empty text write. If both
    raise, *default* is returned.
    """
    try:
        stream.write(b"")
    except Exception:
        pass
    else:
        return True

    try:
        stream.write("")
    except Exception:
        return default

    return False

172 

173 

def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None:
    """Return a binary reader for *stream*, or ``None`` if there is none.

    The stream itself is returned when it already reads bytes; otherwise
    its ``buffer`` attribute is tried.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams. Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_reader(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    if underlying is None or not _is_binary_reader(underlying, True):
        return None

    return t.cast(t.BinaryIO, underlying)

190 

191 

def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None:
    """Return a binary writer for *stream*, or ``None`` if there is none.

    The stream itself is returned when it already accepts bytes;
    otherwise its ``buffer`` attribute is tried.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams. Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_writer(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    if underlying is None or not _is_binary_writer(underlying, True):
        return None

    return t.cast(t.BinaryIO, underlying)

208 

209 

def _stream_is_misconfigured(stream: t.TextIO) -> bool:
    """A stream is misconfigured if its encoding is ASCII."""
    # If the stream does not have an encoding set, we assume it's set
    # to ASCII. This appears to happen in certain unittest
    # environments. It's not quite clear what the correct behavior is
    # but this at least will force Click to recover somehow.
    declared = getattr(stream, "encoding", None)
    return is_ascii_encoding(declared if declared else "ascii")

217 

218 

def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: str | None) -> bool:
    """Check one stream attribute against a desired value.

    The attribute is compatible when it equals *value*, or when *value*
    is ``None`` (no preference) and the attribute has a value.
    """
    actual = getattr(stream, attr, None)

    if actual == value:
        return True

    return value is None and actual is not None

226 

227 

def _is_compatible_text_stream(
    stream: t.TextIO, encoding: str | None, errors: str | None
) -> bool:
    """Check that both the ``encoding`` and ``errors`` attributes of a
    stream are compatible with the desired values.
    """
    if not _is_compat_stream_attr(stream, "encoding", encoding):
        return False

    return _is_compat_stream_attr(stream, "errors", errors)

237 

238 

def _force_correct_text_stream(
    text_stream: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    is_binary: t.Callable[[t.IO[t.Any], bool], bool],
    find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Return a text stream with the requested encoding and error handling.

    :param text_stream: the stream to adapt; may actually be binary.
    :param encoding: desired encoding, or ``None`` for no preference.
    :param errors: desired error handler, or ``None`` for no preference.
    :param is_binary: predicate telling whether a stream is binary.
    :param find_binary: locates the binary stream under a text stream.
    :param force_readable: passed through to the new text wrapper.
    :param force_writable: passed through to the new text wrapper.
    """
    if is_binary(text_stream, False):
        binary_stream = t.cast(t.BinaryIO, text_stream)
    else:
        text_stream = t.cast(t.TextIO, text_stream)

        # If the stream looks compatible, and won't default to a
        # misconfigured ascii encoding, return it as-is.
        if _is_compatible_text_stream(text_stream, encoding, errors):
            defaults_to_ascii = encoding is None and _stream_is_misconfigured(
                text_stream
            )

            if not defaults_to_ascii:
                return text_stream

        # Otherwise, get the underlying binary stream.
        found = find_binary(text_stream)

        # If that's not possible, silently use the original stream
        # and get mojibake instead of exceptions.
        if found is None:
            return text_stream

        binary_stream = found

    # Default errors to replace instead of strict in order to get
    # something that works.
    effective_errors = "replace" if errors is None else errors

    # Wrap the binary stream in a text stream with the correct
    # encoding parameters.
    return _make_text_stream(
        binary_stream,
        encoding,
        effective_errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )

283 

284 

def _force_correct_text_reader(
    text_reader: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    force_readable: bool = False,
) -> t.TextIO:
    """Reader flavor of ``_force_correct_text_stream``.

    Supplies the reader-specific binary detection helpers.
    """
    return _force_correct_text_stream(
        text_reader,
        encoding,
        errors,
        is_binary=_is_binary_reader,
        find_binary=_find_binary_reader,
        force_readable=force_readable,
    )

299 

300 

def _force_correct_text_writer(
    text_writer: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    force_writable: bool = False,
) -> t.TextIO:
    """Writer flavor of ``_force_correct_text_stream``.

    Supplies the writer-specific binary detection helpers.
    """
    return _force_correct_text_stream(
        text_writer,
        encoding,
        errors,
        is_binary=_is_binary_writer,
        find_binary=_find_binary_writer,
        force_writable=force_writable,
    )

315 

316 

def get_binary_stdin() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdin``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    stdin_bytes = _find_binary_reader(sys.stdin)

    if stdin_bytes is not None:
        return stdin_bytes

    raise RuntimeError("Was not able to determine binary stream for sys.stdin.")

322 

323 

def get_binary_stdout() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdout``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    stdout_bytes = _find_binary_writer(sys.stdout)

    if stdout_bytes is not None:
        return stdout_bytes

    raise RuntimeError("Was not able to determine binary stream for sys.stdout.")

329 

330 

def get_binary_stderr() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stderr``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    stderr_bytes = _find_binary_writer(sys.stderr)

    if stderr_bytes is not None:
        return stderr_bytes

    raise RuntimeError("Was not able to determine binary stream for sys.stderr.")

336 

337 

def get_text_stdin(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stdin``.

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stdin`` is adapted to the requested encoding and
    error handler.
    """
    console = _get_windows_console_stream(sys.stdin, encoding, errors)

    if console is None:
        return _force_correct_text_reader(
            sys.stdin, encoding, errors, force_readable=True
        )

    return console

343 

344 

def get_text_stdout(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stdout``.

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stdout`` is adapted to the requested encoding and
    error handler.
    """
    console = _get_windows_console_stream(sys.stdout, encoding, errors)

    if console is None:
        return _force_correct_text_writer(
            sys.stdout, encoding, errors, force_writable=True
        )

    return console

350 

351 

def get_text_stderr(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stderr``.

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stderr`` is adapted to the requested encoding and
    error handler.
    """
    console = _get_windows_console_stream(sys.stderr, encoding, errors)

    if console is None:
        return _force_correct_text_writer(
            sys.stderr, encoding, errors, force_writable=True
        )

    return console

357 

358 

def _wrap_io_open(
    file: str | os.PathLike[str] | int,
    mode: str,
    encoding: str | None,
    errors: str | None,
) -> t.IO[t.Any]:
    """Open *file*, passing ``encoding`` and ``errors`` only in text mode."""
    if "b" not in mode:
        return open(file, mode, encoding=encoding, errors=errors)

    # Binary mode: encoding and errors are deliberately not passed.
    return open(file, mode)

370 

371 

def open_stream(
    filename: str | os.PathLike[str],
    mode: str = "r",
    encoding: str | None = None,
    errors: str | None = "strict",
    atomic: bool = False,
) -> tuple[t.IO[t.Any], bool]:
    """Open a file or standard stream.

    :param filename: path to open; ``"-"`` selects a standard stream.
    :param mode: file mode, as for :func:`open`.
    :param encoding: text encoding, used in text mode.
    :param errors: error handler, used in text mode.
    :param atomic: write to a temporary file in the same folder and move
        it over the target on close. Only valid with ``w`` mode.
    :return: ``(stream, flag)``; the flag is ``False`` for the standard
        streams and ``True`` for files opened here.
    :raises ValueError: if ``atomic`` is combined with ``a`` or ``x``
        mode, or with a mode lacking ``w``.
    """
    is_binary_mode = "b" in mode
    filename = os.fspath(filename)

    # Standard streams first. These are simple because they ignore the
    # atomic flag. Use fsdecode to handle Path("-").
    if os.fsdecode(filename) == "-":
        wants_output = any(flag in mode for flag in ["w", "a", "x"])

        if wants_output:
            if is_binary_mode:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False

        if is_binary_mode:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated. They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file. Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    # Remember the mode bits of an existing target, if there is one.
    perm: int | None
    try:
        perm = os.stat(filename).st_mode
    except OSError:
        perm = None

    open_flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    if is_binary_mode:
        open_flags |= getattr(os, "O_BINARY", 0)

    target_dir = os.path.dirname(filename)
    create_mode = 0o666 if perm is None else perm

    # Try random names until exclusive creation succeeds.
    while True:
        tmp_filename = os.path.join(
            target_dir,
            f".__atomic-write{random.randrange(1 << 32):08x}",
        )
        try:
            fd = os.open(tmp_filename, open_flags, create_mode)
        except OSError as exc:
            retryable = exc.errno == errno.EEXIST or (
                os.name == "nt"
                and exc.errno == errno.EACCES
                and os.path.isdir(exc.filename)
                and os.access(exc.filename, os.W_OK)
            )
            if not retryable:
                raise
        else:
            break

    if perm is not None:
        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

    wrapped = _wrap_io_open(fd, mode, encoding, errors)
    atomic_file = _AtomicFile(wrapped, tmp_filename, os.path.realpath(filename))
    return t.cast(t.IO[t.Any], atomic_file), True

451 

452 

class _AtomicFile:
    """File proxy that moves a temporary file over the real one on close.

    Writes go to the wrapped file object ``f``, which is backed by
    ``tmp_filename``. Attributes not defined here are looked up on the
    wrapped file object.
    """

    def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
        self._f = f
        self._tmp_filename = tmp_filename
        self._real_filename = real_filename
        self.closed = False

    @property
    def name(self) -> str:
        # Report the destination path, not the temporary one.
        return self._real_filename

    def close(self, delete: bool = False) -> None:
        """Close the wrapped file and commit or discard what was written.

        :param delete: when true, remove the temporary file and leave the
            real file untouched; otherwise move the temporary file over
            the real one. Calling ``close`` again is a no-op.
        """
        if self.closed:
            return
        self._f.close()
        if delete:
            # Discard the partial output instead of publishing it.
            try:
                os.unlink(self._tmp_filename)
            except FileNotFoundError:
                # Already gone; there is nothing left to discard.
                pass
        else:
            os.replace(self._tmp_filename, self._real_filename)
        self.closed = True

    def __getattr__(self, name: str) -> t.Any:
        return getattr(self._f, name)

    def __enter__(self) -> _AtomicFile:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # An exception inside the ``with`` body means the output is discarded.
        self.close(delete=exc_type is not None)

    def __repr__(self) -> str:
        return repr(self._f)

487 

488 

def strip_ansi(value: str) -> str:
    """Return *value* with every match of ``_ansi_re`` removed."""
    return re.sub(_ansi_re, "", value)

491 

492 

def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
    """Tell whether the innermost stream's class lives in an
    ``ipykernel.`` module, unwrapping this module's own wrappers first.
    """
    wrapper_types = (_FixupStream, _NonClosingTextIOWrapper)
    inner = stream

    while isinstance(inner, wrapper_types):
        inner = inner._stream

    module_name = inner.__class__.__module__
    return module_name.startswith("ipykernel.")

498 

499 

def should_strip_ansi(
    stream: t.IO[t.Any] | None = None, color: bool | None = None
) -> bool:
    """Decide whether ANSI codes should be stripped for *stream*.

    An explicit *color* wins: the result is ``not color``. Otherwise the
    codes are stripped when the stream is neither a terminal nor Jupyter
    kernel output. When *stream* is ``None``, ``sys.stdin`` is inspected.
    """
    if color is not None:
        return not color

    if stream is None:
        stream = sys.stdin
    elif hasattr(stream, "color"):
        # ._termui_impl.MaybeStripAnsi handles stripping ansi itself,
        # so we don't need to strip it here
        return False

    if isatty(stream):
        return False

    return not _is_jupyter_kernel_output(stream)

512 

513 

# On Windows, wrap the output streams with colorama to support ANSI
# color codes.
# NOTE: double check is needed so mypy does not analyze this on Linux
if sys.platform.startswith("win") and WIN:
    from ._winconsole import _get_windows_console_stream

    def _get_argv_encoding() -> str:
        """Return the locale's preferred encoding."""
        import locale

        return locale.getpreferredencoding()

    # Wrapped streams keyed weakly by the stream they wrap, so repeated
    # calls for the same stream reuse one wrapper.
    _ansi_stream_wrappers: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def auto_wrap_for_ansi(stream: t.TextIO, color: bool | None = None) -> t.TextIO:
        """Support ANSI color and style codes on Windows by wrapping a
        stream with colorama.

        :param stream: the text stream to wrap.
        :param color: forwarded to ``should_strip_ansi`` to decide whether
            the wrapper strips ANSI codes.
        :return: the wrapped stream, taken from the cache when available.
        """
        # The lookup can fail for keys the weak dictionary cannot handle;
        # treat that as a cache miss.
        try:
            cached = _ansi_stream_wrappers.get(stream)
        except Exception:
            cached = None

        if cached is not None:
            return cached

        import colorama

        strip = should_strip_ansi(stream, color)
        ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
        rv = t.cast(t.TextIO, ansi_wrapper.stream)
        _write = rv.write

        def _safe_write(s: str) -> int:
            # If the write fails for any reason, including
            # KeyboardInterrupt, call the wrapper's reset_all() before
            # re-raising.
            try:
                return _write(s)
            except BaseException:
                ansi_wrapper.reset_all()
                raise

        rv.write = _safe_write  # type: ignore[method-assign]

        # Caching is best-effort; a stream that cannot be used as a key is
        # simply not cached.
        try:
            _ansi_stream_wrappers[stream] = rv
        except Exception:
            pass

        return rv

else:

    def _get_argv_encoding() -> str:
        """Return ``sys.stdin``'s encoding, or the filesystem encoding if
        that is missing or empty.
        """
        return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()

    def _get_windows_console_stream(
        f: t.TextIO, encoding: str | None, errors: str | None
    ) -> t.TextIO | None:
        """Non-Windows stand-in: there is never a console stream."""
        return None

571 

572 

def term_len(x: str) -> int:
    """Return the length of *x* after ``strip_ansi`` has been applied."""
    visible = strip_ansi(x)
    return len(visible)

575 

576 

def isatty(stream: t.IO[t.Any]) -> bool:
    """Return ``stream.isatty()``, or ``False`` if that call raises."""
    try:
        answer = stream.isatty()
    except Exception:
        return False

    return answer

582 

583 

def _make_cached_stream_func(
    src_func: t.Callable[[], t.TextIO | None],
    wrapper_func: t.Callable[[], t.TextIO],
) -> t.Callable[[], t.TextIO | None]:
    """Build a zero-argument function returning a cached wrapped stream.

    :param src_func: returns the current source stream, or ``None``.
    :param wrapper_func: builds the wrapped stream on a cache miss.
    :return: a function that returns ``None`` when the source is
        ``None``, otherwise the wrapper cached for the current source.
    """
    wrapped_by_source: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def func() -> t.TextIO | None:
        source = src_func()

        if source is None:
            return None

        # A source that cannot be used as a key counts as a cache miss.
        try:
            wrapped = wrapped_by_source.get(source)
        except Exception:
            wrapped = None

        if wrapped is None:
            wrapped = wrapper_func()

            # Storing is best-effort for the same reason.
            try:
                wrapped_by_source[source] = wrapped
            except Exception:
                pass

        return wrapped

    return func

610 

611 

# Cached accessors for the default text streams. Each one re-reads the
# current ``sys`` stream on every call and caches the wrapper per stream
# object.
_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)


# Factories for the binary standard streams, keyed by stream name.
binary_streams: cabc.Mapping[str, t.Callable[[], t.BinaryIO]] = {
    "stdin": get_binary_stdin,
    "stdout": get_binary_stdout,
    "stderr": get_binary_stderr,
}

# Factories for the text standard streams, keyed by stream name; each takes
# ``(encoding, errors)``.
text_streams: cabc.Mapping[str, t.Callable[[str | None, str | None], t.TextIO]] = {
    "stdin": get_text_stdin,
    "stdout": get_text_stdout,
    "stderr": get_text_stderr,
}

627}