1import io
2import os
3import stat
4import struct
5from enum import IntEnum, unique
6from pathlib import Path
7from typing import Optional
8
9from structlog import get_logger
10
11from ...file_utils import (
12 Endian,
13 FileSystem,
14 InvalidInputFormat,
15 read_until_past,
16 round_up,
17)
18from ...models import (
19 Extractor,
20 ExtractResult,
21 File,
22 HandlerDoc,
23 HandlerType,
24 HexString,
25 Reference,
26 StructHandler,
27 ValidChunk,
28)
29
# Module-level structlog logger shared by the RomFS parsing/extraction code below.
logger = get_logger()
31
32
# Names are read in chunks of this many bytes (see get_string()).
STRING_ALIGNMENT = 0x10
# get_string() stops reading once at least this many bytes were consumed.
MAX_LINUX_PATH_LENGTH = 255
# Modulus applied to the running checksum; note this is 2**32, i.e. one
# past the largest unsigned 32-bit value.
MAX_UINT32 = 1 << 32


# Permission bits handed out to extracted inodes (see FileHeader.mode).
WORLD_RW = 0o666
WORLD_RWX = WORLD_RW | 0o111
# Number of leading bytes covered by the filesystem header checksum.
ROMFS_HEADER_SIZE = 0x200
# Magic bytes expected at the very start of a RomFS image.
ROMFS_SIGNATURE = b"-rom1fs-"
42
43
@unique
class FSType(IntEnum):
    """Inode type, stored in the low three bits of a file header's first word."""

    # Link to another header; the target address lives in ``spec_info``.
    HARD_LINK = 0
    # Directory; ``spec_info`` is the address of its first entry.
    DIRECTORY = 1
    # Regular file whose data follows the header.
    FILE = 2
    # Symbolic link whose data is the link target.
    SYMLINK = 3
    # Device nodes; ``spec_info`` packs the major/minor numbers.
    BLOCK_DEV = 4
    CHAR_DEV = 5
    SOCKET = 6
    FIFO = 7
54
55
def valid_checksum(content: bytes) -> bool:
    """Return whether ``content`` passes the RomFS checksum.

    The buffer is interpreted as a sequence of big-endian unsigned 32-bit
    words; it is valid when those words sum to zero modulo ``MAX_UINT32``.
    A buffer whose length is not a multiple of four is rejected outright,
    since it cannot be split into whole words.
    """
    word_count, leftover = divmod(len(content), 4)
    if leftover:
        return False

    words = struct.unpack(f">{word_count}L", content)
    return sum(words) % MAX_UINT32 == 0
67
68
def get_string(file: File) -> bytes:
    """Read a NUL-terminated string stored in 16-byte aligned chunks.

    Chunks of ``STRING_ALIGNMENT`` bytes are consumed from the current
    position of ``file`` until one of them contains a NUL byte, the end of
    the file is reached, or at least ``MAX_LINUX_PATH_LENGTH`` bytes have
    been consumed. The file is left positioned right after the last chunk
    that was read.

    Returns the bytes preceding the first NUL. Anything following that
    terminator inside the last chunk is discarded, so the result never
    contains an embedded NUL byte. If no NUL was found, everything that
    was read is returned.
    """
    chunks = []
    consumed = 0
    while consumed < MAX_LINUX_PATH_LENGTH:
        chunk = file.read(STRING_ALIGNMENT)
        if not chunk:
            # End of file: further reads cannot yield a terminator.
            break
        chunks.append(chunk)
        consumed += STRING_ALIGNMENT
        if b"\x00" in chunk:
            break

    # Cut at the terminator rather than only stripping trailing NULs, so that
    # non-zero padding after the terminator does not leak into the name.
    return b"".join(chunks).split(b"\x00", 1)[0]
