1import io
2
3import attrs
4from pyperscan import Flag, Pattern, Scan, StreamDatabase
5from structlog import get_logger
6
7from unblob.file_utils import File, stream_scan
8from unblob.models import (
9 Endian,
10 Handler,
11 HandlerDoc,
12 HandlerType,
13 HexString,
14 StructParser,
15 ValidChunk,
16)
17
18from ._qnap import C_DEFINITIONS, QnapExtractor
19
# Module-level structured logger shared by every function in this file.
logger = get_logger()

# Fixed prefix of the secret string; QnapNasExtractor._get_secret appends the
# first character of the header's file_version to it.
SECRET = "QNAPNASVERSION"  # noqa: S105

# Byte sequences searched for when looking for the end of a chunk.
# The single entry below is the ASCII text "icpnas".
FOOTER_PATTERN = [
    HexString("69 63 70 6e 61 73"),  # encrypted gzip stream start bytes
]

# Parser for the C structure definitions imported from ._qnap; used in this
# file to parse "qnap_header_t".
_STRUCT_PARSER = StructParser(C_DEFINITIONS)
29
30
@attrs.define
class QTSSearchContext:
    """Mutable state handed to the Hyperscan match callback during a scan."""

    # Matches reported at offsets below this value are skipped by the callback.
    start_offset: int
    # File the callback seeks in and parses a candidate header from.
    file: File
    # Set by the callback to the file position just after a valid header;
    # calculate_chunk initialises it to -1 before scanning.
    end_offset: int
36
37
def is_valid_header(header) -> bool:
    """Tell whether the textual fields of a parsed header are valid UTF-8.

    The ``device_id``, ``file_version``, ``firmware_date`` and ``revision``
    attributes of *header* are decoded in that order.

    Returns:
        ``True`` when all four decode cleanly, ``False`` as soon as one of
        them raises ``UnicodeDecodeError``.
    """
    checked_fields = ("device_id", "file_version", "firmware_date", "revision")
    try:
        for field_name in checked_fields:
            getattr(header, field_name).decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True
47
48
def _hyperscan_match(
    context: QTSSearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Handle one pattern match reported during the stream scan.

    Matches located before ``context.start_offset`` are ignored. For any other
    match, a ``qnap_header_t`` structure is parsed (little-endian) from
    ``context.file`` at the match offset. When that structure passes
    ``is_valid_header``, the file position reached after parsing is stored in
    ``context.end_offset`` and the scan is stopped.

    Args:
        context: Search state shared with the caller; updated in place.
        pattern_id: Unused.
        offset: Offset at which the match was reported.
        end: Unused.

    Returns:
        ``Scan.Terminate`` once a valid header has been found, otherwise
        ``Scan.Continue``.
    """
    del pattern_id, end  # unused arguments

    if offset >= context.start_offset:
        source = context.file
        source.seek(offset, io.SEEK_SET)
        candidate = _STRUCT_PARSER.parse("qnap_header_t", source, Endian.LITTLE)
        logger.debug("qnap_header_t", header=candidate)

        if is_valid_header(candidate):
            # The position after the parsed structure marks the chunk end.
            context.end_offset = source.tell()
            return Scan.Terminate

    return Scan.Continue
63
64
def build_stream_end_scan_db(pattern_list):
    """Build a streaming Hyperscan database from *pattern_list*.

    Each entry is converted with its ``as_regex()`` method and wrapped in a
    ``Pattern`` using the ``SOM_LEFTMOST`` and ``DOTALL`` flags.

    Args:
        pattern_list: Iterable of objects exposing ``as_regex()``.

    Returns:
        A ``StreamDatabase`` holding one pattern per entry.
    """
    compiled_patterns = [
        Pattern(entry.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for entry in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)
69
70
# Database for FOOTER_PATTERN, built once when the module is imported;
# QnapHandler.calculate_chunk builds a scanner from it on every call.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(FOOTER_PATTERN)
72
73
class QnapNasExtractor(QnapExtractor):
    """``QnapExtractor`` variant whose secret depends on the file version."""

    def _get_secret(self, header) -> str:
        """Return ``SECRET`` followed by the first character of the version.

        Args:
            header: Parsed header whose ``file_version`` bytes are decoded as
                UTF-8.

        Returns:
            The module-level ``SECRET`` with the first decoded character of
            ``header.file_version`` appended.
        """
        version_text = header.file_version.decode("utf-8")
        leading_char = version_text[0]
        return SECRET + leading_char
77
78
class QnapHandler(Handler):
    """Handler for QNAP NAS firmware images.

    A chunk ends right after the first valid ``qnap_header_t`` structure
    found at a ``FOOTER_PATTERN`` match located at or after the chunk's start
    offset.
    """

    NAME = "qnap_nas"

    PATTERNS = [HexString("F5 7B 47 03")]

    EXTRACTOR = QnapNasExtractor()

    DOC = HandlerDoc(
        name="QNAP NAS",
        description="QNAP NAS firmware files consist of a custom header, encrypted data sections, and a footer marking the end of the encrypted stream. The header contains metadata such as device ID, firmware version, and encryption details.",
        handler_type=HandlerType.ARCHIVE,
        vendor="QNAP",
        references=[],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Locate the end of the chunk that begins at *start_offset*.

        The file is stream-scanned with the footer database;
        ``_hyperscan_match`` records the end offset in the search context
        when it finds a valid header. Any exception raised while scanning is
        logged at debug level and otherwise ignored.

        Args:
            file: File being analysed.
            start_offset: Offset at which the chunk starts.

        Returns:
            A ``ValidChunk`` spanning ``start_offset`` to the recorded end
            offset, or ``None`` when no positive end offset was recorded.
        """
        search = QTSSearchContext(start_offset=start_offset, file=file, end_offset=-1)

        try:
            footer_scanner = hyperscan_stream_end_magic_db.build(search, _hyperscan_match)  # type: ignore
            stream_scan(footer_scanner, file)
        except Exception as e:
            logger.debug("Error scanning for QNAP patterns", error=e)

        # end_offset keeps its initial -1 unless the callback found a header.
        if search.end_offset <= 0:
            return None
        return ValidChunk(start_offset=start_offset, end_offset=search.end_offset)