Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/zipfile.py: 8%
1493 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1"""
2Read and write ZIP files.
4XXX references to utf-8 need further investigation.
5"""
6import binascii
7import functools
8import importlib.util
9import io
10import itertools
11import os
12import posixpath
13import shutil
14import stat
15import struct
16import sys
17import threading
18import time
19import contextlib
21try:
22 import zlib # We may need its compression method
23 crc32 = zlib.crc32
24except ImportError:
25 zlib = None
26 crc32 = binascii.crc32
28try:
29 import bz2 # We may need its compression method
30except ImportError:
31 bz2 = None
33try:
34 import lzma # We may need its compression method
35except ImportError:
36 lzma = None
# Public API re-exported by ``from zipfile import *``.  "BadZipfile" and
# "error" are legacy aliases kept for pre-3.2 compatibility (see below).
__all__ = ["BadZipFile", "BadZipfile", "error",
           "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA",
           "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"]
class BadZipFile(Exception):
    """Raised when a file is not a valid ZIP archive or is corrupted."""
class LargeZipFile(Exception):
    """
    Raised when writing a zipfile, the zipfile requires ZIP64 extensions
    and those extensions are disabled.
    """
# Legacy spellings: ``zipfile.error`` and the pre-3.2 capitalization
# ``BadZipfile`` both alias BadZipFile so old callers keep working.
error = BadZipfile = BadZipFile  # Pre-3.2 compatibility names
# Sizes beyond these limits force (or require) the ZIP64 format.
ZIP64_LIMIT = (1 << 31) - 1
ZIP_FILECOUNT_LIMIT = (1 << 16) - 1
ZIP_MAX_COMMENT = (1 << 16) - 1

# constants for Zip file compression methods
ZIP_STORED = 0
ZIP_DEFLATED = 8
ZIP_BZIP2 = 12
ZIP_LZMA = 14
# Other ZIP compression methods not supported

# "version needed to extract" values written into headers.
DEFAULT_VERSION = 20
ZIP64_VERSION = 45
BZIP2_VERSION = 46
LZMA_VERSION = 63
# we recognize (but not necessarily support) all features up to that version
MAX_EXTRACT_VERSION = 63

# Below are some formats and associated data for reading/writing headers using
# the struct module. The names and structures of headers/records are those used
# in the PKWARE description of the ZIP file format:
# http://www.pkware.com/documents/casestudies/APPNOTE.TXT
# (URL valid as of January 2008)

# The "end of central directory" structure, magic number, size, and indices
# (section V.I in the format document)
structEndArchive = b"<4s4H2LH"
stringEndArchive = b"PK\005\006"
sizeEndCentDir = struct.calcsize(structEndArchive)

_ECD_SIGNATURE = 0
_ECD_DISK_NUMBER = 1
_ECD_DISK_START = 2
_ECD_ENTRIES_THIS_DISK = 3
_ECD_ENTRIES_TOTAL = 4
_ECD_SIZE = 5
_ECD_OFFSET = 6
_ECD_COMMENT_SIZE = 7
# These last two indices are not part of the structure as defined in the
# spec, but they are used internally by this module as a convenience
_ECD_COMMENT = 8
_ECD_LOCATION = 9

# The "central directory" structure, magic number, size, and indices
# of entries in the structure (section V.F in the format document)
structCentralDir = "<4s4B4HL2L5H2L"
stringCentralDir = b"PK\001\002"
sizeCentralDir = struct.calcsize(structCentralDir)

# indexes of entries in the central directory structure
_CD_SIGNATURE = 0
_CD_CREATE_VERSION = 1
_CD_CREATE_SYSTEM = 2
_CD_EXTRACT_VERSION = 3
_CD_EXTRACT_SYSTEM = 4
_CD_FLAG_BITS = 5
_CD_COMPRESS_TYPE = 6
_CD_TIME = 7
_CD_DATE = 8
_CD_CRC = 9
_CD_COMPRESSED_SIZE = 10
_CD_UNCOMPRESSED_SIZE = 11
_CD_FILENAME_LENGTH = 12
_CD_EXTRA_FIELD_LENGTH = 13
_CD_COMMENT_LENGTH = 14
_CD_DISK_NUMBER_START = 15
_CD_INTERNAL_FILE_ATTRIBUTES = 16
_CD_EXTERNAL_FILE_ATTRIBUTES = 17
_CD_LOCAL_HEADER_OFFSET = 18

# The "local file header" structure, magic number, size, and indices
# (section V.A in the format document)
structFileHeader = "<4s2B4HL2L2H"
stringFileHeader = b"PK\003\004"
sizeFileHeader = struct.calcsize(structFileHeader)

_FH_SIGNATURE = 0
_FH_EXTRACT_VERSION = 1
_FH_EXTRACT_SYSTEM = 2
_FH_GENERAL_PURPOSE_FLAG_BITS = 3
_FH_COMPRESSION_METHOD = 4
_FH_LAST_MOD_TIME = 5
_FH_LAST_MOD_DATE = 6
_FH_CRC = 7
_FH_COMPRESSED_SIZE = 8
_FH_UNCOMPRESSED_SIZE = 9
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11

# The "Zip64 end of central directory locator" structure, magic number, and size
structEndArchive64Locator = "<4sLQL"
stringEndArchive64Locator = b"PK\x06\x07"
sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator)

# The "Zip64 end of central directory" record, magic number, size, and indices
# (section V.G in the format document)
structEndArchive64 = "<4sQ2H2L4Q"
stringEndArchive64 = b"PK\x06\x06"
sizeEndCentDir64 = struct.calcsize(structEndArchive64)

_CD64_SIGNATURE = 0
_CD64_DIRECTORY_RECSIZE = 1
_CD64_CREATE_VERSION = 2
_CD64_EXTRACT_VERSION = 3
_CD64_DISK_NUMBER = 4
_CD64_DISK_NUMBER_START = 5
_CD64_NUMBER_ENTRIES_THIS_DISK = 6
_CD64_NUMBER_ENTRIES_TOTAL = 7
_CD64_DIRECTORY_SIZE = 8
_CD64_OFFSET_START_CENTDIR = 9

# Signature of the optional "data descriptor" record that follows file data
# when bit 3 of the general-purpose flags is set.
_DD_SIGNATURE = 0x08074b50

# Pre-compiled (id, length) header of a single extra-field block.
_EXTRA_FIELD_STRUCT = struct.Struct('<HH')
170def _strip_extra(extra, xids):
171 # Remove Extra Fields with specified IDs.
172 unpack = _EXTRA_FIELD_STRUCT.unpack
173 modified = False
174 buffer = []
175 start = i = 0
176 while i + 4 <= len(extra):
177 xid, xlen = unpack(extra[i : i + 4])
178 j = i + 4 + xlen
179 if xid in xids:
180 if i != start:
181 buffer.append(extra[start : i])
182 start = j
183 modified = True
184 i = j
185 if not modified:
186 return extra
187 return b''.join(buffer)
def _check_zipfile(fp):
    """Return True if the open file object *fp* contains a valid ZIP
    "End of Central Directory" record; swallow OSError from seeks on
    short or unseekable files."""
    try:
        return bool(_EndRecData(fp))  # truthy endrec list -> correct magic
    except OSError:
        return False
def is_zipfile(filename):
    """Quickly see if a file is a ZIP file by checking the magic number.

    The filename argument may be a file or file-like object too.
    """
    try:
        # Already-open file-like objects are probed in place; paths are
        # opened (and closed) here.
        if hasattr(filename, "read"):
            return _check_zipfile(fp=filename)
        with open(filename, "rb") as fp:
            return _check_zipfile(fp)
    except OSError:
        return False
def _EndRecData64(fpin, offset, endrec):
    """
    Read the ZIP64 end-of-archive records and use that to update endrec.

    fpin   -- open binary file object positioned anywhere (it is seeked).
    offset -- negative offset (from EOF) of the classic end-of-central-dir
              record already found.
    endrec -- mutable list of ECD fields; updated in place and returned.
              Returned unchanged when no valid ZIP64 records exist.
    """
    try:
        # The ZIP64 locator sits immediately before the classic ECD record.
        fpin.seek(offset - sizeEndCentDir64Locator, 2)
    except OSError:
        # If the seek fails, the file is not large enough to contain a ZIP64
        # end-of-archive record, so just return the end record we were given.
        return endrec

    data = fpin.read(sizeEndCentDir64Locator)
    if len(data) != sizeEndCentDir64Locator:
        return endrec
    sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
    if sig != stringEndArchive64Locator:
        return endrec

    if diskno != 0 or disks > 1:
        raise BadZipFile("zipfiles that span multiple disks are not supported")

    # Assume no 'zip64 extensible data'
    fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2)
    data = fpin.read(sizeEndCentDir64)
    if len(data) != sizeEndCentDir64:
        return endrec
    sig, sz, create_version, read_version, disk_num, disk_dir, \
        dircount, dircount2, dirsize, diroffset = \
        struct.unpack(structEndArchive64, data)
    if sig != stringEndArchive64:
        return endrec

    # Update the original endrec using data from the ZIP64 record
    endrec[_ECD_SIGNATURE] = sig
    endrec[_ECD_DISK_NUMBER] = disk_num
    endrec[_ECD_DISK_START] = disk_dir
    endrec[_ECD_ENTRIES_THIS_DISK] = dircount
    endrec[_ECD_ENTRIES_TOTAL] = dircount2
    endrec[_ECD_SIZE] = dirsize
    endrec[_ECD_OFFSET] = diroffset
    return endrec
def _EndRecData(fpin):
    """Return data from the "End of Central Directory" record, or None.

    The data is a list of the nine items in the ZIP "End of central dir"
    record followed by a tenth item, the file seek offset of this record."""

    # Determine file size
    fpin.seek(0, 2)
    filesize = fpin.tell()

    # Check to see if this is ZIP file with no archive comment (the
    # "end of central directory" structure should be the last item in the
    # file if this is the case).
    try:
        fpin.seek(-sizeEndCentDir, 2)
    except OSError:
        return None
    data = fpin.read()
    if (len(data) == sizeEndCentDir and
        data[0:4] == stringEndArchive and
        data[-2:] == b"\000\000"):
        # the signature is correct and there's no comment, unpack structure
        endrec = struct.unpack(structEndArchive, data)
        endrec = list(endrec)

        # Append a blank comment and record start offset
        endrec.append(b"")
        endrec.append(filesize - sizeEndCentDir)

        # Try to read the "Zip64 end of central directory" structure
        return _EndRecData64(fpin, -sizeEndCentDir, endrec)

    # Either this is not a ZIP file, or it is a ZIP file with an archive
    # comment.  Search the end of the file for the "end of central directory"
    # record signature. The comment is the last item in the ZIP file and may be
    # up to 64K long.  It is assumed that the "end of central directory" magic
    # number does not appear in the comment.
    maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0)
    fpin.seek(maxCommentStart, 0)
    data = fpin.read()
    start = data.rfind(stringEndArchive)
    if start >= 0:
        # found the magic number; attempt to unpack and interpret
        recData = data[start:start+sizeEndCentDir]
        if len(recData) != sizeEndCentDir:
            # Zip file is corrupted.
            return None
        endrec = list(struct.unpack(structEndArchive, recData))
        commentSize = endrec[_ECD_COMMENT_SIZE]  # as claimed by the zip file
        comment = data[start+sizeEndCentDir:start+sizeEndCentDir+commentSize]
        endrec.append(comment)
        endrec.append(maxCommentStart + start)

        # Try to read the "Zip64 end of central directory" structure
        return _EndRecData64(fpin, maxCommentStart + start - filesize,
                             endrec)

    # Unable to find a valid end of central directory structure
    return None
