Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/misc.py: 33%
315 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1# The following comment should be removed at some point in the future.
2# mypy: strict-optional=False
4import contextlib
5import errno
6import getpass
7import hashlib
8import io
9import logging
10import os
11import posixpath
12import shutil
13import stat
14import sys
15import sysconfig
16import urllib.parse
17from io import StringIO
18from itertools import filterfalse, tee, zip_longest
19from types import TracebackType
20from typing import (
21 Any,
22 BinaryIO,
23 Callable,
24 ContextManager,
25 Dict,
26 Generator,
27 Iterable,
28 Iterator,
29 List,
30 Optional,
31 TextIO,
32 Tuple,
33 Type,
34 TypeVar,
35 Union,
36 cast,
37)
39from pip._vendor.pyproject_hooks import BuildBackendHookCaller
40from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
42from pip import __version__
43from pip._internal.exceptions import CommandError, ExternallyManagedEnvironment
44from pip._internal.locations import get_major_minor_version
45from pip._internal.utils.compat import WINDOWS
46from pip._internal.utils.virtualenv import running_under_virtualenv
48__all__ = [
49 "rmtree",
50 "display_path",
51 "backup_dir",
52 "ask",
53 "splitext",
54 "format_size",
55 "is_installable_dir",
56 "normalize_path",
57 "renames",
58 "get_prog",
59 "captured_stdout",
60 "ensure_dir",
61 "remove_auth_from_url",
62 "check_externally_managed",
63 "ConfiguredBuildBackendHookCaller",
64]
66logger = logging.getLogger(__name__)
68T = TypeVar("T")
69ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
70VersionInfo = Tuple[int, int, int]
71NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
74def get_pip_version() -> str:
75 pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
76 pip_pkg_dir = os.path.abspath(pip_pkg_dir)
78 return "pip {} from {} (python {})".format(
79 __version__,
80 pip_pkg_dir,
81 get_major_minor_version(),
82 )
85def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]:
86 """
87 Convert a tuple of ints representing a Python version to one of length
88 three.
90 :param py_version_info: a tuple of ints representing a Python version,
91 or None to specify no version. The tuple can have any length.
93 :return: a tuple of length three if `py_version_info` is non-None.
94 Otherwise, return `py_version_info` unchanged (i.e. None).
95 """
96 if len(py_version_info) < 3:
97 py_version_info += (3 - len(py_version_info)) * (0,)
98 elif len(py_version_info) > 3:
99 py_version_info = py_version_info[:3]
101 return cast("VersionInfo", py_version_info)
104def ensure_dir(path: str) -> None:
105 """os.path.makedirs without EEXIST."""
106 try:
107 os.makedirs(path)
108 except OSError as e:
109 # Windows can raise spurious ENOTEMPTY errors. See #6426.
110 if e.errno != errno.EEXIST and e.errno != errno.ENOTEMPTY:
111 raise
114def get_prog() -> str:
115 try:
116 prog = os.path.basename(sys.argv[0])
117 if prog in ("__main__.py", "-c"):
118 return f"{sys.executable} -m pip"
119 else:
120 return prog
121 except (AttributeError, TypeError, IndexError):
122 pass
123 return "pip"
126# Retry every half second for up to 3 seconds
127# Tenacity raises RetryError by default, explicitly raise the original exception
128@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
129def rmtree(dir: str, ignore_errors: bool = False) -> None:
130 shutil.rmtree(dir, ignore_errors=ignore_errors, onerror=rmtree_errorhandler)
133def rmtree_errorhandler(func: Callable[..., Any], path: str, exc_info: ExcInfo) -> None:
134 """On Windows, the files in .svn are read-only, so when rmtree() tries to
135 remove them, an exception is thrown. We catch that here, remove the
136 read-only attribute, and hopefully continue without problems."""
137 try:
138 has_attr_readonly = not (os.stat(path).st_mode & stat.S_IWRITE)
139 except OSError:
140 # it's equivalent to os.path.exists
141 return
143 if has_attr_readonly:
144 # convert to read/write
145 os.chmod(path, stat.S_IWRITE)
146 # use the original function to repeat the operation
147 func(path)
148 return
149 else:
150 raise
153def display_path(path: str) -> str:
154 """Gives the display value for a given path, making it relative to cwd
155 if possible."""
156 path = os.path.normcase(os.path.abspath(path))
157 if path.startswith(os.getcwd() + os.path.sep):
158 path = "." + path[len(os.getcwd()) :]
159 return path
162def backup_dir(dir: str, ext: str = ".bak") -> str:
163 """Figure out the name of a directory to back up the given dir to
164 (adding .bak, .bak2, etc)"""
165 n = 1
166 extension = ext
167 while os.path.exists(dir + extension):
168 n += 1
169 extension = ext + str(n)
170 return dir + extension
173def ask_path_exists(message: str, options: Iterable[str]) -> str:
174 for action in os.environ.get("PIP_EXISTS_ACTION", "").split():
175 if action in options:
176 return action
177 return ask(message, options)
180def _check_no_input(message: str) -> None:
181 """Raise an error if no input is allowed."""
182 if os.environ.get("PIP_NO_INPUT"):
183 raise Exception(
184 f"No input was expected ($PIP_NO_INPUT set); question: {message}"
185 )
188def ask(message: str, options: Iterable[str]) -> str:
189 """Ask the message interactively, with the given possible responses"""
190 while 1:
191 _check_no_input(message)
192 response = input(message)
193 response = response.strip().lower()
194 if response not in options:
195 print(
196 "Your response ({!r}) was not one of the expected responses: "
197 "{}".format(response, ", ".join(options))
198 )
199 else:
200 return response
203def ask_input(message: str) -> str:
204 """Ask for input interactively."""
205 _check_no_input(message)
206 return input(message)
209def ask_password(message: str) -> str:
210 """Ask for a password interactively."""
211 _check_no_input(message)
212 return getpass.getpass(message)
215def strtobool(val: str) -> int:
216 """Convert a string representation of truth to true (1) or false (0).
218 True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
219 are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
220 'val' is anything else.
221 """
222 val = val.lower()
223 if val in ("y", "yes", "t", "true", "on", "1"):
224 return 1
225 elif val in ("n", "no", "f", "false", "off", "0"):
226 return 0
227 else:
228 raise ValueError(f"invalid truth value {val!r}")
231def format_size(bytes: float) -> str:
232 if bytes > 1000 * 1000:
233 return "{:.1f} MB".format(bytes / 1000.0 / 1000)
234 elif bytes > 10 * 1000:
235 return "{} kB".format(int(bytes / 1000))
236 elif bytes > 1000:
237 return "{:.1f} kB".format(bytes / 1000.0)
238 else:
239 return "{} bytes".format(int(bytes))
242def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]:
243 """Return a list of formatted rows and a list of column sizes.
245 For example::
247 >>> tabulate([['foobar', 2000], [0xdeadbeef]])
248 (['foobar 2000', '3735928559'], [10, 4])
249 """
250 rows = [tuple(map(str, row)) for row in rows]
251 sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue="")]
252 table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
253 return table, sizes
256def is_installable_dir(path: str) -> bool:
257 """Is path is a directory containing pyproject.toml or setup.py?
259 If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
260 a legacy setuptools layout by identifying setup.py. We don't check for the
261 setup.cfg because using it without setup.py is only available for PEP 517
262 projects, which are already covered by the pyproject.toml check.
263 """
264 if not os.path.isdir(path):
265 return False
266 if os.path.isfile(os.path.join(path, "pyproject.toml")):
267 return True
268 if os.path.isfile(os.path.join(path, "setup.py")):
269 return True
270 return False
273def read_chunks(
274 file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE
275) -> Generator[bytes, None, None]:
276 """Yield pieces of data from a file-like object until EOF."""
277 while True:
278 chunk = file.read(size)
279 if not chunk:
280 break
281 yield chunk
284def normalize_path(path: str, resolve_symlinks: bool = True) -> str:
285 """
286 Convert a path to its canonical, case-normalized, absolute version.
288 """
289 path = os.path.expanduser(path)
290 if resolve_symlinks:
291 path = os.path.realpath(path)
292 else:
293 path = os.path.abspath(path)
294 return os.path.normcase(path)
297def splitext(path: str) -> Tuple[str, str]:
298 """Like os.path.splitext, but take off .tar too"""
299 base, ext = posixpath.splitext(path)
300 if base.lower().endswith(".tar"):
301 ext = base[-4:] + ext
302 base = base[:-4]
303 return base, ext
306def renames(old: str, new: str) -> None:
307 """Like os.renames(), but handles renaming across devices."""
308 # Implementation borrowed from os.renames().
309 head, tail = os.path.split(new)
310 if head and tail and not os.path.exists(head):
311 os.makedirs(head)
313 shutil.move(old, new)
315 head, tail = os.path.split(old)
316 if head and tail:
317 try:
318 os.removedirs(head)
319 except OSError:
320 pass
323def is_local(path: str) -> bool:
324 """
325 Return True if path is within sys.prefix, if we're running in a virtualenv.
327 If we're not in a virtualenv, all paths are considered "local."
329 Caution: this function assumes the head of path has been normalized
330 with normalize_path.
331 """
332 if not running_under_virtualenv():
333 return True
334 return path.startswith(normalize_path(sys.prefix))
337def write_output(msg: Any, *args: Any) -> None:
338 logger.info(msg, *args)
341class StreamWrapper(StringIO):
342 orig_stream: TextIO = None
344 @classmethod
345 def from_stream(cls, orig_stream: TextIO) -> "StreamWrapper":
346 cls.orig_stream = orig_stream
347 return cls()
349 # compileall.compile_dir() needs stdout.encoding to print to stdout
350 # https://github.com/python/mypy/issues/4125
351 @property
352 def encoding(self): # type: ignore
353 return self.orig_stream.encoding
356@contextlib.contextmanager
357def captured_output(stream_name: str) -> Generator[StreamWrapper, None, None]:
358 """Return a context manager used by captured_stdout/stdin/stderr
359 that temporarily replaces the sys stream *stream_name* with a StringIO.
361 Taken from Lib/support/__init__.py in the CPython repo.
362 """
363 orig_stdout = getattr(sys, stream_name)
364 setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
365 try:
366 yield getattr(sys, stream_name)
367 finally:
368 setattr(sys, stream_name, orig_stdout)
371def captured_stdout() -> ContextManager[StreamWrapper]:
372 """Capture the output of sys.stdout:
374 with captured_stdout() as stdout:
375 print('hello')
376 self.assertEqual(stdout.getvalue(), 'hello\n')
378 Taken from Lib/support/__init__.py in the CPython repo.
379 """
380 return captured_output("stdout")
383def captured_stderr() -> ContextManager[StreamWrapper]:
384 """
385 See captured_stdout().
386 """
387 return captured_output("stderr")
390# Simulates an enum
391def enum(*sequential: Any, **named: Any) -> Type[Any]:
392 enums = dict(zip(sequential, range(len(sequential))), **named)
393 reverse = {value: key for key, value in enums.items()}
394 enums["reverse_mapping"] = reverse
395 return type("Enum", (), enums)
398def build_netloc(host: str, port: Optional[int]) -> str:
399 """
400 Build a netloc from a host-port pair
401 """
402 if port is None:
403 return host
404 if ":" in host:
405 # Only wrap host with square brackets when it is IPv6
406 host = f"[{host}]"
407 return f"{host}:{port}"
410def build_url_from_netloc(netloc: str, scheme: str = "https") -> str:
411 """
412 Build a full URL from a netloc.
413 """
414 if netloc.count(":") >= 2 and "@" not in netloc and "[" not in netloc:
415 # It must be a bare IPv6 address, so wrap it with brackets.
416 netloc = f"[{netloc}]"
417 return f"{scheme}://{netloc}"
420def parse_netloc(netloc: str) -> Tuple[str, Optional[int]]:
421 """
422 Return the host-port pair from a netloc.
423 """
424 url = build_url_from_netloc(netloc)
425 parsed = urllib.parse.urlparse(url)
426 return parsed.hostname, parsed.port
429def split_auth_from_netloc(netloc: str) -> NetlocTuple:
430 """
431 Parse out and remove the auth information from a netloc.
433 Returns: (netloc, (username, password)).
434 """
435 if "@" not in netloc:
436 return netloc, (None, None)
438 # Split from the right because that's how urllib.parse.urlsplit()
439 # behaves if more than one @ is present (which can be checked using
440 # the password attribute of urlsplit()'s return value).
441 auth, netloc = netloc.rsplit("@", 1)
442 pw: Optional[str] = None
443 if ":" in auth:
444 # Split from the left because that's how urllib.parse.urlsplit()
445 # behaves if more than one : is present (which again can be checked
446 # using the password attribute of the return value)
447 user, pw = auth.split(":", 1)
448 else:
449 user, pw = auth, None
451 user = urllib.parse.unquote(user)
452 if pw is not None:
453 pw = urllib.parse.unquote(pw)
455 return netloc, (user, pw)
458def redact_netloc(netloc: str) -> str:
459 """
460 Replace the sensitive data in a netloc with "****", if it exists.
462 For example:
463 - "user:pass@example.com" returns "user:****@example.com"
464 - "accesstoken@example.com" returns "****@example.com"
465 """
466 netloc, (user, password) = split_auth_from_netloc(netloc)
467 if user is None:
468 return netloc
469 if password is None:
470 user = "****"
471 password = ""
472 else:
473 user = urllib.parse.quote(user)
474 password = ":****"
475 return "{user}{password}@{netloc}".format(
476 user=user, password=password, netloc=netloc
477 )
480def _transform_url(
481 url: str, transform_netloc: Callable[[str], Tuple[Any, ...]]
482) -> Tuple[str, NetlocTuple]:
483 """Transform and replace netloc in a url.
485 transform_netloc is a function taking the netloc and returning a
486 tuple. The first element of this tuple is the new netloc. The
487 entire tuple is returned.
489 Returns a tuple containing the transformed url as item 0 and the
490 original tuple returned by transform_netloc as item 1.
491 """
492 purl = urllib.parse.urlsplit(url)
493 netloc_tuple = transform_netloc(purl.netloc)
494 # stripped url
495 url_pieces = (purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment)
496 surl = urllib.parse.urlunsplit(url_pieces)
497 return surl, cast("NetlocTuple", netloc_tuple)
500def _get_netloc(netloc: str) -> NetlocTuple:
501 return split_auth_from_netloc(netloc)
504def _redact_netloc(netloc: str) -> Tuple[str]:
505 return (redact_netloc(netloc),)
508def split_auth_netloc_from_url(url: str) -> Tuple[str, str, Tuple[str, str]]:
509 """
510 Parse a url into separate netloc, auth, and url with no auth.
512 Returns: (url_without_auth, netloc, (username, password))
513 """
514 url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)
515 return url_without_auth, netloc, auth
518def remove_auth_from_url(url: str) -> str:
519 """Return a copy of url with 'username:password@' removed."""
520 # username/pass params are passed to subversion through flags
521 # and are not recognized in the url.
522 return _transform_url(url, _get_netloc)[0]
525def redact_auth_from_url(url: str) -> str:
526 """Replace the password in a given url with ****."""
527 return _transform_url(url, _redact_netloc)[0]
530class HiddenText:
531 def __init__(self, secret: str, redacted: str) -> None:
532 self.secret = secret
533 self.redacted = redacted
535 def __repr__(self) -> str:
536 return "<HiddenText {!r}>".format(str(self))
538 def __str__(self) -> str:
539 return self.redacted
541 # This is useful for testing.
542 def __eq__(self, other: Any) -> bool:
543 if type(self) != type(other):
544 return False
546 # The string being used for redaction doesn't also have to match,
547 # just the raw, original string.
548 return self.secret == other.secret
551def hide_value(value: str) -> HiddenText:
552 return HiddenText(value, redacted="****")
555def hide_url(url: str) -> HiddenText:
556 redacted = redact_auth_from_url(url)
557 return HiddenText(url, redacted=redacted)
560def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None:
561 """Protection of pip.exe from modification on Windows
563 On Windows, any operation modifying pip should be run as:
564 python -m pip ...
565 """
566 pip_names = [
567 "pip",
568 f"pip{sys.version_info.major}",
569 f"pip{sys.version_info.major}.{sys.version_info.minor}",
570 ]
572 # See https://github.com/pypa/pip/issues/1299 for more discussion
573 should_show_use_python_msg = (
574 modifying_pip and WINDOWS and os.path.basename(sys.argv[0]) in pip_names
575 )
577 if should_show_use_python_msg:
578 new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
579 raise CommandError(
580 "To modify pip, please run the following command:\n{}".format(
581 " ".join(new_command)
582 )
583 )
586def check_externally_managed() -> None:
587 """Check whether the current environment is externally managed.
589 If the ``EXTERNALLY-MANAGED`` config file is found, the current environment
590 is considered externally managed, and an ExternallyManagedEnvironment is
591 raised.
592 """
593 if running_under_virtualenv():
594 return
595 marker = os.path.join(sysconfig.get_path("stdlib"), "EXTERNALLY-MANAGED")
596 if not os.path.isfile(marker):
597 return
598 raise ExternallyManagedEnvironment.from_config(marker)
601def is_console_interactive() -> bool:
602 """Is this console interactive?"""
603 return sys.stdin is not None and sys.stdin.isatty()
606def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]:
607 """Return (hash, length) for path using hashlib.sha256()"""
609 h = hashlib.sha256()
610 length = 0
611 with open(path, "rb") as f:
612 for block in read_chunks(f, size=blocksize):
613 length += len(block)
614 h.update(block)
615 return h, length
618def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
619 """
620 Return paired elements.
622 For example:
623 s -> (s0, s1), (s2, s3), (s4, s5), ...
624 """
625 iterable = iter(iterable)
626 return zip_longest(iterable, iterable)
629def partition(
630 pred: Callable[[T], bool],
631 iterable: Iterable[T],
632) -> Tuple[Iterable[T], Iterable[T]]:
633 """
634 Use a predicate to partition entries into false entries and true entries,
635 like
637 partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
638 """
639 t1, t2 = tee(iterable)
640 return filterfalse(pred, t1), filter(pred, t2)
643class ConfiguredBuildBackendHookCaller(BuildBackendHookCaller):
644 def __init__(
645 self,
646 config_holder: Any,
647 source_dir: str,
648 build_backend: str,
649 backend_path: Optional[str] = None,
650 runner: Optional[Callable[..., None]] = None,
651 python_executable: Optional[str] = None,
652 ):
653 super().__init__(
654 source_dir, build_backend, backend_path, runner, python_executable
655 )
656 self.config_holder = config_holder
658 def build_wheel(
659 self,
660 wheel_directory: str,
661 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
662 metadata_directory: Optional[str] = None,
663 ) -> str:
664 cs = self.config_holder.config_settings
665 return super().build_wheel(
666 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
667 )
669 def build_sdist(
670 self,
671 sdist_directory: str,
672 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
673 ) -> str:
674 cs = self.config_holder.config_settings
675 return super().build_sdist(sdist_directory, config_settings=cs)
677 def build_editable(
678 self,
679 wheel_directory: str,
680 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
681 metadata_directory: Optional[str] = None,
682 ) -> str:
683 cs = self.config_holder.config_settings
684 return super().build_editable(
685 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
686 )
688 def get_requires_for_build_wheel(
689 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
690 ) -> List[str]:
691 cs = self.config_holder.config_settings
692 return super().get_requires_for_build_wheel(config_settings=cs)
694 def get_requires_for_build_sdist(
695 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
696 ) -> List[str]:
697 cs = self.config_holder.config_settings
698 return super().get_requires_for_build_sdist(config_settings=cs)
700 def get_requires_for_build_editable(
701 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
702 ) -> List[str]:
703 cs = self.config_holder.config_settings
704 return super().get_requires_for_build_editable(config_settings=cs)
706 def prepare_metadata_for_build_wheel(
707 self,
708 metadata_directory: str,
709 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
710 _allow_fallback: bool = True,
711 ) -> str:
712 cs = self.config_holder.config_settings
713 return super().prepare_metadata_for_build_wheel(
714 metadata_directory=metadata_directory,
715 config_settings=cs,
716 _allow_fallback=_allow_fallback,
717 )
719 def prepare_metadata_for_build_editable(
720 self,
721 metadata_directory: str,
722 config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
723 _allow_fallback: bool = True,
724 ) -> str:
725 cs = self.config_holder.config_settings
726 return super().prepare_metadata_for_build_editable(
727 metadata_directory=metadata_directory,
728 config_settings=cs,
729 _allow_fallback=_allow_fallback,
730 )