Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/compressor.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

305 statements  

1"""Classes and functions for managing compressors.""" 

2 

3import io 

4import zlib 

5 

6from joblib.backports import LooseVersion 

7 

8try: 

9 from threading import RLock 

10except ImportError: 

11 from dummy_threading import RLock 

12 

13try: 

14 import bz2 

15except ImportError: 

16 bz2 = None 

17 

18try: 

19 import lz4 

20 from lz4.frame import LZ4FrameFile 

21except ImportError: 

22 lz4 = None 

23 

24try: 

25 import lzma 

26except ImportError: 

27 lzma = None 

28 

29 

# Message of the ValueError raised by LZ4CompressorWrapper._check_versions,
# both when lz4 cannot be imported and when its version is older than 0.19.
LZ4_NOT_INSTALLED_ERROR = (
    "LZ4 is not installed. Install it with pip: https://python-lz4.readthedocs.io/"
)

# Registered compressors
# Filled by register_compressor(): maps a compressor name (str) to the
# CompressorWrapper instance registered under that name.
_COMPRESSORS = {}

# Magic numbers of supported compression file formats.
# Each value is the leading byte sequence associated with one format; the
# wrapper classes in this module expose them through their 'prefix' attribute.
_ZFILE_PREFIX = b"ZF"  # used with pickle files created before 0.9.3.
_ZLIB_PREFIX = b"\x78"
_GZIP_PREFIX = b"\x1f\x8b"
_BZ2_PREFIX = b"BZ"
_XZ_PREFIX = b"\xfd\x37\x7a\x58\x5a"
_LZMA_PREFIX = b"\x5d\x00"
_LZ4_PREFIX = b"\x04\x22\x4d\x18"

45 

46 

def register_compressor(compressor_name, compressor, force=False):
    """Add a compressor to the module-level registry.

    Parameters
    ----------
    compressor_name: str.
        The key under which the compressor is stored.
    compressor: CompressorWrapper
        An instance of a 'CompressorWrapper'.
    force: bool
        When true, an existing registration with the same name is replaced
        instead of being rejected. Defaults to False.

    Raises
    ------
    ValueError
        If the name is not a string, if the compressor is not a
        CompressorWrapper, if its 'fileobj_factory' is set but lacks one of
        the read/write/seek/tell attributes, or if the name is already
        registered and 'force' is false.
    """
    if not isinstance(compressor_name, str):
        raise ValueError(
            "Compressor name should be a string, '{}' given.".format(compressor_name)
        )

    if not isinstance(compressor, CompressorWrapper):
        raise ValueError(
            "Compressor should implement the CompressorWrapper "
            "interface, '{}' given.".format(compressor)
        )

    # A factory of None is tolerated (the wrapper classes use it when their
    # backing module is unavailable); anything else must look like a file
    # object class.
    factory = compressor.fileobj_factory
    file_api = ("read", "write", "seek", "tell")
    if factory is not None and not all(hasattr(factory, attr) for attr in file_api):
        raise ValueError(
            "Compressor 'fileobj_factory' attribute should "
            "implement the file object interface, '{}' given.".format(factory)
        )

    if not force and compressor_name in _COMPRESSORS:
        raise ValueError("Compressor '{}' already registered.".format(compressor_name))

    _COMPRESSORS[compressor_name] = compressor

86 

87 

class CompressorWrapper:
    """Uniform adapter around a compressed file class.

    Attributes
    ----------
    fileobj_factory: a file-like class (the 'obj' constructor argument)
        Called as ``fileobj_factory(fileobj, mode)`` to create the object
        used internally to compress/decompress the data.
    prefix: bytestring
        A bytestring corresponding to the magic number that identifies the
        file format associated to the compressor.
    extension: str
        The file extension used to automatically select this compressor during
        a dump to a file.
    """

    def __init__(self, obj, prefix=b"", extension=""):
        self.extension = extension
        self.prefix = prefix
        self.fileobj_factory = obj

    def compressor_file(self, fileobj, compresslevel=None):
        """Return a file object that compresses what is written to 'fileobj'.

        'compresslevel' is forwarded to the factory as a keyword argument only
        when it is not None, so the factory's own default applies otherwise.
        """
        options = {} if compresslevel is None else {"compresslevel": compresslevel}
        return self.fileobj_factory(fileobj, "wb", **options)

    def decompressor_file(self, fileobj):
        """Return a file object that decompresses what is read from 'fileobj'."""
        opener = self.fileobj_factory
        return opener(fileobj, "rb")

