Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_uninstall.py: 14%

356 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import functools 

2import os 

3import sys 

4import sysconfig 

5from importlib.util import cache_from_source 

6from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple 

7 

8from pip._internal.exceptions import UninstallationError 

9from pip._internal.locations import get_bin_prefix, get_bin_user 

10from pip._internal.metadata import BaseDistribution 

11from pip._internal.utils.compat import WINDOWS 

12from pip._internal.utils.egg_link import egg_link_path_from_location 

13from pip._internal.utils.logging import getLogger, indent_log 

14from pip._internal.utils.misc import ask, normalize_path, renames, rmtree 

15from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory 

16from pip._internal.utils.virtualenv import running_under_virtualenv 

17 

18logger = getLogger(__name__) 

19 

20 

21def _script_names( 

22 bin_dir: str, script_name: str, is_gui: bool 

23) -> Generator[str, None, None]: 

24 """Create the fully qualified name of the files created by 

25 {console,gui}_scripts for the given ``dist``. 

26 Returns the list of file names 

27 """ 

28 exe_name = os.path.join(bin_dir, script_name) 

29 yield exe_name 

30 if not WINDOWS: 

31 return 

32 yield f"{exe_name}.exe" 

33 yield f"{exe_name}.exe.manifest" 

34 if is_gui: 

35 yield f"{exe_name}-script.pyw" 

36 else: 

37 yield f"{exe_name}-script.py" 

38 

39 

40def _unique( 

41 fn: Callable[..., Generator[Any, None, None]] 

42) -> Callable[..., Generator[Any, None, None]]: 

43 @functools.wraps(fn) 

44 def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]: 

45 seen: Set[Any] = set() 

46 for item in fn(*args, **kw): 

47 if item not in seen: 

48 seen.add(item) 

49 yield item 

50 

51 return unique 

52 

53 

54@_unique 

55def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]: 

56 """ 

57 Yield all the uninstallation paths for dist based on RECORD-without-.py[co] 

58 

59 Yield paths to all the files in RECORD. For each .py file in RECORD, add 

60 the .pyc and .pyo in the same directory. 

61 

62 UninstallPathSet.add() takes care of the __pycache__ .py[co]. 

63 

64 If RECORD is not found, raises UninstallationError, 

65 with possible information from the INSTALLER file. 

66 

67 https://packaging.python.org/specifications/recording-installed-packages/ 

68 """ 

69 location = dist.location 

70 assert location is not None, "not installed" 

71 

72 entries = dist.iter_declared_entries() 

73 if entries is None: 

74 msg = "Cannot uninstall {dist}, RECORD file not found.".format(dist=dist) 

75 installer = dist.installer 

76 if not installer or installer == "pip": 

77 dep = "{}=={}".format(dist.raw_name, dist.version) 

78 msg += ( 

79 " You might be able to recover from this via: " 

80 "'pip install --force-reinstall --no-deps {}'.".format(dep) 

81 ) 

82 else: 

83 msg += " Hint: The package was installed by {}.".format(installer) 

84 raise UninstallationError(msg) 

85 

86 for entry in entries: 

87 path = os.path.join(location, entry) 

88 yield path 

89 if path.endswith(".py"): 

90 dn, fn = os.path.split(path) 

91 base = fn[:-3] 

92 path = os.path.join(dn, base + ".pyc") 

93 yield path 

94 path = os.path.join(dn, base + ".pyo") 

95 yield path 

96 

97 

98def compact(paths: Iterable[str]) -> Set[str]: 

99 """Compact a path set to contain the minimal number of paths 

100 necessary to contain all paths in the set. If /a/path/ and 

101 /a/path/to/a/file.txt are both in the set, leave only the 

102 shorter path.""" 

103 

104 sep = os.path.sep 

105 short_paths: Set[str] = set() 

106 for path in sorted(paths, key=len): 

107 should_skip = any( 

108 path.startswith(shortpath.rstrip("*")) 

109 and path[len(shortpath.rstrip("*").rstrip(sep))] == sep 

110 for shortpath in short_paths 

111 ) 

