Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/compression.py: 53%

93 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

"""Helper functions for a standard streaming compression API"""

2from zipfile import ZipFile 

3 

4import fsspec.utils 

5from fsspec.spec import AbstractBufferedFile 

6 

7 

def noop_file(file, mode, **kwargs):
    """Return ``file`` unchanged; the callback used for "no compression".

    Args:
        file: The file-like object to pass through.
        mode: Ignored. Accepted so the signature matches the other
            compression callbacks, which take ``(infile, mode, **kwargs)``.
        **kwargs: Ignored.

    Returns:
        The very same ``file`` object that was passed in.
    """
    return file

10 

11 

# TODO: files should also be available as contexts
# Registry mapping a compression name to the callable that wraps a file.
# Values should be functions of the form
#     func(infile, mode=, **kwargs) -> file-like
# The ``None`` key stands for "no compression" and hands the file back as-is.
compr = {None: noop_file}

15 

16 

def register_compression(name, callback, extensions, force=False):
    """Register an "inferable" file compression type.

    Registers transparent file compression type for use with fsspec.open.
    Compression can be specified by name in open, or "infer"-ed for any files
    ending with the given extensions.

    Nothing is modified unless validation passes: both the name and every
    extension are checked before either registry is written to.

    Args:
        name: (str) The compression type name. Eg. "gzip".
        callback: A callable of form (infile, mode, **kwargs) -> file-like.
            Accepts an input file-like object, the target mode and kwargs.
            Returns a wrapped file-like object.
        extensions: (str, Iterable[str]) A file extension, or list of file
            extensions for which to infer this compression scheme. Eg. "gz".
            Any iterable is accepted, including one-shot iterators.
        force: (bool) Force re-registration of compression type or extensions.
            When true, existing entries for ``name`` and for each extension
            are silently overwritten.

    Raises:
        ValueError: If name or extensions already registered, and not force.

    """
    if isinstance(extensions, str):
        extensions = [extensions]
    else:
        # The extensions are walked twice (validate, then register). Take a
        # snapshot so a generator or other one-shot iterable is not already
        # exhausted by the time the registration loop runs.
        extensions = list(extensions)

    # Validate registration
    if not force:
        if name in compr:
            raise ValueError(f"Duplicate compression registration: {name}")

        for ext in extensions:
            if ext in fsspec.utils.compressions:
                raise ValueError(
                    f"Duplicate compression file extension: {ext} ({name})"
                )

    compr[name] = callback

    for ext in extensions:
        fsspec.utils.compressions[ext] = name

52 

53 

def unzip(infile, mode="rb", filename=None, **kwargs):
    """Open a single member of a zip archive as a file-like object.

    Args:
        infile: File-like object holding (or receiving) the zip archive.
        mode: If it contains ``"r"`` a member is opened for reading;
            otherwise a new archive is created and a member is opened for
            writing.
        filename: Name of the archive member. When reading, defaults to the
            first name listed in the archive; when writing, defaults to
            ``"file"``.
        **kwargs: When writing, forwarded to the ``ZipFile`` constructor;
            when reading, forwarded to ``ZipFile.open``.

    Returns:
        The file-like object for the archive member. In write mode, closing
        it also closes the enclosing archive.

    Raises:
        IndexError: When reading without ``filename`` from an archive that
            has no members.
    """
    if "r" not in mode:
        filename = filename or "file"
        z = ZipFile(infile, mode="w", **kwargs)
        fo = z.open(filename, mode="w")
        close_member = fo.close

        def close_member_then_archive():
            # Closing the member must also close the archive; callers only
            # ever hold the member and never see the ZipFile itself.
            return close_member() or z.close()

        fo.close = close_member_then_archive
        return fo

    z = ZipFile(infile)
    if filename is None:
        names = z.namelist()
        if not names:
            # Same exception type as indexing the empty list would raise,
            # but with a message that explains what went wrong.
            raise IndexError("zip archive is empty; there is no member to read")
        filename = names[0]
    return z.open(filename, mode="r", **kwargs)

65 

66 

register_compression("zip", unzip, "zip")

# Each codec below is registered only if its module can be imported.
try:
    from bz2 import BZ2File
except ImportError:
    pass
else:
    register_compression("bz2", BZ2File, "bz2")

