Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/minixfs.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

253 statements  

1from __future__ import annotations 

2 

3import io 

4import math 

5import stat 

6import struct 

7from dataclasses import dataclass 

8from pathlib import Path 

9from typing import TYPE_CHECKING 

10 

11from unblob.file_utils import ( 

12 Endian, 

13 FileSystem, 

14 InvalidInputFormat, 

15 StructParser, 

16) 

17from unblob.models import ( 

18 Extractor, 

19 File, 

20 HandlerDoc, 

21 HandlerType, 

22 HexString, 

23 Reference, 

24 StructHandler, 

25 ValidChunk, 

26) 

27 

28if TYPE_CHECKING: 

29 from collections.abc import Iterator 

30 

31 

@dataclass
class MinixInode:
    """Version-independent, in-memory view of one on-disk MINIX inode.

    The v1 inode layout carries a single ``i_time`` stamp while the v2/v3
    layout carries ``i_atime``/``i_mtime``/``i_ctime``; the fields a layout
    does not provide are set to ``None``.
    """

    # File type and permission bits (inspected with the ``stat.S_IS*`` helpers).
    i_mode: int
    i_nlinks: int
    i_uid: int
    i_gid: int
    # File length in bytes.
    i_size: int
    # Only present in the v1 layout.
    i_time: int | None
    # Only present in the v2/v3 layout.
    i_atime: int | None
    i_mtime: int | None
    i_ctime: int | None
    # Zone pointers: 9 entries for v1 inodes, 10 for v2/v3 inodes.
    i_zone: list[int]

44 

45 

@dataclass
class MinixDirEntry:
    """One parsed directory entry: an inode number and the raw entry name."""

    # Inode number the entry points at.
    inode: int
    # Entry name as raw bytes, exactly as returned by the struct parser.
    name: bytes

50 

51 

# Layout constants and C struct definitions, see linux/minix_fs.h
SUPERBLOCK_OFFSET = 0x400  # byte offset of the superblock from the image start
ROOT_INODE_INDEX = 1  # inode numbers are 1-based; the root directory is inode 1
STATIC_BLOCK_SIZE = 1024  # used when the superblock has no s_blocksize field
HEADER_STRUCT = "minix_super_block"

# 32-byte inode layout used by v1 (16-bit zone pointers, 9 zones).
INODE_V1_CDEF = """
    typedef struct minix_inode {
        uint16 i_mode;
        uint16 i_uid;
        uint32 i_size;
        uint32 i_time;
        uint8 i_gid;
        uint8 i_nlinks;
        uint16 i_zone[9];
    } minix_inode;
"""

# 64-byte inode layout used by v2 and v3 (32-bit zone pointers, 10 zones).
INODE_V2_CDEF = """
    typedef struct minix_inode {
        uint16 i_mode;
        uint16 i_nlinks;
        uint16 i_uid;
        uint16 i_gid;
        uint32 i_size;
        uint32 i_atime;
        uint32 i_mtime;
        uint32 i_ctime;
        uint32 i_zone[10];
    } minix_inode;
"""

# Superblock layout shared by v1 and v2.
SUPERBLOCK_V1_CDEF = """
    typedef struct minix_super_block {
        uint16 s_ninodes; /* number of inodes */
        uint16 s_nzones; /* number of zones (v1 only) */
        uint16 s_imap_blocks; /* inode map size (in blocks) */
        uint16 s_zmap_blocks; /* zone map size (in blocks) */
        uint16 s_firstdatazone; /* first zone containing file data */
        uint16 s_log_zone_size; /* log_2(zone_size / blocks_size); 0 => zone_size == block_size */
        uint32 s_max_size; /* max file size */
        uint16 s_magic; /* minix magic */
        uint16 s_state; /* mount state */
        uint32 s_zones; /* number of zones (v2 only) */
    } minix_super_block;
"""

