Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/click/_compat.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import codecs
4import collections.abc as cabc
5import io
6import os
7import re
8import sys
9import typing as t
10from types import TracebackType
11from weakref import WeakKeyDictionary
# Platform flags, computed once at import time from ``sys.platform``.
WIN = sys.platform.startswith("win")
CYGWIN = sys.platform.startswith("cygwin")

# Matches an ANSI CSI escape sequence: ESC, "[", any run of parameter
# characters (digits, ";" and "?"), then a single ASCII letter.
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
def _make_text_stream(
    stream: t.BinaryIO,
    encoding: str | None,
    errors: str | None,
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Wrap a binary stream in a line-buffered ``_NonClosingTextIOWrapper``.

    :param stream: The binary stream to wrap.
    :param encoding: Text encoding; when ``None`` it is picked with
        ``get_best_encoding(stream)``.
    :param errors: Error handler; when ``None`` it becomes ``"replace"``.
    :param force_readable: Forwarded to the wrapper.
    :param force_writable: Forwarded to the wrapper.
    """
    resolved_encoding = get_best_encoding(stream) if encoding is None else encoding
    resolved_errors = "replace" if errors is None else errors

    return _NonClosingTextIOWrapper(
        stream,
        resolved_encoding,
        resolved_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable,
    )
def is_ascii_encoding(encoding: str) -> bool:
    """Checks if a given encoding is ascii.

    The name is resolved through ``codecs.lookup`` so aliases of ASCII
    are recognised as well.  Unknown codec names yield ``False``.
    """
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        # An encoding Python does not know about cannot be ASCII.
        return False

    return codec_info.name == "ascii"
def get_best_encoding(stream: t.IO[t.Any]) -> str:
    """Return the encoding to use for ``stream``.

    The stream's own ``encoding`` attribute is used when it is set and
    non-empty, otherwise ``sys.getdefaultencoding()``.  If the result is
    an ASCII encoding, ``"utf-8"`` is returned instead.
    """
    candidate = getattr(stream, "encoding", None)

    if not candidate:
        candidate = sys.getdefaultencoding()

    return "utf-8" if is_ascii_encoding(candidate) else candidate
class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` over a ``_FixupStream``-wrapped binary stream.

    On finalisation the wrapper calls ``detach()`` (ignoring any error)
    instead of relying on the default ``TextIOWrapper`` teardown.
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        encoding: str | None,
        errors: str | None,
        force_readable: bool = False,
        force_writable: bool = False,
        **extra: t.Any,
    ) -> None:
        # Wrap first so TextIOWrapper sees the readable/writable/seekable
        # answers supplied by _FixupStream.
        fixed = t.cast(
            t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
        )
        self._stream = fixed
        super().__init__(fixed, encoding, errors, **extra)

    def __del__(self) -> None:
        # Best effort: finalisation must never raise.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self) -> bool:
        # Ask the wrapped stream directly.
        # https://bitbucket.org/pypy/pypy/issue/1803
        return self._stream.isatty()
class _FixupStream:
    """The new io interface needs more from streams than streams
    traditionally implement.  As such, this fix-up code is necessary in
    some circumstances.

    The forcing of readable and writable flags are there because some tools
    put badly patched objects on sys (one such offender are certain version
    of jupyter notebook).

    Any attribute not defined here is forwarded to the wrapped stream.
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        force_readable: bool = False,
        force_writable: bool = False,
    ):
        """
        :param stream: The stream to wrap.
        :param force_readable: Make :meth:`readable` always return ``True``.
        :param force_writable: Make :meth:`writable` always return ``True``.
        """
        self._stream = stream
        self._force_readable = force_readable
        self._force_writable = force_writable

    def __getattr__(self, name: str) -> t.Any:
        # Only called for attributes not found on the wrapper itself.
        return getattr(self._stream, name)

    def read1(self, size: int) -> bytes:
        """Use the stream's ``read1`` when it has one, else plain ``read``."""
        f = getattr(self._stream, "read1", None)

        if f is not None:
            return t.cast(bytes, f(size))

        return self._stream.read(size)

    def readable(self) -> bool:
        """Report whether the stream can be read.

        Order of precedence: the force flag, the stream's own
        ``readable()``, and finally a zero-length ``read`` probe.
        """
        if self._force_readable:
            return True
        x = getattr(self._stream, "readable", None)
        if x is not None:
            return t.cast(bool, x())
        try:
            self._stream.read(0)
        except Exception:
            return False
        return True

    def writable(self) -> bool:
        """Report whether the stream can be written.

        Order of precedence: the force flag, the stream's own
        ``writable()``, and finally an empty-write probe.  The probe
        first writes empty bytes and, if that fails, an empty string,
        so a stream that accepts either kind of data counts as writable.
        """
        if self._force_writable:
            return True
        x = getattr(self._stream, "writable", None)
        if x is not None:
            return t.cast(bool, x())
        try:
            self._stream.write(b"")
        except Exception:
            # Retrying the very same bytes write cannot give a different
            # answer; probe with text so text-only objects are detected.
            try:
                self._stream.write("")  # type: ignore[arg-type]
            except Exception:
                return False
        return True

    def seekable(self) -> bool:
        """Report whether the stream can seek.

        Uses the stream's own ``seekable()`` when present, otherwise
        tries to seek to the current position.
        """
        x = getattr(self._stream, "seekable", None)
        if x is not None:
            return t.cast(bool, x())
        try:
            self._stream.seek(self._stream.tell())
        except Exception:
            return False
        return True
