Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/vcs/versioncontrol.py: 38%
271 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1"""Handles all VCS (version control) support"""
3import logging
4import os
5import shutil
6import sys
7import urllib.parse
8from typing import (
9 TYPE_CHECKING,
10 Any,
11 Dict,
12 Iterable,
13 Iterator,
14 List,
15 Mapping,
16 Optional,
17 Tuple,
18 Type,
19 Union,
20)
22from pip._internal.cli.spinners import SpinnerInterface
23from pip._internal.exceptions import BadCommand, InstallationError
24from pip._internal.utils.misc import (
25 HiddenText,
26 ask_path_exists,
27 backup_dir,
28 display_path,
29 hide_url,
30 hide_value,
31 is_installable_dir,
32 rmtree,
33)
34from pip._internal.utils.subprocess import (
35 CommandArgs,
36 call_subprocess,
37 format_command_args,
38 make_command,
39)
40from pip._internal.utils.urls import get_url_scheme
if TYPE_CHECKING:
    # Literal was introduced in Python 3.8.
    #
    # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
    from typing import Literal


# Only the `vcs` registry object is exported by `from ... import *`.
__all__ = ["vcs"]


logger = logging.getLogger(__name__)

# (username, password) pair; either element may be None.
AuthInfo = Tuple[Optional[str], Optional[str]]
def is_url(name: str) -> bool:
    """
    Return true if the name looks like a URL.

    A name qualifies when get_url_scheme() finds a scheme in it and that
    scheme is one of http, https, file, ftp, or any scheme reported by
    the VCS registry (`vcs.all_schemes`).
    """
    scheme = get_url_scheme(name)
    if scheme is None:
        return False
    return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes
def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
) -> str:
    """
    Return the URL for a VCS requirement.

    The result has the form ``{repo_url}@{rev}#egg={name}``, followed by
    ``&subdirectory={subdir}`` when a non-empty subdir is given.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision string placed after the "@".
      project_name: the (unescaped) project name. Dashes are replaced by
        underscores before it is used as the egg name.
      subdir: optional subdirectory; ignored when None or empty.
    """
    egg_project_name = project_name.replace("-", "_")
    req = f"{repo_url}@{rev}#egg={egg_project_name}"
    if subdir:
        req += f"&subdirectory={subdir}"

    return req
def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> Optional[str]:
    """
    Find the Python project's root by searching up the filesystem from
    `location`. Return the path to project root relative to `repo_root`.
    Return None if the project root is `repo_root`, or cannot be found.
    """
    # find project root.
    orig_location = location
    while not is_installable_dir(location):
        last_location = location
        location = os.path.dirname(location)
        if location == last_location:
            # We've traversed up to the root of the filesystem without
            # finding a Python project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                orig_location,
            )
            return None

    # The project lives at the repository root itself: no subdirectory.
    if os.path.samefile(repo_root, location):
        return None

    return os.path.relpath(location, repo_root)
class RemoteNotFoundError(Exception):
    """Raised when a repository does not have a remote url configured."""
class RemoteNotValidError(Exception):
    """Exception that carries the offending remote URL as `self.url`."""

    def __init__(self, url: str):
        super().__init__(url)
        self.url = url