class ZipInfo (object):
    """Class with attributes describing each file in the ZIP archive."""

    # __slots__ keeps per-member memory low; archives can contain many
    # thousands of entries.
    __slots__ = (
        'orig_filename',
        'filename',
        'date_time',
        'compress_type',
        '_compresslevel',
        'comment',
        'extra',
        'create_system',
        'create_version',
        'extract_version',
        'reserved',
        'flag_bits',
        'volume',
        'internal_attr',
        'external_attr',
        'header_offset',
        'CRC',
        'compress_size',
        'file_size',
        '_raw_time',
    )

    def __init__(self, filename="NoName", date_time=(1980,1,1,0,0,0)):
        """Initialize a member record.

        filename  -- name the member will have inside the archive.
        date_time -- 6-tuple (year, month, day, hour, minute, second);
                     ZIP timestamps cannot represent years before 1980.
        """
        self.orig_filename = filename   # Original file name in archive

        # Terminate the file name at the first null byte. Null bytes in file
        # names are used as tricks by viruses in archives.
        null_byte = filename.find(chr(0))
        if null_byte >= 0:
            filename = filename[0:null_byte]
        # This is used to ensure paths in generated ZIP files always use
        # forward slashes as the directory separator, as required by the
        # ZIP format specification.
        if os.sep != "/" and os.sep in filename:
            filename = filename.replace(os.sep, "/")

        self.filename = filename        # Normalized file name
        self.date_time = date_time      # year, month, day, hour, min, sec

        if date_time[0] < 1980:
            raise ValueError('ZIP does not support timestamps before 1980')

        # Standard values:
        self.compress_type = ZIP_STORED  # Type of compression for the file
        self._compresslevel = None       # Level for the compressor
        self.comment = b""               # Comment for each file
        self.extra = b""                 # ZIP extra data
        if sys.platform == 'win32':
            self.create_system = 0       # System which created ZIP archive
        else:
            # Assume everything else is unix-y
            self.create_system = 3       # System which created ZIP archive
        self.create_version = DEFAULT_VERSION   # Version which created ZIP archive
        self.extract_version = DEFAULT_VERSION  # Version needed to extract archive
        self.reserved = 0                # Must be zero
        self.flag_bits = 0               # ZIP flag bits
        self.volume = 0                  # Volume number of file header
        self.internal_attr = 0           # Internal attributes
        self.external_attr = 0           # External file attributes
        # Other attributes are set by class ZipFile:
        # header_offset         Byte offset to the file header
        # CRC                   CRC-32 of the uncompressed file
        # compress_size         Size of the compressed file
        # file_size             Size of the uncompressed file

    def __repr__(self):
        # Only show attributes that carry information for this member.
        result = ['<%s filename=%r' % (self.__class__.__name__, self.filename)]
        if self.compress_type != ZIP_STORED:
            result.append(' compress_type=%s' %
                          compressor_names.get(self.compress_type,
                                               self.compress_type))
        hi = self.external_attr >> 16       # upper 16 bits: Unix mode bits
        lo = self.external_attr & 0xFFFF    # lower 16 bits: MS-DOS attributes
        if hi:
            result.append(' filemode=%r' % stat.filemode(hi))
        if lo:
            result.append(' external_attr=%#x' % lo)
        isdir = self.is_dir()
        if not isdir or self.file_size:
            result.append(' file_size=%r' % self.file_size)
        if ((not isdir or self.compress_size) and
            (self.compress_type != ZIP_STORED or
             self.file_size != self.compress_size)):
            result.append(' compress_size=%r' % self.compress_size)
        result.append('>')
        return ''.join(result)

    def FileHeader(self, zip64=None):
        """Return the per-file header as a bytes object."""
        # Pack the modification time into the 16-bit MS-DOS date/time fields
        # (2-second resolution for seconds).
        dt = self.date_time
        dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
        dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
        if self.flag_bits & 0x08:
            # Set these to zero because we write them after the file data
            CRC = compress_size = file_size = 0
        else:
            CRC = self.CRC
            compress_size = self.compress_size
            file_size = self.file_size

        extra = self.extra

        min_version = 0
        if zip64 is None:
            # Caller left the decision to us: use ZIP64 only when needed.
            zip64 = file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT
        if zip64:
            # Append the ZIP64 extra-field block (id 1) carrying 64-bit sizes.
            fmt = '<HHQQ'
            extra = extra + struct.pack(fmt,
                                        1, struct.calcsize(fmt)-4, file_size, compress_size)
        if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
            if not zip64:
                raise LargeZipFile("Filesize would require ZIP64 extensions")
            # File is larger than what fits into a 4 byte integer,
            # fall back to the ZIP64 extension
            file_size = 0xffffffff
            compress_size = 0xffffffff
            min_version = ZIP64_VERSION

        if self.compress_type == ZIP_BZIP2:
            min_version = max(BZIP2_VERSION, min_version)
        elif self.compress_type == ZIP_LZMA:
            min_version = max(LZMA_VERSION, min_version)

        self.extract_version = max(min_version, self.extract_version)
        self.create_version = max(min_version, self.create_version)
        filename, flag_bits = self._encodeFilenameFlags()
        header = struct.pack(structFileHeader, stringFileHeader,
                             self.extract_version, self.reserved, flag_bits,
                             self.compress_type, dostime, dosdate, CRC,
                             compress_size, file_size,
                             len(filename), len(extra))
        return header + filename + extra

    def _encodeFilenameFlags(self):
        # ASCII names are stored as-is; anything else is stored UTF-8 with
        # the "language encoding" general-purpose flag (bit 11) set.
        try:
            return self.filename.encode('ascii'), self.flag_bits
        except UnicodeEncodeError:
            return self.filename.encode('utf-8'), self.flag_bits | 0x800

    def _decodeExtra(self):
        # Try to decode the extra field.
        extra = self.extra
        unpack = struct.unpack
        while len(extra) >= 4:
            tp, ln = unpack('<HH', extra[:4])
            if ln+4 > len(extra):
                raise BadZipFile("Corrupt extra field %04x (size=%d)" % (tp, ln))
            if tp == 0x0001:
                # ZIP64 extra field: up to three 8-byte counts whose meaning
                # depends on which 32-bit header fields are saturated.
                if ln >= 24:
                    counts = unpack('<QQQ', extra[4:28])
                elif ln == 16:
                    counts = unpack('<QQ', extra[4:20])
                elif ln == 8:
                    counts = unpack('<Q', extra[4:12])
                elif ln == 0:
                    counts = ()
                else:
                    raise BadZipFile("Corrupt extra field %04x (size=%d)" % (tp, ln))

                idx = 0

                # ZIP64 extension (large files and/or large archives)
                if self.file_size in (0xffffffffffffffff, 0xffffffff):
                    if len(counts) <= idx:
                        raise BadZipFile(
                            "Corrupt zip64 extra field. File size not found."
                        )
                    self.file_size = counts[idx]
                    idx += 1

                if self.compress_size == 0xFFFFFFFF:
                    if len(counts) <= idx:
                        raise BadZipFile(
                            "Corrupt zip64 extra field. Compress size not found."
                        )
                    self.compress_size = counts[idx]
                    idx += 1

                if self.header_offset == 0xffffffff:
                    if len(counts) <= idx:
                        raise BadZipFile(
                            "Corrupt zip64 extra field. Header offset not found."
                        )
                    old = self.header_offset  # NOTE(review): 'old' is unused
                    self.header_offset = counts[idx]
                    idx+=1

            extra = extra[ln+4:]

    @classmethod
    def from_file(cls, filename, arcname=None, *, strict_timestamps=True):
        """Construct an appropriate ZipInfo for a file on the filesystem.

        filename should be the path to a file or directory on the filesystem.

        arcname is the name which it will have within the archive (by default,
        this will be the same as filename, but without a drive letter and with
        leading path separators removed).
        """
        if isinstance(filename, os.PathLike):
            filename = os.fspath(filename)
        st = os.stat(filename)
        isdir = stat.S_ISDIR(st.st_mode)
        mtime = time.localtime(st.st_mtime)
        date_time = mtime[0:6]
        # Clamp out-of-range timestamps instead of failing when the caller
        # opted out of strict checking.
        if not strict_timestamps and date_time[0] < 1980:
            date_time = (1980, 1, 1, 0, 0, 0)
        elif not strict_timestamps and date_time[0] > 2107:
            date_time = (2107, 12, 31, 23, 59, 59)
        # Create ZipInfo instance to store file information
        if arcname is None:
            arcname = filename
        arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
        while arcname[0] in (os.sep, os.altsep):
            arcname = arcname[1:]
        if isdir:
            arcname += '/'
        zinfo = cls(arcname, date_time)
        zinfo.external_attr = (st.st_mode & 0xFFFF) << 16  # Unix attributes
        if isdir:
            zinfo.file_size = 0
            zinfo.external_attr |= 0x10  # MS-DOS directory flag
        else:
            zinfo.file_size = st.st_size

        return zinfo

    def is_dir(self):
        """Return True if this archive member is a directory."""
        return self.filename[-1] == '/'
553# ZIP encryption uses the CRC32 one-byte primitive for scrambling some
554# internal keys. We noticed that a direct implementation is faster than
555# relying on binascii.crc32().
557_crctable = None
558def _gen_crc(crc):
559 for j in range(8):
560 if crc & 1:
561 crc = (crc >> 1) ^ 0xEDB88320
562 else:
563 crc >>= 1
564 return crc
# ZIP supports a password-based form of encryption. Even though known
# plaintext attacks have been found against it, it is still useful
# to be able to get data out of such a file.
#
# Usage:
#     zd = _ZipDecrypter(mypwd)
#     plain_bytes = zd(cypher_bytes)