def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
    """Tell whether reading from ``stream`` yields ``bytes``.

    A zero-length read is used as the probe.  If the read raises,
    ``default`` is returned.
    """
    try:
        probe = stream.read(0)
    except Exception:
        # This happens in some cases where the stream was already
        # closed.  In this case, we assume the default.
        return default

    return isinstance(probe, bytes)
def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
    """Tell whether ``stream`` accepts ``bytes`` on ``write``.

    An empty bytes write is tried first; success means binary.  If it
    fails, an empty text write is tried; success there means the stream
    is a text stream.  If both fail, ``default`` is returned.
    """
    try:
        stream.write(b"")
    except Exception:
        pass
    else:
        return True

    try:
        stream.write("")
    except Exception:
        return default

    return False
def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None:
    """Return a binary reader for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already reads bytes; otherwise
    its ``buffer`` attribute is tried.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams.  Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_reader(stream, False):
        found = stream
    else:
        found = getattr(stream, "buffer", None)

        # Same situation here; this time we assume that the buffer is
        # actually binary in case it's closed.
        if found is None or not _is_binary_reader(found, True):
            return None

    return t.cast(t.BinaryIO, found)
def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None:
    """Return a binary writer for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already accepts bytes;
    otherwise its ``buffer`` attribute is tried.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams.  Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_writer(stream, False):
        found = stream
    else:
        found = getattr(stream, "buffer", None)

        # Same situation here; this time we assume that the buffer is
        # actually binary in case it's closed.
        if found is None or not _is_binary_writer(found, True):
            return None

    return t.cast(t.BinaryIO, found)
def _stream_is_misconfigured(stream: t.TextIO) -> bool:
    """A stream is misconfigured if its encoding is ASCII."""
    declared = getattr(stream, "encoding", None)

    # If the stream does not have an encoding set, we assume it's set
    # to ASCII.  This appears to happen in certain unittest
    # environments.  It's not quite clear what the correct behavior is
    # but this at least will force Click to recover somehow.
    return is_ascii_encoding(declared if declared else "ascii")
217def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: str | None) -> bool:
218 """A stream attribute is compatible if it is equal to the
219 desired value or the desired value is unset and the attribute
220 has a value.
221 """
222 stream_value = getattr(stream, attr, None)
223 return stream_value == value or (value is None and stream_value is not None)
def _is_compatible_text_stream(
    stream: t.TextIO, encoding: str | None, errors: str | None
) -> bool:
    """Check that both the ``encoding`` and the ``errors`` attribute of
    a stream are compatible with the desired values.
    """
    if not _is_compat_stream_attr(stream, "encoding", encoding):
        return False

    return _is_compat_stream_attr(stream, "errors", errors)
