1from __future__ import annotations
2
3import io
4import math
5import stat
6import struct
7from dataclasses import dataclass
8from pathlib import Path
9from typing import TYPE_CHECKING
10
11from unblob.file_utils import (
12 Endian,
13 FileSystem,
14 InvalidInputFormat,
15 StructParser,
16)
17from unblob.models import (
18 Extractor,
19 File,
20 HandlerDoc,
21 HandlerType,
22 HexString,
23 Reference,
24 StructHandler,
25 ValidChunk,
26)
27
28if TYPE_CHECKING:
29 from collections.abc import Iterator
30
31
@dataclass
class MinixInode:
    """Version-independent, in-memory view of one on-disk MINIX inode."""

    # File type and permission bits; inspected with the stat.S_IS* helpers.
    i_mode: int
    i_nlinks: int
    i_uid: int
    i_gid: int
    # File size in bytes; bounds how much zone data is streamed for the file.
    i_size: int
    # v1 inodes carry a single timestamp; it is None for v2/v3 inodes.
    i_time: int | None
    # v2/v3 inodes carry three timestamps; they are None for v1 inodes.
    i_atime: int | None
    i_mtime: int | None
    i_ctime: int | None
    # Zone pointers: 9 entries for v1 inodes, 10 for v2/v3 inodes.
    i_zone: list[int]
44
45
@dataclass
class MinixDirEntry:
    """One parsed directory entry: an inode number and the raw entry name."""

    # Inode number the entry points to; values below 1 are skipped by the walker.
    inode: int
    # Undecoded entry name as read from the directory data.
    name: bytes
50
51
# see linux/minix_fs.h
# Byte offset of the superblock, relative to the start of the file system.
SUPERBLOCK_OFFSET = 0x400
# Inode number the extraction starts from (expected to be a directory).
ROOT_INODE_INDEX = 1
# Block size used when the superblock has no s_blocksize field (see get_block_size).
STATIC_BLOCK_SIZE = 1_024
# Name of the superblock struct inside the C definitions below.
HEADER_STRUCT = "minix_super_block"
# On-disk inode used by v1 file systems: one timestamp, 8-bit gid/nlinks and
# nine 16-bit zone pointers.
INODE_V1_CDEF = """
 typedef struct minix_inode {
 uint16 i_mode;
 uint16 i_uid;
 uint32 i_size;
 uint32 i_time;
 uint8 i_gid;
 uint8 i_nlinks;
 uint16 i_zone[9];
 } minix_inode;
"""

# On-disk inode used by v2 and v3 file systems: three timestamps and ten
# 32-bit zone pointers.
INODE_V2_CDEF = """
 typedef struct minix_inode {
 uint16 i_mode;
 uint16 i_nlinks;
 uint16 i_uid;
 uint16 i_gid;
 uint32 i_size;
 uint32 i_atime;
 uint32 i_mtime;
 uint32 i_ctime;
 uint32 i_zone[10];
 } minix_inode;
"""

# Superblock shared by v1 and v2 file systems; s_magic sits at offset 0x10.
SUPERBLOCK_V1_CDEF = """
 typedef struct minix_super_block {
 uint16 s_ninodes; /* number of inodes */
 uint16 s_nzones; /* number of zones (v1 only) */
 uint16 s_imap_blocks; /* inode map size (in blocks) */
 uint16 s_zmap_blocks; /* zone map size (in blocks) */
 uint16 s_firstdatazone; /* first zone containing file data */
 uint16 s_log_zone_size; /* log_2(zone_size / blocks_size); 0 => zone_size == block_size */
 uint32 s_max_size; /* max file size */
 uint16 s_magic; /* minix magic */
 uint16 s_state; /* mount state */
 uint32 s_zones; /* number of zones (v2 only) */
 } minix_super_block;
"""