112 if not should_skip: 

113 short_paths.add(path) 

114 return short_paths 

115 

116 

117def compress_for_rename(paths: Iterable[str]) -> Set[str]: 

118 """Returns a set containing the paths that need to be renamed. 

119 

120 This set may include directories when the original sequence of paths 

121 included every file on disk. 

122 """ 

123 case_map = {os.path.normcase(p): p for p in paths} 

124 remaining = set(case_map) 

125 unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len) 

126 wildcards: Set[str] = set() 

127 

128 def norm_join(*a: str) -> str: 

129 return os.path.normcase(os.path.join(*a)) 

130 

131 for root in unchecked: 

132 if any(os.path.normcase(root).startswith(w) for w in wildcards): 

133 # This directory has already been handled. 

134 continue 

135 

136 all_files: Set[str] = set() 

137 all_subdirs: Set[str] = set() 

138 for dirname, subdirs, files in os.walk(root): 

139 all_subdirs.update(norm_join(root, dirname, d) for d in subdirs) 

140 all_files.update(norm_join(root, dirname, f) for f in files) 

141 # If all the files we found are in our remaining set of files to 

142 # remove, then remove them from the latter set and add a wildcard 

143 # for the directory. 

144 if not (all_files - remaining): 

145 remaining.difference_update(all_files) 

146 wildcards.add(root + os.sep) 

147 

148 return set(map(case_map.__getitem__, remaining)) | wildcards 

149 

150 

151def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]: 

152 """Returns a tuple of 2 sets of which paths to display to user 

153 

154 The first set contains paths that would be deleted. Files of a package 

155 are not added and the top-level directory of the package has a '*' added 

156 at the end - to signify that all it's contents are removed. 

157 

158 The second set contains files that would have been skipped in the above 

159 folders. 

160 """ 

161 

162 will_remove = set(paths) 

163 will_skip = set() 

164 

165 # Determine folders and files 

166 folders = set() 

167 files = set() 

168 for path in will_remove: 

169 if path.endswith(".pyc"): 

170 continue 

171 if path.endswith("__init__.py") or ".dist-info" in path: 

172 folders.add(os.path.dirname(path)) 

173 files.add(path) 

174 

175 # probably this one https://github.com/python/mypy/issues/390 

176 _normcased_files = set(map(os.path.normcase, files)) # type: ignore 

177 

178 folders = compact(folders) 

179 

180 # This walks the tree using os.walk to not miss extra folders 

181 # that might get added. 

182 for folder in folders: 

183 for dirpath, _, dirfiles in os.walk(folder): 

184 for fname in dirfiles: 

185 if fname.endswith(".pyc"): 

186 continue 

187 

188 file_ = os.path.join(dirpath, fname) 

189 if ( 

190 os.path.isfile(file_) 

191 and os.path.normcase(file_) not in _normcased_files 

192 ): 

193 # We are skipping this file. Add it to the set. 

194 will_skip.add(file_) 

195 

196 will_remove = files | {os.path.join(folder, "*") for folder in folders} 

197 

198 return will_remove, will_skip 

199 

200 

201class StashedUninstallPathSet: 

202 """A set of file rename operations to stash files while 

203 tentatively uninstalling them.""" 

204 

205 def __init__(self) -> None: 

206 # Mapping from source file root to [Adjacent]TempDirectory 

207 # for files under that directory. 

208 self._save_dirs: Dict[str, TempDirectory] = {} 

209 # (old path, new path) tuples for each move that may need 

210 # to be undone. 

211 self._moves: List[Tuple[str, str]] = [] 

212 

213 def _get_directory_stash(self, path: str) -> str: 

214 """Stashes a directory. 

215 

216 Directories are stashed adjacent to their original location if 

217 possible, or else moved/copied into the user's temp dir.""" 

218 

219 try: 

220 save_dir: TempDirectory = AdjacentTempDirectory(path) 

221 except OSError: 

222 save_dir = TempDirectory(kind="uninstall") 

223 self._save_dirs[os.path.normcase(path)] = save_dir 

224 

