Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/gzip.py: 16%

367 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-20 07:00 +0000

1"""Functions that read and write gzipped files. 

2 

3The user of the file doesn't have to worry about the compression, 

4but random access is not allowed.""" 

5 

6# based on Andrew Kuchling's minigzip.py distributed with the zlib module 

7 

8import struct, sys, time, os 

9import zlib 

10import builtins 

11import io 

12import _compression 

13 

14__all__ = ["BadGzipFile", "GzipFile", "open", "compress", "decompress"] 

15 

# Bit masks tested against the flag byte of a gzip member header.
FTEXT = 1
FHCRC = 2
FEXTRA = 4
FNAME = 8
FCOMMENT = 16

# Values stored in GzipFile.mode to record the direction of the stream.
READ = 1
WRITE = 2

# Named compression levels used as defaults by this module.
_COMPRESS_LEVEL_FAST, _COMPRESS_LEVEL_TRADEOFF, _COMPRESS_LEVEL_BEST = 1, 6, 9

23 

24 

def open(filename, mode="rb", compresslevel=_COMPRESS_LEVEL_BEST,
         encoding=None, errors=None, newline=None):
    """Open a gzip-compressed file in binary or text mode.

    filename is either a real file name (str, bytes or os.PathLike) or an
    already open file object to read from or write to.

    mode is one of "r", "rb", "w", "wb", "x", "xb", "a" or "ab" for binary
    access, or "rt", "wt", "xt" or "at" for text access.  It defaults to
    "rb"; compresslevel defaults to 9.

    In binary mode the call is equivalent to
    GzipFile(filename, mode, compresslevel), and encoding, errors and
    newline must all be left as None.

    In text mode the GzipFile is wrapped in an io.TextIOWrapper created
    with the given encoding, errors and newline.

    Raises ValueError for a mode containing both "t" and "b", or when a
    text-only argument is supplied in binary mode, and TypeError when
    filename is neither a path nor an object with read() or write().
    """
    text_mode = "t" in mode
    if text_mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        # Text-only options are rejected in binary mode, in this order.
        rejected = (
            (encoding, "Argument 'encoding' not supported in binary mode"),
            (errors, "Argument 'errors' not supported in binary mode"),
            (newline, "Argument 'newline' not supported in binary mode"),
        )
        for value, message in rejected:
            if value is not None:
                raise ValueError(message)

    # GzipFile itself only understands binary modes.
    gz_mode = mode.replace("t", "")
    if isinstance(filename, (str, bytes, os.PathLike)):
        binary_file = GzipFile(filename, gz_mode, compresslevel)
    elif hasattr(filename, "read") or hasattr(filename, "write"):
        # An existing file object is passed through as fileobj.
        binary_file = GzipFile(None, gz_mode, compresslevel, filename)
    else:
        raise TypeError("filename must be a str or bytes object, or a file")

    if not text_mode:
        return binary_file
    return io.TextIOWrapper(binary_file, encoding, errors, newline)

68 

def write32u(output, value):
    """Write *value* to *output* packed with the struct format "<L"."""
    # "<L" packs an unsigned 32-bit integer in little-endian byte order.
    packed = struct.pack("<L", value)
    output.write(packed)

73 

class _PaddedFile:
    """Minimal read-only file wrapper that serves a prepended byte string
    before the contents of the wrapped file.  Meant for use inside gzip.py
    only: it implements just read(), prepend(), seek() and seekable()."""

    def __init__(self, f, prepend=b''):
        self.file = f
        self._buffer = prepend
        self._length = len(prepend)
        # Offset of the next unread byte in _buffer, or None once the
        # buffer is exhausted and reads go straight to the wrapped file.
        self._read = 0

    def read(self, size):
        """Return up to *size* bytes, taken from the prepended buffer first."""
        start = self._read
        if start is None:
            return self.file.read(size)

        stop = start + size
        if stop <= self._length:
            # The request is satisfied entirely from the buffer.
            self._read = stop
            return self._buffer[start:stop]

        # The request runs past the buffer: hand out what is left of it
        # and fetch the remainder from the wrapped file.
        self._read = None
        remainder = self._buffer[start:]
        return remainder + self.file.read(size - self._length + start)

    def prepend(self, prepend=b''):
        """Push *prepend* back so that it is returned by the next read()."""
        if self._read is not None:
            # Assume data was read since the last prepend() call, so the
            # bytes are still in the buffer and only the offset moves back.
            self._read -= len(prepend)
            return
        self._buffer = prepend
        self._length = len(self._buffer)
        self._read = 0

    def seek(self, off):
        """Drop any buffered bytes and seek the wrapped file to *off*."""
        self._buffer = None
        self._read = None
        return self.file.seek(off)

    def seekable(self):
        return True  # Allows fast-forwarding even in unseekable streams

