Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_uninstall.py: 14%

356 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1import functools 

2import os 

3import sys 

4import sysconfig 

5from importlib.util import cache_from_source 

6from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple 

7 

8from pip._internal.exceptions import UninstallationError 

9from pip._internal.locations import get_bin_prefix, get_bin_user 

10from pip._internal.metadata import BaseDistribution 

11from pip._internal.utils.compat import WINDOWS 

12from pip._internal.utils.egg_link import egg_link_path_from_location 

13from pip._internal.utils.logging import getLogger, indent_log 

14from pip._internal.utils.misc import ask, normalize_path, renames, rmtree 

15from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory 

16from pip._internal.utils.virtualenv import running_under_virtualenv 

17 

18logger = getLogger(__name__) 

19 

20 

21def _script_names( 

22 bin_dir: str, script_name: str, is_gui: bool 

23) -> Generator[str, None, None]: 

24 """Create the fully qualified name of the files created by 

25 {console,gui}_scripts for the given ``dist``. 

26 Returns the list of file names 

27 """ 

28 exe_name = os.path.join(bin_dir, script_name) 

29 yield exe_name 

30 if not WINDOWS: 

31 return 

32 yield f"{exe_name}.exe" 

33 yield f"{exe_name}.exe.manifest" 

34 if is_gui: 

35 yield f"{exe_name}-script.pyw" 

36 else: 

37 yield f"{exe_name}-script.py" 

38 

39 

40def _unique( 

41 fn: Callable[..., Generator[Any, None, None]] 

42) -> Callable[..., Generator[Any, None, None]]: 

43 @functools.wraps(fn) 

44 def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]: 

45 seen: Set[Any] = set() 

46 for item in fn(*args, **kw): 

47 if item not in seen: 

48 seen.add(item) 

49 yield item 

50 

51 return unique 

52 

53 

54@_unique 

55def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]: 

56 """ 

57 Yield all the uninstallation paths for dist based on RECORD-without-.py[co] 

58 

59 Yield paths to all the files in RECORD. For each .py file in RECORD, add 

60 the .pyc and .pyo in the same directory. 

61 

62 UninstallPathSet.add() takes care of the __pycache__ .py[co]. 

63 

64 If RECORD is not found, raises UninstallationError, 

65 with possible information from the INSTALLER file. 

66 

67 https://packaging.python.org/specifications/recording-installed-packages/ 

68 """ 

69 location = dist.location 

70 assert location is not None, "not installed" 

71 

72 entries = dist.iter_declared_entries() 

73 if entries is None: 

74 msg = f"Cannot uninstall {dist}, RECORD file not found." 

75 installer = dist.installer 

76 if not installer or installer == "pip": 

77 dep = f"{dist.raw_name}=={dist.version}" 

78 msg += ( 

79 " You might be able to recover from this via: " 

80 f"'pip install --force-reinstall --no-deps {dep}'." 

81 ) 

82 else: 

83 msg += f" Hint: The package was installed by {installer}." 

84 raise UninstallationError(msg) 

85 

86 for entry in entries: 

87 path = os.path.join(location, entry) 

88 yield path 

89 if path.endswith(".py"): 

90 dn, fn = os.path.split(path) 

91 base = fn[:-3] 

92 path = os.path.join(dn, base + ".pyc") 

93 yield path 

94 path = os.path.join(dn, base + ".pyo") 

95 yield path 

96 

97 

98def compact(paths: Iterable[str]) -> Set[str]: 

99 """Compact a path set to contain the minimal number of paths 

100 necessary to contain all paths in the set. If /a/path/ and 

101 /a/path/to/a/file.txt are both in the set, leave only the 

102 shorter path.""" 

103 

104 sep = os.path.sep 

105 short_paths: Set[str] = set() 

106 for path in sorted(paths, key=len): 

107 should_skip = any( 

108 path.startswith(shortpath.rstrip("*")) 

109 and path[len(shortpath.rstrip("*").rstrip(sep))] == sep 

110 for shortpath in short_paths 

111 ) 

