Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/vcs/versioncontrol.py: 38%

269 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1"""Handles all VCS (version control) support""" 

2 

3import logging 

4import os 

5import shutil 

6import sys 

7import urllib.parse 

8from typing import ( 

9 Any, 

10 Dict, 

11 Iterable, 

12 Iterator, 

13 List, 

14 Literal, 

15 Mapping, 

16 Optional, 

17 Tuple, 

18 Type, 

19 Union, 

20) 

21 

22from pip._internal.cli.spinners import SpinnerInterface 

23from pip._internal.exceptions import BadCommand, InstallationError 

24from pip._internal.utils.misc import ( 

25 HiddenText, 

26 ask_path_exists, 

27 backup_dir, 

28 display_path, 

29 hide_url, 

30 hide_value, 

31 is_installable_dir, 

32 rmtree, 

33) 

34from pip._internal.utils.subprocess import ( 

35 CommandArgs, 

36 call_subprocess, 

37 format_command_args, 

38 make_command, 

39) 

40from pip._internal.utils.urls import get_url_scheme 

41 

# Names exported by a star-import of this module.
__all__ = ["vcs"]


logger = logging.getLogger(__name__)

# (username, password) pair; either element may be None.
AuthInfo = Tuple[Optional[str], Optional[str]]

48 

49 

def is_url(name: str) -> bool:
    """
    Tell whether ``name`` looks like a URL.

    A name for which ``get_url_scheme`` finds no scheme is not a URL.
    Otherwise the scheme must be one of the plain transport schemes listed
    below or one of the schemes reported by ``vcs.all_schemes``.
    """
    detected = get_url_scheme(name)
    if detected is None:
        return False
    recognised = ["http", "https", "file", "ftp"] + vcs.all_schemes
    return detected in recognised

58 

59 

def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
) -> str:
    """
    Build the requirement URL for a project that lives in a VCS repository.

    The result has the form ``{repo_url}@{rev}#egg={name}``, followed by
    ``&subdirectory={subdir}`` when a subdirectory is given.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision, placed after the "@" separator.
      project_name: the (unescaped) project name; hyphens are replaced by
        underscores to form the "egg" fragment.
      subdir: optional subdirectory; ignored when None or empty.
    """
    egg_name = "_".join(project_name.split("-"))
    subdir_suffix = f"&subdirectory={subdir}" if subdir else ""
    return f"{repo_url}@{rev}#egg={egg_name}{subdir_suffix}"

76 

77 

def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> Optional[str]:
    """
    Locate the Python project's root by walking up the filesystem from
    `location`, and return its path relative to `repo_root`.

    Return None if the project root is `repo_root` itself, or if no
    installable directory is found before reaching the filesystem root
    (a warning is logged in that case).
    """
    candidate = location
    while not is_installable_dir(candidate):
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer changes the path: we are at the root of
            # the filesystem and never found a Python project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                location,
            )
            return None
        candidate = parent

    if os.path.samefile(repo_root, candidate):
        return None

    return os.path.relpath(candidate, repo_root)

105 

106 

class RemoteNotFoundError(Exception):
    """Raised when a repository does not have a remote url configured."""

109 

110 

class RemoteNotValidError(Exception):
    """Exception that records the remote ``url`` it was raised for."""

    def __init__(self, url: str):
        # Keep the url both as an attribute and as the exception argument.
        self.url = url
        super().__init__(url)

115 

116 

