Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/click/_compat.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

316 statements  

1from __future__ import annotations 

2 

3import codecs 

4import collections.abc as cabc 

5import io 

6import os 

7import re 

8import sys 

9import typing as t 

10from types import TracebackType 

11from weakref import WeakKeyDictionary 

12 

13CYGWIN = sys.platform.startswith("cygwin") 

14WIN = sys.platform.startswith("win") 

15auto_wrap_for_ansi: t.Callable[[t.TextIO], t.TextIO] | None = None 

16_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]") 

17 

18 

19def _make_text_stream( 

20 stream: t.BinaryIO, 

21 encoding: str | None, 

22 errors: str | None, 

23 force_readable: bool = False, 

24 force_writable: bool = False, 

25) -> t.TextIO: 

26 if encoding is None: 

27 encoding = get_best_encoding(stream) 

28 if errors is None: 

29 errors = "replace" 

30 return _NonClosingTextIOWrapper( 

31 stream, 

32 encoding, 

33 errors, 

34 line_buffering=True, 

35 force_readable=force_readable, 

36 force_writable=force_writable, 

37 ) 

38 

39 

40def is_ascii_encoding(encoding: str) -> bool: 

41 """Checks if a given encoding is ascii.""" 

42 try: 

43 return codecs.lookup(encoding).name == "ascii" 

44 except LookupError: 

45 return False 

46 

47 

48def get_best_encoding(stream: t.IO[t.Any]) -> str: 

49 """Returns the default stream encoding if not found.""" 

50 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding() 

51 if is_ascii_encoding(rv): 

52 return "utf-8" 

53 return rv 

54 

55 

56class _NonClosingTextIOWrapper(io.TextIOWrapper): 

57 def __init__( 

58 self, 

59 stream: t.BinaryIO, 

60 encoding: str | None, 

61 errors: str | None, 

62 force_readable: bool = False, 

63 force_writable: bool = False, 

64 **extra: t.Any, 

65 ) -> None: 

66 self._stream = stream = t.cast( 

67 t.BinaryIO, _FixupStream(stream, force_readable, force_writable) 

68 ) 

69 super().__init__(stream, encoding, errors, **extra) 

70 

71 def __del__(self) -> None: 

72 try: 

73 self.detach() 

74 except Exception: 

75 pass 

76 

77 def isatty(self) -> bool: 

78 # https://bitbucket.org/pypy/pypy/issue/1803 

79 return self._stream.isatty() 

80 

81 

82class _FixupStream: 

83 """The new io interface needs more from streams than streams 

84 traditionally implement. As such, this fix-up code is necessary in 

85 some circumstances. 

86 

87 The forcing of readable and writable flags are there because some tools 

88 put badly patched objects on sys (one such offender are certain version 

89 of jupyter notebook). 

90 """ 

91 

92 def __init__( 

93 self, 

94 stream: t.BinaryIO, 

95 force_readable: bool = False, 

96 force_writable: bool = False, 

97 ): 

98 self._stream = stream 

99 self._force_readable = force_readable 

100 self._force_writable = force_writable 

101 

102 def __getattr__(self, name: str) -> t.Any: 

103 return getattr(self._stream, name) 

104 

105 def read1(self, size: int) -> bytes: 

106 f = getattr(self._stream, "read1", None) 

107 

108 if f is not None: 

109 return t.cast(bytes, f(size)) 

110 

111 return self._stream.read(size) 

112 

113 def readable(self) -> bool: 

114 if self._force_readable: 

115 return True 

116 x = getattr(self._stream, "readable", None) 

117 if x is not None: 

118 return t.cast(bool, x()) 

119 try: 

120 self._stream.read(0) 

121 except Exception: 

122 return False 

123 return True 

124 

125 def writable(self) -> bool: 

126 if self._force_writable: 

127 return True 

128 x = getattr(self._stream, "writable", None) 

129 if x is not None: 

130 return t.cast(bool, x()) 

