Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/utils/temp_dir.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

129 statements  

1from __future__ import annotations 

2 

3import errno 

4import itertools 

5import logging 

6import os.path 

7import tempfile 

8import traceback 

9from collections.abc import Generator 

10from contextlib import ExitStack, contextmanager 

11from pathlib import Path 

12from typing import ( 

13 Any, 

14 Callable, 

15 TypeVar, 

16) 

17 

18from pip._internal.utils.misc import enum, rmtree 

19 

# Logger for this module.
logger = logging.getLogger(__name__)

# Type variable bound to TempDirectory; used to annotate methods that hand
# back the very instance they were called on (see TempDirectory.__enter__).
_T = TypeVar("_T", bound="TempDirectory")


# Kinds of temporary directories. Only needed for ones that are
# globally-managed.
tempdir_kinds = enum(
    BUILD_ENV="build-env",
    EPHEM_WHEEL_CACHE="ephem-wheel-cache",
    REQ_BUILD="req-build",
)

32 

33 

34_tempdir_manager: ExitStack | None = None 

35 

36 

37@contextmanager 

38def global_tempdir_manager() -> Generator[None, None, None]: 

39 global _tempdir_manager 

40 with ExitStack() as stack: 

41 old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack 

42 try: 

43 yield 

44 finally: 

45 _tempdir_manager = old_tempdir_manager 

46 

47 

class TempDirectoryTypeRegistry:
    """Records, per temp-directory kind, whether auto-deletion is wanted."""

    def __init__(self) -> None:
        # Maps a kind name to its explicitly configured auto-delete flag.
        self._should_delete: dict[str, bool] = {}

    def set_delete(self, kind: str, value: bool) -> None:
        """Configure whether TempDirectory objects of ``kind`` are
        auto-deleted.
        """
        self._should_delete.update({kind: value})

    def get_delete(self, kind: str) -> bool:
        """Return the auto-delete flag configured for ``kind``.

        Kinds that were never configured default to ``True``.
        """
        try:
            return self._should_delete[kind]
        except KeyError:
            return True

65 

66 

# The registry consulted by TempDirectory.__exit__ when a directory has no
# explicit delete setting; None while no tempdir_registry() context is active.
_tempdir_registry: TempDirectoryTypeRegistry | None = None


@contextmanager
def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
    """Provide a scoped, module-wide registry of per-kind delete settings.

    A new ``TempDirectoryTypeRegistry`` is installed as the module-level
    ``_tempdir_registry`` and yielded to the caller.  On leaving the
    ``with`` block the previously installed registry (possibly ``None``)
    is restored.
    """
    global _tempdir_registry
    previous = _tempdir_registry
    registry = TempDirectoryTypeRegistry()
    _tempdir_registry = registry
    try:
        yield registry
    finally:
        _tempdir_registry = previous

82 

83 

class _Default:
    """Sentinel type: marks that no ``delete`` argument was supplied."""


_default = _Default()