# Superblock layout used by v3; the only one carrying an explicit block size.
SUPERBLOCK_V3_CDEF = """
    typedef struct minix_super_block {
        uint32 s_ninodes;
        uint16 s_pad0;
        uint16 s_imap_blocks;
        uint16 s_zmap_blocks;
        uint16 s_firstdatazone;
        uint16 s_log_zone_size;
        uint16 s_pad1;
        uint32 s_max_size;
        uint32 s_zones;
        uint16 s_magic;
        uint16 s_pad2;
        uint16 s_blocksize;
        uint8 s_disk_version;
    } minix_super_block;
"""

# Directory entry with a 16-bit inode number (v1 and v2).
DIR_V1_CDEF = """
    typedef struct minix_dir_entry {
        uint16 inode;
        char name[];
    } minix_dir_entry;
"""

# Directory entry with a 32-bit inode number (v3).
DIR_V3_CDEF = """
    typedef struct minix_dir_entry {
        uint32 inode;
        char name[];
    } minix3_dir_entry;
"""

# Accepted superblock magic values per version, as read in big-endian order.
VERSION_TO_BIG_ENDIAN_MAGIC = {
    1: {0x137F, 0x138F},
    2: {0x2468, 0x2478},
    3: {0x4D5A},
}

# Offset of s_magic inside the superblock, per version.
VERSION_TO_MAGIC_OFFSET = {1: 0x10, 2: 0x10, 3: 0x18}

# Full set of struct definitions handed to the struct parser, per version.
VERSION_TO_C_DEFINITIONS = {
    1: "".join((INODE_V1_CDEF, SUPERBLOCK_V1_CDEF, DIR_V1_CDEF)),
    2: "".join((INODE_V2_CDEF, SUPERBLOCK_V1_CDEF, DIR_V1_CDEF)),
    3: "".join((INODE_V2_CDEF, SUPERBLOCK_V3_CDEF, DIR_V3_CDEF)),
}

146 

147 

