Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/req/req_uninstall.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

352 statements  

1from __future__ import annotations 

2 

3import functools 

4import os 

5import sys 

6import sysconfig 

7from collections.abc import Generator, Iterable 

8from importlib.util import cache_from_source 

9from typing import Any, Callable 

10 

11from pip._internal.exceptions import LegacyDistutilsInstall, UninstallMissingRecord 

12from pip._internal.locations import get_bin_prefix, get_bin_user 

13from pip._internal.metadata import BaseDistribution 

14from pip._internal.utils.compat import WINDOWS 

15from pip._internal.utils.egg_link import egg_link_path_from_location 

16from pip._internal.utils.logging import getLogger, indent_log 

17from pip._internal.utils.misc import ask, normalize_path, renames, rmtree 

18from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory 

19from pip._internal.utils.virtualenv import running_under_virtualenv 

20 

21logger = getLogger(__name__) 

22 

23 

24def _script_names( 

25 bin_dir: str, script_name: str, is_gui: bool 

26) -> Generator[str, None, None]: 

27 """Create the fully qualified name of the files created by 

28 {console,gui}_scripts for the given ``dist``. 

29 Returns the list of file names 

30 """ 

31 exe_name = os.path.join(bin_dir, script_name) 

32 yield exe_name 

33 if not WINDOWS: 

34 return 

35 yield f"{exe_name}.exe" 

36 yield f"{exe_name}.exe.manifest" 

37 if is_gui: 

38 yield f"{exe_name}-script.pyw" 

39 else: 

40 yield f"{exe_name}-script.py" 

41 

42 

43def _unique( 

44 fn: Callable[..., Generator[Any, None, None]], 

45) -> Callable[..., Generator[Any, None, None]]: 

46 @functools.wraps(fn) 

47 def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]: 

48 seen: set[Any] = set() 

49 for item in fn(*args, **kw): 

50 if item not in seen: 

51 seen.add(item) 

52 yield item 

53 

54 return unique 

55 

56 

57@_unique 

58def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]: 

59 """ 

60 Yield all the uninstallation paths for dist based on RECORD-without-.py[co] 

61 

62 Yield paths to all the files in RECORD. For each .py file in RECORD, add 

63 the .pyc and .pyo in the same directory. 

64 

65 UninstallPathSet.add() takes care of the __pycache__ .py[co]. 

66 

67 If RECORD is not found, raises an error, 

68 with possible information from the INSTALLER file. 

69 

70 https://packaging.python.org/specifications/recording-installed-packages/ 

71 """ 

72 location = dist.location 

73 assert location is not None, "not installed" 

74 

75 entries = dist.iter_declared_entries() 

76 if entries is None: 

77 raise UninstallMissingRecord(distribution=dist) 

78 

79 for entry in entries: 

80 path = os.path.join(location, entry) 

81 yield path 

82 if path.endswith(".py"): 

83 dn, fn = os.path.split(path) 

84 base = fn[:-3] 

85 path = os.path.join(dn, base + ".pyc") 

86 yield path 

87 path = os.path.join(dn, base + ".pyo") 

88 yield path 

89 

90 

91def compact(paths: Iterable[str]) -> set[str]: 

92 """Compact a path set to contain the minimal number of paths 

93 necessary to contain all paths in the set. If /a/path/ and 

94 /a/path/to/a/file.txt are both in the set, leave only the 

95 shorter path.""" 

96 

97 sep = os.path.sep 

98 short_paths: set[str] = set() 

99 for path in sorted(paths, key=len): 

100 should_skip = any( 

101 path.startswith(shortpath.rstrip("*")) 

102 and path[len(shortpath.rstrip("*").rstrip(sep))] == sep 

103 for shortpath in short_paths 

104 ) 

105 if not should_skip: 

106 short_paths.add(path) 

107 return short_paths 

108 

109 

110def compress_for_rename(paths: Iterable[str]) -> set[str]: 