# Superblock of v3 file systems; s_magic sits at offset 0x18 and the block
# size is stored explicitly in s_blocksize. There is no s_nzones field here.
SUPERBLOCK_V3_CDEF = """
 typedef struct minix_super_block {
 uint32 s_ninodes;
 uint16 s_pad0;
 uint16 s_imap_blocks;
 uint16 s_zmap_blocks;
 uint16 s_firstdatazone;
 uint16 s_log_zone_size;
 uint16 s_pad1;
 uint32 s_max_size;
 uint32 s_zones;
 uint16 s_magic;
 uint16 s_pad2;
 uint16 s_blocksize;
 uint8 s_disk_version;
 } minix_super_block;
"""

# Directory entry of v1/v2 file systems: 16-bit inode number followed by the name.
DIR_V1_CDEF = """
 typedef struct minix_dir_entry {
 uint16 inode;
 char name[];
 } minix_dir_entry;
"""

# Directory entry of v3 file systems: 32-bit inode number followed by the name.
# NOTE(review): the typedef alias is minix3_dir_entry, while the code in this
# file looks the struct up as minix_dir_entry (the struct tag) - presumably
# intentional, confirm before renaming either.
DIR_V3_CDEF = """
 typedef struct minix_dir_entry {
 uint32 inode;
 char name[];
 } minix3_dir_entry;
"""
129
# Accepted s_magic values per file system version. get_endianness compares the
# two magic bytes against these values in both byte orders.
VERSION_TO_BIG_ENDIAN_MAGIC = {
    1: {0x13_7F, 0x13_8F},
    2: {0x24_68, 0x24_78},
    3: {0x4D_5A},
}
# Offset of s_magic inside the superblock struct, per version.
VERSION_TO_MAGIC_OFFSET = {
    1: 0x10,
    2: 0x10,
    3: 0x18,
}

