Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_uninstall.py: 14%
356 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1import functools
2import os
3import sys
4import sysconfig
5from importlib.util import cache_from_source
6from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple
8from pip._internal.exceptions import UninstallationError
9from pip._internal.locations import get_bin_prefix, get_bin_user
10from pip._internal.metadata import BaseDistribution
11from pip._internal.utils.compat import WINDOWS
12from pip._internal.utils.egg_link import egg_link_path_from_location
13from pip._internal.utils.logging import getLogger, indent_log
14from pip._internal.utils.misc import ask, normalize_path, renames, rmtree
15from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
16from pip._internal.utils.virtualenv import running_under_virtualenv
18logger = getLogger(__name__)
def _script_names(
    bin_dir: str, script_name: str, is_gui: bool
) -> Generator[str, None, None]:
    """Yield the paths of the files created for one {console,gui}_scripts
    entry point named ``script_name`` inside ``bin_dir``.

    The plain script path is always yielded. When ``WINDOWS`` is true, the
    ``.exe`` launcher, its ``.exe.manifest`` and the ``-script.py`` wrapper
    (``-script.pyw`` when ``is_gui`` is true) are yielded as well.
    """
    script_path = os.path.join(bin_dir, script_name)
    yield script_path
    if WINDOWS:
        yield f"{script_path}.exe"
        yield f"{script_path}.exe.manifest"
        wrapper_suffix = "-script.pyw" if is_gui else "-script.py"
        yield f"{script_path}{wrapper_suffix}"
40def _unique(
41 fn: Callable[..., Generator[Any, None, None]]
42) -> Callable[..., Generator[Any, None, None]]:
43 @functools.wraps(fn)
44 def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]:
45 seen: Set[Any] = set()
46 for item in fn(*args, **kw):
47 if item not in seen:
48 seen.add(item)
49 yield item
51 return unique
@_unique
def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]:
    """
    Yield every path that should be removed for ``dist``, based on RECORD.

    Each entry declared by the distribution is joined onto ``dist.location``
    and yielded. For every entry ending in ``.py`` the sibling ``.pyc`` and
    ``.pyo`` paths in the same directory are yielded too.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].

    If RECORD is not found, raises UninstallationError,
    with possible information from the INSTALLER file.

    https://packaging.python.org/specifications/recording-installed-packages/
    """
    location = dist.location
    assert location is not None, "not installed"

    entries = dist.iter_declared_entries()
    if entries is None:
        problem = f"Cannot uninstall {dist}, RECORD file not found."
        installer = dist.installer
        if installer and installer != "pip":
            # Some other tool owns this installation; point the user at it.
            advice = f" Hint: The package was installed by {installer}."
        else:
            requirement = f"{dist.raw_name}=={dist.version}"
            advice = (
                " You might be able to recover from this via: "
                f"'pip install --force-reinstall --no-deps {requirement}'."
            )
        raise UninstallationError(problem + advice)

    for entry in entries:
        recorded = os.path.join(location, entry)
        yield recorded
        if not recorded.endswith(".py"):
            continue
        # Legacy bytecode files live next to the source file.
        directory, filename = os.path.split(recorded)
        stem = filename[:-3]
        for bytecode_suffix in (".pyc", ".pyo"):
            yield os.path.join(directory, stem + bytecode_suffix)
def compact(paths: Iterable[str]) -> Set[str]:
    """Compact a path set to contain the minimal number of paths
    necessary to contain all paths in the set. If /a/path/ and
    /a/path/to/a/file.txt are both in the set, leave only the
    shorter path.

    A kept path may end in a ``*`` wildcard; it is ignored when deciding
    whether a longer path lies beneath it. ``paths`` may be any iterable,
    including one that repeats the same path.
    """
    sep = os.path.sep
    short_paths: Set[str] = set()
    # Shortest first, so a containing directory is always seen before
    # anything that lives inside it.
    for path in sorted(paths, key=len):
        covered = False
        for shortpath in short_paths:
            prefix = shortpath.rstrip("*")
            boundary = len(prefix.rstrip(sep))
            # Slice instead of indexing: when ``path`` repeats ``shortpath``
            # there is no character at ``boundary`` and ``path[boundary]``
            # would raise IndexError.
            if path.startswith(prefix) and path[boundary : boundary + 1] == sep:
                covered = True
                break
        if not covered:
            short_paths.add(path)
    return short_paths
