1from __future__ import annotations
2
3import functools
4import os
5import sys
6import sysconfig
7from collections.abc import Generator, Iterable
8from typing import Any, Callable
9
10from pip._internal.exceptions import LegacyDistutilsInstall, UninstallMissingRecord
11from pip._internal.locations import get_bin_prefix, get_bin_user
12from pip._internal.metadata import BaseDistribution
13from pip._internal.utils.compat import WINDOWS
14from pip._internal.utils.egg_link import egg_link_path_from_location
15from pip._internal.utils.logging import getLogger, indent_log
16from pip._internal.utils.misc import ask, normalize_path, renames, rmtree
17from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
18from pip._internal.utils.virtualenv import running_under_virtualenv
19
20logger = getLogger(__name__)
21
22
23def _script_names(
24 bin_dir: str, script_name: str, is_gui: bool
25) -> Generator[str, None, None]:
26 """Create the fully qualified name of the files created by
27 {console,gui}_scripts for the given ``dist``.
28 Returns the list of file names
29 """
30 exe_name = os.path.join(bin_dir, script_name)
31 yield exe_name
32 if not WINDOWS:
33 return
34 yield f"{exe_name}.exe"
35 yield f"{exe_name}.exe.manifest"
36 if is_gui:
37 yield f"{exe_name}-script.pyw"
38 else:
39 yield f"{exe_name}-script.py"
40
41
42def _unique(
43 fn: Callable[..., Generator[Any, None, None]],
44) -> Callable[..., Generator[Any, None, None]]:
45 @functools.wraps(fn)
46 def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]:
47 seen: set[Any] = set()
48 for item in fn(*args, **kw):
49 if item not in seen:
50 seen.add(item)
51 yield item
52
53 return unique
54
55
@_unique
def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]:
    """
    Yield the paths to uninstall for ``dist``, derived from its declared entries.

    Each declared entry is joined onto ``dist.location`` and yielded. For every
    entry ending in ``.py``, the sibling ``.pyc`` and ``.pyo`` paths in the same
    directory are yielded as well. Duplicates are removed by the ``_unique``
    decorator.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].

    Raises ``UninstallMissingRecord`` when ``dist.iter_declared_entries()``
    returns ``None``.

    https://packaging.python.org/specifications/recording-installed-packages/
    """
    location = dist.location
    assert location is not None, "not installed"

    entries = dist.iter_declared_entries()
    if entries is None:
        raise UninstallMissingRecord(distribution=dist)

    for entry in entries:
        recorded = os.path.join(location, entry)
        yield recorded
        if not recorded.endswith(".py"):
            continue
        # Also cover legacy bytecode files that sit next to the source file.
        directory, filename = os.path.split(recorded)
        stem = filename[:-3]
        for suffix in (".pyc", ".pyo"):
            yield os.path.join(directory, stem + suffix)
88
89
def compact(paths: Iterable[str]) -> set[str]:
    """Compact a path set to contain the minimal number of paths
    necessary to contain all paths in the set. If /a/path/ and
    /a/path/to/a/file.txt are both in the set, leave only the
    shorter path.

    A trailing ``*`` on an already-kept path is ignored when deciding whether
    it covers a longer path. ``paths`` may be any iterable, including one with
    repeated entries; repeats are collapsed.
    """

    sep = os.path.sep
    short_paths: set[str] = set()
    # De-duplicate up front. Without this, a repeated path would be compared
    # against its own earlier copy, and the separator lookup below would index
    # one position past the end of the string (IndexError).
    for path in sorted(set(paths), key=len):
        for shortpath in short_paths:
            prefix = shortpath.rstrip("*")
            # ``path`` is covered only if the match ends on a directory
            # boundary, so "/a/b" does not swallow "/a/bc".
            if path.startswith(prefix) and path[len(prefix.rstrip(sep))] == sep:
                break
        else:
            short_paths.add(path)
    return short_paths
