1import io
2import itertools
3from collections import defaultdict
4from collections.abc import Iterable
5from enum import IntEnum
6from pathlib import Path
7
8import attrs
9from structlog import get_logger
10from treelib.exceptions import NodeIDAbsentError
11from treelib.tree import Tree
12
13from unblob.file_utils import (
14 Endian,
15 File,
16 FileSystem,
17 InvalidInputFormat,
18 StructParser,
19 get_endian_multi,
20 read_until_past,
21 snull,
22)
23from unblob.models import (
24 Extractor,
25 ExtractResult,
26 Handler,
27 HandlerDoc,
28 HandlerType,
29 HexString,
30 Reference,
31 ValidChunk,
32)
33
# Module-level structured logger shared by everything in this file.
logger = get_logger()

# Byte patterns that YAFFSParser.auto_detect() compares against the bytes found
# right after the first page (i.e. at the start of the first spare area).
# The *_NO_ECC variants are the same patterns preceded by two 0xFF bytes.
SPARE_START_BIG_ENDIAN_ECC = b"\x00\x00\x10\x00"
SPARE_START_BIG_ENDIAN_NO_ECC = b"\xff\xff\x00\x00\x10\x00"
SPARE_START_LITTLE_ENDIAN_ECC = b"\x00\x10\x00\x00"
SPARE_START_LITTLE_ENDIAN_NO_ECC = b"\xff\xff\x00\x10\x00\x00"
# Number of bytes read at each candidate spare offset (length of the longest
# pattern above).
SPARE_START_LEN = 6

# YAFFS_OBJECT_TYPE_FILE (1) and YAFFS_OBJECT_TYPE_DIRECTORY (3) as big endian
# 32-bit values; handed to get_endian_multi() to guess the endianness.
BIG_ENDIAN_MAGICS = [0x00_00_00_01, 0x00_00_00_03]

# Page sizes tried, in this order, by YAFFSParser.auto_detect().
VALID_PAGE_SIZES = [512, 1024, 2048, 4096, 8192, 16384, 2032]
# Spare sizes accepted by the auto-detection sanity check.
VALID_SPARE_SIZES = [16, 32, 64, 128, 256, 512]
# Fixed geometry used by YAFFS1Parser and is_yaffs_v1().
YAFFS1_PAGE_SIZE = 512
YAFFS1_SPARE_SIZE = 16
49
# C declarations of the on-flash YAFFS1 / YAFFS2 structures. They are compiled
# once by StructParser below; the rest of the module parses raw page / spare
# bytes by referring to these structures by name (e.g. "yaffs2_obj_hdr_t").
C_DEFINITIONS = """
    struct yaffs1_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used;
        char name[258];
        uint32 st_mode; // protection
        uint32 st_uid; // user ID of owner
        uint32 st_gid; // group ID of owner
        uint32 st_atime; // time of last access
        uint32 st_mtime; // time of last modification
        uint32 st_ctime; // time of last change
        uint32 file_size; // File size applies to files only
        uint32 equivalent_object_id; // Equivalent object id applies to hard links only.
        char alias[160]; // alias only applies to symlinks
    } yaffs1_obj_hdr_t;

    struct yaffs1_packed_tags {
        uint32 chunk_id:20;
        uint32 serial:2;
        uint32 byte_count:10;
        uint32 object_id:18;
        uint32 ecc:12;
        uint32 unused:2;
    } yaffs1_packed_tags_t;

    typedef struct yaffs_spare
    {
        uint8 tag_b0;
        uint8 tag_b1;
        uint8 tag_b2;
        uint8 tag_b3;
        uint8 page_status; // set to 0 to delete the chunk
        uint8 block_status;
        uint8 tag_b4;
        uint8 tag_b5;
        uint8 ecc_0;
        uint8 ecc_1;
        uint8 ecc_2;
        uint8 tag_b6;
        uint8 tag_b7;
        uint8 ecc_3;
        uint8 ecc_4;
        uint8 ecc_5;
    } yaffs_spare_t;

    struct yaffs_file_var {
        uint32 file_size;
        uint32 stored_size;
        uint32 shrink_size;
        int top_level;
    };

    typedef struct yaffs2_obj_hdr {
        uint32 type; /* enum yaffs_obj_type */
        /* Apply to everything */
        uint32 parent_obj_id;
        uint16 sum_no_longer_used; /* checksum of name. No longer used */
        char name[256];
        uint16 chksum;
        /* The following apply to all object types except for hard links */
        uint32 st_mode; /* protection */
        uint32 st_uid;
        uint32 st_gid;
        uint32 st_atime;
        uint32 st_mtime;
        uint32 st_ctime;
        uint32 file_size_low; /* File size applies to files only */
        int equiv_id; /* Equivalent object id applies to hard links only. */
        char alias[160]; /* Alias is for symlinks only. */
        uint32 st_rdev; /* stuff for block and char devices (major/min) */
        uint32 win_ctime[2];
        uint32 win_atime[2];
        uint32 win_mtime[2];
        uint32 inband_shadowed_obj_id;
        uint32 inband_is_shrink;
        uint32 file_size_high;
        uint32 reserved[1];
        int shadows_obj; /* This object header shadows the specified object if > 0 */
        /* is_shrink applies to object headers written when we make a hole. */
        uint32 is_shrink;
        yaffs_file_var filehead;
    } yaffs2_obj_hdr_t;

    typedef struct yaffs2_packed_tags {
        uint32 seq_number;
        uint32 object_id;
        uint32 chunk_id;
        uint32 byte_count;
    } yaffs2_packed_tags_t;
"""