def compress_for_rename(paths: Iterable[str]) -> Set[str]:
    """Returns a set containing the paths that need to be renamed.

    This set may include directories when the original sequence of paths
    included every file on disk. Such a directory is written with a trailing
    separator and replaces the individual files found beneath it; all other
    paths are returned in their original spelling.

    A directory is only collapsed when the walk finds at least one file under
    it. A directory that does not exist, or the empty parent of a bare
    filename, never produces a directory entry.
    """
    # Compare case-insensitively where the platform does, but remember the
    # caller's spelling so it can be returned.
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    # Parent directories, shortest first so outer directories are tried
    # before the directories nested inside them.
    unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
    wildcards: Set[str] = set()

    def norm_join(*a: str) -> str:
        return os.path.normcase(os.path.join(*a))

    for root in unchecked:
        if any(os.path.normcase(root).startswith(w) for w in wildcards):
            # This directory has already been handled.
            continue

        all_files: Set[str] = set()
        for dirname, _subdirs, files in os.walk(root):
            all_files.update(norm_join(root, dirname, f) for f in files)
        # If all the files we found are in our remaining set of files to
        # remove, then remove them from the latter set and add a wildcard
        # for the directory. An empty walk result proves nothing about the
        # directory (os.walk yields nothing for a missing or empty root), so
        # it must not be turned into a wildcard.
        if all_files and all_files <= remaining:
            remaining.difference_update(all_files)
            wildcards.add(root + os.sep)

    return set(map(case_map.__getitem__, remaining)) | wildcards
def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]:
    """Return two sets of paths to show to the user.

    The first set holds what would be deleted: every given path that is not
    a ``.pyc`` file, plus one ``<folder>/*`` entry for each package or
    ``.dist-info`` folder, signifying that all its contents are removed.

    The second set holds files found on disk inside those folders that are
    not among the given paths, i.e. files that would be skipped.
    """
    # Compiled bytecode is never listed individually.
    files = {p for p in set(paths) if not p.endswith(".pyc")}

    # A folder is reported as a whole when it holds a package __init__.py or
    # belongs to a .dist-info directory.
    folders = compact(
        {
            os.path.dirname(p)
            for p in files
            if p.endswith("__init__.py") or ".dist-info" in p
        }
    )

    _normcased_files = {os.path.normcase(p) for p in files}

    # This walks the tree using os.walk to not miss extra folders
    # that might get added.
    will_skip: Set[str] = set()
    for folder in folders:
        for dirpath, _, dirfiles in os.walk(folder):
            for fname in dirfiles:
                if fname.endswith(".pyc"):
                    continue
                candidate = os.path.join(dirpath, fname)
                if not os.path.isfile(candidate):
                    continue
                if os.path.normcase(candidate) in _normcased_files:
                    continue
                # We are skipping this file. Add it to the set.
                will_skip.add(candidate)

    will_remove = files | {os.path.join(folder, "*") for folder in folders}

    return will_remove, will_skip