def _ZipDecrypter(pwd):
    """Return a callable decrypting bytes enciphered with the traditional
    PKZIP stream cipher, keyed from the password *pwd* (bytes)."""
    # Cipher state: three 32-bit keys with fixed initial values.
    key0 = 305419896
    key1 = 591751049
    key2 = 878082192

    # Build the CRC table once and cache it at module level.
    global _crctable
    if _crctable is None:
        _crctable = list(map(_gen_crc, range(256)))
    crctable = _crctable

    def crc32(ch, crc):
        """Compute the CRC32 primitive on one byte."""
        return (crc >> 8) ^ crctable[(crc ^ ch) & 0xFF]

    def update_keys(c):
        nonlocal key0, key1, key2
        key0 = crc32(c, key0)
        key1 = (key1 + (key0 & 0xFF)) & 0xFFFFFFFF
        key1 = (key1 * 134775813 + 1) & 0xFFFFFFFF
        key2 = crc32(key1 >> 24, key2)

    # Mix every password byte into the key state.
    for p in pwd:
        update_keys(p)

    def decrypter(data):
        """Decrypt a bytes object."""
        plain = bytearray()
        emit = plain.append
        for byte in data:
            keystream = key2 | 2
            byte ^= ((keystream * (keystream ^ 1)) >> 8) & 0xFF
            update_keys(byte)
            emit(byte)
        return bytes(plain)

    return decrypter
class LZMACompressor:
    """Compressor emitting the payload layout ZIP method 14 expects: a
    version/properties header followed by a raw LZMA1 stream."""

    def __init__(self):
        # The real compressor is created lazily by the first call.
        self._comp = None

    def _init(self):
        # Encode the LZMA1 filter properties and build the 4-byte
        # (major, minor, props-size) preamble that precedes them.
        props = lzma._encode_filter_properties({'id': lzma.FILTER_LZMA1})
        self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[
            lzma._decode_filter_properties(lzma.FILTER_LZMA1, props)
        ])
        return struct.pack('<BBH', 9, 4, len(props)) + props

    def compress(self, data):
        header = b'' if self._comp is not None else self._init()
        return header + self._comp.compress(data)

    def flush(self):
        header = b'' if self._comp is not None else self._init()
        return header + self._comp.flush()
class LZMADecompressor:
    """Decompressor for ZIP method 14: buffers bytes until the properties
    header is complete, then streams through a raw LZMA1 decompressor."""

    def __init__(self):
        self._decomp = None      # real decompressor, created after the header
        self._unconsumed = b''   # header bytes accumulated so far
        self.eof = False

    def decompress(self, data):
        if self._decomp is None:
            # Still waiting for the 4-byte preamble plus the properties blob.
            self._unconsumed += data
            buffered = self._unconsumed
            if len(buffered) <= 4:
                return b''
            psize, = struct.unpack('<H', buffered[2:4])
            if len(buffered) <= 4 + psize:
                return b''

            filters = [lzma._decode_filter_properties(
                lzma.FILTER_LZMA1, buffered[4:4 + psize])]
            self._decomp = lzma.LZMADecompressor(lzma.FORMAT_RAW,
                                                 filters=filters)
            data = buffered[4 + psize:]
            del self._unconsumed  # release the header buffer

        result = self._decomp.decompress(data)
        self.eof = self._decomp.eof
        return result
# Map of compression-method number -> human-readable name, used by the
# __repr__ methods and "unsupported method" error messages below.  Only
# methods 0, 8, 12 and 14 are actually handled by this module.
compressor_names = {
    0: 'store',
    1: 'shrink',
    2: 'reduce',
    3: 'reduce',
    4: 'reduce',
    5: 'reduce',
    6: 'implode',
    7: 'tokenize',
    8: 'deflate',
    9: 'deflate64',
    10: 'implode',
    12: 'bzip2',
    14: 'lzma',
    18: 'terse',
    19: 'lz77',
    97: 'wavpack',
    98: 'ppmd',
}
def _check_compression(compression):
    """Raise if *compression* names a method whose backing module is
    unavailable (RuntimeError) or one this module never supports
    (NotImplementedError)."""
    if compression == ZIP_STORED:
        return
    if compression == ZIP_DEFLATED:
        if not zlib:
            raise RuntimeError(
                "Compression requires the (missing) zlib module")
    elif compression == ZIP_BZIP2:
        if not bz2:
            raise RuntimeError(
                "Compression requires the (missing) bz2 module")
    elif compression == ZIP_LZMA:
        if not lzma:
            raise RuntimeError(
                "Compression requires the (missing) lzma module")
    else:
        raise NotImplementedError("That compression method is not supported")
def _get_compressor(compress_type, compresslevel=None):
    """Return a fresh compressor object for *compress_type*, or None for
    stored (and unknown) methods.  *compresslevel* is forwarded where the
    codec supports it."""
    if compress_type == ZIP_DEFLATED:
        if compresslevel is None:
            return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        return zlib.compressobj(compresslevel, zlib.DEFLATED, -15)
    if compress_type == ZIP_BZIP2:
        if compresslevel is None:
            return bz2.BZ2Compressor()
        return bz2.BZ2Compressor(compresslevel)
    # compresslevel is ignored for ZIP_LZMA
    if compress_type == ZIP_LZMA:
        return LZMACompressor()
    return None
def _get_decompressor(compress_type):
    """Return a fresh decompressor for *compress_type* (None for stored).

    Raises via _check_compression when the backing module is missing, and
    NotImplementedError for methods this module does not implement.
    """
    _check_compression(compress_type)
    if compress_type == ZIP_STORED:
        return None
    if compress_type == ZIP_DEFLATED:
        return zlib.decompressobj(-15)
    if compress_type == ZIP_BZIP2:
        return bz2.BZ2Decompressor()
    if compress_type == ZIP_LZMA:
        return LZMADecompressor()
    # Unknown method: report it by name when we recognize the number.
    descr = compressor_names.get(compress_type)
    if descr:
        raise NotImplementedError("compression type %d (%s)" % (compress_type, descr))
    raise NotImplementedError("compression type %d" % (compress_type,))
