1from __future__ import annotations
2
3import errno
4import itertools
5import logging
6import os.path
7import tempfile
8import traceback
9from collections.abc import Callable, Generator
10from contextlib import ExitStack, contextmanager
11from pathlib import Path
12from typing import (
13 Any,
14 TypeVar,
15)
16
17from pip._internal.utils.misc import enum, rmtree
18
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable bound to TempDirectory, used to annotate ``__enter__`` as
# returning the same type it was called on.
_T = TypeVar("_T", bound="TempDirectory")


# Kinds of temporary directories. An entry is only needed here for the
# kinds that are globally managed.
tempdir_kinds = enum(
    BUILD_ENV="build-env",
    EPHEM_WHEEL_CACHE="ephem-wheel-cache",
    REQ_BUILD="req-build",
)
31
32
33_tempdir_manager: ExitStack | None = None
34
35
36@contextmanager
37def global_tempdir_manager() -> Generator[None, None, None]:
38 global _tempdir_manager
39 with ExitStack() as stack:
40 old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
41 try:
42 yield
43 finally:
44 _tempdir_manager = old_tempdir_manager
45
46
class TempDirectoryTypeRegistry:
    """Records, per kind, whether temp directories should be auto-deleted."""

    def __init__(self) -> None:
        # Maps a TempDirectory kind to its configured auto-delete flag.
        self._should_delete: dict[str, bool] = {}

    def set_delete(self, kind: str, value: bool) -> None:
        """Record whether a TempDirectory of the given kind should be
        auto-deleted.
        """
        self._should_delete[kind] = value

    def get_delete(self, kind: str) -> bool:
        """Return the auto-delete flag configured for the given kind.

        Kinds that were never configured default to True.
        """
        try:
            return self._should_delete[kind]
        except KeyError:
            return True
64
65
# Registry consulted when a TempDirectory has no explicit delete flag;
# None whenever no tempdir_registry() context is active.
_tempdir_registry: TempDirectoryTypeRegistry | None = None


@contextmanager
def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
    """Provide a scoped global tempdir registry.

    A new TempDirectoryTypeRegistry is installed as the module-wide
    registry and yielded, so the caller can dictate which kinds of
    directories should be deleted. The previously installed registry
    (possibly None) is restored on exit.
    """
    global _tempdir_registry
    previous_registry = _tempdir_registry
    scoped_registry = TempDirectoryTypeRegistry()
    _tempdir_registry = scoped_registry
    try:
        yield scoped_registry
    finally:
        _tempdir_registry = previous_registry
81
82
class _Default:
    """Type of the sentinel that stands for "no ``delete`` value given"."""


