Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/vcs/versioncontrol.py: 38%
269 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1"""Handles all VCS (version control) support"""
3import logging
4import os
5import shutil
6import sys
7import urllib.parse
8from typing import (
9 Any,
10 Dict,
11 Iterable,
12 Iterator,
13 List,
14 Literal,
15 Mapping,
16 Optional,
17 Tuple,
18 Type,
19 Union,
20)
22from pip._internal.cli.spinners import SpinnerInterface
23from pip._internal.exceptions import BadCommand, InstallationError
24from pip._internal.utils.misc import (
25 HiddenText,
26 ask_path_exists,
27 backup_dir,
28 display_path,
29 hide_url,
30 hide_value,
31 is_installable_dir,
32 rmtree,
33)
34from pip._internal.utils.subprocess import (
35 CommandArgs,
36 call_subprocess,
37 format_command_args,
38 make_command,
39)
40from pip._internal.utils.urls import get_url_scheme
# Only the module-level ``vcs`` object is exported by a star import.
__all__ = ["vcs"]

# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Auth information as a (username, password) pair; either item may be None.
AuthInfo = Tuple[Optional[str], Optional[str]]
def is_url(name: str) -> bool:
    """
    Tell whether ``name`` looks like a URL.

    A name qualifies when it has a scheme and that scheme is http, https,
    file, ftp, or one of the schemes of the registered VCS backends.
    """
    scheme = get_url_scheme(name)
    if scheme is not None:
        return scheme in (["http", "https", "file", "ftp"] + vcs.all_schemes)
    # No scheme at all: not a URL.
    return False
def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
) -> str:
    """
    Build the requirement URL for a VCS checkout.

    The result has the form ``{repo_url}@{rev}#egg={name}``, where ``name``
    is ``project_name`` with every "-" replaced by "_". When ``subdir`` is
    truthy, ``&subdirectory={subdir}`` is appended.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision placed after the "@".
      project_name: the (unescaped) project name.
      subdir: optional subdirectory to record in the URL fragment.
    """
    egg_name = "_".join(project_name.split("-"))
    subdir_part = f"&subdirectory={subdir}" if subdir else ""
    return f"{repo_url}@{rev}#egg={egg_name}{subdir_part}"
def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> Optional[str]:
    """
    Locate the Python project's root by walking up the filesystem from
    `location`, and return its path relative to `repo_root`.

    Return None when the project root is `repo_root` itself, or when no
    project root can be found.
    """
    candidate = location
    while True:
        if is_installable_dir(candidate):
            break
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer moves us up: every parent directory has
            # been tried without finding a Python project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                location,
            )
            return None
        candidate = parent

    if os.path.samefile(repo_root, candidate):
        return None
    return os.path.relpath(candidate, repo_root)
class RemoteNotFoundError(Exception):
    """Signals that a repository does not have a remote url configured."""
class RemoteNotValidError(Exception):
    """Exception that keeps the offending url on its ``url`` attribute."""

    def __init__(self, url: str):
        self.url = url
        super().__init__(url)
