Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/yaffs.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

322 statements  

1import io 

2import itertools 

3from collections import defaultdict 

4from collections.abc import Iterable 

5from enum import IntEnum 

6from pathlib import Path 

7from typing import Optional 

8 

9import attrs 

10from structlog import get_logger 

11from treelib.exceptions import NodeIDAbsentError 

12from treelib.tree import Tree 

13 

14from unblob.file_utils import ( 

15 Endian, 

16 File, 

17 FileSystem, 

18 InvalidInputFormat, 

19 StructParser, 

20 get_endian_multi, 

21 read_until_past, 

22 snull, 

23) 

24from unblob.models import ( 

25 Extractor, 

26 ExtractResult, 

27 Handler, 

28 HandlerDoc, 

29 HandlerType, 

30 HexString, 

31 Reference, 

32 ValidChunk, 

33) 

34 

logger = get_logger()

# First bytes of the spare area immediately following a page, for every
# endianness / ECC-layout combination. Used by YAFFSParser.auto_detect to
# locate the page boundary and infer the image configuration.
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
# Length of the longest signature above; how many spare bytes get sampled.
SPARE_START_LEN = 6

# YAFFS_OBJECT_TYPE_DIRECTORY, YAFFS_OBJECT_TYPE_FILE
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

# Page and spare geometries tried/accepted during auto-detection.
VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
# YAFFS1 geometry is fixed: 512-byte pages, 16-byte spare areas.
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16

50 

# C-style layout definitions fed to StructParser. Covers the YAFFS1 on-disk
# structures (object header, packed spare tags, raw spare bytes) and the
# YAFFS2 ones (object header, packed tags).
C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode; // protection
        uint32 st_uid; // user ID of owner
        uint32 st_gid; // group ID of owner
        uint32 st_atime; // time of last access
        uint32 st_mtime; // time of last modification
        uint32 st_ctime; // time of last change
        uint32 file_size; // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160]; // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status; // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode; /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low; /* File size applies to files only */
        int equiv_id; /* Equivalent object id applies to hard links only. */
        char alias[160]; /* Alias is for symlinks only. */
        uint32 st_rdev; /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj; /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""

142 

143 

class YaffsObjectType(IntEnum):
    """Object types stored in the `type` field of YAFFS object headers."""

    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5  # device nodes (see extract_entry's mknod path)

151 

152 

@attrs.define
class YAFFSChunk:
    """Common fields decoded from a chunk's spare-area tags."""

    chunk_id: int  # 0 means object header; >0 is the 1-based data chunk index
    offset: int  # absolute file offset of the chunk's page data
    byte_count: int  # number of valid data bytes in the page
    object_id: int  # owning object

159 

160 

@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    """YAFFS1 chunk: adds the 2-bit serial and spare status bytes."""

    serial: int  # 2-bit wrapping counter; distinguishes rewritten chunks
    ecc: bytes
    page_status: int  # 0 marks the chunk as deleted
    block_status: int

167 

168 

@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    """YAFFS2 chunk: adds the monotonically increasing block sequence number."""

    seq_number: int

172 

173 

@attrs.define
class YAFFSFileVar:
    """Mirror of the `yaffs_file_var` struct embedded in YAFFS2 headers."""

    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int

180 

181 

@attrs.define
class YAFFSConfig:
    """Image geometry and layout, either auto-detected or fixed (YAFFS1)."""

    endianness: Endian
    page_size: int
    spare_size: int
    ecc: bool  # True when the spare area uses the ECC layout

188 

189 

@attrs.define
class YAFFSEntry:
    """A filesystem object reconstructed from a YAFFS object header.

    Ordering, equality and hashing are all based on object_id only, so two
    headers for the same object compare equal regardless of metadata.
    """

    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int
    sum_no_longer_used: int = attrs.field(default=0)
    name: str = attrs.field(default="")
    alias: str = attrs.field(default="")  # symlink target; empty otherwise
    equiv_id: int = attrs.field(default=0)  # hardlink target object id
    file_size: int = attrs.field(default=0)
    st_mode: int = attrs.field(default=0)
    st_uid: int = attrs.field(default=0)
    st_gid: int = attrs.field(default=0)
    st_atime: int = attrs.field(default=0)
    st_mtime: int = attrs.field(default=0)
    st_ctime: int = attrs.field(default=0)

    def __lt__(self, other):
        return self.object_id < other.object_id

    def __gt__(self, other):
        return self.object_id > other.object_id

    def __eq__(self, other):
        return self.object_id == other.object_id

    def __hash__(self):
        # Must stay consistent with __eq__: identity is the object_id.
        return hash(self.object_id)

    def __str__(self):
        return f"{self.object_id}: {self.name}"

