1"""Handler for gzip compression format.
2
It is based on the standard documented at
https://datatracker.ietf.org/doc/html/rfc1952.
5
The handler merges directly consecutive gzip members into a single
ValidChunk, stopping at the last member that can be read without
error.
9
We monkey-patched the read() function of _GzipReader from Python's
built-in gzip module so that it stops reading as soon as it reaches
the EOF marker of the current gzip stream. This is a requirement for
unblob, given that streams can be malformed and followed by
garbage/random content that triggers BadGzipFile errors when the gzip
library tries to read the next stream header.
16"""
17
18import gzip
19import io
20import struct
21import zlib
22from pathlib import Path
23
24from structlog import get_logger
25
26from unblob.extractors import Command
27from unblob.extractors.command import MultiFileCommand
28from unblob.models import Extractor
29
30from ...file_utils import InvalidInputFormat
31from ...models import (
32 DirectoryExtractor,
33 DirectoryHandler,
34 ExtractResult,
35 File,
36 Glob,
37 Handler,
38 HandlerDoc,
39 HandlerType,
40 HexString,
41 MultiFile,
42 Reference,
43 ValidChunk,
44)
45from ._gzip_reader import SingleMemberGzipReader
46
# Module-level logger obtained from structlog.
logger = get_logger()

# Sizes, in bytes, of the two fields that close a gzip member
# (CRC32 and ISIZE in RFC 1952), and their sum. The sum is used to step
# over the footer once the compressed data has been read.
GZIP2_CRC_LEN = 4
GZIP2_SIZE_LEN = 4
GZIP2_FOOTER_LEN = GZIP2_CRC_LEN + GZIP2_SIZE_LEN

# Bit masks tested against the header flag byte: an optional "extra"
# field is present (FEXTRA in RFC 1952) / an original file name is
# present (FNAME in RFC 1952).
FLAG_EXTRA = 4
FLAG_NAME = 8
55
56
def get_gzip_embedded_name(path: Path) -> str:
    """Return the original file name stored in a gzip header, if usable.

    Only the final path component of the embedded name is returned, so
    the result never contains directory separators.

    :param path: file whose content starts with a gzip member.
    :return: the embedded file name, or ``""`` when the header carries
        no name, the header is truncated, the name is not valid UTF-8,
        or the name is a directory reference such as ``..``.
    """
    raw_name = bytearray()
    with path.open("rb") as file:
        # skip the two magic bytes (ID1, ID2)
        file.read(2)

        # method, flags, mtime, then two ignored bytes (XFL and OS)
        fixed_header = file.read(8)
        if len(fixed_header) < 8:
            # truncated header: there cannot be an embedded name
            return ""
        (_method, flag, _last_mtime) = struct.unpack("<BBIxx", fixed_header)

        if flag & FLAG_EXTRA:
            # Read & discard the extra field, if present
            raw_extra_len = file.read(2)
            if len(raw_extra_len) < 2:
                return ""
            [extra_len] = struct.unpack("<H", raw_extra_len)
            file.seek(extra_len, io.SEEK_CUR)

        if flag & FLAG_NAME:
            # Collect the null-terminated file name; an unterminated name
            # simply ends at EOF.
            while True:
                s = file.read(1)
                if not s or s == b"\000":
                    break
                raw_name += s

    # return a valid, safe name without directories!
    try:
        name = Path(raw_name.decode("utf-8")).name
    except UnicodeDecodeError:
        return ""

    # Path("..").name is "..", which is a directory reference rather than
    # a file name; treat it as "no usable name".
    if name in {".", ".."}:
        return ""
    return name
82
83
class GZIPExtractor(Extractor):
    """Extractor that decompresses a single gzip file by running ``7z``."""

    def get_dependencies(self) -> list[str]:
        """Return the names of the external tools this extractor runs."""
        return ["7z"]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Decompress *inpath* into *outdir*.

        The output is named after the file name embedded in the gzip
        header, falling back to ``gzip.uncompressed`` when that name is
        empty.
        """
        embedded_name = get_gzip_embedded_name(inpath)
        output_name = embedded_name if embedded_name else "gzip.uncompressed"
        # 7z is asked to write the decompressed data to stdout ("-so");
        # the chosen output name is handed to Command through `stdout=`.
        command = Command("7z", "x", "-y", "{inpath}", "-so", stdout=output_name)
        return command.extract(inpath, outdir)
92
93
class MultiGZIPExtractor(DirectoryExtractor):
    """Extractor that decompresses a multi-volume gzip set by running ``7z``."""

    def get_dependencies(self) -> list[str]:
        """Return the names of the external tools this extractor runs."""
        return ["7z"]

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Decompress the volumes in *paths* into *outdir*.

        The output is named after the file name embedded in the gzip
        header of the first volume, falling back to
        ``gzip.uncompressed`` when that name is empty.
        """
        first_volume = paths[0]
        embedded_name = get_gzip_embedded_name(first_volume)
        output_name = embedded_name if embedded_name else "gzip.uncompressed"
        # 7z is asked to write the decompressed data to stdout ("-so");
        # the chosen output name is handed to the command through `stdout=`.
        command = MultiFileCommand(
            "7z", "x", "-p", "-y", "{inpath}", "-so", stdout=output_name
        )
        return command.extract(paths, outdir)
104
105
class GZIPHandler(Handler):
    """Handler that locates gzip data and computes the extent of its chunk.

    A chunk starts at a matched gzip header and covers that member plus
    any directly following members that can be read without error.
    """

    NAME = "gzip"

    EXTRACTOR = GZIPExtractor()

    PATTERNS = [
        HexString(
            """
            // ID1
            1F
            // ID2
            8B
            // compression method (0x8 = DEFLATE)
            08
            // flags, 00011111 (0x1f) is the highest since the first 3 bits are reserved
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 |
                0A | 0B | 0C | 0D | 0E | 0F | 10 | 11 | 12 | 13 |
                14 | 15 | 16 | 17 | 18 | 19 | 1A | 1B | 1C | 1D | 1E
            )
            // unix time (uint32) + eXtra FLags (2 or 4 per RFC1952 2.3.1)
            // we accept any value because the RFC is not followed by some samples
            [5]
            // Operating System (0-13, or 255 per RFC1952 2.3.1)
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 0A | 0B | 0C | 0D | FF
            )
        """
        )
    ]

    DOC = HandlerDoc(
        name="GZIP",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def _read_member_end_offset(self, file: File) -> int | None:
        """Read one gzip member at the current position of *file*.

        :return: the file position just past the member's footer, or
            ``None`` when ``read_header()`` reports that no header was
            read.
        :raises InvalidInputFormat: when reading the compressed data
            fails with ``gzip.BadGzipFile`` or ``zlib.error``.
        """
        fp = SingleMemberGzipReader(file)
        if not fp.read_header():
            return None

        try:
            fp.read_until_eof()
        except (gzip.BadGzipFile, zlib.error) as e:
            raise InvalidInputFormat from e

        # Step over the footer, compensating for `unused_data`
        # (presumably bytes the reader consumed past the end of the
        # compressed stream -- confirm against SingleMemberGzipReader).
        file.seek(GZIP2_FOOTER_LEN - len(fp.unused_data), io.SEEK_CUR)

        return file.tell()

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the chunk covering the gzip member(s) at the current position.

        The first member must be readable: errors raised while reading it
        propagate to the caller. Further members are then read back to
        back; the chunk is extended over each one that reads cleanly and
        ends entirely inside the file.

        :param file: input positioned at the start of the first member.
        :param start_offset: offset recorded as the start of the chunk.
        :return: the chunk, or ``None`` when no header could be read.
        """
        end_offset = self._read_member_end_offset(file)
        if end_offset is None:
            return None

        file_size = file.size()
        while end_offset < file_size:
            try:
                next_end_offset = self._read_member_end_offset(file)
            except (gzip.BadGzipFile, InvalidInputFormat, EOFError):
                # Stop at the last valid member if a later member-like sequence
                # is malformed. EOFError is what the gzip module raises for
                # truncated data; it must not discard the valid members
                # already found.
                break

            if next_end_offset is None:
                break

            if next_end_offset > file_size:
                # The computed end lies past the end of the file, so the
                # member is incomplete: keep the previous end offset.
                break

            end_offset = next_end_offset

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )
190
191
class MultiVolumeGzipHandler(DirectoryHandler):
    """Directory handler grouping the volumes of a split gzip file.

    Volumes are the files sharing the same name up to their last suffix
    (e.g. ``name.gz.001``, ``name.gz.002``).
    """

    NAME = "multi-gzip"
    EXTRACTOR = MultiGZIPExtractor()

    PATTERN = Glob("*.gz.*")

    DOC = HandlerDoc(
        name="GZIP (multi-volume)",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_gzip(self, path: Path) -> bool:
        """Return whether *path* starts with a readable gzip header.

        Files that cannot be opened as a ``File`` (``ValueError``), have
        a malformed header (``gzip.BadGzipFile``) or a truncated header
        (``EOFError``) are reported as invalid.
        """
        try:
            file = File.from_path(path)
        except ValueError:
            return False

        with file as f:
            try:
                fp = SingleMemberGzipReader(f)
                if not fp.read_header():
                    return False
            except (gzip.BadGzipFile, EOFError):
                # EOFError is what the gzip module raises when the data
                # ends in the middle of the header.
                return False
        return True

    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Build the ``MultiFile`` whose first volume is *file*.

        :param file: a path matched by ``PATTERN``.
        :return: the multi-volume set, or ``None`` when *file* is not the
            first volume in sorted order or does not start with a valid
            gzip header.
        """
        # Local import: `glob` is only needed here to escape the stem.
        import glob

        # Escape the stem so that glob metacharacters in the file name
        # ("[", "*", "?") are matched literally.
        volume_pattern = f"{glob.escape(file.stem)}.*"
        paths = sorted(
            p for p in file.parent.glob(volume_pattern) if p.resolve().exists()
        )

        # we 'discard' paths that are not the first in the ordered list,
        # otherwise we will end up with colliding reports, one for every
        # path in the list.
        if not paths or file != paths[0]:
            return None

        if self.is_valid_gzip(file):
            files_size = sum(path.stat().st_size for path in paths)
            logger.debug(
                "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
            )

            return MultiFile(
                name=paths[0].stem,
                paths=paths,
            )
        return None