119 

120 

class BZ2CompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the bz2 format, backed by ``bz2.BZ2File``."""

    prefix = _BZ2_PREFIX
    extension = ".bz2"

    def __init__(self):
        # 'bz2' is None when the module could not be imported; the factory is
        # then left unset and _check_versions() reports the problem on use.
        self.fileobj_factory = None if bz2 is None else bz2.BZ2File

    def _check_versions(self):
        """Raise ValueError when the bz2 module is unavailable."""
        if bz2 is not None:
            return
        raise ValueError(
            "bz2 module is not compiled on your python standard library."
        )

    def compressor_file(self, fileobj, compresslevel=None):
        """Return a bz2 file object opened on 'fileobj' for writing.

        'compresslevel' is passed through only when it is not None.
        """
        self._check_versions()
        options = {} if compresslevel is None else {"compresslevel": compresslevel}
        return self.fileobj_factory(fileobj, "wb", **options)

    def decompressor_file(self, fileobj):
        """Return a bz2 file object opened on 'fileobj' for reading."""
        self._check_versions()
        return self.fileobj_factory(fileobj, "rb")

150 

151 

class LZMACompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the legacy .lzma format, backed by lzma.LZMAFile.

    Subclasses select another container format by overriding
    '_lzma_format_name' with the name of an ``lzma.FORMAT_*`` constant.
    """

    prefix = _LZMA_PREFIX
    extension = ".lzma"
    _lzma_format_name = "FORMAT_ALONE"

    def __init__(self):
        if lzma is not None:
            self.fileobj_factory = lzma.LZMAFile
            # Resolve the format constant once, from its name.
            self._lzma_format = getattr(lzma, self._lzma_format_name)
        else:
            # lzma could not be imported: leave the factory unset; any attempt
            # to use the wrapper is rejected by _check_versions().
            self.fileobj_factory = None

    def _check_versions(self):
        """Raise ValueError when the lzma module is unavailable."""
        if lzma is None:
            raise ValueError(
                "lzma module is not compiled on your python standard library."
            )

    def compressor_file(self, fileobj, compresslevel=None):
        """Returns an instance of a compressor file object.

        'compresslevel', when not None, is forwarded as the 'preset' argument.

        Raises
        ------
        ValueError
            If the lzma module is not available.
        """
        # Fail with the explicit message instead of an AttributeError on the
        # attributes that __init__ could not set.
        self._check_versions()
        if compresslevel is None:
            return self.fileobj_factory(fileobj, "wb", format=self._lzma_format)
        else:
            return self.fileobj_factory(
                fileobj, "wb", format=self._lzma_format, preset=compresslevel
            )

    def decompressor_file(self, fileobj):
        """Returns an instance of a decompressor file object.

        No format is passed when reading, so the default of lzma.LZMAFile
        applies.

        Raises
        ------
        ValueError
            If the lzma module is not available.
        """
        self._check_versions()
        return lzma.LZMAFile(fileobj, "rb")

182 

183 

class XZCompressorWrapper(LZMACompressorWrapper):
    """Variant of LZMACompressorWrapper that selects ``lzma.FORMAT_XZ``."""

    # Name of the lzma format constant looked up by the parent constructor.
    _lzma_format_name = "FORMAT_XZ"
    extension = ".xz"
    prefix = _XZ_PREFIX

188 

189 