class RevOptions:

    """
    Encapsulates a VCS-specific revision to install, along with any VCS
    install options.

    Instances of this class should be treated as if immutable.
    """

    def __init__(
        self,
        vc_class: Type["VersionControl"],
        rev: Optional[str] = None,
        extra_args: Optional[CommandArgs] = None,
    ) -> None:
        """
        Args:
          vc_class: a VersionControl subclass.
          rev: the name of the revision to install.
          extra_args: a list of extra options.
        """
        if extra_args is None:
            extra_args = []

        self.extra_args = extra_args
        self.rev = rev
        self.vc_class = vc_class
        self.branch_name: Optional[str] = None

    def __repr__(self) -> str:
        return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> Optional[str]:
        """
        The revision to pass to the VCS command: `rev` when one was given,
        otherwise the backend's `default_arg_rev`.
        """
        if self.rev is None:
            return self.vc_class.default_arg_rev

        return self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments.
        """
        args: CommandArgs = []
        rev = self.arg_rev
        if rev is not None:
            args += self.vc_class.get_base_rev_args(rev)
        args += self.extra_args

        return args

    def to_display(self) -> str:
        """
        Return a " (to revision ...)" suffix for log messages, or an empty
        string when no revision is set.
        """
        if not self.rev:
            return ""

        return f" (to revision {self.rev})"

    def make_new(self, rev: str) -> "RevOptions":
        """
        Make a copy of the current instance, but with a new rev.

        Args:
          rev: the name of the revision for the new object.
        """
        return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
class VcsSupport:
    """
    Registry of VersionControl backend instances, keyed by backend name.

    The registry dict is a class attribute, so it is shared by every
    VcsSupport instance.
    """

    _registry: Dict[str, "VersionControl"] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Register more schemes with urlparse for various version control
        # systems. Only add the ones that are missing so that creating
        # several instances does not keep growing the global list.
        known_schemes = urllib.parse.uses_netloc
        missing = [s for s in self.schemes if s not in known_schemes]
        known_schemes.extend(missing)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the registered backends."""
        return self._registry.__iter__()

    @property
    def backends(self) -> List["VersionControl"]:
        """The registered backend instances."""
        return list(self._registry.values())

    @property
    def dirnames(self) -> List[str]:
        """The `dirname` attribute of every registered backend."""
        return [backend.dirname for backend in self.backends]

    @property
    def all_schemes(self) -> List[str]:
        """The schemes of every registered backend, concatenated."""
        schemes: List[str] = []
        for backend in self.backends:
            schemes.extend(backend.schemes)
        return schemes

    def register(self, cls: Type["VersionControl"]) -> None:
        """
        Instantiate `cls` and store it under `cls.name`.

        A class without a `name` attribute is rejected with a warning; a
        name that is already registered is left untouched.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name not in self._registry:
            self._registry[cls.name] = cls()
            logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Remove the backend called `name`, if it is registered."""
        if name in self._registry:
            del self._registry[name]

    def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
        """
        Return a VersionControl object if a repository of that type is found
        at the given directory.
        """
        vcs_backends = {}
        for vcs_backend in self._registry.values():
            repo_path = vcs_backend.get_repository_root(location)
            if not repo_path:
                continue
            logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
            vcs_backends[repo_path] = vcs_backend

        if not vcs_backends:
            return None

        # Choose the VCS in the inner-most directory. Since all repository
        # roots found here would be either `location` or one of its
        # parents, the longest path should have the most path components,
        # i.e. the backend representing the inner-most repository.
        inner_most_repo_path = max(vcs_backends, key=len)
        return vcs_backends[inner_most_repo_path]

    def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
        """
        Return a VersionControl object or None.
        """
        for vcs_backend in self._registry.values():
            if scheme in vcs_backend.schemes:
                return vcs_backend
        return None

    def get_backend(self, name: str) -> Optional["VersionControl"]:
        """
        Return a VersionControl object or None.

        The lookup lowercases `name` first.
        """
        name = name.lower()
        return self._registry.get(name)