class RevOptions:
    """
    Bundles a VCS-specific revision to install together with any extra VCS
    install options.

    Treat instances of this class as immutable once created.
    """

    def __init__(
        self,
        vc_class: Type["VersionControl"],
        rev: Optional[str] = None,
        extra_args: Optional[CommandArgs] = None,
    ) -> None:
        """
        Args:
          vc_class: a VersionControl subclass.
          rev: the name of the revision to install.
          extra_args: a list of extra options; None is stored as an empty
            list.
        """
        self.extra_args = [] if extra_args is None else extra_args
        self.rev = rev
        self.vc_class = vc_class
        self.branch_name: Optional[str] = None

    def __repr__(self) -> str:
        backend_name = self.vc_class.name
        return f"<RevOptions {backend_name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> Optional[str]:
        """The revision to pass to the VCS: ``rev``, or the backend's
        ``default_arg_rev`` when ``rev`` is None."""
        return self.vc_class.default_arg_rev if self.rev is None else self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments.

        These are the backend's base revision arguments (only when there is
        a revision to pass) followed by the extra arguments.
        """
        rev = self.arg_rev
        base_args = [] if rev is None else self.vc_class.get_base_rev_args(rev)
        return [*base_args, *self.extra_args]

    def to_display(self) -> str:
        """Return a " (to revision ...)" suffix for log messages, or an
        empty string when no revision is set."""
        return f" (to revision {self.rev})" if self.rev else ""

    def make_new(self, rev: str) -> "RevOptions":
        """
        Make a copy of the current instance, but with a new rev.

        Args:
          rev: the name of the revision for the new object.
        """
        backend = self.vc_class
        return backend.make_rev_options(rev, extra_args=self.extra_args)
class VcsSupport:
    """
    Registry of VCS backends, keyed by backend name.

    The registry dict is a class attribute, so it is shared by every
    instance of this class.
    """

    _registry: Dict[str, "VersionControl"] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Register more schemes with urlparse for various version control
        # systems. uses_netloc is a process-wide list, so only append the
        # schemes it does not already contain; otherwise every instantiation
        # would grow it with duplicates.
        known_schemes = urllib.parse.uses_netloc
        missing = [scheme for scheme in self.schemes if scheme not in known_schemes]
        known_schemes.extend(missing)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the registered backends."""
        return iter(self._registry)

    @property
    def backends(self) -> List["VersionControl"]:
        """The registered backend objects, as a new list."""
        return list(self._registry.values())

    @property
    def dirnames(self) -> List[str]:
        """The ``dirname`` attribute of every registered backend."""
        return [backend.dirname for backend in self.backends]

    @property
    def all_schemes(self) -> List[str]:
        """The schemes of every registered backend, flattened into one list."""
        return [scheme for backend in self.backends for scheme in backend.schemes]

    def register(self, cls: Type["VersionControl"]) -> None:
        """
        Instantiate ``cls`` and store it under ``cls.name``.

        A class without a ``name`` attribute is rejected with a warning, and
        a name that is already registered is left untouched.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name not in self._registry:
            self._registry[cls.name] = cls()
            logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Remove the backend registered under ``name``, if there is one."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
        """
        Return a VersionControl object if a repository of that type is found
        at the given directory.
        """
        vcs_backends = {}
        for vcs_backend in self._registry.values():
            repo_path = vcs_backend.get_repository_root(location)
            if not repo_path:
                continue
            logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
            vcs_backends[repo_path] = vcs_backend

        if not vcs_backends:
            return None

        # Choose the VCS in the inner-most directory. Since all repository
        # roots found here would be either `location` or one of its
        # parents, the longest path should have the most path components,
        # i.e. the backend representing the inner-most repository.
        inner_most_repo_path = max(vcs_backends, key=len)
        return vcs_backends[inner_most_repo_path]

    def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
        """
        Return the first registered backend whose schemes include ``scheme``,
        or None.
        """
        for vcs_backend in self._registry.values():
            if scheme in vcs_backend.schemes:
                return vcs_backend
        return None

    def get_backend(self, name: str) -> Optional["VersionControl"]:
        """
        Return the backend registered under ``name`` (lower-cased before the
        lookup), or None.
        """
        return self._registry.get(name.lower())
# Module-level VcsSupport instance: the only name listed in __all__, and the
# object is_url() consults for the registered backends' schemes.
vcs = VcsSupport()
class VersionControl:
    """
    Base class for a VCS backend.

    Several methods here only raise NotImplementedError and are meant to be
    provided by subclasses for a specific VCS.
    """

    name = ""
    dirname = ""
    repo_name = ""
    # List of supported schemes for this Version Control
    schemes: Tuple[str, ...] = ()
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ: Tuple[str, ...] = ()
    default_arg_rev: Optional[str] = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Tell whether the vcs prefix (e.g. "git+") should be added to a
        repository's remote url when used in a requirement.

        The prefix is wanted unless the url, compared case-insensitively,
        already starts with "<name>:".
        """
        own_scheme_prefix = f"{cls.name}:"
        return not remote_url.lower().startswith(own_scheme_prefix)

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string that should be used in a requirement.

        By default this is whatever get_revision() reports for ``repo_dir``.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        remote = cls.get_remote_url(repo_dir)
        if cls.should_add_vcs_url_prefix(remote):
            remote = f"{cls.name}+{remote}"

        return make_vcs_requirement_url(
            remote,
            cls.get_requirement_revision(repo_dir),
            project_name,
            subdir=cls.get_subdirectory(repo_dir),
        )

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """
        Return the base revision arguments for a vcs command.

        Not implemented in this base class.

        Args:
          rev: the name of a revision to install.  Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
    ) -> RevOptions:
        """
        Build a RevOptions object bound to this class.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options.
        """
        return RevOptions(cls, rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        Tell whether ``repo`` looks like an absolute local path.

        posix absolute paths start with os.path.sep,
        win32 ones start with drive (like c:\\folder)
        """
        drive = os.path.splitdrive(repo)[0]
        if repo.startswith(os.path.sep):
            return True
        return bool(drive)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        This base implementation leaves the netloc untouched and reports no
        auth information.

        Returns: (netloc, (username, password)).
        """
        no_auth = (None, None)
        return netloc, no_auth

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        The returned URL has the vcs prefix, the "@revision" suffix of the
        path and the fragment removed.

        Raises ValueError when the scheme has no "+", and InstallationError
        when the path ends with an "@" that has nothing after it.

        Returns: (url, rev, (username, password)).
        """
        scheme, netloc, path, query, _frag = urllib.parse.urlsplit(url)
        _vcs_prefix, plus, scheme = scheme.partition("+")
        if not plus:
            raise ValueError(
                f"Sorry, {url!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
            )

        netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)

        rev: Optional[str] = None
        if "@" in path:
            # Only the last "@" separates the revision from the path.
            path, _, rev = path.rpartition("@")
            if not rev:
                raise InstallationError(
                    f"The URL {url!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL."
                )

        clean_url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
        return clean_url, rev, user_pass

    @staticmethod
    def make_rev_args(
        username: Optional[str], password: Optional[HiddenText]
    ) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain().

        This base implementation returns no extra arguments.
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, (username, secret_password) = self.get_url_rev_and_auth(
            url.secret
        )
        # Wrap the password (when there is one) before handing it on.
        password: Optional[HiddenText] = (
            None if secret_password is None else hide_value(secret_password)
        )
        extra_args = self.make_rev_args(username, password)
        rev_options = self.make_rev_options(rev, extra_args=extra_args)
        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        unquoted = urllib.parse.unquote(url)
        return unquoted.rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Compare two repo URLs for identity, ignoring incidental differences.

        Both URLs are passed through normalize_url() before being compared.
        """
        first = cls.normalize_url(url1)
        second = cls.normalize_url(url2)
        return first == second

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Not implemented in this base class.

        Args:
          dest: the directory to fetch the repository to.
          rev_options: a RevOptions object.
          verbosity: verbosity level.
        """
        raise NotImplementedError

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Not implemented in this base class.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Not implemented in this base class.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the id of the current commit equals the given name.

        Not implemented in this base class.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        When ``dest`` does not exist the repository is simply fetched. When
        it holds a repository with the same URL it is updated if the commit
        differs. In every other case the user is asked what to do with the
        existing directory.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        url, rev_options = self.get_url_rev_options(url)

        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        rev_display = rev_options.to_display()
        prompt_text: str
        prompt_choices: Tuple[str, ...]
        if self.is_repository_directory(dest):
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info("Skipping because already up-to-date.")
                else:
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options)
                return

            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt_text = "(s)witch, (i)gnore, (w)ipe, (b)ackup "
            prompt_choices = ("s", "i", "w", "b")
        else:
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            # Switching is not offered when dest is not a repository.
            prompt_text = "(i)gnore, (w)ipe, (b)ackup "
            prompt_choices = ("i", "w", "b")

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            url,
        )
        response = ask_path_exists(f"What to do? {prompt_text}", prompt_choices)

        if response == "a":
            sys.exit(-1)
        elif response == "w":
            logger.warning("Deleting %s", display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        elif response == "b":
            dest_dir = backup_dir(dest)
            logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
            shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        elif response == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options)
        # Any other response (i.e. "i") means: do nothing.

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        if os.path.exists(location):
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location

        Not implemented in this base class.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.

        Not implemented in this base class.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: Union[List[str], CommandArgs],
        show_stdout: bool = True,
        cwd: Optional[str] = None,
        on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
        extra_ok_returncodes: Optional[Iterable[int]] = None,
        command_desc: Optional[str] = None,
        extra_environ: Optional[Mapping[str, Any]] = None,
        spinner: Optional[SpinnerInterface] = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        Raises BadCommand when the VCS executable cannot be found or may not
        be executed by the current user.
        """
        full_cmd = make_command(cls.name, *cmd)
        if command_desc is None:
            command_desc = format_command_args(full_cmd)

        try:
            return call_subprocess(
                full_cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=command_desc,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
                stdout_only=stdout_only,
            )
        except FileNotFoundError:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            )
        except PermissionError:
            # errno.EACCES = Permission denied
            # This error occurs, for instance, when the command is installed
            # only for another user, so the current user doesn't have
            # permission to call it.
            raise BadCommand(
                f"No permission to execute {cls.name!r} - install it "
                f"locally, globally (ask admin), or check your PATH. "
                f"See possible solutions at "
                f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
                f"#fixing-permission-denied."
            )

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Return whether a directory path is a repository directory, i.e.
        whether it contains an entry named after this class's ``dirname``.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        marker = os.path.join(path, cls.dirname)
        return os.path.exists(marker)

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.
        """
        return location if cls.is_repository_directory(location) else None