1"""Classes and functions for managing compressors."""
2
3import io
4import zlib
5
6from joblib.backports import LooseVersion
7
8try:
9 from threading import RLock
10except ImportError:
11 from dummy_threading import RLock
12
13try:
14 import bz2
15except ImportError:
16 bz2 = None
17
18try:
19 import lz4
20 from lz4.frame import LZ4FrameFile
21except ImportError:
22 lz4 = None
23
24try:
25 import lzma
26except ImportError:
27 lzma = None
28
29
# Message of the ValueError raised by LZ4CompressorWrapper when the lz4
# package cannot be used.
LZ4_NOT_INSTALLED_ERROR = (
    "LZ4 is not installed. Install it with pip: https://python-lz4.readthedocs.io/"
)

# Registered compressors: maps a compressor name to its CompressorWrapper
# instance (filled by register_compressor).
_COMPRESSORS = {}

# Magic numbers of supported compression file formats, i.e. the leading bytes
# of a file written in that format. Most of them are exposed by the compressor
# wrapper classes below as their ``prefix`` attribute.
_ZFILE_PREFIX = b"ZF"  # used with pickle files created before 0.9.3.
_ZLIB_PREFIX = b"\x78"
_GZIP_PREFIX = b"\x1f\x8b"
_BZ2_PREFIX = b"BZ"
_XZ_PREFIX = b"\xfd\x37\x7a\x58\x5a"
_LZMA_PREFIX = b"\x5d\x00"
_LZ4_PREFIX = b"\x04\x22\x4d\x18"
45
46
def register_compressor(compressor_name, compressor, force=False):
    """Register ``compressor`` under the name ``compressor_name``.

    Parameters
    ----------
    compressor_name: str
        The name under which the compressor is registered.
    compressor: CompressorWrapper
        An instance of 'CompressorWrapper' (or of one of its subclasses).
    force: bool, default=False
        When True, silently replace a compressor already registered under
        the same name instead of raising.

    Raises
    ------
    ValueError
        If ``compressor_name`` is not a str, if ``compressor`` is not a
        CompressorWrapper, if its ``fileobj_factory`` is neither None nor an
        object exposing ``read``/``write``/``seek``/``tell``, or if the name
        is already registered and ``force`` is False.
    """
    if not isinstance(compressor_name, str):
        raise ValueError(
            "Compressor name should be a string, '{}' given.".format(compressor_name)
        )

    if not isinstance(compressor, CompressorWrapper):
        raise ValueError(
            "Compressor should implement the CompressorWrapper "
            "interface, '{}' given.".format(compressor)
        )

    factory = compressor.fileobj_factory
    # The factory is usually a file class (e.g. bz2.BZ2File), so the file
    # methods are looked up on the class itself.
    file_methods = ("read", "write", "seek", "tell")
    if factory is not None and not all(hasattr(factory, name) for name in file_methods):
        raise ValueError(
            "Compressor 'fileobj_factory' attribute should "
            "implement the file object interface, '{}' given.".format(factory)
        )

    if not force and compressor_name in _COMPRESSORS:
        raise ValueError("Compressor '{}' already registered.".format(compressor_name))

    # Item assignment mutates the module-level dict; no ``global`` needed.
    _COMPRESSORS[compressor_name] = compressor
86
87
class CompressorWrapper:
    """Pair a compressed-file factory with the metadata of its file format.

    Parameters
    ----------
    obj: callable
        Factory called as ``obj(fileobj, mode, **kwargs)`` with mode "wb" or
        "rb" to build the file object that compresses/decompresses the data.
        Stored as the ``fileobj_factory`` attribute.
    prefix: bytestring, default=b""
        Magic number identifying the file format of the compressor.
    extension: str, default=""
        File extension used to automatically select this compressor during
        a dump to a file.

    Attributes
    ----------
    fileobj_factory: callable
        The ``obj`` argument.
    prefix: bytestring
        The ``prefix`` argument.
    extension: str
        The ``extension`` argument.
    """

    def __init__(self, obj, prefix=b"", extension=""):
        self.fileobj_factory = obj
        self.prefix = prefix
        self.extension = extension

    def compressor_file(self, fileobj, compresslevel=None):
        """Return a file object compressing the data written into ``fileobj``.

        ``compresslevel`` is passed to the factory as a keyword argument only
        when it is not None.
        """
        extra_kwargs = {} if compresslevel is None else {"compresslevel": compresslevel}
        return self.fileobj_factory(fileobj, "wb", **extra_kwargs)

    def decompressor_file(self, fileobj):
        """Return a file object decompressing the data read from ``fileobj``."""
        return self.fileobj_factory(fileobj, "rb")
