1"""LZ4 handler.
2
3Frame format definition: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md.
4"""
5
6import io
7
8from lz4.block import LZ4BlockError, decompress
9from structlog import get_logger
10
11from unblob.extractors import Command
12
13from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int32
14from ...models import (
15 File,
16 Handler,
17 HandlerDoc,
18 HandlerType,
19 HexString,
20 Reference,
21 ValidChunk,
22)
23
# Module-level structlog logger; used for debug output while parsing frames.
logger = get_logger()
25
# Frame magic numbers, expressed as the 32-bit value obtained when the first
# four bytes of a frame are read little-endian.
# Skippable frames may use any of 16 consecutive magics starting at 0x184D2A50.
SKIPPABLE_FRAMES_MAGIC = list(range(0x184D2A50, 0x184D2A50 + 16))
FRAME_MAGIC = 0x184D2204
LEGACY_FRAME_MAGIC = 0x184C2102
# Every magic number that can introduce a frame.
FRAME_MAGICS = [*SKIPPABLE_FRAMES_MAGIC, FRAME_MAGIC, LEGACY_FRAME_MAGIC]

# Masks applied once a flag has been shifted down to the low bits.
_1BIT = 0x01
_2BITS = 0x03

# Block-size value that ends the block list of a default frame.
END_MARK = 0x00000000

# Field widths, in bytes.
CONTENT_SIZE_LEN = 8
MAGIC_LEN = 4
BLOCK_SIZE_LEN = 4
FRAME_SIZE_LEN = 4
BLOCK_CHECKSUM_LEN = 4
CONTENT_CHECKSUM_LEN = 4
DICTID_LEN = 4
FLG_LEN = 1
BD_LEN = 1
HC_LEN = 1

# Size a legacy block decompresses to unless it is the last one of its frame.
MAX_LEGACY_BLOCK_SIZE = 8 * 1024 * 1024  # 8 MB
42
43
class FLG:
    """Decoded view of the one-byte FLG field of a frame descriptor.

    Bits are decoded as follows (bit 0 is the least significant):
    bits 7-6 version, bit 5 block independence, bit 4 block checksum,
    bit 3 content size, bit 2 content checksum, bit 0 dictionary ID.
    Bit 1 is not decoded.
    """

    # Class-level defaults; __init__ overwrites every one of them per instance.
    version: int = 0
    block_independence: int = 0
    block_checksum: int = 0
    content_size: int = 0
    content_checksum: int = 0
    dictid: int = 0

    def __init__(self, raw_flg: int):
        """Split *raw_flg* (the FLG byte as an integer) into its flags."""

        def single_bit(position: int) -> int:
            # Value (0 or 1) of the bit at *position* in raw_flg.
            return (raw_flg >> position) & _1BIT

        self.version = (raw_flg >> 6) & _2BITS
        self.block_independence = single_bit(5)
        self.block_checksum = single_bit(4)
        self.content_size = single_bit(3)
        self.content_checksum = single_bit(2)
        self.dictid = single_bit(0)

    def as_dict(self) -> dict:
        """Return the decoded flags as a ``name -> value`` mapping."""
        field_names = (
            "version",
            "block_independence",
            "block_checksum",
            "content_size",
            "content_checksum",
            "dictid",
        )
        return {field: getattr(self, field) for field in field_names}
71
72
class _LZ4HandlerBase(Handler):
    """Behaviour shared by every LZ4 frame handler in this module."""

    # Extraction command template; presumably runs the external `lz4` tool on
    # the carved chunk and writes the result to `lz4.uncompressed`.
    EXTRACTOR = Command("lz4", "--decompress", "{inpath}", "{outdir}/lz4.uncompressed")

    def _skip_magic_bytes(self, file: File):
        """Move *file* forward by ``MAGIC_LEN`` bytes from its current position."""
        file.seek(MAGIC_LEN, io.SEEK_CUR)
