Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/_compat.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

293 statements  

1from __future__ import annotations 

2 

3import codecs 

4import collections.abc as cabc 

5import io 

6import os 

7import re 

8import sys 

9import typing as t 

10from types import TracebackType 

11from weakref import WeakKeyDictionary 

12 

13CYGWIN = sys.platform.startswith("cygwin") 

14WIN = sys.platform.startswith("win") 

15_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]") 

16 

17 

18def _make_text_stream( 

19 stream: t.BinaryIO, 

20 encoding: str | None, 

21 errors: str | None, 

22 force_readable: bool = False, 

23 force_writable: bool = False, 

24) -> t.TextIO: 

25 if encoding is None: 

26 encoding = get_best_encoding(stream) 

27 if errors is None: 

28 errors = "replace" 

29 return _NonClosingTextIOWrapper( 

30 stream, 

31 encoding, 

32 errors, 

33 line_buffering=True, 

34 force_readable=force_readable, 

35 force_writable=force_writable, 

36 ) 

37 

38 

39def is_ascii_encoding(encoding: str) -> bool: 

40 """Checks if a given encoding is ascii.""" 

41 try: 

42 return codecs.lookup(encoding).name == "ascii" 

43 except LookupError: 

44 return False 

45 

46 

47def get_best_encoding(stream: t.IO[t.Any]) -> str: 

48 """Returns the default stream encoding if not found.""" 

49 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding() 

50 if is_ascii_encoding(rv): 

51 return "utf-8" 

52 return rv 

53 

54 

55class _NonClosingTextIOWrapper(io.TextIOWrapper): 

56 def __init__( 

57 self, 

58 stream: t.BinaryIO, 

59 encoding: str | None, 

60 errors: str | None, 

61 force_readable: bool = False, 

62 force_writable: bool = False, 

63 **extra: t.Any, 

64 ) -> None: 

65 self._stream = stream = t.cast( 

66 t.BinaryIO, _FixupStream(stream, force_readable, force_writable) 

67 ) 

68 super().__init__(stream, encoding, errors, **extra) 

69 

70 def __del__(self) -> None: 

71 try: 

72 self.detach() 

73 except Exception: 

74 pass 

75 

76 def isatty(self) -> bool: 

77 # https://bitbucket.org/pypy/pypy/issue/1803 

78 return self._stream.isatty() 

79 

80 

81class _FixupStream: 

82 """The new io interface needs more from streams than streams 

83 traditionally implement. As such, this fix-up code is necessary in 

84 some circumstances. 

85 

86 The forcing of readable and writable flags are there because some tools 

87 put badly patched objects on sys (one such offender are certain version 

88 of jupyter notebook). 

89 """ 

90 

91 def __init__( 

92 self, 

93 stream: t.BinaryIO, 

94 force_readable: bool = False, 

95 force_writable: bool = False, 

96 ): 

97 self._stream = stream 

98 self._force_readable = force_readable 

99 self._force_writable = force_writable 

100 

101 def __getattr__(self, name: str) -> t.Any: 

102 return getattr(self._stream, name) 

103 

104 def read1(self, size: int) -> bytes: 

105 f = getattr(self._stream, "read1", None) 

106 

107 if f is not None: 

108 return t.cast(bytes, f(size)) 

109 

110 return self._stream.read(size) 

111 

112 def readable(self) -> bool: 

113 if self._force_readable: 

114 return True 

115 x = getattr(self._stream, "readable", None) 

116 if x is not None: 

117 return t.cast(bool, x()) 

118 try: 

119 self._stream.read(0) 

120 except Exception: 

121 return False 

122 return True 

123 

124 def writable(self) -> bool: 

125 if self._force_writable: 

126 return True 

127 x = getattr(self._stream, "writable", None) 

128 if x is not None: 

129 return t.cast(bool, x()) 

130 try: 

131 self._stream.write(b"") 

132 except Exception: 

133 try: 

134 self._stream.write(b"") 

135 except Exception: 

136 return False 

137 return True 

138 

139 def seekable(self) -> bool: 

140 x = getattr(self._stream, "seekable", None) 

141 if x is not None: 

142 return t.cast(bool, x()) 

143 try: 

144 self._stream.seek(self._stream.tell()) 

145 except Exception: 

146 return False 

147 return True 

148 

149 

150def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool: 

151 try: 

152 return isinstance(stream.read(0), bytes) 

153 except Exception: 

154 return default 

155 # This happens in some cases where the stream was already 

156 # closed. In this case, we assume the default. 

157 

158 

159def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool: 

160 try: 

161 stream.write(b"") 

162 except Exception: 

163 try: 