class TempDirectory:
    """Helper class that owns and cleans up a temporary directory.

    This class can be used as a context manager or as an OO representation of a
    temporary directory.

    Attributes:
        path
            Location to the created temporary directory
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    Methods:
        cleanup()
            Deletes the temporary directory

    When used as a context manager, if the delete attribute is True, on
    exiting the context the temporary directory is deleted.
    """

    def __init__(
        self,
        path: str | None = None,
        delete: bool | None | _Default = _default,
        kind: str = "temp",
        globally_managed: bool = False,
        ignore_cleanup_errors: bool = True,
    ):
        """Set up the directory wrapper, creating the directory if needed.

        Args:
            path: An existing directory to wrap. When ``None`` a new
                directory is created through ``self._create(kind)``.
            delete: ``True``/``False`` to force the behaviour on context
                exit, ``None`` to decide at exit time. When omitted it
                resolves to ``False`` if ``path`` was given, else ``None``.
            kind: Label used in the created directory's name and for the
                tempdir-registry lookup on exit.
            globally_managed: When true, register this object on the active
                global tempdir manager, which must be installed already.
            ignore_cleanup_errors: When true, ``cleanup()`` logs removal
                failures instead of raising them.
        """
        super().__init__()

        if delete is _default:
            if path is not None:
                # If we were given an explicit directory, resolve delete option
                # now.
                delete = False
            else:
                # Otherwise, we wait until cleanup and see what
                # tempdir_registry says.
                delete = None

        # The only time we specify path is in for editables where it
        # is the value of the --src option.
        if path is None:
            path = self._create(kind)

        self._path = path
        self._deleted = False
        self.delete = delete
        self.kind = kind
        self.ignore_cleanup_errors = ignore_cleanup_errors

        if globally_managed:
            assert _tempdir_manager is not None
            _tempdir_manager.enter_context(self)

    @property
    def path(self) -> str:
        """The directory's path; asserts that cleanup() has not run yet."""
        assert not self._deleted, f"Attempted to access deleted path: {self._path}"
        return self._path

    def __repr__(self) -> str:
        # Read the raw attribute rather than the guarded ``path`` property so
        # that repr() keeps working (e.g. in logs and debuggers) after the
        # directory has been cleaned up.
        return f"<{self.__class__.__name__} {self._path!r}>"

    def __enter__(self: _T) -> _T:
        return self

    def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
        """Clean up on context exit if deletion is enabled.

        An explicit ``self.delete`` wins; otherwise the active tempdir
        registry is asked about ``self.kind``; with no registry installed
        the directory is deleted.
        """
        if self.delete is not None:
            delete = self.delete
        elif _tempdir_registry is not None:
            delete = _tempdir_registry.get_delete(self.kind)
        else:
            delete = True

        if delete:
            self.cleanup()

    def _create(self, kind: str) -> str:
        """Create a temporary directory and return its canonical path."""
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory.  This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
        logger.debug("Created temporary directory: %s", path)
        return path

    def cleanup(self) -> None:
        """Remove the temporary directory and mark this object as deleted.

        When ``ignore_cleanup_errors`` is true, removal failures are logged
        rather than raised; otherwise errors from ``rmtree`` propagate.
        """
        self._deleted = True
        if not os.path.exists(self._path):
            return

        errors: list[BaseException] = []

        def onerror(
            func: Callable[..., Any],
            path: Path,
            exc_val: BaseException,
        ) -> None:
            """Log a warning for a `rmtree` error and continue"""
            # format_exception_only() returns lines that already end with a
            # newline, so join them without adding another separator.
            formatted_exc = "".join(
                traceback.format_exception_only(type(exc_val), exc_val)
            )
            formatted_exc = formatted_exc.rstrip()  # remove trailing new line
            if func in (os.unlink, os.remove, os.rmdir):
                logger.debug(
                    "Failed to remove a temporary file '%s' due to %s.\n",
                    path,
                    formatted_exc,
                )
            else:
                logger.debug("%s failed with %s.", func.__qualname__, formatted_exc)
            errors.append(exc_val)

        if self.ignore_cleanup_errors:
            try:
                # first try with @retry; retrying to handle ephemeral errors
                rmtree(self._path, ignore_errors=False)
            except OSError:
                # last pass ignore/log all errors
                rmtree(self._path, onexc=onerror)
            if errors:
                logger.warning(
                    "Failed to remove contents in a temporary directory '%s'.\n"
                    "You can safely remove it manually.",
                    self._path,
                )
        else:
            rmtree(self._path)

222 

223 

class AdjacentTempDirectory(TempDirectory):
    """Helper class that creates a temporary directory adjacent to a real one.

    Attributes:
        original
            The original directory to create a temp directory for
            (stored with trailing path separators stripped).
        path
            The full path to the temporary directory, which is created
            when the object is constructed.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    """

    # The characters that may be used to name the temp directory
    # We always prepend a ~ and then rotate through these until
    # a usable name is found.
    # pkg_resources raises a different error for .dist-info folder
    # with leading '-' and invalid metadata
    LEADING_CHARS = "-~.=%0123456789"

    def __init__(self, original: str, delete: bool | None = None) -> None:
        # ``original`` must be set before the base initializer runs, because
        # that initializer calls ``_create``, which reads it.
        self.original = original.rstrip("/\\")
        super().__init__(delete=delete)

    @classmethod
    def _generate_names(cls, name: str) -> Generator[str, None, None]:
        """Yield candidate temporary names derived from ``name``.

        The algorithm replaces the leading characters in the name
        with ones that are valid filesystem characters, but are not
        valid package names (for both Python and pip definitions of
        package).  Once those are exhausted, progressively longer
        prefixes are put in front of the full name instead.
        """
        alphabet = cls.LEADING_CHARS

        # Same-length candidates: swap the first ``cut`` characters for "~"
        # followed by ``cut - 1`` characters drawn from the alphabet.
        for cut in range(1, len(name)):
            tail = name[cut:]
            for combo in itertools.combinations_with_replacement(alphabet, cut - 1):
                proposal = "~" + "".join(combo) + tail
                if proposal != name:
                    yield proposal

        # If we make it this far, we will have to make a longer name
        for extra in range(len(alphabet)):
            for combo in itertools.combinations_with_replacement(alphabet, extra):
                proposal = "~" + "".join(combo) + name
                if proposal != name:
                    yield proposal

    def _create(self, kind: str) -> str:
        """Create the directory next to ``self.original`` and return its path.

        Each generated name is tried in turn; names that already exist are
        skipped and any other ``OSError`` is re-raised.  If every candidate
        is taken, a regular ``tempfile.mkdtemp`` directory is used instead.
        """
        parent, basename = os.path.split(self.original)
        created: str | None = None
        for proposal in self._generate_names(basename):
            attempt = os.path.join(parent, proposal)
            try:
                os.mkdir(attempt)
            except OSError as ex:
                # Continue if the name exists already
                if ex.errno == errno.EEXIST:
                    continue
                raise
            created = os.path.realpath(attempt)
            break

        if created is None:
            # Final fallback on the default behavior.
            created = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))

        logger.debug("Created temporary directory: %s", created)
        return created