114 

115 

class BadGzipFile(OSError):
    """Raised for some kinds of invalid gzip data; a subclass of OSError."""

118 

119 

class GzipFile(_compression.BaseStream):
    """The GzipFile class simulates most of the methods of a file object with
    the exception of the truncate() method.

    This class only supports opening files in binary mode. If you need to open a
    compressed file in text mode, use the gzip.open() function.

    """

    # Overridden with internal file object to be closed, if only a filename
    # is passed in
    myfileobj = None

    def __init__(self, filename=None, mode=None,
                 compresslevel=_COMPRESS_LEVEL_BEST, fileobj=None, mtime=None):
        """Constructor for the GzipFile class.

        At least one of fileobj and filename must be given a
        non-trivial value.

        The new class instance is based on fileobj, which can be a regular
        file, an io.BytesIO object, or any other object which simulates a file.
        It defaults to None, in which case filename is opened to provide
        a file object.

        When fileobj is not None, the filename argument is only used to be
        included in the gzip file header, which may include the original
        filename of the uncompressed file.  It defaults to the filename of
        fileobj, if discernible; otherwise, it defaults to the empty string,
        and in this case the original filename is not included in the header.

        The mode argument can be any of 'r', 'rb', 'a', 'ab', 'w', 'wb', 'x', or
        'xb' depending on whether the file will be read or written.  The default
        is the mode of fileobj if discernible; otherwise, the default is 'rb'.
        A mode of 'r' is equivalent to one of 'rb', and similarly for 'w' and
        'wb', 'a' and 'ab', and 'x' and 'xb'.

        The compresslevel argument is an integer from 0 to 9 controlling the
        level of compression; 1 is fastest and produces the least compression,
        and 9 is slowest and produces the most compression. 0 is no compression
        at all. The default is 9.

        The mtime argument is an optional numeric timestamp to be written
        to the last modification time field in the stream when compressing.
        If omitted or None, the current time is used.

        Raises ValueError for a text ('t'), universal-newline ('U') or
        otherwise unsupported mode.  If construction fails after this
        constructor opened the file itself, that file is closed again
        before the exception propagates.

        """

        if mode and ('t' in mode or 'U' in mode):
            raise ValueError("Invalid mode: {!r}".format(mode))
        if mode and 'b' not in mode:
            mode += 'b'
        if fileobj is None:
            fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
        try:
            if filename is None:
                filename = getattr(fileobj, 'name', '')
                if not isinstance(filename, (str, bytes)):
                    # e.g. an integer file descriptor; not usable as a name
                    filename = ''
            else:
                filename = os.fspath(filename)
            # Remember whether the caller chose the mode explicitly; a mode
            # inherited from fileobj triggers the FutureWarning below.
            origmode = mode
            if mode is None:
                mode = getattr(fileobj, 'mode', 'rb')

            if mode.startswith('r'):
                self.mode = READ
                raw = _GzipReader(fileobj)
                self._buffer = io.BufferedReader(raw)
                self.name = filename

            elif mode.startswith(('w', 'a', 'x')):
                if origmode is None:
                    import warnings
                    warnings.warn(
                        "GzipFile was opened for writing, but this will "
                        "change in future Python releases.  "
                        "Specify the mode argument for opening it for writing.",
                        FutureWarning, 2)
                self.mode = WRITE
                self._init_write(filename)
                # Negative wbits: raw deflate data, no zlib header/trailer;
                # the gzip framing is written by this class itself.
                self.compress = zlib.compressobj(compresslevel,
                                                 zlib.DEFLATED,
                                                 -zlib.MAX_WBITS,
                                                 zlib.DEF_MEM_LEVEL,
                                                 0)
                self._write_mtime = mtime
            else:
                raise ValueError("Invalid mode: {!r}".format(mode))

            self.fileobj = fileobj

            if self.mode == WRITE:
                self._write_gzip_header(compresslevel)
        except BaseException:
            # Construction failed.  Mark the object as closed so a later
            # close() (e.g. from the finalizer) is a no-op, and release the
            # file we opened ourselves instead of leaving it to the garbage
            # collector.  A caller-supplied fileobj is left untouched.
            self.fileobj = None
            owned = self.myfileobj
            if owned is not None:
                self.myfileobj = None
                owned.close()
            raise

    @property
    def filename(self):
        """Deprecated spelling of the name attribute.

        Emits a DeprecationWarning.  In write mode, '.gz' is appended to
        the name unless it already ends with it.
        """
        import warnings
        warnings.warn("use the name attribute", DeprecationWarning, 2)
        if self.mode == WRITE and self.name[-3:] != ".gz":
            return self.name + ".gz"
        return self.name

    @property
    def mtime(self):
        """Last modification time read from stream, or None"""
        # Reads the reader attached to self._buffer, which __init__ only
        # creates in read mode.
        return self._buffer.raw._last_mtime

    def __repr__(self):
        s = repr(self.fileobj)
        # Strip the first and last character of the underlying repr
        # (normally the angle brackets) and wrap it in our own.
        return '<gzip ' + s[1:-1] + ' ' + hex(id(self)) + '>'

    def _init_write(self, filename):
        """Reset the bookkeeping used while writing a gzip member."""
        self.name = filename
        self.crc = zlib.crc32(b"")
        self.size = 0
        self.writebuf = []
        self.bufsize = 0
        self.offset = 0  # Current file offset for seek(), tell(), etc

    def _write_gzip_header(self, compresslevel):
        """Write the gzip member header to self.fileobj."""
        self.fileobj.write(b'\037\213')  # magic header
        self.fileobj.write(b'\010')  # compression method
        try:
            # RFC 1952 requires the FNAME field to be Latin-1. Do not
            # include filenames that cannot be represented that way.
            fname = os.path.basename(self.name)
            if not isinstance(fname, bytes):
                fname = fname.encode('latin-1')
            if fname.endswith(b'.gz'):
                fname = fname[:-3]
        except UnicodeEncodeError:
            fname = b''
        flags = 0
        if fname:
            flags = FNAME
        self.fileobj.write(bytes([flags]))
        mtime = self._write_mtime
        if mtime is None:
            mtime = time.time()
        write32u(self.fileobj, int(mtime))
        # Extra-flags byte: distinct values for the best and fastest levels,
        # zero for anything in between.
        if compresslevel == _COMPRESS_LEVEL_BEST:
            xfl = b'\002'
        elif compresslevel == _COMPRESS_LEVEL_FAST:
            xfl = b'\004'
        else:
            xfl = b'\000'
        self.fileobj.write(xfl)
        self.fileobj.write(b'\377')  # OS byte
        if fname:
            self.fileobj.write(fname + b'\000')

    def write(self, data):
        """Compress *data* and write it to the underlying file.

        Returns the number of uncompressed bytes consumed.  Raises OSError
        (EBADF) if the file was opened for reading.
        """
        self._check_not_closed()
        if self.mode != WRITE:
            import errno
            raise OSError(errno.EBADF, "write() on read-only GzipFile object")

        if self.fileobj is None:
            raise ValueError("write() on closed GzipFile object")

        if isinstance(data, bytes):
            length = len(data)
        else:
            # accept any data that supports the buffer protocol
            data = memoryview(data)
            length = data.nbytes

        if length > 0:
            self.fileobj.write(self.compress.compress(data))
            self.size += length
            self.crc = zlib.crc32(data, self.crc)
            self.offset += length

        return length

    def read(self, size=-1):
        """Read and return up to *size* uncompressed bytes via the buffer.

        Raises OSError (EBADF) if the file was opened for writing.
        """
        self._check_not_closed()
        if self.mode != READ:
            import errno
            raise OSError(errno.EBADF, "read() on write-only GzipFile object")
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Implements BufferedIOBase.read1()

        Reads up to a buffer's worth of data if size is negative."""
        self._check_not_closed()
        if self.mode != READ:
            import errno
            raise OSError(errno.EBADF, "read1() on write-only GzipFile object")

        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def peek(self, n):
        """Delegate to the internal buffer's peek(n).

        Raises OSError (EBADF) if the file was opened for writing.
        """
        self._check_not_closed()
        if self.mode != READ:
            import errno
            raise OSError(errno.EBADF, "peek() on write-only GzipFile object")
        return self._buffer.peek(n)

    @property
    def closed(self):
        """True once close() has detached the underlying file object."""
        return self.fileobj is None

    def close(self):
        """Finish the stream and release resources; safe to call twice.

        In write mode the remaining compressed data and the CRC/size
        trailer are written.  A file opened by the constructor is closed;
        a caller-supplied fileobj is left open.
        """
        fileobj = self.fileobj
        if fileobj is None:
            return
        # Detach first so the object counts as closed even if the writes
        # below raise.
        self.fileobj = None
        try:
            if self.mode == WRITE:
                fileobj.write(self.compress.flush())
                write32u(fileobj, self.crc)
                # self.size may exceed 2 GiB, or even 4 GiB
                write32u(fileobj, self.size & 0xffffffff)
            elif self.mode == READ:
                self._buffer.close()
        finally:
            myfileobj = self.myfileobj
            if myfileobj:
                self.myfileobj = None
                myfileobj.close()

    def flush(self, zlib_mode=zlib.Z_SYNC_FLUSH):
        """In write mode, flush the compressor and then the underlying file."""
        self._check_not_closed()
        if self.mode == WRITE:
            # Ensure the compressor's buffer is flushed
            self.fileobj.write(self.compress.flush(zlib_mode))
            self.fileobj.flush()

    def fileno(self):
        """Invoke the underlying file object's fileno() method.

        This will raise AttributeError if the underlying file object
        doesn't support fileno().
        """
        return self.fileobj.fileno()

    def rewind(self):
        '''Return the uncompressed stream file position indicator to the
        beginning of the file'''
        if self.mode != READ:
            raise OSError("Can't rewind in write mode")
        self._buffer.seek(0)

    def readable(self):
        return self.mode == READ

    def writable(self):
        return self.mode == WRITE

    def seekable(self):
        return True

    def seek(self, offset, whence=io.SEEK_SET):
        """Move to uncompressed position *offset* and return the new position.

        In write mode only forward seeks are possible; the gap is filled
        by writing zero bytes.  Seeking from the end raises ValueError and
        seeking backwards raises OSError.  In read mode the request is
        passed on to the internal buffer.
        """
        if self.mode == WRITE:
            if whence != io.SEEK_SET:
                if whence == io.SEEK_CUR:
                    offset = self.offset + offset
                else:
                    raise ValueError('Seek from end not supported')
            if offset < self.offset:
                raise OSError('Negative seek in write mode')
            count = offset - self.offset
            # Pad in 1 KiB chunks rather than building one huge bytes object.
            chunk = b'\0' * 1024
            for _ in range(count // 1024):
                self.write(chunk)
            self.write(b'\0' * (count % 1024))
        elif self.mode == READ:
            self._check_not_closed()
            return self._buffer.seek(offset, whence)

        return self.offset

    def readline(self, size=-1):
        """Read one line (up to *size* bytes) from the internal buffer."""
        self._check_not_closed()
        return self._buffer.readline(size)

399 

400 

class _GzipReader(_compression.DecompressReader):
    """Raw reader that parses gzip member headers and trailers around a
    raw-deflate decompressor, moving on to the next member when one ends."""

    def __init__(self, fp):
        padded = _PaddedFile(fp)
        super().__init__(padded, zlib.decompressobj, wbits=-zlib.MAX_WBITS)
        self._last_mtime = None
        # Set flag indicating start of a new member
        self._new_member = True

    def _init_read(self):
        """Reset the running CRC and size for the member about to be read."""
        self._crc = zlib.crc32(b"")
        self._stream_size = 0  # Decompressed size of unconcatenated stream

    def _read_exact(self, n):
        '''Read exactly *n* bytes from `self._fp`

        This method is required because self._fp may be unbuffered,
        i.e. return short reads.  Raises EOFError if the file ends first.
        '''
        data = self._fp.read(n)
        missing = n - len(data)
        while missing > 0:
            piece = self._fp.read(missing)
            if not piece:
                raise EOFError("Compressed file ended before the "
                               "end-of-stream marker was reached")
            data += piece
            missing = n - len(data)
        return data

    def _read_gzip_header(self):
        """Consume one member header.

        Returns False if the file is already at EOF, True once a header
        has been read.  Raises BadGzipFile for a wrong magic number or an
        unknown compression method.
        """
        magic = self._fp.read(2)
        if magic == b'':
            return False
        if magic != b'\037\213':
            raise BadGzipFile('Not a gzipped file (%r)' % magic)

        fixed_fields = self._read_exact(8)
        method, flag, self._last_mtime = struct.unpack("<BBIxx", fixed_fields)
        if method != 8:
            raise BadGzipFile('Unknown compression method')

        if flag & FEXTRA:
            # Read & discard the extra field, if present
            extra_len, = struct.unpack("<H", self._read_exact(2))
            self._read_exact(extra_len)
        # The file name and the comment, in that order, are each stored as
        # a null-terminated string; both are read and discarded.
        for field_bit in (FNAME, FCOMMENT):
            if flag & field_bit:
                while True:
                    s = self._fp.read(1)
                    if not s or s == b'\000':
                        break
        if flag & FHCRC:
            self._read_exact(2)  # Read & discard the 16-bit header CRC
        return True

    def read(self, size=-1):
        """Return up to *size* decompressed bytes, or b"" at end of file.

        A negative size reads everything via readall().
        """
        if size < 0:
            return self.readall()
        # size=0 is special because decompress(max_length=0) is not supported
        if not size:
            return b""

        # For certain input data, a single
        # call to decompress() may not return
        # any data. In this case, retry until we get some data or reach EOF.
        while True:
            if self._decompressor.eof:
                # The current member is finished: verify its CRC and size,
                # then get a fresh decompressor and arrange for the next
                # member's header to be read.
                self._read_eof()
                self._new_member = True
                self._decompressor = self._decomp_factory(
                    **self._decomp_args)

            if self._new_member:
                # Jump to the next member, if there is one.
                self._init_read()
                if not self._read_gzip_header():
                    self._size = self._pos
                    return b""
                self._new_member = False

            # Read a chunk of data from the file
            raw = self._fp.read(io.DEFAULT_BUFFER_SIZE)
            produced = self._decompressor.decompress(raw, size)

            # Push back whatever the decompressor did not use so that the
            # next decompress(), _read_eof() or _read_gzip_header() sees
            # it: unconsumed input first, otherwise bytes past the end of
            # the compressed stream.
            leftover = self._decompressor.unconsumed_tail
            if leftover == b"":
                leftover = self._decompressor.unused_data
            if leftover != b"":
                self._fp.prepend(leftover)

            if produced != b"":
                break
            if raw == b"":
                raise EOFError("Compressed file ended before the "
                               "end-of-stream marker was reached")

        self._add_read_data(produced)
        self._pos += len(produced)
        return produced

    def _add_read_data(self, data):
        """Fold *data* into the running CRC and size of the current member."""
        self._crc = zlib.crc32(data, self._crc)
        self._stream_size += len(data)

    def _read_eof(self):
        """Check the member trailer, then skip any zero padding after it."""
        # The trailer holds the CRC and the size of the uncompressed data;
        # the stored size is the true size mod 2**32.
        crc32, isize = struct.unpack("<II", self._read_exact(8))
        if crc32 != self._crc:
            raise BadGzipFile("CRC check failed %s != %s" % (hex(crc32),
                                                             hex(self._crc)))
        if isize != (self._stream_size & 0xffffffff):
            raise BadGzipFile("Incorrect length of data produced")

        # Gzip files can be padded with zeroes and still have archives.
        # Consume all zero bytes and set the file position to the first
        # non-zero byte. See http://www.gzip.org/#faq8
        while True:
            c = self._fp.read(1)
            if c != b"\x00":
                break
        if c:
            self._fp.prepend(c)

    def _rewind(self):
        super()._rewind()
        self._new_member = True

541 

def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=None):
    """Compress *data* in one shot and return the gzip-compressed result.

    compresslevel is the compression level, in the range 0-9.  mtime is
    passed on to GzipFile.
    """
    sink = io.BytesIO()
    gz = GzipFile(fileobj=sink, mode='wb', compresslevel=compresslevel,
                  mtime=mtime)
    with gz:
        gz.write(data)
    # Only read the buffer after the with-block has closed the GzipFile.
    return sink.getvalue()