107
108
def compress_for_rename(paths: Iterable[str]) -> set[str]:
    """Returns a set containing the paths that need to be renamed.

    Every directory that directly contains one of ``paths`` is walked on disk.
    When the walk finds at least one file and every file found is itself in
    ``paths``, those files are replaced in the result by a single entry for
    the directory, written with a trailing ``os.sep``. Otherwise the
    individual paths are kept.

    Paths are compared after ``os.path.normcase``; file paths in the result
    keep the spelling they were given in.
    """
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
    wildcards: set[str] = set()

    for root in unchecked:
        norm_root = os.path.normcase(root)
        # Compare like with like: wildcards keep their original spelling, so
        # normalize them too before testing the prefix.
        if any(norm_root.startswith(os.path.normcase(w)) for w in wildcards):
            # This directory has already been handled.
            continue

        # os.walk() yields directory paths that already start with ``root``,
        # so joining ``root`` again would corrupt relative paths.
        all_files = {
            os.path.normcase(os.path.join(dirname, f))
            for dirname, _, files in os.walk(root)
            for f in files
        }
        # If all the files we found are in our remaining set of files to
        # remove, then remove them from the latter set and add a wildcard
        # for the directory. An empty walk (missing directory, or the empty
        # root of a bare filename) proves nothing and must not produce a
        # wildcard.
        if all_files and all_files <= remaining:
            remaining -= all_files
            wildcards.add(root + os.sep)

    return {case_map[p] for p in remaining} | wildcards
141
142
def compress_for_output_listing(paths: Iterable[str]) -> tuple[set[str], set[str]]:
    """Return ``(will_remove, will_skip)``: the paths to show to the user.

    ``will_remove`` holds every given path that does not end in ``.pyc``, plus
    one ``<folder>/*`` entry per package-like folder. A folder is package-like
    when it holds a path ending in ``__init__.py`` or a path containing
    ``.dist-info``; nested folders are collapsed with ``compact``.

    ``will_skip`` holds the files found on disk inside those folders that are
    not among the given paths (``.pyc`` files are ignored).
    """
    # Compiled files are never listed individually.
    files = {p for p in set(paths) if not p.endswith(".pyc")}
    folders = compact(
        {
            os.path.dirname(p)
            for p in files
            if p.endswith("__init__.py") or ".dist-info" in p
        }
    )
    known_files = {os.path.normcase(p) for p in files}

    # Walk the real tree so that files which appeared in these folders
    # without being recorded are reported rather than silently ignored.
    will_skip: set[str] = set()
    for folder in folders:
        for dirpath, _, dirfiles in os.walk(folder):
            for fname in dirfiles:
                if fname.endswith(".pyc"):
                    continue
                candidate = os.path.join(dirpath, fname)
                if not os.path.isfile(candidate):
                    continue
                if os.path.normcase(candidate) not in known_files:
                    # Present on disk but not ours to remove.
                    will_skip.add(candidate)

    will_remove = files | {os.path.join(folder, "*") for folder in folders}
    return will_remove, will_skip