111 """Returns a set containing the paths that need to be renamed. 

112 

113 This set may include directories when the original sequence of paths 

114 included every file on disk. 

115 """ 

116 case_map = {os.path.normcase(p): p for p in paths} 

117 remaining = set(case_map) 

118 unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len) 

119 wildcards: set[str] = set() 

120 

121 def norm_join(*a: str) -> str: 

122 return os.path.normcase(os.path.join(*a)) 

123 

124 for root in unchecked: 

125 if any(os.path.normcase(root).startswith(w) for w in wildcards): 

126 # This directory has already been handled. 

127 continue 

128 

129 all_files: set[str] = set() 

130 all_subdirs: set[str] = set() 

131 for dirname, subdirs, files in os.walk(root): 

132 all_subdirs.update(norm_join(root, dirname, d) for d in subdirs) 

133 all_files.update(norm_join(root, dirname, f) for f in files) 

134 # If all the files we found are in our remaining set of files to 

135 # remove, then remove them from the latter set and add a wildcard 

136 # for the directory. 

137 if not (all_files - remaining): 

138 remaining.difference_update(all_files) 

139 wildcards.add(root + os.sep) 

140 

141 return set(map(case_map.__getitem__, remaining)) | wildcards 

142 

143 

144def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str]]: 

145 """Returns a tuple of 2 sets of which paths to display to user 

146 

147 The first set contains paths that would be deleted. Files of a package 

148 are not added and the top-level directory of the package has a '*' added 

149 at the end - to signify that all it's contents are removed. 

150 

151 The second set contains files that would have been skipped in the above 

152 folders. 

153 """ 

154 

155 will_remove = set(paths) 

156 will_skip = set() 

157 

158 # Determine folders and files 

159 folders = set() 

160 files = set() 

161 for path in will_remove: 

162 if path.endswith(".pyc"): 

163 continue 

164 if path.endswith("__init__.py") or ".dist-info" in path: 

165 folders.add(os.path.dirname(path)) 

166 files.add(path) 

167 

168 _normcased_files = set(map(os.path.normcase, files)) 

169 

170 folders = compact(folders) 

171 

172 # This walks the tree using os.walk to not miss extra folders 

173 # that might get added. 

174 for folder in folders: 

175 for dirpath, _, dirfiles in os.walk(folder): 

176 for fname in dirfiles: 

177 if fname.endswith(".pyc"): 

178 continue 

179 

180 file_ = os.path.join(dirpath, fname) 

181 if ( 

182 os.path.isfile(file_) 

183 and os.path.normcase(file_) not in _normcased_files 

184 ): 

185 # We are skipping this file. Add it to the set. 

186 will_skip.add(file_) 

187 

188 will_remove = files | {os.path.join(folder, "*") for folder in folders} 

189 

190 return will_remove, will_skip 

191 

192 

193class StashedUninstallPathSet: 

194 """A set of file rename operations to stash files while 

195 tentatively uninstalling them.""" 

196 

197 def __init__(self) -> None: 

198 # Mapping from source file root to [Adjacent]TempDirectory 

199 # for files under that directory. 

200 self._save_dirs: dict[str, TempDirectory] = {} 

201 # (old path, new path) tuples for each move that may need 

202 # to be undone. 

203 self._moves: list[tuple[str, str]] = [] 

204 

205 def _get_directory_stash(self, path: str) -> str: 

206 """Stashes a directory. 

207 

208 Directories are stashed adjacent to their original location if 

209 possible, or else moved/copied into the user's temp dir.""" 

210 

211 try: 

212 save_dir: TempDirectory = AdjacentTempDirectory(path) 

213 except OSError: 

214 save_dir = TempDirectory(kind="uninstall") 

215 self._save_dirs[os.path.normcase(path)] = save_dir 

216 

217 return save_dir.path 

218 

219 def _get_file_stash(self, path: str) -> str: 

220 """Stashes a file. 

221 

222 If no root has been provided, one will be created for the directory 

223 in the user's temp directory.""" 

224 path = os.path.normcase(path) 

