Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/qnap/qnap_nas.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1import io 

2 

3import attrs 

4from pyperscan import Flag, Pattern, Scan, StreamDatabase 

5from structlog import get_logger 

6 

7from unblob.file_utils import File, stream_scan 

8from unblob.models import ( 

9 Endian, 

10 Handler, 

11 HandlerDoc, 

12 HandlerType, 

13 HexString, 

14 StructParser, 

15 ValidChunk, 

16) 

17 

18from ._qnap import C_DEFINITIONS, QnapExtractor 

19 

logger = get_logger()

# Prefix of the secret string built by QnapNasExtractor._get_secret, which
# appends the first character of the header's file_version to it.
SECRET = "QNAPNASVERSION"  # noqa: S105

# The bytes spell ASCII "icpnas". Each match is treated as a candidate
# position for a qnap_header_t (see _hyperscan_match), whose end marks the end
# of the chunk.
FOOTER_PATTERN = [HexString("69 63 70 6e 61 73")]

# Parser for the C struct definitions shared with the sibling _qnap module.
_STRUCT_PARSER = StructParser(C_DEFINITIONS)

29 

30 

@attrs.define
class QTSSearchContext:
    """Mutable state handed to the footer-scan callback (_hyperscan_match)."""

    # Matches reported at offsets below this value are skipped by the callback.
    start_offset: int
    # File being scanned; the callback seeks in it and parses a header from it.
    file: File
    # Set by the callback to the file position right after a valid header.
    # calculate_chunk initialises it to -1 and only accepts values > 0.
    end_offset: int

36 

37 

def is_valid_header(header) -> bool:
    """Tell whether all textual fields of a parsed header decode as UTF-8.

    The fields ``device_id``, ``file_version``, ``firmware_date`` and
    ``revision`` are decoded in that order; the decoded text is discarded.

    Returns:
        ``True`` when every field decodes cleanly, ``False`` as soon as one
        of them raises ``UnicodeDecodeError``.
    """
    text_fields = ("device_id", "file_version", "firmware_date", "revision")
    try:
        for field_name in text_fields:
            getattr(header, field_name).decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True

47 

48 

def _hyperscan_match(
    context: QTSSearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Scanner callback: try to parse a QNAP header at a footer-pattern match.

    Args:
        context: Search state; ``end_offset`` is updated on success.
        pattern_id: Unused.
        offset: Offset reported for the match; the header is parsed from here.
        end: Unused.

    Returns:
        ``Scan.Terminate`` once a header passing ``is_valid_header`` has been
        parsed, otherwise ``Scan.Continue`` so that scanning goes on.
    """
    del pattern_id, end  # unused arguments

    # Ignore matches located before the start of the candidate chunk.
    if offset < context.start_offset:
        return Scan.Continue

    target = context.file
    target.seek(offset, io.SEEK_SET)
    parsed_header = _STRUCT_PARSER.parse("qnap_header_t", target, Endian.LITTLE)
    logger.debug("qnap_header_t", header=parsed_header)

    if not is_valid_header(parsed_header):
        return Scan.Continue

    # The chunk ends where the parser stopped reading the header.
    context.end_offset = target.tell()
    return Scan.Terminate

63 

64 

def build_stream_end_scan_db(pattern_list):
    """Build a ``StreamDatabase`` from the given hex-string patterns.

    Each entry of ``pattern_list`` is converted with ``as_regex()`` and
    wrapped in a ``Pattern`` using the ``SOM_LEFTMOST`` and ``DOTALL`` flags.
    """
    compiled_patterns = [
        Pattern(entry.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for entry in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)

69 

70 

# Built once at import time; QnapHandler.calculate_chunk creates a scanner
# from it for every candidate chunk.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(FOOTER_PATTERN)

72 

73 

class QnapNasExtractor(QnapExtractor):
    """QnapExtractor whose secret is derived from the header's file version."""

    def _get_secret(self, header) -> str:
        """Return ``SECRET`` followed by the first character of the version.

        ``header.file_version`` is decoded as UTF-8 before its first
        character is taken.
        """
        version_text = header.file_version.decode("utf-8")
        return SECRET + version_text[0]

77 

78 

class QnapHandler(Handler):
    """Handler that locates QNAP NAS firmware chunks inside a file."""

    NAME = "qnap_nas"

    PATTERNS = [
        HexString("F5 7B 47 03"),
    ]
    EXTRACTOR = QnapNasExtractor()

    DOC = HandlerDoc(
        name="QNAP NAS",
        description="QNAP NAS firmware files consist of a custom header, encrypted data sections, and a footer marking the end of the encrypted stream. The header contains metadata such as device ID, firmware version, and encryption details.",
        handler_type=HandlerType.ARCHIVE,
        vendor="QNAP",
        references=[],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Find the end of the chunk that begins at ``start_offset``.

        The file is scanned for the footer pattern; the scan callback stores
        the position following the first valid header in the search context.

        Returns:
            A ``ValidChunk`` from ``start_offset`` to the recorded end offset,
            or ``None`` when no positive end offset was recorded.
        """
        search = QTSSearchContext(start_offset=start_offset, file=file, end_offset=-1)

        try:
            stream_scan(
                hyperscan_stream_end_magic_db.build(search, _hyperscan_match),  # type: ignore
                file,
            )
        except Exception as exc:
            # Best effort: a failed scan is only logged, and whatever end
            # offset was recorded before the failure is still honoured below.
            logger.debug("Error scanning for QNAP patterns", error=exc)

        if search.end_offset <= 0:
            return None
        return ValidChunk(start_offset=start_offset, end_offset=search.end_offset)

109 return None