Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/vcs/git.py: 27%

224 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1import logging 

2import os.path 

3import pathlib 

4import re 

5import urllib.parse 

6import urllib.request 

7from typing import List, Optional, Tuple 

8 

9from pip._internal.exceptions import BadCommand, InstallationError 

10from pip._internal.utils.misc import HiddenText, display_path, hide_url 

11from pip._internal.utils.subprocess import make_command 

12from pip._internal.vcs.versioncontrol import ( 

13 AuthInfo, 

14 RemoteNotFoundError, 

15 RemoteNotValidError, 

16 RevOptions, 

17 VersionControl, 

18 find_path_to_project_root_from_repo_root, 

19 vcs, 

20) 

21 

# Module-level shorthands for the urllib.parse helpers used further down.
urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit


logger = logging.getLogger(__name__)


# Parses the text printed by ``git version``. Groups 1 and 2 capture the
# major and minor numbers; group 3 captures the patch number when present.
GIT_VERSION_REGEX = re.compile(
    r"^git version "  # Prefix.
    r"(\d+)"  # Major.
    r"\.(\d+)"  # Dot, minor.
    r"(?:\.(\d+))?"  # Optional dot, patch.
    r".*$"  # Suffix, including any pre- and post-release segments we don't care about.
)

36 

# A full-length (40 hexadecimal characters) object name.
HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")

# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
SCP_REGEX = re.compile(
    r"""^
    # Optional user, e.g. 'git@'
    (\w+@)?
    # Server, e.g. 'github.com'.
    ([^/:]+):
    # The server-side path. e.g. 'user/project.git'. Must start with an
    # alphanumeric character so as not to be confusable with a Windows paths
    # like 'C:/foo/bar' or 'C:\foo\bar'.
    (\w[^:]*)
    $""",
    re.VERBOSE,
)


def looks_like_hash(sha: str) -> bool:
    """Return True if ``sha`` is exactly 40 hexadecimal characters.

    ``fullmatch`` is used rather than ``match`` because the ``$`` anchor in
    HASH_REGEX also matches just before a trailing newline, which would
    otherwise let a 41-character string such as ``"<40 hex>\\n"`` through.
    """
    return bool(HASH_REGEX.fullmatch(sha))

57 

58 