class RevOptions:
    """
    A VCS-specific revision to install, bundled with any extra VCS install
    options.

    Instances should be treated as immutable; use ``make_new()`` to obtain
    a variant that targets a different revision.
    """

    def __init__(
        self,
        vc_class: Type["VersionControl"],
        rev: Optional[str] = None,
        extra_args: Optional[CommandArgs] = None,
    ) -> None:
        """
        Args:
          vc_class: a VersionControl subclass.
          rev: the name of the revision to install.
          extra_args: a list of extra options; defaults to an empty list.
        """
        self.vc_class = vc_class
        self.rev = rev
        self.extra_args = [] if extra_args is None else extra_args
        self.branch_name: Optional[str] = None

    def __repr__(self) -> str:
        backend_name = self.vc_class.name
        return f"<RevOptions {backend_name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> Optional[str]:
        """
        The revision to hand to the VCS: ``rev`` when set, otherwise the
        backend class's ``default_arg_rev``.
        """
        return self.vc_class.default_arg_rev if self.rev is None else self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments: the backend's base
        revision arguments (when there is a revision to pass) followed by
        the extra arguments.
        """
        revision = self.arg_rev
        if revision is None:
            rev_args: CommandArgs = []
        else:
            rev_args = self.vc_class.get_base_rev_args(revision)

        # Always build a fresh list so neither input is mutated.
        return [*rev_args, *self.extra_args]

    def to_display(self) -> str:
        """Return a log-message suffix naming the revision, or "" if unset."""
        return f" (to revision {self.rev})" if self.rev else ""

    def make_new(self, rev: str) -> "RevOptions":
        """
        Make a copy of the current instance, but with a new rev.

        Args:
          rev: the name of the revision for the new object.
        """
        backend = self.vc_class
        return backend.make_rev_options(rev, extra_args=self.extra_args)

182 

183 

class VcsSupport:
    """
    Registry of VCS backends, keyed by backend name.

    The registry dict is a class attribute, so every instance shares the
    same set of registered backends.
    """

    _registry: Dict[str, "VersionControl"] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Register more schemes with urlparse for various version control
        # systems. urllib.parse.uses_netloc is a process-wide list, so only
        # append the schemes it lacks: extending it unconditionally would
        # add duplicate entries every time this class is instantiated.
        for scheme in self.schemes:
            if scheme not in urllib.parse.uses_netloc:
                urllib.parse.uses_netloc.append(scheme)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the registered backends."""
        return self._registry.__iter__()

    @property
    def backends(self) -> List["VersionControl"]:
        """The registered backend objects, as a new list."""
        return list(self._registry.values())

    @property
    def dirnames(self) -> List[str]:
        """The ``dirname`` of every registered backend."""
        return [backend.dirname for backend in self.backends]

    @property
    def all_schemes(self) -> List[str]:
        """The schemes of every registered backend, in one flat list."""
        return [scheme for backend in self.backends for scheme in backend.schemes]

    def register(self, cls: Type["VersionControl"]) -> None:
        """
        Instantiate ``cls`` and store it in the registry under ``cls.name``.

        A class without a ``name`` attribute is skipped with a warning; a
        name that is already registered is left untouched.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name not in self._registry:
            self._registry[cls.name] = cls()
            logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Remove the backend registered under ``name``, if there is one."""
        if name in self._registry:
            del self._registry[name]

    def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
        """
        Return a VersionControl object if a repository of that type is found
        at the given directory.

        Return None when no registered backend reports a repository root
        for ``location``.
        """
        vcs_backends = {}
        for vcs_backend in self._registry.values():
            repo_path = vcs_backend.get_repository_root(location)
            if not repo_path:
                continue
            logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
            vcs_backends[repo_path] = vcs_backend

        if not vcs_backends:
            return None

        # Choose the VCS in the inner-most directory. Since all repository
        # roots found here would be either `location` or one of its
        # parents, the longest path should have the most path components,
        # i.e. the backend representing the inner-most repository.
        inner_most_repo_path = max(vcs_backends, key=len)
        return vcs_backends[inner_most_repo_path]

    def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
        """
        Return the first registered VersionControl object whose ``schemes``
        contain ``scheme``, or None.
        """
        for vcs_backend in self._registry.values():
            if scheme in vcs_backend.schemes:
                return vcs_backend
        return None

    def get_backend(self, name: str) -> Optional["VersionControl"]:
        """
        Return the VersionControl object registered under ``name`` (the
        lookup lowercases ``name`` first), or None.
        """
        name = name.lower()
        return self._registry.get(name)

262 

263 

# Module-level VcsSupport instance; is_url() reads its all_schemes.
vcs = VcsSupport()

265 

266 