# Shared parser instance for all the structures declared above.
_STRUCT_PARSER = StructParser(C_DEFINITIONS)
143
144
class YaffsObjectType(IntEnum):
    """Kind of object described by a YAFFS object header (its ``type`` field)."""

    UNKNOWN = 0  # skipped with a warning at extraction time
    FILE = 1
    SYMLINK = 2
    DIRECTORY = 3
    HARDLINK = 4
    SPECIAL = 5  # highest value accepted by is_valid_header()
152
153
@attrs.define
class YAFFSChunk:
    """Tags shared by YAFFS1 and YAFFS2 chunks (one page of the image)."""

    # Index of the chunk within its object; 0 marks an object header page.
    chunk_id: int
    # File offset at which the page data of this chunk starts.
    offset: int
    # Number of bytes of the page that hold data for the object.
    byte_count: int
    # Identifier of the object this chunk belongs to.
    object_id: int
160
161
@attrs.define
class YAFFS1Chunk(YAFFSChunk):
    """Chunk tags decoded from a YAFFS1 spare area."""

    # 2-bit counter used to pick the newest of two chunks sharing a chunk_id.
    serial: int
    # 12-bit ECC bitfield taken from the packed tags (an integer, not raw bytes).
    ecc: int
    # 0 means the chunk was deleted; such chunks are skipped while parsing.
    page_status: int
    # Copied verbatim from the spare area; not interpreted in this module.
    block_status: int
168
169
@attrs.define
class YAFFS2Chunk(YAFFSChunk):
    """Chunk tags decoded from a YAFFS2 spare area."""

    # Sequence number from the packed tags; the chunk with the highest value
    # wins when several chunks share the same chunk_id.
    seq_number: int
173
174
@attrs.define
class YAFFSFileVar:
    """Python counterpart of ``struct yaffs_file_var`` (YAFFS2 header tail)."""

    # The four values are copied as-is from the parsed header's ``filehead``.
    file_size: int
    stored_size: int
    shrink_size: int
    top_level: int
181
182
@attrs.define
class YAFFSConfig:
    """Layout parameters needed to walk a YAFFS image."""

    # Byte order used when parsing headers and tags.
    endianness: Endian
    # Size in bytes of the data part of each chunk.
    page_size: int
    # Size in bytes of the spare (out-of-band) area following each page.
    spare_size: int
    # True for the ECC spare layout; when False the spare area starts with two
    # extra bytes that have to be skipped before the tags.
    ecc: bool
189
190
@attrs.define
class YAFFSEntry:
    """File system object built from an object header and its chunk tags.

    Only the type and the two identifiers are mandatory; every other field
    falls back to an empty / zero value.
    """

    object_type: YaffsObjectType
    object_id: int
    parent_obj_id: int
    sum_no_longer_used: int = 0
    name: str = ""
    alias: str = ""
    equiv_id: int = 0
    file_size: int = 0
    st_mode: int = 0
    st_uid: int = 0
    st_gid: int = 0
    st_atime: int = 0
    st_mtime: int = 0
    st_ctime: int = 0

    def __str__(self):
        """Return ``"<object_id>: <name>"``, used when the entry is logged."""
        return "{}: {}".format(self.object_id, self.name)