225 return save_dir.path 

226 

227 def _get_file_stash(self, path: str) -> str: 

228 """Stashes a file. 

229 

230 If no root has been provided, one will be created for the directory 

231 in the user's temp directory.""" 

232 path = os.path.normcase(path) 

233 head, old_head = os.path.dirname(path), None 

234 save_dir = None 

235 

236 while head != old_head: 

237 try: 

238 save_dir = self._save_dirs[head] 

239 break 

240 except KeyError: 

241 pass 

242 head, old_head = os.path.dirname(head), head 

243 else: 

244 # Did not find any suitable root 

245 head = os.path.dirname(path) 

246 save_dir = TempDirectory(kind="uninstall") 

247 self._save_dirs[head] = save_dir 

248 

249 relpath = os.path.relpath(path, head) 

250 if relpath and relpath != os.path.curdir: 

251 return os.path.join(save_dir.path, relpath) 

252 return save_dir.path 

253 

254 def stash(self, path: str) -> str: 

255 """Stashes the directory or file and returns its new location. 

256 Handle symlinks as files to avoid modifying the symlink targets. 

257 """ 

258 path_is_dir = os.path.isdir(path) and not os.path.islink(path) 

259 if path_is_dir: 

260 new_path = self._get_directory_stash(path) 

261 else: 

262 new_path = self._get_file_stash(path) 

263 

264 self._moves.append((path, new_path)) 

265 if path_is_dir and os.path.isdir(new_path): 

266 # If we're moving a directory, we need to 

267 # remove the destination first or else it will be 

268 # moved to inside the existing directory. 

269 # We just created new_path ourselves, so it will 

270 # be removable. 

271 os.rmdir(new_path) 

272 renames(path, new_path) 

273 return new_path 

274 

275 def commit(self) -> None: 

276 """Commits the uninstall by removing stashed files.""" 

277 for _, save_dir in self._save_dirs.items(): 

278 save_dir.cleanup() 

279 self._moves = [] 

280 self._save_dirs = {} 

281 

282 def rollback(self) -> None: 

283 """Undoes the uninstall by moving stashed files back.""" 

284 for p in self._moves: 

285 logger.info("Moving to %s\n from %s", *p) 

286 

287 for new_path, path in self._moves: 

288 try: 

289 logger.debug("Replacing %s from %s", new_path, path) 

290 if os.path.isfile(new_path) or os.path.islink(new_path): 

291 os.unlink(new_path) 

292 elif os.path.isdir(new_path): 

293 rmtree(new_path) 

294 renames(path, new_path) 

295 except OSError as ex: 

296 logger.error("Failed to restore %s", new_path) 

297 logger.debug("Exception: %s", ex) 

298 

299 self.commit() 

300 

301 @property 

302 def can_rollback(self) -> bool: 

303 return bool(self._moves) 

304 

305 

306class UninstallPathSet: 

307 """A set of file paths to be removed in the uninstallation of a 

308 requirement.""" 

309 

310 def __init__(self, dist: BaseDistribution) -> None: 

311 self._paths: Set[str] = set() 

312 self._refuse: Set[str] = set() 

313 self._pth: Dict[str, UninstallPthEntries] = {} 

314 self._dist = dist 

315 self._moved_paths = StashedUninstallPathSet() 

316 # Create local cache of normalize_path results. Creating an UninstallPathSet 

317 # can result in hundreds/thousands of redundant calls to normalize_path with 

318 # the same args, which hurts performance. 

319 self._normalize_path_cached = functools.lru_cache()(normalize_path) 

320 

321 def _permitted(self, path: str) -> bool: 

322 """ 

323 Return True if the given path is one we are permitted to 

324 remove/modify, False otherwise. 

325 

326 """ 

327 # aka is_local, but caching normalized sys.prefix 

328 if not running_under_virtualenv(): 

329 return True 

330 return path.startswith(self._normalize_path_cached(sys.prefix)) 

331 

332 def add(self, path: str) -> None: 

333 head, tail = os.path.split(path) 

334 

335 # we normalize the head to resolve parent directory symlinks, but not 

