Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/yaffs.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

313 statements  

1import io 

2import itertools 

3from collections import defaultdict 

4from collections.abc import Iterable 

5from enum import IntEnum 

6from pathlib import Path 

7 

8import attrs 

9from structlog import get_logger 

10from treelib.exceptions import NodeIDAbsentError 

11from treelib.tree import Tree 

12 

13from unblob.file_utils import ( 

14 Endian, 

15 File, 

16 FileSystem, 

17 InvalidInputFormat, 

18 StructParser, 

19 get_endian_multi, 

20 read_until_past, 

21 snull, 

22) 

23from unblob.models import ( 

24 Extractor, 

25 ExtractResult, 

26 Handler, 

27 HandlerDoc, 

28 HandlerType, 

29 HexString, 

30 Reference, 

31 ValidChunk, 

32) 

33 

# Module-level structured logger.
logger = get_logger()

# Byte patterns matched against the first bytes of the first spare area during
# auto-detection, one per (byte order, ECC layout) combination. The NO_ECC
# variants carry two extra leading 0xFF bytes.
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
# Number of spare bytes read for signature matching (length of the longest pattern).
SPARE_START_LEN = 6

# Object type values YAFFS_OBJECT_TYPE_FILE (1) and YAFFS_OBJECT_TYPE_DIRECTORY (3)
BIG_ENDIAN_MAGICS = [0x00000001, 0x00000003]

# Page / spare sizes accepted by the auto-detection logic.
VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
# YAFFS1 geometry: 512 bytes of page data followed by a 16 byte spare.
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16

49 

# C declarations of the on-flash structures (YAFFS1 / YAFFS2 object headers,
# packed tags and the YAFFS1 spare layout). They are handed to StructParser,
# which the rest of the module uses to decode pages and spares by type name.
C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode; // protection
        uint32 st_uid; // user ID of owner
        uint32 st_gid; // group ID of owner
        uint32 st_atime; // time of last access
        uint32 st_mtime; // time of last modification
        uint32 st_ctime; // time of last change
        uint32 file_size; // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160]; // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status; // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode; /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low; /* File size applies to files only */
        int equiv_id; /* Equivalent object id applies to hard links only. */
        char alias[160]; /* Alias is for symlinks only. */
        uint32 st_rdev; /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj; /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""

# Shared parser instance built once from the declarations above.
_STRUCT_PARSER = StructParser(C_DEFINITIONS)

143 

144 

class YaffsObjectType(IntEnum):
    """Numeric object types stored in the `type` field of an object header."""

    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5

152 

153 

@attrs.define
class YAFFSChunk:
    """Tag information common to YAFFS1 and YAFFS2 chunks."""

    # chunk index within its object; 0 marks an object header chunk
    chunk_id: int
    # file offset at which the chunk's page data starts
    offset: int
    # number of bytes recorded in the tags for this chunk
    byte_count: int
    # identifier of the object the chunk belongs to
    object_id: int

160 

161 

@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    """Chunk description decoded from a YAFFS1 spare area."""

    # 2-bit serial number from the packed tags
    serial: int
    # 12-bit `ecc` bitfield of the packed tags; the struct parser yields an
    # integer for it, hence `int` rather than `bytes`
    ecc: int
    # `page_status` byte of the spare; 0 marks the chunk as deleted
    page_status: int
    # `block_status` byte of the spare
    block_status: int

168 

169 

@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    """Chunk description decoded from YAFFS2 packed tags."""

    # `seq_number` field of the packed tags
    seq_number: int

173 

174 

@attrs.define
class YAFFSFileVar:
    """Python counterpart of the `yaffs_file_var` structure (one field each)."""

    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int

181 

182 

@attrs.define
class YAFFSConfig:
    """Geometry and layout parameters needed to walk a YAFFS image."""

    # byte order used to decode headers and tags
    endianness: Endian
    # size in bytes of the data part of each chunk
    page_size: int
    # size in bytes of the spare area following each page
    spare_size: int
    # True when the ECC spare layout is in use; when False the spare area
    # starts with two extra bytes
    ecc: bool

189 

190 

@attrs.define
class YAFFSEntry:
    """A filesystem object (file, directory, link, ...) built from an object header.

    Only the object type and the two identifiers are mandatory; every other
    attribute falls back to an empty / zero value.
    """

    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int
    sum_no_longer_used: int = 0
    name: str = ""
    alias: str = ""
    equiv_id: int = 0
    file_size: int = 0
    st_mode: int = 0
    st_uid: int = 0
    st_gid: int = 0
    st_atime: int = 0
    st_mtime: int = 0
    st_ctime: int = 0

    def __str__(self):
        """Render the entry as ``<object_id>: <name>``."""
        return f"{self.object_id}: {self.name}"

210 

211 

@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    """Entry carrying the additional fields of a YAFFS2 object header.

    All fields are keyword-only and optional.
    """

    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)
    # List defaults use a factory so that every instance owns its own list;
    # `default=[]` would hand the very same list object to all instances.
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    # None when the entry was not built from a parsed header
    filehead: YAFFSFileVar | None = attrs.field(default=None)

224 

225 

def iterate_over_file(
    file: File, config: YAFFSConfig
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield ``(offset, page, spare)`` for each complete chunk in `file`.

    Reading starts at the current file position. `offset` is the position at
    which the page begins. Iteration stops at the first page or spare that is
    shorter than the size given by `config`.
    """
    while True:
        chunk_offset = file.tell()
        page_data = file.read(config.page_size)
        spare_data = file.read(config.spare_size)
        is_complete = (
            len(page_data) == config.page_size
            and len(spare_data) == config.spare_size
        )
        if not is_complete:
            return
        yield (chunk_offset, page_data, spare_data)

238 

239 

def decode_file_size(high: int, low: int) -> int:
    """Combine the two 32-bit halves of a file size stored in an object header.

    A `high` half of 0xFFFFFFFF means the upper word is unused and the size is
    the 32-bit `low` value, where a `low` of 0xFFFFFFFF stands for zero.
    Any other `high` value makes the size the 64-bit integer ``high:low``.
    """
    unused = 0xFFFFFFFF
    if high == unused:
        return 0 if low == unused else low
    return (high << 32) | (low & 0xFFFFFFFF)

251 

252 

def valid_name(name: bytes) -> bool:
    """Tell whether `name` (minus its last byte) decodes as UTF-8 once passed through `snull`.

    A name made only of null bytes is therefore valid as well.
    """
    try:
        snull(name[:-1]).decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True

261 

262 

def is_valid_header(header) -> bool:
    """Run basic sanity checks on a parsed object header.

    The header is accepted when its name (minus the last three bytes) is a
    valid name, its type does not exceed 5 and its `sum_no_longer_used` field
    holds 0xFFFF.
    """
    return (
        valid_name(header.name[:-3])
        and header.type <= 5
        and header.sum_no_longer_used == 0xFFFF
    )

271 

272 

class YAFFSParser:
    """Base class walking a YAFFS image and rebuilding its directory tree.

    Subclasses provide `HEADER_STRUCT` (name of the object header structure)
    and implement `build_chunk`, `build_entry` and `get_chunks`.
    """

    HEADER_STRUCT: str

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        """Bind the parser to `file`, auto-detecting the layout when `config` is None.

        The image is expected to start at the current position of `file`.
        """
        self.file_entries = Tree()
        self.data_chunks = defaultdict(list)
        self.file = file
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build an entry from a parsed object header and its chunk (subclass hook)."""
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        """Decode the spare area of the chunk located at `offset` (subclass hook)."""
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        """Return the data chunks making up object `object_id` (subclass hook)."""
        raise NotImplementedError

    def init_tree(self):
        """Prepare the entry tree before parsing; nothing to do by default."""
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        """Walk the image chunk by chunk from the current file position.

        With `store` set, object headers are inserted in the entry tree and
        data chunks are recorded per object; otherwise the image is only
        validated. `end_offset` is set to the file position reached.

        Raises InvalidInputFormat when no object header was found.
        """
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                data_chunk = self.build_chunk(spare, offset)
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                # chunk 0 of an object holds its header
                try:
                    header = _STRUCT_PARSER.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        # NOTE(review): after a `break` the page and spare that failed
        # validation have already been read, so they are included in
        # end_offset - confirm this is intended.
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures.

        Offsets are taken relative to the current file position, which must be
        the start of the image; the position itself is left untouched.

        Raises InvalidInputFormat when no layout can be determined.
        """
        known_signatures = (
            (SPARE_START_LITTLE_ENDIAN_ECC, Endian.LITTLE, True),
            (SPARE_START_LITTLE_ENDIAN_NO_ECC, Endian.LITTLE, False),
            (SPARE_START_BIG_ENDIAN_ECC, Endian.BIG, True),
            (SPARE_START_BIG_ENDIAN_NO_ECC, Endian.BIG, False),
        )

        # Slicing the file is absolute, whereas the image may start anywhere
        # in it: anchor every offset on the image start.
        base = self.file.tell()

        config = None
        for page_size in VALID_PAGE_SIZES:
            spare_offset = base + page_size
            spare_start = self.file[spare_offset : spare_offset + SPARE_START_LEN]
            for signature, endianness, ecc in known_signatures:
                if spare_start.startswith(signature):
                    config = YAFFSConfig(
                        endianness=endianness,
                        page_size=page_size,
                        ecc=ecc,
                        spare_size=-1,
                    )
                    break
            if config is not None:
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example:
        #
        # 00000800 00 10 00 00 01 01 00 00 00 00 00 00 ff ff ff ff |................|
        # 00000810 03 00 00 00 01 01 00 00 ff ff 62 61 72 00 00 00 |..........bar...|
        # 00000820 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Note that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.

        object_id_offset = 4
        object_id_start = base + config.page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        search_window = self.file[object_id_end : object_id_end + config.page_size]
        signature_position = search_window.find(spare_signature)
        if signature_position < 0:
            # do not let the -1 of a failed search leak into the size computation
            raise InvalidInputFormat(
                "Auto-detection failed: Cannot locate the end of the spare data."
            )

        config.spare_size = signature_position + object_id_offset + ecc_offset

        # Sanity check the spare size, make sure it looks legit
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        """Add `entry` to the tree, or refresh the data of an already known object.

        Entries whose parent is unknown or is not a directory are logged and dropped.
        """
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # a header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log filesystem, whichever chunk comes
            # last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            # an object that is its own parent is created without a parent node
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> YAFFSEntry | None:
        """Return the entry stored for `object_id`, or None when there is none."""
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        """Build the path of `entry` by prepending the names of its ancestors."""
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        """Yield the content of `entry`, one chunk at a time."""
        for chunk in self.get_chunks(entry.object_id):
            # never read more than a page, whatever the tags claim
            byte_count = min(chunk.byte_count, self.config.page_size)
            yield self.file[chunk.offset : chunk.offset + byte_count]

    def extract(self, fs: FileSystem):
        """Extract every entry of the tree into `fs`, walking the tree depth-first."""
        for node_id in self.file_entries.expand_tree(mode=Tree.DEPTH):
            node = self.file_entries.get_node(node_id)
            if node is None or node.data is None:
                continue
            self.extract_entry(node.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        """Materialize a single entry in `fs` according to its object type.

        Entries that cannot be handled (unknown type, special object that is
        not a YAFFS2 entry, hardlink to a missing object) are logged and skipped.
        """
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            # st_rdev only exists on YAFFS2 entries
            if not isinstance(entry, YAFFS2Entry):
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            # A dangling hardlink must not abort the whole extraction.
            dst_entry = self.get_entry(entry.equiv_id)
            if dst_entry is None:
                logger.warning("hardlink to a missing object", entry=entry)
                return
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)

505 

506 

class YAFFS2Parser(YAFFSParser):
    """Parser for YAFFS2 images."""

    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        """Decode the packed tags found in `spare` for the chunk located at `offset`."""
        # images built without ECC have two superfluous bytes before the chunk ID.
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = _STRUCT_PARSER.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFS2Entry from a parsed `yaffs2_obj_hdr_t` and its header chunk."""
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            # 0xFF filler bytes are dropped from the alias before decoding
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered list of chunks.

        Chunks come out sorted by chunk_id, with a single chunk per chunk_id.
        """
        # The Yaffs2 sequence number is not the same as the Yaffs1 serial number!

        # As each block is allocated, the file system's
        # sequence number is incremented and each chunk in the block is marked with that
        # sequence number. The sequence number thus provides a way of organising the log in
        # chronological order.

        # When several chunks share an obj_id:chunk_id pair, the one carrying the
        # highest sequence number is the current one; the others are obsolete.
        # For equal sequence numbers the chunk stored at the highest offset wins,
        # on the assumption that it was written last - TODO confirm.

        # note: there is no deletion marker in YAFFS2

        def by_chunk_id(chunk: YAFFS2Chunk) -> int:
            return chunk.chunk_id

        # groupby() needs the very same key as the sort, otherwise every chunk
        # ends up alone in its group and nothing gets filtered out.
        ordered = sorted(self.data_chunks[object_id], key=by_chunk_id)
        for _, candidates in itertools.groupby(ordered, key=by_chunk_id):
            yield max(candidates, key=lambda chunk: (chunk.seq_number, chunk.offset))

    def init_tree(self):
        """Seed the tree with the root directory (object 1)."""
        # YAFFS2 do not store the root in file.
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)

594 

595 

class YAFFS1Parser(YAFFSParser):
    """Parser for YAFFS1 images."""

    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        """Bind the parser to `file`.

        An explicit `config` is honoured; without one, the fixed YAFFS1
        geometry is used and only the byte order is detected from `file`.
        """
        # from https://yaffs.net/archives/yaffs-development-notes: currently each chunk
        # is the same size as a NAND flash page (ie. 512 bytes + 16 byte spare).
        # In the future we might decide to allow for different chunk sizes.
        if config is None:
            config = YAFFSConfig(
                page_size=YAFFS1_PAGE_SIZE,
                spare_size=YAFFS1_SPARE_SIZE,
                endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
                ecc=False,
            )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        """Decode the spare area of the chunk located at `offset`."""
        yaffs_sparse = _STRUCT_PARSER.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        # the 8 tag bytes are scattered across the spare: gather them before
        # decoding the packed tags
        yaffs_packed_tags = _STRUCT_PARSER.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFSEntry from a parsed `yaffs1_obj_hdr_t` and its header chunk."""
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            # 0xFF filler bytes are dropped from the alias before decoding
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
            # ownership, mode and timestamps are part of the header as well
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
        )

    @staticmethod
    def _latest_by_serial(chunks: list[YAFFS1Chunk]) -> YAFFS1Chunk:
        """Pick, among chunks sharing a chunk_id, the one that takes precedence."""
        # The serial is a 2-bit counter that wraps around: a chunk is
        # superseded when its successor serial is present in the group.
        serials = {chunk.serial for chunk in chunks}
        latest = [
            chunk for chunk in chunks if ((chunk.serial + 1) & 3) not in serials
        ]
        # Equal serials, or a full cycle of serials: keep the chunk found last
        # in the image.
        return (latest or chunks)[-1]

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered list of chunks.

        Chunks come out sorted by chunk_id, with a single chunk per chunk_id.
        """
        # YAFFS1 chunks have a serial number that is used to track
        # which chunk takes precedence if two chunks have the same
        # identifier. This is used in scenarios like power loss
        # during a copy operation. Whenever we have two chunks with
        # the same id, we only return the one with the most recent serial.

        def by_chunk_id(chunk: YAFFS1Chunk) -> int:
            return chunk.chunk_id

        # groupby() needs the very same key as the sort, otherwise every chunk
        # ends up alone in its group and nothing gets filtered out.
        ordered = sorted(self.data_chunks[object_id], key=by_chunk_id)
        for _, candidates in itertools.groupby(ordered, key=by_chunk_id):
            yield self._latest_by_serial(list(candidates))

673 

674 

def is_yaffs_v1(file: File, start_offset: int) -> bool:
    """Tell whether the image starting at `start_offset` looks like YAFFS1.

    The first spare area is decoded using the YAFFS1 geometry; the image is
    considered YAFFS1 when its tags describe chunk 0, serial 0 of object 1.
    The file position is left at `start_offset`.
    """
    # Slicing is absolute: look at the first four bytes of the image itself,
    # not at those of the enclosing file.
    object_type = file[start_offset : start_offset + 4]
    if object_type in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00"):
        endian = Endian.LITTLE
    else:
        endian = Endian.BIG
    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    yaffs_sparse = _STRUCT_PARSER.parse("yaffs_spare_t", spare, endian)

    # the 8 tag bytes are scattered across the spare: gather them before
    # decoding the packed tags
    yaffs_packed_tags = _STRUCT_PARSER.parse(
        "yaffs1_packed_tags_t",
        bytes(
            [
                yaffs_sparse.tag_b0,
                yaffs_sparse.tag_b1,
                yaffs_sparse.tag_b2,
                yaffs_sparse.tag_b3,
                yaffs_sparse.tag_b4,
                yaffs_sparse.tag_b5,
                yaffs_sparse.tag_b6,
                yaffs_sparse.tag_b7,
            ]
        ),
        endian,
    )
    file.seek(start_offset, io.SEEK_SET)
    return (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )

708 

709 

def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    """Create the parser matching the YAFFS version found at `start_offset`."""
    parser_class = YAFFS1Parser if is_yaffs_v1(file, start_offset) else YAFFS2Parser
    return parser_class(file)

714 

715 

class YAFFSExtractor(Extractor):
    """Extractor unpacking a YAFFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Parse the image at `inpath` and write its content below `outdir`.

        Returns an ExtractResult carrying the problems collected while writing.
        """
        yaffs_parser = instantiate_parser(File.from_path(inpath))
        yaffs_parser.parse(store=True)

        output_fs = FileSystem(outdir)
        yaffs_parser.extract(output_fs)
        return ExtractResult(reports=output_fs.problems)

724 

725 

class YAFFSHandler(Handler):
    """Handler locating YAFFS images and computing their extent."""

    NAME = "yaffs"

    # Each pattern is: object type, parent object ID 1, then the 0xFFFF
    # `sum_no_longer_used` field, in either byte order.
    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Return the chunk covering the image found at `start_offset`.

        The image is walked without storing anything, then any 0xFF bytes
        following it are included in the chunk.
        """
        yaffs_parser = instantiate_parser(file, start_offset)
        yaffs_parser.parse()

        # skip 0xFF padding
        file.seek(yaffs_parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        chunk_end = file.tell()
        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)