131 try: 

132 self._stream.write("") # type: ignore 

133 except Exception: 

134 try: 

135 self._stream.write(b"") 

136 except Exception: 

137 return False 

138 return True 

139 

140 def seekable(self) -> bool: 

141 x = getattr(self._stream, "seekable", None) 

142 if x is not None: 

143 return t.cast(bool, x()) 

144 try: 

145 self._stream.seek(self._stream.tell()) 

146 except Exception: 

147 return False 

148 return True 

149 

150 

151def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool: 

152 try: 

153 return isinstance(stream.read(0), bytes) 

154 except Exception: 

155 return default 

156 # This happens in some cases where the stream was already 

157 # closed. In this case, we assume the default. 

158 

159 

160def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool: 

161 try: 

162 stream.write(b"") 

163 except Exception: 

164 try: 

165 stream.write("") 

166 return False 

167 except Exception: 

168 pass 

169 return default 

170 return True 

171 

172 

173def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None: 

174 # We need to figure out if the given stream is already binary. 

175 # This can happen because the official docs recommend detaching 

176 # the streams to get binary streams. Some code might do this, so 

177 # we need to deal with this case explicitly. 

178 if _is_binary_reader(stream, False): 

179 return t.cast(t.BinaryIO, stream) 

180 

181 buf = getattr(stream, "buffer", None) 

182 

183 # Same situation here; this time we assume that the buffer is 

184 # actually binary in case it's closed. 

185 if buf is not None and _is_binary_reader(buf, True): 

186 return t.cast(t.BinaryIO, buf) 

187 

188 return None 

189 

190 

191def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None: 

192 # We need to figure out if the given stream is already binary. 

193 # This can happen because the official docs recommend detaching 

194 # the streams to get binary streams. Some code might do this, so 

195 # we need to deal with this case explicitly. 

196 if _is_binary_writer(stream, False): 

197 return t.cast(t.BinaryIO, stream) 

198 

199 buf = getattr(stream, "buffer", None) 

200 

201 # Same situation here; this time we assume that the buffer is 

202 # actually binary in case it's closed. 

203 if buf is not None and _is_binary_writer(buf, True): 

204 return t.cast(t.BinaryIO, buf) 

205 

206 return None 

207 

208 

209def _stream_is_misconfigured(stream: t.TextIO) -> bool: 

210 """A stream is misconfigured if its encoding is ASCII.""" 

211 # If the stream does not have an encoding set, we assume it's set 

212 # to ASCII. This appears to happen in certain unittest 

213 # environments. It's not quite clear what the correct behavior is 

214 # but this at least will force Click to recover somehow. 

215 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii") 

216 

217 

218def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: str | None) -> bool: 

219 """A stream attribute is compatible if it is equal to the 

220 desired value or the desired value is unset and the attribute 

221 has a value. 

222 """ 

223 stream_value = getattr(stream, attr, None) 

224 return stream_value == value or (value is None and stream_value is not None) 

225 

226 

227def _is_compatible_text_stream( 

228 stream: t.TextIO, encoding: str | None, errors: str | None 

229) -> bool: 

230 """Check if a stream's encoding and errors attributes are 

231 compatible with the desired values. 

232 """ 

233 return _is_compat_stream_attr( 

234 stream, "encoding", encoding 

235 ) and _is_compat_stream_attr(stream, "errors", errors) 

236 

237 

238def _force_correct_text_stream( 

239 text_stream: t.IO[t.Any], 

240 encoding: str | None, 

241 errors: str | None, 

242 is_binary: t.Callable[[t.IO[t.Any], bool], bool], 

243 find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None], 

244 force_readable: bool = False, 

245 force_writable: bool = False, 

246) -> t.TextIO: 

247 if is_binary(text_stream, False): 

248 binary_reader = t.cast(t.BinaryIO, text_stream) 

249 else: 

250 text_stream = t.cast(t.TextIO, text_stream) 