# Full set of C definitions (inode + superblock + directory entry) per version.
VERSION_TO_C_DEFINITIONS = {
    1: INODE_V1_CDEF + SUPERBLOCK_V1_CDEF + DIR_V1_CDEF,
    2: INODE_V2_CDEF + SUPERBLOCK_V1_CDEF + DIR_V1_CDEF,
    3: INODE_V2_CDEF + SUPERBLOCK_V3_CDEF + DIR_V3_CDEF,
}
146
147
class MinixFS:
    """Parser for a MINIX (v1/v2/v3) file system image that can extract its tree.

    The superblock is parsed on construction. ``extract`` then walks the
    directory tree starting at the root inode and recreates every entry
    through a ``FileSystem`` object.
    """

    def __init__(self, file: File, version: int, c_definitions: str):
        """Parse the superblock of ``file`` and derive the layout values.

        ``file`` must contain the file system at offset 0: the superblock is
        read from ``SUPERBLOCK_OFFSET`` and zones are addressed absolutely.
        Raises ``InvalidInputFormat`` when the magic is missing or unknown.
        """
        self.file = file
        self.version = version
        self.struct_parser = StructParser(c_definitions)
        self.file.seek(SUPERBLOCK_OFFSET, io.SEEK_SET)
        self.endianness = get_endianness(file, version)
        self.superblock = self.struct_parser.parse(HEADER_STRUCT, file, self.endianness)
        block_size = get_block_size(self.superblock)
        # Layout: boot block, superblock, inode bitmap, zone bitmap, inode table.
        imap_offset = 2 * block_size
        zmap_offset = imap_offset + self.superblock.s_imap_blocks * block_size
        self.inode_offset = zmap_offset + self.superblock.s_zmap_blocks * block_size
        self.zone_size = (block_size << self.superblock.s_log_zone_size) & 0xFF_FF_FF_FF
        inode_struct = self.struct_parser.cparser_le.minix_inode
        self.inode_size = inode_struct.size
        # Size of one element of the i_zone array (2 bytes for v1, 4 for v2/v3).
        self.zone_ptr_size = inode_struct.fields["i_zone"].type.type.size
        dir_entry_struct = self.struct_parser.cparser_le.minix_dir_entry
        self.dirent_inode_size = dir_entry_struct.fields["inode"].type.size
        # Directory entries are fixed size: inode number + fixed-width name field.
        self.dir_entry_size = self._get_name_len() + self.dirent_inode_size

    def _get_name_len(self) -> int:
        """Return the width of the name field, derived from the low magic byte."""
        lower_magic: int = self.superblock.s_magic & 0x00FF
        if lower_magic in {0x7F, 0x68}:
            return 14  # v1/v2
        if lower_magic in {0x8F, 0x78}:
            return 30  # v1/v2
        if lower_magic == 0x5A:
            return 60  # v3
        raise InvalidInputFormat(f"Invalid magic: {self.superblock.s_magic:x}")

    def _struct_byte_order(self) -> str:
        """Return the ``struct`` byte-order prefix matching the image endianness."""
        return "<" if self.endianness == Endian.LITTLE else ">"

    def _read_zone_data(self, zone_index: int) -> bytes:
        """Read one zone; the result is shorter than a zone if the file ends early."""
        self.file.seek(zone_index * self.zone_size, io.SEEK_SET)
        return self.file.read(self.zone_size)

    def _get_zone_pointers(self, zone_index: int) -> list[int]:
        """Decode an indirect zone into the list of zone numbers it holds.

        Raises ``InvalidInputFormat`` when the zone cannot be read completely
        (for example a pointer beyond the end of the image), instead of
        letting a raw ``struct.error`` escape.
        """
        data = self._read_zone_data(zone_index)
        if len(data) < self.zone_size:
            raise InvalidInputFormat(f"Truncated indirect zone: {zone_index}")
        ptr_fmt = "H" if self.zone_ptr_size == 2 else "I"
        count = self.zone_size // self.zone_ptr_size
        return list(
            struct.unpack(
                f"{self._struct_byte_order()}{count}{ptr_fmt}",
                data[: count * self.zone_ptr_size],
            )
        )

    def _stream_file_data(self, inode: MinixInode) -> Iterator[bytes]:
        """Yield the file contents zone by zone, cut off at ``inode.i_size``."""
        remaining = inode.i_size
        for zone_data in self._iter_zones(inode.i_zone):
            chunk = zone_data[:remaining]
            remaining -= len(chunk)
            yield chunk
            if remaining <= 0:
                break

    def _iter_zones(self, zones: list[int]) -> Iterator[bytes]:
        """Yield the data zones referenced by an inode's zone pointer array."""
        # Data zones are a bit complicated. See e.g. https://osblog.stephenmarz.com/ch10.html
        # for a more detailed explanation.
        yield from self._read_zones(zones[:7])  # direct zones 0-6:
        if zones[7] != 0:  # indirect zone 7:
            yield from self._read_zones([zones[7]], 1)
        if zones[8] != 0:  # doubly indirect zone 8:
            yield from self._read_zones([zones[8]], 2)
        if len(zones) == 10 and zones[9] != 0:  # triply indirect zone 9 (V2/V3 only)
            yield from self._read_zones([zones[9]], 3)

    def _read_zones(
        self, zone_index_list: list[int], indirectness: int = 0
    ) -> Iterator[bytes]:
        """Yield zone data for ``zone_index_list``, resolving indirect zones.

        ``indirectness`` is the number of pointer levels between the listed
        zones and the actual data (0 means the list holds data zones).
        """
        for index in zone_index_list:
            if index == 0:
                # A zero pointer is treated as the end of the list, so holes of
                # sparse files are not reproduced in the output.
                break
            if indirectness > 0:
                zone_pointers = self._get_zone_pointers(index)
                yield from self._read_zones(zone_pointers, indirectness - 1)
            else:
                yield self._read_zone_data(index)

    def _read_directory(self, inode: MinixInode) -> Iterator[MinixDirEntry]:
        """Yield the entries stored in the data of a directory inode.

        Entries are fixed-size records: an inode number followed by a
        NUL-padded name field. A name that fills the whole field has no
        terminating NUL, so the record is decoded by width rather than by
        searching for a terminator. A trailing partial record is ignored.
        """
        entry_size = self.dir_entry_size
        inode_width = self.dirent_inode_size
        inode_fmt = self._struct_byte_order() + ("H" if inode_width == 2 else "I")
        for zone_data in self._stream_file_data(inode):
            # Stop before any incomplete record at the end of a truncated chunk.
            for start in range(0, len(zone_data) - entry_size + 1, entry_size):
                (inode_number,) = struct.unpack_from(inode_fmt, zone_data, start)
                name_field = zone_data[start + inode_width : start + entry_size]
                yield MinixDirEntry(
                    inode=inode_number, name=name_field.split(b"\x00", 1)[0]
                )

    def _read_inode(self, index: int) -> MinixInode:
        """Read inode number ``index`` (1-based) from the inode table.

        Raises ``InvalidInputFormat`` when ``index`` is outside
        ``1..s_ninodes``.
        """
        if not 1 <= index <= self.superblock.s_ninodes:
            raise InvalidInputFormat(f"Invalid inode number: {index}")
        offset = self.inode_offset + (index - 1) * self.inode_size
        self.file.seek(offset, io.SEEK_SET)
        raw_inode = self.struct_parser.parse("minix_inode", self.file, self.endianness)
        # v1 inodes have a single i_time, v2/v3 inodes have atime/mtime/ctime.
        is_v1 = self.version == 1
        return MinixInode(
            i_mode=raw_inode.i_mode,
            i_nlinks=raw_inode.i_nlinks,
            i_uid=raw_inode.i_uid,
            i_gid=raw_inode.i_gid,
            i_size=raw_inode.i_size,
            i_time=raw_inode.i_time if is_v1 else None,
            i_atime=None if is_v1 else raw_inode.i_atime,
            i_mtime=None if is_v1 else raw_inode.i_mtime,
            i_ctime=None if is_v1 else raw_inode.i_ctime,
            i_zone=list(raw_inode.i_zone),
        )

    def extract(
        self,
        fs: FileSystem,
        inode: MinixInode | None = None,
        path: Path = Path(),
        inode_index: int = ROOT_INODE_INDEX,
        visited_dirs: set[int] | None = None,
    ):
        """Extract the directory ``inode`` (the root directory by default) into ``fs``.

        When ``inode`` is not given it is read from ``inode_index`` and must
        be a directory. ``visited_dirs`` collects the inode numbers of the
        directories already walked so that directory loops terminate.
        Raises ``InvalidInputFormat`` if the starting inode cannot be read or
        is not a directory.
        """
        if visited_dirs is None:
            visited_dirs = set()

        if inode is None:
            try:
                inode = self._read_inode(inode_index)
            except EOFError as error:
                raise InvalidInputFormat("File system is empty") from error
            if not stat.S_ISDIR(inode.i_mode):
                raise InvalidInputFormat("Root entries should be directories")

        if inode_index in visited_dirs:
            return
        visited_dirs.add(inode_index)
        self.walk_inode(fs, inode, path, visited_dirs)

    def walk_inode(
        self, fs: FileSystem, inode: MinixInode, path: Path, visited_dirs: set[int]
    ):
        """Recreate every entry of the directory ``inode`` below ``path``."""
        for entry in self._read_directory(inode):
            # Skip self/parent links and unused slots (inode number below 1).
            if entry.name in (b".", b"..") or entry.inode < 1:
                continue
            entry_path = path / entry.name.decode("utf-8", errors="replace")
            entry_inode = self._read_inode(entry.inode)

            if stat.S_ISREG(entry_inode.i_mode):
                fs.write_chunks(entry_path, self._stream_file_data(entry_inode))

            elif stat.S_ISLNK(entry_inode.i_mode):
                # The link target is stored as the file contents of the inode.
                contents = b"".join(self._stream_file_data(entry_inode))
                link_target = contents.decode("utf-8", errors="replace")
                fs.create_symlink(Path(link_target), entry_path)

            elif stat.S_ISFIFO(entry_inode.i_mode):
                fs.mkfifo(entry_path, mode=entry_inode.i_mode)

            elif stat.S_ISCHR(entry_inode.i_mode) or stat.S_ISBLK(entry_inode.i_mode):
                # Only the mode is passed on; no device number is supplied here.
                fs.mknod(entry_path, mode=entry_inode.i_mode)

            elif stat.S_ISDIR(entry_inode.i_mode):
                fs.mkdir(entry_path, parents=True, exist_ok=True)
                self.extract(fs, entry_inode, entry_path, entry.inode, visited_dirs)
317
318
class MinixFSExtractor(Extractor):
    """Extractor that unpacks a MINIX file system image of one fixed version."""

    def __init__(self, version: int):
        """Remember ``version`` and resolve its C definitions up front."""
        self.version = version
        self.c_definitions = self._get_c_definitions()

    def _get_c_definitions(self) -> str:
        """Return the C definitions for ``self.version``.

        Raises ``ValueError`` when the version has no definitions.
        """
        definitions = VERSION_TO_C_DEFINITIONS.get(self.version)
        if definitions:
            return definitions
        raise ValueError(f"Unsupported MINIX FS version: {self.version}")

    def extract(self, inpath: Path, outdir: Path):
        """Extract the image at ``inpath`` into the directory ``outdir``."""
        target = FileSystem(outdir)
        with File.from_path(inpath) as image:
            parser = MinixFS(image, self.version, self.c_definitions)
            parser.extract(target)
335
336
def get_endianness(file: File, version: int) -> Endian:
    """Detect the byte order of the superblock located at the current position.

    The two magic bytes are read at the version-specific offset and the file
    position is restored afterwards. Raises ``InvalidInputFormat`` when fewer
    than two bytes are available or the magic matches in neither byte order.
    """
    magic_offset = VERSION_TO_MAGIC_OFFSET[version]
    superblock_start = file.tell()
    file.seek(superblock_start + magic_offset, io.SEEK_SET)
    raw_magic = file.read(2)
    file.seek(superblock_start, io.SEEK_SET)

    if len(raw_magic) < 2:
        raise InvalidInputFormat("Not enough bytes to read MINIX magic.")

    known_magics = VERSION_TO_BIG_ENDIAN_MAGIC[version]
    # Big endian is tried first, exactly as many byte orders as struct layouts exist.
    for byteorder, endian in (("big", Endian.BIG), ("little", Endian.LITTLE)):
        if int.from_bytes(raw_magic, byteorder=byteorder, signed=False) in known_magics:
            return endian

    as_big_endian = int.from_bytes(raw_magic, byteorder="big", signed=False)
    raise InvalidInputFormat(f"Invalid MINIX magic: 0x{as_big_endian:04x}")
