Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/yaffs.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

314 statements  

1import io 

2import itertools 

3from collections import defaultdict 

4from collections.abc import Iterable 

5from enum import IntEnum 

6from pathlib import Path 

7from typing import Optional 

8 

9import attrs 

10from structlog import get_logger 

11from treelib.exceptions import NodeIDAbsentError 

12from treelib.tree import Tree 

13 

14from unblob.file_utils import ( 

15 Endian, 

16 File, 

17 FileSystem, 

18 InvalidInputFormat, 

19 StructParser, 

20 get_endian_multi, 

21 read_until_past, 

22 snull, 

23) 

24from unblob.models import ( 

25 Extractor, 

26 ExtractResult, 

27 Handler, 

28 HandlerDoc, 

29 HandlerType, 

30 HexString, 

31 Reference, 

32 ValidChunk, 

33) 

34 

logger = get_logger()

# First bytes of the spare area for each supported layout: the packed tags
# start with (chunk_id, ...) and the values below correspond to the first
# header chunk of an image. The NO_ECC variants carry 2 extra 0xFF bytes.
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
SPARE_START_LEN = 6

# YAFFS_OBJECT_TYPE_DIRECTORY, YAFFS_OBJECT_TYPE_FILE
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

# Page/spare geometries probed by YAFFSParser.auto_detect()
VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
# YAFFS1 always uses a fixed 512-byte page + 16-byte spare geometry
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16

50 

# On-flash structure layouts handed to StructParser: YAFFS1/YAFFS2 object
# headers, the packed tag words, and the raw YAFFS1 spare (OOB) area.
C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type;  /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode;  // protection
        uint32 st_uid;  // user ID of owner
        uint32 st_gid;  // group ID of owner
        uint32 st_atime;  // time of last access
        uint32 st_mtime;  // time of last modification
        uint32 st_ctime;  // time of last change
        uint32 file_size;  // File size applies to files only
        uint32 equivalent_object_id;  // Equivalent object id applies to hard links only.
        char alias[160];  // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status;  // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type;  /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;  /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode;  /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low;  /* File size applies to files only */
        int equiv_id;  /* Equivalent object id applies to hard links only. */
        char alias[160];  /* Alias is for symlinks only. */
        uint32 st_rdev;  /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj;  /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""

142 

143 

class YaffsObjectType(IntEnum):
    """Value of the `type` field in YAFFS object headers (yaffs_obj_type)."""

    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5

151 

152 

@attrs.define
class YAFFSChunk:
    """Metadata of one chunk (page + decoded spare tags) of the image."""

    chunk_id: int  # 0 => object header chunk, >0 => file data chunk
    offset: int  # file offset of the chunk's page data
    byte_count: int  # number of payload bytes stored in the page
    object_id: int  # id of the filesystem object the chunk belongs to

159 

160 

@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    """YAFFS1 chunk; duplicates are resolved via the 2-bit serial number."""

    serial: int  # 2-bit counter; the newer duplicate wins (see get_chunks)
    ecc: bytes  # ECC bits from the packed tags
    page_status: int  # 0 marks the chunk as deleted
    block_status: int  # block status byte from the spare area

167 

168 

@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    """YAFFS2 chunk; duplicates are resolved via the block sequence number."""

    seq_number: int  # allocation sequence number; higher means more recent

172 

173 

@attrs.define
class YAFFSFileVar:
    """Python mirror of the yaffs_file_var struct (file-head variant data)."""

    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int

180 

181 

@attrs.define
class YAFFSConfig:
    """Geometry/layout settings used while walking a YAFFS image."""

    endianness: Endian
    page_size: int  # bytes of data per page (chunk)
    spare_size: int  # bytes of spare/OOB data following each page
    ecc: bool  # whether the spare area uses the ECC layout

188 

189 