336 # the tail, since we only want to uninstall symlinks, not their targets 

337 path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail)) 

338 

339 if not os.path.exists(path): 

340 return 

341 if self._permitted(path): 

342 self._paths.add(path) 

343 else: 

344 self._refuse.add(path) 

345 

346 # __pycache__ files can show up after 'installed-files.txt' is created, 

347 # due to imports 

348 if os.path.splitext(path)[1] == ".py": 

349 self.add(cache_from_source(path)) 

350 

351 def add_pth(self, pth_file: str, entry: str) -> None: 

352 pth_file = self._normalize_path_cached(pth_file) 

353 if self._permitted(pth_file): 

354 if pth_file not in self._pth: 

355 self._pth[pth_file] = UninstallPthEntries(pth_file) 

356 self._pth[pth_file].add(entry) 

357 else: 

358 self._refuse.add(pth_file) 

359 

360 def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None: 

361 """Remove paths in ``self._paths`` with confirmation (unless 

362 ``auto_confirm`` is True).""" 

363 

364 if not self._paths: 

365 logger.info( 

366 "Can't uninstall '%s'. No files were found to uninstall.", 

367 self._dist.raw_name, 

368 ) 

369 return 

370 

371 dist_name_version = f"{self._dist.raw_name}-{self._dist.version}" 

372 logger.info("Uninstalling %s:", dist_name_version) 

373 

374 with indent_log(): 

375 if auto_confirm or self._allowed_to_proceed(verbose): 

376 moved = self._moved_paths 

377 

378 for_rename = compress_for_rename(self._paths) 

379 

380 for path in sorted(compact(for_rename)): 

381 moved.stash(path) 

382 logger.verbose("Removing file or directory %s", path) 

383 

384 for pth in self._pth.values(): 

385 pth.remove() 

386 

387 logger.info("Successfully uninstalled %s", dist_name_version) 

388 

389 def _allowed_to_proceed(self, verbose: bool) -> bool: 

390 """Display which files would be deleted and prompt for confirmation""" 

391 

392 def _display(msg: str, paths: Iterable[str]) -> None: 

393 if not paths: 

394 return 

395 

396 logger.info(msg) 

397 with indent_log(): 

398 for path in sorted(compact(paths)): 

399 logger.info(path) 

400 

401 if not verbose: 

402 will_remove, will_skip = compress_for_output_listing(self._paths) 

403 else: 

404 # In verbose mode, display all the files that are going to be 

405 # deleted. 

406 will_remove = set(self._paths) 

407 will_skip = set() 

408 

409 _display("Would remove:", will_remove) 

410 _display("Would not remove (might be manually added):", will_skip) 

411 _display("Would not remove (outside of prefix):", self._refuse) 

412 if verbose: 

413 _display("Will actually move:", compress_for_rename(self._paths)) 

414 

415 return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n" 

416 

417 def rollback(self) -> None: 

418 """Rollback the changes previously made by remove().""" 

419 if not self._moved_paths.can_rollback: 

420 logger.error( 

421 "Can't roll back %s; was not uninstalled", 

422 self._dist.raw_name, 

423 ) 

424 return 

425 logger.info("Rolling back uninstall of %s", self._dist.raw_name) 

426 self._moved_paths.rollback() 

427 for pth in self._pth.values(): 

428 pth.rollback() 

429 

430 def commit(self) -> None: 

431 """Remove temporary save dir: rollback will no longer be possible.""" 

432 self._moved_paths.commit() 

433 

434 @classmethod 

435 def from_dist(cls, dist: BaseDistribution) -> "UninstallPathSet": 

436 dist_location = dist.location 

437 info_location = dist.info_location 

438 if dist_location is None: 

439 logger.info( 

440 "Not uninstalling %s since it is not installed", 

441 dist.canonical_name, 

442 ) 

443 return cls(dist) 

444 

445 normalized_dist_location = normalize_path(dist_location) 

446 if not dist.local: 

