1import io
2
3import attrs
4from pyperscan import Flag, Pattern, Scan, StreamDatabase
5from structlog import get_logger
6
7from unblob.extractors import Command
8
9from ...file_utils import (
10 Endian,
11 convert_int8,
12 convert_int16,
13 convert_int32,
14 decode_multibyte_integer,
15 read_until_past,
16 round_up,
17 stream_scan,
18)
19from ...models import (
20 File,
21 Handler,
22 HandlerDoc,
23 HandlerType,
24 HexString,
25 InvalidInputFormat,
26 Reference,
27 ValidChunk,
28)
29
logger = get_logger()

# Layout reference for everything below -- the .xz file format definition:
# https://tukaani.org/xz/xz-file-format-1.0.4.txt

# Byte sequence that opens a stream (same bytes as XZHandler.PATTERNS).
STREAM_START_MAGIC = b"\xfd\x37\x7a\x58\x5a\x00"

# Tail of a stream footer: the two flag bytes followed by 59 5A.
# One pattern per accepted flag value.
STREAM_END_MAGIC_PATTERNS = [
    HexString("00 00 59 5A"),  # no integrity check
    HexString("00 01 59 5A"),  # CRC32
    HexString("00 04 59 5A"),  # CRC64
    HexString("00 0A 59 5A"),  # SHA-256
]

# Flag values accepted in the stream header (read there as a 16-bit big-endian int).
NONE_STREAM_FLAG = 0x00
CRC32_STREAM_FLAG = 0x01
CRC64_STREAM_FLAG = 0x04
SHA256_STREAM_FLAG = 0x0A
VALID_FLAGS = [
    NONE_STREAM_FLAG,
    CRC32_STREAM_FLAG,
    CRC64_STREAM_FLAG,
    SHA256_STREAM_FLAG,
]

# Field widths, in bytes.
BACKWARD_SIZE_LEN = 4
MAX_MBI_LEN = 9  # a multi-byte integer takes at most 9 bytes, per the XZ standard
XZ_PADDING = 4  # byte alignment used by the XZ format
FLAG_LEN = 2
EOS_MAGIC_LEN = 2
CRC32_LEN = 4

