Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/click/_compat.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import codecs
2import io
3import os
4import re
5import sys
6import typing as t
7from weakref import WeakKeyDictionary
9CYGWIN = sys.platform.startswith("cygwin")
10WIN = sys.platform.startswith("win")
11auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
12_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
15def _make_text_stream(
16 stream: t.BinaryIO,
17 encoding: t.Optional[str],
18 errors: t.Optional[str],
19 force_readable: bool = False,
20 force_writable: bool = False,
21) -> t.TextIO:
22 if encoding is None:
23 encoding = get_best_encoding(stream)
24 if errors is None:
25 errors = "replace"
26 return _NonClosingTextIOWrapper(
27 stream,
28 encoding,
29 errors,
30 line_buffering=True,
31 force_readable=force_readable,
32 force_writable=force_writable,
33 )
36def is_ascii_encoding(encoding: str) -> bool:
37 """Checks if a given encoding is ascii."""
38 try:
39 return codecs.lookup(encoding).name == "ascii"
40 except LookupError:
41 return False
44def get_best_encoding(stream: t.IO[t.Any]) -> str:
45 """Returns the default stream encoding if not found."""
46 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
47 if is_ascii_encoding(rv):
48 return "utf-8"
49 return rv
52class _NonClosingTextIOWrapper(io.TextIOWrapper):
53 def __init__(
54 self,
55 stream: t.BinaryIO,
56 encoding: t.Optional[str],
57 errors: t.Optional[str],
58 force_readable: bool = False,
59 force_writable: bool = False,
60 **extra: t.Any,
61 ) -> None:
62 self._stream = stream = t.cast(
63 t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
64 )
65 super().__init__(stream, encoding, errors, **extra)
67 def __del__(self) -> None:
68 try:
69 self.detach()
70 except Exception:
71 pass
73 def isatty(self) -> bool:
74 # https://bitbucket.org/pypy/pypy/issue/1803
75 return self._stream.isatty()
78class _FixupStream:
79 """The new io interface needs more from streams than streams
80 traditionally implement. As such, this fix-up code is necessary in
81 some circumstances.
83 The forcing of readable and writable flags are there because some tools
84 put badly patched objects on sys (one such offender are certain version
85 of jupyter notebook).
86 """
88 def __init__(
89 self,
90 stream: t.BinaryIO,
91 force_readable: bool = False,
92 force_writable: bool = False,
93 ):
94 self._stream = stream
95 self._force_readable = force_readable
96 self._force_writable = force_writable
98 def __getattr__(self, name: str) -> t.Any:
99 return getattr(self._stream, name)
101 def read1(self, size: int) -> bytes:
102 f = getattr(self._stream, "read1", None)
104 if f is not None:
105 return t.cast(bytes, f(size))
107 return self._stream.read(size)
109 def readable(self) -> bool:
110 if self._force_readable:
111 return True
112 x = getattr(self._stream, "readable", None)
113 if x is not None:
114 return t.cast(bool, x())
115 try:
116 self._stream.read(0)
117 except Exception:
118 return False
119 return True
121 def writable(self) -> bool:
122 if self._force_writable:
123 return True
124 x = getattr(self._stream, "writable", None)
125 if x is not None:
126 return t.cast(bool, x())
127 try:
128 self._stream.write("") # type: ignore
129 except Exception:
130 try:
131 self._stream.write(b"")
132 except Exception:
133 return False
134 return True
136 def seekable(self) -> bool:
137 x = getattr(self._stream, "seekable", None)
138 if x is not None:
139 return t.cast(bool, x())
140 try:
141 self._stream.seek(self._stream.tell())
142 except Exception:
143 return False
144 return True
147def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
148 try:
149 return isinstance(stream.read(0), bytes)
150 except Exception:
151 return default
152 # This happens in some cases where the stream was already
153 # closed. In this case, we assume the default.
156def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
157 try:
158 stream.write(b"")
159 except Exception:
160 try:
161 stream.write("")
162 return False
163 except Exception:
164 pass
165 return default
166 return True
169def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
170 # We need to figure out if the given stream is already binary.
171 # This can happen because the official docs recommend detaching
172 # the streams to get binary streams. Some code might do this, so
173 # we need to deal with this case explicitly.
174 if _is_binary_reader(stream, False):
175 return t.cast(t.BinaryIO, stream)
177 buf = getattr(stream, "buffer", None)
179 # Same situation here; this time we assume that the buffer is
180 # actually binary in case it's closed.
181 if buf is not None and _is_binary_reader(buf, True):
182 return t.cast(t.BinaryIO, buf)
184 return None
187def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
188 # We need to figure out if the given stream is already binary.
189 # This can happen because the official docs recommend detaching
190 # the streams to get binary streams. Some code might do this, so
191 # we need to deal with this case explicitly.
192 if _is_binary_writer(stream, False):
193 return t.cast(t.BinaryIO, stream)
195 buf = getattr(stream, "buffer", None)
197 # Same situation here; this time we assume that the buffer is
198 # actually binary in case it's closed.
199 if buf is not None and _is_binary_writer(buf, True):
200 return t.cast(t.BinaryIO, buf)
202 return None
205def _stream_is_misconfigured(stream: t.TextIO) -> bool:
206 """A stream is misconfigured if its encoding is ASCII."""
207 # If the stream does not have an encoding set, we assume it's set
208 # to ASCII. This appears to happen in certain unittest
209 # environments. It's not quite clear what the correct behavior is
210 # but this at least will force Click to recover somehow.
211 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
214def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
215 """A stream attribute is compatible if it is equal to the
216 desired value or the desired value is unset and the attribute
217 has a value.
218 """
219 stream_value = getattr(stream, attr, None)
220 return stream_value == value or (value is None and stream_value is not None)
223def _is_compatible_text_stream(
224 stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
225) -> bool:
226 """Check if a stream's encoding and errors attributes are
227 compatible with the desired values.
228 """
229 return _is_compat_stream_attr(
230 stream, "encoding", encoding
231 ) and _is_compat_stream_attr(stream, "errors", errors)
234def _force_correct_text_stream(
235 text_stream: t.IO[t.Any],
236 encoding: t.Optional[str],
237 errors: t.Optional[str],
238 is_binary: t.Callable[[t.IO[t.Any], bool], bool],
239 find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],
240 force_readable: bool = False,
241 force_writable: bool = False,
242) -> t.TextIO:
243 if is_binary(text_stream, False):
244 binary_reader = t.cast(t.BinaryIO, text_stream)
245 else:
246 text_stream = t.cast(t.TextIO, text_stream)
247 # If the stream looks compatible, and won't default to a
248 # misconfigured ascii encoding, return it as-is.
249 if _is_compatible_text_stream(text_stream, encoding, errors) and not (
250 encoding is None and _stream_is_misconfigured(text_stream)
251 ):
252 return text_stream
254 # Otherwise, get the underlying binary reader.
255 possible_binary_reader = find_binary(text_stream)
257 # If that's not possible, silently use the original reader
258 # and get mojibake instead of exceptions.
259 if possible_binary_reader is None:
260 return text_stream
262 binary_reader = possible_binary_reader
264 # Default errors to replace instead of strict in order to get
265 # something that works.
266 if errors is None:
267 errors = "replace"
269 # Wrap the binary stream in a text stream with the correct
270 # encoding parameters.
271 return _make_text_stream(
272 binary_reader,
273 encoding,
274 errors,
275 force_readable=force_readable,
276 force_writable=force_writable,
277 )
280def _force_correct_text_reader(
281 text_reader: t.IO[t.Any],
282 encoding: t.Optional[str],
283 errors: t.Optional[str],
284 force_readable: bool = False,
285) -> t.TextIO:
286 return _force_correct_text_stream(
287 text_reader,
288 encoding,
289 errors,
290 _is_binary_reader,
291 _find_binary_reader,
292 force_readable=force_readable,
293 )
296def _force_correct_text_writer(
297 text_writer: t.IO[t.Any],
298 encoding: t.Optional[str],
299 errors: t.Optional[str],
300 force_writable: bool = False,
301) -> t.TextIO:
302 return _force_correct_text_stream(
303 text_writer,
304 encoding,
305 errors,
306 _is_binary_writer,
307 _find_binary_writer,
308 force_writable=force_writable,
309 )
312def get_binary_stdin() -> t.BinaryIO:
313 reader = _find_binary_reader(sys.stdin)
314 if reader is None:
315 raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
316 return reader
319def get_binary_stdout() -> t.BinaryIO:
320 writer = _find_binary_writer(sys.stdout)
321 if writer is None:
322 raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
323 return writer
326def get_binary_stderr() -> t.BinaryIO:
327 writer = _find_binary_writer(sys.stderr)
328 if writer is None:
329 raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
330 return writer
333def get_text_stdin(
334 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
335) -> t.TextIO:
336 rv = _get_windows_console_stream(sys.stdin, encoding, errors)
337 if rv is not None:
338 return rv
339 return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
342def get_text_stdout(
343 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
344) -> t.TextIO:
345 rv = _get_windows_console_stream(sys.stdout, encoding, errors)
346 if rv is not None:
347 return rv
348 return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
351def get_text_stderr(
352 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
353) -> t.TextIO:
354 rv = _get_windows_console_stream(sys.stderr, encoding, errors)
355 if rv is not None:
356 return rv
357 return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
360def _wrap_io_open(
361 file: t.Union[str, "os.PathLike[str]", int],
362 mode: str,
363 encoding: t.Optional[str],
364 errors: t.Optional[str],
365) -> t.IO[t.Any]:
366 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
367 if "b" in mode:
368 return open(file, mode)
370 return open(file, mode, encoding=encoding, errors=errors)
373def open_stream(
374 filename: "t.Union[str, os.PathLike[str]]",
375 mode: str = "r",
376 encoding: t.Optional[str] = None,
377 errors: t.Optional[str] = "strict",
378 atomic: bool = False,
379) -> t.Tuple[t.IO[t.Any], bool]:
380 binary = "b" in mode
381 filename = os.fspath(filename)
383 # Standard streams first. These are simple because they ignore the
384 # atomic flag. Use fsdecode to handle Path("-").
385 if os.fsdecode(filename) == "-":
386 if any(m in mode for m in ["w", "a", "x"]):
387 if binary:
388 return get_binary_stdout(), False
389 return get_text_stdout(encoding=encoding, errors=errors), False
390 if binary:
391 return get_binary_stdin(), False
392 return get_text_stdin(encoding=encoding, errors=errors), False
394 # Non-atomic writes directly go out through the regular open functions.
395 if not atomic:
396 return _wrap_io_open(filename, mode, encoding, errors), True
398 # Some usability stuff for atomic writes
399 if "a" in mode:
400 raise ValueError(
401 "Appending to an existing file is not supported, because that"
402 " would involve an expensive `copy`-operation to a temporary"
403 " file. Open the file in normal `w`-mode and copy explicitly"
404 " if that's what you're after."
405 )
406 if "x" in mode:
407 raise ValueError("Use the `overwrite`-parameter instead.")
408 if "w" not in mode:
409 raise ValueError("Atomic writes only make sense with `w`-mode.")
411 # Atomic writes are more complicated. They work by opening a file
412 # as a proxy in the same folder and then using the fdopen
413 # functionality to wrap it in a Python file. Then we wrap it in an
414 # atomic file that moves the file over on close.
415 import errno
416 import random
418 try:
419 perm: t.Optional[int] = os.stat(filename).st_mode
420 except OSError:
421 perm = None
423 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
425 if binary:
426 flags |= getattr(os, "O_BINARY", 0)
428 while True:
429 tmp_filename = os.path.join(
430 os.path.dirname(filename),
431 f".__atomic-write{random.randrange(1 << 32):08x}",
432 )
433 try:
434 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
435 break
436 except OSError as e:
437 if e.errno == errno.EEXIST or (
438 os.name == "nt"
439 and e.errno == errno.EACCES
440 and os.path.isdir(e.filename)
441 and os.access(e.filename, os.W_OK)
442 ):
443 continue
444 raise
446 if perm is not None:
447 os.chmod(tmp_filename, perm) # in case perm includes bits in umask
449 f = _wrap_io_open(fd, mode, encoding, errors)
450 af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
451 return t.cast(t.IO[t.Any], af), True
454class _AtomicFile:
455 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
456 self._f = f
457 self._tmp_filename = tmp_filename
458 self._real_filename = real_filename
459 self.closed = False
461 @property
462 def name(self) -> str:
463 return self._real_filename
465 def close(self, delete: bool = False) -> None:
466 if self.closed:
467 return
468 self._f.close()
469 os.replace(self._tmp_filename, self._real_filename)
470 self.closed = True
472 def __getattr__(self, name: str) -> t.Any:
473 return getattr(self._f, name)
475 def __enter__(self) -> "_AtomicFile":
476 return self
478 def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:
479 self.close(delete=exc_type is not None)
481 def __repr__(self) -> str:
482 return repr(self._f)
485def strip_ansi(value: str) -> str:
486 return _ansi_re.sub("", value)
489def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
490 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
491 stream = stream._stream
493 return stream.__class__.__module__.startswith("ipykernel.")
496def should_strip_ansi(
497 stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
498) -> bool:
499 if color is None:
500 if stream is None:
501 stream = sys.stdin
502 return not isatty(stream) and not _is_jupyter_kernel_output(stream)
503 return not color
506# On Windows, wrap the output streams with colorama to support ANSI
507# color codes.
508# NOTE: double check is needed so mypy does not analyze this on Linux
509if sys.platform.startswith("win") and WIN:
510 from ._winconsole import _get_windows_console_stream
512 def _get_argv_encoding() -> str:
513 import locale
515 return locale.getpreferredencoding()
517 _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
519 def auto_wrap_for_ansi( # noqa: F811
520 stream: t.TextIO, color: t.Optional[bool] = None
521 ) -> t.TextIO:
522 """Support ANSI color and style codes on Windows by wrapping a
523 stream with colorama.
524 """
525 try:
526 cached = _ansi_stream_wrappers.get(stream)
527 except Exception:
528 cached = None
530 if cached is not None:
531 return cached
533 import colorama
535 strip = should_strip_ansi(stream, color)
536 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
537 rv = t.cast(t.TextIO, ansi_wrapper.stream)
538 _write = rv.write
540 def _safe_write(s):
541 try:
542 return _write(s)
543 except BaseException:
544 ansi_wrapper.reset_all()
545 raise
547 rv.write = _safe_write
549 try:
550 _ansi_stream_wrappers[stream] = rv
551 except Exception:
552 pass
554 return rv
556else:
558 def _get_argv_encoding() -> str:
559 return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()
561 def _get_windows_console_stream(
562 f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
563 ) -> t.Optional[t.TextIO]:
564 return None
567def term_len(x: str) -> int:
568 return len(strip_ansi(x))
571def isatty(stream: t.IO[t.Any]) -> bool:
572 try:
573 return stream.isatty()
574 except Exception:
575 return False
578def _make_cached_stream_func(
579 src_func: t.Callable[[], t.Optional[t.TextIO]],
580 wrapper_func: t.Callable[[], t.TextIO],
581) -> t.Callable[[], t.Optional[t.TextIO]]:
582 cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
584 def func() -> t.Optional[t.TextIO]:
585 stream = src_func()
587 if stream is None:
588 return None
590 try:
591 rv = cache.get(stream)
592 except Exception:
593 rv = None
594 if rv is not None:
595 return rv
596 rv = wrapper_func()
597 try:
598 cache[stream] = rv
599 except Exception:
600 pass
601 return rv
603 return func
606_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
607_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
608_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
611binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {
612 "stdin": get_binary_stdin,
613 "stdout": get_binary_stdout,
614 "stderr": get_binary_stderr,
615}
617text_streams: t.Mapping[
618 str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
619] = {
620 "stdin": get_text_stdin,
621 "stdout": get_text_stdout,
622 "stderr": get_text_stderr,
623}