class StashedUninstallPathSet:
    """Records the renames used to move files out of the way while an
    uninstall is still tentative, so they can be restored or discarded."""

    def __init__(self) -> None:
        # (old path, new path) tuples for each move that may need
        # to be undone.
        self._moves: List[Tuple[str, str]] = []
        # Mapping from source file root to [Adjacent]TempDirectory
        # for files under that directory.
        self._save_dirs: Dict[str, TempDirectory] = {}

    def _get_directory_stash(self, path: str) -> str:
        """Pick the stash location for a directory and return it.

        An AdjacentTempDirectory for ``path`` is tried first; if creating it
        raises OSError, a plain "uninstall" TempDirectory is used instead.
        The chosen directory is remembered under the normcased ``path``."""

        try:
            stash_dir: TempDirectory = AdjacentTempDirectory(path)
        except OSError:
            stash_dir = TempDirectory(kind="uninstall")
        self._save_dirs[os.path.normcase(path)] = stash_dir

        return stash_dir.path

    def _get_file_stash(self, path: str) -> str:
        """Pick the stash location for a single file and return it.

        The file's ancestors are searched, nearest first, for a directory
        that already has a stash. If none has one, a new "uninstall"
        TempDirectory is registered for the file's own parent directory."""
        path = os.path.normcase(path)
        head, previous = os.path.dirname(path), None
        save_dir = None

        # dirname() stops changing once the filesystem root is reached.
        while head != previous:
            if head in self._save_dirs:
                save_dir = self._save_dirs[head]
                break
            head, previous = os.path.dirname(head), head
        else:
            # Did not find any suitable root
            head = os.path.dirname(path)
            save_dir = TempDirectory(kind="uninstall")
            self._save_dirs[head] = save_dir

        # Keep the file's position relative to the stashed root.
        relpath = os.path.relpath(path, head)
        if not relpath or relpath == os.path.curdir:
            return save_dir.path
        return os.path.join(save_dir.path, relpath)

    def stash(self, path: str) -> str:
        """Move the directory or file to its stash and return the new
        location. Symlinks are handled as files so that their targets are
        never modified.
        """
        path_is_dir = os.path.isdir(path) and not os.path.islink(path)
        new_path = (
            self._get_directory_stash(path)
            if path_is_dir
            else self._get_file_stash(path)
        )

        self._moves.append((path, new_path))
        if path_is_dir and os.path.isdir(new_path):
            # If we're moving a directory, we need to
            # remove the destination first or else it will be
            # moved to inside the existing directory.
            # We just created new_path ourselves, so it will
            # be removable.
            os.rmdir(new_path)
        renames(path, new_path)
        return new_path

    def commit(self) -> None:
        """Make the uninstall final by cleaning up every stash directory and
        forgetting the recorded moves."""
        for save_dir in self._save_dirs.values():
            save_dir.cleanup()
        self._moves = []
        self._save_dirs = {}

    def rollback(self) -> None:
        """Undo the uninstall by moving every stashed path back."""
        for original, stashed in self._moves:
            logger.info("Moving to %s\n from %s", original, stashed)

        for original, stashed in self._moves:
            try:
                logger.debug("Replacing %s from %s", original, stashed)
                # Clear whatever now occupies the original location before
                # moving the stashed copy back into place.
                if os.path.isfile(original) or os.path.islink(original):
                    os.unlink(original)
                elif os.path.isdir(original):
                    rmtree(original)
                renames(stashed, original)
            except OSError as ex:
                logger.error("Failed to restore %s", original)
                logger.debug("Exception: %s", ex)

        self.commit()

    @property
    def can_rollback(self) -> bool:
        """True while at least one move has been recorded."""
        return bool(self._moves)
