Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/req/req_uninstall.py: 14%
356 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1import functools
2import os
3import sys
4import sysconfig
5from importlib.util import cache_from_source
6from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple
8from pip._internal.exceptions import UninstallationError
9from pip._internal.locations import get_bin_prefix, get_bin_user
10from pip._internal.metadata import BaseDistribution
11from pip._internal.utils.compat import WINDOWS
12from pip._internal.utils.egg_link import egg_link_path_from_location
13from pip._internal.utils.logging import getLogger, indent_log
14from pip._internal.utils.misc import ask, normalize_path, renames, rmtree
15from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
16from pip._internal.utils.virtualenv import running_under_virtualenv
18logger = getLogger(__name__)
21def _script_names(
22 bin_dir: str, script_name: str, is_gui: bool
23) -> Generator[str, None, None]:
24 """Create the fully qualified name of the files created by
25 {console,gui}_scripts for the given ``dist``.
26 Returns the list of file names
27 """
28 exe_name = os.path.join(bin_dir, script_name)
29 yield exe_name
30 if not WINDOWS:
31 return
32 yield f"{exe_name}.exe"
33 yield f"{exe_name}.exe.manifest"
34 if is_gui:
35 yield f"{exe_name}-script.pyw"
36 else:
37 yield f"{exe_name}-script.py"
40def _unique(
41 fn: Callable[..., Generator[Any, None, None]]
42) -> Callable[..., Generator[Any, None, None]]:
43 @functools.wraps(fn)
44 def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]:
45 seen: Set[Any] = set()
46 for item in fn(*args, **kw):
47 if item not in seen:
48 seen.add(item)
49 yield item
51 return unique
54@_unique
55def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]:
56 """
57 Yield all the uninstallation paths for dist based on RECORD-without-.py[co]
59 Yield paths to all the files in RECORD. For each .py file in RECORD, add
60 the .pyc and .pyo in the same directory.
62 UninstallPathSet.add() takes care of the __pycache__ .py[co].
64 If RECORD is not found, raises UninstallationError,
65 with possible information from the INSTALLER file.
67 https://packaging.python.org/specifications/recording-installed-packages/
68 """
69 location = dist.location
70 assert location is not None, "not installed"
72 entries = dist.iter_declared_entries()
73 if entries is None:
74 msg = "Cannot uninstall {dist}, RECORD file not found.".format(dist=dist)
75 installer = dist.installer
76 if not installer or installer == "pip":
77 dep = "{}=={}".format(dist.raw_name, dist.version)
78 msg += (
79 " You might be able to recover from this via: "
80 "'pip install --force-reinstall --no-deps {}'.".format(dep)
81 )
82 else:
83 msg += " Hint: The package was installed by {}.".format(installer)
84 raise UninstallationError(msg)
86 for entry in entries:
87 path = os.path.join(location, entry)
88 yield path
89 if path.endswith(".py"):
90 dn, fn = os.path.split(path)
91 base = fn[:-3]
92 path = os.path.join(dn, base + ".pyc")
93 yield path
94 path = os.path.join(dn, base + ".pyo")
95 yield path
98def compact(paths: Iterable[str]) -> Set[str]:
99 """Compact a path set to contain the minimal number of paths
100 necessary to contain all paths in the set. If /a/path/ and
101 /a/path/to/a/file.txt are both in the set, leave only the
102 shorter path."""
104 sep = os.path.sep
105 short_paths: Set[str] = set()
106 for path in sorted(paths, key=len):
107 should_skip = any(
108 path.startswith(shortpath.rstrip("*"))
109 and path[len(shortpath.rstrip("*").rstrip(sep))] == sep
110 for shortpath in short_paths
111 )
112 if not should_skip:
113 short_paths.add(path)
114 return short_paths
117def compress_for_rename(paths: Iterable[str]) -> Set[str]:
118 """Returns a set containing the paths that need to be renamed.
120 This set may include directories when the original sequence of paths
121 included every file on disk.
122 """
123 case_map = {os.path.normcase(p): p for p in paths}
124 remaining = set(case_map)
125 unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
126 wildcards: Set[str] = set()
128 def norm_join(*a: str) -> str:
129 return os.path.normcase(os.path.join(*a))
131 for root in unchecked:
132 if any(os.path.normcase(root).startswith(w) for w in wildcards):
133 # This directory has already been handled.
134 continue
136 all_files: Set[str] = set()
137 all_subdirs: Set[str] = set()
138 for dirname, subdirs, files in os.walk(root):
139 all_subdirs.update(norm_join(root, dirname, d) for d in subdirs)
140 all_files.update(norm_join(root, dirname, f) for f in files)
141 # If all the files we found are in our remaining set of files to
142 # remove, then remove them from the latter set and add a wildcard
143 # for the directory.
144 if not (all_files - remaining):
145 remaining.difference_update(all_files)
146 wildcards.add(root + os.sep)
148 return set(map(case_map.__getitem__, remaining)) | wildcards
151def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]:
152 """Returns a tuple of 2 sets of which paths to display to user
154 The first set contains paths that would be deleted. Files of a package
155 are not added and the top-level directory of the package has a '*' added
156 at the end - to signify that all it's contents are removed.
158 The second set contains files that would have been skipped in the above
159 folders.
160 """
162 will_remove = set(paths)
163 will_skip = set()
165 # Determine folders and files
166 folders = set()
167 files = set()
168 for path in will_remove:
169 if path.endswith(".pyc"):
170 continue
171 if path.endswith("__init__.py") or ".dist-info" in path:
172 folders.add(os.path.dirname(path))
173 files.add(path)
175 # probably this one https://github.com/python/mypy/issues/390
176 _normcased_files = set(map(os.path.normcase, files)) # type: ignore
178 folders = compact(folders)
180 # This walks the tree using os.walk to not miss extra folders
181 # that might get added.
182 for folder in folders:
183 for dirpath, _, dirfiles in os.walk(folder):
184 for fname in dirfiles:
185 if fname.endswith(".pyc"):
186 continue
188 file_ = os.path.join(dirpath, fname)
189 if (
190 os.path.isfile(file_)
191 and os.path.normcase(file_) not in _normcased_files
192 ):
193 # We are skipping this file. Add it to the set.
194 will_skip.add(file_)
196 will_remove = files | {os.path.join(folder, "*") for folder in folders}
198 return will_remove, will_skip
201class StashedUninstallPathSet:
202 """A set of file rename operations to stash files while
203 tentatively uninstalling them."""
205 def __init__(self) -> None:
206 # Mapping from source file root to [Adjacent]TempDirectory
207 # for files under that directory.
208 self._save_dirs: Dict[str, TempDirectory] = {}
209 # (old path, new path) tuples for each move that may need
210 # to be undone.
211 self._moves: List[Tuple[str, str]] = []
213 def _get_directory_stash(self, path: str) -> str:
214 """Stashes a directory.
216 Directories are stashed adjacent to their original location if
217 possible, or else moved/copied into the user's temp dir."""
219 try:
220 save_dir: TempDirectory = AdjacentTempDirectory(path)
221 except OSError:
222 save_dir = TempDirectory(kind="uninstall")
223 self._save_dirs[os.path.normcase(path)] = save_dir
225 return save_dir.path
227 def _get_file_stash(self, path: str) -> str:
228 """Stashes a file.
230 If no root has been provided, one will be created for the directory
231 in the user's temp directory."""
232 path = os.path.normcase(path)
233 head, old_head = os.path.dirname(path), None
234 save_dir = None
236 while head != old_head:
237 try:
238 save_dir = self._save_dirs[head]
239 break
240 except KeyError:
241 pass
242 head, old_head = os.path.dirname(head), head
243 else:
244 # Did not find any suitable root
245 head = os.path.dirname(path)
246 save_dir = TempDirectory(kind="uninstall")
247 self._save_dirs[head] = save_dir
249 relpath = os.path.relpath(path, head)
250 if relpath and relpath != os.path.curdir:
251 return os.path.join(save_dir.path, relpath)
252 return save_dir.path
254 def stash(self, path: str) -> str:
255 """Stashes the directory or file and returns its new location.
256 Handle symlinks as files to avoid modifying the symlink targets.
257 """
258 path_is_dir = os.path.isdir(path) and not os.path.islink(path)
259 if path_is_dir:
260 new_path = self._get_directory_stash(path)
261 else:
262 new_path = self._get_file_stash(path)
264 self._moves.append((path, new_path))
265 if path_is_dir and os.path.isdir(new_path):
266 # If we're moving a directory, we need to
267 # remove the destination first or else it will be
268 # moved to inside the existing directory.
269 # We just created new_path ourselves, so it will
270 # be removable.
271 os.rmdir(new_path)
272 renames(path, new_path)
273 return new_path
275 def commit(self) -> None:
276 """Commits the uninstall by removing stashed files."""
277 for _, save_dir in self._save_dirs.items():
278 save_dir.cleanup()
279 self._moves = []
280 self._save_dirs = {}
282 def rollback(self) -> None:
283 """Undoes the uninstall by moving stashed files back."""
284 for p in self._moves:
285 logger.info("Moving to %s\n from %s", *p)
287 for new_path, path in self._moves:
288 try:
289 logger.debug("Replacing %s from %s", new_path, path)
290 if os.path.isfile(new_path) or os.path.islink(new_path):
291 os.unlink(new_path)
292 elif os.path.isdir(new_path):
293 rmtree(new_path)
294 renames(path, new_path)
295 except OSError as ex:
296 logger.error("Failed to restore %s", new_path)
297 logger.debug("Exception: %s", ex)
299 self.commit()
301 @property
302 def can_rollback(self) -> bool:
303 return bool(self._moves)
306class UninstallPathSet:
307 """A set of file paths to be removed in the uninstallation of a
308 requirement."""
310 def __init__(self, dist: BaseDistribution) -> None:
311 self._paths: Set[str] = set()
312 self._refuse: Set[str] = set()
313 self._pth: Dict[str, UninstallPthEntries] = {}
314 self._dist = dist
315 self._moved_paths = StashedUninstallPathSet()
316 # Create local cache of normalize_path results. Creating an UninstallPathSet
317 # can result in hundreds/thousands of redundant calls to normalize_path with
318 # the same args, which hurts performance.
319 self._normalize_path_cached = functools.lru_cache()(normalize_path)
321 def _permitted(self, path: str) -> bool:
322 """
323 Return True if the given path is one we are permitted to
324 remove/modify, False otherwise.
326 """
327 # aka is_local, but caching normalized sys.prefix
328 if not running_under_virtualenv():
329 return True
330 return path.startswith(self._normalize_path_cached(sys.prefix))
332 def add(self, path: str) -> None:
333 head, tail = os.path.split(path)
335 # we normalize the head to resolve parent directory symlinks, but not
336 # the tail, since we only want to uninstall symlinks, not their targets
337 path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail))
339 if not os.path.exists(path):
340 return
341 if self._permitted(path):
342 self._paths.add(path)
343 else:
344 self._refuse.add(path)
346 # __pycache__ files can show up after 'installed-files.txt' is created,
347 # due to imports
348 if os.path.splitext(path)[1] == ".py":
349 self.add(cache_from_source(path))
351 def add_pth(self, pth_file: str, entry: str) -> None:
352 pth_file = self._normalize_path_cached(pth_file)
353 if self._permitted(pth_file):
354 if pth_file not in self._pth:
355 self._pth[pth_file] = UninstallPthEntries(pth_file)
356 self._pth[pth_file].add(entry)
357 else:
358 self._refuse.add(pth_file)
360 def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
361 """Remove paths in ``self._paths`` with confirmation (unless
362 ``auto_confirm`` is True)."""
364 if not self._paths:
365 logger.info(
366 "Can't uninstall '%s'. No files were found to uninstall.",
367 self._dist.raw_name,
368 )
369 return
371 dist_name_version = f"{self._dist.raw_name}-{self._dist.version}"
372 logger.info("Uninstalling %s:", dist_name_version)
374 with indent_log():
375 if auto_confirm or self._allowed_to_proceed(verbose):
376 moved = self._moved_paths
378 for_rename = compress_for_rename(self._paths)
380 for path in sorted(compact(for_rename)):
381 moved.stash(path)
382 logger.verbose("Removing file or directory %s", path)
384 for pth in self._pth.values():
385 pth.remove()
387 logger.info("Successfully uninstalled %s", dist_name_version)
389 def _allowed_to_proceed(self, verbose: bool) -> bool:
390 """Display which files would be deleted and prompt for confirmation"""
392 def _display(msg: str, paths: Iterable[str]) -> None:
393 if not paths:
394 return
396 logger.info(msg)
397 with indent_log():
398 for path in sorted(compact(paths)):
399 logger.info(path)
401 if not verbose:
402 will_remove, will_skip = compress_for_output_listing(self._paths)
403 else:
404 # In verbose mode, display all the files that are going to be
405 # deleted.
406 will_remove = set(self._paths)
407 will_skip = set()
409 _display("Would remove:", will_remove)
410 _display("Would not remove (might be manually added):", will_skip)
411 _display("Would not remove (outside of prefix):", self._refuse)
412 if verbose:
413 _display("Will actually move:", compress_for_rename(self._paths))
415 return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n"
417 def rollback(self) -> None:
418 """Rollback the changes previously made by remove()."""
419 if not self._moved_paths.can_rollback:
420 logger.error(
421 "Can't roll back %s; was not uninstalled",
422 self._dist.raw_name,
423 )
424 return
425 logger.info("Rolling back uninstall of %s", self._dist.raw_name)
426 self._moved_paths.rollback()
427 for pth in self._pth.values():
428 pth.rollback()
430 def commit(self) -> None:
431 """Remove temporary save dir: rollback will no longer be possible."""
432 self._moved_paths.commit()
434 @classmethod
435 def from_dist(cls, dist: BaseDistribution) -> "UninstallPathSet":
436 dist_location = dist.location
437 info_location = dist.info_location
438 if dist_location is None:
439 logger.info(
440 "Not uninstalling %s since it is not installed",
441 dist.canonical_name,
442 )
443 return cls(dist)
445 normalized_dist_location = normalize_path(dist_location)
446 if not dist.local:
447 logger.info(
448 "Not uninstalling %s at %s, outside environment %s",
449 dist.canonical_name,
450 normalized_dist_location,
451 sys.prefix,
452 )
453 return cls(dist)
455 if normalized_dist_location in {
456 p
457 for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
458 if p
459 }:
460 logger.info(
461 "Not uninstalling %s at %s, as it is in the standard library.",
462 dist.canonical_name,
463 normalized_dist_location,
464 )
465 return cls(dist)
467 paths_to_remove = cls(dist)
468 develop_egg_link = egg_link_path_from_location(dist.raw_name)
470 # Distribution is installed with metadata in a "flat" .egg-info
471 # directory. This means it is not a modern .dist-info installation, an
472 # egg, or legacy editable.
473 setuptools_flat_installation = (
474 dist.installed_with_setuptools_egg_info
475 and info_location is not None
476 and os.path.exists(info_location)
477 # If dist is editable and the location points to a ``.egg-info``,
478 # we are in fact in the legacy editable case.
479 and not info_location.endswith(f"{dist.setuptools_filename}.egg-info")
480 )
482 # Uninstall cases order do matter as in the case of 2 installs of the
483 # same package, pip needs to uninstall the currently detected version
484 if setuptools_flat_installation:
485 if info_location is not None:
486 paths_to_remove.add(info_location)
487 installed_files = dist.iter_declared_entries()
488 if installed_files is not None:
489 for installed_file in installed_files:
490 paths_to_remove.add(os.path.join(dist_location, installed_file))
491 # FIXME: need a test for this elif block
492 # occurs with --single-version-externally-managed/--record outside
493 # of pip
494 elif dist.is_file("top_level.txt"):
495 try:
496 namespace_packages = dist.read_text("namespace_packages.txt")
497 except FileNotFoundError:
498 namespaces = []
499 else:
500 namespaces = namespace_packages.splitlines(keepends=False)
501 for top_level_pkg in [
502 p
503 for p in dist.read_text("top_level.txt").splitlines()
504 if p and p not in namespaces
505 ]:
506 path = os.path.join(dist_location, top_level_pkg)
507 paths_to_remove.add(path)
508 paths_to_remove.add(f"{path}.py")
509 paths_to_remove.add(f"{path}.pyc")
510 paths_to_remove.add(f"{path}.pyo")
512 elif dist.installed_by_distutils:
513 raise UninstallationError(
514 "Cannot uninstall {!r}. It is a distutils installed project "
515 "and thus we cannot accurately determine which files belong "
516 "to it which would lead to only a partial uninstall.".format(
517 dist.raw_name,
518 )
519 )
521 elif dist.installed_as_egg:
522 # package installed by easy_install
523 # We cannot match on dist.egg_name because it can slightly vary
524 # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
525 paths_to_remove.add(dist_location)
526 easy_install_egg = os.path.split(dist_location)[1]
527 easy_install_pth = os.path.join(
528 os.path.dirname(dist_location),
529 "easy-install.pth",
530 )
531 paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)
533 elif dist.installed_with_dist_info:
534 for path in uninstallation_paths(dist):
535 paths_to_remove.add(path)
537 elif develop_egg_link:
538 # PEP 660 modern editable is handled in the ``.dist-info`` case
539 # above, so this only covers the setuptools-style editable.
540 with open(develop_egg_link) as fh:
541 link_pointer = os.path.normcase(fh.readline().strip())
542 normalized_link_pointer = paths_to_remove._normalize_path_cached(
543 link_pointer
544 )
545 assert os.path.samefile(
546 normalized_link_pointer, normalized_dist_location
547 ), (
548 f"Egg-link {develop_egg_link} (to {link_pointer}) does not match "
549 f"installed location of {dist.raw_name} (at {dist_location})"
550 )
551 paths_to_remove.add(develop_egg_link)
552 easy_install_pth = os.path.join(
553 os.path.dirname(develop_egg_link), "easy-install.pth"
554 )
555 paths_to_remove.add_pth(easy_install_pth, dist_location)
557 else:
558 logger.debug(
559 "Not sure how to uninstall: %s - Check: %s",
560 dist,
561 dist_location,
562 )
564 if dist.in_usersite:
565 bin_dir = get_bin_user()
566 else:
567 bin_dir = get_bin_prefix()
569 # find distutils scripts= scripts
570 try:
571 for script in dist.iter_distutils_script_names():
572 paths_to_remove.add(os.path.join(bin_dir, script))
573 if WINDOWS:
574 paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat"))
575 except (FileNotFoundError, NotADirectoryError):
576 pass
578 # find console_scripts and gui_scripts
579 def iter_scripts_to_remove(
580 dist: BaseDistribution,
581 bin_dir: str,
582 ) -> Generator[str, None, None]:
583 for entry_point in dist.iter_entry_points():
584 if entry_point.group == "console_scripts":
585 yield from _script_names(bin_dir, entry_point.name, False)
586 elif entry_point.group == "gui_scripts":
587 yield from _script_names(bin_dir, entry_point.name, True)
589 for s in iter_scripts_to_remove(dist, bin_dir):
590 paths_to_remove.add(s)
592 return paths_to_remove
595class UninstallPthEntries:
596 def __init__(self, pth_file: str) -> None:
597 self.file = pth_file
598 self.entries: Set[str] = set()
599 self._saved_lines: Optional[List[bytes]] = None
601 def add(self, entry: str) -> None:
602 entry = os.path.normcase(entry)
603 # On Windows, os.path.normcase converts the entry to use
604 # backslashes. This is correct for entries that describe absolute
605 # paths outside of site-packages, but all the others use forward
606 # slashes.
607 # os.path.splitdrive is used instead of os.path.isabs because isabs
608 # treats non-absolute paths with drive letter markings like c:foo\bar
609 # as absolute paths. It also does not recognize UNC paths if they don't
610 # have more than "\\sever\share". Valid examples: "\\server\share\" or
611 # "\\server\share\folder".
612 if WINDOWS and not os.path.splitdrive(entry)[0]:
613 entry = entry.replace("\\", "/")
614 self.entries.add(entry)
616 def remove(self) -> None:
617 logger.verbose("Removing pth entries from %s:", self.file)
619 # If the file doesn't exist, log a warning and return
620 if not os.path.isfile(self.file):
621 logger.warning("Cannot remove entries from nonexistent file %s", self.file)
622 return
623 with open(self.file, "rb") as fh:
624 # windows uses '\r\n' with py3k, but uses '\n' with py2.x
625 lines = fh.readlines()
626 self._saved_lines = lines
627 if any(b"\r\n" in line for line in lines):
628 endline = "\r\n"
629 else:
630 endline = "\n"
631 # handle missing trailing newline
632 if lines and not lines[-1].endswith(endline.encode("utf-8")):
633 lines[-1] = lines[-1] + endline.encode("utf-8")
634 for entry in self.entries:
635 try:
636 logger.verbose("Removing entry: %s", entry)
637 lines.remove((entry + endline).encode("utf-8"))
638 except ValueError:
639 pass
640 with open(self.file, "wb") as fh:
641 fh.writelines(lines)
643 def rollback(self) -> bool:
644 if self._saved_lines is None:
645 logger.error("Cannot roll back changes to %s, none were made", self.file)
646 return False
647 logger.debug("Rolling %s back to previous state", self.file)
648 with open(self.file, "wb") as fh:
649 fh.writelines(self._saved_lines)
650 return True