251 # If the stream looks compatible, and won't default to a 

252 # misconfigured ascii encoding, return it as-is. 

253 if _is_compatible_text_stream(text_stream, encoding, errors) and not ( 

254 encoding is None and _stream_is_misconfigured(text_stream) 

255 ): 

256 return text_stream 

257 

258 # Otherwise, get the underlying binary reader. 

259 possible_binary_reader = find_binary(text_stream) 

260 

261 # If that's not possible, silently use the original reader 

262 # and get mojibake instead of exceptions. 

263 if possible_binary_reader is None: 

264 return text_stream 

265 

266 binary_reader = possible_binary_reader 

267 

268 # Default errors to replace instead of strict in order to get 

269 # something that works. 

270 if errors is None: 

271 errors = "replace" 

272 

273 # Wrap the binary stream in a text stream with the correct 

274 # encoding parameters. 

275 return _make_text_stream( 

276 binary_reader, 

277 encoding, 

278 errors, 

279 force_readable=force_readable, 

280 force_writable=force_writable, 

281 ) 

282 

283 

284def _force_correct_text_reader( 

285 text_reader: t.IO[t.Any], 

286 encoding: str | None, 

287 errors: str | None, 

288 force_readable: bool = False, 

289) -> t.TextIO: 

290 return _force_correct_text_stream( 

291 text_reader, 

292 encoding, 

293 errors, 

294 _is_binary_reader, 

295 _find_binary_reader, 

296 force_readable=force_readable, 

297 ) 

298 

299 

300def _force_correct_text_writer( 

301 text_writer: t.IO[t.Any], 

302 encoding: str | None, 

303 errors: str | None, 

304 force_writable: bool = False, 

305) -> t.TextIO: 

306 return _force_correct_text_stream( 

307 text_writer, 

308 encoding, 

309 errors, 

310 _is_binary_writer, 

311 _find_binary_writer, 

312 force_writable=force_writable, 

313 ) 

314 

315 

316def get_binary_stdin() -> t.BinaryIO: 

317 reader = _find_binary_reader(sys.stdin) 

318 if reader is None: 

319 raise RuntimeError("Was not able to determine binary stream for sys.stdin.") 

320 return reader 

321 

322 

323def get_binary_stdout() -> t.BinaryIO: 

324 writer = _find_binary_writer(sys.stdout) 

325 if writer is None: 

326 raise RuntimeError("Was not able to determine binary stream for sys.stdout.") 

327 return writer 

328 

329 

330def get_binary_stderr() -> t.BinaryIO: 

331 writer = _find_binary_writer(sys.stderr) 

332 if writer is None: 

333 raise RuntimeError("Was not able to determine binary stream for sys.stderr.") 

334 return writer 

335 

336 

337def get_text_stdin(encoding: str | None = None, errors: str | None = None) -> t.TextIO: 

338 rv = _get_windows_console_stream(sys.stdin, encoding, errors) 

339 if rv is not None: 

340 return rv 

341 return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True) 

342 

343 

344def get_text_stdout(encoding: str | None = None, errors: str | None = None) -> t.TextIO: 

345 rv = _get_windows_console_stream(sys.stdout, encoding, errors) 

346 if rv is not None: 

347 return rv 

348 return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True) 

349 

350 

351def get_text_stderr(encoding: str | None = None, errors: str | None = None) -> t.TextIO: 

352 rv = _get_windows_console_stream(sys.stderr, encoding, errors) 

353 if rv is not None: 

354 return rv 

355 return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True) 

356 

357 

358def _wrap_io_open( 

359 file: str | os.PathLike[str] | int, 

360 mode: str, 

361 encoding: str | None, 

362 errors: str | None, 

363) -> t.IO[t.Any]: 

364 """Handles not passing ``encoding`` and ``errors`` in binary mode.""" 

365 if "b" in mode: 

366 return open(file, mode) 

367 

368 return open(file, mode, encoding=encoding, errors=errors) 