77
78
class FileHeader:
    """A single RomFS file header (inode) parsed from an open image.

    The fixed part of the header is four big-endian 32-bit words (next
    header / type / executable flag, ``spec_info``, data size, checksum)
    followed by the NUL-terminated, 16-byte padded file name. The data of
    the entry starts right after the name, at ``start_offset``.
    """

    # Offset of this header inside ``file``.
    addr: int
    # Offset of the next header in the same directory (low 4 bits masked off).
    next_filehdr: int
    # Type dependent word: first entry of a directory, target of a hard link,
    # or packed major/minor numbers of a device node.
    spec_info: int
    fs_type: FSType
    executable: bool
    # Size in bytes of the data following the header.
    size: int
    checksum: int
    filename: bytes
    # NOTE(review): ``depth`` and ``end_offset`` are declared but never
    # assigned by the code visible in this module.
    depth: int = -1
    parent: Optional["FileHeader"] = None
    # Offset of the first data byte (position right after the file name).
    start_offset: int
    end_offset: int
    file: File

    def __init__(self, addr: int, file: File):
        """Parse a header from the current position of ``file``.

        ``addr`` is only recorded; the caller must already have positioned
        ``file`` at that address. Raises ``struct.error`` if fewer than 16
        bytes are available.
        """
        self.addr = addr
        (
            fs_typeexec_next,
            self.spec_info,
            self.size,
            self.checksum,
        ) = struct.unpack(">4I", file.read(16))
        # The first word packs three fields: the next-header offset (headers
        # are 16-byte aligned, so its low 4 bits are free), the executable
        # flag (bit 3) and the inode type (bits 0-2).
        self.next_filehdr = fs_typeexec_next & ~0b1111
        self.fs_type = FSType(fs_typeexec_next & 0b0111)
        self.executable = bool(fs_typeexec_next & 0b1000)
        self.filename = get_string(file)
        self.start_offset = file.tell()
        self.file = file

    def valid_checksum(self) -> bool:
        """Checksum the header (fixed part plus padded name).

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.addr, io.SEEK_SET)
            header_size = 16 + round_up(len(self.filename), 16)
            return valid_checksum(self.file.read(header_size))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def content(self) -> bytes:
        """Returns the file content. Applicable to files and symlinks.

        The file position is restored before returning, even when fewer
        than ``size`` bytes could be read.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.start_offset, io.SEEK_SET)
            return self.file.read(self.size)
        finally:
            # Restore the absolute position: a relative seek by ``-size``
            # would land in the wrong place after a short read.
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def mode(self) -> int:
        """Permission mode.

        World read/write/execute if the executable bit is set, world
        read/write otherwise. For block and character devices the matching
        file type bits are OR-ed in as well.
        """
        mode = WORLD_RWX if self.executable else WORLD_RW
        if self.fs_type == FSType.BLOCK_DEV:
            mode |= stat.S_IFBLK
        elif self.fs_type == FSType.CHAR_DEV:
            mode |= stat.S_IFCHR
        return mode

    @property
    def dev(self) -> int:
        """Raw device number if block device or character device, zero otherwise."""
        if self.fs_type in (FSType.BLOCK_DEV, FSType.CHAR_DEV):
            # ``spec_info`` holds the major number in its upper 16 bits and
            # the minor number in its lower 16 bits.
            major = self.spec_info >> 16
            minor = self.spec_info & 0xFFFF
            return os.makedev(major, minor)
        return 0

    @property
    def path(self) -> Path:
        """Returns the full path of this file, up to the RomFS root."""
        current_node = self
        current_path = Path()
        # Walk the parent chain, prepending each ancestor's name.
        while current_node is not None:
            current_path = Path(current_node.filename.decode("utf-8")).joinpath(
                current_path
            )
            current_node = current_node.parent
        return current_path

    def __repr__(self):
        return (
            f"FileHeader<next_filehdr:{self.next_filehdr}, type:{self.fs_type},"
            f" executable:{self.executable}, spec_info:{self.spec_info},"
            f" size:{self.size}, checksum:{self.checksum}, filename:{self.filename}>"
        )
166
167
class RomFSError(Exception):
    """Raised when a RomFS image fails validation or cannot be parsed."""