221 

222 

@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    """YAFFS2 object-header fields layered on top of the common YAFFSEntry."""

    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)  # device major/minor for SPECIAL
    # Mutable defaults must be factories: `default=[]` would share one list
    # object between every YAFFS2Entry instance.
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    # None until populated from the header's embedded yaffs_file_var.
    filehead: Optional[YAFFSFileVar] = attrs.field(default=None)

235 

236 

def iterate_over_file(
    file: "File", config: "YAFFSConfig"
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield ``(end_offset, page, spare)`` for every full page+spare pair.

    ``end_offset`` is the file position right *after* the pair was read
    (i.e. the end of the chunk); the caller subtracts
    ``page_size + spare_size`` to recover the chunk start. The previous
    implementation yielded the pre-read position for the very first pair
    only, making the first chunk's computed start offset one chunk too low.
    Iteration stops at the first short (truncated) read.
    """
    page = file.read(config.page_size)
    spare = file.read(config.spare_size)
    end_offset = file.tell()

    while len(page) == config.page_size and len(spare) == config.spare_size:
        yield (end_offset, page, spare)
        page = file.read(config.page_size)
        spare = file.read(config.spare_size)
        end_offset = file.tell()

249 

250 

def decode_file_size(high: int, low: int) -> int:
    """File size can be encoded as 64 bits or 32 bits values.

    If upper 32 bits are set, it's a 64 bits integer value.
    Otherwise it's a 32 bits value. 0xFFFFFFFF means zero.
    """
    unset = 0xFFFFFFFF
    if high == unset:
        # No 64-bit size recorded; fall back to the 32-bit field.
        return 0 if low == unset else low
    return (high << 32) | (low & 0xFFFFFFFF)

262 

263 

def valid_name(name: bytes) -> bool:
    """Report whether ``name`` looks like a usable entry name.

    A valid name is either full of null bytes or UTF-8 decodable
    (after dropping the trailing byte and the NUL terminator).
    """
    try:
        snull(name[:-1]).decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True

272 

273 

def is_valid_header(header) -> bool:
    """Sanity-check a parsed object header.

    The header must carry a decodable name, a known object type
    (see YaffsObjectType), and the legacy checksum field fixed at 0xFFFF.
    """
    return (
        valid_name(header.name[:-3])
        and header.type <= 5
        and header.sum_no_longer_used == 0xFFFF
    )

282 

283 

class YAFFSParser:
    """Base parser for YAFFS images.

    Scans the image page by page, collecting object headers into a tree of
    YAFFSEntry nodes and data chunks into per-object lists, then replays
    them into an output FileSystem. Version-specific decoding (YAFFS1 vs
    YAFFS2) is provided by subclasses via build_chunk/build_entry/get_chunks.
    """

    # Name of the object-header struct in C_DEFINITIONS; set by subclasses.
    HEADER_STRUCT: str

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        # Tree of entries, node ids are str(object_id).
        self.file_entries = Tree()
        # object_id -> list of data chunks (chunk_id != 0).
        self.data_chunks = defaultdict(list)
        self.file = file
        self._struct_parser = StructParser(C_DEFINITIONS)
        # File offset just past the last valid page+spare pair; -1 until parse().
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Turn a parsed object header + its chunk into a YAFFSEntry (subclass hook)."""
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        """Decode a spare area into a YAFFSChunk (subclass hook)."""
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        """Yield the current (non-obsolete) data chunks of an object (subclass hook)."""
        raise NotImplementedError

    def init_tree(self):
        """Pre-populate file_entries if the format needs it (subclass hook)."""
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        """Walk the whole image; with store=True also collect entries and chunks.

        Stops at the first undecodable or invalid header and records the
        position reached in self.end_offset. Raises InvalidInputFormat when
        not a single valid header was seen.
        """
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                # offset points past the pair, so rewind by one page+spare
                # to get the chunk's start.
                data_chunk = self.build_chunk(
                    spare, offset - self.config.page_size - self.config.spare_size
                )
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                # chunk_id 0 marks an object header page.
                try:
                    header = self._struct_parser.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures."""
        page_size = 0
        config = None
        # Probe every known page size for a recognizable spare-area prefix.
        for page_size in VALID_PAGE_SIZES:
            spare_start = self.file[page_size : page_size + SPARE_START_LEN]
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=True,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=False,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=True, spare_size=-1
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=False, spare_size=-1
                )
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example:
        #
        # 00000800  00 10 00 00 01 01 00 00  00 00 00 00 ff ff ff ff  |................|
        # 00000810  03 00 00 00 01 01 00 00  ff ff 62 61 72 00 00 00  |..........bar...|
        # 00000820  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Note that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.

        object_id_offset = 4
        object_id_start = page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size, make sure it looks legit
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        """Insert (or refresh) an entry in the tree, keyed by its object id."""
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # a header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log filesystem, whichever chunk comes
            # last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            # Self-parented entry is the filesystem root.
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> Optional[YAFFSEntry]:
        """Return the stored YAFFSEntry for object_id, or None if absent."""
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        """Build the entry's path by walking parents up to the tree root."""
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        """Yield the raw file content, one current data chunk at a time."""
        for chunk in self.get_chunks(entry.object_id):
            yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

    def extract(self, fs: FileSystem):
        """Materialize every stored entry into fs, parents before children."""
        for entry in [
            self.file_entries.get_node(node)
            for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
        ]:
            if entry is None or entry.data is None:
                continue
            self.extract_entry(entry.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        """Create one filesystem object (dir/file/symlink/hardlink/device) in fs."""
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            # st_rdev only exists on YAFFS2 headers.
            if not isinstance(entry, YAFFS2Entry):
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            dst_entry = self.file_entries[str(entry.equiv_id)].data
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)

518 

519 

class YAFFS2Parser(YAFFSParser):
    """YAFFS2-specific decoding: 64-bit sizes, sequence numbers, synthetic root."""

    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        """Decode the packed tags of a spare area into a YAFFS2Chunk."""
        # images built without ECC have two superfluous bytes before the chunk ID.
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = self._struct_parser.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Map a parsed yaffs2_obj_hdr_t onto a YAFFS2Entry."""
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            # unwritten alias bytes are 0xFF filler, strip them before decoding
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered list of chunks."""
        # The Yaffs2 sequence number is not the same as the Yaffs1 serial number!

        # As each block is allocated, the file system's
        # sequence number is incremented and each chunk in the block is marked with that
        # sequence number. The sequence number thus provides a way of organising the log in
        # chronological order.

        # For every chunk_id, only the chunk with the highest sequence number is
        # current; every other chunk with that id is an obsolete rewrite.

        # note: there is no deletion marker in YAFFS2

        # BUG FIX: groupby must use the same key as the sort. Without key=,
        # groupby groups by full object equality, so two chunks sharing a
        # chunk_id but differing in seq_number formed separate groups and
        # stale chunks were yielded alongside current ones.
        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            yield max(chunks, key=lambda chunk: chunk.seq_number)

    def init_tree(self):
        """Create the root directory entry; YAFFS2 does not store it on flash."""
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)

