Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/click/_compat.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

313 statements  

1import codecs 

2import io 

3import os 

4import re 

5import sys 

6import typing as t 

7from weakref import WeakKeyDictionary 

8 

# Platform flags, computed once when the module is imported.
CYGWIN = sys.platform.startswith("cygwin")
WIN = sys.platform.startswith("win")

# Hook for wrapping a stream so ANSI codes work; it is ``None`` here and
# is only rebound to a real function by the Windows branch further down.
auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None

# Matches one ANSI escape sequence: ESC, "[", any run of digits, ";" or
# "?", and a single terminating ASCII letter.
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")

13 

14 

def _make_text_stream(
    stream: t.BinaryIO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Wrap a binary stream in a line-buffered ``_NonClosingTextIOWrapper``.

    :param stream: the binary stream to wrap.
    :param encoding: text encoding; when ``None`` it is chosen with
        ``get_best_encoding(stream)``.
    :param errors: codec error handler; when ``None`` it becomes
        ``"replace"``.
    :param force_readable: forwarded to the wrapper unchanged.
    :param force_writable: forwarded to the wrapper unchanged.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    chosen_errors = "replace" if errors is None else errors
    return _NonClosingTextIOWrapper(
        stream,
        chosen_encoding,
        chosen_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable,
    )

34 

35 

def is_ascii_encoding(encoding: str) -> bool:
    """Return ``True`` if ``encoding`` resolves to the ASCII codec.

    The name is resolved with :func:`codecs.lookup`, so aliases of ASCII
    are recognised too. A name that no codec is registered for yields
    ``False`` instead of raising.
    """
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        return False
    return codec_info.name == "ascii"

42 

43 

def get_best_encoding(stream: t.IO[t.Any]) -> str:
    """Pick the encoding to use for ``stream``.

    The stream's own ``encoding`` attribute is used when it is set and
    non-empty, otherwise :func:`sys.getdefaultencoding`. If that choice
    is ASCII, ``"utf-8"`` is returned in its place.
    """
    candidate = getattr(stream, "encoding", None) or sys.getdefaultencoding()
    return "utf-8" if is_ascii_encoding(candidate) else candidate

50 

51 

class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` over a ``_FixupStream`` that detaches on deletion.

    The given binary stream is first wrapped in ``_FixupStream`` (passing
    the ``force_readable`` / ``force_writable`` flags) and the result is
    kept on ``self._stream``. Any extra keyword arguments are forwarded
    to :class:`io.TextIOWrapper`.
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        encoding: t.Optional[str],
        errors: t.Optional[str],
        force_readable: bool = False,
        force_writable: bool = False,
        **extra: t.Any,
    ) -> None:
        fixed = _FixupStream(stream, force_readable, force_writable)
        self._stream = t.cast(t.BinaryIO, fixed)
        super().__init__(self._stream, encoding, errors, **extra)

    def __del__(self) -> None:
        # Detach from the underlying buffer when the wrapper is deleted;
        # any failure while doing so is deliberately ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self) -> bool:
        # Ask the wrapped stream directly.
        # https://bitbucket.org/pypy/pypy/issue/1803
        wrapped = self._stream
        return wrapped.isatty()

76 

77 

class _FixupStream:
    """Adapter that fills in methods the ``io`` layer expects from a stream.

    The new io interface needs more from streams than streams
    traditionally implement, so this wrapper supplies ``read1``,
    ``readable``, ``writable`` and ``seekable`` and delegates every other
    attribute to the wrapped stream.

    The readable and writable flags can be forced because some tools put
    badly patched objects on ``sys`` (certain versions of Jupyter
    notebook are one such offender).
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        force_readable: bool = False,
        force_writable: bool = False,
    ):
        self._stream = stream
        self._force_readable = force_readable
        self._force_writable = force_writable

    def __getattr__(self, name: str) -> t.Any:
        # Only reached for attributes not defined on this wrapper.
        return getattr(self._stream, name)

    def read1(self, size: int) -> bytes:
        """Use the stream's own ``read1`` if it has one, else ``read``."""
        native = getattr(self._stream, "read1", None)

        if native is None:
            return self._stream.read(size)

        return t.cast(bytes, native(size))

    def readable(self) -> bool:
        """Report readability: forced flag, then ``readable()``, then a probe."""
        if self._force_readable:
            return True

        checker = getattr(self._stream, "readable", None)
        if checker is not None:
            return t.cast(bool, checker())

        # No ``readable`` method: see whether a zero-length read works.
        try:
            self._stream.read(0)
        except Exception:
            return False
        return True

    def writable(self) -> bool:
        """Report writability: forced flag, then ``writable()``, then a probe."""
        if self._force_writable:
            return True

        checker = getattr(self._stream, "writable", None)
        if checker is not None:
            return t.cast(bool, checker())

        # No ``writable`` method: try an empty text write first, then an
        # empty bytes write; the stream is writable if either succeeds.
        for empty in ("", b""):
            try:
                self._stream.write(empty)  # type: ignore
            except Exception:
                continue
            return True
        return False

    def seekable(self) -> bool:
        """Report seekability via ``seekable()`` or a seek-to-current probe."""
        checker = getattr(self._stream, "seekable", None)
        if checker is not None:
            return t.cast(bool, checker())

        try:
            self._stream.seek(self._stream.tell())
        except Exception:
            return False
        return True

145 

146 

def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
    """Tell whether reading from ``stream`` produces ``bytes``.

    A zero-length read is attempted and the type of its result checked.
    If the read raises, ``default`` is returned.
    """
    try:
        probe = stream.read(0)
    except Exception:
        # This happens in some cases where the stream was already
        # closed. In this case, we assume the default.
        return default
    return isinstance(probe, bytes)

154 

155 

def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
    """Tell whether ``stream`` accepts ``bytes`` on write.

    An empty bytes write is tried first; success means binary. If that
    raises, an empty text write is tried; success there means the stream
    is a text writer. If both writes raise, ``default`` is returned.
    """
    try:
        stream.write(b"")
        return True
    except Exception:
        pass

    try:
        stream.write("")
    except Exception:
        return default
    return False

167 

168 

def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
    """Return a binary reader for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already reads bytes; otherwise
    its ``buffer`` attribute is returned when that reads bytes.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams. Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_reader(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    if underlying is None or not _is_binary_reader(underlying, True):
        return None

    return t.cast(t.BinaryIO, underlying)

185 

186 

def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
    """Return a binary writer for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already accepts bytes;
    otherwise its ``buffer`` attribute is returned when that accepts
    bytes.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams. Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_writer(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    if underlying is None or not _is_binary_writer(underlying, True):
        return None

    return t.cast(t.BinaryIO, underlying)

203 

204 

def _stream_is_misconfigured(stream: t.TextIO) -> bool:
    """Report whether ``stream`` is effectively using the ASCII encoding."""
    # If the stream does not have an encoding set, we assume it's set
    # to ASCII. This appears to happen in certain unittest
    # environments. It's not quite clear what the correct behavior is
    # but this at least will force Click to recover somehow.
    declared = getattr(stream, "encoding", None)
    return is_ascii_encoding(declared if declared else "ascii")

212 

213 

def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
    """Check one stream attribute against a desired value.

    The attribute is compatible when it equals ``value``, or when
    ``value`` is ``None`` (no preference) and the attribute has a value.
    A missing attribute is read as ``None``.
    """
    current = getattr(stream, attr, None)
    if current == value:
        return True
    return value is None and current is not None

221 

222 

def _is_compatible_text_stream(
    stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
) -> bool:
    """Check that both ``encoding`` and ``errors`` of a stream are
    compatible with the desired values.
    """
    if not _is_compat_stream_attr(stream, "encoding", encoding):
        return False
    return _is_compat_stream_attr(stream, "errors", errors)

232 

233 

def _force_correct_text_stream(
    text_stream: t.IO[t.Any],
    encoding: t.Optional[str],
    errors: t.Optional[str],
    is_binary: t.Callable[[t.IO[t.Any], bool], bool],
    find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Return a text stream with the requested encoding and error handling.

    :param text_stream: the stream to check; it may turn out to be binary.
    :param encoding: desired encoding, or ``None`` for no preference.
    :param errors: desired error handler, or ``None`` for no preference.
    :param is_binary: predicate ``(stream, default) -> bool`` deciding
        whether a stream is binary.
    :param find_binary: callable returning the binary stream underneath
        a text stream, or ``None``.
    :param force_readable: forwarded to ``_make_text_stream``.
    :param force_writable: forwarded to ``_make_text_stream``.

    The original stream is returned unchanged when it is already
    suitable, or when no underlying binary stream can be found.
    """
    if is_binary(text_stream, False):
        raw = t.cast(t.BinaryIO, text_stream)
    else:
        text_stream = t.cast(t.TextIO, text_stream)

        # A rewrap is needed if the stream is incompatible, or if no
        # encoding was requested and the stream would default to a
        # misconfigured ascii encoding.
        needs_rewrap = not _is_compatible_text_stream(
            text_stream, encoding, errors
        ) or (encoding is None and _stream_is_misconfigured(text_stream))

        if not needs_rewrap:
            return text_stream

        # Otherwise, get the underlying binary reader.
        found = find_binary(text_stream)

        # If that's not possible, silently use the original reader
        # and get mojibake instead of exceptions.
        if found is None:
            return text_stream

        raw = found

    # Default errors to replace instead of strict in order to get
    # something that works.
    effective_errors = "replace" if errors is None else errors

    # Wrap the binary stream in a text stream with the correct
    # encoding parameters.
    return _make_text_stream(
        raw,
        encoding,
        effective_errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )

278 

279 

def _force_correct_text_reader(
    text_reader: t.IO[t.Any],
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_readable: bool = False,
) -> t.TextIO:
    """Reader flavour of ``_force_correct_text_stream``.

    Plugs in the reader-specific binary detection helpers and forwards
    ``force_readable``.
    """
    return _force_correct_text_stream(
        text_reader,
        encoding,
        errors,
        is_binary=_is_binary_reader,
        find_binary=_find_binary_reader,
        force_readable=force_readable,
    )

294 

295 

def _force_correct_text_writer(
    text_writer: t.IO[t.Any],
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_writable: bool = False,
) -> t.TextIO:
    """Writer flavour of ``_force_correct_text_stream``.

    Plugs in the writer-specific binary detection helpers and forwards
    ``force_writable``.
    """
    return _force_correct_text_stream(
        text_writer,
        encoding,
        errors,
        is_binary=_is_binary_writer,
        find_binary=_find_binary_writer,
        force_writable=force_writable,
    )

310 

311 

def get_binary_stdin() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdin``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    found = _find_binary_reader(sys.stdin)
    if found is not None:
        return found
    raise RuntimeError("Was not able to determine binary stream for sys.stdin.")

317 

318 

def get_binary_stdout() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdout``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    found = _find_binary_writer(sys.stdout)
    if found is not None:
        return found
    raise RuntimeError("Was not able to determine binary stream for sys.stdout.")

324 

325 

def get_binary_stderr() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stderr``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    found = _find_binary_writer(sys.stderr)
    if found is not None:
        return found
    raise RuntimeError("Was not able to determine binary stream for sys.stderr.")

331 

332 

def get_text_stdin(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stdin``.

    The stream from ``_get_windows_console_stream`` is used when that
    helper returns one; otherwise ``sys.stdin`` is passed through
    ``_force_correct_text_reader`` with ``force_readable=True``.
    """
    console = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console is None:
        return _force_correct_text_reader(
            sys.stdin, encoding, errors, force_readable=True
        )
    return console

340 

341 

def get_text_stdout(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stdout``.

    The stream from ``_get_windows_console_stream`` is used when that
    helper returns one; otherwise ``sys.stdout`` is passed through
    ``_force_correct_text_writer`` with ``force_writable=True``.
    """
    console = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console is None:
        return _force_correct_text_writer(
            sys.stdout, encoding, errors, force_writable=True
        )
    return console

349 

350 

def get_text_stderr(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stderr``.

    The stream from ``_get_windows_console_stream`` is used when that
    helper returns one; otherwise ``sys.stderr`` is passed through
    ``_force_correct_text_writer`` with ``force_writable=True``.
    """
    console = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console is None:
        return _force_correct_text_writer(
            sys.stderr, encoding, errors, force_writable=True
        )
    return console

358 

359 

def _wrap_io_open(
    file: t.Union[str, "os.PathLike[str]", int],
    mode: str,
    encoding: t.Optional[str],
    errors: t.Optional[str],
) -> t.IO[t.Any]:
    """Call :func:`open`, omitting ``encoding`` and ``errors`` in binary mode.

    :param file: path or file descriptor to open.
    :param mode: mode string; a ``"b"`` anywhere in it selects binary.
    :param encoding: text encoding, used only in text mode.
    :param errors: codec error handler, used only in text mode.
    """
    if "b" not in mode:
        return open(file, mode, encoding=encoding, errors=errors)

    return open(file, mode)

371 

372 

def open_stream(
    filename: "t.Union[str, os.PathLike[str]]",
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    atomic: bool = False,
) -> t.Tuple[t.IO[t.Any], bool]:
    """Open ``filename``, treating ``"-"`` as a standard stream.

    :param filename: path to open, or ``"-"`` for stdin/stdout.
    :param mode: mode string as for :func:`open`.
    :param encoding: text encoding, ignored for binary modes.
    :param errors: codec error handler, ignored for binary modes.
    :param atomic: write to a temporary file in the same directory and
        move it over ``filename`` on close. Only valid with ``w`` mode.
    :return: ``(stream, flag)`` where ``flag`` is ``False`` for the
        standard streams and ``True`` for files opened here (presumably
        whether the caller should close the stream).
    :raises ValueError: if ``atomic`` is combined with an ``a`` or ``x``
        mode, or with a mode lacking ``w``.
    """
    is_binary = "b" in mode
    filename = os.fspath(filename)

    # Standard streams first. These are simple because they ignore the
    # atomic flag. Use fsdecode to handle Path("-").
    if os.fsdecode(filename) == "-":
        wants_output = any(flag in mode for flag in ("w", "a", "x"))

        if is_binary:
            std = get_binary_stdout() if wants_output else get_binary_stdin()
        elif wants_output:
            std = get_text_stdout(encoding=encoding, errors=errors)
        else:
            std = get_text_stdin(encoding=encoding, errors=errors)

        return std, False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated. They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file. Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    # Reuse the mode bits of an existing target, if there is one.
    try:
        perm: t.Optional[int] = os.stat(filename).st_mode
    except OSError:
        perm = None

    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    if is_binary:
        flags |= getattr(os, "O_BINARY", 0)

    directory = os.path.dirname(filename)
    create_mode = 0o666 if perm is None else perm

    # Keep drawing random names until one can be created exclusively.
    while True:
        tmp_filename = os.path.join(
            directory,
            f".__atomic-write{random.randrange(1 << 32):08x}",
        )
        try:
            fd = os.open(tmp_filename, flags, create_mode)
        except OSError as exc:
            # Retry when the name is taken, or when Windows reports
            # EACCES for a name that is a writable directory.
            retry = exc.errno == errno.EEXIST or (
                os.name == "nt"
                and exc.errno == errno.EACCES
                and os.path.isdir(exc.filename)
                and os.access(exc.filename, os.W_OK)
            )
            if not retry:
                raise
        else:
            break

    if perm is not None:
        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

    opened = _wrap_io_open(fd, mode, encoding, errors)
    atomic_file = _AtomicFile(opened, tmp_filename, os.path.realpath(filename))
    return t.cast(t.IO[t.Any], atomic_file), True

452 

453 

class _AtomicFile:
    """File proxy that moves a temporary file over the real one on close.

    Writes go to ``f``, which is expected to be backed by
    ``tmp_filename``. A normal :meth:`close` replaces ``real_filename``
    with the temporary file using :func:`os.replace`. When the object is
    used as a context manager and the block raises, the temporary file
    is discarded instead, so a partially written file never replaces
    the target.

    Attributes not defined here are delegated to the wrapped file.
    """

    def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
        self._f = f
        self._tmp_filename = tmp_filename
        self._real_filename = real_filename
        self.closed = False

    @property
    def name(self) -> str:
        """The final destination path, not the temporary one."""
        return self._real_filename

    def close(self, delete: bool = False) -> None:
        """Close the wrapped file and commit or discard the temporary file.

        :param delete: when true, remove the temporary file and leave
            the real file untouched; when false (the default), move the
            temporary file over the real file.

        Calling this again after a successful close does nothing.
        """
        if self.closed:
            return

        self._f.close()

        if delete:
            # Best-effort cleanup: this path runs while another exception
            # is propagating, so a failed removal must not mask it.
            try:
                os.unlink(self._tmp_filename)
            except OSError:
                pass
        else:
            os.replace(self._tmp_filename, self._real_filename)

        self.closed = True

    def __getattr__(self, name: str) -> t.Any:
        # Only reached for attributes not defined on the proxy itself.
        return getattr(self._f, name)

    def __enter__(self) -> "_AtomicFile":
        return self

    def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:
        # Discard the temporary file if the block raised.
        self.close(delete=exc_type is not None)

    def __repr__(self) -> str:
        return repr(self._f)

483 

484 

def strip_ansi(value: str) -> str:
    """Return ``value`` with every match of ``_ansi_re`` removed."""
    cleaned = _ansi_re.sub("", value)
    return cleaned

487 

488 

def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
    """Tell whether ``stream`` comes from an ``ipykernel`` module.

    Click's own wrappers are peeled off first by following their
    ``_stream`` attribute; the class of what remains is then checked.
    """
    wrapper_types = (_FixupStream, _NonClosingTextIOWrapper)
    innermost = stream

    while isinstance(innermost, wrapper_types):
        innermost = innermost._stream

    module_name = innermost.__class__.__module__
    return module_name.startswith("ipykernel.")

494 

495 

def should_strip_ansi(
    stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
) -> bool:
    """Decide whether ANSI codes should be stripped.

    :param stream: stream to inspect; ``sys.stdin`` is used when ``None``.
    :param color: explicit choice. When not ``None`` the result is simply
        ``not color`` and the stream is not inspected.

    Without an explicit choice, codes are stripped unless the stream is
    a TTY or a Jupyter kernel output stream.
    """
    if color is not None:
        return not color

    target = sys.stdin if stream is None else stream
    return not (isatty(target) or _is_jupyter_kernel_output(target))

504 

505 

# On Windows, wrap the output streams with colorama to support ANSI
# color codes.
# NOTE: double check is needed so mypy does not analyze this on Linux
if sys.platform.startswith("win") and WIN:
    from ._winconsole import _get_windows_console_stream

    def _get_argv_encoding() -> str:
        """Return the locale's preferred encoding."""
        import locale

        return locale.getpreferredencoding()

    # Wrapped streams already built, keyed weakly by the original stream.
    _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def auto_wrap_for_ansi(  # noqa: F811
        stream: t.TextIO, color: t.Optional[bool] = None
    ) -> t.TextIO:
        """Support ANSI color and style codes on Windows by wrapping a
        stream with colorama.

        The wrapped stream is remembered per original stream, so asking
        again for the same stream returns the same wrapper.
        """
        # The lookup can fail (the key may not be usable in a weak-key
        # mapping); treat that as a cache miss.
        try:
            known = _ansi_stream_wrappers.get(stream)
        except Exception:
            known = None

        if known is not None:
            return known

        import colorama

        converter = colorama.AnsiToWin32(
            stream, strip=should_strip_ansi(stream, color)
        )
        wrapped = t.cast(t.TextIO, converter.stream)
        original_write = wrapped.write

        def _safe_write(s):
            # Reset the console attributes if a write is interrupted,
            # then let the exception continue.
            try:
                return original_write(s)
            except BaseException:
                converter.reset_all()
                raise

        wrapped.write = _safe_write

        # Remembering the wrapper is best-effort.
        try:
            _ansi_stream_wrappers[stream] = wrapped
        except Exception:
            pass

        return wrapped

else:

    def _get_argv_encoding() -> str:
        """Return stdin's encoding, else the filesystem encoding."""
        stdin_encoding = getattr(sys.stdin, "encoding", None)
        return stdin_encoding or sys.getfilesystemencoding()

    def _get_windows_console_stream(
        f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
    ) -> t.Optional[t.TextIO]:
        """Stand-in used off Windows: there is never a console stream."""
        return None

565 

566 

def term_len(x: str) -> int:
    """Return the length of ``x`` after ``strip_ansi`` has been applied."""
    visible = strip_ansi(x)
    return len(visible)

569 

570 

def isatty(stream: t.IO[t.Any]) -> bool:
    """Return ``stream.isatty()``, or ``False`` if that call raises."""
    try:
        answer = stream.isatty()
    except Exception:
        return False
    return answer

576 

577 

def _make_cached_stream_func(
    src_func: t.Callable[[], t.Optional[t.TextIO]],
    wrapper_func: t.Callable[[], t.TextIO],
) -> t.Callable[[], t.Optional[t.TextIO]]:
    """Build a zero-argument function that caches ``wrapper_func()`` per
    source stream.

    :param src_func: returns the current source stream, or ``None``.
    :param wrapper_func: builds the wrapped stream.
    :return: a function returning ``None`` when the source is ``None``,
        otherwise the wrapper remembered for the current source stream,
        creating it on first use.
    """
    cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def func() -> t.Optional[t.TextIO]:
        source = src_func()

        if source is None:
            return None

        # A failing lookup is treated as a miss.
        try:
            wrapped = cache.get(source)
        except Exception:
            wrapped = None

        if wrapped is None:
            wrapped = wrapper_func()
            # Storing is best-effort; without it the wrapper is simply
            # rebuilt on the next call.
            try:
                cache[source] = wrapped
            except Exception:
                pass

        return wrapped

    return func

604 

605 

# Cached accessors: each returns the text wrapper remembered for the
# current ``sys`` stream, building it on first use.
_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)


# Standard stream name -> factory for its binary stream.
binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = dict(
    stdin=get_binary_stdin,
    stdout=get_binary_stdout,
    stderr=get_binary_stderr,
)

# Standard stream name -> factory for its text stream.
text_streams: t.Mapping[
    str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
] = dict(
    stdin=get_text_stdin,
    stdout=get_text_stdout,
    stderr=get_text_stderr,
)