1"""LZ4 handler.
2
3Frame format definition: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md.
4"""
5
6import io
7from typing import Optional
8
9from lz4.block import LZ4BlockError, decompress
10from structlog import get_logger
11
12from unblob.extractors import Command
13
14from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int32
15from ...models import (
16 File,
17 Handler,
18 HandlerDoc,
19 HandlerType,
20 HexString,
21 Reference,
22 ValidChunk,
23)
24
# Module-level structlog logger, used for debug output while parsing frames.
logger = get_logger()
26
# Frame magic numbers, expressed as the 32-bit integers obtained by decoding
# the first four bytes of a frame as little-endian.
FRAME_MAGIC = 0x184D2204
LEGACY_FRAME_MAGIC = 0x184C2102
# Sixteen consecutive values: 0x184D2A50 through 0x184D2A5F.
SKIPPABLE_FRAMES_MAGIC = list(range(0x184D2A50, 0x184D2A50 + 16))
# Every magic that can start a frame; used to detect where a legacy frame ends.
FRAME_MAGICS = SKIPPABLE_FRAMES_MAGIC + [FRAME_MAGIC, LEGACY_FRAME_MAGIC]

# Bit masks used when decoding the FLG byte.
_1BIT = 0b1
_2BITS = 0b11

# Block size value that terminates the block list of a default frame.
END_MARK = 0

# Field lengths, in bytes.
CONTENT_SIZE_LEN = 8
MAGIC_LEN = 4
BLOCK_SIZE_LEN = 4
FRAME_SIZE_LEN = 4
BLOCK_CHECKSUM_LEN = 4
CONTENT_CHECKSUM_LEN = 4
DICTID_LEN = 4
FLG_LEN = 1
BD_LEN = 1
HC_LEN = 1

# Upper bound passed to the block decompressor for legacy frames: 8 MB.
MAX_LEGACY_BLOCK_SIZE = 8 << 20
43
44
class FLG:
    """Decoded bit fields of the one-byte FLG field of an LZ4 frame descriptor.

    Layout as decoded by ``__init__`` (bit 0 is the least significant bit):

    * bits 7-6: ``version``
    * bit 5: ``block_independence``
    * bit 4: ``block_checksum``
    * bit 3: ``content_size``
    * bit 2: ``content_checksum``
    * bit 0: ``dictid``

    Bit 1 is not decoded.
    """

    # Class-level defaults; __init__ overwrites every one of them per instance.
    version: int = 0
    block_independence: int = 0
    block_checksum: int = 0
    content_size: int = 0
    content_checksum: int = 0
    dictid: int = 0

    def __init__(self, raw_flg: int):
        """Split *raw_flg* (the FLG byte as an integer) into its fields."""

        def flag_at(bit_index: int) -> int:
            # Isolate the single bit found at ``bit_index``.
            return (raw_flg >> bit_index) & _1BIT

        self.version = (raw_flg >> 6) & _2BITS
        self.block_independence = flag_at(5)
        self.block_checksum = flag_at(4)
        self.content_size = flag_at(3)
        self.content_checksum = flag_at(2)
        self.dictid = raw_flg & _1BIT

    def as_dict(self) -> dict:
        """Return the decoded fields as a ``{field name: value}`` dict."""
        field_names = (
            "version",
            "block_independence",
            "block_checksum",
            "content_size",
            "content_checksum",
            "dictid",
        )
        return {name: getattr(self, name) for name in field_names}
72
73
class _LZ4HandlerBase(Handler):
    """Shared base class for the LZ4 frame handlers defined in this module."""

    # Command line for the external ``lz4`` tool. ``{inpath}`` and ``{outdir}``
    # are placeholders (presumably filled in by ``Command`` at extraction time).
    EXTRACTOR = Command("lz4", "--decompress", "{inpath}", "{outdir}/lz4.uncompressed")

    def _skip_magic_bytes(self, file: File) -> None:
        """Move *file* forward by ``MAGIC_LEN`` bytes from its current position."""
        magic_length = MAGIC_LEN
        file.seek(magic_length, io.SEEK_CUR)
