Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/compression.py: 53%
93 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1"""Helper functions for a standard streaming compression API"""
2from zipfile import ZipFile
4import fsspec.utils
5from fsspec.spec import AbstractBufferedFile
def noop_file(file, mode, **kwargs):
    """Return ``file`` unchanged.

    This is the opener used when no compression applies: ``mode`` and any
    extra keyword arguments are accepted, so the call signature matches the
    other openers, but they are ignored.
    """
    return file
# TODO: files should also be available as contexts

# Registry of openers, keyed by compression name. Every value should be a
# function of the form func(infile, mode=, **kwargs) -> file-like. The ``None``
# key means "no compression" and hands the file back untouched.
compr = {None: noop_file}
def register_compression(name, callback, extensions, force=False):
    """Register an "inferable" file compression type.

    Registers transparent file compression type for use with fsspec.open.
    Compression can be specified by name in open, or "infer"-ed for any files
    ending with the given extensions.

    Args:
        name: (str) The compression type name. Eg. "gzip".
        callback: A callable of form (infile, mode, **kwargs) -> file-like.
            Accepts an input file-like object, the target mode and kwargs.
            Returns a wrapped file-like object.
        extensions: (str, Iterable[str]) A file extension, or list of file
            extensions for which to infer this compression scheme. Eg. "gz".
            Any iterable is accepted, including one-shot iterators.
        force: (bool) Force re-registration of compression type or extensions.

    Raises:
        ValueError: If name or extensions already registered, and not force.
            Nothing is registered in that case.

    """
    if isinstance(extensions, str):
        extensions = [extensions]
    else:
        # The extensions are walked twice (validation, then registration).
        # Materialize them so a generator is not exhausted by the first pass,
        # which would silently register the name with no extensions.
        extensions = list(extensions)

    # Validate everything up front so a failure leaves both registries untouched
    if not force:
        if name in compr:
            raise ValueError(f"Duplicate compression registration: {name}")
        for ext in extensions:
            if ext in fsspec.utils.compressions:
                raise ValueError(
                    f"Duplicate compression file extension: {ext} ({name})"
                )

    compr[name] = callback

    # Extension -> compression name, used when the compression is inferred
    for ext in extensions:
        fsspec.utils.compressions[ext] = name
def unzip(infile, mode="rb", filename=None, **kwargs):
    """Open a single member of a zip archive as a file-like object.

    Args:
        infile: The archive source (reading) or target (writing); handed
            straight to ``ZipFile``.
        mode: (str) Any mode containing "r" reads from an existing archive;
            every other mode creates a new archive for writing.
        filename: (str) Name of the archive member. Defaults to "file" when
            writing, and to the first listed member when reading.
        **kwargs: Passed to ``ZipFile`` when writing, and to ``ZipFile.open``
            when reading.

    Returns:
        A binary file-like object for the member. Closing it also closes the
        enclosing ``ZipFile`` object.

    Raises:
        IndexError: If reading without ``filename`` from an archive that
            contains no members.
    """
    if "r" not in mode:
        filename = filename or "file"
        z = ZipFile(infile, mode="w", **kwargs)
        fo = z.open(filename, mode="w")
    else:
        z = ZipFile(infile)
        try:
            if filename is None:
                names = z.namelist()
                if not names:
                    # Same exception type as indexing the empty list would
                    # give, but with a message that explains the problem.
                    raise IndexError(
                        "zip archive contains no members to open"
                    )
                filename = names[0]
            fo = z.open(filename, mode="r", **kwargs)
        except Exception:
            # Do not leave the archive handle open when no member is returned
            z.close()
            raise

    # The caller only ever sees the member, so closing it must also close the
    # archive (when writing, this is what finalizes the zip file).
    fo.close = lambda closer=fo.close: closer() or z.close()
    return fo
# zip support only needs the zipfile import at the top of this module
register_compression("zip", unzip, "zip")

# bz2 is registered only if the module can be imported; a missing module is
# tolerated and simply leaves "bz2" unregistered.
try:
    from bz2 import BZ2File
except ImportError:
    pass
else:
    register_compression("bz2", BZ2File, "bz2")
# gzip: use the isal implementation when it is installed, otherwise fall back
# to the standard library's GzipFile.
try:  # pragma: no cover
    from isal import igzip

    def isal(infile, mode="rb", **kwargs):
        """Open ``infile`` through isal's gzip-compatible file object."""
        return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)

    register_compression("gzip", isal, "gz")
except ImportError:
    from gzip import GzipFile

    def _gzip_file(f, **kwargs):
        """Open ``f`` through GzipFile; the mode, if any, arrives in kwargs."""
        return GzipFile(fileobj=f, **kwargs)

    register_compression("gzip", _gzip_file, "gz")
try:
    from lzma import LZMAFile

    # Each name gets its own extension so that both ".lzma" and ".xz" paths
    # can be inferred. Previously "lzma" was registered against "xz" and then
    # immediately overwritten, leaving ".lzma" with no inferred compression.
    register_compression("lzma", LZMAFile, "lzma")
    register_compression("xz", LZMAFile, "xz")
except ImportError:
    pass

try:
    import lzmaffi

    # When lzmaffi is installed it takes over both names; force=True is
    # needed because they may already have been registered just above.
    register_compression("lzma", lzmaffi.LZMAFile, "lzma", force=True)
    register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)
except ImportError:
    pass
class SnappyFile(AbstractBufferedFile):
    """Non-seekable buffered file that streams data through a snappy codec.

    Wraps ``infile``: opened with "r" in the mode it decompresses what it
    reads from ``infile``; otherwise it compresses what is written and pushes
    the result into ``infile``.
    """

    def __init__(self, infile, mode, **kwargs):
        import snappy

        # The base class always gets a binary mode. There is no filesystem
        # behind this file; path and size look like placeholders
        # (NOTE(review): confirm against AbstractBufferedFile).
        binary_mode = mode.strip("b") + "b"
        super().__init__(
            fs=None, path="snappy", mode=binary_mode, size=999999999, **kwargs
        )
        self.infile = infile
        self.codec = (
            snappy.StreamDecompressor()
            if "r" in mode
            else snappy.StreamCompressor()
        )

    def _upload_chunk(self, final=False):
        """Compress the entire write buffer and write it to ``infile``."""
        self.buffer.seek(0)
        raw = self.buffer.read()
        self.infile.write(self.codec.add_chunk(raw))
        return True

    def seek(self, loc, whence=0):
        """Always fails: the stream cannot be repositioned."""
        raise NotImplementedError("SnappyFile is not seekable")

    def seekable(self):
        """Report that seeking is unsupported."""
        return False

    def _fetch_range(self, start, end):
        """Read ``end - start`` bytes from ``infile`` and decompress them."""
        compressed = self.infile.read(end - start)
        return self.codec.decompress(compressed)
try:
    import snappy

    # Attribute probe: raises AttributeError (caught below) when the imported
    # "snappy" module does not provide ``compress``.
    snappy.compress
    # Snappy may use the .sz file extension, but this is not part of the
    # standard implementation, so no extension is registered for inference.
    register_compression("snappy", SnappyFile, [])

except (ImportError, NameError, AttributeError):
    pass

# lz4 is optional; its frame-format opener is used directly as the callback
try:
    import lz4.frame

    register_compression("lz4", lz4.frame.open, "lz4")
except ImportError:
    pass
# zstd is optional and provided by the third-party "zstandard" package
try:
    import zstandard as zstd

    def zstandard_file(infile, mode="rb"):
        """Wrap ``infile`` in a zstandard stream reader or stream writer.

        A mode containing "r" yields a decompressing reader; any other mode
        yields a compressing writer at compression level 10.
        """
        if "r" not in mode:
            compressor = zstd.ZstdCompressor(level=10)
            return compressor.stream_writer(infile)
        decompressor = zstd.ZstdDecompressor()
        return decompressor.stream_reader(infile)

    register_compression("zstd", zstandard_file, "zst")
except ImportError:
    pass
def available_compressions():
    """Return the names of all registered compressions as a new list.

    The names are the keys of ``compr``, so the ``None`` ("no compression")
    entry is included as long as it remains in the registry.
    """
    return [*compr]