1import io
2import itertools
3from collections import defaultdict
4from collections.abc import Iterable
5from enum import IntEnum
6from pathlib import Path
7
8import attrs
9from structlog import get_logger
10from treelib.exceptions import NodeIDAbsentError
11from treelib.tree import Tree
12
13from unblob.file_utils import (
14 Endian,
15 File,
16 FileSystem,
17 InvalidInputFormat,
18 StructParser,
19 get_endian_multi,
20 read_until_past,
21 snull,
22)
23from unblob.models import (
24 Extractor,
25 ExtractResult,
26 Handler,
27 HandlerDoc,
28 HandlerType,
29 HexString,
30 Reference,
31 ValidChunk,
32)
33
34logger = get_logger()
35
# Spare-area signatures used by auto-detection to find the page size,
# endianness and ECC layout. The no-ECC variants carry two extra 0xFF
# bytes before the packed tags (see YAFFS2Parser.build_chunk).
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
SPARE_START_LEN = 6  # length of the longest signature above

# YAFFS_OBJECT_TYPE_DIRECTORY, YAFFS_OBJECT_TYPE_FILE
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

# NAND page/spare geometries accepted during auto-detection
VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
# YAFFS1 always uses 512-byte pages with a 16-byte spare area
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16
49
# cstruct definitions of the on-flash YAFFS1/YAFFS2 structures,
# parsed at runtime by StructParser (string content is consumed as-is).
C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode; // protection
        uint32 st_uid; // user ID of owner
        uint32 st_gid; // group ID of owner
        uint32 st_atime; // time of last access
        uint32 st_mtime; // time of last modification
        uint32 st_ctime; // time of last change
        uint32 file_size; // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160]; // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status; // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode; /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low; /* File size applies to files only */
        int equiv_id; /* Equivalent object id applies to hard links only. */
        char alias[160]; /* Alias is for symlinks only. */
        uint32 st_rdev; /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj; /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""
141
142
class YaffsObjectType(IntEnum):
    """Object types stored in the `type` field of YAFFS object headers."""

    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5
150
151
@attrs.define
class YAFFSChunk:
    """Metadata of one on-flash chunk (page), decoded from its spare area."""

    chunk_id: int  # 0 for object headers, data chunk index otherwise
    offset: int  # absolute offset of the page data within the image
    byte_count: int  # number of valid data bytes in the page
    object_id: int  # id of the object this chunk belongs to
158
159
@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    """YAFFS1 chunk: precedence is decided by the 2-bit serial counter."""

    serial: int  # 2-bit counter; higher (mod 4) wins among same-id chunks
    ecc: bytes
    page_status: int  # 0 marks the chunk as deleted
    block_status: int
166
167
@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    """YAFFS2 chunk: precedence is decided by the block sequence number."""

    seq_number: int  # higher means written later in the log
171
172
@attrs.define
class YAFFSFileVar:
    """Python mirror of the `yaffs_file_var` struct embedded in YAFFS2 headers."""

    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int
179
180
@attrs.define
class YAFFSConfig:
    """Image geometry and layout, either supplied or auto-detected."""

    endianness: Endian
    page_size: int
    spare_size: int
    ecc: bool  # ECC spare layout (no 2-byte 0xFFFF prefix before packed tags)
187
188
@attrs.define
class YAFFSEntry:
    """In-memory representation of a YAFFS object header (file, dir, link, ...)."""

    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int
    sum_no_longer_used: int = attrs.field(default=0)
    name: str = attrs.field(default="")
    alias: str = attrs.field(default="")  # symlink target path
    equiv_id: int = attrs.field(default=0)  # hardlink target object id
    file_size: int = attrs.field(default=0)
    st_mode: int = attrs.field(default=0)
    st_uid: int = attrs.field(default=0)
    st_gid: int = attrs.field(default=0)
    st_atime: int = attrs.field(default=0)
    st_mtime: int = attrs.field(default=0)
    st_ctime: int = attrs.field(default=0)

    def __str__(self):
        return f"{self.object_id}: {self.name}"
