Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/vcs/git.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

243 statements  

1from __future__ import annotations 

2 

3import logging 

4import os.path 

5import pathlib 

6import re 

7import urllib.parse 

8import urllib.request 

9from dataclasses import replace 

10from typing import Any 

11 

12from pip._internal.exceptions import BadCommand, InstallationError 

13from pip._internal.utils.misc import HiddenText, display_path, hide_url 

14from pip._internal.utils.subprocess import make_command 

15from pip._internal.vcs.versioncontrol import ( 

16 AuthInfo, 

17 RemoteNotFoundError, 

18 RemoteNotValidError, 

19 RevOptions, 

20 VersionControl, 

21 find_path_to_project_root_from_repo_root, 

22 vcs, 

23) 

24 

# Short module-level aliases for the URL helpers used below.
urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit


logger = logging.getLogger(__name__)


# Matches the output of ``git version`` and captures major, minor and the
# optional patch component as groups 1-3.
GIT_VERSION_REGEX = re.compile(
    r"^git version "  # Prefix.
    r"(\d+)"  # Major.
    r"\.(\d+)"  # Dot, minor.
    r"(?:\.(\d+))?"  # Optional dot, patch.
    r".*$"  # Suffix, including any pre- and post-release segments we don't care about.
)

# A full 40-character hexadecimal commit hash. The pattern is anchored with
# ``\Z`` rather than ``$`` because ``$`` also matches just before a trailing
# newline, which would let "<40 hex chars>\n" pass as a hash.
HASH_REGEX = re.compile(r"^[a-fA-F0-9]{40}\Z")

# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
SCP_REGEX = re.compile(
    r"""^
    # Optional user, e.g. 'git@'
    (\w+@)?
    # Server, e.g. 'github.com'.
    ([^/:]+):
    # The server-side path. e.g. 'user/project.git'. Must start with an
    # alphanumeric character so as not to be confusable with a Windows paths
    # like 'C:/foo/bar' or 'C:\foo\bar'.
    (\w[^:]*)
    $""",
    re.VERBOSE,
)

56 

57 

def looks_like_hash(sha: str) -> bool:
    """Return whether *sha* matches HASH_REGEX, i.e. has the form of a commit hash."""
    return HASH_REGEX.match(sha) is not None

60 

61 