class MinixFS:
    """Reads a MINIX file system image and extracts its directory tree.

    The constructor parses the superblock located at ``SUPERBLOCK_OFFSET`` and
    derives the offsets and sizes the other methods rely on.  ``extract`` then
    recreates everything reachable from the root inode through a
    ``FileSystem`` object.
    """

    def __init__(self, file: File, version: int, c_definitions: str):
        """Parse the superblock of ``file`` and precompute layout values.

        ``version`` selects the magic offset and the inode layout, and
        ``c_definitions`` holds the matching C struct definitions.  Raises
        ``InvalidInputFormat`` when the magic cannot be read or is unknown.
        """
        self.file = file
        self.version = version
        self.struct_parser = StructParser(c_definitions)
        self.file.seek(SUPERBLOCK_OFFSET, io.SEEK_SET)
        self.endianness = get_endianness(file, version)
        self.superblock = self.struct_parser.parse(HEADER_STRUCT, file, self.endianness)
        block_size = get_block_size(self.superblock)
        # The inode bitmap starts at block 2, followed by the zone bitmap and
        # then the inode table.
        imap_offset = 2 * block_size
        zmap_offset = imap_offset + self.superblock.s_imap_blocks * block_size
        self.inode_offset = zmap_offset + self.superblock.s_zmap_blocks * block_size
        # Masked to 32 bits so an oversized shift cannot yield a huge value.
        self.zone_size = (block_size << self.superblock.s_log_zone_size) & 0xFF_FF_FF_FF
        self.inode_size = self.struct_parser.cparser_le.minix_inode.size
        # Width of a single zone pointer (element type of the i_zone array).
        self.zone_ptr_size = self.struct_parser.cparser_le.minix_inode.fields[
            "i_zone"
        ].type.type.size
        dirent_inode_size = self.struct_parser.cparser_le.minix_dir_entry.fields[
            "inode"
        ].type.size
        self.dir_entry_size = self._get_name_len() + dirent_inode_size

    def _get_name_len(self) -> int:
        """Return the fixed name field length encoded in the magic's low byte.

        Raises ``InvalidInputFormat`` for an unknown magic.
        """
        lower_magic: int = self.superblock.s_magic & 0x00FF
        if lower_magic in {0x7F, 0x68}:
            return 14  # v1/v2
        if lower_magic in {0x8F, 0x78}:
            return 30  # v1/v2
        if lower_magic == 0x5A:
            return 60  # v3
        raise InvalidInputFormat(f"Invalid magic: {self.superblock.s_magic:x}")

    def _read_zone_data(self, zone_index: int) -> bytes:
        """Return the raw bytes of zone ``zone_index``.

        The result is whatever ``file.read`` returns, so it may be shorter
        than ``zone_size`` when the zone lies at or past the end of the file.
        """
        self.file.seek(zone_index * self.zone_size, io.SEEK_SET)
        return self.file.read(self.zone_size)

    def _get_zone_pointers(self, zone_index: int) -> list[int]:
        """Decode zone ``zone_index`` as an array of zone pointers.

        Raises ``InvalidInputFormat`` when the zone could not be read in full,
        instead of leaking a ``struct.error`` for a truncated image.
        """
        data = self._read_zone_data(zone_index)
        count = self.zone_size // self.zone_ptr_size
        if len(data) < count * self.zone_ptr_size:
            raise InvalidInputFormat(f"Truncated indirect zone: {zone_index}")
        ptr_fmt = "H" if self.zone_ptr_size == 2 else "I"
        endianness_fmt = "<" if self.endianness == Endian.LITTLE else ">"
        return list(struct.unpack_from(f"{endianness_fmt}{count}{ptr_fmt}", data))

    def _stream_file_data(self, inode: MinixInode) -> Iterator[bytes]:
        """Yield the content of ``inode`` zone by zone, capped at ``i_size`` bytes."""
        remaining = inode.i_size
        for zone_data in self._iter_zones(inode.i_zone):
            # The last zone is usually only partially used.
            chunk = zone_data[:remaining]
            remaining -= len(chunk)
            yield chunk
            if remaining <= 0:
                break

    def _iter_zones(self, zones: list[int]) -> Iterator[bytes]:
        """Yield the data zones referenced by an inode's ``i_zone`` array."""
        # Data zones are a bit complicated. See e.g. https://osblog.stephenmarz.com/ch10.html
        # for a more detailed explanation.
        yield from self._read_zones(zones[:7])  # direct zones 0-6:
        if zones[7] != 0:  # indirect zone 7:
            yield from self._read_zones([zones[7]], 1)
        if zones[8] != 0:  # doubly indirect zone 8:
            yield from self._read_zones([zones[8]], 2)
        if len(zones) == 10 and zones[9] != 0:  # triply indirect zone 9 (V2/V3 only)
            yield from self._read_zones([zones[9]], 3)

    def _read_zones(
        self, zone_index_list: list[int], indirectness: int = 0
    ) -> Iterator[bytes]:
        """Yield data for each zone in ``zone_index_list``.

        With ``indirectness`` > 0 every listed zone is interpreted as a table
        of zone pointers and followed recursively, one level per step.
        """
        for index in zone_index_list:
            # NOTE(review): a zero pointer ends the whole list here; if zero
            # can also mark a hole in a sparse file, such files would be cut
            # short -- confirm against the on-disk format.
            if index == 0:
                break
            if indirectness > 0:
                zone_pointers = self._get_zone_pointers(index)
                yield from self._read_zones(zone_pointers, indirectness - 1)
            else:
                yield self._read_zone_data(index)

    def _read_directory(self, inode: MinixInode) -> Iterator[MinixDirEntry]:
        """Yield every fixed-size directory entry stored in directory ``inode``."""
        for zone_data in self._stream_file_data(inode):
            for i in range(0, len(zone_data), self.dir_entry_size):
                raw_entry = self.struct_parser.parse(
                    "minix_dir_entry",
                    zone_data[i : i + self.dir_entry_size],
                    self.endianness,
                )
                yield MinixDirEntry(inode=raw_entry.inode, name=raw_entry.name)

    def _read_inode(self, index: int) -> MinixInode:
        """Read inode number ``index`` (1-based) from the inode table.

        Raises ``InvalidInputFormat`` when ``index`` is outside
        ``1..s_ninodes``.
        """
        if not 1 <= index <= self.superblock.s_ninodes:
            raise InvalidInputFormat(f"Invalid inode number: {index}")
        offset = self.inode_offset + (index - 1) * self.inode_size
        self.file.seek(offset, io.SEEK_SET)
        raw_inode = self.struct_parser.parse("minix_inode", self.file, self.endianness)
        if self.version == 1:
            return MinixInode(
                i_mode=raw_inode.i_mode,
                i_nlinks=raw_inode.i_nlinks,
                i_uid=raw_inode.i_uid,
                i_gid=raw_inode.i_gid,
                i_size=raw_inode.i_size,
                i_time=raw_inode.i_time,
                i_atime=None,
                i_mtime=None,
                i_ctime=None,
                i_zone=list(raw_inode.i_zone),
            )
        return MinixInode(
            i_mode=raw_inode.i_mode,
            i_nlinks=raw_inode.i_nlinks,
            i_uid=raw_inode.i_uid,
            i_gid=raw_inode.i_gid,
            i_size=raw_inode.i_size,
            i_time=None,
            i_atime=raw_inode.i_atime,
            i_mtime=raw_inode.i_mtime,
            i_ctime=raw_inode.i_ctime,
            i_zone=list(raw_inode.i_zone),
        )

    def extract(
        self,
        fs: FileSystem,
        inode=None,
        path: Path = Path(),
        inode_index: int = ROOT_INODE_INDEX,
        visited_dirs: set[int] | None = None,
    ):
        """Extract the directory ``inode`` (the root when omitted) into ``fs``.

        ``visited_dirs`` collects the inode numbers of directories already
        walked so that a directory loop in a malformed image cannot recurse
        forever.  Raises ``InvalidInputFormat`` when the starting inode cannot
        be read or is not a directory.
        """
        if visited_dirs is None:
            visited_dirs = set()

        if inode is None:
            try:
                inode = self._read_inode(inode_index)
            except EOFError as error:
                raise InvalidInputFormat("File system is empty") from error
            if not stat.S_ISDIR(inode.i_mode):
                raise InvalidInputFormat("Root entries should be directories")

        if inode_index in visited_dirs:
            return
        visited_dirs.add(inode_index)
        self.walk_inode(fs, inode, path, visited_dirs)

    def walk_inode(
        self, fs: FileSystem, inode: MinixInode, path: Path, visited_dirs: set[int]
    ):
        """Recreate every entry of directory ``inode`` below ``path`` in ``fs``."""
        for entry in self._read_directory(inode):
            # Skip the self/parent links and entries without a valid inode number.
            if entry.name in (b".", b"..") or entry.inode < 1:
                continue
            entry_path = path / entry.name.decode("utf-8", errors="replace")
            entry_inode = self._read_inode(entry.inode)

            if stat.S_ISREG(entry_inode.i_mode):
                fs.write_chunks(entry_path, self._stream_file_data(entry_inode))

            elif stat.S_ISLNK(entry_inode.i_mode):
                # The link target is stored as the content of the inode.
                contents = b"".join(self._stream_file_data(entry_inode))
                link_target = contents.decode("utf-8", errors="replace")
                fs.create_symlink(Path(link_target), entry_path)

            elif stat.S_ISFIFO(entry_inode.i_mode):
                fs.mkfifo(entry_path, mode=entry_inode.i_mode)

            elif stat.S_ISCHR(entry_inode.i_mode) or stat.S_ISBLK(entry_inode.i_mode):
                fs.mknod(entry_path, mode=entry_inode.i_mode)

            elif stat.S_ISDIR(entry_inode.i_mode):
                fs.mkdir(entry_path, parents=True, exist_ok=True)
                self.extract(fs, entry_inode, entry_path, entry.inode, visited_dirs)