164 stream.write("") 

165 return False 

166 except Exception: 

167 pass 

168 return default 

169 return True 

170 

171 

172def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None: 

173 # We need to figure out if the given stream is already binary. 

174 # This can happen because the official docs recommend detaching 

175 # the streams to get binary streams. Some code might do this, so 

176 # we need to deal with this case explicitly. 

177 if _is_binary_reader(stream, False): 

178 return t.cast(t.BinaryIO, stream) 

179 

180 buf = getattr(stream, "buffer", None) 

181 

182 # Same situation here; this time we assume that the buffer is 

183 # actually binary in case it's closed. 

184 if buf is not None and _is_binary_reader(buf, True): 

185 return t.cast(t.BinaryIO, buf) 

186 

187 return None 

188 

189 

190def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None: 

191 # We need to figure out if the given stream is already binary. 

192 # This can happen because the official docs recommend detaching 

193 # the streams to get binary streams. Some code might do this, so 

194 # we need to deal with this case explicitly. 

195 if _is_binary_writer(stream, False): 

196 return t.cast(t.BinaryIO, stream) 

197 

198 buf = getattr(stream, "buffer", None) 

199 

200 # Same situation here; this time we assume that the buffer is 

201 # actually binary in case it's closed. 

202 if buf is not None and _is_binary_writer(buf, True): 

203 return t.cast(t.BinaryIO, buf) 

204 

205 return None 

206 

207 

208def _stream_is_misconfigured(stream: t.TextIO) -> bool: 

209 """A stream is misconfigured if its encoding is ASCII.""" 

210 # If the stream does not have an encoding set, we assume it's set 

211 # to ASCII. This appears to happen in certain unittest 

212 # environments. It's not quite clear what the correct behavior is 

213 # but this at least will force Click to recover somehow. 

214 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii") 

215 

216 

217def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: str | None) -> bool: 

218 """A stream attribute is compatible if it is equal to the 

219 desired value or the desired value is unset and the attribute 

220 has a value. 

221 """ 

222 stream_value = getattr(stream, attr, None) 

223 return stream_value == value or (value is None and stream_value is not None) 

224 

225 

226def _is_compatible_text_stream( 

227 stream: t.TextIO, encoding: str | None, errors: str | None 

228) -> bool: 

229 """Check if a stream's encoding and errors attributes are 

230 compatible with the desired values. 

231 """ 

232 return _is_compat_stream_attr( 

233 stream, "encoding", encoding 

234 ) and _is_compat_stream_attr(stream, "errors", errors) 

235 

236 

237def _force_correct_text_stream( 

238 text_stream: t.IO[t.Any], 

239 encoding: str | None, 

240 errors: str | None, 

241 is_binary: t.Callable[[t.IO[t.Any], bool], bool], 

242 find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None], 

243 force_readable: bool = False, 

244 force_writable: bool = False, 

245) -> t.TextIO: 

246 if is_binary(text_stream, False): 

247 binary_reader = t.cast(t.BinaryIO, text_stream) 

248 else: 

249 text_stream = t.cast(t.TextIO, text_stream) 

250 # If the stream looks compatible, and won't default to a 

251 # misconfigured ascii encoding, return it as-is. 

252 if _is_compatible_text_stream(text_stream, encoding, errors) and not ( 

253 encoding is None and _stream_is_misconfigured(text_stream) 

254 ): 

255 return text_stream 

256 

257 # Otherwise, get the underlying binary reader. 

258 possible_binary_reader = find_binary(text_stream) 

259 

260 # If that's not possible, silently use the original reader 

261 # and get mojibake instead of exceptions. 

262 if possible_binary_reader is None: 

263 return text_stream 

264 

265 binary_reader = possible_binary_reader 

266 

267 # Default errors to replace instead of strict in order to get 

268 # something that works. 

269 if errors is None: 

270 errors = "replace" 

271 

272 # Wrap the binary stream in a text stream with the correct 

273 # encoding parameters. 

274 return _make_text_stream( 

275 binary_reader, 

276 encoding, 

277 errors, 

278 force_readable=force_readable, 

279 force_writable=force_writable, 

280 ) 

281 

282 

283def _force_correct_text_reader( 

284 text_reader: t.IO[t.Any], 

285 encoding: str | None, 

286 errors: str | None, 

287 force_readable: bool = False, 

288) -> t.TextIO: 

289 return _force_correct_text_stream( 

290 text_reader, 

291 encoding, 

292 errors, 

293 _is_binary_reader, 

294 _find_binary_reader, 

295 force_readable=force_readable, 

296 ) 

297 

298 