225 head, old_head = os.path.dirname(path), None 

226 save_dir = None 

227 

228 while head != old_head: 

229 try: 

230 save_dir = self._save_dirs[head] 

231 break 

232 except KeyError: 

233 pass 

234 head, old_head = os.path.dirname(head), head 

235 else: 

236 # Did not find any suitable root 

237 head = os.path.dirname(path) 

238 save_dir = TempDirectory(kind="uninstall") 

239 self._save_dirs[head] = save_dir 

240 

241 relpath = os.path.relpath(path, head) 

242 if relpath and relpath != os.path.curdir: 

243 return os.path.join(save_dir.path, relpath) 

244 return save_dir.path 

245 

246 def stash(self, path: str) -> str: 

247 """Stashes the directory or file and returns its new location. 

248 Handle symlinks as files to avoid modifying the symlink targets. 

249 """ 

250 path_is_dir = os.path.isdir(path) and not os.path.islink(path) 

251 if path_is_dir: 

252 new_path = self._get_directory_stash(path) 

253 else: 

254 new_path = self._get_file_stash(path) 

255 

256 self._moves.append((path, new_path)) 

257 if path_is_dir and os.path.isdir(new_path): 

258 # If we're moving a directory, we need to 

259 # remove the destination first or else it will be 

260 # moved to inside the existing directory. 

261 # We just created new_path ourselves, so it will 

262 # be removable. 

263 os.rmdir(new_path) 

264 renames(path, new_path) 

265 return new_path 

266 

267 def commit(self) -> None: 

268 """Commits the uninstall by removing stashed files.""" 

269 for save_dir in self._save_dirs.values(): 

270 save_dir.cleanup() 

271 self._moves = [] 

272 self._save_dirs = {} 

273 

274 def rollback(self) -> None: 

275 """Undoes the uninstall by moving stashed files back.""" 

276 for p in self._moves: 

277 logger.info("Moving to %s\n from %s", *p) 

278 

279 for new_path, path in self._moves: 

280 try: 

281 logger.debug("Replacing %s from %s", new_path, path) 

282 if os.path.isfile(new_path) or os.path.islink(new_path): 

283 os.unlink(new_path) 

284 elif os.path.isdir(new_path): 

285 rmtree(new_path) 

286 renames(path, new_path) 

287 except OSError as ex: 

288 logger.error("Failed to restore %s", new_path) 

289 logger.debug("Exception: %s", ex) 

290 

291 self.commit() 

292 

293 @property 

294 def can_rollback(self) -> bool: 

295 return bool(self._moves) 

296 

297 

298class UninstallPathSet: 

299 """A set of file paths to be removed in the uninstallation of a 

300 requirement.""" 

301 

302 def __init__(self, dist: BaseDistribution) -> None: 

303 self._paths: set[str] = set() 

304 self._refuse: set[str] = set() 

305 self._pth: dict[str, UninstallPthEntries] = {} 

306 self._dist = dist 

307 self._moved_paths = StashedUninstallPathSet() 

308 # Create local cache of normalize_path results. Creating an UninstallPathSet 

309 # can result in hundreds/thousands of redundant calls to normalize_path with 

310 # the same args, which hurts performance. 

311 self._normalize_path_cached = functools.lru_cache(normalize_path) 

312 

313 def _permitted(self, path: str) -> bool: 

314 """ 

315 Return True if the given path is one we are permitted to 

316 remove/modify, False otherwise. 

317 

318 """ 

319 # aka is_local, but caching normalized sys.prefix 

320 if not running_under_virtualenv(): 

321 return True 

322 return path.startswith(self._normalize_path_cached(sys.prefix)) 

323 

324 def add(self, path: str) -> None: 

325 head, tail = os.path.split(path) 

326 

327 # we normalize the head to resolve parent directory symlinks, but not 

328 # the tail, since we only want to uninstall symlinks, not their targets 

329 path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail)) 

330 

331 if not os.path.exists(path): 

332 return 

