Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/req/req_uninstall.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

352 statements  

1from __future__ import annotations 

2 

3import functools 

4import os 

5import sys 

6import sysconfig 

7from collections.abc import Generator, Iterable 

8from typing import Any, Callable 

9 

10from pip._internal.exceptions import LegacyDistutilsInstall, UninstallMissingRecord 

11from pip._internal.locations import get_bin_prefix, get_bin_user 

12from pip._internal.metadata import BaseDistribution 

13from pip._internal.utils.compat import WINDOWS 

14from pip._internal.utils.egg_link import egg_link_path_from_location 

15from pip._internal.utils.logging import getLogger, indent_log 

16from pip._internal.utils.misc import ask, normalize_path, renames, rmtree 

17from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory 

18from pip._internal.utils.virtualenv import running_under_virtualenv 

19 

20logger = getLogger(__name__) 

21 

22 

def _script_names(
    bin_dir: str, script_name: str, is_gui: bool
) -> Generator[str, None, None]:
    """Yield the fully qualified names of the files that a
    {console,gui}_scripts entry named ``script_name`` may have created
    in ``bin_dir``.

    The plain script path is always yielded. On Windows the ``.exe``
    launcher, its ``.exe.manifest`` and the ``-script.py`` (or
    ``-script.pyw`` for GUI scripts) wrapper are yielded as well.
    """
    launcher = os.path.join(bin_dir, script_name)
    yield launcher
    if WINDOWS:
        yield f"{launcher}.exe"
        yield f"{launcher}.exe.manifest"
        # GUI entry points get a .pyw wrapper, console ones a .py wrapper.
        wrapper_suffix = "-script.pyw" if is_gui else "-script.py"
        yield f"{launcher}{wrapper_suffix}"

40 

41 

def _unique(
    fn: Callable[..., Generator[Any, None, None]],
) -> Callable[..., Generator[Any, None, None]]:
    """Decorate the generator function ``fn`` so that each distinct item is
    yielded only once, in order of first appearance.

    Items must be hashable, since they are remembered in a set.
    """

    @functools.wraps(fn)
    def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]:
        already_yielded: set[Any] = set()
        for candidate in fn(*args, **kw):
            if candidate in already_yielded:
                continue
            already_yielded.add(candidate)
            yield candidate

    return unique

54 

55 

