Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/filesystem/yaffs.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

312 statements  

1import io 

2import itertools 

3from collections import defaultdict 

4from collections.abc import Iterable 

5from enum import IntEnum 

6from pathlib import Path 

7 

8import attrs 

9from structlog import get_logger 

10from treelib.exceptions import NodeIDAbsentError 

11from treelib.tree import Tree 

12 

13from unblob.file_utils import ( 

14 Endian, 

15 File, 

16 FileSystem, 

17 InvalidInputFormat, 

18 StructParser, 

19 get_endian_multi, 

20 read_until_past, 

21 snull, 

22) 

23from unblob.models import ( 

24 Extractor, 

25 ExtractResult, 

26 Handler, 

27 HandlerDoc, 

28 HandlerType, 

29 HexString, 

30 Reference, 

31 ValidChunk, 

32) 

33 

logger = get_logger()

# First bytes of the spare (out-of-band) area that follows each page; matched
# at candidate page boundaries by YAFFSParser.auto_detect to find the page
# size, endianness and ECC layout.
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
# length of the longest spare-start signature above
SPARE_START_LEN = 6

# YAFFS_OBJECT_TYPE_DIRECTORY, YAFFS_OBJECT_TYPE_FILE
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

# Page and spare geometries tried during auto-detection.
VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
# YAFFS1 geometry is fixed: 512-byte pages with 16-byte spare areas
# (see YAFFS1Parser.__init__).
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16

49 

# C struct definitions handed to StructParser (dissect-style cstruct parsing).
# They mirror the on-flash YAFFS layouts: the YAFFS1 object header, packed
# tags and raw spare area, and the YAFFS2 object header and packed tags.
# Field order and sizes must not be changed — they describe the wire format.
C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode; // protection
        uint32 st_uid; // user ID of owner
        uint32 st_gid; // group ID of owner
        uint32 st_atime; // time of last access
        uint32 st_mtime; // time of last modification
        uint32 st_ctime; // time of last change
        uint32 file_size; // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160]; // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status; // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode; /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low; /* File size applies to files only */
        int equiv_id; /* Equivalent object id applies to hard links only. */
        char alias[160]; /* Alias is for symlinks only. */
        uint32 st_rdev; /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj; /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""

# Shared parser instance: the definitions are compiled once at import time.
_STRUCT_PARSER = StructParser(C_DEFINITIONS)

143 

144 

class YaffsObjectType(IntEnum):
    """Object types stored in the ``type`` field of YAFFS object headers."""

    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5  # device nodes etc. — extracted via mknod (see extract_entry)

152 

153 

@attrs.define
class YAFFSChunk:
    """Fields common to a decoded YAFFS chunk (one page plus its spare tags)."""

    chunk_id: int  # 0 marks an object header page, non-zero a data chunk
    offset: int  # absolute file offset of the page data
    byte_count: int  # number of valid data bytes in the page
    object_id: int  # id of the object this chunk belongs to

160 

161 

@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    """YAFFS1 chunk: adds the serial counter and spare-area status bytes."""

    serial: int  # 2-bit counter deciding which duplicate chunk is current
    # NOTE(review): populated from the 12-bit `ecc` bitfield of
    # yaffs1_packed_tags_t — likely an int at runtime; confirm annotation.
    ecc: bytes
    page_status: int  # 0 means the chunk was deleted (skipped during parse)
    block_status: int

168 

169 

@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    """YAFFS2 chunk: adds the block allocation sequence number."""

    seq_number: int  # higher sequence number wins when chunk_ids collide

173 

174 

@attrs.define
class YAFFSFileVar:
    """File-specific variant data mirrored from ``yaffs_file_var``."""

    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int

181 

182 

@attrs.define
class YAFFSConfig:
    """Flash geometry and layout parameters of a YAFFS image."""

    endianness: Endian
    page_size: int  # bytes of data per page (chunk)
    spare_size: int  # bytes of spare/OOB area per page; -1 while auto-detecting
    ecc: bool  # whether the spare area uses the ECC layout

189 

190 