class Git(VersionControl):
    """Git backend: clone, fetch, check out and inspect Git repositories."""

    name = "git"
    dirname = ".git"
    repo_name = "clone"
    schemes = (
        "git+http",
        "git+https",
        "git+ssh",
        "git+git",
        "git+file",
    )
    # Prevent the user's environment variables from interfering with pip:
    # https://github.com/pypa/pip/issues/1130
    unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
    default_arg_rev = "HEAD"

    @staticmethod
    def get_base_rev_args(rev: str) -> list[str]:
        """Return the command-line arguments that select revision *rev*."""
        return [rev]

    @classmethod
    def run_command(cls, *args: Any, **kwargs: Any) -> str:
        """
        Run a git command through the base class implementation.

        When the PIP_NO_INPUT environment variable is set, git and ssh are
        told not to prompt interactively by adding GIT_TERMINAL_PROMPT and
        GIT_SSH_COMMAND to the ``extra_environ`` keyword argument.
        """
        if os.environ.get("PIP_NO_INPUT"):
            # Work on a copy so the caller's mapping is never mutated, and
            # tolerate an explicit ``extra_environ=None``.
            extra_environ = dict(kwargs.get("extra_environ") or {})
            extra_environ["GIT_TERMINAL_PROMPT"] = "0"
            extra_environ["GIT_SSH_COMMAND"] = "ssh -oBatchMode=yes"
            kwargs["extra_environ"] = extra_environ
        return super().run_command(*args, **kwargs)

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return True only if the revision in *url* is a commit hash that is
        checked out in *dest* and is not also the name of a branch or tag.
        """
        _, rev_options = self.get_url_rev_options(hide_url(url))
        if not rev_options.rev:
            return False
        if not self.is_commit_id_equal(dest, rev_options.rev):
            # the current commit is different from rev,
            # which means rev was something else than a commit hash
            return False
        # return False in the rare case rev is both a commit hash
        # and a tag or a branch; we don't want to cache in that case
        # because that branch/tag could point to something else in the future
        is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
        return not is_tag_or_branch

    def get_git_version(self) -> tuple[int, ...]:
        """
        Return the installed git version as a ``(major, minor)`` tuple.

        An empty tuple is returned (and a warning logged) when the output of
        ``git version`` cannot be parsed.
        """
        version = self.run_command(
            ["version"],
            command_desc="git version",
            show_stdout=False,
            stdout_only=True,
        )
        match = GIT_VERSION_REGEX.match(version)
        if not match:
            logger.warning("Can't parse git version: %s", version)
            return ()
        return (int(match.group(1)), int(match.group(2)))

    @classmethod
    def get_current_branch(cls, location: str) -> str | None:
        """
        Return the current branch, or None if HEAD isn't at a branch
        (e.g. detached HEAD).
        """
        # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
        # HEAD rather than a symbolic ref.  In addition, the -q causes the
        # command to exit with status code 1 instead of 128 in this case
        # and to suppress the message to stderr.
        args = ["symbolic-ref", "-q", "HEAD"]
        output = cls.run_command(
            args,
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        ref = output.strip()

        if ref.startswith("refs/heads/"):
            return ref[len("refs/heads/") :]

        return None

    @classmethod
    def get_revision_sha(cls, dest: str, rev: str) -> tuple[str | None, bool]:
        """
        Return (sha_or_none, is_branch), where sha_or_none is a commit hash
        if the revision names a remote branch or tag, otherwise None.

        Args:
          dest: the repository directory.
          rev: the revision name.

        Raises:
          ValueError: if a line of ``git show-ref`` output does not consist
            of exactly two space-separated fields.
        """
        # Pass rev to pre-filter the list.
        output = cls.run_command(
            ["show-ref", rev],
            cwd=dest,
            show_stdout=False,
            stdout_only=True,
            on_returncode="ignore",
        )
        refs: dict[str, str] = {}
        # NOTE: We do not use splitlines here since that would split on other
        #       unicode separators, which can be maliciously used to install a
        #       different revision.
        for line in output.strip().split("\n"):
            line = line.rstrip("\r")
            if not line:
                continue
            try:
                ref_sha, ref_name = line.split(" ", maxsplit=2)
            except ValueError:
                # Include the offending line to simplify troubleshooting if
                # this error ever occurs.
                raise ValueError(f"unexpected show-ref line: {line!r}")

            refs[ref_name] = ref_sha

        branch_ref = f"refs/remotes/origin/{rev}"
        tag_ref = f"refs/tags/{rev}"

        # A remote branch takes precedence over a tag of the same name.
        sha = refs.get(branch_ref)
        if sha is not None:
            return (sha, True)

        sha = refs.get(tag_ref)

        return (sha, False)

    @classmethod
    def _should_fetch(cls, dest: str, rev: str) -> bool:
        """
        Return true if rev is a ref or is a commit that we don't have locally.

        Branches and tags are not considered in this method because they are
        assumed to be always available locally (which is a normal outcome of
        ``git clone`` and ``git fetch --tags``).
        """
        if rev.startswith("refs/"):
            # Always fetch remote refs.
            return True

        if not looks_like_hash(rev):
            # Git fetch would fail with abbreviated commits.
            return False

        if cls.has_commit(dest, rev):
            # Don't fetch if we have the commit locally.
            return False

        return True

    @classmethod
    def resolve_revision(
        cls, dest: str, url: HiddenText, rev_options: RevOptions
    ) -> RevOptions:
        """
        Resolve a revision to a new RevOptions object with the SHA1 of the
        branch, tag, or ref if found.

        Args:
          dest: the repository directory.
          url: the remote to fetch from when the revision is not available
            locally.
          rev_options: a RevOptions object.
        """
        rev = rev_options.arg_rev
        # The arg_rev property's implementation for Git ensures that the
        # rev return value is always non-None.
        assert rev is not None

        sha, is_branch = cls.get_revision_sha(dest, rev)

        if sha is not None:
            rev_options = rev_options.make_new(sha)
            rev_options = replace(rev_options, branch_name=(rev if is_branch else None))

            return rev_options

        # Do not show a warning for the common case of something that has
        # the form of a Git commit hash.
        if not looks_like_hash(rev):
            logger.info(
                "Did not find branch or tag '%s', assuming revision or ref.",
                rev,
            )

        if not cls._should_fetch(dest, rev):
            return rev_options

        # fetch the requested revision
        cls.run_command(
            make_command("fetch", "-q", url, rev_options.to_args()),
            cwd=dest,
        )
        # Change the revision to the SHA of the ref we fetched
        sha = cls.get_revision(dest, rev="FETCH_HEAD")
        rev_options = rev_options.make_new(sha)

        return rev_options

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
        """
        Return whether the current commit hash equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        if not name:
            # Then avoid an unnecessary subprocess call.
            return False

        return cls.get_revision(dest) == name

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Clone *url* into *dest*, check out the requested revision (if any)
        and update submodules.
        """
        rev_display = rev_options.to_display()
        logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
        if verbosity <= 0:
            flags: tuple[str, ...] = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        else:
            flags = ("--verbose", "--progress")
        if self.get_git_version() >= (2, 17):
            # Git added support for partial clone in 2.17
            # https://git-scm.com/docs/partial-clone
            # Speeds up cloning by functioning without a complete copy of repository
            self.run_command(
                make_command(
                    "clone",
                    "--filter=blob:none",
                    *flags,
                    url,
                    dest,
                )
            )
        else:
            self.run_command(make_command("clone", *flags, url, dest))

        if rev_options.rev:
            # Then a specific revision was requested.
            rev_options = self.resolve_revision(dest, url, rev_options)
            branch_name = getattr(rev_options, "branch_name", None)
            logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
            if branch_name is None:
                # Only do a checkout if the current commit id doesn't match
                # the requested revision.
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    cmd_args = make_command(
                        "checkout",
                        "-q",
                        rev_options.to_args(),
                    )
                    self.run_command(cmd_args, cwd=dest)
            elif self.get_current_branch(dest) != branch_name:
                # Then a specific branch was requested, and that branch
                # is not yet checked out.
                track_branch = f"origin/{branch_name}"
                cmd_args = [
                    "checkout",
                    "-b",
                    branch_name,
                    "--track",
                    track_branch,
                ]
                self.run_command(cmd_args, cwd=dest)
        else:
            # No revision requested: record whatever commit the clone is at.
            sha = self.get_revision(dest)
            rev_options = rev_options.make_new(sha)

        logger.info("Resolved %s to commit %s", url, rev_options.rev)

        #: repo may contain submodules
        self.update_submodules(dest, verbosity=verbosity)

    def switch(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Point the ``origin`` remote of *dest* at *url*, check out the
        requested revision and update submodules.
        """
        self.run_command(
            make_command("config", "remote.origin.url", url),
            cwd=dest,
        )

        extra_flags = []

        if verbosity <= 0:
            extra_flags.append("-q")

        cmd_args = make_command("checkout", *extra_flags, rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

        self.update_submodules(dest, verbosity=verbosity)

    def update(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Fetch from the default remote, hard-reset *dest* to the requested
        revision and update submodules.
        """
        extra_flags = []

        if verbosity <= 0:
            extra_flags.append("-q")

        # First fetch changes from the default remote
        if self.get_git_version() >= (1, 9):
            # fetch tags in addition to everything else
            self.run_command(["fetch", "--tags", *extra_flags], cwd=dest)
        else:
            self.run_command(["fetch", *extra_flags], cwd=dest)
        # Then reset to wanted revision (maybe even origin/master)
        rev_options = self.resolve_revision(dest, url, rev_options)
        cmd_args = make_command(
            "reset",
            "--hard",
            *extra_flags,
            rev_options.to_args(),
        )
        self.run_command(cmd_args, cwd=dest)
        #: update submodules
        self.update_submodules(dest, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return URL of the first remote encountered, preferring ``origin``.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        # We need to pass 1 for extra_ok_returncodes since the command
        # exits with return code 1 if there are no matching lines.
        stdout = cls.run_command(
            ["config", "--get-regexp", r"remote\..*\.url"],
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        remotes = stdout.splitlines()
        if not remotes:
            raise RemoteNotFoundError

        found_remote = remotes[0]
        for remote in remotes:
            if remote.startswith("remote.origin.url "):
                found_remote = remote
                break
        # Each line is "<key> <value>". Split on the first space only so that
        # a value containing spaces (e.g. a local path) is kept whole.
        _, _, url = found_remote.partition(" ")
        return cls._git_remote_to_pip_url(url.strip())

    @staticmethod
    def _git_remote_to_pip_url(url: str) -> str:
        """
        Convert a remote url from what git uses to what pip accepts.

        There are 3 legal forms **url** may take:

            1. A fully qualified url: ssh://git@example.com/foo/bar.git
            2. A local project.git folder: /path/to/bare/repository.git
            3. SCP shorthand for form 1: git@example.com:foo/bar.git

        Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
        be converted to form 1.

        See the corresponding test test_git_remote_url_to_pip() for examples of
        sample inputs/outputs.

        Raises RemoteNotValidError if **url** matches none of the forms.
        """
        if re.match(r"\w+://", url):
            # This is already valid. Pass it though as-is.
            return url
        if os.path.exists(url):
            # A local bare remote (git clone --mirror).
            # Needs a file:// prefix.
            return pathlib.PurePath(url).as_uri()
        scp_match = SCP_REGEX.match(url)
        if scp_match:
            # Add an ssh:// prefix and replace the ':' with a '/'.
            return scp_match.expand(r"ssh://\1\2/\3")
        # Otherwise, bail out.
        raise RemoteNotValidError(url)

    @classmethod
    def has_commit(cls, location: str, rev: str) -> bool:
        """
        Check if rev is a commit that is available in the local repository.
        """
        try:
            cls.run_command(
                ["rev-parse", "-q", "--verify", "sha^" + rev],
                cwd=location,
                log_failed_cmd=False,
            )
        except InstallationError:
            return False
        else:
            return True

    @classmethod
    def get_revision(cls, location: str, rev: str | None = None) -> str:
        """
        Return the output of ``git rev-parse`` for *rev* in *location*,
        stripped of surrounding whitespace. *rev* defaults to ``HEAD``.
        """
        if rev is None:
            rev = "HEAD"
        current_rev = cls.run_command(
            ["rev-parse", rev],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        return current_rev.strip()

    @classmethod
    def get_subdirectory(cls, location: str) -> str | None:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        git_dir = cls.run_command(
            ["rev-parse", "--git-dir"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if not os.path.isabs(git_dir):
            git_dir = os.path.join(location, git_dir)
        repo_root = os.path.abspath(os.path.join(git_dir, ".."))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]:
        """
        Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
        That's required because although they use SSH they sometimes don't
        work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
        parsing. Hence we remove it again afterwards and return it as a stub.
        """
        # Works around an apparent Git bug
        # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
        scheme, netloc, path, query, fragment = urlsplit(url)
        if scheme.endswith("file"):
            # Count the leading slashes explicitly: slicing with a negative
            # length would yield "" when the path consists only of slashes.
            slash_count = len(path) - len(path.lstrip("/"))
            initial_slashes = path[:slash_count]
            newpath = initial_slashes + urllib.request.url2pathname(path).replace(
                "\\", "/"
            ).lstrip("/")
            after_plus = scheme.find("+") + 1
            url = scheme[:after_plus] + urlunsplit(
                (scheme[after_plus:], netloc, newpath, query, fragment),
            )

        if "://" not in url:
            assert "file:" not in url
            url = url.replace("git+", "git+ssh://")
            url, rev, user_pass = super().get_url_rev_and_auth(url)
            url = url.replace("ssh://", "")
        else:
            url, rev, user_pass = super().get_url_rev_and_auth(url)

        return url, rev, user_pass

    @classmethod
    def update_submodules(cls, location: str, verbosity: int = 0) -> None:
        """
        Initialise and update submodules recursively; do nothing when
        *location* has no ``.gitmodules`` file.
        """
        if not os.path.exists(os.path.join(location, ".gitmodules")):
            return

        argv = ["submodule", "update", "--init", "--recursive"]

        if verbosity <= 0:
            argv.append("-q")

        cls.run_command(
            argv,
            cwd=location,
        )

    @classmethod
    def get_repository_root(cls, location: str) -> str | None:
        """
        Return the root of the repository containing *location*, or None if
        it cannot be determined (git unavailable or the command fails).
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ["rev-parse", "--show-toplevel"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under git control "
                "because git is not available",
                location,
            )
            return None
        except InstallationError:
            return None
        return os.path.normpath(r.rstrip("\r\n"))

    @staticmethod
    def should_add_vcs_url_prefix(repo_url: str) -> bool:
        """In either https or ssh form, requirements must be prefixed with git+."""
        return True


vcs.register(Git)