1"""Handles all VCS (version control) support"""
2
3from __future__ import annotations
4
5import logging
6import os
7import shutil
8import sys
9import urllib.parse
10from collections.abc import Iterable, Iterator, Mapping
11from dataclasses import dataclass, field
12from typing import (
13 Any,
14 Literal,
15)
16
17from pip._internal.cli.spinners import SpinnerInterface
18from pip._internal.exceptions import BadCommand, InstallationError
19from pip._internal.utils.misc import (
20 HiddenText,
21 ask_path_exists,
22 backup_dir,
23 display_path,
24 hide_url,
25 hide_value,
26 is_installable_dir,
27 rmtree,
28)
29from pip._internal.utils.subprocess import (
30 CommandArgs,
31 call_subprocess,
32 format_command_args,
33 make_command,
34)
35
# Names exported by ``from <this module> import *``: only the ``vcs`` object.
__all__ = ["vcs"]


# Logger named after this module.
logger = logging.getLogger(__name__)

# A (username, password) pair; either element may be None.
AuthInfo = tuple[str | None, str | None]
42
43
def is_url(name: str) -> bool:
    """
    Tell whether ``name`` looks like a URL.

    A name qualifies when it parses with a non-empty scheme that is either
    one of the plain schemes (http, https, file, ftp) or one of the schemes
    reported by the registered VCS backends.
    """
    parsed_scheme = urllib.parse.urlsplit(name).scheme
    if parsed_scheme:
        # The VCS registry is only consulted once a scheme is present.
        known_schemes = ["http", "https", "file", "ftp"] + vcs.all_schemes
        return parsed_scheme in known_schemes
    return False
52
53
def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: str | None = None
) -> str:
    """
    Build the requirement URL for a project that lives in a VCS repository.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision to reference; it is percent-quoted, with "/" left
        untouched.
      project_name: the (unescaped) project name; dashes are turned into
        underscores for the ``egg`` fragment.
      subdir: optional sub-directory, appended as a ``subdirectory``
        fragment entry when truthy.
    """
    rev_part = urllib.parse.quote(rev, "/")
    egg_part = project_name.replace("-", "_")
    subdir_part = f"&subdirectory={subdir}" if subdir else ""
    return f"{repo_url}@{rev_part}#egg={egg_part}{subdir_part}"
71
72
def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> str | None:
    """
    Locate the Python project containing `location` and express its root
    relative to `repo_root`.

    The search starts at `location` and moves up one parent directory at a
    time until an installable directory is found. Return None if the project
    root is `repo_root` itself, or if no project root can be found.
    """
    start = location
    candidate = location
    while not is_installable_dir(candidate):
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer changes the path, so the top of the
            # filesystem was reached without finding a Python project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                start,
            )
            return None
        candidate = parent

    if os.path.samefile(repo_root, candidate):
        return None
    return os.path.relpath(candidate, repo_root)
100
101
class RemoteNotFoundError(Exception):
    """Signal that a repository does not have a remote url configured."""
104
105
class RemoteNotValidError(Exception):
    """Exception that carries the remote URL it was raised for."""

    def __init__(self, url: str):
        # Keep the URL on the instance as well as in ``args``.
        self.url = url
        super().__init__(url)