447 logger.info( 

448 "Not uninstalling %s at %s, outside environment %s", 

449 dist.canonical_name, 

450 normalized_dist_location, 

451 sys.prefix, 

452 ) 

453 return cls(dist) 

454 

455 if normalized_dist_location in { 

456 p 

457 for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")} 

458 if p 

459 }: 

460 logger.info( 

461 "Not uninstalling %s at %s, as it is in the standard library.", 

462 dist.canonical_name, 

463 normalized_dist_location, 

464 ) 

465 return cls(dist) 

466 

467 paths_to_remove = cls(dist) 

468 develop_egg_link = egg_link_path_from_location(dist.raw_name) 

469 

470 # Distribution is installed with metadata in a "flat" .egg-info 

471 # directory. This means it is not a modern .dist-info installation, an 

472 # egg, or legacy editable. 

473 setuptools_flat_installation = ( 

474 dist.installed_with_setuptools_egg_info 

475 and info_location is not None 

476 and os.path.exists(info_location) 

477 # If dist is editable and the location points to a ``.egg-info``, 

478 # we are in fact in the legacy editable case. 

479 and not info_location.endswith(f"{dist.setuptools_filename}.egg-info") 

480 ) 

481 

482 # Uninstall cases order do matter as in the case of 2 installs of the 

483 # same package, pip needs to uninstall the currently detected version 

484 if setuptools_flat_installation: 

485 if info_location is not None: 

486 paths_to_remove.add(info_location) 

487 installed_files = dist.iter_declared_entries() 

488 if installed_files is not None: 

489 for installed_file in installed_files: 

490 paths_to_remove.add(os.path.join(dist_location, installed_file)) 

491 # FIXME: need a test for this elif block 

492 # occurs with --single-version-externally-managed/--record outside 

493 # of pip 

494 elif dist.is_file("top_level.txt"): 

495 try: 

496 namespace_packages = dist.read_text("namespace_packages.txt") 

497 except FileNotFoundError: 

498 namespaces = [] 

499 else: 

500 namespaces = namespace_packages.splitlines(keepends=False) 

501 for top_level_pkg in [ 

502 p 

503 for p in dist.read_text("top_level.txt").splitlines() 

504 if p and p not in namespaces 

505 ]: 

506 path = os.path.join(dist_location, top_level_pkg) 

507 paths_to_remove.add(path) 

508 paths_to_remove.add(f"{path}.py") 

509 paths_to_remove.add(f"{path}.pyc") 

510 paths_to_remove.add(f"{path}.pyo") 

511 

512 elif dist.installed_by_distutils: 

513 raise UninstallationError( 

514 "Cannot uninstall {!r}. It is a distutils installed project " 

515 "and thus we cannot accurately determine which files belong " 

516 "to it which would lead to only a partial uninstall.".format( 

517 dist.raw_name, 

518 ) 

519 ) 

520 

521 elif dist.installed_as_egg: 

522 # package installed by easy_install 

523 # We cannot match on dist.egg_name because it can slightly vary 

524 # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg 

525 paths_to_remove.add(dist_location) 

526 easy_install_egg = os.path.split(dist_location)[1] 

527 easy_install_pth = os.path.join( 

528 os.path.dirname(dist_location), 

529 "easy-install.pth", 

530 ) 

531 paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg) 

532 

533 elif dist.installed_with_dist_info: 

534 for path in uninstallation_paths(dist): 

535 paths_to_remove.add(path) 

536 

537 elif develop_egg_link: 

538 # PEP 660 modern editable is handled in the ``.dist-info`` case 

539 # above, so this only covers the setuptools-style editable. 

540 with open(develop_egg_link) as fh: 

541 link_pointer = os.path.normcase(fh.readline().strip()) 

542 normalized_link_pointer = paths_to_remove._normalize_path_cached( 

543 link_pointer 

544 ) 

545 assert os.path.samefile( 

546 normalized_link_pointer, normalized_dist_location 

547 ), ( 

548 f"Egg-link {develop_egg_link} (to {link_pointer}) does not match " 

549 f"installed location of {dist.raw_name} (at {dist_location})" 

550 ) 

