Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/click/_compat.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import codecs
4import collections.abc as cabc
5import io
6import os
7import re
8import sys
9import typing as t
10from types import TracebackType
11from weakref import WeakKeyDictionary
13CYGWIN = sys.platform.startswith("cygwin")
14WIN = sys.platform.startswith("win")
15auto_wrap_for_ansi: t.Callable[[t.TextIO], t.TextIO] | None = None
16_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
19def _make_text_stream(
20 stream: t.BinaryIO,
21 encoding: str | None,
22 errors: str | None,
23 force_readable: bool = False,
24 force_writable: bool = False,
25) -> t.TextIO:
26 if encoding is None:
27 encoding = get_best_encoding(stream)
28 if errors is None:
29 errors = "replace"
30 return _NonClosingTextIOWrapper(
31 stream,
32 encoding,
33 errors,
34 line_buffering=True,
35 force_readable=force_readable,
36 force_writable=force_writable,
37 )
40def is_ascii_encoding(encoding: str) -> bool:
41 """Checks if a given encoding is ascii."""
42 try:
43 return codecs.lookup(encoding).name == "ascii"
44 except LookupError:
45 return False
48def get_best_encoding(stream: t.IO[t.Any]) -> str:
49 """Returns the default stream encoding if not found."""
50 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
51 if is_ascii_encoding(rv):
52 return "utf-8"
53 return rv
56class _NonClosingTextIOWrapper(io.TextIOWrapper):
57 def __init__(
58 self,
59 stream: t.BinaryIO,
60 encoding: str | None,
61 errors: str | None,
62 force_readable: bool = False,
63 force_writable: bool = False,
64 **extra: t.Any,
65 ) -> None:
66 self._stream = stream = t.cast(
67 t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
68 )
69 super().__init__(stream, encoding, errors, **extra)
71 def __del__(self) -> None:
72 try:
73 self.detach()
74 except Exception:
75 pass
77 def isatty(self) -> bool:
78 # https://bitbucket.org/pypy/pypy/issue/1803
79 return self._stream.isatty()
82class _FixupStream:
83 """The new io interface needs more from streams than streams
84 traditionally implement. As such, this fix-up code is necessary in
85 some circumstances.
87 The forcing of readable and writable flags are there because some tools
88 put badly patched objects on sys (one such offender are certain version
89 of jupyter notebook).
90 """
92 def __init__(
93 self,
94 stream: t.BinaryIO,
95 force_readable: bool = False,
96 force_writable: bool = False,
97 ):
98 self._stream = stream
99 self._force_readable = force_readable
100 self._force_writable = force_writable
102 def __getattr__(self, name: str) -> t.Any:
103 return getattr(self._stream, name)
105 def read1(self, size: int) -> bytes:
106 f = getattr(self._stream, "read1", None)
108 if f is not None:
109 return t.cast(bytes, f(size))
111 return self._stream.read(size)
113 def readable(self) -> bool:
114 if self._force_readable:
115 return True
116 x = getattr(self._stream, "readable", None)
117 if x is not None:
118 return t.cast(bool, x())
119 try:
120 self._stream.read(0)
121 except Exception:
122 return False
123 return True
125 def writable(self) -> bool:
126 if self._force_writable:
127 return True
128 x = getattr(self._stream, "writable", None)
129 if x is not None:
130 return t.cast(bool, x())
131 try:
132 self._stream.write("") # type: ignore
133 except Exception:
134 try:
135 self._stream.write(b"")
136 except Exception:
137 return False
138 return True
140 def seekable(self) -> bool:
141 x = getattr(self._stream, "seekable", None)
142 if x is not None:
143 return t.cast(bool, x())
144 try:
145 self._stream.seek(self._stream.tell())
146 except Exception:
147 return False
148 return True
151def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
152 try:
153 return isinstance(stream.read(0), bytes)
154 except Exception:
155 return default
156 # This happens in some cases where the stream was already
157 # closed. In this case, we assume the default.
160def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
161 try:
162 stream.write(b"")
163 except Exception:
164 try:
165 stream.write("")
166 return False
167 except Exception:
168 pass
169 return default
170 return True
173def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None:
174 # We need to figure out if the given stream is already binary.
175 # This can happen because the official docs recommend detaching
176 # the streams to get binary streams. Some code might do this, so
177 # we need to deal with this case explicitly.
178 if _is_binary_reader(stream, False):
179 return t.cast(t.BinaryIO, stream)
181 buf = getattr(stream, "buffer", None)
183 # Same situation here; this time we assume that the buffer is
184 # actually binary in case it's closed.
185 if buf is not None and _is_binary_reader(buf, True):
186 return t.cast(t.BinaryIO, buf)
188 return None
191def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None:
192 # We need to figure out if the given stream is already binary.
193 # This can happen because the official docs recommend detaching
194 # the streams to get binary streams. Some code might do this, so
195 # we need to deal with this case explicitly.
196 if _is_binary_writer(stream, False):
197 return t.cast(t.BinaryIO, stream)
199 buf = getattr(stream, "buffer", None)
201 # Same situation here; this time we assume that the buffer is
202 # actually binary in case it's closed.
203 if buf is not None and _is_binary_writer(buf, True):
204 return t.cast(t.BinaryIO, buf)
206 return None
209def _stream_is_misconfigured(stream: t.TextIO) -> bool:
210 """A stream is misconfigured if its encoding is ASCII."""
211 # If the stream does not have an encoding set, we assume it's set
212 # to ASCII. This appears to happen in certain unittest
213 # environments. It's not quite clear what the correct behavior is
214 # but this at least will force Click to recover somehow.
215 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
218def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: str | None) -> bool:
219 """A stream attribute is compatible if it is equal to the
220 desired value or the desired value is unset and the attribute
221 has a value.
222 """
223 stream_value = getattr(stream, attr, None)
224 return stream_value == value or (value is None and stream_value is not None)
227def _is_compatible_text_stream(
228 stream: t.TextIO, encoding: str | None, errors: str | None
229) -> bool:
230 """Check if a stream's encoding and errors attributes are
231 compatible with the desired values.
232 """
233 return _is_compat_stream_attr(
234 stream, "encoding", encoding
235 ) and _is_compat_stream_attr(stream, "errors", errors)
238def _force_correct_text_stream(
239 text_stream: t.IO[t.Any],
240 encoding: str | None,
241 errors: str | None,
242 is_binary: t.Callable[[t.IO[t.Any], bool], bool],
243 find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None],
244 force_readable: bool = False,
245 force_writable: bool = False,
246) -> t.TextIO:
247 if is_binary(text_stream, False):
248 binary_reader = t.cast(t.BinaryIO, text_stream)
249 else:
250 text_stream = t.cast(t.TextIO, text_stream)
251 # If the stream looks compatible, and won't default to a
252 # misconfigured ascii encoding, return it as-is.
253 if _is_compatible_text_stream(text_stream, encoding, errors) and not (
254 encoding is None and _stream_is_misconfigured(text_stream)
255 ):
256 return text_stream
258 # Otherwise, get the underlying binary reader.
259 possible_binary_reader = find_binary(text_stream)
261 # If that's not possible, silently use the original reader
262 # and get mojibake instead of exceptions.
263 if possible_binary_reader is None:
264 return text_stream
266 binary_reader = possible_binary_reader
268 # Default errors to replace instead of strict in order to get
269 # something that works.
270 if errors is None:
271 errors = "replace"
273 # Wrap the binary stream in a text stream with the correct
274 # encoding parameters.
275 return _make_text_stream(
276 binary_reader,
277 encoding,
278 errors,
279 force_readable=force_readable,
280 force_writable=force_writable,
281 )
284def _force_correct_text_reader(
285 text_reader: t.IO[t.Any],
286 encoding: str | None,
287 errors: str | None,
288 force_readable: bool = False,
289) -> t.TextIO:
290 return _force_correct_text_stream(
291 text_reader,
292 encoding,
293 errors,
294 _is_binary_reader,
295 _find_binary_reader,
296 force_readable=force_readable,
297 )
300def _force_correct_text_writer(
301 text_writer: t.IO[t.Any],
302 encoding: str | None,
303 errors: str | None,
304 force_writable: bool = False,
305) -> t.TextIO:
306 return _force_correct_text_stream(
307 text_writer,
308 encoding,
309 errors,
310 _is_binary_writer,
311 _find_binary_writer,
312 force_writable=force_writable,
313 )
316def get_binary_stdin() -> t.BinaryIO:
317 reader = _find_binary_reader(sys.stdin)
318 if reader is None:
319 raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
320 return reader
323def get_binary_stdout() -> t.BinaryIO:
324 writer = _find_binary_writer(sys.stdout)
325 if writer is None:
326 raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
327 return writer
330def get_binary_stderr() -> t.BinaryIO:
331 writer = _find_binary_writer(sys.stderr)
332 if writer is None:
333 raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
334 return writer
337def get_text_stdin(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
338 rv = _get_windows_console_stream(sys.stdin, encoding, errors)
339 if rv is not None:
340 return rv
341 return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
344def get_text_stdout(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
345 rv = _get_windows_console_stream(sys.stdout, encoding, errors)
346 if rv is not None:
347 return rv
348 return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
351def get_text_stderr(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
352 rv = _get_windows_console_stream(sys.stderr, encoding, errors)
353 if rv is not None:
354 return rv
355 return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
358def _wrap_io_open(
359 file: str | os.PathLike[str] | int,
360 mode: str,
361 encoding: str | None,
362 errors: str | None,
363) -> t.IO[t.Any]:
364 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
365 if "b" in mode:
366 return open(file, mode)
368 return open(file, mode, encoding=encoding, errors=errors)
371def open_stream(
372 filename: str | os.PathLike[str],
373 mode: str = "r",
374 encoding: str | None = None,
375 errors: str | None = "strict",
376 atomic: bool = False,
377) -> tuple[t.IO[t.Any], bool]:
378 binary = "b" in mode
379 filename = os.fspath(filename)
381 # Standard streams first. These are simple because they ignore the
382 # atomic flag. Use fsdecode to handle Path("-").
383 if os.fsdecode(filename) == "-":
384 if any(m in mode for m in ["w", "a", "x"]):
385 if binary:
386 return get_binary_stdout(), False
387 return get_text_stdout(encoding=encoding, errors=errors), False
388 if binary:
389 return get_binary_stdin(), False
390 return get_text_stdin(encoding=encoding, errors=errors), False
392 # Non-atomic writes directly go out through the regular open functions.
393 if not atomic:
394 return _wrap_io_open(filename, mode, encoding, errors), True
396 # Some usability stuff for atomic writes
397 if "a" in mode:
398 raise ValueError(
399 "Appending to an existing file is not supported, because that"
400 " would involve an expensive `copy`-operation to a temporary"
401 " file. Open the file in normal `w`-mode and copy explicitly"
402 " if that's what you're after."
403 )
404 if "x" in mode:
405 raise ValueError("Use the `overwrite`-parameter instead.")
406 if "w" not in mode:
407 raise ValueError("Atomic writes only make sense with `w`-mode.")
409 # Atomic writes are more complicated. They work by opening a file
410 # as a proxy in the same folder and then using the fdopen
411 # functionality to wrap it in a Python file. Then we wrap it in an
412 # atomic file that moves the file over on close.
413 import errno
414 import random
416 try:
417 perm: int | None = os.stat(filename).st_mode
418 except OSError:
419 perm = None
421 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
423 if binary:
424 flags |= getattr(os, "O_BINARY", 0)
426 while True:
427 tmp_filename = os.path.join(
428 os.path.dirname(filename),
429 f".__atomic-write{random.randrange(1 << 32):08x}",
430 )
431 try:
432 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
433 break
434 except OSError as e:
435 if e.errno == errno.EEXIST or (
436 os.name == "nt"
437 and e.errno == errno.EACCES
438 and os.path.isdir(e.filename)
439 and os.access(e.filename, os.W_OK)
440 ):
441 continue
442 raise
444 if perm is not None:
445 os.chmod(tmp_filename, perm) # in case perm includes bits in umask
447 f = _wrap_io_open(fd, mode, encoding, errors)
448 af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
449 return t.cast(t.IO[t.Any], af), True
452class _AtomicFile:
453 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
454 self._f = f
455 self._tmp_filename = tmp_filename
456 self._real_filename = real_filename
457 self.closed = False
459 @property
460 def name(self) -> str:
461 return self._real_filename
463 def close(self, delete: bool = False) -> None:
464 if self.closed:
465 return
466 self._f.close()
467 os.replace(self._tmp_filename, self._real_filename)
468 self.closed = True
470 def __getattr__(self, name: str) -> t.Any:
471 return getattr(self._f, name)
473 def __enter__(self) -> _AtomicFile:
474 return self
476 def __exit__(
477 self,
478 exc_type: type[BaseException] | None,
479 exc_value: BaseException | None,
480 tb: TracebackType | None,
481 ) -> None:
482 self.close(delete=exc_type is not None)
484 def __repr__(self) -> str:
485 return repr(self._f)
488def strip_ansi(value: str) -> str:
489 return _ansi_re.sub("", value)
492def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
493 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
494 stream = stream._stream
496 return stream.__class__.__module__.startswith("ipykernel.")
499def should_strip_ansi(
500 stream: t.IO[t.Any] | None = None, color: bool | None = None
501) -> bool:
502 if color is None:
503 if stream is None:
504 stream = sys.stdin
505 return not isatty(stream) and not _is_jupyter_kernel_output(stream)
506 return not color
509# On Windows, wrap the output streams with colorama to support ANSI
510# color codes.
511# NOTE: double check is needed so mypy does not analyze this on Linux
512if sys.platform.startswith("win") and WIN:
513 from ._winconsole import _get_windows_console_stream
515 def _get_argv_encoding() -> str:
516 import locale
518 return locale.getpreferredencoding()
520 _ansi_stream_wrappers: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
522 def auto_wrap_for_ansi(stream: t.TextIO, color: bool | None = None) -> t.TextIO:
523 """Support ANSI color and style codes on Windows by wrapping a
524 stream with colorama.
525 """
526 try:
527 cached = _ansi_stream_wrappers.get(stream)
528 except Exception:
529 cached = None
531 if cached is not None:
532 return cached
534 import colorama
536 strip = should_strip_ansi(stream, color)
537 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
538 rv = t.cast(t.TextIO, ansi_wrapper.stream)
539 _write = rv.write
541 def _safe_write(s):
542 try:
543 return _write(s)
544 except BaseException:
545 ansi_wrapper.reset_all()
546 raise
548 rv.write = _safe_write
550 try:
551 _ansi_stream_wrappers[stream] = rv
552 except Exception:
553 pass
555 return rv
557else:
559 def _get_argv_encoding() -> str:
560 return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()
562 def _get_windows_console_stream(
563 f: t.TextIO, encoding: str | None, errors: str | None
564 ) -> t.TextIO | None:
565 return None
568def term_len(x: str) -> int:
569 return len(strip_ansi(x))
572def isatty(stream: t.IO[t.Any]) -> bool:
573 try:
574 return stream.isatty()
575 except Exception:
576 return False
579def _make_cached_stream_func(
580 src_func: t.Callable[[], t.TextIO | None],
581 wrapper_func: t.Callable[[], t.TextIO],
582) -> t.Callable[[], t.TextIO | None]:
583 cache: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
585 def func() -> t.TextIO | None:
586 stream = src_func()
588 if stream is None:
589 return None
591 try:
592 rv = cache.get(stream)
593 except Exception:
594 rv = None
595 if rv is not None:
596 return rv
597 rv = wrapper_func()
598 try:
599 cache[stream] = rv
600 except Exception:
601 pass
602 return rv
604 return func
607_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
608_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
609_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
612binary_streams: cabc.Mapping[str, t.Callable[[], t.BinaryIO]] = {
613 "stdin": get_binary_stdin,
614 "stdout": get_binary_stdout,
615 "stderr": get_binary_stderr,
616}
618text_streams: cabc.Mapping[str, t.Callable[[str | None, str | None], t.TextIO]] = {
619 "stdin": get_text_stdin,
620 "stdout": get_text_stdout,
621 "stderr": get_text_stderr,
622}