Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_vendor/distlib/resources.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

236 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2013-2026 Vinay Sajip. 

4# Licensed to the Python Software Foundation under a contributor agreement. 

5# See LICENSE.txt and CONTRIBUTORS.txt. 

6# 

7from __future__ import unicode_literals 

8 

9import bisect 

10import io 

11import logging 

12import os 

13import pkgutil 

14import sys 

15import types 

16import zipimport 

17 

18from . import DistlibException 

19from .util import cached_property, get_cache_base, Cache 

20 

21logger = logging.getLogger(__name__) 

22 

23cache = None # created when needed 

24 

25 

26class ResourceCache(Cache): 

27 

28 def __init__(self, base=None): 

29 if base is None: 

30 # Use native string to avoid issues on 2.x: see Python #20140. 

31 base = os.path.join(get_cache_base(), str('resource-cache')) 

32 super(ResourceCache, self).__init__(base) 

33 

34 def is_stale(self, resource, path): 

35 """ 

36 Is the cache stale for the given resource? 

37 

38 :param resource: The :class:`Resource` being cached. 

39 :param path: The path of the resource in the cache. 

40 :return: True if the cache is stale. 

41 """ 

42 # Cache invalidation is a hard problem :-) 

43 return True 

44 

45 def get(self, resource): 

46 """ 

47 Get a resource into the cache, 

48 

49 :param resource: A :class:`Resource` instance. 

50 :return: The pathname of the resource in the cache. 

51 """ 

52 prefix, path = resource.finder.get_cache_info(resource) 

53 if prefix is None: 

54 result = path 

55 else: 

56 result = os.path.join(self.base, self.prefix_to_dir(prefix), path) 

57 dirname = os.path.dirname(result) 

58 if not os.path.isdir(dirname): 

59 os.makedirs(dirname) 

60 if not os.path.exists(result): 

61 stale = True 

62 else: 

63 stale = self.is_stale(resource, path) 

64 if stale: 

65 # write the bytes of the resource to the cache location 

66 with open(result, 'wb') as f: 

67 f.write(resource.bytes) 

68 return result 

69 

70 

71class ResourceBase(object): 

72 

73 def __init__(self, finder, name): 

74 self.finder = finder 

75 self.name = name 

76 

77 

78class Resource(ResourceBase): 

79 """ 

80 A class representing an in-package resource, such as a data file. This is 

81 not normally instantiated by user code, but rather by a 

82 :class:`ResourceFinder` which manages the resource. 

83 """ 

84 is_container = False # Backwards compatibility 

85 

86 def as_stream(self): 

87 """ 

88 Get the resource as a stream. 

89 

90 This is not a property to make it obvious that it returns a new stream 

91 each time. 

92 """ 

93 return self.finder.get_stream(self) 

94 

95 @cached_property 

96 def file_path(self): 

97 global cache 

98 if cache is None: 

99 cache = ResourceCache() 

100 return cache.get(self) 

101 

102 @cached_property 

103 def bytes(self): 

104 return self.finder.get_bytes(self) 

105 

106 @cached_property 

107 def size(self): 

108 return self.finder.get_size(self) 

109 

110 

111class ResourceContainer(ResourceBase): 

112 is_container = True # Backwards compatibility 

113 

114 @cached_property 

115 def resources(self): 

116 return self.finder.get_resources(self) 

117 

118 

119class ResourceFinder(object): 

120 """ 

121 Resource finder for file system resources. 

122 """ 

123 

124 if sys.platform.startswith('java'): 

125 skipped_extensions = ('.pyc', '.pyo', '.class') 

126 else: 

127 skipped_extensions = ('.pyc', '.pyo') 

128 

129 def __init__(self, module): 

130 self.module = module 

131 self.loader = getattr(module, '__loader__', None) 

132 self.base = os.path.dirname(getattr(module, '__file__', '')) 

133 

134 def _adjust_path(self, path): 