208
209
@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    """YAFFS2-specific object header fields (see `yaffs2_obj_hdr_t`)."""

    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)  # device major/minor for SPECIAL objects
    # Use factory=list for mutable defaults: attrs' `default=[]` would hand the
    # very same list object to every YAFFS2Entry instance, so mutating one
    # entry's list would silently leak into all others.
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    # None until build_entry fills it from the parsed header
    filehead: YAFFSFileVar | None = attrs.field(default=None)
222
223
def iterate_over_file(
    file: File, config: YAFFSConfig
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield (offset, page, spare) tuples until a short read ends the walk.

    NOTE(review): the offset semantics are inconsistent — the first tuple
    carries the position *before* its page was read, while every later tuple
    carries `file.tell()` taken *after* reading its page+spare pair. The
    caller (`YAFFSParser.parse`) subtracts page_size+spare_size, which is
    only correct from the second chunk on; confirm this is intended (the
    first chunk is normally a header, whose data offset is unused).
    """
    start_offset = file.tell()
    page = file.read(config.page_size)
    spare = file.read(config.spare_size)

    while len(page) == config.page_size and len(spare) == config.spare_size:
        yield (start_offset, page, spare)
        page = file.read(config.page_size)
        spare = file.read(config.spare_size)
        start_offset = file.tell()
236
237
def decode_file_size(high: int, low: int) -> int:
    """Decode a file size stored as two 32-bit halves.

    A high word of 0xFFFFFFFF signals there is no 64-bit size: the low word
    alone is the size, and 0xFFFFFFFF there means zero. Otherwise the two
    words are combined into a 64-bit value.
    """
    if high == 0xFFFFFFFF:
        # 32-bit encoding; the all-ones sentinel stands for zero
        return 0 if low == 0xFFFFFFFF else low
    # 64-bit encoding: high word in the upper 32 bits
    return (high << 32) | (low & 0xFFFFFFFF)
249
250
def valid_name(name: bytes) -> bool:
    """Check that a raw header name is plausible.

    A valid name is either full of null bytes or UTF-8 decodable after
    truncating at the first null byte.
    """
    try:
        snull(name[:-1]).decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False
259
260
def is_valid_header(header) -> bool:
    """Sanity-check a parsed YAFFS object header.

    The name must be decodable, the object type must be one of the known
    values (<= SPECIAL), and the obsolete name-checksum field must hold its
    fixed 0xFFFF filler.
    """
    return (
        valid_name(header.name[:-3])
        and header.type <= 5
        and header.sum_no_longer_used == 0xFFFF
    )
269
270
class YAFFSParser:
    """Base class for YAFFS1/YAFFS2 image parsing.

    Walks the image as (page, spare) pairs, records object headers in a
    treelib Tree (`file_entries`, keyed by stringified object id) and data
    chunks per object id (`data_chunks`), then replays them to a FileSystem
    on extraction. Subclasses supply the header struct name and the
    chunk/entry builders.
    """

    # cstruct type name of the per-version object header, set by subclasses
    HEADER_STRUCT: str

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        self.file_entries = Tree()
        # object_id -> list of data chunks (chunk_id != 0) in log order
        self.data_chunks = defaultdict(list)
        self.file = file
        self._struct_parser = StructParser(C_DEFINITIONS)
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build an entry from a parsed object header. Subclass hook."""
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        """Decode the spare bytes into a chunk descriptor. Subclass hook."""
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        """Yield the current (non-obsolete) data chunks of an object. Subclass hook."""
        raise NotImplementedError

    def init_tree(self):
        """Pre-populate the entry tree; overridden by YAFFS2 to add the root."""
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        """Walk the image until the first invalid/truncated chunk.

        When *store* is true, header entries and data chunks are recorded for
        later extraction; otherwise the walk only validates the image and
        determines `end_offset`. Raises InvalidInputFormat if not a single
        valid header chunk was seen.
        """
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                data_chunk = self.build_chunk(
                    spare, offset - self.config.page_size - self.config.spare_size
                )
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                try:
                    header = self._struct_parser.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                # Count valid header chunks regardless of `store`: a
                # validation-only parse (store=False, used by calculate_chunk)
                # must not report an empty filesystem just because nothing
                # was stored.
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures."""
        page_size = 0
        config = None
        for page_size in VALID_PAGE_SIZES:
            spare_start = self.file[page_size : page_size + SPARE_START_LEN]
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=True,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=False,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=True, spare_size=-1
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=False, spare_size=-1
                )
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example:
        #
        # 00000800  00 10 00 00 01 01 00 00  00 00 00 00 ff ff ff ff  |................|
        # 00000810  03 00 00 00 01 01 00 00  ff ff 62 61 72 00 00 00  |..........bar...|
        # 00000820  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Not that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.

        object_id_offset = 4
        object_id_start = page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size, make sure it looks legit
        # (a failed find() above yields -1 and lands outside this list)
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        """Insert or update an entry node in the file tree."""
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # a header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log filesystem, whichever chunk comes
            # last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            # self-parented entry acts as the tree root
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> YAFFSEntry | None:
        """Return the entry stored for *object_id*, or None if absent."""
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        """Build the path of *entry* by walking up to the tree root."""
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        """Yield the raw data slices making up a file's content."""
        for chunk in self.get_chunks(entry.object_id):
            yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

    def extract(self, fs: FileSystem):
        """Materialize every stored entry into *fs*, parents before children."""
        for entry in [
            self.file_entries.get_node(node)
            for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
        ]:
            if entry is None or entry.data is None:
                continue
            self.extract_entry(entry.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        """Materialize one entry (dir, file, link, or device node) into *fs*."""
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            if not isinstance(entry, YAFFS2Entry):
                # st_rdev only exists on YAFFS2 entries
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            dst_entry = self.file_entries[str(entry.equiv_id)].data
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)
505
506
class YAFFS2Parser(YAFFSParser):
    """Parser for YAFFS2 images (auto-detected geometry, sequence-number log)."""

    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        """Decode the packed tags from the spare area into a YAFFS2Chunk."""
        # images built without ECC have two superfluous bytes before the chunk ID.
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = self._struct_parser.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Convert a parsed yaffs2_obj_hdr_t into a YAFFS2Entry."""
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered list of chunks."""
        # The Yaffs2 sequence number is not the same as the Yaffs1 serial number!

        # As each block is allocated, the file system's
        # sequence number is incremented and each chunk in the block is marked with that
        # sequence number. The sequence number thus provides a way of organising the log in
        # chronological order.

        # Since we're scanning backwards, the most recently written - and thus current - chunk
        # matching an obj_id:chunk_id pair will be encountered first and all subsequent matching chunks must be obsolete and treated as deleted.

        # note: there is no deletion marker in YAFFS2

        # Group explicitly by chunk_id: without a key, groupby would compare
        # whole chunk objects (attrs value equality includes offset and
        # seq_number), so same-id chunks would never share a group and
        # obsolete chunks would be yielded alongside current ones.
        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            yield max(chunks, key=lambda chunk: chunk.seq_number)

    def init_tree(self):
        # YAFFS2 do not store the root in file.
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)
594
595
class YAFFS1Parser(YAFFSParser):
    """Parser for YAFFS1 images (fixed 512+16 byte geometry, serial numbers)."""

    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        # from https://yaffs.net/archives/yaffs-development-notes: currently each chunk
        # is the same size as a NAND flash page (ie. 512 bytes + 16 byte spare).
        # In the future we might decide to allow for different chunk sizes.
        config = YAFFSConfig(
            page_size=YAFFS1_PAGE_SIZE,
            spare_size=YAFFS1_SPARE_SIZE,
            endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
            ecc=False,
        )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        """Reassemble the scattered tag bytes from the spare area and parse them."""
        yaffs_sparse = self._struct_parser.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        yaffs_packed_tags = self._struct_parser.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Convert a parsed yaffs1_obj_hdr_t into a YAFFSEntry."""
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered list of chunks."""
        # YAFFS1 chunks have a serial number that is used to track
        # which chunk takes precedence if two chunks have the same
        # identifier. This is used in scenarios like power loss
        # during a copy operation. Whenever we have two chunks with
        # the same id, we only return the one with the highest serial.

        # Group explicitly by chunk_id: without a key, groupby would compare
        # whole chunk objects (attrs value equality includes offset/serial),
        # so two chunks sharing a chunk_id would never be deduplicated.
        for _, chunks in itertools.groupby(
            sorted(
                self.data_chunks[object_id],
                key=lambda chunk: chunk.chunk_id,
            ),
            key=lambda chunk: chunk.chunk_id,
        ):
            # serial is a 2 bit, this function works since there's always at most
            # two chunks with the same chunk_id at any given time
            yield max(chunks, key=lambda chunk: (chunk.serial + 1) & 3)