317 

318 

class MinixFSExtractor(Extractor):
    """Extractor that unpacks a MINIX file system image of one fixed version."""

    def __init__(self, version: int):
        """Remember ``version`` and resolve its C struct definitions.

        Raises ``ValueError`` for a version without definitions.
        """
        self.version = version
        self.c_definitions = self._get_c_definitions()

    def _get_c_definitions(self) -> str:
        """Return the struct definitions registered for ``self.version``."""
        definitions = VERSION_TO_C_DEFINITIONS.get(self.version)
        if definitions:
            return definitions
        raise ValueError(f"Unsupported MINIX FS version: {self.version}")

    def extract(self, inpath: Path, outdir: Path):
        """Extract the image at ``inpath`` into the directory ``outdir``."""
        target = FileSystem(outdir)
        with File.from_path(inpath) as image:
            parser = MinixFS(image, self.version, self.c_definitions)
            parser.extract(target)

335 

336 

def get_endianness(file: File, version: int) -> Endian:
    """Detect the byte order of the superblock at the current file position.

    The two magic bytes at the version-specific offset are read and compared,
    in both byte orders, against the magic values known for ``version``.  The
    file position is restored before returning.  Raises ``InvalidInputFormat``
    when fewer than two bytes are available or the magic is unknown.
    """
    magic_offset = VERSION_TO_MAGIC_OFFSET[version]
    superblock_start = file.tell()
    file.seek(superblock_start + magic_offset, io.SEEK_SET)
    raw_magic = file.read(2)
    # Rewind so the caller can parse the superblock from its first byte.
    file.seek(superblock_start, io.SEEK_SET)
    if len(raw_magic) < 2:
        raise InvalidInputFormat("Not enough bytes to read MINIX magic.")

    as_big = int.from_bytes(raw_magic, byteorder="big", signed=False)
    as_little = int.from_bytes(raw_magic, byteorder="little", signed=False)
    known_magics = VERSION_TO_BIG_ENDIAN_MAGIC[version]
    # Big endian is checked first, so it wins if both interpretations match.
    for candidate, endian in ((as_big, Endian.BIG), (as_little, Endian.LITTLE)):
        if candidate in known_magics:
            return endian
    raise InvalidInputFormat(f"Invalid MINIX magic: 0x{as_big:04x}")

