Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/temp_dir.py: 36%

109 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import errno 

2import itertools 

3import logging 

4import os.path 

5import tempfile 

6from contextlib import ExitStack, contextmanager 

7from typing import Any, Dict, Generator, Optional, TypeVar, Union 

8 

9from pip._internal.utils.misc import enum, rmtree 

10 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable bound to TempDirectory. It annotates ``__enter__`` so that
# entering a subclass instance is typed as returning that same subclass.
_T = TypeVar("_T", bound="TempDirectory")


# Kinds of temporary directories. Only needed for ones that are
# globally-managed.
# NOTE(review): ``enum`` is the helper imported from pip._internal.utils.misc;
# presumably each member is exposed as an attribute holding the string given
# here -- confirm against that helper.
tempdir_kinds = enum(
    BUILD_ENV="build-env",
    EPHEM_WHEEL_CACHE="ephem-wheel-cache",
    REQ_BUILD="req-build",
)

23 

24 

# The ExitStack of the innermost active ``global_tempdir_manager()`` scope,
# or None when no such scope is active.
_tempdir_manager: Optional[ExitStack] = None


@contextmanager
def global_tempdir_manager() -> Generator[None, None, None]:
    """Install a fresh ExitStack as the module-wide tempdir manager.

    While the context is active, ``_tempdir_manager`` refers to a new
    ExitStack; leaving the context puts the previous manager back and then
    closes the stack, which exits every context that was entered on it.
    Scopes may be nested: each one restores whatever manager was active
    when it started.
    """
    global _tempdir_manager

    previous_manager = _tempdir_manager
    with ExitStack() as stack:
        _tempdir_manager = stack
        try:
            yield
        finally:
            # Restore first; the stack itself is closed when the ``with``
            # block unwinds right after this.
            _tempdir_manager = previous_manager

37 

38 

class TempDirectoryTypeRegistry:
    """Records, per temp-directory kind, whether auto-deletion is enabled."""

    def __init__(self) -> None:
        # Maps a kind name to its auto-delete flag. Kinds that were never
        # configured are absent and are treated as True by get_delete().
        self._should_delete: Dict[str, bool] = dict()

    def set_delete(self, kind: str, value: bool) -> None:
        """Set whether a TempDirectory of the given kind should be
        auto-deleted.
        """
        self._should_delete.update({kind: value})

    def get_delete(self, kind: str) -> bool:
        """Return the auto-delete flag configured for the given kind, or
        True when nothing was configured for it.
        """
        try:
            return self._should_delete[kind]
        except KeyError:
            return True

56 

57 

# The registry of the innermost active ``tempdir_registry()`` scope, or None
# when no such scope is active.
_tempdir_registry: Optional[TempDirectoryTypeRegistry] = None


@contextmanager
def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
    """Provide a scoped, module-wide registry that dictates whether
    temporary directories of a given kind should be deleted.

    A new TempDirectoryTypeRegistry is installed for the duration of the
    context and yielded to the caller; the previously installed registry
    (possibly None) is put back on exit.
    """
    global _tempdir_registry

    previous_registry = _tempdir_registry
    scoped_registry = TempDirectoryTypeRegistry()
    _tempdir_registry = scoped_registry
    try:
        yield scoped_registry
    finally:
        _tempdir_registry = previous_registry

73 

74 

class _Default:
    """Type of the ``_default`` sentinel below."""


# Sentinel meaning "the caller did not pass ``delete``". It is needed because
# None is itself a meaningful value for that argument.
_default = _Default()


class TempDirectory:
    """Helper class that owns and cleans up a temporary directory.

    This class can be used as a context manager or as an OO representation of a
    temporary directory.

    Attributes:
        path
            Location of the temporary directory. Accessing it after
            cleanup() fails an assertion.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager). None means the decision is
            deferred to the active tempdir registry at exit time.
        kind
            The kind name given at construction; used for the registry
            lookup on exit.

    Methods:
        cleanup()
            Deletes the temporary directory

    When used as a context manager, if the delete attribute is True, on
    exiting the context the temporary directory is deleted.
    """

    def __init__(
        self,
        path: Optional[str] = None,
        delete: Union[bool, None, _Default] = _default,
        kind: str = "temp",
        globally_managed: bool = False,
    ):
        """Wrap ``path``, or create a new temporary directory when it is None.

        Args:
            path: Directory to manage. When None, one is created through
                ``_create(kind)``.
            delete: True/False to force the behavior on exit, None to defer
                to the tempdir registry. When omitted it resolves to False
                if ``path`` was given and to None otherwise.
            kind: Kind name; part of the created directory's name prefix and
                the key used for the registry lookup on exit.
            globally_managed: When true, enter this object on the active
                global tempdir manager so it is exited together with it.
                An active manager is required (checked with an assert).
        """
        super().__init__()

        if delete is _default:
            if path is not None:
                # If we were given an explicit directory, resolve delete option
                # now.
                delete = False
            else:
                # Otherwise, we wait until cleanup and see what
                # tempdir_registry says.
                delete = None

        # The only time we specify path is for editables, where it
        # is the value of the --src option.
        if path is None:
            path = self._create(kind)

        self._path = path
        self._deleted = False
        self.delete = delete
        self.kind = kind

        if globally_managed:
            assert _tempdir_manager is not None
            _tempdir_manager.enter_context(self)

    @property
    def path(self) -> str:
        """The managed directory's path; asserts it was not cleaned up."""
        assert not self._deleted, f"Attempted to access deleted path: {self._path}"
        return self._path

    def __repr__(self) -> str:
        # Read ``_path`` directly instead of the ``path`` property: the
        # property asserts the directory has not been deleted, which would
        # make repr() raise for an already cleaned-up instance.
        return f"<{self.__class__.__name__} {self._path!r}>"

    def __enter__(self: _T) -> _T:
        return self

    def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
        """Clean up on exit if deletion is enabled for this directory.

        Precedence: an explicit ``delete`` attribute, then the active
        tempdir registry's setting for ``kind``, then True.
        """
        if self.delete is not None:
            delete = self.delete
        elif _tempdir_registry is not None:
            delete = _tempdir_registry.get_delete(self.kind)
        else:
            delete = True

        if delete:
            self.cleanup()

    def _create(self, kind: str) -> str:
        """Create a temporary directory and return its canonical path."""
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory.  This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
        logger.debug("Created temporary directory: %s", path)
        return path

    def cleanup(self) -> None:
        """Mark this object as deleted and remove the directory if present."""
        self._deleted = True
        if not os.path.exists(self._path):
            return
        rmtree(self._path)

174 

175 

class AdjacentTempDirectory(TempDirectory):
    """Temporary directory created next to an existing one.

    Attributes:
        original
            The directory the temporary one is created for, with any
            trailing path separators removed.
        path
            Full path to the temporary directory, which is created as a
            sibling of ``original`` during construction.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    """

    # The characters that may be used to name the temp directory
    # We always prepend a ~ and then rotate through these until
    # a usable name is found.
    # pkg_resources raises a different error for .dist-info folder
    # with leading '-' and invalid metadata
    LEADING_CHARS = "-~.=%0123456789"

    def __init__(self, original: str, delete: Optional[bool] = None) -> None:
        # ``original`` has to be set before the base initializer runs,
        # because that is what ends up calling _create().
        self.original = original.rstrip("/\\")
        super().__init__(delete=delete)

    @classmethod
    def _generate_names(cls, name: str) -> Generator[str, None, None]:
        """Yield candidate directory names derived from ``name``.

        Candidates start with "~" followed by characters drawn from
        LEADING_CHARS: characters that are valid on the filesystem but not
        in package names (for both Python and pip definitions of package).
        Names of the same length as ``name`` come first, replacing more
        and more of its leading characters; after that, progressively
        longer prefixes are put in front of the full ``name``.
        """
        fillers_of = itertools.combinations_with_replacement
        alphabet = cls.LEADING_CHARS

        # Same-length candidates: keep the tail of the name from ``cut`` on.
        for cut in range(1, len(name)):
            tail = name[cut:]
            for filler in fillers_of(alphabet, cut - 1):
                candidate = "".join(("~", *filler, tail))
                if candidate == name:
                    continue
                yield candidate

        # If we make it this far, we will have to make a longer name
        for extra in range(len(alphabet)):
            for filler in fillers_of(alphabet, extra):
                candidate = "".join(("~", *filler, name))
                if candidate == name:
                    continue
                yield candidate

    def _create(self, kind: str) -> str:
        """Create the directory beside ``original`` and return its real path.

        The first generated name that does not exist yet is used. If every
        candidate already exists, a regular ``pip-{kind}-`` temporary
        directory is created instead. Any mkdir failure other than
        "already exists" is propagated.
        """
        parent, basename = os.path.split(self.original)
        created: Optional[str] = None

        for candidate in self._generate_names(basename):
            attempt = os.path.join(parent, candidate)
            try:
                os.mkdir(attempt)
            except OSError as err:
                if err.errno == errno.EEXIST:
                    # Name already taken: move on to the next candidate.
                    continue
                raise
            created = os.path.realpath(attempt)
            break

        if created is None:
            # Final fallback on the default behavior.
            created = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))

        logger.debug("Created temporary directory: %s", created)
        return created