736class _SharedFile:
737 def __init__(self, file, pos, close, lock, writing):
738 self._file = file
739 self._pos = pos
740 self._close = close
741 self._lock = lock
742 self._writing = writing
743 self.seekable = file.seekable
744 self.tell = file.tell
746 def seek(self, offset, whence=0):
747 with self._lock:
748 if self._writing():
749 raise ValueError("Can't reposition in the ZIP file while "
750 "there is an open writing handle on it. "
751 "Close the writing handle before trying to read.")
752 self._file.seek(offset, whence)
753 self._pos = self._file.tell()
754 return self._pos
756 def read(self, n=-1):
757 with self._lock:
758 if self._writing():
759 raise ValueError("Can't read from the ZIP file while there "
760 "is an open writing handle on it. "
761 "Close the writing handle before trying to read.")
762 self._file.seek(self._pos)
763 data = self._file.read(n)
764 self._pos = self._file.tell()
765 return data
767 def close(self):
768 if self._file is not None:
769 fileobj = self._file
770 self._file = None
771 self._close(fileobj)
773# Provide the tell method for unseekable stream
774class _Tellable:
775 def __init__(self, fp):
776 self.fp = fp
777 self.offset = 0
779 def write(self, data):
780 n = self.fp.write(data)
781 self.offset += n
782 return n
784 def tell(self):
785 return self.offset
787 def flush(self):
788 self.fp.flush()
790 def close(self):
791 self.fp.close()
class ZipExtFile(io.BufferedIOBase):
    """File-like object for reading an archive member.
       Is returned by ZipFile.open().
    """

    # Max size supported by decompressor.
    # NOTE(review): '-' binds tighter than '<<', so this is 1 << 30, not
    # 2**31 - 1 — confirm whether that is intentional.
    MAX_N = 1 << 31 - 1

    # Read from compressed files in 4k blocks.
    MIN_READ_SIZE = 4096

    # Chunk size to read during seek
    MAX_SEEK_READ = 1 << 24

    def __init__(self, fileobj, mode, zipinfo, pwd=None,
                 close_fileobj=False):
        self._fileobj = fileobj
        self._pwd = pwd
        self._close_fileobj = close_fileobj

        self._compress_type = zipinfo.compress_type
        self._compress_left = zipinfo.compress_size  # compressed bytes left to read
        self._left = zipinfo.file_size               # uncompressed bytes left to emit

        self._decompressor = _get_decompressor(self._compress_type)

        self._eof = False
        self._readbuffer = b''   # decompressed-but-unconsumed bytes
        self._offset = 0         # read cursor inside _readbuffer

        self.newlines = None

        self.mode = mode
        self.name = zipinfo.filename

        if hasattr(zipinfo, 'CRC'):
            self._expected_crc = zipinfo.CRC
            self._running_crc = crc32(b'')
        else:
            self._expected_crc = None

        self._seekable = False
        try:
            if fileobj.seekable():
                self._orig_compress_start = fileobj.tell()
                self._orig_compress_size = zipinfo.compress_size
                self._orig_file_size = zipinfo.file_size
                # NOTE(review): if zipinfo lacks CRC, _running_crc is unset
                # and this raises AttributeError, silently leaving the
                # stream unseekable — confirm that is the intent.
                self._orig_start_crc = self._running_crc
                self._seekable = True
        except AttributeError:
            pass

        self._decrypter = None
        if pwd:
            if zipinfo.flag_bits & 0x8:
                # compare against the file type from extended local headers
                check_byte = (zipinfo._raw_time >> 8) & 0xff
            else:
                # compare against the CRC otherwise
                check_byte = (zipinfo.CRC >> 24) & 0xff
            h = self._init_decrypter()
            if h != check_byte:
                raise RuntimeError("Bad password for file %r" % zipinfo.orig_filename)

    def _init_decrypter(self):
        self._decrypter = _ZipDecrypter(self._pwd)
        # The first 12 bytes in the cypher stream is an encryption header
        # used to strengthen the algorithm. The first 11 bytes are
        # completely random, while the 12th contains the MSB of the CRC,
        # or the MSB of the file time depending on the header type
        # and is used to check the correctness of the password.
        header = self._fileobj.read(12)
        self._compress_left -= 12
        return self._decrypter(header)[11]

    def __repr__(self):
        result = ['<%s.%s' % (self.__class__.__module__,
                              self.__class__.__qualname__)]
        if not self.closed:
            result.append(' name=%r mode=%r' % (self.name, self.mode))
            if self._compress_type != ZIP_STORED:
                result.append(' compress_type=%s' %
                              compressor_names.get(self._compress_type,
                                                   self._compress_type))
        else:
            result.append(' [closed]')
        result.append('>')
        return ''.join(result)

    def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find(b'\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        return io.BufferedIOBase.readline(self, limit)

    def peek(self, n=1):
        """Returns buffered bytes without advancing the position."""
        if n > len(self._readbuffer) - self._offset:
            chunk = self.read(n)
            if len(chunk) > self._offset:
                self._readbuffer = chunk + self._readbuffer[self._offset:]
                self._offset = 0
            else:
                self._offset -= len(chunk)

        # Return up to 512 bytes to reduce allocation overhead for tight loops.
        return self._readbuffer[self._offset: self._offset + 512]

    def readable(self):
        return True

    def read(self, n=-1):
        """Read and return up to n bytes.
        If the argument is omitted, None, or negative, data is read and returned until EOF is reached.
        """
        if n is None or n < 0:
            # Drain the buffer, then decompress until end of member.
            buf = self._readbuffer[self._offset:]
            self._readbuffer = b''
            self._offset = 0
            while not self._eof:
                buf += self._read1(self.MAX_N)
            return buf

        end = n + self._offset
        if end < len(self._readbuffer):
            # Entire request can be served from the buffer.
            buf = self._readbuffer[self._offset:end]
            self._offset = end
            return buf

        n = end - len(self._readbuffer)
        buf = self._readbuffer[self._offset:]
        self._readbuffer = b''
        self._offset = 0
        while n > 0 and not self._eof:
            data = self._read1(n)
            if n < len(data):
                # Keep the surplus for the next call.
                self._readbuffer = data
                self._offset = n
                buf += data[:n]
                break
            buf += data
            n -= len(data)
        return buf

    def _update_crc(self, newdata):
        # Update the CRC using the given data.
        if self._expected_crc is None:
            # No need to compute the CRC if we don't have a reference value
            return
        self._running_crc = crc32(newdata, self._running_crc)
        # Check the CRC if we're at the end of the file
        if self._eof and self._running_crc != self._expected_crc:
            raise BadZipFile("Bad CRC-32 for file %r" % self.name)

    def read1(self, n):
        """Read up to n bytes with at most one read() system call."""

        if n is None or n < 0:
            buf = self._readbuffer[self._offset:]
            self._readbuffer = b''
            self._offset = 0
            while not self._eof:
                data = self._read1(self.MAX_N)
                if data:
                    buf += data
                    break
            return buf

        end = n + self._offset
        if end < len(self._readbuffer):
            buf = self._readbuffer[self._offset:end]
            self._offset = end
            return buf

        n = end - len(self._readbuffer)
        buf = self._readbuffer[self._offset:]
        self._readbuffer = b''
        self._offset = 0
        if n > 0:
            while not self._eof:
                data = self._read1(n)
                if n < len(data):
                    self._readbuffer = data
                    self._offset = n
                    buf += data[:n]
                    break
                if data:
                    buf += data
                    break
        return buf

    def _read1(self, n):
        # Read up to n compressed bytes with at most one read() system call,
        # decrypt and decompress them.
        if self._eof or n <= 0:
            return b''

        # Read from file.
        if self._compress_type == ZIP_DEFLATED:
            ## Handle unconsumed data.
            data = self._decompressor.unconsumed_tail
            if n > len(data):
                data += self._read2(n - len(data))
        else:
            data = self._read2(n)

        if self._compress_type == ZIP_STORED:
            self._eof = self._compress_left <= 0
        elif self._compress_type == ZIP_DEFLATED:
            n = max(n, self.MIN_READ_SIZE)
            data = self._decompressor.decompress(data, n)
            self._eof = (self._decompressor.eof or
                         self._compress_left <= 0 and
                         not self._decompressor.unconsumed_tail)
            if self._eof:
                data += self._decompressor.flush()
        else:
            data = self._decompressor.decompress(data)
            self._eof = self._decompressor.eof or self._compress_left <= 0

        # Never emit more than the member's declared uncompressed size.
        data = data[:self._left]
        self._left -= len(data)
        if self._left <= 0:
            self._eof = True
        self._update_crc(data)
        return data

    def _read2(self, n):
        # Read up to n raw (still compressed/encrypted) bytes from the
        # underlying file, decrypting them if needed.
        if self._compress_left <= 0:
            return b''

        n = max(n, self.MIN_READ_SIZE)
        n = min(n, self._compress_left)

        data = self._fileobj.read(n)
        self._compress_left -= len(data)
        if not data:
            raise EOFError

        if self._decrypter is not None:
            data = self._decrypter(data)
        return data

    def close(self):
        try:
            if self._close_fileobj:
                self._fileobj.close()
        finally:
            super().close()

    def seekable(self):
        return self._seekable

    def seek(self, offset, whence=0):
        if not self._seekable:
            raise io.UnsupportedOperation("underlying stream is not seekable")
        curr_pos = self.tell()
        if whence == 0:  # Seek from start of file
            new_pos = offset
        elif whence == 1:  # Seek from current position
            new_pos = curr_pos + offset
        elif whence == 2:  # Seek from EOF
            new_pos = self._orig_file_size + offset
        else:
            raise ValueError("whence must be os.SEEK_SET (0), "
                             "os.SEEK_CUR (1), or os.SEEK_END (2)")

        # Clamp the target into [0, file_size].
        if new_pos > self._orig_file_size:
            new_pos = self._orig_file_size

        if new_pos < 0:
            new_pos = 0

        read_offset = new_pos - curr_pos
        buff_offset = read_offset + self._offset

        if buff_offset >= 0 and buff_offset < len(self._readbuffer):
            # Just move the _offset index if the new position is in the _readbuffer
            self._offset = buff_offset
            read_offset = 0
        elif read_offset < 0:
            # Position is before the current position. Reset the ZipExtFile
            # to the start of the member and re-read forward from there.
            self._fileobj.seek(self._orig_compress_start)
            self._running_crc = self._orig_start_crc
            self._compress_left = self._orig_compress_size
            self._left = self._orig_file_size
            self._readbuffer = b''
            self._offset = 0
            self._decompressor = _get_decompressor(self._compress_type)
            self._eof = False
            read_offset = new_pos
            if self._decrypter is not None:
                self._init_decrypter()

        # Consume forward in bounded chunks until the target is reached.
        while read_offset > 0:
            read_len = min(self.MAX_SEEK_READ, read_offset)
            self.read(read_len)
            read_offset -= read_len

        return self.tell()

    def tell(self):
        if not self._seekable:
            raise io.UnsupportedOperation("underlying stream is not seekable")
        filepos = self._orig_file_size - self._left - len(self._readbuffer) + self._offset
        return filepos
class _ZipWriteFile(io.BufferedIOBase):
    """Writable stream returned by ZipFile.open(..., mode='w').

    Compresses data as it is written, maintains running size/CRC counters,
    and on close() patches the member's local header (or appends a data
    descriptor) with the final sizes and CRC.
    """

    def __init__(self, zf, zinfo, zip64):
        self._zinfo = zinfo      # ZipInfo record being written
        self._zip64 = zip64      # whether ZIP64 header fields are in use
        self._zipfile = zf       # owning ZipFile
        self._compressor = _get_compressor(zinfo.compress_type,
                                           zinfo._compresslevel)
        self._file_size = 0      # running uncompressed byte count
        self._compress_size = 0  # running compressed byte count
        self._crc = 0            # running CRC-32 of the uncompressed data

    @property
    def _fileobj(self):
        # Resolved through the owning ZipFile so it always tracks its fp.
        return self._zipfile.fp

    def writable(self):
        return True

    def write(self, data):
        """Write *data* to the archive; returns the uncompressed length."""
        if self.closed:
            raise ValueError('I/O operation on closed file.')
        nbytes = len(data)
        self._file_size += nbytes
        self._crc = crc32(data, self._crc)
        if self._compressor:
            data = self._compressor.compress(data)
            self._compress_size += len(data)
        self._fileobj.write(data)
        return nbytes

    def close(self):
        """Finish the member: flush compressor and fix up header metadata."""
        if self.closed:
            return
        try:
            super().close()
            # Flush any data from the compressor, and update header info
            if self._compressor:
                buf = self._compressor.flush()
                self._compress_size += len(buf)
                self._fileobj.write(buf)
                self._zinfo.compress_size = self._compress_size
            else:
                self._zinfo.compress_size = self._file_size
            self._zinfo.CRC = self._crc
            self._zinfo.file_size = self._file_size

            # Write updated header info
            if self._zinfo.flag_bits & 0x08:
                # Write CRC and file sizes after the file data
                fmt = '<LLQQ' if self._zip64 else '<LLLL'
                self._fileobj.write(struct.pack(fmt, _DD_SIGNATURE, self._zinfo.CRC,
                    self._zinfo.compress_size, self._zinfo.file_size))
                self._zipfile.start_dir = self._fileobj.tell()
            else:
                if not self._zip64:
                    if self._file_size > ZIP64_LIMIT:
                        raise RuntimeError(
                            'File size unexpectedly exceeded ZIP64 limit')
                    if self._compress_size > ZIP64_LIMIT:
                        raise RuntimeError(
                            'Compressed size unexpectedly exceeded ZIP64 limit')
                # Seek backwards and write file header (which will now include
                # correct CRC and file sizes)

                # Preserve current position in file
                self._zipfile.start_dir = self._fileobj.tell()
                self._fileobj.seek(self._zinfo.header_offset)
                self._fileobj.write(self._zinfo.FileHeader(self._zip64))
                self._fileobj.seek(self._zipfile.start_dir)

            # Successfully written: Add file to our caches
            self._zipfile.filelist.append(self._zinfo)
            self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo
        finally:
            # Re-enable reads/writes on the owning ZipFile.
            self._zipfile._writing = False
class ZipFile:
    """ Class with methods to open, read, write, close, list zip files.

    z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=True,
                compresslevel=None)

    file: Either the path to the file, or a file-like object.
          If it is a path, the file will be opened and closed by ZipFile.
    mode: The mode can be either read 'r', write 'w', exclusive create 'x',
          or append 'a'.
    compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib),
                 ZIP_BZIP2 (requires bz2) or ZIP_LZMA (requires lzma).
    allowZip64: if True ZipFile will create files with ZIP64 extensions when
                needed, otherwise it will raise an exception when this would
                be necessary.
    compresslevel: None (default for the given compression type) or an integer
                   specifying the level to pass to the compressor.
                   When using ZIP_STORED or ZIP_LZMA this keyword has no effect.
                   When using ZIP_DEFLATED integers 0 through 9 are accepted.
                   When using ZIP_BZIP2 integers 1 through 9 are accepted.

    """

    fp = None                   # Set here since __del__ checks it
    # Lazily-built str.maketrans table used by _sanitize_windows_name().
    _windows_illegal_name_trans_table = None
    def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True,
                 compresslevel=None, *, strict_timestamps=True):
        """Open the ZIP file with mode read 'r', write 'w', exclusive create 'x',
        or append 'a'."""
        if mode not in ('r', 'w', 'x', 'a'):
            raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'")

        _check_compression(compression)

        self._allowZip64 = allowZip64
        self._didModify = False
        self.debug = 0  # Level of printing: 0 through 3
        self.NameToInfo = {}    # Find file info given name
        self.filelist = []      # List of ZipInfo instances for archive
        self.compression = compression  # Method of compression
        self.compresslevel = compresslevel
        self.mode = mode
        self.pwd = None
        self._comment = b''
        self._strict_timestamps = strict_timestamps

        # Check if we were passed a file-like object
        if isinstance(file, os.PathLike):
            file = os.fspath(file)
        if isinstance(file, str):
            # No, it's a filename
            self._filePassed = 0
            self.filename = file
            # Each entry maps a mode to the next fallback to try on OSError,
            # e.g. 'a' on a nonexistent file: 'r+b' -> 'w+b'.
            modeDict = {'r' : 'rb', 'w': 'w+b', 'x': 'x+b', 'a' : 'r+b',
                        'r+b': 'w+b', 'w+b': 'wb', 'x+b': 'xb'}
            filemode = modeDict[mode]
            while True:
                try:
                    self.fp = io.open(file, filemode)
                except OSError:
                    if filemode in modeDict:
                        filemode = modeDict[filemode]
                        continue
                    raise
                break
        else:
            self._filePassed = 1
            self.fp = file
            self.filename = getattr(file, 'name', None)
        self._fileRefCnt = 1
        self._lock = threading.RLock()
        self._seekable = True
        self._writing = False

        try:
            if mode == 'r':
                self._RealGetContents()
            elif mode in ('w', 'x'):
                # set the modified flag so central directory gets written
                # even if no files are added to the archive
                self._didModify = True
                try:
                    self.start_dir = self.fp.tell()
                except (AttributeError, OSError):
                    # No tell(): wrap in _Tellable so positions can be tracked.
                    self.fp = _Tellable(self.fp)
                    self.start_dir = 0
                    self._seekable = False
                else:
                    # Some file-like objects can provide tell() but not seek()
                    try:
                        self.fp.seek(self.start_dir)
                    except (AttributeError, OSError):
                        self._seekable = False
            elif mode == 'a':
                try:
                    # See if file is a zip file
                    self._RealGetContents()
                    # seek to start of directory and overwrite
                    self.fp.seek(self.start_dir)
                except BadZipFile:
                    # file is not a zip file, just append
                    self.fp.seek(0, 2)

                    # set the modified flag so central directory gets written
                    # even if no files are added to the archive
                    self._didModify = True
                    self.start_dir = self.fp.tell()
            else:
                raise ValueError("Mode must be 'r', 'w', 'x', or 'a'")
        except:
            # On any setup failure, release the file before re-raising.
            fp = self.fp
            self.fp = None
            self._fpclose(fp)
            raise
    def __enter__(self):
        """Context-manager support: return the open archive itself."""
        return self
    def __exit__(self, type, value, traceback):
        """Close the archive on context-manager exit; exceptions propagate."""
        self.close()