673
674
def is_yaffs_v1(file: File, start_offset: int) -> bool:
    """Decide whether the content at *start_offset* is a YAFFS1 image.

    YAFFS1 uses fixed 512-byte pages with a 16-byte spare; the first chunk
    of a valid image is an object header, so its packed tags must show
    chunk_id == 0, serial == 0 and object_id == 1. The file position is
    restored to *start_offset* before returning.
    """
    struct_parser = StructParser(C_DEFINITIONS)
    file.seek(start_offset, io.SEEK_SET)
    # Read the object type magic at start_offset (not at the absolute start
    # of the file): the chunk being probed may be embedded at any offset.
    magic = file[start_offset : start_offset + 4]
    if magic in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00"):
        endian = Endian.LITTLE
    else:
        endian = Endian.BIG
    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    yaffs_sparse = struct_parser.parse("yaffs_spare_t", spare, endian)

    # YAFFS1 packed tags are scattered across the spare bytes; reassemble
    # them in order before parsing the bitfields.
    yaffs_packed_tags = struct_parser.parse(
        "yaffs1_packed_tags_t",
        bytes(
            [
                yaffs_sparse.tag_b0,
                yaffs_sparse.tag_b1,
                yaffs_sparse.tag_b2,
                yaffs_sparse.tag_b3,
                yaffs_sparse.tag_b4,
                yaffs_sparse.tag_b5,
                yaffs_sparse.tag_b6,
                yaffs_sparse.tag_b7,
            ]
        ),
        endian,
    )
    file.seek(start_offset, io.SEEK_SET)
    return (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )
709
710
def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    """Return a YAFFS1 or YAFFS2 parser depending on the detected version."""
    parser_cls = YAFFS1Parser if is_yaffs_v1(file, start_offset) else YAFFS2Parser
    return parser_cls(file)
715
716
class YAFFSExtractor(Extractor):
    """unblob Extractor that replays a parsed YAFFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        parser = instantiate_parser(File.from_path(inpath))
        parser.parse(store=True)
        target_fs = FileSystem(outdir)
        parser.extract(target_fs)
        return ExtractResult(reports=target_fs.problems)
725
726
class YAFFSHandler(Handler):
    """unblob Handler matching YAFFS object headers and carving whole images."""

    NAME = "yaffs"

    # First bytes of an object header chunk: object type (directory or file),
    # parent object id 1 (the root), then the unused 0xFFFF name checksum.
    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Validate the image at *start_offset* and return its extent."""
        # parse() without store=True only walks/validates and sets end_offset
        parser = instantiate_parser(file, start_offset)
        parser.parse()
        # skip 0xFF padding
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())