Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/utils/temp_dir.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

129 statements  

1from __future__ import annotations 

2 

3import errno 

4import itertools 

5import logging 

6import os.path 

7import tempfile 

8import traceback 

9from collections.abc import Callable, Generator 

10from contextlib import ExitStack, contextmanager 

11from pathlib import Path 

12from typing import ( 

13 Any, 

14 TypeVar, 

15) 

16 

17from pip._internal.utils.misc import enum, rmtree 

18 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Self-type used by TempDirectory.__enter__ so that entering a subclass
# instance is typed as returning that subclass.
_T = TypeVar("_T", bound="TempDirectory")


# Kinds of temporary directories. Only needed for ones that are
# globally-managed.
tempdir_kinds = enum(
    BUILD_ENV="build-env",
    EPHEM_WHEEL_CACHE="ephem-wheel-cache",
    REQ_BUILD="req-build",
)

31 

32 

# ExitStack that globally-managed TempDirectory objects register themselves
# with, or None while no global_tempdir_manager() scope is active.
_tempdir_manager: ExitStack | None = None


@contextmanager
def global_tempdir_manager() -> Generator[None, None, None]:
    """Install a fresh ExitStack as the module-wide temp-dir manager.

    While the ``with`` body runs, ``_tempdir_manager`` refers to a new
    ExitStack. On exit the previous manager is put back first, and then the
    ExitStack itself is closed, which exits every context entered on it.
    Scopes may be nested; each one restores whatever was active before it.
    """
    global _tempdir_manager
    with ExitStack() as stack:
        previous = _tempdir_manager
        _tempdir_manager = stack
        try:
            yield
        finally:
            # Restore the outer manager before the stack unwinds.
            _tempdir_manager = previous

45 

46 

class TempDirectoryTypeRegistry:
    """Per-kind record of whether temp directories should be auto-deleted."""

    def __init__(self) -> None:
        # Maps a TempDirectory kind to its explicitly configured auto-delete
        # flag; kinds that were never configured are simply absent.
        self._should_delete: dict[str, bool] = {}

    def set_delete(self, kind: str, value: bool) -> None:
        """Record whether a TempDirectory of the given kind should be
        auto-deleted.
        """
        self._should_delete.update({kind: value})

    def get_delete(self, kind: str) -> bool:
        """Return the auto-delete flag configured for the given kind.

        Kinds that were never passed to set_delete() report True.
        """
        try:
            return self._should_delete[kind]
        except KeyError:
            return True

64 

65 

# Registry consulted by TempDirectory.__exit__ when its ``delete`` attribute
# is None; None while no tempdir_registry() scope is active.
_tempdir_registry: TempDirectoryTypeRegistry | None = None


@contextmanager
def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
    """Provide a scoped global tempdir registry.

    A new TempDirectoryTypeRegistry becomes the module-wide registry for the
    duration of the ``with`` body and is yielded so the caller can configure
    which kinds of directories should be deleted. The previously active
    registry (possibly None) is restored on exit, even if the body raises.
    """
    global _tempdir_registry
    previous = _tempdir_registry
    fresh = TempDirectoryTypeRegistry()
    _tempdir_registry = fresh
    try:
        yield fresh
    finally:
        _tempdir_registry = previous

81 

82 

class _Default:
    """Type of the ``_default`` sentinel used by TempDirectory's ``delete``."""


# Sentinel meaning "the caller did not pass ``delete``". It is distinct from
# None, which TempDirectory treats as "decide when the context exits".
_default = _Default()