class Git(VersionControl):
    """Git implementation of ``VersionControl``.

    Every operation shells out to the ``git`` executable through
    ``run_command``; nothing here talks to a repository directly.
    """

    name = "git"
    dirname = ".git"
    repo_name = "clone"
    schemes = (
        "git+http",
        "git+https",
        "git+ssh",
        "git+git",
        "git+file",
    )
    # Prevent the user's environment variables from interfering with pip:
    # https://github.com/pypa/pip/issues/1130
    unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
    default_arg_rev = "HEAD"

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """Return ``rev`` wrapped in a single-element list."""
        return [rev]

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return True only if the revision in ``url`` is a commit hash that is
        currently checked out in ``dest`` and is not also a branch or tag.

        Args:
          url: the requirement URL, possibly carrying a revision.
          dest: the repository directory.
        """
        _, rev_options = self.get_url_rev_options(hide_url(url))
        if not rev_options.rev:
            return False
        if not self.is_commit_id_equal(dest, rev_options.rev):
            # the current commit is different from rev,
            # which means rev was something else than a commit hash
            return False
        # return False in the rare case rev is both a commit hash
        # and a tag or a branch; we don't want to cache in that case
        # because that branch/tag could point to something else in the future
        is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
        return not is_tag_or_branch

    def get_git_version(self) -> Tuple[int, ...]:
        """
        Return ``(major, minor)`` parsed from the output of ``git version``.

        If the output cannot be parsed, a warning is logged and an empty
        tuple is returned; an empty tuple compares lower than any real
        version tuple, so callers fall back to their "old git" branch.
        """
        version = self.run_command(
            ["version"],
            command_desc="git version",
            show_stdout=False,
            stdout_only=True,
        )
        match = GIT_VERSION_REGEX.match(version)
        if not match:
            logger.warning("Can't parse git version: %s", version)
            return ()
        return (int(match.group(1)), int(match.group(2)))

    @classmethod
    def get_current_branch(cls, location: str) -> Optional[str]:
        """
        Return the current branch, or None if HEAD isn't at a branch
        (e.g. detached HEAD).
        """
        # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
        # HEAD rather than a symbolic ref.  In addition, the -q causes the
        # command to exit with status code 1 instead of 128 in this case
        # and to suppress the message to stderr.
        args = ["symbolic-ref", "-q", "HEAD"]
        output = cls.run_command(
            args,
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        ref = output.strip()

        if ref.startswith("refs/heads/"):
            return ref[len("refs/heads/") :]

        return None

    @classmethod
    def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
        """
        Return (sha_or_none, is_branch), where sha_or_none is a commit hash
        if the revision names a remote branch or tag, otherwise None.

        Args:
          dest: the repository directory.
          rev: the revision name.

        Raises:
          ValueError: if a ``git show-ref`` output line does not have the
            expected "<sha> <ref name>" shape.
        """
        # Pass rev to pre-filter the list.
        output = cls.run_command(
            ["show-ref", rev],
            cwd=dest,
            show_stdout=False,
            stdout_only=True,
            on_returncode="ignore",
        )
        refs = {}
        # NOTE: We do not use splitlines here since that would split on other
        #       unicode separators, which can be maliciously used to install a
        #       different revision.
        for line in output.strip().split("\n"):
            line = line.rstrip("\r")
            if not line:
                continue
            try:
                ref_sha, ref_name = line.split(" ", maxsplit=2)
            except ValueError:
                # Include the offending line to simplify troubleshooting if
                # this error ever occurs.
                raise ValueError(f"unexpected show-ref line: {line!r}")

            refs[ref_name] = ref_sha

        branch_ref = f"refs/remotes/origin/{rev}"
        tag_ref = f"refs/tags/{rev}"

        # A remote branch takes precedence over a tag of the same name.
        sha = refs.get(branch_ref)
        if sha is not None:
            return (sha, True)

        sha = refs.get(tag_ref)

        return (sha, False)

    @classmethod
    def _should_fetch(cls, dest: str, rev: str) -> bool:
        """
        Return true if rev is a ref or is a commit that we don't have locally.

        Branches and tags are not considered in this method because they are
        assumed to be always available locally (which is a normal outcome of
        ``git clone`` and ``git fetch --tags``).
        """
        if rev.startswith("refs/"):
            # Always fetch remote refs.
            return True

        if not looks_like_hash(rev):
            # Git fetch would fail with abbreviated commits.
            return False

        if cls.has_commit(dest, rev):
            # Don't fetch if we have the commit locally.
            return False

        return True

    @classmethod
    def resolve_revision(
        cls, dest: str, url: HiddenText, rev_options: RevOptions
    ) -> RevOptions:
        """
        Resolve a revision to a new RevOptions object with the SHA1 of the
        branch, tag, or ref if found.

        Args:
          dest: the repository directory.
          url: the remote to fetch from when the revision is not available
            locally.
          rev_options: a RevOptions object.
        """
        rev = rev_options.arg_rev
        # The arg_rev property's implementation for Git ensures that the
        # rev return value is always non-None.
        assert rev is not None

        sha, is_branch = cls.get_revision_sha(dest, rev)

        if sha is not None:
            rev_options = rev_options.make_new(sha)
            # Remember the branch name so that fetch_new() can check out a
            # tracking branch instead of a detached commit.
            rev_options.branch_name = rev if is_branch else None

            return rev_options

        # Do not show a warning for the common case of something that has
        # the form of a Git commit hash.
        if not looks_like_hash(rev):
            logger.warning(
                "Did not find branch or tag '%s', assuming revision or ref.",
                rev,
            )

        if not cls._should_fetch(dest, rev):
            return rev_options

        # fetch the requested revision
        cls.run_command(
            make_command("fetch", "-q", url, rev_options.to_args()),
            cwd=dest,
        )
        # Change the revision to the SHA of the ref we fetched
        sha = cls.get_revision(dest, rev="FETCH_HEAD")
        rev_options = rev_options.make_new(sha)

        return rev_options

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the current commit hash equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        if not name:
            # Then avoid an unnecessary subprocess call.
            return False

        return cls.get_revision(dest) == name

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Clone ``url`` into ``dest``, check out the requested revision (if
        any) and update submodules.

        Args:
          dest: the directory to clone into.
          url: the repository URL.
          rev_options: the requested revision.
          verbosity: <= 0 clones quietly, 1 uses git's default output and
            anything higher asks git for verbose progress output.
        """
        rev_display = rev_options.to_display()
        logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
        if verbosity <= 0:
            flags: Tuple[str, ...] = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        else:
            flags = ("--verbose", "--progress")
        if self.get_git_version() >= (2, 17):
            # Git added support for partial clone in 2.17
            # https://git-scm.com/docs/partial-clone
            # Speeds up cloning by functioning without a complete copy of repository
            self.run_command(
                make_command(
                    "clone",
                    "--filter=blob:none",
                    *flags,
                    url,
                    dest,
                )
            )
        else:
            self.run_command(make_command("clone", *flags, url, dest))

        if rev_options.rev:
            # Then a specific revision was requested.
            rev_options = self.resolve_revision(dest, url, rev_options)
            branch_name = getattr(rev_options, "branch_name", None)
            logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
            if branch_name is None:
                # Only do a checkout if the current commit id doesn't match
                # the requested revision.
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    cmd_args = make_command(
                        "checkout",
                        "-q",
                        rev_options.to_args(),
                    )
                    self.run_command(cmd_args, cwd=dest)
            elif self.get_current_branch(dest) != branch_name:
                # Then a specific branch was requested, and that branch
                # is not yet checked out.
                track_branch = f"origin/{branch_name}"
                cmd_args = [
                    "checkout",
                    "-b",
                    branch_name,
                    "--track",
                    track_branch,
                ]
                self.run_command(cmd_args, cwd=dest)
        else:
            # No revision requested: record whatever commit the clone landed on.
            sha = self.get_revision(dest)
            rev_options = rev_options.make_new(sha)

        logger.info("Resolved %s to commit %s", url, rev_options.rev)

        #: repo may contain submodules
        self.update_submodules(dest)

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Point the ``origin`` remote of ``dest`` at ``url``, check out the
        requested revision and update submodules.
        """
        self.run_command(
            make_command("config", "remote.origin.url", url),
            cwd=dest,
        )
        cmd_args = make_command("checkout", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

        self.update_submodules(dest)

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Fetch from the default remote, hard-reset ``dest`` to the requested
        revision and update submodules.
        """
        # First fetch changes from the default remote
        if self.get_git_version() >= (1, 9):
            # fetch tags in addition to everything else
            self.run_command(["fetch", "-q", "--tags"], cwd=dest)
        else:
            self.run_command(["fetch", "-q"], cwd=dest)
        # Then reset to wanted revision (maybe even origin/master)
        rev_options = self.resolve_revision(dest, url, rev_options)
        cmd_args = make_command("reset", "--hard", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)
        #: update submodules
        self.update_submodules(dest)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the URL of the ``origin`` remote if one is configured,
        otherwise the URL of the first remote listed by git.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        # We need to pass 1 for extra_ok_returncodes since the command
        # exits with return code 1 if there are no matching lines.
        stdout = cls.run_command(
            ["config", "--get-regexp", r"remote\..*\.url"],
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        remotes = stdout.splitlines()
        if not remotes:
            raise RemoteNotFoundError

        found_remote = remotes[0]
        for remote in remotes:
            if remote.startswith("remote.origin.url "):
                found_remote = remote
                break
        # Each line has the form "<key> <value>". Split on the first space
        # only, so that a URL or local path containing spaces is not cut
        # short.
        url = found_remote.split(" ", 1)[1]
        return cls._git_remote_to_pip_url(url.strip())

    @staticmethod
    def _git_remote_to_pip_url(url: str) -> str:
        """
        Convert a remote url from what git uses to what pip accepts.

        There are 3 legal forms **url** may take:

            1. A fully qualified url: ssh://git@example.com/foo/bar.git
            2. A local project.git folder: /path/to/bare/repository.git
            3. SCP shorthand for form 1: git@example.com:foo/bar.git

        Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
        be converted to form 1.

        See the corresponding test test_git_remote_url_to_pip() for examples of
        sample inputs/outputs.

        Raises RemoteNotValidError if **url** matches none of the forms.
        """
        if re.match(r"\w+://", url):
            # This is already valid. Pass it through as-is.
            return url
        if os.path.exists(url):
            # A local bare remote (git clone --mirror).
            # Needs a file:// prefix.
            return pathlib.PurePath(url).as_uri()
        scp_match = SCP_REGEX.match(url)
        if scp_match:
            # Add an ssh:// prefix and replace the ':' with a '/'.
            return scp_match.expand(r"ssh://\1\2/\3")
        # Otherwise, bail out.
        raise RemoteNotValidError(url)

    @classmethod
    def has_commit(cls, location: str, rev: str) -> bool:
        """
        Check if rev is a commit that is available in the local repository.
        """
        # NOTE(review): the "sha^" prefix is handed to git verbatim; what it
        # means to ``git rev-parse`` is not evident from this file -- confirm
        # against git's revision syntax before relying on a True result.
        try:
            cls.run_command(
                ["rev-parse", "-q", "--verify", "sha^" + rev],
                cwd=location,
                log_failed_cmd=False,
            )
        except InstallationError:
            return False
        else:
            return True

    @classmethod
    def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
        """
        Return the output of ``git rev-parse`` for ``rev`` (``HEAD`` when
        ``rev`` is None), with surrounding whitespace stripped.
        """
        if rev is None:
            rev = "HEAD"
        current_rev = cls.run_command(
            ["rev-parse", rev],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        return current_rev.strip()

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        git_dir = cls.run_command(
            ["rev-parse", "--git-dir"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if not os.path.isabs(git_dir):
            git_dir = os.path.join(location, git_dir)
        # The repo root is taken to be the parent of the git directory.
        repo_root = os.path.abspath(os.path.join(git_dir, ".."))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
        That's required because although they use SSH they sometimes don't
        work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
        parsing. Hence we remove it again afterwards and return it as a stub.
        """
        # Works around an apparent Git bug
        # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
        scheme, netloc, path, query, fragment = urlsplit(url)
        if scheme.endswith("file"):
            # Keep the leading slashes exactly as given and normalise the
            # rest of the path to forward slashes.
            initial_slashes = path[: -len(path.lstrip("/"))]
            newpath = initial_slashes + urllib.request.url2pathname(path).replace(
                "\\", "/"
            ).lstrip("/")
            after_plus = scheme.find("+") + 1
            url = scheme[:after_plus] + urlunsplit(
                (scheme[after_plus:], netloc, newpath, query, fragment),
            )

        if "://" not in url:
            assert "file:" not in url
            url = url.replace("git+", "git+ssh://")
            url, rev, user_pass = super().get_url_rev_and_auth(url)
            url = url.replace("ssh://", "")
        else:
            url, rev, user_pass = super().get_url_rev_and_auth(url)

        return url, rev, user_pass

    @classmethod
    def update_submodules(cls, location: str) -> None:
        """
        Initialise and update submodules recursively; does nothing when
        ``location`` has no ``.gitmodules`` file.
        """
        if not os.path.exists(os.path.join(location, ".gitmodules")):
            return
        cls.run_command(
            ["submodule", "update", "--init", "--recursive", "-q"],
            cwd=location,
        )

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the root of the repository containing ``location``, or None.

        The superclass lookup is tried first; only if it finds nothing is
        ``git rev-parse --show-toplevel`` consulted. None is returned when
        git is unavailable (BadCommand) or the command fails
        (InstallationError).
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ["rev-parse", "--show-toplevel"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under git control "
                "because git is not available",
                location,
            )
            return None
        except InstallationError:
            return None
        return os.path.normpath(r.rstrip("\r\n"))

    @staticmethod
    def should_add_vcs_url_prefix(repo_url: str) -> bool:
        """In either https or ssh form, requirements must be prefixed with git+."""
        return True

524 

525 

# Import-time side effect: hand the Git class to the shared ``vcs`` object
# imported from pip._internal.vcs.versioncontrol.
vcs.register(Git)