Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/vcs/versioncontrol.py: 38%

271 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1"""Handles all VCS (version control) support""" 

2 

3import logging 

4import os 

5import shutil 

6import sys 

7import urllib.parse 

8from typing import ( 

9 TYPE_CHECKING, 

10 Any, 

11 Dict, 

12 Iterable, 

13 Iterator, 

14 List, 

15 Mapping, 

16 Optional, 

17 Tuple, 

18 Type, 

19 Union, 

20) 

21 

22from pip._internal.cli.spinners import SpinnerInterface 

23from pip._internal.exceptions import BadCommand, InstallationError 

24from pip._internal.utils.misc import ( 

25 HiddenText, 

26 ask_path_exists, 

27 backup_dir, 

28 display_path, 

29 hide_url, 

30 hide_value, 

31 is_installable_dir, 

32 rmtree, 

33) 

34from pip._internal.utils.subprocess import ( 

35 CommandArgs, 

36 call_subprocess, 

37 format_command_args, 

38 make_command, 

39) 

40from pip._internal.utils.urls import get_url_scheme 

41 

42if TYPE_CHECKING: 

43 # Literal was introduced in Python 3.8. 

44 # 

45 # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7. 

46 from typing import Literal 

47 

48 

# Public names of this module: only the `vcs` registry instance is listed.
49__all__ = ["vcs"] 

50 

51 

# Module-level logger, named after this module.
52logger = logging.getLogger(__name__) 

53 

# Pair of optional strings; get_url_rev_and_auth() documents the pair as
# (username, password).
54AuthInfo = Tuple[Optional[str], Optional[str]] 

55 

56 

def is_url(name: str) -> bool:
    """
    Tell whether ``name`` looks like a URL.

    The scheme is extracted with ``get_url_scheme``. A name without a scheme
    is not a URL; otherwise the scheme must be one of "http", "https",
    "file", "ftp", or a scheme reported by ``vcs.all_schemes``.
    """
    url_scheme = get_url_scheme(name)
    if url_scheme is None:
        return False
    accepted_schemes = ["http", "https", "file", "ftp"] + vcs.all_schemes
    return url_scheme in accepted_schemes

65 

66 

def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
) -> str:
    """
    Build the requirement URL for a project living in a VCS repository.

    The result has the form ``{repo_url}@{rev}#egg={name}``, optionally
    followed by ``&subdirectory={subdir}``.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision, placed right after the "@".
      project_name: the (unescaped) project name. Dashes are replaced by
        underscores before it is used as the ``egg`` value.
      subdir: optional subdirectory; only appended when it is truthy.
    """
    egg_name = project_name.replace("-", "_")
    subdir_part = f"&subdirectory={subdir}" if subdir else ""
    return f"{repo_url}@{rev}#egg={egg_name}" + subdir_part

83 

84 

def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> Optional[str]:
    """
    Find the Python project's root by searching up the filesystem from
    `location`, and return its path relative to `repo_root`.

    Return None if the project root is `repo_root` itself, or if no
    installable directory is found among `location` and its parents (a
    warning is logged in the latter case).
    """
    start = location
    candidate = location
    while not is_installable_dir(candidate):
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() no longer shortens the path, so we have walked up
            # to the root of the filesystem without finding a project.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                start,
            )
            return None
        candidate = parent

    if os.path.samefile(repo_root, candidate):
        return None
    return os.path.relpath(candidate, repo_root)

112 

113 

class RemoteNotFoundError(Exception):
    """Signals that a repository does not have a remote url configured."""

116 

117 

class RemoteNotValidError(Exception):
    """
    Exception that carries a remote url.

    The url is stored on the ``url`` attribute and is also passed to
    ``Exception`` as its only argument.
    """

    def __init__(self, url: str):
        self.url = url
        super().__init__(url)

122 

123 

