Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/compression.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1"""Helper functions for a standard streaming compression API""" 

2 

3import sys 

4from zipfile import ZipFile 

5 

6import fsspec.utils 

7from fsspec.spec import AbstractBufferedFile 

8 

9 

def noop_file(file, mode, **kwargs):
    """Return ``file`` exactly as given, applying no compression wrapper.

    ``mode`` and any extra keyword arguments are accepted so the signature
    matches the other compression openers, but they are ignored.
    """
    return file

12 

13 

# TODO: files should also be available as contexts
# Registry mapping a compression name to the callable that opens it.
# Each value should be a function of the form
# func(infile, mode=, **kwargs) -> file-like.
# The None key stands for "no compression" and maps to noop_file.
compr = {None: noop_file}

17 

18 

def register_compression(name, callback, extensions, force=False):
    """Register an "inferable" file compression type.

    Registers transparent file compression type for use with fsspec.open.
    Compression can be specified by name in open, or "infer"-ed for any files
    ending with the given extensions.

    Nothing is registered unless every check passes: validation of the name
    and of all extensions happens before either registry is modified.

    Args:
        name: (str) The compression type name. Eg. "gzip".
        callback: A callable of form (infile, mode, **kwargs) -> file-like.
            Accepts an input file-like object, the target mode and kwargs.
            Returns a wrapped file-like object.
        extensions: (str, Iterable[str]) A file extension, or list of file
            extensions for which to infer this compression scheme. Eg. "gz".
            Any iterable is accepted, including one-shot iterators.
        force: (bool) Force re-registration of compression type or extensions.

    Raises:
        ValueError: If name or extensions already registered, and not force.

    """
    if isinstance(extensions, str):
        extensions = [extensions]
    else:
        # The extensions are walked twice (validate, then register). Copy
        # them into a list so a generator or other one-shot iterator is not
        # exhausted by the validation pass, which would silently register
        # no extensions at all.
        extensions = list(extensions)

    # Validate registration
    if not force:
        if name in compr:
            raise ValueError(f"Duplicate compression registration: {name}")

        for ext in extensions:
            if ext in fsspec.utils.compressions:
                raise ValueError(
                    f"Duplicate compression file extension: {ext} ({name})"
                )

    compr[name] = callback

    for ext in extensions:
        fsspec.utils.compressions[ext] = name

54 

55 

def unzip(infile, mode="rb", filename=None, **kwargs):
    """Open a single member of a zip archive as a file-like object.

    Args:
        infile: File-like object holding (or receiving) the zip archive.
        mode: Any mode containing "r" reads; every other mode writes.
        filename: Archive member to open. When reading, defaults to the
            first name listed in the archive; when writing, to "file".
        **kwargs: When reading, forwarded to ``ZipFile.open``; when writing,
            forwarded to the ``ZipFile`` constructor.

    Returns:
        The member's file-like object. In write mode its ``close`` also
        closes the enclosing archive.
    """
    if "r" in mode:
        archive = ZipFile(infile)
        member = archive.namelist()[0] if filename is None else filename
        return archive.open(member, mode="r", **kwargs)

    member = filename or "file"
    archive = ZipFile(infile, mode="w", **kwargs)
    handle = archive.open(member, mode="w")
    close_member = handle.close

    def close_member_then_archive():
        # Closing the member must also close the archive; the caller never
        # receives the ZipFile object itself.
        return close_member() or archive.close()

    handle.close = close_member_then_archive
    return handle

67 

68 

# zip is handled by unzip(), built on the zipfile import at the top of the file.
register_compression("zip", unzip, "zip")

# Register bz2 only when the bz2 module can be imported.
try:
    from bz2 import BZ2File
except ImportError:
    pass
else:
    register_compression("bz2", BZ2File, "bz2")

77 

# Use the isal (igzip) implementation for gzip when it is importable;
# otherwise fall back to gzip.GzipFile.
try:  # pragma: no cover
    from isal import igzip

    def isal(infile, mode="rb", **kwargs):
        # Wrap the already-open file object (fileobj=) instead of a path.
        return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)

    register_compression("gzip", isal, "gz")
except ImportError:
    from gzip import GzipFile

    # The lambda adapts GzipFile to the (infile, **kwargs) opener form by
    # passing the input file as fileobj=.
    register_compression(
        "gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz"
    )

91 

# The "lzma" and "xz" names (and extensions) both map to the same LZMAFile
# opener.
try:
    from lzma import LZMAFile

    register_compression("lzma", LZMAFile, "lzma")
    register_compression("xz", LZMAFile, "xz")
except ImportError:
    pass

# When lzmaffi is importable it replaces the registrations made above.
# force=True is required because the names and extensions may already be
# registered, which would otherwise raise ValueError.
try:
    import lzmaffi

    register_compression("lzma", lzmaffi.LZMAFile, "lzma", force=True)
    register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)
except ImportError:
    pass

107 

108 

class SnappyFile(AbstractBufferedFile):
    """Non-seekable buffered file that streams data through a snappy codec.

    Wraps an already-open file-like object: a stream decompressor is used
    when the mode contains "r", and a stream compressor otherwise.
    """

    def __init__(self, infile, mode, **kwargs):
        """Set up the buffered-file base and choose the codec from ``mode``.

        Args:
            infile: Underlying file-like object that is read from or
                written to.
            mode: Requested mode; it is normalised so the base class always
                receives a binary mode.
            **kwargs: Forwarded to ``AbstractBufferedFile.__init__``.
        """
        import snappy

        binary_mode = mode.strip("b") + "b"
        # NOTE(review): size is a large placeholder, presumably because the
        # real decompressed length is unknown up front -- confirm.
        super().__init__(
            fs=None, path="snappy", mode=binary_mode, size=999999999, **kwargs
        )
        self.infile = infile
        self.codec = (
            snappy.StreamDecompressor() if "r" in mode else snappy.StreamCompressor()
        )

    def _upload_chunk(self, final=False):
        """Compress the current buffer contents and write them to ``infile``."""
        self.buffer.seek(0)
        pending = self.buffer.read()
        self.infile.write(self.codec.add_chunk(pending))
        return True

    def seek(self, loc, whence=0):
        """Always raise: seeking is not supported on this file."""
        raise NotImplementedError("SnappyFile is not seekable")

    def seekable(self):
        """Report that this file cannot seek."""
        return False

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        # Only the length (end - start) is used; the bytes are read from
        # the current position of infile and then decompressed.
        return self.codec.decompress(self.infile.read(end - start))

138 

139 

try:
    import snappy

    # Call into the module before registering: if the imported "snappy"
    # has no usable compress(), the error raised here is caught below and
    # registration is skipped.
    snappy.compress(b"")
    # Snappy may use the .sz file extension, but this is not part of the
    # standard implementation.
    # The extension list is therefore empty, so snappy is never inferred
    # from a file name.
    register_compression("snappy", SnappyFile, [])

except (ImportError, NameError, AttributeError):
    pass

150 

# Register lz4 (using lz4.frame.open) only when the lz4 package is importable.
try:
    import lz4.frame

    register_compression("lz4", lz4.frame.open, "lz4")
except ImportError:
    pass

# zstd is imported from the "compression" package on Python 3.14 and later,
# and from the "backports" package on older versions. It is skipped when
# neither import succeeds.
try:
    if sys.version_info >= (3, 14):
        from compression import zstd
    else:
        from backports import zstd

    register_compression("zstd", zstd.ZstdFile, "zst")
except ImportError:
    pass

167 

168 

def available_compressions():
    """Return a new list of every key in the compression registry."""
    return [*compr]