210
211
@attrs.define(kw_only=True)
class YAFFS2Entry(YAFFSEntry):
    """YAFFSEntry extended with the fields only present in YAFFS2 headers.

    All the additional fields are keyword-only and optional.
    """

    chksum: int = attrs.field(default=0)
    st_rdev: int = attrs.field(default=0)
    # List-valued fields use a factory so that every instance gets its own
    # list; a literal ``default=[]`` would be shared by all the instances.
    win_ctime: list[int] = attrs.field(factory=list)
    win_mtime: list[int] = attrs.field(factory=list)
    inband_shadowed_obj_id: int = attrs.field(default=0)
    inband_is_shrink: int = attrs.field(default=0)
    reserved: list[int] = attrs.field(factory=list)
    shadows_obj: int = attrs.field(default=0)
    is_shrink: int = attrs.field(default=0)
    # None when the entry was not built from an on-disk header (e.g. the
    # synthetic root directory).
    filehead: YAFFSFileVar | None = attrs.field(default=None)
224
225
def iterate_over_file(
    file: File, config: YAFFSConfig
) -> Iterable[tuple[int, bytes, bytes]]:
    """Yield ``(offset, page, spare)`` for each complete chunk of ``file``.

    Reading starts at the current file position. ``offset`` is the position
    at which the page starts. Iteration ends as soon as either the page or
    the spare area cannot be read in full.
    """
    while True:
        chunk_offset = file.tell()
        page_data = file.read(config.page_size)
        spare_data = file.read(config.spare_size)
        truncated = (
            len(page_data) != config.page_size
            or len(spare_data) != config.spare_size
        )
        if truncated:
            return
        yield (chunk_offset, page_data, spare_data)
238
239
def decode_file_size(high: int, low: int) -> int:
    """Combine the two 32-bit halves of a YAFFS2 file size.

    A half equal to 0xFFFFFFFF is considered unset: when ``high`` is unset the
    size is just ``low`` (or 0 if ``low`` is unset as well), otherwise both
    halves are merged into a 64-bit value.
    """
    unset = 0xFFFFFFFF
    if high == unset:
        return 0 if low == unset else low
    return (high << 32) | (low & unset)
251
252
def valid_name(name: bytes) -> bool:
    """Tell whether ``name`` (minus its last byte) decodes as UTF-8.

    The last byte is dropped and the rest goes through ``snull`` (presumably
    stripping the NUL padding) before decoding, so a name made only of null
    bytes is accepted as well.
    """
    candidate = snull(name[:-1])
    try:
        candidate.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True
261
262
def is_valid_header(header) -> bool:
    """Run cheap plausibility checks on a parsed object header.

    A header is accepted when its name is decodable, its type is at most 5
    (the highest ``YaffsObjectType`` value) and the obsolete name checksum
    field holds 0xFFFF.
    """
    return (
        valid_name(header.name[:-3])
        and header.type <= 5
        and header.sum_no_longer_used == 0xFFFF
    )
