1from __future__ import annotations
2
3import functools
4import os
5import sys
6import sysconfig
7from collections.abc import Generator, Iterable
8from importlib.util import cache_from_source
9from typing import Any, Callable
10
11from pip._internal.exceptions import LegacyDistutilsInstall, UninstallMissingRecord
12from pip._internal.locations import get_bin_prefix, get_bin_user
13from pip._internal.metadata import BaseDistribution
14from pip._internal.utils.compat import WINDOWS
15from pip._internal.utils.egg_link import egg_link_path_from_location
16from pip._internal.utils.logging import getLogger, indent_log
17from pip._internal.utils.misc import ask, normalize_path, renames, rmtree
18from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
19from pip._internal.utils.virtualenv import running_under_virtualenv
20
21logger = getLogger(__name__)
22
23
def _script_names(
    bin_dir: str, script_name: str, is_gui: bool
) -> Generator[str, None, None]:
    """Yield the paths of the files a {console,gui}_scripts entry may create.

    The plain ``bin_dir/script_name`` path is always yielded. When ``WINDOWS``
    is true, the ``.exe`` launcher, its ``.exe.manifest`` and the
    ``-script.py`` companion (``-script.pyw`` when ``is_gui`` is true) are
    yielded after it, in that order.
    """
    launcher = os.path.join(bin_dir, script_name)
    yield launcher
    if WINDOWS:
        script_suffix = "-script.pyw" if is_gui else "-script.py"
        yield from (
            f"{launcher}.exe",
            f"{launcher}.exe.manifest",
            f"{launcher}{script_suffix}",
        )
41
42
def _unique(
    fn: Callable[..., Generator[Any, None, None]],
) -> Callable[..., Generator[Any, None, None]]:
    """Decorate generator function ``fn`` so each distinct item is yielded once.

    Items are produced in first-seen order. Already-produced items are
    remembered in a set, so the items must be hashable. Every call of the
    wrapped function starts with a fresh, empty memory.
    """

    @functools.wraps(fn)
    def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]:
        emitted: set[Any] = set()
        for candidate in fn(*args, **kw):
            if candidate in emitted:
                continue
            emitted.add(candidate)
            yield candidate

    return unique
55
56
@_unique
def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]:
    """
    Yield the paths to uninstall for ``dist``, based on its declared entries.

    Every entry from ``dist.iter_declared_entries()`` is joined onto
    ``dist.location`` and yielded. For an entry ending in ``.py``, the
    ``.pyc`` and ``.pyo`` paths in the same directory are yielded right
    after it. The ``_unique`` decorator drops paths that repeat.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].

    Raises UninstallMissingRecord when ``iter_declared_entries()`` returns
    None, i.e. the distribution declares no entries at all.

    https://packaging.python.org/specifications/recording-installed-packages/
    """
    location = dist.location
    assert location is not None, "not installed"

    declared = dist.iter_declared_entries()
    if declared is None:
        raise UninstallMissingRecord(distribution=dist)

    for entry in declared:
        installed = os.path.join(location, entry)
        yield installed
        if not installed.endswith(".py"):
            continue
        # Legacy byte-code files that sit next to the source file.
        directory, filename = os.path.split(installed)
        stem = filename[:-3]
        for suffix in (".pyc", ".pyo"):
            yield os.path.join(directory, stem + suffix)
89
90
def compact(paths: Iterable[str]) -> set[str]:
    """Compact a path set to contain the minimal number of paths
    necessary to contain all paths in the set. If /a/path/ and
    /a/path/to/a/file.txt are both in the set, leave only the
    shorter path.

    A trailing ``*`` on an already-kept path is ignored when that path is
    used as a prefix, so ``/a/path/*`` also covers ``/a/path/file.txt``.

    ``paths`` may be any iterable; repeated entries are tolerated and
    collapse into a single result.
    """
    sep = os.path.sep
    short_paths: set[str] = set()
    # Shortest first, so a containing directory is always kept before
    # anything that lives underneath it is examined.
    for path in sorted(paths, key=len):
        if path in short_paths:
            # Exact duplicate (possible when ``paths`` is not a set). Without
            # this guard the separator lookup below would index one past the
            # end of ``path`` and raise IndexError.
            continue
        covered = False
        for shortpath in short_paths:
            prefix = shortpath.rstrip("*")
            # The character right after the prefix (its trailing separator
            # ignored) must be a separator: "/a/b" covers "/a/b/c" but
            # not "/a/bc".
            if path.startswith(prefix) and path[len(prefix.rstrip(sep))] == sep:
                covered = True
                break
        if not covered:
            short_paths.add(path)
    return short_paths