135 return os.path.realpath(path) 

136 

137 def _is_in_base(self, path): 

138 base = self._adjust_path(self.base) 

139 if path == base: 

140 return True 

141 if not base.endswith(os.sep): 

142 base = base + os.sep 

143 return path.startswith(base) 

144 

145 def _make_path(self, resource_name): 

146 # Issue #50: need to preserve type of path on Python 2.x 

147 # like os.path._get_sep 

148 if isinstance(resource_name, bytes): # should only happen on 2.x 

149 sep = b'/' 

150 else: 

151 sep = '/' 

152 parts = resource_name.split(sep) 

153 parts.insert(0, self.base) 

154 result = os.path.join(*parts) 

155 result = self._adjust_path(result) 

156 # Confine the resolved resource to the package base so a resource 

157 # name containing '..' cannot read files outside the package. 

158 if not self._is_in_base(result): 

159 raise DistlibException('Resource name escapes package: ' 

160 '%r' % resource_name) 

161 return result 

162 

163 def _find(self, path): 

164 return os.path.exists(path) 

165 

166 def get_cache_info(self, resource): 

167 return None, resource.path 

168 

169 def find(self, resource_name): 

170 path = self._make_path(resource_name) 

171 if not self._find(path): 

172 result = None 

173 else: 

174 if self._is_directory(path): 

175 result = ResourceContainer(self, resource_name) 

176 else: 

177 result = Resource(self, resource_name) 

178 result.path = path 

179 return result 

180 

181 def get_stream(self, resource): 

182 return open(resource.path, 'rb') 

183 

184 def get_bytes(self, resource): 

185 with open(resource.path, 'rb') as f: 

186 return f.read() 

187 

188 def get_size(self, resource): 

189 return os.path.getsize(resource.path) 

190 

191 def get_resources(self, resource): 

192 

193 def allowed(f): 

194 return (f != '__pycache__' and not f.endswith(self.skipped_extensions)) 

195 

196 return set([f for f in os.listdir(resource.path) if allowed(f)]) 

197 

198 def is_container(self, resource): 

199 return self._is_directory(resource.path) 

200 

201 _is_directory = staticmethod(os.path.isdir) 

202 

203 def iterator(self, resource_name): 

204 resource = self.find(resource_name) 

205 if resource is not None: 

206 todo = [resource] 

207 while todo: 

208 resource = todo.pop(0) 

209 yield resource 

210 if resource.is_container: 

211 rname = resource.name 

212 for name in resource.resources: 

213 if not rname: 

214 new_name = name 

215 else: 

216 new_name = '/'.join([rname, name]) 

217 child = self.find(new_name) 

218 if child.is_container: 

219 todo.append(child) 

220 else: 

221 yield child 

222 

223 

224class ZipResourceFinder(ResourceFinder): 

225 """ 

226 Resource finder for resources in .zip files. 

227 """ 

228 

229 def __init__(self, module): 

230 super(ZipResourceFinder, self).__init__(module) 

231 archive = self.loader.archive 

232 self.prefix_len = 1 + len(archive) 

233 # PyPy doesn't have a _files attr on zipimporter, and you can't set one 

234 if hasattr(self.loader, '_files'): 

235 self._files = self.loader._files 

236 else: 

237 self._files = zipimport._zip_directory_cache[archive] 

238 self.index = sorted(self._files) 

239 

240 def _adjust_path(self, path): 

241 return path 

242 

243 def _find(self, path): 

244 path = path[self.prefix_len:] 

245 if path in self._files: 

246 result = True 

247 else: 

248 if path and path[-1] != os.sep: 

249 path = path + os.sep 

250 i = bisect.bisect(self.index, path) 

251 try: 

252 result = self.index[i].startswith(path) 

253 except IndexError: 

254 result = False 

255 if not result: 

256 logger.debug('_find failed: %r %r', path, self.loader.prefix) 

