Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/moxa/frm.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1from __future__ import annotations 

2 

3import gzip 

4import io 

5from enum import IntEnum 

6from pathlib import Path 

7 

8from unblob.file_utils import Endian, File, FileSystem, InvalidInputFormat, StructParser 

9from unblob.models import ( 

10 Extractor, 

11 HandlerDoc, 

12 HandlerType, 

13 HexString, 

14 StructHandler, 

15 ValidChunk, 

16) 

17 

18C_DEFINITIONS = r""" 

19 typedef struct frm_container_header { 

20 char magic[4]; /* *FRM */ 

21 uint32 unknown1; /* version maybe, only 00 00 00 01 observed */ 

22 uint32 total_length; /* the size of the whole container (should equal the sum of the section sizes + header_length) */ 

23 uint16 header_length; /* only 60 00 observed -> size 0x60 = 96 */ 

24 uint16 section_count; /* only 02 00 observed -> 2 section entries (webserver FS and FW binary) */ 

25 uint8 unknown2[48]; /* optional unknown entries; empty in some samples */ 

26 } frm_container_header_t; 

27 

28 typedef struct frm_section_entry { 

29 uint32 type; /* probably type; values 1 (FW binary) and 2 (webserver FS) observed */ 

30 uint32 offset; 

31 uint32 length; 

32 uint32 unknown; 

33 } frm_section_entry_t; 

34 

35 typedef struct frm_fs_header { 

36 char device_name[32]; 

37 uint8 unknown1[4]; /* version maybe? */ 

38 uint32 timestamp; /* creation time (UNIX timestamp) */ 

39 uint32 unknown2; 

40 uint32 unknown3; /* maybe checksum? */ 

41 uint32 file_table_offset; /* offset of the file table; only 0x100 observed -> usually starts at 0x160 */ 

42 uint32 file_table_length; /* size of the file table (= file_header_length * file_count) */ 

43 uint16 file_header_length; /* size of each entry; only 0x40 (64) observed */ 

44 uint16 file_count; /* the number of table entries (files) */ 

45 uint32 data_length; /* the length of the file content data */ 

46 uint16 unknown4; 

47 uint16 file_count2; /* for some devices (e.g. Nport 5600) this field contains the file count and file_count is 0 */ 

48 uint32 file_table_length2; 

49 uint32 unknown5; 

50 } frm_fs_header_t; 

51 

52 typedef struct frm_file_header { 

53 char name[48]; 

54 uint8 unknown[8]; /* could be the creation time */ 

55 uint32 file_length; 

56 uint32 data_offset; 

57 } frm_file_header_t; 

58""" 

59 

60MAGIC = b"*FRM" 

61GZIP_MAGIC = bytes.fromhex("1f 8b") 

62 

63 

64class SectionTypes(IntEnum): 

65 FW_BINARY = 1 

66 FILESYSTEM = 2 

67 

68 

69class MoxaFRMExtractor(Extractor): 

70 def __init__(self): 

71 self._struct_parser = StructParser(C_DEFINITIONS) 

72 

73 def extract(self, inpath: Path, outdir: Path): 

74 fs = FileSystem(outdir) 

75 with File.from_path(inpath) as file: 

76 header = self._struct_parser.parse( 

77 "frm_container_header_t", file, Endian.LITTLE 

78 ) 

79 sections = self._parse_section_table(file, header.section_count) 

80 for section in sections: 

81 self._extract_section(section, file, fs) 

82 

83 def _parse_section_table(self, file: File, section_count: int): 

84 return [ 

85 self._struct_parser.parse("frm_section_entry_t", file, Endian.LITTLE) 

86 for _ in range(section_count) 

87 ] 

88 

89 def _extract_section(self, section, file: File, fs: FileSystem): 

90 match section.type: 

91 case SectionTypes.FW_BINARY: 

92 fs.carve(Path("firmware.bin"), file, section.offset, section.length) 

93 case SectionTypes.FILESYSTEM: 

94 self._extract_fs_section(file, fs, section.offset) 

95 case _: 

96 raise InvalidInputFormat(f"Unknown section type: {section.type}") 

97 

98 def _extract_fs_section(self, file: File, fs: FileSystem, section_offset: int): 

99 file.seek(section_offset, io.SEEK_SET) 

100 fs_header = self._struct_parser.parse("frm_fs_header_t", file, Endian.LITTLE) 

101 

102 file_table_offset = section_offset + fs_header.file_table_offset 

103 for index in range(fs_header.file_count or fs_header.file_count2): 

104 entry_offset = file_table_offset + index * fs_header.file_header_length 

105 file.seek(entry_offset, io.SEEK_SET) 

106 entry = self._struct_parser.parse("frm_file_header_t", file, Endian.LITTLE) 

107 

108 name = bytes(entry.name).rstrip(b"\x00") 

109 if not name: 

110 continue 

111 

112 file.seek(section_offset + entry.data_offset, io.SEEK_SET) 

113 raw = file.read(entry.file_length) 

114 self._write_file(fs, Path(name.decode("ascii", errors="replace")), raw) 

115 

116 @staticmethod 

117 def _write_file(fs: FileSystem, file_path: Path, file_contents: bytes): 

118 # some (but usually not all) files are GZIP compressed 

119 if file_contents[:2] == GZIP_MAGIC: 

120 file_contents = gzip.decompress(file_contents) 

121 fs.write_bytes(file_path, file_contents) 

122 

123 

124class MoxaFRMHandler(StructHandler): 

125 NAME = "moxa_frm" 

126 

127 PATTERNS = [HexString("2A 46 52 4D 00 00 00 01 [4] 60 00 02 00")] 

128 

129 DOC = HandlerDoc( 

130 name="Moxa FRM", 

131 description=( 

132 "Firmware container format used in Moxa firmware (e.g. NPort, MGate and MiiNePort devices)." 

133 ), 

134 handler_type=HandlerType.ARCHIVE, 

135 vendor="Moxa", 

136 references=[], 

137 limitations=[], 

138 ) 

139 

140 C_DEFINITIONS = C_DEFINITIONS 

141 HEADER_STRUCT = "frm_container_header_t" 

142 EXTRACTOR = MoxaFRMExtractor() 

143 

144 def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None: 

145 header = self.parse_header(file) 

146 return ValidChunk( 

147 start_offset=start_offset, end_offset=start_offset + header.total_length 

148 )