Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/distlib/resources.py: 32%

225 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2013-2017 Vinay Sajip. 

4# Licensed to the Python Software Foundation under a contributor agreement. 

5# See LICENSE.txt and CONTRIBUTORS.txt. 

6# 

7from __future__ import unicode_literals 

8 

9import bisect 

10import io 

11import logging 

12import os 

13import pkgutil 

14import sys 

15import types 

16import zipimport 

17 

18from . import DistlibException 

19from .util import cached_property, get_cache_base, Cache 

20 

21logger = logging.getLogger(__name__) 

22 

23 

24cache = None # created when needed 

25 

26 

27class ResourceCache(Cache): 

28 def __init__(self, base=None): 

29 if base is None: 

30 # Use native string to avoid issues on 2.x: see Python #20140. 

31 base = os.path.join(get_cache_base(), str('resource-cache')) 

32 super(ResourceCache, self).__init__(base) 

33 

34 def is_stale(self, resource, path): 

35 """ 

36 Is the cache stale for the given resource? 

37 

38 :param resource: The :class:`Resource` being cached. 

39 :param path: The path of the resource in the cache. 

40 :return: True if the cache is stale. 

41 """ 

42 # Cache invalidation is a hard problem :-) 

43 return True 

44 

45 def get(self, resource): 

46 """ 

47 Get a resource into the cache, 

48 

49 :param resource: A :class:`Resource` instance. 

50 :return: The pathname of the resource in the cache. 

51 """ 

52 prefix, path = resource.finder.get_cache_info(resource) 

53 if prefix is None: 

54 result = path 

55 else: 

56 result = os.path.join(self.base, self.prefix_to_dir(prefix), path) 

57 dirname = os.path.dirname(result) 

58 if not os.path.isdir(dirname): 

59 os.makedirs(dirname) 

60 if not os.path.exists(result): 

61 stale = True 

62 else: 

63 stale = self.is_stale(resource, path) 

64 if stale: 

65 # write the bytes of the resource to the cache location 

66 with open(result, 'wb') as f: 

67 f.write(resource.bytes) 

68 return result 

69 

70 

71class ResourceBase(object): 

72 def __init__(self, finder, name): 

73 self.finder = finder 

74 self.name = name 

75 

76 

77class Resource(ResourceBase): 

78 """ 

79 A class representing an in-package resource, such as a data file. This is 

80 not normally instantiated by user code, but rather by a 

81 :class:`ResourceFinder` which manages the resource. 

82 """ 

83 is_container = False # Backwards compatibility 

84 

85 def as_stream(self): 

86 """ 

87 Get the resource as a stream. 

88 

89 This is not a property to make it obvious that it returns a new stream 

90 each time. 

91 """ 

92 return self.finder.get_stream(self) 

93 

94 @cached_property 

95 def file_path(self): 

96 global cache 

97 if cache is None: 

98 cache = ResourceCache() 

99 return cache.get(self) 

100 

101 @cached_property 

102 def bytes(self): 

103 return self.finder.get_bytes(self) 

104 

105 @cached_property 

106 def size(self): 

107 return self.finder.get_size(self) 

108 

109 

110class ResourceContainer(ResourceBase): 

111 is_container = True # Backwards compatibility 

112 

113 @cached_property 

114 def resources(self): 

115 return self.finder.get_resources(self) 

116 

117 

118class ResourceFinder(object): 

119 """ 

120 Resource finder for file system resources. 

121 """ 

122 

123 if sys.platform.startswith('java'): 

124 skipped_extensions = ('.pyc', '.pyo', '.class') 

125 else: 

126 skipped_extensions = ('.pyc', '.pyo') 

127 

128 def __init__(self, module): 

129 self.module = module 

130 self.loader = getattr(module, '__loader__', None) 

131 self.base = os.path.dirname(getattr(module, '__file__', '')) 

132 

133 def _adjust_path(self, path): 

134 return os.path.realpath(path) 

135 

136 def _make_path(self, resource_name): 

137 # Issue #50: need to preserve type of path on Python 2.x 

138 # like os.path._get_sep 

139 if isinstance(resource_name, bytes): # should only happen on 2.x 

140 sep = b'/' 

141 else: 

142 sep = '/' 

143 parts = resource_name.split(sep) 

144 parts.insert(0, self.base) 

145 result = os.path.join(*parts) 

146 return self._adjust_path(result) 

147 

148 def _find(self, path): 

149 return os.path.exists(path) 

150 

151 def get_cache_info(self, resource): 

152 return None, resource.path 

153 

154 def find(self, resource_name): 

155 path = self._make_path(resource_name) 

156 if not self._find(path): 

157 result = None 

158 else: 

159 if self._is_directory(path): 

160 result = ResourceContainer(self, resource_name) 

161 else: 

162 result = Resource(self, resource_name) 

163 result.path = path 

164 return result 

165 

166 def get_stream(self, resource): 

167 return open(resource.path, 'rb') 

168 

169 def get_bytes(self, resource): 

170 with open(resource.path, 'rb') as f: 

171 return f.read() 

172 

173 def get_size(self, resource): 

174 return os.path.getsize(resource.path) 

175 

176 def get_resources(self, resource): 

177 def allowed(f): 

178 return (f != '__pycache__' and not 

179 f.endswith(self.skipped_extensions)) 

180 return set([f for f in os.listdir(resource.path) if allowed(f)]) 

181 

182 def is_container(self, resource): 

183 return self._is_directory(resource.path) 

184 

185 _is_directory = staticmethod(os.path.isdir) 

186 

