1from __future__ import annotations
2
3import errno
4import itertools
5import logging
6import os.path
7import tempfile
8import traceback
9from collections.abc import Generator
10from contextlib import ExitStack, contextmanager
11from pathlib import Path
12from typing import (
13 Any,
14 Callable,
15 TypeVar,
16)
17
18from pip._internal.utils.misc import enum, rmtree
19
20logger = logging.getLogger(__name__)
21
22_T = TypeVar("_T", bound="TempDirectory")
23
24
25# Kinds of temporary directories. Only needed for ones that are
26# globally-managed.
27tempdir_kinds = enum(
28 BUILD_ENV="build-env",
29 EPHEM_WHEEL_CACHE="ephem-wheel-cache",
30 REQ_BUILD="req-build",
31)
32
33
34_tempdir_manager: ExitStack | None = None
35
36
37@contextmanager
38def global_tempdir_manager() -> Generator[None, None, None]:
39 global _tempdir_manager
40 with ExitStack() as stack:
41 old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
42 try:
43 yield
44 finally:
45 _tempdir_manager = old_tempdir_manager
46
47
48class TempDirectoryTypeRegistry:
49 """Manages temp directory behavior"""
50
51 def __init__(self) -> None:
52 self._should_delete: dict[str, bool] = {}
53
54 def set_delete(self, kind: str, value: bool) -> None:
55 """Indicate whether a TempDirectory of the given kind should be
56 auto-deleted.
57 """
58 self._should_delete[kind] = value
59
60 def get_delete(self, kind: str) -> bool:
61 """Get configured auto-delete flag for a given TempDirectory type,
62 default True.
63 """
64 return self._should_delete.get(kind, True)
65
66
67_tempdir_registry: TempDirectoryTypeRegistry | None = None
68
69
70@contextmanager
71def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
72 """Provides a scoped global tempdir registry that can be used to dictate
73 whether directories should be deleted.
74 """
75 global _tempdir_registry
76 old_tempdir_registry = _tempdir_registry
77 _tempdir_registry = TempDirectoryTypeRegistry()
78 try:
79 yield _tempdir_registry
80 finally:
81 _tempdir_registry = old_tempdir_registry
82
83
84class _Default:
85 pass
86
87
88_default = _Default()
89
90
91class TempDirectory:
92 """Helper class that owns and cleans up a temporary directory.
93
94 This class can be used as a context manager or as an OO representation of a
95 temporary directory.
96
97 Attributes:
98 path
99 Location to the created temporary directory
100 delete
101 Whether the directory should be deleted when exiting
102 (when used as a contextmanager)
103
104 Methods:
105 cleanup()
106 Deletes the temporary directory
107
108 When used as a context manager, if the delete attribute is True, on
109 exiting the context the temporary directory is deleted.
110 """
111
112 def __init__(
113 self,
114 path: str | None = None,
115 delete: bool | None | _Default = _default,
116 kind: str = "temp",
117 globally_managed: bool = False,
118 ignore_cleanup_errors: bool = True,
119 ):
120 super().__init__()
121
122 if delete is _default:
123 if path is not None:
124 # If we were given an explicit directory, resolve delete option
125 # now.
126 delete = False
127 else:
128 # Otherwise, we wait until cleanup and see what
129 # tempdir_registry says.
130 delete = None
131
132 # The only time we specify path is in for editables where it
133 # is the value of the --src option.
134 if path is None:
135 path = self._create(kind)
136
137 self._path = path
138 self._deleted = False
139 self.delete = delete
140 self.kind = kind
141 self.ignore_cleanup_errors = ignore_cleanup_errors
142
143 if globally_managed:
144 assert _tempdir_manager is not None
145 _tempdir_manager.enter_context(self)
146
147 @property
148 def path(self) -> str:
149 assert not self._deleted, f"Attempted to access deleted path: {self._path}"
150 return self._path
151
152 def __repr__(self) -> str:
153 return f"<{self.__class__.__name__} {self.path!r}>"
154
155 def __enter__(self: _T) -> _T:
156 return self
157
158 def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
159 if self.delete is not None:
160 delete = self.delete
161 elif _tempdir_registry:
162 delete = _tempdir_registry.get_delete(self.kind)
163 else:
164 delete = True
165
166 if delete:
167 self.cleanup()
168
169 def _create(self, kind: str) -> str:
170 """Create a temporary directory and store its path in self.path"""
171 # We realpath here because some systems have their default tmpdir
172 # symlinked to another directory. This tends to confuse build
173 # scripts, so we canonicalize the path by traversing potential
174 # symlinks here.
175 path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
176 logger.debug("Created temporary directory: %s", path)
177 return path
178
179 def cleanup(self) -> None:
180 """Remove the temporary directory created and reset state"""
181 self._deleted = True
182 if not os.path.exists(self._path):
183 return
184
185 errors: list[BaseException] = []
186
187 def onerror(
188 func: Callable[..., Any],
189 path: Path,
190 exc_val: BaseException,
191 ) -> None:
192 """Log a warning for a `rmtree` error and continue"""
193 formatted_exc = "\n".join(
194 traceback.format_exception_only(type(exc_val), exc_val)
195 )
196 formatted_exc = formatted_exc.rstrip() # remove trailing new line
197 if func in (os.unlink, os.remove, os.rmdir):
198 logger.debug(
199 "Failed to remove a temporary file '%s' due to %s.\n",
200 path,
201 formatted_exc,
202 )
203 else:
204 logger.debug("%s failed with %s.", func.__qualname__, formatted_exc)
205 errors.append(exc_val)
206
207 if self.ignore_cleanup_errors:
208 try:
209 # first try with @retry; retrying to handle ephemeral errors
210 rmtree(self._path, ignore_errors=False)
211 except OSError:
212 # last pass ignore/log all errors
213 rmtree(self._path, onexc=onerror)
214 if errors:
215 logger.warning(
216 "Failed to remove contents in a temporary directory '%s'.\n"
217 "You can safely remove it manually.",
218 self._path,
219 )
220 else:
221 rmtree(self._path)
222
223
224class AdjacentTempDirectory(TempDirectory):
225 """Helper class that creates a temporary directory adjacent to a real one.
226
227 Attributes:
228 original
229 The original directory to create a temp directory for.
230 path
231 After calling create() or entering, contains the full
232 path to the temporary directory.
233 delete
234 Whether the directory should be deleted when exiting
235 (when used as a contextmanager)
236
237 """
238
239 # The characters that may be used to name the temp directory
240 # We always prepend a ~ and then rotate through these until
241 # a usable name is found.
242 # pkg_resources raises a different error for .dist-info folder
243 # with leading '-' and invalid metadata
244 LEADING_CHARS = "-~.=%0123456789"
245
246 def __init__(self, original: str, delete: bool | None = None) -> None:
247 self.original = original.rstrip("/\\")
248 super().__init__(delete=delete)
249
250 @classmethod
251 def _generate_names(cls, name: str) -> Generator[str, None, None]:
252 """Generates a series of temporary names.
253
254 The algorithm replaces the leading characters in the name
255 with ones that are valid filesystem characters, but are not
256 valid package names (for both Python and pip definitions of
257 package).
258 """
259 for i in range(1, len(name)):
260 for candidate in itertools.combinations_with_replacement(
261 cls.LEADING_CHARS, i - 1
262 ):
263 new_name = "~" + "".join(candidate) + name[i:]
264 if new_name != name:
265 yield new_name
266
267 # If we make it this far, we will have to make a longer name
268 for i in range(len(cls.LEADING_CHARS)):
269 for candidate in itertools.combinations_with_replacement(
270 cls.LEADING_CHARS, i
271 ):
272 new_name = "~" + "".join(candidate) + name
273 if new_name != name:
274 yield new_name
275
276 def _create(self, kind: str) -> str:
277 root, name = os.path.split(self.original)
278 for candidate in self._generate_names(name):
279 path = os.path.join(root, candidate)
280 try:
281 os.mkdir(path)
282 except OSError as ex:
283 # Continue if the name exists already
284 if ex.errno != errno.EEXIST:
285 raise
286 else:
287 path = os.path.realpath(path)
288 break
289 else:
290 # Final fallback on the default behavior.
291 path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
292
293 logger.debug("Created temporary directory: %s", path)
294 return path