class RevOptions:
    """
    A revision to install for one particular VCS, bundled with any extra
    VCS install options.

    Treat instances of this class as immutable.
    """

    def __init__(
        self,
        vc_class: Type["VersionControl"],
        rev: Optional[str] = None,
        extra_args: Optional[CommandArgs] = None,
    ) -> None:
        """
        Args:
          vc_class: a VersionControl subclass.
          rev: the name of the revision to install.
          extra_args: a list of extra options; None means an empty list.
        """
        self.vc_class = vc_class
        self.rev = rev
        self.extra_args = [] if extra_args is None else extra_args
        self.branch_name: Optional[str] = None

    def __repr__(self) -> str:
        return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> Optional[str]:
        """
        The revision to hand to the VCS: ``rev`` when it is set, otherwise
        the VCS class's ``default_arg_rev``.
        """
        return self.vc_class.default_arg_rev if self.rev is None else self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments: the base revision
        arguments (when there is a revision) followed by the extra args.
        """
        command_args: CommandArgs = []
        effective_rev = self.arg_rev
        if effective_rev is not None:
            command_args.extend(self.vc_class.get_base_rev_args(effective_rev))
        command_args.extend(self.extra_args)
        return command_args

    def to_display(self) -> str:
        """Return a log-message suffix naming the revision, or "" if none."""
        return f" (to revision {self.rev})" if self.rev else ""

    def make_new(self, rev: str) -> "RevOptions":
        """
        Make a copy of the current instance, but with a new rev.

        Args:
          rev: the name of the revision for the new object.
        """
        return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)

189 

190 

class VcsSupport:
    """
    Registry of VersionControl backend instances, keyed by backend name.

    The registry dict is a class attribute, so it is shared by every
    instance of this class.
    """

    _registry: Dict[str, "VersionControl"] = {}
    schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]

    def __init__(self) -> None:
        # Register more schemes with urlparse for various version control
        # systems
        urllib.parse.uses_netloc.extend(self.schemes)
        super().__init__()

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the registered backends."""
        return iter(self._registry)

    @property
    def backends(self) -> List["VersionControl"]:
        """The registered backend instances, as a new list."""
        return [*self._registry.values()]

    @property
    def dirnames(self) -> List[str]:
        """The ``dirname`` of every registered backend."""
        names = []
        for backend in self.backends:
            names.append(backend.dirname)
        return names

    @property
    def all_schemes(self) -> List[str]:
        """The schemes of every registered backend, flattened into one list."""
        return [scheme for backend in self.backends for scheme in backend.schemes]

    def register(self, cls: Type["VersionControl"]) -> None:
        """
        Instantiate ``cls`` and store it under ``cls.name``.

        A class without a ``name`` attribute is rejected with a warning; a
        name that is already registered is left untouched.
        """
        if not hasattr(cls, "name"):
            logger.warning("Cannot register VCS %s", cls.__name__)
            return
        if cls.name in self._registry:
            return
        self._registry[cls.name] = cls()
        logger.debug("Registered VCS backend: %s", cls.name)

    def unregister(self, name: str) -> None:
        """Remove the backend registered under ``name``, if there is one."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
        """
        Return a VersionControl object if a repository of that type is found
        at the given directory.
        """
        root_to_backend = {}
        for backend in self._registry.values():
            root = backend.get_repository_root(location)
            if root:
                logger.debug("Determine that %s uses VCS: %s", location, backend.name)
                root_to_backend[root] = backend

        if not root_to_backend:
            return None

        # Choose the VCS in the inner-most directory. Since all repository
        # roots found here would be either `location` or one of its
        # parents, the longest path should have the most path components,
        # i.e. the backend representing the inner-most repository.
        innermost_root = max(root_to_backend, key=len)
        return root_to_backend[innermost_root]

    def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
        """
        Return the first registered VersionControl object whose schemes
        contain ``scheme``, or None.
        """
        matching = (
            backend
            for backend in self._registry.values()
            if scheme in backend.schemes
        )
        return next(matching, None)

    def get_backend(self, name: str) -> Optional["VersionControl"]:
        """
        Return the VersionControl object registered under the lower-cased
        ``name``, or None.
        """
        return self._registry.get(name.lower())

269 

270 

# Module-level VcsSupport instance; this is the name listed in __all__, and
# is_url() reads its all_schemes property.
271vcs = VcsSupport() 

272 

273 

class VersionControl:
    """
    Base class for VCS backends.

    The class attributes below are empty placeholders, and several methods
    raise NotImplementedError; both are meant to be supplied by subclasses
    for a specific version control system.
    """

    name = ""
    dirname = ""
    repo_name = ""
    # List of supported schemes for this Version Control
    schemes: Tuple[str, ...] = ()
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ: Tuple[str, ...] = ()
    default_arg_rev: Optional[str] = None

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
        """
        Return whether the vcs prefix (e.g. "git+") should be added to a
        repository's remote url when used in a requirement.

        The prefix is skipped when the url already starts (case-insensitively)
        with "<name>:".
        """
        own_scheme = f"{cls.name}:"
        return not remote_url.lower().startswith(own_scheme)

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir: str) -> str:
        """
        Return the revision string that should be used in a requirement.

        By default this is whatever get_revision() reports.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
        """
        Return the requirement string to use to redownload the files
        currently at the given repository directory.

        Args:
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}
        """
        remote = cls.get_remote_url(repo_dir)
        if cls.should_add_vcs_url_prefix(remote):
            remote = f"{cls.name}+{remote}"

        return make_vcs_requirement_url(
            remote,
            cls.get_requirement_revision(repo_dir),
            project_name,
            subdir=cls.get_subdirectory(repo_dir),
        )

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """
        Return the base revision arguments for a vcs command.

        Args:
          rev: the name of a revision to install.  Cannot be None.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(
        cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
    ) -> RevOptions:
        """
        Return a RevOptions object bound to this class.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options.
        """
        return RevOptions(vc_class=cls, rev=rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo: str) -> bool:
        """
        posix absolute paths start with os.path.sep,
        win32 ones start with drive (like c:\\folder)
        """
        drive = os.path.splitdrive(repo)[0]
        return bool(drive) or repo.startswith(os.path.sep)

    @classmethod
    def get_netloc_and_auth(
        cls, netloc: str, scheme: str
    ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        Returns: (netloc, (username, password)).
        """
        no_auth = (None, None)
        return netloc, no_auth

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Parse the repository URL to use, and return the URL, revision,
        and auth info to use.

        The vcs prefix is removed from the scheme, a trailing "@<rev>" is
        split off the path, and the fragment is dropped.

        Returns: (url, rev, (username, password)).

        Raises:
          ValueError: if the scheme has no "<vcs>+" prefix.
          InstallationError: if the path ends with "@" (empty revision).
        """
        parts = urllib.parse.urlsplit(url)
        _vcs_prefix, plus, protocol = parts.scheme.partition("+")
        if not plus:
            raise ValueError(
                "Sorry, {!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
            )

        netloc, user_pass = cls.get_netloc_and_auth(parts.netloc, protocol)

        path = parts.path
        rev = None
        if "@" in path:
            # Everything after the last "@" in the path is the revision.
            path, _at, rev = path.rpartition("@")
            if not rev:
                raise InstallationError(
                    "The URL {!r} has an empty revision (after @) "
                    "which is not supported. Include a revision after @ "
                    "or remove @ from the URL.".format(url)
                )

        stripped_url = urllib.parse.urlunsplit(
            (protocol, netloc, path, parts.query, "")
        )
        return stripped_url, rev, user_pass

    @staticmethod
    def make_rev_args(
        username: Optional[str], password: Optional[HiddenText]
    ) -> CommandArgs:
        """
        Return the RevOptions "extra arguments" to use in obtain().

        This base implementation returns an empty list.
        """
        return []

    def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
        """
        Return the URL and RevOptions object to use in obtain(),
        as a tuple (url, rev_options).
        """
        secret_url, rev, (username, secret_password) = self.get_url_rev_and_auth(
            url.secret
        )
        password: Optional[HiddenText] = (
            None if secret_password is None else hide_value(secret_password)
        )
        rev_options = self.make_rev_options(
            rev, extra_args=self.make_rev_args(username, password)
        )
        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url: str) -> str:
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        unquoted = urllib.parse.unquote(url)
        return unquoted.rstrip("/")

    @classmethod
    def compare_urls(cls, url1: str, url2: str) -> bool:
        """
        Compare two repo URLs for identity, ignoring incidental differences.
        """
        first = cls.normalize_url(url1)
        second = cls.normalize_url(url2)
        return first == second

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Args:
          dest: the directory to fetch the repository to.
          rev_options: a RevOptions object.
          verbosity: verbosity level.
        """
        raise NotImplementedError

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
          rev_options: a RevOptions object.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the id of the current commit equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        raise NotImplementedError

    def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        If ``dest`` does not exist, the repository is fetched. If it holds a
        repository with the same URL, it is updated unless already at the
        requested revision. Otherwise the user is asked what to do.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        repo_url, rev_options = self.get_url_rev_options(url)

        if not os.path.exists(dest):
            self.fetch_new(dest, repo_url, rev_options, verbosity=verbosity)
            return

        rev_display = rev_options.to_display()
        prompt_text: str
        choices: Tuple[str, ...]
        if not self.is_repository_directory(dest):
            logger.warning(
                "Directory %s already exists, and is not a %s %s.",
                dest,
                self.name,
                self.repo_name,
            )
            prompt_text = "(i)gnore, (w)ipe, (b)ackup "
            choices = ("i", "w", "b")
        else:
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, repo_url.secret):
                logger.debug(
                    "%s in %s exists, and has correct URL (%s)",
                    self.repo_name.title(),
                    display_path(dest),
                    repo_url,
                )
                if self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info("Skipping because already up-to-date.")
                else:
                    logger.info(
                        "Updating %s %s%s",
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, repo_url, rev_options)
                return

            logger.warning(
                "%s %s in %s exists with URL %s",
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt_text = "(s)witch, (i)gnore, (w)ipe, (b)ackup "
            choices = ("s", "i", "w", "b")

        logger.warning(
            "The plan is to install the %s repository %s",
            self.name,
            repo_url,
        )
        response = ask_path_exists("What to do? {}".format(prompt_text), choices)

        if response == "a":
            sys.exit(-1)
        elif response == "w":
            logger.warning("Deleting %s", display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, repo_url, rev_options, verbosity=verbosity)
        elif response == "b":
            backup_location = backup_dir(dest)
            logger.warning(
                "Backing up %s to %s", display_path(dest), backup_location
            )
            shutil.move(dest, backup_location)
            self.fetch_new(dest, repo_url, rev_options, verbosity=verbosity)
        elif response == "s":
            logger.info(
                "Switching %s %s to %s%s",
                self.repo_name,
                display_path(dest),
                repo_url,
                rev_display,
            )
            self.switch(dest, repo_url, rev_options)
        # Any other response (i.e. "i") leaves dest untouched.

    def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param url: the repository URL starting with a vcs prefix.
        :param verbosity: verbosity level.
        """
        already_there = os.path.exists(location)
        if already_there:
            rmtree(location)
        self.obtain(location, url=url, verbosity=verbosity)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return the url used at location

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the current commit id of the files at the given location.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd: Union[List[str], CommandArgs],
        show_stdout: bool = True,
        cwd: Optional[str] = None,
        on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
        extra_ok_returncodes: Optional[Iterable[int]] = None,
        command_desc: Optional[str] = None,
        extra_environ: Optional[Mapping[str, Any]] = None,
        spinner: Optional[SpinnerInterface] = None,
        log_failed_cmd: bool = True,
        stdout_only: bool = False,
    ) -> str:
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        Raises BadCommand when the executable cannot be found or the current
        user is not permitted to run it.
        """
        full_cmd = make_command(cls.name, *cmd)
        description = (
            format_command_args(full_cmd) if command_desc is None else command_desc
        )
        try:
            return call_subprocess(
                full_cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=description,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
                stdout_only=stdout_only,
            )
        except FileNotFoundError:
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                f"Cannot find command {cls.name!r} - do you have "
                f"{cls.name!r} installed and in your PATH?"
            )
        except PermissionError:
            # errno.EACCES = Permission denied
            # This error occurs, for instance, when the command is installed
            # only for another user. So, the current user doesn't have
            # permission to call the other user's command.
            raise BadCommand(
                f"No permission to execute {cls.name!r} - install it "
                f"locally, globally (ask admin), or check your PATH. "
                f"See possible solutions at "
                f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
                f"#fixing-permission-denied."
            )

    @classmethod
    def is_repository_directory(cls, path: str) -> bool:
        """
        Return whether a directory path is a repository directory, i.e.
        whether it contains an entry named ``cls.dirname``.
        """
        logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
        marker = os.path.join(path, cls.dirname)
        return os.path.exists(marker)

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the "root" (top-level) directory controlled by the vcs,
        or `None` if the directory is not in any.

        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone. For
        example, the Git override checks that Git is actually available.
        """
        return location if cls.is_repository_directory(location) else None