108
109
def compress_for_rename(paths: Iterable[str]) -> set[str]:
    """Returns a set containing the paths that need to be renamed.

    This set may include directories when the original sequence of paths
    included every file on disk.

    Each parent directory of a given path is walked on disk, shortest
    directory first. When every file found beneath a directory is among the
    given paths, those files are replaced in the result by a single entry
    for the directory, written as the directory path plus a trailing
    ``os.sep``. Paths that could not be folded into a directory are returned
    as they were passed in.
    """
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
    wildcards: set[str] = set()
    # Case-normalised copies of the wildcard prefixes, so the "already
    # handled" test compares like with like on case-insensitive platforms.
    handled_prefixes: list[str] = []

    for root in unchecked:
        if not root:
            # A bare file name has no directory part. os.walk("") finds
            # nothing, which would otherwise look like "everything is being
            # removed" and yield the filesystem root as a wildcard.
            continue

        normcased_root = os.path.normcase(root)
        if any(normcased_root.startswith(prefix) for prefix in handled_prefixes):
            # This directory has already been handled.
            continue

        all_files: set[str] = set()
        for dirname, _subdirs, files in os.walk(root):
            # ``dirname`` already starts with ``root``; joining ``root`` in
            # front again would double it for relative roots.
            all_files.update(
                os.path.normcase(os.path.join(dirname, f)) for f in files
            )
        # If all the files we found are in our remaining set of files to
        # remove, then remove them from the latter set and add a wildcard
        # for the directory.
        if all_files <= remaining:
            remaining.difference_update(all_files)
            wildcard = root + os.sep
            wildcards.add(wildcard)
            handled_prefixes.append(os.path.normcase(wildcard))

    return set(map(case_map.__getitem__, remaining)) | wildcards
142
143
def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str]]:
    """Work out which paths to show the user before an uninstall.

    Returns a ``(will_remove, will_skip)`` tuple of sets.

    ``will_remove`` holds every given path that does not end in ``.pyc``,
    plus one ``<folder>/*`` entry per package folder - to signify that all
    its contents are removed. A folder counts as a package folder when it
    directly contains a given path ending in ``__init__.py``, or a given
    path with ``.dist-info`` in it; nested folders are collapsed with
    ``compact()``.

    ``will_skip`` holds the non-``.pyc`` files found on disk inside those
    folders that are not among the given paths.
    """
    requested = set(paths)

    # Split the request into the files to list and the folders to summarise.
    package_dirs: set[str] = set()
    listed: set[str] = set()
    for candidate in requested:
        if candidate.endswith(".pyc"):
            continue
        if candidate.endswith("__init__.py") or ".dist-info" in candidate:
            package_dirs.add(os.path.dirname(candidate))
        listed.add(candidate)

    listed_normcased = {os.path.normcase(item) for item in listed}
    package_dirs = compact(package_dirs)

    # Walk the folders on disk so that files which were added after
    # installation are noticed and reported as skipped.
    leftovers: set[str] = set()
    for package_dir in package_dirs:
        for dirpath, _, filenames in os.walk(package_dir):
            for filename in filenames:
                if filename.endswith(".pyc"):
                    continue
                on_disk = os.path.join(dirpath, filename)
                if not os.path.isfile(on_disk):
                    continue
                if os.path.normcase(on_disk) in listed_normcased:
                    continue
                # Present on disk but not part of the uninstall.
                leftovers.add(on_disk)

    wildcard_entries = {os.path.join(package_dir, "*") for package_dir in package_dirs}
    return listed | wildcard_entries, leftovers
191
192
class StashedUninstallPathSet:
    """Records the renames that move files out of the way during a
    tentative uninstall, so they can be made permanent or undone."""

    def __init__(self) -> None:
        # (original path, stash path) for every move performed, kept so
        # that the move can be reversed.
        self._moves: list[tuple[str, str]] = []
        # Normcased source root -> the [Adjacent]TempDirectory holding
        # whatever was stashed from beneath that root.
        self._save_dirs: dict[str, TempDirectory] = {}

    def _get_directory_stash(self, path: str) -> str:
        """Pick the stash location for the directory ``path``.

        An AdjacentTempDirectory for ``path`` is tried first; if creating it
        raises OSError, a plain TempDirectory of kind "uninstall" is used
        instead. The chosen directory is registered under the normcased
        ``path`` and its filesystem path is returned."""
        try:
            stash_dir: TempDirectory = AdjacentTempDirectory(path)
        except OSError:
            stash_dir = TempDirectory(kind="uninstall")

        self._save_dirs[os.path.normcase(path)] = stash_dir
        return stash_dir.path

    def _get_file_stash(self, path: str) -> str:
        """Pick the stash location for the file ``path``.

        The nearest registered ancestor directory supplies the stash root.
        If no ancestor is registered, a new TempDirectory is created and
        registered for the file's own directory. The file keeps its position
        relative to that root inside the stash."""
        path = os.path.normcase(path)
        parent = os.path.dirname(path)

        # Climb from the file's directory towards the top of the path until
        # a registered root turns up or dirname() stops changing.
        root = parent
        stash_dir = self._save_dirs.get(root)
        while stash_dir is None:
            above = os.path.dirname(root)
            if above == root:
                break
            root = above
            stash_dir = self._save_dirs.get(root)

        if stash_dir is None:
            # Did not find any suitable root
            root = parent
            stash_dir = TempDirectory(kind="uninstall")
            self._save_dirs[root] = stash_dir

        relative = os.path.relpath(path, root)
        if not relative or relative == os.path.curdir:
            return stash_dir.path
        return os.path.join(stash_dir.path, relative)

    def stash(self, path: str) -> str:
        """Move ``path`` into a stash location and return that location.

        A symlink is stashed as a file, even when it points at a directory,
        so that the link target is never modified.
        """
        is_real_dir = os.path.isdir(path) and not os.path.islink(path)
        pick_stash = self._get_directory_stash if is_real_dir else self._get_file_stash
        destination = pick_stash(path)

        # Recorded before the rename itself is attempted.
        self._moves.append((path, destination))
        if is_real_dir and os.path.isdir(destination):
            # The destination directory was just created for us and is
            # still empty. Remove it first, otherwise the source would be
            # moved *inside* it instead of taking its place.
            os.rmdir(destination)
        renames(path, destination)
        return destination

    def commit(self) -> None:
        """Make the uninstall permanent: clean up every stash directory and
        forget the recorded moves."""
        for stash_dir in self._save_dirs.values():
            stash_dir.cleanup()
        self._moves = []
        self._save_dirs = {}

    def rollback(self) -> None:
        """Undo the uninstall by moving every stashed path back.

        A failure to restore one path is logged and does not stop the
        remaining paths from being restored. The stash is committed
        (cleaned up) at the end in every case."""
        for original, stashed in self._moves:
            logger.info("Moving to %s\n from %s", original, stashed)

        for original, stashed in self._moves:
            try:
                logger.debug("Replacing %s from %s", original, stashed)
                # Clear whatever now occupies the original location.
                if os.path.isfile(original) or os.path.islink(original):
                    os.unlink(original)
                elif os.path.isdir(original):
                    rmtree(original)
                renames(stashed, original)
            except OSError as ex:
                logger.error("Failed to restore %s", original)
                logger.debug("Exception: %s", ex)

        self.commit()

    @property
    def can_rollback(self) -> bool:
        """True while at least one move is recorded."""
        return len(self._moves) > 0
296
297
class UninstallPathSet:
    """A set of file paths to be removed in the uninstallation of a
    requirement.

    Paths are collected with add() / add_pth() (normally through
    from_dist()). remove() stashes them, after which rollback() restores
    them or commit() discards the stash for good.
    """

    def __init__(self, dist: BaseDistribution) -> None:
        # Existing paths that will be removed.
        self._paths: set[str] = set()
        # Existing paths that were found but may not be modified.
        self._refuse: set[str] = set()
        # Normalized .pth file path -> the entries to strip from that file.
        self._pth: dict[str, UninstallPthEntries] = {}
        self._dist = dist
        self._moved_paths = StashedUninstallPathSet()
        # Create local cache of normalize_path results. Creating an UninstallPathSet
        # can result in hundreds/thousands of redundant calls to normalize_path with
        # the same args, which hurts performance.
        self._normalize_path_cached = functools.lru_cache(normalize_path)

    def _permitted(self, path: str) -> bool:
        """
        Return True if the given path is one we are permitted to
        remove/modify, False otherwise.

        Outside a virtualenv every path is permitted. Inside one, the path
        must be the normalized ``sys.prefix`` itself or lie beneath it.
        """
        # aka is_local, but caching normalized sys.prefix
        if not running_under_virtualenv():
            return True
        prefix = self._normalize_path_cached(sys.prefix)
        if path == prefix:
            return True
        # Require a separator right after the prefix: a plain startswith()
        # would also accept a sibling such as "<prefix>-old/...", which is
        # outside the environment. rstrip() keeps a root prefix ("/") valid.
        return path.startswith(prefix.rstrip(os.sep) + os.sep)

    def add(self, path: str) -> None:
        """Register ``path`` for removal if it exists.

        A path that exists but is not permitted is recorded as refused
        instead. For a ``.py`` file, the matching ``__pycache__`` byte-code
        file is registered the same way.
        """
        head, tail = os.path.split(path)

        # we normalize the head to resolve parent directory symlinks, but not
        # the tail, since we only want to uninstall symlinks, not their targets
        path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail))

        if not os.path.exists(path):
            return
        if self._permitted(path):
            self._paths.add(path)
        else:
            self._refuse.add(path)

        # __pycache__ files can show up after 'installed-files.txt' is created,
        # due to imports
        if os.path.splitext(path)[1] == ".py":
            self.add(cache_from_source(path))

    def add_pth(self, pth_file: str, entry: str) -> None:
        """Register ``entry`` for removal from the ``.pth`` file ``pth_file``.

        If the normalized ``pth_file`` is not permitted, it is recorded as
        refused and the entry is dropped.
        """
        pth_file = self._normalize_path_cached(pth_file)
        if self._permitted(pth_file):
            if pth_file not in self._pth:
                self._pth[pth_file] = UninstallPthEntries(pth_file)
            self._pth[pth_file].add(entry)
        else:
            self._refuse.add(pth_file)

    def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
        """Remove paths in ``self._paths`` with confirmation (unless
        ``auto_confirm`` is True).

        The paths are stashed rather than deleted, and the registered
        ``.pth`` entries are stripped. Nothing is done when no paths were
        collected or when the user declines.
        """

        if not self._paths:
            logger.info(
                "Can't uninstall '%s'. No files were found to uninstall.",
                self._dist.raw_name,
            )
            return

        dist_name_version = f"{self._dist.raw_name}-{self._dist.raw_version}"
        logger.info("Uninstalling %s:", dist_name_version)

        with indent_log():
            if auto_confirm or self._allowed_to_proceed(verbose):
                moved = self._moved_paths

                for_rename = compress_for_rename(self._paths)

                for path in sorted(compact(for_rename)):
                    moved.stash(path)
                    logger.verbose("Removing file or directory %s", path)

                for pth in self._pth.values():
                    pth.remove()

                logger.info("Successfully uninstalled %s", dist_name_version)

    def _allowed_to_proceed(self, verbose: bool) -> bool:
        """Display which files would be deleted and prompt for confirmation.

        Returns False only when the answer to the prompt is "n".
        """

        def _display(msg: str, paths: Iterable[str]) -> None:
            # Log ``msg`` followed by the compacted, sorted paths; stay
            # silent when there is nothing to list.
            if not paths:
                return

            logger.info(msg)
            with indent_log():
                for path in sorted(compact(paths)):
                    logger.info(path)

        if not verbose:
            will_remove, will_skip = compress_for_output_listing(self._paths)
        else:
            # In verbose mode, display all the files that are going to be
            # deleted.
            will_remove = set(self._paths)
            will_skip = set()

        _display("Would remove:", will_remove)
        _display("Would not remove (might be manually added):", will_skip)
        _display("Would not remove (outside of prefix):", self._refuse)
        if verbose:
            _display("Will actually move:", compress_for_rename(self._paths))

        return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n"

    def rollback(self) -> None:
        """Rollback the changes previously made by remove()."""
        if not self._moved_paths.can_rollback:
            logger.error(
                "Can't roll back %s; was not uninstalled",
                self._dist.raw_name,
            )
            return
        logger.info("Rolling back uninstall of %s", self._dist.raw_name)
        self._moved_paths.rollback()
        for pth in self._pth.values():
            pth.rollback()

    def commit(self) -> None:
        """Remove temporary save dir: rollback will no longer be possible."""
        self._moved_paths.commit()

    @classmethod
    def from_dist(cls, dist: BaseDistribution) -> UninstallPathSet:
        """Build the set of paths to remove for ``dist``.

        An empty set is returned when the distribution has no location, is
        not local to this environment, or is located in the standard
        library. Otherwise the paths are collected according to how the
        distribution was installed, followed by its scripts.

        Raises LegacyDistutilsInstall for a distutils-installed
        distribution.
        """
        dist_location = dist.location
        info_location = dist.info_location
        if dist_location is None:
            logger.info(
                "Not uninstalling %s since it is not installed",
                dist.canonical_name,
            )
            return cls(dist)

        normalized_dist_location = normalize_path(dist_location)
        if not dist.local:
            logger.info(
                "Not uninstalling %s at %s, outside environment %s",
                dist.canonical_name,
                normalized_dist_location,
                sys.prefix,
            )
            return cls(dist)

        if normalized_dist_location in {
            p
            for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
            if p
        }:
            logger.info(
                "Not uninstalling %s at %s, as it is in the standard library.",
                dist.canonical_name,
                normalized_dist_location,
            )
            return cls(dist)

        paths_to_remove = cls(dist)
        develop_egg_link = egg_link_path_from_location(dist.raw_name)

        # Distribution is installed with metadata in a "flat" .egg-info
        # directory. This means it is not a modern .dist-info installation, an
        # egg, or legacy editable.
        setuptools_flat_installation = (
            dist.installed_with_setuptools_egg_info
            and info_location is not None
            and os.path.exists(info_location)
            # If dist is editable and the location points to a ``.egg-info``,
            # we are in fact in the legacy editable case.
            and not info_location.endswith(f"{dist.setuptools_filename}.egg-info")
        )

        # The order of the uninstall cases matters: with 2 installs of the
        # same package, pip needs to uninstall the currently detected version
        if setuptools_flat_installation:
            if info_location is not None:
                paths_to_remove.add(info_location)
            installed_files = dist.iter_declared_entries()
            if installed_files is not None:
                for installed_file in installed_files:
                    paths_to_remove.add(os.path.join(dist_location, installed_file))
            # FIXME: need a test for this elif block
            # occurs with --single-version-externally-managed/--record outside
            # of pip
            elif dist.is_file("top_level.txt"):
                try:
                    namespace_packages = dist.read_text("namespace_packages.txt")
                except FileNotFoundError:
                    namespaces = []
                else:
                    namespaces = namespace_packages.splitlines(keepends=False)
                for top_level_pkg in [
                    p
                    for p in dist.read_text("top_level.txt").splitlines()
                    if p and p not in namespaces
                ]:
                    path = os.path.join(dist_location, top_level_pkg)
                    paths_to_remove.add(path)
                    paths_to_remove.add(f"{path}.py")
                    paths_to_remove.add(f"{path}.pyc")
                    paths_to_remove.add(f"{path}.pyo")

        elif dist.installed_by_distutils:
            raise LegacyDistutilsInstall(distribution=dist)

        elif dist.installed_as_egg:
            # package installed by easy_install
            # We cannot match on dist.egg_name because it can slightly vary
            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
            # XXX We use normalized_dist_location because dist_location may
            # contain a trailing / if the distribution is a zipped egg
            # (which is not a directory).
            paths_to_remove.add(normalized_dist_location)
            easy_install_egg = os.path.split(normalized_dist_location)[1]
            easy_install_pth = os.path.join(
                os.path.dirname(normalized_dist_location),
                "easy-install.pth",
            )
            paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)

        elif dist.installed_with_dist_info:
            for path in uninstallation_paths(dist):
                paths_to_remove.add(path)

        elif develop_egg_link:
            # PEP 660 modern editable is handled in the ``.dist-info`` case
            # above, so this only covers the setuptools-style editable.
            with open(develop_egg_link) as fh:
                link_pointer = os.path.normcase(fh.readline().strip())
                normalized_link_pointer = paths_to_remove._normalize_path_cached(
                    link_pointer
                )
            assert os.path.samefile(
                normalized_link_pointer, normalized_dist_location
            ), (
                f"Egg-link {develop_egg_link} (to {link_pointer}) does not match "
                f"installed location of {dist.raw_name} (at {dist_location})"
            )
            paths_to_remove.add(develop_egg_link)
            easy_install_pth = os.path.join(
                os.path.dirname(develop_egg_link), "easy-install.pth"
            )
            paths_to_remove.add_pth(easy_install_pth, dist_location)

        else:
            logger.debug(
                "Not sure how to uninstall: %s - Check: %s",
                dist,
                dist_location,
            )

        if dist.in_usersite:
            bin_dir = get_bin_user()
        else:
            bin_dir = get_bin_prefix()

        # find distutils scripts= scripts
        try:
            for script in dist.iter_distutils_script_names():
                paths_to_remove.add(os.path.join(bin_dir, script))
                if WINDOWS:
                    paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat"))
        except (FileNotFoundError, NotADirectoryError):
            # Deliberate best effort: the distribution ships no such scripts.
            pass

        # find console_scripts and gui_scripts
        def iter_scripts_to_remove(
            dist: BaseDistribution,
            bin_dir: str,
        ) -> Generator[str, None, None]:
            for entry_point in dist.iter_entry_points():
                if entry_point.group == "console_scripts":
                    yield from _script_names(bin_dir, entry_point.name, False)
                elif entry_point.group == "gui_scripts":
                    yield from _script_names(bin_dir, entry_point.name, True)

        for s in iter_scripts_to_remove(dist, bin_dir):
            paths_to_remove.add(s)

        return paths_to_remove