class LZ4CompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the LZ4 frame format, backed by LZ4FrameFile."""

    prefix = _LZ4_PREFIX
    extension = ".lz4"

    def __init__(self):
        # 'lz4' is None when the package could not be imported.
        self.fileobj_factory = None if lz4 is None else LZ4FrameFile

    def _check_versions(self):
        """Raise ValueError unless lz4 is importable and at least version 0.19."""
        if lz4 is None:
            raise ValueError(LZ4_NOT_INSTALLED_ERROR)

        # Drop a leading "v" so that only the numeric part is compared.
        reported = lz4.__version__
        version = reported[1:] if reported.startswith("v") else reported
        too_old = LooseVersion(version) < LooseVersion("0.19")
        if too_old:
            raise ValueError(LZ4_NOT_INSTALLED_ERROR)

    def compressor_file(self, fileobj, compresslevel=None):
        """Return an LZ4 frame file opened on 'fileobj' for writing.

        'compresslevel', when not None, is forwarded under the keyword
        'compression_level' expected by the factory.
        """
        self._check_versions()
        options = {} if compresslevel is None else {"compression_level": compresslevel}
        return self.fileobj_factory(fileobj, "wb", **options)

    def decompressor_file(self, fileobj):
        """Return an LZ4 frame file opened on 'fileobj' for reading."""
        self._check_versions()
        opener = self.fileobj_factory
        return opener(fileobj, "rb")

221 

222 

###############################################################################
# base file compression/decompression object definition

# States of a BinaryZlibFile, stored in its '_mode' attribute.
_MODE_CLOSED = 0
_MODE_READ = 1
_MODE_READ_EOF = 2  # opened for reading and the end of the data was reached
_MODE_WRITE = 3

# Number of compressed bytes requested from the underlying file per read.
_BUFFER_SIZE = 8192


class BinaryZlibFile(io.BufferedIOBase):
    """A file object providing transparent zlib (de)compression.

    A BinaryZlibFile can act as a wrapper for an existing file object, or
    refer directly to a named file on disk.

    Note that BinaryZlibFile provides only a *binary* file interface: data read
    is returned as bytes, and data to be written should be given as bytes.

    This object is an adaptation of the BZ2File object.

    If filename is a str or bytes object, it gives the name
    of the file to be opened. Otherwise, it should be a file object,
    which will be used to read or write the compressed data.

    mode can be 'rb' for reading (default) or 'wb' for (over)writing

    If mode is 'wb', compresslevel can be a number between 1
    and 9 specifying the level of compression: 1 produces the least
    compression, and 9 produces the most compression. 3 is the default.

    When reading, decoding stops at the end of the first compressed stream:
    bytes that follow it in the underlying file are ignored.
    """

    # TODO python2_drop: is it still needed since we dropped Python 2 support

    # Window-size argument given to zlib.compressobj / zlib.decompressobj.
    wbits = zlib.MAX_WBITS

    def __init__(self, filename, mode="rb", compresslevel=3):
        """Open 'filename' (a path or a file object) in mode 'rb' or 'wb'.

        Raises
        ------
        ValueError
            If compresslevel is not an int between 1 and 9, or if mode is
            neither 'rb' nor 'wb'.
        TypeError
            If filename is neither a str/bytes path nor an object with a
            'read' or 'write' attribute.
        """
        # This lock must be recursive, so that BufferedIOBase's
        # readline(), readlines() and writelines() don't deadlock.
        self._lock = RLock()
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED
        self._pos = 0
        self._size = -1
        self.compresslevel = compresslevel

        if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9):
            raise ValueError(
                "'compresslevel' must be an integer "
                "between 1 and 9. You provided 'compresslevel={}'".format(compresslevel)
            )

        if mode == "rb":
            opened_mode = _MODE_READ
            self._decompressor = zlib.decompressobj(self.wbits)
            self._buffer = b""
            self._buffer_offset = 0
        elif mode == "wb":
            opened_mode = _MODE_WRITE
            self._compressor = zlib.compressobj(
                self.compresslevel, zlib.DEFLATED, self.wbits, zlib.DEF_MEM_LEVEL, 0
            )
        else:
            raise ValueError("Invalid mode: %r" % (mode,))

        # bytes paths are accepted as well, as the documentation and the error
        # message below state.
        if isinstance(filename, (str, bytes)):
            self._fp = io.open(filename, mode)
            self._closefp = True
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
        else:
            raise TypeError("filename must be a str or bytes object, or a file")

        # Leave the closed state only once the underlying file is available, so
        # that close() on a half-built object never touches a missing file.
        self._mode = opened_mode

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        with self._lock:
            if self._mode == _MODE_CLOSED:
                return
            try:
                if self._mode in (_MODE_READ, _MODE_READ_EOF):
                    self._decompressor = None
                elif self._mode == _MODE_WRITE:
                    # Emit whatever the compressor still holds back.
                    self._fp.write(self._compressor.flush())
                    self._compressor = None
            finally:
                try:
                    # Only close files this object opened itself.
                    if self._closefp:
                        self._fp.close()
                finally:
                    self._fp = None
                    self._closefp = False
                    self._mode = _MODE_CLOSED
                    self._buffer = b""
                    self._buffer_offset = 0

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor for the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking."""
        return self.readable() and self._fp.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode in (_MODE_READ, _MODE_READ_EOF)

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    # Mode-checking helper functions.

    def _check_not_closed(self):
        """Raise ValueError if the file is closed."""
        if self.closed:
            fname = getattr(self._fp, "name", None)
            msg = "I/O operation on closed file"
            if fname is not None:
                msg += " {}".format(fname)
            msg += "."
            raise ValueError(msg)

    def _check_can_read(self):
        """Raise unless the file is open for reading."""
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise io.UnsupportedOperation("File not open for reading")

    def _check_can_write(self):
        """Raise unless the file is open for writing."""
        if self._mode != _MODE_WRITE:
            self._check_not_closed()
            raise io.UnsupportedOperation("File not open for writing")

    def _check_can_seek(self):
        """Raise unless the file is open for reading on a seekable file."""
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise io.UnsupportedOperation(
                "Seeking is only supported on files open for reading"
            )
        if not self._fp.seekable():
            raise io.UnsupportedOperation(
                "The underlying file object does not support seeking"
            )

    # Fill the readahead buffer if it is empty. Returns False on EOF.
    def _fill_buffer(self):
        if self._mode == _MODE_READ_EOF:
            return False
        # Depending on the input data, our call to the decompressor may not
        # return any data. In this case, try again after reading another block.
        while self._buffer_offset == len(self._buffer):
            # Once the compressed stream has ended, the decompressor no longer
            # produces output: anything fed to it only accumulates in its
            # 'unused_data'. Stop here instead of feeding trailing bytes back
            # in, which would never terminate.
            if self._decompressor.eof:
                rawblock = b""
            else:
                rawblock = self._fp.read(_BUFFER_SIZE)

            if not rawblock:
                # End of the compressed stream or of the file. We're good.
                self._mode = _MODE_READ_EOF
                self._size = self._pos
                return False

            self._buffer = self._decompressor.decompress(rawblock)
            self._buffer_offset = 0
        return True

    # Read data until EOF.
    # If return_data is false, consume the data without returning it.
    def _read_all(self, return_data=True):
        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
        self._buffer = self._buffer[self._buffer_offset :]
        self._buffer_offset = 0

        blocks = []
        while self._fill_buffer():
            if return_data:
                blocks.append(self._buffer)
            self._pos += len(self._buffer)
            self._buffer = b""
        if return_data:
            return b"".join(blocks)

    # Read a block of up to n bytes.
    # If return_data is false, consume the data without returning it.
    def _read_block(self, n_bytes, return_data=True):
        # If we have enough data buffered, return immediately.
        end = self._buffer_offset + n_bytes
        if end <= len(self._buffer):
            data = self._buffer[self._buffer_offset : end]
            self._buffer_offset = end
            self._pos += len(data)
            return data if return_data else None

        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
        self._buffer = self._buffer[self._buffer_offset :]
        self._buffer_offset = 0

        blocks = []
        while n_bytes > 0 and self._fill_buffer():
            if n_bytes < len(self._buffer):
                data = self._buffer[:n_bytes]
                self._buffer_offset = n_bytes
            else:
                data = self._buffer
                self._buffer = b""
            if return_data:
                blocks.append(data)
            self._pos += len(data)
            n_bytes -= len(data)
        if return_data:
            return b"".join(blocks)

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative, None or omitted, read until EOF is reached.
        Returns b'' if the file is already at EOF.
        """
        with self._lock:
            self._check_can_read()
            # Follow the io convention where None also means "read everything".
            if size is None:
                size = -1
            if size == 0:
                return b""
            elif size < 0:
                return self._read_all()
            else:
                return self._read_block(size)

    def readinto(self, b):
        """Read up to len(b) bytes into b.

        Returns the number of bytes read (0 for EOF).
        """
        with self._lock:
            return io.BufferedIOBase.readinto(self, b)

    def write(self, data):
        """Write a byte string to the file.

        Returns the number of uncompressed bytes written, which is
        always len(data). Note that due to buffering, the file on disk
        may not reflect the data written until close() is called.
        """
        with self._lock:
            self._check_can_write()
            # Convert data type if called by io.BufferedWriter.
            if isinstance(data, memoryview):
                data = data.tobytes()

            compressed = self._compressor.compress(data)
            self._fp.write(compressed)
            self._pos += len(data)
            return len(data)

    # Rewind the file to the beginning of the data stream.
    def _rewind(self):
        self._fp.seek(0, 0)
        self._mode = _MODE_READ
        self._pos = 0
        self._decompressor = zlib.decompressobj(self.wbits)
        self._buffer = b""
        self._buffer_offset = 0

    def seek(self, offset, whence=0):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Values for whence are:

            0: start of stream (default); offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position. A target located before the start of
        the stream leaves the position at 0; a target located past the end
        leaves it at the end.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        with self._lock:
            self._check_can_seek()

            # Recalculate offset as an absolute file position.
            if whence == 0:
                pass
            elif whence == 1:
                offset = self._pos + offset
            elif whence == 2:
                # Seeking relative to EOF - we need to know the file's size.
                if self._size < 0:
                    self._read_all(return_data=False)
                offset = self._size + offset
            else:
                raise ValueError("Invalid value for whence: %s" % (whence,))

            # Make it so that offset is the number of bytes to skip forward.
            if offset < self._pos:
                self._rewind()
            else:
                offset -= self._pos

            # Read and discard data until we reach the desired position.
            # A non-positive count means there is nothing to skip; passing a
            # negative one to _read_block would leave _buffer_offset negative.
            if offset > 0:
                self._read_block(offset, return_data=False)

            return self._pos

    def tell(self):
        """Return the current file position."""
        with self._lock:
            self._check_not_closed()
            return self._pos

542 

543 

class ZlibCompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the zlib format, backed by BinaryZlibFile."""

    def __init__(self):
        # Arguments are, in order: file object class, magic prefix, extension.
        CompressorWrapper.__init__(self, BinaryZlibFile, _ZLIB_PREFIX, ".z")

549 

550 

class BinaryGzipFile(BinaryZlibFile):
    """Binary file object that transparently (de)compresses gzip data.

    The constructor is inherited unchanged from BinaryZlibFile:

    - filename is either a str or bytes path naming the file to open, or an
      existing file object through which the compressed data is read or
      written;
    - mode is 'rb' for reading (default) or 'wb' for (over)writing;
    - compresslevel, used when mode is 'wb', is a number between 1 and 9:
      1 produces the least compression, and 9 produces the most
      compression. 3 is the default.
    """

    # zlib compressor/decompressor wbits value for gzip format.
    wbits = 31

566 

567 

class GzipCompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the gzip format, backed by BinaryGzipFile."""

    def __init__(self):
        # Arguments are, in order: file object class, magic prefix, extension.
        CompressorWrapper.__init__(self, BinaryGzipFile, _GZIP_PREFIX, ".gz")

572 )