1import re
2import zlib
3from pathlib import Path
4
5from structlog import get_logger
6
7from unblob.handlers.archive.dmg import DMGHandler
8
9from ...file_utils import DEFAULT_BUFSIZE, InvalidInputFormat
10from ...models import (
11 Extractor,
12 File,
13 Handler,
14 HandlerDoc,
15 HandlerType,
16 Reference,
17 Regex,
18 ValidChunk,
19)
20
21logger = get_logger()
22
23
class ZlibExtractor(Extractor):
    """Extractor that inflates a zlib stream into a single output file."""

    def extract(self, inpath: Path, outdir: Path):
        """Decompress the zlib stream in *inpath* to ``outdir / "zlib.uncompressed"``.

        The input is consumed in ``DEFAULT_BUFSIZE`` pieces so the whole
        stream never has to be held in memory at once. Decompression stops
        when the input is exhausted or the end of the zlib stream is reached,
        whichever comes first.
        """
        target = outdir / "zlib.uncompressed"
        inflater = zlib.decompressobj()
        with File.from_path(inpath) as source, target.open("wb") as sink:
            while True:
                piece = source.read(DEFAULT_BUFSIZE)
                # Stop on end of input, or once the stream's end marker has
                # been seen (anything after it is not part of the stream).
                if not piece or inflater.eof:
                    break
                sink.write(inflater.decompress(piece))
33
34
class ZlibHandler(Handler):
    """Handler that recognises zlib streams and computes their extent."""

    NAME = "zlib"

    # Two-byte zlib headers: CMF 0x78 (deflate, 32K window) followed by the
    # FLG byte, whose FLEVEL bits hint at the compression level used.
    PATTERNS = [
        Regex(r"^\x78\x01"),  # low compression
        Regex(r"^\x78\x9c"),  # default compression
        Regex(r"^\x78\xda"),  # best compression
        Regex(r"^\x78\x5e"),  # fast compression
    ]

    EXTRACTOR = ZlibExtractor()

    DOC = HandlerDoc(
        name=NAME,
        description="The zlib format is a compressed data format based on the DEFLATE algorithm, often used for data compression in various applications. It includes a lightweight header and checksum for data integrity.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="zlib File Format Specification",
                url="https://www.zlib.net/manual.html",
            ),
            Reference(
                title="zlib Wikipedia",
                url="https://en.wikipedia.org/wiki/Zlib",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Locate the end of the zlib stream beginning at *start_offset*.

        The stream is decompressed buffer by buffer (output discarded) until
        the decompressor reports the end of the stream; the end offset is the
        current file position minus the bytes the decompressor did not consume.

        NOTE(review): reading starts at the file's current position, so this
        presumably relies on the caller having positioned *file* at
        *start_offset* -- confirm against the caller.

        Raises:
            InvalidInputFormat: if the last 512 bytes of the file match a
                DMG handler pattern, or if zlib rejects the data.
        """
        # The trailer slice does not change between patterns; take it once.
        trailer = file[-512:]
        for pattern in DMGHandler.PATTERNS:
            if re.search(pattern.as_regex(), trailer):
                raise InvalidInputFormat(
                    "File is a DMG archive made of zlib streams. Aborting."
                )

        decompressor = zlib.decompressobj()

        try:
            while content := file.read(DEFAULT_BUFSIZE):
                decompressor.decompress(content)
                if decompressor.eof:
                    # Stop *before* reading another buffer: an extra read
                    # would advance file.tell() past bytes that never reach
                    # decompressor.unused_data, inflating end_offset.
                    break
        except zlib.error:
            raise InvalidInputFormat("invalid zlib stream") from None

        # NOTE(review): if the input runs out before the end of the stream
        # is seen (truncated stream), a chunk reaching the end of the file is
        # still reported, as before -- confirm this best-effort is intended.

        # unused_data holds the bytes of the last buffer that lie past the
        # end of the zlib stream; they do not belong to this chunk.
        end_offset = file.tell() - len(decompressor.unused_data)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )