Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/gzip.py: 68%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

106 statements  

1"""Handler for gzip compression format. 

2 

3It is based on standard documented at 

4https://datatracker.ietf.org/doc/html/rfc1952. 

5 

6The handler will create valid chunks for each gzip compressed stream 

7instead of concatenating sequential streams into an overall 

8ValidChunk. 

9 

10We monkey patched Python builtin gzip's _GzipReader read() function to 

11stop reading as soon as it reaches the EOF marker of the current gzip 

12stream. This is a requirement for unblob given that streams can be 

13malformed and followed by garbage/random content that triggers 

14BadGzipFile errors when gzip library tries to read the next stream 

15header. 

16""" 

17 

18import gzip 

19import io 

20import struct 

21import zlib 

22from pathlib import Path 

23 

24from structlog import get_logger 

25 

26from unblob.extractors import Command 

27from unblob.extractors.command import MultiFileCommand 

28from unblob.models import Extractor 

29 

30from ...file_utils import InvalidInputFormat 

31from ...models import ( 

32 DirectoryExtractor, 

33 DirectoryHandler, 

34 ExtractResult, 

35 File, 

36 Glob, 

37 Handler, 

38 HandlerDoc, 

39 HandlerType, 

40 HexString, 

41 MultiFile, 

42 Reference, 

43 ValidChunk, 

44) 

45from ._gzip_reader import SingleMemberGzipReader 

46 

47logger = get_logger() 

48 

49GZIP2_CRC_LEN = 4 

50GZIP2_SIZE_LEN = 4 

51GZIP2_FOOTER_LEN = GZIP2_CRC_LEN + GZIP2_SIZE_LEN 

52 

53FLAG_EXTRA = 4 

54FLAG_NAME = 8 

55 

56 

def get_gzip_embedded_name(path: Path) -> str:
    """Return the original filename embedded in a gzip member header, if any.

    Parses the header per RFC 1952: skips the two magic bytes, unpacks the
    compression method, flags and mtime, discards the optional EXTRA field,
    then reads the NUL-terminated FNAME field when the FNAME flag is set.

    Returns a safe, directory-free filename, or an empty string when no name
    is embedded, the header is truncated, or the name is not valid UTF-8.
    """
    name = b""
    try:
        with path.open("rb") as file:
            # skip the two magic bytes (ID1, ID2)
            file.read(2)
            # CM (1 byte), FLG (1 byte), MTIME (4 bytes); "xx" skips XFL and OS
            (_method, flag, _last_mtime) = struct.unpack("<BBIxx", file.read(8))

            if flag & FLAG_EXTRA:
                # Read & discard the extra field, if present
                [extra_len] = struct.unpack("<H", file.read(2))
                file.seek(extra_len, io.SEEK_CUR)

            if flag & FLAG_NAME:
                # Read a null-terminated string containing the filename
                while True:
                    s = file.read(1)
                    if not s or s == b"\000":
                        break
                    name += s
    except struct.error:
        # Truncated/short header: behave as if no name was embedded so
        # callers can fall back to their default output name.
        return ""

    # return a valid, safe name without directories!
    try:
        return Path(name.decode("utf-8")).name
    except UnicodeDecodeError:
        return ""

82 

83 

class GZIPExtractor(Extractor):
    """Extract a single gzip stream via 7z, naming the output after the embedded FNAME."""

    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Decompress ``inpath`` into ``outdir``, writing 7z's stdout to a file."""
        outname = get_gzip_embedded_name(inpath)
        if not outname:
            outname = "gzip.uncompressed"
        command = Command("7z", "x", "-y", "{inpath}", "-so", stdout=outname)
        return command.extract(inpath, outdir)

92 

93 