@attrs.define
class YAFFSEntry:
    """A filesystem object decoded from a YAFFS object header (chunk_id == 0)."""

    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int  # object id of the containing directory
    # legacy name checksum; 0xFFFF on disk (checked by is_valid_header)
    sum_no_longer_used: int = attrs.field(default=0)
    name: str = attrs.field(default="")
    alias: str = attrs.field(default="")  # symlink target, empty otherwise
    equiv_id: int = attrs.field(default=0)  # target object id for hard links
    file_size: int = attrs.field(default=0)
    st_mode: int = attrs.field(default=0)
    st_uid: int = attrs.field(default=0)
    st_gid: int = attrs.field(default=0)
    st_atime: int = attrs.field(default=0)
    st_mtime: int = attrs.field(default=0)
    st_ctime: int = attrs.field(default=0)

    def __str__(self):
        """Short human-readable form used in logs."""
        return f"{self.object_id}: {self.name}"

210 

211 

@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    """YAFFS2-specific entry fields decoded from ``yaffs2_obj_hdr_t``."""

    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)  # raw device number for SPECIAL entries
    # BUGFIX: `factory=list` instead of `default=[]` — a literal list default
    # is created once and shared by every YAFFS2Entry instance (the classic
    # mutable-default pitfall); any in-place mutation would leak across
    # entries.
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    # None until populated from the header's yaffs_file_var block
    filehead: YAFFSFileVar | None = attrs.field(default=None)

224 

225 

def iterate_over_file(
    file: File, config: YAFFSConfig
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield ``(end_offset, page, spare)`` for every complete page+spare pair.

    ``end_offset`` is the file position right after the pair was read; callers
    recover the page start by subtracting ``page_size + spare_size`` (see
    YAFFSParser.parse). Iteration stops at the first truncated pair.
    """
    page = file.read(config.page_size)
    spare = file.read(config.spare_size)
    # BUGFIX: sample the offset *after* reading, so every yielded offset —
    # including the first one — consistently points past the pair. Previously
    # the first tuple carried the pre-read offset, which made the page offset
    # computed by the caller negative for the very first chunk.
    end_offset = file.tell()

    while len(page) == config.page_size and len(spare) == config.spare_size:
        yield (end_offset, page, spare)
        page = file.read(config.page_size)
        spare = file.read(config.spare_size)
        end_offset = file.tell()

238 

239 

def decode_file_size(high: int, low: int) -> int:
    """Decode a file size stored as two 32-bit halves.

    A meaningful high word (anything but 0xFFFFFFFF) marks a 64-bit size
    assembled from both halves; otherwise the low word alone holds the size,
    with the all-ones pattern standing for zero.
    """
    unset = 0xFFFFFFFF
    if high == unset:
        return low if low != unset else 0
    return (high << 32) | (low & unset)

251 

252 

def valid_name(name: bytes) -> bool:
    """Return True when the null-truncated name decodes cleanly as UTF-8.

    The trailing byte is dropped before truncating at the first null; a name
    consisting purely of null bytes therefore also counts as valid.
    """
    try:
        snull(name[:-1]).decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True

261 

262 

def is_valid_header(header) -> bool:
    """Sanity-check a decoded YAFFS object header.

    A plausible header has a UTF-8-decodable (truncated) name, an object type
    within the known YaffsObjectType range, and the fixed 0xFFFF filler in the
    unused name-checksum field.
    """
    return (
        valid_name(header.name[:-3])
        and header.type <= 5
        and header.sum_no_longer_used == 0xFFFF
    )

271 

272 

class YAFFSParser:
    """Version-independent YAFFS image parser.

    Walks the image page by page (page data followed by its spare/OOB area),
    decodes spare areas into chunks and object-header pages into entries, and
    maintains:

    - ``file_entries``: treelib Tree of YAFFSEntry objects keyed by object id,
    - ``data_chunks``: data chunks grouped by object id,
    - ``end_offset``: file position right after the last parsed pair.

    Version-specific decoding is delegated to YAFFS1Parser / YAFFS2Parser
    through the ``build_chunk`` / ``build_entry`` / ``get_chunks`` hooks.
    """

    # name of the dissect struct used to decode object header pages
    # ("yaffs1_obj_hdr_t" or "yaffs2_obj_hdr_t"), set by subclasses
    HEADER_STRUCT: str

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        """Store the file and config; auto-detect the geometry when config is None."""
        self.file_entries = Tree()
        # object_id -> list of data chunks, filled by parse(store=True)
        self.data_chunks = defaultdict(list)
        self.file = file
        # set by parse(); -1 until a successful parse
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build an entry from a decoded object header. Subclass hook."""
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        """Decode a spare area into a chunk. Subclass hook."""
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        """Yield the effective data chunks of an object. Subclass hook."""
        raise NotImplementedError

    def init_tree(self):
        """Optional subclass hook to pre-populate the entry tree."""
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        """Walk the whole image, validating every object header encountered.

        When ``store`` is True, entries are inserted into the tree and data
        chunks collected for later extraction; otherwise the walk only
        establishes ``end_offset`` (used by calculate_chunk).

        Raises InvalidInputFormat when not a single object header is found.
        """
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                # the yielded offset is rewound by one page+spare pair to get
                # the start of the page data the chunk refers to.
                # NOTE(review): iterate_over_file yields the pre-read offset
                # for the first pair only, so this subtraction looks off for
                # the very first chunk — harmless in practice since the first
                # page is a header (offset unused), but worth confirming.
                data_chunk = self.build_chunk(
                    spare, offset - self.config.page_size - self.config.spare_size
                )
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                # chunk_id 0 pages hold object headers, not file data
                try:
                    header = _STRUCT_PARSER.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                # an invalid header marks the end of the filesystem chunk
                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures."""
        page_size = 0
        config = None
        # try every known page size and look for a spare-start signature
        # right behind the candidate page
        for page_size in VALID_PAGE_SIZES:
            spare_start = self.file[page_size : page_size + SPARE_START_LEN]
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=True,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=False,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=True, spare_size=-1
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=False, spare_size=-1
                )
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example:
        #
        # 00000800  00 10 00 00 01 01 00 00  00 00 00 00 ff ff ff ff  |................|
        # 00000810  03 00 00 00 01 01 00 00  ff ff 62 61 72 00 00 00  |..........bar...|
        # 00000820  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Note that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.

        object_id_offset = 4
        object_id_start = page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size, make sure it looks legit
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        """Insert (or refresh) an entry in the tree, keyed by its object id."""
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # a header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log filesystem, whichever chunk comes
            # last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            # self-parented entry is the filesystem root
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> YAFFSEntry | None:
        """Return the stored entry for object_id, or None when absent."""
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        """Build the entry's path by walking parents up to the tree root."""
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        """Yield the file content of an entry, chunk by chunk, in order."""
        for chunk in self.get_chunks(entry.object_id):
            yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

    def extract(self, fs: FileSystem):
        """Materialize every stored entry under fs, parents before children."""
        for entry in [
            self.file_entries.get_node(node)
            for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
        ]:
            if entry is None or entry.data is None:
                continue
            self.extract_entry(entry.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        """Create one filesystem object (file/dir/link/device) for entry."""
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            # st_rdev only exists on YAFFS2 entries
            if not isinstance(entry, YAFFS2Entry):
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            dst_entry = self.file_entries[str(entry.equiv_id)].data
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)

506 

507 

class YAFFS2Parser(YAFFSParser):
    """Parser for YAFFS2 images (sequence-numbered log format)."""

    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        """Decode a spare area into a YAFFS2Chunk.

        ``offset`` is the absolute file offset of the page data the spare
        belongs to.
        """
        # images built without ECC have two superfluous bytes before the chunk ID.
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = _STRUCT_PARSER.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFS2Entry from a decoded yaffs2_obj_hdr_t and its chunk."""
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            # unused alias bytes are 0xFF filler; strip them before decoding
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered list of chunks."""
        # The Yaffs2 sequence number is not the same as the Yaffs1 serial number!

        # As each block is allocated, the file system's
        # sequence number is incremented and each chunk in the block is marked with that
        # sequence number. The sequence number thus provides a way of organising the log in
        # chronological order.

        # Since we're scanning backwards, the most recently written - and thus current - chunk
        # matching an obj_id:chunk_id pair will be encountered first and all subsequent matching chunks must be obsolete and treated as deleted.

        # note: there is no deletion marker in YAFFS2

        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            # BUGFIX: groupby() must be given the same key used for sorting.
            # Without a key it groups on whole-object equality, so two
            # versions of the same chunk_id land in different groups and the
            # obsolete version is yielded (and extracted) as well.
            key=lambda chunk: chunk.chunk_id,
        ):
            yield max(chunks, key=lambda chunk: chunk.seq_number)

    def init_tree(self):
        """Seed the tree with the implicit root directory (object id 1)."""
        # YAFFS2 do not store the root in file.
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)

595 

596 

class YAFFS1Parser(YAFFSParser):
    """Parser for YAFFS1 images (fixed 512+16 geometry, serial-numbered chunks)."""

    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        # from https://yaffs.net/archives/yaffs-development-notes: currently each chunk
        # is the same size as a NAND flash page (ie. 512 bytes + 16 byte spare).
        # In the future we might decide to allow for different chunk sizes.
        if config is None:
            # FIX: a caller-supplied config used to be silently discarded;
            # it is now honored, with the fixed YAFFS1 geometry as default.
            config = YAFFSConfig(
                page_size=YAFFS1_PAGE_SIZE,
                spare_size=YAFFS1_SPARE_SIZE,
                endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
                ecc=False,
            )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        """Decode a YAFFS1 spare area (yaffs_spare_t) into a YAFFS1Chunk."""
        yaffs_sparse = _STRUCT_PARSER.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        # The 8 tag bytes are interleaved with status and ECC bytes in the
        # spare area; reassemble them before decoding the packed tags.
        yaffs_packed_tags = _STRUCT_PARSER.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFSEntry from a decoded yaffs1_obj_hdr_t and its chunk."""
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            # unused alias bytes are 0xFF filler; strip them before decoding
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered list of chunks."""
        # YAFFS1 chunks have a serial number that is used to track
        # which chunk takes precedence if two chunks have the same
        # identifier. This is used in scenarios like power loss
        # during a copy operation. Whenever we have two chunks with
        # the same id, we only return the one with the highest serial.

        for _, chunks in itertools.groupby(
            sorted(
                self.data_chunks[object_id],
                key=lambda chunk: chunk.chunk_id,
            ),
            # BUGFIX: groupby() must be given the same key used for sorting.
            # Without a key it groups on whole-object equality, so two
            # versions of the same chunk_id land in different groups and the
            # stale version is yielded (and extracted) as well.
            key=lambda chunk: chunk.chunk_id,
        ):
            # serial is a 2 bit, this function works since there's always at most
            # two chunks with the same chunk_id at any given time
            yield max(chunks, key=lambda chunk: (chunk.serial + 1) & 3)

674 

675 

def is_yaffs_v1(file: File, start_offset: int) -> bool:
    """Return True when the data at start_offset looks like a YAFFS1 image.

    Decodes the first spare area (at the fixed YAFFS1 geometry) as packed
    tags and checks that it describes the header chunk of object 1.
    The file position is restored to start_offset before returning.
    """
    # BUGFIX: inspect the magic at start_offset, not at the beginning of the
    # file — the candidate chunk may start anywhere in the scanned file.
    if file[start_offset : start_offset + 4] in (
        b"\x03\x00\x00\x00",
        b"\x01\x00\x00\x00",
    ):
        endian = Endian.LITTLE
    else:
        endian = Endian.BIG
    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    yaffs_sparse = _STRUCT_PARSER.parse("yaffs_spare_t", spare, endian)

    # reassemble the interleaved tag bytes before decoding the packed tags
    yaffs_packed_tags = _STRUCT_PARSER.parse(
        "yaffs1_packed_tags_t",
        bytes(
            [
                yaffs_sparse.tag_b0,
                yaffs_sparse.tag_b1,
                yaffs_sparse.tag_b2,
                yaffs_sparse.tag_b3,
                yaffs_sparse.tag_b4,
                yaffs_sparse.tag_b5,
                yaffs_sparse.tag_b6,
                yaffs_sparse.tag_b7,
            ]
        ),
        endian,
    )
    file.seek(start_offset, io.SEEK_SET)
    return (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )

709 

710 

def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    """Pick the YAFFS1 or YAFFS2 parser based on the on-disk format."""
    parser_cls = YAFFS1Parser if is_yaffs_v1(file, start_offset) else YAFFS2Parser
    return parser_cls(file)

715 

716 

class YAFFSExtractor(Extractor):
    """Extractor that parses a YAFFS image and materializes it under outdir."""

    def extract(self, inpath: Path, outdir: Path):
        """Parse the image at inpath and write its contents into outdir."""
        image = File.from_path(inpath)
        parser = instantiate_parser(image)
        parser.parse(store=True)
        target_fs = FileSystem(outdir)
        parser.extract(target_fs)
        return ExtractResult(reports=target_fs.problems)

725 

726 

class YAFFSHandler(Handler):
    """unblob handler that locates and sizes YAFFS1/YAFFS2 filesystem chunks."""

    NAME = "yaffs"

    # Matches the start of an object header page: object type (directory or
    # file) followed by parent object id 1 and the 0xFFFF unused-checksum
    # filler, in both byte orders.
    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Parse the filesystem starting at start_offset to find its end.

        Raises InvalidInputFormat (via the parser) when no valid entries are
        found, which makes unblob discard the pattern match.
        """
        parser = instantiate_parser(file, start_offset)
        parser.parse()
        # skip 0xFF padding
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())