112 if not should_skip: 

113 short_paths.add(path) 

114 return short_paths 

115 

116 

117def compress_for_rename(paths: Iterable[str]) -> Set[str]: 

118 """Returns a set containing the paths that need to be renamed. 

119 

120 This set may include directories when the original sequence of paths 

121 included every file on disk. 

122 """ 

123 case_map = {os.path.normcase(p): p for p in paths} 

124 remaining = set(case_map) 

125 unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len) 

126 wildcards: Set[str] = set() 

127 

128 def norm_join(*a: str) -> str: 

129 return os.path.normcase(os.path.join(*a)) 

130 

131 for root in unchecked: 

132 if any(os.path.normcase(root).startswith(w) for w in wildcards): 

133 # This directory has already been handled. 

134 continue 

135 

136 all_files: Set[str] = set() 

137 all_subdirs: Set[str] = set() 

138 for dirname, subdirs, files in os.walk(root): 

139 all_subdirs.update(norm_join(root, dirname, d) for d in subdirs) 

140 all_files.update(norm_join(root, dirname, f) for f in files) 

141 # If all the files we found are in our remaining set of files to 

142 # remove, then remove them from the latter set and add a wildcard 

143 # for the directory. 

144 if not (all_files - remaining): 

145 remaining.difference_update(all_files) 

146 wildcards.add(root + os.sep) 

147 

148 return set(map(case_map.__getitem__, remaining)) | wildcards 

149 

150 

151def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]: 

152 """Returns a tuple of 2 sets of which paths to display to user 

153 

154 The first set contains paths that would be deleted. Files of a package 

155 are not added and the top-level directory of the package has a '*' added 

156 at the end - to signify that all it's contents are removed. 

157 

158 The second set contains files that would have been skipped in the above 

159 folders. 

160 """ 

161 

162 will_remove = set(paths) 

163 will_skip = set() 

164 

165 # Determine folders and files 

166 folders = set() 

167 files = set() 

168 for path in will_remove: 

169 if path.endswith(".pyc"): 

170 continue 

171 if path.endswith("__init__.py") or ".dist-info" in path: 

172 folders.add(os.path.dirname(path)) 

173 files.add(path) 

174 

175 _normcased_files = set(map(os.path.normcase, files)) 

176 

177 folders = compact(folders) 

178 

179 # This walks the tree using os.walk to not miss extra folders 

180 # that might get added. 

181 for folder in folders: 

182 for dirpath, _, dirfiles in os.walk(folder): 

183 for fname in dirfiles: 

184 if fname.endswith(".pyc"): 

185 continue 

186 

187 file_ = os.path.join(dirpath, fname) 

188 if ( 

189 os.path.isfile(file_) 

190 and os.path.normcase(file_) not in _normcased_files 

191 ): 

192 # We are skipping this file. Add it to the set. 

193 will_skip.add(file_) 

194 

195 will_remove = files | {os.path.join(folder, "*") for folder in folders} 

196 

197 return will_remove, will_skip 

198 

199 

200class StashedUninstallPathSet: 

201 """A set of file rename operations to stash files while 

202 tentatively uninstalling them.""" 

203 

204 def __init__(self) -> None: 

205 # Mapping from source file root to [Adjacent]TempDirectory 

206 # for files under that directory. 

207 self._save_dirs: Dict[str, TempDirectory] = {} 

208 # (old path, new path) tuples for each move that may need 

209 # to be undone. 

210 self._moves: List[Tuple[str, str]] = [] 

211 

212 def _get_directory_stash(self, path: str) -> str: 

213 """Stashes a directory. 

214 

215 Directories are stashed adjacent to their original location if 

216 possible, or else moved/copied into the user's temp dir.""" 

217 

218 try: 

219 save_dir: TempDirectory = AdjacentTempDirectory(path) 

220 except OSError: 

221 save_dir = TempDirectory(kind="uninstall") 

222 self._save_dirs[os.path.normcase(path)] = save_dir 

223 

224 return save_dir.path 

225 

226 def _get_file_stash(self, path: str) -> str: 