class UninstallPathSet:
    """The set of file paths to remove when uninstalling a requirement,
    together with the operations to remove them, roll back and commit."""

    def __init__(self, dist: BaseDistribution) -> None:
        self._dist = dist
        self._paths: Set[str] = set()
        self._refuse: Set[str] = set()
        self._pth: Dict[str, UninstallPthEntries] = {}
        self._moved_paths = StashedUninstallPathSet()
        # Create local cache of normalize_path results. Creating an UninstallPathSet
        # can result in hundreds/thousands of redundant calls to normalize_path with
        # the same args, which hurts performance.
        self._normalize_path_cached = functools.lru_cache(normalize_path)

    def _permitted(self, path: str) -> bool:
        """
        Return True if the given path is one we are permitted to
        remove/modify, False otherwise.

        Outside a virtualenv every path is permitted; inside one, only paths
        that start with the normalized ``sys.prefix``.
        """
        # aka is_local, but caching normalized sys.prefix
        if running_under_virtualenv():
            return path.startswith(self._normalize_path_cached(sys.prefix))
        return True

    def add(self, path: str) -> None:
        """Register ``path`` for removal if it exists.

        Permitted paths go to the removal set, others to the refused set.
        For a ``.py`` file the matching ``__pycache__`` file is added too.
        """
        head, tail = os.path.split(path)

        # we normalize the head to resolve parent directory symlinks, but not
        # the tail, since we only want to uninstall symlinks, not their targets
        normalized_head = self._normalize_path_cached(head)
        path = os.path.join(normalized_head, os.path.normcase(tail))

        if not os.path.exists(path):
            return
        bucket = self._paths if self._permitted(path) else self._refuse
        bucket.add(path)

        # __pycache__ files can show up after 'installed-files.txt' is created,
        # due to imports
        if os.path.splitext(path)[1] == ".py":
            self.add(cache_from_source(path))

    def add_pth(self, pth_file: str, entry: str) -> None:
        """Register ``entry`` for removal from ``pth_file``, or record the
        file as refused when it is not permitted."""
        pth_file = self._normalize_path_cached(pth_file)
        if not self._permitted(pth_file):
            self._refuse.add(pth_file)
            return
        if pth_file not in self._pth:
            self._pth[pth_file] = UninstallPthEntries(pth_file)
        self._pth[pth_file].add(entry)

    def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
        """Remove paths in ``self._paths`` with confirmation (unless
        ``auto_confirm`` is True)."""

        if not self._paths:
            logger.info(
                "Can't uninstall '%s'. No files were found to uninstall.",
                self._dist.raw_name,
            )
            return

        dist_name_version = f"{self._dist.raw_name}-{self._dist.version}"
        logger.info("Uninstalling %s:", dist_name_version)

        with indent_log():
            if not (auto_confirm or self._allowed_to_proceed(verbose)):
                return

            stasher = self._moved_paths
            for_rename = compress_for_rename(self._paths)

            # Files are stashed rather than deleted so rollback() can
            # restore them until commit() is called.
            for path in sorted(compact(for_rename)):
                stasher.stash(path)
                logger.verbose("Removing file or directory %s", path)

            for pth in self._pth.values():
                pth.remove()

            logger.info("Successfully uninstalled %s", dist_name_version)

    def _allowed_to_proceed(self, verbose: bool) -> bool:
        """Display which files would be deleted and prompt for confirmation"""

        def _display(msg: str, paths: Iterable[str]) -> None:
            if not paths:
                return

            logger.info(msg)
            with indent_log():
                for path in sorted(compact(paths)):
                    logger.info(path)

        if verbose:
            # In verbose mode, display all the files that are going to be
            # deleted.
            will_remove = set(self._paths)
            will_skip = set()
        else:
            will_remove, will_skip = compress_for_output_listing(self._paths)

        _display("Would remove:", will_remove)
        _display("Would not remove (might be manually added):", will_skip)
        _display("Would not remove (outside of prefix):", self._refuse)
        if verbose:
            _display("Will actually move:", compress_for_rename(self._paths))

        answer = ask("Proceed (Y/n)? ", ("y", "n", ""))
        return answer != "n"

    def rollback(self) -> None:
        """Rollback the changes previously made by remove()."""
        if not self._moved_paths.can_rollback:
            logger.error(
                "Can't roll back %s; was not uninstalled",
                self._dist.raw_name,
            )
            return
        logger.info("Rolling back uninstall of %s", self._dist.raw_name)
        self._moved_paths.rollback()
        for pth in self._pth.values():
            pth.rollback()

    def commit(self) -> None:
        """Remove temporary save dir: rollback will no longer be possible."""
        self._moved_paths.commit()

    @classmethod
    def from_dist(cls, dist: BaseDistribution) -> "UninstallPathSet":
        """Build the path set for ``dist``.

        An empty set is returned when the distribution is not installed, is
        not local, or is located in the standard library. Otherwise the
        installation layout decides which paths are collected, and the
        distribution's scripts are added afterwards.
        """
        dist_location = dist.location
        info_location = dist.info_location
        if dist_location is None:
            logger.info(
                "Not uninstalling %s since it is not installed",
                dist.canonical_name,
            )
            return cls(dist)

        normalized_dist_location = normalize_path(dist_location)
        if not dist.local:
            logger.info(
                "Not uninstalling %s at %s, outside environment %s",
                dist.canonical_name,
                normalized_dist_location,
                sys.prefix,
            )
            return cls(dist)

        stdlib_dirs = {
            p
            for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
            if p
        }
        if normalized_dist_location in stdlib_dirs:
            logger.info(
                "Not uninstalling %s at %s, as it is in the standard library.",
                dist.canonical_name,
                normalized_dist_location,
            )
            return cls(dist)

        paths_to_remove = cls(dist)
        develop_egg_link = egg_link_path_from_location(dist.raw_name)

        # Distribution is installed with metadata in a "flat" .egg-info
        # directory. This means it is not a modern .dist-info installation, an
        # egg, or legacy editable.
        setuptools_flat_installation = (
            dist.installed_with_setuptools_egg_info
            and info_location is not None
            and os.path.exists(info_location)
            # If dist is editable and the location points to a ``.egg-info``,
            # we are in fact in the legacy editable case.
            and not info_location.endswith(f"{dist.setuptools_filename}.egg-info")
        )

        # Uninstall cases order do matter as in the case of 2 installs of the
        # same package, pip needs to uninstall the currently detected version
        if setuptools_flat_installation:
            if info_location is not None:
                paths_to_remove.add(info_location)
            installed_files = dist.iter_declared_entries()
            if installed_files is not None:
                for installed_file in installed_files:
                    paths_to_remove.add(os.path.join(dist_location, installed_file))
            # FIXME: need a test for this elif block
            # occurs with --single-version-externally-managed/--record outside
            # of pip
            elif dist.is_file("top_level.txt"):
                try:
                    namespace_packages = dist.read_text("namespace_packages.txt")
                except FileNotFoundError:
                    namespaces = []
                else:
                    namespaces = namespace_packages.splitlines(keepends=False)
                for top_level_pkg in dist.read_text("top_level.txt").splitlines():
                    if not top_level_pkg or top_level_pkg in namespaces:
                        continue
                    base = os.path.join(dist_location, top_level_pkg)
                    paths_to_remove.add(base)
                    for suffix in (".py", ".pyc", ".pyo"):
                        paths_to_remove.add(f"{base}{suffix}")

        elif dist.installed_by_distutils:
            raise UninstallationError(
                "Cannot uninstall {!r}. It is a distutils installed project "
                "and thus we cannot accurately determine which files belong "
                "to it which would lead to only a partial uninstall.".format(
                    dist.raw_name,
                )
            )

        elif dist.installed_as_egg:
            # package installed by easy_install
            # We cannot match on dist.egg_name because it can slightly vary
            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
            paths_to_remove.add(dist_location)
            egg_parent, easy_install_egg = os.path.split(dist_location)
            easy_install_pth = os.path.join(egg_parent, "easy-install.pth")
            paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)

        elif dist.installed_with_dist_info:
            for path in uninstallation_paths(dist):
                paths_to_remove.add(path)

        elif develop_egg_link:
            # PEP 660 modern editable is handled in the ``.dist-info`` case
            # above, so this only covers the setuptools-style editable.
            with open(develop_egg_link) as fh:
                link_pointer = os.path.normcase(fh.readline().strip())
                normalized_link_pointer = paths_to_remove._normalize_path_cached(
                    link_pointer
                )
            assert os.path.samefile(
                normalized_link_pointer, normalized_dist_location
            ), (
                f"Egg-link {develop_egg_link} (to {link_pointer}) does not match "
                f"installed location of {dist.raw_name} (at {dist_location})"
            )
            paths_to_remove.add(develop_egg_link)
            easy_install_pth = os.path.join(
                os.path.dirname(develop_egg_link), "easy-install.pth"
            )
            paths_to_remove.add_pth(easy_install_pth, dist_location)

        else:
            logger.debug(
                "Not sure how to uninstall: %s - Check: %s",
                dist,
                dist_location,
            )

        bin_dir = get_bin_user() if dist.in_usersite else get_bin_prefix()

        # find distutils scripts= scripts
        try:
            for script in dist.iter_distutils_script_names():
                paths_to_remove.add(os.path.join(bin_dir, script))
                if WINDOWS:
                    paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat"))
        except (FileNotFoundError, NotADirectoryError):
            pass

        # find console_scripts and gui_scripts
        for entry_point in dist.iter_entry_points():
            if entry_point.group == "console_scripts":
                is_gui = False
            elif entry_point.group == "gui_scripts":
                is_gui = True
            else:
                continue
            for script_path in _script_names(bin_dir, entry_point.name, is_gui):
                paths_to_remove.add(script_path)

        return paths_to_remove