333 if self._permitted(path): 

334 self._paths.add(path) 

335 else: 

336 self._refuse.add(path) 

337 

338 # __pycache__ files can show up after 'installed-files.txt' is created, 

339 # due to imports 

340 if os.path.splitext(path)[1] == ".py": 

341 self.add(cache_from_source(path)) 

342 

343 def add_pth(self, pth_file: str, entry: str) -> None: 

344 pth_file = self._normalize_path_cached(pth_file) 

345 if self._permitted(pth_file): 

346 if pth_file not in self._pth: 

347 self._pth[pth_file] = UninstallPthEntries(pth_file) 

348 self._pth[pth_file].add(entry) 

349 else: 

350 self._refuse.add(pth_file) 

351 

352 def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None: 

353 """Remove paths in ``self._paths`` with confirmation (unless 

354 ``auto_confirm`` is True).""" 

355 

356 if not self._paths: 

357 logger.info( 

358 "Can't uninstall '%s'. No files were found to uninstall.", 

359 self._dist.raw_name, 

360 ) 

361 return 

362 

363 dist_name_version = f"{self._dist.raw_name}-{self._dist.raw_version}" 

364 logger.info("Uninstalling %s:", dist_name_version) 

365 

366 with indent_log(): 

367 if auto_confirm or self._allowed_to_proceed(verbose): 

368 moved = self._moved_paths 

369 

370 for_rename = compress_for_rename(self._paths) 

371 

372 for path in sorted(compact(for_rename)): 

373 moved.stash(path) 

374 logger.verbose("Removing file or directory %s", path) 

375 

376 for pth in self._pth.values(): 

377 pth.remove() 

378 

379 logger.info("Successfully uninstalled %s", dist_name_version) 

380 

381 def _allowed_to_proceed(self, verbose: bool) -> bool: 

382 """Display which files would be deleted and prompt for confirmation""" 

383 

384 def _display(msg: str, paths: Iterable[str]) -> None: 

385 if not paths: 

386 return 

387 

388 logger.info(msg) 

389 with indent_log(): 

390 for path in sorted(compact(paths)): 

391 logger.info(path) 

392 

393 if not verbose: 

394 will_remove, will_skip = compress_for_output_listing(self._paths) 

395 else: 

396 # In verbose mode, display all the files that are going to be 

397 # deleted. 

398 will_remove = set(self._paths) 

399 will_skip = set() 

400 

401 _display("Would remove:", will_remove) 

402 _display("Would not remove (might be manually added):", will_skip) 

403 _display("Would not remove (outside of prefix):", self._refuse) 

404 if verbose: 

405 _display("Will actually move:", compress_for_rename(self._paths)) 

406 

407 return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n" 

408 

409 def rollback(self) -> None: 

410 """Rollback the changes previously made by remove().""" 

411 if not self._moved_paths.can_rollback: 

412 logger.error( 

413 "Can't roll back %s; was not uninstalled", 

414 self._dist.raw_name, 

415 ) 

416 return 

417 logger.info("Rolling back uninstall of %s", self._dist.raw_name) 

418 self._moved_paths.rollback() 

419 for pth in self._pth.values(): 

420 pth.rollback() 

421 

422 def commit(self) -> None: 

423 """Remove temporary save dir: rollback will no longer be possible.""" 

424 self._moved_paths.commit() 

425 

426 @classmethod 

427 def from_dist(cls, dist: BaseDistribution) -> UninstallPathSet: 

428 dist_location = dist.location 

429 info_location = dist.info_location 

430 if dist_location is None: 

431 logger.info( 

432 "Not uninstalling %s since it is not installed", 

433 dist.canonical_name, 

434 ) 

435 return cls(dist) 

436 

437 normalized_dist_location = normalize_path(dist_location) 

438 if not dist.local: 

439 logger.info( 

440 "Not uninstalling %s at %s, outside environment %s", 

441 dist.canonical_name, 

442 normalized_dist_location, 

443 sys.prefix, 

444 ) 

