Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/romfs.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

220 statements  

1from __future__ import annotations 

2 

3import io 

4import os 

5import stat 

6import struct 

7from enum import IntEnum, unique 

8from pathlib import Path 

9 

10from structlog import get_logger 

11 

12from ...file_utils import ( 

13 Endian, 

14 FileSystem, 

15 InvalidInputFormat, 

16 read_until_past, 

17 round_up, 

18) 

19from ...models import ( 

20 Extractor, 

21 ExtractResult, 

22 File, 

23 HandlerDoc, 

24 HandlerType, 

25 HexString, 

26 Reference, 

27 StructHandler, 

28 ValidChunk, 

29) 

30 

31logger = get_logger() 

32 

33 

# Names are read from the image in blocks of this many bytes (see get_string).
STRING_ALIGNMENT = 16
# Upper bound on the number of name bytes get_string() scans for a NUL.
MAX_LINUX_PATH_LENGTH = 255
# Modulus for the 32-bit wrap-around checksum; note this is 2**32,
# i.e. one more than the largest uint32 value.
MAX_UINT32 = 1 << 32


# Permission bits given to extracted inodes, selected by the executable flag.
WORLD_RW = 0o666
WORLD_RWX = 0o777
# Number of leading bytes of the image covered by the header checksum.
ROMFS_HEADER_SIZE = 0x200
# Magic bytes found at the very start of a RomFS image.
ROMFS_SIGNATURE = b"-rom1fs-"

43 

44 

@unique
class FSType(IntEnum):
    """Inode type stored in the low three bits of a file header's first word."""

    # Link to another header; the target address lives in ``spec_info``.
    HARD_LINK = 0
    # Directory; ``spec_info`` is the address of its first entry.
    DIRECTORY = 1
    # Regular file whose data follows the header.
    FILE = 2
    # Symbolic link whose target is stored as the file data.
    SYMLINK = 3
    # Device nodes; ``spec_info`` packs the major/minor numbers.
    BLOCK_DEV = 4
    CHAR_DEV = 5
    SOCKET = 6
    FIFO = 7

55 

56 

def valid_checksum(content: bytes) -> bool:
    """Tell whether ``content`` passes the RomFS checksum.

    The buffer is interpreted as a sequence of big-endian 32-bit words; the
    checksum is valid when those words add up to zero modulo ``MAX_UINT32``.
    A buffer whose length is not a multiple of four is rejected outright,
    because it cannot be split into whole words.
    """
    word_count, leftover = divmod(len(content), 4)
    if leftover:
        return False

    words = struct.unpack(f">{word_count}L", content)
    return sum(words) % MAX_UINT32 == 0

68 

69 

def get_string(file: File) -> bytes:
    """Read a NUL-terminated string stored in 16-byte aligned blocks.

    Blocks of ``STRING_ALIGNMENT`` bytes are consumed from the current
    position of ``file`` until one of them contains a NUL byte, the end of
    the file is reached, or ``MAX_LINUX_PATH_LENGTH`` bytes have been scanned.
    The file is left positioned right after the last block that was read.

    Returns the bytes preceding the first NUL (or everything that was read
    when no NUL was found).
    """
    raw = b""
    scanned = 0
    while b"\x00" not in raw and scanned < MAX_LINUX_PATH_LENGTH:
        block = file.read(STRING_ALIGNMENT)
        if not block:
            # End of file: further reads cannot produce a terminator.
            break
        raw += block
        scanned += STRING_ALIGNMENT
    # Cut at the first NUL rather than stripping trailing NULs only, so that
    # non-zero bytes after the terminator never end up in the name.
    return raw.partition(b"\x00")[0]

78 

79 

class FileHeader:
    """A single RomFS file header parsed from an open image.

    The constructor expects ``file`` to be positioned at ``addr`` and reads,
    in order: one big-endian word combining the next-header address, the
    executable flag and the inode type; ``spec_info``; ``size``; ``checksum``;
    and the block-aligned, NUL-terminated file name. ``start_offset`` records
    the file position right after the name.
    """

    addr: int
    next_filehdr: int
    spec_info: int
    fs_type: FSType
    executable: bool
    size: int
    checksum: int
    filename: bytes
    depth: int = -1
    parent: FileHeader | None = None
    start_offset: int
    end_offset: int
    file: File

    def __init__(self, addr: int, file: File):
        self.addr = addr
        fs_typeexec_next = struct.unpack(">L", file.read(4))[0]
        # The low four bits carry type and flag, the rest is the address of
        # the next header.
        self.next_filehdr = fs_typeexec_next & ~0b1111
        self.fs_type = FSType(fs_typeexec_next & 0b0111)
        # Stored as a real bool to match the declared attribute type.
        self.executable = bool(fs_typeexec_next & 0b1000)
        self.spec_info = struct.unpack(">I", file.read(4))[0]
        self.size = struct.unpack(">I", file.read(4))[0]
        self.checksum = struct.unpack(">I", file.read(4))[0]
        self.filename = get_string(file)
        self.start_offset = file.tell()
        self.file = file

    def valid_checksum(self) -> bool:
        """Checksum the header bytes starting at ``addr``.

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(self.addr, io.SEEK_SET)
            filename_len = len(self.filename)
            # 16 bytes of fixed fields followed by the name rounded up to a
            # multiple of 16 bytes.
            header_size = 16 + round_up(filename_len, 16)
            return valid_checksum(self.file.read(header_size))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    @property
    def content(self) -> bytes:
        """Returns the file content. Applicable to files and symlinks.

        Up to ``size`` bytes are read from ``start_offset``; afterwards the
        file is positioned back at ``start_offset``.
        """
        try:
            self.file.seek(self.start_offset, io.SEEK_SET)
            return self.file.read(self.size)
        finally:
            # Absolute seek: a relative ``-size`` rewind lands on the wrong
            # offset (or fails) when the read came back short.
            self.file.seek(self.start_offset, io.SEEK_SET)

    @property
    def mode(self) -> int:
        """Permission mode.

        World readable, writable and executable if the executable bit is
        set, world readable and writable otherwise. The block or character
        device type bits are added for device inodes.
        """
        mode = WORLD_RWX if self.executable else WORLD_RW
        mode |= stat.S_IFBLK if self.fs_type == FSType.BLOCK_DEV else 0x0
        mode |= stat.S_IFCHR if self.fs_type == FSType.CHAR_DEV else 0x0
        return mode

    @property
    def dev(self) -> int:
        """Raw device number if block device or character device, zero otherwise."""
        if self.fs_type in (FSType.BLOCK_DEV, FSType.CHAR_DEV):
            # spec_info packs the major number in the upper 16 bits and the
            # minor number in the lower 16 bits.
            major = self.spec_info >> 16
            minor = self.spec_info & 0xFFFF
            return os.makedev(major, minor)
        return 0

    @property
    def path(self) -> Path:
        """Returns the full path of this file, up to the RomFS root."""
        current_node = self
        current_path = Path()
        # Walk the parent links, prepending each ancestor's name.
        while current_node is not None:
            current_path = Path(current_node.filename.decode("utf-8")).joinpath(
                current_path
            )
            current_node = current_node.parent
        return current_path

    def __repr__(self):
        return (
            f"FileHeader<next_filehdr:{self.next_filehdr}, type:{self.fs_type},"
            f" executable:{self.executable}, spec_info:{self.spec_info},"
            f" size:{self.size}, checksum:{self.checksum}, filename:{self.filename}>"
        )

167 

168 

class RomFSError(Exception):
    """Raised when a RomFS image fails validation or parsing."""

171 

172 

class RomFSHeader:
    """RomFS image header plus the logic to walk and extract the image.

    The constructor measures the file, then reads the signature, the full
    size, the checksum and the volume name from the start of ``file``.
    ``recursive_walk`` collects file headers into ``inodes`` (keyed by their
    address) and ``dump_fs`` materialises them through ``fs``.
    """

    signature: bytes
    full_size: int
    checksum: int
    volume_name: bytes
    eof: int
    file: File
    end_offset: int
    header_end_offset: int
    inodes: dict[int, FileHeader]
    fs: FileSystem

    def __init__(
        self,
        file: File,
        fs: FileSystem,
    ):
        self.file = file
        # Measure the file, then rewind to parse from the beginning.
        self.file.seek(0, io.SEEK_END)
        self.eof = self.file.tell()
        self.file.seek(0, io.SEEK_SET)

        if self.eof < ROMFS_HEADER_SIZE:
            raise RomFSError("File too small to hold ROMFS")

        self.signature = self.file.read(8)
        self.full_size = struct.unpack(">I", self.file.read(4))[0]
        self.checksum = struct.unpack(">I", self.file.read(4))[0]
        self.volume_name = get_string(self.file)
        # First byte after the volume name, i.e. where file headers begin.
        self.header_end_offset = self.file.tell()
        self.inodes = {}

        self.fs = fs

    def valid_checksum(self) -> bool:
        """Checksum the first ``ROMFS_HEADER_SIZE`` bytes of the file.

        The file position is restored before returning.
        """
        current_position = self.file.tell()
        try:
            self.file.seek(0, io.SEEK_SET)
            return valid_checksum(self.file.read(ROMFS_HEADER_SIZE))
        finally:
            self.file.seek(current_position, io.SEEK_SET)

    def validate(self):
        """Raise ``RomFSError`` on a bad signature, size or checksum."""
        if self.signature != ROMFS_SIGNATURE:
            raise RomFSError("Invalid RomFS signature")
        if self.full_size > self.eof:
            raise RomFSError("ROMFS size is greater than file size")
        if not self.valid_checksum():
            raise RomFSError("Invalid checksum")

    def is_valid_addr(self, addr):
        """Validate that an inode address is valid.

        Inodes addresses must be 16 bytes aligned and placed within
        the RomFS on file, leaving room for the 16 bytes of fixed header
        fields that are read from that address.
        """
        # An address equal to ``eof`` points past the last byte of the file,
        # so the upper bound must leave space for the fixed-size fields.
        return (self.header_end_offset <= addr and addr + 16 <= self.eof) and (
            addr % 16 == 0
        )

    def is_recursive(self, addr) -> bool:
        """Tell whether the header at ``addr`` has already been recorded."""
        return addr in self.inodes

    def recursive_walk(self, addr: int, parent: FileHeader | None = None):
        """Walk the chain of sibling headers that starts at ``addr``.

        Each header is handed to ``walk_dir``, which returns the address of
        the next sibling. The walk stops at the first invalid address or as
        soon as the chain revisits an address.
        """
        seen: set[int] = set()
        while self.is_valid_addr(addr):
            if addr in seen:
                # A next-header pointer leading back into the chain would
                # otherwise keep this loop running forever.
                logger.warning("Cyclic file header chain, stopping walk", addr=addr)
                break
            seen.add(addr)
            addr = self.walk_dir(addr, parent)

    def walk_dir(self, addr: int, parent: FileHeader | None = None):
        """Parse the header at ``addr``, record it and return the next address.

        Directories with a non-zero ``spec_info`` that have not been recorded
        yet are descended into, using ``spec_info`` as the address of their
        first entry. Entries named ``.`` or ``..`` are not recorded.

        Raises ``RomFSError`` when the header checksum is invalid.
        """
        self.file.seek(addr, io.SEEK_SET)
        file_header = FileHeader(addr, self.file)
        file_header.parent = parent

        if not file_header.valid_checksum():
            raise RomFSError(f"Invalid file CRC at addr {addr:0x}.")

        logger.debug("walking dir", addr=addr, file=file_header)

        if file_header.filename not in [b".", b".."]:
            if (
                file_header.fs_type == FSType.DIRECTORY
                and file_header.spec_info != 0x0
                and not self.is_recursive(addr)
            ):
                # Record the directory before descending so that a child
                # pointing back at it is detected by is_recursive().
                self.inodes[addr] = file_header
                self.recursive_walk(file_header.spec_info, file_header)
            self.inodes[addr] = file_header
        return file_header.next_filehdr

    def create_symlink(self, output_path: Path, inode: FileHeader):
        """Create a symlink at ``output_path``; the target is the inode content."""
        target_path = Path(inode.content.decode("utf-8"))
        self.fs.create_symlink(src=target_path, dst=output_path)

    def create_hardlink(self, output_path: Path, inode: FileHeader):
        """Create a hard link to the inode recorded at address ``spec_info``.

        A warning is logged when no inode is recorded at that address.
        """
        if inode.spec_info in self.inodes:
            target_path = self.inodes[inode.spec_info].path
            self.fs.create_hardlink(dst=output_path, src=target_path)
        else:
            logger.warning("Invalid hard link target", inode_key=inode.spec_info)

    def create_inode(self, inode: FileHeader):
        """Materialise ``inode`` on the output file system according to its type."""
        output_path = inode.path
        logger.info("dumping inode", inode=inode, output_path=str(output_path))

        # NOTE(review): FSType.SOCKET has no branch below, so socket inodes
        # are silently skipped - confirm this is intended.
        if inode.fs_type == FSType.HARD_LINK:
            self.create_hardlink(output_path, inode)
        elif inode.fs_type == FSType.SYMLINK:
            self.create_symlink(output_path, inode)
        elif inode.fs_type == FSType.DIRECTORY:
            self.fs.mkdir(output_path, mode=inode.mode, exist_ok=True)
        elif inode.fs_type == FSType.FILE:
            self.fs.write_bytes(output_path, inode.content)
        elif inode.fs_type in [FSType.BLOCK_DEV, FSType.CHAR_DEV]:
            self.fs.mknod(output_path, mode=inode.mode, device=inode.dev)
        elif inode.fs_type == FSType.FIFO:
            self.fs.mkfifo(output_path, mode=inode.mode)

    def dump_fs(self):
        """Create every recorded inode, links last."""

        def inodes(*inode_types):
            # Recorded inodes of the given types, ordered by path.
            return sorted(
                (v for v in self.inodes.values() if v.fs_type in inode_types),
                key=lambda inode: inode.path,
            )

        # order of file object creation is important
        sorted_inodes = (
            inodes(FSType.FILE, FSType.DIRECTORY, FSType.FIFO, FSType.SOCKET)
            + inodes(FSType.BLOCK_DEV, FSType.CHAR_DEV)
            + inodes(FSType.SYMLINK, FSType.HARD_LINK)
        )

        for inode in sorted_inodes:
            self.create_inode(inode)

    def __str__(self):
        return f"signature: {self.signature}\nfull_size: {self.full_size}\nchecksum: {self.checksum}\nvolume_name: {self.volume_name}"

305 

306 

class RomfsExtractor(Extractor):
    """Extractor that unpacks a RomFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Validate the image at ``inpath`` and dump its inodes under ``outdir``.

        Returns an ``ExtractResult`` carrying the problems collected by the
        output file system.
        """
        output_fs = FileSystem(outdir)
        with File.from_path(inpath) as image:
            romfs = RomFSHeader(image, output_fs)
            romfs.validate()
            # File headers start right after the volume name.
            first_header = romfs.header_end_offset
            romfs.recursive_walk(first_header, None)
            romfs.dump_fs()
            return ExtractResult(reports=output_fs.problems)

316 

317 

class RomFSFSHandler(StructHandler):
    """Handler that recognises RomFS images by their ``-rom1fs-`` magic."""

    NAME = "romfs"

    PATTERNS = [
        # '-rom1fs-'
        HexString("2D 72 6F 6D 31 66 73 2d")
    ]

    C_DEFINITIONS = r"""
        struct romfs_header {
            char magic[8];
            uint32 full_size;
            uint32 checksum;
        }
    """
    HEADER_STRUCT = "romfs_header"
    EXTRACTOR = RomfsExtractor()

    DOC = HandlerDoc(
        name="RomFS",
        description="RomFS is a simple, space-efficient, read-only file system format designed for embedded systems. It features 16-byte alignment, minimal metadata overhead, and supports basic file types like directories, files, symlinks, and devices.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="RomFS Documentation",
                url="https://www.kernel.org/doc/html/latest/filesystems/romfs.html",
            ),
            Reference(
                title="RomFS Wikipedia",
                url="https://en.wikipedia.org/wiki/Romfs",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Locate the end of the RomFS image found at the current position.

        The first ``ROMFS_HEADER_SIZE`` bytes must pass the RomFS checksum,
        otherwise ``InvalidInputFormat`` is raised. The chunk ends after the
        ``full_size`` bytes of the image plus any run of zero bytes that
        follows them.
        """
        # Remember where the image begins so every later seek is absolute
        # and does not depend on how many bytes a read actually returned.
        image_start = file.tell()

        if not valid_checksum(file.read(ROMFS_HEADER_SIZE)):
            raise InvalidInputFormat("Invalid RomFS checksum.")

        file.seek(image_start, io.SEEK_SET)

        # Every multi byte value must be in big endian order.
        header = self.parse_header(file, Endian.BIG)

        # full_size is the number of accessible bytes in this fs, counted
        # from the first byte of the image (RomFSHeader.validate compares it
        # against the file size in the same way), so it is applied to the
        # image start rather than to the end of the header. The volume name
        # that follows the header therefore does not need to be skipped.
        file.seek(image_start + header.full_size, io.SEEK_SET)

        # Another thing to note is that romfs works on file headers and data
        # aligned to 16 byte boundaries, but most hardware devices and the block
        # device drivers are unable to cope with smaller than block-sized data.
        # To overcome this limitation, the whole size of the file system must be
        # padded to an 1024 byte boundary.
        read_until_past(file, b"\x00")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=file.tell(),
        )

380 )