1import io
2from pathlib import Path
3from typing import Optional
4
5import attrs
6from pyperscan import Flag, Pattern, Scan, StreamDatabase
7from structlog import get_logger
8
9from unblob.file_utils import File, iterate_file, stream_scan
10from unblob.models import (
11 Endian,
12 Extractor,
13 Handler,
14 HandlerDoc,
15 HandlerType,
16 HexString,
17 StructParser,
18 ValidChunk,
19)
20
21logger = get_logger()
22
23FOOTER_LEN = 74
24SECRET = "QNAPNASVERSION" # noqa: S105
25
26C_DEFINITIONS = """
27 typedef struct qnap_header {
28 char magic[6];
29 uint32 encrypted_len;
30 char device_id[16];
31 char file_version[16];
32 char firmware_date[16];
33 char revision[16];
34 } qnap_header_t;
35"""
36FOOTER_PATTERN = [
37 HexString("69 63 70 6e 61 73"), # encrypted gzip stream start bytes
38]
39
40
41@attrs.define
42class QTSSearchContext:
43 start_offset: int
44 file: File
45 end_offset: int
46
47
48def is_valid_header(header) -> bool:
49 try:
50 header.device_id.decode("utf-8")
51 header.file_version.decode("utf-8")
52 header.firmware_date.decode("utf-8")
53 header.revision.decode("utf-8")
54 except UnicodeDecodeError:
55 return False
56 return True
57
58
59def _hyperscan_match(
60 context: QTSSearchContext, pattern_id: int, offset: int, end: int
61) -> Scan:
62 del pattern_id, end # unused arguments
63 if offset < context.start_offset:
64 return Scan.Continue
65 context.file.seek(offset, io.SEEK_SET)
66 struct_parser = StructParser(C_DEFINITIONS)
67 header = struct_parser.parse("qnap_header_t", context.file, Endian.LITTLE)
68 logger.debug("qnap_header_t", header=header)
69
70 if is_valid_header(header):
71 context.end_offset = context.file.tell()
72 return Scan.Terminate
73 return Scan.Continue
74
75
76def build_stream_end_scan_db(pattern_list):
77 return StreamDatabase(
78 *(Pattern(p.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL) for p in pattern_list)
79 )
80
81
82hyperscan_stream_end_magic_db = build_stream_end_scan_db(FOOTER_PATTERN)
83
84
85class QnapExtractor(Extractor):
86 def __init__(self):
87 self._struct_parser = StructParser(C_DEFINITIONS)
88
89 def extract(self, inpath: Path, outdir: Path):
90 outpath = outdir.joinpath(f"{inpath.name}.decrypted")
91 with File.from_path(inpath) as file:
92 file.seek(-FOOTER_LEN, io.SEEK_END)
93 header = self._struct_parser.parse("qnap_header_t", file, Endian.LITTLE)
94 eof = file.tell()
95 cryptor = Cryptor(SECRET + header.file_version.decode("utf-8")[0])
96 with outpath.open("wb") as outfile:
97 for chunk in iterate_file(file, 0, header.encrypted_len, 1024):
98 outfile.write(cryptor.decrypt_chunk(chunk))
99 for chunk in iterate_file(
100 file,
101 header.encrypted_len,
102 eof - FOOTER_LEN - header.encrypted_len,
103 1024,
104 ):
105 outfile.write(chunk)
106
107
108class QnapHandler(Handler):
109 NAME = "qnap_nas"
110
111 PATTERNS = [
112 HexString("F5 7B 47 03"),
113 ]
114 EXTRACTOR = QnapExtractor()
115
116 DOC = HandlerDoc(
117 name="QNAP NAS",
118 description="QNAP NAS firmware files consist of a custom header, encrypted data sections, and a footer marking the end of the encrypted stream. The header contains metadata such as device ID, firmware version, and encryption details.",
119 handler_type=HandlerType.ARCHIVE,
120 vendor="QNAP",
121 references=[],
122 limitations=[],
123 )
124
125 def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
126 context = QTSSearchContext(start_offset=start_offset, file=file, end_offset=-1)
127
128 try:
129 scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match) # type: ignore
130 stream_scan(scanner, file)
131 except Exception as e:
132 logger.debug(
133 "Error scanning for QNAP patterns",
134 error=e,
135 )
136 if context.end_offset > 0:
137 return ValidChunk(start_offset=start_offset, end_offset=context.end_offset)
138 return None
139
140
141# https://gist.github.com/ulidtko/966277a465f1856109b2d2674dcee741#file-qnap-qts-fw-cryptor-py-L114
142class Cryptor:
143 def __init__(self, secret):
144 self.secret = list(bytes(secret, "ascii"))
145 self.n = len(secret) // 2
146 if self.n % 2 == 0:
147 self.secret.append(0)
148 self.precompute_k()
149 self.acc = 0
150 self.y = 0
151 self.z = 0
152
153 def scan(self, f, xs, s0):
154 s = s0
155 for x in xs:
156 w, s = f(s, x)
157 yield w
158
159 def promote(self, char):
160 return char if char < 0x80 else char - 0x101
161
162 def precompute_k(self):
163 self.k = {acc: self.table_for_acc(acc) for acc in range(256)}
164
165 def table_for_acc(self, a):
166 ks = [
167 0xFFFFFFFF
168 & (
169 (self.promote(self.secret[2 * i] ^ a) << 8)
170 + (self.secret[2 * i + 1] ^ a)
171 )
172 for i in range(self.n)
173 ]
174
175 def kstep(st, q):
176 x = st ^ q
177 y = self.lcg(x)
178 z = 0xFFFF & (0x15A * x)
179 return (z, y), y
180
181 return list(self.scan(kstep, ks, 0))
182
183 def lcg(self, x):
184 return 0xFFFF & (0x4E35 * x + 1)
185
186 def kdf(self):
187 """self.secret -> 8bit hash (+ state effects)."""
188 tt = self.k[self.acc]
189 res = 0
190 for i in range(self.n):
191 yy = self.y
192 self.y, t2 = tt[i]
193 self.z = 0xFFFF & (self.y + yy + 0x4E35 * (self.z + i))
194 res = res ^ t2 ^ self.z
195 hi, lo = res >> 8, res & 0xFF
196 return hi ^ lo
197
198 def decrypt_byte(self, v):
199 k = self.kdf()
200 r = 0xFF & (v ^ k)
201 self.acc = self.acc ^ r
202 return r
203
204 def decrypt_chunk(self, chunk):
205 return bytes(map(self.decrypt_byte, chunk))