Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/temp_dir.py: 36%
109 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1import errno
2import itertools
3import logging
4import os.path
5import tempfile
6from contextlib import ExitStack, contextmanager
7from typing import Any, Dict, Generator, Optional, TypeVar, Union
9from pip._internal.utils.misc import enum, rmtree
11logger = logging.getLogger(__name__)
13_T = TypeVar("_T", bound="TempDirectory")
16# Kinds of temporary directories. Only needed for ones that are
17# globally-managed.
18tempdir_kinds = enum(
19 BUILD_ENV="build-env",
20 EPHEM_WHEEL_CACHE="ephem-wheel-cache",
21 REQ_BUILD="req-build",
22)
25_tempdir_manager: Optional[ExitStack] = None
28@contextmanager
29def global_tempdir_manager() -> Generator[None, None, None]:
30 global _tempdir_manager
31 with ExitStack() as stack:
32 old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
33 try:
34 yield
35 finally:
36 _tempdir_manager = old_tempdir_manager
39class TempDirectoryTypeRegistry:
40 """Manages temp directory behavior"""
42 def __init__(self) -> None:
43 self._should_delete: Dict[str, bool] = {}
45 def set_delete(self, kind: str, value: bool) -> None:
46 """Indicate whether a TempDirectory of the given kind should be
47 auto-deleted.
48 """
49 self._should_delete[kind] = value
51 def get_delete(self, kind: str) -> bool:
52 """Get configured auto-delete flag for a given TempDirectory type,
53 default True.
54 """
55 return self._should_delete.get(kind, True)
58_tempdir_registry: Optional[TempDirectoryTypeRegistry] = None
61@contextmanager
62def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
63 """Provides a scoped global tempdir registry that can be used to dictate
64 whether directories should be deleted.
65 """
66 global _tempdir_registry
67 old_tempdir_registry = _tempdir_registry
68 _tempdir_registry = TempDirectoryTypeRegistry()
69 try:
70 yield _tempdir_registry
71 finally:
72 _tempdir_registry = old_tempdir_registry
75class _Default:
76 pass
79_default = _Default()
82class TempDirectory:
83 """Helper class that owns and cleans up a temporary directory.
85 This class can be used as a context manager or as an OO representation of a
86 temporary directory.
88 Attributes:
89 path
90 Location to the created temporary directory
91 delete
92 Whether the directory should be deleted when exiting
93 (when used as a contextmanager)
95 Methods:
96 cleanup()
97 Deletes the temporary directory
99 When used as a context manager, if the delete attribute is True, on
100 exiting the context the temporary directory is deleted.
101 """
103 def __init__(
104 self,
105 path: Optional[str] = None,
106 delete: Union[bool, None, _Default] = _default,
107 kind: str = "temp",
108 globally_managed: bool = False,
109 ):
110 super().__init__()
112 if delete is _default:
113 if path is not None:
114 # If we were given an explicit directory, resolve delete option
115 # now.
116 delete = False
117 else:
118 # Otherwise, we wait until cleanup and see what
119 # tempdir_registry says.
120 delete = None
122 # The only time we specify path is in for editables where it
123 # is the value of the --src option.
124 if path is None:
125 path = self._create(kind)
127 self._path = path
128 self._deleted = False
129 self.delete = delete
130 self.kind = kind
132 if globally_managed:
133 assert _tempdir_manager is not None
134 _tempdir_manager.enter_context(self)
136 @property
137 def path(self) -> str:
138 assert not self._deleted, f"Attempted to access deleted path: {self._path}"
139 return self._path
141 def __repr__(self) -> str:
142 return f"<{self.__class__.__name__} {self.path!r}>"
144 def __enter__(self: _T) -> _T:
145 return self
147 def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
148 if self.delete is not None:
149 delete = self.delete
150 elif _tempdir_registry:
151 delete = _tempdir_registry.get_delete(self.kind)
152 else:
153 delete = True
155 if delete:
156 self.cleanup()
158 def _create(self, kind: str) -> str:
159 """Create a temporary directory and store its path in self.path"""
160 # We realpath here because some systems have their default tmpdir
161 # symlinked to another directory. This tends to confuse build
162 # scripts, so we canonicalize the path by traversing potential
163 # symlinks here.
164 path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
165 logger.debug("Created temporary directory: %s", path)
166 return path
168 def cleanup(self) -> None:
169 """Remove the temporary directory created and reset state"""
170 self._deleted = True
171 if not os.path.exists(self._path):
172 return
173 rmtree(self._path)
176class AdjacentTempDirectory(TempDirectory):
177 """Helper class that creates a temporary directory adjacent to a real one.
179 Attributes:
180 original
181 The original directory to create a temp directory for.
182 path
183 After calling create() or entering, contains the full
184 path to the temporary directory.
185 delete
186 Whether the directory should be deleted when exiting
187 (when used as a contextmanager)
189 """
191 # The characters that may be used to name the temp directory
192 # We always prepend a ~ and then rotate through these until
193 # a usable name is found.
194 # pkg_resources raises a different error for .dist-info folder
195 # with leading '-' and invalid metadata
196 LEADING_CHARS = "-~.=%0123456789"
198 def __init__(self, original: str, delete: Optional[bool] = None) -> None:
199 self.original = original.rstrip("/\\")
200 super().__init__(delete=delete)
202 @classmethod
203 def _generate_names(cls, name: str) -> Generator[str, None, None]:
204 """Generates a series of temporary names.
206 The algorithm replaces the leading characters in the name
207 with ones that are valid filesystem characters, but are not
208 valid package names (for both Python and pip definitions of
209 package).
210 """
211 for i in range(1, len(name)):
212 for candidate in itertools.combinations_with_replacement(
213 cls.LEADING_CHARS, i - 1
214 ):
215 new_name = "~" + "".join(candidate) + name[i:]
216 if new_name != name:
217 yield new_name
219 # If we make it this far, we will have to make a longer name
220 for i in range(len(cls.LEADING_CHARS)):
221 for candidate in itertools.combinations_with_replacement(
222 cls.LEADING_CHARS, i
223 ):
224 new_name = "~" + "".join(candidate) + name
225 if new_name != name:
226 yield new_name
228 def _create(self, kind: str) -> str:
229 root, name = os.path.split(self.original)
230 for candidate in self._generate_names(name):
231 path = os.path.join(root, candidate)
232 try:
233 os.mkdir(path)
234 except OSError as ex:
235 # Continue if the name exists already
236 if ex.errno != errno.EEXIST:
237 raise
238 else:
239 path = os.path.realpath(path)
240 break
241 else:
242 # Final fallback on the default behavior.
243 path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
245 logger.debug("Created temporary directory: %s", path)
246 return path