@attrs.define
class YAFFSEntry:
    """Filesystem object reconstructed from a YAFFS object header chunk."""

    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int  # object id of the containing directory
    sum_no_longer_used: int = attrs.field(default=0)
    name: str = attrs.field(default="")
    alias: str = attrs.field(default="")  # symlink target (symlinks only)
    equiv_id: int = attrs.field(default=0)  # target object id (hard links only)
    file_size: int = attrs.field(default=0)
    st_mode: int = attrs.field(default=0)
    st_uid: int = attrs.field(default=0)
    st_gid: int = attrs.field(default=0)
    st_atime: int = attrs.field(default=0)
    st_mtime: int = attrs.field(default=0)
    st_ctime: int = attrs.field(default=0)

    def __str__(self):
        """Short log representation: "<object_id>: <name>"."""
        return f"{self.object_id}: {self.name}"

209 

210 

@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    """YAFFS2 object header fields that do not exist in YAFFS1 headers."""

    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)  # device major/minor (SPECIAL objects)
    # factory=list instead of default=[]: attrs does not copy mutable
    # defaults, so a literal [] would be one shared list mutated across
    # every YAFFS2Entry instance.
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    filehead: YAFFSFileVar = attrs.field(default=None)

223 

224 

def iterate_over_file(
    file: File, config: YAFFSConfig
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield ``(offset, page, spare)`` for every complete page+spare pair.

    The reported offset is the file position right *after* the pair was
    read; ``parse()`` recovers the page start by subtracting
    ``page_size + spare_size``. Previously the first pair reported the
    position *before* its read (while every later pair reported the
    position after), so the recovered offset of the very first chunk was
    off by ``page_size + spare_size``. All pairs now use the same
    post-read convention.
    """
    page = file.read(config.page_size)
    spare = file.read(config.spare_size)
    end_offset = file.tell()

    while len(page) == config.page_size and len(spare) == config.spare_size:
        yield (end_offset, page, spare)
        page = file.read(config.page_size)
        spare = file.read(config.spare_size)
        end_offset = file.tell()

237 

238 

def decode_file_size(high: int, low: int) -> int:
    """Decode a file size stored as two 32-bit halves.

    If the upper half is set (not 0xFFFFFFFF) the size is a 64-bit value;
    otherwise only the lower half counts, with 0xFFFFFFFF meaning zero.
    """
    unset = 0xFFFFFFFF
    if high == unset:
        # 32-bit encoding: an all-ones low word means "no size stored"
        return 0 if low == unset else low
    # 64-bit encoding: combine both halves
    return (high << 32) | (low & 0xFFFFFFFF)

250 

251 

def valid_name(name: bytes) -> bool:
    """Tell whether the raw name bytes look like a plausible object name.

    A valid name is either full of null bytes or, after trimming at the
    first null, decodes cleanly as UTF-8. The final byte is dropped before
    trimming.
    """
    trimmed = snull(name[:-1])
    try:
        trimmed.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True

260 

261 

def is_valid_header(header) -> bool:
    """Sanity-check a parsed YAFFS object header struct."""
    return (
        valid_name(header.name[:-3])
        and header.type <= 5  # must be a known YaffsObjectType value
        and header.sum_no_longer_used == 0xFFFF  # unused checksum field is all-ones
    )

270 

271 

class YAFFSParser:
    """Base parser for YAFFS images.

    Walks the image page+spare pair by pair, building a tree of filesystem
    entries (``file_entries``) and per-object lists of data chunks
    (``data_chunks``). Version-specific decoding is delegated to the
    YAFFS1/YAFFS2 subclasses through ``build_chunk`` / ``build_entry`` /
    ``get_chunks``.
    """

    # name of the object header struct inside C_DEFINITIONS; set by subclasses
    HEADER_STRUCT: str

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        self.file_entries = Tree()  # nodes keyed by str(object_id), data=YAFFSEntry
        self.data_chunks = defaultdict(list)  # object_id -> [YAFFSChunk]
        self.file = file
        self._struct_parser = StructParser(C_DEFINITIONS)
        self.end_offset = -1  # set by parse(); -1 until a successful parse
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a version-specific entry from a parsed object header chunk."""
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        """Decode the spare-area bytes into a version-specific chunk."""
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        """Yield the effective data chunks of an object, deduplicated."""
        raise NotImplementedError

    def init_tree(self):
        """Hook for subclasses to seed file_entries before parsing."""
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        """Scan chunks until an invalid header or EOF is hit.

        With store=True, header chunks are inserted into the entry tree and
        data chunks are recorded for extraction; with store=False the scan
        only counts header chunks and establishes ``end_offset``.

        Raises InvalidInputFormat when no valid entry is found at all.
        """
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                # NOTE(review): iterate_over_file reports file.tell(); for the
                # first pair that is the position *before* the read, so this
                # subtraction may misplace the first chunk — verify.
                data_chunk = self.build_chunk(
                    spare, offset - self.config.page_size - self.config.spare_size
                )
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                # chunk id 0 carries an object header in the page data
                try:
                    header = self._struct_parser.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                # first invalid header ends the filesystem chunk
                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                entries += 1
            elif store:
                # chunk id > 0: file content chunk
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures."""
        page_size = 0
        config = None
        # Probe each candidate page size for a known spare-area start pattern;
        # the match also reveals endianness and the ECC layout.
        for page_size in VALID_PAGE_SIZES:
            spare_start = self.file[page_size : page_size + SPARE_START_LEN]
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=True,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=False,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=True, spare_size=-1
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=False, spare_size=-1
                )
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example:
        #
        # 00000800  00 10 00 00 01 01 00 00  00 00 00 00 ff ff ff ff  |................|
        # 00000810  03 00 00 00 01 01 00 00  ff ff 62 61 72 00 00 00  |..........bar...|
        # 00000820  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Note that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.

        object_id_offset = 4
        object_id_start = page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        # find() returning -1 (signature absent) yields a bogus spare_size
        # which the sanity check below rejects.
        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size, make sure it looks legit
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        """Insert or update an entry node in the tree, keyed by object id."""
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # a header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log filesystem, whichever chunk comes
            # last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            # self-parented entry is the tree root
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> Optional[YAFFSEntry]:
        """Look up an entry by object id; None when absent."""
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        """Build the entry's path by walking parents up to the tree root."""
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        """Yield the raw content bytes of a file entry, chunk by chunk."""
        for chunk in self.get_chunks(entry.object_id):
            yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

    def extract(self, fs: FileSystem):
        """Extract every stored entry, parents before children (DFS order)."""
        for entry in [
            self.file_entries.get_node(node)
            for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
        ]:
            if entry is None or entry.data is None:
                continue
            self.extract_entry(entry.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        """Materialize one entry (dir/file/symlink/hardlink/device) into fs."""
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            if not isinstance(entry, YAFFS2Entry):
                # st_rdev only exists on YAFFS2 entries
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            dst_entry = self.file_entries[str(entry.equiv_id)].data
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)

506 

507 

class YAFFS2Parser(YAFFSParser):
    """YAFFS2-specific parser: sequence-number based log ordering."""

    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        """Decode the yaffs2_packed_tags_t spare area into a chunk."""
        # images built without ECC have two superfluous bytes before the chunk ID.
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = self._struct_parser.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Convert a parsed yaffs2_obj_hdr_t into a YAFFS2Entry."""
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered list of chunks."""
        # The Yaffs2 sequence number is not the same as the Yaffs1 serial number!

        # As each block is allocated, the file system's
        # sequence number is incremented and each chunk in the block is marked with that
        # sequence number. The sequence number thus provides a way of organising the log in
        # chronological order.

        # note: there is no deletion marker in YAFFS2

        # Group explicitly by chunk_id: without a key, groupby() compares the
        # chunk objects themselves (attrs-generated __eq__ over every field),
        # so two versions of the same chunk_id would land in separate groups
        # and stale chunks would be yielded as well.
        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            # the chunk written with the highest sequence number is current
            yield max(chunks, key=lambda chunk: chunk.seq_number)

    def init_tree(self):
        """Seed the tree with the implicit root directory (object id 1)."""
        # YAFFS2 do not store the root in file.
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)