190
191
class StashedUninstallPathSet:
    """Tentatively uninstall files by renaming them into stash directories.

    Each ``stash()`` call is recorded so that ``rollback()`` can move
    everything back, while ``commit()`` discards the stashed copies.
    """

    def __init__(self) -> None:
        # normcased source root -> [Adjacent]TempDirectory holding whatever
        # was stashed from beneath that root.
        self._save_dirs: dict[str, TempDirectory] = {}
        # One (original path, stashed path) pair per rename performed, kept
        # so the renames can be undone.
        self._moves: list[tuple[str, str]] = []

    def _get_directory_stash(self, path: str) -> str:
        """Pick the stash location for the directory ``path``.

        An ``AdjacentTempDirectory`` is tried first; if creating it raises
        ``OSError``, a ``TempDirectory(kind="uninstall")`` is used instead.
        The chosen directory is registered under the normcased ``path``.
        """
        try:
            stash_dir: TempDirectory = AdjacentTempDirectory(path)
        except OSError:
            stash_dir = TempDirectory(kind="uninstall")
        self._save_dirs[os.path.normcase(path)] = stash_dir
        return stash_dir.path

    def _get_file_stash(self, path: str) -> str:
        """Pick the stash location for the file ``path``.

        The nearest ancestor directory that already has a registered stash is
        reused; when none exists, a new ``TempDirectory(kind="uninstall")`` is
        registered for the file's parent directory. The file keeps its path
        relative to that root inside the stash.
        """
        normalized = os.path.normcase(path)
        root, previous = os.path.dirname(normalized), None

        # Climb towards the filesystem root; dirname() stops changing there.
        while root != previous:
            if root in self._save_dirs:
                stash_dir = self._save_dirs[root]
                break
            previous, root = root, os.path.dirname(root)
        else:
            # No registered ancestor: start a new stash for the parent dir.
            root = os.path.dirname(normalized)
            stash_dir = TempDirectory(kind="uninstall")
            self._save_dirs[root] = stash_dir

        relative = os.path.relpath(normalized, root)
        if not relative or relative == os.path.curdir:
            return stash_dir.path
        return os.path.join(stash_dir.path, relative)

    def stash(self, path: str) -> str:
        """Move ``path`` into a stash and return where it ended up.

        Symlinks are treated as files, so the link itself is moved and its
        target is left alone.
        """
        is_real_dir = os.path.isdir(path) and not os.path.islink(path)
        destination = (
            self._get_directory_stash(path)
            if is_real_dir
            else self._get_file_stash(path)
        )

        self._moves.append((path, destination))
        if is_real_dir and os.path.isdir(destination):
            # The stash directory was just created by us. Remove it first,
            # otherwise the rename would place ``path`` inside it rather
            # than in its place.
            os.rmdir(destination)
        renames(path, destination)
        return destination

    def commit(self) -> None:
        """Make the uninstall permanent by cleaning up every stash directory."""
        for stash_dir in self._save_dirs.values():
            stash_dir.cleanup()
        self._moves = []
        self._save_dirs = {}

    def rollback(self) -> None:
        """Undo the uninstall by moving every stashed path back.

        Anything now occupying an original location is deleted first. An
        ``OSError`` while restoring one path is logged and the remaining
        paths are still attempted. The stash directories are cleaned up
        afterwards.
        """
        for original, stashed in self._moves:
            logger.info("Moving to %s\n from %s", original, stashed)

        for original, stashed in self._moves:
            try:
                logger.debug("Replacing %s from %s", original, stashed)
                if os.path.isfile(original) or os.path.islink(original):
                    os.unlink(original)
                elif os.path.isdir(original):
                    rmtree(original)
                renames(stashed, original)
            except OSError as ex:
                logger.error("Failed to restore %s", original)
                logger.debug("Exception: %s", ex)

        self.commit()

    @property
    def can_rollback(self) -> bool:
        """True once at least one move has been recorded and not yet cleared."""
        return bool(self._moves)