353
354
def get_block_size(superblock) -> int:
    """Return the superblock's ``s_blocksize``, or ``STATIC_BLOCK_SIZE`` if it has none."""
    try:
        return superblock.s_blocksize
    except AttributeError:
        # Superblock layouts without the field use the fixed block size.
        return STATIC_BLOCK_SIZE
357
358
class _MinixFSHandlerBase(StructHandler):
    """Shared superblock validation and chunk calculation for the MINIX handlers.

    Subclasses set ``VERSION``; the matching extractor and C definitions are
    attached automatically when the subclass is created.
    """

    VERSION = 1
    HEADER_STRUCT = "minix_super_block"
    # Patterns match inside the superblock, which starts SUPERBLOCK_OFFSET bytes
    # after the beginning of the file system.
    PATTERN_MATCH_OFFSET = -SUPERBLOCK_OFFSET

    def __init_subclass__(cls, **kwargs):
        """Attach the version-specific extractor and C definitions to ``cls``."""
        super().__init_subclass__(**kwargs)
        version = getattr(cls, "VERSION", None)
        if version:
            cls.EXTRACTOR = MinixFSExtractor(version=version)
            cls.C_DEFINITIONS = VERSION_TO_C_DEFINITIONS[version]

    def _get_zone_count(self, header) -> int:
        """Return the number of zones; subclasses override this per version."""
        return header.s_nzones

    def validate_header(  # noqa: C901
        self, header, file: File, block_size: int, start_offset: int = 0
    ) -> None:
        """Check the superblock for plausibility.

        ``start_offset`` is the position of the file system inside ``file``
        and is used to check that the file system fits into the remaining
        bytes. Raises ``InvalidInputFormat`` on the first failed check.
        """
        if header.s_ninodes < 1:
            raise InvalidInputFormat("Invalid inode count")
        if header.s_imap_blocks < 1:
            raise InvalidInputFormat("Invalid inode map block count")
        if header.s_zmap_blocks < 1:
            raise InvalidInputFormat("Invalid zone map block count")
        if header.s_max_size == 0:
            raise InvalidInputFormat("Invalid max file size")
        # according to https://www.minix3.org/doc/A-312.html valid blocksizes for v3 are 1, 2, 4 and 8 KiB
        if self.VERSION == 3 and header.s_blocksize not in {
            2**x for x in range(10, 14)
        }:
            raise InvalidInputFormat("Invalid block size")
        if header.s_log_zone_size > 10:
            # The default log_zone_size is 0 (meaning zone_size == block_size). Though there does not seem to
            # be a hard cap on this value, values larger than 10 (2**10 = 1024 blocks per zone) are not realistic
            raise InvalidInputFormat("Invalid log zone size")

        # Use the same version-specific field as calculate_chunk. The v3
        # superblock has no s_nzones field at all, and s_zones is only
        # meaningful for v2/v3, so the fields must not be mixed.
        zone_count = self._get_zone_count(header)
        if zone_count < 1:
            raise InvalidInputFormat("Invalid zone count")

        blocks_per_zone = 2**header.s_log_zone_size
        total_size = zone_count * blocks_per_zone * block_size
        if total_size > file.size() - start_offset:
            raise InvalidInputFormat("larger than the file size")

        inode_size = 32 if self.VERSION == 1 else 64
        inodes_per_block = block_size // inode_size
        if inodes_per_block < 1:
            # Avoids a division by zero below for a block smaller than an inode.
            raise InvalidInputFormat("Invalid block size")
        first_data_block = (
            2  # boot block + superblock
            + header.s_imap_blocks
            + header.s_zmap_blocks
            + math.ceil(header.s_ninodes / inodes_per_block)
        )
        first_data_zone = math.ceil(first_data_block / blocks_per_zone)
        if header.s_firstdatazone != first_data_zone:
            raise InvalidInputFormat("Invalid first data zone")

    def is_valid_header(self, header, file: File, block_size: int) -> bool:
        """Return whether ``validate_header`` accepts ``header``."""
        try:
            self.validate_header(header, file, block_size)
        except InvalidInputFormat:
            return False
        return True

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse and validate the superblock and return the file system's extent.

        Raises ``InvalidInputFormat`` when the magic or the superblock is invalid.
        """
        file.seek(start_offset + SUPERBLOCK_OFFSET, io.SEEK_SET)
        endianness = get_endianness(file, self.VERSION)
        superblock = self.parse_header(file, endianness)

        # TODO: should probably be moved to MinixFS and rename MinixFS to MinixFSParser,
        # with a get_end_offset() function.

        block_size = get_block_size(superblock)
        self.validate_header(superblock, file, block_size, start_offset)

        blocks_per_zone = 2**superblock.s_log_zone_size
        zone_count = self._get_zone_count(superblock)
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + zone_count * blocks_per_zone * block_size,
        )
441
442
class MinixFSv1Handler(_MinixFSHandlerBase):
    """Handler for MINIX v1 file systems in little- and big-endian byte order."""

    NAME = "minix_fs_v1"
    PATTERNS = [
        # the magic comes at offset 0x10 in the header
        # the field that comes after this (s_state) indicates the FS state (1 -> valid; 2 -> error).
        # A value of 0 has been seen in the wild, but is not documented.
        # There are two variants with the only difference being the maximum name length (0x7f -> 14; 0x8f -> 30)
        HexString("[16] (7f | 8f) 13 (00 | 01 | 02) 00 [2] 00 00 00 00"),  # LE
        HexString("[16] 13 (7f | 8f) 00 (00 | 01 | 02) [2] 00 00 00 00"),  # BE
    ]
    # Selects the v1 inode, superblock and directory entry definitions.
    VERSION = 1

    DOC = HandlerDoc(
        name="MINIX FS (v1)",
        description="MINIX FS is a simple file system format designed as the filesystem of MINIX. MINIX is a UNIX-like operating system, originally developed by Andrew S. Tanenbaum for educational purposes.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="Official website",
                url="https://www.minix3.org/",
            ),
            Reference(
                title="Linux headers (minix_fs.h)",
                url="https://github.com/torvalds/linux/blob/master/include/uapi/linux/minix_fs.h",
            ),
            Reference(
                title="Official tool for creating MINIX filesystems",
                url="https://github.com/Stichting-MINIX-Research-Foundation/minix/tree/master/minix/usr.sbin/mkfs.mfs",
            ),
        ],
        limitations=[],
    )
476
477
class MinixFSv2Handler(MinixFSv1Handler):
    """Handler for MINIX v2 file systems in little- and big-endian byte order."""

    NAME = "minix_fs_v2"
    PATTERNS = [
        # v2 also has two variants regarding the maximum name length (0x68 -> 14; 0x78 -> 30)
        HexString("[16] (68 | 78) 24 (00 | 01 | 02) 00 [4] 00 00 00 00"),  # LE
        HexString("[16] 24 (68 | 78) 00 (00 | 01 | 02) [4] 00 00 00 00"),  # BE
    ]
    # Selects the v2 inode layout together with the v1 superblock layout.
    VERSION = 2

    # From v2 on the zone count is read from the 32-bit s_zones field.
    def _get_zone_count(self, header) -> int:
        return header.s_zones

    DOC = HandlerDoc(
        name="MINIX FS (v2)",
        description="MINIX FS is a simple file system format designed as the filesystem of MINIX. MINIX is a UNIX-like operating system, originally developed by Andrew S. Tanenbaum for educational purposes.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="Official website",
                url="https://www.minix3.org/",
            ),
            Reference(
                title="Linux headers (minix_fs.h)",
                url="https://github.com/torvalds/linux/blob/master/include/uapi/linux/minix_fs.h",
            ),
            Reference(
                title="Official tool for creating MINIX filesystems",
                url="https://github.com/Stichting-MINIX-Research-Foundation/minix/tree/master/minix/usr.sbin/mkfs.mfs",
            ),
        ],
        limitations=[],
    )
511
512
class MinixFSv3Handler(MinixFSv2Handler):
    """Handler for MINIX v3 file systems in little- and big-endian byte order."""

    NAME = "minix_fs_v3"
    PATTERNS = [
        # Layout per the v3 superblock struct: s_ninodes (4 bytes), zeroed s_pad0,
        # 18 bytes of other fields, s_magic at offset 0x18, then zeroed s_pad2.
        HexString("[4] 00 00 [18] 5a 4d 00 00"),  # LE
        HexString("[4] 00 00 [18] 4d 5a 00 00"),  # BE
    ]
    # Selects the v2 inode layout together with the v3 superblock and directory entry.
    VERSION = 3

    DOC = HandlerDoc(
        name="MINIX FS (v3)",
        description="MINIX FS is a simple file system format designed as the filesystem of MINIX. MINIX is a UNIX-like operating system, originally developed by Andrew S. Tanenbaum for educational purposes.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="Official website",
                url="https://www.minix3.org/",
            ),
            Reference(
                title="Linux headers (minix_fs.h)",
                url="https://github.com/torvalds/linux/blob/master/include/uapi/linux/minix_fs.h",
            ),
            Reference(
                title="Official tool for creating MINIX filesystems",
                url="https://github.com/Stichting-MINIX-Research-Foundation/minix/tree/master/minix/usr.sbin/mkfs.mfs",
            ),
        ],
        limitations=[],
    )