81
82
class LegacyFrameHandler(_LZ4HandlerBase):
    """Handler for LZ4 legacy frames (pattern bytes ``02 21 4C 18``)."""

    NAME = "lz4_legacy"
    PATTERNS = [HexString("02 21 4C 18")]

    DOC = HandlerDoc(
        name="LZ4 (legacy)",
        description="LZ4 legacy format is an older framing format used prior to the LZ4 Frame specification, featuring a simpler structure and no support for skippable frames or extensive metadata. Unlike the default LZ4 Frame format, it lacks built-in checksums, versioning, or block independence flags, making it less robust and primarily used for backward compatibility.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Walk the blocks of a legacy frame and return the chunk it occupies.

        *file* is expected to be positioned on the frame magic. Blocks are
        consumed until one of the following happens:

        * nothing is left to read (end of file);
        * the next 4-byte value is a known frame magic, in which case the
          position is rewound so that magic stays outside the chunk;
        * a block decompresses to less than ``MAX_LEGACY_BLOCK_SIZE`` bytes.

        Raises:
            InvalidInputFormat: if a block cannot be LZ4-decompressed.
        """
        self._skip_magic_bytes(file)

        while True:
            size_field = file.read(BLOCK_SIZE_LEN)
            if not size_field:
                # End of file: the previous block was the last one.
                break

            compressed_size = convert_int32(size_field, Endian.LITTLE)
            if compressed_size in FRAME_MAGICS:
                # This is the magic of the following frame rather than a block
                # size, so step back over the bytes that were just read.
                file.seek(-BLOCK_SIZE_LEN, io.SEEK_CUR)
                break

            block_data = file.read(compressed_size)
            try:
                decoded = decompress(block_data, MAX_LEGACY_BLOCK_SIZE)
            except LZ4BlockError:
                raise InvalidInputFormat("Invalid LZ4 legacy frame.") from None

            # Only the final block may be shorter than the fixed block size, see
            # 'fixed block size' in https://android.googlesource.com/platform/external/lz4/+/HEAD/doc/lz4_Frame_format.md#legacy-frame
            if len(decoded) < MAX_LEGACY_BLOCK_SIZE:
                break

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
133
134
class SkippableFrameHandler(_LZ4HandlerBase):
    """Handler for LZ4 skippable frames, whose payload is opaque to LZ4."""

    NAME = "lz4_skippable"
    PATTERNS = [HexString("5? 2A 4D 18")]

    DOC = HandlerDoc(
        name="LZ4 (skippable)",
        description="LZ4 skippable format is designed to encapsulate arbitrary data within an LZ4 stream allowing compliant parsers to skip over it safely. This format does not contain compressed data itself but is often used for embedding metadata or non-LZ4 content alongside standard frames.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Return the chunk covered by the skippable frame at the current position.

        The frame is the magic, a little-endian 32-bit size field, and then
        that many bytes of payload, which are skipped without being inspected.
        """
        self._skip_magic_bytes(file)

        size_field = file.read(FRAME_SIZE_LEN)
        payload_length = convert_int32(size_field, Endian.LITTLE)

        # Jump over the payload; the chunk ends right after it.
        file.seek(payload_length, io.SEEK_CUR)
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
165
166
class DefaultFrameHandler(_LZ4HandlerBase):
    """Handler for the modern LZ4 frame format, the most frequently used one."""

    NAME = "lz4_default"

    PATTERNS = [HexString("04 22 4D 18")]

    DOC = HandlerDoc(
        name="LZ4",
        description="LZ4 is a high-speed lossless compression algorithm designed for real-time data compression with minimal memory usage.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Walk an LZ4 frame and return the chunk it occupies.

        *file* is expected to be positioned on the frame magic. The method
        skips the magic and the variable-size frame descriptor, then hops
        from block to block until the end mark, and finally skips the content
        checksum when the FLG byte announces one. No data is decompressed.

        Returns:
            A ``ValidChunk`` from *start_offset* to the position reached
            after the last field of the frame.
        """
        # The highest bit of a block size field only flags an uncompressed
        # block; the data length is carried by the remaining 31 bits.
        block_data_size_mask = 0x7FFFFFFF

        # 1. skip the magic number
        self._skip_magic_bytes(file)

        # 2. parse the frame descriptor, whose size depends on the FLG bits
        flg_bytes = file.read(FLG_LEN)
        raw_flg = convert_int8(flg_bytes, Endian.LITTLE)
        flg = FLG(raw_flg)
        logger.debug("Parsed FLG", **flg.as_dict())

        # skip BD (max block size), only useful for decoders that need to allocate memory
        file.seek(BD_LEN, io.SEEK_CUR)

        # optional descriptor fields, present only when flagged in FLG
        if flg.content_size:
            file.seek(CONTENT_SIZE_LEN, io.SEEK_CUR)
        if flg.dictid:
            file.seek(DICTID_LEN, io.SEEK_CUR)

        # the header checksum is read and logged, but not verified
        header_checksum = convert_int8(file.read(HC_LEN), Endian.LITTLE)
        logger.debug("Header checksum (HC) read", header_checksum=header_checksum)

        # 3. go block by block until the end mark (0x00000000) is hit
        while True:
            block_size = convert_int32(file.read(BLOCK_SIZE_LEN), Endian.LITTLE)
            logger.debug("block_size", block_size=block_size)
            if block_size == END_MARK:
                break
            # Mask out the "uncompressed block" flag: seeking by the raw field
            # would jump about 2 GiB too far for every uncompressed block.
            file.seek(block_size & block_data_size_mask, io.SEEK_CUR)
            if flg.block_checksum:
                file.seek(BLOCK_CHECKSUM_LEN, io.SEEK_CUR)

        # 4. the content checksum, when flagged, follows the end mark
        if flg.content_checksum:
            file.seek(CONTENT_CHECKSUM_LEN, io.SEEK_CUR)

        end_offset = file.tell()

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)