1from __future__ import annotations
2
3import logging
4import os.path
5import pathlib
6import re
7import urllib.parse
8import urllib.request
9from dataclasses import replace
10from typing import Any
11
12from pip._internal.exceptions import BadCommand, InstallationError
13from pip._internal.utils.misc import HiddenText, display_path, hide_url
14from pip._internal.utils.subprocess import make_command
15from pip._internal.vcs.versioncontrol import (
16 AuthInfo,
17 RemoteNotFoundError,
18 RemoteNotValidError,
19 RevOptions,
20 VersionControl,
21 find_path_to_project_root_from_repo_root,
22 vcs,
23)
24
# Short aliases for the urllib.parse helpers used by Git.get_url_rev_and_auth().
urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit


logger = logging.getLogger(__name__)


# Parses the output of ``git version``; captures major, minor and an optional
# patch component.
GIT_VERSION_REGEX = re.compile(
    r"^git version "  # Prefix.
    r"(\d+)"  # Major.
    r"\.(\d+)"  # Dot, minor.
    r"(?:\.(\d+))?"  # Optional dot, patch.
    r".*$"  # Suffix, including any pre- and post-release segments we don't care about.
)

# A full 40-character hexadecimal commit hash.
HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")

# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
SCP_REGEX = re.compile(
    r"""^
    # Optional user, e.g. 'git@'
    (\w+@)?
    # Server, e.g. 'github.com'.
    ([^/:]+):
    # The server-side path. e.g. 'user/project.git'. Must start with an
    # alphanumeric character so as not to be confusable with a Windows paths
    # like 'C:/foo/bar' or 'C:\foo\bar'.
    (\w[^:]*)
    $""",
    re.VERBOSE,
)


def looks_like_hash(sha: str) -> bool:
    """
    Return whether *sha* is exactly 40 hexadecimal characters.

    ``fullmatch`` is used rather than ``match`` because the ``$`` anchor in
    HASH_REGEX also matches just before a trailing newline, which would let
    ``"<40 hex chars>\\n"`` pass as a hash.
    """
    return HASH_REGEX.fullmatch(sha) is not None
60
61
class Git(VersionControl):
    """Git backend built on the shared VersionControl base class."""

    # Identifiers for this backend.
    # NOTE(review): presumably consumed by VersionControl / the vcs registry;
    # their exact use is not visible in this file -- confirm there.
    name = "git"
    dirname = ".git"
    repo_name = "clone"
    # "git+<transport>" URL schemes associated with this backend.
    schemes = (
        "git+http",
        "git+https",
        "git+ssh",
        "git+git",
        "git+file",
    )
    # Prevent the user's environment variables from interfering with pip:
    # https://github.com/pypa/pip/issues/1130
    unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
    # Revision name used when none is given (see the assertion in
    # resolve_revision that arg_rev is never None for Git).
    default_arg_rev = "HEAD"
