Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/gzip.py: 65%


1"""Handler for gzip compression format. 

2 

3It is based on standard documented at 

4https://datatracker.ietf.org/doc/html/rfc1952. 

5 

6The handler will create valid chunks for each gzip compressed stream 

7instead of concatenating sequential streams into an overall 

8ValidChunk. 

9 

10We monkey patched Python builtin gzip's _GzipReader read() function to 

11stop reading as soon as it reach the EOF marker of the current gzip 

12stream. This is a requirement for unblob given that streams can be 

13malformed and followed by garbage/random content that triggers 

14BadGzipFile errors when gzip library tries to read the next stream 

15header. 

16""" 

import gzip
import io
import struct
import zlib
from pathlib import Path

from structlog import get_logger

from unblob.extractors import Command
from unblob.extractors.command import MultiFileCommand
from unblob.models import Extractor

from ...file_utils import InvalidInputFormat
from ...models import (
    DirectoryExtractor,
    DirectoryHandler,
    ExtractResult,
    File,
    Glob,
    Handler,
    HandlerDoc,
    HandlerType,
    HexString,
    MultiFile,
    Reference,
    ValidChunk,
)
from ._gzip_reader import SingleMemberGzipReader

logger = get_logger()

GZIP2_CRC_LEN = 4
GZIP2_SIZE_LEN = 4
GZIP2_FOOTER_LEN = GZIP2_CRC_LEN + GZIP2_SIZE_LEN

FLAG_EXTRA = 4
FLAG_NAME = 8


def get_gzip_embedded_name(path: Path) -> str:
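    """Return the original file name embedded in the gzip header, if any.

    FLAG_EXTRA and FLAG_NAME are the FEXTRA and FNAME bits of the header FLG
    byte (RFC 1952, section 2.3.1). Only the base name is returned so the
    value is safe to use as an output file name; an empty string is returned
    when the name is missing or not valid UTF-8.
    """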

    name = b""
    with path.open("rb") as file:
        # skip the two magic bytes (ID1, ID2)
        file.read(2)
        (_method, flag, _last_mtime) = struct.unpack("<BBIxx", file.read(8))

        if flag & FLAG_EXTRA:
            # Read & discard the extra field, if present
            [extra_len] = struct.unpack("<H", file.read(2))
            file.seek(extra_len, io.SEEK_CUR)

        if flag & FLAG_NAME:
            # Read the null-terminated string containing the original file name
            while True:
                s = file.read(1)
                if not s or s == b"\000":
                    break
                name += s

    # return a valid, safe name without directories!
    try:
        return Path(name.decode("utf-8")).name
    except UnicodeDecodeError:
        return ""


class GZIPExtractor(Extractor):
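    """Extract a single gzip stream using 7z."""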

    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
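        """Decompress with `7z x -so`, capturing stdout as the single output file.

        The output is named after the file name embedded in the gzip header,
        falling back to "gzip.uncompressed" when none is present.
        """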

        name = get_gzip_embedded_name(inpath) or "gzip.uncompressed"
        extractor = Command("7z", "x", "-y", "{inpath}", "-so", stdout=name)
        return extractor.extract(inpath, outdir)


class MultiGZIPExtractor(DirectoryExtractor):
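    """Extract a gzip stream that is split across multiple volume files."""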

    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
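        """Extract the split gzip volumes with `7z x -so`, capturing stdout as one output file.

        `-p` with no value supplies an empty password so that 7z does not stop
        to prompt for one; the output name falls back to "gzip.uncompressed"
        when the gzip header carries no embedded file name.
        """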

        name = get_gzip_embedded_name(paths[0]) or "gzip.uncompressed"
        extractor = MultiFileCommand(
            "7z", "x", "-p", "-y", "{inpath}", "-so", stdout=name
        )
        return extractor.extract(paths, outdir)


class GZIPHandler(Handler):
    NAME = "gzip"

    EXTRACTOR = GZIPExtractor()

    PATTERNS = [
        HexString(
            """
            // ID1
            1F
            // ID2
            8B
            // compression method (0x8 = DEFLATE)
            08
            // flags, 00011111 (0x1f) is the highest since the first 3 bits are reserved
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 |
                0A | 0B | 0C | 0D | 0E | 0F | 10 | 11 | 12 | 13 |
                14 | 15 | 16 | 17 | 18 | 19 | 1A | 1B | 1C | 1D | 1E
            )
            // unix time (uint32) + eXtra FLags (2 or 4 per RFC1952 2.3.1)
            // we accept any value because the RFC is not followed by some samples
            [5]
            // Operating System (0-13, or 255 per RFC1952 2.3.1)
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 0A | 0B | 0C | 0D | FF
            )
            """
        )
    ]

    DOC = HandlerDoc(
        name="GZIP",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
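        """Carve a single gzip stream into a ValidChunk.

        The patched reader stops at the end of the DEFLATE stream, so the seek
        below only has to skip whatever part of the 8-byte trailer (CRC32 +
        ISIZE, RFC 1952 section 2.3) was not already consumed; file.tell()
        then points at the end of the member and becomes end_offset.
        """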

        fp = SingleMemberGzipReader(file)
        if not fp.read_header():
            return None

        try:
            fp.read_until_eof()
        except (gzip.BadGzipFile, zlib.error) as e:
            raise InvalidInputFormat from e

        file.seek(GZIP2_FOOTER_LEN - len(fp.unused_data), io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )


class MultiVolumeGzipHandler(DirectoryHandler):

    NAME = "multi-gzip"
    EXTRACTOR = MultiGZIPExtractor()

    PATTERN = Glob("*.gz.*")

    DOC = HandlerDoc(
        name="GZIP (multi-volume)",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_gzip(self, path: Path) -> bool:
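        """Check that the file starts with a parseable gzip member header."""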

        try:
            file = File.from_path(path)
        except ValueError:
            return False

        with file as f:
            try:
                fp = SingleMemberGzipReader(f)
                if not fp.read_header():
                    return False
            except gzip.BadGzipFile:
                return False
            return True

    def calculate_multifile(self, file: Path) -> MultiFile | None:
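        """Group all volumes sharing this file's stem into a single MultiFile.

        For a hypothetical split such as `archive.gz.000`, `archive.gz.001`,
        the stem is `archive.gz`, so the glob below picks up every volume;
        only the first volume in sort order produces a report.
        """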

        paths = sorted(
            [p for p in file.parent.glob(f"{file.stem}.*") if p.resolve().exists()]
        )

        # We 'discard' paths that are not the first in the ordered list;
        # otherwise we would end up with colliding reports, one for every
        # path in the list.
        if not paths or file != paths[0]:
            return None

        if self.is_valid_gzip(file):
            files_size = sum(path.stat().st_size for path in paths)
            logger.debug(
                "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
            )

            return MultiFile(
                name=paths[0].stem,
                paths=paths,
            )
        return None