550 

def decompress(data):
    """Decompress gzip-compressed *data* in one shot.

    Returns everything GzipFile.read() yields for the given input.
    """
    source = io.BytesIO(data)
    with GzipFile(fileobj=source) as gz:
        payload = gz.read()
    return payload

557 

558 

def main():
    """Command-line interface: act like gzip, but keep the input file.

    Each positional argument names a file to process; "-" (also the
    default when no argument is given) means read standard input and write
    standard output.  With -d/--decompress a file named X.gz is expanded
    to X; otherwise a file named X is compressed to X.gz.  --fast and
    --best select compression level 1 and 9 respectively; the default
    level is 6.
    """
    from argparse import ArgumentParser
    parser = ArgumentParser(description=
        "A simple command line interface for the gzip module: act like gzip, "
        "but do not delete the input file.")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--fast', action='store_true', help='compress faster')
    group.add_argument('--best', action='store_true', help='compress better')
    group.add_argument("-d", "--decompress", action="store_true",
                       help="act like gunzip instead of gzip")

    parser.add_argument("args", nargs="*", default=["-"], metavar='file')
    args = parser.parse_args()

    compresslevel = _COMPRESS_LEVEL_TRADEOFF
    if args.fast:
        compresslevel = _COMPRESS_LEVEL_FAST
    elif args.best:
        compresslevel = _COMPRESS_LEVEL_BEST

    for arg in args.args:
        # f is the source, g the destination.  Both start as None so the
        # cleanup below knows which of them were actually opened.
        f = g = None
        try:
            if args.decompress:
                if arg == "-":
                    f = GzipFile(filename="", mode="rb",
                                 fileobj=sys.stdin.buffer)
                    g = sys.stdout.buffer
                else:
                    if arg[-3:] != ".gz":
                        sys.exit(f"filename doesn't end in .gz: {arg!r}")
                    # open() here is this module's gzip-aware open().
                    f = open(arg, "rb")
                    g = builtins.open(arg[:-3], "wb")
            else:
                if arg == "-":
                    f = sys.stdin.buffer
                    g = GzipFile(filename="", mode="wb",
                                 fileobj=sys.stdout.buffer,
                                 compresslevel=compresslevel)
                else:
                    f = builtins.open(arg, "rb")
                    # Pass the selected level on; without it --fast/--best
                    # would be ignored for named files.
                    g = open(arg + ".gz", "wb", compresslevel=compresslevel)
            while True:
                chunk = f.read(1024)
                if not chunk:
                    break
                g.write(chunk)
        finally:
            # Close whatever was opened, even if opening the second file or
            # the copy loop failed.  The process-wide stdio buffers are
            # never closed.
            try:
                if g is not None and g is not sys.stdout.buffer:
                    g.close()
            finally:
                if f is not None and f is not sys.stdin.buffer:
                    f.close()


if __name__ == '__main__':
    main()