77
78 @staticmethod
79 def get_base_rev_args(rev: str) -> list[str]:
80 return [rev]
81
82 @classmethod
83 def run_command(cls, *args: Any, **kwargs: Any) -> str:
84 if os.environ.get("PIP_NO_INPUT"):
85 extra_environ = kwargs.get("extra_environ", {})
86 extra_environ["GIT_TERMINAL_PROMPT"] = "0"
87 extra_environ["GIT_SSH_COMMAND"] = "ssh -oBatchMode=yes"
88 kwargs["extra_environ"] = extra_environ
89 return super().run_command(*args, **kwargs)
90
91 def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
92 _, rev_options = self.get_url_rev_options(hide_url(url))
93 if not rev_options.rev:
94 return False
95 if not self.is_commit_id_equal(dest, rev_options.rev):
96 # the current commit is different from rev,
97 # which means rev was something else than a commit hash
98 return False
99 # return False in the rare case rev is both a commit hash
100 # and a tag or a branch; we don't want to cache in that case
101 # because that branch/tag could point to something else in the future
102 is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
103 return not is_tag_or_branch
104
105 def get_git_version(self) -> tuple[int, ...]:
106 version = self.run_command(
107 ["version"],
108 command_desc="git version",
109 show_stdout=False,
110 stdout_only=True,
111 )
112 match = GIT_VERSION_REGEX.match(version)
113 if not match:
114 logger.warning("Can't parse git version: %s", version)
115 return ()
116 return (int(match.group(1)), int(match.group(2)))
117
118 @classmethod
119 def get_current_branch(cls, location: str) -> str | None:
120 """
121 Return the current branch, or None if HEAD isn't at a branch
122 (e.g. detached HEAD).
123 """
124 # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
125 # HEAD rather than a symbolic ref. In addition, the -q causes the
126 # command to exit with status code 1 instead of 128 in this case
127 # and to suppress the message to stderr.
128 args = ["symbolic-ref", "-q", "HEAD"]
129 output = cls.run_command(
130 args,
131 extra_ok_returncodes=(1,),
132 show_stdout=False,
133 stdout_only=True,
134 cwd=location,
135 )
136 ref = output.strip()
137
138 if ref.startswith("refs/heads/"):
139 return ref[len("refs/heads/") :]
140
141 return None
142
143 @classmethod
144 def get_revision_sha(cls, dest: str, rev: str) -> tuple[str | None, bool]:
145 """
146 Return (sha_or_none, is_branch), where sha_or_none is a commit hash
147 if the revision names a remote branch or tag, otherwise None.
148
149 Args:
150 dest: the repository directory.
151 rev: the revision name.
152 """
153 # Pass rev to pre-filter the list.
154 output = cls.run_command(
155 ["show-ref", rev],
156 cwd=dest,
157 show_stdout=False,
158 stdout_only=True,
159 on_returncode="ignore",
160 )
161 refs = {}
162 # NOTE: We do not use splitlines here since that would split on other
163 # unicode separators, which can be maliciously used to install a
164 # different revision.
165 for line in output.strip().split("\n"):
166 line = line.rstrip("\r")
167 if not line:
168 continue
169 try:
170 ref_sha, ref_name = line.split(" ", maxsplit=2)
171 except ValueError:
172 # Include the offending line to simplify troubleshooting if
173 # this error ever occurs.
174 raise ValueError(f"unexpected show-ref line: {line!r}")
175
176 refs[ref_name] = ref_sha
177
178 branch_ref = f"refs/remotes/origin/{rev}"
179 tag_ref = f"refs/tags/{rev}"
180
181 sha = refs.get(branch_ref)
182 if sha is not None:
183 return (sha, True)
184
185 sha = refs.get(tag_ref)
186
187 return (sha, False)
188
189 @classmethod
190 def _should_fetch(cls, dest: str, rev: str) -> bool:
191 """
192 Return true if rev is a ref or is a commit that we don't have locally.
193
194 Branches and tags are not considered in this method because they are
195 assumed to be always available locally (which is a normal outcome of
196 ``git clone`` and ``git fetch --tags``).
197 """
198 if rev.startswith("refs/"):
199 # Always fetch remote refs.
200 return True
201
202 if not looks_like_hash(rev):
203 # Git fetch would fail with abbreviated commits.
204 return False
205
206 if cls.has_commit(dest, rev):
207 # Don't fetch if we have the commit locally.
208 return False
209
210 return True
211
212 @classmethod
213 def resolve_revision(
214 cls, dest: str, url: HiddenText, rev_options: RevOptions
215 ) -> RevOptions:
216 """
217 Resolve a revision to a new RevOptions object with the SHA1 of the
218 branch, tag, or ref if found.
219
220 Args:
221 rev_options: a RevOptions object.
222 """
223 rev = rev_options.arg_rev
224 # The arg_rev property's implementation for Git ensures that the
225 # rev return value is always non-None.
226 assert rev is not None
227
228 sha, is_branch = cls.get_revision_sha(dest, rev)
229
230 if sha is not None:
231 rev_options = rev_options.make_new(sha)
232 rev_options = replace(rev_options, branch_name=(rev if is_branch else None))
233
234 return rev_options
235
236 # Do not show a warning for the common case of something that has
237 # the form of a Git commit hash.
238 if not looks_like_hash(rev):
239 logger.info(
240 "Did not find branch or tag '%s', assuming revision or ref.",
241 rev,
242 )
243
244 if not cls._should_fetch(dest, rev):
245 return rev_options
246
247 # fetch the requested revision
248 cls.run_command(
249 make_command("fetch", "-q", url, rev_options.to_args()),
250 cwd=dest,
251 )
252 # Change the revision to the SHA of the ref we fetched
253 sha = cls.get_revision(dest, rev="FETCH_HEAD")
254 rev_options = rev_options.make_new(sha)
255
256 return rev_options
257
258 @classmethod
259 def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
260 """
261 Return whether the current commit hash equals the given name.
262
263 Args:
264 dest: the repository directory.
265 name: a string name.
266 """
267 if not name:
268 # Then avoid an unnecessary subprocess call.
269 return False
270
271 return cls.get_revision(dest) == name
272
273 def fetch_new(
274 self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
275 ) -> None:
276 rev_display = rev_options.to_display()
277 logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
278 if verbosity <= 0:
279 flags: tuple[str, ...] = ("--quiet",)
280 elif verbosity == 1:
281 flags = ()
282 else:
283 flags = ("--verbose", "--progress")
284 if self.get_git_version() >= (2, 17):
285 # Git added support for partial clone in 2.17
286 # https://git-scm.com/docs/partial-clone
287 # Speeds up cloning by functioning without a complete copy of repository
288 self.run_command(
289 make_command(
290 "clone",
291 "--filter=blob:none",
292 *flags,
293 url,
294 dest,
295 )
296 )
297 else:
298 self.run_command(make_command("clone", *flags, url, dest))
299
300 if rev_options.rev:
301 # Then a specific revision was requested.
302 rev_options = self.resolve_revision(dest, url, rev_options)
303 branch_name = getattr(rev_options, "branch_name", None)
304 logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
305 if branch_name is None:
306 # Only do a checkout if the current commit id doesn't match
307 # the requested revision.
308 if not self.is_commit_id_equal(dest, rev_options.rev):
309 cmd_args = make_command(
310 "checkout",
311 "-q",
312 rev_options.to_args(),
313 )
314 self.run_command(cmd_args, cwd=dest)
315 elif self.get_current_branch(dest) != branch_name:
316 # Then a specific branch was requested, and that branch
317 # is not yet checked out.
318 track_branch = f"origin/{branch_name}"
319 cmd_args = [
320 "checkout",
321 "-b",
322 branch_name,
323 "--track",
324 track_branch,
325 ]
326 self.run_command(cmd_args, cwd=dest)
327 else:
328 sha = self.get_revision(dest)
329 rev_options = rev_options.make_new(sha)
330
331 logger.info("Resolved %s to commit %s", url, rev_options.rev)
332
333 #: repo may contain submodules
334 self.update_submodules(dest, verbosity=verbosity)
335
336 def switch(
337 self,
338 dest: str,
339 url: HiddenText,
340 rev_options: RevOptions,
341 verbosity: int = 0,
342 ) -> None:
343 self.run_command(
344 make_command("config", "remote.origin.url", url),
345 cwd=dest,
346 )
347
348 extra_flags = []
349
350 if verbosity <= 0:
351 extra_flags.append("-q")
352
353 cmd_args = make_command("checkout", *extra_flags, rev_options.to_args())
354 self.run_command(cmd_args, cwd=dest)
355
356 self.update_submodules(dest, verbosity=verbosity)
357
358 def update(
359 self,
360 dest: str,
361 url: HiddenText,
362 rev_options: RevOptions,
363 verbosity: int = 0,
364 ) -> None:
365 extra_flags = []
366
367 if verbosity <= 0:
368 extra_flags.append("-q")
369
370 # First fetch changes from the default remote
371 if self.get_git_version() >= (1, 9):
372 # fetch tags in addition to everything else
373 self.run_command(["fetch", "--tags", *extra_flags], cwd=dest)
374 else:
375 self.run_command(["fetch", *extra_flags], cwd=dest)
376 # Then reset to wanted revision (maybe even origin/master)
377 rev_options = self.resolve_revision(dest, url, rev_options)
378 cmd_args = make_command(
379 "reset",
380 "--hard",
381 *extra_flags,
382 rev_options.to_args(),
383 )
384 self.run_command(cmd_args, cwd=dest)
385 #: update submodules
386 self.update_submodules(dest, verbosity=verbosity)
387
388 @classmethod
389 def get_remote_url(cls, location: str) -> str:
390 """
391 Return URL of the first remote encountered.
392
393 Raises RemoteNotFoundError if the repository does not have a remote
394 url configured.
395 """
396 # We need to pass 1 for extra_ok_returncodes since the command
397 # exits with return code 1 if there are no matching lines.
398 stdout = cls.run_command(
399 ["config", "--get-regexp", r"remote\..*\.url"],
400 extra_ok_returncodes=(1,),
401 show_stdout=False,
402 stdout_only=True,
403 cwd=location,
404 )
405 remotes = stdout.splitlines()
406 try:
407 found_remote = remotes[0]
408 except IndexError:
409 raise RemoteNotFoundError
410
411 for remote in remotes:
412 if remote.startswith("remote.origin.url "):
413 found_remote = remote
414 break
415 url = found_remote.split(" ")[1]
416 return cls._git_remote_to_pip_url(url.strip())
417
418 @staticmethod
419 def _git_remote_to_pip_url(url: str) -> str:
420 """
421 Convert a remote url from what git uses to what pip accepts.
422
423 There are 3 legal forms **url** may take:
424
425 1. A fully qualified url: ssh://git@example.com/foo/bar.git
426 2. A local project.git folder: /path/to/bare/repository.git
427 3. SCP shorthand for form 1: git@example.com:foo/bar.git
428
429 Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
430 be converted to form 1.
431
432 See the corresponding test test_git_remote_url_to_pip() for examples of
433 sample inputs/outputs.
434 """
435 if re.match(r"\w+://", url):
436 # This is already valid. Pass it though as-is.
437 return url
438 if os.path.exists(url):
439 # A local bare remote (git clone --mirror).
440 # Needs a file:// prefix.
441 return pathlib.PurePath(url).as_uri()
442 scp_match = SCP_REGEX.match(url)
443 if scp_match:
444 # Add an ssh:// prefix and replace the ':' with a '/'.
445 return scp_match.expand(r"ssh://\1\2/\3")
446 # Otherwise, bail out.
447 raise RemoteNotValidError(url)
448
449 @classmethod
450 def has_commit(cls, location: str, rev: str) -> bool:
451 """
452 Check if rev is a commit that is available in the local repository.
453 """
454 try:
455 cls.run_command(
456 ["rev-parse", "-q", "--verify", "sha^" + rev],
457 cwd=location,
458 log_failed_cmd=False,
459 )
460 except InstallationError:
461 return False
462 else:
463 return True
464
465 @classmethod
466 def get_revision(cls, location: str, rev: str | None = None) -> str:
467 if rev is None:
468 rev = "HEAD"
469 current_rev = cls.run_command(
470 ["rev-parse", rev],
471 show_stdout=False,
472 stdout_only=True,
473 cwd=location,
474 )
475 return current_rev.strip()
476
477 @classmethod
478 def get_subdirectory(cls, location: str) -> str | None:
479 """
480 Return the path to Python project root, relative to the repo root.
481 Return None if the project root is in the repo root.
482 """
483 # find the repo root
484 git_dir = cls.run_command(
485 ["rev-parse", "--git-dir"],
486 show_stdout=False,
487 stdout_only=True,
488 cwd=location,
489 ).strip()
490 if not os.path.isabs(git_dir):
491 git_dir = os.path.join(location, git_dir)
492 repo_root = os.path.abspath(os.path.join(git_dir, ".."))
493 return find_path_to_project_root_from_repo_root(location, repo_root)
494
495 @classmethod
496 def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]:
497 """
498 Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
499 That's required because although they use SSH they sometimes don't
500 work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
501 parsing. Hence we remove it again afterwards and return it as a stub.
502 """
503 # Works around an apparent Git bug
504 # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
505 scheme, netloc, path, query, fragment = urlsplit(url)
506 if scheme.endswith("file"):
507 initial_slashes = path[: -len(path.lstrip("/"))]
508 newpath = initial_slashes + urllib.request.url2pathname(path).replace(
509 "\\", "/"
510 ).lstrip("/")
511 after_plus = scheme.find("+") + 1
512 url = scheme[:after_plus] + urlunsplit(
513 (scheme[after_plus:], netloc, newpath, query, fragment),
514 )
515
516 if "://" not in url:
517 assert "file:" not in url
518 url = url.replace("git+", "git+ssh://")
519 url, rev, user_pass = super().get_url_rev_and_auth(url)
520 url = url.replace("ssh://", "")
521 else:
522 url, rev, user_pass = super().get_url_rev_and_auth(url)
523
524 return url, rev, user_pass
525
526 @classmethod
527 def update_submodules(cls, location: str, verbosity: int = 0) -> None:
528 argv = ["submodule", "update", "--init", "--recursive"]
529
530 if verbosity <= 0:
531 argv.append("-q")
532
533 if not os.path.exists(os.path.join(location, ".gitmodules")):
534 return
535 cls.run_command(
536 argv,
537 cwd=location,
538 )
539
540 @classmethod
541 def get_repository_root(cls, location: str) -> str | None:
542 loc = super().get_repository_root(location)
543 if loc:
544 return loc
545 try:
546 r = cls.run_command(
547 ["rev-parse", "--show-toplevel"],
548 cwd=location,
549 show_stdout=False,
550 stdout_only=True,
551 on_returncode="raise",
552 log_failed_cmd=False,
553 )
554 except BadCommand:
555 logger.debug(
556 "could not determine if %s is under git control "
557 "because git is not available",
558 location,
559 )
560 return None
561 except InstallationError:
562 return None
563 return os.path.normpath(r.rstrip("\r\n"))
564
565 @staticmethod
566 def should_add_vcs_url_prefix(repo_url: str) -> bool:
567 """In either https or ssh form, requirements must be prefixed with git+."""
568 return True
569
570
# Register the Git class with the ``vcs`` object imported from
# pip._internal.vcs.versioncontrol.
vcs.register(Git)