Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/temp_dir.py: 32%

127 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1import errno 

2import itertools 

3import logging 

4import os.path 

5import tempfile 

6import traceback 

7from contextlib import ExitStack, contextmanager 

8from pathlib import Path 

9from typing import ( 

10 Any, 

11 Callable, 

12 Dict, 

13 Generator, 

14 List, 

15 Optional, 

16 TypeVar, 

17 Union, 

18) 

19 

20from pip._internal.utils.misc import enum, rmtree 

21 

22logger = logging.getLogger(__name__) 

23 

24_T = TypeVar("_T", bound="TempDirectory") 

25 

26 

27# Kinds of temporary directories. Only needed for ones that are 

28# globally-managed. 

29tempdir_kinds = enum( 

30 BUILD_ENV="build-env", 

31 EPHEM_WHEEL_CACHE="ephem-wheel-cache", 

32 REQ_BUILD="req-build", 

33) 

34 

35 

36_tempdir_manager: Optional[ExitStack] = None 

37 

38 

39@contextmanager 

40def global_tempdir_manager() -> Generator[None, None, None]: 

41 global _tempdir_manager 

42 with ExitStack() as stack: 

43 old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack 

44 try: 

45 yield 

46 finally: 

47 _tempdir_manager = old_tempdir_manager 

48 

49 

50class TempDirectoryTypeRegistry: 

51 """Manages temp directory behavior""" 

52 

53 def __init__(self) -> None: 

54 self._should_delete: Dict[str, bool] = {} 

55 

56 def set_delete(self, kind: str, value: bool) -> None: 

57 """Indicate whether a TempDirectory of the given kind should be 

58 auto-deleted. 

59 """ 

60 self._should_delete[kind] = value 

61 

62 def get_delete(self, kind: str) -> bool: 

63 """Get configured auto-delete flag for a given TempDirectory type, 

64 default True. 

65 """ 

66 return self._should_delete.get(kind, True) 

67 

68 

69_tempdir_registry: Optional[TempDirectoryTypeRegistry] = None 

70 

71 

72@contextmanager 

73def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]: 

74 """Provides a scoped global tempdir registry that can be used to dictate 

75 whether directories should be deleted. 

76 """ 

77 global _tempdir_registry 

78 old_tempdir_registry = _tempdir_registry 

79 _tempdir_registry = TempDirectoryTypeRegistry() 

80 try: 

81 yield _tempdir_registry 

82 finally: 

83 _tempdir_registry = old_tempdir_registry 

84 

85 

86class _Default: 

87 pass 

88 

89 

90_default = _Default() 

91 

92 

93class TempDirectory: 

94 """Helper class that owns and cleans up a temporary directory. 

95 

96 This class can be used as a context manager or as an OO representation of a 

97 temporary directory. 

98 

99 Attributes: 

100 path 

101 Location to the created temporary directory 

102 delete 

103 Whether the directory should be deleted when exiting 

104 (when used as a contextmanager) 

105 

106 Methods: 

107 cleanup() 

108 Deletes the temporary directory 

109 

110 When used as a context manager, if the delete attribute is True, on 

111 exiting the context the temporary directory is deleted. 

112 """ 

113 

114 def __init__( 

115 self, 

116 path: Optional[str] = None, 

117 delete: Union[bool, None, _Default] = _default, 

118 kind: str = "temp", 

119 globally_managed: bool = False, 

120 ignore_cleanup_errors: bool = True, 

121 ): 

122 super().__init__() 

123 

124 if delete is _default: 

125 if path is not None: 

126 # If we were given an explicit directory, resolve delete option 

127 # now. 

128 delete = False 

129 else: 

130 # Otherwise, we wait until cleanup and see what 

131 # tempdir_registry says. 

132 delete = None 

133 

134 # The only time we specify path is in for editables where it 

135 # is the value of the --src option. 

136 if path is None: 

137 path = self._create(kind) 

138 

139 self._path = path 

140 self._deleted = False 

141 self.delete = delete 

142 self.kind = kind 

143 self.ignore_cleanup_errors = ignore_cleanup_errors 

144 

145 if globally_managed: 

146 assert _tempdir_manager is not None 

147 _tempdir_manager.enter_context(self) 

148 

149 @property 

150 def path(self) -> str: 

151 assert not self._deleted, f"Attempted to access deleted path: {self._path}" 

152 return self._path 

153 

154 def __repr__(self) -> str: 

155 return f"<{self.__class__.__name__} {self.path!r}>" 

156 

157 def __enter__(self: _T) -> _T: 

158 return self 

159 

160 def __exit__(self, exc: Any, value: Any, tb: Any) -> None: 

161 if self.delete is not None: 

162 delete = self.delete 

163 elif _tempdir_registry: 

164 delete = _tempdir_registry.get_delete(self.kind) 

165 else: 

166 delete = True 

167 

168 if delete: 

169 self.cleanup() 

170 

171 def _create(self, kind: str) -> str: 

172 """Create a temporary directory and store its path in self.path""" 

173 # We realpath here because some systems have their default tmpdir 

174 # symlinked to another directory. This tends to confuse build 

175 # scripts, so we canonicalize the path by traversing potential 

176 # symlinks here. 

177 path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-")) 

178 logger.debug("Created temporary directory: %s", path) 

179 return path 

180 

181 def cleanup(self) -> None: 

182 """Remove the temporary directory created and reset state""" 

183 self._deleted = True 

184 if not os.path.exists(self._path): 

185 return 

186 

187 errors: List[BaseException] = [] 

188 

189 def onerror( 

190 func: Callable[..., Any], 

191 path: Path, 

192 exc_val: BaseException, 

193 ) -> None: 

194 """Log a warning for a `rmtree` error and continue""" 

195 formatted_exc = "\n".join( 

196 traceback.format_exception_only(type(exc_val), exc_val) 

197 ) 

198 formatted_exc = formatted_exc.rstrip() # remove trailing new line 

199 if func in (os.unlink, os.remove, os.rmdir): 

200 logger.debug( 

201 "Failed to remove a temporary file '%s' due to %s.\n", 

202 path, 

203 formatted_exc, 

204 ) 

205 else: 

206 logger.debug("%s failed with %s.", func.__qualname__, formatted_exc) 

207 errors.append(exc_val) 

208 

209 if self.ignore_cleanup_errors: 

210 try: 

211 # first try with tenacity; retrying to handle ephemeral errors 

212 rmtree(self._path, ignore_errors=False) 

213 except OSError: 

214 # last pass ignore/log all errors 

215 rmtree(self._path, onexc=onerror) 

216 if errors: 

217 logger.warning( 

218 "Failed to remove contents in a temporary directory '%s'.\n" 

219 "You can safely remove it manually.", 

220 self._path, 

221 ) 

222 else: 

223 rmtree(self._path) 

224 

225 

226class AdjacentTempDirectory(TempDirectory): 

227 """Helper class that creates a temporary directory adjacent to a real one. 

228 

229 Attributes: 

230 original 

231 The original directory to create a temp directory for. 

232 path 

233 After calling create() or entering, contains the full 

234 path to the temporary directory. 

235 delete 

236 Whether the directory should be deleted when exiting 

237 (when used as a contextmanager) 

238 

239 """ 

240 

241 # The characters that may be used to name the temp directory 

242 # We always prepend a ~ and then rotate through these until 

243 # a usable name is found. 

244 # pkg_resources raises a different error for .dist-info folder 

245 # with leading '-' and invalid metadata 

246 LEADING_CHARS = "-~.=%0123456789" 

247 

248 def __init__(self, original: str, delete: Optional[bool] = None) -> None: 

249 self.original = original.rstrip("/\\") 

250 super().__init__(delete=delete) 

251 

252 @classmethod 

253 def _generate_names(cls, name: str) -> Generator[str, None, None]: 

254 """Generates a series of temporary names. 

255 

256 The algorithm replaces the leading characters in the name 

257 with ones that are valid filesystem characters, but are not 

258 valid package names (for both Python and pip definitions of 

259 package). 

260 """ 

261 for i in range(1, len(name)): 

262 for candidate in itertools.combinations_with_replacement( 

263 cls.LEADING_CHARS, i - 1 

264 ): 

265 new_name = "~" + "".join(candidate) + name[i:] 

266 if new_name != name: 

267 yield new_name 

268 

269 # If we make it this far, we will have to make a longer name 

270 for i in range(len(cls.LEADING_CHARS)): 

271 for candidate in itertools.combinations_with_replacement( 

272 cls.LEADING_CHARS, i 

273 ): 

274 new_name = "~" + "".join(candidate) + name 

275 if new_name != name: 

276 yield new_name 

277 

278 def _create(self, kind: str) -> str: 

279 root, name = os.path.split(self.original) 

280 for candidate in self._generate_names(name): 

281 path = os.path.join(root, candidate) 

282 try: 

283 os.mkdir(path) 

284 except OSError as ex: 

285 # Continue if the name exists already 

286 if ex.errno != errno.EEXIST: 

287 raise 

288 else: 

289 path = os.path.realpath(path) 

290 break 

291 else: 

292 # Final fallback on the default behavior. 

293 path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-")) 

294 

295 logger.debug("Created temporary directory: %s", path) 

296 return path