class TempDirectory:
    """Helper class that owns and cleans up a temporary directory.

    This class can be used as a context manager or as an OO representation of a
    temporary directory.

    Attributes:
        path
            Location to the created temporary directory. Accessing it after
            cleanup() fails an assertion.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager). None means the decision is
            deferred to the active tempdir registry at exit time.
        kind
            Label for the directory; used in the generated directory name
            and as the key looked up in the tempdir registry.
        ignore_cleanup_errors
            Whether cleanup() logs removal failures instead of raising.

    Methods:
        cleanup()
            Deletes the temporary directory

    When used as a context manager, if the delete attribute is True, on
    exiting the context the temporary directory is deleted.
    """

    def __init__(
        self,
        path: str | None = None,
        delete: bool | None | _Default = _default,
        kind: str = "temp",
        globally_managed: bool = False,
        ignore_cleanup_errors: bool = True,
    ) -> None:
        """Wrap ``path``, or create a fresh temporary directory.

        Args:
            path: Existing directory to wrap. When None, a new directory is
                created via _create().
            delete: Explicit delete-on-exit flag. When omitted, it resolves
                to False if ``path`` was given and to None (ask the registry
                at exit) otherwise.
            kind: Label for this directory.
            globally_managed: When True, the instance is entered on the
                ExitStack installed by global_tempdir_manager(), which must
                be active.
            ignore_cleanup_errors: Stored as-is; see cleanup().
        """
        super().__init__()

        if delete is _default:
            if path is not None:
                # If we were given an explicit directory, resolve delete option
                # now.
                delete = False
            else:
                # Otherwise, we wait until cleanup and see what
                # tempdir_registry says.
                delete = None

        # The only time we specify path is for editables, where it
        # is the value of the --src option.
        if path is None:
            path = self._create(kind)

        self._path = path
        self._deleted = False
        self.delete = delete
        self.kind = kind
        self.ignore_cleanup_errors = ignore_cleanup_errors

        if globally_managed:
            assert _tempdir_manager is not None
            _tempdir_manager.enter_context(self)

    @property
    def path(self) -> str:
        """Path of the directory; asserts that cleanup() has not run yet."""
        assert not self._deleted, f"Attempted to access deleted path: {self._path}"
        return self._path

    def __repr__(self) -> str:
        # Read _path directly: the ``path`` property asserts once the
        # directory has been cleaned up, and repr() must stay usable for
        # logging and debugging even then.
        return f"<{self.__class__.__name__} {self._path!r}>"

    def __enter__(self: _T) -> _T:
        return self

    def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
        """Clean up on context exit when deletion is enabled.

        An explicit ``delete`` attribute wins; otherwise the active tempdir
        registry is asked about ``kind``; with no registry the directory is
        deleted.
        """
        if self.delete is not None:
            delete = self.delete
        elif _tempdir_registry is not None:
            delete = _tempdir_registry.get_delete(self.kind)
        else:
            delete = True

        if delete:
            self.cleanup()

    def _create(self, kind: str) -> str:
        """Create a temporary directory and return its canonical path.

        The caller is responsible for storing the returned path.
        """
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory.  This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
        logger.debug("Created temporary directory: %s", path)
        return path

    def cleanup(self) -> None:
        """Remove the temporary directory and mark this object as deleted.

        The object is flagged as deleted before any removal is attempted, so
        ``path`` becomes inaccessible even if the directory is already gone
        or removal fails. With ``ignore_cleanup_errors`` set, an OSError from
        the first removal attempt triggers a second pass that logs each
        failure and ends with a warning; otherwise errors from rmtree
        propagate to the caller.
        """
        self._deleted = True
        if not os.path.exists(self._path):
            return

        # Exceptions reported to onerror() during the log-and-continue pass.
        errors: list[BaseException] = []

        def onerror(
            func: Callable[..., Any],
            path: Path,
            exc_val: BaseException,
        ) -> None:
            """Log a warning for a `rmtree` error and continue"""
            formatted_exc = "\n".join(
                traceback.format_exception_only(type(exc_val), exc_val)
            )
            formatted_exc = formatted_exc.rstrip()  # remove trailing new line
            if func in (os.unlink, os.remove, os.rmdir):
                logger.debug(
                    "Failed to remove a temporary file '%s' due to %s.\n",
                    path,
                    formatted_exc,
                )
            else:
                logger.debug("%s failed with %s.", func.__qualname__, formatted_exc)
            errors.append(exc_val)

        if self.ignore_cleanup_errors:
            try:
                # first try with @retry; retrying to handle ephemeral errors
                rmtree(self._path, ignore_errors=False)
            except OSError:
                # last pass ignore/log all errors
                rmtree(self._path, onexc=onerror)
            if errors:
                logger.warning(
                    "Failed to remove contents in a temporary directory '%s'.\n"
                    "You can safely remove it manually.",
                    self._path,
                )
        else:
            rmtree(self._path)

221 

222 

class AdjacentTempDirectory(TempDirectory):
    """Temporary directory created as a sibling of an existing directory.

    Attributes:
        original
            The directory the temp directory is created next to, with
            trailing path separators stripped.
        path
            Full path to the temporary directory.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    """

    # The characters that may be used to name the temp directory
    # We always prepend a ~ and then rotate through these until
    # a usable name is found.
    # pkg_resources raises a different error for .dist-info folder
    # with leading '-' and invalid metadata
    LEADING_CHARS = "-~.=%0123456789"

    def __init__(self, original: str, delete: bool | None = None) -> None:
        # ``original`` has to be set before the base initializer runs,
        # because that initializer calls _create(), which reads it.
        self.original = original.rstrip("/\\")
        super().__init__(delete=delete)

    @classmethod
    def _generate_names(cls, name: str) -> Generator[str, None, None]:
        """Yield candidate temporary names derived from ``name``.

        The algorithm replaces the leading characters in the name
        with ones that are valid filesystem characters, but are not
        valid package names (for both Python and pip definitions of
        package). Candidates equal to ``name`` itself are skipped.
        """
        combos = itertools.combinations_with_replacement

        # Same-length candidates: "~", then (cut - 1) filler characters,
        # then whatever is left of the original name from index ``cut``.
        for cut in range(1, len(name)):
            tail = name[cut:]
            for filler in combos(cls.LEADING_CHARS, cut - 1):
                renamed = "~" + "".join(filler) + tail
                if renamed == name:
                    continue
                yield renamed

        # If we make it this far, we will have to make a longer name
        for extra in range(len(cls.LEADING_CHARS)):
            for filler in combos(cls.LEADING_CHARS, extra):
                renamed = "~" + "".join(filler) + name
                if renamed == name:
                    continue
                yield renamed

    def _create(self, kind: str) -> str:
        """Create the adjacent directory and return its canonical path.

        Each generated name is tried in turn; names that already exist are
        skipped and any other OSError propagates. If every candidate is
        taken, a regular ``pip-{kind}-`` temporary directory is created
        instead.
        """
        parent, leaf = os.path.split(self.original)
        for alt_name in self._generate_names(leaf):
            attempt = os.path.join(parent, alt_name)
            try:
                os.mkdir(attempt)
            except OSError as err:
                # Continue if the name exists already
                if err.errno == errno.EEXIST:
                    continue
                raise
            created = os.path.realpath(attempt)
            break
        else:
            # Final fallback on the default behavior.
            created = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))

        logger.debug("Created temporary directory: %s", created)
        return created