Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/temp_dir.py: 32%
127 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1import errno
2import itertools
3import logging
4import os.path
5import tempfile
6import traceback
7from contextlib import ExitStack, contextmanager
8from pathlib import Path
9from typing import (
10 Any,
11 Callable,
12 Dict,
13 Generator,
14 List,
15 Optional,
16 TypeVar,
17 Union,
18)
20from pip._internal.utils.misc import enum, rmtree
logger = logging.getLogger(__name__)

# Self-type used by TempDirectory.__enter__ so that entering a subclass is
# typed as returning that subclass.
_T = TypeVar("_T", bound="TempDirectory")


# Kinds of temporary directories. Only needed for ones that are
# globally-managed.
tempdir_kinds = enum(
    BUILD_ENV="build-env",
    EPHEM_WHEEL_CACHE="ephem-wheel-cache",
    REQ_BUILD="req-build",
)
# The ExitStack that currently owns globally-managed temporary directories,
# or None while no global_tempdir_manager() context is active.
_tempdir_manager: Optional[ExitStack] = None


@contextmanager
def global_tempdir_manager() -> Generator[None, None, None]:
    """Install a fresh ExitStack as the module-wide temp-directory manager.

    While the context is active, ``_tempdir_manager`` refers to a new
    ExitStack; anything entered into it is exited when this context ends.
    Contexts may be nested: the previously installed manager (or None) is
    put back on exit, even if the body raised.
    """
    global _tempdir_manager
    with ExitStack() as stack:
        old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
        try:
            yield
        finally:
            # Restore the previous manager first; the enclosing ``with``
            # then unwinds ``stack`` and exits everything registered on it.
            _tempdir_manager = old_tempdir_manager
class TempDirectoryTypeRegistry:
    """Manages temp directory behavior.

    Holds one auto-delete flag per temporary-directory kind. Kinds that were
    never configured are treated as "delete".
    """

    def __init__(self) -> None:
        # Maps a directory kind to its explicitly configured flag.
        self._should_delete: Dict[str, bool] = {}

    def set_delete(self, kind: str, value: bool) -> None:
        """Indicate whether a TempDirectory of the given kind should be
        auto-deleted.
        """
        self._should_delete[kind] = value

    def get_delete(self, kind: str) -> bool:
        """Get configured auto-delete flag for a given TempDirectory type,
        default True.
        """
        return self._should_delete.get(kind, True)
# The registry consulted by TempDirectory.__exit__ when a directory's own
# ``delete`` attribute is None; None while no tempdir_registry() is active.
_tempdir_registry: Optional[TempDirectoryTypeRegistry] = None


@contextmanager
def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
    """Provides a scoped global tempdir registry that can be used to dictate
    whether directories should be deleted.

    A new, empty registry is installed for the duration of the context and
    yielded to the caller. The previously installed registry (or None) is
    restored on exit, even if the body raised.
    """
    global _tempdir_registry
    old_tempdir_registry = _tempdir_registry
    _tempdir_registry = TempDirectoryTypeRegistry()
    try:
        yield _tempdir_registry
    finally:
        _tempdir_registry = old_tempdir_registry
class _Default:
    """Type of the sentinel that marks an argument as "not supplied".

    A dedicated type is used because None is itself a meaningful value for
    the argument this sentinel is the default of.
    """


# Single shared sentinel instance; compared by identity (``is _default``).
_default = _Default()
class TempDirectory:
    """Helper class that owns and cleans up a temporary directory.

    This class can be used as a context manager or as an OO representation of a
    temporary directory.

    Attributes:
        path
            Location to the created temporary directory
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager). None means "decide at exit
            time": the active tempdir registry is asked, and if there is
            none the directory is deleted.

    Methods:
        cleanup()
            Deletes the temporary directory

    When used as a context manager, if the delete attribute is True, on
    exiting the context the temporary directory is deleted.
    """

    def __init__(
        self,
        path: Optional[str] = None,
        delete: Union[bool, None, _Default] = _default,
        kind: str = "temp",
        globally_managed: bool = False,
        ignore_cleanup_errors: bool = True,
    ):
        """Create (or adopt) the directory.

        path: an existing directory to adopt; when None a new directory is
            created via ``_create(kind)``.
        delete: explicit delete-on-exit flag. When left at its default it
            resolves to False for an adopted ``path`` and to None (defer to
            the registry at exit) otherwise.
        kind: label used in the created directory's name and for the
            registry lookup at exit.
        globally_managed: when True, register this object on the active
            global tempdir manager, which must exist.
        ignore_cleanup_errors: when True, cleanup() logs removal failures
            instead of raising them.
        """
        super().__init__()

        if delete is _default:
            if path is not None:
                # If we were given an explicit directory, resolve delete option
                # now.
                delete = False
            else:
                # Otherwise, we wait until cleanup and see what
                # tempdir_registry says.
                delete = None

        # The only time we specify path is in for editables where it
        # is the value of the --src option.
        if path is None:
            path = self._create(kind)

        self._path = path
        self._deleted = False
        self.delete = delete
        self.kind = kind
        self.ignore_cleanup_errors = ignore_cleanup_errors

        if globally_managed:
            assert _tempdir_manager is not None
            _tempdir_manager.enter_context(self)

    @property
    def path(self) -> str:
        """The directory's location; asserts that cleanup() has not run."""
        assert not self._deleted, f"Attempted to access deleted path: {self._path}"
        return self._path

    def __repr__(self) -> str:
        # Read _path directly: the ``path`` property asserts the directory
        # has not been cleaned up, and repr() must stay usable (logging,
        # debuggers) after cleanup().
        return f"<{self.__class__.__name__} {self._path!r}>"

    def __enter__(self: _T) -> _T:
        return self

    def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
        """Clean up on exit if deletion is enabled for this directory."""
        if self.delete is not None:
            delete = self.delete
        elif _tempdir_registry is not None:
            delete = _tempdir_registry.get_delete(self.kind)
        else:
            delete = True

        if delete:
            self.cleanup()

    def _create(self, kind: str) -> str:
        """Create a temporary directory and return its canonical path."""
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory. This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
        logger.debug("Created temporary directory: %s", path)
        return path

    def cleanup(self) -> None:
        """Remove the temporary directory created and reset state.

        The object is marked deleted up front, so ``path`` is unusable from
        here on even if removal fails. With ``ignore_cleanup_errors`` set,
        removal failures are logged rather than raised; otherwise whatever
        ``rmtree`` raises propagates.
        """
        self._deleted = True
        if not os.path.exists(self._path):
            return

        errors: List[BaseException] = []

        def onerror(
            func: Callable[..., Any],
            path: Path,
            exc_val: BaseException,
        ) -> None:
            """Log a warning for a `rmtree` error and continue"""
            # format_exception_only() lines already end with a newline, so
            # join without a separator to avoid blank lines in the output.
            formatted_exc = "".join(
                traceback.format_exception_only(type(exc_val), exc_val)
            )
            formatted_exc = formatted_exc.rstrip()  # remove trailing new line
            if func in (os.unlink, os.remove, os.rmdir):
                logger.debug(
                    "Failed to remove a temporary file '%s' due to %s.\n",
                    path,
                    formatted_exc,
                )
            else:
                logger.debug("%s failed with %s.", func.__qualname__, formatted_exc)
            errors.append(exc_val)

        if self.ignore_cleanup_errors:
            try:
                # first try with tenacity; retrying to handle ephemeral errors
                rmtree(self._path, ignore_errors=False)
            except OSError:
                # last pass ignore/log all errors
                rmtree(self._path, onexc=onerror)
            if errors:
                logger.warning(
                    "Failed to remove contents in a temporary directory '%s'.\n"
                    "You can safely remove it manually.",
                    self._path,
                )
        else:
            rmtree(self._path)
class AdjacentTempDirectory(TempDirectory):
    """Helper class that creates a temporary directory adjacent to a real one.

    Attributes:
        original
            The original directory to create a temp directory for.
        path
            The full path to the temporary directory, which is created
            when the object is constructed.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    """

    # The characters that may be used to name the temp directory.
    # Every generated name starts with a ~, followed by combinations of
    # these characters, until a usable name is found.
    # pkg_resources raises a different error for .dist-info folder
    # with leading '-' and invalid metadata
    LEADING_CHARS = "-~.=%0123456789"

    def __init__(self, original: str, delete: Optional[bool] = None) -> None:
        # ``original`` must be set before the base initializer runs, because
        # that initializer calls _create(), which reads it.
        self.original = original.rstrip("/\\")
        super().__init__(delete=delete)

    @classmethod
    def _generate_names(cls, name: str) -> Generator[str, None, None]:
        """Generates a series of temporary names.

        The algorithm replaces the leading characters in the name
        with ones that are valid filesystem characters, but are not
        valid package names (for both Python and pip definitions of
        package).
        """
        # Same-length candidates: replace the first ``i`` characters with a
        # "~" followed by ``i - 1`` characters drawn from LEADING_CHARS.
        for i in range(1, len(name)):
            for candidate in itertools.combinations_with_replacement(
                cls.LEADING_CHARS, i - 1
            ):
                new_name = "~" + "".join(candidate) + name[i:]
                if new_name != name:
                    yield new_name

        # If we make it this far, we will have to make a longer name
        for i in range(len(cls.LEADING_CHARS)):
            for candidate in itertools.combinations_with_replacement(
                cls.LEADING_CHARS, i
            ):
                new_name = "~" + "".join(candidate) + name
                if new_name != name:
                    yield new_name

    def _create(self, kind: str) -> str:
        """Create the directory next to ``original`` and return its real path.

        Candidate names are tried in order; names that already exist are
        skipped and any other OSError propagates. If every candidate is
        taken, a regular ``pip-{kind}-`` temporary directory is created
        instead.
        """
        root, name = os.path.split(self.original)
        for candidate in self._generate_names(name):
            path = os.path.join(root, candidate)
            try:
                os.mkdir(path)
            except OSError as ex:
                # Continue if the name exists already
                if ex.errno != errno.EEXIST:
                    raise
            else:
                path = os.path.realpath(path)
                break
        else:
            # Final fallback on the default behavior.
            path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))

        logger.debug("Created temporary directory: %s", path)
        return path