import io
import itertools
from collections import defaultdict
from collections.abc import Iterable
from enum import IntEnum
from pathlib import Path
from typing import Optional

import attrs
from structlog import get_logger
from treelib.exceptions import NodeIDAbsentError
from treelib.tree import Tree

from unblob.file_utils import (
    Endian,
    File,
    FileSystem,
    InvalidInputFormat,
    StructParser,
    get_endian_multi,
    read_until_past,
    snull,
)
from unblob.models import (
    Extractor,
    ExtractResult,
    Handler,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    ValidChunk,
)

logger = get_logger()

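# Signatures for the start of the spare area following the first page, used by
# config auto-detection: a YAFFS2 image starts with a header chunk whose packed
# tags begin with the lowest YAFFS2 sequence number, 0x1000. Without the ECC
# layout, the packed tags are preceded by two 0xff bytes.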
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
SPARE_START_LEN = 6

# YAFFS_OBJECT_TYPE_DIRECTORY, YAFFS_OBJECT_TYPE_FILE
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16

C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type;                 /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode;              // protection
        uint32 st_uid;               // user ID of owner
        uint32 st_gid;               // group ID of owner
        uint32 st_atime;             // time of last access
        uint32 st_mtime;             // time of last modification
        uint32 st_ctime;             // time of last change
        uint32 file_size;            // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160];             // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status;   // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type;               /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode;            /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low;      /* File size applies to files only */
        int equiv_id;              /* Equivalent object id applies to hard links only. */
        char alias[160];           /* Alias is for symlinks only. */
        uint32 st_rdev;            /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj;           /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""


class YaffsObjectType(IntEnum):
    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5


@attrs.define
class YAFFSChunk:
    chunk_id: int
    offset: int
    byte_count: int
    object_id: int


@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    serial: int
    ecc: bytes
    page_status: int
    block_status: int


@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    seq_number: int


@attrs.define
class YAFFSFileVar:
    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int


@attrs.define
class YAFFSConfig:
    endianness: Endian
    page_size: int
    spare_size: int
    ecc: bool


@attrs.define
class YAFFSEntry:
    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int
    sum_no_longer_used: int = attrs.field(default=0)
    name: str = attrs.field(default="")
    alias: str = attrs.field(default="")
    equiv_id: int = attrs.field(default=0)
    file_size: int = attrs.field(default=0)
    st_mode: int = attrs.field(default=0)
    st_uid: int = attrs.field(default=0)
    st_gid: int = attrs.field(default=0)
    st_atime: int = attrs.field(default=0)
    st_mtime: int = attrs.field(default=0)
    st_ctime: int = attrs.field(default=0)

    def __lt__(self, other):
        return self.object_id < other.object_id

    def __gt__(self, other):
        return self.object_id > other.object_id

    def __eq__(self, other):
        return self.object_id == other.object_id

    def __hash__(self):
        return hash(self.object_id)

    def __str__(self):
        return f"{self.object_id}: {self.name}"


@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)
    # use factories for the list fields so instances don't share one mutable default
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    filehead: Optional[YAFFSFileVar] = attrs.field(default=None)


def iterate_over_file(
    file: File, config: YAFFSConfig
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield (offset, page, spare) tuples, where offset points at the start of the page data."""
    start_offset = file.tell()
    page = file.read(config.page_size)
    spare = file.read(config.spare_size)

    while len(page) == config.page_size and len(spare) == config.spare_size:
        yield (start_offset, page, spare)
        # record the start of the next page before reading it, so the yielded
        # offset always matches the page it is yielded with
        start_offset = file.tell()
        page = file.read(config.page_size)
        spare = file.read(config.spare_size)


def decode_file_size(high: int, low: int) -> int:
    """Decode a file size stored as either a 64-bit or a 32-bit value.

    If the upper 32 bits hold anything other than the erased-flash marker
    (0xFFFFFFFF), the size is a 64-bit value spanning both halves. Otherwise
    only the lower 32 bits are used, and 0xFFFFFFFF means zero.
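
    A few worked examples:

    >>> decode_file_size(0xFFFFFFFF, 4096)
    4096
    >>> decode_file_size(0x1, 0x0)
    4294967296
    >>> decode_file_size(0xFFFFFFFF, 0xFFFFFFFF)
    0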
256 """
257 if high != 0xFFFFFFFF:
258 return (high << 32) | (low & 0xFFFFFFFF)
259 if low != 0xFFFFFFFF:
260 return low
261 return 0
262
263
def valid_name(name: bytes) -> bool:
    # a valid name is either full of null bytes, or unicode decodable
    try:
        snull(name[:-1]).decode("utf-8")
    except UnicodeDecodeError:
        return False
    else:
        return True


def is_valid_header(header) -> bool:
    if not valid_name(header.name[:-3]):
        return False
    if header.type > 5:
        return False
    if header.sum_no_longer_used != 0xFFFF:  # noqa: SIM103
        return False
    return True


class YAFFSParser:
    HEADER_STRUCT: str

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        self.file_entries = Tree()
        self.data_chunks = defaultdict(list)
        self.file = file
        self._struct_parser = StructParser(C_DEFINITIONS)
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        raise NotImplementedError

    def init_tree(self):
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                data_chunk = self.build_chunk(spare, offset)
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                try:
                    header = self._struct_parser.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures."""
        page_size = 0
        config = None
        for page_size in VALID_PAGE_SIZES:
            spare_start = self.file[page_size : page_size + SPARE_START_LEN]
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=True,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=False,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=True, spare_size=-1
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=False, spare_size=-1
                )
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning
        # of the spare data block. Ignore them.
        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating
        # data patterns that we can match on to find where the spare data ends.
        # Take this hexdump for example:
        #
        # 00000800 00 10 00 00 01 01 00 00 00 00 00 00 ff ff ff ff |................|
        # 00000810 03 00 00 00 01 01 00 00 ff ff 62 61 72 00 00 00 |..........bar...|
        # 00000820 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next
        # page data then starts at offset 0x810. Note that the four bytes at 0x804
        # (in the spare data section) and the four bytes at 0x814 (in the next
        # page data section) are identical. This is because the four bytes at
        # offset 0x804 represent the object ID of the previous object, and the
        # four bytes at offset 0x814 represent the parent object ID of the next
        # object. Also, the four bytes in the page data are always followed by
        # 0xFFFF, as those are the unused name checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence,
        # the end of the spare data section) becomes:
        # [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in
        # practice, any Linux file system should meet this requirement, but one
        # could craft a file system that does not.

        object_id_offset = 4
        object_id_start = page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size to make sure it looks legitimate.
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # A header chunk with the same object ID already exists in the tree,
            # meaning the file metadata was modified, or the file got truncated
            # or rewritten. Given that YAFFS is a log-structured filesystem,
            # whichever chunk comes last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> Optional[YAFFSEntry]:
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        for chunk in self.get_chunks(entry.object_id):
            yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

    def extract(self, fs: FileSystem):
        for entry in [
            self.file_entries.get_node(node)
            for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
        ]:
            if entry is None or entry.data is None:
                continue
            self.extract_entry(entry.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            if not isinstance(entry, YAFFS2Entry):
                logger.warning("non-YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            dst_entry = self.file_entries[str(entry.equiv_id)].data
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)


class YAFFS2Parser(YAFFSParser):
    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        # images built without ECC have two superfluous bytes before the chunk ID
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = self._struct_parser.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered list of chunks."""
        # The YAFFS2 sequence number is not the same as the YAFFS1 serial number!
        #
        # As each block is allocated, the file system's sequence number is
        # incremented, and each chunk in the block is marked with that sequence
        # number. The sequence number thus provides a way of organising the log
        # in chronological order.
        #
        # All chunks matching an object_id:chunk_id pair are collected here; the
        # copy written last (highest sequence number) is current, and every other
        # copy is obsolete and treated as deleted.
        #
        # Note: there is no deletion marker in YAFFS2.

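        # For example, if chunk_id 5 of this object was written twice, once in a
        # block with seq_number 0x1001 and later in a block with seq_number
        # 0x1004, only the 0x1004 copy is yielded below (illustrative values,
        # not taken from any particular image).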
        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            yield max(chunks, key=lambda chunk: chunk.seq_number)

    def init_tree(self):
        # YAFFS2 does not store the root directory in the image, so create it
        # up front (object ID 1 is the fixed root object ID).
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)


class YAFFS1Parser(YAFFSParser):
    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        # From https://yaffs.net/archives/yaffs-development-notes: currently
        # each chunk is the same size as a NAND flash page (i.e. 512 bytes +
        # 16 byte spare). In the future we might decide to allow for different
        # chunk sizes.
        config = YAFFSConfig(
            page_size=YAFFS1_PAGE_SIZE,
            spare_size=YAFFS1_SPARE_SIZE,
            endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
            ecc=False,
        )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        yaffs_spare = self._struct_parser.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        yaffs_packed_tags = self._struct_parser.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_spare.tag_b0,
                    yaffs_spare.tag_b1,
                    yaffs_spare.tag_b2,
                    yaffs_spare.tag_b3,
                    yaffs_spare.tag_b4,
                    yaffs_spare.tag_b5,
                    yaffs_spare.tag_b6,
                    yaffs_spare.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_spare.page_status,
            block_status=yaffs_spare.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered list of chunks."""
        # YAFFS1 chunks have a serial number that is used to track which chunk
        # takes precedence if two chunks have the same identifier. This is used
        # in scenarios like power loss during a copy operation. Whenever we have
        # two chunks with the same id, we only return the one with the highest
        # serial.

        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            # serial is a 2-bit counter that wraps around; taking the max of
            # ((serial + 1) & 3) works since there are at most two live chunks
            # with the same chunk_id at any given time
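            # e.g. with one copy at serial 3 (old) and one at serial 0 (new,
            # written after the counter wrapped), the keys are (3 + 1) & 3 == 0
            # and (0 + 1) & 3 == 1, so the serial 0 copy correctly wins
            # (illustrative values, not taken from a real image)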
            yield max(chunks, key=lambda chunk: ((chunk.serial + 1) & 3))


def is_yaffs_v1(file: File, start_offset: int) -> bool:
    struct_parser = StructParser(C_DEFINITIONS)
    file.seek(start_offset, io.SEEK_SET)
    # read the object type magic at the chunk start, not at the file start
    magic = file[start_offset : start_offset + 4]
    if magic in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00"):
        endian = Endian.LITTLE
    else:
        endian = Endian.BIG
    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    yaffs_spare = struct_parser.parse("yaffs_spare_t", spare, endian)

    yaffs_packed_tags = struct_parser.parse(
        "yaffs1_packed_tags_t",
        bytes(
            [
                yaffs_spare.tag_b0,
                yaffs_spare.tag_b1,
                yaffs_spare.tag_b2,
                yaffs_spare.tag_b3,
                yaffs_spare.tag_b4,
                yaffs_spare.tag_b5,
                yaffs_spare.tag_b6,
                yaffs_spare.tag_b7,
            ]
        ),
        endian,
    )
    file.seek(start_offset, io.SEEK_SET)
    return (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )


def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    if is_yaffs_v1(file, start_offset):
        return YAFFS1Parser(file)
    return YAFFS2Parser(file)


class YAFFSExtractor(Extractor):
    def extract(self, inpath: Path, outdir: Path):
        infile = File.from_path(inpath)
        parser = instantiate_parser(infile)
        parser.parse(store=True)
        fs = FileSystem(outdir)
        parser.extract(fs)
        return ExtractResult(reports=fs.problems)


class YAFFSHandler(Handler):
    NAME = "yaffs"

    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description=(
            "YAFFS (Yet Another Flash File System) is a log-structured file system "
            "designed for NAND flash memory, storing data in fixed-size chunks with "
            "associated metadata. It supports features like wear leveling, error "
            "correction, and efficient handling of power loss scenarios."
        ),
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        parser = instantiate_parser(file, start_offset)
        parser.parse()
        # skip 0xFF padding
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
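

if __name__ == "__main__":
    # Minimal manual usage sketch, not part of the unblob handler API: extract
    # the YAFFS image passed as argv[1] into the directory passed as argv[2],
    # mirroring what YAFFSExtractor.extract does (the command-line arguments
    # are assumptions for this example).
    import sys

    image_path = Path(sys.argv[1])
    extract_dir = Path(sys.argv[2])
    extract_dir.mkdir(parents=True, exist_ok=True)
    result = YAFFSExtractor().extract(image_path, extract_dir)
    logger.info("extraction finished", problems=result.reports)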