class MultiGZIPExtractor(DirectoryExtractor):
    """Extract a gzip stream that is split across multiple volume files."""

    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Decompress the concatenated volumes in ``paths`` into ``outdir``."""
        first_volume = paths[0]
        outname = get_gzip_embedded_name(first_volume) or "gzip.uncompressed"
        command = MultiFileCommand(
            "7z", "x", "-p", "-y", "{inpath}", "-so", stdout=outname
        )
        return command.extract(paths, outdir)

104 

105 

class GZIPHandler(Handler):
    """Chunk handler for gzip streams.

    calculate_chunk greedily consumes consecutive well-formed gzip members
    starting at the pattern match and emits one ValidChunk covering all of
    them, stopping at the first malformed member-like sequence.
    """

    NAME = "gzip"

    EXTRACTOR = GZIPExtractor()

    PATTERNS = [
        HexString(
            """
            // ID1
            1F
            // ID2
            8B
            // compression method (0x8 = DEFLATE)
            08
            // flags, 00011111 (0x1f) is the highest since the first 3 bits are reserved
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 |
                0A | 0B | 0C | 0D | 0E | 0F | 10 | 11 | 12 | 13 |
                14 | 15 | 16 | 17 | 18 | 19 | 1A | 1B | 1C | 1D | 1E
            )
            // unix time (uint32) + eXtra FLags (2 or 4 per RFC1952 2.3.1)
            // we accept any value because the RFC is not followed by some samples
            [5]
            // Operating System (0-13, or 255 per RFC1952 2.3.1)
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 0A | 0B | 0C | 0D | FF
            )
        """
        )
    ]

    DOC = HandlerDoc(
        name="GZIP",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def _read_member_end_offset(self, file: File) -> int | None:
        """Decompress one gzip member starting at the current file position.

        Returns the absolute offset just past the member (including the
        8-byte CRC32 + ISIZE footer), or None when no valid member header
        is found at the current position.

        Raises:
            InvalidInputFormat: when the deflate stream is corrupt
                (wraps gzip.BadGzipFile / zlib.error).
        """
        fp = SingleMemberGzipReader(file)
        if not fp.read_header():
            return None

        try:
            fp.read_until_eof()
        except (gzip.BadGzipFile, zlib.error) as e:
            raise InvalidInputFormat from e

        # Reposition the file just past the member's footer.
        # NOTE(review): this assumes fp.unused_data holds the bytes the
        # reader over-consumed past the deflate stream (footer included),
        # so the net seek lands exactly after CRC32+ISIZE — verify against
        # SingleMemberGzipReader.
        file.seek(GZIP2_FOOTER_LEN - len(fp.unused_data), io.SEEK_CUR)

        return file.tell()

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Return a ValidChunk spanning all consecutive gzip members at start_offset."""
        # First member must parse; otherwise there is no chunk here at all.
        end_offset = self._read_member_end_offset(file)
        if end_offset is None:
            return None

        # Keep appending subsequent members while more data remains.
        while end_offset < file.size():
            try:
                next_end_offset = self._read_member_end_offset(file)
            except (gzip.BadGzipFile, InvalidInputFormat):
                # Stop at the last valid member if a later member-like sequence is malformed.
                break

            if next_end_offset is None:
                break

            end_offset = next_end_offset

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )

190 

191 

class MultiVolumeGzipHandler(DirectoryHandler):
    """Identify gzip archives split into numbered volume files (``*.gz.*``)."""

    NAME = "multi-gzip"
    EXTRACTOR = MultiGZIPExtractor()

    PATTERN = Glob("*.gz.*")

    DOC = HandlerDoc(
        name="GZIP (multi-volume)",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_gzip(self, path: Path) -> bool:
        """Return True when ``path`` begins with a parseable gzip member header."""
        try:
            volume = File.from_path(path)
        except ValueError:
            return False

        with volume as stream:
            try:
                reader = SingleMemberGzipReader(stream)
                header_ok = reader.read_header()
            except gzip.BadGzipFile:
                return False
            if not header_ok:
                return False
        return True

    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Group the sibling volumes of ``file`` into a single MultiFile."""
        volumes = sorted(
            candidate
            for candidate in file.parent.glob(f"{file.stem}.*")
            if candidate.resolve().exists()
        )

        # we 'discard' paths that are not the first in the ordered list,
        # otherwise we will end up with colliding reports, one for every
        # path in the list.
        if not volumes or file != volumes[0]:
            return None

        if not self.is_valid_gzip(file):
            return None

        total_size = sum(volume.stat().st_size for volume in volumes)
        logger.debug(
            "Multi-volume files", paths=volumes, files_size=total_size, _verbosity=2
        )

        return MultiFile(
            name=volumes[0].stem,
            paths=volumes,
        )