445 return cls(dist) 

446 

447 if normalized_dist_location in { 

448 p 

449 for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")} 

450 if p 

451 }: 

452 logger.info( 

453 "Not uninstalling %s at %s, as it is in the standard library.", 

454 dist.canonical_name, 

455 normalized_dist_location, 

456 ) 

457 return cls(dist) 

458 

459 paths_to_remove = cls(dist) 

460 develop_egg_link = egg_link_path_from_location(dist.raw_name) 

461 

462 # Distribution is installed with metadata in a "flat" .egg-info 

463 # directory. This means it is not a modern .dist-info installation, an 

464 # egg, or legacy editable. 

465 setuptools_flat_installation = ( 

466 dist.installed_with_setuptools_egg_info 

467 and info_location is not None 

468 and os.path.exists(info_location) 

469 # If dist is editable and the location points to a ``.egg-info``, 

470 # we are in fact in the legacy editable case. 

471 and not info_location.endswith(f"{dist.setuptools_filename}.egg-info") 

472 ) 

473 

474 # Uninstall cases order do matter as in the case of 2 installs of the 

475 # same package, pip needs to uninstall the currently detected version 

476 if setuptools_flat_installation: 

477 if info_location is not None: 

478 paths_to_remove.add(info_location) 

479 installed_files = dist.iter_declared_entries() 

480 if installed_files is not None: 

481 for installed_file in installed_files: 

482 paths_to_remove.add(os.path.join(dist_location, installed_file)) 

483 # FIXME: need a test for this elif block 

484 # occurs with --single-version-externally-managed/--record outside 

485 # of pip 

486 elif dist.is_file("top_level.txt"): 

487 try: 

488 namespace_packages = dist.read_text("namespace_packages.txt") 

489 except FileNotFoundError: 

490 namespaces = [] 

491 else: 

492 namespaces = namespace_packages.splitlines(keepends=False) 

493 for top_level_pkg in [ 

494 p 

495 for p in dist.read_text("top_level.txt").splitlines() 

496 if p and p not in namespaces 

497 ]: 

498 path = os.path.join(dist_location, top_level_pkg) 

499 paths_to_remove.add(path) 

500 paths_to_remove.add(f"{path}.py") 

501 paths_to_remove.add(f"{path}.pyc") 

502 paths_to_remove.add(f"{path}.pyo") 

503 

504 elif dist.installed_by_distutils: 

505 raise LegacyDistutilsInstall(distribution=dist) 

506 

507 elif dist.installed_as_egg: 

508 # package installed by easy_install 

509 # We cannot match on dist.egg_name because it can slightly vary 

510 # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg 

511 # XXX We use normalized_dist_location because dist_location my contain 

512 # a trailing / if the distribution is a zipped egg 

513 # (which is not a directory). 

514 paths_to_remove.add(normalized_dist_location) 

515 easy_install_egg = os.path.split(normalized_dist_location)[1] 

516 easy_install_pth = os.path.join( 

517 os.path.dirname(normalized_dist_location), 

518 "easy-install.pth", 

519 ) 

520 paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg) 

521 

522 elif dist.installed_with_dist_info: 

523 for path in uninstallation_paths(dist): 

524 paths_to_remove.add(path) 

525 

526 elif develop_egg_link: 

527 # PEP 660 modern editable is handled in the ``.dist-info`` case 

528 # above, so this only covers the setuptools-style editable. 

529 with open(develop_egg_link) as fh: 

530 link_pointer = os.path.normcase(fh.readline().strip()) 

531 normalized_link_pointer = paths_to_remove._normalize_path_cached( 

532 link_pointer 

533 ) 

534 assert os.path.samefile( 

535 normalized_link_pointer, normalized_dist_location 

536 ), ( 

537 f"Egg-link {develop_egg_link} (to {link_pointer}) does not match " 

538 f"installed location of {dist.raw_name} (at {dist_location})" 

539 ) 

540 paths_to_remove.add(develop_egg_link) 