110
111
@dataclass(frozen=True)
class RevOptions:
    """
    A revision to install for one particular VCS, together with any
    VCS-specific install options.

    Args:
      vc_class: a VersionControl subclass.
      rev: the name of the revision to install.
      extra_args: a list of extra options.
      branch_name: an optional branch name; it is stored on the instance but
        not read by any method of this class.
    """

    vc_class: type[VersionControl]
    rev: str | None = None
    extra_args: CommandArgs = field(default_factory=list)
    branch_name: str | None = None

    def __repr__(self) -> str:
        backend_name = self.vc_class.name
        return f"<RevOptions {backend_name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> str | None:
        """
        The revision to hand to the VCS: ``rev`` when one was given,
        otherwise the backend's ``default_arg_rev``.
        """
        return self.vc_class.default_arg_rev if self.rev is None else self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments: the backend's base
        revision arguments (when there is a revision) followed by
        ``extra_args``.
        """
        effective_rev = self.arg_rev
        command_args: CommandArgs = []
        if effective_rev is not None:
            command_args.extend(self.vc_class.get_base_rev_args(effective_rev))
        command_args.extend(self.extra_args)
        return command_args

    def to_display(self) -> str:
        """
        Return a " (to revision ...)" suffix for log messages, or an empty
        string when no revision is set.
        """
        return f" (to revision {self.rev})" if self.rev else ""

    def make_new(self, rev: str) -> RevOptions:
        """
        Create options for another revision, reusing this instance's
        ``extra_args``.

        Args:
          rev: the name of the revision for the new object.
        """
        backend = self.vc_class
        return backend.make_rev_options(rev, extra_args=self.extra_args)
165
166
class VcsSupport:
    """
    Registry of VCS backends, keyed by backend name.

    The registry dict is a class attribute, so it is shared by every
    instance of this class.
    """

    _registry: dict[str, VersionControl] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Make urlparse aware of the schemes used by the various version
        # control systems.
        urllib.parse.uses_netloc.extend(self.schemes)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the registered backends."""
        return iter(self._registry)

    @property
    def backends(self) -> list[VersionControl]:
        """The registered backend objects, as a new list."""
        return [*self._registry.values()]

    @property
    def dirnames(self) -> list[str]:
        """The ``dirname`` attribute of every registered backend."""
        return [registered.dirname for registered in self.backends]

    @property
    def all_schemes(self) -> list[str]:
        """The schemes of every registered backend, flattened into one list."""
        return [
            scheme for registered in self.backends for scheme in registered.schemes
        ]

    def register(self, cls: type[VersionControl]) -> None:
        """
        Instantiate ``cls`` and store it under ``cls.name``, unless a backend
        with that name is already registered.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name in self._registry:
            return
        self._registry[cls.name] = cls()
        logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Drop the backend registered under ``name``, if there is one."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location: str) -> VersionControl | None:
        """
        Return a VersionControl object if a repository of that type is found
        at the given directory.
        """
        roots: dict[str, VersionControl] = {}
        for candidate in self._registry.values():
            root = candidate.get_repository_root(location)
            if root:
                logger.debug(
                    "Determine that %s uses VCS: %s", location, candidate.name
                )
                roots[root] = candidate

        if not roots:
            return None

        # Pick the VCS of the inner-most directory. Every root found above
        # is either `location` or one of its parents, so the longest path
        # has the most path components and therefore belongs to the
        # inner-most repository.
        return roots[max(roots, key=len)]

    def get_backend_for_scheme(self, scheme: str) -> VersionControl | None:
        """
        Return the first registered backend that lists ``scheme``, or None.
        """
        matching = (
            candidate
            for candidate in self._registry.values()
            if scheme in candidate.schemes
        )
        return next(matching, None)

    def get_backend(self, name: str) -> VersionControl | None:
        """
        Return the backend registered under the lower-cased ``name``, or None.
        """
        return self._registry.get(name.lower())
245
246
# Module-level VcsSupport instance; this is the name listed in ``__all__``.
vcs = VcsSupport()
248
249
class VersionControl:
    """
    Base class for a VCS backend.

    The class attributes below carry empty defaults, and several methods
    raise NotImplementedError here; they are meant to be provided by the
    concrete backends.
    """

    name = ""
    dirname = ""
    repo_name = ""
    # List of supported schemes for this Version Control
    schemes: tuple[str, ...] = ()
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ: tuple[str, ...] = ()
    default_arg_rev: str | None = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Tell whether the vcs prefix (e.g. "git+") has to be added to a
        repository's remote url when it is used in a requirement.

        The prefix is not needed when the url (compared in lower case)
        already starts with "<name>:".
        """
        already_prefixed = remote_url.lower().startswith(f"{cls.name}:")
        return not already_prefixed

    @classmethod
    def get_subdirectory(cls, location: str) -> str | None:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string to use in a requirement; by default this
        is whatever get_revision() reports for ``repo_dir``.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        remote = cls.get_remote_url(repo_dir)
        if cls.should_add_vcs_url_prefix(remote):
            remote = f"{cls.name}+{remote}"

        return make_vcs_requirement_url(
            remote,
            cls.get_requirement_revision(repo_dir),
            project_name,
            subdir=cls.get_subdirectory(repo_dir),
        )

    @staticmethod
    def get_base_rev_args(rev: str) -> list[str]:
        """
        Return the base revision arguments for a vcs command.

        Args:
          rev: the name of a revision to install.  Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: str | None = None, extra_args: CommandArgs | None = None
    ) -> RevOptions:
        """
        Return a RevOptions object bound to this backend class.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options; a falsy value is replaced by
            a new empty list.
        """
        options = extra_args or []
        return RevOptions(cls, rev, extra_args=options)

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        Tell whether ``repo`` looks like an absolute local path: posix
        absolute paths start with os.path.sep, win32 ones start with a
        drive (like c:\\folder).
        """
        drive = os.path.splitdrive(repo)[0]
        return repo.startswith(os.path.sep) or bool(drive)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> tuple[str, tuple[str | None, str | None]]:
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        Returns: (netloc, (username, password)).
        """
        no_auth = (None, None)
        return netloc, no_auth

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]:
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        Raises ValueError when the scheme has no "+" (no vcs prefix), and
        InstallationError when the path ends with an "@" that is not followed
        by a revision.

        Returns: (url, rev, (username, password)).
        """
        parts = urllib.parse.urlsplit(url)
        # Everything after the first "+" is the scheme without the vcs prefix.
        _, plus, transport = parts.scheme.partition("+")
        if not plus:
            raise ValueError(
                f"Sorry, {url!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
            )
        netloc, user_pass = cls.get_netloc_and_auth(parts.netloc, transport)

        path = parts.path
        rev = None
        if "@" in path:
            # The revision is whatever follows the last "@" of the path.
            path, _, quoted_rev = path.rpartition("@")
            if not quoted_rev:
                raise InstallationError(
                    f"The URL {url!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL."
                )
            rev = urllib.parse.unquote(quoted_rev)

        # The fragment is dropped from the rebuilt URL.
        clean_url = urllib.parse.urlunsplit(
            (transport, netloc, path, parts.query, "")
        )
        return clean_url, rev, user_pass

    @staticmethod
    def make_rev_args(username: str | None, password: HiddenText | None) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain().

        This base implementation returns an empty list.
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, (username, secret_password) = self.get_url_rev_and_auth(
            url.secret
        )
        password: HiddenText | None = (
            None if secret_password is None else hide_value(secret_password)
        )
        rev_options = self.make_rev_options(
            rev, extra_args=self.make_rev_args(username, password)
        )
        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        unquoted = urllib.parse.unquote(url)
        return unquoted.rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Compare two repo URLs for identity, ignoring incidental differences
        (both are passed through normalize_url() first).
        """
        left, right = cls.normalize_url(url1), cls.normalize_url(url2)
        return left == right

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Args:
          dest: the directory to fetch the repository to.
          rev_options: a RevOptions object.
          verbosity: verbosity level.
        """
        raise NotImplementedError

    def switch(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
        """
        Return whether the id of the current commit equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        url, rev_options = self.get_url_rev_options(url)

        # Nothing at the destination yet: a plain first fetch is enough.
        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        rev_display = rev_options.to_display()
        prompt_text: str
        choices: tuple[str, ...]
        if not self.is_repository_directory(dest):
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            prompt_text, choices = "(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b")
        else:
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info("Skipping because already up-to-date.")
                else:
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options, verbosity=verbosity)
                return

            # Same kind of repository, but it points at a different URL.
            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt_text, choices = (
                "(s)witch, (i)gnore, (w)ipe, (b)ackup ",
                ("s", "i", "w", "b"),
            )

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            url,
        )
        response = ask_path_exists(f"What to do? {prompt_text}", choices)

        # NOTE(review): "a" is not among the choices offered above;
        # presumably ask_path_exists() can still return it - confirm.
        if response == "a":
            sys.exit(-1)
        elif response == "w":
            logger.warning("Deleting %s", display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        elif response == "b":
            dest_dir = backup_dir(dest)
            logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
            shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        elif response == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options, verbosity=verbosity)
        # Any other response (e.g. "i") leaves the destination untouched.

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        already_there = os.path.exists(location)
        if already_there:
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: list[str] | CommandArgs,
        show_stdout: bool = True,
        cwd: str | None = None,
        on_returncode: Literal["raise", "warn", "ignore"] = "raise",
        extra_ok_returncodes: Iterable[int] | None = None,
        command_desc: str | None = None,
        extra_environ: Mapping[str, Any] | None = None,
        spinner: SpinnerInterface | None = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand

        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available. Failures to
        launch the executable are reported as BadCommand.
        """
        full_cmd = make_command(cls.name, *cmd)
        if command_desc is None:
            command_desc = format_command_args(full_cmd)
        try:
            return call_subprocess(
                full_cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=command_desc,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
                stdout_only=stdout_only,
            )
        except NotADirectoryError:
            message = f"Cannot find command {cls.name!r} - invalid PATH"
            raise BadCommand(message)
        except FileNotFoundError:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            message = (
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            )
            raise BadCommand(message)
        except PermissionError:
            # errno.EACCES = Permission denied
            # This error occurs, for instance, when the command is installed
            # only for another user, so the current user doesn't have
            # permission to call that user's command.
            message = (
                f"No permission to execute {cls.name!r} - install it "
                f"locally, globally (ask admin), or check your PATH. "
                f"See possible solutions at "
                f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
                f"#fixing-permission-denied."
            )
            raise BadCommand(message)

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Return whether a directory path is a repository directory, i.e.
        whether ``<path>/<dirname>`` exists.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        marker = os.path.join(path, cls.dirname)
        return os.path.exists(marker)

    @classmethod
    def get_repository_root(cls, location: str) -> str | None:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.
        """
        return location if cls.is_repository_directory(location) else None