Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zipp/__init__.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

179 statements  

1import io 

2import posixpath 

3import zipfile 

4import itertools 

5import contextlib 

6import pathlib 

7import re 

8import sys 

9 

10from .compat.py310 import text_encoding 

11from .glob import Translator 

12 

13 

14__all__ = ['Path'] 

15 

16 

def _parents(path):
    """
    Produce the proper ancestors of a ``posixpath.sep``-separated
    path, nearest ancestor first. The path itself is not included.

    >>> list(_parents('b/d'))
    ['b']
    >>> list(_parents('/b/d/'))
    ['/b']
    >>> list(_parents('b/d/f/'))
    ['b/d', 'b']
    >>> list(_parents('b'))
    []
    >>> list(_parents(''))
    []
    """
    lineage = _ancestry(path)
    # The first item of the ancestry is the path itself; drop it lazily.
    return itertools.islice(lineage, 1, None)

34 

35 

36def _ancestry(path): 

37 """ 

38 Given a path with elements separated by 

39 posixpath.sep, generate all elements of that path 

40 

41 >>> list(_ancestry('b/d')) 

42 ['b/d', 'b'] 

43 >>> list(_ancestry('/b/d/')) 

44 ['/b/d', '/b'] 

45 >>> list(_ancestry('b/d/f/')) 

46 ['b/d/f', 'b/d', 'b'] 

47 >>> list(_ancestry('b')) 

48 ['b'] 

49 >>> list(_ancestry('')) 

50 [] 

51 """ 

52 path = path.rstrip(posixpath.sep) 

53 while path and path != posixpath.sep: 

54 yield path 

55 path, tail = posixpath.split(path) 

56 

57 

# Alias for dict.fromkeys: called with an iterable it returns a dict whose
# keys are the distinct items in first-seen order (dicts keep insertion
# order). Callers iterate the result to obtain the de-duplicated items.
_dedupe = dict.fromkeys
"""Deduplicate an iterable in original order"""

60 

61 

62def _difference(minuend, subtrahend): 

63 """ 

64 Return items in minuend not in subtrahend, retaining order 

65 with O(1) lookup. 

66 """ 

67 return itertools.filterfalse(set(subtrahend).__contains__, minuend) 

68 

69 

class InitializedState:
    """
    Mix-in to save the initialization state for pickling.

    The positional and keyword arguments passed to ``__init__`` are
    remembered and used as the pickled state; restoring the state
    re-runs the next ``__init__`` in the MRO with those same arguments.
    """

    def __init__(self, *args, **kwargs):
        self.__args = args
        self.__kwargs = kwargs
        super().__init__(*args, **kwargs)

    def __getstate__(self):
        """Return the ``(args, kwargs)`` pair captured at construction."""
        return self.__args, self.__kwargs

    def __setstate__(self, state):
        """
        Rebuild the object from an ``(args, kwargs)`` pair produced by
        ``__getstate__``.
        """
        args, kwargs = state
        # Record the arguments again: unpickling bypasses __init__, so
        # without this a restored object would have no saved state and
        # __getstate__ (i.e. pickling it a second time) would raise
        # AttributeError.
        self.__args = args
        self.__kwargs = kwargs
        super().__init__(*args, **kwargs)

86 

87 

class CompleteDirs(InitializedState, zipfile.ZipFile):
    """
    A ZipFile subclass whose namelist also reports directories that
    are only implied by the names of the entries beneath them.

    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
    ['foo/', 'foo/bar/']
    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
    ['foo/']
    """

    @staticmethod
    def _implied_dirs(names):
        """
        Return the directory names (with trailing separator) that are
        ancestors of some entry in ``names`` but are not themselves in
        ``names``, de-duplicated in first-seen order.
        """
        ancestors = itertools.chain.from_iterable(map(_parents, names))
        candidate_dirs = (ancestor + posixpath.sep for ancestor in ancestors)
        missing = _difference(candidate_dirs, names)
        return _dedupe(missing)

    def namelist(self):
        """Return the stored names followed by any implied directories."""
        explicit = super().namelist()
        return [*explicit, *self._implied_dirs(explicit)]

    def _name_set(self):
        """Return the names from ``namelist()`` as a set."""
        return {*self.namelist()}

    def resolve_dir(self, name):
        """
        Return ``name`` with a trailing slash when it is known only as
        a directory; otherwise return ``name`` unchanged.
        """
        known = self._name_set()
        as_dir = name + '/'
        if name not in known and as_dir in known:
            return as_dir
        return name

    def getinfo(self, name):
        """
        Like ``ZipFile.getinfo``, but also answer for implied
        directories by returning a fresh ``ZipInfo`` for them.
        """
        try:
            return super().getinfo(name)
        except KeyError:
            is_implied_dir = name.endswith('/') and name in self._name_set()
            if not is_implied_dir:
                raise
            return zipfile.ZipInfo(filename=name)

    @classmethod
    def make(cls, source):
        """
        Given a source (filename or zipfile), return an
        appropriate CompleteDirs subclass.

        An existing ``ZipFile`` is not copied: its ``__class__`` is
        reassigned in place and the same object is returned.
        """
        if isinstance(source, CompleteDirs):
            return source

        if not isinstance(source, zipfile.ZipFile):
            return cls(source)

        # Only allow for FastLookup when supplied zipfile is read-only
        target = cls if 'r' in source.mode else CompleteDirs
        source.__class__ = target
        return source

    @classmethod
    def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
        """
        Write an empty entry into the writable zip file ``zf`` for each
        directory implied by its current names, then return ``zf``.
        """
        for implied in cls._implied_dirs(zf.namelist()):
            zf.writestr(implied, b"")
        return zf