# Single shared sentinel instance; it is compared by identity.
_default = _Default()
88
89
class TempDirectory:
    """Helper class that owns and cleans up a temporary directory.

    This class can be used as a context manager or as an OO representation of a
    temporary directory.

    Arguments:
        path
            An existing directory to wrap. When None (the default), a new
            directory is created with ``_create(kind)``.
        delete
            True or False to fix up front whether exiting the context
            removes the directory, or None to decide at exit time. When
            the argument is omitted it resolves to False if ``path`` was
            given and to None otherwise.
        kind
            Label used in the name of a newly created directory and when
            looking up the auto-delete flag in the tempdir registry.
        globally_managed
            If True, the instance is entered into the module-wide
            ``_tempdir_manager`` ExitStack, which must not be None.
        ignore_cleanup_errors
            If True (the default), an OSError from the first removal
            attempt in ``cleanup()`` is caught and a second attempt is
            made that logs each failure through a handler.

    Attributes:
        path
            Location to the created temporary directory
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)
        kind
            The kind label passed to the constructor
        ignore_cleanup_errors
            The flag passed to the constructor

    Methods:
        cleanup()
            Deletes the temporary directory

    When used as a context manager, the directory is deleted on exit if the
    delete attribute is True. If the delete attribute is None, the decision
    is taken from the active tempdir registry for this kind, or defaults to
    True when no registry is active.
    """

    def __init__(
        self,
        path: str | None = None,
        delete: bool | None | _Default = _default,
        kind: str = "temp",
        globally_managed: bool = False,
        ignore_cleanup_errors: bool = True,
    ):
        super().__init__()

        if delete is _default:
            if path is not None:
                # If we were given an explicit directory, resolve delete option
                # now.
                delete = False
            else:
                # Otherwise, we wait until cleanup and see what
                # tempdir_registry says.
                delete = None

        # The only time we specify path is for editables, where it
        # is the value of the --src option.
        if path is None:
            path = self._create(kind)

        self._path = path
        self._deleted = False
        self.delete = delete
        self.kind = kind
        self.ignore_cleanup_errors = ignore_cleanup_errors

        if globally_managed:
            assert _tempdir_manager is not None
            _tempdir_manager.enter_context(self)

    @property
    def path(self) -> str:
        """Location of the directory; asserts it has not been cleaned up."""
        assert not self._deleted, f"Attempted to access deleted path: {self._path}"
        return self._path

    def __repr__(self) -> str:
        # Read ``_path`` directly: the ``path`` property asserts that the
        # directory has not been cleaned up, and repr() has to stay usable
        # (e.g. in log messages or a debugger) after cleanup().
        return f"<{self.__class__.__name__} {self._path!r}>"

    def __enter__(self: _T) -> _T:
        return self

    def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
        """Clean up the directory if deletion is enabled for it."""
        if self.delete is not None:
            # An explicit True/False always wins.
            delete = self.delete
        elif _tempdir_registry:
            # Otherwise defer to the active registry's flag for this kind.
            delete = _tempdir_registry.get_delete(self.kind)
        else:
            delete = True

        if delete:
            self.cleanup()

    def _create(self, kind: str) -> str:
        """Create a temporary directory and return its canonical path.

        The directory name starts with ``pip-{kind}-``.
        """
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory. This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
        logger.debug("Created temporary directory: %s", path)
        return path

    def cleanup(self) -> None:
        """Remove the temporary directory and mark this object as deleted.

        The object is marked as deleted even when the directory no longer
        exists on disk, in which case nothing else is done.
        """
        self._deleted = True
        if not os.path.exists(self._path):
            return

        # Exceptions collected by ``onerror`` during the last-pass removal.
        errors: list[BaseException] = []

        def onerror(
            func: Callable[..., Any],
            path: Path,
            exc_val: BaseException,
        ) -> None:
            """Log a warning for a `rmtree` error and continue"""
            # Every string returned by format_exception_only() already ends
            # with a newline, so concatenate them as-is; joining with "\n"
            # would put blank lines between the lines of a multi-line
            # message.
            formatted_exc = "".join(
                traceback.format_exception_only(type(exc_val), exc_val)
            )
            formatted_exc = formatted_exc.rstrip()  # remove trailing new line
            if func in (os.unlink, os.remove, os.rmdir):
                logger.debug(
                    "Failed to remove a temporary file '%s' due to %s.\n",
                    path,
                    formatted_exc,
                )
            else:
                logger.debug("%s failed with %s.", func.__qualname__, formatted_exc)
            errors.append(exc_val)

        if self.ignore_cleanup_errors:
            try:
                # first try with @retry; retrying to handle ephemeral errors
                rmtree(self._path, ignore_errors=False)
            except OSError:
                # last pass ignore/log all errors
                rmtree(self._path, onexc=onerror)
            if errors:
                logger.warning(
                    "Failed to remove contents in a temporary directory '%s'.\n"
                    "You can safely remove it manually.",
                    self._path,
                )
        else:
            rmtree(self._path)
221
222
class AdjacentTempDirectory(TempDirectory):
    """Helper class that creates a temporary directory adjacent to a real one.

    Attributes:
        original
            The original directory to create a temp directory for, with
            any trailing path separators removed.
        path
            The full path to the temporary directory, which is created
            when the object is constructed.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    """

    # The characters that may be used to name the temp directory
    # We always prepend a ~ and then rotate through these until
    # a usable name is found.
    # pkg_resources raises a different error for .dist-info folder
    # with leading '-' and invalid metadata
    LEADING_CHARS = "-~.=%0123456789"

    def __init__(self, original: str, delete: bool | None = None) -> None:
        # ``original`` has to be set before the base initializer runs,
        # because that is what calls ``_create``.
        self.original = original.rstrip("/\\")
        super().__init__(delete=delete)

    @classmethod
    def _generate_names(cls, name: str) -> Generator[str, None, None]:
        """Generates a series of temporary names.

        The algorithm replaces the leading characters in the name
        with ones that are valid filesystem characters, but are not
        valid package names (for both Python and pip definitions of
        package).
        """
        alphabet = cls.LEADING_CHARS
        fillers = itertools.combinations_with_replacement

        # First pass: overwrite the first ``cut`` characters of the name
        # with "~" followed by ``cut - 1`` filler characters.
        for cut in range(1, len(name)):
            tail = name[cut:]
            for filler in fillers(alphabet, cut - 1):
                proposal = f"~{''.join(filler)}{tail}"
                if proposal != name:
                    yield proposal

        # If we make it this far, we will have to make a longer name
        for extra in range(len(alphabet)):
            for filler in fillers(alphabet, extra):
                proposal = f"~{''.join(filler)}{name}"
                if proposal != name:
                    yield proposal

    def _create(self, kind: str) -> str:
        """Create the directory next to ``original`` and return its path.

        Each generated name is tried in turn inside the parent directory
        of ``original``. If every one of them already exists, a regular
        ``pip-{kind}-`` temporary directory is created instead.
        """
        parent, basename = os.path.split(self.original)
        created: str | None = None
        for candidate in self._generate_names(basename):
            attempt = os.path.join(parent, candidate)
            try:
                os.mkdir(attempt)
            except OSError as exc:
                # Continue if the name exists already
                if exc.errno != errno.EEXIST:
                    raise
                continue
            created = os.path.realpath(attempt)
            break

        if created is None:
            # Final fallback on the default behavior.
            created = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))

        logger.debug("Created temporary directory: %s", created)
        return created