80
81
class LegacyFrameHandler(_LZ4HandlerBase):
    """Handler for LZ4 legacy frames (magic ``02 21 4C 18``)."""

    NAME = "lz4_legacy"
    PATTERNS = [HexString("02 21 4C 18")]

    DOC = HandlerDoc(
        name="LZ4 (legacy)",
        description="LZ4 legacy format is an older framing format used prior to the LZ4 Frame specification, featuring a simpler structure and no support for skippable frames or extensive metadata. Unlike the default LZ4 Frame format, it lacks built-in checksums, versioning, or block independence flags, making it less robust and primarily used for backward compatibility.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Walk the blocks of a legacy frame and return the chunk it occupies.

        The frame has no explicit end marker, so the walk stops when any of
        these happens: the file ends, the next four bytes are a known frame
        magic, or a block decompresses to less than ``MAX_LEGACY_BLOCK_SIZE``.

        :param file: input positioned at the frame's magic number.
        :param start_offset: offset of the frame, stored in the returned chunk.
        :return: chunk from ``start_offset`` to the position after the last block.
        :raises InvalidInputFormat: when a block cannot be decompressed.
        """
        self._skip_magic_bytes(file)

        # An empty read means end of file, which also ends the frame.
        while size_field := file.read(BLOCK_SIZE_LEN):
            compressed_len = convert_int32(size_field, Endian.LITTLE)

            if compressed_len in FRAME_MAGICS:
                # Those bytes were the magic of the following frame, not a
                # block size: step back so they stay outside this chunk.
                file.seek(-BLOCK_SIZE_LEN, io.SEEK_CUR)
                break

            payload = file.read(compressed_len)
            try:
                plain = decompress(payload, MAX_LEGACY_BLOCK_SIZE)
            except LZ4BlockError:
                raise InvalidInputFormat("Invalid LZ4 legacy frame.") from None

            # See 'fixed block size' in https://android.googlesource.com/platform/external/lz4/+/HEAD/doc/lz4_Frame_format.md#legacy-frame
            # Only the last block of a frame may decompress to less than the
            # fixed block size.
            if len(plain) < MAX_LEGACY_BLOCK_SIZE:
                break

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
132
133
class SkippableFrameHandler(_LZ4HandlerBase):
    """Handler for skippable frames, whose payload is opaque, uncompressed data."""

    NAME = "lz4_skippable"
    # The `?` nibble covers the 16 skippable magic numbers.
    PATTERNS = [HexString("5? 2A 4D 18")]

    DOC = HandlerDoc(
        name="LZ4 (skippable)",
        description="LZ4 skippable format is designed to encapsulate arbitrary data within an LZ4 stream allowing compliant parsers to skip over it safely. This format does not contain compressed data itself but is often used for embedding metadata or non-LZ4 content alongside standard frames.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Return the chunk covered by one skippable frame.

        The frame is the magic number, a little-endian 32-bit payload size,
        and that many payload bytes.

        :param file: input positioned at the frame's magic number.
        :param start_offset: offset of the frame, stored in the returned chunk.
        :return: chunk from ``start_offset`` to the position after the payload.
        """
        self._skip_magic_bytes(file)

        size_field = file.read(FRAME_SIZE_LEN)
        payload_len = convert_int32(size_field, Endian.LITTLE)

        # The payload is never inspected, only stepped over.
        file.seek(payload_len, io.SEEK_CUR)
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
164
165
class DefaultFrameHandler(_LZ4HandlerBase):
    """Modern version, most frequently used."""

    NAME = "lz4_default"

    PATTERNS = [HexString("04 22 4D 18")]

    DOC = HandlerDoc(
        name="LZ4",
        description="LZ4 is a high-speed lossless compression algorithm designed for real-time data compression with minimal memory usage.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    # In the 4-byte block size field the highest bit only flags the block as
    # stored uncompressed; the remaining 31 bits are the size of the block data.
    _BLOCK_DATA_SIZE_MASK = 0x7FFFFFFF

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Walk a default LZ4 frame and return the chunk it occupies.

        The frame descriptor is parsed only as far as needed to know its
        length, then data blocks are skipped one by one until the end mark,
        and finally the optional content checksum is skipped.

        :param file: input positioned at the frame's magic number.
        :param start_offset: offset of the frame, stored in the returned chunk.
        :return: chunk from ``start_offset`` to the position after the frame.
        """
        # 1. skip the magic number
        self._skip_magic_bytes(file)

        # 2. we parse the frame descriptor of dynamic size
        flg_bytes = file.read(FLG_LEN)
        raw_flg = convert_int8(flg_bytes, Endian.LITTLE)
        flg = FLG(raw_flg)
        logger.debug("Parsed FLG", **flg.as_dict())

        # skip BD (max blocksize), only useful for decoders that need to allocate memory
        file.seek(BD_LEN, io.SEEK_CUR)

        # Optional descriptor fields are present only when their flag is set.
        if flg.content_size:
            file.seek(CONTENT_SIZE_LEN, io.SEEK_CUR)
        if flg.dictid:
            file.seek(DICTID_LEN, io.SEEK_CUR)

        header_checksum = convert_int8(file.read(HC_LEN), Endian.LITTLE)
        logger.debug("Header checksum (HC) read", header_checksum=header_checksum)

        # 3. we read block by block until we hit the endmarker
        while True:
            block_size = convert_int32(file.read(BLOCK_SIZE_LEN), Endian.LITTLE)
            logger.debug("block_size", block_size=block_size)
            if block_size == END_MARK:
                break
            # Drop the "uncompressed block" flag bit before using the field as
            # a length; otherwise a stored block would make us seek ~2 GiB too
            # far and the whole frame would be rejected.
            file.seek(block_size & self._BLOCK_DATA_SIZE_MASK, io.SEEK_CUR)
            if flg.block_checksum:
                file.seek(BLOCK_CHECKSUM_LEN, io.SEEK_CUR)

        # 4. we reached the endmark (0x00000000)

        # 5. if frame descriptor mentions CRC, we add CRC
        if flg.content_checksum:
            file.seek(CONTENT_CHECKSUM_LEN, io.SEEK_CUR)

        end_offset = file.tell()

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)