271
272
class YAFFSParser:
    """Version independent logic to walk a YAFFS image and extract it.

    Subclasses provide the version specific parts: ``HEADER_STRUCT`` (name of
    the object header structure declared in ``C_DEFINITIONS``),
    ``build_chunk``, ``build_entry`` and ``get_chunks``.
    """

    HEADER_STRUCT: str

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        """Create a parser for the image starting at ``file``'s current position.

        When ``config`` is omitted the layout is guessed by ``auto_detect``.
        """
        # Tree of YAFFSEntry objects; node identifiers are str(object_id).
        self.file_entries = Tree()
        # object_id -> data chunks collected by parse(store=True).
        self.data_chunks = defaultdict(list)
        self.file = file
        # File position reached by parse(); stays at -1 until parse() succeeds.
        self.end_offset = -1
        if config is None:
            self.config = self.auto_detect()
            logger.debug("auto-detected config", config=self.config)
        else:
            self.config = config

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build an entry from a parsed object header and its chunk tags."""
        raise NotImplementedError

    def build_chunk(self, spare: bytes, offset: int) -> YAFFSChunk:
        """Decode the tags held in ``spare`` for the page located at ``offset``."""
        raise NotImplementedError

    def get_chunks(self, object_id: int) -> Iterable[YAFFSChunk]:
        """Return the current data chunks of ``object_id`` ordered by chunk id."""
        raise NotImplementedError

    def init_tree(self):
        """Hook called at the beginning of ``parse``; does nothing by default."""
        return

    def parse(self, store: bool = False):  # noqa: C901,FBT001,FBT002
        """Walk the image chunk by chunk from the current file position.

        The walk stops at the end of the file, when a chunk cannot be decoded
        (``EOFError``) or when a header chunk fails ``is_valid_header``.
        ``end_offset`` is then set to the file position that was reached.

        Args:
            store: when true, entries and data chunks are recorded so that
                ``extract`` can be called afterwards; when false, headers are
                only counted.

        Raises:
            InvalidInputFormat: if no valid object header was found.
        """
        self.init_tree()
        entries = 0
        for offset, page, spare in iterate_over_file(self.file, self.config):
            try:
                data_chunk = self.build_chunk(spare, offset)
            except EOFError:
                break

            # ignore chunks tagged as deleted
            if isinstance(data_chunk, YAFFS1Chunk) and data_chunk.page_status == 0x0:
                continue

            if data_chunk.chunk_id == 0:
                # chunk id 0 means that the page holds an object header
                try:
                    header = _STRUCT_PARSER.parse(
                        self.HEADER_STRUCT, page, self.config.endianness
                    )
                    logger.debug(self.HEADER_STRUCT, yaffs_obj_hdr=header, _verbosity=3)
                except EOFError:
                    break

                if not is_valid_header(header):
                    break

                if store:
                    self.insert_entry(self.build_entry(header, data_chunk))
                entries += 1
            elif store:
                self.data_chunks[data_chunk.object_id].append(data_chunk)
        if not entries:
            raise InvalidInputFormat("YAFFS filesystem with no entries.")
        self.end_offset = self.file.tell()

    def auto_detect(self) -> YAFFSConfig:
        """Auto-detect page_size, spare_size, and ECC using known signatures.

        Detection is relative to the current file position, so an image
        embedded at a non-zero offset of a larger file is handled too. The
        file position is left untouched.

        Raises:
            InvalidInputFormat: if no known spare signature is found or if
                the computed spare size is not one of ``VALID_SPARE_SIZES``.
        """
        # Slicing the file uses absolute offsets, hence everything below is
        # shifted by the position at which the image starts.
        base = self.file.tell()

        # (signature, endianness, ecc), in the order they have to be tried.
        signatures = (
            (SPARE_START_LITTLE_ENDIAN_ECC, Endian.LITTLE, True),
            (SPARE_START_LITTLE_ENDIAN_NO_ECC, Endian.LITTLE, False),
            (SPARE_START_BIG_ENDIAN_ECC, Endian.BIG, True),
            (SPARE_START_BIG_ENDIAN_NO_ECC, Endian.BIG, False),
        )

        page_size = 0
        config = None
        for page_size in VALID_PAGE_SIZES:
            spare_offset = base + page_size
            spare_start = self.file[spare_offset : spare_offset + SPARE_START_LEN]
            for signature, endianness, ecc in signatures:
                if spare_start.startswith(signature):
                    config = YAFFSConfig(
                        endianness=endianness,
                        page_size=page_size,
                        ecc=ecc,
                        spare_size=-1,
                    )
                    break
            if config is not None:
                break

        if config is None:
            raise InvalidInputFormat("Cannot detect YAFFS configuration.")

        # If not using the ECC layout, there are 2 extra bytes at the beginning of the
        # spare data block. Ignore them.

        ecc_offset = 0 if config.ecc else 2

        # The spare data signature is built dynamically, as there are repeating data patterns
        # that we can match on to find where the spare data ends. Take this hexdump for example
        # (offsets relative to the start of the image):
        #
        # 00000800 00 10 00 00 01 01 00 00 00 00 00 00 ff ff ff ff |................|
        # 00000810 03 00 00 00 01 01 00 00 ff ff 62 61 72 00 00 00 |..........bar...|
        # 00000820 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
        #
        # The spare data starts at offset 0x800 and is 16 bytes in size. The next page data then
        # starts at offset 0x810. Note that the four bytes at 0x804 (in the spare data section) and
        # the four bytes at 0x814 (in the next page data section) are identical. This is because
        # the four bytes at offset 0x804 represent the object ID of the previous object, and the four
        # bytes at offset 0x814 represent the parent object ID of the next object. Also, the
        # four bytes in the page data are always followed by 0xFFFF, as those are the unused name
        # checksum bytes.
        #
        # Thus, the signature for identifying the next page section (and hence, the end of the
        # spare data section) becomes: [the 4 bytes starting at offset 0x804] + 0xFFFF
        #
        # Note that this requires at least one non-empty subdirectory; in practice, any Linux
        # file system should meet this requirement, but one could create a file system that
        # does not meet this requirement.

        object_id_offset = 4
        object_id_start = base + page_size + ecc_offset + object_id_offset
        object_id_end = object_id_start + 4
        spare_signature = self.file[object_id_start:object_id_end] + b"\xff\xff"

        # When the signature is not found, find() returns -1 and the resulting
        # size (3 or 5) is rejected by the sanity check below.
        config.spare_size = (
            self.file[object_id_end : object_id_end + page_size].find(spare_signature)
            + object_id_offset
            + ecc_offset
        )

        # Sanity check the spare size, make sure it looks legit
        if config.spare_size not in VALID_SPARE_SIZES:
            raise InvalidInputFormat(
                f"Auto-detection failed: Detected an unlikely spare size: {config.spare_size}"
            )

        return config

    def insert_entry(self, entry: YAFFSEntry):
        """Add ``entry`` to the tree, or refresh the node sharing its object ID.

        Entries whose parent is unknown or is not a directory are dropped with
        a warning. An entry that is its own parent becomes a parentless node.
        """
        duplicate_node = self.get_entry(entry.object_id)
        if duplicate_node is not None:
            # a header chunk with the same object ID already exists
            # in the tree, meaning the file metadata were modified,
            # or the file got truncated / rewritten.
            # Given that YAFFS is a log filesystem, whichever chunk comes
            # last takes precedence.
            self.file_entries.update_node(str(entry.object_id), data=entry)
            return

        if entry.object_id == entry.parent_obj_id:
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
            )
        else:
            parent_node = self.get_entry(entry.parent_obj_id)
            if parent_node is None:
                logger.warning("Trying to insert an orphaned entry.", entry=entry)
                return
            if parent_node.object_type != YaffsObjectType.DIRECTORY:
                logger.warning(
                    "Trying to insert an entry with non-directory parent.", entry=entry
                )
                return
            self.file_entries.create_node(
                str(entry.object_id),
                str(entry.object_id),
                data=entry,
                parent=str(entry.parent_obj_id),
            )

    def get_entry(self, object_id: int) -> YAFFSEntry | None:
        """Return the entry stored for ``object_id``, or None if there is none."""
        try:
            entry = self.file_entries.get_node(str(object_id))
            if entry:
                return entry.data
        except NodeIDAbsentError:
            logger.warning(
                "Can't find entry within the YAFFS tree, something's wrong.",
                object_id=object_id,
            )
        return None

    def resolve_path(self, entry: YAFFSEntry) -> Path:
        """Return the path of ``entry``, built by walking up to the tree root."""
        resolved_path = Path(entry.name)
        if self.file_entries.parent(str(entry.object_id)) is not None:
            parent_entry = self.file_entries[str(entry.parent_obj_id)].data
            return self.resolve_path(parent_entry).joinpath(resolved_path)
        return resolved_path

    def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
        """Yield the content of ``entry``, one chunk at a time."""
        for chunk in self.get_chunks(entry.object_id):
            # never read past the page, whatever the tags claim
            byte_count = min(chunk.byte_count, self.config.page_size)
            yield self.file[chunk.offset : chunk.offset + byte_count]

    def extract(self, fs: FileSystem):
        """Write every entry recorded by ``parse(store=True)`` through ``fs``.

        The tree is traversed depth first, parents before their children.
        """
        for node_id in self.file_entries.expand_tree(mode=Tree.DEPTH):
            node = self.file_entries.get_node(node_id)
            if node is None or node.data is None:
                continue
            self.extract_entry(node.data, fs)

    def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
        """Create the file system object described by ``entry`` through ``fs``.

        Entries that cannot be materialised (unknown type, special object
        outside of YAFFS2, hard link to a missing object) are skipped with a
        warning so that the rest of the image still gets extracted.
        """
        if entry.object_type == YaffsObjectType.UNKNOWN:
            logger.warning("unknown entry type", entry=entry)
            return

        out_path = self.resolve_path(entry)

        if entry.object_type == YaffsObjectType.SPECIAL:
            if not isinstance(entry, YAFFS2Entry):
                logger.warning("non YAFFS2 special object", entry=entry)
                return

            fs.mknod(out_path, entry.st_mode, entry.st_rdev)
        elif entry.object_type == YaffsObjectType.DIRECTORY:
            fs.mkdir(out_path, exist_ok=True)
        elif entry.object_type == YaffsObjectType.FILE:
            fs.write_chunks(out_path, self.get_file_chunks(entry))
        elif entry.object_type == YaffsObjectType.SYMLINK:
            fs.create_symlink(src=Path(entry.alias), dst=out_path)
        elif entry.object_type == YaffsObjectType.HARDLINK:
            # The target may be absent (e.g. it was dropped as an orphan);
            # looking it up directly in the tree would abort the extraction.
            dst_entry = self.get_entry(entry.equiv_id)
            if dst_entry is None:
                logger.warning("hardlink to a missing object", entry=entry)
                return
            dst_path = self.resolve_path(dst_entry)
            fs.create_hardlink(src=dst_path, dst=out_path)
506
class YAFFS2Parser(YAFFSParser):
    """Parser for YAFFS2 images (variable page / spare sizes)."""

    HEADER_STRUCT = "yaffs2_obj_hdr_t"

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS2Chunk:
        """Decode the YAFFS2 packed tags found in ``spare``.

        ``offset`` is the file offset of the page the spare area belongs to.
        """
        # images built without ECC have two superfluous bytes before the chunk ID.
        if not self.config.ecc:
            # adding two null bytes at the end only works if it's LE
            spare = spare[2:] + b"\x00\x00"

        yaffs2_packed_tags = _STRUCT_PARSER.parse(
            "yaffs2_packed_tags_t", spare, self.config.endianness
        )
        logger.debug(
            "yaffs2_packed_tags_t",
            yaffs2_packed_tags=yaffs2_packed_tags,
            config=self.config,
            _verbosity=3,
        )

        return YAFFS2Chunk(
            offset=offset,
            chunk_id=yaffs2_packed_tags.chunk_id,
            seq_number=yaffs2_packed_tags.seq_number,
            byte_count=yaffs2_packed_tags.byte_count,
            object_id=yaffs2_packed_tags.object_id,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFS2Entry from a parsed ``yaffs2_obj_hdr_t`` and its tags.

        The object ID comes from the chunk tags; everything else comes from
        the header.
        """
        return YAFFS2Entry(
            object_id=chunk.object_id,
            object_type=header.type,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[:-1]).decode("utf-8"),
            chksum=header.chksum,
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
            equiv_id=header.equiv_id,
            # 0xFF bytes (erased flash) are removed before decoding the alias
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            st_rdev=header.st_rdev,
            win_ctime=header.win_ctime,
            win_mtime=header.win_mtime,
            inband_shadowed_obj_id=header.inband_shadowed_obj_id,
            inband_is_shrink=header.inband_is_shrink,
            reserved=header.reserved,
            shadows_obj=header.shadows_obj,
            is_shrink=header.is_shrink,
            filehead=YAFFSFileVar(
                file_size=header.filehead.file_size,
                stored_size=header.filehead.stored_size,
                shrink_size=header.filehead.shrink_size,
                top_level=header.filehead.top_level,
            ),
            file_size=decode_file_size(header.file_size_high, header.file_size_low),
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS2Chunk]:
        """Return a filtered and ordered list of chunks.

        Chunks are yielded by increasing chunk ID; when several chunks share
        a chunk ID only the most recently written one is kept.
        """
        # The Yaffs2 sequence number is not the same as the Yaffs1 serial number!

        # As each block is allocated, the file system's
        # sequence number is incremented and each chunk in the block is marked with that
        # sequence number. The sequence number thus provides a way of organising the log in
        # chronological order.

        # note: there is no deletion marker in YAFFS2, obsolete chunks can only
        # be told apart by comparing them with the other chunks sharing the
        # same obj_id:chunk_id pair.

        def chunk_id_of(chunk: YAFFS2Chunk) -> int:
            return chunk.chunk_id

        def write_order(chunk: YAFFS2Chunk) -> tuple[int, int]:
            # Chunks of a given block share their sequence number and are
            # written sequentially, so the offset breaks the ties.
            return (chunk.seq_number, chunk.offset)

        ordered = sorted(self.data_chunks[object_id], key=chunk_id_of)
        # groupby() must use the same key as the sort, otherwise every chunk
        # ends up in its own group and obsolete chunks are not filtered out.
        for _, candidates in itertools.groupby(ordered, key=chunk_id_of):
            yield max(candidates, key=write_order)

    def init_tree(self):
        """Insert the root directory (object ID 1) before parsing starts."""
        # YAFFS2 do not store the root in file.
        root = YAFFS2Entry(
            object_type=YaffsObjectType.DIRECTORY,
            object_id=1,
            parent_obj_id=1,
        )
        self.insert_entry(root)
594
595
class YAFFS1Parser(YAFFSParser):
    """Parser for YAFFS1 images (512 bytes pages followed by 16 bytes spares)."""

    HEADER_STRUCT = "yaffs1_obj_hdr_t"

    def __init__(self, file: File, config: YAFFSConfig | None = None):
        """Create a YAFFS1 parser reading from the current position of ``file``.

        Unless an explicit ``config`` is given, the fixed YAFFS1 geometry is
        used and the endianness is derived from the first four bytes.
        """
        # from https://yaffs.net/archives/yaffs-development-notes: currently each chunk
        # is the same size as a NAND flash page (ie. 512 bytes + 16 byte spare).
        # In the future we might decide to allow for different chunk sizes.
        if config is None:
            config = YAFFSConfig(
                page_size=YAFFS1_PAGE_SIZE,
                spare_size=YAFFS1_SPARE_SIZE,
                endianness=get_endian_multi(file, BIG_ENDIAN_MAGICS),
                ecc=False,
            )
        super().__init__(file, config)

    def build_chunk(self, spare: bytes, offset: int) -> YAFFS1Chunk:
        """Decode the YAFFS1 tags found in ``spare``.

        The eight tag bytes are scattered over the spare area; they are put
        back together before being parsed as ``yaffs1_packed_tags_t``.
        ``offset`` is the file offset of the page the spare area belongs to.
        """
        yaffs_sparse = _STRUCT_PARSER.parse(
            "yaffs_spare_t", spare, self.config.endianness
        )

        yaffs_packed_tags = _STRUCT_PARSER.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            self.config.endianness,
        )

        return YAFFS1Chunk(
            offset=offset,
            chunk_id=yaffs_packed_tags.chunk_id,
            serial=yaffs_packed_tags.serial,
            byte_count=yaffs_packed_tags.byte_count,
            object_id=yaffs_packed_tags.object_id,
            ecc=yaffs_packed_tags.ecc,
            page_status=yaffs_sparse.page_status,
            block_status=yaffs_sparse.block_status,
        )

    def build_entry(self, header, chunk: YAFFSChunk) -> YAFFSEntry:
        """Build a YAFFSEntry from a parsed ``yaffs1_obj_hdr_t`` and its tags.

        The object ID comes from the chunk tags; everything else comes from
        the header.
        """
        return YAFFSEntry(
            object_type=header.type,
            object_id=chunk.object_id,
            parent_obj_id=header.parent_obj_id,
            sum_no_longer_used=header.sum_no_longer_used,
            name=snull(header.name[0:128]).decode("utf-8"),
            # 0xFF bytes (erased flash) are removed before decoding the alias
            alias=snull(header.alias.replace(b"\xff", b"")).decode("utf-8"),
            file_size=header.file_size,
            equiv_id=header.equivalent_object_id,
            # ownership, mode and timestamps, as done for YAFFS2 entries
            st_mode=header.st_mode,
            st_uid=header.st_uid,
            st_gid=header.st_gid,
            st_atime=header.st_atime,
            st_mtime=header.st_mtime,
            st_ctime=header.st_ctime,
        )

    def get_chunks(self, object_id: int) -> Iterable[YAFFS1Chunk]:
        """Return a filtered and ordered list of chunks.

        Chunks are yielded by increasing chunk ID; when several chunks share
        a chunk ID only the most recent one is kept.
        """
        # YAFFS1 chunks have a serial number that is used to track
        # which chunk takes precedence if two chunks have the same
        # identifier. This is used in scenarios like power loss
        # during a copy operation.

        def chunk_id_of(chunk: YAFFS1Chunk) -> int:
            return chunk.chunk_id

        ordered = sorted(self.data_chunks[object_id], key=chunk_id_of)
        # groupby() must use the same key as the sort, otherwise every chunk
        # ends up in its own group and duplicates are not filtered out.
        for _, candidates in itertools.groupby(ordered, key=chunk_id_of):
            # The sort is stable, candidates are still in file order.
            newest = next(candidates)
            for candidate in candidates:
                # serial is a 2-bit counter that wraps around (3 -> 0), so the
                # values cannot simply be compared: a chunk supersedes the
                # current one only if its serial is the next one, modulo 4.
                if ((newest.serial + 1) & 3) == candidate.serial:
                    newest = candidate
            yield newest
673
674
def is_yaffs_v1(file: File, start_offset: int) -> bool:
    """Tell whether the image starting at ``start_offset`` looks like YAFFS1.

    The spare area expected right after the first 512 bytes page is decoded
    as YAFFS1 tags; the image is considered to be YAFFS1 when these tags
    describe the first header chunk (chunk ID 0, serial 0, object ID 1).

    The file position is set back to ``start_offset`` before returning, even
    if the spare area cannot be parsed.
    """
    # Slicing uses absolute offsets: the magic has to be read at the start of
    # the image, which is not necessarily the start of the file.
    magic = file[start_offset : start_offset + 4]
    if magic in (b"\x03\x00\x00\x00", b"\x01\x00\x00\x00"):
        endian = Endian.LITTLE
    else:
        endian = Endian.BIG

    try:
        file.seek(start_offset + YAFFS1_PAGE_SIZE, io.SEEK_SET)
        spare = file.read(YAFFS1_SPARE_SIZE)

        yaffs_sparse = _STRUCT_PARSER.parse("yaffs_spare_t", spare, endian)

        # the eight tag bytes are scattered over the spare area
        yaffs_packed_tags = _STRUCT_PARSER.parse(
            "yaffs1_packed_tags_t",
            bytes(
                [
                    yaffs_sparse.tag_b0,
                    yaffs_sparse.tag_b1,
                    yaffs_sparse.tag_b2,
                    yaffs_sparse.tag_b3,
                    yaffs_sparse.tag_b4,
                    yaffs_sparse.tag_b5,
                    yaffs_sparse.tag_b6,
                    yaffs_sparse.tag_b7,
                ]
            ),
            endian,
        )
    finally:
        file.seek(start_offset, io.SEEK_SET)

    return (
        yaffs_packed_tags.chunk_id == 0
        and yaffs_packed_tags.serial == 0
        and yaffs_packed_tags.object_id == 1
    )