595 

596 

class YAFFS1Parser(YAFFSParser):
    """YAFFS1-specific parser: fixed 512+16 geometry, serial-number dedup."""

    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        # from https://yaffs.net/archives/yaffs-development-notes: currently each chunk
        # is the same size as a NAND flash page (ie. 512 bytes + 16 byte spare).
        # In the future we might decide to allow for different chunk sizes.
        # Any caller-provided config is replaced by the fixed YAFFS1 geometry;
        # only the endianness is detected from the data.
        config = YAFFSConfig(
            page_size=YAFFS1_PAGE_SIZE,
            spare_size=YAFFS1_SPARE_SIZE,
            endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
            ecc=False,
        )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        """Decode the yaffs_spare_t area into a YAFFS1Chunk."""
        yaffs_sparse = self._struct_parser.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        # The tag bytes are interleaved with status/ECC bytes in the spare
        # area; reassemble them before parsing the packed tags.
        yaffs_packed_tags = self._struct_parser.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Convert a parsed yaffs1_obj_hdr_t into a YAFFSEntry."""
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered list of chunks."""
        # YAFFS1 chunks have a serial number that is used to track
        # which chunk takes precedence if two chunks have the same
        # identifier. This is used in scenarios like power loss
        # during a copy operation. Whenever we have two chunks with
        # the same id, we only return the one with the highest serial.

        # Group explicitly by chunk_id: without a key, groupby() compares
        # whole chunk objects (attrs-generated __eq__ over every field), so
        # duplicates of the same chunk_id would never be collapsed.
        for _, chunks in itertools.groupby(
            sorted(
                self.data_chunks[object_id],
                key=lambda chunk: chunk.chunk_id,
            ),
            key=lambda chunk: chunk.chunk_id,
        ):
            # serial is a 2 bit, this function works since there's always at most
            # two chunks with the same chunk_id at any given time
            yield max(chunks, key=lambda chunk: ((chunk.serial + 1) & 3))

674 

675 

def is_yaffs_v1(file: File, start_offset: int) -> bool:
    """Probe the first spare area to decide whether this is a YAFFS1 image.

    Parses the 16-byte spare following the first 512-byte page and checks
    that its packed tags describe the first header chunk (chunk_id 0,
    serial 0, object_id 1). The file position is restored to start_offset.
    """
    struct_parser = StructParser(C_DEFINITIONS)
    file.seek(start_offset, io.SEEK_SET)

    # guess endianness from the object-type magic at the file start
    magic = file[0:4]
    endian = (
        Endian.LITTLE
        if magic in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00")
        else Endian.BIG
    )

    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    spare_struct = struct_parser.parse("yaffs_spare_t", spare, endian)

    # reassemble the interleaved tag bytes, then parse the packed tags
    tag_bytes = bytes(getattr(spare_struct, f"tag_b{index}") for index in range(8))
    packed_tags = struct_parser.parse("yaffs1_packed_tags_t", tag_bytes, endian)

    file.seek(start_offset, io.SEEK_SET)
    return (
        packed_tags.chunk_id == 0
        and packed_tags.serial == 0
        and packed_tags.object_id == 1
    )

710 

711 

def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    """Pick the YAFFS1 or YAFFS2 parser based on a spare-area probe."""
    parser_cls = YAFFS1Parser if is_yaffs_v1(file, start_offset) else YAFFS2Parser
    return parser_cls(file)

716 

717 

class YAFFSExtractor(Extractor):
    """unblob Extractor that unpacks a carved YAFFS image to a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Parse the image at inpath and materialize its contents in outdir."""
        source = File.from_path(inpath)
        parser = instantiate_parser(source)
        # store=True keeps the entry tree and data chunks for extraction
        parser.parse(store=True)
        target_fs = FileSystem(outdir)
        parser.extract(target_fs)
        return ExtractResult(reports=target_fs.problems)

726 

727 

class YAFFSHandler(Handler):
    """unblob Handler matching YAFFS object-header patterns in images."""

    NAME = "yaffs"

    # Each pattern matches the start of an object header chunk:
    # type (DIRECTORY/FILE) + parent object id 1 + unused 0xFFFF checksum.
    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Determine the extent of the YAFFS filesystem starting at start_offset.

        Parses the whole filesystem (without storing entries) to find its
        end; the parser raises InvalidInputFormat for false matches.
        """
        parser = instantiate_parser(file, start_offset)
        parser.parse()
        # skip 0xFF padding
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())