def _force_correct_text_stream(
    text_stream: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    is_binary: t.Callable[[t.IO[t.Any], bool], bool],
    find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Return a text stream with the requested encoding and errors.

    :param text_stream: The stream to adapt; may itself be binary.
    :param encoding: Desired encoding, or ``None`` for "whatever works".
    :param errors: Desired error handler, or ``None`` for ``"replace"``.
    :param is_binary: Probe called as ``is_binary(stream, default)``.
    :param find_binary: Returns the underlying binary stream or ``None``.
    :param force_readable: Forwarded to ``_make_text_stream``.
    :param force_writable: Forwarded to ``_make_text_stream``.
    :return: The original stream when it is already suitable (or when no
        binary stream can be found behind it), otherwise a new text
        wrapper around the binary stream.
    """
    if is_binary(text_stream, False):
        raw_stream = t.cast(t.BinaryIO, text_stream)
    else:
        text_stream = t.cast(t.TextIO, text_stream)

        # If the stream looks compatible, and won't default to a
        # misconfigured ascii encoding, return it as-is.
        usable_as_is = _is_compatible_text_stream(
            text_stream, encoding, errors
        ) and (encoding is not None or not _stream_is_misconfigured(text_stream))

        if usable_as_is:
            return text_stream

        # Otherwise, get the underlying binary reader.
        located = find_binary(text_stream)

        # If that's not possible, silently use the original reader
        # and get mojibake instead of exceptions.
        if located is None:
            return text_stream

        raw_stream = located

    # Wrap the binary stream in a text stream with the correct
    # encoding parameters.  Errors default to replace instead of strict
    # in order to get something that works.
    return _make_text_stream(
        raw_stream,
        encoding,
        "replace" if errors is None else errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )
def _force_correct_text_reader(
    text_reader: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    force_readable: bool = False,
) -> t.TextIO:
    """Reader flavour of ``_force_correct_text_stream``: plugs in the
    binary-reader probe and lookup helpers.
    """
    return _force_correct_text_stream(
        text_reader,
        encoding,
        errors,
        is_binary=_is_binary_reader,
        find_binary=_find_binary_reader,
        force_readable=force_readable,
    )
def _force_correct_text_writer(
    text_writer: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    force_writable: bool = False,
) -> t.TextIO:
    """Writer flavour of ``_force_correct_text_stream``: plugs in the
    binary-writer probe and lookup helpers.
    """
    return _force_correct_text_stream(
        text_writer,
        encoding,
        errors,
        is_binary=_is_binary_writer,
        find_binary=_find_binary_writer,
        force_writable=force_writable,
    )
def get_binary_stdin() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdin``.

    :raises RuntimeError: If no binary stream can be determined.
    """
    reader = _find_binary_reader(sys.stdin)

    if reader is not None:
        return reader

    raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
def get_binary_stdout() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdout``.

    :raises RuntimeError: If no binary stream can be determined.
    """
    writer = _find_binary_writer(sys.stdout)

    if writer is not None:
        return writer

    raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
def get_binary_stderr() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stderr``.

    :raises RuntimeError: If no binary stream can be determined.
    """
    writer = _find_binary_writer(sys.stderr)

    if writer is not None:
        return writer

    raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
def get_text_stdin(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stdin``.

    A Windows console stream is preferred when
    ``_get_windows_console_stream`` provides one; otherwise ``sys.stdin``
    is adapted to the requested ``encoding`` and ``errors``.
    """
    console_stream = _get_windows_console_stream(sys.stdin, encoding, errors)

    if console_stream is None:
        return _force_correct_text_reader(
            sys.stdin, encoding, errors, force_readable=True
        )

    return console_stream
def get_text_stdout(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stdout``.

    A Windows console stream is preferred when
    ``_get_windows_console_stream`` provides one; otherwise ``sys.stdout``
    is adapted to the requested ``encoding`` and ``errors``.
    """
    console_stream = _get_windows_console_stream(sys.stdout, encoding, errors)

    if console_stream is None:
        return _force_correct_text_writer(
            sys.stdout, encoding, errors, force_writable=True
        )

    return console_stream
def get_text_stderr(encoding: str | None = None, errors: str | None = None) -> t.TextIO:
    """Return a text stream for ``sys.stderr``.

    A Windows console stream is preferred when
    ``_get_windows_console_stream`` provides one; otherwise ``sys.stderr``
    is adapted to the requested ``encoding`` and ``errors``.
    """
    console_stream = _get_windows_console_stream(sys.stderr, encoding, errors)

    if console_stream is None:
        return _force_correct_text_writer(
            sys.stderr, encoding, errors, force_writable=True
        )

    return console_stream
357def _wrap_io_open(
358 file: str | os.PathLike[str] | int,
359 mode: str,
360 encoding: str | None,
361 errors: str | None,
362) -> t.IO[t.Any]:
363 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
364 if "b" in mode:
365 return open(file, mode)
367 return open(file, mode, encoding=encoding, errors=errors)
def open_stream(
    filename: str | os.PathLike[str],
    mode: str = "r",
    encoding: str | None = None,
    errors: str | None = "strict",
    atomic: bool = False,
) -> tuple[t.IO[t.Any], bool]:
    """Open ``filename`` (or a standard stream for ``"-"``).

    :param filename: Path to open; ``"-"`` selects stdin or stdout
        depending on ``mode``.
    :param mode: File mode.  Any of ``w``, ``a`` or ``x`` makes ``"-"``
        mean stdout; ``b`` selects the binary variant.
    :param encoding: Text encoding, ignored in binary mode.
    :param errors: Text error handler, ignored in binary mode.
    :param atomic: Write to a temporary file in the target's directory
        and wrap it in ``_AtomicFile``.  Ignored for ``"-"``.
    :return: ``(stream, flag)`` where ``flag`` is ``False`` for the
        standard streams and ``True`` for files opened here
        (presumably "the caller should close it" - confirm at call sites).
    :raises ValueError: If ``atomic`` is combined with a mode that
        contains ``a`` or ``x``, or does not contain ``w``.
    """
    binary = "b" in mode
    filename = os.fspath(filename)

    # Standard streams first. These are simple because they ignore the
    # atomic flag. Use fsdecode to handle Path("-").
    if os.fsdecode(filename) == "-":
        if any(m in mode for m in ["w", "a", "x"]):
            if binary:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if binary:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated. They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file. Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    # Remember the existing file's mode bits, if there is a file, so the
    # temporary file can be created with the same permissions.
    try:
        perm: int | None = os.stat(filename).st_mode
    except OSError:
        perm = None

    # O_EXCL makes os.open fail with EEXIST when the name is taken.
    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    if binary:
        # O_BINARY is only defined on some platforms; 0 is a no-op elsewhere.
        flags |= getattr(os, "O_BINARY", 0)

    # Keep picking random names until one can be created exclusively.
    while True:
        tmp_filename = os.path.join(
            os.path.dirname(filename),
            f".__atomic-write{random.randrange(1 << 32):08x}",
        )
        try:
            fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
            break
        except OSError as e:
            # Retry on a name collision, and on the "nt" case where the
            # error is EACCES, the reported path is a directory and that
            # directory is writable.  Anything else is re-raised.
            if e.errno == errno.EEXIST or (
                os.name == "nt"
                and e.errno == errno.EACCES
                and os.path.isdir(e.filename)
                and os.access(e.filename, os.W_OK)
            ):
                continue
            raise

    if perm is not None:
        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

    # Wrap the descriptor in a file object, then in the atomic proxy that
    # reports the real path as its name.
    f = _wrap_io_open(fd, mode, encoding, errors)
    af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
    return t.cast(t.IO[t.Any], af), True
451class _AtomicFile:
452 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
453 self._f = f
454 self._tmp_filename = tmp_filename
455 self._real_filename = real_filename
456 self.closed = False
458 @property
459 def name(self) -> str:
460 return self._real_filename
462 def close(self, delete: bool = False) -> None:
463 if self.closed:
464 return
465 self._f.close()
466 os.replace(self._tmp_filename, self._real_filename)
467 self.closed = True
469 def __getattr__(self, name: str) -> t.Any:
470 return getattr(self._f, name)
472 def __enter__(self) -> _AtomicFile:
473 return self
475 def __exit__(
476 self,
477 exc_type: type[BaseException] | None,
478 exc_value: BaseException | None,
479 tb: TracebackType | None,
480 ) -> None:
481 self.close(delete=exc_type is not None)
483 def __repr__(self) -> str:
484 return repr(self._f)
def strip_ansi(value: str) -> str:
    """Return ``value`` with every match of ``_ansi_re`` removed."""
    return re.sub(_ansi_re, "", value)
def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
    """Tell whether the innermost stream's class lives in a module whose
    name starts with ``"ipykernel."``.

    ``_FixupStream`` and ``_NonClosingTextIOWrapper`` layers are peeled
    off through their ``_stream`` attribute first.
    """
    wrapper_types = (_FixupStream, _NonClosingTextIOWrapper)
    innermost = stream

    while isinstance(innermost, wrapper_types):
        innermost = innermost._stream

    module_name = innermost.__class__.__module__
    return module_name.startswith("ipykernel.")
def should_strip_ansi(
    stream: t.IO[t.Any] | None = None, color: bool | None = None
) -> bool:
    """Decide whether ANSI codes should be stripped.

    :param stream: Stream to inspect when ``color`` is ``None``;
        defaults to ``sys.stdin``.
    :param color: An explicit choice.  When given, the result is simply
        its negation and ``stream`` is not inspected.
    """
    if color is not None:
        return not color

    if stream is None:
        stream = sys.stdin
    elif hasattr(stream, "color"):
        # ._termui_impl.MaybeStripAnsi handles stripping ansi itself,
        # so we don't need to strip it here
        return False

    # Keep ANSI codes for terminals and for Jupyter kernel output.
    return not (isatty(stream) or _is_jupyter_kernel_output(stream))
# double check is needed so mypy does not analyze this on Linux
if sys.platform.startswith("win") and WIN:
    # Windows: the real console-stream helper comes from ``_winconsole``.
    from ._winconsole import _get_windows_console_stream

    def _get_argv_encoding() -> str:
        """Return the locale's preferred encoding."""
        import locale

        return locale.getpreferredencoding()

else:

    def _get_argv_encoding() -> str:
        """Return the encoding of ``sys.stdin``, or the filesystem
        encoding when stdin has none."""
        return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()

    def _get_windows_console_stream(
        f: t.TextIO, encoding: str | None, errors: str | None
    ) -> t.TextIO | None:
        """Stand-in used off Windows: always returns ``None``, so callers
        fall back to their regular stream handling."""
        return None
def term_len(x: str) -> int:
    """Return the length of ``x`` once ``strip_ansi`` has been applied."""
    visible_text = strip_ansi(x)
    return len(visible_text)
def isatty(stream: t.IO[t.Any]) -> bool:
    """Return ``stream.isatty()``, or ``False`` if that call raises."""
    try:
        answer = stream.isatty()
    except Exception:
        return False

    return answer
543def _make_cached_stream_func(
544 src_func: t.Callable[[], t.TextIO | None],
545 wrapper_func: t.Callable[[], t.TextIO],
546) -> t.Callable[[], t.TextIO | None]:
547 cache: cabc.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
549 def func() -> t.TextIO | None:
550 stream = src_func()
552 if stream is None:
553 return None
555 try:
556 rv = cache.get(stream)
557 except Exception:
558 rv = None
559 if rv is not None:
560 return rv
561 rv = wrapper_func()
562 try:
563 cache[stream] = rv
564 except Exception:
565 pass
566 return rv
568 return func
# Cached accessors for the default text streams.  Each one looks up the
# current ``sys`` stream on every call and reuses the wrapper previously
# built for that stream object.
_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)


# Lookup table: standard stream name -> function returning its binary stream.
binary_streams: cabc.Mapping[str, t.Callable[[], t.BinaryIO]] = {
    "stdin": get_binary_stdin,
    "stdout": get_binary_stdout,
    "stderr": get_binary_stderr,
}

# Lookup table: standard stream name -> function returning its text stream.
# The functions take ``(encoding, errors)``.
text_streams: cabc.Mapping[str, t.Callable[[str | None, str | None], t.TextIO]] = {
    "stdin": get_text_stdin,
    "stdout": get_text_stdout,
    "stderr": get_text_stderr,
}