1313 def __repr__(self):
1314 result = ['<%s.%s' % (self.__class__.__module__,
1315 self.__class__.__qualname__)]
1316 if self.fp is not None:
1317 if self._filePassed:
1318 result.append(' file=%r' % self.fp)
1319 elif self.filename is not None:
1320 result.append(' filename=%r' % self.filename)
1321 result.append(' mode=%r' % self.mode)
1322 else:
1323 result.append(' [closed]')
1324 result.append('>')
1325 return ''.join(result)
    def _RealGetContents(self):
        """Read in the table of contents for the ZIP file.

        Populates self.filelist / self.NameToInfo from the central
        directory; raises BadZipFile if the file is not a valid archive.
        """
        fp = self.fp
        try:
            endrec = _EndRecData(fp)
        except OSError:
            raise BadZipFile("File is not a zip file")
        if not endrec:
            raise BadZipFile("File is not a zip file")
        if self.debug > 1:
            print(endrec)
        size_cd = endrec[_ECD_SIZE]             # bytes in central directory
        offset_cd = endrec[_ECD_OFFSET]         # offset of central directory
        self._comment = endrec[_ECD_COMMENT]    # archive comment

        # "concat" is zero, unless zip was concatenated to another file
        concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
        if endrec[_ECD_SIGNATURE] == stringEndArchive64:
            # If Zip64 extension structures are present, account for them
            concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator)

        if self.debug > 2:
            inferred = concat + offset_cd
            print("given, inferred, offset", offset_cd, inferred, concat)
        # self.start_dir:  Position of start of central directory
        self.start_dir = offset_cd + concat
        fp.seek(self.start_dir, 0)
        data = fp.read(size_cd)
        # Parse the directory from an in-memory copy of its bytes.
        fp = io.BytesIO(data)
        total = 0
        while total < size_cd:
            centdir = fp.read(sizeCentralDir)
            if len(centdir) != sizeCentralDir:
                raise BadZipFile("Truncated central directory")
            centdir = struct.unpack(structCentralDir, centdir)
            if centdir[_CD_SIGNATURE] != stringCentralDir:
                raise BadZipFile("Bad magic number for central directory")
            if self.debug > 2:
                print(centdir)
            filename = fp.read(centdir[_CD_FILENAME_LENGTH])
            flags = centdir[5]
            if flags & 0x800:
                # UTF-8 file names extension
                filename = filename.decode('utf-8')
            else:
                # Historical ZIP filename encoding
                filename = filename.decode('cp437')
            # Create ZipInfo instance to store file information
            x = ZipInfo(filename)
            x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
            x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
            x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
            (x.create_version, x.create_system, x.extract_version, x.reserved,
             x.flag_bits, x.compress_type, t, d,
             x.CRC, x.compress_size, x.file_size) = centdir[1:12]
            if x.extract_version > MAX_EXTRACT_VERSION:
                raise NotImplementedError("zip file version %.1f" %
                                          (x.extract_version / 10))
            x.volume, x.internal_attr, x.external_attr = centdir[15:18]
            # Convert date/time code to (year, month, day, hour, min, sec)
            x._raw_time = t
            x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F,
                            t>>11, (t>>5)&0x3F, (t&0x1F) * 2 )

            x._decodeExtra()
            # Local header offsets are relative to the archive start, so
            # shift them by any prepended data.
            x.header_offset = x.header_offset + concat
            self.filelist.append(x)
            self.NameToInfo[x.filename] = x

            # update total bytes read from central directory
            total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH]
                     + centdir[_CD_EXTRA_FIELD_LENGTH]
                     + centdir[_CD_COMMENT_LENGTH])

        if self.debug > 2:
            print("total", total)
1405 def namelist(self):
1406 """Return a list of file names in the archive."""
1407 return [data.filename for data in self.filelist]
    def infolist(self):
        """Return a list of class ZipInfo instances for files in the
        archive."""
        # Note: this is the live internal list, not a copy.
        return self.filelist
1414 def printdir(self, file=None):
1415 """Print a table of contents for the zip file."""
1416 print("%-46s %19s %12s" % ("File Name", "Modified ", "Size"),
1417 file=file)
1418 for zinfo in self.filelist:
1419 date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6]
1420 print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size),
1421 file=file)
1423 def testzip(self):
1424 """Read all the files and check the CRC."""
1425 chunk_size = 2 ** 20
1426 for zinfo in self.filelist:
1427 try:
1428 # Read by chunks, to avoid an OverflowError or a
1429 # MemoryError with very large embedded files.
1430 with self.open(zinfo.filename, "r") as f:
1431 while f.read(chunk_size): # Check CRC-32
1432 pass
1433 except BadZipFile:
1434 return zinfo.filename
1436 def getinfo(self, name):
1437 """Return the instance of ZipInfo given 'name'."""
1438 info = self.NameToInfo.get(name)
1439 if info is None:
1440 raise KeyError(
1441 'There is no item named %r in the archive' % name)
1443 return info
1445 def setpassword(self, pwd):
1446 """Set default password for encrypted files."""
1447 if pwd and not isinstance(pwd, bytes):
1448 raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__)
1449 if pwd:
1450 self.pwd = pwd
1451 else:
1452 self.pwd = None
    @property
    def comment(self):
        """The comment text associated with the ZIP file."""
        return self._comment

    @comment.setter
    def comment(self, comment):
        # The comment is stored as raw bytes in the end-of-central-directory
        # record, so only bytes is accepted.
        if not isinstance(comment, bytes):
            raise TypeError("comment: expected bytes, got %s" % type(comment).__name__)
        # check for valid comment length
        if len(comment) > ZIP_MAX_COMMENT:
            import warnings
            warnings.warn('Archive comment is too long; truncating to %d bytes'
                          % ZIP_MAX_COMMENT, stacklevel=2)
            comment = comment[:ZIP_MAX_COMMENT]
        self._comment = comment
        # Force the end record (which carries the comment) to be rewritten.
        self._didModify = True
    def read(self, name, pwd=None):
        """Return file bytes for name.

        pwd is the optional password used for encrypted members.
        """
        with self.open(name, "r", pwd) as fp:
            return fp.read()
    def open(self, name, mode="r", pwd=None, *, force_zip64=False):
        """Return file-like object for 'name'.

        name is a string for the file name within the ZIP file, or a ZipInfo
        object.

        mode should be 'r' to read a file already in the ZIP file, or 'w' to
        write to a file newly added to the archive.

        pwd is the password to decrypt files (only used for reading).

        When writing, if the file size is not known in advance but may exceed
        2 GiB, pass force_zip64 to use the ZIP64 format, which can handle large
        files.  If the size is known in advance, it is best to pass a ZipInfo
        instance for name, with zinfo.file_size set.
        """
        if mode not in {"r", "w"}:
            raise ValueError('open() requires mode "r" or "w"')
        if pwd and not isinstance(pwd, bytes):
            raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__)
        if pwd and (mode == "w"):
            raise ValueError("pwd is only supported for reading files")
        if not self.fp:
            raise ValueError(
                "Attempt to use ZIP archive that was already closed")

        # Make sure we have an info object
        if isinstance(name, ZipInfo):
            # 'name' is already an info object
            zinfo = name
        elif mode == 'w':
            zinfo = ZipInfo(name)
            zinfo.compress_type = self.compression
            zinfo._compresslevel = self.compresslevel
        else:
            # Get info object for name
            zinfo = self.getinfo(name)

        if mode == 'w':
            return self._open_to_write(zinfo, force_zip64=force_zip64)

        if self._writing:
            raise ValueError("Can't read from the ZIP file while there "
                    "is an open writing handle on it. "
                    "Close the writing handle before trying to read.")

        # Open for reading:
        self._fileRefCnt += 1
        zef_file = _SharedFile(self.fp, zinfo.header_offset,
                               self._fpclose, self._lock, lambda: self._writing)
        try:
            # Skip the file header:
            fheader = zef_file.read(sizeFileHeader)
            if len(fheader) != sizeFileHeader:
                raise BadZipFile("Truncated file header")
            fheader = struct.unpack(structFileHeader, fheader)
            if fheader[_FH_SIGNATURE] != stringFileHeader:
                raise BadZipFile("Bad magic number for file header")

            fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
            if fheader[_FH_EXTRA_FIELD_LENGTH]:
                zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])

            if zinfo.flag_bits & 0x20:
                # Zip 2.7: compressed patched data
                raise NotImplementedError("compressed patched data (flag bit 5)")

            if zinfo.flag_bits & 0x40:
                # strong encryption
                raise NotImplementedError("strong encryption (flag bit 6)")

            if zinfo.flag_bits & 0x800:
                # UTF-8 filename
                fname_str = fname.decode("utf-8")
            else:
                fname_str = fname.decode("cp437")

            # Cross-check the local header against the central directory.
            if fname_str != zinfo.orig_filename:
                raise BadZipFile(
                    'File name in directory %r and header %r differ.'
                    % (zinfo.orig_filename, fname))

            # check for encrypted flag & handle password
            is_encrypted = zinfo.flag_bits & 0x1
            if is_encrypted:
                if not pwd:
                    pwd = self.pwd
                if not pwd:
                    raise RuntimeError("File %r is encrypted, password "
                                       "required for extraction" % name)
            else:
                # Ignore any password supplied for an unencrypted member.
                pwd = None

            return ZipExtFile(zef_file, mode, zinfo, pwd, True)
        except:
            # Undo the file refcount bump on any failure.
            zef_file.close()
            raise
    def _open_to_write(self, zinfo, force_zip64=False):
        """Return a _ZipWriteFile for writing member *zinfo*.

        force_zip64 forces ZIP64 headers when the final size is unknown.
        """
        if force_zip64 and not self._allowZip64:
            raise ValueError(
                "force_zip64 is True, but allowZip64 was False when opening "
                "the ZIP file."
            )
        if self._writing:
            raise ValueError("Can't write to the ZIP file while there is "
                             "another write handle open on it. "
                             "Close the first handle before opening another.")

        # Sizes and CRC are overwritten with correct data after processing the file
        if not hasattr(zinfo, 'file_size'):
            zinfo.file_size = 0
        zinfo.compress_size = 0
        zinfo.CRC = 0

        zinfo.flag_bits = 0x00
        if zinfo.compress_type == ZIP_LZMA:
            # Compressed data includes an end-of-stream (EOS) marker
            zinfo.flag_bits |= 0x02
        if not self._seekable:
            # Can't seek back to patch the header later, so a data
            # descriptor (flag bit 3) will carry the final sizes/CRC.
            zinfo.flag_bits |= 0x08

        if not zinfo.external_attr:
            zinfo.external_attr = 0o600 << 16  # permissions: ?rw-------

        # Compressed size can be larger than uncompressed size
        zip64 = self._allowZip64 and \
                (force_zip64 or zinfo.file_size * 1.05 > ZIP64_LIMIT)

        if self._seekable:
            self.fp.seek(self.start_dir)
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        self.fp.write(zinfo.FileHeader(zip64))

        self._writing = True
        return _ZipWriteFile(self, zinfo, zip64)
