1"""Handler for gzip compression format.
2
3It is based on standard documented at
4https://datatracker.ietf.org/doc/html/rfc1952.
5
6The handler will create valid chunks for each gzip compressed stream
7instead of concatenating sequential streams into an overall
8ValidChunk.
9
10We monkey patched Python builtin gzip's _GzipReader read() function to
11stop reading as soon as it reach the EOF marker of the current gzip
12stream. This is a requirement for unblob given that streams can be
13malformed and followed by garbage/random content that triggers
14BadGzipFile errors when gzip library tries to read the next stream
15header.
16"""

import gzip
import io
import struct
import zlib
from pathlib import Path
from typing import Optional

from structlog import get_logger

from unblob.extractors import Command
from unblob.extractors.command import MultiFileCommand
from unblob.models import Extractor

from ...file_utils import InvalidInputFormat
from ...models import (
    DirectoryExtractor,
    DirectoryHandler,
    ExtractResult,
    File,
    Glob,
    Handler,
    HandlerDoc,
    HandlerType,
    HexString,
    MultiFile,
    Reference,
    ValidChunk,
)
from ._gzip_reader import SingleMemberGzipReader

logger = get_logger()

GZIP2_CRC_LEN = 4
GZIP2_SIZE_LEN = 4
GZIP2_FOOTER_LEN = GZIP2_CRC_LEN + GZIP2_SIZE_LEN

# FLG bits from RFC 1952 section 2.3.1: FEXTRA is bit 2, FNAME is bit 3
FLAG_EXTRA = 4
FLAG_NAME = 8


def get_gzip_embedded_name(path: Path) -> str:
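    """Return the original file name stored in the gzip FNAME header field.

    Falls back to an empty string when the field is absent or not valid
    UTF-8; any directory components are stripped so the result is safe to
    use as an output file name.
    """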
    name = b""
    with path.open("rb") as file:
        # skip the magic bytes
        file.read(2)
        (_method, flag, _last_mtime) = struct.unpack("<BBIxx", file.read(8))

        if flag & FLAG_EXTRA:
            # Read & discard the extra field, if present
            [extra_len] = struct.unpack("<H", file.read(2))
            file.seek(extra_len, io.SEEK_CUR)

        if flag & FLAG_NAME:
            # Read the null-terminated string containing the filename
            while True:
                s = file.read(1)
                if not s or s == b"\000":
                    break
                name += s

    # return a valid, safe name without directories!
    try:
        return Path(name.decode("utf-8")).name
    except UnicodeDecodeError:
        return ""


class GZIPExtractor(Extractor):
    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        name = get_gzip_embedded_name(inpath) or "gzip.uncompressed"
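        # 7z's -so switch streams the decompressed data to stdout; the Command
        # helper is expected to capture that stream into a file with the chosen
        # name inside the extraction directory.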
        extractor = Command("7z", "x", "-y", "{inpath}", "-so", stdout=name)
        return extractor.extract(inpath, outdir)


class MultiGZIPExtractor(DirectoryExtractor):
    def get_dependencies(self) -> list[str]:
        return ["7z"]

    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        name = get_gzip_embedded_name(paths[0]) or "gzip.uncompressed"
        extractor = MultiFileCommand(
            "7z", "x", "-p", "-y", "{inpath}", "-so", stdout=name
        )
        return extractor.extract(paths, outdir)


class GZIPHandler(Handler):
    NAME = "gzip"

    EXTRACTOR = GZIPExtractor()

    PATTERNS = [
        HexString(
            """
            // ID1
            1F
            // ID2
            8B
            // compression method (0x8 = DEFLATE)
            08
            // flags, 00011111 (0x1f) is the highest valid value since the top 3 bits (5-7) are reserved
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 |
                0A | 0B | 0C | 0D | 0E | 0F | 10 | 11 | 12 | 13 |
                14 | 15 | 16 | 17 | 18 | 19 | 1A | 1B | 1C | 1D | 1E | 1F
            )
            // unix time (uint32) + eXtra FLags (2 or 4 per RFC1952 2.3.1)
            // we accept any value because the RFC is not followed by some samples
            [5]
            // Operating System (0-13, or 255 per RFC1952 2.3.1)
            (
                00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 0A | 0B | 0C | 0D | FF
            )
            """
        )
    ]

    DOC = HandlerDoc(
        name="GZIP",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as the original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        fp = SingleMemberGzipReader(file)
        if not fp.read_header():
            return None

        try:
            fp.read_until_eof()
        except (gzip.BadGzipFile, zlib.error) as e:
            raise InvalidInputFormat from e

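        # The reader may have read past the end of the DEFLATE stream; those
        # extra bytes are kept in unused_data. Reposition the file so the chunk
        # ends right after the 8 byte footer (CRC32 + ISIZE) of this member.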
        file.seek(GZIP2_FOOTER_LEN - len(fp.unused_data), io.SEEK_CUR)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )


class MultiVolumeGzipHandler(DirectoryHandler):
    NAME = "multi-gzip"
    EXTRACTOR = MultiGZIPExtractor()

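    # Matches split gzip volumes such as firmware.gz.000, firmware.gz.001
    # (example names); all matching parts are grouped into one MultiFile and
    # extracted together by MultiGZIPExtractor.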
    PATTERN = Glob("*.gz.*")

    DOC = HandlerDoc(
        name="GZIP (multi-volume)",
        description="GZIP is a compressed file format that uses the DEFLATE algorithm and includes metadata such as the original file name and modification time. It is commonly used for efficient file storage and transfer.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="GZIP File Format Specification",
                url="https://datatracker.ietf.org/doc/html/rfc1952",
            ),
            Reference(
                title="GZIP Wikipedia",
                url="https://en.wikipedia.org/wiki/Gzip",
            ),
        ],
        limitations=[],
    )

    def is_valid_gzip(self, path: Path) -> bool:
        try:
            file = File.from_path(path)
        except ValueError:
            return False

        with file as f:
            try:
                fp = SingleMemberGzipReader(f)
                if not fp.read_header():
                    return False
            except gzip.BadGzipFile:
                return False
        return True

    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        paths = sorted(
            [p for p in file.parent.glob(f"{file.stem}.*") if p.resolve().exists()]
        )

        # we 'discard' paths that are not the first in the ordered list,
        # otherwise we will end up with colliding reports, one for every
        # path in the list.
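        # For example, with parts named a.gz.000 and a.gz.001 (hypothetical
        # names), only the call for a.gz.000 returns a MultiFile; the others
        # return None.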
        if not paths or file != paths[0]:
            return None

        if self.is_valid_gzip(file):
            files_size = sum(path.stat().st_size for path in paths)
            logger.debug(
                "Multi-volume files", paths=paths, files_size=files_size, _verbosity=2
            )

            return MultiFile(
                name=paths[0].stem,
                paths=paths,
            )
        return None