227 """Stashes a file. 

228 

229 If no root has been provided, one will be created for the directory 

230 in the user's temp directory.""" 

231 path = os.path.normcase(path) 

232 head, old_head = os.path.dirname(path), None 

233 save_dir = None 

234 

235 while head != old_head: 

236 try: 

237 save_dir = self._save_dirs[head] 

238 break 

239 except KeyError: 

240 pass 

241 head, old_head = os.path.dirname(head), head 

242 else: 

243 # Did not find any suitable root 

244 head = os.path.dirname(path) 

245 save_dir = TempDirectory(kind="uninstall") 

246 self._save_dirs[head] = save_dir 

247 

248 relpath = os.path.relpath(path, head) 

249 if relpath and relpath != os.path.curdir: 

250 return os.path.join(save_dir.path, relpath) 

251 return save_dir.path 

252 

253 def stash(self, path: str) -> str: 

254 """Stashes the directory or file and returns its new location. 

255 Handle symlinks as files to avoid modifying the symlink targets. 

256 """ 

257 path_is_dir = os.path.isdir(path) and not os.path.islink(path) 

258 if path_is_dir: 

259 new_path = self._get_directory_stash(path) 

260 else: 

261 new_path = self._get_file_stash(path) 

262 

263 self._moves.append((path, new_path)) 

264 if path_is_dir and os.path.isdir(new_path): 

265 # If we're moving a directory, we need to 

266 # remove the destination first or else it will be 

267 # moved to inside the existing directory. 

268 # We just created new_path ourselves, so it will 

269 # be removable. 

270 os.rmdir(new_path) 

271 renames(path, new_path) 

272 return new_path 

273 

274 def commit(self) -> None: 

275 """Commits the uninstall by removing stashed files.""" 

276 for save_dir in self._save_dirs.values(): 

277 save_dir.cleanup() 

278 self._moves = [] 

279 self._save_dirs = {} 

280 

281 def rollback(self) -> None: 

282 """Undoes the uninstall by moving stashed files back.""" 

283 for p in self._moves: 

284 logger.info("Moving to %s\n from %s", *p) 

285 

286 for new_path, path in self._moves: 

287 try: 

288 logger.debug("Replacing %s from %s", new_path, path) 

289 if os.path.isfile(new_path) or os.path.islink(new_path): 

290 os.unlink(new_path) 

291 elif os.path.isdir(new_path): 

292 rmtree(new_path) 

293 renames(path, new_path) 

294 except OSError as ex: 

295 logger.error("Failed to restore %s", new_path) 

296 logger.debug("Exception: %s", ex) 

297 

298 self.commit() 

299 

300 @property 

301 def can_rollback(self) -> bool: 

302 return bool(self._moves) 

303 

304 

305class UninstallPathSet: 

306 """A set of file paths to be removed in the uninstallation of a 

307 requirement.""" 

308 

309 def __init__(self, dist: BaseDistribution) -> None: 

310 self._paths: Set[str] = set() 

311 self._refuse: Set[str] = set() 

312 self._pth: Dict[str, UninstallPthEntries] = {} 

313 self._dist = dist 

314 self._moved_paths = StashedUninstallPathSet() 

315 # Create local cache of normalize_path results. Creating an UninstallPathSet 

316 # can result in hundreds/thousands of redundant calls to normalize_path with 

317 # the same args, which hurts performance. 

318 self._normalize_path_cached = functools.lru_cache(normalize_path) 

319 

320 def _permitted(self, path: str) -> bool: 

321 """ 

322 Return True if the given path is one we are permitted to 

323 remove/modify, False otherwise. 

324 

325 """ 

326 # aka is_local, but caching normalized sys.prefix 

327 if not running_under_virtualenv(): 

328 return True 

329 return path.startswith(self._normalize_path_cached(sys.prefix)) 

330 

331 def add(self, path: str) -> None: 

332 head, tail = os.path.split(path) 

333 

334 # we normalize the head to resolve parent directory symlinks, but not 

