Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/gzip.py: 67%


1"""Handler for gzip compression format. 

2 

3It is based on standard documented at 

4https://datatracker.ietf.org/doc/html/rfc1952. 

5 

6The handler will create valid chunks for each gzip compressed stream 

7instead of concatenating sequential streams into an overall 

8ValidChunk. 

9 

10We monkey patched Python builtin gzip's _GzipReader read() function to 

11stop reading as soon as it reach the EOF marker of the current gzip 

12stream. This is a requirement for unblob given that streams can be 

13malformed and followed by garbage/random content that triggers 

14BadGzipFile errors when gzip library tries to read the next stream 

15header. 

16""" 

17 
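# A commented sketch of the idea described above using only the standard
# library: a zlib decompressor opened with wbits=31 consumes exactly one gzip
# member and leaves whatever follows it (next stream, garbage, ...) in
# unused_data. Illustration only; the handler relies on SingleMemberGzipReader
# instead, and the helper name below is made up:
#
#   import gzip
#   import zlib
#
#   def first_member_size(data: bytes) -> int:
#       decompressor = zlib.decompressobj(wbits=31)  # 31 = expect gzip wrapping
#       decompressor.decompress(data)
#       return len(data) - len(decompressor.unused_data)
#
#   member = gzip.compress(b"hello", mtime=0)
#   blob = member + b"trailing garbage that is not gzip"
#   assert first_member_size(blob) == len(member)
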

import gzip
import io
import struct
import zlib
from pathlib import Path
from typing import Optional

from structlog import get_logger

from unblob.extractors import Command
from unblob.extractors.command import MultiFileCommand
from unblob.models import Extractor

from ...file_utils import InvalidInputFormat
from ...models import (
    DirectoryExtractor,
    DirectoryHandler,
    ExtractResult,
    File,
    Glob,
    Handler,
    HandlerDoc,
    HandlerType,
    HexString,
    MultiFile,
    Reference,
    ValidChunk,
)
from ._gzip_reader import SingleMemberGzipReader


logger = get_logger()

GZIP2_CRC_LEN = 4
GZIP2_SIZE_LEN = 4
GZIP2_FOOTER_LEN = GZIP2_CRC_LEN + GZIP2_SIZE_LEN

FLAG_EXTRA = 4
FLAG_NAME = 8


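# The GZIP2_FOOTER_LEN bytes that close every gzip member are CRC32 followed
# by ISIZE (uncompressed size modulo 2**32), both little-endian uint32 per
# RFC 1952. A commented, illustrative sketch of checking such a footer (not
# part of the handler; the helper name is made up):
#
#   import struct
#   import zlib
#
#   def footer_matches(member: bytes, payload: bytes) -> bool:
#       crc32, isize = struct.unpack("<II", member[-GZIP2_FOOTER_LEN:])
#       return crc32 == zlib.crc32(payload) and isize == len(payload) % 2**32
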

def get_gzip_embedded_name(path: Path) -> str:
    name = b""
    with path.open("rb") as file:
        # skip magic bytes
        file.read(2)
        (_method, flag, _last_mtime) = struct.unpack("<BBIxx", file.read(8))

        if flag & FLAG_EXTRA:
            # Read & discard the extra field, if present
            [extra_len] = struct.unpack("<H", file.read(2))
            file.seek(extra_len, io.SEEK_CUR)

        if flag & FLAG_NAME:
            # Read a null-terminated string containing the filename
            while True:
                s = file.read(1)
                if not s or s == b"\000":
                    break
                name += s

    # return a valid, safe name without directories!
    try:
        return Path(name.decode("utf-8")).name
    except UnicodeDecodeError:
        return ""


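# Commented illustration of the FNAME header field read back by
# get_gzip_embedded_name(): gzip.GzipFile records the filename it is given in
# the member header. File names below are made up for the example:
#
#   import gzip
#   import tempfile
#   from pathlib import Path
#
#   with tempfile.TemporaryDirectory() as tmp:
#       archive = Path(tmp) / "data.gz"
#       with archive.open("wb") as raw, gzip.GzipFile(
#           filename="rootfs.img", mode="wb", fileobj=raw
#       ) as gz:
#           gz.write(b"not really a rootfs")
#       assert get_gzip_embedded_name(archive) == "rootfs.img"
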

class GZIPExtractor(Extractor):
    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        name = get_gzip_embedded_name(inpath) or "gzip.uncompressed"
        extractor = Command("7z", "x", "-y", "{inpath}", "-so", stdout=name)
        return extractor.extract(inpath, outdir)


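# GZIPExtractor delegates to 7z; for orientation, a rough standard-library
# equivalent of the same operation could look like the commented sketch below.
# This is only an illustration (the function name is invented), not what
# unblob actually runs:
#
#   import gzip
#   import shutil
#   from pathlib import Path
#
#   def extract_with_stdlib(inpath: Path, outdir: Path) -> Path:
#       name = get_gzip_embedded_name(inpath) or "gzip.uncompressed"
#       outpath = outdir / name
#       with gzip.open(inpath, "rb") as src, outpath.open("wb") as dst:
#           shutil.copyfileobj(src, dst)
#       return outpath
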

class MultiGZIPExtractor(DirectoryExtractor):
    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        name = get_gzip_embedded_name(paths[0]) or "gzip.uncompressed"
        extractor = MultiFileCommand(
            "7z", "x", "-p", "-y", "{inpath}", "-so", stdout=name
        )
        return extractor.extract(paths, outdir)


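# MultiGZIPExtractor assumes the volumes are a plain byte-split of a single
# gzip stream and hands all of them to 7z. A commented sketch of the manual
# equivalent under that assumption (helper name invented, illustration only):
#
#   import gzip
#   from pathlib import Path
#
#   def reassemble_and_decompress(parts: list[Path]) -> bytes:
#       joined = b"".join(part.read_bytes() for part in sorted(parts))
#       return gzip.decompress(joined)
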

class GZIPHandler(Handler):
    NAME = "gzip"

    EXTRACTOR = GZIPExtractor()

    PATTERNS = [
        HexString(
            """
            // ID1
            1F
            // ID2
            8B
            // compression method (0x8 = DEFLATE)
            08
            // flags, 00011111 (0x1f) is the highest since the first 3 bits are reserved
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 |
                0A | 0B | 0C | 0D | 0E | 0F | 10 | 11 | 12 | 13 |
                14 | 15 | 16 | 17 | 18 | 19 | 1A | 1B | 1C | 1D | 1E
            )
            // unix time (uint32) + eXtra FLags (2 or 4 per RFC1952 2.3.1)
            // we accept any value because the RFC is not followed by some samples
            [5]
            // Operating System (0-13, or 255 per RFC1952 2.3.1)
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 0A | 0B | 0C | 0D | FF
            )
        """
        )
    ]

    DOC = HandlerDoc(
        name="GZIP",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )


    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        fp = SingleMemberGzipReader(file)
        if not fp.read_header():
            return None

        try:
            fp.read_until_eof()
        except (gzip.BadGzipFile, zlib.error) as e:
            raise InvalidInputFormat from e

        # fp.unused_data holds the raw bytes read past the end of the deflate
        # stream, so the member ends at the current position minus those bytes
        # plus the 8-byte footer (CRC32 + ISIZE).
        file.seek(GZIP2_FOOTER_LEN - len(fp.unused_data), io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )


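# The 10-byte header matched by PATTERNS above, spelled out: ID1/ID2/CM are
# fixed, FLG and OS are restricted to RFC 1952 values, MTIME and XFL are
# accepted as-is. A commented sketch building one minimal matching header:
#
#   import struct
#
#   ID1, ID2, CM_DEFLATE, FLG, MTIME, XFL, OS_UNIX = 0x1F, 0x8B, 8, 0, 0, 0, 3
#   header = struct.pack("<BBBBIBB", ID1, ID2, CM_DEFLATE, FLG, MTIME, XFL, OS_UNIX)
#   assert len(header) == 10 and header.startswith(b"\x1f\x8b")
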

class MultiVolumeGzipHandler(DirectoryHandler):
    NAME = "multi-gzip"
    EXTRACTOR = MultiGZIPExtractor()

    PATTERN = Glob("*.gz.*")

    DOC = HandlerDoc(
        name="GZIP (multi-volume)",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_gzip(self, path: Path) -> bool:
        try:
            file = File.from_path(path)
        except ValueError:
            return False

        with file as f:
            try:
                fp = SingleMemberGzipReader(f)
                if not fp.read_header():
                    return False
            except gzip.BadGzipFile:
                return False
        return True

    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        paths = sorted(
            [p for p in file.parent.glob(f"{file.stem}.*") if p.resolve().exists()]
        )

        # we 'discard' paths that are not the first in the ordered list,
        # otherwise we will end up with colliding reports, one for every
        # path in the list.
        if not paths or file != paths[0]:
            return None

        if self.is_valid_gzip(file):
            files_size = sum(path.stat().st_size for path in paths)
            logger.debug(
                "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
            )

            return MultiFile(
                name=paths[0].stem,
                paths=paths,
            )
        return None
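
# How the Glob pattern and Path.stem cooperate for a hypothetical split
# archive firmware.gz.001 / firmware.gz.002 (names invented for the example):
#
#   from pathlib import Path
#
#   first = Path("firmware.gz.001")
#   assert first.stem == "firmware.gz"  # so glob("firmware.gz.*") finds every part
#   assert sorted(["firmware.gz.002", "firmware.gz.001"])[0] == "firmware.gz.001"
#   # and MultiFile.name becomes paths[0].stem, i.e. "firmware.gz"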