1from __future__ import annotations
2
3import io
4import os
5import stat
6import struct
7from enum import IntEnum, unique
8from pathlib import Path
9
10from structlog import get_logger
11
12from ...file_utils import (
13 Endian,
14 FileSystem,
15 InvalidInputFormat,
16 read_until_past,
17 round_up,
18)
19from ...models import (
20 Extractor,
21 ExtractResult,
22 File,
23 HandlerDoc,
24 HandlerType,
25 HexString,
26 Reference,
27 StructHandler,
28 ValidChunk,
29)
30
31logger = get_logger()
32
33
# Strings in the image are consumed in chunks of this many bytes (see get_string).
STRING_ALIGNMENT = 16
# get_string() stops reading once at least this many bytes were consumed
# without meeting a NUL terminator.
MAX_LINUX_PATH_LENGTH = 0xFF
# 2**32: modulus that keeps the checksum accumulator within 32 bits.
# NOTE: despite the name this is one more than the largest uint32 value.
MAX_UINT32 = 0x100000000


# Permission bits used by FileHeader.mode: entries without the executable flag
# get WORLD_RW, entries with it get WORLD_RWX.
WORLD_RW = 0o666
WORLD_RWX = 0o777
# Number of leading bytes covered by the superblock checksum; RomFSHeader also
# refuses files shorter than this.
ROMFS_HEADER_SIZE = 512
# Magic bytes expected at the very start of a RomFS image.
ROMFS_SIGNATURE = b"-rom1fs-"
43
44
@unique
class FSType(IntEnum):
    """Kind of object described by a RomFS file header.

    The value is decoded from the three lowest bits of the first 32-bit word
    of a file header (see ``FileHeader.__init__``), so all eight possible
    values are covered.
    """

    HARD_LINK = 0
    DIRECTORY = 1
    FILE = 2
    SYMLINK = 3
    BLOCK_DEV = 4
    CHAR_DEV = 5
    SOCKET = 6
    FIFO = 7
55
56
def valid_checksum(content: bytes) -> bool:
    """Tell whether *content* passes the RomFS checksum.

    The buffer is interpreted as a sequence of big-endian unsigned 32-bit
    words; it is valid when those words add up to zero modulo 2**32.
    A buffer whose length is not a multiple of four is rejected outright,
    because it cannot be split into whole words.
    """
    word_count, leftover = divmod(len(content), 4)
    if leftover:
        return False

    # One unpack call with a repeat count decodes every word at once.
    words = struct.unpack(f">{word_count}L", content)
    return sum(words) % MAX_UINT32 == 0
68
69
def get_string(file: File) -> bytes:
    """Read a NUL terminated string stored in 16-byte aligned chunks.

    Chunks of ``STRING_ALIGNMENT`` bytes are consumed from the current file
    position until one of them contains a NUL byte, or until at least
    ``MAX_LINUX_PATH_LENGTH`` bytes have been consumed. The file is left
    positioned right after the last chunk read. Trailing NUL bytes are
    stripped from the returned value.
    """
    pieces = []
    consumed = 0
    while consumed < MAX_LINUX_PATH_LENGTH:
        piece = file.read(STRING_ALIGNMENT)
        pieces.append(piece)
        consumed += STRING_ALIGNMENT
        # Earlier chunks held no terminator, so only the newest one matters.
        if b"\x00" in piece:
            break
    return b"".join(pieces).rstrip(b"\x00")