541 easy_install_pth = os.path.join( 

542 os.path.dirname(develop_egg_link), "easy-install.pth" 

543 ) 

544 paths_to_remove.add_pth(easy_install_pth, dist_location) 

545 

546 else: 

547 logger.debug( 

548 "Not sure how to uninstall: %s - Check: %s", 

549 dist, 

550 dist_location, 

551 ) 

552 

553 if dist.in_usersite: 

554 bin_dir = get_bin_user() 

555 else: 

556 bin_dir = get_bin_prefix() 

557 

558 # find distutils scripts= scripts 

559 try: 

560 for script in dist.iter_distutils_script_names(): 

561 paths_to_remove.add(os.path.join(bin_dir, script)) 

562 if WINDOWS: 

563 paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat")) 

564 except (FileNotFoundError, NotADirectoryError): 

565 pass 

566 

567 # find console_scripts and gui_scripts 

568 def iter_scripts_to_remove( 

569 dist: BaseDistribution, 

570 bin_dir: str, 

571 ) -> Generator[str, None, None]: 

572 for entry_point in dist.iter_entry_points(): 

573 if entry_point.group == "console_scripts": 

574 yield from _script_names(bin_dir, entry_point.name, False) 

575 elif entry_point.group == "gui_scripts": 

576 yield from _script_names(bin_dir, entry_point.name, True) 

577 

578 for s in iter_scripts_to_remove(dist, bin_dir): 

579 paths_to_remove.add(s) 

580 

581 return paths_to_remove 

582 

583 

584class UninstallPthEntries: 

585 def __init__(self, pth_file: str) -> None: 

586 self.file = pth_file 

587 self.entries: set[str] = set() 

588 self._saved_lines: list[bytes] | None = None 

589 

590 def add(self, entry: str) -> None: 

591 entry = os.path.normcase(entry) 

592 # On Windows, os.path.normcase converts the entry to use 

593 # backslashes. This is correct for entries that describe absolute 

594 # paths outside of site-packages, but all the others use forward 

595 # slashes. 

596 # os.path.splitdrive is used instead of os.path.isabs because isabs 

597 # treats non-absolute paths with drive letter markings like c:foo\bar 

598 # as absolute paths. It also does not recognize UNC paths if they don't 

599 # have more than "\\sever\share". Valid examples: "\\server\share\" or 

600 # "\\server\share\folder". 

601 if WINDOWS and not os.path.splitdrive(entry)[0]: 

602 entry = entry.replace("\\", "/") 

603 self.entries.add(entry) 

604 

605 def remove(self) -> None: 

606 logger.verbose("Removing pth entries from %s:", self.file) 

607 

608 # If the file doesn't exist, log a warning and return 

609 if not os.path.isfile(self.file): 

610 logger.warning("Cannot remove entries from nonexistent file %s", self.file) 

611 return 

612 with open(self.file, "rb") as fh: 

613 # windows uses '\r\n' with py3k, but uses '\n' with py2.x 

614 lines = fh.readlines() 

615 self._saved_lines = lines 

616 if any(b"\r\n" in line for line in lines): 

617 endline = "\r\n" 

618 else: 

619 endline = "\n" 

620 # handle missing trailing newline 

621 if lines and not lines[-1].endswith(endline.encode("utf-8")): 

622 lines[-1] = lines[-1] + endline.encode("utf-8") 

623 for entry in self.entries: 

624 try: 

625 logger.verbose("Removing entry: %s", entry) 

626 lines.remove((entry + endline).encode("utf-8")) 

627 except ValueError: 

628 pass 

629 with open(self.file, "wb") as fh: 

630 fh.writelines(lines) 

631 

632 def rollback(self) -> bool: 

633 if self._saved_lines is None: 

634 logger.error("Cannot roll back changes to %s, none were made", self.file) 

635 return False 

636 logger.debug("Rolling %s back to previous state", self.file) 

637 with open(self.file, "wb") as fh: 

638 fh.writelines(self._saved_lines) 

639 return True