Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/compression.py: 53%
90 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1"""Helper functions for a standard streaming compression API"""
2from bz2 import BZ2File
3from zipfile import ZipFile
5import fsspec.utils
6from fsspec.spec import AbstractBufferedFile
def noop_file(file, mode, **kwargs):
    """Identity compression callback: hand *file* back untouched.

    Registered under the ``None`` compression name. ``mode`` and any extra
    keyword arguments exist only to match the callback signature and are
    ignored.
    """
    return file


# Registry of compression name -> callback. Callbacks should look like
# ``func(infile, mode=..., **kwargs) -> file-like``.
# TODO: files should also be available as contexts.
compr = {None: noop_file}
def register_compression(name, callback, extensions, force=False):
    """Register an "inferable" file compression type.

    Registers a transparent file compression type for use with fsspec.open.
    Compression can be specified by name in open, or "infer"-ed for any file
    ending with one of the given extensions.

    Args:
        name: (str) The compression type name. Eg. "gzip".
        callback: A callable of form (infile, mode, **kwargs) -> file-like.
            Accepts an input file-like object, the target mode and kwargs.
            Returns a wrapped file-like object.
        extensions: (str, Iterable[str]) A file extension, or an iterable of
            file extensions for which to infer this compression scheme.
            Eg. "gz". May be empty, in which case the compression is only
            selectable by name.
        force: (bool) Force re-registration of compression type or extensions.

    Raises:
        ValueError: If name or extensions already registered, and not force.
            All checks run before any registry is modified, so nothing is
            registered when this is raised.
    """
    # Materialize once: the extensions are walked twice below (validate, then
    # register). A one-shot iterator such as a generator would otherwise be
    # exhausted by validation, silently registering no extensions at all.
    if isinstance(extensions, str):
        extensions = [extensions]
    else:
        extensions = list(extensions)

    # Validate registration before mutating either mapping.
    if name in compr and not force:
        raise ValueError("Duplicate compression registration: %s" % name)

    for ext in extensions:
        if ext in fsspec.utils.compressions and not force:
            raise ValueError(
                "Duplicate compression file extension: %s (%s)" % (ext, name)
            )

    compr[name] = callback

    # Map each extension to the compression name used for "infer".
    for ext in extensions:
        fsspec.utils.compressions[ext] = name
def unzip(infile, mode="rb", filename=None, **kwargs):
    """Open a single member of a zip archive stored in *infile*.

    When ``mode`` contains "r", the archive in *infile* is opened and the
    member ``filename`` is returned for reading; if ``filename`` is None the
    first entry of ``namelist()`` is used (an empty archive raises
    IndexError). ``kwargs`` are forwarded to ``ZipFile.open``.

    Otherwise a new archive is written into *infile* containing one member
    named ``filename`` (or "file" when ``filename`` is falsy); ``kwargs`` are
    forwarded to the ``ZipFile`` constructor. Closing the returned handle
    also closes the archive so it is finalized.
    """
    if "r" in mode:
        archive = ZipFile(infile)
        member = archive.namelist()[0] if filename is None else filename
        return archive.open(member, mode="r", **kwargs)

    archive = ZipFile(infile, mode="w", **kwargs)
    handle = archive.open(filename or "file", mode="w")
    close_member = handle.close

    def _close_member_then_archive():
        # Close the member first, then the archive that owns it.
        close_member()
        archive.close()

    handle.close = _close_member_then_archive
    return handle
# Formats whose handlers are imported unconditionally at the top of this
# module; each is also inferable from its file extension.
register_compression("zip", unzip, extensions="zip")
register_compression("bz2", BZ2File, extensions="bz2")
try:  # pragma: no cover
    from isal import igzip

    def isal(infile, mode="rb", **kwargs):
        """Wrap *infile* in an ``igzip.IGzipFile`` (from the isal package)."""
        return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)

    register_compression("gzip", isal, "gz")
except ImportError:
    from gzip import GzipFile

    def gzip_file(infile, mode=None, **kwargs):
        """Wrap *infile* in a standard-library ``gzip.GzipFile``.

        ``mode`` may be given positionally or by keyword, matching the
        ``(infile, mode, **kwargs)`` callback contract used by
        register_compression. When omitted, None is forwarded so GzipFile
        chooses the mode itself, exactly as when no mode is passed at all.
        Extra keyword arguments go straight to GzipFile.
        """
        return GzipFile(fileobj=infile, mode=mode, **kwargs)

    register_compression("gzip", gzip_file, "gz")