119
120
class BZ2CompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the bz2 format, backed by ``bz2.BZ2File``.

    When the bz2 module could not be imported, ``fileobj_factory`` is None
    and requesting a (de)compressor file raises ValueError.
    """

    prefix = _BZ2_PREFIX
    extension = ".bz2"

    def __init__(self):
        self.fileobj_factory = None if bz2 is None else bz2.BZ2File

    def _check_versions(self):
        """Raise ValueError if the bz2 module is unavailable."""
        if bz2 is not None:
            return
        raise ValueError(
            "bz2 module is not compiled on your python standard library."
        )

    def compressor_file(self, fileobj, compresslevel=None):
        """Return a bz2 file object compressing into ``fileobj``."""
        self._check_versions()
        if compresslevel is not None:
            return self.fileobj_factory(fileobj, "wb", compresslevel=compresslevel)
        return self.fileobj_factory(fileobj, "wb")

    def decompressor_file(self, fileobj):
        """Return a bz2 file object decompressing from ``fileobj``."""
        self._check_versions()
        return self.fileobj_factory(fileobj, "rb")
150
151
class LZMACompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the legacy ``.lzma`` container, via lzma.LZMAFile.

    Subclasses pick another container by overriding ``_lzma_format_name``
    with the name of an ``lzma.FORMAT_*`` constant (used when writing).
    When the lzma module could not be imported, ``fileobj_factory`` is None
    and requesting a (de)compressor file raises ValueError.
    """

    prefix = _LZMA_PREFIX
    extension = ".lzma"
    # Name of the lzma.FORMAT_* constant passed as ``format=`` when writing.
    _lzma_format_name = "FORMAT_ALONE"

    def __init__(self):
        if lzma is not None:
            self.fileobj_factory = lzma.LZMAFile
            self._lzma_format = getattr(lzma, self._lzma_format_name)
        else:
            self.fileobj_factory = None

    def _check_versions(self):
        """Raise ValueError if the lzma module is unavailable."""
        if lzma is None:
            raise ValueError(
                "lzma module is not compiled on your python standard library."
            )

    def compressor_file(self, fileobj, compresslevel=None):
        """Returns an instance of a compressor file object.

        ``compresslevel`` is forwarded to LZMAFile as its ``preset``.

        Raises
        ------
        ValueError
            If the lzma module is unavailable (instead of failing later on a
            None factory or a missing ``_lzma_format`` attribute).
        """
        self._check_versions()
        if compresslevel is None:
            return self.fileobj_factory(fileobj, "wb", format=self._lzma_format)
        return self.fileobj_factory(
            fileobj, "wb", format=self._lzma_format, preset=compresslevel
        )

    def decompressor_file(self, fileobj):
        """Returns an instance of a decompressor file object.

        No ``format`` is given, so LZMAFile uses its default read format
        (documented as FORMAT_AUTO).

        Raises
        ------
        ValueError
            If the lzma module is unavailable.
        """
        self._check_versions()
        return self.fileobj_factory(fileobj, "rb")
182
183
class XZCompressorWrapper(LZMACompressorWrapper):
    """Compressor wrapper for the ``.xz`` container.

    Reuses the LZMA wrapper, changing only the format constant used when
    writing, the magic prefix and the file extension.
    """

    _lzma_format_name = "FORMAT_XZ"
    extension = ".xz"
    prefix = _XZ_PREFIX