335 # the tail, since we only want to uninstall symlinks, not their targets 

336 path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail)) 

337 

338 if not os.path.exists(path): 

339 return 

340 if self._permitted(path): 

341 self._paths.add(path) 

342 else: 

343 self._refuse.add(path) 

344 

345 # __pycache__ files can show up after 'installed-files.txt' is created, 

346 # due to imports 

347 if os.path.splitext(path)[1] == ".py": 

348 self.add(cache_from_source(path)) 

349 

350 def add_pth(self, pth_file: str, entry: str) -> None: 

351 pth_file = self._normalize_path_cached(pth_file) 

352 if self._permitted(pth_file): 

353 if pth_file not in self._pth: 

354 self._pth[pth_file] = UninstallPthEntries(pth_file) 

355 self._pth[pth_file].add(entry) 

356 else: 

357 self._refuse.add(pth_file) 

358 

359 def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None: 

360 """Remove paths in ``self._paths`` with confirmation (unless 

361 ``auto_confirm`` is True).""" 

362 

363 if not self._paths: 

364 logger.info( 

365 "Can't uninstall '%s'. No files were found to uninstall.", 

366 self._dist.raw_name, 

367 ) 

368 return 

369 

370 dist_name_version = f"{self._dist.raw_name}-{self._dist.version}" 

371 logger.info("Uninstalling %s:", dist_name_version) 

372 

373 with indent_log(): 

374 if auto_confirm or self._allowed_to_proceed(verbose): 

375 moved = self._moved_paths 

376 

377 for_rename = compress_for_rename(self._paths) 

378 

379 for path in sorted(compact(for_rename)): 

380 moved.stash(path) 

381 logger.verbose("Removing file or directory %s", path) 

382 

383 for pth in self._pth.values(): 

384 pth.remove() 

385 

386 logger.info("Successfully uninstalled %s", dist_name_version) 

387 

388 def _allowed_to_proceed(self, verbose: bool) -> bool: 

389 """Display which files would be deleted and prompt for confirmation""" 

390 

391 def _display(msg: str, paths: Iterable[str]) -> None: 

392 if not paths: 

393 return 

394 

395 logger.info(msg) 

396 with indent_log(): 

397 for path in sorted(compact(paths)): 

398 logger.info(path) 

399 

400 if not verbose: 

401 will_remove, will_skip = compress_for_output_listing(self._paths) 

402 else: 

403 # In verbose mode, display all the files that are going to be 

404 # deleted. 

405 will_remove = set(self._paths) 

406 will_skip = set() 

407 

408 _display("Would remove:", will_remove) 

409 _display("Would not remove (might be manually added):", will_skip) 

410 _display("Would not remove (outside of prefix):", self._refuse) 

411 if verbose: 

412 _display("Will actually move:", compress_for_rename(self._paths)) 

413 

414 return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n" 

415 

416 def rollback(self) -> None: 

417 """Rollback the changes previously made by remove().""" 

418 if not self._moved_paths.can_rollback: 

419 logger.error( 

420 "Can't roll back %s; was not uninstalled", 

421 self._dist.raw_name, 

422 ) 

423 return 

424 logger.info("Rolling back uninstall of %s", self._dist.raw_name) 

425 self._moved_paths.rollback() 

426 for pth in self._pth.values(): 

427 pth.rollback() 

428 

429 def commit(self) -> None: 

430 """Remove temporary save dir: rollback will no longer be possible.""" 

431 self._moved_paths.commit() 

432 

433 @classmethod 

434 def from_dist(cls, dist: BaseDistribution) -> "UninstallPathSet": 

435 dist_location = dist.location 

436 info_location = dist.info_location 

437 if dist_location is None: 

438 logger.info( 

439 "Not uninstalling %s since it is not installed", 

440 dist.canonical_name, 

441 ) 

442 return cls(dist) 

443 

444 normalized_dist_location = normalize_path(dist_location) 

445 if not dist.local: 

446 logger.info( 

447 "Not uninstalling %s at %s, outside environment %s", 

448 dist.canonical_name, 

449 normalized_dist_location, 

450 sys.prefix, 

451 ) 