187 def iterator(self, resource_name): 

188 resource = self.find(resource_name) 

189 if resource is not None: 

190 todo = [resource] 

191 while todo: 

192 resource = todo.pop(0) 

193 yield resource 

194 if resource.is_container: 

195 rname = resource.name 

196 for name in resource.resources: 

197 if not rname: 

198 new_name = name 

199 else: 

200 new_name = '/'.join([rname, name]) 

201 child = self.find(new_name) 

202 if child.is_container: 

203 todo.append(child) 

204 else: 

205 yield child 

206 

207 

208class ZipResourceFinder(ResourceFinder): 

209 """ 

210 Resource finder for resources in .zip files. 

211 """ 

212 def __init__(self, module): 

213 super(ZipResourceFinder, self).__init__(module) 

214 archive = self.loader.archive 

215 self.prefix_len = 1 + len(archive) 

216 # PyPy doesn't have a _files attr on zipimporter, and you can't set one 

217 if hasattr(self.loader, '_files'): 

218 self._files = self.loader._files 

219 else: 

220 self._files = zipimport._zip_directory_cache[archive] 

221 self.index = sorted(self._files) 

222 

223 def _adjust_path(self, path): 

224 return path 

225 

226 def _find(self, path): 

227 path = path[self.prefix_len:] 

228 if path in self._files: 

229 result = True 

230 else: 

231 if path and path[-1] != os.sep: 

232 path = path + os.sep 

233 i = bisect.bisect(self.index, path) 

234 try: 

235 result = self.index[i].startswith(path) 

236 except IndexError: 

237 result = False 

238 if not result: 

239 logger.debug('_find failed: %r %r', path, self.loader.prefix) 

240 else: 

241 logger.debug('_find worked: %r %r', path, self.loader.prefix) 

242 return result 

243 

244 def get_cache_info(self, resource): 

245 prefix = self.loader.archive 

246 path = resource.path[1 + len(prefix):] 

247 return prefix, path 

248 

249 def get_bytes(self, resource): 

250 return self.loader.get_data(resource.path) 

251 

252 def get_stream(self, resource): 

253 return io.BytesIO(self.get_bytes(resource)) 

254 

255 def get_size(self, resource): 

256 path = resource.path[self.prefix_len:] 

257 return self._files[path][3] 

258 

259 def get_resources(self, resource): 

260 path = resource.path[self.prefix_len:] 

261 if path and path[-1] != os.sep: 

262 path += os.sep 

263 plen = len(path) 

264 result = set() 

265 i = bisect.bisect(self.index, path) 

266 while i < len(self.index): 

267 if not self.index[i].startswith(path): 

268 break 

269 s = self.index[i][plen:] 

270 result.add(s.split(os.sep, 1)[0]) # only immediate children 

271 i += 1 

272 return result 

273 

274 def _is_directory(self, path): 

275 path = path[self.prefix_len:] 

276 if path and path[-1] != os.sep: 

277 path += os.sep 

278 i = bisect.bisect(self.index, path) 

279 try: 

280 result = self.index[i].startswith(path) 

281 except IndexError: 

282 result = False 

283 return result 

284 

285 

286_finder_registry = { 

287 type(None): ResourceFinder, 

288 zipimport.zipimporter: ZipResourceFinder 

289} 

290 

291try: 

292 # In Python 3.6, _frozen_importlib -> _frozen_importlib_external 

293 try: 

294 import _frozen_importlib_external as _fi 

295 except ImportError: 

296 import _frozen_importlib as _fi 

297 _finder_registry[_fi.SourceFileLoader] = ResourceFinder 

298 _finder_registry[_fi.FileFinder] = ResourceFinder 

299 # See issue #146 

300 _finder_registry[_fi.SourcelessFileLoader] = ResourceFinder 

301 del _fi 

302except (ImportError, AttributeError): 

303 pass 

304 

305 

306def register_finder(loader, finder_maker): 

307 _finder_registry[type(loader)] = finder_maker 

308 

309 

310_finder_cache = {} 

311 

312 

313def finder(package): 

314 """ 

315 Return a resource finder for a package. 

316 :param package: The name of the package. 

317 :return: A :class:`ResourceFinder` instance for the package. 

318 """ 

319 if package in _finder_cache: 

320 result = _finder_cache[package] 

321 else: 

322 if package not in sys.modules: 

323 __import__(package) 

324 module = sys.modules[package] 

325 path = getattr(module, '__path__', None) 

326 if path is None: 

327 raise DistlibException('You cannot get a finder for a module, ' 

328 'only for a package') 

329 loader = getattr(module, '__loader__', None) 

330 finder_maker = _finder_registry.get(type(loader)) 

331 if finder_maker is None: 

332 raise DistlibException('Unable to locate finder for %r' % package) 

333 result = finder_maker(module) 

334 _finder_cache[package] = result 

335 return result 

336 

337 

338_dummy_module = types.ModuleType(str('__dummy__')) 

339 

340 

341def finder_for_path(path): 

342 """ 

343 Return a resource finder for a path, which should represent a container. 

344 

345 :param path: The path. 

346 :return: A :class:`ResourceFinder` instance for the path. 

347 """ 

348 result = None 

349 # calls any path hooks, gets importer into cache 

350 pkgutil.get_importer(path) 

351 loader = sys.path_importer_cache.get(path) 

352 finder = _finder_registry.get(type(loader)) 

353 if finder: 

354 module = _dummy_module 

355 module.__file__ = os.path.join(path, '') 

356 module.__loader__ = loader 

357 result = finder(module) 

358 return result