# Prefer the standard-library LZMAFile for both "lzma" and "xz". "xz" is
# force-registered because registering "lzma" already claimed the "xz"
# extension; the "xz" registration then takes over that extension.
try:
    from lzma import LZMAFile
except ImportError:
    pass
else:
    register_compression("lzma", LZMAFile, "xz")
    register_compression("xz", LZMAFile, "xz", force=True)

# If lzmaffi is installed, it overrides both names (hence force=True).
try:
    import lzmaffi
except ImportError:
    pass
else:
    register_compression("lzma", lzmaffi.LZMAFile, "xz", force=True)
    register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)
class SnappyFile(AbstractBufferedFile):
    """Non-seekable snappy stream wrapper around the file-like *infile*.

    In read mode, compressed bytes are pulled from ``infile`` and passed
    through a ``snappy.StreamDecompressor``. In write mode, each buffered
    chunk is compressed with a ``snappy.StreamCompressor`` and written to
    ``infile``.
    """

    def __init__(self, infile, mode, **kwargs):
        import snappy

        # Force a binary mode. The decompressed size is unknown up front, so
        # a large placeholder size is given to AbstractBufferedFile.
        # NOTE(review): reads beyond this placeholder would presumably be
        # truncated — confirm.
        super().__init__(
            fs=None,
            path="snappy",
            mode=mode.strip("b") + "b",
            size=999999999,
            **kwargs,
        )
        self.infile = infile
        self.codec = (
            snappy.StreamDecompressor() if "r" in mode else snappy.StreamCompressor()
        )

    def _upload_chunk(self, final=False):
        """Compress the whole write buffer and append the result to ``infile``."""
        self.buffer.seek(0)
        raw = self.buffer.read()
        self.infile.write(self.codec.add_chunk(raw))
        return True

    def seek(self, loc, whence=0):
        """Always refuse: the underlying stream cannot be repositioned."""
        raise NotImplementedError("SnappyFile is not seekable")

    def seekable(self):
        """Report that random access is not supported."""
        return False

    def _fetch_range(self, start, end):
        """Read ``end - start`` compressed bytes from ``infile`` and decompress.

        ``start`` is not used for positioning; bytes are consumed sequentially
        from ``infile``. NOTE(review): the decompressed result's length
        generally differs from ``end - start`` — confirm the read cache
        tolerates that.
        """
        compressed = self.infile.read(end - start)
        return self.codec.decompress(compressed)
try:
    import snappy

    # Attribute probe: an AttributeError here means the imported "snappy"
    # module lacks ``compress`` (presumably not python-snappy), so skip it.
    snappy.compress
    # Snappy may use the .sz file extension, but that is not part of the
    # standard implementation, so no extension is registered for inference.
    register_compression("snappy", SnappyFile, extensions=[])
except (AttributeError, ImportError, NameError):
    pass
try:
    import lz4.frame
except ImportError:
    pass
else:
    # lz4.frame.open is used directly as the callback for ".lz4" files.
    register_compression("lz4", lz4.frame.open, "lz4")
try:
    import zstandard as zstd

    def zstandard_file(infile, mode="rb", *, level=10):
        """Wrap *infile* in a zstandard streaming reader or writer.

        Args:
            infile: file-like object to read compressed bytes from, or to
                write compressed bytes to.
            mode: if it contains "r", a decompressing reader is returned;
                otherwise a compressing writer.
            level: compression level used when writing (default 10); ignored
                when reading. Keyword-only.

        Returns:
            ``ZstdDecompressor().stream_reader(infile)`` when reading, else
            ``ZstdCompressor(level=level).stream_writer(infile)``.
        """
        if "r" in mode:
            dctx = zstd.ZstdDecompressor()
            return dctx.stream_reader(infile)
        cctx = zstd.ZstdCompressor(level=level)
        return cctx.stream_writer(infile)

    register_compression("zstd", zstandard_file, "zst")
except ImportError:
    pass
def available_compressions():
    """Return a list of every registered compression name.

    The list is a snapshot of the registry keys in registration order,
    starting with ``None`` (no compression).
    """
    return [*compr]