369 

370 

371def open_stream( 

372 filename: str | os.PathLike[str], 

373 mode: str = "r", 

374 encoding: str | None = None, 

375 errors: str | None = "strict", 

376 atomic: bool = False, 

377) -> tuple[t.IO[t.Any], bool]: 

378 binary = "b" in mode 

379 filename = os.fspath(filename) 

380 

381 # Standard streams first. These are simple because they ignore the 

382 # atomic flag. Use fsdecode to handle Path("-"). 

383 if os.fsdecode(filename) == "-": 

384 if any(m in mode for m in ["w", "a", "x"]): 

385 if binary: 

386 return get_binary_stdout(), False 

387 return get_text_stdout(encoding=encoding, errors=errors), False 

388 if binary: 

389 return get_binary_stdin(), False 

390 return get_text_stdin(encoding=encoding, errors=errors), False 

391 

392 # Non-atomic writes directly go out through the regular open functions. 

393 if not atomic: 

394 return _wrap_io_open(filename, mode, encoding, errors), True 

395 

396 # Some usability stuff for atomic writes 

397 if "a" in mode: 

398 raise ValueError( 

399 "Appending to an existing file is not supported, because that" 

400 " would involve an expensive `copy`-operation to a temporary" 

401 " file. Open the file in normal `w`-mode and copy explicitly" 

402 " if that's what you're after." 

403 ) 

404 if "x" in mode: 

405 raise ValueError("Use the `overwrite`-parameter instead.") 

406 if "w" not in mode: 

407 raise ValueError("Atomic writes only make sense with `w`-mode.") 

408 

409 # Atomic writes are more complicated. They work by opening a file 

410 # as a proxy in the same folder and then using the fdopen 

411 # functionality to wrap it in a Python file. Then we wrap it in an 

412 # atomic file that moves the file over on close. 

413 import errno 

414 import random 

415 

416 try: 

417 perm: int | None = os.stat(filename).st_mode 

418 except OSError: 

419 perm = None 

420 

421 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL 

422 

423 if binary: 

424 flags |= getattr(os, "O_BINARY", 0) 

425 

426 while True: 

427 tmp_filename = os.path.join( 

428 os.path.dirname(filename), 

429 f".__atomic-write{random.randrange(1 << 32):08x}", 

430 ) 

431 try: 

432 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm) 

433 break 

434 except OSError as e: 

435 if e.errno == errno.EEXIST or ( 

436 os.name == "nt" 

437 and e.errno == errno.EACCES 

438 and os.path.isdir(e.filename) 

439 and os.access(e.filename, os.W_OK) 

440 ): 

441 continue 

442 raise 

443 

444 if perm is not None: 

445 os.chmod(tmp_filename, perm) # in case perm includes bits in umask 

446 

447 f = _wrap_io_open(fd, mode, encoding, errors) 

448 af = _AtomicFile(f, tmp_filename, os.path.realpath(filename)) 

449 return t.cast(t.IO[t.Any], af), True 

450 

451 

452class _AtomicFile: 

453 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None: 

454 self._f = f 

455 self._tmp_filename = tmp_filename 

456 self._real_filename = real_filename 

457 self.closed = False 

458 

459 @property 

460 def name(self) -> str: 

461 return self._real_filename 

462 

463 def close(self, delete: bool = False) -> None: 

464 if self.closed: 

465 return 

466 self._f.close() 

467 os.replace(self._tmp_filename, self._real_filename) 

468 self.closed = True 

469 

470 def __getattr__(self, name: str) -> t.Any: 

471 return getattr(self._f, name) 

472 

473 def __enter__(self) -> _AtomicFile: 

474 return self 

475 

476 def __exit__( 

477 self, 

478 exc_type: type[BaseException] | None, 

479 exc_value: BaseException | None, 

480 tb: TracebackType | None, 

481 ) -> None: 

482 self.close(delete=exc_type is not None) 

483 