1618 def extract(self, member, path=None, pwd=None):
1619 """Extract a member from the archive to the current working directory,
1620 using its full name. Its file information is extracted as accurately
1621 as possible. `member' may be a filename or a ZipInfo object. You can
1622 specify a different directory using `path'.
1623 """
1624 if path is None:
1625 path = os.getcwd()
1626 else:
1627 path = os.fspath(path)
1629 return self._extract_member(member, path, pwd)
1631 def extractall(self, path=None, members=None, pwd=None):
1632 """Extract all members from the archive to the current working
1633 directory. `path' specifies a different directory to extract to.
1634 `members' is optional and must be a subset of the list returned
1635 by namelist().
1636 """
1637 if members is None:
1638 members = self.namelist()
1640 if path is None:
1641 path = os.getcwd()
1642 else:
1643 path = os.fspath(path)
1645 for zipinfo in members:
1646 self._extract_member(zipinfo, path, pwd)
1648 @classmethod
1649 def _sanitize_windows_name(cls, arcname, pathsep):
1650 """Replace bad characters and remove trailing dots from parts."""
1651 table = cls._windows_illegal_name_trans_table
1652 if not table:
1653 illegal = ':<>|"?*'
1654 table = str.maketrans(illegal, '_' * len(illegal))
1655 cls._windows_illegal_name_trans_table = table
1656 arcname = arcname.translate(table)
1657 # remove trailing dots
1658 arcname = (x.rstrip('.') for x in arcname.split(pathsep))
1659 # rejoin, removing empty parts.
1660 arcname = pathsep.join(x for x in arcname if x)
1661 return arcname
    def _extract_member(self, member, targetpath, pwd):
        """Extract the ZipInfo object 'member' to a physical
           file on the path targetpath.  Returns the created path.
        """
        if not isinstance(member, ZipInfo):
            member = self.getinfo(member)

        # build the destination pathname, replacing
        # forward slashes to platform specific separators.
        arcname = member.filename.replace('/', os.path.sep)

        if os.path.altsep:
            arcname = arcname.replace(os.path.altsep, os.path.sep)
        # interpret absolute pathname as relative, remove drive letter or
        # UNC path, redundant separators, "." and ".." components.
        arcname = os.path.splitdrive(arcname)[1]
        invalid_path_parts = ('', os.path.curdir, os.path.pardir)
        arcname = os.path.sep.join(x for x in arcname.split(os.path.sep)
                                   if x not in invalid_path_parts)
        if os.path.sep == '\\':
            # filter illegal characters on Windows
            arcname = self._sanitize_windows_name(arcname, os.path.sep)

        targetpath = os.path.join(targetpath, arcname)
        targetpath = os.path.normpath(targetpath)

        # Create all upper directories if necessary.
        upperdirs = os.path.dirname(targetpath)
        if upperdirs and not os.path.exists(upperdirs):
            os.makedirs(upperdirs)

        if member.is_dir():
            # Directory members only need the directory to exist.
            if not os.path.isdir(targetpath):
                os.mkdir(targetpath)
            return targetpath

        with self.open(member, pwd=pwd) as source, \
             open(targetpath, "wb") as target:
            shutil.copyfileobj(source, target)

        return targetpath