@_unique
def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]:
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.py[co]

    Every entry declared by the distribution is joined onto its location
    and yielded. For each entry ending in ``.py``, the ``.pyc`` and ``.pyo``
    paths in the same directory are yielded right after it.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].

    If the distribution declares no entries (RECORD is not found),
    UninstallMissingRecord is raised for it.

    https://packaging.python.org/specifications/recording-installed-packages/
    """
    location = dist.location
    assert location is not None, "not installed"

    entries = dist.iter_declared_entries()
    if entries is None:
        raise UninstallMissingRecord(distribution=dist)

    for declared in entries:
        recorded = os.path.join(location, declared)
        yield recorded
        if not recorded.endswith(".py"):
            continue
        # Legacy bytecode files that sit next to the source file.
        directory, filename = os.path.split(recorded)
        stem = filename[:-3]
        for bytecode_suffix in (".pyc", ".pyo"):
            yield os.path.join(directory, stem + bytecode_suffix)

88 

89 

def compact(paths: Iterable[str]) -> set[str]:
    """Compact a path set to contain the minimal number of paths
    necessary to contain all paths in the set. If /a/path/ and
    /a/path/to/a/file.txt are both in the set, leave only the
    shorter path.

    A kept path may end in ``*`` (a wildcard); the ``*`` is ignored when
    checking whether it is a parent of a longer path.

    ``paths`` may be any iterable, including one that repeats a path;
    repeated paths are collapsed into a single entry.
    """

    sep = os.path.sep
    short_paths: set[str] = set()
    # Shortest first, so a parent is always kept before its children
    # are examined.
    for path in sorted(paths, key=len):
        if path in short_paths:
            # Exact duplicate of a path we already kept. Skipping it here
            # also guarantees the index below is in range: the only way
            # ``path`` can equal a kept prefix in full is by being that
            # very path.
            continue
        should_skip = any(
            path.startswith(shortpath.rstrip("*"))
            # Only a match that ends on a separator is a real parent:
            # "/a/b" must not swallow "/a/bc".
            and path[len(shortpath.rstrip("*").rstrip(sep))] == sep
            for shortpath in short_paths
        )
        if not should_skip:
            short_paths.add(path)
    return short_paths

107 

108 

def compress_for_rename(paths: Iterable[str]) -> set[str]:
    """Returns a set containing the paths that need to be renamed.

    This set may include directories when the original sequence of paths
    included every file on disk. Such directories are returned with a
    trailing path separator; all other paths are returned as given.
    """
    # Compare in normcased form, but hand back the caller's spelling.
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    # Parent directories of the given paths, shortest first so that an
    # enclosing directory is examined before anything below it.
    unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
    wildcards: set[str] = set()
    # Normcased spellings of the wildcards, used only for the
    # "already handled" prefix test so both sides are compared in the
    # same form.
    handled_prefixes: set[str] = set()

    def norm_join(*a: str) -> str:
        return os.path.normcase(os.path.join(*a))

    for root in unchecked:
        normcased_root = os.path.normcase(root)
        if any(normcased_root.startswith(prefix) for prefix in handled_prefixes):
            # This directory has already been handled.
            continue

        all_files: set[str] = set()
        for dirname, _subdirs, files in os.walk(root):
            all_files.update(norm_join(root, dirname, f) for f in files)
        # If all the files we found are in our remaining set of files to
        # remove, then remove them from the latter set and add a wildcard
        # for the directory.
        if all_files.issubset(remaining):
            remaining.difference_update(all_files)
            wildcards.add(root + os.sep)
            handled_prefixes.add(os.path.normcase(root + os.sep))

    return set(map(case_map.__getitem__, remaining)) | wildcards

141 

142 

def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str]]:
    """Returns a tuple of 2 sets of which paths to display to user

    The first set contains the paths that would be deleted: every given
    path that is not a ``.pyc`` file, plus one ``<folder>/*`` entry for
    each package or ``.dist-info`` folder - to signify that all its
    contents are removed.

    The second set contains files found on disk inside those folders that
    are not among the given paths, i.e. files that would be skipped.
    """
    requested = set(paths)

    # Determine folders and files
    files = {p for p in requested if not p.endswith(".pyc")}
    folders = {
        os.path.dirname(p)
        for p in requested
        if not p.endswith(".pyc")
        and (p.endswith("__init__.py") or ".dist-info" in p)
    }

    known_files = {os.path.normcase(p) for p in files}

    folders = compact(folders)

    # This walks the tree using os.walk to not miss extra folders
    # that might get added.
    will_skip = set()
    for folder in folders:
        for dirpath, _, dirfiles in os.walk(folder):
            on_disk = (
                os.path.join(dirpath, fname)
                for fname in dirfiles
                if not fname.endswith(".pyc")
            )
            # Anything on disk that was not requested is being skipped.
            will_skip.update(
                candidate
                for candidate in on_disk
                if os.path.isfile(candidate)
                and os.path.normcase(candidate) not in known_files
            )

    will_remove = files | {os.path.join(folder, "*") for folder in folders}

    return will_remove, will_skip

190 

191 

class StashedUninstallPathSet:
    """A set of file rename operations to stash files while
    tentatively uninstalling them.

    Each stashed path is moved into a temporary location. ``commit()``
    discards the stash; ``rollback()`` moves everything back first."""

    def __init__(self) -> None:
        # Mapping from (normcased) source file root to the
        # [Adjacent]TempDirectory holding files from under that root.
        self._save_dirs: dict[str, TempDirectory] = {}
        # (old path, new path) tuples for each move that may need
        # to be undone.
        self._moves: list[tuple[str, str]] = []

    def _get_directory_stash(self, path: str) -> str:
        """Pick the stash location for the directory ``path``.

        An AdjacentTempDirectory is tried first; if creating it fails with
        OSError, a regular "uninstall" TempDirectory is used instead. The
        chosen directory is remembered under the normcased ``path``."""

        try:
            stash_dir: TempDirectory = AdjacentTempDirectory(path)
        except OSError:
            stash_dir = TempDirectory(kind="uninstall")
        self._save_dirs[os.path.normcase(path)] = stash_dir

        return stash_dir.path

    def _get_file_stash(self, path: str) -> str:
        """Pick the stash location for the file ``path``.

        The closest ancestor directory that already has a stash is reused,
        keeping the file's position relative to that ancestor. If no
        ancestor has one, a new "uninstall" TempDirectory is registered
        for the file's parent directory."""
        normalized = os.path.normcase(path)
        parent = os.path.dirname(normalized)

        # Walk upwards until a known root is found or dirname() stops
        # changing (i.e. the top of the path has been reached).
        root, previous = parent, None
        stash_dir = None
        while root != previous:
            if root in self._save_dirs:
                stash_dir = self._save_dirs[root]
                break
            root, previous = os.path.dirname(root), root
        else:
            # Did not find any suitable root
            root = parent
            stash_dir = TempDirectory(kind="uninstall")
            self._save_dirs[root] = stash_dir

        relative = os.path.relpath(normalized, root)
        if not relative or relative == os.path.curdir:
            return stash_dir.path
        return os.path.join(stash_dir.path, relative)

    def stash(self, path: str) -> str:
        """Stashes the directory or file and returns its new location.
        Handle symlinks as files to avoid modifying the symlink targets.
        """
        is_real_dir = os.path.isdir(path) and not os.path.islink(path)
        pick_stash = self._get_directory_stash if is_real_dir else self._get_file_stash
        destination = pick_stash(path)

        self._moves.append((path, destination))
        if is_real_dir and os.path.isdir(destination):
            # If we're moving a directory, we need to
            # remove the destination first or else it will be
            # moved to inside the existing directory.
            # We just created destination ourselves, so it will
            # be removable.
            os.rmdir(destination)
        renames(path, destination)
        return destination

    def commit(self) -> None:
        """Commits the uninstall by removing stashed files."""
        for stash_dir in self._save_dirs.values():
            stash_dir.cleanup()
        self._moves = []
        self._save_dirs = {}

    def rollback(self) -> None:
        """Undoes the uninstall by moving stashed files back."""
        for move in self._moves:
            logger.info("Moving to %s\n from %s", *move)

        for original, stashed in self._moves:
            try:
                logger.debug("Replacing %s from %s", original, stashed)
                # Clear whatever now occupies the original location so the
                # stashed copy can be moved back into place.
                if os.path.isfile(original) or os.path.islink(original):
                    os.unlink(original)
                elif os.path.isdir(original):
                    rmtree(original)
                renames(stashed, original)
            except OSError as ex:
                logger.error("Failed to restore %s", original)
                logger.debug("Exception: %s", ex)

        self.commit()

    @property
    def can_rollback(self) -> bool:
        """True while at least one recorded move has not been committed."""
        return bool(self._moves)

295 

296 

class UninstallPathSet:
    """A set of file paths to be removed in the uninstallation of a
    requirement.

    Paths are collected with ``add()`` and ``add_pth()`` (normally through
    ``from_dist()``), moved into a stash by ``remove()``, and afterwards
    either dropped for good with ``commit()`` or restored with
    ``rollback()``.
    """

    def __init__(self, dist: BaseDistribution) -> None:
        # Existing paths we are permitted to remove.
        self._paths: set[str] = set()
        # Existing paths (and .pth files) we are not permitted to touch.
        self._refuse: set[str] = set()
        # Normalized .pth file path -> entries to strip from that file.
        self._pth: dict[str, UninstallPthEntries] = {}
        self._dist = dist
        # Records the moves made by remove() so they can be undone.
        self._moved_paths = StashedUninstallPathSet()
        # Create local cache of normalize_path results. Creating an UninstallPathSet
        # can result in hundreds/thousands of redundant calls to normalize_path with
        # the same args, which hurts performance.
        self._normalize_path_cached = functools.lru_cache(normalize_path)

    def _permitted(self, path: str) -> bool:
        """
        Return True if the given path is one we are permitted to
        remove/modify, False otherwise.

        Outside a virtualenv every path is permitted. Inside one, only
        paths that start with the normalized ``sys.prefix`` are.

        """
        # aka is_local, but caching normalized sys.prefix
        if not running_under_virtualenv():
            return True
        return path.startswith(self._normalize_path_cached(sys.prefix))

    def add(self, path: str) -> None:
        """Register ``path`` for removal, if it exists on disk.

        Permitted paths go to ``self._paths``, all others to
        ``self._refuse``. For a ``.py`` file, the ``__pycache__`` directory
        next to it is added as well.
        """
        head, tail = os.path.split(path)

        # we normalize the head to resolve parent directory symlinks, but not
        # the tail, since we only want to uninstall symlinks, not their targets
        path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail))

        if not os.path.exists(path):
            return
        if self._permitted(path):
            self._paths.add(path)
        else:
            self._refuse.add(path)

        # __pycache__ files can show up after 'installed-files.txt' is created,
        # due to imports
        # Add the adjacent __pycache__ directory to the UninstallPathSet when a
        # .py file is removed. We do this to avoid the risk of orphaned .pyc
        # files created by a different interpreter version than the one running
        # pip at the time of package installation and uninstallation or an
        # interpreter run at a different optimization level (PYTHONOPTIMIZE).
        if os.path.splitext(path)[1] == ".py":
            pycache = os.path.join(os.path.dirname(path), "__pycache__")
            self.add(pycache)

    def add_pth(self, pth_file: str, entry: str) -> None:
        """Schedule ``entry`` for removal from the ``.pth`` file ``pth_file``.

        If the normalized ``pth_file`` is not permitted, it is recorded in
        ``self._refuse`` and the entry is dropped.
        """
        pth_file = self._normalize_path_cached(pth_file)
        if self._permitted(pth_file):
            # One UninstallPthEntries per .pth file, created on first use.
            if pth_file not in self._pth:
                self._pth[pth_file] = UninstallPthEntries(pth_file)
            self._pth[pth_file].add(entry)
        else:
            self._refuse.add(pth_file)

    def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
        """Remove paths in ``self._paths`` with confirmation (unless
        ``auto_confirm`` is True).

        Paths are not deleted here: they are stashed through
        ``self._moved_paths`` so the removal can still be rolled back.
        The collected ``.pth`` entries are removed as well. If there are
        no paths at all, a message is logged and nothing happens.
        """

        if not self._paths:
            logger.info(
                "Can't uninstall '%s'. No files were found to uninstall.",
                self._dist.raw_name,
            )
            return

        dist_name_version = f"{self._dist.raw_name}-{self._dist.raw_version}"
        logger.info("Uninstalling %s:", dist_name_version)

        with indent_log():
            if auto_confirm or self._allowed_to_proceed(verbose):
                moved = self._moved_paths

                # Collapse fully-owned directories into a single move each.
                for_rename = compress_for_rename(self._paths)

                for path in sorted(compact(for_rename)):
                    moved.stash(path)
                    logger.verbose("Removing file or directory %s", path)

                for pth in self._pth.values():
                    pth.remove()

                logger.info("Successfully uninstalled %s", dist_name_version)

    def _allowed_to_proceed(self, verbose: bool) -> bool:
        """Display which files would be deleted and prompt for confirmation

        Returns False only when the answer to the prompt is "n".
        """

        def _display(msg: str, paths: Iterable[str]) -> None:
            # Log ``msg`` followed by the compacted, sorted paths; stay
            # silent when there is nothing to list.
            if not paths:
                return

            logger.info(msg)
            with indent_log():
                for path in sorted(compact(paths)):
                    logger.info(path)

        if not verbose:
            will_remove, will_skip = compress_for_output_listing(self._paths)
        else:
            # In verbose mode, display all the files that are going to be
            # deleted.
            will_remove = set(self._paths)
            will_skip = set()

        _display("Would remove:", will_remove)
        _display("Would not remove (might be manually added):", will_skip)
        _display("Would not remove (outside of prefix):", self._refuse)
        if verbose:
            _display("Will actually move:", compress_for_rename(self._paths))

        return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n"

    def rollback(self) -> None:
        """Rollback the changes previously made by remove().

        Logs an error and does nothing if no paths have been stashed.
        """
        if not self._moved_paths.can_rollback:
            logger.error(
                "Can't roll back %s; was not uninstalled",
                self._dist.raw_name,
            )
            return
        logger.info("Rolling back uninstall of %s", self._dist.raw_name)
        self._moved_paths.rollback()
        for pth in self._pth.values():
            pth.rollback()

    def commit(self) -> None:
        """Remove temporary save dir: rollback will no longer be possible."""
        self._moved_paths.commit()

    @classmethod
    def from_dist(cls, dist: BaseDistribution) -> UninstallPathSet:
        """Build the set of paths to remove in order to uninstall ``dist``.

        An empty set is returned (after logging why) when ``dist`` has no
        location, is not local, or is located in the standard library.
        Otherwise the paths are collected according to how the distribution
        was installed, followed by its distutils scripts and its
        console/gui entry-point scripts.

        Raises LegacyDistutilsInstall for a distribution installed by
        distutils. An AssertionError is raised when a develop egg-link
        does not point at the distribution's location.
        """
        dist_location = dist.location
        info_location = dist.info_location
        if dist_location is None:
            logger.info(
                "Not uninstalling %s since it is not installed",
                dist.canonical_name,
            )
            return cls(dist)

        normalized_dist_location = normalize_path(dist_location)
        if not dist.local:
            logger.info(
                "Not uninstalling %s at %s, outside environment %s",
                dist.canonical_name,
                normalized_dist_location,
                sys.prefix,
            )
            return cls(dist)

        # Refuse to touch anything located directly in the stdlib
        # directories (sysconfig paths that are unset are ignored).
        if normalized_dist_location in {
            p
            for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
            if p
        }:
            logger.info(
                "Not uninstalling %s at %s, as it is in the standard library.",
                dist.canonical_name,
                normalized_dist_location,
            )
            return cls(dist)

        paths_to_remove = cls(dist)
        develop_egg_link = egg_link_path_from_location(dist.raw_name)

        # Distribution is installed with metadata in a "flat" .egg-info
        # directory. This means it is not a modern .dist-info installation, an
        # egg, or legacy editable.
        setuptools_flat_installation = (
            dist.installed_with_setuptools_egg_info
            and info_location is not None
            and os.path.exists(info_location)
            # If dist is editable and the location points to a ``.egg-info``,
            # we are in fact in the legacy editable case.
            and not info_location.endswith(f"{dist.setuptools_filename}.egg-info")
        )

        # The order of the uninstall cases matters: when the same package
        # is installed twice, pip needs to uninstall the currently
        # detected version.
        if setuptools_flat_installation:
            if info_location is not None:
                paths_to_remove.add(info_location)
            installed_files = dist.iter_declared_entries()
            if installed_files is not None:
                for installed_file in installed_files:
                    paths_to_remove.add(os.path.join(dist_location, installed_file))
            # FIXME: need a test for this elif block
            # occurs with --single-version-externally-managed/--record outside
            # of pip
            elif dist.is_file("top_level.txt"):
                try:
                    namespace_packages = dist.read_text("namespace_packages.txt")
                except FileNotFoundError:
                    namespaces = []
                else:
                    namespaces = namespace_packages.splitlines(keepends=False)
                # Namespace packages are left alone; every other top-level
                # name is tried as a package directory and as a module.
                for top_level_pkg in [
                    p
                    for p in dist.read_text("top_level.txt").splitlines()
                    if p and p not in namespaces
                ]:
                    path = os.path.join(dist_location, top_level_pkg)
                    paths_to_remove.add(path)
                    paths_to_remove.add(f"{path}.py")
                    paths_to_remove.add(f"{path}.pyc")
                    paths_to_remove.add(f"{path}.pyo")

        elif dist.installed_by_distutils:
            raise LegacyDistutilsInstall(distribution=dist)

        elif dist.installed_as_egg:
            # package installed by easy_install
            # We cannot match on dist.egg_name because it can slightly vary
            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
            # XXX We use normalized_dist_location because dist_location may
            # contain a trailing / if the distribution is a zipped egg
            # (which is not a directory).
            paths_to_remove.add(normalized_dist_location)
            easy_install_egg = os.path.split(normalized_dist_location)[1]
            easy_install_pth = os.path.join(
                os.path.dirname(normalized_dist_location),
                "easy-install.pth",
            )
            paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)

        elif dist.installed_with_dist_info:
            for path in uninstallation_paths(dist):
                paths_to_remove.add(path)

        elif develop_egg_link:
            # PEP 660 modern editable is handled in the ``.dist-info`` case
            # above, so this only covers the setuptools-style editable.
            with open(develop_egg_link) as fh:
                # The first line of the egg-link is the linked location.
                link_pointer = os.path.normcase(fh.readline().strip())
                normalized_link_pointer = paths_to_remove._normalize_path_cached(
                    link_pointer
                )
            assert os.path.samefile(
                normalized_link_pointer, normalized_dist_location
            ), (
                f"Egg-link {develop_egg_link} (to {link_pointer}) does not match "
                f"installed location of {dist.raw_name} (at {dist_location})"
            )
            paths_to_remove.add(develop_egg_link)
            easy_install_pth = os.path.join(
                os.path.dirname(develop_egg_link), "easy-install.pth"
            )
            paths_to_remove.add_pth(easy_install_pth, dist_location)

        else:
            logger.debug(
                "Not sure how to uninstall: %s - Check: %s",
                dist,
                dist_location,
            )

        if dist.in_usersite:
            bin_dir = get_bin_user()
        else:
            bin_dir = get_bin_prefix()

        # find distutils scripts= scripts
        try:
            for script in dist.iter_distutils_script_names():
                paths_to_remove.add(os.path.join(bin_dir, script))
                if WINDOWS:
                    paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat"))
        except (FileNotFoundError, NotADirectoryError):
            # Deliberately ignored: in that case no distutils scripts
            # are added.
            pass

        # find console_scripts and gui_scripts
        def iter_scripts_to_remove(
            dist: BaseDistribution,
            bin_dir: str,
        ) -> Generator[str, None, None]:
            # Entry points of any other group are ignored.
            for entry_point in dist.iter_entry_points():
                if entry_point.group == "console_scripts":
                    yield from _script_names(bin_dir, entry_point.name, False)
                elif entry_point.group == "gui_scripts":
                    yield from _script_names(bin_dir, entry_point.name, True)

        for s in iter_scripts_to_remove(dist, bin_dir):
            paths_to_remove.add(s)

        return paths_to_remove

587 

588 

class UninstallPthEntries:
    """The entries to strip from a single ``.pth`` file.

    ``remove()`` rewrites the file without the collected entries and keeps
    a snapshot of the original content, which ``rollback()`` writes back.
    """

    def __init__(self, pth_file: str) -> None:
        self.file = pth_file
        self.entries: set[str] = set()
        # Original content of the file, captured by remove(); None until
        # remove() has read the file.
        self._saved_lines: list[bytes] | None = None

    def add(self, entry: str) -> None:
        """Record ``entry`` as a line to remove from the file."""
        entry = os.path.normcase(entry)
        # On Windows, os.path.normcase converts the entry to use
        # backslashes. This is correct for entries that describe absolute
        # paths outside of site-packages, but all the others use forward
        # slashes.
        # os.path.splitdrive is used instead of os.path.isabs because isabs
        # treats non-absolute paths with drive letter markings like c:foo\bar
        # as absolute paths. It also does not recognize UNC paths if they don't
        # have more than "\\server\share". Valid examples: "\\server\share\" or
        # "\\server\share\folder".
        if WINDOWS and not os.path.splitdrive(entry)[0]:
            entry = entry.replace("\\", "/")
        self.entries.add(entry)

    def remove(self) -> None:
        """Rewrite the file without the recorded entries.

        Logs a warning and does nothing if the file does not exist.
        Entries that are not present in the file are ignored.
        """
        logger.verbose("Removing pth entries from %s:", self.file)

        # If the file doesn't exist, log a warning and return
        if not os.path.isfile(self.file):
            logger.warning("Cannot remove entries from nonexistent file %s", self.file)
            return
        with open(self.file, "rb") as fh:
            # windows uses '\r\n' with py3k, but uses '\n' with py2.x
            lines = fh.readlines()
        # Snapshot a *copy*: ``lines`` is edited in place below, and
        # rollback() must write back the untouched original content.
        self._saved_lines = list(lines)
        # Follow the line ending the file already uses.
        if any(b"\r\n" in line for line in lines):
            endline = b"\r\n"
        else:
            endline = b"\n"
        # handle missing trailing newline
        if lines and not lines[-1].endswith(endline):
            lines[-1] = lines[-1] + endline
        for entry in self.entries:
            try:
                logger.verbose("Removing entry: %s", entry)
                lines.remove(entry.encode("utf-8") + endline)
            except ValueError:
                # The entry is not in the file; nothing to remove.
                pass
        with open(self.file, "wb") as fh:
            fh.writelines(lines)

    def rollback(self) -> bool:
        """Write the content saved by remove() back to the file.

        Returns False (after logging an error) if remove() never saved
        anything, True otherwise.
        """
        if self._saved_lines is None:
            logger.error("Cannot roll back changes to %s, none were made", self.file)
            return False
        logger.debug("Rolling %s back to previous state", self.file)
        with open(self.file, "wb") as fh:
            fh.writelines(self._saved_lines)
        return True