Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/yaffs.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

313 statements  

1import io 

2import itertools 

3from collections import defaultdict 

4from collections.abc import Iterable 

5from enum import IntEnum 

6from pathlib import Path 

7 

8import attrs 

9from structlog import get_logger 

10from treelib.exceptions import NodeIDAbsentError 

11from treelib.tree import Tree 

12 

13from unblob.file_utils import ( 

14 Endian, 

15 File, 

16 FileSystem, 

17 InvalidInputFormat, 

18 StructParser, 

19 get_endian_multi, 

20 read_until_past, 

21 snull, 

22) 

23from unblob.models import ( 

24 Extractor, 

25 ExtractResult, 

26 Handler, 

27 HandlerDoc, 

28 HandlerType, 

29 HexString, 

30 Reference, 

31 ValidChunk, 

32) 

33 

logger = get_logger()

# First bytes of the spare (OOB) area for each supported layout; used by
# YAFFSParser.auto_detect() to recognize endianness and ECC layout.
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
# number of spare bytes inspected during auto-detection
SPARE_START_LEN = 6

# YAFFS_OBJECT_TYPE_DIRECTORY, YAFFS_OBJECT_TYPE_FILE
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

# page (chunk) and spare sizes accepted by auto-detection
VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
# YAFFS1 uses a fixed 512-byte page with a 16-byte spare
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16

49 

# C struct layouts handed to StructParser: YAFFS1/YAFFS2 object headers and
# packed tags, plus the raw spare-area layout. This is a runtime string parsed
# by dissect.cstruct — do not edit field names/sizes casually.
C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode; // protection
        uint32 st_uid; // user ID of owner
        uint32 st_gid; // group ID of owner
        uint32 st_atime; // time of last access
        uint32 st_mtime; // time of last modification
        uint32 st_ctime; // time of last change
        uint32 file_size; // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160]; // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status; // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode; /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low; /* File size applies to files only */
        int equiv_id; /* Equivalent object id applies to hard links only. */
        char alias[160]; /* Alias is for symlinks only. */
        uint32 st_rdev; /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj; /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""

141 

142 

class YaffsObjectType(IntEnum):
    """Object types stored in a YAFFS object header (enum yaffs_obj_type)."""

    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5

150 

151 

@attrs.define
class YAFFSChunk:
    """One page's worth of data as described by its spare-area tags."""

    chunk_id: int  # 0 marks an object header chunk, non-zero a data chunk
    offset: int  # absolute file offset of the page data
    byte_count: int  # number of valid data bytes in the page
    object_id: int  # id of the object this chunk belongs to

158 

159 

@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    """YAFFS1 chunk: adds the 2-bit serial and spare status bytes."""

    serial: int  # 2-bit rolling counter used to pick the newer duplicate
    ecc: int  # 12-bit ECC value from yaffs1_packed_tags (cstruct bitfield)
    page_status: int  # 0 means the chunk is deleted
    block_status: int

166 

167 

@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    """YAFFS2 chunk: adds the monotonically increasing block sequence number."""

    seq_number: int  # higher sequence number means more recently written

171 

172 

@attrs.define
class YAFFSFileVar:
    """Mirror of the yaffs_file_var C struct (file head bookkeeping)."""

    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int

179 

180 

@attrs.define
class YAFFSConfig:
    """Flash geometry and byte order used to walk a YAFFS image."""

    endianness: Endian
    page_size: int  # bytes of data per page (chunk)
    spare_size: int  # bytes of OOB/spare data following each page
    ecc: bool  # True if the spare area uses the ECC layout

187 

188 