582
583
class UninstallPthEntries:
    """Collects entries to strip from one ``.pth`` file and can undo the edit."""

    def __init__(self, pth_file: str) -> None:
        self.file = pth_file
        self.entries: set[str] = set()
        # Copy of the file's lines exactly as remove() read them, before any
        # modification; None until remove() has read the file.
        self._saved_lines: list[bytes] | None = None

    def add(self, entry: str) -> None:
        """Register ``entry`` (normcased) for removal from the file."""
        entry = os.path.normcase(entry)
        # On Windows, os.path.normcase converts the entry to use
        # backslashes. This is correct for entries that describe absolute
        # paths outside of site-packages, but all the others use forward
        # slashes.
        # os.path.splitdrive is used instead of os.path.isabs because isabs
        # treats non-absolute paths with drive letter markings like c:foo\bar
        # as absolute paths. It also does not recognize UNC paths if they don't
        # have more than "\\server\share". Valid examples: "\\server\share\" or
        # "\\server\share\folder".
        if WINDOWS and not os.path.splitdrive(entry)[0]:
            entry = entry.replace("\\", "/")
        self.entries.add(entry)

    def remove(self) -> None:
        """Rewrite the file without the registered entries.

        The original lines are saved first so that rollback() can restore
        them. A missing file is logged as a warning and left alone; an
        entry that is not present in the file is ignored.
        """
        logger.verbose("Removing pth entries from %s:", self.file)

        # If the file doesn't exist, log a warning and return
        if not os.path.isfile(self.file):
            logger.warning("Cannot remove entries from nonexistent file %s", self.file)
            return
        with open(self.file, "rb") as fh:
            lines = fh.readlines()
        # Keep an independent copy: ``lines`` is edited in place below, and
        # rollback() must write the file back exactly as it was read.
        self._saved_lines = list(lines)
        # Follow the file's own line-ending convention.
        endline = b"\r\n" if any(b"\r\n" in line for line in lines) else b"\n"
        # handle missing trailing newline
        if lines and not lines[-1].endswith(endline):
            lines[-1] += endline
        for entry in self.entries:
            logger.verbose("Removing entry: %s", entry)
            try:
                lines.remove(entry.encode("utf-8") + endline)
            except ValueError:
                # The entry is not in the file; nothing to strip.
                pass
        with open(self.file, "wb") as fh:
            fh.writelines(lines)

    def rollback(self) -> bool:
        """Write the saved original lines back to the file.

        Returns False (after logging an error) when remove() never read the
        file, True once the original content has been written.
        """
        if self._saved_lines is None:
            logger.error("Cannot roll back changes to %s, none were made", self.file)
            return False
        logger.debug("Rolling %s back to previous state", self.file)
        with open(self.file, "wb") as fh:
            fh.writelines(self._saved_lines)
        return True