484 def __repr__(self) -> str: 

485 return repr(self._f) 

486 

487 

488def strip_ansi(value: str) -> str: 

489 return _ansi_re.sub("", value) 

490 

491 

492def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool: 

493 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)): 

494 stream = stream._stream 

495 

496 return stream.__class__.__module__.startswith("ipykernel.") 

497 

498 

499def should_strip_ansi( 

500 stream: t.IO[t.Any] | None = None, color: bool | None = None 

501) -> bool: 

502 if color is None: 

503 if stream is None: 

504 stream = sys.stdin 

505 return not isatty(stream) and not _is_jupyter_kernel_output(stream) 

506 return not color 

507 

508 

509# On Windows, wrap the output streams with colorama to support ANSI 

510# color codes. 

511# NOTE: double check is needed so mypy does not analyze this on Linux 

512if sys.platform.startswith("win") and WIN: 

513 from ._winconsole import _get_windows_console_stream 

514 

515 def _get_argv_encoding() -> str: 

516 import locale 

517 

518 return locale.getpreferredencoding() 

519 

520 _ansi_stream_wrappers: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary() 

521 

522 def auto_wrap_for_ansi(stream: t.TextIO, color: bool | None = None) -> t.TextIO: 

523 """Support ANSI color and style codes on Windows by wrapping a 

524 stream with colorama. 

525 """ 

526 try: 

527 cached = _ansi_stream_wrappers.get(stream) 

528 except Exception: 

529 cached = None 

530 

531 if cached is not None: 

532 return cached 

533 

534 import colorama 

535 

536 strip = should_strip_ansi(stream, color) 

537 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip) 

538 rv = t.cast(t.TextIO, ansi_wrapper.stream) 

539 _write = rv.write 

540 

541 def _safe_write(s): 

542 try: 

543 return _write(s) 

544 except BaseException: 

545 ansi_wrapper.reset_all() 

546 raise 

547 

548 rv.write = _safe_write 

549 

550 try: 

551 _ansi_stream_wrappers[stream] = rv 

552 except Exception: 

553 pass 

554 

555 return rv 

556 

557else: 

558 

559 def _get_argv_encoding() -> str: 

560 return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding() 

561 

562 def _get_windows_console_stream( 

563 f: t.TextIO, encoding: str | None, errors: str | None 

564 ) -> t.TextIO | None: 

565 return None 

566 

567 

568def term_len(x: str) -> int: 

569 return len(strip_ansi(x)) 

570 

571 

572def isatty(stream: t.IO[t.Any]) -> bool: 

573 try: 

574 return stream.isatty() 

575 except Exception: 

576 return False 

577 

578 

579def _make_cached_stream_func( 

580 src_func: t.Callable[[], t.TextIO | None], 

581 wrapper_func: t.Callable[[], t.TextIO], 

582) -> t.Callable[[], t.TextIO | None]: 

583 cache: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary() 

584 

585 def func() -> t.TextIO | None: 

586 stream = src_func() 

587 

588 if stream is None: 

589 return None 

590 

591 try: 

592 rv = cache.get(stream) 

593 except Exception: 

594 rv = None 

595 if rv is not None: 

596 return rv 

597 rv = wrapper_func() 

598 try: 

599 cache[stream] = rv 

600 except Exception: 

601 pass 

602 return rv 

603 

604 return func 

605 

606 

607_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin) 

608_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout) 

609_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr) 

610 

611 

612binary_streams: cabc.Mapping[str, t.Callable[[], t.BinaryIO]] = { 

613 "stdin": get_binary_stdin, 

614 "stdout": get_binary_stdout, 

615 "stderr": get_binary_stderr, 

616} 

617 

618text_streams: cabc.Mapping[str, t.Callable[[str | None, str | None], t.TextIO]] = { 

619 "stdin": get_text_stdin, 

620 "stdout": get_text_stdout, 

621 "stderr": get_text_stderr, 

622}