188
189
class LZ4CompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the LZ4 frame format.

    Backed by ``lz4.frame.LZ4FrameFile`` from the optional third-party
    ``lz4`` package; version 0.19 or later is required.
    """

    prefix = _LZ4_PREFIX
    extension = ".lz4"

    def __init__(self):
        # lz4 is None when importing the package or lz4.frame failed.
        if lz4 is not None:
            self.fileobj_factory = LZ4FrameFile
        else:
            self.fileobj_factory = None

    def _check_versions(self):
        """Raise ValueError unless a recent enough lz4 package is available."""
        if lz4 is None:
            raise ValueError(LZ4_NOT_INSTALLED_ERROR)
        lz4_version = lz4.__version__
        # Tolerate version strings carrying a leading "v".
        if lz4_version.startswith("v"):
            lz4_version = lz4_version[1:]
        if LooseVersion(lz4_version) < LooseVersion("0.19"):
            # lz4 *is* installed here, only too old: keep the historical
            # message as a prefix for backward compatibility, but state the
            # actual problem instead of claiming lz4 is missing.
            raise ValueError(
                "{} Found lz4 version {}, but lz4 >= 0.19 is required.".format(
                    LZ4_NOT_INSTALLED_ERROR, lz4.__version__
                )
            )

    def compressor_file(self, fileobj, compresslevel=None):
        """Returns an instance of a compressor file object.

        ``compresslevel`` is forwarded to LZ4FrameFile as
        ``compression_level``.
        """
        self._check_versions()
        if compresslevel is None:
            return self.fileobj_factory(fileobj, "wb")
        return self.fileobj_factory(fileobj, "wb", compression_level=compresslevel)

    def decompressor_file(self, fileobj):
        """Returns an instance of a decompressor file object."""
        self._check_versions()
        return self.fileobj_factory(fileobj, "rb")
221
222
###############################################################################
# base file compression/decompression object definition
# Values of BinaryZlibFile._mode.
_MODE_CLOSED = 0
_MODE_READ = 1
_MODE_READ_EOF = 2  # open for reading and all the data has been decompressed
_MODE_WRITE = 3
# Number of compressed bytes requested from the underlying file per read call.
_BUFFER_SIZE = 8192


class BinaryZlibFile(io.BufferedIOBase):
    """A file object providing transparent zlib (de)compression.

    A BinaryZlibFile can act as a wrapper for an existing file object, or
    refer directly to a named file on disk.

    TODO python2_drop: check whether this class is still needed now that
    Python 2 support has been dropped.

    Note that BinaryZlibFile provides only a *binary* file interface: data read
    is returned as bytes, and data to be written should be given as bytes (or
    any other object supporting the buffer protocol).

    This object is an adaptation of the BZ2File object. Like it, reading
    decodes several concatenated compressed streams one after the other.

    If filename is a str or bytes object, it gives the name
    of the file to be opened. Otherwise, it should be a file object,
    which will be used to read or write the compressed data.

    mode can be 'rb' for reading (default) or 'wb' for (over)writing

    compresslevel must be an integer between 1 and 9 (it is validated in both
    modes but only used when writing): 1 produces the least compression, and
    9 produces the most compression. 3 is the default.
    """

    # wbits argument given to zlib.compressobj/zlib.decompressobj; it selects
    # the container format and is overridden by subclasses (e.g. for gzip).
    wbits = zlib.MAX_WBITS

    def __init__(self, filename, mode="rb", compresslevel=3):
        # This lock must be recursive, so that BufferedIOBase's
        # readline(), readlines() and writelines() don't deadlock.
        self._lock = RLock()
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED
        self._pos = 0  # current position in the uncompressed data
        self._size = -1  # uncompressed size; -1 until EOF has been reached
        self.compresslevel = compresslevel

        if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9):
            raise ValueError(
                "'compresslevel' must be an integer "
                "between 1 and 9. You provided 'compresslevel={}'".format(compresslevel)
            )

        if mode == "rb":
            new_mode = _MODE_READ
            self._decompressor = zlib.decompressobj(self.wbits)
            self._buffer = b""
            self._buffer_offset = 0
        elif mode == "wb":
            new_mode = _MODE_WRITE
            # Last argument is the strategy (0 == zlib.Z_DEFAULT_STRATEGY).
            self._compressor = zlib.compressobj(
                self.compresslevel, zlib.DEFLATED, self.wbits, zlib.DEF_MEM_LEVEL, 0
            )
        else:
            raise ValueError("Invalid mode: %r" % (mode,))

        # bytes paths are accepted as documented; io.open supports them.
        if isinstance(filename, (str, bytes)):
            self._fp = io.open(filename, mode)
            self._closefp = True
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
        else:
            raise TypeError("filename must be a str or bytes object, or a file")

        # Leave _MODE_CLOSED only once a file object is attached: if anything
        # above raised, close() (called again when the object is finalized)
        # is then a no-op instead of writing to a missing self._fp.
        self._mode = new_mode

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        The underlying file object is closed only if it was opened here
        (i.e. a file name was given).
        """
        with self._lock:
            if self._mode == _MODE_CLOSED:
                return
            try:
                if self._mode in (_MODE_READ, _MODE_READ_EOF):
                    self._decompressor = None
                elif self._mode == _MODE_WRITE:
                    # Emit whatever the compressor still buffers.
                    self._fp.write(self._compressor.flush())
                    self._compressor = None
            finally:
                try:
                    if self._closefp:
                        self._fp.close()
                finally:
                    self._fp = None
                    self._closefp = False
                    self._mode = _MODE_CLOSED
                    self._buffer = b""
                    self._buffer_offset = 0

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor for the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking (reading mode only)."""
        return self.readable() and self._fp.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode in (_MODE_READ, _MODE_READ_EOF)

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    # Mode-checking helper functions.

    def _check_not_closed(self):
        """Raise ValueError if the file is closed."""
        if self.closed:
            fname = getattr(self._fp, "name", None)
            msg = "I/O operation on closed file"
            if fname is not None:
                msg += " {}".format(fname)
            msg += "."
            raise ValueError(msg)

    def _check_can_read(self):
        """Raise unless the file is open for reading."""
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise io.UnsupportedOperation("File not open for reading")

    def _check_can_write(self):
        """Raise unless the file is open for writing."""
        if self._mode != _MODE_WRITE:
            self._check_not_closed()
            raise io.UnsupportedOperation("File not open for writing")

    def _check_can_seek(self):
        """Raise unless the file is open for reading on a seekable file."""
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise io.UnsupportedOperation(
                "Seeking is only supported on files open for reading"
            )
        if not self._fp.seekable():
            raise io.UnsupportedOperation(
                "The underlying file object does not support seeking"
            )

    def _fill_buffer(self):
        """Fill the readahead buffer if it is empty. Returns False on EOF."""
        if self._mode == _MODE_READ_EOF:
            return False
        # Depending on the input data, our call to the decompressor may not
        # return any data. In this case, try again after reading another block.
        while self._buffer_offset == len(self._buffer):
            try:
                rawblock = self._decompressor.unused_data or self._fp.read(_BUFFER_SIZE)
                if not rawblock:
                    raise EOFError
            except EOFError:
                # No more compressed input: end of the data. Note that a
                # truncated stream also ends up here without an error.
                self._mode = _MODE_READ_EOF
                self._size = self._pos
                return False
            if self._decompressor.eof:
                # The previous stream is complete and more input follows
                # (e.g. concatenated gzip members). A finished decompressor
                # does not decode further input -- it only accumulates it in
                # unused_data, which made this loop spin forever. Start a
                # fresh decompressor for the next stream instead.
                self._decompressor = zlib.decompressobj(self.wbits)
            self._buffer = self._decompressor.decompress(rawblock)
            self._buffer_offset = 0
        return True

    def _read_all(self, return_data=True):
        """Read data until EOF.

        If return_data is false, consume the data without returning it.
        """
        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
        self._buffer = self._buffer[self._buffer_offset :]
        self._buffer_offset = 0

        blocks = []
        while self._fill_buffer():
            if return_data:
                blocks.append(self._buffer)
            self._pos += len(self._buffer)
            self._buffer = b""
        if return_data:
            return b"".join(blocks)

    def _read_block(self, n_bytes, return_data=True):
        """Read a block of up to n_bytes (> 0 expected) uncompressed bytes.

        If return_data is false, consume the data without returning it.
        """
        # If we have enough data buffered, return immediately.
        end = self._buffer_offset + n_bytes
        if end <= len(self._buffer):
            data = self._buffer[self._buffer_offset : end]
            self._buffer_offset = end
            self._pos += len(data)
            return data if return_data else None

        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
        self._buffer = self._buffer[self._buffer_offset :]
        self._buffer_offset = 0

        blocks = []
        while n_bytes > 0 and self._fill_buffer():
            if n_bytes < len(self._buffer):
                data = self._buffer[:n_bytes]
                self._buffer_offset = n_bytes
            else:
                data = self._buffer
                self._buffer = b""
            if return_data:
                blocks.append(data)
            self._pos += len(data)
            n_bytes -= len(data)
        if return_data:
            return b"".join(blocks)

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative, None or omitted, read until EOF is reached.
        Returns b'' if the file is already at EOF.
        """
        with self._lock:
            self._check_can_read()
            if size is None or size < 0:
                return self._read_all()
            if size == 0:
                return b""
            return self._read_block(size)

    def readinto(self, b):
        """Read up to len(b) bytes into b.

        Returns the number of bytes read (0 for EOF).
        """
        with self._lock:
            return io.BufferedIOBase.readinto(self, b)

    def write(self, data):
        """Write a bytes-like object to the file.

        Returns the number of uncompressed bytes written, i.e. the size of
        data in bytes. Note that due to buffering, the file on disk
        may not reflect the data written until close() is called.
        """
        with self._lock:
            self._check_can_write()
            if not isinstance(data, (bytes, bytearray)):
                # Other buffer-protocol objects (e.g. the memoryview passed by
                # io.BufferedWriter, or an array.array) are flattened to bytes
                # so that len() below counts bytes rather than items.
                data = memoryview(data).tobytes()

            compressed = self._compressor.compress(data)
            self._fp.write(compressed)
            self._pos += len(data)
            return len(data)

    def _rewind(self):
        """Rewind the file to the beginning of the data stream."""
        self._fp.seek(0, 0)
        self._mode = _MODE_READ
        self._pos = 0
        self._decompressor = zlib.decompressobj(self.wbits)
        self._buffer = b""
        self._buffer_offset = 0

    def seek(self, offset, whence=0):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Values for whence are:

            0: start of stream (default); offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        A target position before the start of the stream is clamped to 0.
        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        with self._lock:
            self._check_can_seek()

            # Recalculate offset as an absolute file position.
            if whence == 0:
                pass
            elif whence == 1:
                offset = self._pos + offset
            elif whence == 2:
                # Seeking relative to EOF - we need to know the file's size.
                if self._size < 0:
                    self._read_all(return_data=False)
                offset = self._size + offset
            else:
                raise ValueError("Invalid value for whence: %s" % (whence,))

            # Make it so that offset is the number of bytes to skip forward.
            if offset < self._pos:
                self._rewind()
            else:
                offset -= self._pos

            # Read and discard data until we reach the desired position. A
            # negative count (target before the start) must not reach
            # _read_block, where it would corrupt _buffer_offset.
            if offset > 0:
                self._read_block(offset, return_data=False)

            return self._pos

    def tell(self):
        """Return the current position in the uncompressed data."""
        with self._lock:
            self._check_not_closed()
            return self._pos
542
543
class ZlibCompressorWrapper(CompressorWrapper):
    """Compressor wrapper for zlib streams, backed by BinaryZlibFile."""

    def __init__(self):
        super().__init__(BinaryZlibFile, prefix=_ZLIB_PREFIX, extension=".z")
549
550
class BinaryGzipFile(BinaryZlibFile):
    """A file object providing transparent gzip (de)compression.

    Same interface as BinaryZlibFile; only the ``wbits`` value handed to
    zlib differs, so that data is framed in the gzip format.

    filename is either the name of a file to open (a str) or an existing
    file object used to read or write the compressed data.

    mode is 'rb' for reading (default) or 'wb' for (over)writing.

    compresslevel, used when writing, is an integer between 1 (least
    compression) and 9 (most compression); 3 is the default.
    """

    # zlib compressor/decompressor wbits value for gzip format (== 31):
    # 16 + a window exponent of 15, per the zlib module documentation.
    wbits = 16 + 15
566
567
class GzipCompressorWrapper(CompressorWrapper):
    """Compressor wrapper for gzip files, backed by BinaryGzipFile."""

    def __init__(self):
        super().__init__(BinaryGzipFile, prefix=_GZIP_PREFIX, extension=".gz")