452 return cls(dist) 

453 

454 if normalized_dist_location in { 

455 p 

456 for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")} 

457 if p 

458 }: 

459 logger.info( 

460 "Not uninstalling %s at %s, as it is in the standard library.", 

461 dist.canonical_name, 

462 normalized_dist_location, 

463 ) 

464 return cls(dist) 

465 

466 paths_to_remove = cls(dist) 

467 develop_egg_link = egg_link_path_from_location(dist.raw_name) 

468 

469 # Distribution is installed with metadata in a "flat" .egg-info 

470 # directory. This means it is not a modern .dist-info installation, an 

471 # egg, or legacy editable. 

472 setuptools_flat_installation = ( 

473 dist.installed_with_setuptools_egg_info 

474 and info_location is not None 

475 and os.path.exists(info_location) 

476 # If dist is editable and the location points to a ``.egg-info``, 

477 # we are in fact in the legacy editable case. 

478 and not info_location.endswith(f"{dist.setuptools_filename}.egg-info") 

479 ) 

480 

481 # Uninstall cases order do matter as in the case of 2 installs of the 

482 # same package, pip needs to uninstall the currently detected version 

483 if setuptools_flat_installation: 

484 if info_location is not None: 

485 paths_to_remove.add(info_location) 

486 installed_files = dist.iter_declared_entries() 

487 if installed_files is not None: 

488 for installed_file in installed_files: 

489 paths_to_remove.add(os.path.join(dist_location, installed_file)) 

490 # FIXME: need a test for this elif block 

491 # occurs with --single-version-externally-managed/--record outside 

492 # of pip 

493 elif dist.is_file("top_level.txt"): 

494 try: 

495 namespace_packages = dist.read_text("namespace_packages.txt") 

496 except FileNotFoundError: 

497 namespaces = [] 

498 else: 

499 namespaces = namespace_packages.splitlines(keepends=False) 

500 for top_level_pkg in [ 

501 p 

502 for p in dist.read_text("top_level.txt").splitlines() 

503 if p and p not in namespaces 

504 ]: 

505 path = os.path.join(dist_location, top_level_pkg) 

506 paths_to_remove.add(path) 

507 paths_to_remove.add(f"{path}.py") 

508 paths_to_remove.add(f"{path}.pyc") 

509 paths_to_remove.add(f"{path}.pyo") 

510 

511 elif dist.installed_by_distutils: 

512 raise UninstallationError( 

513 "Cannot uninstall {!r}. It is a distutils installed project " 

514 "and thus we cannot accurately determine which files belong " 

515 "to it which would lead to only a partial uninstall.".format( 

516 dist.raw_name, 

517 ) 

518 ) 

519 

520 elif dist.installed_as_egg: 

521 # package installed by easy_install 

522 # We cannot match on dist.egg_name because it can slightly vary 

523 # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg 

524 paths_to_remove.add(dist_location) 

525 easy_install_egg = os.path.split(dist_location)[1] 

526 easy_install_pth = os.path.join( 

527 os.path.dirname(dist_location), 

528 "easy-install.pth", 

529 ) 

530 paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg) 

531 

532 elif dist.installed_with_dist_info: 

533 for path in uninstallation_paths(dist): 

534 paths_to_remove.add(path) 

535 

536 elif develop_egg_link: 

537 # PEP 660 modern editable is handled in the ``.dist-info`` case 

538 # above, so this only covers the setuptools-style editable. 

539 with open(develop_egg_link) as fh: 

540 link_pointer = os.path.normcase(fh.readline().strip()) 

541 normalized_link_pointer = paths_to_remove._normalize_path_cached( 

542 link_pointer 

543 ) 

544 assert os.path.samefile( 

545 normalized_link_pointer, normalized_dist_location 

546 ), ( 

547 f"Egg-link {develop_egg_link} (to {link_pointer}) does not match " 

548 f"installed location of {dist.raw_name} (at {dist_location})" 

549 ) 