78
79
class FileHeader:
    """One RomFS file header (inode) parsed from the image.

    The on-disk header is four big-endian 32-bit words followed by the
    NUL terminated, 16-byte padded file name:

    * word 0: offset of the next header in the same chain, with the four
      lowest bits reused for the executable flag (bit 3) and the entry
      type (bits 0-2);
    * word 1: ``spec_info``, whose meaning depends on the type (first entry
      of a directory, target of a hard link, device numbers);
    * word 2: size of the data that follows the header;
    * word 3: checksum.
    """

    addr: int
    next_filehdr: int
    spec_info: int
    fs_type: FSType
    executable: bool
    size: int
    checksum: int
    filename: bytes
    depth: int = -1
    parent: FileHeader | None = None
    start_offset: int
    # NOTE(review): declared but never assigned anywhere in this class.
    end_offset: int
    file: File

    def __init__(self, addr: int, file: File):
        """Parse the header found at the current position of *file*.

        *file* must already be positioned at *addr*: the constructor reads
        from the current position and only records *addr* for later use.

        Raises ``struct.error`` when fewer than 16 bytes can be read.
        """
        self.addr = addr
        fs_typeexec_next, self.spec_info, self.size, self.checksum = struct.unpack(
            ">4I", file.read(16)
        )
        self.next_filehdr = fs_typeexec_next & ~0b1111
        self.fs_type = FSType(fs_typeexec_next & 0b0111)
        # Stored as a real bool, matching the declared attribute type.
        self.executable = bool(fs_typeexec_next & 0b1000)
        self.filename = get_string(file)
        # Data starts right after the padded file name.
        self.start_offset = file.tell()
        self.file = file

    def valid_checksum(self) -> bool:
        """Check the checksum of the header (fixed part plus padded name).

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.addr, io.SEEK_SET)
            filename_len = len(self.filename)
            header_size = 16 + round_up(filename_len, 16)
            return valid_checksum(self.file.read(header_size))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def content(self) -> bytes:
        """Return the data of this entry. Applicable to files and symlinks.

        The file position is restored before returning, exactly like
        ``valid_checksum`` does.

        Raises ``RomFSError`` when the declared size runs past the end of
        the underlying file.
        """
        if self.start_offset + self.size > self.file.size():
            raise RomFSError("Inode size extends past the end of the file")
        # Remember where the caller was: a relative rewind by ``size`` would
        # be wrong after a short read and would never restore this position.
        current_position = self.file.tell()
        try:
            self.file.seek(self.start_offset, io.SEEK_SET)
            return self.file.read(self.size)
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def mode(self) -> int:
        """Permission mode.

        RomFS only stores an executable flag, so the mode is assumed to be
        world readable/writable/executable when that flag is set, and world
        readable/writable otherwise. The block or character device file type
        bits are added for device entries.
        """
        mode = WORLD_RWX if self.executable else WORLD_RW
        if self.fs_type == FSType.BLOCK_DEV:
            mode |= stat.S_IFBLK
        elif self.fs_type == FSType.CHAR_DEV:
            mode |= stat.S_IFCHR
        return mode

    @property
    def dev(self) -> int:
        """Raw device number if block device or character device, zero otherwise."""
        if self.fs_type in (FSType.BLOCK_DEV, FSType.CHAR_DEV):
            # spec_info packs the major number in the high 16 bits and the
            # minor number in the low 16 bits.
            major = self.spec_info >> 16
            minor = self.spec_info & 0xFFFF
            return os.makedev(major, minor)
        return 0

    @property
    def path(self) -> Path:
        """Return the full path of this entry, up to the RomFS root.

        The path is built by following ``parent`` links and is relative.
        File names are decoded as UTF-8, so ``UnicodeDecodeError`` is raised
        for names that are not valid UTF-8.
        """
        current_node = self
        current_path = Path()
        while current_node is not None:
            current_path = Path(current_node.filename.decode("utf-8")).joinpath(
                current_path
            )
            current_node = current_node.parent
        return current_path

    def __repr__(self):
        return (
            f"FileHeader<next_filehdr:{self.next_filehdr}, type:{self.fs_type},"
            f" executable:{self.executable}, spec_info:{self.spec_info},"
            f" size:{self.size}, checksum:{self.checksum}, filename:{self.filename}>"
        )
169
170
class RomFSError(Exception):
    """Raised when a RomFS image is malformed or fails validation."""
173
174
class RomFSHeader:
    """RomFS superblock plus the logic to walk and dump the file system.

    Typical use: construct, ``validate()``, ``recursive_walk()`` starting at
    ``header_end_offset`` to collect the file headers into ``inodes``, then
    ``dump_fs()`` to materialize them through ``fs``.
    """

    signature: bytes
    full_size: int
    checksum: int
    volume_name: bytes
    eof: int
    file: File
    # Offset of the first byte after the padded volume name.
    header_end_offset: int
    # NOTE(review): declared but never assigned anywhere in this class.
    end_offset: int
    # File headers collected by the walk, keyed by their offset in the image.
    inodes: dict[int, FileHeader]
    fs: FileSystem

    def __init__(
        self,
        file: File,
        fs: FileSystem,
    ):
        """Parse the superblock at the start of *file*.

        Raises ``RomFSError`` when the file is shorter than
        ``ROMFS_HEADER_SIZE`` bytes.
        """
        self.file = file
        self.file.seek(0, io.SEEK_END)
        self.eof = self.file.tell()
        self.file.seek(0, io.SEEK_SET)

        if self.eof < ROMFS_HEADER_SIZE:
            raise RomFSError("File too small to hold ROMFS")

        self.signature = self.file.read(8)
        self.full_size = struct.unpack(">I", self.file.read(4))[0]
        self.checksum = struct.unpack(">I", self.file.read(4))[0]
        self.volume_name = get_string(self.file)
        self.header_end_offset = self.file.tell()
        self.inodes = {}

        self.fs = fs

    def valid_checksum(self) -> bool:
        """Check the checksum of the first ``ROMFS_HEADER_SIZE`` bytes.

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(0, io.SEEK_SET)
            return valid_checksum(self.file.read(ROMFS_HEADER_SIZE))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    def validate(self):
        """Raise ``RomFSError`` unless signature, size and checksum are sane."""
        if self.signature != ROMFS_SIGNATURE:
            raise RomFSError("Invalid RomFS signature")
        if self.full_size > self.eof:
            raise RomFSError("ROMFS size is greater than file size")
        if not self.valid_checksum():
            raise RomFSError("Invalid checksum")

    def is_valid_addr(self, addr):
        """Validate that an inode address is valid.

        Inode addresses must be 16 bytes aligned and placed within
        the RomFS on file: after the superblock and strictly before the
        end of the file, since no header can be read at the very end.
        """
        return (self.header_end_offset <= addr < self.eof) and (addr % 16 == 0)

    def is_recursive(self, addr) -> bool:
        """Tell whether the header at *addr* has already been collected."""
        return addr in self.inodes

    def recursive_walk(self, addr: int, parent: FileHeader | None = None):
        """Walk a chain of sibling headers starting at *addr*.

        The chain is followed through ``next_filehdr`` until an invalid
        address (such as 0) is met. Directories met on the way are descended
        into by ``walk_dir``.
        """
        # The next address only depends on the current one, so meeting an
        # address twice in the same chain means the chain loops forever.
        seen: set[int] = set()
        while self.is_valid_addr(addr):
            if addr in seen:
                logger.warning("Loop detected in RomFS file header chain", addr=addr)
                break
            seen.add(addr)
            addr = self.walk_dir(addr, parent)

    def walk_dir(self, addr: int, parent: FileHeader | None = None):
        """Parse the header at *addr*, record it and return the next address.

        ``.`` and ``..`` entries are parsed and checked but not recorded.
        A directory with a non-zero ``spec_info`` that was not recorded
        before is walked recursively, with itself as parent.

        Raises ``RomFSError`` when the header checksum does not match.
        """
        self.file.seek(addr, io.SEEK_SET)
        file_header = FileHeader(addr, self.file)
        file_header.parent = parent

        if not file_header.valid_checksum():
            raise RomFSError(f"Invalid file CRC at addr {addr:0x}.")

        logger.debug("walking dir", addr=addr, file=file_header)

        if file_header.filename not in [b".", b".."]:
            if (
                file_header.fs_type == FSType.DIRECTORY
                and file_header.spec_info != 0x0
                and not self.is_recursive(addr)
            ):
                # Record the directory first so that it is not descended
                # into again if the walk comes back to it.
                self.inodes[addr] = file_header
                self.recursive_walk(file_header.spec_info, file_header)
            self.inodes[addr] = file_header
        return file_header.next_filehdr

    def create_symlink(self, output_path: Path, inode: FileHeader):
        """Create a symlink whose target is the UTF-8 decoded inode data."""
        target_path = Path(inode.content.decode("utf-8"))
        self.fs.create_symlink(src=target_path, dst=output_path)

    def create_hardlink(self, output_path: Path, inode: FileHeader):
        """Create a hard link to the inode located at ``inode.spec_info``.

        A warning is logged, and nothing is created, when the target was not
        collected by the walk.
        """
        if inode.spec_info in self.inodes:
            target_path = self.inodes[inode.spec_info].path
            self.fs.create_hardlink(dst=output_path, src=target_path)
        else:
            logger.warning("Invalid hard link target", inode_key=inode.spec_info)

    def create_inode(self, inode: FileHeader):
        """Materialize one inode through ``self.fs`` according to its type."""
        output_path = inode.path
        logger.info("dumping inode", inode=inode, output_path=str(output_path))

        if inode.fs_type == FSType.HARD_LINK:
            self.create_hardlink(output_path, inode)
        elif inode.fs_type == FSType.SYMLINK:
            self.create_symlink(output_path, inode)
        elif inode.fs_type == FSType.DIRECTORY:
            self.fs.mkdir(output_path, mode=inode.mode, exist_ok=True)
        elif inode.fs_type == FSType.FILE:
            self.fs.write_bytes(output_path, inode.content)
        elif inode.fs_type in [FSType.BLOCK_DEV, FSType.CHAR_DEV]:
            self.fs.mknod(output_path, mode=inode.mode, device=inode.dev)
        elif inode.fs_type == FSType.FIFO:
            self.fs.mkfifo(output_path, mode=inode.mode)
        # NOTE(review): FSType.SOCKET has no branch, so sockets are silently
        # skipped - confirm this is intended.

    def dump_fs(self):
        """Create every collected inode, in an order that keeps links valid."""

        def inodes(*inode_types):
            return sorted(
                (v for v in self.inodes.values() if v.fs_type in inode_types),
                key=lambda inode: inode.path,
            )

        # order of file object creation is important
        sorted_inodes = (
            inodes(FSType.FILE, FSType.DIRECTORY, FSType.FIFO, FSType.SOCKET)
            + inodes(FSType.BLOCK_DEV, FSType.CHAR_DEV)
            + inodes(FSType.SYMLINK, FSType.HARD_LINK)
        )

        for inode in sorted_inodes:
            self.create_inode(inode)

    def __str__(self):
        return f"signature: {self.signature}\nfull_size: {self.full_size}\nchecksum: {self.checksum}\nvolume_name: {self.volume_name}"
307
308
class RomfsExtractor(Extractor):
    """Extractor that unpacks a RomFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Unpack the RomFS image at *inpath* below *outdir*.

        The superblock is validated, the file headers are collected starting
        right after the superblock, and every collected entry is then created
        through a ``FileSystem`` rooted at *outdir*. The problems recorded by
        that ``FileSystem`` are returned in the result.

        ``RomFSError`` raised while validating or walking is not caught here.
        """
        sandbox = FileSystem(outdir)
        with File.from_path(inpath) as image:
            romfs = RomFSHeader(image, sandbox)
            romfs.validate()
            romfs.recursive_walk(romfs.header_end_offset, None)
            romfs.dump_fs()
            return ExtractResult(reports=sandbox.problems)
318
319
class RomFSFSHandler(StructHandler):
    """Handler that recognizes RomFS images and computes their extent."""

    NAME = "romfs"

    PATTERNS = [
        # '-rom1fs-'
        HexString("2D 72 6F 6D 31 66 73 2d")
    ]

    C_DEFINITIONS = r"""
        struct romfs_header {
            char magic[8];
            uint32 full_size;
            uint32 checksum;
        }
    """
    HEADER_STRUCT = "romfs_header"
    EXTRACTOR = RomfsExtractor()

    DOC = HandlerDoc(
        name="RomFS",
        description="RomFS is a simple, space-efficient, read-only file system format designed for embedded systems. It features 16-byte alignment, minimal metadata overhead, and supports basic file types like directories, files, symlinks, and devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="RomFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/romfs.html",
            ),
            Reference(
                title="RomFS Wikipedia",
                url="https://en.wikipedia.org/wiki/Romfs",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Compute the extent of the RomFS image at the current file position.

        *file* is expected to be positioned on the RomFS signature; the
        returned chunk starts at *start_offset* and ends after the zero
        padding that follows the file system.

        Raises ``InvalidInputFormat`` when the superblock checksum does not
        match, or when the declared size is smaller than the superblock.
        """
        header_start = file.tell()

        if not valid_checksum(file.read(ROMFS_HEADER_SIZE)):
            raise InvalidInputFormat("Invalid RomFS checksum.")

        # Rewind to the recorded position rather than by a fixed amount:
        # a short read near the end of the file consumes fewer bytes.
        file.seek(header_start, io.SEEK_SET)

        # Every multi byte value must be in big endian order.
        header = self.parse_header(file, Endian.BIG)

        # The zero terminated name of the volume, padded to 16 byte boundary.
        get_string(file)

        # full_size is the number of accessible bytes in this fs, counted
        # from the start of the image, so it includes the superblock itself.
        superblock_size = file.tell() - header_start
        if header.full_size < superblock_size:
            raise InvalidInputFormat("RomFS size is smaller than its header.")

        # Seek from the start of the image, not from the end of the
        # superblock, otherwise the end would overshoot by superblock_size.
        file.seek(header_start + header.full_size, io.SEEK_SET)

        # Another thing to note is that romfs works on file headers and data
        # aligned to 16 byte boundaries, but most hardware devices and the block
        # device drivers are unable to cope with smaller than block-sized data.
        # To overcome this limitation, the whole size of the file system must be
        # padded to an 1024 byte boundary.
        # NOTE(review): the end offset is taken from file.tell() after this
        # call; if read_until_past() consumes the first non-zero byte, the
        # end is one byte too far - confirm against the helper.
        read_until_past(file, b"\x00")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )