Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

274 statements  

1"""Handles all VCS (version control) support""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import os 

7import shutil 

8import sys 

9import urllib.parse 

10from collections.abc import Iterable, Iterator, Mapping 

11from dataclasses import dataclass, field 

12from typing import ( 

13 Any, 

14 Literal, 

15 Optional, 

16) 

17 

18from pip._internal.cli.spinners import SpinnerInterface 

19from pip._internal.exceptions import BadCommand, InstallationError 

20from pip._internal.utils.misc import ( 

21 HiddenText, 

22 ask_path_exists, 

23 backup_dir, 

24 display_path, 

25 hide_url, 

26 hide_value, 

27 is_installable_dir, 

28 rmtree, 

29) 

30from pip._internal.utils.subprocess import ( 

31 CommandArgs, 

32 call_subprocess, 

33 format_command_args, 

34 make_command, 

35) 

36 

# Only the shared registry instance is exported from this module.
__all__ = ["vcs"]


logger = logging.getLogger(__name__)

# A (username, password) pair; either element is None when it is absent.
AuthInfo = tuple[Optional[str], Optional[str]]

43 

44 

def is_url(name: str) -> bool:
    """
    Return true if the name looks like a URL.

    A name qualifies when its URL scheme is one of the plain transport
    schemes (http, https, file, ftp) or one of the schemes reported by the
    VCS registry (``vcs.all_schemes``). A name without any scheme is never
    treated as a URL.
    """
    scheme = urllib.parse.urlsplit(name).scheme
    if not scheme:
        return False
    # Check the fixed schemes first; the registry is only consulted when
    # they do not match.
    return scheme in ("http", "https", "file", "ftp") or scheme in vcs.all_schemes

53 

54 

def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: str | None = None
) -> str:
    """
    Return the URL for a VCS requirement.

    The result has the form
    ``{repo_url}@{rev}#egg={project_name}[&subdirectory={subdir}]``.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision to pin; it is percent-quoted, keeping "/" as-is.
      project_name: the (unescaped) project name; dashes are replaced
        with underscores in the ``egg`` fragment.
      subdir: optional subdirectory appended as a ``subdirectory``
        fragment. Omitted when None or empty.
    """
    quoted_rev = urllib.parse.quote(rev, "/")
    egg_project_name = project_name.replace("-", "_")
    req = f"{repo_url}@{quoted_rev}#egg={egg_project_name}"
    if subdir:
        req += f"&subdirectory={subdir}"

    return req

72 

73 

def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> str | None:
    """
    Find the Python project's root by searching up the filesystem from
    `location`. Return the path to project root relative to `repo_root`.
    Return None if the project root is `repo_root`, or cannot be found.
    """
    # Walk upwards until a directory that is_installable_dir() accepts is found.
    orig_location = location
    while not is_installable_dir(location):
        last_location = location
        location = os.path.dirname(location)
        if location == last_location:
            # dirname() stopped changing the path: we've traversed up to the
            # root of the filesystem without finding a Python project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                orig_location,
            )
            return None

    # The project sits directly at the repository root: no subdirectory.
    if os.path.samefile(repo_root, location):
        return None

    return os.path.relpath(location, repo_root)

101 

102 

class RemoteNotFoundError(Exception):
    """Raised when a repository does not have a remote url configured."""

105 

106 

class RemoteNotValidError(Exception):
    """
    Error carrying the remote URL it was raised for.

    The URL is used as the exception message and is also kept on the
    ``url`` attribute for callers that need it.
    """

    def __init__(self, url: str):
        super().__init__(url)
        self.url = url

111 

112 

@dataclass(frozen=True)
class RevOptions:
    """
    Encapsulates a VCS-specific revision to install, along with any VCS
    install options.

    Args:
      vc_class: a VersionControl subclass.
      rev: the name of the revision to install.
      extra_args: a list of extra options.
      branch_name: optional branch name. It is not read anywhere in this
        class; presumably backends consume it -- confirm against callers.
    """

    vc_class: type[VersionControl]
    rev: str | None = None
    extra_args: CommandArgs = field(default_factory=list)
    branch_name: str | None = None

    def __repr__(self) -> str:
        return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> str | None:
        """
        Return the revision to pass to the VCS command: the explicit ``rev``
        when one was given, otherwise the backend's ``default_arg_rev``.
        """
        if self.rev is None:
            return self.vc_class.default_arg_rev

        return self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments.

        The backend's base revision arguments (if there is a revision to
        use) come first, followed by ``extra_args``.
        """
        args: CommandArgs = []
        rev = self.arg_rev
        if rev is not None:
            args += self.vc_class.get_base_rev_args(rev)
        args += self.extra_args

        return args

    def to_display(self) -> str:
        """
        Return a suffix for log messages naming the revision, or an empty
        string when no revision was given.
        """
        if not self.rev:
            return ""

        return f" (to revision {self.rev})"

    def make_new(self, rev: str) -> RevOptions:
        """
        Make a copy of the current instance, but with a new rev.

        Only ``extra_args`` is carried over; ``branch_name`` is not passed
        on to the new object.

        Args:
          rev: the name of the revision for the new object.
        """
        return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)

166 

167 

class VcsSupport:
    """
    Registry of VCS backends, keyed by backend name.

    Note that ``_registry`` is a class-level dict, so it is shared by all
    instances unless an instance rebinds it.
    """

    _registry: dict[str, VersionControl] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Register more schemes with urlparse for various version control
        # systems. Only add the ones that are missing so that repeated
        # instantiation does not keep growing the global list.
        known = urllib.parse.uses_netloc
        missing = [scheme for scheme in self.schemes if scheme not in known]
        known.extend(missing)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the registered backends."""
        return self._registry.__iter__()

    @property
    def backends(self) -> list[VersionControl]:
        """Return the registered backend instances."""
        return list(self._registry.values())

    @property
    def dirnames(self) -> list[str]:
        """Return the ``dirname`` of every registered backend."""
        return [backend.dirname for backend in self.backends]

    @property
    def all_schemes(self) -> list[str]:
        """Return the schemes of every registered backend, flattened."""
        return [scheme for backend in self.backends for scheme in backend.schemes]

    def register(self, cls: type[VersionControl]) -> None:
        """
        Instantiate ``cls`` and store it under ``cls.name``.

        A class without a ``name`` attribute is rejected with a warning.
        A name that is already registered is left untouched.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name not in self._registry:
            self._registry[cls.name] = cls()
            logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Remove the backend registered under ``name``, if there is one."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location: str) -> VersionControl | None:
        """
        Return a VersionControl object if a repository of that type is found
        at the given directory.
        """
        vcs_backends = {}
        for vcs_backend in self._registry.values():
            repo_path = vcs_backend.get_repository_root(location)
            if not repo_path:
                continue
            logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
            vcs_backends[repo_path] = vcs_backend

        if not vcs_backends:
            return None

        # Choose the VCS in the inner-most directory. Since all repository
        # roots found here would be either `location` or one of its
        # parents, the longest path should have the most path components,
        # i.e. the backend representing the inner-most repository.
        inner_most_repo_path = max(vcs_backends, key=len)
        return vcs_backends[inner_most_repo_path]

    def get_backend_for_scheme(self, scheme: str) -> VersionControl | None:
        """
        Return the first registered VersionControl object that lists
        ``scheme`` among its schemes, or None.
        """
        for vcs_backend in self._registry.values():
            if scheme in vcs_backend.schemes:
                return vcs_backend
        return None

    def get_backend(self, name: str) -> VersionControl | None:
        """
        Return the VersionControl object registered under ``name``
        (compared in lower case), or None.
        """
        name = name.lower()
        return self._registry.get(name)

246 

247 

# Shared registry instance; exported through __all__ and consulted by
# is_url() for the schemes of the registered backends.
vcs = VcsSupport()

249 

250 