# Use isal's gzip implementation when it is installed; otherwise fall back
# to the standard library's GzipFile.
try:  # pragma: no cover
    from isal import igzip

    def isal(infile, mode="rb", **kwargs):
        return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)

    register_compression("gzip", isal, "gz")
except ImportError:
    from gzip import GzipFile

    register_compression(
        "gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz"
    )

try:
    from lzma import LZMAFile
except ImportError:
    pass
else:
    register_compression("lzma", LZMAFile, "xz")
    # Second name for the same codec; force=True because the "xz" extension
    # was just claimed by the "lzma" registration above.
    register_compression("xz", LZMAFile, "xz", force=True)

# If lzmaffi is installed it replaces whatever was registered for
# "lzma"/"xz" above, hence force=True on both calls.
try:
    import lzmaffi
except ImportError:
    pass
else:
    register_compression("lzma", lzmaffi.LZMAFile, "xz", force=True)
    register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)

105 

106 

class SnappyFile(AbstractBufferedFile):
    """Non-seekable buffered file that snappy-(de)compresses a wrapped file.

    Reads are served by pulling bytes from the wrapped file and passing them
    through a ``snappy.StreamDecompressor``; writes are passed through a
    ``snappy.StreamCompressor`` before being written to the wrapped file.
    """

    def __init__(self, infile, mode, **kwargs):
        """Wrap ``infile`` for snappy streaming in the given ``mode``.

        Args:
            infile: The underlying file-like object holding compressed data.
            mode: File mode. A decompressor is used if it contains ``"r"``,
                otherwise a compressor.
            **kwargs: Forwarded to ``AbstractBufferedFile.__init__``.
        """
        # Imported here rather than at module level so that importing this
        # module does not require the optional snappy package.
        import snappy

        # mode is normalised to its binary form ("r"/"rb" -> "rb").
        # NOTE(review): size=999999999 looks like a placeholder because the
        # decompressed length is not known up front -- confirm.
        super().__init__(
            fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs
        )
        self.infile = infile
        if "r" in mode:
            self.codec = snappy.StreamDecompressor()
        else:
            self.codec = snappy.StreamCompressor()

    def _upload_chunk(self, final=False):
        """Compress the whole write buffer and write it to the wrapped file.

        ``final`` is accepted but not used. Always returns ``True``.
        """
        self.buffer.seek(0)
        out = self.codec.add_chunk(self.buffer.read())
        self.infile.write(out)
        return True

    def seek(self, loc, whence=0):
        """Always raise ``NotImplementedError``: the stream is not seekable."""
        raise NotImplementedError("SnappyFile is not seekable")

    def seekable(self):
        """Return ``False``; see :meth:`seek`."""
        return False

    def _fetch_range(self, start, end):
        """Read ``end - start`` bytes from the wrapped file and decompress them.

        The read happens at the wrapped file's current position; ``start`` is
        only used to compute how many compressed bytes to read.
        """
        data = self.infile.read(end - start)
        return self.codec.decompress(data)

136 

137 

try:
    import snappy

    # Bare attribute access: raises AttributeError (caught below) if the
    # imported module has no ``compress``.
    # NOTE(review): presumably guards against an unrelated module that is
    # also importable as ``snappy`` -- confirm.
    snappy.compress
    # Snappy may use the .sz file extension, but this is not part of the
    # standard implementation.
    register_compression("snappy", SnappyFile, [])

except (ImportError, NameError, AttributeError):
    pass

try:
    import lz4.frame

    register_compression("lz4", lz4.frame.open, "lz4")
except ImportError:
    pass

try:
    import zstandard as zstd

    def zstandard_file(infile, mode="rb"):
        """Wrap ``infile`` in a zstd stream reader or writer.

        A mode containing ``"r"`` returns a decompressing reader; any other
        mode returns a compressing writer created at level 10. No extra
        keyword arguments are accepted.
        """
        if "r" in mode:
            dctx = zstd.ZstdDecompressor()
            return dctx.stream_reader(infile)
        else:
            cctx = zstd.ZstdCompressor(level=10)
            return cctx.stream_writer(infile)

    register_compression("zstd", zstandard_file, "zst")
except ImportError:
    pass

170 

171 

def available_compressions():
    """Return a list of the implemented compressions.

    The result is a new list of the keys currently in the ``compr``
    registry, so it includes the ``None`` entry as well as every name added
    through ``register_compression``.
    """
    return list(compr)