295
296
class UninstallPathSet:
    """A set of file paths to be removed in the uninstallation of a
    requirement.

    Paths are collected with ``add()`` / ``add_pth()`` (usually through
    ``from_dist()``). ``remove()`` stashes them via a
    ``StashedUninstallPathSet``; ``rollback()`` or ``commit()`` then either
    restores or discards the stashed files.
    """

    def __init__(self, dist: BaseDistribution) -> None:
        # Existing paths that passed the ``_permitted`` check.
        self._paths: set[str] = set()
        # Paths that failed the ``_permitted`` check; reported, never removed.
        self._refuse: set[str] = set()
        # Normalized .pth file path -> entries to strip from that file.
        self._pth: dict[str, UninstallPthEntries] = {}
        self._dist = dist
        self._moved_paths = StashedUninstallPathSet()
        # Create local cache of normalize_path results. Creating an UninstallPathSet
        # can result in hundreds/thousands of redundant calls to normalize_path with
        # the same args, which hurts performance.
        self._normalize_path_cached = functools.lru_cache(normalize_path)

    def _permitted(self, path: str) -> bool:
        """
        Return True if the given path is one we are permitted to
        remove/modify, False otherwise.

        Outside a virtualenv every path is permitted. Inside one, the path
        must start with the normalized ``sys.prefix``.
        """
        # aka is_local, but caching normalized sys.prefix
        if not running_under_virtualenv():
            return True
        return path.startswith(self._normalize_path_cached(sys.prefix))

    def add(self, path: str) -> None:
        """Record ``path`` for removal if it exists.

        Paths that do not exist are ignored. Existing paths go to the removal
        set when permitted and to the refused set otherwise. For a ``.py``
        file, the ``__pycache__`` directory next to it is added as well.
        """
        head, tail = os.path.split(path)

        # we normalize the head to resolve parent directory symlinks, but not
        # the tail, since we only want to uninstall symlinks, not their targets
        path = os.path.join(self._normalize_path_cached(head), os.path.normcase(tail))

        if not os.path.exists(path):
            return
        if self._permitted(path):
            self._paths.add(path)
        else:
            self._refuse.add(path)

        # __pycache__ files can show up after 'installed-files.txt' is created,
        # due to imports
        # Add the adjacent __pycache__ directory to the UninstallPathSet when a
        # .py file is removed. We do this to avoid the risk of orphaned .pyc
        # files created by a different interpreter version than the one running
        # pip at the time of package installation and uninstallation or an
        # interpreter run at a different optimization level (PYTHONOPTIMIZE).
        if os.path.splitext(path)[1] == ".py":
            pycache = os.path.join(os.path.dirname(path), "__pycache__")
            self.add(pycache)

    def add_pth(self, pth_file: str, entry: str) -> None:
        """Record that ``entry`` should be removed from ``pth_file``.

        Entries for the same (normalized) file share one
        ``UninstallPthEntries``. If the file is not permitted, it is added to
        the refused set instead and the entry is dropped.
        """
        pth_file = self._normalize_path_cached(pth_file)
        if self._permitted(pth_file):
            if pth_file not in self._pth:
                self._pth[pth_file] = UninstallPthEntries(pth_file)
            self._pth[pth_file].add(entry)
        else:
            self._refuse.add(pth_file)

    def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
        """Remove paths in ``self._paths`` with confirmation (unless
        ``auto_confirm`` is True).

        Paths are stashed rather than deleted, and the collected ``.pth``
        entries are stripped from their files. Nothing is touched when there
        are no paths or the user declines.
        """

        if not self._paths:
            logger.info(
                "Can't uninstall '%s'. No files were found to uninstall.",
                self._dist.raw_name,
            )
            return

        dist_name_version = f"{self._dist.raw_name}-{self._dist.raw_version}"
        logger.info("Uninstalling %s:", dist_name_version)

        with indent_log():
            # Short-circuit: the prompt is only shown without auto_confirm.
            if auto_confirm or self._allowed_to_proceed(verbose):
                moved = self._moved_paths

                for_rename = compress_for_rename(self._paths)

                # compact() drops paths already covered by a stashed directory.
                for path in sorted(compact(for_rename)):
                    moved.stash(path)
                    logger.verbose("Removing file or directory %s", path)

                for pth in self._pth.values():
                    pth.remove()

                logger.info("Successfully uninstalled %s", dist_name_version)

    def _allowed_to_proceed(self, verbose: bool) -> bool:
        """Display which files would be deleted and prompt for confirmation

        Returns True unless the answer given to the prompt is ``"n"``.
        """

        # Log ``msg`` followed by the compacted, sorted paths; silent when
        # ``paths`` is empty.
        def _display(msg: str, paths: Iterable[str]) -> None:
            if not paths:
                return

            logger.info(msg)
            with indent_log():
                for path in sorted(compact(paths)):
                    logger.info(path)

        if not verbose:
            will_remove, will_skip = compress_for_output_listing(self._paths)
        else:
            # In verbose mode, display all the files that are going to be
            # deleted.
            will_remove = set(self._paths)
            will_skip = set()

        _display("Would remove:", will_remove)
        _display("Would not remove (might be manually added):", will_skip)
        _display("Would not remove (outside of prefix):", self._refuse)
        if verbose:
            _display("Will actually move:", compress_for_rename(self._paths))

        return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n"

    def rollback(self) -> None:
        """Rollback the changes previously made by remove().

        Logs an error and does nothing when no paths were stashed.
        """
        if not self._moved_paths.can_rollback:
            logger.error(
                "Can't roll back %s; was not uninstalled",
                self._dist.raw_name,
            )
            return
        logger.info("Rolling back uninstall of %s", self._dist.raw_name)
        self._moved_paths.rollback()
        for pth in self._pth.values():
            pth.rollback()

    def commit(self) -> None:
        """Remove temporary save dir: rollback will no longer be possible."""
        self._moved_paths.commit()

    @classmethod
    def from_dist(cls, dist: BaseDistribution) -> UninstallPathSet:
        """Build the set of paths to remove for ``dist``.

        An empty set is returned when ``dist`` has no location, is not local,
        or is located in the standard library. Otherwise the paths are
        gathered according to how the distribution was installed (the first
        matching case below wins), followed by its distutils scripts and its
        console/gui entry-point scripts.

        Raises ``LegacyDistutilsInstall`` for distributions installed by
        distutils.
        """
        dist_location = dist.location
        info_location = dist.info_location
        if dist_location is None:
            logger.info(
                "Not uninstalling %s since it is not installed",
                dist.canonical_name,
            )
            return cls(dist)

        normalized_dist_location = normalize_path(dist_location)
        if not dist.local:
            logger.info(
                "Not uninstalling %s at %s, outside environment %s",
                dist.canonical_name,
                normalized_dist_location,
                sys.prefix,
            )
            return cls(dist)

        # sysconfig.get_path() may return a falsy value; those are filtered
        # out before the membership test.
        if normalized_dist_location in {
            p
            for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
            if p
        }:
            logger.info(
                "Not uninstalling %s at %s, as it is in the standard library.",
                dist.canonical_name,
                normalized_dist_location,
            )
            return cls(dist)

        paths_to_remove = cls(dist)
        develop_egg_link = egg_link_path_from_location(dist.raw_name)

        # Distribution is installed with metadata in a "flat" .egg-info
        # directory. This means it is not a modern .dist-info installation, an
        # egg, or legacy editable.
        setuptools_flat_installation = (
            dist.installed_with_setuptools_egg_info
            and info_location is not None
            and os.path.exists(info_location)
            # If dist is editable and the location points to a ``.egg-info``,
            # we are in fact in the legacy editable case.
            and not info_location.endswith(f"{dist.setuptools_filename}.egg-info")
        )

        # The order of the uninstall cases matters: with 2 installs of the
        # same package, pip needs to uninstall the currently detected version
        if setuptools_flat_installation:
            if info_location is not None:
                paths_to_remove.add(info_location)
            installed_files = dist.iter_declared_entries()
            if installed_files is not None:
                for installed_file in installed_files:
                    paths_to_remove.add(os.path.join(dist_location, installed_file))
            # FIXME: need a test for this elif block
            # occurs with --single-version-externally-managed/--record outside
            # of pip
            elif dist.is_file("top_level.txt"):
                try:
                    namespace_packages = dist.read_text("namespace_packages.txt")
                except FileNotFoundError:
                    namespaces = []
                else:
                    namespaces = namespace_packages.splitlines(keepends=False)
                # Top-level names that are namespace packages are left alone.
                for top_level_pkg in [
                    p
                    for p in dist.read_text("top_level.txt").splitlines()
                    if p and p not in namespaces
                ]:
                    path = os.path.join(dist_location, top_level_pkg)
                    paths_to_remove.add(path)
                    paths_to_remove.add(f"{path}.py")
                    paths_to_remove.add(f"{path}.pyc")
                    paths_to_remove.add(f"{path}.pyo")

        elif dist.installed_by_distutils:
            raise LegacyDistutilsInstall(distribution=dist)

        elif dist.installed_as_egg:
            # package installed by easy_install
            # We cannot match on dist.egg_name because it can slightly vary
            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
            # XXX We use normalized_dist_location because dist_location may contain
            # a trailing / if the distribution is a zipped egg
            # (which is not a directory).
            paths_to_remove.add(normalized_dist_location)
            easy_install_egg = os.path.split(normalized_dist_location)[1]
            easy_install_pth = os.path.join(
                os.path.dirname(normalized_dist_location),
                "easy-install.pth",
            )
            paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)

        elif dist.installed_with_dist_info:
            for path in uninstallation_paths(dist):
                paths_to_remove.add(path)

        elif develop_egg_link:
            # PEP 660 modern editable is handled in the ``.dist-info`` case
            # above, so this only covers the setuptools-style editable.
            with open(develop_egg_link) as fh:
                # Only the first line of the egg-link file is used.
                link_pointer = os.path.normcase(fh.readline().strip())
                normalized_link_pointer = paths_to_remove._normalize_path_cached(
                    link_pointer
                )
            assert os.path.samefile(
                normalized_link_pointer, normalized_dist_location
            ), (
                f"Egg-link {develop_egg_link} (to {link_pointer}) does not match "
                f"installed location of {dist.raw_name} (at {dist_location})"
            )
            paths_to_remove.add(develop_egg_link)
            easy_install_pth = os.path.join(
                os.path.dirname(develop_egg_link), "easy-install.pth"
            )
            paths_to_remove.add_pth(easy_install_pth, dist_location)

        else:
            logger.debug(
                "Not sure how to uninstall: %s - Check: %s",
                dist,
                dist_location,
            )

        if dist.in_usersite:
            bin_dir = get_bin_user()
        else:
            bin_dir = get_bin_prefix()

        # find distutils scripts= scripts
        try:
            for script in dist.iter_distutils_script_names():
                paths_to_remove.add(os.path.join(bin_dir, script))
                if WINDOWS:
                    paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat"))
        except (FileNotFoundError, NotADirectoryError):
            pass

        # find console_scripts and gui_scripts
        # Entry points in any other group are ignored.
        def iter_scripts_to_remove(
            dist: BaseDistribution,
            bin_dir: str,
        ) -> Generator[str, None, None]:
            for entry_point in dist.iter_entry_points():
                if entry_point.group == "console_scripts":
                    yield from _script_names(bin_dir, entry_point.name, False)
                elif entry_point.group == "gui_scripts":
                    yield from _script_names(bin_dir, entry_point.name, True)

        for s in iter_scripts_to_remove(dist, bin_dir):
            paths_to_remove.add(s)

        return paths_to_remove
