1"""Handler for gzip compression format.
2
It is based on the standard documented at
https://datatracker.ietf.org/doc/html/rfc1952.
5
6The handler will create valid chunks for each gzip compressed stream
7instead of concatenating sequential streams into an overall
8ValidChunk.
9
We monkey-patched the read() function of _GzipReader, from Python's
builtin gzip module, to stop reading as soon as it reaches the EOF
marker of the current gzip stream. This is a requirement for unblob
given that streams can be malformed and followed by garbage/random
content that triggers BadGzipFile errors when the gzip library tries to
read the next stream header.
16"""
17
18import gzip
19import io
20import struct
21import zlib
22from pathlib import Path
23
24from structlog import get_logger
25
26from unblob.extractors import Command
27from unblob.extractors.command import MultiFileCommand
28from unblob.models import Extractor
29
30from ...file_utils import InvalidInputFormat
31from ...models import (
32 DirectoryExtractor,
33 DirectoryHandler,
34 ExtractResult,
35 File,
36 Glob,
37 Handler,
38 HandlerDoc,
39 HandlerType,
40 HexString,
41 MultiFile,
42 Reference,
43 ValidChunk,
44)
45from ._gzip_reader import SingleMemberGzipReader
46
# Module-wide structlog logger shared by the handlers below.
logger = get_logger()
48
# Sizes, in bytes, of the two fields that make up the trailer ("footer")
# following the compressed data of a gzip member: a CRC field and a size field.
GZIP2_CRC_LEN = 4
GZIP2_SIZE_LEN = 4
# Total trailer length; used to move past the footer once the end of the
# compressed data has been located.
GZIP2_FOOTER_LEN = GZIP2_CRC_LEN + GZIP2_SIZE_LEN
52
# Bits of the gzip header flag byte inspected by get_gzip_embedded_name().
FLAG_EXTRA = 4  # an "extra" field (2-byte length + payload) follows the fixed header
FLAG_NAME = 8  # a NUL-terminated original file name is stored in the header


def get_gzip_embedded_name(path: Path) -> str:
    """Return the original file name stored in the gzip header of *path*.

    Only the final path component of the embedded name is returned, so the
    result can be used directly as an output file name.

    An empty string is returned when no usable name is available:
    the header is truncated, the name flag is not set, the name is not
    valid UTF-8, or the name resolves to ``..``.
    """
    raw_name = bytearray()
    with path.open("rb") as file:
        # Fixed part of the header: magic (2 bytes), method, flags,
        # mtime (4 bytes) and two trailing bytes that are not needed here.
        header = file.read(10)
        if len(header) < 10:
            # Too short to even carry the flag byte reliably; no name.
            return ""
        (_method, flag, _last_mtime) = struct.unpack("<BBIxx", header[2:])

        if flag & FLAG_EXTRA:
            # Skip over the extra field: little-endian length, then payload.
            raw_len = file.read(2)
            if len(raw_len) < 2:
                return ""
            (extra_len,) = struct.unpack("<H", raw_len)
            file.seek(extra_len, io.SEEK_CUR)

        if flag & FLAG_NAME:
            # Collect the file name up to (not including) the NUL terminator;
            # hitting end-of-file also ends the name.
            while (byte := file.read(1)) and byte != b"\000":
                raw_name += byte

    # return a valid, safe name without directories!
    try:
        name = Path(raw_name.decode("utf-8")).name
    except UnicodeDecodeError:
        return ""

    # Path("..").name and Path("dir/..").name are both "..", which would
    # point at the parent directory instead of naming a file.
    if name == "..":
        return ""
    return name
82
83
class GZIPExtractor(Extractor):
    """Extractor for a single gzip file, delegating decompression to ``7z``."""

    def get_dependencies(self) -> list[str]:
        """Return the names of the external tools this extractor invokes."""
        return ["7z"]

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Decompress *inpath* into *outdir* using ``7z``.

        The output is named after the file name embedded in the gzip header;
        ``gzip.uncompressed`` is used when the header yields no usable name.
        """
        output_name = get_gzip_embedded_name(inpath)
        if not output_name:
            output_name = "gzip.uncompressed"

        # "-so" asks 7z to emit the data on stdout; presumably Command stores
        # that stream under `output_name` in outdir - confirm against Command.
        command = Command("7z", "x", "-y", "{inpath}", "-so", stdout=output_name)
        return command.extract(inpath, outdir)
92
93
class MultiGZIPExtractor(DirectoryExtractor):
    """Extractor for a gzip file split across several volumes, using ``7z``."""

    def get_dependencies(self) -> list[str]:
        """Return the names of the external tools this extractor invokes."""
        return ["7z"]

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Decompress the volumes in *paths* into *outdir* using ``7z``.

        The output name is taken from the gzip header of the first volume;
        ``gzip.uncompressed`` is used when that header yields no usable name.
        *paths* must not be empty, since the first entry is always read.
        """
        first_volume = paths[0]
        output_name = get_gzip_embedded_name(first_volume)
        if not output_name:
            output_name = "gzip.uncompressed"

        command = MultiFileCommand(
            "7z", "x", "-p", "-y", "{inpath}", "-so", stdout=output_name
        )
        return command.extract(paths, outdir)
