1"""LZ4 handler.
2
3Frame format definition: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md.
4"""
5
6import io
7
8from lz4.block import LZ4BlockError, decompress
9from structlog import get_logger
10
11from unblob.extractors import Command
12
13from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int32
14from ...models import (
15 File,
16 Handler,
17 HandlerDoc,
18 HandlerType,
19 HexString,
20 Reference,
21 ValidChunk,
22)
23
# Module-level structlog logger, used for debug output while parsing frames.
logger = get_logger()
25
# Skippable frames own sixteen consecutive magic numbers: 0x184D2A50 .. 0x184D2A5F.
SKIPPABLE_FRAMES_MAGIC = list(range(0x184D2A50, 0x184D2A50 + 16))
FRAME_MAGIC = 0x184D2204
LEGACY_FRAME_MAGIC = 0x184C2102
# Every frame magic known to this module.
FRAME_MAGICS = [*SKIPPABLE_FRAMES_MAGIC, FRAME_MAGIC, LEGACY_FRAME_MAGIC]

# Masks selecting a one-bit or a two-bit wide field.
_1BIT = 0b1
_2BITS = 0b11

# highest bit of the block size flags an uncompressed block, the lower 31 bits hold the size
BLOCK_SIZE_MASK = (1 << 31) - 1

END_MARK = 0

# Field widths, in bytes.
CONTENT_SIZE_LEN = 8
BLOCK_SIZE_LEN = 4
FRAME_SIZE_LEN = 4
BLOCK_CHECKSUM_LEN = 4
CONTENT_CHECKSUM_LEN = 4
MAGIC_LEN = 4
DICTID_LEN = 4
FLG_LEN = 1
BD_LEN = 1
HC_LEN = 1

MAX_LEGACY_BLOCK_SIZE = 8 << 20  # 8 MB
45
46
class FLG:
    """Decoded view of the one-byte FLG field of an LZ4 frame descriptor.

    From the most significant bit down, the byte holds: two version bits,
    then one bit each for block independence, block checksum, content size
    and content checksum, one bit that is not decoded here, and finally the
    dictionary ID bit.
    """

    version: int = 0
    block_independence: int = 0
    block_checksum: int = 0
    content_size: int = 0
    content_checksum: int = 0
    dictid: int = 0

    def __init__(self, raw_flg: int):
        """Split *raw_flg* (the FLG byte as an integer) into its fields."""

        def flag(position: int) -> int:
            # Value (0 or 1) of the single bit found at `position`.
            return (raw_flg >> position) & _1BIT

        self.version = (raw_flg >> 6) & _2BITS
        self.block_independence = flag(5)
        self.block_checksum = flag(4)
        self.content_size = flag(3)
        self.content_checksum = flag(2)
        self.dictid = flag(0)

    def as_dict(self) -> dict:
        """Return every decoded field keyed by its attribute name."""
        field_names = (
            "version",
            "block_independence",
            "block_checksum",
            "content_size",
            "content_checksum",
            "dictid",
        )
        return {name: getattr(self, name) for name in field_names}
74
75
class _LZ4HandlerBase(Handler):
    """Shared pieces of every LZ4 frame handler."""

    # Extraction is delegated to `lz4 --decompress`, writing to a fixed output name.
    EXTRACTOR = Command("lz4", "--decompress", "{inpath}", "{outdir}/lz4.uncompressed")

    def _skip_magic_bytes(self, file: File):
        """Advance *file* from its current position past the frame magic."""
        file.seek(MAGIC_LEN, io.SEEK_CUR)
83
84
class LegacyFrameHandler(_LZ4HandlerBase):
    """Handler for legacy LZ4 frames: a magic followed by size-prefixed blocks."""

    NAME = "lz4_legacy"
    PATTERNS = [HexString("02 21 4C 18")]

    DOC = HandlerDoc(
        name="LZ4 (legacy)",
        description="LZ4 legacy format is an older framing format used prior to the LZ4 Frame specification, featuring a simpler structure and no support for skippable frames or extensive metadata. Unlike the default LZ4 Frame format, it lacks built-in checksums, versioning, or block independence flags, making it less robust and primarily used for backward compatibility.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Walk the blocks of a legacy frame to find where the frame ends.

        The frame is considered finished at end of file, when the next
        4 bytes are a known frame magic, or right after the first block
        that decompresses to fewer than ``MAX_LEGACY_BLOCK_SIZE`` bytes.

        Args:
            file: input positioned at the frame magic.
            start_offset: offset reported as the start of the chunk.

        Returns:
            The chunk running from ``start_offset`` to the detected end.

        Raises:
            InvalidInputFormat: when a block cannot be decompressed.
        """
        self._skip_magic_bytes(file)

        # An empty read means end of file, which also terminates the frame.
        while size_field := file.read(BLOCK_SIZE_LEN):
            compressed_size = convert_int32(size_field, Endian.LITTLE)
            if compressed_size in FRAME_MAGICS:
                # These bytes open the next frame, not a block: give them back.
                file.seek(-BLOCK_SIZE_LEN, io.SEEK_CUR)
                break

            payload = file.read(compressed_size)
            try:
                plain = decompress(payload, MAX_LEGACY_BLOCK_SIZE)
            except LZ4BlockError:
                raise InvalidInputFormat("Invalid LZ4 legacy frame.") from None

            # Only the last block may be smaller than the fixed block size, see
            # 'fixed block size' in https://android.googlesource.com/platform/external/lz4/+/HEAD/doc/lz4_Frame_format.md#legacy-frame
            if len(plain) < MAX_LEGACY_BLOCK_SIZE:
                break

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
135
136
class SkippableFrameHandler(_LZ4HandlerBase):
    """Handler for skippable frames, whose payload is opaque data."""

    NAME = "lz4_skippable"
    PATTERNS = [HexString("5? 2A 4D 18")]

    DOC = HandlerDoc(
        name="LZ4 (skippable)",
        description="LZ4 skippable format is designed to encapsulate arbitrary data within an LZ4 stream allowing compliant parsers to skip over it safely. This format does not contain compressed data itself but is often used for embedding metadata or non-LZ4 content alongside standard frames.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of a skippable frame from its size field.

        Args:
            file: input positioned at the frame magic.
            start_offset: offset reported as the start of the chunk.

        Returns:
            The chunk covering the magic, the little-endian 32-bit size field
            and as many payload bytes as that field announces.
        """
        self._skip_magic_bytes(file)

        size_field = file.read(FRAME_SIZE_LEN)
        payload_len = convert_int32(size_field, Endian.LITTLE)

        # The payload is not interpreted, only stepped over.
        file.seek(payload_len, io.SEEK_CUR)
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
167
168
class DefaultFrameHandler(_LZ4HandlerBase):
    """Handler for the current LZ4 frame format (magic bytes ``04 22 4D 18``)."""

    NAME = "lz4_default"

    PATTERNS = [HexString("04 22 4D 18")]

    DOC = HandlerDoc(
        name="LZ4",
        description="LZ4 is a high-speed lossless compression algorithm designed for real-time data compression with minimal memory usage.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Find the end of an LZ4 frame without decompressing it.

        The variable-size frame descriptor is stepped over, then the data
        blocks are skipped one by one until the end mark, and finally the
        optional content checksum is accounted for.

        Args:
            file: input positioned at the frame magic.
            start_offset: offset reported as the start of the chunk.

        Returns:
            The chunk running from ``start_offset`` to the end of the frame.
        """
        self._skip_magic_bytes(file)

        # Frame descriptor: FLG, BD, optional content size, optional dict ID, HC.
        flg = FLG(convert_int8(file.read(FLG_LEN), Endian.LITTLE))
        logger.debug("Parsed FLG", **flg.as_dict())

        # BD (max block size) only matters to decoders that allocate buffers.
        file.seek(BD_LEN, io.SEEK_CUR)

        if flg.content_size:
            file.seek(CONTENT_SIZE_LEN, io.SEEK_CUR)
        if flg.dictid:
            file.seek(DICTID_LEN, io.SEEK_CUR)

        hc = convert_int8(file.read(HC_LEN), Endian.LITTLE)
        logger.debug("Header checksum (HC) read", header_checksum=hc)

        def next_block_size() -> int:
            # Read and log the raw 32-bit size field that precedes each block.
            raw_size = convert_int32(file.read(BLOCK_SIZE_LEN), Endian.LITTLE)
            logger.debug("block_size", block_size=raw_size)
            return raw_size

        # Data blocks: iteration stops once the size field equals the end mark.
        for raw_size in iter(next_block_size, END_MARK):
            # The top bit is only a flag, the remaining bits are the byte count.
            file.seek(raw_size & BLOCK_SIZE_MASK, io.SEEK_CUR)
            if flg.block_checksum:
                file.seek(BLOCK_CHECKSUM_LEN, io.SEEK_CUR)

        # A content checksum, when announced by FLG, trails the end mark.
        if flg.content_checksum:
            file.seek(CONTENT_CHECKSUM_LEN, io.SEEK_CUR)

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())