vcs = VcsSupport()
class VersionControl:
    """
    Base class for a version control backend.

    Several methods here raise NotImplementedError and are meant to be
    provided by concrete backends; the rest implement behaviour shared
    across backends in terms of those hooks.
    """

    name = ""
    dirname = ""
    repo_name = ""
    # List of supported schemes for this Version Control
    schemes: Tuple[str, ...] = ()
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ: Tuple[str, ...] = ()
    default_arg_rev: Optional[str] = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Return whether the vcs prefix (e.g. "git+") should be added to a
        repository's remote url when used in a requirement.
        """
        return not remote_url.lower().startswith(f"{cls.name}:")

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string that should be used in a requirement.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        repo_url = cls.get_remote_url(repo_dir)

        if cls.should_add_vcs_url_prefix(repo_url):
            repo_url = f"{cls.name}+{repo_url}"

        revision = cls.get_requirement_revision(repo_dir)
        subdir = cls.get_subdirectory(repo_dir)
        req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)

        return req

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """
        Return the base revision arguments for a vcs command.

        Args:
          rev: the name of a revision to install.  Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
    ) -> RevOptions:
        """
        Return a RevOptions object.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options.
        """
        return RevOptions(cls, rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        posix absolute paths start with os.path.sep,
        win32 ones start with drive (like c:\\folder)
        """
        drive, _ = os.path.splitdrive(repo)
        return repo.startswith(os.path.sep) or bool(drive)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        Returns: (netloc, (username, password)).
        """
        return netloc, (None, None)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        Raises ValueError when the URL scheme has no "+" (no vcs prefix),
        and InstallationError when the path ends with an "@" that is not
        followed by a revision.

        Returns: (url, rev, (username, password)).
        """
        scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
        if "+" not in scheme:
            raise ValueError(
                "Sorry, {!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
            )
        # Remove the vcs prefix.
        scheme = scheme.split("+", 1)[1]
        netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
        rev = None
        if "@" in path:
            path, rev = path.rsplit("@", 1)
            if not rev:
                raise InstallationError(
                    "The URL {!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL.".format(url)
                )
        # The fragment is dropped from the rebuilt URL.
        url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
        return url, rev, user_pass

    @staticmethod
    def make_rev_args(
        username: Optional[str], password: Optional[HiddenText]
    ) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain().
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
        username, secret_password = user_pass
        password: Optional[HiddenText] = None
        if secret_password is not None:
            password = hide_value(secret_password)
        extra_args = self.make_rev_args(username, password)
        rev_options = self.make_rev_options(rev, extra_args=extra_args)

        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        return urllib.parse.unquote(url).rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Compare two repo URLs for identity, ignoring incidental differences.
        """
        return cls.normalize_url(url1) == cls.normalize_url(url2)

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Args:
          dest: the directory to fetch the repository to.
          rev_options: a RevOptions object.
          verbosity: verbosity level.
        """
        raise NotImplementedError

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the id of the current commit equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        url, rev_options = self.get_url_rev_options(url)

        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        rev_display = rev_options.to_display()
        if self.is_repository_directory(dest):
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options)
                else:
                    logger.info("Skipping because already up-to-date.")
                return

            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
        else:
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            # https://github.com/python/mypy/issues/1174
            prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b"))  # type: ignore

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            url,
        )
        response = ask_path_exists("What to do? {}".format(prompt[0]), prompt[1])

        if response == "a":
            sys.exit(-1)

        if response == "w":
            logger.warning("Deleting %s", display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        if response == "b":
            dest_dir = backup_dir(dest)
            logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
            shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        # Do nothing if the response is "i".
        if response == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options)

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        if os.path.exists(location):
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: Union[List[str], CommandArgs],
        show_stdout: bool = True,
        cwd: Optional[str] = None,
        on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
        extra_ok_returncodes: Optional[Iterable[int]] = None,
        command_desc: Optional[str] = None,
        extra_environ: Optional[Mapping[str, Any]] = None,
        spinner: Optional[SpinnerInterface] = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        Raises BadCommand when call_subprocess fails with FileNotFoundError
        or PermissionError.
        """
        cmd = make_command(cls.name, *cmd)
        if command_desc is None:
            command_desc = format_command_args(cmd)
        try:
            return call_subprocess(
                cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=command_desc,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
                stdout_only=stdout_only,
            )
        except FileNotFoundError:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            )
        except PermissionError:
            # errno.EACCES = Permission denied
            # This error occurs, for instance, when the command is installed
            # only for another user. So, the current user don't have
            # permission to call the other user command.
            raise BadCommand(
                f"No permission to execute {cls.name!r} - install it "
                f"locally, globally (ask admin), or check your PATH. "
                f"See possible solutions at "
                f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
                f"#fixing-permission-denied."
            )

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Return whether a directory path is a repository directory.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        return os.path.exists(os.path.join(path, cls.dirname))

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.
        """
        if cls.is_repository_directory(location):
            return location
        return None