104
105
class GZIPHandler(Handler):
    """Handler that carves individual gzip streams out of a file."""

    NAME = "gzip"

    EXTRACTOR = GZIPExtractor()

    # Signature of the fixed gzip header used to locate candidate streams.
    # NOTE(review): the flag alternatives below stop at 1E although the
    # embedded comment names 0x1f as the highest valid value - confirm whether
    # 1F is excluded on purpose before changing the pattern.
    PATTERNS = [
        HexString(
            """
            // ID1
            1F
            // ID2
            8B
            // compression method (0x8 = DEFLATE)
            08
            // flags, 00011111 (0x1f) is the highest since the first 3 bits are reserved
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 |
                0A | 0B | 0C | 0D | 0E | 0F | 10 | 11 | 12 | 13 |
                14 | 15 | 16 | 17 | 18 | 19 | 1A | 1B | 1C | 1D | 1E
            )
            // unix time (uint32) + eXtra FLags (2 or 4 per RFC1952 2.3.1)
            // we accept any value because the RFC is not followed by some samples
            [5]
            // Operating System (0-13, or 255 per RFC1952 2.3.1)
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 0A | 0B | 0C | 0D | FF
            )
            """
        )
    ]

    DOC = HandlerDoc(
        name="GZIP",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Locate the end of the gzip stream beginning at *start_offset*.

        Returns a ``ValidChunk`` spanning the stream including its footer, or
        ``None`` when the reader reports that no header could be read.

        Raises:
            InvalidInputFormat: the header or the compressed data is rejected
                by the gzip/zlib machinery.
        """
        fp = SingleMemberGzipReader(file)

        try:
            # Header parsing is guarded too: it can raise BadGzipFile, which
            # must surface as InvalidInputFormat like any other malformed
            # stream rather than as an unexpected error.
            if not fp.read_header():
                return None
            fp.read_until_eof()
        except (gzip.BadGzipFile, zlib.error) as e:
            raise InvalidInputFormat from e

        # Move to the end of the footer. The bytes reported as unused_data are
        # subtracted from the footer length - presumably they were already
        # read from `file` past the end of the compressed data (confirm
        # against SingleMemberGzipReader).
        file.seek(GZIP2_FOOTER_LEN - len(fp.unused_data), io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )
171
172
class MultiVolumeGzipHandler(DirectoryHandler):
    """Directory handler grouping the volumes of a split gzip file."""

    NAME = "multi-gzip"
    EXTRACTOR = MultiGZIPExtractor()

    # Candidate volume names, e.g. "archive.gz.001".
    PATTERN = Glob("*.gz.*")

    DOC = HandlerDoc(
        name="GZIP (multi-volume)",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_gzip(self, path: Path) -> bool:
        """Return whether *path* starts with a readable gzip header.

        ``False`` is returned when the file cannot be opened as a ``File``
        (``ValueError``), when the reader reports no header, or when header
        parsing fails.
        """
        try:
            file = File.from_path(path)
        except ValueError:
            return False

        with file as f:
            try:
                return bool(SingleMemberGzipReader(f).read_header())
            except (gzip.BadGzipFile, EOFError):
                # The gzip module documents EOFError alongside BadGzipFile for
                # invalid (e.g. truncated) files; both mean "not valid" here.
                return False

    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Group *file* with its sibling volumes into a ``MultiFile``.

        Siblings are the existing entries in the same directory whose name is
        the stem of *file* followed by any suffix. ``None`` is returned unless
        *file* is the first of those (in sorted order) and has a valid gzip
        header.
        """
        from glob import escape as glob_escape

        # The stem is a literal file name, not a pattern: escape it so that
        # names containing "[", "*" or "?" still match their own volumes.
        pattern = f"{glob_escape(file.stem)}.*"
        paths = sorted(p for p in file.parent.glob(pattern) if p.resolve().exists())

        # we 'discard' paths that are not the first in the ordered list,
        # otherwise we will end up with colliding reports, one for every
        # path in the list.
        if not paths or file != paths[0]:
            return None

        if not self.is_valid_gzip(file):
            return None

        files_size = sum(path.stat().st_size for path in paths)
        logger.debug(
            "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
        )

        return MultiFile(
            name=paths[0].stem,
            paths=paths,
        )