Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/romfs.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

222 statements  

1from __future__ import annotations 

2 

3import io 

4import os 

5import stat 

6import struct 

7from enum import IntEnum, unique 

8from pathlib import Path 

9 

10from structlog import get_logger 

11 

12from ...file_utils import ( 

13 Endian, 

14 FileSystem, 

15 InvalidInputFormat, 

16 read_until_past, 

17 round_up, 

18) 

19from ...models import ( 

20 Extractor, 

21 ExtractResult, 

22 File, 

23 HandlerDoc, 

24 HandlerType, 

25 HexString, 

26 Reference, 

27 StructHandler, 

28 ValidChunk, 

29) 

30 

31logger = get_logger() 

32 

33 

# --- on-disk layout -------------------------------------------------------
# Names are read in chunks of this many bytes (see get_string).
STRING_ALIGNMENT = 16
# Upper bound on how many bytes get_string consumes while looking for a NUL.
MAX_LINUX_PATH_LENGTH = 255
# Modulus applied to the running checksum, i.e. 2**32 (one past the largest
# unsigned 32-bit value).
MAX_UINT32 = 1 << 32
# Minimum file size accepted by RomFSHeader and the span its checksum covers.
ROMFS_HEADER_SIZE = 512
# Magic bytes expected at the very start of the filesystem.
ROMFS_SIGNATURE = b"-rom1fs-"

# --- permission bits used when recreating inodes --------------------------
WORLD_RW = 0o666
WORLD_RWX = 0o777

43 

44 

@unique
class FSType(IntEnum):
    """Entry type stored in the three lowest bits of a file header's first word."""

    HARD_LINK = 0  # spec_info is the address of the link target's header
    DIRECTORY = 1  # spec_info is the address of the first entry in the directory
    FILE = 2
    SYMLINK = 3  # the inode content is the link target path
    BLOCK_DEV = 4  # spec_info packs the major/minor device numbers
    CHAR_DEV = 5  # spec_info packs the major/minor device numbers
    SOCKET = 6
    FIFO = 7

55 

56 

def valid_checksum(content: bytes) -> bool:
    """Tell whether ``content`` passes the RomFS checksum.

    The buffer is interpreted as a sequence of big-endian unsigned 32-bit
    words; the checksum is valid when those words add up to zero modulo
    ``MAX_UINT32``.
    """
    # A buffer that is not a whole number of 32-bit words cannot be unpacked,
    # so it is rejected up front.
    if len(content) % 4:
        return False

    running_sum = 0
    for (word,) in struct.iter_unpack(">L", content):
        running_sum = (running_sum + word) % MAX_UINT32
    return running_sum == 0

68 

69 

def get_string(file: File) -> bytes:
    """Read a 16 bytes aligned, null terminated string.

    Data is consumed from ``file`` in ``STRING_ALIGNMENT`` sized chunks until a
    chunk containing a NUL byte has been read, or until at least
    ``MAX_LINUX_PATH_LENGTH`` bytes have been consumed. The file is therefore
    left positioned just after the last chunk that was read.

    Returns the bytes preceding the first NUL byte (everything that was read
    when no NUL byte was found).
    """
    raw = b""
    consumed = 0
    while b"\x00" not in raw and consumed < MAX_LINUX_PATH_LENGTH:
        raw += file.read(STRING_ALIGNMENT)
        consumed += STRING_ALIGNMENT
    # Cut at the first NUL rather than only stripping trailing NULs: padding
    # that is not all zeroes would otherwise leak into the name and leave an
    # embedded NUL byte in the resulting path.
    return raw.split(b"\x00", 1)[0]

78 

79 

class FileHeader:
    """A single RomFS file header (inode) parsed from ``file``.

    The fixed part of a header is four big-endian 32-bit words: the
    next-header word, ``spec_info``, ``size`` and ``checksum``. It is followed
    by the NUL terminated, 16-byte padded file name. The four lowest bits of
    the first word carry the entry type (bits 0-2) and the executable flag
    (bit 3); the remaining bits are the address of the next header.
    """

    addr: int
    next_filehdr: int
    spec_info: int
    fs_type: FSType
    executable: bool
    size: int
    checksum: int
    filename: bytes
    depth: int = -1
    parent: FileHeader | None = None
    start_offset: int
    end_offset: int
    file: File

    def __init__(self, addr: int, file: File):
        """Parse the header found at the current position of ``file``.

        ``addr`` is recorded as the address of this header; ``valid_checksum``
        later seeks to it, so the caller is expected to have positioned
        ``file`` at ``addr`` beforehand.

        Raises:
            RomFSError: if fewer than 16 bytes are available for the fixed
                part of the header.
        """
        self.addr = addr
        fixed_part = file.read(16)
        if len(fixed_part) != 16:
            # Report truncation as a RomFS problem rather than letting a bare
            # struct.error escape.
            raise RomFSError(f"Truncated file header at addr {addr:0x}.")
        fs_typeexec_next, self.spec_info, self.size, self.checksum = struct.unpack(
            ">4I", fixed_part
        )
        self.next_filehdr = fs_typeexec_next & ~0b1111
        self.fs_type = FSType(fs_typeexec_next & 0b0111)
        # Stored as a real bool to match the declared attribute type.
        self.executable = bool(fs_typeexec_next & 0b1000)
        self.filename = get_string(file)
        # The inode data starts right after the padded file name.
        self.start_offset = file.tell()
        self.file = file

    def valid_checksum(self) -> bool:
        """Checksum the header (fixed part plus padded name).

        The position of ``self.file`` is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.addr, io.SEEK_SET)
            header_size = 16 + round_up(len(self.filename), 16)
            return valid_checksum(self.file.read(header_size))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def content(self) -> bytes:
        """Returns the file content. Applicable to files and symlinks.

        The position of ``self.file`` is restored before returning.

        Raises:
            RomFSError: if the inode data would extend past the end of the file.
        """
        if self.start_offset + self.size > self.file.size():
            raise RomFSError("Inode size extends past the end of the file")
        # Remember the absolute position instead of seeking back relatively by
        # ``size``: a relative seek is only right when exactly ``size`` bytes
        # were read, and it does not return to where the caller was.
        current_position = self.file.tell()
        try:
            self.file.seek(self.start_offset, io.SEEK_SET)
            return self.file.read(self.size)
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def mode(self) -> int:
        """Permission mode.

        RomFS only stores an executable bit, so the mode is world
        read/write/execute (0o777) when that bit is set and world read/write
        (0o666) otherwise. For block and character devices the matching file
        type bits are added.
        """
        mode = WORLD_RWX if self.executable else WORLD_RW
        if self.fs_type == FSType.BLOCK_DEV:
            mode |= stat.S_IFBLK
        elif self.fs_type == FSType.CHAR_DEV:
            mode |= stat.S_IFCHR
        return mode

    @property
    def dev(self) -> int:
        """Raw device number if block device or character device, zero otherwise."""
        if self.fs_type in (FSType.BLOCK_DEV, FSType.CHAR_DEV):
            # spec_info packs the major number in the upper 16 bits and the
            # minor number in the lower 16 bits.
            major = self.spec_info >> 16
            minor = self.spec_info & 0xFFFF
            return os.makedev(major, minor)
        return 0

    @property
    def path(self) -> Path:
        """Returns the full path of this file, up to the RomFS root."""
        current_node = self
        current_path = Path()
        # Walk up the parent chain, prepending one path component per level.
        while current_node is not None:
            current_path = Path(current_node.filename.decode("utf-8")).joinpath(
                current_path
            )
            current_node = current_node.parent
        return current_path

    def __repr__(self) -> str:
        return (
            f"FileHeader<next_filehdr:{self.next_filehdr}, type:{self.fs_type},"
            f" executable:{self.executable}, spec_info:{self.spec_info},"
            f" size:{self.size}, checksum:{self.checksum}, filename:{self.filename}>"
        )

169 

170 

class RomFSError(Exception):
    """Raised when the RomFS image is malformed or cannot be processed."""

173 

174 

class RomFSHeader:
    """RomFS superblock plus the logic to walk and extract its inodes.

    Typical use: construct, ``validate()``, ``recursive_walk()`` starting at
    ``header_end_offset`` to collect the inodes, then ``dump_fs()`` to recreate
    them through the ``FileSystem`` helper.
    """

    signature: bytes
    full_size: int
    checksum: int
    volume_name: bytes
    eof: int
    file: File
    end_offset: int
    header_end_offset: int
    inodes: dict[int, FileHeader]
    fs: FileSystem

    # Size of the fixed part of a file header (four 32-bit words). An address
    # is only usable when at least that many bytes follow it.
    _FILE_HEADER_FIXED_SIZE = 16

    def __init__(
        self,
        file: File,
        fs: FileSystem,
    ):
        """Parse the superblock at the start of ``file``.

        Raises:
            RomFSError: if the file is smaller than ``ROMFS_HEADER_SIZE``.
        """
        self.file = file
        self.file.seek(0, io.SEEK_END)
        self.eof = self.file.tell()
        self.file.seek(0, io.SEEK_SET)

        if self.eof < ROMFS_HEADER_SIZE:
            raise RomFSError("File too small to hold ROMFS")

        self.signature = self.file.read(8)
        self.full_size = struct.unpack(">I", self.file.read(4))[0]
        self.checksum = struct.unpack(">I", self.file.read(4))[0]
        self.volume_name = get_string(self.file)
        # The first file header starts right after the padded volume name.
        self.header_end_offset = self.file.tell()
        self.inodes = {}
        # Every header address already walked, including "." and ".." entries
        # that never make it into ``inodes``; used to break pointer cycles.
        self._visited: set[int] = set()

        self.fs = fs

    def valid_checksum(self) -> bool:
        """Checksum the first ``ROMFS_HEADER_SIZE`` bytes, preserving the file position."""
        current_position = self.file.tell()
        try:
            self.file.seek(0, io.SEEK_SET)
            return valid_checksum(self.file.read(ROMFS_HEADER_SIZE))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    def validate(self):
        """Check signature, declared size and checksum.

        Raises:
            RomFSError: on the first check that fails.
        """
        if self.signature != ROMFS_SIGNATURE:
            raise RomFSError("Invalid RomFS signature")
        if self.full_size > self.eof:
            raise RomFSError("ROMFS size is greater than file size")
        if not self.valid_checksum():
            raise RomFSError("Invalid checksum")

    def is_valid_addr(self, addr) -> bool:
        """Validate that an inode address is valid.

        Inodes addresses must be 16 bytes aligned and placed within
        the RomFS on file, with enough room left for the fixed part of
        a file header.
        """
        return (
            addr % 16 == 0
            and self.header_end_offset <= addr
            and addr + self._FILE_HEADER_FIXED_SIZE <= self.eof
        )

    def is_recursive(self, addr) -> bool:
        """Tell whether the inode at ``addr`` has already been recorded."""
        return addr in self.inodes

    def recursive_walk(self, addr: int, parent: FileHeader | None = None):
        """Walk the chain of sibling headers starting at ``addr``.

        The chain ends at the first address rejected by ``is_valid_addr``. It
        also ends at an address that was already walked, so that a crafted
        image whose next-header pointers form a cycle cannot loop forever.
        """
        while self.is_valid_addr(addr):
            if addr in self._visited:
                logger.warning("File header already visited, stopping walk", addr=addr)
                break
            self._visited.add(addr)
            addr = self.walk_dir(addr, parent)

    def walk_dir(self, addr: int, parent: FileHeader | None = None):
        """Parse the header at ``addr``, record it, and descend into directories.

        Returns the address of the next sibling header.

        Raises:
            RomFSError: if the header checksum is invalid.
        """
        self.file.seek(addr, io.SEEK_SET)
        file_header = FileHeader(addr, self.file)
        file_header.parent = parent

        if not file_header.valid_checksum():
            raise RomFSError(f"Invalid file CRC at addr {addr:0x}.")

        logger.debug("walking dir", addr=addr, file=file_header)

        # "." and ".." are neither recorded nor followed.
        if file_header.filename not in (b".", b".."):
            descend = (
                file_header.fs_type == FSType.DIRECTORY
                and file_header.spec_info != 0x0
                and not self.is_recursive(addr)
            )
            # Record the inode before descending so that a directory pointing
            # back at itself is seen as already known.
            self.inodes[addr] = file_header
            if descend:
                self.recursive_walk(file_header.spec_info, file_header)
        return file_header.next_filehdr

    def create_symlink(self, output_path: Path, inode: FileHeader):
        """Create a symlink at ``output_path`` pointing to the inode's content."""
        target_path = Path(inode.content.decode("utf-8"))
        self.fs.create_symlink(src=target_path, dst=output_path)

    def create_hardlink(self, output_path: Path, inode: FileHeader):
        """Create a hard link to the inode whose address is ``inode.spec_info``."""
        if inode.spec_info in self.inodes:
            target_path = self.inodes[inode.spec_info].path
            self.fs.create_hardlink(dst=output_path, src=target_path)
        else:
            logger.warning("Invalid hard link target", inode_key=inode.spec_info)

    def create_inode(self, inode: FileHeader):
        """Recreate a single inode through the ``FileSystem`` helper."""
        output_path = inode.path
        logger.info("dumping inode", inode=inode, output_path=str(output_path))

        if inode.fs_type == FSType.HARD_LINK:
            self.create_hardlink(output_path, inode)
        elif inode.fs_type == FSType.SYMLINK:
            self.create_symlink(output_path, inode)
        elif inode.fs_type == FSType.DIRECTORY:
            self.fs.mkdir(output_path, mode=inode.mode, exist_ok=True)
        elif inode.fs_type == FSType.FILE:
            self.fs.write_bytes(output_path, inode.content)
        elif inode.fs_type in (FSType.BLOCK_DEV, FSType.CHAR_DEV):
            self.fs.mknod(output_path, mode=inode.mode, device=inode.dev)
        elif inode.fs_type == FSType.FIFO:
            self.fs.mkfifo(output_path, mode=inode.mode)
        else:
            # Nothing is created for the remaining types (sockets); say so
            # instead of dropping the entry silently.
            logger.warning("Unsupported inode type, skipping", inode=inode)

    def dump_fs(self):
        """Recreate every recorded inode, in dependency-friendly order."""

        def by_type(*inode_types):
            return sorted(
                (v for v in self.inodes.values() if v.fs_type in inode_types),
                key=lambda inode: inode.path,
            )

        # order of file object creation is important: regular entries first,
        # then device nodes, and links last.
        sorted_inodes = (
            by_type(FSType.FILE, FSType.DIRECTORY, FSType.FIFO, FSType.SOCKET)
            + by_type(FSType.BLOCK_DEV, FSType.CHAR_DEV)
            + by_type(FSType.SYMLINK, FSType.HARD_LINK)
        )

        for inode in sorted_inodes:
            self.create_inode(inode)

    def __str__(self):
        return f"signature: {self.signature}\nfull_size: {self.full_size}\nchecksum: {self.checksum}\nvolume_name: {self.volume_name}"

