1"""Handles all VCS (version control) support"""
2
3from __future__ import annotations
4
5import logging
6import os
7import shutil
8import sys
9import urllib.parse
10from collections.abc import Iterable, Iterator, Mapping
11from dataclasses import dataclass, field
12from typing import (
13 Any,
14 Literal,
15 Optional,
16)
17
18from pip._internal.cli.spinners import SpinnerInterface
19from pip._internal.exceptions import BadCommand, InstallationError
20from pip._internal.utils.misc import (
21 HiddenText,
22 ask_path_exists,
23 backup_dir,
24 display_path,
25 hide_url,
26 hide_value,
27 is_installable_dir,
28 rmtree,
29)
30from pip._internal.utils.subprocess import (
31 CommandArgs,
32 call_subprocess,
33 format_command_args,
34 make_command,
35)
36
# Only ``vcs`` is exported by a star-import of this module.
__all__ = ["vcs"]


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# (username, password) pair; either element may be None.
AuthInfo = tuple[Optional[str], Optional[str]]
43
44
def is_url(name: str) -> bool:
    """
    Tell whether ``name`` looks like a URL.

    A name qualifies when it has a scheme and that scheme is one of
    http/https/file/ftp or a scheme listed by ``vcs.all_schemes``.
    """
    scheme = urllib.parse.urlsplit(name).scheme
    # No scheme at all: cannot be a URL, and the VCS registry is not consulted.
    return bool(scheme) and (
        scheme in ("http", "https", "file", "ftp") or scheme in vcs.all_schemes
    )
53
54
def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: str | None = None
) -> str:
    """
    Build the requirement URL for a VCS checkout.

    Args:
        repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
        rev: the revision; it is percent-quoted, with "/" left untouched.
        project_name: the (unescaped) project name; "-" becomes "_" in the
            ``#egg=`` fragment.
        subdir: when truthy, appended verbatim as ``&subdirectory=...``.
    """
    quoted_rev = urllib.parse.quote(rev, "/")
    egg_project_name = project_name.replace("-", "_")

    pieces = [f"{repo_url}@{quoted_rev}#egg={egg_project_name}"]
    if subdir:
        pieces.append(f"&subdirectory={subdir}")

    return "".join(pieces)
72
73
def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> str | None:
    """
    Locate the Python project's root by walking up the filesystem from
    `location`, and express it relative to `repo_root`.

    Returns None when the project root is `repo_root` itself, or when no
    installable directory is found in `location` or any of its parents.
    """
    candidate = location
    while not is_installable_dir(candidate):
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer changes the path: the filesystem root was
            # reached without finding a Python project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                location,
            )
            return None
        candidate = parent

    if os.path.samefile(repo_root, candidate):
        return None

    return os.path.relpath(candidate, repo_root)
101
102
class RemoteNotFoundError(Exception):
    """Signals that a repository has no remote url configured."""
105
106
class RemoteNotValidError(Exception):
    """Exception carrying the offending remote url in its ``url`` attribute."""

    def __init__(self, url: str):
        self.url = url
        super().__init__(url)
111
112
@dataclass(frozen=True)
class RevOptions:
    """
    A revision to install for one VCS backend, together with any extra
    command-line options that go with it.

    Args:
        vc_class: a VersionControl subclass.
        rev: the name of the revision to install.
        extra_args: a list of extra options.
        branch_name: optional branch name; no method of this class reads it.
    """

    vc_class: type[VersionControl]
    rev: str | None = None
    extra_args: CommandArgs = field(default_factory=list)
    branch_name: str | None = None

    def __repr__(self) -> str:
        return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> str | None:
        """The revision to pass on: ``rev``, or the backend default if unset."""
        return self.vc_class.default_arg_rev if self.rev is None else self.rev

    def to_args(self) -> CommandArgs:
        """
        Build the VCS-specific command arguments as a fresh list: the
        backend's base revision arguments (if there is a revision to pass)
        followed by ``extra_args``.
        """
        rev = self.arg_rev
        rev_args = [] if rev is None else self.vc_class.get_base_rev_args(rev)
        return [*rev_args, *self.extra_args]

    def to_display(self) -> str:
        """Return a log-message suffix naming the revision, or "" without one."""
        return f" (to revision {self.rev})" if self.rev else ""

    def make_new(self, rev: str) -> RevOptions:
        """
        Create another RevOptions through the backend's make_rev_options(),
        reusing this instance's ``extra_args`` with a different revision.

        Args:
            rev: the name of the revision for the new object.
        """
        return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
