Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/qnap/qnap_nas.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1import io 

2from pathlib import Path 

3 

4import attrs 

5from pyperscan import Flag, Pattern, Scan, StreamDatabase 

6from structlog import get_logger 

7 

8from unblob.file_utils import File, iterate_file, stream_scan 

9from unblob.models import ( 

10 Endian, 

11 Extractor, 

12 Handler, 

13 HandlerDoc, 

14 HandlerType, 

15 HexString, 

16 StructParser, 

17 ValidChunk, 

18) 

19 

20logger = get_logger() 

21 

22FOOTER_LEN = 74 

23SECRET = "QNAPNASVERSION" # noqa: S105 

24 

25C_DEFINITIONS = """ 

26 typedef struct qnap_header { 

27 char magic[6]; 

28 uint32 encrypted_len; 

29 char device_id[16]; 

30 char file_version[16]; 

31 char firmware_date[16]; 

32 char revision[16]; 

33 } qnap_header_t; 

34""" 

35FOOTER_PATTERN = [ 

36 HexString("69 63 70 6e 61 73"), # encrypted gzip stream start bytes 

37] 

38 

39 

40@attrs.define 

41class QTSSearchContext: 

42 start_offset: int 

43 file: File 

44 end_offset: int 

45 

46 

47def is_valid_header(header) -> bool: 

48 try: 

49 header.device_id.decode("utf-8") 

50 header.file_version.decode("utf-8") 

51 header.firmware_date.decode("utf-8") 

52 header.revision.decode("utf-8") 

53 except UnicodeDecodeError: 

54 return False 

55 return True 

56 

57 

58def _hyperscan_match( 

59 context: QTSSearchContext, pattern_id: int, offset: int, end: int 

60) -> Scan: 

61 del pattern_id, end # unused arguments 

62 if offset < context.start_offset: 

63 return Scan.Continue 

64 context.file.seek(offset, io.SEEK_SET) 

65 struct_parser = StructParser(C_DEFINITIONS) 

66 header = struct_parser.parse("qnap_header_t", context.file, Endian.LITTLE) 

67 logger.debug("qnap_header_t", header=header) 

68 

69 if is_valid_header(header): 

70 context.end_offset = context.file.tell() 

71 return Scan.Terminate 

72 return Scan.Continue 

73 

74 

75def build_stream_end_scan_db(pattern_list): 

76 return StreamDatabase( 

77 *(Pattern(p.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL) for p in pattern_list) 

78 ) 

79 

80 

81hyperscan_stream_end_magic_db = build_stream_end_scan_db(FOOTER_PATTERN) 

82 

83 

84class QnapExtractor(Extractor): 

85 def __init__(self): 

86 self._struct_parser = StructParser(C_DEFINITIONS) 

87 

88 def extract(self, inpath: Path, outdir: Path): 

89 outpath = outdir.joinpath(f"{inpath.name}.decrypted") 

90 with File.from_path(inpath) as file: 

91 file.seek(-FOOTER_LEN, io.SEEK_END) 

92 header = self._struct_parser.parse("qnap_header_t", file, Endian.LITTLE) 

93 eof = file.tell() 

94 cryptor = Cryptor(SECRET + header.file_version.decode("utf-8")[0]) 

95 with outpath.open("wb") as outfile: 

96 for chunk in iterate_file(file, 0, header.encrypted_len, 1024): 

97 outfile.write(cryptor.decrypt_chunk(chunk)) 

98 for chunk in iterate_file( 

99 file, 

100 header.encrypted_len, 

101 eof - FOOTER_LEN - header.encrypted_len, 

102 1024, 

103 ): 

104 outfile.write(chunk) 

105 

106 

107class QnapHandler(Handler): 

108 NAME = "qnap_nas" 

109 

110 PATTERNS = [ 

111 HexString("F5 7B 47 03"), 

112 ] 

113 EXTRACTOR = QnapExtractor() 

114 

115 DOC = HandlerDoc( 

116 name="QNAP NAS", 

117 description="QNAP NAS firmware files consist of a custom header, encrypted data sections, and a footer marking the end of the encrypted stream. The header contains metadata such as device ID, firmware version, and encryption details.", 

118 handler_type=HandlerType.ARCHIVE, 

119 vendor="QNAP", 

120 references=[], 

121 limitations=[], 

122 ) 

123 

124 def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None: 

125 context = QTSSearchContext(start_offset=start_offset, file=file, end_offset=-1) 

126 

127 try: 

128 scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match) # type: ignore 

129 stream_scan(scanner, file) 

130 except Exception as e: 

131 logger.debug( 

132 "Error scanning for QNAP patterns", 

133 error=e, 

134 ) 

135 if context.end_offset > 0: 

136 return ValidChunk(start_offset=start_offset, end_offset=context.end_offset) 

137 return None 

138 

139 

140# https://gist.github.com/ulidtko/966277a465f1856109b2d2674dcee741#file-qnap-qts-fw-cryptor-py-L114 

141class Cryptor: 

142 def __init__(self, secret): 

143 self.secret = list(bytes(secret, "ascii")) 

144 self.n = len(secret) // 2 

145 if self.n % 2 == 0: 

146 self.secret.append(0) 

147 self.precompute_k() 

148 self.acc = 0 

149 self.y = 0 

150 self.z = 0 

151 

152 def scan(self, f, xs, s0): 

153 s = s0 

154 for x in xs: 

155 w, s = f(s, x) 

156 yield w 

157 

158 def promote(self, char): 

159 return char if char < 0x80 else char - 0x101 

160 

161 def precompute_k(self): 

162 self.k = {acc: self.table_for_acc(acc) for acc in range(256)} 

163 

164 def table_for_acc(self, a): 

165 ks = [ 

166 0xFFFFFFFF 

167 & ( 

168 (self.promote(self.secret[2 * i] ^ a) << 8) 

169 + (self.secret[2 * i + 1] ^ a) 

170 ) 

171 for i in range(self.n) 

172 ] 

173 

174 def kstep(st, q): 

175 x = st ^ q 

176 y = self.lcg(x) 

177 z = 0xFFFF & (0x15A * x) 

178 return (z, y), y 

179 

180 return list(self.scan(kstep, ks, 0)) 

181 

182 def lcg(self, x): 

183 return 0xFFFF & (0x4E35 * x + 1) 

184 

185 def kdf(self): 

186 """self.secret -> 8bit hash (+ state effects).""" 

187 tt = self.k[self.acc] 

188 res = 0 

189 for i in range(self.n): 

190 yy = self.y 

191 self.y, t2 = tt[i] 

192 self.z = 0xFFFF & (self.y + yy + 0x4E35 * (self.z + i)) 

193 res = res ^ t2 ^ self.z 

194 hi, lo = res >> 8, res & 0xFF 

195 return hi ^ lo 

196 

197 def decrypt_byte(self, v): 

198 k = self.kdf() 

199 r = 0xFF & (v ^ k) 

200 self.acc = self.acc ^ r 

201 return r 

202 

203 def decrypt_chunk(self, chunk): 

204 return bytes(map(self.decrypt_byte, chunk))