170
171
class RomFSHeader:
    """Parsed RomFS superblock plus the logic to walk and extract the image.

    Typical usage is ``validate()``, then
    ``recursive_walk(header_end_offset)`` to collect the inodes, then
    ``dump_fs()`` to recreate them through the ``FileSystem`` helper.
    """

    signature: bytes
    full_size: int
    checksum: int
    volume_name: bytes
    # Size of the underlying file, used to bound inode addresses.
    eof: int
    file: File
    # Offset right after the volume name, i.e. where the first file header is.
    header_end_offset: int
    # NOTE(review): ``end_offset`` is declared but never assigned in this class.
    end_offset: int
    # Collected inodes keyed by the address of their header.
    inodes: dict[int, "FileHeader"]
    fs: FileSystem

    def __init__(
        self,
        file: File,
        fs: FileSystem,
    ):
        """Parse the superblock from the start of ``file``.

        Raises ``RomFSError`` if the file is smaller than
        ``ROMFS_HEADER_SIZE`` bytes.
        """
        self.file = file
        self.file.seek(0, io.SEEK_END)
        self.eof = self.file.tell()
        self.file.seek(0, io.SEEK_SET)

        if self.eof < ROMFS_HEADER_SIZE:
            raise RomFSError("File too small to hold ROMFS")

        self.signature = self.file.read(8)
        self.full_size = struct.unpack(">I", self.file.read(4))[0]
        self.checksum = struct.unpack(">I", self.file.read(4))[0]
        self.volume_name = get_string(self.file)
        self.header_end_offset = self.file.tell()
        self.inodes = {}

        self.fs = fs

    def valid_checksum(self) -> bool:
        """Checksum the first ``ROMFS_HEADER_SIZE`` bytes of the image.

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(0, io.SEEK_SET)
            return valid_checksum(self.file.read(ROMFS_HEADER_SIZE))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    def validate(self):
        """Raise ``RomFSError`` unless signature, size and checksum are sane."""
        if self.signature != ROMFS_SIGNATURE:
            raise RomFSError("Invalid RomFS signature")
        if self.full_size > self.eof:
            raise RomFSError("ROMFS size is greater than file size")
        if not self.valid_checksum():
            raise RomFSError("Invalid checksum")

    def is_valid_addr(self, addr):
        """Validate that an inode address is valid.

        Inodes addresses must be 16 bytes aligned and placed within
        the RomFS on file, with enough room left for the 16 byte fixed
        part of a file header.
        """
        # ``addr + 16 <= eof`` (rather than ``addr <= eof``) guarantees that
        # FileHeader can read its four 32-bit words without hitting EOF.
        return (self.header_end_offset <= addr <= self.eof - 16) and (addr % 16 == 0)

    def is_recursive(self, addr) -> bool:
        """Return whether the inode at ``addr`` was already collected."""
        return addr in self.inodes

    def recursive_walk(self, addr: int, parent: Optional[FileHeader] = None):
        """Walk the chain of sibling headers starting at ``addr``.

        Directories encountered on the way are descended into by
        ``walk_dir``. The walk stops at the first invalid address (a next
        pointer of zero marks the end of a chain).

        Raises ``RomFSError`` if the chain loops back onto a header that
        was already visited during this walk, which would otherwise never
        terminate.
        """
        visited: set[int] = set()
        while self.is_valid_addr(addr):
            if addr in visited:
                raise RomFSError(f"Cyclic file header chain at addr {addr:0x}.")
            visited.add(addr)
            addr = self.walk_dir(addr, parent)

    def walk_dir(self, addr: int, parent: Optional[FileHeader] = None):
        """Parse the header at ``addr``, record it and return the next address.

        Entries named ``.`` and ``..`` are not recorded. A directory that
        was not seen before is recorded first and then walked recursively,
        so that a directory pointing back at itself is not descended twice.

        Raises ``RomFSError`` if the header checksum is invalid.
        """
        self.file.seek(addr, io.SEEK_SET)
        file_header = FileHeader(addr, self.file)
        file_header.parent = parent

        if not file_header.valid_checksum():
            raise RomFSError(f"Invalid file CRC at addr {addr:0x}.")

        logger.debug("walking dir", addr=addr, file=file_header)

        if file_header.filename not in (b".", b".."):
            if (
                file_header.fs_type == FSType.DIRECTORY
                and file_header.spec_info != 0x0
                and not self.is_recursive(addr)
            ):
                self.inodes[addr] = file_header
                self.recursive_walk(file_header.spec_info, file_header)
            self.inodes[addr] = file_header
        return file_header.next_filehdr

    def create_symlink(self, output_path: Path, inode: FileHeader):
        """Create a symlink at ``output_path`` pointing to the inode's content."""
        target_path = Path(inode.content.decode("utf-8"))
        self.fs.create_symlink(src=target_path, dst=output_path)

    def create_hardlink(self, output_path: Path, inode: FileHeader):
        """Create a hard link to the inode whose address is ``spec_info``.

        Links whose target was not collected are skipped with a warning.
        """
        target = self.inodes.get(inode.spec_info)
        if target is None:
            logger.warning("Invalid hard link target", inode_key=inode.spec_info)
            return
        self.fs.create_hardlink(dst=output_path, src=target.path)

    def create_inode(self, inode: FileHeader):
        """Recreate a single inode through the ``FileSystem`` helper."""
        output_path = inode.path
        logger.info("dumping inode", inode=inode, output_path=str(output_path))

        if inode.fs_type == FSType.HARD_LINK:
            self.create_hardlink(output_path, inode)
        elif inode.fs_type == FSType.SYMLINK:
            self.create_symlink(output_path, inode)
        elif inode.fs_type == FSType.DIRECTORY:
            self.fs.mkdir(output_path, mode=inode.mode, exist_ok=True)
        elif inode.fs_type == FSType.FILE:
            self.fs.write_bytes(output_path, inode.content)
        elif inode.fs_type in (FSType.BLOCK_DEV, FSType.CHAR_DEV):
            self.fs.mknod(output_path, mode=inode.mode, device=inode.dev)
        elif inode.fs_type == FSType.FIFO:
            self.fs.mkfifo(output_path, mode=inode.mode)
        else:
            # Sockets are the only remaining type; they are not recreated.
            logger.warning(
                "Skipping unsupported inode type",
                fs_type=inode.fs_type,
                output_path=str(output_path),
            )

    def dump_fs(self):
        """Recreate every collected inode, sorted by path within each group."""

        def by_type(*inode_types):
            return sorted(
                (v for v in self.inodes.values() if v.fs_type in inode_types),
                key=lambda inode: inode.path,
            )

        # order of file object creation is important: regular entries first,
        # then device nodes, and links last so that their targets exist.
        sorted_inodes = (
            by_type(FSType.FILE, FSType.DIRECTORY, FSType.FIFO, FSType.SOCKET)
            + by_type(FSType.BLOCK_DEV, FSType.CHAR_DEV)
            + by_type(FSType.SYMLINK, FSType.HARD_LINK)
        )

        for inode in sorted_inodes:
            self.create_inode(inode)

    def __str__(self):
        return f"signature: {self.signature}\nfull_size: {self.full_size}\nchecksum: {self.checksum}\nvolume_name: {self.volume_name}"
