import io
import itertools
from collections import defaultdict
from collections.abc import Iterable
from enum import IntEnum
from pathlib import Path
from typing import Optional

import attrs
from structlog import get_logger
from treelib.exceptions import NodeIDAbsentError
from treelib.tree import Tree

from unblob.file_utils import (
    Endian,
    File,
    FileSystem,
    InvalidInputFormat,
    StructParser,
    get_endian_multi,
    read_until_past,
    snull,
)
from unblob.models import (
    Extractor,
    ExtractResult,
    Handler,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    ValidChunk,
)

logger = get_logger()

SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
SPARE_START_LEN = 6

# YAFFS_OBJECT_TYPE_DIRECTORY, YAFFS_OBJECT_TYPE_FILE
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16

C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode; // protection
        uint32 st_uid; // user ID of owner
        uint32 st_gid; // group ID of owner
        uint32 st_atime; // time of last access
        uint32 st_mtime; // time of last modification
        uint32 st_ctime; // time of last change
        uint32 file_size; // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160]; // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status; // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode; /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low; /* File size applies to files only */
        int equiv_id; /* Equivalent object id applies to hard links only. */
        char alias[160]; /* Alias is for symlinks only. */
        uint32 st_rdev; /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj; /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""


class YaffsObjectType(IntEnum):
    UNKNOWN = 0
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5


@attrs.define
class YAFFSChunk:
    chunk_id: int
    offset: int
    byte_count: int
    object_id: int


@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    serial: int
    ecc: bytes
    page_status: int
    block_status: int


@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    seq_number: int


@attrs.define
class YAFFSFileVar:
    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int


@attrs.define
class YAFFSConfig:
    endianness: Endian
    page_size: int
    spare_size: int
    ecc: bool


@attrs.define
class YAFFSEntry:
    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int
    sum_no_longer_used: int = attrs.field(default=0)
    name: str = attrs.field(default="")
    alias: str = attrs.field(default="")
    equiv_id: int = attrs.field(default=0)
    file_size: int = attrs.field(default=0)
    st_mode: int = attrs.field(default=0)
    st_uid: int = attrs.field(default=0)
    st_gid: int = attrs.field(default=0)
    st_atime: int = attrs.field(default=0)
    st_mtime: int = attrs.field(default=0)
    st_ctime: int = attrs.field(default=0)

    def __str__(self):
        return f"{self.object_id}: {self.name}"


@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)
    # use factory=list instead of default=[] so each instance gets its own list
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    filehead: Optional[YAFFSFileVar] = attrs.field(default=None)


def iterate_over_file(
    file: File, config: YAFFSConfig
) -> Iterable[tuple[int, bytes, bytes]]:
    # The yielded offset points just past the page + spare pair. This matches
    # YAFFSParser.parse(), which subtracts page_size + spare_size to recover
    # the start of the page data.
    page = file.read(config.page_size)
    spare = file.read(config.spare_size)
    end_offset = file.tell()

    while len(page) == config.page_size and len(spare) == config.spare_size:
        yield (end_offset, page, spare)
        page = file.read(config.page_size)
        spare = file.read(config.spare_size)
        end_offset = file.tell()
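
# A minimal sketch of how the generator above is consumed (the geometry values
# here are hypothetical, chosen only for illustration):
#
#   config = YAFFSConfig(
#       endianness=Endian.LITTLE, page_size=2048, spare_size=64, ecc=True
#   )
#   for end_offset, page, spare in iterate_over_file(file, config):
#       # `page` holds page_size bytes of data, `spare` holds spare_size bytes
#       # of out-of-band tags; end_offset - page_size - spare_size is the
#       # offset of `page` within the file.
#       ...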


def decode_file_size(high: int, low: int) -> int:
    """File size can be encoded as a 64-bit or a 32-bit value.

    If the high word is not the 0xFFFFFFFF sentinel, the two words form a
    64-bit size. Otherwise the size is the 32-bit low word, where 0xFFFFFFFF
    again means zero.
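
    A few illustrative values, derived directly from the logic below:

    >>> decode_file_size(0x1, 0x0)  # high word in use: (1 << 32) | 0
    4294967296
    >>> decode_file_size(0xFFFFFFFF, 0x1000)  # high is the sentinel
    4096
    >>> decode_file_size(0xFFFFFFFF, 0xFFFFFFFF)  # both sentinels mean zero
    0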
244 """
245 if high != 0xFFFFFFFF:
246 return (high << 32) | (low & 0xFFFFFFFF)
247 if low != 0xFFFFFFFF:
248 return low
249 return 0


def valid_name(name: bytes) -> bool:
    # a valid name is either full of null bytes, or unicode decodable
    try:
        snull(name[:-1]).decode("utf-8")
    except UnicodeDecodeError:
        return False
    else:
        return True


def is_valid_header(header) -> bool:
    if not valid_name(header.name[:-3]):
        return False
    if header.type > 5:
        return False
    if header.sum_no_longer_used != 0xFFFF:  # noqa: SIM103
        return False
    return True


class YAFFSParser:
    HEADER_STRUCT: str

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        self.file_entries = Tree()
        self.data_chunks = defaultdict(list)
        self.file = file
        self._struct_parser = StructParser(C_DEFINITIONS)
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        raise NotImplementedError

    def init_tree(self):
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                data_chunk = self.build_chunk(
                    spare, offset - self.config.page_size - self.config.spare_size
                )
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                try:
                    header = self._struct_parser.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures."""
        page_size = 0
        config = None
        for page_size in VALID_PAGE_SIZES:
            spare_start = self.file[page_size : page_size + SPARE_START_LEN]
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=True,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_LITTLE_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.LITTLE,
                    page_size=page_size,
                    ecc=False,
                    spare_size=-1,
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=True, spare_size=-1
                )
                break
            if spare_start.startswith(SPARE_START_BIG_ENDIAN_NO_ECC):
                config = YAFFSConfig(
                    endianness=Endian.BIG, page_size=page_size, ecc=False, spare_size=-1
                )
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example:
        #
        # 00000800 00 10 00 00 01 01 00 00 00 00 00 00 ff ff ff ff |................|
        # 00000810 03 00 00 00 01 01 00 00 ff ff 62 61 72 00 00 00 |..........bar...|
        # 00000820 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Note that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.
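        #
        # As a worked example under the geometry of the hexdump above
        # (page_size = 0x800, ECC layout, so ecc_offset = 0): the object ID sits
        # at 0x800 + 0 + 4 = 0x804, the signature is
        # file[0x804:0x808] + b"\xff\xff", and it is found 12 bytes into the
        # search window starting at 0x808, so spare_size = 12 + 4 + 0 = 16.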

        object_id_offset = 4
        object_id_start = page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size, make sure it looks legit
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # A header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log-structured filesystem, whichever
            # chunk comes last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> Optional[YAFFSEntry]:
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path
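
    # For illustration, with a hypothetical tree of root (object 1, empty name)
    # containing directory "etc" (object 5) containing file "passwd" (object 9),
    # resolve_path() on object 9 recurses up through object 5 and returns
    # Path("etc/passwd"); the root contributes no path component because it has
    # no parent in the tree.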

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        for chunk in self.get_chunks(entry.object_id):
            yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

    def extract(self, fs: FileSystem):
        for entry in [
            self.file_entries.get_node(node)
            for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
        ]:
            if entry is None or entry.data is None:
                continue
            self.extract_entry(entry.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            if not isinstance(entry, YAFFS2Entry):
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            dst_entry = self.file_entries[str(entry.equiv_id)].data
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)


class YAFFS2Parser(YAFFSParser):
    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        # images built without ECC have two superfluous bytes before the chunk ID
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = self._struct_parser.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered iterator of chunks."""
        # The YAFFS2 sequence number is not the same as the YAFFS1 serial number!
        #
        # As each block is allocated, the file system's sequence number is
        # incremented, and each chunk in the block is marked with that sequence
        # number. The sequence number thus provides a way of organising the log
        # in chronological order.
        #
        # For a given obj_id:chunk_id pair, the chunk written with the highest
        # sequence number is the current one; any other chunk with the same
        # pair is obsolete and treated as deleted.
        #
        # Note: there is no deletion marker in YAFFS2.
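        #
        # For illustration (hypothetical values): given chunks for this object
        # with (chunk_id, seq_number) pairs (1, 10), (1, 12) and (2, 10), the
        # grouping below yields the (1, 12) chunk and the (2, 10) chunk; the
        # stale (1, 10) chunk is dropped.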
        for _, chunks in itertools.groupby(
            sorted(self.data_chunks[object_id], key=lambda chunk: chunk.chunk_id),
            key=lambda chunk: chunk.chunk_id,
        ):
            yield max(chunks, key=lambda chunk: chunk.seq_number)

    def init_tree(self):
        # YAFFS2 does not store the root directory in the image, so create it here.
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)


class YAFFS1Parser(YAFFSParser):
    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: Optional[YAFFSConfig] = None):
        # From https://yaffs.net/archives/yaffs-development-notes: currently each
        # chunk is the same size as a NAND flash page (i.e. 512 bytes + 16 byte
        # spare). In the future we might decide to allow for different chunk sizes.
        config = YAFFSConfig(
            page_size=YAFFS1_PAGE_SIZE,
            spare_size=YAFFS1_SPARE_SIZE,
            endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
            ecc=False,
        )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        yaffs_sparse = self._struct_parser.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        yaffs_packed_tags = self._struct_parser.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered iterator of chunks."""
        # YAFFS1 chunks carry a serial number that is used to decide
        # which chunk takes precedence when two chunks share the same
        # identifier, e.g. after a power loss during a copy operation.
        # Whenever we have two chunks with the same id, we only return
        # the most recent one.

        for _, chunks in itertools.groupby(
            sorted(
                self.data_chunks[object_id],
                key=lambda chunk: chunk.chunk_id,
            ),
            key=lambda chunk: chunk.chunk_id,
        ):
            # serial is a 2-bit wrapping counter; comparing (serial + 1) & 3
            # works because at most two chunks share a chunk_id at any time
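            # (illustrative: with serials 3 and 0, where 0 is the fresher copy
            # after wrapping, the keys become (3 + 1) & 3 == 0 and
            # (0 + 1) & 3 == 1, so the serial-0 chunk correctly wins; a plain
            # max over the raw serial would pick the stale serial-3 chunk)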
            yield max(chunks, key=lambda chunk: ((chunk.serial + 1) & 3))


def is_yaffs_v1(file: File, start_offset: int) -> bool:
    struct_parser = StructParser(C_DEFINITIONS)
    file.seek(start_offset, io.SEEK_SET)
    # The object type of the first header decides endianness: 0x01 (file) or
    # 0x03 (directory) read as little-endian at the chunk start.
    first_word = file[start_offset : start_offset + 4]
    if first_word in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00"):
        endian = Endian.LITTLE
    else:
        endian = Endian.BIG
    file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
    spare = file.read(YAFFS1_SPARE_SIZE)

    yaffs_sparse = struct_parser.parse("yaffs_spare_t", spare, endian)

    yaffs_packed_tags = struct_parser.parse(
        "yaffs1_packed_tags_t",
        bytes(
            [
                yaffs_sparse.tag_b0,
                yaffs_sparse.tag_b1,
                yaffs_sparse.tag_b2,
                yaffs_sparse.tag_b3,
                yaffs_sparse.tag_b4,
                yaffs_sparse.tag_b5,
                yaffs_sparse.tag_b6,
                yaffs_sparse.tag_b7,
            ]
        ),
        endian,
    )
    file.seek(start_offset, io.SEEK_SET)
    return (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )


def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    if is_yaffs_v1(file, start_offset):
        return YAFFS1Parser(file)
    return YAFFS2Parser(file)


class YAFFSExtractor(Extractor):
    def extract(self, inpath: Path, outdir: Path):
        infile = File.from_path(inpath)
        parser = instantiate_parser(infile)
        parser.parse(store=True)
        fs = FileSystem(outdir)
        parser.extract(fs)
        return ExtractResult(reports=fs.problems)
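
# A minimal usage sketch of the extractor above (the paths are hypothetical):
#
#   YAFFSExtractor().extract(Path("dump.yaffs2"), Path("extracted"))
#
# parse(store=True) builds the in-memory tree and data-chunk index, and
# extract() then replays it onto the output FileSystem.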


class YAFFSHandler(Handler):
    NAME = "yaffs"

    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        parser = instantiate_parser(file, start_offset)
        parser.parse()
        # skip 0xFF padding
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())