@attrs.define
class YAFFSEntry:
    """In-memory representation of one YAFFS object, built from a header chunk.

    Field names mirror the on-flash object header fields.
    """

    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int  # object id of the containing directory
    sum_no_longer_used: int = attrs.field(default=0)
    name: str = attrs.field(default="")
    alias: str = attrs.field(default="")  # symlink target (symlinks only)
    equiv_id: int = attrs.field(default=0)  # target object id (hard links only)
    file_size: int = attrs.field(default=0)
    st_mode: int = attrs.field(default=0)
    st_uid: int = attrs.field(default=0)
    st_gid: int = attrs.field(default=0)
    st_atime: int = attrs.field(default=0)
    st_mtime: int = attrs.field(default=0)
    st_ctime: int = attrs.field(default=0)

    def __str__(self):
        return f"{self.object_id}: {self.name}"

208 

209 

@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    """YAFFS2-specific object header fields on top of the common YAFFSEntry.

    List-typed fields use ``factory=list`` instead of ``default=[]``:
    with attrs, a literal list default is a single shared object, so every
    instance would have mutated the same list.
    """

    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)  # device major/minor (special objects)
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    # None until populated from the header's yaffs_file_var
    filehead: YAFFSFileVar | None = attrs.field(default=None)

222 

223 

def iterate_over_file(
    file: "File", config: "YAFFSConfig"
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield ``(end_offset, page, spare)`` for every full page+spare pair.

    ``end_offset`` is the file position *after* the yielded page and spare
    were consumed; the caller (``YAFFSParser.parse``) subtracts
    ``page_size + spare_size`` from it to recover the page's start offset.
    Iteration stops at the first short read.

    Fix: the original yielded the *pre*-read offset for the first pair but
    the *post*-read offset for every later pair, so the first chunk's
    reconstructed start offset was off by one page+spare.
    """
    page = file.read(config.page_size)
    spare = file.read(config.spare_size)
    end_offset = file.tell()

    while len(page) == config.page_size and len(spare) == config.spare_size:
        yield (end_offset, page, spare)
        page = file.read(config.page_size)
        spare = file.read(config.spare_size)
        end_offset = file.tell()

236 

237 

def decode_file_size(high: int, low: int) -> int:
    """Decode a file size stored as two 32-bit words.

    If the upper word is set (not 0xFFFFFFFF) the size is a 64-bit value;
    otherwise it is the 32-bit lower word alone, with 0xFFFFFFFF meaning zero.
    """
    upper_unset = high == 0xFFFFFFFF
    if not upper_unset:
        return (high << 32) | (low & 0xFFFFFFFF)
    # 32-bit size; all-ones is the "no size" marker
    return 0 if low == 0xFFFFFFFF else low

249 

250 

def valid_name(name: bytes) -> bool:
    """Return True when the null-stripped name decodes as UTF-8.

    A valid name is either full of null bytes or unicode decodable.
    """
    try:
        snull(name[:-1]).decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True

259 

260 

def is_valid_header(header) -> bool:
    """Sanity-check a parsed object header.

    Rejects undecodable names, object types outside the known enum range,
    and headers whose unused checksum field is not the expected 0xFFFF.
    """
    return (
        valid_name(header.name[:-3])
        and header.type <= 5
        and header.sum_no_longer_used == 0xFFFF
    )

269 

270 

class YAFFSParser:
    """Base parser: walks page+spare pairs, builds the object tree, extracts it.

    Subclasses (YAFFS1Parser / YAFFS2Parser) supply the header struct name
    and the chunk/entry builders for their on-flash tag format.
    """

    # name of the object header struct inside C_DEFINITIONS, set by subclasses
    HEADER_STRUCT: str

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        # tree of YAFFSEntry nodes, keyed by str(object_id)
        self.file_entries = Tree()
        # object_id -> list of data chunks (chunk_id != 0)
        self.data_chunks = defaultdict(list)
        self.file = file
        self._struct_parser = StructParser(C_DEFINITIONS)
        # file offset just past the last parsed page+spare pair (-1 until parse())
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFSEntry from a parsed object header. Subclass hook."""
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        """Parse one spare area into a chunk descriptor. Subclass hook."""
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        """Yield the current data chunks of an object. Subclass hook."""
        raise NotImplementedError

    def init_tree(self):
        """Seed the entry tree before parsing; no-op by default."""
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        """Walk the image page by page; with store=True also build the tree.

        Stops at the first invalid header and records the reached offset in
        self.end_offset. Raises InvalidInputFormat when no valid header
        chunk was seen at all.
        """
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                # offset points past the pair, so step back to the page start
                data_chunk = self.build_chunk(
                    spare, offset - self.config.page_size - self.config.spare_size
                )
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                # chunk_id 0 marks an object header page
                try:
                    header = self._struct_parser.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                # headers are counted even when not storing, so store=False
                # runs (chunk size calculation) still validate the image
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures."""
        page_size = 0
        config = None
        # try each candidate page size and look for a known spare-area prefix
        for page_size in VALID_PAGE_SIZES:
            spare_start = self.file[page_size : page_size + SPARE_START_LEN]
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=True,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=False,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=True, spare_size=-1
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=False, spare_size=-1
                )
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example:
        #
        # 00000800  00 10 00 00 01 01 00 00  00 00 00 00 ff ff ff ff  |................|
        # 00000810  03 00 00 00 01 01 00 00  ff ff 62 61 72 00 00 00  |..........bar...|
        # 00000820  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Note that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.

        object_id_offset = 4
        object_id_start = page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size, make sure it looks legit
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        """Insert (or update) an entry in the tree, keyed by its object id."""
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # a header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log filesystem, whichever chunk comes
            # last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            # self-parented entry is the tree root
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> YAFFSEntry | None:
        """Return the stored entry for object_id, or None if absent."""
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        """Build the entry's path by walking parents up to the tree root."""
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        """Yield the file content of an entry, chunk by chunk."""
        for chunk in self.get_chunks(entry.object_id):
            yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

    def extract(self, fs: FileSystem):
        """Extract every tree entry into fs, parents before children."""
        for entry in [
            self.file_entries.get_node(node)
            for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
        ]:
            if entry is None or entry.data is None:
                continue
            self.extract_entry(entry.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        """Materialize a single entry (dir/file/symlink/hardlink/device) in fs."""
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            # device nodes carry st_rdev, which only YAFFS2 headers store
            if not isinstance(entry, YAFFS2Entry):
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            dst_entry = self.file_entries[str(entry.equiv_id)].data
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)

505 

506 

class YAFFS2Parser(YAFFSParser):
    """YAFFS2 parser: 16-byte packed tags in the spare area, sequence numbers."""

    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        """Parse one spare area into a YAFFS2Chunk anchored at `offset`."""
        # images built without ECC have two superfluous bytes before the chunk ID.
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = self._struct_parser.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFS2Entry from a parsed yaffs2_obj_hdr and its chunk."""
        # NOTE(review): header.win_atime is parsed but not retained here.
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return the newest data chunk for each chunk_id, in chunk_id order."""
        # The YAFFS2 sequence number is not the same as the YAFFS1 serial number!

        # As each block is allocated, the file system's sequence number is
        # incremented and each chunk in the block is marked with that sequence
        # number. The sequence number thus provides a way of organising the log
        # in chronological order: when the same obj_id:chunk_id pair was written
        # more than once, only the chunk with the highest sequence number is
        # current, all others are obsolete and must be treated as deleted.

        # note: there is no deletion marker in YAFFS2

        # Group explicitly on chunk_id. Without key=, groupby() compares whole
        # chunk objects, so rewrites of the same chunk_id would never fall into
        # one group and obsolete chunks would be yielded as duplicates.
        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            yield max(chunks, key=lambda chunk: chunk.seq_number)

    def init_tree(self):
        """Seed the tree with the implicit root directory (object id 1)."""
        # YAFFS2 do not store the root in file.
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)

594 

595 