304
305
class RomfsExtractor(Extractor):
    """Extractor that unpacks a RomFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Validate the image at ``inpath`` and recreate its tree in ``outdir``.

        Returns an ``ExtractResult`` carrying the problems recorded by the
        ``FileSystem`` helper during extraction.
        """
        target_fs = FileSystem(outdir)
        with File.from_path(inpath) as image:
            romfs = RomFSHeader(image, target_fs)
            romfs.validate()
            # The first file header sits right after the superblock.
            first_header_addr = romfs.header_end_offset
            romfs.recursive_walk(first_header_addr, None)
            romfs.dump_fs()
            return ExtractResult(reports=target_fs.problems)
315
316
class RomFSFSHandler(StructHandler):
    """Handler that locates RomFS images by signature and sizes the chunk."""

    NAME = "romfs"

    PATTERNS = [
        # '-rom1fs-'
        HexString("2D 72 6F 6D 31 66 73 2d")
    ]

    C_DEFINITIONS = r"""
        struct romfs_header {
            char magic[8];
            uint32 full_size;
            uint32 checksum;
        }
    """
    HEADER_STRUCT = "romfs_header"
    EXTRACTOR = RomfsExtractor()

    DOC = HandlerDoc(
        name="RomFS",
        description="RomFS is a simple, space-efficient, read-only file system format designed for embedded systems. It features 16-byte alignment, minimal metadata overhead, and supports basic file types like directories, files, symlinks, and devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="RomFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/romfs.html",
            ),
            Reference(
                title="RomFS Wikipedia",
                url="https://en.wikipedia.org/wiki/Romfs",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Compute the extent of the RomFS image starting at ``start_offset``.

        Raises ``InvalidInputFormat`` if the first ``ROMFS_HEADER_SIZE``
        bytes do not pass the RomFS checksum.
        """
        file.seek(start_offset, io.SEEK_SET)
        if not valid_checksum(file.read(ROMFS_HEADER_SIZE)):
            raise InvalidInputFormat("Invalid RomFS checksum.")

        # Rewind with an absolute seek: a relative seek by the header size
        # would be wrong if the read above came up short.
        file.seek(start_offset, io.SEEK_SET)

        # Every multi byte value must be in big endian order.
        header = self.parse_header(file, Endian.BIG)

        # full_size is the number of accessible bytes in this fs, counted
        # from the start of the filesystem (superblock included), so the
        # unpadded image ends at start_offset + full_size.
        file.seek(start_offset + header.full_size, io.SEEK_SET)

        # Another thing to note is that romfs works on file headers and data
        # aligned to 16 byte boundaries, but most hardware devices and the block
        # device drivers are unable to cope with smaller than block-sized data.
        # To overcome this limitation, the whole size of the file system must be
        # padded to an 1024 byte boundary.
        # NOTE(review): read_until_past presumably skips that zero padding;
        # confirm against its implementation in file_utils.
        read_until_past(file, b"\x00")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )