import enum
import functools
import hashlib
import io
import math
import mmap
import os
import re
import shutil
import struct
import sys
import unicodedata
from collections.abc import Iterable, Iterator
from pathlib import Path
from typing import Literal, Optional, Protocol, Union, overload

from dissect.cstruct import cstruct
from structlog import get_logger

from .logging import format_hex
from .report import (
    ExtractionProblem,
    LinkExtractionProblem,
    PathTraversalProblem,
    Report,
    SpecialFileExtractionProblem,
)

DEFAULT_BUFSIZE = shutil.COPY_BUFSIZE  # type: ignore
logger = get_logger()


def is_safe_path(basedir: Path, path: Path) -> bool:
    try:
        basedir.joinpath(path).resolve().relative_to(basedir.resolve())
    except ValueError:
        return False
    return True

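# Illustrative behaviour (the paths below are made up for the example):
#   is_safe_path(Path("/extract"), Path("firmware/rootfs.img"))  -> True
#   is_safe_path(Path("/extract"), Path("../../etc/passwd"))     -> False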

class SeekError(ValueError):
    """Specific ValueError for File.seek."""


class File(mmap.mmap):
    access: int

    @classmethod
    def from_bytes(cls, content: Union[bytes, bytearray]):
        if not content:
            raise ValueError("Can't create File from empty bytes.")
        m = cls(-1, len(content))
        m.write(content)
        m.seek(0)
        m.access = mmap.ACCESS_WRITE
        return m

    @classmethod
    def from_path(cls, path: Path, access=mmap.ACCESS_READ):
        """Create a File mapped from the file at path.

        Needs a valid, non-empty file;
        raises ValueError on empty files.
        """
        mode = "r+b" if access == mmap.ACCESS_WRITE else "rb"
        with path.open(mode) as base_file:
            m = cls(base_file.fileno(), 0, access=access)
            m.access = access
            return m

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:  # pyright: ignore[reportIncompatibleMethodOverride]
        try:
            super().seek(pos, whence)
        except ValueError as e:
            raise SeekError from e
        return self.tell()

    def size(self) -> int:
        size = 0
        try:
            size = super().size()
        except OSError:
            # the file was built with from_bytes(), so it is not backed by a file
            # on disk, and the fstat() call behind mmap.size() raises OSError
            current_offset = self.tell()
            self.seek(0, io.SEEK_END)
            size = self.tell()
            self.seek(current_offset, io.SEEK_SET)

        return size

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def readable(self) -> bool:
        return self.access in (mmap.ACCESS_READ, mmap.ACCESS_COPY)

    def writable(self) -> bool:
        return self.access in (mmap.ACCESS_WRITE, mmap.ACCESS_COPY)

    if sys.version_info < (3, 13):

        def seekable(self) -> Literal[True]:
            return True  # Memory-mapped files are always seekable

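# Minimal usage sketch (values are illustrative):
#   with File.from_bytes(b"hello") as f:
#       f.read(2)   # b"he"
#       f.size()    # 5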

class OffsetFile:
    def __init__(self, file: File, offset: int):
        self._file = file
        self._offset = offset
        self._file.seek(offset)

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:
        if whence == os.SEEK_SET:
            pos += self._offset
        self._file.seek(pos, whence)
        return self._file.tell() - self._offset

    def read(self, n=None):
        return self._file.read(n)

    def tell(self):
        return self._file.tell() - self._offset

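# OffsetFile exposes a window into an existing File: position 0 of the wrapper
# maps to `offset` in the underlying file. Sketch (illustrative):
#   of = OffsetFile(File.from_bytes(b"HEADERpayload"), 6)
#   of.read(7)  # b"payload"
#   of.tell()   # 7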

class InvalidInputFormat(Exception):
    pass


class Endian(enum.Enum):
    LITTLE = "<"
    BIG = ">"


def iterbits(file: File) -> Iterator[int]:
    """Read the file bit by bit, yielding the most significant bit of each byte first."""
    while cur_bytes := file.read(DEFAULT_BUFSIZE):
        for b in cur_bytes:
            for i in range(7, -1, -1):
                yield (b >> i) & 1

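# Bit order sketch: a single 0b1010_0000 byte yields 1, 0, 1, 0, 0, 0, 0, 0
# (most significant bit first):
#   list(iterbits(File.from_bytes(bytes([0b1010_0000]))))  # [1, 0, 1, 0, 0, 0, 0, 0]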

def snull(content: bytes):
    """Strip null bytes from the end of the content."""
    return content.rstrip(b"\x00")


def round_down(size: int, alignment: int):
    """Round down size to the alignment boundary."""
    return alignment * math.floor(size / alignment)


def round_up(size: int, alignment: int):
    """Round up size to the alignment boundary."""
    return alignment * math.ceil(size / alignment)

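# e.g. round_down(0x1234, 0x1000) == 0x1000 and round_up(0x1234, 0x1000) == 0x2000;
# values already on the boundary are returned unchanged.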

def convert_int8(value: bytes, endian: Endian) -> int:
    """Convert a 1-byte integer to a Python int."""
    try:
        return struct.unpack(f"{endian.value}B", value)[0]
    except struct.error as exc:
        raise InvalidInputFormat from exc


def convert_int16(value: bytes, endian: Endian) -> int:
    """Convert a 2-byte integer to a Python int."""
    try:
        return struct.unpack(f"{endian.value}H", value)[0]
    except struct.error as exc:
        raise InvalidInputFormat from exc


def convert_int32(value: bytes, endian: Endian) -> int:
    """Convert a 4-byte integer to a Python int."""
    try:
        return struct.unpack(f"{endian.value}I", value)[0]
    except struct.error as exc:
        raise InvalidInputFormat from exc


def convert_int64(value: bytes, endian: Endian) -> int:
    """Convert an 8-byte integer to a Python int."""
    try:
        return struct.unpack(f"{endian.value}Q", value)[0]
    except struct.error as exc:
        raise InvalidInputFormat from exc


def decode_int(value, base: int) -> int:
    try:
        return int(value, base)
    except ValueError as exc:
        raise InvalidInputFormat from exc


def decode_multibyte_integer(data: Union[bytes, bytearray]) -> tuple[int, int]:
    """Decode a multibyte integer into its encoded size (in bytes) and its value.

    Multibyte integers of static length are stored in little endian byte order.

    When smaller values are more likely than bigger values (for example file sizes),
    multibyte integers are encoded in a variable-length representation:

    - Numbers in the range [0, 127] are copied as is, and take one byte of space.
    - Bigger numbers occupy two or more bytes; all but the last byte of the
      multibyte representation have the highest (eighth) bit set.
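
    For example (illustrative): the two-byte sequence 0x80 0x01 decodes to
    (2, 128), since the first byte contributes its low seven bits (zero) and
    signals continuation, while the second byte contributes 1 << 7.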
    """
    value = 0
    for size, byte in enumerate(data):
        value |= (byte & 0x7F) << (size * 7)
        if not byte & 0x80:
            return (size + 1, value)
    raise InvalidInputFormat("Multibyte integer decoding failed.")


def iterate_patterns(
    file: File, pattern: bytes, chunk_size: int = 0x1000
) -> Iterator[int]:
    """Iterate over the file, searching for pattern until all occurrences have been found.

    The file pointer is reset to its initial position before every yield and
    when the iterator is exhausted, so callers must not rely on it being moved.
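
    For example (illustrative), with a File built from b"abcabc",
    list(iterate_patterns(file, b"abc")) yields [0, 3].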
    """
    if chunk_size < len(pattern):
        chunk_hex = format_hex(chunk_size)
        raise ValueError(
            f"Chunk size ({chunk_hex}) shouldn't be shorter than the pattern's ({pattern}) length ({len(pattern)})!"
        )

    initial_position = file.tell()

    compensation = len(pattern) - 1
    try:
        while True:
            current_position = file.tell()

            # Overlap consecutive chunks by `compensation` bytes (see the seek
            # below), so that a pattern straddling a chunk boundary is still found.
            data = file.read(chunk_size)
            if data == b"":
                # We've reached the end of the stream.
                return

            if len(data) < len(pattern):
                # The remaining data is shorter than the pattern,
                # so the pattern can not occur in it.
                return

            marker = data.find(pattern)
            while marker != -1:
                found_pos = current_position + marker
                # Reset the file pointer so that calling code cannot
                # depend on the side-effect of this iterator advancing
                # it.
                file.seek(initial_position)
                yield found_pos
                # We want to seek past the found position to the next byte,
                # so we can call find again without an extra seek.
                # This might seek past the actual end of the file.
                file.seek(found_pos + len(pattern))
                marker = data.find(pattern, marker + len(pattern))

            file.seek(-compensation, os.SEEK_CUR)
    finally:
        file.seek(initial_position)


class RandomReader(Protocol):
    # File implements this interface

    @overload
    def read(self) -> bytes: ...
    @overload
    def read(self, n: int, /) -> bytes: ...
    def seek(self, pos: int, /, whence: int = io.SEEK_SET) -> int: ...


def iterate_file(
    file: RandomReader,
    start_offset: int,
    size: int,
    # default copy buffer size used by shutil on Unix-based systems
    buffer_size: int = DEFAULT_BUFSIZE,
) -> Iterator[bytes]:
    if buffer_size <= 0:
        raise ValueError(
            "The file needs to be read up to a specific size, so buffer_size must be greater than 0"
        )

    read_bytes = 0
    file.seek(start_offset)
    file_read = file.read
    while read_bytes < size:
        remaining = size - read_bytes
        buffer_size = min(remaining, buffer_size)
        read_bytes += buffer_size
        data = file_read(buffer_size)

        if data == b"":
            # We've reached the end of the stream.
            break

        yield data

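# Sketch (illustrative): reading 8 bytes from offset 4 in 3-byte buffers yields
# chunks of 3, 3 and 2 bytes:
#   b"".join(iterate_file(file, start_offset=4, size=8, buffer_size=3))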

def carve(carve_path: Path, file: RandomReader, start_offset: int, size: int):
    """Extract part of a file."""
    carve_path.parent.mkdir(parents=True, exist_ok=True)

    with carve_path.open("xb") as f:
        for data in iterate_file(file, start_offset, size):
            f.write(data)


def stream_scan(scanner, file: File):
    """Scan the whole file in DEFAULT_BUFSIZE increments using Hyperscan's streaming mode."""
    scanner.scan(file, DEFAULT_BUFSIZE)


class StructParser:
    """Wrapper around dissect.cstruct that supports parsing with either endianness dynamically."""

    def __init__(self, definitions: str):
        self._definitions = definitions
        self.__cparser_le = None
        self.__cparser_be = None

    @property
    def cparser_le(self):
        if self.__cparser_le is None:
            # Default endianness is little
            self.__cparser_le = cstruct()
            self.__cparser_le.load(self._definitions)
        return self.__cparser_le

    @property
    def cparser_be(self):
        if self.__cparser_be is None:
            self.__cparser_be = cstruct(endian=">")
            self.__cparser_be.load(self._definitions)
        return self.__cparser_be

    def parse(
        self,
        struct_name: str,
        file: Union[File, bytes],
        endian: Endian,
    ):
        cparser = self.cparser_le if endian is Endian.LITTLE else self.cparser_be
        struct_parser = getattr(cparser, struct_name)
        return struct_parser(file)

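# Usage sketch (the struct definition below is hypothetical, not part of this module):
#   parser = StructParser("struct header { uint32 magic; uint32 size; };")
#   header = parser.parse("header", file, Endian.BIG)
#   header.size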

def get_endian(file: File, big_endian_magic: int) -> Endian:
    """Read a four-byte magic and derive the endianness from it.

    Compares the read bytes with the big-endian magic, then seeks back
    by the number of bytes read.
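
    For example (illustrative), a file starting with the bytes de ad be ef and
    big_endian_magic=0xDEADBEEF gives Endian.BIG, while a file starting with
    ef be ad de gives Endian.LITTLE.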
    """
    if big_endian_magic > 0xFF_FF_FF_FF:
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    magic_bytes = file.read(4)
    file.seek(-len(magic_bytes), io.SEEK_CUR)
    magic = convert_int32(magic_bytes, Endian.BIG)
    return Endian.BIG if magic == big_endian_magic else Endian.LITTLE


def get_endian_short(file: File, big_endian_magic: int) -> Endian:
    """Read a two-byte magic and derive the endianness from it.

    Compares the read bytes with the big-endian magic, then seeks back
    by the number of bytes read.
    """
    if big_endian_magic > 0xFF_FF:
        raise ValueError("big_endian_magic is larger than a 16 bit integer.")
    magic_bytes = file.read(2)
    file.seek(-len(magic_bytes), io.SEEK_CUR)
    magic = convert_int16(magic_bytes, Endian.BIG)
    return Endian.BIG if magic == big_endian_magic else Endian.LITTLE


def get_endian_multi(file: File, big_endian_magics: list[int]) -> Endian:
    """Read a four-byte magic and derive the endianness from it.

    Compares the read bytes with each of the big-endian magics,
    then seeks back by the number of bytes read.
    """
    if any(big_endian_magic > 0xFF_FF_FF_FF for big_endian_magic in big_endian_magics):
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    magic_bytes = file.read(4)
    file.seek(-len(magic_bytes), io.SEEK_CUR)
    magic = convert_int32(magic_bytes, Endian.BIG)
    return (
        Endian.BIG
        if any((magic == big_endian_magic) for big_endian_magic in big_endian_magics)
        else Endian.LITTLE
    )


def read_until_past(file: File, pattern: bytes):
    """Read until a byte that is not in pattern (typically 0x00 or 0xff padding) is found."""
    while True:
        next_byte = file.read(1)
        if next_byte == b"":
            # We've hit the EOF
            return file.tell()
        if next_byte not in pattern:
            return file.tell() - 1

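# e.g. with file contents b"\x00\x00\xffDATA" and pattern b"\x00\xff",
# read_until_past(file, b"\x00\xff") returns 3, the offset of b"D".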

def chop_root(path: Path):
    """Make absolute paths relative by chopping off the root."""
    if not path.is_absolute():
        return path

    relative_parts = list(path.parts[1:])
    return Path("/".join(relative_parts))


def make_lost_and_found_path(path: Path) -> Path:
    """Make a human-readable, safe path."""
    dir_path = path.parent

    # "." and ".." are not valid filenames and would only lead to confusion
    filename = {".": "dot", "..": "dot-dot"}.get(path.name, path.name)
    dir_hash = hashlib.sha224(str(dir_path).encode(errors="ignore")).hexdigest()

    # adapted from https://stackoverflow.com/questions/5574042/string-slugification-in-python
    dir_slug = str(dir_path)
    dir_slug = unicodedata.normalize("NFKD", dir_slug)
    dir_slug = dir_slug.encode("ascii", "ignore").lower()
    dir_slug = re.sub(rb"[^a-z0-9]+", b"-", dir_slug).strip(b"-")
    dir_slug = re.sub(rb"[-]+", b"-", dir_slug).decode()

    return Path(f".unblob-lost+found/{dir_slug}_{dir_hash}/{filename}")

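# Illustrative result (hash elided): make_lost_and_found_path(Path("/../../etc/passwd"))
# gives Path(".unblob-lost+found/etc_<sha224 of '/../../etc'>/passwd")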

class _FSPath:
    def __init__(self, *, root: Path, path: Path) -> None:
        self.root = root
        self.relative_path = chop_root(path)
        absolute_path = root / self.relative_path
        self.is_safe = is_safe_path(root, absolute_path)

        if self.is_safe:
            self.safe_relative_path = self.relative_path
            self.absolute_path = absolute_path
        else:
            self.safe_relative_path = make_lost_and_found_path(path)
            self.absolute_path = root / self.safe_relative_path
            assert is_safe_path(root, self.absolute_path)


class _FSLink:
    def __init__(self, *, root: Path, src: Path, dst: Path) -> None:
        self.dst = _FSPath(root=root, path=dst)
        self.src = _FSPath(root=root, path=src)
        self.is_safe = self.dst.is_safe and self.src.is_safe

    def format_report(
        self, description, resolution="Skipped."
    ) -> LinkExtractionProblem:
        return LinkExtractionProblem(
            problem=description,
            resolution=resolution,
            path=str(self.dst.relative_path),
            link_path=str(self.src.relative_path),
        )


class FileSystem:
    """Restricts file system operations to a directory.

    Path traversal violations are collected as a list of ExtractionProblem reports
    and are not reported immediately; violating operations look successful to the caller.

    All input paths are interpreted as relative to the root directory.
    Absolute paths are converted to relative paths by dropping the root /.
    There is one exception to this universal base: symlink targets,
    which are relative to the directory containing the symbolic link, because
    this is how symlinks work.
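
    A usage sketch (paths and contents are illustrative):

        fs = FileSystem(Path("/tmp/extract"))
        fs.write_bytes(Path("/etc/passwd"), b"data")  # written to /tmp/extract/etc/passwd
        fs.problems  # collected ExtractionProblem reports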
    """

    problems: list[Report]

    def __init__(self, root: Path):
        self.root = root.resolve()
        self.problems = []

    def record_problem(self, problem: ExtractionProblem):
        self.problems.append(problem)
        problem.log_with(logger)

    @functools.cached_property
    def has_root_permissions(self):
        return os.geteuid() == 0

    def _fs_path(self, path: Path) -> _FSPath:
        return _FSPath(root=self.root, path=path)

    def _ensure_parent_dir(self, path: Path):
        path.parent.mkdir(parents=True, exist_ok=True)

    def _get_extraction_path(self, path: Path, path_use_description: str) -> Path:
        fs_path = self._fs_path(path)

        if not fs_path.is_safe:
            report = PathTraversalProblem(
                path=str(fs_path.relative_path),
                extraction_path=str(fs_path.safe_relative_path),
                problem=f"Potential path traversal through {path_use_description}",
                resolution="Redirected.",
            )
            self.record_problem(report)

        return fs_path.absolute_path

    def write_bytes(self, path: Path, content: bytes):
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_bytes")

        self._ensure_parent_dir(safe_path)
        safe_path.write_bytes(content)

    def write_chunks(self, path: Path, chunks: Iterable[bytes]):
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_chunks")

        self._ensure_parent_dir(safe_path)
        with safe_path.open("wb") as f:
            for chunk in chunks:
                f.write(chunk)

    def carve(self, path: Path, file: File, start_offset: int, size: int):
        logger.debug("carving file", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "carve")

        self._ensure_parent_dir(safe_path)
        carve(safe_path, file, start_offset, size)

    def mkdir(self, path: Path, *, mode=0o777, parents=False, exist_ok=False):
        logger.debug("creating directory", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkdir")

        safe_path.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)

    def mkfifo(self, path: Path, mode=0o666):
        logger.debug("creating fifo", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkfifo")

        self._ensure_parent_dir(safe_path)
        os.mkfifo(safe_path, mode=mode)

    def mknod(self, path: Path, mode=0o600, device=0):
        logger.debug("creating special file", special_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mknod")

        if self.has_root_permissions:
            self._ensure_parent_dir(safe_path)
            os.mknod(safe_path, mode=mode, device=device)
        else:
            problem = SpecialFileExtractionProblem(
                problem="Root privileges are required to create block and char devices.",
                resolution="Skipped.",
                path=str(path),
                mode=mode,
                device=device,
            )
            self.record_problem(problem)

    def _get_checked_link(self, src: Path, dst: Path) -> Optional[_FSLink]:
        link = _FSLink(root=self.root, src=src, dst=dst)
        if link.is_safe:
            return link

        self.record_problem(link.format_report("Potential path traversal through link"))
        return None

    def _path_to_root(self, from_dir: Path) -> Path:
        # This version does not look at the existing symlinks, so while it looks cleaner
        # it is also somewhat less precise:
        #
        #   os.path.relpath(self.root, start=self.root / chop_root(from_dir))
        #
        # In contrast, the version below looks like a kludge, but using .resolve() actually
        # calculates the correct path in more cases, even if it can still give a bad result
        # due to the ordering of symlink creation and resolve() defaulting to non-strict
        # checking. The calculation unfortunately might fall back to the potentially wrong
        # string interpretation, which is the same as os.path.relpath, sharing the same
        # failure case. Ultimately we cannot easily catch all symlink-based path traversals
        # here, so there still remains work for `unblob.extractor.fix_symlink()`.
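        #
        # Sketch (illustrative): with root /extract and from_dir /a/b, and assuming no
        # symlinks are involved, the result is equivalent to Path("../.."), i.e. two
        # levels up from the directory that will contain the link.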
        absolute_from_dir = (self.root / chop_root(from_dir)).resolve()
        ups = len(absolute_from_dir.parts) - len(self.root.parts)
        return Path("/".join(["."] + [".."] * ups))

    def create_symlink(self, src: Path, dst: Path):
        """Create a symlink at dst with the link/content/target src."""
        logger.debug("creating symlink", file_path=dst, link_target=src, _verbosity=3)

        if src.is_absolute():
            # Convert absolute paths to dst-relative paths.
            # These would point to the same place if self.root were the real root "/",
            # but this way they are relocatable.
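            # E.g. (illustrative): with dst=Path("lib/libfoo.so") and src=Path("/lib/libbar.so"),
            # src becomes Path("../lib/libbar.so"), which points at <root>/lib/libbar.so once
            # the link is created under <root>/lib/.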
            src = self._path_to_root(dst.parent) / chop_root(src)

        safe_link = self._get_checked_link(src=dst.parent / src, dst=dst)

        if safe_link:
            dst = safe_link.dst.absolute_path
            self._ensure_parent_dir(dst)
            dst.symlink_to(src)

    def create_hardlink(self, src: Path, dst: Path):
        """Create a new hardlink at dst to the existing file src."""
        logger.debug("creating hardlink", file_path=dst, link_target=src, _verbosity=3)
        safe_link = self._get_checked_link(src=src, dst=dst)

        if safe_link:
            try:
                src = safe_link.src.absolute_path
                dst = safe_link.dst.absolute_path
                self._ensure_parent_dir(dst)
                os.link(src, dst)
                # FIXME: from python 3.10 change the above to
                #   dst.hardlink_to(src)
                # so as to make it consistent with create_symlink
                # (see the Path.link_to vs Path.hardlink_to parameter order mix-up)
            except FileNotFoundError:
                self.record_problem(
                    safe_link.format_report("Hard link target does not exist.")
                )
            except PermissionError:
                not_enough_privileges = (
                    "Not enough privileges to create hardlink to block/char device."
                )
                self.record_problem(safe_link.format_report(not_enough_privileges))

    def open(
        self, path, mode: Literal["wb+", "rb+", "xb+"] = "wb+"
    ) -> io.BufferedRandom:
        """Create or open a binary file for random-access reading and writing.

        There is no intention to support anything other than binary files
        opened for random access.
        """
        logger.debug("create/open binary file for writing", file_path=path)
        safe_path = self._get_extraction_path(path, "open")

        self._ensure_parent_dir(safe_path)
        return safe_path.open(mode)

    def unlink(self, path):
        """Delete a file within the extraction path."""
        logger.debug("unlink file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "unlink")

        safe_path.unlink(missing_ok=True)