1import re
2import zlib
3from pathlib import Path
4from typing import Optional
5
6from structlog import get_logger
7
8from unblob.handlers.archive.dmg import DMGHandler
9
10from ...file_utils import DEFAULT_BUFSIZE, InvalidInputFormat
11from ...models import (
12 Extractor,
13 File,
14 Handler,
15 HandlerDoc,
16 HandlerType,
17 Reference,
18 Regex,
19 ValidChunk,
20)
21
22logger = get_logger()
23
24
25class ZlibExtractor(Extractor):
26 def extract(self, inpath: Path, outdir: Path):
27 decompressor = zlib.decompressobj()
28 outpath = outdir / "zlib.uncompressed"
29 with File.from_path(inpath) as f, outpath.open("wb") as outfile:
30 content = f.read(DEFAULT_BUFSIZE)
31 while content and not decompressor.eof:
32 outfile.write(decompressor.decompress(content))
33 content = f.read(DEFAULT_BUFSIZE)
34
35
36class ZlibHandler(Handler):
37 NAME = "zlib"
38
39 PATTERNS = [
40 Regex(r"^\x78\x01"), # low compression
41 Regex(r"^\x78\x9c"), # default compression
42 Regex(r"^\x78\xda"), # best compression
43 Regex(r"^\x78\x5e"), # compressed
44 ]
45
46 EXTRACTOR = ZlibExtractor()
47
48 DOC = HandlerDoc(
49 name=NAME,
50 description="The zlib format is a compressed data format based on the DEFLATE algorithm, often used for data compression in various applications. It includes a lightweight header and checksum for data integrity.",
51 handler_type=HandlerType.COMPRESSION,
52 vendor=None,
53 references=[
54 Reference(
55 title="zlib File Format Specification",
56 url="https://www.zlib.net/manual.html",
57 ),
58 Reference(
59 title="zlib Wikipedia",
60 url="https://en.wikipedia.org/wiki/Zlib",
61 ),
62 ],
63 limitations=[],
64 )
65
66 def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
67 for pattern in DMGHandler.PATTERNS:
68 if re.search(pattern.as_regex(), file[-512:]):
69 raise InvalidInputFormat(
70 "File is a DMG archive made of zlib streams. Aborting."
71 )
72
73 decompressor = zlib.decompressobj()
74
75 try:
76 content = file.read(DEFAULT_BUFSIZE)
77 while content and not decompressor.eof:
78 decompressor.decompress(content)
79 content = file.read(DEFAULT_BUFSIZE)
80
81 except zlib.error:
82 raise InvalidInputFormat("invalid zlib stream") from None
83
84 end_offset = file.tell() - len(decompressor.unused_data)
85
86 return ValidChunk(
87 start_offset=start_offset,
88 end_offset=end_offset,
89 )