299def _force_correct_text_writer( 

300 text_writer: t.IO[t.Any], 

301 encoding: str | None, 

302 errors: str | None, 

303 force_writable: bool = False, 

304) -> t.TextIO: 

305 return _force_correct_text_stream( 

306 text_writer, 

307 encoding, 

308 errors, 

309 _is_binary_writer, 

310 _find_binary_writer, 

311 force_writable=force_writable, 

312 ) 

313 

314 

315def get_binary_stdin() -> t.BinaryIO: 

316 reader = _find_binary_reader(sys.stdin) 

317 if reader is None: 

318 raise RuntimeError("Was not able to determine binary stream for sys.stdin.") 

319 return reader 

320 

321 

322def get_binary_stdout() -> t.BinaryIO: 

323 writer = _find_binary_writer(sys.stdout) 

324 if writer is None: 

325 raise RuntimeError("Was not able to determine binary stream for sys.stdout.") 

326 return writer 

327 

328 

329def get_binary_stderr() -> t.BinaryIO: 

330 writer = _find_binary_writer(sys.stderr) 

331 if writer is None: 

332 raise RuntimeError("Was not able to determine binary stream for sys.stderr.") 

333 return writer 

334 

335 

336def get_text_stdin(encoding: str | None = None, errors: str | None = None) -> t.TextIO: 

337 rv = _get_windows_console_stream(sys.stdin, encoding, errors) 

338 if rv is not None: 

339 return rv 

340 return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True) 

341 

342 

343def get_text_stdout(encoding: str | None = None, errors: str | None = None) -> t.TextIO: 

344 rv = _get_windows_console_stream(sys.stdout, encoding, errors) 

345 if rv is not None: 

346 return rv 

347 return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True) 

348 

349 

350def get_text_stderr(encoding: str | None = None, errors: str | None = None) -> t.TextIO: 

351 rv = _get_windows_console_stream(sys.stderr, encoding, errors) 

352 if rv is not None: 

353 return rv 

354 return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True) 

355 

356 

357def _wrap_io_open( 

358 file: str | os.PathLike[str] | int, 

359 mode: str, 

360 encoding: str | None, 

361 errors: str | None, 

362) -> t.IO[t.Any]: 

363 """Handles not passing ``encoding`` and ``errors`` in binary mode.""" 

364 if "b" in mode: 

365 return open(file, mode) 

366 

367 return open(file, mode, encoding=encoding, errors=errors) 

368 

369 

370def open_stream( 

371 filename: str | os.PathLike[str], 

372 mode: str = "r", 

373 encoding: str | None = None, 

374 errors: str | None = "strict", 

375 atomic: bool = False, 

376) -> tuple[t.IO[t.Any], bool]: 

377 binary = "b" in mode 

378 filename = os.fspath(filename) 

379 

380 # Standard streams first. These are simple because they ignore the 

381 # atomic flag. Use fsdecode to handle Path("-"). 

382 if os.fsdecode(filename) == "-": 

383 if any(m in mode for m in ["w", "a", "x"]): 

384 if binary: 

385 return get_binary_stdout(), False 

386 return get_text_stdout(encoding=encoding, errors=errors), False 

387 if binary: 

388 return get_binary_stdin(), False 

389 return get_text_stdin(encoding=encoding, errors=errors), False 

390 

391 # Non-atomic writes directly go out through the regular open functions. 

392 if not atomic: 

393 return _wrap_io_open(filename, mode, encoding, errors), True 

394 

395 # Some usability stuff for atomic writes 

396 if "a" in mode: 

397 raise ValueError( 

398 "Appending to an existing file is not supported, because that" 

399 " would involve an expensive `copy`-operation to a temporary" 

400 " file. Open the file in normal `w`-mode and copy explicitly" 

401 " if that's what you're after." 

402 ) 

403 if "x" in mode: 

404 raise ValueError("Use the `overwrite`-parameter instead.") 

405 if "w" not in mode: 

406 raise ValueError("Atomic writes only make sense with `w`-mode.") 

407 

408 # Atomic writes are more complicated. They work by opening a file 

409 # as a proxy in the same folder and then using the fdopen 

410 # functionality to wrap it in a Python file. Then we wrap it in an 

411 # atomic file that moves the file over on close. 

412 import errno 

413 import random 

414 

415 try: 

416 perm: int | None = os.stat(filename).st_mode 

417 except OSError: 

418 perm = None 

419 

420 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL 

421 

422 if binary: 

423 flags |= getattr(os, "O_BINARY", 0) 

424 

425 while True: 

426 tmp_filename = os.path.join( 

427 os.path.dirname(filename), 

428 f".__atomic-write{random.randrange(1 << 32):08x}", 

429 ) 