class VersionControl:
    """
    Base class for VCS backends.

    Subclasses override the class attributes below and the methods that
    raise NotImplementedError in this class.
    """

    # Backend name; it is also the executable that run_command() invokes
    # and the prefix used in "<name>+<url>" requirement URLs.
    name = ""
    # Entry whose presence inside a directory marks it as a repository
    # (see is_repository_directory()).
    dirname = ""
    # Word used for the repository in log messages.
    repo_name = ""
    # List of supported schemes for this Version Control
    schemes: Tuple[str, ...] = ()
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ: Tuple[str, ...] = ()
    # Revision that RevOptions.arg_rev falls back to when no rev is given.
    default_arg_rev: Optional[str] = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Return whether the vcs prefix (e.g. "git+") should be added to a
        repository's remote url when used in a requirement.

        The prefix is needed unless the url (compared case-insensitively)
        already starts with "<name>:".
        """
        return not remote_url.lower().startswith(f"{cls.name}:")

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string that should be used in a requirement.

        By default this is whatever get_revision() reports for ``repo_dir``.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          repo_dir: the repository directory.
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        repo_url = cls.get_remote_url(repo_dir)

        if cls.should_add_vcs_url_prefix(repo_url):
            repo_url = f"{cls.name}+{repo_url}"

        revision = cls.get_requirement_revision(repo_dir)
        subdir = cls.get_subdirectory(repo_dir)
        req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)

        return req

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """
        Return the base revision arguments for a vcs command.

        Args:
          rev: the name of a revision to install.  Cannot be None.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
    ) -> RevOptions:
        """
        Return a RevOptions object bound to this backend class.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options.
        """
        return RevOptions(cls, rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        Return whether ``repo`` looks like an absolute local path.

        posix absolute paths start with os.path.sep,
        win32 ones start with drive (like c:\\folder)
        """
        # Only the drive part matters; the remainder of the path is unused.
        drive, _ = os.path.splitdrive(repo)
        return repo.startswith(os.path.sep) or bool(drive)

    @classmethod
    def get_netloc_and_auth(cls, netloc: str, scheme: str) -> Tuple[str, AuthInfo]:
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        Returns: (netloc, (username, password)). This base implementation
        returns the netloc unchanged with (None, None) as auth.
        """
        return netloc, (None, None)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        The returned URL has the vcs prefix removed from its scheme, the
        "@<rev>" suffix removed from its path, and its fragment dropped.

        Returns: (url, rev, (username, password)).

        Raises:
          ValueError: if the scheme has no "<vcs>+" prefix.
          InstallationError: if the path ends with "@" (empty revision).
        """
        scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
        if "+" not in scheme:
            raise ValueError(
                f"Sorry, {url!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
            )
        # Remove the vcs prefix.
        scheme = scheme.split("+", 1)[1]
        netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
        rev = None
        if "@" in path:
            # The revision is whatever follows the last "@" in the path.
            path, rev = path.rsplit("@", 1)
            if not rev:
                raise InstallationError(
                    f"The URL {url!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL."
                )
        url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
        return url, rev, user_pass

    @staticmethod
    def make_rev_args(
        username: Optional[str], password: Optional[HiddenText]
    ) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain().

        This base implementation returns an empty list.
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
        username, secret_password = user_pass
        password: Optional[HiddenText] = None
        if secret_password is not None:
            # Wrap the password before it is handed on as a command argument.
            password = hide_value(secret_password)
        extra_args = self.make_rev_args(username, password)
        rev_options = self.make_rev_options(rev, extra_args=extra_args)

        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        return urllib.parse.unquote(url).rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Compare two repo URLs for identity, ignoring incidental differences.
        """
        return cls.normalize_url(url1) == cls.normalize_url(url2)

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Args:
          dest: the directory to fetch the repository to.
          url: the repository URL.
          rev_options: a RevOptions object.
          verbosity: verbosity level.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
          rev_options: a RevOptions object.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
          rev_options: a RevOptions object.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the id of the current commit equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        If ``dest`` does not exist the repository is fetched into it. If it
        is already a repository with a matching remote URL it is updated
        (or left alone when already at the requested commit). Otherwise
        the user is asked what to do with the existing path.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        url, rev_options = self.get_url_rev_options(url)

        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        # (question text, accepted answers); declared once so both branches
        # below assign a value of the same declared type.
        prompt: Tuple[str, Tuple[str, ...]]

        rev_display = rev_options.to_display()
        if self.is_repository_directory(dest):
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options)
                else:
                    logger.info("Skipping because already up-to-date.")
                return

            # Same kind of repository, but it points at a different remote.
            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
        else:
            # Something else occupies dest, so switching is not offered.
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b"))

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            url,
        )
        response = ask_path_exists(f"What to do? {prompt[0]}", prompt[1])

        # NOTE(review): "a" is not among the answers offered above, so this
        # branch looks unreachable -- confirm against ask_path_exists().
        if response == "a":
            sys.exit(-1)

        if response == "w":
            logger.warning("Deleting %s", display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        if response == "b":
            dest_dir = backup_dir(dest)
            logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
            shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options, verbosity=verbosity)
            return

        # Do nothing if the response is "i".
        if response == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options)

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param location: the directory to unpack into; removed first if it
            already exists.
        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        if os.path.exists(location):
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: Union[List[str], CommandArgs],
        show_stdout: bool = True,
        cwd: Optional[str] = None,
        on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
        extra_ok_returncodes: Optional[Iterable[int]] = None,
        command_desc: Optional[str] = None,
        extra_environ: Optional[Mapping[str, Any]] = None,
        spinner: Optional[SpinnerInterface] = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        ``cls.name`` is prepended to ``cmd``, and ``cls.unset_environ`` is
        passed along; all other arguments are forwarded to
        call_subprocess().

        Raises BadCommand if the VCS executable cannot be found or may not
        be executed by the current user.
        """
        cmd = make_command(cls.name, *cmd)
        if command_desc is None:
            command_desc = format_command_args(cmd)
        try:
            return call_subprocess(
                cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=command_desc,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
                stdout_only=stdout_only,
            )
        except FileNotFoundError as exc:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            ) from exc
        except PermissionError as exc:
            # errno.EACCES = Permission denied
            # This error occurs, for instance, when the command is installed
            # only for another user. So, the current user doesn't have
            # permission to call the other user's command.
            raise BadCommand(
                f"No permission to execute {cls.name!r} - install it "
                "locally, globally (ask admin), or check your PATH. "
                "See possible solutions at "
                "https://pip.pypa.io/en/latest/reference/pip_freeze/"
                "#fixing-permission-denied."
            ) from exc

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Return whether a directory path is a repository directory, i.e.
        whether it contains an entry named ``cls.dirname``.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        return os.path.exists(os.path.join(path, cls.dirname))

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.

        This base implementation only recognises ``location`` itself and
        does not search its parent directories.
        """
        if cls.is_repository_directory(location):
            return location
        return None