353 

354 

def get_block_size(superblock) -> int:
    """Return the superblock's ``s_blocksize``, or the static size if it has none."""
    try:
        return superblock.s_blocksize
    except AttributeError:
        # Superblock layouts without the field use the fixed block size.
        return STATIC_BLOCK_SIZE

357 

358 

class _MinixFSHandlerBase(StructHandler):
    """Shared detection logic for the MINIX file system handlers.

    Subclasses set ``VERSION`` (plus their patterns and docs); the matching
    extractor and C definitions are attached automatically when the subclass
    is created.
    """

    VERSION = 1
    HEADER_STRUCT = "minix_super_block"
    # The superblock sits SUPERBLOCK_OFFSET bytes after the start of the chunk
    # (see calculate_chunk), hence the negative offset.
    PATTERN_MATCH_OFFSET = -SUPERBLOCK_OFFSET

    def __init_subclass__(cls, **kwargs):
        """Attach the extractor and struct definitions matching ``cls.VERSION``."""
        super().__init_subclass__(**kwargs)
        version = getattr(cls, "VERSION", None)
        if version:
            cls.EXTRACTOR = MinixFSExtractor(version=version)
            cls.C_DEFINITIONS = VERSION_TO_C_DEFINITIONS[version]

    def _get_zone_count(self, header) -> int:
        """Return the zone count from the field this version stores it in."""
        return header.s_nzones

    def validate_header(  # noqa: C901
        self, header, file: File, block_size: int, start_offset: int = 0
    ) -> None:
        """Sanity-check a parsed superblock.

        ``start_offset`` is the offset of the candidate chunk inside ``file``
        and is used to check that the file system fits in the remaining bytes.
        Raises ``InvalidInputFormat`` on the first inconsistency found.
        """
        if header.s_ninodes < 1:
            raise InvalidInputFormat("Invalid inode count")
        if header.s_imap_blocks < 1:
            raise InvalidInputFormat("Invalid inode map block count")
        if header.s_zmap_blocks < 1:
            raise InvalidInputFormat("Invalid zone map block count")
        if header.s_max_size == 0:
            raise InvalidInputFormat("Invalid max file size")
        # according to https://www.minix3.org/doc/A-312.html valid blocksizes for v3 are 1, 2, 4 and 8 KiB
        if self.VERSION == 3 and header.s_blocksize not in {
            2**x for x in range(10, 14)
        }:
            raise InvalidInputFormat("Invalid block size")
        if header.s_log_zone_size > 10:
            # The default log_zone_size is 0 (meaning zone_size == block_size). Though there does not seem to
            # be a hard cap on this value, values larger than 10 (2**10 = 1024 blocks per zone) are not realistic
            raise InvalidInputFormat("Invalid log zone size")
        # Use the same version-specific field as calculate_chunk: the v3
        # superblock has no s_nzones, and the size validated here must be the
        # size the chunk is later given.
        zone_count = self._get_zone_count(header)
        if zone_count < 1:
            raise InvalidInputFormat("Invalid zone count")

        blocks_per_zone = 2**header.s_log_zone_size
        total_size = zone_count * blocks_per_zone * block_size
        if total_size > file.size() - start_offset:
            raise InvalidInputFormat("larger than the file size")
        inode_size = 32 if self.VERSION == 1 else 64
        inodes_per_block = block_size // inode_size
        first_data_block = (
            2  # boot block + superblock
            + header.s_imap_blocks
            + header.s_zmap_blocks
            + math.ceil(header.s_ninodes / inodes_per_block)
        )
        first_data_zone = math.ceil(first_data_block / blocks_per_zone)
        if header.s_firstdatazone != first_data_zone:
            raise InvalidInputFormat("Invalid first data zone")

    def is_valid_header(self, header, file: File, block_size: int) -> bool:
        """Return ``True`` when ``validate_header`` accepts ``header``."""
        try:
            self.validate_header(header, file, block_size)
        except InvalidInputFormat:
            return False
        return True

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse and validate the superblock, then return the chunk it describes.

        The chunk spans ``zone_count * blocks_per_zone * block_size`` bytes
        starting at ``start_offset``.  Raises ``InvalidInputFormat`` when the
        magic or the superblock is invalid.
        """
        file.seek(start_offset + SUPERBLOCK_OFFSET, io.SEEK_SET)
        endianness = get_endianness(file, self.VERSION)
        superblock = self.parse_header(file, endianness)

        # TODO: should probably be moved to MinixFS and rename MinixFS to MinixFSParser,
        # with a get_end_offset() function.

        block_size = get_block_size(superblock)
        self.validate_header(superblock, file, block_size, start_offset)

        blocks_per_zone = 2**superblock.s_log_zone_size
        zone_count = self._get_zone_count(superblock)
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + zone_count * blocks_per_zone * block_size,
        )

441 

442 

class MinixFSv1Handler(_MinixFSHandlerBase):
    """Handler for MINIX file system version 1 images."""

    NAME = "minix_fs_v1"
    VERSION = 1

    # The magic sits at offset 0x10 of the superblock and is followed by
    # s_state, the FS state (1 -> valid; 2 -> error). A state of 0 has been
    # seen in the wild, but is not documented.
    # The two magic variants only differ in the maximum name length
    # (0x7f -> 14; 0x8f -> 30).
    PATTERNS = [
        HexString("[16] (7f | 8f) 13 (00 | 01 | 02) 00 [2] 00 00 00 00"),  # LE
        HexString("[16] 13 (7f | 8f) 00 (00 | 01 | 02) [2] 00 00 00 00"),  # BE
    ]

    DOC = HandlerDoc(
        name="MINIX FS (v1)",
        description="MINIX FS is a simple file system format designed as the filesystem of MINIX. MINIX is a UNIX-like operating system, originally developed by Andrew S. Tanenbaum for educational purposes.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(title="Official website", url="https://www.minix3.org/"),
            Reference(
                title="Linux headers (minix_fs.h)",
                url="https://github.com/torvalds/linux/blob/master/include/uapi/linux/minix_fs.h",
            ),
            Reference(
                title="Official tool for creating MINIX filesystems",
                url="https://github.com/Stichting-MINIX-Research-Foundation/minix/tree/master/minix/usr.sbin/mkfs.mfs",
            ),
        ],
        limitations=[],
    )

476 

477 

class MinixFSv2Handler(MinixFSv1Handler):
    """Handler for MINIX file system version 2 images."""

    NAME = "minix_fs_v2"
    VERSION = 2

    # Like v1, v2 comes in two variants that differ in the maximum name
    # length (0x68 -> 14; 0x78 -> 30).
    PATTERNS = [
        HexString("[16] (68 | 78) 24 (00 | 01 | 02) 00 [4] 00 00 00 00"),  # LE
        HexString("[16] 24 (68 | 78) 00 (00 | 01 | 02) [4] 00 00 00 00"),  # BE
    ]

    DOC = HandlerDoc(
        name="MINIX FS (v2)",
        description="MINIX FS is a simple file system format designed as the filesystem of MINIX. MINIX is a UNIX-like operating system, originally developed by Andrew S. Tanenbaum for educational purposes.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(title="Official website", url="https://www.minix3.org/"),
            Reference(
                title="Linux headers (minix_fs.h)",
                url="https://github.com/torvalds/linux/blob/master/include/uapi/linux/minix_fs.h",
            ),
            Reference(
                title="Official tool for creating MINIX filesystems",
                url="https://github.com/Stichting-MINIX-Research-Foundation/minix/tree/master/minix/usr.sbin/mkfs.mfs",
            ),
        ],
        limitations=[],
    )

    def _get_zone_count(self, header) -> int:
        """Return the zone count from the 32-bit ``s_zones`` field."""
        return header.s_zones

511 

512 

class MinixFSv3Handler(MinixFSv2Handler):
    """Handler for MINIX file system version 3 images."""

    NAME = "minix_fs_v3"
    VERSION = 3

    # Bytes 4-5 (s_pad0) must be zero; the magic follows at offset 0x18 and is
    # itself followed by two zero bytes (s_pad2).
    PATTERNS = [
        HexString("[4] 00 00 [18] 5a 4d 00 00"),  # LE
        HexString("[4] 00 00 [18] 4d 5a 00 00"),  # BE
    ]

    DOC = HandlerDoc(
        name="MINIX FS (v3)",
        description="MINIX FS is a simple file system format designed as the filesystem of MINIX. MINIX is a UNIX-like operating system, originally developed by Andrew S. Tanenbaum for educational purposes.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(title="Official website", url="https://www.minix3.org/"),
            Reference(
                title="Linux headers (minix_fs.h)",
                url="https://github.com/torvalds/linux/blob/master/include/uapi/linux/minix_fs.h",
            ),
            Reference(
                title="Official tool for creating MINIX filesystems",
                url="https://github.com/Stichting-MINIX-Research-Foundation/minix/tree/master/minix/usr.sbin/mkfs.mfs",
            ),
        ],
        limitations=[],
    )