1from __future__ import annotations
2
import gzip
import io
import zlib
from pathlib import Path, PureWindowsPath
6
7from unblob.file_utils import Endian, File, FileSystem, InvalidInputFormat, StructParser
8from unblob.models import (
9 Extractor,
10 HandlerDoc,
11 HandlerType,
12 HexString,
13 StructHandler,
14 ValidChunk,
15)
16
# C struct definitions shared by the extractor and the handler in this file.
# The directory and file entry layouts are parsed as little-endian by both.
C_DEFINITIONS = r"""
    typedef struct deafbead_header {
        uint32 magic; /* "DE AF BE AD" */
    } deafbead_header_t;

    typedef struct deafbead_dir {
        uint8 magic; /* "86" */
        uint16 name_len;
        char name[name_len];
    } deafbead_dir_t;

    typedef struct deafbead_file {
        uint8 magic; /* "87" */
        uint16 name_len;
        char name[name_len];
        uint32 file_size;
        char file_contents[file_size];
    } deafbead_file_t;
"""
# One-byte magic that introduces a directory entry (deafbead_dir_t).
DIR_MAGIC = b"\x86"
# One-byte magic that introduces a file entry (deafbead_file_t).
FILE_MAGIC = b"\x87"
# Entry parsing stops at the first byte that is not one of these magics.
VALID_MAGICS = {DIR_MAGIC, FILE_MAGIC}
# Number of bytes skipped before the first entry: deafbead_header_t is a
# single uint32 magic.
HEADER_LEN = 4
40
41
class DeafBeadExtractor(Extractor):
    """Extract the directory and file entries of a DEAFBEAD archive.

    After a ``HEADER_LEN``-byte header the archive is read as a sequence of
    entries, each introduced by a one-byte magic: ``DIR_MAGIC`` for a
    directory, ``FILE_MAGIC`` for a gzip-compressed file.
    """

    def __init__(self):
        self._struct_parser = StructParser(C_DEFINITIONS)

    def extract(self, inpath: Path, outdir: Path):
        """Recreate every entry of the archive at ``inpath`` below ``outdir``.

        Entries are processed one after another until a byte that is not a
        known entry magic is read, or the end of the file is reached.

        Raises:
            InvalidInputFormat: if the contents of a file entry are not a
                valid gzip stream.
        """
        fs = FileSystem(outdir)
        with File.from_path(inpath) as file:
            file.seek(HEADER_LEN)
            while (magic := file.read(1)) in VALID_MAGICS:
                file.seek(-1, io.SEEK_CUR)  # go back to read the full struct
                if magic == DIR_MAGIC:
                    self._handle_dir(file, fs)
                elif magic == FILE_MAGIC:
                    self._handle_file(file, fs)

    def _handle_dir(self, file: File, fs: FileSystem):
        """Parse the directory entry at the current position and create it."""
        dir_header = self._struct_parser.parse("deafbead_dir_t", file, Endian.LITTLE)
        fs.mkdir(self._convert_path(dir_header.name))

    def _handle_file(self, file: File, fs: FileSystem):
        """Parse the file entry at the current position and write it out.

        The entry's contents are gzip-decompressed before being written.

        Raises:
            InvalidInputFormat: if the contents cannot be decompressed.
        """
        file_header = self._struct_parser.parse("deafbead_file_t", file, Endian.LITTLE)
        try:
            decompressed = gzip.decompress(file_header.file_contents)
        except (gzip.BadGzipFile, EOFError, zlib.error) as error:
            # Besides BadGzipFile (bad gzip header/trailer), gzip reports
            # truncated streams as EOFError and corrupt deflate data as
            # zlib.error; all of them mean the entry is not valid gzip.
            raise InvalidInputFormat("Invalid GZIP file") from error
        # Kept outside the try block so that errors raised while writing are
        # never misreported as a gzip problem.
        fs.write_bytes(self._convert_path(file_header.name), decompressed)

    @staticmethod
    def _convert_path(path_entry: bytes) -> Path:
        """Turn a raw entry name into a ``Path``.

        The name is decoded as UTF-8, with undecodable bytes replaced rather
        than rejected. Names containing a backslash are treated as Windows
        paths and converted to forward-slash form.
        """
        decoded_path = path_entry.decode("utf-8", errors="replace")
        if "\\" in decoded_path:  # windows path => convert slashes
            return Path(PureWindowsPath(decoded_path).as_posix())
        return Path(decoded_path)
75
76
class DeafBeadHandler(StructHandler):
    """Locate DEAFBEAD archives and determine where each one ends."""

    NAME = "deafbead"
    PATTERNS = [HexString("DE AF BE AD (86 | 87)")]

    DOC = HandlerDoc(
        name="D-Link DEAFBEAD",
        description="Archive files as found in D-Link DSL-500G and DSL-504G firmware images.",
        handler_type=HandlerType.ARCHIVE,
        vendor="D-Link",
        references=[],
        limitations=[],
    )

    C_DEFINITIONS = C_DEFINITIONS
    HEADER_STRUCT = "deafbead_header_t"
    EXTRACTOR = DeafBeadExtractor()

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Walk the entries after the header to find the end of the archive.

        Starting ``HEADER_LEN`` bytes past ``start_offset``, directory and
        file entries are parsed back to back until a byte that is not an
        entry magic is read, or the end of the file is reached.

        Raises:
            InvalidInputFormat: if an entry has an empty name, or a file
                entry has a size of zero.
        """
        file.seek(start_offset + HEADER_LEN)

        while True:
            marker = file.read(1)
            if marker not in VALID_MAGICS:
                break
            # Step back so the struct parser sees the magic byte as well.
            file.seek(-1, io.SEEK_CUR)

            if marker == DIR_MAGIC:
                entry = self.cparser_le.deafbead_dir_t(file)
                if entry.name_len == 0:
                    raise InvalidInputFormat("Invalid directory header.")
                continue

            entry = self.cparser_le.deafbead_file_t(file)
            if entry.name_len == 0 or entry.file_size == 0:
                raise InvalidInputFormat("Invalid file header.")

        # The loop stopped either at EOF (marker is empty) or after consuming
        # one byte that does not belong to the archive; in the latter case
        # that byte must not be counted.
        chunk_end = file.tell() - len(marker)
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)