607 

608 

class YAFFS1Parser(YAFFSParser):
    """YAFFS1-specific decoding: fixed 512+16 geometry, 2-bit serial numbers."""

    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        # from https://yaffs.net/archives/yaffs-development-notes: currently each chunk
        # is the same size as a NAND flash page (ie. 512 bytes + 16 byte spare).
        # In the future we might decide to allow for different chunk sizes.
        # An explicitly supplied config is honored (the original silently
        # discarded it); otherwise the fixed YAFFS1 geometry is used.
        if config is None:
            config = YAFFSConfig(
                page_size=YAFFS1_PAGE_SIZE,
                spare_size=YAFFS1_SPARE_SIZE,
                endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
                ecc=False,
            )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        """Decode a YAFFS1 spare area (status bytes + packed tag bytes)."""
        yaffs_sparse = self._struct_parser.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        # The tag bytes are scattered between the status/ECC bytes; reassemble
        # them in order before parsing the packed bitfields.
        yaffs_packed_tags = self._struct_parser.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Map a parsed yaffs1_obj_hdr_t onto a YAFFSEntry."""
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered list of chunks."""
        # YAFFS1 chunks have a serial number that is used to track
        # which chunk takes precedence if two chunks have the same
        # identifier. This is used in scenarios like power loss
        # during a copy operation. Whenever we have two chunks with
        # the same id, we only return the one with the highest serial.

        # BUG FIX: groupby must use the same key as the sort, otherwise it
        # groups by full object equality and duplicates are not collapsed.
        for _, chunks in itertools.groupby(
            sorted(
                self.data_chunks[object_id],
                key=lambda chunk: chunk.chunk_id,
            ),
            key=lambda chunk: chunk.chunk_id,
        ):
            # The serial is a 2-bit wrapping counter: the live chunk is the
            # one whose serial is one greater (mod 4) than its rival's.
            # BUG FIX: the old max(..., key=(serial + 1) & 3) mispicked the
            # (2, 3) pair — no monotone key can order a wrapping counter, so
            # compare pairwise instead.
            current = next(chunks)
            for contender in chunks:
                if ((contender.serial - current.serial) & 3) == 1:
                    current = contender
            yield current

686 

687 

def is_yaffs_v1(file: "File", start_offset: int) -> bool:
    """Heuristically decide whether the YAFFS image at start_offset is YAFFS1.

    Assumes YAFFS1's fixed 512+16 geometry and checks that the first spare
    area decodes to the tags of an object header (chunk_id 0, serial 0)
    belonging to the root object (object_id 1). The file position is
    restored to start_offset before returning.
    """
    struct_parser = StructParser(C_DEFINITIONS)
    file.seek(start_offset, io.SEEK_SET)
    # BUG FIX: the endian probe previously read file[0:4] — absolute offset 0 —
    # even when the filesystem chunk starts later in the file.
    magic = file[start_offset : start_offset + 4]
    if magic in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00"):
        endian = Endian.LITTLE
    else:
        endian = Endian.BIG
    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    yaffs_sparse = struct_parser.parse("yaffs_spare_t", spare, endian)

    # Reassemble the scattered tag bytes into the packed-tags layout.
    yaffs_packed_tags = struct_parser.parse(
        "yaffs1_packed_tags_t",
        bytes(
            [
                yaffs_sparse.tag_b0,
                yaffs_sparse.tag_b1,
                yaffs_sparse.tag_b2,
                yaffs_sparse.tag_b3,
                yaffs_sparse.tag_b4,
                yaffs_sparse.tag_b5,
                yaffs_sparse.tag_b6,
                yaffs_sparse.tag_b7,
            ]
        ),
        endian,
    )
    file.seek(start_offset, io.SEEK_SET)
    return (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )

722 

723 

def instantiate_parser(file: "File", start_offset: int = 0) -> "YAFFSParser":
    """Return the parser matching the YAFFS version detected at start_offset."""
    parser_cls = YAFFS1Parser if is_yaffs_v1(file, start_offset) else YAFFS2Parser
    return parser_cls(file)

728 

729 

class YAFFSExtractor(Extractor):
    """Extractor plugin: parses a YAFFS image and writes its tree to disk."""

    def extract(self, inpath: Path, outdir: Path):
        """Parse the image at inpath and materialize it under outdir."""
        parser = instantiate_parser(File.from_path(inpath))
        parser.parse(store=True)
        filesystem = FileSystem(outdir)
        parser.extract(filesystem)
        return ExtractResult(reports=filesystem.problems)

738 

739 

class YAFFSHandler(Handler):
    """unblob handler that recognizes and carves YAFFS1/YAFFS2 filesystems."""

    NAME = "yaffs"

    # Object header signatures: object type (dir/file) + root parent id (1),
    # followed by the unused 0xFFFF name-checksum bytes, in both endiannesses.
    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Measure the extent of the YAFFS filesystem starting at start_offset.

        parse() (without storing entries) walks until the first invalid
        page, leaving end_offset at the filesystem's end; it raises
        InvalidInputFormat when the match is a false positive.
        """
        parser = instantiate_parser(file, start_offset)
        parser.parse()
        # skip 0xFF padding
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())