Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/misc.py: 33%
344 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1import contextlib
2import errno
3import getpass
4import hashlib
5import io
6import logging
7import os
8import posixpath
9import shutil
10import stat
11import sys
12import sysconfig
13import urllib.parse
14from functools import partial
15from io import StringIO
16from itertools import filterfalse, tee, zip_longest
17from pathlib import Path
18from types import FunctionType, TracebackType
19from typing import (
20 Any,
21 BinaryIO,
22 Callable,
23 ContextManager,
24 Dict,
25 Generator,
26 Iterable,
27 Iterator,
28 List,
29 Optional,
30 TextIO,
31 Tuple,
32 Type,
33 TypeVar,
34 Union,
35 cast,
36)
38from pip._vendor.packaging.requirements import Requirement
39from pip._vendor.pyproject_hooks import BuildBackendHookCaller
40from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
42from pip import __version__
43from pip._internal.exceptions import CommandError, ExternallyManagedEnvironment
44from pip._internal.locations import get_major_minor_version
45from pip._internal.utils.compat import WINDOWS
46from pip._internal.utils.virtualenv import running_under_virtualenv
48__all__ = [
49 "rmtree",
50 "display_path",
51 "backup_dir",
52 "ask",
53 "splitext",
54 "format_size",
55 "is_installable_dir",
56 "normalize_path",
57 "renames",
58 "get_prog",
59 "captured_stdout",
60 "ensure_dir",
61 "remove_auth_from_url",
62 "check_externally_managed",
63 "ConfiguredBuildBackendHookCaller",
64]
66logger = logging.getLogger(__name__)
68T = TypeVar("T")
69ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
70VersionInfo = Tuple[int, int, int]
71NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
72OnExc = Callable[[FunctionType, Path, BaseException], Any]
73OnErr = Callable[[FunctionType, Path, ExcInfo], Any]
76def get_pip_version() -> str:
77 pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
78 pip_pkg_dir = os.path.abspath(pip_pkg_dir)
80 return f"pip {__version__} from {pip_pkg_dir} (python {get_major_minor_version()})"
83def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]:
84 """
85 Convert a tuple of ints representing a Python version to one of length
86 three.
88 :param py_version_info: a tuple of ints representing a Python version,
89 or None to specify no version. The tuple can have any length.
91 :return: a tuple of length three if `py_version_info` is non-None.
92 Otherwise, return `py_version_info` unchanged (i.e. None).
93 """
94 if len(py_version_info) < 3:
95 py_version_info += (3 - len(py_version_info)) * (0,)
96 elif len(py_version_info) > 3:
97 py_version_info = py_version_info[:3]
99 return cast("VersionInfo", py_version_info)
102def ensure_dir(path: str) -> None:
103 """os.path.makedirs without EEXIST."""
104 try:
105 os.makedirs(path)
106 except OSError as e:
107 # Windows can raise spurious ENOTEMPTY errors. See #6426.
108 if e.errno != errno.EEXIST and e.errno != errno.ENOTEMPTY:
109 raise
112def get_prog() -> str:
113 try:
114 prog = os.path.basename(sys.argv[0])
115 if prog in ("__main__.py", "-c"):
116 return f"{sys.executable} -m pip"
117 else:
118 return prog
119 except (AttributeError, TypeError, IndexError):
120 pass
121 return "pip"
124# Retry every half second for up to 3 seconds
125# Tenacity raises RetryError by default, explicitly raise the original exception
126@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
127def rmtree(
128 dir: str,
129 ignore_errors: bool = False,
130 onexc: Optional[OnExc] = None,
131) -> None:
132 if ignore_errors:
133 onexc = _onerror_ignore
134 if onexc is None:
135 onexc = _onerror_reraise
136 handler: OnErr = partial(
137 # `[func, path, Union[ExcInfo, BaseException]] -> Any` is equivalent to
138 # `Union[([func, path, ExcInfo] -> Any), ([func, path, BaseException] -> Any)]`.
139 cast(Union[OnExc, OnErr], rmtree_errorhandler),
140 onexc=onexc,
141 )
142 if sys.version_info >= (3, 12):
143 # See https://docs.python.org/3.12/whatsnew/3.12.html#shutil.
144 shutil.rmtree(dir, onexc=handler) # type: ignore
145 else:
146 shutil.rmtree(dir, onerror=handler) # type: ignore
149def _onerror_ignore(*_args: Any) -> None:
150 pass
153def _onerror_reraise(*_args: Any) -> None:
154 raise
157def rmtree_errorhandler(
158 func: FunctionType,
159 path: Path,
160 exc_info: Union[ExcInfo, BaseException],
161 *,
162 onexc: OnExc = _onerror_reraise,
163) -> None:
164 """
165 `rmtree` error handler to 'force' a file remove (i.e. like `rm -f`).
167 * If a file is readonly then it's write flag is set and operation is
168 retried.
170 * `onerror` is the original callback from `rmtree(... onerror=onerror)`
171 that is chained at the end if the "rm -f" still fails.
172 """
173 try:
174 st_mode = os.stat(path).st_mode
175 except OSError:
176 # it's equivalent to os.path.exists
177 return
179 if not st_mode & stat.S_IWRITE:
180 # convert to read/write
181 try:
182 os.chmod(path, st_mode | stat.S_IWRITE)
183 except OSError:
184 pass
185 else:
186 # use the original function to repeat the operation
187 try:
188 func(path)
189 return
190 except OSError:
191 pass
193 if not isinstance(exc_info, BaseException):
194 _, exc_info, _ = exc_info
195 onexc(func, path, exc_info)
198def display_path(path: str) -> str:
199 """Gives the display value for a given path, making it relative to cwd
200 if possible."""
201 path = os.path.normcase(os.path.abspath(path))
202 if path.startswith(os.getcwd() + os.path.sep):
203 path = "." + path[len(os.getcwd()) :]
204 return path
207def backup_dir(dir: str, ext: str = ".bak") -> str:
208 """Figure out the name of a directory to back up the given dir to
209 (adding .bak, .bak2, etc)"""
210 n = 1
211 extension = ext
212 while os.path.exists(dir + extension):
213 n += 1
214 extension = ext + str(n)
215 return dir + extension
218def ask_path_exists(message: str, options: Iterable[str]) -> str:
219 for action in os.environ.get("PIP_EXISTS_ACTION", "").split():
220 if action in options:
221 return action
222 return ask(message, options)
225def _check_no_input(message: str) -> None:
226 """Raise an error if no input is allowed."""
227 if os.environ.get("PIP_NO_INPUT"):
228 raise Exception(
229 f"No input was expected ($PIP_NO_INPUT set); question: {message}"
230 )
233def ask(message: str, options: Iterable[str]) -> str:
234 """Ask the message interactively, with the given possible responses"""
235 while 1:
236 _check_no_input(message)
237 response = input(message)
238 response = response.strip().lower()
239 if response not in options:
240 print(
241 "Your response ({!r}) was not one of the expected responses: "
242 "{}".format(response, ", ".join(options))
243 )
244 else:
245 return response
248def ask_input(message: str) -> str:
249 """Ask for input interactively."""
250 _check_no_input(message)
251 return input(message)
254def ask_password(message: str) -> str:
255 """Ask for a password interactively."""
256 _check_no_input(message)
257 return getpass.getpass(message)
260def strtobool(val: str) -> int:
261 """Convert a string representation of truth to true (1) or false (0).
263 True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
264 are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
265 'val' is anything else.
266 """
267 val = val.lower()
268 if val in ("y", "yes", "t", "true", "on", "1"):
269 return 1
270 elif val in ("n", "no", "f", "false", "off", "0"):
271 return 0
272 else:
273 raise ValueError(f"invalid truth value {val!r}")
276def format_size(bytes: float) -> str:
277 if bytes > 1000 * 1000:
278 return f"{bytes / 1000.0 / 1000:.1f} MB"
279 elif bytes > 10 * 1000:
280 return f"{int(bytes / 1000)} kB"
281 elif bytes > 1000:
282 return f"{bytes / 1000.0:.1f} kB"
283 else:
284 return f"{int(bytes)} bytes"
287def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]:
288 """Return a list of formatted rows and a list of column sizes.
290 For example::
292 >>> tabulate([['foobar', 2000], [0xdeadbeef]])
293 (['foobar 2000', '3735928559'], [10, 4])
294 """
295 rows = [tuple(map(str, row)) for row in rows]
296 sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue="")]
297 table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
298 return table, sizes
301def is_installable_dir(path: str) -> bool:
302 """Is path is a directory containing pyproject.toml or setup.py?
304 If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
305 a legacy setuptools layout by identifying setup.py. We don't check for the
306 setup.cfg because using it without setup.py is only available for PEP 517
307 projects, which are already covered by the pyproject.toml check.
308 """
309 if not os.path.isdir(path):
310 return False
311 if os.path.isfile(os.path.join(path, "pyproject.toml")):
312 return True
313 if os.path.isfile(os.path.join(path, "setup.py")):
314 return True
315 return False
318def read_chunks(
319 file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE
320) -> Generator[bytes, None, None]:
321 """Yield pieces of data from a file-like object until EOF."""
322 while True:
323 chunk = file.read(size)
324 if not chunk:
325 break
326 yield chunk
329def normalize_path(path: str, resolve_symlinks: bool = True) -> str:
330 """
331 Convert a path to its canonical, case-normalized, absolute version.
333 """
334 path = os.path.expanduser(path)
335 if resolve_symlinks:
336 path = os.path.realpath(path)
337 else:
338 path = os.path.abspath(path)
339 return os.path.normcase(path)
342def splitext(path: str) -> Tuple[str, str]:
343 """Like os.path.splitext, but take off .tar too"""
344 base, ext = posixpath.splitext(path)
345 if base.lower().endswith(".tar"):
346 ext = base[-4:] + ext
347 base = base[:-4]
348 return base, ext
351def renames(old: str, new: str) -> None:
352 """Like os.renames(), but handles renaming across devices."""
353 # Implementation borrowed from os.renames().
354 head, tail = os.path.split(new)
355 if head and tail and not os.path.exists(head):
356 os.makedirs(head)
358 shutil.move(old, new)
360 head, tail = os.path.split(old)
361 if head and tail:
362 try:
363 os.removedirs(head)
364 except OSError:
365 pass
368def is_local(path: str) -> bool:
369 """
370 Return True if path is within sys.prefix, if we're running in a virtualenv.
372 If we're not in a virtualenv, all paths are considered "local."
374 Caution: this function assumes the head of path has been normalized
375 with normalize_path.
376 """
377 if not running_under_virtualenv():
378 return True
379 return path.startswith(normalize_path(sys.prefix))
382def write_output(msg: Any, *args: Any) -> None:
383 logger.info(msg, *args)
386class StreamWrapper(StringIO):
387 orig_stream: TextIO
389 @classmethod
390 def from_stream(cls, orig_stream: TextIO) -> "StreamWrapper":
391 ret = cls()
392 ret.orig_stream = orig_stream
393 return ret
395 # compileall.compile_dir() needs stdout.encoding to print to stdout
396 # type ignore is because TextIOBase.encoding is writeable
397 @property
398 def encoding(self) -> str: # type: ignore
399 return self.orig_stream.encoding
402@contextlib.contextmanager
403def captured_output(stream_name: str) -> Generator[StreamWrapper, None, None]:
404 """Return a context manager used by captured_stdout/stdin/stderr
405 that temporarily replaces the sys stream *stream_name* with a StringIO.
407 Taken from Lib/support/__init__.py in the CPython repo.
408 """
409 orig_stdout = getattr(sys, stream_name)
410 setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
411 try:
412 yield getattr(sys, stream_name)
413 finally:
414 setattr(sys, stream_name, orig_stdout)
417def captured_stdout() -> ContextManager[StreamWrapper]:
418 """Capture the output of sys.stdout:
420 with captured_stdout() as stdout:
421 print('hello')
422 self.assertEqual(stdout.getvalue(), 'hello\n')
424 Taken from Lib/support/__init__.py in the CPython repo.
425 """
426 return captured_output("stdout")
429def captured_stderr() -> ContextManager[StreamWrapper]:
430 """
431 See captured_stdout().
432 """
433 return captured_output("stderr")
436# Simulates an enum
437def enum(*sequential: Any, **named: Any) -> Type[Any]:
438 enums = dict(zip(sequential, range(len(sequential))), **named)
439 reverse = {value: key for key, value in enums.items()}
440 enums["reverse_mapping"] = reverse
441 return type("Enum", (), enums)
444def build_netloc(host: str, port: Optional[int]) -> str:
445 """
446 Build a netloc from a host-port pair
447 """
448 if port is None:
449 return host
450 if ":" in host:
451 # Only wrap host with square brackets when it is IPv6
452 host = f"[{host}]"
453 return f"{host}:{port}"
456def build_url_from_netloc(netloc: str, scheme: str = "https") -> str:
457 """
458 Build a full URL from a netloc.
459 """
460 if netloc.count(":") >= 2 and "@" not in netloc and "[" not in netloc:
461 # It must be a bare IPv6 address, so wrap it with brackets.
462 netloc = f"[{netloc}]"
463 return f"{scheme}://{netloc}"
466def parse_netloc(netloc: str) -> Tuple[Optional[str], Optional[int]]:
467 """
468 Return the host-port pair from a netloc.
469 """
470 url = build_url_from_netloc(netloc)
471 parsed = urllib.parse.urlparse(url)
472 return parsed.hostname, parsed.port
475def split_auth_from_netloc(netloc: str) -> NetlocTuple:
476 """
477 Parse out and remove the auth information from a netloc.
479 Returns: (netloc, (username, password)).
480 """
481 if "@" not in netloc:
482 return netloc, (None, None)
484 # Split from the right because that's how urllib.parse.urlsplit()
485 # behaves if more than one @ is present (which can be checked using
486 # the password attribute of urlsplit()'s return value).
487 auth, netloc = netloc.rsplit("@", 1)
488 pw: Optional[str] = None
489 if ":" in auth:
490 # Split from the left because that's how urllib.parse.urlsplit()
491 # behaves if more than one : is present (which again can be checked
492 # using the password attribute of the return value)
493 user, pw = auth.split(":", 1)
494 else:
495 user, pw = auth, None
497 user = urllib.parse.unquote(user)
498 if pw is not None:
499 pw = urllib.parse.unquote(pw)
501 return netloc, (user, pw)
504def redact_netloc(netloc: str) -> str:
505 """
506 Replace the sensitive data in a netloc with "****", if it exists.
508 For example:
509 - "user:pass@example.com" returns "user:****@example.com"
510 - "accesstoken@example.com" returns "****@example.com"
511 """
512 netloc, (user, password) = split_auth_from_netloc(netloc)
513 if user is None:
514 return netloc
515 if password is None:
516 user = "****"
517 password = ""
518 else:
519 user = urllib.parse.quote(user)
520 password = ":****"
521 return f"{user}{password}@{netloc}"
524def _transform_url(
525 url: str, transform_netloc: Callable[[str], Tuple[Any, ...]]
526) -> Tuple[str, NetlocTuple]:
527 """Transform and replace netloc in a url.
529 transform_netloc is a function taking the netloc and returning a
530 tuple. The first element of this tuple is the new netloc. The
531 entire tuple is returned.
533 Returns a tuple containing the transformed url as item 0 and the
534 original tuple returned by transform_netloc as item 1.
535 """
536 purl = urllib.parse.urlsplit(url)
537 netloc_tuple = transform_netloc(purl.netloc)
538 # stripped url
539 url_pieces = (purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment)
540 surl = urllib.parse.urlunsplit(url_pieces)
541 return surl, cast("NetlocTuple", netloc_tuple)
544def _get_netloc(netloc: str) -> NetlocTuple:
545 return split_auth_from_netloc(netloc)
548def _redact_netloc(netloc: str) -> Tuple[str]:
549 return (redact_netloc(netloc),)
552def split_auth_netloc_from_url(
553 url: str,
554) -> Tuple[str, str, Tuple[Optional[str], Optional[str]]]:
555 """
556 Parse a url into separate netloc, auth, and url with no auth.
558 Returns: (url_without_auth, netloc, (username, password))
559 """
560 url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)
561 return url_without_auth, netloc, auth
564def remove_auth_from_url(url: str) -> str:
565 """Return a copy of url with 'username:password@' removed."""
566 # username/pass params are passed to subversion through flags
567 # and are not recognized in the url.
568 return _transform_url(url, _get_netloc)[0]
571def redact_auth_from_url(url: str) -> str:
572 """Replace the password in a given url with ****."""
573 return _transform_url(url, _redact_netloc)[0]
576def redact_auth_from_requirement(req: Requirement) -> str:
577 """Replace the password in a given requirement url with ****."""
578 if not req.url:
579 return str(req)
580 return str(req).replace(req.url, redact_auth_from_url(req.url))
583class HiddenText:
584 def __init__(self, secret: str, redacted: str) -> None:
585 self.secret = secret
586 self.redacted = redacted
588 def __repr__(self) -> str:
589 return f"<HiddenText {str(self)!r}>"
591 def __str__(self) -> str:
592 return self.redacted
594 # This is useful for testing.
595 def __eq__(self, other: Any) -> bool:
596 if type(self) != type(other):
597 return False
599 # The string being used for redaction doesn't also have to match,
600 # just the raw, original string.
601 return self.secret == other.secret
604def hide_value(value: str) -> HiddenText:
605 return HiddenText(value, redacted="****")
608def hide_url(url: str) -> HiddenText:
609 redacted = redact_auth_from_url(url)
610 return HiddenText(url, redacted=redacted)
613def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None:
614 """Protection of pip.exe from modification on Windows
616 On Windows, any operation modifying pip should be run as:
617 python -m pip ...
618 """
619 pip_names = [
620 "pip",
621 f"pip{sys.version_info.major}",
622 f"pip{sys.version_info.major}.{sys.version_info.minor}",
623 ]
625 # See https://github.com/pypa/pip/issues/1299 for more discussion
626 should_show_use_python_msg = (
627 modifying_pip and WINDOWS and os.path.basename(sys.argv[0]) in pip_names
628 )
630 if should_show_use_python_msg:
631 new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
632 raise CommandError(
633 "To modify pip, please run the following command:\n{}".format(
634 " ".join(new_command)
635 )
636 )
639def check_externally_managed() -> None:
640 """Check whether the current environment is externally managed.
642 If the ``EXTERNALLY-MANAGED`` config file is found, the current environment
643 is considered externally managed, and an ExternallyManagedEnvironment is
644 raised.
645 """
646 if running_under_virtualenv():
647 return
648 marker = os.path.join(sysconfig.get_path("stdlib"), "EXTERNALLY-MANAGED")
649 if not os.path.isfile(marker):
650 return
651 raise ExternallyManagedEnvironment.from_config(marker)
654def is_console_interactive() -> bool:
655 """Is this console interactive?"""
656 return sys.stdin is not None and sys.stdin.isatty()
659def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]:
660 """Return (hash, length) for path using hashlib.sha256()"""
662 h = hashlib.sha256()
663 length = 0
664 with open(path, "rb") as f:
665 for block in read_chunks(f, size=blocksize):
666 length += len(block)
667 h.update(block)
668 return h, length
671def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
672 """
673 Return paired elements.
675 For example:
676 s -> (s0, s1), (s2, s3), (s4, s5), ...
677 """
678 iterable = iter(iterable)
679 return zip_longest(iterable, iterable)
682def partition(
683 pred: Callable[[T], bool],
684 iterable: Iterable[T],
685) -> Tuple[Iterable[T], Iterable[T]]:
686 """
687 Use a predicate to partition entries into false entries and true entries,
688 like
690 partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
691 """
692 t1, t2 = tee(iterable)
693 return filterfalse(pred, t1), filter(pred, t2)
696class ConfiguredBuildBackendHookCaller(BuildBackendHookCaller):
697 def __init__(
698 self,
699 config_holder: Any,
700 source_dir: str,
701 build_backend: str,
702 backend_path: Optional[str] = None,
703 runner: Optional[Callable[..., None]] = None,
704 python_executable: Optional[str] = None,
705 ):
706 super().__init__(
707 source_dir, build_backend, backend_path, runner, python_executable
708 )
709 self.config_holder = config_holder
711 def build_wheel(
712 self,
713 wheel_directory: str,
714 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
715 metadata_directory: Optional[str] = None,
716 ) -> str:
717 cs = self.config_holder.config_settings
718 return super().build_wheel(
719 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
720 )
722 def build_sdist(
723 self,
724 sdist_directory: str,
725 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
726 ) -> str:
727 cs = self.config_holder.config_settings
728 return super().build_sdist(sdist_directory, config_settings=cs)
730 def build_editable(
731 self,
732 wheel_directory: str,
733 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
734 metadata_directory: Optional[str] = None,
735 ) -> str:
736 cs = self.config_holder.config_settings
737 return super().build_editable(
738 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
739 )
741 def get_requires_for_build_wheel(
742 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
743 ) -> List[str]:
744 cs = self.config_holder.config_settings
745 return super().get_requires_for_build_wheel(config_settings=cs)
747 def get_requires_for_build_sdist(
748 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
749 ) -> List[str]:
750 cs = self.config_holder.config_settings
751 return super().get_requires_for_build_sdist(config_settings=cs)
753 def get_requires_for_build_editable(
754 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
755 ) -> List[str]:
756 cs = self.config_holder.config_settings
757 return super().get_requires_for_build_editable(config_settings=cs)
759 def prepare_metadata_for_build_wheel(
760 self,
761 metadata_directory: str,
762 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
763 _allow_fallback: bool = True,
764 ) -> str:
765 cs = self.config_holder.config_settings
766 return super().prepare_metadata_for_build_wheel(
767 metadata_directory=metadata_directory,
768 config_settings=cs,
769 _allow_fallback=_allow_fallback,
770 )
772 def prepare_metadata_for_build_editable(
773 self,
774 metadata_directory: str,
775 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
776 _allow_fallback: bool = True,
777 ) -> str:
778 cs = self.config_holder.config_settings
779 return super().prepare_metadata_for_build_editable(
780 metadata_directory=metadata_directory,
781 config_settings=cs,
782 _allow_fallback=_allow_fallback,
783 )