1from __future__ import annotations
2
3import io
4import os
5import stat
6import struct
7from enum import IntEnum, unique
8from pathlib import Path
9
10from structlog import get_logger
11
12from ...file_utils import (
13 Endian,
14 FileSystem,
15 InvalidInputFormat,
16 read_until_past,
17 round_up,
18)
19from ...models import (
20 Extractor,
21 ExtractResult,
22 File,
23 HandlerDoc,
24 HandlerType,
25 HexString,
26 Reference,
27 StructHandler,
28 ValidChunk,
29)
30
# Module-level structlog logger used by the RomFS walking and dumping code in this file.
logger = get_logger()
32
33
# get_string() reads names in chunks of this many bytes.
STRING_ALIGNMENT = 16
# get_string() stops reading once at least this many bytes were consumed
# without meeting a NUL terminator.
MAX_LINUX_PATH_LENGTH = 255
# Modulus applied to the running checksum (2**32, i.e. one past the largest
# unsigned 32-bit value, despite the name).
MAX_UINT32 = 1 << 32


# Base permission bits used by FileHeader.mode (without / with the executable flag).
WORLD_RW = 0o666
WORLD_RWX = 0o777
# Number of leading image bytes covered by the superblock checksum; also the
# smallest file size RomFSHeader accepts.
ROMFS_HEADER_SIZE = 512
# Magic bytes expected at the very start of a RomFS image.
ROMFS_SIGNATURE = b"-rom1fs-"
43
44
@unique
class FSType(IntEnum):
    """Kind of a RomFS entry.

    FileHeader.__init__ builds this from the low three bits of the first
    32-bit word of a file header, so every value from 0 to 7 is covered.
    """

    # spec_info is looked up in RomFSHeader.inodes to find the link target.
    HARD_LINK = 0
    # spec_info is the address RomFSHeader.walk_dir descends into.
    DIRECTORY = 1
    # The entry's content is written out as a regular file.
    FILE = 2
    # The entry's content is decoded as the link target path.
    SYMLINK = 3
    # For both device kinds, spec_info carries major (high 16 bits) and
    # minor (low 16 bits) numbers; see FileHeader.dev.
    BLOCK_DEV = 4
    CHAR_DEV = 5
    # RomFSHeader.create_inode has no branch for sockets.
    SOCKET = 6
    FIFO = 7
55
56
def valid_checksum(content: bytes) -> bool:
    """Tell whether *content* satisfies the RomFS checksum rule.

    The buffer is read as consecutive big-endian unsigned 32-bit words. It is
    considered valid when those words add up to zero modulo ``MAX_UINT32``.
    A buffer whose length is not a multiple of four is rejected outright, and
    an empty buffer sums to zero and is therefore reported as valid.
    """
    # A trailing partial word could not be unpacked, so bail out early.
    if len(content) % 4:
        return False

    words = (word for (word,) in struct.iter_unpack(">L", content))
    # Reducing once at the end equals reducing after every addition.
    return sum(words) % MAX_UINT32 == 0
68
69
def get_string(file: File) -> bytes:
    """Read a NUL-terminated string stored in 16-byte aligned chunks.

    Chunks of ``STRING_ALIGNMENT`` bytes are consumed from the current
    position of *file* until one of them contains a NUL byte, or until
    ``MAX_LINUX_PATH_LENGTH`` bytes have been covered. The bytes read are
    returned with all trailing NUL bytes removed; the file is left positioned
    right after the last chunk that was read.
    """
    pieces = []
    # One iteration per aligned chunk, bounded by the maximum name length.
    for _ in range(0, MAX_LINUX_PATH_LENGTH, STRING_ALIGNMENT):
        piece = file.read(STRING_ALIGNMENT)
        pieces.append(piece)
        if b"\x00" in piece:
            break
    return b"".join(pieces).rstrip(b"\x00")
78
79
class FileHeader:
    """A single RomFS file header parsed from an open image.

    The header is read from the current position of ``file`` as four
    big-endian 32-bit words followed by a NUL-terminated, 16-byte aligned
    name:

    * word 0: address of the next header (low four bits masked off), the
      entry type in bits 0-2 and the executable flag in bit 3;
    * word 1: ``spec_info``, whose meaning depends on the entry type;
    * word 2: size of the entry's content in bytes;
    * word 3: the stored checksum.

    ``start_offset`` is the file position right after the name, which is
    where ``content`` reads the entry's data from.
    """

    addr: int
    next_filehdr: int
    spec_info: int
    fs_type: FSType
    executable: bool
    size: int
    checksum: int
    filename: bytes
    depth: int = -1
    parent: FileHeader | None = None
    start_offset: int
    end_offset: int
    file: File

    def __init__(self, addr: int, file: File):
        """Parse the header located at the current position of *file*.

        :param addr: address of this header inside the image; the caller is
            expected to have positioned *file* there already.
        :param file: open image to read from.
        :raises struct.error: if fewer than 16 bytes are available.
        """
        self.addr = addr
        fs_typeexec_next, self.spec_info, self.size, self.checksum = struct.unpack(
            ">4I", file.read(16)
        )
        # Headers are 16-byte aligned, so the low nibble is free to carry flags.
        self.next_filehdr = fs_typeexec_next & ~0b1111
        self.fs_type = FSType(fs_typeexec_next & 0b0111)
        # Stored as a real bool to match the declared attribute type.
        self.executable = bool(fs_typeexec_next & 0b1000)
        self.filename = get_string(file)
        self.start_offset = file.tell()
        self.file = file

    def valid_checksum(self) -> bool:
        """Check the checksum over the fixed fields and the padded name.

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.addr, io.SEEK_SET)
            # 16 bytes of fixed fields plus the name rounded up to 16 bytes.
            header_size = 16 + round_up(len(self.filename), 16)
            return valid_checksum(self.file.read(header_size))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def content(self) -> bytes:
        """Return the entry's data. Applicable to files and symlinks.

        Up to ``size`` bytes are read starting at ``start_offset``. The file
        position the caller had before the access is restored afterwards,
        even when fewer than ``size`` bytes are available.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.start_offset, io.SEEK_SET)
            return self.file.read(self.size)
        finally:
            # Restore the absolute position: a relative rewind by ``size``
            # would be wrong (or fail) after a short read.
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def mode(self) -> int:
        """Permission mode.

        World readable, writable and executable (``WORLD_RWX``) when the
        executable bit is set, world readable and writable (``WORLD_RW``)
        otherwise. The block or character device file type bit is added for
        the corresponding entry types.
        """
        mode = WORLD_RWX if self.executable else WORLD_RW
        if self.fs_type == FSType.BLOCK_DEV:
            mode |= stat.S_IFBLK
        if self.fs_type == FSType.CHAR_DEV:
            mode |= stat.S_IFCHR
        return mode

    @property
    def dev(self) -> int:
        """Raw device number if block device or character device, zero otherwise."""
        if self.fs_type in (FSType.BLOCK_DEV, FSType.CHAR_DEV):
            # spec_info packs the major number in the high 16 bits and the
            # minor number in the low 16 bits.
            major = self.spec_info >> 16
            minor = self.spec_info & 0xFFFF
            return os.makedev(major, minor)
        return 0

    @property
    def path(self) -> Path:
        """Return the full path of this entry, up to the RomFS root.

        Names are decoded as UTF-8 and joined by following ``parent`` links
        until a header without a parent is reached.

        :raises UnicodeDecodeError: if a name on the chain is not valid UTF-8.
        """
        current_node = self
        current_path = Path()
        while current_node is not None:
            current_path = Path(current_node.filename.decode("utf-8")).joinpath(
                current_path
            )
            current_node = current_node.parent
        return current_path

    def __repr__(self):
        return (
            f"FileHeader<next_filehdr:{self.next_filehdr}, type:{self.fs_type},"
            f" executable:{self.executable}, spec_info:{self.spec_info},"
            f" size:{self.size}, checksum:{self.checksum}, filename:{self.filename}>"
        )
167
168
class RomFSError(Exception):
    """Raised when a RomFS image is too small or fails validation."""
171
172
class RomFSHeader:
    """RomFS superblock plus the machinery to walk and dump the image.

    The constructor parses the signature, the declared size, the checksum and
    the volume name from the start of ``file``. ``recursive_walk`` then
    collects file headers into ``inodes`` (keyed by their address) and
    ``dump_fs`` materialises them through ``fs``.
    """

    signature: bytes
    full_size: int
    checksum: int
    volume_name: bytes
    eof: int
    file: File
    header_end_offset: int
    end_offset: int
    inodes: dict[int, FileHeader]
    fs: FileSystem

    def __init__(
        self,
        file: File,
        fs: FileSystem,
    ):
        """Parse the superblock found at the start of *file*.

        :param file: open RomFS image.
        :param fs: output file system abstraction the entries are created in.
        :raises RomFSError: if the file is smaller than ``ROMFS_HEADER_SIZE``.
        """
        self.file = file
        # Measure the file so that addresses can be bounds-checked later on.
        self.file.seek(0, io.SEEK_END)
        self.eof = self.file.tell()
        self.file.seek(0, io.SEEK_SET)

        if self.eof < ROMFS_HEADER_SIZE:
            raise RomFSError("File too small to hold ROMFS")

        self.signature = self.file.read(8)
        self.full_size = struct.unpack(">I", self.file.read(4))[0]
        self.checksum = struct.unpack(">I", self.file.read(4))[0]
        self.volume_name = get_string(self.file)
        # The first file header starts right after the padded volume name.
        self.header_end_offset = self.file.tell()
        self.inodes = {}

        self.fs = fs

    def valid_checksum(self) -> bool:
        """Check the checksum over the first ``ROMFS_HEADER_SIZE`` bytes.

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(0, io.SEEK_SET)
            return valid_checksum(self.file.read(ROMFS_HEADER_SIZE))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    def validate(self):
        """Validate signature, declared size and checksum.

        :raises RomFSError: on the first check that fails.
        """
        if self.signature != ROMFS_SIGNATURE:
            raise RomFSError("Invalid RomFS signature")
        if self.full_size > self.eof:
            raise RomFSError("ROMFS size is greater than file size")
        if not self.valid_checksum():
            raise RomFSError("Invalid checksum")

    def is_valid_addr(self, addr: int) -> bool:
        """Validate that an inode address is valid.

        Inode addresses must be 16 bytes aligned, must not point into the
        superblock, and must leave room in the file for the 16 bytes of
        fixed header fields that are read from that address.
        """
        return (
            addr % 16 == 0
            and self.header_end_offset <= addr
            and addr + 16 <= self.eof
        )

    def is_recursive(self, addr) -> bool:
        """Return whether a header at *addr* has already been recorded."""
        return addr in self.inodes

    def recursive_walk(self, addr: int, parent: FileHeader | None = None):
        """Walk the chain of sibling headers starting at *addr*.

        Each header is handed to ``walk_dir``, which returns the address of
        the next sibling. The walk ends at the first address rejected by
        ``is_valid_addr`` (a next pointer of zero, for instance), or when the
        chain loops back onto an address it already went through.

        :param addr: address of the first header of the chain.
        :param parent: directory header the chain belongs to, ``None`` for
            the root level.
        """
        visited: set[int] = set()
        while self.is_valid_addr(addr):
            if addr in visited:
                # A crafted or corrupt image can link a sibling chain back
                # onto itself; without this guard the walk never terminates.
                logger.warning("Cyclic file header chain, stopping walk", addr=addr)
                break
            visited.add(addr)
            addr = self.walk_dir(addr, parent)

    def walk_dir(self, addr: int, parent: FileHeader | None = None):
        """Parse the header at *addr*, record it and descend into directories.

        :param addr: address of the header to parse.
        :param parent: directory header this entry belongs to.
        :returns: the address of the next sibling header.
        :raises RomFSError: if the header checksum does not match.
        """
        self.file.seek(addr, io.SEEK_SET)
        file_header = FileHeader(addr, self.file)
        file_header.parent = parent

        if not file_header.valid_checksum():
            raise RomFSError(f"Invalid file CRC at addr {addr:0x}.")

        logger.debug("walking dir", addr=addr, file=file_header)

        # "." and ".." entries are neither recorded nor followed.
        if file_header.filename not in [b".", b".."]:
            if (
                file_header.fs_type == FSType.DIRECTORY
                and file_header.spec_info != 0x0
                and not self.is_recursive(addr)
            ):
                # Record the directory before descending so that a reference
                # back to it is recognised by is_recursive().
                self.inodes[addr] = file_header
                self.recursive_walk(file_header.spec_info, file_header)
            # Also runs after a descent, so this header is the one that ends
            # up recorded for addr.
            self.inodes[addr] = file_header
        return file_header.next_filehdr

    def create_symlink(self, output_path: Path, inode: FileHeader):
        """Create a symlink at *output_path* pointing to the inode's content."""
        target_path = Path(inode.content.decode("utf-8"))
        self.fs.create_symlink(src=target_path, dst=output_path)

    def create_hardlink(self, output_path: Path, inode: FileHeader):
        """Create a hard link at *output_path* to the inode at ``spec_info``.

        A warning is logged and nothing is created when the target address
        was not recorded during the walk.
        """
        if inode.spec_info in self.inodes:
            target_path = self.inodes[inode.spec_info].path
            self.fs.create_hardlink(dst=output_path, src=target_path)
        else:
            logger.warning("Invalid hard link target", inode_key=inode.spec_info)

    def create_inode(self, inode: FileHeader):
        """Create the output entry matching the type of *inode*.

        NOTE: there is no branch for ``FSType.SOCKET``, so socket inodes do
        not produce any output entry.
        """
        output_path = inode.path
        logger.info("dumping inode", inode=inode, output_path=str(output_path))

        if inode.fs_type == FSType.HARD_LINK:
            self.create_hardlink(output_path, inode)
        elif inode.fs_type == FSType.SYMLINK:
            self.create_symlink(output_path, inode)
        elif inode.fs_type == FSType.DIRECTORY:
            self.fs.mkdir(output_path, mode=inode.mode, exist_ok=True)
        elif inode.fs_type == FSType.FILE:
            self.fs.write_bytes(output_path, inode.content)
        elif inode.fs_type in [FSType.BLOCK_DEV, FSType.CHAR_DEV]:
            self.fs.mknod(output_path, mode=inode.mode, device=inode.dev)
        elif inode.fs_type == FSType.FIFO:
            self.fs.mkfifo(output_path, mode=inode.mode)

    def dump_fs(self):
        """Create every recorded inode in the output file system.

        Entries are created in three groups, each sorted by path: files,
        directories, FIFOs and sockets first, then device nodes, then
        symlinks and hard links.
        """

        def inodes(*inode_types):
            return sorted(
                (v for v in self.inodes.values() if v.fs_type in inode_types),
                key=lambda inode: inode.path,
            )

        # order of file object creation is important
        sorted_inodes = (
            inodes(FSType.FILE, FSType.DIRECTORY, FSType.FIFO, FSType.SOCKET)
            + inodes(FSType.BLOCK_DEV, FSType.CHAR_DEV)
            + inodes(FSType.SYMLINK, FSType.HARD_LINK)
        )

        for inode in sorted_inodes:
            self.create_inode(inode)

    def __str__(self):
        return f"signature: {self.signature}\nfull_size: {self.full_size}\nchecksum: {self.checksum}\nvolume_name: {self.volume_name}"
305
306
class RomfsExtractor(Extractor):
    """Extractor that unpacks a RomFS image into an output directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Validate the image at *inpath* and recreate its entries under *outdir*.

        Returns an ``ExtractResult`` carrying the problems recorded by the
        output file system. Errors raised while parsing or validating the
        image are not caught here.
        """
        output_fs = FileSystem(outdir)
        with File.from_path(inpath) as image:
            romfs = RomFSHeader(image, output_fs)
            romfs.validate()
            # The first file header sits right after the superblock.
            romfs.recursive_walk(romfs.header_end_offset, None)
            romfs.dump_fs()
            return ExtractResult(reports=output_fs.problems)
316
317
class RomFSFSHandler(StructHandler):
    """Handler that recognises RomFS images and computes their extent."""

    NAME = "romfs"

    PATTERNS = [
        # '-rom1fs-'
        HexString("2D 72 6F 6D 31 66 73 2d")
    ]

    C_DEFINITIONS = r"""
        struct romfs_header {
            char magic[8];
            uint32 full_size;
            uint32 checksum;
        }
    """
    HEADER_STRUCT = "romfs_header"
    EXTRACTOR = RomfsExtractor()

    DOC = HandlerDoc(
        name="RomFS",
        description="RomFS is a simple, space-efficient, read-only file system format designed for embedded systems. It features 16-byte alignment, minimal metadata overhead, and supports basic file types like directories, files, symlinks, and devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="RomFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/romfs.html",
            ),
            Reference(
                title="RomFS Wikipedia",
                url="https://en.wikipedia.org/wiki/Romfs",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the RomFS image starting at *start_offset*.

        :param file: file being scanned; assumed to be positioned at
            *start_offset* on entry, since the checksummed bytes are read
            from the current position.
        :param start_offset: offset of the RomFS signature in *file*.
        :returns: the chunk spanning the file system and its trailing zero
            padding.
        :raises InvalidInputFormat: if the superblock checksum does not match.
        """
        if not valid_checksum(file.read(ROMFS_HEADER_SIZE)):
            raise InvalidInputFormat("Invalid RomFS checksum.")

        # Rewind with an absolute seek: a relative rewind by the requested
        # length lands on the wrong offset when the read came back short.
        file.seek(start_offset, io.SEEK_SET)

        # Every multi byte value must be in big endian order.
        header = self.parse_header(file, Endian.BIG)

        # The zero terminated name of the volume, padded to 16 byte boundary.
        get_string(file)

        # full_size is the number of accessible bytes in this fs, counted
        # from the start of the file system (RomFSHeader.validate compares it
        # against the whole file size as well), so it is applied from
        # start_offset rather than from the end of the volume name.
        file.seek(start_offset + header.full_size, io.SEEK_SET)

        # Another thing to note is that romfs works on file headers and data
        # aligned to 16 byte boundaries, but most hardware devices and the block
        # device drivers are unable to cope with smaller than block-sized data.
        # To overcome this limitation, the whole size of the file system must be
        # padded to an 1024 byte boundary.
        read_until_past(file, b"\x00")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )