Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/qnap/qnap_nas.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1import io 

2from pathlib import Path 

3from typing import Optional 

4 

5import attrs 

6from pyperscan import Flag, Pattern, Scan, StreamDatabase 

7from structlog import get_logger 

8 

9from unblob.file_utils import File, iterate_file, stream_scan 

10from unblob.models import ( 

11 Endian, 

12 Extractor, 

13 Handler, 

14 HandlerDoc, 

15 HandlerType, 

16 HexString, 

17 StructParser, 

18 ValidChunk, 

19) 

20 

21logger = get_logger() 

22 

23FOOTER_LEN = 74 

24SECRET = "QNAPNASVERSION" # noqa: S105 

25 

26C_DEFINITIONS = """ 

27 typedef struct qnap_header { 

28 char magic[6]; 

29 uint32 encrypted_len; 

30 char device_id[16]; 

31 char file_version[16]; 

32 char firmware_date[16]; 

33 char revision[16]; 

34 } qnap_header_t; 

35""" 

36FOOTER_PATTERN = [ 

37 HexString("69 63 70 6e 61 73"), # encrypted gzip stream start bytes 

38] 

39 

40 

41@attrs.define 

42class QTSSearchContext: 

43 start_offset: int 

44 file: File 

45 end_offset: int 

46 

47 

48def is_valid_header(header) -> bool: 

49 try: 

50 header.device_id.decode("utf-8") 

51 header.file_version.decode("utf-8") 

52 header.firmware_date.decode("utf-8") 

53 header.revision.decode("utf-8") 

54 except UnicodeDecodeError: 

55 return False 

56 return True 

57 

58 

59def _hyperscan_match( 

60 context: QTSSearchContext, pattern_id: int, offset: int, end: int 

61) -> Scan: 

62 del pattern_id, end # unused arguments 

63 if offset < context.start_offset: 

64 return Scan.Continue 

65 context.file.seek(offset, io.SEEK_SET) 

66 struct_parser = StructParser(C_DEFINITIONS) 

67 header = struct_parser.parse("qnap_header_t", context.file, Endian.LITTLE) 

68 logger.debug("qnap_header_t", header=header) 

69 

70 if is_valid_header(header): 

71 context.end_offset = context.file.tell() 

72 return Scan.Terminate 

73 return Scan.Continue 

74 

75 

76def build_stream_end_scan_db(pattern_list): 

77 return StreamDatabase( 

78 *(Pattern(p.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL) for p in pattern_list) 

79 ) 

80 

81 

82hyperscan_stream_end_magic_db = build_stream_end_scan_db(FOOTER_PATTERN) 

83 

84 

85class QnapExtractor(Extractor): 

86 def __init__(self): 

87 self._struct_parser = StructParser(C_DEFINITIONS) 

88 

89 def extract(self, inpath: Path, outdir: Path): 

90 outpath = outdir.joinpath(f"{inpath.name}.decrypted") 

91 with File.from_path(inpath) as file: 

92 file.seek(-FOOTER_LEN, io.SEEK_END) 

93 header = self._struct_parser.parse("qnap_header_t", file, Endian.LITTLE) 

94 eof = file.tell() 

95 cryptor = Cryptor(SECRET + header.file_version.decode("utf-8")[0]) 

96 with outpath.open("wb") as outfile: 

97 for chunk in iterate_file(file, 0, header.encrypted_len, 1024): 

98 outfile.write(cryptor.decrypt_chunk(chunk)) 

99 for chunk in iterate_file( 

100 file, 

101 header.encrypted_len, 

102 eof - FOOTER_LEN - header.encrypted_len, 

103 1024, 

104 ): 

105 outfile.write(chunk) 

106 

107 

108class QnapHandler(Handler): 

109 NAME = "qnap_nas" 

110 

111 PATTERNS = [ 

112 HexString("F5 7B 47 03"), 

113 ] 

114 EXTRACTOR = QnapExtractor() 

115 

116 DOC = HandlerDoc( 

117 name="QNAP NAS", 

118 description="QNAP NAS firmware files consist of a custom header, encrypted data sections, and a footer marking the end of the encrypted stream. The header contains metadata such as device ID, firmware version, and encryption details.", 

119 handler_type=HandlerType.ARCHIVE, 

120 vendor="QNAP", 

121 references=[], 

122 limitations=[], 

123 ) 

124 

125 def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: 

126 context = QTSSearchContext(start_offset=start_offset, file=file, end_offset=-1) 

127 

128 try: 

129 scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match) # type: ignore 

130 stream_scan(scanner, file) 

131 except Exception as e: 

132 logger.debug( 

133 "Error scanning for QNAP patterns", 

134 error=e, 

135 ) 

136 if context.end_offset > 0: 

137 return ValidChunk(start_offset=start_offset, end_offset=context.end_offset) 

138 return None 

139 

140 

141# https://gist.github.com/ulidtko/966277a465f1856109b2d2674dcee741#file-qnap-qts-fw-cryptor-py-L114 

142class Cryptor: 

143 def __init__(self, secret): 

144 self.secret = list(bytes(secret, "ascii")) 

145 self.n = len(secret) // 2 

146 if self.n % 2 == 0: 

147 self.secret.append(0) 

148 self.precompute_k() 

149 self.acc = 0 

150 self.y = 0 

151 self.z = 0 

152 

153 def scan(self, f, xs, s0): 

154 s = s0 

155 for x in xs: 

156 w, s = f(s, x) 

157 yield w 

158 

159 def promote(self, char): 

160 return char if char < 0x80 else char - 0x101 

161 

162 def precompute_k(self): 

163 self.k = {acc: self.table_for_acc(acc) for acc in range(256)} 

164 

165 def table_for_acc(self, a): 

166 ks = [ 

167 0xFFFFFFFF 

168 & ( 

169 (self.promote(self.secret[2 * i] ^ a) << 8) 

170 + (self.secret[2 * i + 1] ^ a) 

171 ) 

172 for i in range(self.n) 

173 ] 

174 

175 def kstep(st, q): 

176 x = st ^ q 

177 y = self.lcg(x) 

178 z = 0xFFFF & (0x15A * x) 

179 return (z, y), y 

180 

181 return list(self.scan(kstep, ks, 0)) 

182 

183 def lcg(self, x): 

184 return 0xFFFF & (0x4E35 * x + 1) 

185 

186 def kdf(self): 

187 """self.secret -> 8bit hash (+ state effects).""" 

188 tt = self.k[self.acc] 

189 res = 0 

190 for i in range(self.n): 

191 yy = self.y 

192 self.y, t2 = tt[i] 

193 self.z = 0xFFFF & (self.y + yy + 0x4E35 * (self.z + i)) 

194 res = res ^ t2 ^ self.z 

195 hi, lo = res >> 8, res & 0xFF 

196 return hi ^ lo 

197 

198 def decrypt_byte(self, v): 

199 k = self.kdf() 

200 r = 0xFF & (v ^ k) 

201 self.acc = self.acc ^ r 

202 return r 

203 

204 def decrypt_chunk(self, chunk): 

205 return bytes(map(self.decrypt_byte, chunk))