307 

308 

class RomfsExtractor(Extractor):
    """Extractor that unpacks a RomFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Validate the image at ``inpath``, walk it and recreate it under ``outdir``.

        Returns an ``ExtractResult`` carrying the problems recorded by the
        ``FileSystem`` helper.
        """
        fs = FileSystem(outdir)
        with File.from_path(inpath) as romfs_file:
            romfs = RomFSHeader(romfs_file, fs)
            romfs.validate()
            # The first file header sits right after the superblock.
            romfs.recursive_walk(romfs.header_end_offset, None)
            romfs.dump_fs()
        return ExtractResult(reports=fs.problems)

318 

319 

class RomFSFSHandler(StructHandler):
    """Handler that recognises RomFS images by their ``-rom1fs-`` magic."""

    NAME = "romfs"

    PATTERNS = [
        # '-rom1fs-'
        HexString("2D 72 6F 6D 31 66 73 2d")
    ]

    C_DEFINITIONS = r"""
        struct romfs_header {
            char magic[8];
            uint32 full_size;
            uint32 checksum;
        }
    """
    HEADER_STRUCT = "romfs_header"
    EXTRACTOR = RomfsExtractor()

    DOC = HandlerDoc(
        name="RomFS",
        description="RomFS is a simple, space-efficient, read-only file system format designed for embedded systems. It features 16-byte alignment, minimal metadata overhead, and supports basic file types like directories, files, symlinks, and devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="RomFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/romfs.html",
            ),
            Reference(
                title="RomFS Wikipedia",
                url="https://en.wikipedia.org/wiki/Romfs",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Validate the header at the current position and compute the chunk bounds.

        Raises:
            InvalidInputFormat: if the header checksum does not verify.
        """
        header_start = file.tell()
        if not valid_checksum(file.read(ROMFS_HEADER_SIZE)):
            raise InvalidInputFormat("Invalid RomFS checksum.")

        # Rewind with an absolute seek: near the end of the input the read
        # above may return fewer than ROMFS_HEADER_SIZE bytes, in which case a
        # fixed relative rewind would land before the header.
        file.seek(header_start, io.SEEK_SET)

        # Every multi byte value must be in big endian order.
        header = self.parse_header(file, Endian.BIG)

        # The zero terminated name of the volume, padded to 16 byte boundary.
        get_string(file)

        # seek filesystem size (number of accessible bytes in this fs)
        # from the actual end of the header
        # NOTE(review): if full_size is counted from the start of the
        # filesystem, this lands past its end by the size of the header and
        # volume name - confirm against the format documentation.
        file.seek(header.full_size, io.SEEK_CUR)

        # Another thing to note is that romfs works on file headers and data
        # aligned to 16 byte boundaries, but most hardware devices and the block
        # device drivers are unable to cope with smaller than block-sized data.
        # To overcome this limitation, the whole size of the file system must be
        # padded to an 1024 byte boundary.
        read_until_past(file, b"\x00")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )

382 )