257 else: 

258 logger.debug('_find worked: %r %r', path, self.loader.prefix) 

259 return result 

260 

261 def get_cache_info(self, resource): 

262 prefix = self.loader.archive 

263 path = resource.path[1 + len(prefix):] 

264 return prefix, path 

265 

266 def get_bytes(self, resource): 

267 return self.loader.get_data(resource.path) 

268 

269 def get_stream(self, resource): 

270 return io.BytesIO(self.get_bytes(resource)) 

271 

272 def get_size(self, resource): 

273 path = resource.path[self.prefix_len:] 

274 return self._files[path][3] 

275 

276 def get_resources(self, resource): 

277 path = resource.path[self.prefix_len:] 

278 if path and path[-1] != os.sep: 

279 path += os.sep 

280 plen = len(path) 

281 result = set() 

282 i = bisect.bisect(self.index, path) 

283 while i < len(self.index): 

284 if not self.index[i].startswith(path): 

285 break 

286 s = self.index[i][plen:] 

287 result.add(s.split(os.sep, 1)[0]) # only immediate children 

288 i += 1 

289 return result 

290 

291 def _is_directory(self, path): 

292 path = path[self.prefix_len:] 

293 if path and path[-1] != os.sep: 

294 path += os.sep 

295 i = bisect.bisect(self.index, path) 

296 try: 

297 result = self.index[i].startswith(path) 

298 except IndexError: 

299 result = False 

300 return result 

301 

302 

303_finder_registry = {type(None): ResourceFinder, zipimport.zipimporter: ZipResourceFinder} 

304 

305try: 

306 # In Python 3.6, _frozen_importlib -> _frozen_importlib_external 

307 try: 

308 import _frozen_importlib_external as _fi 

309 except ImportError: 

310 import _frozen_importlib as _fi 

311 _finder_registry[_fi.SourceFileLoader] = ResourceFinder 

312 _finder_registry[_fi.FileFinder] = ResourceFinder 

313 # See issue #146 

314 _finder_registry[_fi.SourcelessFileLoader] = ResourceFinder 

315 del _fi 

316except (ImportError, AttributeError): 

317 pass 

318 

319 

320def register_finder(loader, finder_maker): 

321 _finder_registry[type(loader)] = finder_maker 

322 

323 

324_finder_cache = {} 

325 

326 

327def finder(package): 

328 """ 

329 Return a resource finder for a package. 

330 :param package: The name of the package. 

331 :return: A :class:`ResourceFinder` instance for the package. 

332 """ 

333 if package in _finder_cache: 

334 result = _finder_cache[package] 

335 else: 

336 if package not in sys.modules: 

337 __import__(package) 

338 module = sys.modules[package] 

339 path = getattr(module, '__path__', None) 

340 if path is None: 

341 raise DistlibException('You cannot get a finder for a module, ' 

342 'only for a package') 

343 loader = getattr(module, '__loader__', None) 

344 finder_maker = _finder_registry.get(type(loader)) 

345 if finder_maker is None: 

346 raise DistlibException('Unable to locate finder for %r' % package) 

347 result = finder_maker(module) 

348 _finder_cache[package] = result 

349 return result 

350 

351 

352_dummy_module = types.ModuleType(str('__dummy__')) 

353 

354 

355def finder_for_path(path): 

356 """ 

357 Return a resource finder for a path, which should represent a container. 

358 

359 :param path: The path. 

360 :return: A :class:`ResourceFinder` instance for the path. 

361 """ 

362 result = None 

363 # calls any path hooks, gets importer into cache 

364 pkgutil.get_importer(path) 

365 loader = sys.path_importer_cache.get(path) 

366 finder = _finder_registry.get(type(loader)) 

367 if finder: 

368 module = _dummy_module 

369 module.__file__ = os.path.join(path, '') 

370 module.__loader__ = loader 

371 result = finder(module) 

372 return result