Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/_compat.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import codecs
4import collections.abc as cabc
5import io
6import os
7import re
8import sys
9import typing as t
10from types import TracebackType
11from weakref import WeakKeyDictionary
13CYGWIN = sys.platform.startswith("cygwin")
14WIN = sys.platform.startswith("win")
15MAC = sys.platform == "darwin"
16auto_wrap_for_ansi: t.Callable[[t.TextIO], t.TextIO] | None = None
17_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
20def _make_text_stream(
21 stream: t.BinaryIO,
22 encoding: str | None,
23 errors: str | None,
24 force_readable: bool = False,
25 force_writable: bool = False,
26) -> t.TextIO:
27 if encoding is None:
28 encoding = get_best_encoding(stream)
29 if errors is None:
30 errors = "replace"
31 return _NonClosingTextIOWrapper(
32 stream,
33 encoding,
34 errors,
35 line_buffering=True,
36 force_readable=force_readable,
37 force_writable=force_writable,
38 )
41def is_ascii_encoding(encoding: str) -> bool:
42 """Checks if a given encoding is ascii."""
43 try:
44 return codecs.lookup(encoding).name == "ascii"
45 except LookupError:
46 return False
49def get_best_encoding(stream: t.IO[t.Any]) -> str:
50 """Returns the default stream encoding if not found."""
51 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
52 if is_ascii_encoding(rv):
53 return "utf-8"
54 return rv
57class _NonClosingTextIOWrapper(io.TextIOWrapper):
58 def __init__(
59 self,
60 stream: t.BinaryIO,
61 encoding: str | None,
62 errors: str | None,
63 force_readable: bool = False,
64 force_writable: bool = False,
65 **extra: t.Any,
66 ) -> None:
67 self._stream = stream = t.cast(
68 t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
69 )
70 super().__init__(stream, encoding, errors, **extra)
72 def __del__(self) -> None:
73 try:
74 self.detach()
75 except Exception:
76 pass
78 def isatty(self) -> bool:
79 # https://bitbucket.org/pypy/pypy/issue/1803
80 return self._stream.isatty()
83class _FixupStream:
84 """The new io interface needs more from streams than streams
85 traditionally implement. As such, this fix-up code is necessary in
86 some circumstances.
88 The forcing of readable and writable flags are there because some tools
89 put badly patched objects on sys (one such offender are certain version
90 of jupyter notebook).
91 """
93 def __init__(
94 self,
95 stream: t.BinaryIO,
96 force_readable: bool = False,
97 force_writable: bool = False,
98 ):
99 self._stream = stream
100 self._force_readable = force_readable
101 self._force_writable = force_writable
103 def __getattr__(self, name: str) -> t.Any:
104 return getattr(self._stream, name)
106 def read1(self, size: int) -> bytes:
107 f = getattr(self._stream, "read1", None)
109 if f is not None:
110 return t.cast(bytes, f(size))
112 return self._stream.read(size)
114 def readable(self) -> bool:
115 if self._force_readable:
116 return True
117 x = getattr(self._stream, "readable", None)
118 if x is not None:
119 return t.cast(bool, x())
120 try:
121 self._stream.read(0)
122 except Exception:
123 return False
124 return True
126 def writable(self) -> bool:
127 if self._force_writable:
128 return True
129 x = getattr(self._stream, "writable", None)
130 if x is not None:
131 return t.cast(bool, x())
132 try:
133 self._stream.write(b"")
134 except Exception:
135 try:
136 self._stream.write(b"")
137 except Exception:
138 return False
139 return True
141 def seekable(self) -> bool:
142 x = getattr(self._stream, "seekable", None)
143 if x is not None:
144 return t.cast(bool, x())
145 try:
146 self._stream.seek(self._stream.tell())
147 except Exception:
148 return False
149 return True
152def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
153 try:
154 return isinstance(stream.read(0), bytes)
155 except Exception:
156 return default
157 # This happens in some cases where the stream was already
158 # closed. In this case, we assume the default.
161def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
162 try:
163 stream.write(b"")
164 except Exception:
165 try:
166 stream.write("")
167 return False
168 except Exception:
169 pass
170 return default
171 return True
174def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None:
175 # We need to figure out if the given stream is already binary.
176 # This can happen because the official docs recommend detaching
177 # the streams to get binary streams. Some code might do this, so
178 # we need to deal with this case explicitly.
179 if _is_binary_reader(stream, False):
180 return t.cast(t.BinaryIO, stream)
182 buf = getattr(stream, "buffer", None)
184 # Same situation here; this time we assume that the buffer is
185 # actually binary in case it's closed.
186 if buf is not None and _is_binary_reader(buf, True):
187 return t.cast(t.BinaryIO, buf)
189 return None
192def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None:
193 # We need to figure out if the given stream is already binary.
194 # This can happen because the official docs recommend detaching
195 # the streams to get binary streams. Some code might do this, so
196 # we need to deal with this case explicitly.
197 if _is_binary_writer(stream, False):
198 return t.cast(t.BinaryIO, stream)
200 buf = getattr(stream, "buffer", None)
202 # Same situation here; this time we assume that the buffer is
203 # actually binary in case it's closed.
204 if buf is not None and _is_binary_writer(buf, True):
205 return t.cast(t.BinaryIO, buf)
207 return None
210def _stream_is_misconfigured(stream: t.TextIO) -> bool:
211 """A stream is misconfigured if its encoding is ASCII."""
212 # If the stream does not have an encoding set, we assume it's set
213 # to ASCII. This appears to happen in certain unittest
214 # environments. It's not quite clear what the correct behavior is
215 # but this at least will force Click to recover somehow.
216 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
219def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: str | None) -> bool:
220 """A stream attribute is compatible if it is equal to the
221 desired value or the desired value is unset and the attribute
222 has a value.
223 """
224 stream_value = getattr(stream, attr, None)
225 return stream_value == value or (value is None and stream_value is not None)
228def _is_compatible_text_stream(
229 stream: t.TextIO, encoding: str | None, errors: str | None
230) -> bool:
231 """Check if a stream's encoding and errors attributes are
232 compatible with the desired values.
233 """
234 return _is_compat_stream_attr(
235 stream, "encoding", encoding
236 ) and _is_compat_stream_attr(stream, "errors", errors)
239def _force_correct_text_stream(
240 text_stream: t.IO[t.Any],
241 encoding: str | None,
242 errors: str | None,
243 is_binary: t.Callable[[t.IO[t.Any], bool], bool],
244 find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None],
245 force_readable: bool = False,
246 force_writable: bool = False,
247) -> t.TextIO:
248 if is_binary(text_stream, False):
249 binary_reader = t.cast(t.BinaryIO, text_stream)
250 else:
251 text_stream = t.cast(t.TextIO, text_stream)
252 # If the stream looks compatible, and won't default to a
253 # misconfigured ascii encoding, return it as-is.
254 if _is_compatible_text_stream(text_stream, encoding, errors) and not (
255 encoding is None and _stream_is_misconfigured(text_stream)
256 ):
257 return text_stream
259 # Otherwise, get the underlying binary reader.
260 possible_binary_reader = find_binary(text_stream)
262 # If that's not possible, silently use the original reader
263 # and get mojibake instead of exceptions.
264 if possible_binary_reader is None:
265 return text_stream
267 binary_reader = possible_binary_reader
269 # Default errors to replace instead of strict in order to get
270 # something that works.
271 if errors is None:
272 errors = "replace"
274 # Wrap the binary stream in a text stream with the correct
275 # encoding parameters.
276 return _make_text_stream(
277 binary_reader,
278 encoding,
279 errors,
280 force_readable=force_readable,
281 force_writable=force_writable,
282 )
285def _force_correct_text_reader(
286 text_reader: t.IO[t.Any],
287 encoding: str | None,
288 errors: str | None,
289 force_readable: bool = False,
290) -> t.TextIO:
291 return _force_correct_text_stream(
292 text_reader,
293 encoding,
294 errors,
295 _is_binary_reader,
296 _find_binary_reader,
297 force_readable=force_readable,
298 )
301def _force_correct_text_writer(
302 text_writer: t.IO[t.Any],
303 encoding: str | None,
304 errors: str | None,
305 force_writable: bool = False,
306) -> t.TextIO:
307 return _force_correct_text_stream(
308 text_writer,
309 encoding,
310 errors,
311 _is_binary_writer,
312 _find_binary_writer,
313 force_writable=force_writable,
314 )
317def get_binary_stdin() -> t.BinaryIO:
318 reader = _find_binary_reader(sys.stdin)
319 if reader is None:
320 raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
321 return reader
324def get_binary_stdout() -> t.BinaryIO:
325 writer = _find_binary_writer(sys.stdout)
326 if writer is None:
327 raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
328 return writer
331def get_binary_stderr() -> t.BinaryIO:
332 writer = _find_binary_writer(sys.stderr)
333 if writer is None:
334 raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
335 return writer
338def get_text_stdin(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
339 rv = _get_windows_console_stream(sys.stdin, encoding, errors)
340 if rv is not None:
341 return rv
342 return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
345def get_text_stdout(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
346 rv = _get_windows_console_stream(sys.stdout, encoding, errors)
347 if rv is not None:
348 return rv
349 return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
352def get_text_stderr(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
353 rv = _get_windows_console_stream(sys.stderr, encoding, errors)
354 if rv is not None:
355 return rv
356 return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
359def _wrap_io_open(
360 file: str | os.PathLike[str] | int,
361 mode: str,
362 encoding: str | None,
363 errors: str | None,
364) -> t.IO[t.Any]:
365 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
366 if "b" in mode:
367 return open(file, mode)
369 return open(file, mode, encoding=encoding, errors=errors)
372def open_stream(
373 filename: str | os.PathLike[str],
374 mode: str = "r",
375 encoding: str | None = None,
376 errors: str | None = "strict",
377 atomic: bool = False,
378) -> tuple[t.IO[t.Any], bool]:
379 binary = "b" in mode
380 filename = os.fspath(filename)
382 # Standard streams first. These are simple because they ignore the
383 # atomic flag. Use fsdecode to handle Path("-").
384 if os.fsdecode(filename) == "-":
385 if any(m in mode for m in ["w", "a", "x"]):
386 if binary:
387 return get_binary_stdout(), False
388 return get_text_stdout(encoding=encoding, errors=errors), False
389 if binary:
390 return get_binary_stdin(), False
391 return get_text_stdin(encoding=encoding, errors=errors), False
393 # Non-atomic writes directly go out through the regular open functions.
394 if not atomic:
395 return _wrap_io_open(filename, mode, encoding, errors), True
397 # Some usability stuff for atomic writes
398 if "a" in mode:
399 raise ValueError(
400 "Appending to an existing file is not supported, because that"
401 " would involve an expensive `copy`-operation to a temporary"
402 " file. Open the file in normal `w`-mode and copy explicitly"
403 " if that's what you're after."
404 )
405 if "x" in mode:
406 raise ValueError("Use the `overwrite`-parameter instead.")
407 if "w" not in mode:
408 raise ValueError("Atomic writes only make sense with `w`-mode.")
410 # Atomic writes are more complicated. They work by opening a file
411 # as a proxy in the same folder and then using the fdopen
412 # functionality to wrap it in a Python file. Then we wrap it in an
413 # atomic file that moves the file over on close.
414 import errno
415 import random
417 try:
418 perm: int | None = os.stat(filename).st_mode
419 except OSError:
420 perm = None
422 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
424 if binary:
425 flags |= getattr(os, "O_BINARY", 0)
427 while True:
428 tmp_filename = os.path.join(
429 os.path.dirname(filename),
430 f".__atomic-write{random.randrange(1 << 32):08x}",
431 )
432 try:
433 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
434 break
435 except OSError as e:
436 if e.errno == errno.EEXIST or (
437 os.name == "nt"
438 and e.errno == errno.EACCES
439 and os.path.isdir(e.filename)
440 and os.access(e.filename, os.W_OK)
441 ):
442 continue
443 raise
445 if perm is not None:
446 os.chmod(tmp_filename, perm) # in case perm includes bits in umask
448 f = _wrap_io_open(fd, mode, encoding, errors)
449 af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
450 return t.cast(t.IO[t.Any], af), True
453class _AtomicFile:
454 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
455 self._f = f
456 self._tmp_filename = tmp_filename
457 self._real_filename = real_filename
458 self.closed = False
460 @property
461 def name(self) -> str:
462 return self._real_filename
464 def close(self, delete: bool = False) -> None:
465 if self.closed:
466 return
467 self._f.close()
468 os.replace(self._tmp_filename, self._real_filename)
469 self.closed = True
471 def __getattr__(self, name: str) -> t.Any:
472 return getattr(self._f, name)
474 def __enter__(self) -> _AtomicFile:
475 return self
477 def __exit__(
478 self,
479 exc_type: type[BaseException] | None,
480 exc_value: BaseException | None,
481 tb: TracebackType | None,
482 ) -> None:
483 self.close(delete=exc_type is not None)
485 def __repr__(self) -> str:
486 return repr(self._f)
489def strip_ansi(value: str) -> str:
490 return _ansi_re.sub("", value)
493def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
494 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
495 stream = stream._stream
497 return stream.__class__.__module__.startswith("ipykernel.")
500def should_strip_ansi(
501 stream: t.IO[t.Any] | None = None, color: bool | None = None
502) -> bool:
503 if color is None:
504 if stream is None:
505 stream = sys.stdin
506 elif hasattr(stream, "color"):
507 # ._termui_impl.MaybeStripAnsi handles stripping ansi itself,
508 # so we don't need to strip it here
509 return False
510 return not isatty(stream) and not _is_jupyter_kernel_output(stream)
511 return not color
514# On Windows, wrap the output streams with colorama to support ANSI
515# color codes.
516# NOTE: double check is needed so mypy does not analyze this on Linux
517if sys.platform.startswith("win") and WIN:
518 from ._winconsole import _get_windows_console_stream
520 def _get_argv_encoding() -> str:
521 import locale
523 return locale.getpreferredencoding()
525 _ansi_stream_wrappers: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
527 def auto_wrap_for_ansi(stream: t.TextIO, color: bool | None = None) -> t.TextIO:
528 """Support ANSI color and style codes on Windows by wrapping a
529 stream with colorama.
530 """
531 try:
532 cached = _ansi_stream_wrappers.get(stream)
533 except Exception:
534 cached = None
536 if cached is not None:
537 return cached
539 import colorama
541 strip = should_strip_ansi(stream, color)
542 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
543 rv = t.cast(t.TextIO, ansi_wrapper.stream)
544 _write = rv.write
546 def _safe_write(s: str) -> int:
547 try:
548 return _write(s)
549 except BaseException:
550 ansi_wrapper.reset_all()
551 raise
553 rv.write = _safe_write # type: ignore[method-assign]
555 try:
556 _ansi_stream_wrappers[stream] = rv
557 except Exception:
558 pass
560 return rv
562else:
564 def _get_argv_encoding() -> str:
565 return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()
567 def _get_windows_console_stream(
568 f: t.TextIO, encoding: str | None, errors: str | None
569 ) -> t.TextIO | None:
570 return None
573def term_len(x: str) -> int:
574 return len(strip_ansi(x))
577def isatty(stream: t.IO[t.Any]) -> bool:
578 try:
579 return stream.isatty()
580 except Exception:
581 return False
584def _make_cached_stream_func(
585 src_func: t.Callable[[], t.TextIO | None],
586 wrapper_func: t.Callable[[], t.TextIO],
587) -> t.Callable[[], t.TextIO | None]:
588 cache: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
590 def func() -> t.TextIO | None:
591 stream = src_func()
593 if stream is None:
594 return None
596 try:
597 rv = cache.get(stream)
598 except Exception:
599 rv = None
600 if rv is not None:
601 return rv
602 rv = wrapper_func()
603 try:
604 cache[stream] = rv
605 except Exception:
606 pass
607 return rv
609 return func
612_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
613_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
614_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
617binary_streams: cabc.Mapping[str, t.Callable[[], t.BinaryIO]] = {
618 "stdin": get_binary_stdin,
619 "stdout": get_binary_stdout,
620 "stderr": get_binary_stderr,
621}
623text_streams: cabc.Mapping[str, t.Callable[[str | None, str | None], t.TextIO]] = {
624 "stdin": get_text_stdin,
625 "stdout": get_text_stdout,
626 "stderr": get_text_stderr,
627}