1import binascii
2import io
3from pathlib import Path
4from typing import TYPE_CHECKING, Optional, cast
5
6from structlog import get_logger
7
8from unblob.extractor import carve_chunk_to_file
9from unblob.file_utils import Endian, File, InvalidInputFormat, StructParser
10from unblob.models import (
11 Chunk,
12 Extractor,
13 HandlerDoc,
14 HandlerType,
15 HexString,
16 StructHandler,
17 ValidChunk,
18)
19
20if TYPE_CHECKING:
21 from collections.abc import Iterable
22
logger = get_logger()
# Byte offset of the `flags` field inside the TRX header: magic (4 bytes) +
# len (4 bytes) + crc32 (4 bytes). The checksum stored in the header covers the
# image from this offset onwards, so CRC verification starts reading here.
CRC_CONTENT_OFFSET = 12
25
# C declaration of the TRX v1 header (28 bytes) handed to StructParser.
# Version 1 carries three partition offsets.
TRX_V1_C_DEFINITION = r"""
    typedef struct trx_header {
        uint32 magic; /* "HDR0" */
        uint32 len; /* header size + data */
        uint32 crc32; /* 32-bit CRC from flag_version to end of file */
        uint16 flags;
        uint16 version;
        uint32 offsets[3]; /* Offsets of partitions from start of header */
    } trx_header_t;
"""
36
# C declaration of the TRX v2 header (32 bytes) handed to StructParser.
# Identical to v1 except that the offset table holds four entries.
TRX_V2_C_DEFINITION = r"""
    typedef struct trx_header {
        uint32 magic; /* "HDR0" */
        uint32 len; /* header size + data */
        uint32 crc32; /* 32-bit CRC from flag_version to end of file */
        uint16 flags;
        uint16 version;
        uint32 offsets[4]; /* Offsets of partitions from start of header */
    } trx_header_t;
"""
47
48
class TRXExtractor(Extractor):
    """Split a TRX image into its partitions using the header's offset table.

    The extractor is version-agnostic: the caller supplies the C definition of
    ``trx_header_t`` (three or four offsets) at construction time.
    """

    def __init__(self, c_definitions: str):
        """Create the struct parser for the given ``trx_header_t`` definition."""
        self._struct_parser = StructParser(c_definitions)

    def extract(self, inpath: Path, outdir: Path):
        """Carve each partition of the image at *inpath* into *outdir*.

        The header is parsed as little-endian from the start of the file.
        Partition ``i`` spans from the i-th usable offset (in ascending order)
        up to the next usable offset, or to the end of the file for the last
        one, and is written to ``outdir / f"part{i}"``.
        """
        with File.from_path(inpath) as file:
            header = self._struct_parser.parse("trx_header_t", file, Endian.LITTLE)
            file.seek(0, io.SEEK_END)
            eof = file.tell()

            boundaries = self._partition_boundaries(
                cast("Iterable", header.offsets), eof
            )
            for i, (start_offset, end_offset) in enumerate(
                zip(boundaries, boundaries[1:])
            ):
                chunk = Chunk(start_offset=start_offset, end_offset=end_offset)
                carve_chunk_to_file(outdir.joinpath(Path(f"part{i}")), file, chunk)

    @staticmethod
    def _partition_boundaries(offsets: "Iterable[int]", eof: int) -> "list[int]":
        """Return the ascending partition boundaries, terminated by *eof*.

        Offsets that are zero (unused table slot), duplicated, or not strictly
        inside the file are dropped: each of them would otherwise yield a
        zero-length range or a range lying outside the file, neither of which
        is a partition that can be carved.
        """
        usable = {offset for offset in offsets if 0 < offset < eof}
        return [*sorted(usable), eof]
