Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/compression.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Helper functions for a standard streaming compression API"""
3import sys
4from zipfile import ZipFile
6import fsspec.utils
7from fsspec.spec import AbstractBufferedFile
def noop_file(file, mode, **kwargs):
    """Return ``file`` unchanged; the callback used when no compression applies.

    ``mode`` and any extra keyword arguments are accepted so the signature
    matches the other compression callbacks, but they are ignored.
    """
    return file
# Registry mapping a compression name to the callable that wraps a file object.
# The ``None`` key means "no compression" and maps to the pass-through
# ``noop_file``.
# Values should be functions of the form func(infile, mode=, **kwargs) -> file-like
# TODO: files should also be available as contexts
compr = {None: noop_file}
def register_compression(name, callback, extensions, force=False):
    """Register an "inferable" file compression type.

    Registers transparent file compression type for use with fsspec.open.
    Compression can be specified by name in open, or "infer"-ed for any files
    ending with the given extensions.

    Args:
        name: (str) The compression type name. Eg. "gzip".
        callback: A callable of form (infile, mode, **kwargs) -> file-like.
            Accepts an input file-like object, the target mode and kwargs.
            Returns a wrapped file-like object.
        extensions: (str, Iterable[str]) A file extension, or list of file
            extensions for which to infer this compression scheme. Eg. "gz".
            Any iterable is accepted, including single-pass iterators.
        force: (bool) Force re-registration of compression type or extensions.

    Raises:
        ValueError: If name or extensions already registered, and not force.

    """
    if isinstance(extensions, str):
        extensions = [extensions]
    else:
        # Materialise once: the extensions are walked twice below (validate,
        # then register). A generator or other single-pass iterable would be
        # exhausted by validation and silently register no extensions.
        extensions = list(extensions)

    # Validate everything before touching either registry, so a rejected call
    # leaves no partial registration behind.
    if not force:
        if name in compr:
            raise ValueError(f"Duplicate compression registration: {name}")
        for ext in extensions:
            if ext in fsspec.utils.compressions:
                raise ValueError(
                    f"Duplicate compression file extension: {ext} ({name})"
                )

    compr[name] = callback
    for ext in extensions:
        fsspec.utils.compressions[ext] = name
def unzip(infile, mode="rb", filename=None, **kwargs):
    """Open one member of a zip archive as a file-like object.

    Args:
        infile: File-like object (or anything ``ZipFile`` accepts) holding the
            archive.
        mode: If it contains "r" the archive is read; otherwise a new archive
            is written.
        filename: Member to open. When reading, defaults to the first name in
            the archive; when writing, defaults to "file".
        **kwargs: When reading, forwarded to ``ZipFile.open``; when writing,
            forwarded to the ``ZipFile`` constructor.

    Returns:
        The file-like object returned by ``ZipFile.open`` for the member.
    """
    if "r" in mode:
        archive = ZipFile(infile)
        # No explicit member requested: fall back to the first listed entry.
        member = archive.namelist()[0] if filename is None else filename
        return archive.open(member, mode="r", **kwargs)

    archive = ZipFile(infile, mode="w", **kwargs)
    handle = archive.open(filename or "file", mode="w")

    def close_member_then_archive(closer=handle.close):
        # The caller only ever sees the member handle, so closing it must
        # also close the enclosing archive.
        return closer() or archive.close()

    handle.close = close_member_then_archive
    return handle
# zip is registered unconditionally; ZipFile is imported at the top of this
# module without any ImportError guard.
register_compression("zip", unzip, "zip")

# bz2 is registered only if the bz2 module can be imported; otherwise it is
# silently left out of the registry.
try:
    from bz2 import BZ2File
except ImportError:
    pass
else:
    register_compression("bz2", BZ2File, "bz2")
# Register "gzip" (extension "gz"): use isal's IGzipFile when the isal package
# is importable, otherwise fall back to GzipFile from the gzip module.
try:  # pragma: no cover
    from isal import igzip

    def isal(infile, mode="rb", **kwargs):
        """Wrap ``infile`` in an ``igzip.IGzipFile`` opened in ``mode``."""
        return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)

    register_compression("gzip", isal, "gz")
except ImportError:
    from gzip import GzipFile

    # The lambda adapts GzipFile to the callback form by passing the wrapped
    # file as ``fileobj``; ``mode`` travels through **kwargs.
    register_compression(
        "gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz"
    )
# lzma module: the same LZMAFile callback is registered under both the "lzma"
# and "xz" names/extensions. Skipped if lzma cannot be imported.
try:
    from lzma import LZMAFile

    register_compression("lzma", LZMAFile, "lzma")
    register_compression("xz", LZMAFile, "xz")
except ImportError:
    pass

# If lzmaffi is importable, its LZMAFile replaces whatever was registered
# above; force=True is required because "lzma"/"xz" may already be present.
try:
    import lzmaffi

    register_compression("lzma", lzmaffi.LZMAFile, "lzma", force=True)
    register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)
except ImportError:
    pass
class SnappyFile(AbstractBufferedFile):
    """Non-seekable buffered file that passes data through a snappy stream codec.

    Reads pull bytes from ``infile`` and decompress them; writes compress the
    buffered bytes and push them to ``infile``.
    """

    def __init__(self, infile, mode, **kwargs):
        """Wrap ``infile`` for snappy streaming in the given ``mode``.

        Args:
            infile: Underlying file-like object that is read from or written to.
            mode: File mode; a decompressor is used when it contains "r",
                a compressor otherwise.
            **kwargs: Forwarded to ``AbstractBufferedFile.__init__``.
        """
        import snappy

        # Normalise to a binary mode string however ``mode`` was spelled.
        binary_mode = mode.strip("b") + "b"
        # NOTE(review): size=999999999 looks like a placeholder because the
        # decompressed length is not known up front — confirm.
        super().__init__(
            fs=None, path="snappy", mode=binary_mode, size=999999999, **kwargs
        )
        self.infile = infile
        reading = "r" in mode
        self.codec = (
            snappy.StreamDecompressor() if reading else snappy.StreamCompressor()
        )

    def _upload_chunk(self, final=False):
        """Compress the entire write buffer and write it to ``infile``.

        ``final`` is accepted but not consulted. Always returns True.
        """
        self.buffer.seek(0)
        pending = self.buffer.read()
        compressed = self.codec.add_chunk(pending)
        self.infile.write(compressed)
        return True

    def seek(self, loc, whence=0):
        """Always raise: random access is not supported on this file."""
        raise NotImplementedError("SnappyFile is not seekable")

    def seekable(self):
        """Report that this file does not support seeking."""
        return False

    def _fetch_range(self, start, end):
        """Read ``end - start`` bytes from ``infile`` and return them decompressed."""
        nbytes = end - start
        raw = self.infile.read(nbytes)
        return self.codec.decompress(raw)
# Register "snappy" only when a usable snappy module is present.
try:
    import snappy

    # Smoke test of the imported module: AttributeError/NameError are caught
    # below, so a module named "snappy" without compress() is skipped.
    # NOTE(review): presumably this guards against an unrelated package that
    # shares the "snappy" name — confirm.
    snappy.compress(b"")
    # Snappy may use the .sz file extension, but this is not part of the
    # standard implementation.
    # The empty extension list means snappy is never inferred from a file
    # name; it must be requested by name.
    register_compression("snappy", SnappyFile, [])

except (ImportError, NameError, AttributeError):
    pass
# lz4 is optional: registered only when lz4.frame can be imported.
try:
    import lz4.frame

    register_compression("lz4", lz4.frame.open, "lz4")
except ImportError:
    pass

# zstd is optional: on Python 3.14+ it is imported from the ``compression``
# package, on older versions from ``backports``. Either way the ZstdFile class
# is registered under the name "zstd" with the "zst" extension.
try:
    if sys.version_info >= (3, 14):
        from compression import zstd
    else:
        from backports import zstd

    register_compression("zstd", zstd.ZstdFile, "zst")
except ImportError:
    pass
def available_compressions():
    """List every compression name currently registered in ``compr``.

    The result is a new list of the registry's keys, so it includes the
    ``None`` key the registry is initialised with.
    """
    return [*compr]