Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/vcs/versioncontrol.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

274 statements  

1"""Handles all VCS (version control) support""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import os 

7import shutil 

8import sys 

9import urllib.parse 

10from collections.abc import Iterable, Iterator, Mapping 

11from dataclasses import dataclass, field 

12from typing import ( 

13 Any, 

14 Literal, 

15) 

16 

17from pip._internal.cli.spinners import SpinnerInterface 

18from pip._internal.exceptions import BadCommand, InstallationError 

19from pip._internal.utils.misc import ( 

20 HiddenText, 

21 ask_path_exists, 

22 backup_dir, 

23 display_path, 

24 hide_url, 

25 hide_value, 

26 is_installable_dir, 

27 rmtree, 

28) 

29from pip._internal.utils.subprocess import ( 

30 CommandArgs, 

31 call_subprocess, 

32 format_command_args, 

33 make_command, 

34) 

35 

# Only the shared registry instance is exported by ``import *``.
__all__ = ["vcs"]


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Alias for a (username, password) pair; either element may be None.
AuthInfo = tuple[str | None, str | None]

42 

43 

def is_url(name: str) -> bool:
    """
    Tell whether ``name`` looks like a URL.

    A name qualifies when it parses with a non-empty scheme that is either
    one of the plain transport schemes listed below or a scheme reported by
    the registered VCS backends.
    """
    parsed_scheme = urllib.parse.urlsplit(name).scheme
    if parsed_scheme:
        # The VCS schemes are only looked up once a scheme was found.
        recognised = ["http", "https", "file", "ftp"] + vcs.all_schemes
        return parsed_scheme in recognised
    return False

52 

53 

def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: str | None = None
) -> str:
    """
    Build the requirement URL for a VCS checkout.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision; it is percent-quoted, leaving "/" untouched.
      project_name: the (unescaped) project name; dashes are turned into
        underscores for the ``egg=`` fragment.
      subdir: optional subdirectory, appended as ``&subdirectory=...`` when
        it is truthy.
    """
    escaped_rev = urllib.parse.quote(rev, "/")
    egg_name = project_name.replace("-", "_")

    fragment_parts = [f"egg={egg_name}"]
    if subdir:
        fragment_parts.append(f"subdirectory={subdir}")

    return f"{repo_url}@{escaped_rev}#" + "&".join(fragment_parts)

71 

72 

def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> str | None:
    """
    Locate the Python project's root by walking up the filesystem from
    `location`, and return its path relative to `repo_root`.

    Return None when the project root is `repo_root` itself, or when no
    installable directory is found before reaching the filesystem root.
    """
    candidate = location
    while not is_installable_dir(candidate):
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer changes the path: we are at the top of
            # the filesystem and never found a Python project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                location,
            )
            return None
        candidate = parent

    if os.path.samefile(repo_root, candidate):
        return None

    return os.path.relpath(candidate, repo_root)

100 

101 

class RemoteNotFoundError(Exception):
    """Raised when a repository does not have a remote url configured."""

104 

105 

class RemoteNotValidError(Exception):
    """Exception that carries the offending remote URL as ``url``."""

    def __init__(self, url: str) -> None:
        # The URL is kept both as an attribute and as the exception argument.
        self.url = url
        super().__init__(url)

110 

111 

@dataclass(frozen=True)
class RevOptions:
    """
    Immutable bundle of a VCS-specific revision to install plus any extra
    VCS install options.

    Args:
      vc_class: a VersionControl subclass.
      rev: the name of the revision to install.
      extra_args: a list of extra options.
    """

    vc_class: type[VersionControl]
    rev: str | None = None
    extra_args: CommandArgs = field(default_factory=list)
    branch_name: str | None = None

    def __repr__(self) -> str:
        backend_name = self.vc_class.name
        return f"<RevOptions {backend_name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> str | None:
        """
        The revision to pass to the VCS: the explicit ``rev`` if one was
        given, otherwise the backend's ``default_arg_rev``.
        """
        return self.vc_class.default_arg_rev if self.rev is None else self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments as a new list: the
        backend's base revision arguments (when there is a revision)
        followed by the extra args.
        """
        command_args: CommandArgs = []
        selected_rev = self.arg_rev
        if selected_rev is not None:
            command_args.extend(self.vc_class.get_base_rev_args(selected_rev))
        command_args.extend(self.extra_args)
        return command_args

    def to_display(self) -> str:
        """
        Return a " (to revision ...)" suffix for log messages, or an empty
        string when no revision is set.
        """
        return f" (to revision {self.rev})" if self.rev else ""

    def make_new(self, rev: str) -> RevOptions:
        """
        Build a RevOptions for a different revision through the backend's
        ``make_rev_options``, carrying over the current extra args.

        Args:
          rev: the name of the revision for the new object.
        """
        backend = self.vc_class
        return backend.make_rev_options(rev, extra_args=self.extra_args)

165 

166 