68
69
class NetgearTRXBase(StructHandler):
    """Chunk detection shared by the TRX v1 and v2 handlers.

    Subclasses provide the pattern and the C definition of ``trx_header_t``;
    this base class validates the parsed header and its checksum.
    """

    HEADER_STRUCT = "trx_header_t"

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Validate the TRX image at *start_offset* and return its extent.

        Returns a chunk spanning ``header.len`` bytes from *start_offset*.

        Raises:
            InvalidInputFormat: if the header fails ``is_valid_header`` or the
                stored checksum does not match the image content.
        """
        header = self.parse_header(file, endian=Endian.LITTLE)

        if not self.is_valid_header(header):
            raise InvalidInputFormat("Invalid TRX header.")

        if not self._is_crc_valid(file, start_offset, header):
            raise InvalidInputFormat("Invalid CRC32.")

        return ValidChunk(
            start_offset=start_offset, end_offset=start_offset + header.len
        )

    def is_valid_header(self, header) -> bool:
        """Return whether the declared image length can hold the header.

        ``header.len`` counts header plus data, so it must be at least
        ``len(header)`` (presumably the header's size in bytes; verify against
        the struct parser in use).
        """
        return header.len >= len(header)

    def _is_crc_valid(self, file: File, start_offset: int, header) -> bool:
        """Check the header's ``crc32`` field against the image content.

        The checksum covers the bytes from ``CRC_CONTENT_OFFSET`` (the
        ``flags`` field) up to ``header.len``. The value compared with the
        header is the bitwise complement of the standard CRC-32.
        """
        payload_size = header.len - CRC_CONTENT_OFFSET
        file.seek(start_offset + CRC_CONTENT_OFFSET)
        # crc32 accepts any bytes-like object, so the data is used as read
        # instead of being copied into a second buffer.
        content = file.read(payload_size)
        if len(content) != payload_size:
            # The file ends before the length declared in the header, so the
            # image is truncated and cannot be verified.
            return False
        computed_crc = binascii.crc32(content) ^ 0xFFFFFFFF
        return header.crc32 == computed_crc
94
95
class NetgearTRXv1Handler(NetgearTRXBase):
    """Handler for TRX images whose header version field is 1."""

    NAME = "trx_v1"
    # Match the "HDR0" magic, skip the 10 bytes occupied by len, crc32 and
    # flags, then require the little-endian version field to be 1.
    PATTERNS = [
        HexString(
            """
            // 00000000: 4844 5230 0010 0000 33b9 d625 0000 0100 HDR0....3..%....
            // 00000010: 1c00 0000 3000 0000 4400 0000 7361 6d70 ....0...D...
            48 44 52 30 [10] 01 00
            """
        ),
    ]
    # The v1 header layout (three partition offsets) is used both for chunk
    # detection and for extraction.
    C_DEFINITIONS = TRX_V1_C_DEFINITION
    EXTRACTOR = TRXExtractor(TRX_V1_C_DEFINITION)

    DOC = HandlerDoc(
        name="Netgear TRX v1",
        description="Netgear TRX v1 firmware format includes a custom header with partition offsets and a CRC32 checksum for integrity verification. It supports up to three partitions defined in the header.",
        handler_type=HandlerType.ARCHIVE,
        vendor="Netgear",
        references=[],
        limitations=[],
    )
118
119
class NetgearTRXv2Handler(NetgearTRXBase):
    """Handler for TRX images whose header version field is 2."""

    NAME = "trx_v2"
    # Match the "HDR0" magic, skip the 10 bytes occupied by len, crc32 and
    # flags, then require the little-endian version field to be 2.
    PATTERNS = [
        HexString(
            """
            // 00000000: 4844 5230 0010 0000 0a3d 4b05 0000 0200 HDR0.....=K.....
            // 00000010: 2000 0000 3400 ffff ffff ffff ffff 0000 ...4...........
            48 44 52 30 [10] 02 00
            """
        ),
    ]
    # The v2 header layout (four partition offsets) is used both for chunk
    # detection and for extraction.
    C_DEFINITIONS = TRX_V2_C_DEFINITION
    EXTRACTOR = TRXExtractor(TRX_V2_C_DEFINITION)

    DOC = HandlerDoc(
        name="Netgear TRX v2",
        description="Netgear TRX v2 firmware format includes a custom header with partition offsets and a CRC32 checksum for integrity verification. It supports up to four partitions defined in the header.",
        handler_type=HandlerType.ARCHIVE,
        vendor="Netgear",
        references=[],
        limitations=[],
    )