1705 def _writecheck(self, zinfo):
1706 """Check for errors before writing a file to the archive."""
1707 if zinfo.filename in self.NameToInfo:
1708 import warnings
1709 warnings.warn('Duplicate name: %r' % zinfo.filename, stacklevel=3)
1710 if self.mode not in ('w', 'x', 'a'):
1711 raise ValueError("write() requires mode 'w', 'x', or 'a'")
1712 if not self.fp:
1713 raise ValueError(
1714 "Attempt to write ZIP archive that was already closed")
1715 _check_compression(zinfo.compress_type)
1716 if not self._allowZip64:
1717 requires_zip64 = None
1718 if len(self.filelist) >= ZIP_FILECOUNT_LIMIT:
1719 requires_zip64 = "Files count"
1720 elif zinfo.file_size > ZIP64_LIMIT:
1721 requires_zip64 = "Filesize"
1722 elif zinfo.header_offset > ZIP64_LIMIT:
1723 requires_zip64 = "Zipfile size"
1724 if requires_zip64:
1725 raise LargeZipFile(requires_zip64 +
1726 " would require ZIP64 extensions")
    def write(self, filename, arcname=None,
              compress_type=None, compresslevel=None):
        """Put the bytes from filename into the archive under the name
        arcname."""
        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")
        if self._writing:
            raise ValueError(
                "Can't write to ZIP archive while an open writing handle exists"
            )

        zinfo = ZipInfo.from_file(filename, arcname,
                                  strict_timestamps=self._strict_timestamps)

        if zinfo.is_dir():
            zinfo.compress_size = 0
            zinfo.CRC = 0
        else:
            # Per-call arguments override the archive-wide defaults.
            if compress_type is not None:
                zinfo.compress_type = compress_type
            else:
                zinfo.compress_type = self.compression

            if compresslevel is not None:
                zinfo._compresslevel = compresslevel
            else:
                zinfo._compresslevel = self.compresslevel

        if zinfo.is_dir():
            # Directories get only a header, no data stream.
            with self._lock:
                if self._seekable:
                    self.fp.seek(self.start_dir)
                zinfo.header_offset = self.fp.tell()  # Start of header bytes
                if zinfo.compress_type == ZIP_LZMA:
                    # Compressed data includes an end-of-stream (EOS) marker
                    zinfo.flag_bits |= 0x02

                self._writecheck(zinfo)
                self._didModify = True

                self.filelist.append(zinfo)
                self.NameToInfo[zinfo.filename] = zinfo
                self.fp.write(zinfo.FileHeader(False))
                self.start_dir = self.fp.tell()
        else:
            # Stream the file's bytes through a write handle in 8 KiB chunks.
            with open(filename, "rb") as src, self.open(zinfo, 'w') as dest:
                shutil.copyfileobj(src, dest, 1024*8)
    def writestr(self, zinfo_or_arcname, data,
                 compress_type=None, compresslevel=None):
        """Write a file into the archive.  The contents is 'data', which
        may be either a 'str' or a 'bytes' instance; if it is a 'str',
        it is encoded as UTF-8 first.
        'zinfo_or_arcname' is either a ZipInfo instance or
        the name of the file in the archive."""
        if isinstance(data, str):
            data = data.encode("utf-8")
        if not isinstance(zinfo_or_arcname, ZipInfo):
            # Build a ZipInfo from the name, stamped with the current time.
            zinfo = ZipInfo(filename=zinfo_or_arcname,
                            date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo._compresslevel = self.compresslevel
            if zinfo.filename[-1] == '/':
                zinfo.external_attr = 0o40775 << 16  # drwxrwxr-x
                zinfo.external_attr |= 0x10  # MS-DOS directory flag
            else:
                zinfo.external_attr = 0o600 << 16  # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")
        if self._writing:
            raise ValueError(
                "Can't write to ZIP archive while an open writing handle exists."
            )

        # Per-call arguments override whatever the ZipInfo carries.
        if compress_type is not None:
            zinfo.compress_type = compress_type

        if compresslevel is not None:
            zinfo._compresslevel = compresslevel

        zinfo.file_size = len(data)            # Uncompressed size
        with self._lock:
            with self.open(zinfo, mode='w') as dest:
                dest.write(data)
    def __del__(self):
        """Call the "close()" method in case the user forgot."""
        # close() is a no-op when self.fp is already None.
        self.close()
    def close(self):
        """Close the file, and for mode 'w', 'x' and 'a' write the ending
        records."""
        if self.fp is None:
            return

        if self._writing:
            raise ValueError("Can't close the ZIP file while there is "
                             "an open writing handle on it. "
                             "Close the writing handle before closing the zip.")

        try:
            if self.mode in ('w', 'x', 'a') and self._didModify: # write ending records
                with self._lock:
                    if self._seekable:
                        self.fp.seek(self.start_dir)
                    self._write_end_record()
        finally:
            # Always release the underlying file, even if writing the
            # end record fails.
            fp = self.fp
            self.fp = None
            self._fpclose(fp)
    def _write_end_record(self):
        """Write the central directory and end-of-archive record(s).

        Called from close(); assumes self.fp is positioned at start_dir.
        Emits ZIP64 end records when counts/sizes/offsets exceed the
        classic limits.
        """
        for zinfo in self.filelist:         # write central directory
            dt = zinfo.date_time
            dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
            dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
            extra = []
            if zinfo.file_size > ZIP64_LIMIT \
               or zinfo.compress_size > ZIP64_LIMIT:
                # Real sizes go in the ZIP64 extra field; the classic
                # fields hold the 0xffffffff sentinel.
                extra.append(zinfo.file_size)
                extra.append(zinfo.compress_size)
                file_size = 0xffffffff
                compress_size = 0xffffffff
            else:
                file_size = zinfo.file_size
                compress_size = zinfo.compress_size

            if zinfo.header_offset > ZIP64_LIMIT:
                extra.append(zinfo.header_offset)
                header_offset = 0xffffffff
            else:
                header_offset = zinfo.header_offset

            extra_data = zinfo.extra
            min_version = 0
            if extra:
                # Append a ZIP64 field to the extra's
                extra_data = _strip_extra(extra_data, (1,))
                extra_data = struct.pack(
                    '<HH' + 'Q'*len(extra),
                    1, 8*len(extra), *extra) + extra_data

                min_version = ZIP64_VERSION

            if zinfo.compress_type == ZIP_BZIP2:
                min_version = max(BZIP2_VERSION, min_version)
            elif zinfo.compress_type == ZIP_LZMA:
                min_version = max(LZMA_VERSION, min_version)

            extract_version = max(min_version, zinfo.extract_version)
            create_version = max(min_version, zinfo.create_version)
            try:
                filename, flag_bits = zinfo._encodeFilenameFlags()
                centdir = struct.pack(structCentralDir,
                                      stringCentralDir, create_version,
                                      zinfo.create_system, extract_version, zinfo.reserved,
                                      flag_bits, zinfo.compress_type, dostime, dosdate,
                                      zinfo.CRC, compress_size, file_size,
                                      len(filename), len(extra_data), len(zinfo.comment),
                                      0, zinfo.internal_attr, zinfo.external_attr,
                                      header_offset)
            except DeprecationWarning:
                # Dump the offending values for debugging before re-raising.
                print((structCentralDir, stringCentralDir, create_version,
                       zinfo.create_system, extract_version, zinfo.reserved,
                       zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
                       zinfo.CRC, compress_size, file_size,
                       len(zinfo.filename), len(extra_data), len(zinfo.comment),
                       0, zinfo.internal_attr, zinfo.external_attr,
                       header_offset), file=sys.stderr)
                raise
            self.fp.write(centdir)
            self.fp.write(filename)
            self.fp.write(extra_data)
            self.fp.write(zinfo.comment)

        pos2 = self.fp.tell()
        # Write end-of-zip-archive record
        centDirCount = len(self.filelist)
        centDirSize = pos2 - self.start_dir
        centDirOffset = self.start_dir
        requires_zip64 = None
        if centDirCount > ZIP_FILECOUNT_LIMIT:
            requires_zip64 = "Files count"
        elif centDirOffset > ZIP64_LIMIT:
            requires_zip64 = "Central directory offset"
        elif centDirSize > ZIP64_LIMIT:
            requires_zip64 = "Central directory size"
        if requires_zip64:
            # Need to write the ZIP64 end-of-archive records
            if not self._allowZip64:
                raise LargeZipFile(requires_zip64 +
                                   " would require ZIP64 extensions")
            zip64endrec = struct.pack(
                structEndArchive64, stringEndArchive64,
                44, 45, 45, 0, 0, centDirCount, centDirCount,
                centDirSize, centDirOffset)
            self.fp.write(zip64endrec)

            zip64locrec = struct.pack(
                structEndArchive64Locator,
                stringEndArchive64Locator, 0, pos2, 1)
            self.fp.write(zip64locrec)
            # The classic end record then carries capped/sentinel values.
            centDirCount = min(centDirCount, 0xFFFF)
            centDirSize = min(centDirSize, 0xFFFFFFFF)
            centDirOffset = min(centDirOffset, 0xFFFFFFFF)

        endrec = struct.pack(structEndArchive, stringEndArchive,
                             0, 0, centDirCount, centDirCount,
                             centDirSize, centDirOffset, len(self._comment))
        self.fp.write(endrec)
        self.fp.write(self._comment)
        self.fp.flush()
1946 def _fpclose(self, fp):
1947 assert self._fileRefCnt > 0
1948 self._fileRefCnt -= 1
1949 if not self._fileRefCnt and not self._filePassed:
1950 fp.close()
1953class PyZipFile(ZipFile):
1954 """Class to create ZIP archives with Python library files and packages."""
    def __init__(self, file, mode="r", compression=ZIP_STORED,
                 allowZip64=True, optimize=-1):
        """Open a PyZipFile.

        optimize selects the bytecode optimization level used when
        compiling modules (-1 means the interpreter's current setting).
        """
        ZipFile.__init__(self, file, mode=mode, compression=compression,
                         allowZip64=allowZip64)
        self._optimize = optimize
    def writepy(self, pathname, basename="", filterfunc=None):
        """Add all files from "pathname" to the ZIP archive.

        If pathname is a package directory, search the directory and
        all package subdirectories recursively for all *.py and enter
        the modules into the archive.  If pathname is a plain
        directory, listdir *.py and enter all modules.  Else, pathname
        must be a Python *.py file and the module will be put into the
        archive.  Added modules are always module.pyc.
        This method will compile the module.py into module.pyc if
        necessary.
        If filterfunc(pathname) is given, it is called with every argument.
        When it is False, the file or directory is skipped.
        """
        pathname = os.fspath(pathname)
        if filterfunc and not filterfunc(pathname):
            if self.debug:
                label = 'path' if os.path.isdir(pathname) else 'file'
                print('%s %r skipped by filterfunc' % (label, pathname))
            return
        # 'name' is the final path component; 'dir' is not used below.
        dir, name = os.path.split(pathname)
        if os.path.isdir(pathname):
            initname = os.path.join(pathname, "__init__.py")
            if os.path.isfile(initname):
                # This is a package directory, add it
                if basename:
                    basename = "%s/%s" % (basename, name)
                else:
                    basename = name
                if self.debug:
                    print("Adding package in", pathname, "as", basename)
                fname, arcname = self._get_codename(initname[0:-3], basename)
                if self.debug:
                    print("Adding", arcname)
                self.write(fname, arcname)
                dirlist = sorted(os.listdir(pathname))
                dirlist.remove("__init__.py")
                # Add all *.py files and package subdirectories
                for filename in dirlist:
                    path = os.path.join(pathname, filename)
                    root, ext = os.path.splitext(filename)
                    if os.path.isdir(path):
                        if os.path.isfile(os.path.join(path, "__init__.py")):
                            # This is a package directory, add it
                            self.writepy(path, basename,
                                         filterfunc=filterfunc)  # Recursive call
                    elif ext == ".py":
                        if filterfunc and not filterfunc(path):
                            if self.debug:
                                print('file %r skipped by filterfunc' % path)
                            continue
                        fname, arcname = self._get_codename(path[0:-3],
                                                            basename)
                        if self.debug:
                            print("Adding", arcname)
                        self.write(fname, arcname)
            else:
                # This is NOT a package directory, add its files at top level
                if self.debug:
                    print("Adding files from directory", pathname)
                for filename in sorted(os.listdir(pathname)):
                    path = os.path.join(pathname, filename)
                    root, ext = os.path.splitext(filename)
                    if ext == ".py":
                        if filterfunc and not filterfunc(path):
                            if self.debug:
                                print('file %r skipped by filterfunc' % path)
                            continue
                        fname, arcname = self._get_codename(path[0:-3],
                                                            basename)
                        if self.debug:
                            print("Adding", arcname)
                        self.write(fname, arcname)
        else:
            # A single module file: must be a .py source file.
            if pathname[-3:] != ".py":
                raise RuntimeError(
                    'Files added with writepy() must end with ".py"')
            fname, arcname = self._get_codename(pathname[0:-3], basename)
            if self.debug:
                print("Adding file", arcname)
            self.write(fname, arcname)
    def _get_codename(self, pathname, basename):
        """Return (filename, archivename) for the path.

        Given a module name path, return the correct file path and
        archive name, compiling if necessary.  For example, given
        /python/lib/string, return (/python/lib/string.pyc, string).
        """
        def _compile(file, optimize=-1):
            # Byte-compile *file*.  Failures are reported on stdout and
            # signalled with False (not an exception) so the caller can
            # fall back to archiving the .py source instead.
            import py_compile
            if self.debug:
                print("Compiling", file)
            try:
                py_compile.compile(file, doraise=True, optimize=optimize)
            except py_compile.PyCompileError as err:
                print(err.msg)
                return False
            return True

        # Candidate byte-code locations: the legacy foo.pyc next to the
        # source, plus the PEP 3147 __pycache__ entries for each
        # optimization level.
        file_py = pathname + ".py"
        file_pyc = pathname + ".pyc"
        pycache_opt0 = importlib.util.cache_from_source(file_py, optimization='')
        pycache_opt1 = importlib.util.cache_from_source(file_py, optimization=1)
        pycache_opt2 = importlib.util.cache_from_source(file_py, optimization=2)
        if self._optimize == -1:
            # legacy mode: use whatever file is present
            # Each candidate is accepted only if it is at least as new as
            # the source (mtime comparison), checked in priority order.
            if (os.path.isfile(file_pyc) and
                os.stat(file_pyc).st_mtime >= os.stat(file_py).st_mtime):
                # Use .pyc file.
                arcname = fname = file_pyc
            elif (os.path.isfile(pycache_opt0) and
                  os.stat(pycache_opt0).st_mtime >= os.stat(file_py).st_mtime):
                # Use the __pycache__/*.pyc file, but write it to the legacy pyc
                # file name in the archive.
                fname = pycache_opt0
                arcname = file_pyc
            elif (os.path.isfile(pycache_opt1) and
                  os.stat(pycache_opt1).st_mtime >= os.stat(file_py).st_mtime):
                # Use the __pycache__/*.pyc file, but write it to the legacy pyc
                # file name in the archive.
                fname = pycache_opt1
                arcname = file_pyc
            elif (os.path.isfile(pycache_opt2) and
                  os.stat(pycache_opt2).st_mtime >= os.stat(file_py).st_mtime):
                # Use the __pycache__/*.pyc file, but write it to the legacy pyc
                # file name in the archive.
                fname = pycache_opt2
                arcname = file_pyc
            else:
                # Compile py into PEP 3147 pyc file.
                if _compile(file_py):
                    # Pick the cache file matching this interpreter's own
                    # optimization level, since that is what was just written.
                    if sys.flags.optimize == 0:
                        fname = pycache_opt0
                    elif sys.flags.optimize == 1:
                        fname = pycache_opt1
                    else:
                        fname = pycache_opt2
                    arcname = file_pyc
                else:
                    # Compilation failed: ship the source file as-is.
                    fname = arcname = file_py
        else:
            # new mode: use given optimization level
            if self._optimize == 0:
                fname = pycache_opt0
                arcname = file_pyc
            else:
                arcname = file_pyc
                if self._optimize == 1:
                    fname = pycache_opt1
                elif self._optimize == 2:
                    fname = pycache_opt2
                else:
                    msg = "invalid value for 'optimize': {!r}".format(self._optimize)
                    raise ValueError(msg)
            # Recompile if the chosen cache file is missing or stale.
            if not (os.path.isfile(fname) and
                    os.stat(fname).st_mtime >= os.stat(file_py).st_mtime):
                if not _compile(file_py, optimize=self._optimize):
                    fname = arcname = file_py
        archivename = os.path.split(arcname)[1]
        if basename:
            archivename = "%s/%s" % (basename, archivename)
        return (fname, archivename)
2127def _parents(path):
2128 """
2129 Given a path with elements separated by
2130 posixpath.sep, generate all parents of that path.
2132 >>> list(_parents('b/d'))
2133 ['b']
2134 >>> list(_parents('/b/d/'))
2135 ['/b']
2136 >>> list(_parents('b/d/f/'))
2137 ['b/d', 'b']
2138 >>> list(_parents('b'))
2139 []
2140 >>> list(_parents(''))
2141 []
2142 """
2143 return itertools.islice(_ancestry(path), 1, None)
2146def _ancestry(path):
2147 """
2148 Given a path with elements separated by
2149 posixpath.sep, generate all elements of that path
2151 >>> list(_ancestry('b/d'))
2152 ['b/d', 'b']
2153 >>> list(_ancestry('/b/d/'))
2154 ['/b/d', '/b']
2155 >>> list(_ancestry('b/d/f/'))
2156 ['b/d/f', 'b/d', 'b']
2157 >>> list(_ancestry('b'))
2158 ['b']
2159 >>> list(_ancestry(''))
2160 []
2161 """
2162 path = path.rstrip(posixpath.sep)
2163 while path and path != posixpath.sep:
2164 yield path
2165 path, tail = posixpath.split(path)
2168_dedupe = dict.fromkeys
2169"""Deduplicate an iterable in original order"""
2172def _difference(minuend, subtrahend):
2173 """
2174 Return items in minuend not in subtrahend, retaining order
2175 with O(1) lookup.
2176 """
2177 return itertools.filterfalse(set(subtrahend).__contains__, minuend)
class CompleteDirs(ZipFile):
    """
    A ZipFile subclass that ensures that implied directories
    are always included in the namelist.
    """

    @staticmethod
    def _implied_dirs(names):
        # Directory names implied by the member paths but missing from
        # *names*, rendered with a trailing slash and deduplicated in
        # original order.
        parents = itertools.chain.from_iterable(map(_parents, names))
        as_dirs = (p + posixpath.sep for p in parents)
        return _dedupe(_difference(as_dirs, names))

    def namelist(self):
        # Real entries first, then the synthesized directory entries.
        names = super(CompleteDirs, self).namelist()
        return names + list(self._implied_dirs(names))

    def _name_set(self):
        # Set of all names (including implied dirs) for O(1) membership
        # tests; recomputed on every call (FastLookup caches it).
        return set(self.namelist())

    def resolve_dir(self, name):
        """
        If the name represents a directory, return that name
        as a directory (with the trailing slash).
        """
        names = self._name_set()
        dirname = name + '/'
        dir_match = name not in names and dirname in names
        return dirname if dir_match else name

    @classmethod
    def make(cls, source):
        """
        Given a source (filename or zipfile), return an
        appropriate CompleteDirs subclass.
        """
        # Already wrapped: reuse as-is.
        if isinstance(source, CompleteDirs):
            return source

        # A plain filename/path: open it via the normal constructor.
        if not isinstance(source, ZipFile):
            return cls(source)

        # Only allow for FastPath when supplied zipfile is read-only
        # (a writable archive can grow, so cached name lookups would
        # go stale).
        if 'r' not in source.mode:
            cls = CompleteDirs

        # Re-class the existing ZipFile without re-reading the archive:
        # make a bare instance and copy over the open file's state.
        res = cls.__new__(cls)
        vars(res).update(vars(source))
        return res
class FastLookup(CompleteDirs):
    """
    ZipFile subclass to ensure implicit
    dirs exist and are resolved rapidly.
    """

    def namelist(self):
        # Compute the full name list once and cache it on the instance;
        # safe because FastLookup is only used for read-only archives.
        try:
            return self.__names
        except AttributeError:
            self.__names = super(FastLookup, self).namelist()
            return self.__names

    def _name_set(self):
        # Cached set form of namelist() for O(1) membership tests.
        try:
            return self.__lookup
        except AttributeError:
            self.__lookup = super(FastLookup, self)._name_set()
            return self.__lookup
class Path:
    """
    A pathlib-compatible interface for zip files.

    Consider a zip file with this structure::

        .
        ├── a.txt
        └── b
            ├── c.txt
            └── d
                └── e.txt

    >>> data = io.BytesIO()
    >>> zf = ZipFile(data, 'w')
    >>> zf.writestr('a.txt', 'content of a')
    >>> zf.writestr('b/c.txt', 'content of c')
    >>> zf.writestr('b/d/e.txt', 'content of e')
    >>> zf.filename = 'abcde.zip'

    Path accepts the zipfile object itself or a filename

    >>> root = Path(zf)

    From there, several path operations are available.

    Directory iteration (including the zip file itself):

    >>> a, b = root.iterdir()
    >>> a
    Path('abcde.zip', 'a.txt')
    >>> b
    Path('abcde.zip', 'b/')

    name property:

    >>> b.name
    'b'

    join with divide operator:

    >>> c = b / 'c.txt'
    >>> c
    Path('abcde.zip', 'b/c.txt')
    >>> c.name
    'c.txt'

    Read text:

    >>> c.read_text()
    'content of c'

    existence:

    >>> c.exists()
    True
    >>> (b / 'missing.txt').exists()
    False

    Coercion to string:

    >>> str(c)
    'abcde.zip/b/c.txt'
    """

    # Format template used by __repr__ below.
    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

    def __init__(self, root, at=""):
        # *root* may be a ZipFile, a CompleteDirs, or a filename;
        # FastLookup.make normalizes it.  *at* is the posix-style
        # subpath within the archive ("" is the root; directory paths
        # carry a trailing "/").
        self.root = FastLookup.make(root)
        self.at = at

    @property
    def open(self):
        # Bound opener for this entry; call it like ZipFile.open
        # minus the name argument.
        return functools.partial(self.root.open, self.at)

    @property
    def name(self):
        # Final path component, without any trailing slash.
        return posixpath.basename(self.at.rstrip("/"))

    def read_text(self, *args, **kwargs):
        # Extra args/kwargs are forwarded to TextIOWrapper
        # (encoding, errors, newline, ...).
        with self.open() as strm:
            return io.TextIOWrapper(strm, *args, **kwargs).read()

    def read_bytes(self):
        with self.open() as strm:
            return strm.read()

    def _is_child(self, path):
        # True when *path* is an immediate child of this directory.
        return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")

    def _next(self, at):
        # Sibling constructor sharing the same (already-wrapped) root.
        return Path(self.root, at)

    def is_dir(self):
        # The archive root ("") and any name ending in "/" are dirs.
        return not self.at or self.at.endswith("/")

    def is_file(self):
        return not self.is_dir()

    def exists(self):
        return self.at in self.root._name_set()

    def iterdir(self):
        if not self.is_dir():
            raise ValueError("Can't listdir a file")
        subs = map(self._next, self.root.namelist())
        return filter(self._is_child, subs)

    def __str__(self):
        return posixpath.join(self.root.filename, self.at)

    def __repr__(self):
        return self.__repr.format(self=self)

    def joinpath(self, add):
        # resolve_dir re-adds the trailing "/" when *add* names a dir.
        next = posixpath.join(self.at, add)
        return self._next(self.root.resolve_dir(next))

    __truediv__ = joinpath

    @property
    def parent(self):
        parent_at = posixpath.dirname(self.at.rstrip('/'))
        if parent_at:
            parent_at += '/'
        return self._next(parent_at)
def main(args=None):
    """Run the command-line interface for the zipfile module.

    *args* is an optional argument list for argparse (defaults to
    sys.argv[1:]).  Exactly one of --list/--extract/--create/--test
    is required.
    """
    import argparse

    description = 'A simple command-line interface for zipfile module.'
    parser = argparse.ArgumentParser(description=description)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-l', '--list', metavar='<zipfile>',
                       help='Show listing of a zipfile')
    group.add_argument('-e', '--extract', nargs=2,
                       metavar=('<zipfile>', '<output_dir>'),
                       help='Extract zipfile into target dir')
    group.add_argument('-c', '--create', nargs='+',
                       metavar=('<name>', '<file>'),
                       help='Create zipfile from sources')
    group.add_argument('-t', '--test', metavar='<zipfile>',
                       help='Test if a zipfile is valid')
    parsed = parser.parse_args(args)

    if parsed.test is not None:
        # Validate CRCs of every member; testzip() names the first bad one.
        with ZipFile(parsed.test, 'r') as archive:
            badfile = archive.testzip()
        if badfile:
            print("The following enclosed file is corrupted: {!r}".format(badfile))
        print("Done testing")

    elif parsed.list is not None:
        with ZipFile(parsed.list, 'r') as archive:
            archive.printdir()

    elif parsed.extract is not None:
        source, target_dir = parsed.extract
        with ZipFile(source, 'r') as archive:
            archive.extractall(target_dir)

    elif parsed.create is not None:
        zip_name, *sources = parsed.create

        def add_entry(archive, path, zippath):
            # Store files deflated; recurse into directories.
            if os.path.isfile(path):
                archive.write(path, zippath, ZIP_DEFLATED)
            elif os.path.isdir(path):
                if zippath:
                    archive.write(path, zippath)
                for entry in sorted(os.listdir(path)):
                    add_entry(archive,
                              os.path.join(path, entry),
                              os.path.join(zippath, entry))
            # anything else (missing paths, special files) is ignored

        with ZipFile(zip_name, 'w') as archive:
            for path in sources:
                # Archive each source under its basename; sources that
                # reduce to '', '.' or '..' are added at the top level.
                zippath = os.path.basename(path)
                if not zippath:
                    zippath = os.path.basename(os.path.dirname(path))
                if zippath in ('', os.curdir, os.pardir):
                    zippath = ''
                add_entry(archive, path, zippath)
if __name__ == "__main__":
    # Support direct execution / `python -m zipfile` as a CLI tool.
    main()