550 paths_to_remove.add(develop_egg_link) 

551 easy_install_pth = os.path.join( 

552 os.path.dirname(develop_egg_link), "easy-install.pth" 

553 ) 

554 paths_to_remove.add_pth(easy_install_pth, dist_location) 

555 

556 else: 

557 logger.debug( 

558 "Not sure how to uninstall: %s - Check: %s", 

559 dist, 

560 dist_location, 

561 ) 

562 

563 if dist.in_usersite: 

564 bin_dir = get_bin_user() 

565 else: 

566 bin_dir = get_bin_prefix() 

567 

568 # find distutils scripts= scripts 

569 try: 

570 for script in dist.iter_distutils_script_names(): 

571 paths_to_remove.add(os.path.join(bin_dir, script)) 

572 if WINDOWS: 

573 paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat")) 

574 except (FileNotFoundError, NotADirectoryError): 

575 pass 

576 

577 # find console_scripts and gui_scripts 

578 def iter_scripts_to_remove( 

579 dist: BaseDistribution, 

580 bin_dir: str, 

581 ) -> Generator[str, None, None]: 

582 for entry_point in dist.iter_entry_points(): 

583 if entry_point.group == "console_scripts": 

584 yield from _script_names(bin_dir, entry_point.name, False) 

585 elif entry_point.group == "gui_scripts": 

586 yield from _script_names(bin_dir, entry_point.name, True) 

587 

588 for s in iter_scripts_to_remove(dist, bin_dir): 

589 paths_to_remove.add(s) 

590 

591 return paths_to_remove 

592 

593 

594class UninstallPthEntries: 

595 def __init__(self, pth_file: str) -> None: 

596 self.file = pth_file 

597 self.entries: Set[str] = set() 

598 self._saved_lines: Optional[List[bytes]] = None 

599 

600 def add(self, entry: str) -> None: 

601 entry = os.path.normcase(entry) 

602 # On Windows, os.path.normcase converts the entry to use 

603 # backslashes. This is correct for entries that describe absolute 

604 # paths outside of site-packages, but all the others use forward 

605 # slashes. 

606 # os.path.splitdrive is used instead of os.path.isabs because isabs 

607 # treats non-absolute paths with drive letter markings like c:foo\bar 

608 # as absolute paths. It also does not recognize UNC paths if they don't 

609 # have more than "\\sever\share". Valid examples: "\\server\share\" or 

610 # "\\server\share\folder". 

611 if WINDOWS and not os.path.splitdrive(entry)[0]: 

612 entry = entry.replace("\\", "/") 

613 self.entries.add(entry) 

614 

615 def remove(self) -> None: 

616 logger.verbose("Removing pth entries from %s:", self.file) 

617 

618 # If the file doesn't exist, log a warning and return 

619 if not os.path.isfile(self.file): 

620 logger.warning("Cannot remove entries from nonexistent file %s", self.file) 

621 return 

622 with open(self.file, "rb") as fh: 

623 # windows uses '\r\n' with py3k, but uses '\n' with py2.x 

624 lines = fh.readlines() 

625 self._saved_lines = lines 

626 if any(b"\r\n" in line for line in lines): 

627 endline = "\r\n" 

628 else: 

629 endline = "\n" 

630 # handle missing trailing newline 

631 if lines and not lines[-1].endswith(endline.encode("utf-8")): 

632 lines[-1] = lines[-1] + endline.encode("utf-8") 

633 for entry in self.entries: 

634 try: 

635 logger.verbose("Removing entry: %s", entry) 

636 lines.remove((entry + endline).encode("utf-8")) 

637 except ValueError: 

638 pass 

639 with open(self.file, "wb") as fh: 

640 fh.writelines(lines) 

641 

642 def rollback(self) -> bool: 

643 if self._saved_lines is None: 

644 logger.error("Cannot roll back changes to %s, none were made", self.file) 

645 return False 

646 logger.debug("Rolling %s back to previous state", self.file) 

647 with open(self.file, "wb") as fh: 

648 fh.writelines(self._saved_lines) 

649 return True