1import io
2from typing import Optional
3
4import attrs
5from pyperscan import Flag, Pattern, Scan, StreamDatabase
6from structlog import get_logger
7
8from unblob.extractors import Command
9
10from ...file_utils import (
11 Endian,
12 convert_int8,
13 convert_int16,
14 convert_int32,
15 decode_multibyte_integer,
16 read_until_past,
17 round_up,
18 stream_scan,
19)
20from ...models import (
21 File,
22 Handler,
23 HandlerDoc,
24 HandlerType,
25 HexString,
26 InvalidInputFormat,
27 Reference,
28 ValidChunk,
29)
30
31logger = get_logger()
32
33# The .xz file format definition: https://tukaani.org/xz/xz-file-format-1.0.4.txt
34
# Magic bytes that open every stream header (same bytes as XZHandler.PATTERNS).
STREAM_START_MAGIC = b"\xfd\x37\x7a\x58\x5a\x00"

# Candidate endings of a stream footer: the two Stream Flags bytes followed by
# the two footer magic bytes 59 5A ("YZ"). One pattern per accepted flag value;
# the trailing comment names the integrity check each flag stands for.
STREAM_END_MAGIC_PATTERNS = [
    HexString("00 00 59 5A"),  # None
    HexString("00 01 59 5A"),  # CRC32
    HexString("00 04 59 5A"),  # CRC64
    HexString("00 0A 59 5A"),  # SHA-256
]

# Stream Flags values as obtained by reading the two flag bytes as a
# big-endian 16-bit integer (see XZHandler.calculate_chunk).
NONE_STREAM_FLAG = 0x0
CRC32_STREAM_FLAG = 0x1
CRC64_STREAM_FLAG = 0x4
SHA256_STREAM_FLAG = 0xA
# Any other flag value in a stream header is rejected.
VALID_FLAGS = [
    NONE_STREAM_FLAG,
    CRC32_STREAM_FLAG,
    CRC64_STREAM_FLAG,
    SHA256_STREAM_FLAG,
]
BACKWARD_SIZE_LEN = 4  # size of the footer's Backward Size field
MAX_MBI_LEN = 9  # maximum multi-byte integer size is 9, per XZ standard
XZ_PADDING = 4  # XZ format byte alignment
FLAG_LEN = 2  # size of the Stream Flags field
EOS_MAGIC_LEN = 2  # size of the footer magic ("YZ")
CRC32_LEN = 4
# Fixed sizes of the stream header (magic + flags + CRC32) and of the stream
# footer (CRC32 + Backward Size + flags + footer magic).
STREAM_HEADER_LEN = len(STREAM_START_MAGIC) + FLAG_LEN + CRC32_LEN
STREAM_FOOTER_LEN = CRC32_LEN + BACKWARD_SIZE_LEN + FLAG_LEN + EOS_MAGIC_LEN
62
63
def build_stream_end_scan_db(pattern_list):
    """Build a streaming scan database from *pattern_list*.

    Every entry of *pattern_list* must expose ``as_regex()``; each one becomes
    a ``Pattern`` created with the ``SOM_LEFTMOST`` and ``DOTALL`` flags, and
    all patterns are handed to a single ``StreamDatabase``.
    """
    compiled_patterns = [
        Pattern(hex_pattern.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_pattern in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)
68
69
# Module-level database of the stream footer patterns: built once at import
# time and reused by XZHandler.calculate_chunk for every scan.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)
71
72
@attrs.define
class XZSearchContext:
    """State shared between XZHandler.calculate_chunk and the scan callback."""

    # Offset of the stream header currently being validated; the callback
    # moves it forward when another stream directly follows.
    start_offset: int
    # File being scanned; the callback seeks and reads in it.
    file: File
    # End offset of the stream(s) validated so far; initialised to -1 by
    # calculate_chunk, meaning that nothing has been validated yet.
    end_streams_offset: int
    # Stream Flags value read from a stream header.
    stream_flag: int
79
80
def read_multibyte_int(file: File) -> tuple[int, int]:
    """Decode the multibyte integer located at the current position of *file*.

    Up to ``MAX_MBI_LEN`` bytes are peeked and handed to
    ``decode_multibyte_integer``. On success the file position is left right
    after the encoded integer; if decoding raises, the position is back where
    it was before the call.

    Returns a ``(size, value)`` tuple: the number of bytes the encoding
    occupies, then the decoded integer.
    """
    data = bytearray(file.read(MAX_MBI_LEN))
    # Rewind by what was actually read, not by MAX_MBI_LEN: close to the end
    # of the file read() returns fewer bytes, and rewinding a fixed amount
    # would land before the integer (or attempt to seek to a negative offset).
    file.seek(-len(data), io.SEEK_CUR)
    size, mbi = decode_multibyte_integer(data)
    file.seek(size, io.SEEK_CUR)
    return size, mbi
88
89
def get_stream_size(footer_offset: int, file: File) -> int:
    """Compute the total size of the stream whose footer candidate is given.

    *footer_offset* is the offset of the footer's Stream Flags field, i.e. the
    position right after the Backward Size field. The Backward Size locates
    the Index, whose records give the size of every block.

    Returns the size of header + blocks + index + footer, rounded up to
    ``XZ_PADDING``.

    Raises ``InvalidInputFormat`` when the Backward Size points before the
    start of the file or when the Index Indicator byte is not zero.
    """
    # Backward Size is stored right before the Stream Flags.
    file.seek(footer_offset - BACKWARD_SIZE_LEN, io.SEEK_SET)
    stored_value = convert_int32(file.read(BACKWARD_SIZE_LEN), Endian.LITTLE)
    backward_size = 4 * (stored_value + 1)

    # CRC32 and Backward Size sit between the end of the Index and the flags.
    footer_prefix_len = CRC32_LEN + BACKWARD_SIZE_LEN
    if footer_offset - footer_prefix_len < backward_size:
        raise InvalidInputFormat("Invalid backward size.")

    # Step back to the end of the Index, then to its first byte.
    file.seek(-footer_prefix_len, io.SEEK_CUR)
    file.seek(-backward_size, io.SEEK_CUR)

    # index indicator must be 0, per xz standard
    if convert_int8(file.read(1), Endian.LITTLE) != 0:
        raise InvalidInputFormat("Invalid index indicator")

    # Index Indicator byte plus the 'Number of Records' field.
    count_len, record_count = read_multibyte_int(file)
    index_len = 1 + count_len

    # Each Record holds 'Unpadded Size' then 'Uncompressed Size'; only the
    # former contributes to the on-disk size of the blocks.
    padded_blocks_len = 0
    for _ in range(record_count):
        unpadded_field_len, unpadded_size = read_multibyte_int(file)
        uncompressed_field_len, _uncompressed_size = read_multibyte_int(file)
        index_len += unpadded_field_len + uncompressed_field_len
        padded_blocks_len += round_up(unpadded_size, XZ_PADDING)

    index_len += CRC32_LEN

    total_len = STREAM_HEADER_LEN + padded_blocks_len + index_len + STREAM_FOOTER_LEN
    return round_up(total_len, XZ_PADDING)
134
135
def _hyperscan_match(
    context: XZSearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Validate one stream footer candidate reported by the scanner.

    *offset* is the start of the matched footer pattern (its Stream Flags
    field). The candidate is accepted when its Stream Flags equal the ones of
    the stream header and when the stream size derived from its Index equals
    the distance between ``context.start_offset`` and the end of the footer.

    On acceptance ``context.end_streams_offset`` is updated. If, after valid
    stream padding, another stream header follows, ``context.start_offset``
    and ``context.stream_flag`` are moved to that stream.

    Returns ``Scan.Terminate`` when a stream was accepted and no other stream
    follows it, ``Scan.Continue`` in every other case.
    """
    del pattern_id, end  # unused arguments
    end_offset = offset + FLAG_LEN + EOS_MAGIC_LEN
    # if we matched before our start offset, continue looking
    if end_offset < context.start_offset:
        return Scan.Continue

    # The footer's Stream Flags must be a copy of the header's Stream Flags,
    # per xz standard; reject cheap mismatches before parsing the Index.
    context.file.seek(offset, io.SEEK_SET)
    footer_flag = convert_int16(context.file.read(FLAG_LEN), Endian.BIG)
    if footer_flag != context.stream_flag:
        return Scan.Continue

    try:
        stream_size = get_stream_size(offset, context.file)
    except InvalidInputFormat:
        return Scan.Continue

    # stream_size does not match, we continue our search
    if stream_size != (end_offset - context.start_offset):
        return Scan.Continue

    # stream padding validation
    # padding MUST contain only null bytes and be 4 bytes aligned
    context.file.seek(end_offset)
    end_padding_offset = read_until_past(context.file, b"\x00")
    padding_size = end_padding_offset - end_offset
    if padding_size % 4 != 0:
        # the null bytes are not stream padding: the stream ends at its footer
        context.end_streams_offset = end_offset
        return Scan.Continue

    # next magic validation
    context.end_streams_offset = end_padding_offset
    context.file.seek(end_padding_offset, io.SEEK_SET)
    magic = context.file.read(len(STREAM_START_MAGIC))
    if magic == STREAM_START_MAGIC:
        # A concatenated stream follows: validate its footer next. It may use
        # a different check type, so refresh the expected Stream Flags.
        context.start_offset = end_padding_offset
        next_flag_bytes = context.file.read(FLAG_LEN)
        if len(next_flag_bytes) == FLAG_LEN:
            context.stream_flag = convert_int16(next_flag_bytes, Endian.BIG)
        return Scan.Continue
    return Scan.Terminate
171
172
class XZHandler(Handler):
    """Handler locating XZ streams, including directly concatenated ones."""

    NAME = "xz"

    # Same bytes as STREAM_START_MAGIC.
    PATTERNS = [HexString("FD 37 7A 58 5A 00")]

    # Decompress to stdout with 7z and store the output as "xz.uncompressed".
    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="xz.uncompressed")

    DOC = HandlerDoc(
        name="XZ",
        description="XZ is a compressed file format that uses the LZMA2 algorithm for high compression efficiency. It is designed for general-purpose data compression with support for integrity checks and padding for alignment.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="XZ File Format Specification",
                url="https://tukaani.org/xz/xz-file-format-1.0.4.txt",
            ),
            Reference(
                title="XZ Wikipedia",
                url="https://en.wikipedia.org/wiki/XZ_Utils",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Locate the end of the XZ stream(s) starting at *start_offset*.

        The Stream Flags of the header are checked first, then the file is
        scanned for stream footers, each candidate being validated by
        ``_hyperscan_match``. Errors raised while scanning are only logged, so
        whatever was validated before the error is still returned.

        Returns a ``ValidChunk`` spanning from *start_offset* to the end of
        the last validated stream.

        Raises ``InvalidInputFormat`` when the header flags are not supported
        or when no stream could be validated.
        """
        flags_offset = start_offset + len(STREAM_START_MAGIC)
        file.seek(flags_offset, io.SEEK_SET)
        header_flag = convert_int16(file.read(2), Endian.BIG)
        if header_flag not in VALID_FLAGS:
            raise InvalidInputFormat("Invalid stream flag for xz stream.")

        search = XZSearchContext(
            start_offset=start_offset,
            file=file,
            end_streams_offset=-1,
            stream_flag=header_flag,
        )

        try:
            footer_scanner = hyperscan_stream_end_magic_db.build(search, _hyperscan_match)  # type: ignore
            stream_scan(footer_scanner, file)
        except Exception as e:
            logger.debug(
                "Error scanning for xz patterns",
                error=e,
            )

        # The callback never validated a stream end.
        if search.end_streams_offset <= 0:
            raise InvalidInputFormat("No valid xz compression stream was detected")

        return ValidChunk(
            start_offset=start_offset, end_offset=search.end_streams_offset
        )