1import io
2from pathlib import Path
3
4import attrs
5from pyperscan import Flag, Pattern, Scan, StreamDatabase
6from structlog import get_logger
7
8from unblob.file_utils import File, iterate_file, stream_scan
9from unblob.models import (
10 Endian,
11 Extractor,
12 Handler,
13 HandlerDoc,
14 HandlerType,
15 HexString,
16 StructParser,
17 ValidChunk,
18)
19
20logger = get_logger()
21
22FOOTER_LEN = 74
23SECRET = "QNAPNASVERSION" # noqa: S105
24
25C_DEFINITIONS = """
26 typedef struct qnap_header {
27 char magic[6];
28 uint32 encrypted_len;
29 char device_id[16];
30 char file_version[16];
31 char firmware_date[16];
32 char revision[16];
33 } qnap_header_t;
34"""
35FOOTER_PATTERN = [
36 HexString("69 63 70 6e 61 73"), # encrypted gzip stream start bytes
37]
38
39
40@attrs.define
41class QTSSearchContext:
42 start_offset: int
43 file: File
44 end_offset: int
45
46
47def is_valid_header(header) -> bool:
48 try:
49 header.device_id.decode("utf-8")
50 header.file_version.decode("utf-8")
51 header.firmware_date.decode("utf-8")
52 header.revision.decode("utf-8")
53 except UnicodeDecodeError:
54 return False
55 return True
56
57
58def _hyperscan_match(
59 context: QTSSearchContext, pattern_id: int, offset: int, end: int
60) -> Scan:
61 del pattern_id, end # unused arguments
62 if offset < context.start_offset:
63 return Scan.Continue
64 context.file.seek(offset, io.SEEK_SET)
65 struct_parser = StructParser(C_DEFINITIONS)
66 header = struct_parser.parse("qnap_header_t", context.file, Endian.LITTLE)
67 logger.debug("qnap_header_t", header=header)
68
69 if is_valid_header(header):
70 context.end_offset = context.file.tell()
71 return Scan.Terminate
72 return Scan.Continue
73
74
75def build_stream_end_scan_db(pattern_list):
76 return StreamDatabase(
77 *(Pattern(p.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL) for p in pattern_list)
78 )
79
80
81hyperscan_stream_end_magic_db = build_stream_end_scan_db(FOOTER_PATTERN)
82
83
84class QnapExtractor(Extractor):
85 def __init__(self):
86 self._struct_parser = StructParser(C_DEFINITIONS)
87
88 def extract(self, inpath: Path, outdir: Path):
89 outpath = outdir.joinpath(f"{inpath.name}.decrypted")
90 with File.from_path(inpath) as file:
91 file.seek(-FOOTER_LEN, io.SEEK_END)
92 header = self._struct_parser.parse("qnap_header_t", file, Endian.LITTLE)
93 eof = file.tell()
94 cryptor = Cryptor(SECRET + header.file_version.decode("utf-8")[0])
95 with outpath.open("wb") as outfile:
96 for chunk in iterate_file(file, 0, header.encrypted_len, 1024):
97 outfile.write(cryptor.decrypt_chunk(chunk))
98 for chunk in iterate_file(
99 file,
100 header.encrypted_len,
101 eof - FOOTER_LEN - header.encrypted_len,
102 1024,
103 ):
104 outfile.write(chunk)
105
106
107class QnapHandler(Handler):
108 NAME = "qnap_nas"
109
110 PATTERNS = [
111 HexString("F5 7B 47 03"),
112 ]
113 EXTRACTOR = QnapExtractor()
114
115 DOC = HandlerDoc(
116 name="QNAP NAS",
117 description="QNAP NAS firmware files consist of a custom header, encrypted data sections, and a footer marking the end of the encrypted stream. The header contains metadata such as device ID, firmware version, and encryption details.",
118 handler_type=HandlerType.ARCHIVE,
119 vendor="QNAP",
120 references=[],
121 limitations=[],
122 )
123
124 def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
125 context = QTSSearchContext(start_offset=start_offset, file=file, end_offset=-1)
126
127 try:
128 scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match) # type: ignore
129 stream_scan(scanner, file)
130 except Exception as e:
131 logger.debug(
132 "Error scanning for QNAP patterns",
133 error=e,
134 )
135 if context.end_offset > 0:
136 return ValidChunk(start_offset=start_offset, end_offset=context.end_offset)
137 return None
138
139
140# https://gist.github.com/ulidtko/966277a465f1856109b2d2674dcee741#file-qnap-qts-fw-cryptor-py-L114
141class Cryptor:
142 def __init__(self, secret):
143 self.secret = list(bytes(secret, "ascii"))
144 self.n = len(secret) // 2
145 if self.n % 2 == 0:
146 self.secret.append(0)
147 self.precompute_k()
148 self.acc = 0
149 self.y = 0
150 self.z = 0
151
152 def scan(self, f, xs, s0):
153 s = s0
154 for x in xs:
155 w, s = f(s, x)
156 yield w
157
158 def promote(self, char):
159 return char if char < 0x80 else char - 0x101
160
161 def precompute_k(self):
162 self.k = {acc: self.table_for_acc(acc) for acc in range(256)}
163
164 def table_for_acc(self, a):
165 ks = [
166 0xFFFFFFFF
167 & (
168 (self.promote(self.secret[2 * i] ^ a) << 8)
169 + (self.secret[2 * i + 1] ^ a)
170 )
171 for i in range(self.n)
172 ]
173
174 def kstep(st, q):
175 x = st ^ q
176 y = self.lcg(x)
177 z = 0xFFFF & (0x15A * x)
178 return (z, y), y
179
180 return list(self.scan(kstep, ks, 0))
181
182 def lcg(self, x):
183 return 0xFFFF & (0x4E35 * x + 1)
184
185 def kdf(self):
186 """self.secret -> 8bit hash (+ state effects)."""
187 tt = self.k[self.acc]
188 res = 0
189 for i in range(self.n):
190 yy = self.y
191 self.y, t2 = tt[i]
192 self.z = 0xFFFF & (self.y + yy + 0x4E35 * (self.z + i))
193 res = res ^ t2 ^ self.z
194 hi, lo = res >> 8, res & 0xFF
195 return hi ^ lo
196
197 def decrypt_byte(self, v):
198 k = self.kdf()
199 r = 0xFF & (v ^ k)
200 self.acc = self.acc ^ r
201 return r
202
203 def decrypt_chunk(self, chunk):
204 return bytes(map(self.decrypt_byte, chunk))