Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/click/_compat.py: 23%
316 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:03 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 06:03 +0000
1from __future__ import annotations
3import codecs
4import collections.abc as cabc
5import io
6import os
7import re
8import sys
9import typing as t
10from types import TracebackType
11from weakref import WeakKeyDictionary
# True when the platform string reported by the interpreter starts with "cygwin".
CYGWIN = sys.platform.startswith("cygwin")
# True when the platform string reported by the interpreter starts with "win".
WIN = sys.platform.startswith("win")
# Placeholder value; the Windows-only branch later in this module rebinds
# this name to a real wrapper function.
auto_wrap_for_ansi: t.Callable[[t.TextIO], t.TextIO] | None = None
# Matches an escape character, "[", any run of ";", "?" or digits, and one
# ASCII letter (the shape of an ANSI control sequence).
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
def _make_text_stream(
    stream: t.BinaryIO,
    encoding: str | None,
    errors: str | None,
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Wrap a binary stream in a line-buffered ``_NonClosingTextIOWrapper``.

    :param stream: the binary stream to wrap.
    :param encoding: text encoding; when ``None`` it is chosen by
        ``get_best_encoding(stream)``.
    :param errors: error handling policy; when ``None``, ``"replace"``
        is used.
    :param force_readable: forwarded to the wrapper.
    :param force_writable: forwarded to the wrapper.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    chosen_errors = "replace" if errors is None else errors
    return _NonClosingTextIOWrapper(
        stream,
        chosen_encoding,
        chosen_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable,
    )
def is_ascii_encoding(encoding: str) -> bool:
    """Report whether ``encoding`` resolves to the ASCII codec.

    The name is resolved through ``codecs.lookup``, so aliases of ASCII
    count as well. A name that cannot be looked up yields ``False``.
    """
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        return False
    return codec_info.name == "ascii"
def get_best_encoding(stream: t.IO[t.Any]) -> str:
    """Pick the encoding to use for ``stream``.

    The stream's own ``encoding`` attribute is used when it is set and
    non-empty, otherwise ``sys.getdefaultencoding()``. If the result is
    ASCII, ``"utf-8"`` is returned instead.
    """
    candidate = getattr(stream, "encoding", None)
    if not candidate:
        candidate = sys.getdefaultencoding()
    return "utf-8" if is_ascii_encoding(candidate) else candidate
class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` that detaches from its stream when finalized.

    The given binary stream is first wrapped in a ``_FixupStream`` and
    that wrapper is what the text layer is built on.
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        encoding: str | None,
        errors: str | None,
        force_readable: bool = False,
        force_writable: bool = False,
        **extra: t.Any,
    ) -> None:
        wrapped = t.cast(
            t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
        )
        # Kept so isatty() can be answered by the wrapped stream directly.
        self._stream = wrapped
        super().__init__(wrapped, encoding, errors, **extra)

    def __del__(self) -> None:
        # Detach during finalization; any failure while doing so is
        # deliberately ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self) -> bool:
        # https://bitbucket.org/pypy/pypy/issue/1803
        underlying = self._stream
        return underlying.isatty()
82class _FixupStream:
83 """The new io interface needs more from streams than streams
84 traditionally implement. As such, this fix-up code is necessary in
85 some circumstances.
87 The forcing of readable and writable flags are there because some tools
88 put badly patched objects on sys (one such offender are certain version
89 of jupyter notebook).
90 """
92 def __init__(
93 self,
94 stream: t.BinaryIO,
95 force_readable: bool = False,
96 force_writable: bool = False,
97 ):
98 self._stream = stream
99 self._force_readable = force_readable
100 self._force_writable = force_writable
102 def __getattr__(self, name: str) -> t.Any:
103 return getattr(self._stream, name)
105 def read1(self, size: int) -> bytes:
106 f = getattr(self._stream, "read1", None)
108 if f is not None:
109 return t.cast(bytes, f(size))
111 return self._stream.read(size)
113 def readable(self) -> bool:
114 if self._force_readable:
115 return True
116 x = getattr(self._stream, "readable", None)
117 if x is not None:
118 return t.cast(bool, x())
119 try:
120 self._stream.read(0)
121 except Exception:
122 return False
123 return True
125 def writable(self) -> bool:
126 if self._force_writable:
127 return True
128 x = getattr(self._stream, "writable", None)
129 if x is not None:
130 return t.cast(bool, x())
131 try:
132 self._stream.write("") # type: ignore
133 except Exception:
134 try:
135 self._stream.write(b"")
136 except Exception:
137 return False
138 return True
140 def seekable(self) -> bool:
141 x = getattr(self._stream, "seekable", None)
142 if x is not None:
143 return t.cast(bool, x())
144 try:
145 self._stream.seek(self._stream.tell())
146 except Exception:
147 return False
148 return True
151def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
152 try:
153 return isinstance(stream.read(0), bytes)
154 except Exception:
155 return default
156 # This happens in some cases where the stream was already
157 # closed. In this case, we assume the default.
160def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
161 try:
162 stream.write(b"")
163 except Exception:
164 try:
165 stream.write("")
166 return False
167 except Exception:
168 pass
169 return default
170 return True
def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None:
    """Return a binary reader for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already reads bytes; otherwise
    its ``buffer`` attribute is tried.
    """
    # The stream may already be binary: the official docs recommend
    # detaching the standard streams to get binary ones, and some code
    # does exactly that, so this case is handled explicitly.
    if _is_binary_reader(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same check for the buffer, except that a buffer which cannot be
    # probed (e.g. because it is closed) is assumed to be binary.
    if underlying is None or not _is_binary_reader(underlying, True):
        return None
    return t.cast(t.BinaryIO, underlying)
def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None:
    """Return a binary writer for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already accepts bytes;
    otherwise its ``buffer`` attribute is tried.
    """
    # The stream may already be binary: the official docs recommend
    # detaching the standard streams to get binary ones, and some code
    # does exactly that, so this case is handled explicitly.
    if _is_binary_writer(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same check for the buffer, except that a buffer which cannot be
    # probed (e.g. because it is closed) is assumed to be binary.
    if underlying is None or not _is_binary_writer(underlying, True):
        return None
    return t.cast(t.BinaryIO, underlying)
def _stream_is_misconfigured(stream: t.TextIO) -> bool:
    """Return ``True`` when the stream's encoding is ASCII (or unset)."""
    # A stream without an encoding is treated as if it were ASCII. This
    # appears to happen in certain unittest environments. It's not quite
    # clear what the correct behavior is, but this at least forces Click
    # to recover somehow.
    declared = getattr(stream, "encoding", None)
    return is_ascii_encoding(declared if declared else "ascii")
218def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: str | None) -> bool:
219 """A stream attribute is compatible if it is equal to the
220 desired value or the desired value is unset and the attribute
221 has a value.
222 """
223 stream_value = getattr(stream, attr, None)
224 return stream_value == value or (value is None and stream_value is not None)
def _is_compatible_text_stream(
    stream: t.TextIO, encoding: str | None, errors: str | None
) -> bool:
    """Return whether both ``encoding`` and ``errors`` of ``stream`` are
    compatible with the requested values.
    """
    if not _is_compat_stream_attr(stream, "encoding", encoding):
        return False
    return _is_compat_stream_attr(stream, "errors", errors)
def _force_correct_text_stream(
    text_stream: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    is_binary: t.Callable[[t.IO[t.Any], bool], bool],
    find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Return a text stream with the requested encoding and error policy.

    :param text_stream: the stream to adapt; may already be binary.
    :param encoding: desired encoding, or ``None`` for no preference.
    :param errors: desired error policy, or ``None`` for no preference.
    :param is_binary: predicate ``(stream, default) -> bool`` telling
        whether a stream is binary.
    :param find_binary: function returning the binary stream underneath
        a text stream, or ``None``.
    :param force_readable: forwarded to ``_make_text_stream``.
    :param force_writable: forwarded to ``_make_text_stream``.
    :return: ``text_stream`` itself when it is already suitable or when
        no binary stream can be found beneath it; otherwise a new text
        wrapper around the binary stream.
    """
    if is_binary(text_stream, False):
        raw = t.cast(t.BinaryIO, text_stream)
    else:
        as_text = t.cast(t.TextIO, text_stream)

        # A compatible stream is returned untouched, unless no encoding
        # was requested and the stream would fall back to a misconfigured
        # ASCII encoding.
        usable_as_is = _is_compatible_text_stream(as_text, encoding, errors) and (
            encoding is not None or not _stream_is_misconfigured(as_text)
        )
        if usable_as_is:
            return as_text

        # Otherwise, look for the underlying binary stream.
        found = find_binary(as_text)

        # Without one, silently keep the original stream and accept
        # mojibake instead of exceptions.
        if found is None:
            return as_text

        raw = found

    # Wrap the binary stream with the requested parameters. The error
    # policy defaults to "replace" rather than "strict" in order to get
    # something that works.
    return _make_text_stream(
        raw,
        encoding,
        "replace" if errors is None else errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )
def _force_correct_text_reader(
    text_reader: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    force_readable: bool = False,
) -> t.TextIO:
    """Reader flavour of ``_force_correct_text_stream``.

    Supplies the reader-specific binary detection helpers and forwards
    ``force_readable``.
    """
    return _force_correct_text_stream(
        text_reader,
        encoding,
        errors,
        is_binary=_is_binary_reader,
        find_binary=_find_binary_reader,
        force_readable=force_readable,
    )
def _force_correct_text_writer(
    text_writer: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    force_writable: bool = False,
) -> t.TextIO:
    """Writer flavour of ``_force_correct_text_stream``.

    Supplies the writer-specific binary detection helpers and forwards
    ``force_writable``.
    """
    return _force_correct_text_stream(
        text_writer,
        encoding,
        errors,
        is_binary=_is_binary_writer,
        find_binary=_find_binary_writer,
        force_writable=force_writable,
    )
def get_binary_stdin() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdin``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    reader = _find_binary_reader(sys.stdin)
    if reader is not None:
        return reader
    raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
def get_binary_stdout() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdout``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    writer = _find_binary_writer(sys.stdout)
    if writer is not None:
        return writer
    raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
def get_binary_stderr() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stderr``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    writer = _find_binary_writer(sys.stderr)
    if writer is not None:
        return writer
    raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
def get_text_stdin(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stdin``.

    A stream from ``_get_windows_console_stream`` is used when one is
    returned; otherwise ``sys.stdin`` is adapted to the requested
    ``encoding`` / ``errors`` and forced readable.
    """
    console = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console is None:
        return _force_correct_text_reader(
            sys.stdin, encoding, errors, force_readable=True
        )
    return console
def get_text_stdout(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stdout``.

    A stream from ``_get_windows_console_stream`` is used when one is
    returned; otherwise ``sys.stdout`` is adapted to the requested
    ``encoding`` / ``errors`` and forced writable.
    """
    console = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console is None:
        return _force_correct_text_writer(
            sys.stdout, encoding, errors, force_writable=True
        )
    return console
def get_text_stderr(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stderr``.

    A stream from ``_get_windows_console_stream`` is used when one is
    returned; otherwise ``sys.stderr`` is adapted to the requested
    ``encoding`` / ``errors`` and forced writable.
    """
    console = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console is None:
        return _force_correct_text_writer(
            sys.stderr, encoding, errors, force_writable=True
        )
    return console
358def _wrap_io_open(
359 file: str | os.PathLike[str] | int,
360 mode: str,
361 encoding: str | None,
362 errors: str | None,
363) -> t.IO[t.Any]:
364 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
365 if "b" in mode:
366 return open(file, mode)
368 return open(file, mode, encoding=encoding, errors=errors)
def open_stream(
    filename: str | os.PathLike[str],
    mode: str = "r",
    encoding: str | None = None,
    errors: str | None = "strict",
    atomic: bool = False,
) -> tuple[t.IO[t.Any], bool]:
    """Open ``filename`` (or a standard stream for ``"-"``).

    :param filename: path to open; ``"-"`` selects stdin or stdout
        depending on ``mode``.
    :param mode: file mode as accepted by ``open``.
    :param encoding: text encoding, ignored in binary mode.
    :param errors: text error policy, ignored in binary mode.
    :param atomic: write to a temporary file in the same directory and
        move it over the target on close. Only supported with ``w``
        modes.
    :return: ``(stream, flag)`` where ``flag`` is ``False`` for the
        standard streams and ``True`` for files opened here
        (presumably telling the caller whether it should close the
        stream; confirm against callers).
    :raises ValueError: if ``atomic`` is combined with an unsupported mode.
    """
    binary = "b" in mode
    filename = os.fspath(filename)

    # Standard streams first. These are simple because they ignore the
    # atomic flag. Use fsdecode to handle Path("-").
    if os.fsdecode(filename) == "-":
        wants_output = any(flag in mode for flag in ["w", "a", "x"])
        if wants_output:
            if binary:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if binary:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated. They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file. Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    # Remember the permissions of an existing target, if there is one.
    perm: int | None
    try:
        perm = os.stat(filename).st_mode
    except OSError:
        perm = None

    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    if binary:
        flags |= getattr(os, "O_BINARY", 0)

    creation_mode = 0o666 if perm is None else perm
    directory = os.path.dirname(filename)

    # Keep drawing random names until one can be created exclusively.
    while True:
        tmp_filename = os.path.join(
            directory,
            f".__atomic-write{random.randrange(1 << 32):08x}",
        )
        try:
            fd = os.open(tmp_filename, flags, creation_mode)
        except OSError as e:
            name_taken = e.errno == errno.EEXIST
            if name_taken or (
                os.name == "nt"
                and e.errno == errno.EACCES
                and os.path.isdir(e.filename)
                and os.access(e.filename, os.W_OK)
            ):
                continue
            raise
        else:
            break

    if perm is not None:
        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

    wrapped = _wrap_io_open(fd, mode, encoding, errors)
    atomic_file = _AtomicFile(wrapped, tmp_filename, os.path.realpath(filename))
    return t.cast(t.IO[t.Any], atomic_file), True
452class _AtomicFile:
453 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
454 self._f = f
455 self._tmp_filename = tmp_filename
456 self._real_filename = real_filename
457 self.closed = False
459 @property
460 def name(self) -> str:
461 return self._real_filename
463 def close(self, delete: bool = False) -> None:
464 if self.closed:
465 return
466 self._f.close()
467 os.replace(self._tmp_filename, self._real_filename)
468 self.closed = True
470 def __getattr__(self, name: str) -> t.Any:
471 return getattr(self._f, name)
473 def __enter__(self) -> _AtomicFile:
474 return self
476 def __exit__(
477 self,
478 exc_type: type[BaseException] | None,
479 exc_value: BaseException | None,
480 tb: TracebackType | None,
481 ) -> None:
482 self.close(delete=exc_type is not None)
484 def __repr__(self) -> str:
485 return repr(self._f)
def strip_ansi(value: str) -> str:
    """Return ``value`` with every match of ``_ansi_re`` removed."""
    return re.sub(_ansi_re, "", value)
def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
    """Return whether the innermost stream's class lives in ``ipykernel``.

    Layers of ``_FixupStream`` / ``_NonClosingTextIOWrapper`` are peeled
    off first by following their ``_stream`` attribute.
    """
    wrapper_types = (_FixupStream, _NonClosingTextIOWrapper)
    innermost = stream
    while isinstance(innermost, wrapper_types):
        innermost = innermost._stream

    module_name = innermost.__class__.__module__
    return module_name.startswith("ipykernel.")
def should_strip_ansi(
    stream: t.IO[t.Any] | None = None, color: bool | None = None
) -> bool:
    """Decide whether ANSI codes should be removed from output.

    :param stream: stream to inspect; ``sys.stdin`` is used when ``None``.
    :param color: explicit choice. When given, the result is simply
        ``not color`` and the stream is not inspected.
    :return: with no explicit choice, ``True`` unless the stream is a
        TTY or a Jupyter kernel output stream.
    """
    if color is not None:
        return not color

    target = sys.stdin if stream is None else stream
    return not (isatty(target) or _is_jupyter_kernel_output(target))
# On Windows, wrap the output streams with colorama to support ANSI
# color codes.
# NOTE: double check is needed so mypy does not analyze this on Linux
if sys.platform.startswith("win") and WIN:
    from ._winconsole import _get_windows_console_stream

    def _get_argv_encoding() -> str:
        # On this branch the locale's preferred encoding is used.
        import locale

        return locale.getpreferredencoding()

    # Maps an original stream to its wrapped counterpart; keys are held
    # weakly.
    _ansi_stream_wrappers: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def auto_wrap_for_ansi(  # noqa: F811
        stream: t.TextIO, color: bool | None = None
    ) -> t.TextIO:
        """Support ANSI color and style codes on Windows by wrapping a
        stream with colorama.
        """
        # A failing cache lookup is treated the same as a cache miss.
        try:
            cached = _ansi_stream_wrappers.get(stream)
        except Exception:
            cached = None

        if cached is not None:
            return cached

        import colorama

        strip = should_strip_ansi(stream, color)
        ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
        rv = t.cast(t.TextIO, ansi_wrapper.stream)
        _write = rv.write

        def _safe_write(s):
            # If a write fails for any reason, call reset_all() on the
            # wrapper before letting the exception continue.
            try:
                return _write(s)
            except BaseException:
                ansi_wrapper.reset_all()
                raise

        rv.write = _safe_write

        # Caching is best effort; a stream that cannot be stored is
        # simply wrapped again next time.
        try:
            _ansi_stream_wrappers[stream] = rv
        except Exception:
            pass

        return rv

else:

    def _get_argv_encoding() -> str:
        # Use stdin's encoding when it has one, else the filesystem encoding.
        return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()

    def _get_windows_console_stream(
        f: t.TextIO, encoding: str | None, errors: str | None
    ) -> t.TextIO | None:
        # Stand-in used on this branch: there is never a console stream.
        return None
def term_len(x: str) -> int:
    """Return the length of ``x`` after ``strip_ansi`` has been applied."""
    visible_text = strip_ansi(x)
    return len(visible_text)
def isatty(stream: t.IO[t.Any]) -> bool:
    """Return ``stream.isatty()``, or ``False`` if that call raises."""
    try:
        answer = stream.isatty()
    except Exception:
        return False
    return answer
581def _make_cached_stream_func(
582 src_func: t.Callable[[], t.TextIO | None],
583 wrapper_func: t.Callable[[], t.TextIO],
584) -> t.Callable[[], t.TextIO | None]:
585 cache: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
587 def func() -> t.TextIO | None:
588 stream = src_func()
590 if stream is None:
591 return None
593 try:
594 rv = cache.get(stream)
595 except Exception:
596 rv = None
597 if rv is not None:
598 return rv
599 rv = wrapper_func()
600 try:
601 cache[stream] = rv
602 except Exception:
603 pass
604 return rv
606 return func
# Cached accessors for the text versions of the standard streams. Each
# one looks up the current ``sys`` stream on every call and reuses the
# wrapper previously built for that same stream object.
_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)

# Lookup from standard stream name to the function returning its binary
# stream.
binary_streams: cabc.Mapping[str, t.Callable[[], t.BinaryIO]] = {
    "stdin": get_binary_stdin,
    "stdout": get_binary_stdout,
    "stderr": get_binary_stderr,
}

# Lookup from standard stream name to the function returning its text
# stream; those functions take ``(encoding, errors)``.
text_streams: cabc.Mapping[str, t.Callable[[str | None, str | None], t.TextIO]] = {
    "stdin": get_text_stdin,
    "stdout": get_text_stdout,
    "stderr": get_text_stderr,
}