Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/romfs.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

220 statements  

1import io 

2import os 

3import stat 

4import struct 

5from enum import IntEnum, unique 

6from pathlib import Path 

7from typing import Optional 

8 

9from structlog import get_logger 

10 

11from ...file_utils import ( 

12 Endian, 

13 FileSystem, 

14 InvalidInputFormat, 

15 read_until_past, 

16 round_up, 

17) 

18from ...models import ( 

19 Extractor, 

20 ExtractResult, 

21 File, 

22 HandlerDoc, 

23 HandlerType, 

24 HexString, 

25 Reference, 

26 StructHandler, 

27 ValidChunk, 

28) 

29 

# Module-level structlog logger used by the RomFS parsing and extraction code below.
logger = get_logger()

31 

32 

# Names are read in chunks of this many bytes (they are NUL terminated and
# padded to this boundary on disk).
STRING_ALIGNMENT = 0x10
# Upper bound on how many bytes are scanned while looking for a name terminator.
MAX_LINUX_PATH_LENGTH = 255
# Modulus that keeps the running checksum within an unsigned 32-bit range.
MAX_UINT32 = 1 << 32


# Permission bits given to extracted inodes: read/write for everybody, plus
# execute for everybody when the inode is flagged executable.
WORLD_RW = 0o666
WORLD_RWX = 0o777
# Number of leading image bytes covered by the superblock checksum.
ROMFS_HEADER_SIZE = 0x200
# Magic bytes expected at the very start of a RomFS image.
ROMFS_SIGNATURE = b"-rom1fs-"

42 

43 

@unique
class FSType(IntEnum):
    """Inode kinds understood by the RomFS parser.

    The numeric value is what is stored in the low three bits of the first
    word of a file header.
    """

    HARD_LINK = 0b000  # link to another inode
    DIRECTORY = 0b001  # directory
    FILE = 0b010  # regular file
    SYMLINK = 0b011  # symbolic link
    BLOCK_DEV = 0b100  # block device node
    CHAR_DEV = 0b101  # character device node
    SOCKET = 0b110  # socket
    FIFO = 0b111  # named pipe

54 

55 

def valid_checksum(content: bytes) -> bool:
    """Tell whether *content* passes the RomFS checksum.

    The buffer is read as consecutive big-endian unsigned 32-bit words; the
    checksum holds when those words add up to zero modulo ``MAX_UINT32``.
    A buffer whose length is not a multiple of four is rejected outright.
    """
    if len(content) % 4:
        # A trailing partial word could not be unpacked, so it is invalid.
        return False

    running_sum = 0
    for (word,) in struct.iter_unpack(">L", content):
        running_sum = (running_sum + word) % MAX_UINT32
    return running_sum == 0

67 

68 

def get_string(file: File) -> bytes:
    """Read a 16 bytes aligned, null terminated string.

    Data is consumed from the current position of *file* in
    ``STRING_ALIGNMENT``-sized chunks until a chunk containing a NUL byte has
    been read, the end of the file is reached, or ``MAX_LINUX_PATH_LENGTH``
    bytes have been scanned. The file is left positioned right after the last
    chunk that was consumed, so callers can rely on ``file.tell()`` pointing
    at the data that follows the padded string.

    Returns the bytes that precede the first NUL byte, or everything that was
    read when no NUL byte was found.
    """
    chunks = []
    consumed = 0
    while consumed < MAX_LINUX_PATH_LENGTH:
        chunk = file.read(STRING_ALIGNMENT)
        if not chunk:
            # End of file: further reads cannot produce a terminator.
            break
        chunks.append(chunk)
        consumed += STRING_ALIGNMENT
        if b"\x00" in chunk:
            break

    # Cut at the terminator rather than stripping trailing NULs: padding bytes
    # of a malformed image may be non-zero and must not leak into the name.
    name, _, _ = b"".join(chunks).partition(b"\x00")
    return name

77 

78 

class FileHeader:
    """A single RomFS file header (inode) parsed from an open image.

    The fixed part of a header is four big-endian 32-bit words: a combined
    "next header / executable flag / type" word, ``spec_info``, ``size`` and
    ``checksum``. It is followed by the padded, NUL terminated file name and
    then by the file content.
    """

    addr: int
    next_filehdr: int
    spec_info: int
    fs_type: FSType
    executable: bool
    size: int
    checksum: int
    filename: bytes
    depth: int = -1
    parent: Optional["FileHeader"] = None
    start_offset: int
    end_offset: int
    file: File

    def __init__(self, addr: int, file: File):
        """Parse the header located at the current position of *file*.

        *addr* is stored as the address of this header; it is used later to
        re-read the header for checksum validation, so the caller is expected
        to have positioned *file* at *addr* beforehand.

        Raises ``struct.error`` when fewer than 16 bytes can be read.
        """
        self.addr = addr
        fs_typeexec_next, self.spec_info, self.size, self.checksum = struct.unpack(
            ">4I", file.read(16)
        )
        # Low four bits carry the flags, the remaining bits the address of the
        # next header in the same directory.
        self.next_filehdr = fs_typeexec_next & ~0b1111
        self.fs_type = FSType(fs_typeexec_next & 0b0111)
        self.executable = bool(fs_typeexec_next & 0b1000)
        self.filename = get_string(file)
        # Content starts right after the padded file name.
        self.start_offset = file.tell()
        self.file = file

    def valid_checksum(self) -> bool:
        """Validate the checksum of this header (fixed part plus padded name).

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.addr, io.SEEK_SET)
            filename_len = len(self.filename)
            # 16 bytes of fixed fields followed by the name padded to 16 bytes.
            header_size = 16 + round_up(filename_len, 16)
            return valid_checksum(self.file.read(header_size))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def content(self) -> bytes:
        """Returns the file content. Applicable to files and symlinks.

        The file is left positioned at the start of the content, even when
        fewer than ``size`` bytes could be read.
        """
        try:
            self.file.seek(self.start_offset, io.SEEK_SET)
            return self.file.read(self.size)
        finally:
            # Absolute seek: a relative "-size" seek would land at the wrong
            # place (or fail) whenever the read came back short.
            self.file.seek(self.start_offset, io.SEEK_SET)

    @property
    def mode(self) -> int:
        """Permission mode.

        World readable, writable and executable if the executable bit is
        set, world readable and writable otherwise. The file type bits for
        block devices and character devices are added when applicable.
        """
        mode = WORLD_RWX if self.executable else WORLD_RW
        mode |= stat.S_IFBLK if self.fs_type == FSType.BLOCK_DEV else 0x0
        mode |= stat.S_IFCHR if self.fs_type == FSType.CHAR_DEV else 0x0
        return mode

    @property
    def dev(self) -> int:
        """Raw device number if block device or character device, zero otherwise."""
        if self.fs_type in [FSType.BLOCK_DEV, FSType.CHAR_DEV]:
            # spec_info packs the major number in its high 16 bits and the
            # minor number in its low 16 bits.
            major = self.spec_info >> 16
            minor = self.spec_info & 0xFFFF
            return os.makedev(major, minor)
        return 0

    @property
    def path(self) -> Path:
        """Returns the full path of this file, up to the RomFS root.

        Raises ``UnicodeDecodeError`` when a name on the way up is not valid
        UTF-8.
        """
        current_node = self
        current_path = Path()
        # Walk the parent chain, prepending each ancestor's name.
        while current_node is not None:
            current_path = Path(current_node.filename.decode("utf-8")).joinpath(
                current_path
            )
            current_node = current_node.parent
        return current_path

    def __repr__(self):
        return (
            f"FileHeader<next_filehdr:{self.next_filehdr}, type:{self.fs_type},"
            f" executable:{self.executable}, spec_info:{self.spec_info},"
            f" size:{self.size}, checksum:{self.checksum}, filename:{self.filename}>"
        )

166 

167 

class RomFSError(Exception):
    """Raised when a RomFS image turns out to be malformed or inconsistent."""

170 

171 

class RomFSHeader:
    """Superblock of a RomFS image together with the logic to walk and dump it.

    ``__init__`` parses the superblock, ``validate`` checks it,
    ``recursive_walk`` collects file headers into ``inodes`` (keyed by their
    address in the image) and ``dump_fs`` recreates them through ``fs``.
    """

    # Size in bytes of the fixed part of a file header (four 32-bit words).
    _FILE_HEADER_FIXED_SIZE = 16

    signature: bytes
    full_size: int
    checksum: int
    volume_name: bytes
    eof: int
    file: File
    end_offset: int
    header_end_offset: int
    inodes: dict[int, "FileHeader"]
    fs: FileSystem

    def __init__(
        self,
        file: File,
        fs: FileSystem,
    ):
        """Parse the superblock found at the beginning of *file*.

        Raises ``RomFSError`` when the file is smaller than
        ``ROMFS_HEADER_SIZE`` bytes.
        """
        self.file = file
        # Measure the file, then rewind to parse from the start.
        self.file.seek(0, io.SEEK_END)
        self.eof = self.file.tell()
        self.file.seek(0, io.SEEK_SET)

        if self.eof < ROMFS_HEADER_SIZE:
            raise RomFSError("File too small to hold ROMFS")

        self.signature = self.file.read(8)
        self.full_size = struct.unpack(">I", self.file.read(4))[0]
        self.checksum = struct.unpack(">I", self.file.read(4))[0]
        self.volume_name = get_string(self.file)
        # The first file header starts right after the padded volume name.
        self.header_end_offset = self.file.tell()
        self.inodes = {}

        self.fs = fs

    def valid_checksum(self) -> bool:
        """Validate the checksum over the first ``ROMFS_HEADER_SIZE`` bytes.

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(0, io.SEEK_SET)
            return valid_checksum(self.file.read(ROMFS_HEADER_SIZE))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    def validate(self):
        """Raise ``RomFSError`` unless signature, size and checksum are sane."""
        if self.signature != ROMFS_SIGNATURE:
            raise RomFSError("Invalid RomFS signature")
        if self.full_size > self.eof:
            raise RomFSError("ROMFS size is greater than file size")
        if not self.valid_checksum():
            raise RomFSError("Invalid checksum")

    def is_valid_addr(self, addr):
        """Validate that an inode address is valid.

        Inodes addresses must be 16 bytes aligned and placed within
        the RomFS on file, leaving enough room before the end of the file
        for the fixed part of a file header to be read.
        """
        # An address equal to the end of the file cannot hold a header;
        # accepting it makes header parsing fail on an empty read.
        last_valid_addr = self.eof - self._FILE_HEADER_FIXED_SIZE
        return (self.header_end_offset <= addr <= last_valid_addr) and (
            addr % 16 == 0
        )

    def is_recursive(self, addr) -> bool:
        """Tell whether the inode at *addr* has already been registered."""
        return addr in self.inodes

    def recursive_walk(self, addr: int, parent: Optional[FileHeader] = None):
        """Walk the chain of sibling headers starting at *addr*.

        Every header is attached to *parent*; directories are descended into
        by ``walk_dir``. The walk stops at the first invalid address, or as
        soon as the chain leads back to an address it already visited.
        """
        visited: set[int] = set()
        while self.is_valid_addr(addr):
            if addr in visited:
                # A crafted or corrupted "next header" pointer forms a loop;
                # without this check the walk would never terminate.
                logger.warning("Cyclic inode chain, stopping walk", addr=addr)
                break
            visited.add(addr)
            addr = self.walk_dir(addr, parent)

    def walk_dir(self, addr: int, parent: Optional[FileHeader] = None):
        """Parse the header at *addr*, register it and return the next address.

        Entries named ``.`` and ``..`` are parsed but not registered.
        Directories whose ``spec_info`` is non-zero and that were not seen
        before are descended into, using ``spec_info`` as the address of
        their first entry.

        Raises ``RomFSError`` when the header checksum does not match.
        """
        self.file.seek(addr, io.SEEK_SET)
        file_header = FileHeader(addr, self.file)
        file_header.parent = parent

        if not file_header.valid_checksum():
            raise RomFSError(f"Invalid file CRC at addr {addr:0x}.")

        logger.debug("walking dir", addr=addr, file=file_header)

        if file_header.filename not in [b".", b".."]:
            if (
                file_header.fs_type == FSType.DIRECTORY
                and file_header.spec_info != 0x0
                and not self.is_recursive(addr)
            ):
                # Register before descending so that a directory reachable
                # from its own subtree is not walked a second time.
                self.inodes[addr] = file_header
                self.recursive_walk(file_header.spec_info, file_header)
            self.inodes[addr] = file_header
        return file_header.next_filehdr

    def create_symlink(self, output_path: Path, inode: FileHeader):
        """Create a symlink at *output_path* pointing at the inode's content."""
        target_path = Path(inode.content.decode("utf-8"))
        self.fs.create_symlink(src=target_path, dst=output_path)

    def create_hardlink(self, output_path: Path, inode: FileHeader):
        """Create a hard link at *output_path* to the inode at ``spec_info``.

        A warning is logged, and nothing is created, when ``spec_info`` does
        not match a registered inode.
        """
        if inode.spec_info in self.inodes:
            target_path = self.inodes[inode.spec_info].path
            self.fs.create_hardlink(dst=output_path, src=target_path)
        else:
            logger.warning("Invalid hard link target", inode_key=inode.spec_info)

    def create_inode(self, inode: FileHeader):
        """Recreate *inode* through ``fs`` according to its type.

        Inodes of type ``FSType.SOCKET`` match no branch below and are
        therefore not created.
        """
        output_path = inode.path
        logger.info("dumping inode", inode=inode, output_path=str(output_path))

        if inode.fs_type == FSType.HARD_LINK:
            self.create_hardlink(output_path, inode)
        elif inode.fs_type == FSType.SYMLINK:
            self.create_symlink(output_path, inode)
        elif inode.fs_type == FSType.DIRECTORY:
            self.fs.mkdir(output_path, mode=inode.mode, exist_ok=True)
        elif inode.fs_type == FSType.FILE:
            self.fs.write_bytes(output_path, inode.content)
        elif inode.fs_type in [FSType.BLOCK_DEV, FSType.CHAR_DEV]:
            self.fs.mknod(output_path, mode=inode.mode, device=inode.dev)
        elif inode.fs_type == FSType.FIFO:
            self.fs.mkfifo(output_path, mode=inode.mode)

    def dump_fs(self):
        """Recreate every registered inode.

        Inodes are created in three groups, each sorted by path: files,
        directories, FIFOs and sockets first, then device nodes, and links
        last.
        """

        def inodes(*inode_types):
            return sorted(
                (v for v in self.inodes.values() if v.fs_type in inode_types),
                key=lambda inode: inode.path,
            )

        # order of file object creation is important
        sorted_inodes = (
            inodes(FSType.FILE, FSType.DIRECTORY, FSType.FIFO, FSType.SOCKET)
            + inodes(FSType.BLOCK_DEV, FSType.CHAR_DEV)
            + inodes(FSType.SYMLINK, FSType.HARD_LINK)
        )

        for inode in sorted_inodes:
            self.create_inode(inode)

    def __str__(self):
        return f"signature: {self.signature}\nfull_size: {self.full_size}\nchecksum: {self.checksum}\nvolume_name: {self.volume_name}"

304 

305 

class RomfsExtractor(Extractor):
    """Extractor that unpacks a RomFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Extract the RomFS image at *inpath* below *outdir*.

        The superblock is parsed and validated, the inode tree is walked
        starting right after the superblock, and every inode found is then
        recreated. Problems recorded by the ``FileSystem`` helper are
        returned in the ``ExtractResult``.
        """
        out_fs = FileSystem(outdir)
        with File.from_path(inpath) as image:
            romfs = RomFSHeader(image, out_fs)
            romfs.validate()
            first_inode_addr = romfs.header_end_offset
            romfs.recursive_walk(first_inode_addr, None)
            romfs.dump_fs()
            return ExtractResult(reports=out_fs.problems)

315 

316 

class RomFSFSHandler(StructHandler):
    """Handler that recognises RomFS images and computes their extent."""

    NAME = "romfs"

    PATTERNS = [
        # '-rom1fs-'
        HexString("2D 72 6F 6D 31 66 73 2d")
    ]

    C_DEFINITIONS = r"""
        struct romfs_header {
            char magic[8];
            uint32 full_size;
            uint32 checksum;
        }
    """
    HEADER_STRUCT = "romfs_header"
    EXTRACTOR = RomfsExtractor()

    DOC = HandlerDoc(
        name="RomFS",
        description="RomFS is a simple, space-efficient, read-only file system format designed for embedded systems. It features 16-byte alignment, minimal metadata overhead, and supports basic file types like directories, files, symlinks, and devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="RomFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/romfs.html",
            ),
            Reference(
                title="RomFS Wikipedia",
                url="https://en.wikipedia.org/wiki/Romfs",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Compute the extent of the RomFS image starting at the current position.

        The first ``ROMFS_HEADER_SIZE`` bytes must be present and pass the
        RomFS checksum. The chunk then spans ``full_size`` bytes counted from
        the start of the image, plus whatever ``read_until_past`` consumes
        while skipping the zero padding that follows.

        Raises ``InvalidInputFormat`` when the superblock is truncated or its
        checksum does not match.
        """
        # Every seek below is absolute and relative to this position, so a
        # short read can never leave the cursor at an unexpected place.
        image_start = file.tell()

        superblock = file.read(ROMFS_HEADER_SIZE)
        if len(superblock) != ROMFS_HEADER_SIZE or not valid_checksum(superblock):
            raise InvalidInputFormat("Invalid RomFS checksum.")

        file.seek(image_start, io.SEEK_SET)

        # Every multi byte value must be in big endian order.
        header = self.parse_header(file, Endian.BIG)

        # full_size is the number of accessible bytes in this fs, counted
        # from the very first byte of the image (signature included), not
        # from the end of the superblock.
        file.seek(image_start + header.full_size, io.SEEK_SET)

        # Another thing to note is that romfs works on file headers and data
        # aligned to 16 byte boundaries, but most hardware devices and the block
        # device drivers are unable to cope with smaller than block-sized data.
        # To overcome this limitation, the whole size of the file system must be
        # padded to an 1024 byte boundary.
        read_until_past(file, b"\x00")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )

379 )