class VcsSupport:
    """
    Registry of VersionControl backends, keyed by backend name.

    The registry dict is a class attribute, so it is shared by every
    instance of this class.
    """

    _registry: dict[str, VersionControl] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Register more schemes with urlparse for various version control
        # systems
        urllib.parse.uses_netloc.extend(self.schemes)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the registered backends."""
        return iter(self._registry)

    @property
    def backends(self) -> list[VersionControl]:
        """The registered backend instances, as a new list."""
        return [*self._registry.values()]

    @property
    def dirnames(self) -> list[str]:
        """The ``dirname`` attribute of every registered backend."""
        return [registered.dirname for registered in self.backends]

    @property
    def all_schemes(self) -> list[str]:
        """All schemes of all registered backends, flattened into one list."""
        return [
            scheme for registered in self.backends for scheme in registered.schemes
        ]

    def register(self, cls: type[VersionControl]) -> None:
        """
        Instantiate ``cls`` and store it under ``cls.name``.

        A class without a ``name`` attribute is rejected with a warning; a
        name that is already registered is left untouched.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name in self._registry:
            return
        self._registry[cls.name] = cls()
        logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Drop the backend registered under ``name``, if there is one."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location: str) -> VersionControl | None:
        """
        Return the VersionControl object whose repository is found at the
        given directory, or None when no backend reports one.
        """
        roots_to_backends = {}
        for candidate in self._registry.values():
            root = candidate.get_repository_root(location)
            if root:
                logger.debug(
                    "Determine that %s uses VCS: %s", location, candidate.name
                )
                roots_to_backends[root] = candidate

        if roots_to_backends:
            # Choose the VCS in the inner-most directory. Since all repository
            # roots found here would be either `location` or one of its
            # parents, the longest path should have the most path components,
            # i.e. the backend representing the inner-most repository.
            deepest_root = max(roots_to_backends, key=len)
            return roots_to_backends[deepest_root]
        return None

    def get_backend_for_scheme(self, scheme: str) -> VersionControl | None:
        """
        Return the first registered VersionControl object that lists
        ``scheme`` among its schemes, or None.
        """
        matching = (
            candidate
            for candidate in self._registry.values()
            if scheme in candidate.schemes
        )
        return next(matching, None)

    def get_backend(self, name: str) -> VersionControl | None:
        """
        Return the VersionControl object registered under the lower-cased
        ``name``, or None.
        """
        return self._registry.get(name.lower())


vcs = VcsSupport()

248 

249 

class VersionControl:
    """
    Base class for one version control backend.

    Subclasses set the class attributes below and override the methods that
    raise NotImplementedError here.
    """

    name = ""
    dirname = ""
    repo_name = ""
    # List of supported schemes for this Version Control
    schemes: tuple[str, ...] = ()
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ: tuple[str, ...] = ()
    default_arg_rev: str | None = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Tell whether the vcs prefix (e.g. "git+") should be added to a
        repository's remote url when used in a requirement.

        The prefix is skipped only when the url, compared case-insensitively,
        already starts with "<name>:".
        """
        own_scheme = f"{cls.name}:"
        return not remote_url.lower().startswith(own_scheme)

    @classmethod
    def get_subdirectory(cls, location: str) -> str | None:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string that should be used in a requirement.

        This base implementation delegates to ``get_revision``.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        remote = cls.get_remote_url(repo_dir)
        if cls.should_add_vcs_url_prefix(remote):
            remote = f"{cls.name}+{remote}"

        # Arguments are evaluated left to right: revision first, then subdir.
        return make_vcs_requirement_url(
            remote,
            cls.get_requirement_revision(repo_dir),
            project_name,
            subdir=cls.get_subdirectory(repo_dir),
        )

    @staticmethod
    def get_base_rev_args(rev: str) -> list[str]:
        """
        Return the base revision arguments for a vcs command.

        Must be provided by subclasses.

        Args:
          rev: the name of a revision to install.  Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: str | None = None, extra_args: CommandArgs | None = None
    ) -> RevOptions:
        """
        Return a RevOptions object bound to this backend class.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options; a falsy value is replaced by
            a new empty list.
        """
        if not extra_args:
            extra_args = []
        return RevOptions(cls, rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        Tell whether ``repo`` is an absolute local path.

        posix absolute paths start with os.path.sep,
        win32 ones start with a drive (a non-empty os.path.splitdrive() result).
        """
        drive = os.path.splitdrive(repo)[0]
        if repo.startswith(os.path.sep):
            return True
        return bool(drive)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> tuple[str, tuple[str | None, str | None]]:
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        This base implementation returns the netloc unchanged, with no auth.

        Returns: (netloc, (username, password)).
        """
        no_auth = (None, None)
        return netloc, no_auth

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> tuple[str, str | None, AuthInfo]:
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        The vcs prefix is removed from the scheme, a trailing "@<rev>" is
        split off the path and unquoted, and the fragment is dropped.

        Raises ValueError when the scheme has no "+", and InstallationError
        when the path ends with an "@" that has nothing after it.

        Returns: (url, rev, (username, password)).
        """
        parts = urllib.parse.urlsplit(url)
        if "+" not in parts.scheme:
            raise ValueError(
                f"Sorry, {url!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
            )

        # Remove the vcs prefix.
        transport = parts.scheme.partition("+")[2]
        netloc, auth = cls.get_netloc_and_auth(parts.netloc, transport)

        repo_path = parts.path
        rev = None
        if "@" in repo_path:
            # Only the last "@" separates the revision from the path.
            repo_path, _, raw_rev = repo_path.rpartition("@")
            if not raw_rev:
                raise InstallationError(
                    f"The URL {url!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL."
                )
            rev = urllib.parse.unquote(raw_rev)

        stripped_url = urllib.parse.urlunsplit(
            (transport, netloc, repo_path, parts.query, "")
        )
        return stripped_url, rev, auth

    @staticmethod
    def make_rev_args(username: str | None, password: HiddenText | None) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain().

        This base implementation returns an empty list.
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, (username, secret_password) = self.get_url_rev_and_auth(
            url.secret
        )
        # Hide the password before it is handed to make_rev_args().
        password: HiddenText | None = (
            None if secret_password is None else hide_value(secret_password)
        )
        rev_options = self.make_rev_options(
            rev, extra_args=self.make_rev_args(username, password)
        )
        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        unquoted = urllib.parse.unquote(url)
        return unquoted.rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Compare two repo URLs for identity, ignoring incidental differences.
        """
        first, second = cls.normalize_url(url1), cls.normalize_url(url2)
        return first == second

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Must be provided by subclasses.

        Args:
          dest: the directory to fetch the repository to.
          rev_options: a RevOptions object.
          verbosity: verbosity level.
        """
        raise NotImplementedError

    def switch(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Must be provided by subclasses.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(
        self,
        dest: str,
        url: HiddenText,
        rev_options: RevOptions,
        verbosity: int = 0,
    ) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Must be provided by subclasses.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: str | None) -> bool:
        """
        Return whether the id of the current commit equals the given name.

        Must be provided by subclasses.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        A missing ``dest`` is fetched fresh.  An existing checkout of the
        same URL is updated unless it is already at the requested commit.
        In every other case the user is asked what to do with ``dest``.

        Args:
          dest: the repository directory in which to install or update.
          url: the repository URL starting with a vcs prefix.
          verbosity: verbosity level.
        """
        url, rev_options = self.get_url_rev_options(url)

        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        rev_display = rev_options.to_display()
        choices: tuple[str, ...]
        if self.is_repository_directory(dest):
            current_url = self.get_remote_url(dest)
            if self.compare_urls(current_url, url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info("Skipping because already up-to-date.")
                else:
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options, verbosity=verbosity)
                return

            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                current_url,
            )
            prompt_text = "(s)witch, (i)gnore, (w)ipe, (b)ackup "
            choices = ("s", "i", "w", "b")
        else:
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            # Switching is not offered: dest is not a checkout of this VCS.
            prompt_text = "(i)gnore, (w)ipe, (b)ackup "
            choices = ("i", "w", "b")

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            url,
        )
        answer = ask_path_exists(f"What to do? {prompt_text}", choices)

        if answer == "a":
            sys.exit(-1)
        elif answer == "w":
            logger.warning("Deleting %s", display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        elif answer == "b":
            backup_location = backup_dir(dest)
            logger.warning(
                "Backing up %s to %s", display_path(dest), backup_location
            )
            shutil.move(dest, backup_location)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        elif answer == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options, verbosity=verbosity)
        # Do nothing if the response is "i".

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        Args:
          location: the directory to clear and then obtain into.
          url: the repository URL starting with a vcs prefix.
          verbosity: verbosity level.
        """
        already_there = os.path.exists(location)
        if already_there:
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location

        Must be provided by subclasses.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.

        Must be provided by subclasses.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: list[str] | CommandArgs,
        show_stdout: bool = True,
        cwd: str | None = None,
        on_returncode: Literal["raise", "warn", "ignore"] = "raise",
        extra_ok_returncodes: Iterable[int] | None = None,
        command_desc: str | None = None,
        extra_environ: Mapping[str, Any] | None = None,
        spinner: SpinnerInterface | None = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        NotADirectoryError, FileNotFoundError and PermissionError raised by
        the call are turned into BadCommand.
        """
        cmd = make_command(cls.name, *cmd)
        if command_desc is None:
            command_desc = format_command_args(cmd)

        try:
            output = call_subprocess(
                cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=command_desc,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
                stdout_only=stdout_only,
            )
        except NotADirectoryError:
            raise BadCommand(f"Cannot find command {cls.name!r} - invalid PATH")
        except FileNotFoundError:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            )
        except PermissionError:
            # errno.EACCES = Permission denied
            # This error occurs, for instance, when the command is installed
            # only for another user. So, the current user doesn't have
            # permission to call the other user's command.
            raise BadCommand(
                f"No permission to execute {cls.name!r} - install it "
                f"locally, globally (ask admin), or check your PATH. "
                f"See possible solutions at "
                f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
                f"#fixing-permission-denied."
            )
        return output

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Return whether a directory path is a repository directory, i.e.
        whether ``<path>/<dirname>`` exists.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        marker = os.path.join(path, cls.dirname)
        return os.path.exists(marker)

    @classmethod
    def get_repository_root(cls, location: str) -> str | None:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.
        """
        return location if cls.is_repository_directory(location) else None