class UninstallPthEntries:
    """Entries to remove from one ``.pth`` file, with support for restoring
    the file's previous content."""

    def __init__(self, pth_file: str) -> None:
        self.file = pth_file
        self.entries: Set[str] = set()
        # Content of the file as read by remove(), before any edit; None
        # until remove() has actually read the file.
        self._saved_lines: Optional[List[bytes]] = None

    def add(self, entry: str) -> None:
        """Record ``entry`` as a line to remove from the file."""
        entry = os.path.normcase(entry)
        # On Windows, os.path.normcase converts the entry to use
        # backslashes. This is correct for entries that describe absolute
        # paths outside of site-packages, but all the others use forward
        # slashes.
        # os.path.splitdrive is used instead of os.path.isabs because isabs
        # treats non-absolute paths with drive letter markings like c:foo\bar
        # as absolute paths. It also does not recognize UNC paths if they don't
        # have more than "\\server\share". Valid examples: "\\server\share\" or
        # "\\server\share\folder".
        if WINDOWS and not os.path.splitdrive(entry)[0]:
            entry = entry.replace("\\", "/")
        self.entries.add(entry)

    def remove(self) -> None:
        """Rewrite the file without the recorded entries.

        The content read from disk is kept so that rollback() can restore
        it. A missing file is logged and left alone.
        """
        logger.verbose("Removing pth entries from %s:", self.file)

        # If the file doesn't exist, log a warning and return
        if not os.path.isfile(self.file):
            logger.warning("Cannot remove entries from nonexistent file %s", self.file)
            return
        with open(self.file, "rb") as fh:
            lines = fh.readlines()
        # Save a copy, not the list itself: ``lines`` is edited in place
        # below, and rollback() must write back the untouched content.
        self._saved_lines = list(lines)
        # Reuse whichever line ending the file already has.
        if any(b"\r\n" in line for line in lines):
            endline = "\r\n"
        else:
            endline = "\n"
        # handle missing trailing newline
        if lines and not lines[-1].endswith(endline.encode("utf-8")):
            lines[-1] = lines[-1] + endline.encode("utf-8")
        for entry in self.entries:
            try:
                logger.verbose("Removing entry: %s", entry)
                lines.remove((entry + endline).encode("utf-8"))
            except ValueError:
                # The entry is not present in the file; nothing to remove.
                pass
        with open(self.file, "wb") as fh:
            fh.writelines(lines)

    def rollback(self) -> bool:
        """Write the saved content back to the file.

        Returns False (after logging an error) when remove() never read the
        file, True once the saved content has been written.
        """
        if self._saved_lines is None:
            logger.error("Cannot roll back changes to %s, none were made", self.file)
            return False
        logger.debug("Rolling %s back to previous state", self.file)
        with open(self.file, "wb") as fh:
            fh.writelines(self._saved_lines)
        return True