708
709
def instantiate_parser(file: File, start_offset: int = 0) -> YAFFSParser:
    """Return the parser matching the YAFFS version found at ``start_offset``."""
    parser_class = YAFFS1Parser if is_yaffs_v1(file, start_offset) else YAFFS2Parser
    return parser_class(file)
714
715
class YAFFSExtractor(Extractor):
    """Extractor unpacking a YAFFS image into a directory."""

    def extract(self, inpath: Path, outdir: Path):
        """Extract the image stored at ``inpath`` below ``outdir``.

        Returns an ``ExtractResult`` carrying the problems reported by the
        ``FileSystem`` helper during the extraction.
        """
        # The file is opened as a context manager so that it gets closed once
        # the extraction is over, including when parsing fails.
        with File.from_path(inpath) as infile:
            parser = instantiate_parser(infile)
            parser.parse(store=True)
            fs = FileSystem(outdir)
            parser.extract(fs)
        return ExtractResult(reports=fs.problems)
724
725
class YAFFSHandler(Handler):
    """Handler locating YAFFS1 / YAFFS2 images and delimiting their extent."""

    NAME = "yaffs"

    # Each pattern is the beginning of an object header: the object type, a
    # parent object ID of 1, then the 0xFFFF found in the unused name checksum.
    PATTERNS = [
        HexString(
            "03 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in little endian"
        ),
        HexString(
            "01 00 00 00 01 00 00 00 ff ff // YAFFS_OBJECT_TYPE_FILE in little endian"
        ),
        HexString(
            "00 00 00 03 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_DIRECTORY in big endian"
        ),
        HexString(
            "00 00 00 01 00 00 00 01 ff ff // YAFFS_OBJECT_TYPE_FILE in big endian"
        ),
    ]

    EXTRACTOR = YAFFSExtractor()

    DOC = HandlerDoc(
        name="YAFFS",
        description="YAFFS (Yet Another Flash File System) is a log-structured file system designed for NAND flash memory, storing data in fixed-size chunks with associated metadata. It supports features like wear leveling, error correction, and efficient handling of power loss scenarios.",
        handler_type=HandlerType.FILESYSTEM,
        vendor=None,
        references=[
            Reference(
                title="YAFFS Documentation",
                url="https://yaffs.net/",
            ),
            Reference(
                title="YAFFS Wikipedia",
                url="https://en.wikipedia.org/wiki/YAFFS",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Return the chunk covered by the YAFFS image found at ``start_offset``.

        The image is parsed without storing anything, only to find out where
        it ends; errors raised by the parser (e.g. ``InvalidInputFormat``
        when no entry is found) are propagated to the caller.
        """
        parser = instantiate_parser(file, start_offset)
        parser.parse()
        # skip 0xFF padding
        # (read_until_past presumably moves the file position past the run of
        # 0xFF bytes that follows the parsed data; the position it leaves is
        # used as the end of the chunk)
        file.seek(parser.end_offset, io.SEEK_SET)
        read_until_past(file, b"\xff")
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())