166
167
class VcsSupport:
    """Registry of VersionControl backend instances, keyed by backend name."""

    # Class-level mapping, so every VcsSupport instance sees the same backends.
    _registry: dict[str, VersionControl] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Let urllib.parse treat these version control schemes as
        # netloc-bearing.
        urllib.parse.uses_netloc.extend(self.schemes)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        return iter(self._registry)

    @property
    def backends(self) -> list[VersionControl]:
        """All registered backend instances, in registration order."""
        return [*self._registry.values()]

    @property
    def dirnames(self) -> list[str]:
        """The ``dirname`` of every registered backend."""
        return [vc.dirname for vc in self.backends]

    @property
    def all_schemes(self) -> list[str]:
        """The schemes of every registered backend, flattened into one list."""
        return [scheme for vc in self.backends for scheme in vc.schemes]

    def register(self, cls: type[VersionControl]) -> None:
        """
        Instantiate ``cls`` and store it under ``cls.name``.

        A class without a ``name`` attribute is refused with a warning; a
        name that is already registered is left as it is.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name in self._registry:
            return
        self._registry[cls.name] = cls()
        logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Drop the backend registered as ``name``; unknown names are ignored."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location: str) -> VersionControl | None:
        """
        Return the VersionControl object whose repository contains the given
        directory, or None when no registered backend reports one.
        """
        roots = {}
        for candidate in self._registry.values():
            root = candidate.get_repository_root(location)
            if root:
                logger.debug("Determine that %s uses VCS: %s", location, candidate.name)
                roots[root] = candidate

        if roots:
            # Every root found is `location` or one of its parents, so the
            # longest path has the most components: it is the inner-most
            # repository, and its backend wins.
            return roots[max(roots, key=len)]
        return None

    def get_backend_for_scheme(self, scheme: str) -> VersionControl | None:
        """
        Return the first registered VersionControl object that lists
        ``scheme``, or None.
        """
        return next(
            (vc for vc in self._registry.values() if scheme in vc.schemes),
            None,
        )

    def get_backend(self, name: str) -> VersionControl | None:
        """
        Return the VersionControl object registered under the lower-cased
        ``name``, or None.
        """
        return self._registry.get(name.lower())
246
247
# Module-level VcsSupport instance; is_url() reads its all_schemes.
vcs = VcsSupport()
249
250
class VersionControl:
    """
    Base class for a single version control backend.

    The operations that raise NotImplementedError here are the ones a
    concrete backend is expected to provide.
    """

    name = ""
    # Entry looked for inside a directory by is_repository_directory().
    dirname = ""
    repo_name = ""
    # Schemes supported by this Version Control.
    schemes: tuple[str, ...] = ()
    # Environment variable names handed to call_subprocess() as
    # ``unset_environ`` by run_command().
    unset_environ: tuple[str, ...] = ()
    default_arg_rev: str | None = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Tell whether the vcs prefix (e.g. "git+") has to be put in front of
        a repository's remote url when it is used in a requirement.

        The prefix is skipped only when the url already starts, ignoring
        case, with "<name>:".
        """
        already_prefixed = remote_url.lower().startswith(f"{cls.name}:")
        return not already_prefixed

    @classmethod
    def get_subdirectory(cls, location: str) -> str | None:
        """
        Return the path to Python project root, relative to the repo root,
        or None if the project root is the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string to put in a requirement; by default this
        is whatever get_revision() reports.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Build the requirement string that would redownload the files
        currently at the given repository directory.

        Args:
            project_name: the (unescaped) project name.

        The result looks similar to:

            {repository_url}@{revision}#egg={project_name}
        """
        repo_url = cls.get_remote_url(repo_dir)
        if cls.should_add_vcs_url_prefix(repo_url):
            repo_url = f"{cls.name}+{repo_url}"

        return make_vcs_requirement_url(
            repo_url,
            cls.get_requirement_revision(repo_dir),
            project_name,
            subdir=cls.get_subdirectory(repo_dir),
        )

    @staticmethod
    def get_base_rev_args(rev: str) -> list[str]:
        """
        Return the base revision arguments for a vcs command.

        Args:
            rev: the name of a revision to install. Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Tell whether the commit hash checked out at dest matches the
        revision in url.

        A VCS without immutable commit hashes always answers False, which
        is what this base implementation does.

        Local uncommitted changes in dest after checkout are not looked
        at, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: str | None = None, extra_args: CommandArgs | None = None
    ) -> RevOptions:
        """
        Create a RevOptions object bound to this backend.

        Args:
            rev: the name of a revision to install.
            extra_args: a list of extra options.
        """
        if not extra_args:
            extra_args = []
        return RevOptions(cls, rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        Tell whether ``repo`` is an absolute local path: posix ones start
        with os.path.sep, win32 ones carry a drive (such as ``c:``).
        """
        drive, _ = os.path.splitdrive(repo)
        return bool(drive) or repo.startswith(os.path.sep)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> tuple[str, tuple[str | None, str | None]]:
        """
        Parse the repository URL's netloc, and return the netloc to use
        together with the auth information.

        Args:
            netloc: the original repository URL netloc.
            scheme: the repository URL's scheme without the vcs prefix.

        Mainly a hook for the Subversion class, which passes auth
        information through the --username and --password options rather
        than the URL. Subclasses such as Git have no such option, so their
        auth information must stay in the URL; this base implementation
        therefore returns the netloc unchanged with no auth.

        Returns: (netloc, (username, password)).
        """
        no_auth = (None, None)
        return netloc, no_auth

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]:
        """
        Split a repository URL into the URL to use, the revision, and the
        auth info.

        Raises ValueError when the scheme has no "<vcs>+" prefix, and
        InstallationError when the path ends with an empty "@" revision.

        Returns: (url, rev, (username, password)).
        """
        parts = urllib.parse.urlsplit(url)
        if "+" not in parts.scheme:
            raise ValueError(
                f"Sorry, {url!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
            )
        # Keep only what follows the first "+", i.e. drop the vcs prefix.
        _, _, bare_scheme = parts.scheme.partition("+")
        netloc, user_pass = cls.get_netloc_and_auth(parts.netloc, bare_scheme)

        path = parts.path
        rev = None
        if "@" in path:
            # The revision is whatever follows the last "@" of the path.
            path, _, rev = path.rpartition("@")
            if not rev:
                raise InstallationError(
                    f"The URL {url!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL."
                )
            rev = urllib.parse.unquote(rev)

        # The fragment is dropped from the rebuilt URL.
        stripped_url = urllib.parse.urlunsplit(
            (bare_scheme, netloc, path, parts.query, "")
        )
        return stripped_url, rev, user_pass

    @staticmethod
    def make_rev_args(username: str | None, password: HiddenText | None) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain(); none
        in this base implementation.
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, (username, secret_password) = self.get_url_rev_and_auth(
            url.secret
        )
        # Wrap the password (when there is one) before handing it on.
        password: HiddenText | None = (
            None if secret_password is None else hide_value(secret_password)
        )
        extra_args = self.make_rev_args(username, password)
        rev_options = self.make_rev_options(rev, extra_args=extra_args)

        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Put a URL in comparable form: percent-unquoted and without any
        trailing slash.
        """
        unquoted = urllib.parse.unquote(url)
        return unquoted.rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Tell whether two repo URLs are identical once both are normalized
        with normalize_url().
        """
        first = cls.normalize_url(url1)
        second = cls.normalize_url(url2)
        return first == second

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, for the case where this is the
        first fetch from that repository.

        Args:
            dest: the directory to fetch the repository to.
            rev_options: a RevOptions object.
            verbosity: verbosity level.
        """
        raise NotImplementedError

    def switch(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
            rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
            rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
        """
        Tell whether the id of the current commit equals the given name.

        Args:
            dest: the repository directory.
            name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        Args:
            dest: the repository directory in which to install or update.
            url: the repository URL starting with a vcs prefix.
            verbosity: verbosity level.
        """
        url, rev_options = self.get_url_rev_options(url)

        # Nothing at dest yet: a first-time fetch is all that is needed.
        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        rev_display = rev_options.to_display()
        prompt: tuple[str, tuple[str, ...]]
        if not self.is_repository_directory(dest):
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b"))
        else:
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info("Skipping because already up-to-date.")
                else:
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options, verbosity=verbosity)
                return

            # Same kind of repository, but pointing at another URL:
            # switching is offered on top of the other choices.
            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            url,
        )
        response = ask_path_exists(f"What to do? {prompt[0]}", prompt[1])

        if response == "a":
            sys.exit(-1)

        if response in ("w", "b"):
            # Both choices clear dest out of the way, then fetch afresh.
            if response == "w":
                logger.warning("Deleting %s", display_path(dest))
                rmtree(dest)
            else:
                dest_dir = backup_dir(dest)
                logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
                shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        # A response of "i" falls through: nothing is done.
        if response == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options, verbosity=verbosity)

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Remove whatever is at ``location``, then download the url
        repository (and vcs infos) into it via obtain().

        Args:
            url: the repository URL starting with a vcs prefix.
            verbosity: verbosity level.
        """
        if os.path.exists(location):
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: list[str] | CommandArgs,
        show_stdout: bool = True,
        cwd: str | None = None,
        on_returncode: Literal["raise", "warn", "ignore"] = "raise",
        extra_ok_returncodes: Iterable[int] | None = None,
        command_desc: str | None = None,
        extra_environ: Mapping[str, Any] | None = None,
        spinner: SpinnerInterface | None = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand.

        A thin wrapper around call_subprocess that puts the VCS command
        name in front of ``cmd`` and raises BadCommand when the VCS
        executable cannot be launched.
        """
        cmd = make_command(cls.name, *cmd)
        if command_desc is None:
            command_desc = format_command_args(cmd)

        subprocess_options = {
            "on_returncode": on_returncode,
            "extra_ok_returncodes": extra_ok_returncodes,
            "command_desc": command_desc,
            "extra_environ": extra_environ,
            "unset_environ": cls.unset_environ,
            "spinner": spinner,
            "log_failed_cmd": log_failed_cmd,
            "stdout_only": stdout_only,
        }
        try:
            return call_subprocess(cmd, show_stdout, cwd, **subprocess_options)
        except NotADirectoryError:
            raise BadCommand(f"Cannot find command {cls.name!r} - invalid PATH")
        except FileNotFoundError:
            # errno.ENOENT = no such file or directory, i.e. the VCS
            # executable isn't available.
            raise BadCommand(
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            )
        except PermissionError:
            # errno.EACCES = Permission denied. Seen, for instance, when
            # the command is installed only for another user, so the
            # current user is not allowed to call it.
            raise BadCommand(
                f"No permission to execute {cls.name!r} - install it "
                f"locally, globally (ask admin), or check your PATH. "
                f"See possible solutions at "
                f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
                f"#fixing-permission-denied."
            )

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Tell whether a directory path is a repository directory, i.e.
        whether ``dirname`` exists inside it.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        marker = os.path.join(path, cls.dirname)
        return os.path.exists(marker)

    @classmethod
    def get_repository_root(cls, location: str) -> str | None:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        Intended to be overridden with smarter detection for a specific
        vcs; an override can do more than is_repository_directory()
        alone. For example, the Git override checks that Git is actually
        available.
        """
        return location if cls.is_repository_directory(location) else None