# Fixed-size envelope around the blocks and the index.
STREAM_HEADER_LEN = len(STREAM_START_MAGIC) + FLAG_LEN + CRC32_LEN
STREAM_FOOTER_LEN = CRC32_LEN + BACKWARD_SIZE_LEN + FLAG_LEN + EOS_MAGIC_LEN
61
62
def build_stream_end_scan_db(pattern_list):
    """Compile every pattern in *pattern_list* into a single StreamDatabase.

    Each entry must provide ``as_regex()``. All patterns are compiled with
    ``Flag.SOM_LEFTMOST`` and ``Flag.DOTALL``.
    """
    compiled_patterns = [
        Pattern(hex_pattern.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_pattern in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)


# Built once at import time; XZHandler.calculate_chunk creates its scanners from it.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)
70
71
@attrs.define
class XZSearchContext:
    """Mutable state handed to the scan callback while looking for stream ends."""

    # Offset where the stream currently being validated begins; the scan
    # callback moves it forward when another stream directly follows.
    start_offset: int
    # File being scanned; the callback seeks and reads through it.
    file: File
    # End of the last stream accepted so far; XZHandler starts it at -1.
    end_streams_offset: int
    # Flag value read from the stream header by XZHandler.calculate_chunk.
    stream_flag: int
78
79
def read_multibyte_int(file: File) -> tuple[int, int]:
    """Read a multibyte integer and return the number of bytes read and the integer itself.

    On return the file is positioned right after the encoded integer. Any
    exception raised by ``decode_multibyte_integer`` propagates, in which case
    the file is left positioned at the start of the integer.
    """
    # Near the end of the file fewer than MAX_MBI_LEN bytes may come back.
    data = bytearray(file.read(MAX_MBI_LEN))
    # Rewind by what was actually consumed. Rewinding by MAX_MBI_LEN after a
    # short read would put the cursor before the integer's start and leave
    # the final position wrong.
    file.seek(-len(data), io.SEEK_CUR)
    size, mbi = decode_multibyte_integer(data)
    # Step over exactly the bytes that make up the integer.
    file.seek(size, io.SEEK_CUR)
    return size, mbi
87
88
def get_stream_size(footer_offset: int, file: File) -> int:
    """Compute the size of the stream whose footer flags start at *footer_offset*.

    The Backward Size field stored in the 4 bytes before *footer_offset*
    locates the start of the Index. The Index records are then walked to sum
    up the padded block sizes. The result is header + blocks + index + footer,
    rounded up to the XZ alignment.

    Raises InvalidInputFormat when the backward size points before the start
    of the file or the Index indicator byte is not zero.
    """
    # Backward Size: little-endian, stored as (size / 4) - 1.
    file.seek(footer_offset - BACKWARD_SIZE_LEN, io.SEEK_SET)
    stored_backward_size = convert_int32(file.read(BACKWARD_SIZE_LEN), Endian.LITTLE)
    real_backward_size = 4 * (stored_backward_size + 1)

    # The Index ends where the footer (CRC32 + Backward Size) begins.
    index_end = footer_offset - CRC32_LEN - BACKWARD_SIZE_LEN
    if real_backward_size > index_end:
        raise InvalidInputFormat("Invalid backward size.")

    # The cursor sits at footer_offset after the read above: step back over
    # the footer's CRC32 and Backward Size, then over the whole Index.
    file.seek(-(CRC32_LEN + BACKWARD_SIZE_LEN + real_backward_size), io.SEEK_CUR)

    # index indicator must be 0, per xz standard
    if convert_int8(file.read(1), Endian.LITTLE) != 0:
        raise InvalidInputFormat("Invalid index indicator")

    # Index 'Number of Records'
    count_len, record_count = read_multibyte_int(file)
    index_len = 1 + count_len  # indicator byte + record count field

    # Every Record holds 'Unpadded Size' followed by 'Uncompressed Size'
    total_blocks_len = 0
    for _ in range(record_count):
        unpadded_field_len, unpadded_size = read_multibyte_int(file)
        uncompressed_field_len, _uncompressed_size = read_multibyte_int(file)
        index_len += unpadded_field_len + uncompressed_field_len
        total_blocks_len += round_up(unpadded_size, XZ_PADDING)

    index_len += CRC32_LEN

    # Index padding is not added explicitly: every other term is a multiple
    # of XZ_PADDING, so rounding the total up accounts for it.
    stream_len = STREAM_HEADER_LEN + total_blocks_len + index_len + STREAM_FOOTER_LEN
    return round_up(stream_len, XZ_PADDING)
133
134
def _hyperscan_match(
    context: XZSearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Scan callback: validate a candidate stream footer found at *offset*.

    Returns Scan.Continue to keep scanning, either because the candidate was
    rejected or because another stream follows, and Scan.Terminate once the
    last stream has been found. Updates ``context.end_streams_offset`` and,
    when another stream follows, ``context.start_offset``.
    """
    del pattern_id, end  # unused arguments

    footer_end = offset + FLAG_LEN + EOS_MAGIC_LEN

    # a match located before the current stream start cannot be its footer
    if footer_end < context.start_offset:
        return Scan.Continue

    try:
        expected_size = get_stream_size(offset, context.file)
    except InvalidInputFormat:
        return Scan.Continue

    # the size derived from the index must cover exactly start..footer_end
    if footer_end - context.start_offset != expected_size:
        return Scan.Continue

    # stream padding MUST contain only null bytes and be 4 bytes aligned
    # NOTE(review): read_until_past presumably returns the offset of the first
    # byte that is not 0x00 -- confirm against file_utils.
    context.file.seek(footer_end)
    padding_end = read_until_past(context.file, b"\x00")
    if (padding_end - footer_end) % 4:
        # misaligned padding: record the stream end without the padding
        context.end_streams_offset = footer_end
        return Scan.Continue

    context.end_streams_offset = padding_end

    # is the padding followed by the magic of another stream?
    context.file.seek(padding_end, io.SEEK_SET)
    following_magic = context.file.read(len(STREAM_START_MAGIC))
    if following_magic != STREAM_START_MAGIC:
        return Scan.Terminate

    # another stream follows: validate it as part of the same chunk
    context.start_offset = padding_end
    return Scan.Continue
170
171
class XZHandler(Handler):
    """Handler that locates XZ streams and extracts them with 7z."""

    NAME = "xz"

    PATTERNS = [HexString("FD 37 7A 58 5A 00")]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="xz.uncompressed")

    DOC = HandlerDoc(
        name="XZ",
        description="XZ is a compressed file format that uses the LZMA2 algorithm for high compression efficiency. It is designed for general-purpose data compression with support for integrity checks and padding for alignment.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="XZ File Format Specification",
                url="https://tukaani.org/xz/xz-file-format-1.0.4.txt",
            ),
            Reference(
                title="XZ Wikipedia",
                url="https://en.wikipedia.org/wiki/XZ_Utils",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Return the chunk covering the XZ stream(s) starting at *start_offset*.

        Raises InvalidInputFormat when the header carries an unknown stream
        flag or when no valid stream end is found.
        """
        # the stream flags sit right after the start magic
        flags_offset = start_offset + len(STREAM_START_MAGIC)
        file.seek(flags_offset, io.SEEK_SET)
        stream_flag = convert_int16(file.read(FLAG_LEN), Endian.BIG)
        if stream_flag not in VALID_FLAGS:
            raise InvalidInputFormat("Invalid stream flag for xz stream.")

        # end_streams_offset stays at -1 unless the scan callback accepts a footer
        context = XZSearchContext(
            file=file,
            start_offset=start_offset,
            stream_flag=stream_flag,
            end_streams_offset=-1,
        )

        # best effort: a failing scan is only logged, the outcome is judged
        # from whatever the callback recorded in the context
        try:
            scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match)  # type: ignore
            stream_scan(scanner, file)
        except Exception as e:
            logger.debug("Error scanning for xz patterns", error=e)

        if context.end_streams_offset <= 0:
            raise InvalidInputFormat("No valid xz compression stream was detected")

        return ValidChunk(
            start_offset=start_offset, end_offset=context.end_streams_offset
        )