430 try: 

431 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm) 

432 break 

433 except OSError as e: 

434 if e.errno == errno.EEXIST or ( 

435 os.name == "nt" 

436 and e.errno == errno.EACCES 

437 and os.path.isdir(e.filename) 

438 and os.access(e.filename, os.W_OK) 

439 ): 

440 continue 

441 raise 

442 

443 if perm is not None: 

444 os.chmod(tmp_filename, perm) # in case perm includes bits in umask 

445 

446 f = _wrap_io_open(fd, mode, encoding, errors) 

447 af = _AtomicFile(f, tmp_filename, os.path.realpath(filename)) 

448 return t.cast(t.IO[t.Any], af), True 

449 

450 

451class _AtomicFile: 

452 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None: 

453 self._f = f 

454 self._tmp_filename = tmp_filename 

455 self._real_filename = real_filename 

456 self.closed = False 

457 

458 @property 

459 def name(self) -> str: 

460 return self._real_filename 

461 

462 def close(self, delete: bool = False) -> None: 

463 if self.closed: 

464 return 

465 self._f.close() 

466 os.replace(self._tmp_filename, self._real_filename) 

467 self.closed = True 

468 

469 def __getattr__(self, name: str) -> t.Any: 

470 return getattr(self._f, name) 

471 

472 def __enter__(self) -> _AtomicFile: 

473 return self 

474 

475 def __exit__( 

476 self, 

477 exc_type: type[BaseException] | None, 

478 exc_value: BaseException | None, 

479 tb: TracebackType | None, 

480 ) -> None: 

481 self.close(delete=exc_type is not None) 

482 

483 def __repr__(self) -> str: 

484 return repr(self._f) 

485 

486 

487def strip_ansi(value: str) -> str: 

488 return _ansi_re.sub("", value) 

489 

490 

491def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool: 

492 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)): 

493 stream = stream._stream 

494 

495 return stream.__class__.__module__.startswith("ipykernel.") 

496 

497 

498def should_strip_ansi( 

499 stream: t.IO[t.Any] | None = None, color: bool | None = None 

500) -> bool: 

501 if color is None: 

502 if stream is None: 

503 stream = sys.stdin 

504 elif hasattr(stream, "color"): 

505 # ._termui_impl.MaybeStripAnsi handles stripping ansi itself, 

506 # so we don't need to strip it here 

507 return False 

508 return not isatty(stream) and not _is_jupyter_kernel_output(stream) 

509 return not color 

510 

511 

512# double check is needed so mypy does not analyze this on Linux 

513if sys.platform.startswith("win") and WIN: 

514 from ._winconsole import _get_windows_console_stream 

515 

516 def _get_argv_encoding() -> str: 

517 import locale 

518 

519 return locale.getpreferredencoding() 

520 

521else: 

522 

523 def _get_argv_encoding() -> str: 

524 return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding() 

525 

526 def _get_windows_console_stream( 

527 f: t.TextIO, encoding: str | None, errors: str | None 

528 ) -> t.TextIO | None: 

529 return None 

530 

531 

532def term_len(x: str) -> int: 

533 return len(strip_ansi(x)) 

534 

535 

536def isatty(stream: t.IO[t.Any]) -> bool: 

537 try: 

538 return stream.isatty() 

539 except Exception: 

540 return False 

541 

542 

543def _make_cached_stream_func( 

544 src_func: t.Callable[[], t.TextIO | None], 

545 wrapper_func: t.Callable[[], t.TextIO], 

546) -> t.Callable[[], t.TextIO | None]: 

547 cache: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary() 

548 

549 def func() -> t.TextIO | None: 

550 stream = src_func() 

551 

552 if stream is None: 

553 return None 

554 

555 try: 

556 rv = cache.get(stream) 

557 except Exception: 

558 rv = None 

559 if rv is not None: 

560 return rv 

561 rv = wrapper_func() 

562 try: 

563 cache[stream] = rv 

564 except Exception: 

565 pass 

566 return rv 

567 

568 return func 

569 

570 

571_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin) 

572_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout) 

573_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr) 

574 

575 

576binary_streams: cabc.Mapping[str, t.Callable[[], t.BinaryIO]] = { 

577 "stdin": get_binary_stdin, 

578 "stdout": get_binary_stdout, 

579 "stderr": get_binary_stderr, 

580} 

581 

582text_streams: cabc.Mapping[str, t.Callable[[str | None, str | None], t.TextIO]] = { 

583 "stdin": get_text_stdin, 

584 "stdout": get_text_stdout, 

585 "stderr": get_text_stderr, 

586}