class YAFFS1Parser(YAFFSParser):
    """YAFFS1 parser: fixed 512+16 geometry, tag bytes scattered in the spare."""

    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        # from https://yaffs.net/archives/yaffs-development-notes: currently each chunk
        # is the same size as a NAND flash page (ie. 512 bytes + 16 byte spare).
        # In the future we might decide to allow for different chunk sizes.
        if config is None:
            # honor an explicitly supplied config; previously any passed-in
            # config was silently discarded and replaced with the default
            config = YAFFSConfig(
                page_size=YAFFS1_PAGE_SIZE,
                spare_size=YAFFS1_SPARE_SIZE,
                endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
                ecc=False,
            )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        """Reassemble the 8 tag bytes from the spare layout and parse them."""
        yaffs_sparse = self._struct_parser.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        yaffs_packed_tags = self._struct_parser.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFSEntry from a parsed yaffs1_obj_hdr and its chunk."""
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return the current data chunk for each chunk_id, in chunk_id order."""
        # YAFFS1 chunks have a serial number that is used to track
        # which chunk takes precedence if two chunks have the same
        # identifier. This is used in scenarios like power loss
        # during a copy operation. Whenever we have two chunks with
        # the same id, we only return the one with the highest serial.

        # Group explicitly on chunk_id. Without key=, groupby() compares whole
        # chunk objects, so two versions of the same chunk_id would never fall
        # into one group and both would be yielded.
        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            # serial is a 2-bit rolling counter; there are at most two live
            # chunks with the same chunk_id at any given time.
            # NOTE(review): (serial + 1) & 3 handles the (3, 0) wrap pair but
            # would pick serial 2 for a (2, 3) pair — confirm against the
            # YAFFS1 serial-number semantics.
            yield max(chunks, key=lambda chunk: (chunk.serial + 1) & 3)

673 

674 

def is_yaffs_v1(file: File, start_offset: int) -> bool:
    """Detect YAFFS1 by parsing the first spare area with the fixed geometry.

    Returns True when the first page's packed tags look like the initial
    object header of a YAFFS1 image (chunk 0, serial 0, object id 1).
    """
    struct_parser = StructParser(C_DEFINITIONS)
    file.seek(start_offset, io.SEEK_SET)
    # NOTE(review): endianness is probed at absolute offset 0, not at
    # start_offset — confirm this is intended for chunks inside a larger file.
    magic = file[0:4]
    endian = (
        Endian.LITTLE
        if magic in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00")
        else Endian.BIG
    )
    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    yaffs_sparse = struct_parser.parse("yaffs_spare_t", spare, endian)

    # the 8 tag bytes are scattered through the spare layout; reassemble them
    tag_bytes = bytes(
        [
            yaffs_sparse.tag_b0,
            yaffs_sparse.tag_b1,
            yaffs_sparse.tag_b2,
            yaffs_sparse.tag_b3,
            yaffs_sparse.tag_b4,
            yaffs_sparse.tag_b5,
            yaffs_sparse.tag_b6,
            yaffs_sparse.tag_b7,
        ]
    )
    yaffs_packed_tags = struct_parser.parse("yaffs1_packed_tags_t", tag_bytes, endian)

    # restore the original position before returning
    file.seek(start_offset, io.SEEK_SET)
    looks_like_v1_header = (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )
    return looks_like_v1_header

709 

710 

def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    """Pick the YAFFS1 or YAFFS2 parser based on the image at start_offset."""
    parser_cls = YAFFS1Parser if is_yaffs_v1(file, start_offset) else YAFFS2Parser
    return parser_cls(file)

715 

716 

class YAFFSExtractor(Extractor):
    """unblob Extractor that unpacks a YAFFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Parse the image at inpath and materialize its tree under outdir."""
        parser = instantiate_parser(File.from_path(inpath))
        parser.parse(store=True)
        target_fs = FileSystem(outdir)
        parser.extract(target_fs)
        return ExtractResult(reports=target_fs.problems)

725 

726 

class YAFFSHandler(Handler):
    """unblob Handler matching YAFFS object-header signatures in both byte orders."""

    NAME = "yaffs"

    # object type + parent object id 1 + unused name checksum 0xFFFF
    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the YAFFS image at start_offset and return its byte extent."""
        parser = instantiate_parser(file, start_offset)
        # store=False: validate and measure only, no tree building
        parser.parse()
        # skip 0xFF padding
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())