1from __future__ import annotations
2
3import errno
4import getpass
5import hashlib
6import logging
7import os
8import posixpath
9import shutil
10import stat
11import sys
12import sysconfig
13import urllib.parse
14from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
15from dataclasses import dataclass
16from functools import partial
17from io import StringIO
18from itertools import filterfalse, tee, zip_longest
19from pathlib import Path
20from types import FunctionType, TracebackType
21from typing import (
22 Any,
23 BinaryIO,
24 Callable,
25 Optional,
26 TextIO,
27 TypeVar,
28 cast,
29)
30
31from pip._vendor.packaging.requirements import Requirement
32from pip._vendor.pyproject_hooks import BuildBackendHookCaller
33
34from pip import __version__
35from pip._internal.exceptions import CommandError, ExternallyManagedEnvironment
36from pip._internal.locations import get_major_minor_version
37from pip._internal.utils.compat import WINDOWS
38from pip._internal.utils.retry import retry
39from pip._internal.utils.virtualenv import running_under_virtualenv
40
41__all__ = [
42 "rmtree",
43 "display_path",
44 "backup_dir",
45 "ask",
46 "splitext",
47 "format_size",
48 "is_installable_dir",
49 "normalize_path",
50 "renames",
51 "get_prog",
52 "ensure_dir",
53 "remove_auth_from_url",
54 "check_externally_managed",
55 "ConfiguredBuildBackendHookCaller",
56]
57
58logger = logging.getLogger(__name__)
59
60T = TypeVar("T")
61ExcInfo = tuple[type[BaseException], BaseException, TracebackType]
62VersionInfo = tuple[int, int, int]
63NetlocTuple = tuple[str, tuple[Optional[str], Optional[str]]]
64OnExc = Callable[[FunctionType, Path, BaseException], Any]
65OnErr = Callable[[FunctionType, Path, ExcInfo], Any]
66
67FILE_CHUNK_SIZE = 1024 * 1024
68
69
70def get_pip_version() -> str:
71 pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
72 pip_pkg_dir = os.path.abspath(pip_pkg_dir)
73
74 return f"pip {__version__} from {pip_pkg_dir} (python {get_major_minor_version()})"
75
76
77def normalize_version_info(py_version_info: tuple[int, ...]) -> tuple[int, int, int]:
78 """
79 Convert a tuple of ints representing a Python version to one of length
80 three.
81
82 :param py_version_info: a tuple of ints representing a Python version,
83 or None to specify no version. The tuple can have any length.
84
85 :return: a tuple of length three if `py_version_info` is non-None.
86 Otherwise, return `py_version_info` unchanged (i.e. None).
87 """
88 if len(py_version_info) < 3:
89 py_version_info += (3 - len(py_version_info)) * (0,)
90 elif len(py_version_info) > 3:
91 py_version_info = py_version_info[:3]
92
93 return cast("VersionInfo", py_version_info)
94
95
96def ensure_dir(path: str) -> None:
97 """os.path.makedirs without EEXIST."""
98 try:
99 os.makedirs(path)
100 except OSError as e:
101 # Windows can raise spurious ENOTEMPTY errors. See #6426.
102 if e.errno != errno.EEXIST and e.errno != errno.ENOTEMPTY:
103 raise
104
105
106def get_prog() -> str:
107 try:
108 prog = os.path.basename(sys.argv[0])
109 if prog in ("__main__.py", "-c"):
110 return f"{sys.executable} -m pip"
111 else:
112 return prog
113 except (AttributeError, TypeError, IndexError):
114 pass
115 return "pip"
116
117
118# Retry every half second for up to 3 seconds
119@retry(stop_after_delay=3, wait=0.5)
120def rmtree(dir: str, ignore_errors: bool = False, onexc: OnExc | None = None) -> None:
121 if ignore_errors:
122 onexc = _onerror_ignore
123 if onexc is None:
124 onexc = _onerror_reraise
125 handler: OnErr = partial(rmtree_errorhandler, onexc=onexc)
126 if sys.version_info >= (3, 12):
127 # See https://docs.python.org/3.12/whatsnew/3.12.html#shutil.
128 shutil.rmtree(dir, onexc=handler) # type: ignore
129 else:
130 shutil.rmtree(dir, onerror=handler) # type: ignore
131
132
133def _onerror_ignore(*_args: Any) -> None:
134 pass
135
136
137def _onerror_reraise(*_args: Any) -> None:
138 raise # noqa: PLE0704 - Bare exception used to reraise existing exception
139
140
141def rmtree_errorhandler(
142 func: FunctionType,
143 path: Path,
144 exc_info: ExcInfo | BaseException,
145 *,
146 onexc: OnExc = _onerror_reraise,
147) -> None:
148 """
149 `rmtree` error handler to 'force' a file remove (i.e. like `rm -f`).
150
151 * If a file is readonly then it's write flag is set and operation is
152 retried.
153
154 * `onerror` is the original callback from `rmtree(... onerror=onerror)`
155 that is chained at the end if the "rm -f" still fails.
156 """
157 try:
158 st_mode = os.stat(path).st_mode
159 except OSError:
160 # it's equivalent to os.path.exists
161 return
162
163 if not st_mode & stat.S_IWRITE:
164 # convert to read/write
165 try:
166 os.chmod(path, st_mode | stat.S_IWRITE)
167 except OSError:
168 pass
169 else:
170 # use the original function to repeat the operation
171 try:
172 func(path)
173 return
174 except OSError:
175 pass
176
177 if not isinstance(exc_info, BaseException):
178 _, exc_info, _ = exc_info
179 onexc(func, path, exc_info)
180
181
182def display_path(path: str) -> str:
183 """Gives the display value for a given path, making it relative to cwd
184 if possible."""
185 try:
186 relative = Path(path).relative_to(Path.cwd())
187 except ValueError:
188 # If the path isn't relative to the CWD, leave it alone
189 return path
190 return os.path.join(".", relative)
191
192
193def backup_dir(dir: str, ext: str = ".bak") -> str:
194 """Figure out the name of a directory to back up the given dir to
195 (adding .bak, .bak2, etc)"""
196 n = 1
197 extension = ext
198 while os.path.exists(dir + extension):
199 n += 1
200 extension = ext + str(n)
201 return dir + extension
202
203
204def ask_path_exists(message: str, options: Iterable[str]) -> str:
205 for action in os.environ.get("PIP_EXISTS_ACTION", "").split():
206 if action in options:
207 return action
208 return ask(message, options)
209
210
211def _check_no_input(message: str) -> None:
212 """Raise an error if no input is allowed."""
213 if os.environ.get("PIP_NO_INPUT"):
214 raise Exception(
215 f"No input was expected ($PIP_NO_INPUT set); question: {message}"
216 )
217
218
219def ask(message: str, options: Iterable[str]) -> str:
220 """Ask the message interactively, with the given possible responses"""
221 while 1:
222 _check_no_input(message)
223 response = input(message)
224 response = response.strip().lower()
225 if response not in options:
226 print(
227 "Your response ({!r}) was not one of the expected responses: "
228 "{}".format(response, ", ".join(options))
229 )
230 else:
231 return response
232
233
234def ask_input(message: str) -> str:
235 """Ask for input interactively."""
236 _check_no_input(message)
237 return input(message)
238
239
240def ask_password(message: str) -> str:
241 """Ask for a password interactively."""
242 _check_no_input(message)
243 return getpass.getpass(message)
244
245
246def strtobool(val: str) -> int:
247 """Convert a string representation of truth to true (1) or false (0).
248
249 True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
250 are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
251 'val' is anything else.
252 """
253 val = val.lower()
254 if val in ("y", "yes", "t", "true", "on", "1"):
255 return 1
256 elif val in ("n", "no", "f", "false", "off", "0"):
257 return 0
258 else:
259 raise ValueError(f"invalid truth value {val!r}")
260
261
262def format_size(bytes: float) -> str:
263 if bytes > 1000 * 1000:
264 return f"{bytes / 1000.0 / 1000:.1f} MB"
265 elif bytes > 10 * 1000:
266 return f"{int(bytes / 1000)} kB"
267 elif bytes > 1000:
268 return f"{bytes / 1000.0:.1f} kB"
269 else:
270 return f"{int(bytes)} bytes"
271
272
273def tabulate(rows: Iterable[Iterable[Any]]) -> tuple[list[str], list[int]]:
274 """Return a list of formatted rows and a list of column sizes.
275
276 For example::
277
278 >>> tabulate([['foobar', 2000], [0xdeadbeef]])
279 (['foobar 2000', '3735928559'], [10, 4])
280 """
281 rows = [tuple(map(str, row)) for row in rows]
282 sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue="")]
283 table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
284 return table, sizes
285
286
287def is_installable_dir(path: str) -> bool:
288 """Is path is a directory containing pyproject.toml or setup.py?
289
290 If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
291 a legacy setuptools layout by identifying setup.py. We don't check for the
292 setup.cfg because using it without setup.py is only available for PEP 517
293 projects, which are already covered by the pyproject.toml check.
294 """
295 if not os.path.isdir(path):
296 return False
297 if os.path.isfile(os.path.join(path, "pyproject.toml")):
298 return True
299 if os.path.isfile(os.path.join(path, "setup.py")):
300 return True
301 return False
302
303
304def read_chunks(
305 file: BinaryIO, size: int = FILE_CHUNK_SIZE
306) -> Generator[bytes, None, None]:
307 """Yield pieces of data from a file-like object until EOF."""
308 while True:
309 chunk = file.read(size)
310 if not chunk:
311 break
312 yield chunk
313
314
315def normalize_path(path: str, resolve_symlinks: bool = True) -> str:
316 """
317 Convert a path to its canonical, case-normalized, absolute version.
318
319 """
320 path = os.path.expanduser(path)
321 if resolve_symlinks:
322 path = os.path.realpath(path)
323 else:
324 path = os.path.abspath(path)
325 return os.path.normcase(path)
326
327
328def splitext(path: str) -> tuple[str, str]:
329 """Like os.path.splitext, but take off .tar too"""
330 base, ext = posixpath.splitext(path)
331 if base.lower().endswith(".tar"):
332 ext = base[-4:] + ext
333 base = base[:-4]
334 return base, ext
335
336
337def renames(old: str, new: str) -> None:
338 """Like os.renames(), but handles renaming across devices."""
339 # Implementation borrowed from os.renames().
340 head, tail = os.path.split(new)
341 if head and tail and not os.path.exists(head):
342 os.makedirs(head)
343
344 shutil.move(old, new)
345
346 head, tail = os.path.split(old)
347 if head and tail:
348 try:
349 os.removedirs(head)
350 except OSError:
351 pass
352
353
354def is_local(path: str) -> bool:
355 """
356 Return True if path is within sys.prefix, if we're running in a virtualenv.
357
358 If we're not in a virtualenv, all paths are considered "local."
359
360 Caution: this function assumes the head of path has been normalized
361 with normalize_path.
362 """
363 if not running_under_virtualenv():
364 return True
365 return path.startswith(normalize_path(sys.prefix))
366
367
368def write_output(msg: Any, *args: Any) -> None:
369 logger.info(msg, *args)
370
371
372class StreamWrapper(StringIO):
373 orig_stream: TextIO
374
375 @classmethod
376 def from_stream(cls, orig_stream: TextIO) -> StreamWrapper:
377 ret = cls()
378 ret.orig_stream = orig_stream
379 return ret
380
381 # compileall.compile_dir() needs stdout.encoding to print to stdout
382 # type ignore is because TextIOBase.encoding is writeable
383 @property
384 def encoding(self) -> str: # type: ignore
385 return self.orig_stream.encoding
386
387
388# Simulates an enum
389def enum(*sequential: Any, **named: Any) -> type[Any]:
390 enums = dict(zip(sequential, range(len(sequential))), **named)
391 reverse = {value: key for key, value in enums.items()}
392 enums["reverse_mapping"] = reverse
393 return type("Enum", (), enums)
394
395
396def build_netloc(host: str, port: int | None) -> str:
397 """
398 Build a netloc from a host-port pair
399 """
400 if port is None:
401 return host
402 if ":" in host:
403 # Only wrap host with square brackets when it is IPv6
404 host = f"[{host}]"
405 return f"{host}:{port}"
406
407
408def build_url_from_netloc(netloc: str, scheme: str = "https") -> str:
409 """
410 Build a full URL from a netloc.
411 """
412 if netloc.count(":") >= 2 and "@" not in netloc and "[" not in netloc:
413 # It must be a bare IPv6 address, so wrap it with brackets.
414 netloc = f"[{netloc}]"
415 return f"{scheme}://{netloc}"
416
417
418def parse_netloc(netloc: str) -> tuple[str | None, int | None]:
419 """
420 Return the host-port pair from a netloc.
421 """
422 url = build_url_from_netloc(netloc)
423 parsed = urllib.parse.urlparse(url)
424 return parsed.hostname, parsed.port
425
426
427def split_auth_from_netloc(netloc: str) -> NetlocTuple:
428 """
429 Parse out and remove the auth information from a netloc.
430
431 Returns: (netloc, (username, password)).
432 """
433 if "@" not in netloc:
434 return netloc, (None, None)
435
436 # Split from the right because that's how urllib.parse.urlsplit()
437 # behaves if more than one @ is present (which can be checked using
438 # the password attribute of urlsplit()'s return value).
439 auth, netloc = netloc.rsplit("@", 1)
440 pw: str | None = None
441 if ":" in auth:
442 # Split from the left because that's how urllib.parse.urlsplit()
443 # behaves if more than one : is present (which again can be checked
444 # using the password attribute of the return value)
445 user, pw = auth.split(":", 1)
446 else:
447 user, pw = auth, None
448
449 user = urllib.parse.unquote(user)
450 if pw is not None:
451 pw = urllib.parse.unquote(pw)
452
453 return netloc, (user, pw)
454
455
456def redact_netloc(netloc: str) -> str:
457 """
458 Replace the sensitive data in a netloc with "****", if it exists.
459
460 For example:
461 - "user:pass@example.com" returns "user:****@example.com"
462 - "accesstoken@example.com" returns "****@example.com"
463 """
464 netloc, (user, password) = split_auth_from_netloc(netloc)
465 if user is None:
466 return netloc
467 if password is None:
468 user = "****"
469 password = ""
470 else:
471 user = urllib.parse.quote(user)
472 password = ":****"
473 return f"{user}{password}@{netloc}"
474
475
476def _transform_url(
477 url: str, transform_netloc: Callable[[str], tuple[Any, ...]]
478) -> tuple[str, NetlocTuple]:
479 """Transform and replace netloc in a url.
480
481 transform_netloc is a function taking the netloc and returning a
482 tuple. The first element of this tuple is the new netloc. The
483 entire tuple is returned.
484
485 Returns a tuple containing the transformed url as item 0 and the
486 original tuple returned by transform_netloc as item 1.
487 """
488 purl = urllib.parse.urlsplit(url)
489 netloc_tuple = transform_netloc(purl.netloc)
490 # stripped url
491 url_pieces = (purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment)
492 surl = urllib.parse.urlunsplit(url_pieces)
493 return surl, cast("NetlocTuple", netloc_tuple)
494
495
496def _get_netloc(netloc: str) -> NetlocTuple:
497 return split_auth_from_netloc(netloc)
498
499
500def _redact_netloc(netloc: str) -> tuple[str]:
501 return (redact_netloc(netloc),)
502
503
504def split_auth_netloc_from_url(
505 url: str,
506) -> tuple[str, str, tuple[str | None, str | None]]:
507 """
508 Parse a url into separate netloc, auth, and url with no auth.
509
510 Returns: (url_without_auth, netloc, (username, password))
511 """
512 url_without_auth, (netloc, auth) = _transform_url(url, _get_netloc)
513 return url_without_auth, netloc, auth
514
515
516def remove_auth_from_url(url: str) -> str:
517 """Return a copy of url with 'username:password@' removed."""
518 # username/pass params are passed to subversion through flags
519 # and are not recognized in the url.
520 return _transform_url(url, _get_netloc)[0]
521
522
523def redact_auth_from_url(url: str) -> str:
524 """Replace the password in a given url with ****."""
525 return _transform_url(url, _redact_netloc)[0]
526
527
528def redact_auth_from_requirement(req: Requirement) -> str:
529 """Replace the password in a given requirement url with ****."""
530 if not req.url:
531 return str(req)
532 return str(req).replace(req.url, redact_auth_from_url(req.url))
533
534
535@dataclass(frozen=True)
536class HiddenText:
537 secret: str
538 redacted: str
539
540 def __repr__(self) -> str:
541 return f"<HiddenText {str(self)!r}>"
542
543 def __str__(self) -> str:
544 return self.redacted
545
546 def __eq__(self, other: object) -> bool:
547 # Equality is particularly useful for testing.
548 if type(self) is type(other):
549 # The string being used for redaction doesn't also have to match,
550 # just the raw, original string.
551 return self.secret == cast(HiddenText, other).secret
552 return NotImplemented
553
554 # Disable hashing, since we have a custom __eq__ and don't need hash-ability
555 # (yet). The only required property of hashing is that objects which compare
556 # equal have the same hash value.
557 __hash__ = None # type: ignore[assignment]
558
559
560def hide_value(value: str) -> HiddenText:
561 return HiddenText(value, redacted="****")
562
563
564def hide_url(url: str) -> HiddenText:
565 redacted = redact_auth_from_url(url)
566 return HiddenText(url, redacted=redacted)
567
568
569def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None:
570 """Protection of pip.exe from modification on Windows
571
572 On Windows, any operation modifying pip should be run as:
573 python -m pip ...
574 """
575 pip_names = [
576 "pip",
577 f"pip{sys.version_info.major}",
578 f"pip{sys.version_info.major}.{sys.version_info.minor}",
579 ]
580
581 # See https://github.com/pypa/pip/issues/1299 for more discussion
582 should_show_use_python_msg = (
583 modifying_pip and WINDOWS and os.path.basename(sys.argv[0]) in pip_names
584 )
585
586 if should_show_use_python_msg:
587 new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
588 raise CommandError(
589 "To modify pip, please run the following command:\n{}".format(
590 " ".join(new_command)
591 )
592 )
593
594
595def check_externally_managed() -> None:
596 """Check whether the current environment is externally managed.
597
598 If the ``EXTERNALLY-MANAGED`` config file is found, the current environment
599 is considered externally managed, and an ExternallyManagedEnvironment is
600 raised.
601 """
602 if running_under_virtualenv():
603 return
604 marker = os.path.join(sysconfig.get_path("stdlib"), "EXTERNALLY-MANAGED")
605 if not os.path.isfile(marker):
606 return
607 raise ExternallyManagedEnvironment.from_config(marker)
608
609
610def is_console_interactive() -> bool:
611 """Is this console interactive?"""
612 return sys.stdin is not None and sys.stdin.isatty()
613
614
615def hash_file(path: str, blocksize: int = 1 << 20) -> tuple[Any, int]:
616 """Return (hash, length) for path using hashlib.sha256()"""
617
618 h = hashlib.sha256()
619 length = 0
620 with open(path, "rb") as f:
621 for block in read_chunks(f, size=blocksize):
622 length += len(block)
623 h.update(block)
624 return h, length
625
626
627def pairwise(iterable: Iterable[Any]) -> Iterator[tuple[Any, Any]]:
628 """
629 Return paired elements.
630
631 For example:
632 s -> (s0, s1), (s2, s3), (s4, s5), ...
633 """
634 iterable = iter(iterable)
635 return zip_longest(iterable, iterable)
636
637
638def partition(
639 pred: Callable[[T], bool], iterable: Iterable[T]
640) -> tuple[Iterable[T], Iterable[T]]:
641 """
642 Use a predicate to partition entries into false entries and true entries,
643 like
644
645 partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
646 """
647 t1, t2 = tee(iterable)
648 return filterfalse(pred, t1), filter(pred, t2)
649
650
651class ConfiguredBuildBackendHookCaller(BuildBackendHookCaller):
652 def __init__(
653 self,
654 config_holder: Any,
655 source_dir: str,
656 build_backend: str,
657 backend_path: str | None = None,
658 runner: Callable[..., None] | None = None,
659 python_executable: str | None = None,
660 ):
661 super().__init__(
662 source_dir, build_backend, backend_path, runner, python_executable
663 )
664 self.config_holder = config_holder
665
666 def build_wheel(
667 self,
668 wheel_directory: str,
669 config_settings: Mapping[str, Any] | None = None,
670 metadata_directory: str | None = None,
671 ) -> str:
672 cs = self.config_holder.config_settings
673 return super().build_wheel(
674 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
675 )
676
677 def build_sdist(
678 self,
679 sdist_directory: str,
680 config_settings: Mapping[str, Any] | None = None,
681 ) -> str:
682 cs = self.config_holder.config_settings
683 return super().build_sdist(sdist_directory, config_settings=cs)
684
685 def build_editable(
686 self,
687 wheel_directory: str,
688 config_settings: Mapping[str, Any] | None = None,
689 metadata_directory: str | None = None,
690 ) -> str:
691 cs = self.config_holder.config_settings
692 return super().build_editable(
693 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
694 )
695
696 def get_requires_for_build_wheel(
697 self, config_settings: Mapping[str, Any] | None = None
698 ) -> Sequence[str]:
699 cs = self.config_holder.config_settings
700 return super().get_requires_for_build_wheel(config_settings=cs)
701
702 def get_requires_for_build_sdist(
703 self, config_settings: Mapping[str, Any] | None = None
704 ) -> Sequence[str]:
705 cs = self.config_holder.config_settings
706 return super().get_requires_for_build_sdist(config_settings=cs)
707
708 def get_requires_for_build_editable(
709 self, config_settings: Mapping[str, Any] | None = None
710 ) -> Sequence[str]:
711 cs = self.config_holder.config_settings
712 return super().get_requires_for_build_editable(config_settings=cs)
713
714 def prepare_metadata_for_build_wheel(
715 self,
716 metadata_directory: str,
717 config_settings: Mapping[str, Any] | None = None,
718 _allow_fallback: bool = True,
719 ) -> str:
720 cs = self.config_holder.config_settings
721 return super().prepare_metadata_for_build_wheel(
722 metadata_directory=metadata_directory,
723 config_settings=cs,
724 _allow_fallback=_allow_fallback,
725 )
726
727 def prepare_metadata_for_build_editable(
728 self,
729 metadata_directory: str,
730 config_settings: Mapping[str, Any] | None = None,
731 _allow_fallback: bool = True,
732 ) -> str | None:
733 cs = self.config_holder.config_settings
734 return super().prepare_metadata_for_build_editable(
735 metadata_directory=metadata_directory,
736 config_settings=cs,
737 _allow_fallback=_allow_fallback,
738 )
739
740
741def warn_if_run_as_root() -> None:
742 """Output a warning for sudo users on Unix.
743
744 In a virtual environment, sudo pip still writes to virtualenv.
745 On Windows, users may run pip as Administrator without issues.
746 This warning only applies to Unix root users outside of virtualenv.
747 """
748 if running_under_virtualenv():
749 return
750 if not hasattr(os, "getuid"):
751 return
752 # On Windows, there are no "system managed" Python packages. Installing as
753 # Administrator via pip is the correct way of updating system environments.
754 #
755 # We choose sys.platform over utils.compat.WINDOWS here to enable Mypy platform
756 # checks: https://mypy.readthedocs.io/en/stable/common_issues.html
757 if sys.platform == "win32" or sys.platform == "cygwin":
758 return
759
760 if os.getuid() != 0:
761 return
762
763 logger.warning(
764 "Running pip as the 'root' user can result in broken permissions and "
765 "conflicting behaviour with the system package manager, possibly "
766 "rendering your system unusable. "
767 "It is recommended to use a virtual environment instead: "
768 "https://pip.pypa.io/warnings/venv. "
769 "Use the --root-user-action option if you know what you are doing and "
770 "want to suppress this warning."
771 )