1from __future__ import annotations
2
3import gzip
4import io
5from enum import IntEnum
6from pathlib import Path
7
8from unblob.file_utils import Endian, File, FileSystem, InvalidInputFormat, StructParser
9from unblob.models import (
10 Extractor,
11 HandlerDoc,
12 HandlerType,
13 HexString,
14 StructHandler,
15 ValidChunk,
16)
17
18C_DEFINITIONS = r"""
19 typedef struct frm_container_header {
20 char magic[4]; /* *FRM */
21 uint32 unknown1; /* version maybe, only 00 00 00 01 observed */
22 uint32 total_length; /* the size of the whole container (should equal the sum of the section sizes + header_length) */
23 uint16 header_length; /* only 60 00 observed -> size 0x60 = 96 */
24 uint16 section_count; /* only 02 00 observed -> 2 section entries (webserver FS and FW binary) */
25 uint8 unknown2[48]; /* optional unknown entries; empty in some samples */
26 } frm_container_header_t;
27
28 typedef struct frm_section_entry {
29 uint32 type; /* probably type; values 1 (FW binary) and 2 (webserver FS) observed */
30 uint32 offset;
31 uint32 length;
32 uint32 unknown;
33 } frm_section_entry_t;
34
35 typedef struct frm_fs_header {
36 char device_name[32];
37 uint8 unknown1[4]; /* version maybe? */
38 uint32 timestamp; /* creation time (UNIX timestamp) */
39 uint32 unknown2;
40 uint32 unknown3; /* maybe checksum? */
41 uint32 file_table_offset; /* offset of the file table; only 0x100 observed -> usually starts at 0x160 */
42 uint32 file_table_length; /* size of the file table (= file_header_length * file_count) */
43 uint16 file_header_length; /* size of each entry; only 0x40 (64) observed */
44 uint16 file_count; /* the number of table entries (files) */
45 uint32 data_length; /* the length of the file content data */
46 uint16 unknown4;
47 uint16 file_count2; /* for some devices (e.g. Nport 5600) this field contains the file count and file_count is 0 */
48 uint32 file_table_length2;
49 uint32 unknown5;
50 } frm_fs_header_t;
51
52 typedef struct frm_file_header {
53 char name[48];
54 uint8 unknown[8]; /* could be the creation time */
55 uint32 file_length;
56 uint32 data_offset;
57 } frm_file_header_t;
58"""
59
60MAGIC = b"*FRM"
61GZIP_MAGIC = bytes.fromhex("1f 8b")
62
63
64class SectionTypes(IntEnum):
65 FW_BINARY = 1
66 FILESYSTEM = 2
67
68
69class MoxaFRMExtractor(Extractor):
70 def __init__(self):
71 self._struct_parser = StructParser(C_DEFINITIONS)
72
73 def extract(self, inpath: Path, outdir: Path):
74 fs = FileSystem(outdir)
75 with File.from_path(inpath) as file:
76 header = self._struct_parser.parse(
77 "frm_container_header_t", file, Endian.LITTLE
78 )
79 sections = self._parse_section_table(file, header.section_count)
80 for section in sections:
81 self._extract_section(section, file, fs)
82
83 def _parse_section_table(self, file: File, section_count: int):
84 return [
85 self._struct_parser.parse("frm_section_entry_t", file, Endian.LITTLE)
86 for _ in range(section_count)
87 ]
88
89 def _extract_section(self, section, file: File, fs: FileSystem):
90 match section.type:
91 case SectionTypes.FW_BINARY:
92 fs.carve(Path("firmware.bin"), file, section.offset, section.length)
93 case SectionTypes.FILESYSTEM:
94 self._extract_fs_section(file, fs, section.offset)
95 case _:
96 raise InvalidInputFormat(f"Unknown section type: {section.type}")
97
98 def _extract_fs_section(self, file: File, fs: FileSystem, section_offset: int):
99 file.seek(section_offset, io.SEEK_SET)
100 fs_header = self._struct_parser.parse("frm_fs_header_t", file, Endian.LITTLE)
101
102 file_table_offset = section_offset + fs_header.file_table_offset
103 for index in range(fs_header.file_count or fs_header.file_count2):
104 entry_offset = file_table_offset + index * fs_header.file_header_length
105 file.seek(entry_offset, io.SEEK_SET)
106 entry = self._struct_parser.parse("frm_file_header_t", file, Endian.LITTLE)
107
108 name = bytes(entry.name).rstrip(b"\x00")
109 if not name:
110 continue
111
112 file.seek(section_offset + entry.data_offset, io.SEEK_SET)
113 raw = file.read(entry.file_length)
114 self._write_file(fs, Path(name.decode("ascii", errors="replace")), raw)
115
116 @staticmethod
117 def _write_file(fs: FileSystem, file_path: Path, file_contents: bytes):
118 # some (but usually not all) files are GZIP compressed
119 if file_contents[:2] == GZIP_MAGIC:
120 file_contents = gzip.decompress(file_contents)
121 fs.write_bytes(file_path, file_contents)
122
123
124class MoxaFRMHandler(StructHandler):
125 NAME = "moxa_frm"
126
127 PATTERNS = [HexString("2A 46 52 4D 00 00 00 01 [4] 60 00 02 00")]
128
129 DOC = HandlerDoc(
130 name="Moxa FRM",
131 description=(
132 "Firmware container format used in Moxa firmware (e.g. NPort, MGate and MiiNePort devices)."
133 ),
134 handler_type=HandlerType.ARCHIVE,
135 vendor="Moxa",
136 references=[],
137 limitations=[],
138 )
139
140 C_DEFINITIONS = C_DEFINITIONS
141 HEADER_STRUCT = "frm_container_header_t"
142 EXTRACTOR = MoxaFRMExtractor()
143
144 def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
145 header = self.parse_header(file)
146 return ValidChunk(
147 start_offset=start_offset, end_offset=start_offset + header.total_length
148 )