class VersionControl:
    """
    Base class for a version control backend.

    It provides the URL parsing and the install/update workflow
    (``obtain``/``unpack``). Several operations raise NotImplementedError
    here and have to be provided by a concrete backend.
    """

    # Backend name; also used as the executable name in run_command() and
    # as the "<name>+" prefix of requirement URLs.
    name = ""
    # Name of the directory entry that marks a repository of this type
    # (see is_repository_directory()).
    dirname = ""
    # Word used for the repository in log messages.
    repo_name = ""
    # List of supported schemes for this Version Control
    schemes: tuple[str, ...] = ()
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ: tuple[str, ...] = ()
    # Revision argument used by RevOptions when no revision was given.
    default_arg_rev: str | None = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Return whether the vcs prefix (e.g. "git+") should be added to a
        repository's remote url when used in a requirement.
        """
        return not remote_url.lower().startswith(f"{cls.name}:")

    @classmethod
    def get_subdirectory(cls, location: str) -> str | None:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string that should be used in a requirement.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        repo_url = cls.get_remote_url(repo_dir)

        if cls.should_add_vcs_url_prefix(repo_url):
            repo_url = f"{cls.name}+{repo_url}"

        revision = cls.get_requirement_revision(repo_dir)
        subdir = cls.get_subdirectory(repo_dir)
        req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)

        return req

    @staticmethod
    def get_base_rev_args(rev: str) -> list[str]:
        """
        Return the base revision arguments for a vcs command.

        Args:
          rev: the name of a revision to install.  Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: str | None = None, extra_args: CommandArgs | None = None
    ) -> RevOptions:
        """
        Return a RevOptions object.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options.
        """
        return RevOptions(cls, rev, extra_args=extra_args or [])

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        posix absolute paths start with os.path.sep,
        win32 ones start with drive (like c:\\folder)
        """
        drive, _ = os.path.splitdrive(repo)
        return repo.startswith(os.path.sep) or bool(drive)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> tuple[str, tuple[str | None, str | None]]:
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        Returns: (netloc, (username, password)).
        """
        return netloc, (None, None)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]:
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        Raises ValueError when the scheme has no "<vcs>+" prefix, and
        InstallationError when the URL ends in "@" with no revision after it.

        Returns: (url, rev, (username, password)).
        """
        scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
        if "+" not in scheme:
            raise ValueError(
                f"Sorry, {url!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
            )
        # Remove the vcs prefix.
        scheme = scheme.split("+", 1)[1]
        netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
        rev = None
        if "@" in path:
            # The revision is whatever follows the last "@" in the path.
            path, rev = path.rsplit("@", 1)
            if not rev:
                raise InstallationError(
                    f"The URL {url!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL."
                )
            rev = urllib.parse.unquote(rev)
        # Rebuild the URL without the vcs prefix, the revision and the fragment.
        url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
        return url, rev, user_pass

    @staticmethod
    def make_rev_args(username: str | None, password: HiddenText | None) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain().
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
        username, secret_password = user_pass
        password: HiddenText | None = None
        if secret_password is not None:
            password = hide_value(secret_password)
        extra_args = self.make_rev_args(username, password)
        rev_options = self.make_rev_options(rev, extra_args=extra_args)

        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        return urllib.parse.unquote(url).rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Compare two repo URLs for identity, ignoring incidental differences.
        """
        return cls.normalize_url(url1) == cls.normalize_url(url2)

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Args:
          dest: the directory to fetch the repository to.
          rev_options: a RevOptions object.
          verbosity: verbosity level.
        """
        raise NotImplementedError

    def switch(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
        """
        Return whether the id of the current commit equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        url, rev_options = self.get_url_rev_options(url)

        # Nothing at dest yet: a plain first fetch is all that is needed.
        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        # (prompt text, accepted answers); declared once so that both
        # branches below assign the same type.
        prompt: tuple[str, tuple[str, ...]]

        rev_display = rev_options.to_display()
        if self.is_repository_directory(dest):
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options, verbosity=verbosity)
                else:
                    logger.info("Skipping because already up-to-date.")
                return

            # A repository of this type exists, but it points elsewhere.
            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
        else:
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b"))

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            url,
        )
        response = ask_path_exists(f"What to do? {prompt[0]}", prompt[1])

        # NOTE(review): "a" is not among the answers offered above;
        # presumably ask_path_exists() can still return it to request an
        # abort -- confirm.
        if response == "a":
            sys.exit(-1)

        if response == "w":
            logger.warning("Deleting %s", display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        if response == "b":
            dest_dir = backup_dir(dest)
            logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
            shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        # Do nothing if the response is "i".
        if response == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options, verbosity=verbosity)

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        if os.path.exists(location):
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: list[str] | CommandArgs,
        show_stdout: bool = True,
        cwd: str | None = None,
        on_returncode: Literal["raise", "warn", "ignore"] = "raise",
        extra_ok_returncodes: Iterable[int] | None = None,
        command_desc: str | None = None,
        extra_environ: Mapping[str, Any] | None = None,
        spinner: SpinnerInterface | None = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        Raises BadCommand when the executable cannot be found or may not be
        executed; the underlying OS error is attached as the cause.
        """
        cmd = make_command(cls.name, *cmd)
        if command_desc is None:
            command_desc = format_command_args(cmd)
        try:
            return call_subprocess(
                cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=command_desc,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
                stdout_only=stdout_only,
            )
        except NotADirectoryError as exc:
            raise BadCommand(
                f"Cannot find command {cls.name!r} - invalid PATH"
            ) from exc
        except FileNotFoundError as exc:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            ) from exc
        except PermissionError as exc:
            # errno.EACCES = Permission denied
            # This error occurs, for instance, when the command is installed
            # only for another user. So, the current user doesn't have
            # permission to call the other user's command.
            raise BadCommand(
                f"No permission to execute {cls.name!r} - install it "
                f"locally, globally (ask admin), or check your PATH. "
                f"See possible solutions at "
                f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
                f"#fixing-permission-denied."
            ) from exc

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Return whether a directory path is a repository directory.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        return os.path.exists(os.path.join(path, cls.dirname))

    @classmethod
    def get_repository_root(cls, location: str) -> str | None:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.
        """
        if cls.is_repository_directory(location):
            return location
        return None