161 

162 

class FastLookup(CompleteDirs):
    """
    CompleteDirs variant that computes the name list and the name
    set once and serves the saved results on every later call.
    This class never refreshes the saved values.
    """

    def namelist(self):
        """Return the name list, computing and saving it on first use."""
        try:
            return self.__names
        except AttributeError:
            pass
        computed = super().namelist()
        self.__names = computed
        return computed

    def _name_set(self):
        """Return the name set, computing and saving it on first use."""
        try:
            return self.__lookup
        except AttributeError:
            pass
        computed = super()._name_set()
        self.__lookup = computed
        return computed

180 

181 

def _extract_text_encoding(encoding=None, *args, **kwargs):
    """
    Split ``encoding`` off the front of the arguments and pass it
    through ``text_encoding``.

    Returns a ``(encoding, args, kwargs)`` triple in which the
    remaining positional and keyword arguments are untouched.
    """
    # compute stack level so that the caller of the caller sees any warning.
    # PyPy needs one extra level.
    stack_level = 4 if sys.implementation.name == 'pypy' else 3
    resolved = text_encoding(encoding, stack_level)
    return resolved, args, kwargs

187 

188 

class Path:
    """
    A pathlib-compatible interface for zip files.

    Consider a zip file with this structure::

        .
        ├── a.txt
        └── b
            ├── c.txt
            └── d
                └── e.txt

    >>> data = io.BytesIO()
    >>> zf = zipfile.ZipFile(data, 'w')
    >>> zf.writestr('a.txt', 'content of a')
    >>> zf.writestr('b/c.txt', 'content of c')
    >>> zf.writestr('b/d/e.txt', 'content of e')
    >>> zf.filename = 'mem/abcde.zip'

    Path accepts the zipfile object itself or a filename

    >>> path = Path(zf)

    From there, several path operations are available.

    Directory iteration (including the zip file itself):

    >>> a, b = path.iterdir()
    >>> a
    Path('mem/abcde.zip', 'a.txt')
    >>> b
    Path('mem/abcde.zip', 'b/')

    name property:

    >>> b.name
    'b'

    join with divide operator:

    >>> c = b / 'c.txt'
    >>> c
    Path('mem/abcde.zip', 'b/c.txt')
    >>> c.name
    'c.txt'

    Read text:

    >>> c.read_text(encoding='utf-8')
    'content of c'

    existence:

    >>> c.exists()
    True
    >>> (b / 'missing.txt').exists()
    False

    Coercion to string:

    >>> import os
    >>> str(c).replace(os.sep, posixpath.sep)
    'mem/abcde.zip/b/c.txt'

    At the root, ``name``, ``filename``, and ``parent``
    resolve to the zipfile.

    >>> str(path)
    'mem/abcde.zip/'
    >>> path.name
    'abcde.zip'
    >>> path.filename == pathlib.Path('mem/abcde.zip')
    True
    >>> str(path.parent)
    'mem'

    If the zipfile has no filename, such attributes are not
    valid and accessing them will raise an Exception.

    >>> zf.filename = None
    >>> path.name
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.filename
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.parent
    Traceback (most recent call last):
    ...
    TypeError: ...

    # workaround python/cpython#106763
    >>> pass
    """

    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

    def __init__(self, root, at=""):
        """
        Construct a Path from a ZipFile or filename.

        Note: When the source is an existing ZipFile object,
        its type (__class__) will be mutated to a
        specialized type. If the caller wishes to retain the
        original type, the caller should either create a
        separate ZipFile object or pass a filename.
        """
        self.root = FastLookup.make(root)
        self.at = at

    def __eq__(self, other):
        """
        Two paths are equal when they are of the same class and share
        the same root and location.

        >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
        False
        """
        if self.__class__ is not other.__class__:
            return NotImplemented
        return (self.root, self.at) == (other.root, other.at)

    def __hash__(self):
        return hash((self.root, self.at))

    def open(self, mode='r', *args, pwd=None, **kwargs):
        """
        Open this entry as text or binary following the semantics
        of ``pathlib.Path.open()`` by passing arguments through
        to io.TextIOWrapper().

        Raises IsADirectoryError for a directory, FileNotFoundError
        when reading an entry that does not exist, and ValueError when
        extra (encoding) arguments are supplied for a binary mode.
        """
        if self.is_dir():
            raise IsADirectoryError(self)
        # Only the first character ('r' or 'w') is meaningful to ZipFile.open.
        zip_mode = mode[0]
        if not self.exists() and zip_mode == 'r':
            raise FileNotFoundError(self)
        stream = self.root.open(self.at, zip_mode, pwd=pwd)
        if 'b' in mode:
            if args or kwargs:
                raise ValueError("encoding args invalid for binary operation")
            return stream
        # Text mode:
        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
        return io.TextIOWrapper(stream, encoding, *args, **kwargs)

    def _base(self):
        """
        Return the pure path used for name-like properties: the
        location inside the archive, or the archive's own filename
        when this path is the root.
        """
        return pathlib.PurePosixPath(self.at or self.root.filename)

    @property
    def name(self):
        return self._base().name

    @property
    def suffix(self):
        return self._base().suffix

    @property
    def suffixes(self):
        return self._base().suffixes

    @property
    def stem(self):
        return self._base().stem

    @property
    def filename(self):
        """The archive's filename joined with the location inside it."""
        return pathlib.Path(self.root.filename).joinpath(self.at)

    def read_text(self, *args, **kwargs):
        """Return the entry's content decoded as text."""
        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
        with self.open('r', encoding, *args, **kwargs) as strm:
            return strm.read()

    def read_bytes(self):
        """Return the entry's content as bytes."""
        with self.open('rb') as strm:
            return strm.read()

    def _is_child(self, path):
        """Return whether ``path`` sits directly inside this path."""
        return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")

    def _next(self, at):
        """Return a path of the same class and root located at ``at``."""
        return self.__class__(self.root, at)

    def is_dir(self):
        """
        Return whether this path denotes a directory: the root, or a
        location ending in a slash. The archive is not consulted.
        """
        return not self.at or self.at.endswith("/")

    def is_file(self):
        return self.exists() and not self.is_dir()

    def exists(self):
        """Return whether the location is among the archive's names."""
        return self.at in self.root._name_set()

    def iterdir(self):
        """
        Iterate over the direct children of this directory.

        Raises ValueError when this path is not a directory.
        """
        if not self.is_dir():
            raise ValueError("Can't listdir a file")
        subs = map(self._next, self.root.namelist())
        return filter(self._is_child, subs)

    def match(self, path_pattern):
        return pathlib.PurePosixPath(self.at).match(path_pattern)

    def is_symlink(self):
        """
        Return whether this path is a symlink. Always false (python/cpython#82102).
        """
        return False

    def glob(self, pattern):
        """
        Iterate over the paths beneath this one whose full location
        matches ``pattern``.

        Candidates are taken from the root's ``namelist()``, so
        directories that are only implied by their children are
        matched too, consistent with ``iterdir`` and ``exists``.

        Raises ValueError when ``pattern`` is empty.
        """
        if not pattern:
            raise ValueError(f"Unacceptable pattern: {pattern!r}")

        prefix = re.escape(self.at)
        tr = Translator(seps='/')
        matches = re.compile(prefix + tr.translate(pattern)).fullmatch
        # Use namelist() rather than filelist: filelist only holds entries
        # physically stored in the archive and would miss implied dirs.
        return map(self._next, filter(matches, self.root.namelist()))

    def rglob(self, pattern):
        """Glob for ``pattern`` at any depth beneath this path."""
        return self.glob(f'**/{pattern}')

    def relative_to(self, other, *extra):
        """
        Return, as a string, this path relative to ``other`` joined
        with ``extra``.
        """
        return posixpath.relpath(str(self), str(other.joinpath(*extra)))

    def __str__(self):
        return posixpath.join(self.root.filename, self.at)

    def __repr__(self):
        return self.__repr.format(self=self)

    def joinpath(self, *other):
        """
        Return the path formed by appending ``other`` to this one,
        with a trailing slash added when the result is a directory.
        """
        # Named ``joined`` to avoid shadowing the ``next`` builtin.
        joined = posixpath.join(self.at, *other)
        return self._next(self.root.resolve_dir(joined))

    __truediv__ = joinpath

    @property
    def parent(self):
        """
        The containing directory. At the root this is the parent of
        the archive's filename (a ``pathlib.Path``); otherwise it is a
        path within the same archive.
        """
        if not self.at:
            return self.filename.parent
        parent_at = posixpath.dirname(self.at.rstrip('/'))
        if parent_at:
            parent_at += '/'
        return self._next(parent_at)

431 if parent_at: 

432 parent_at += '/' 

433 return self._next(parent_at)