551 paths_to_remove.add(develop_egg_link) 

552 easy_install_pth = os.path.join( 

553 os.path.dirname(develop_egg_link), "easy-install.pth" 

554 ) 

555 paths_to_remove.add_pth(easy_install_pth, dist_location) 

556 

557 else: 

558 logger.debug( 

559 "Not sure how to uninstall: %s - Check: %s", 

560 dist, 

561 dist_location, 

562 ) 

563 

564 if dist.in_usersite: 

565 bin_dir = get_bin_user() 

566 else: 

567 bin_dir = get_bin_prefix() 

568 

569 # find distutils scripts= scripts 

570 try: 

571 for script in dist.iter_distutils_script_names(): 

572 paths_to_remove.add(os.path.join(bin_dir, script)) 

573 if WINDOWS: 

574 paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat")) 

575 except (FileNotFoundError, NotADirectoryError): 

576 pass 

577 

578 # find console_scripts and gui_scripts 

579 def iter_scripts_to_remove( 

580 dist: BaseDistribution, 

581 bin_dir: str, 

582 ) -> Generator[str, None, None]: 

583 for entry_point in dist.iter_entry_points(): 

584 if entry_point.group == "console_scripts": 

585 yield from _script_names(bin_dir, entry_point.name, False) 

586 elif entry_point.group == "gui_scripts": 

587 yield from _script_names(bin_dir, entry_point.name, True) 

588 

589 for s in iter_scripts_to_remove(dist, bin_dir): 

590 paths_to_remove.add(s) 

591 

592 return paths_to_remove 

593 

594 

595class UninstallPthEntries: 

596 def __init__(self, pth_file: str) -> None: 

597 self.file = pth_file 

598 self.entries: Set[str] = set() 

599 self._saved_lines: Optional[List[bytes]] = None 

600 

601 def add(self, entry: str) -> None: 

602 entry = os.path.normcase(entry) 

603 # On Windows, os.path.normcase converts the entry to use 

604 # backslashes. This is correct for entries that describe absolute 

605 # paths outside of site-packages, but all the others use forward 

606 # slashes. 

607 # os.path.splitdrive is used instead of os.path.isabs because isabs 

608 # treats non-absolute paths with drive letter markings like c:foo\bar 

609 # as absolute paths. It also does not recognize UNC paths if they don't 

610 # have more than "\\sever\share". Valid examples: "\\server\share\" or 

611 # "\\server\share\folder". 

612 if WINDOWS and not os.path.splitdrive(entry)[0]: 

613 entry = entry.replace("\\", "/") 

614 self.entries.add(entry) 

615 

616 def remove(self) -> None: 

617 logger.verbose("Removing pth entries from %s:", self.file) 

618 

619 # If the file doesn't exist, log a warning and return 

620 if not os.path.isfile(self.file): 

621 logger.warning("Cannot remove entries from nonexistent file %s", self.file) 

622 return 

623 with open(self.file, "rb") as fh: 

624 # windows uses '\r\n' with py3k, but uses '\n' with py2.x 

625 lines = fh.readlines() 

626 self._saved_lines = lines 

627 if any(b"\r\n" in line for line in lines): 

628 endline = "\r\n" 

629 else: 

630 endline = "\n" 

631 # handle missing trailing newline 

632 if lines and not lines[-1].endswith(endline.encode("utf-8")): 

633 lines[-1] = lines[-1] + endline.encode("utf-8") 

634 for entry in self.entries: 

635 try: 

636 logger.verbose("Removing entry: %s", entry) 

637 lines.remove((entry + endline).encode("utf-8")) 

638 except ValueError: 

639 pass 

640 with open(self.file, "wb") as fh: 

641 fh.writelines(lines) 

642 

643 def rollback(self) -> bool: 

644 if self._saved_lines is None: 

645 logger.error("Cannot roll back changes to %s, none were made", self.file) 

646 return False 

647 logger.debug("Rolling %s back to previous state", self.file) 

648 with open(self.file, "wb") as fh: 

649 fh.writelines(self._saved_lines) 

650 return True