587
588
class UninstallPthEntries:
    """Entries to strip from a single ``.pth`` file, with undo support.

    ``add()`` collects entries, ``remove()`` rewrites the file without them,
    and ``rollback()`` writes back the contents the file had before
    ``remove()`` ran.
    """

    def __init__(self, pth_file: str) -> None:
        self.file = pth_file
        self.entries: set[str] = set()
        # Contents of the file as read by remove(), before any edit; None
        # until remove() has read the file.
        self._saved_lines: list[bytes] | None = None

    def add(self, entry: str) -> None:
        """Register ``entry`` (one line of the .pth file) for removal."""
        entry = os.path.normcase(entry)
        # On Windows, os.path.normcase converts the entry to use
        # backslashes. This is correct for entries that describe absolute
        # paths outside of site-packages, but all the others use forward
        # slashes.
        # os.path.splitdrive is used instead of os.path.isabs because isabs
        # treats non-absolute paths with drive letter markings like c:foo\bar
        # as absolute paths. It also does not recognize UNC paths if they don't
        # have more than "\\server\share". Valid examples: "\\server\share\" or
        # "\\server\share\folder".
        if WINDOWS and not os.path.splitdrive(entry)[0]:
            entry = entry.replace("\\", "/")
        self.entries.add(entry)

    def remove(self) -> None:
        """Rewrite the .pth file without the registered entries.

        Logs a warning and returns if the file does not exist. The line
        ending is ``\\r\\n`` when any line of the file contains it and
        ``\\n`` otherwise. Entries not present in the file are skipped.
        """
        logger.verbose("Removing pth entries from %s:", self.file)

        # If the file doesn't exist, log a warning and return
        if not os.path.isfile(self.file):
            logger.warning("Cannot remove entries from nonexistent file %s", self.file)
            return
        with open(self.file, "rb") as fh:
            lines = fh.readlines()
        # Snapshot an independent copy. ``lines`` is edited in place below,
        # and sharing the list would make rollback() write back the edited
        # contents instead of the original ones.
        self._saved_lines = list(lines)

        endline = b"\r\n" if any(b"\r\n" in line for line in lines) else b"\n"
        # handle missing trailing newline
        if lines and not lines[-1].endswith(endline):
            lines[-1] += endline
        for entry in self.entries:
            logger.verbose("Removing entry: %s", entry)
            try:
                lines.remove(entry.encode("utf-8") + endline)
            except ValueError:
                # The entry is not in the file; nothing to strip.
                pass
        with open(self.file, "wb") as fh:
            fh.writelines(lines)

    def rollback(self) -> bool:
        """Restore the file contents captured by ``remove()``.

        Returns False (after logging an error) when ``remove()`` never read
        the file, True once the saved contents have been written back.
        """
        if self._saved_lines is None:
            logger.error("Cannot roll back changes to %s, none were made", self.file)
            return False
        logger.debug("Rolling %s back to previous state", self.file)
        with open(self.file, "wb") as fh:
            fh.writelines(self._saved_lines)
        return True