from __future__ import annotations

import io
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
from struct import Struct
from typing import TYPE_CHECKING

from unblob.file_utils import Endian, FileSystem, InvalidInputFormat, StructParser
from unblob.models import (
    Extractor,
    ExtractResult,
    File,
    Handler,
    HandlerDoc,
    HandlerType,
    HexString,
    ValidChunk,
)

if TYPE_CHECKING:
    from collections.abc import Iterable
23
24
# C-style struct definitions handed to StructParser. Three layouts are declared:
# the CPKG archive header, the FPKG archive header and the per-file header that
# precedes each stored file. The code in this module only parses cpkg_header_t
# and file_header_t by name; fpkg_header_t shares the same leading fields
# (magic, unknown, first_entry_offset).
C_DEFINITIONS = r"""
 typedef struct cpkg_header {
 char magic[4]; // "CPKG"
 uint32 unknown; // 0x01_00_00_00 in all samples (maybe version)
 uint32 first_entry_offset;
 } cpkg_header_t;

 typedef struct fpkg_header {
 char magic[4]; // "FPKG"
 uint32 unknown; // 0x01_00_00_00 in all samples
 uint32 first_entry_offset;
 uint32 unknown2; // only 0x00_00_00_01 observed
 uint32 name_len; // length of model name field
 char name[name_len];
 } fpkg_header_t;

 typedef struct file_header {
 uint32 header_len; // only 0x1C (28) observed
 uint16 type; // probably file type
 uint16 unknown; // observed values: 0x0 and 0x6874
 uint32 file_size; // size of file data
 char filename[];
 } file_header_t;
"""
# Names of the struct types (as declared in C_DEFINITIONS) that get parsed.
CPKG_HEADER = "cpkg_header_t"
FILE_HEADER = "file_header_t"
# Byte size of cpkg_header_t (4-byte magic + two uint32 fields); used as the
# smallest acceptable value of first_entry_offset.
CPKG_HEADER_SIZE = 12
# The only per-file header length accepted during validation (0x1C == 28).
FILE_HEADER_SIZE = 0x1C
# Archive magics accepted when validating the archive header.
VALID_MAGICS = {b"FPKG", b"CPKG"}
54
55
# NOTE(review): the base class is ``struct.Struct`` (see the module imports),
# but the dataclass-generated ``__init__`` never passes a format string to it
# and nothing visible here uses Struct behaviour -- presumably an accidental
# import/base; confirm before relying on any ``Struct`` method of instances.
@dataclass
class FileHeader(Struct):
    """Description of one archive entry as yielded by ``FPKGParser.iter_entries``."""

    # Length of the per-file header, as read from the archive.
    header_len: int
    # Raw entry type value read from the per-file header.
    type: int
    # Absolute offset in the input file where the entry's data begins
    # (entry header offset + header_len).
    start_offset: int
    # Size of the entry's data in bytes.
    file_size: int
    # Entry name, decoded to text.
    filename: str
    # header_len + file_size, i.e. the bytes the entry occupies in the archive.
    total_size: int
64
65
class FileType(IntEnum):
    """Values accepted for the ``type`` field of a per-file header.

    Any other value makes header validation fail with ``InvalidInputFormat``.
    """

    REGULAR_FILE = 0x100
    UNKNOWN = 0x101  # NOTE: not observed in the wild, but makes sense in sequence
    CHECKSUM = 0x102
    SIGNATURE = 0x103
71
72
class FPKGParser:
    """Parser for the CPKG/FPKG archive layout declared in ``C_DEFINITIONS``.

    The archive header is parsed and validated on construction; per-file
    headers are parsed lazily by :meth:`iter_entries`. All structures are
    parsed as big-endian.
    """

    def __init__(self, file: File, start_offset: int = 0):
        """Parse and validate the archive header located at ``start_offset``.

        Args:
            file: Open file that contains the archive.
            start_offset: Absolute offset of the archive header within ``file``.

        Raises:
            InvalidInputFormat: If the magic is not one of ``VALID_MAGICS`` or
                ``first_entry_offset`` is smaller than ``CPKG_HEADER_SIZE``.
        """
        self.file = file
        self.start_offset = start_offset
        self.struct_parser = StructParser(C_DEFINITIONS)
        # Position the file explicitly: the header must be read from the same
        # offset that ``file_data_offset`` is computed from, regardless of
        # where the caller left the file cursor.
        file.seek(start_offset, io.SEEK_SET)
        self.header = self.struct_parser.parse(CPKG_HEADER, file, Endian.BIG)
        self._validate_header(self.header)
        # first_entry_offset is relative to the start of the archive.
        self.file_data_offset = start_offset + self.header.first_entry_offset

    def iter_entries(self) -> Iterable[FileHeader]:
        """Yield a ``FileHeader`` for every entry, in archive order.

        Entries are read back to back starting at ``file_data_offset`` until
        the end of the underlying file is reached, so any bytes following the
        last entry are parsed (and validated) as another per-file header.

        Raises:
            InvalidInputFormat: If a per-file header fails validation.
        """
        # The file size does not change while iterating; query it once.
        file_size = self.file.size()
        current_offset = self.file_data_offset
        while current_offset < file_size:
            self.file.seek(current_offset, io.SEEK_SET)
            entry = self.struct_parser.parse(FILE_HEADER, self.file, Endian.BIG)
            self._validate_file_header(entry)
            entry_size = entry.header_len + entry.file_size
            yield FileHeader(
                header_len=entry.header_len,
                file_size=entry.file_size,
                # The entry's data starts right after its header.
                start_offset=current_offset + entry.header_len,
                type=entry.type,
                filename=entry.filename.rstrip().decode("utf-8", errors="replace"),
                total_size=entry_size,
            )
            current_offset += entry_size

    @staticmethod
    def _validate_file_header(file_header) -> None:
        """Check a parsed per-file header.

        Raises:
            InvalidInputFormat: If ``header_len`` differs from
                ``FILE_HEADER_SIZE``, the filename is not ASCII, or ``type``
                is not a ``FileType`` value.
        """
        if file_header.header_len != FILE_HEADER_SIZE:
            raise InvalidInputFormat(
                f"Invalid file header length: {file_header.header_len}"
            )
        if not file_header.filename.isascii():
            raise InvalidInputFormat(f"Invalid filename: {file_header.filename}")
        try:
            FileType(file_header.type)
        except ValueError as e:
            raise InvalidInputFormat(f"Invalid file type: {file_header.type}") from e

    @staticmethod
    def _validate_header(header) -> None:
        """Check a parsed archive header.

        Raises:
            InvalidInputFormat: If the magic is not in ``VALID_MAGICS`` or
                ``first_entry_offset`` is smaller than ``CPKG_HEADER_SIZE``.
        """
        if header.magic not in VALID_MAGICS:
            raise InvalidInputFormat("Invalid magic")
        # The first entry cannot start inside the fixed part of the header.
        if header.first_entry_offset < CPKG_HEADER_SIZE:
            raise InvalidInputFormat(
                f"Invalid first entry offset: {header.first_entry_offset}"
            )
119
120
class FPKGExtractor(Extractor):
    """Extractor that carves every entry of a CPKG/FPKG archive into a directory."""

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult:
        """Carve each archive entry from ``inpath`` into ``outdir``.

        Args:
            inpath: Path of the archive, expected to start at offset 0.
            outdir: Directory that receives the carved files.

        Returns:
            An ``ExtractResult`` carrying the problems collected by the
            ``FileSystem`` helper while writing the entries.

        Raises:
            InvalidInputFormat: If the archive header or a per-file header
                fails validation.
        """
        fs = FileSystem(outdir)
        with File.from_path(inpath) as file:
            parser = FPKGParser(file)
            for entry in parser.iter_entries():
                fs.carve(
                    Path(entry.filename), file, entry.start_offset, entry.file_size
                )
        # Hand the collected problems back to the caller instead of dropping
        # them by implicitly returning None.
        return ExtractResult(reports=fs.problems)
130
131
class FPKGHandler(Handler):
    """Handler that recognises CPKG/FPKG archives and computes their extent."""

    NAME = "fpkg"
    # Magic "CPKG" or "FPKG" followed by the 01 00 00 00 word.
    # Only "first_entry_offset" is needed, and it sits at the same position in
    # both header variants, so one pattern and one parser cover both.
    PATTERNS = [HexString("(43 | 46) 50 4B 47 01 00 00 00")]  # (C | F) P K G
    EXTRACTOR = FPKGExtractor()
    DOC = HandlerDoc(
        name="D-Link FPKG",
        description="CPKG and FPKG are archive formats used in D-Link DFL firewall firmware",
        handler_type=HandlerType.ARCHIVE,
        vendor="D-Link",
        references=[],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Return the chunk covering the archive that starts at ``start_offset``.

        The chunk ends after the last entry: the first entry's offset plus the
        header and data sizes of every entry.

        Raises:
            InvalidInputFormat: If a header is invalid or the archive has no
                entries.
        """
        parser = FPKGParser(file, start_offset)

        # Walk the entries once, adding up the bytes they occupy.
        entry_count = 0
        entries_size = 0
        for entry in parser.iter_entries():
            entry_count += 1
            entries_size += entry.total_size

        if entry_count == 0:
            raise InvalidInputFormat("No valid entries found")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=parser.file_data_offset + entries_size,
        )