Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/file_utils.py: 62%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import enum
2import functools
3import hashlib
4import io
5import math
6import mmap
7import os
8import re
9import shutil
10import struct
11import sys
12import unicodedata
13from collections.abc import Iterable, Iterator
14from pathlib import Path
15from typing import Literal, Protocol, overload
17from dissect.cstruct import cstruct
18from structlog import get_logger
20from .logging import format_hex
21from .report import (
22 ExtractionProblem,
23 LinkExtractionProblem,
24 PathTraversalProblem,
25 Report,
26 SpecialFileExtractionProblem,
27)
# Buffer size used for chunked reads/writes throughout this module.
# shutil.COPY_BUFSIZE is a CPython implementation detail, hence the type: ignore.
DEFAULT_BUFSIZE = shutil.COPY_BUFSIZE  # type: ignore
logger = get_logger()
def is_safe_path(basedir: Path, path: Path) -> bool:
    """Tell whether *path*, joined onto *basedir*, stays inside *basedir*.

    Both sides are fully resolved (symlinks followed, ".." normalized), so
    a True result means the combination cannot escape the base directory.
    """
    try:
        combined = basedir.joinpath(path).resolve()
        combined.relative_to(basedir.resolve())
    except ValueError:
        # relative_to() raises ValueError when the resolved path
        # lies outside of basedir -> path traversal attempt.
        return False
    return True
class SeekError(ValueError):
    """Raised when :meth:`File.seek` is given an invalid position.

    Subclasses ValueError so existing ``except ValueError`` handlers
    still catch it.
    """
class File(mmap.mmap):
    """Memory-mapped file exposing a file-object-like read/seek/tell interface.

    Create instances via :meth:`from_bytes` or :meth:`from_path`; both
    require non-empty content, since an mmap cannot map zero bytes.
    """

    # The mmap.ACCESS_* constant the mapping was created with;
    # consulted by readable() / writable().
    access: int

    @classmethod
    def from_bytes(cls, content: bytes | bytearray):
        """Create an anonymous, writable mapping pre-filled with *content*.

        Raises ValueError for empty content.
        """
        if not content:
            raise ValueError("Can't create File from empty bytes.")
        # fd -1 -> anonymous mapping, sized exactly to the content.
        m = cls(-1, len(content))
        m.write(content)
        m.seek(0)
        m.access = mmap.ACCESS_WRITE
        m.madvise(mmap.MADV_SEQUENTIAL)
        return m

    @classmethod
    def from_path(cls, path: Path, access=mmap.ACCESS_READ):
        """Create File.

        Needs a valid non-empty file,
        raises ValueError on empty files.
        """
        mode = "r+b" if access == mmap.ACCESS_WRITE else "rb"
        with path.open(mode) as base_file:
            # Length 0 maps the whole file; the mapping stays valid
            # after the file object is closed by the with-block.
            m = cls(base_file.fileno(), 0, access=access)
            m.access = access
            m.madvise(mmap.MADV_SEQUENTIAL)
            return m

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:  # pyright: ignore[reportIncompatibleMethodOverride]
        """Seek like io objects do: return the new absolute position.

        Raises SeekError (a ValueError subclass) for invalid positions.
        """
        try:
            super().seek(pos, whence)  # pyright: ignore[reportArgumentType]
        except ValueError as e:
            raise SeekError from e
        return self.tell()

    def size(self) -> int:
        # Report the mapping length as the size.
        return len(self)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def readable(self) -> bool:
        return self.access in (mmap.ACCESS_READ, mmap.ACCESS_COPY)

    def writable(self) -> bool:
        return self.access in (mmap.ACCESS_WRITE, mmap.ACCESS_COPY)

    if sys.version_info < (3, 13):
        # The guard implies mmap provides seekable() itself from 3.13 on;
        # supply it for older interpreters.

        def seekable(self) -> Literal[True]:
            return True  # Memory-mapped files are always seekable
class OffsetFile:
    """A view of *file* in which position 0 maps to *offset* in the file.

    All positions accepted from and reported to callers are relative to
    the fixed offset; the underlying file pointer is moved accordingly.
    """

    def __init__(self, file: File, offset: int):
        self._file = file
        self._offset = offset
        self._file.seek(offset)

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:
        # Only absolute positions need shifting; SEEK_CUR/SEEK_END
        # requests pass through unmodified.
        target = pos + self._offset if whence == os.SEEK_SET else pos
        self._file.seek(target, whence)
        return self.tell()

    def read(self, n=None):
        """Read *n* bytes (or everything remaining) from the current position."""
        return self._file.read(n)

    def tell(self):
        """Current position, relative to the offset."""
        return self._file.tell() - self._offset
class InvalidInputFormat(Exception):
    """Raised when input data does not conform to the expected binary format."""
class Endian(enum.Enum):
    """Byte order; values double as struct/cstruct byte-order prefix characters."""

    LITTLE = "<"
    BIG = ">"
def iterbits(file: File) -> Iterator[int]:
    """Yield the bits of *file* one at a time, MSB-first within each byte."""
    while chunk := file.read(DEFAULT_BUFSIZE):
        for byte in chunk:
            for shift in reversed(range(8)):
                yield (byte >> shift) & 1
def snull(content: bytes):
    """Return *content* with all trailing NUL (0x00) bytes removed."""
    terminator = b"\x00"
    return content.rstrip(terminator)
def round_down(size: int, alignment: int):
    """Round down size to the alignment boundary.

    Uses pure integer arithmetic: the previous float-based
    ``math.floor(size / alignment)`` silently loses precision for
    values above 2**53.
    """
    return (size // alignment) * alignment
def round_up(size: int, alignment: int):
    """Round up size to the alignment boundary.

    Uses pure integer arithmetic (``-(-size // alignment)`` is integer
    ceiling division): the previous float-based ``math.ceil(size / alignment)``
    silently loses precision for values above 2**53.
    """
    return -(-size // alignment) * alignment
def convert_int8(value: bytes, endian: Endian) -> int:
    """Convert 1 byte integer to a Python int.

    The "B" format parses the byte as unsigned.  Raises
    InvalidInputFormat when *value* is not exactly 1 byte long.
    """
    fmt = f"{endian.value}B"
    try:
        (result,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result
def convert_int16(value: bytes, endian: Endian) -> int:
    """Convert 2 byte integer to a Python int.

    The "H" format parses the bytes as unsigned.  Raises
    InvalidInputFormat when *value* is not exactly 2 bytes long.
    """
    fmt = f"{endian.value}H"
    try:
        (result,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result
def convert_int32(value: bytes, endian: Endian) -> int:
    """Convert 4 byte integer to a Python int.

    The "I" format parses the bytes as unsigned.  Raises
    InvalidInputFormat when *value* is not exactly 4 bytes long.
    """
    fmt = f"{endian.value}I"
    try:
        (result,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result
def convert_int64(value: bytes, endian: Endian) -> int:
    """Convert 8 byte integer to a Python int.

    The "Q" format parses the bytes as unsigned.  Raises
    InvalidInputFormat when *value* is not exactly 8 bytes long.
    """
    fmt = f"{endian.value}Q"
    try:
        (result,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result
def decode_int(value, base: int) -> int:
    """Parse *value* as an integer in the given *base*.

    Parse failures surface as InvalidInputFormat instead of ValueError.
    """
    try:
        result = int(value, base)
    except ValueError as exc:
        raise InvalidInputFormat from exc
    return result
191def decode_multibyte_integer(data: bytes | bytearray) -> tuple[int, int]:
192 """Decode multi-bytes integer into integer size and integer value.
194 Multibyte integers of static length are stored in little endian byte order.
196 When smaller values are more likely than bigger values (for example file sizes),
197 multibyte integers are encoded in a variable-length representation:
198 - Numbers in the range [0, 127] are copied as is, and take one byte of space.
199 - Bigger numbers will occupy two or more bytes. All but the last byte of the multibyte
200 representation have the highest (eighth) bit set.
201 """
202 value = 0
203 for size, byte in enumerate(data):
204 value |= (byte & 0x7F) << (size * 7)
205 if not byte & 0x80:
206 return (size + 1, value)
207 raise InvalidInputFormat("Multibyte integer decoding failed.")
def iterate_patterns(
    file: File, pattern: bytes, chunk_size: int = 0x1000
) -> Iterator[int]:
    """Iterate on the file searching for pattern until all occurences has been found.

    Seek the file pointer to the next byte of where we found the pattern or
    seek back to the initial position when the iterator is exhausted.

    Yields the absolute offset of each occurrence.  Raises ValueError if
    chunk_size is smaller than the pattern (a match could never fit in a
    single chunk then).
    """
    if chunk_size < len(pattern):
        chunk_hex = format_hex(chunk_size)
        raise ValueError(
            f"Chunk size ({chunk_hex}) shouldn't be shorter than pattern's ({pattern}) length ({len(pattern)})!"
        )

    initial_position = file.tell()

    # Overlap between consecutive chunks, so a match straddling a chunk
    # boundary is still found by the next read.
    compensation = len(pattern) - 1
    try:
        while True:
            current_position = file.tell()

            # Prepend the padding from the last chunk, to make sure that we find the pattern,
            # even if it straddles the chunk boundary.
            data = file.read(chunk_size)
            if data == b"":
                # We've reached the end of the stream.
                return

            if len(data) < len(pattern):
                # The length that we read from the file is the same
                # length or less than as the pattern we're looking
                # for, and we didn't find the pattern in there.
                return

            marker = data.find(pattern)
            while marker != -1:
                found_pos = current_position + marker
                # Reset the file pointer so that calling code cannot
                # depend on the side-effect of this iterator advancing
                # it.
                file.seek(initial_position)
                yield found_pos
                # We want to seek past the found position to the next byte,
                # so we can call find_first again without extra seek
                # This might seek past the actual end of the file
                file.seek(found_pos + len(pattern))
                marker = data.find(pattern, marker + len(pattern))

            # Step back by the overlap before reading the next chunk.
            file.seek(-compensation, os.SEEK_CUR)
    finally:
        # Always restore the caller's original position, even when the
        # generator is closed early or an error occurs.
        file.seek(initial_position)
class RandomReader(Protocol):
    """Structural type for seekable, readable binary sources."""

    # File implements this interface

    @overload
    def read(self) -> bytes: ...
    @overload
    def read(self, n: int, /) -> bytes: ...
    def seek(self, pos: int, /, whence: int = io.SEEK_SET) -> int: ...
def iterate_file(
    file: RandomReader,
    start_offset: int,
    size: int,
    # default buffer size in shutil for unix based systems
    buffer_size: int = DEFAULT_BUFSIZE,
) -> Iterator[bytes]:
    """Yield up to *size* bytes of *file* from *start_offset*, in chunks.

    Stops early if the reader runs out of data before *size* bytes.
    Raises ValueError for non-positive buffer_size.
    """
    if buffer_size <= 0:
        raise ValueError(
            "The file needs to be read until a specific size, so buffer_size must be greater than 0"
        )

    file.seek(start_offset)
    file_read = file.read
    remaining = size
    while remaining > 0:
        data = file_read(min(remaining, buffer_size))

        if data == b"":
            # We've reached the end of the stream.
            break

        # Account for the bytes actually returned: a reader may legally
        # return fewer bytes than requested (short read).  The previous
        # accounting added the *requested* amount before reading, which
        # made short reads silently under-deliver data.
        remaining -= len(data)
        yield data
def carve(carve_path: Path, file: RandomReader, start_offset: int, size: int):
    """Extract part of a file.

    Copies *size* bytes starting at *start_offset* from *file* into a
    newly created file at *carve_path*.  Missing parent directories are
    created; an existing target is an error (exclusive "xb" mode).
    """
    carve_path.parent.mkdir(parents=True, exist_ok=True)

    with carve_path.open("xb") as target:
        for chunk in iterate_file(file, start_offset, size):
            target.write(chunk)
def stream_scan(scanner, file: File):
    """Scan the whole file by increment of DEFAULT_BUFSIZE using Hyperscan's streaming mode."""
    # NOTE(review): `scanner` is assumed to expose a hyperscan-style
    # `scan(data, chunk_size)` streaming API -- confirm against callers.
    scanner.scan(file, DEFAULT_BUFSIZE)
class StructParser:
    """Wrapper for dissect.cstruct to handle different endianness parsing dynamically.

    Parsers for each byte order are built lazily, on first use, from the
    same definition string.
    """

    def __init__(self, definitions: str):
        self._definitions = definitions
        self.__cparser_le = None
        self.__cparser_be = None

    @property
    def cparser_le(self):
        """Little-endian parser, created on first access."""
        if self.__cparser_le is None:
            # cstruct() defaults to little endian
            self.__cparser_le = cstruct()
            self.__cparser_le.load(self._definitions)
        return self.__cparser_le

    @property
    def cparser_be(self):
        """Big-endian parser, created on first access."""
        if self.__cparser_be is None:
            self.__cparser_be = cstruct(endian=">")
            self.__cparser_be.load(self._definitions)
        return self.__cparser_be

    def parse(
        self,
        struct_name: str,
        file: File | bytes,
        endian: Endian,
    ):
        """Parse *struct_name* from *file* with the parser matching *endian*."""
        if endian is Endian.LITTLE:
            cparser = self.cparser_le
        else:
            cparser = self.cparser_be
        return getattr(cparser, struct_name)(file)
def get_endian(file: File, big_endian_magic: int) -> Endian:
    """Read a four bytes magic and derive endianness from it.

    It compares the read data with the big endian magic and then seeks back
    the amount of read bytes.  Raises ValueError if the magic does not fit
    in 32 bits.
    """
    if big_endian_magic > 0xFF_FF_FF_FF:
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    magic_bytes = file.read(4)
    # Restore the stream position for the caller.
    file.seek(-len(magic_bytes), io.SEEK_CUR)
    read_magic = convert_int32(magic_bytes, Endian.BIG)
    if read_magic == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE
def get_endian_short(file: File, big_endian_magic: int) -> Endian:
    """Read a two bytes magic and derive endianness from it.

    It compares the read data with the big endian magic and then seeks back
    the amount of read bytes.  Raises ValueError if the magic does not fit
    in 16 bits.
    """
    if big_endian_magic > 0xFF_FF:
        raise ValueError("big_endian_magic is larger than a 16 bit integer.")
    magic_bytes = file.read(2)
    # Restore the stream position for the caller.
    file.seek(-len(magic_bytes), io.SEEK_CUR)
    read_magic = convert_int16(magic_bytes, Endian.BIG)
    if read_magic == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE
def get_endian_multi(file: File, big_endian_magics: list[int]) -> Endian:
    """Read a four bytes magic and derive endianness from it.

    It compares the read data with each of the big endian magics.  It
    reads four bytes and seeks back after that.  Raises ValueError if any
    magic does not fit in 32 bits.
    """
    if any(magic > 0xFF_FF_FF_FF for magic in big_endian_magics):
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    magic_bytes = file.read(4)
    # Restore the stream position for the caller.
    file.seek(-len(magic_bytes), io.SEEK_CUR)
    read_magic = convert_int32(magic_bytes, Endian.BIG)
    if read_magic in big_endian_magics:
        return Endian.BIG
    return Endian.LITTLE
def read_until_past(file: File, pattern: bytes):
    """Advance *file* past any leading run of bytes contained in *pattern*.

    Returns the offset of the first byte not in *pattern*, or the EOF
    position when the whole rest of the file matches.
    """
    while True:
        next_byte = file.read(1)
        if not next_byte:
            # Hit EOF while still inside the pattern run.
            return file.tell()
        if next_byte not in pattern:
            # We read one byte past the run; report the mismatch position.
            return file.tell() - 1
def chop_root(path: Path):
    """Make absolute paths relative by chopping off the root."""
    if path.is_absolute():
        # parts[0] is the root marker ("/"); rejoin everything after it.
        return Path("/".join(path.parts[1:]))
    return path
def make_lost_and_found_path(path: Path) -> Path:
    """Make a human readable, safe path.

    The result lives under ``.unblob-lost+found`` and combines an ASCII
    slug of the parent directory with its SHA-224 hash (for uniqueness),
    ending in the original file name.
    """
    dir_path = path.parent

    # . and .. would not be a valid filename, but they would lead to confusion
    filename = {".": "dot", "..": "dot-dot"}.get(path.name, path.name)
    dir_hash = hashlib.sha224(str(dir_path).encode(errors="ignore")).hexdigest()

    # adapted from https://stackoverflow.com/questions/5574042/string-slugification-in-python
    dir_slug = str(dir_path)
    dir_slug = unicodedata.normalize("NFKD", dir_slug)
    dir_slug = dir_slug.encode("ascii", "ignore").lower()
    dir_slug = re.sub(rb"[^a-z0-9]+", b"-", dir_slug).strip(b"-")
    dir_slug = re.sub(rb"[-]+", b"-", dir_slug).decode()

    # Bug fix: `filename` was computed above but never used -- the returned
    # path previously ended in a fixed placeholder, losing the original
    # file name entirely.
    return Path(f".unblob-lost+found/{dir_slug}_{dir_hash}/{filename}")
class _FSPath:
    """Resolve *path* against *root*, redirecting unsafe paths to lost+found."""

    def __init__(self, *, root: Path, path: Path) -> None:
        self.root = root
        self.relative_path = chop_root(path)
        candidate = root / self.relative_path
        self.is_safe = is_safe_path(root, candidate)

        if not self.is_safe:
            # Path traversal attempt: relocate under the lost+found area,
            # which is guaranteed to stay inside root.
            self.safe_relative_path = make_lost_and_found_path(path)
            self.absolute_path = root / self.safe_relative_path
            assert is_safe_path(root, self.absolute_path)
        else:
            self.safe_relative_path = self.relative_path
            self.absolute_path = candidate
class _FSLink:
    """A link's source and destination paths, both checked against *root*."""

    def __init__(self, *, root: Path, src: Path, dst: Path) -> None:
        self.dst = _FSPath(root=root, path=dst)
        self.src = _FSPath(root=root, path=src)
        # The link is only safe when neither endpoint escapes the root.
        self.is_safe = self.dst.is_safe and self.src.is_safe

    def format_report(
        self, description, resolution="Skipped."
    ) -> LinkExtractionProblem:
        """Build a LinkExtractionProblem describing this link."""
        return LinkExtractionProblem(
            problem=description,
            resolution=resolution,
            path=str(self.dst.relative_path),
            link_path=str(self.src.relative_path),
        )
class FileSystem:
    """Restricts file system operations to a directory.

    Path traversal violations are collected as a list of :ExtractionProblem:-s
    and not reported immediately - violating operations looks like successful for the caller.

    All input paths are interpreted as relative to the root directory.
    Absolute paths are converted to relative paths by dropping the root /.
    There is one exception to this universal base: symlink targets,
    which are relative to the directory containing the symbolic link, because
    this is how symlinks work.
    """

    # Problems recorded during extraction; inspected by callers afterwards.
    problems: list[Report]

    def __init__(self, root: Path):
        self.root = root.resolve()
        self.problems = []

    def record_problem(self, problem: ExtractionProblem):
        """Remember *problem* for later reporting and log it right away."""
        self.problems.append(problem)
        problem.log_with(logger)

    @functools.cached_property
    def has_root_permissions(self):
        # Cached: the effective uid does not change during extraction.
        return os.geteuid() == 0

    def _fs_path(self, path: Path) -> _FSPath:
        return _FSPath(root=self.root, path=path)

    def _ensure_parent_dir(self, path: Path):
        path.parent.mkdir(parents=True, exist_ok=True)

    def _get_extraction_path(self, path: Path, path_use_description: str) -> Path:
        """Map *path* to a safe absolute path, recording a problem on redirection."""
        fs_path = self._fs_path(path)

        if not fs_path.is_safe:
            report = PathTraversalProblem(
                path=str(fs_path.relative_path),
                extraction_path=str(fs_path.safe_relative_path),
                problem=f"Potential path traversal through {path_use_description}",
                resolution="Redirected.",
            )
            self.record_problem(report)

        return fs_path.absolute_path

    def write_bytes(self, path: Path, content: bytes):
        """Write *content* to *path* under the root."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_bytes")

        self._ensure_parent_dir(safe_path)
        safe_path.write_bytes(content)

    def write_chunks(self, path: Path, chunks: Iterable[bytes]):
        """Write the byte *chunks* sequentially to *path* under the root."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_chunks")

        self._ensure_parent_dir(safe_path)
        with safe_path.open("wb") as f:
            for chunk in chunks:
                f.write(chunk)

    def carve(self, path: Path, file: File, start_offset: int, size: int):
        """Carve *size* bytes of *file* from *start_offset* into *path*."""
        logger.debug("carving file", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "carve")

        self._ensure_parent_dir(safe_path)
        carve(safe_path, file, start_offset, size)

    def mkdir(self, path: Path, *, mode=0o777, parents=False, exist_ok=False):
        """Create directory *path* under the root."""
        logger.debug("creating directory", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkdir")

        # Directories with restrictive permission bits (e.g. 0o000) immediately
        # block creation of nested entries, so force owner rwx during extraction.
        safe_mode = mode | 0o700
        safe_path.mkdir(mode=safe_mode, parents=parents, exist_ok=exist_ok)

    def mkfifo(self, path: Path, mode=0o666):
        """Create a FIFO (named pipe) at *path* under the root."""
        logger.debug("creating fifo", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkfifo")

        self._ensure_parent_dir(safe_path)
        os.mkfifo(safe_path, mode=mode)

    def mknod(self, path: Path, mode=0o600, device=0):
        """Create a device/special file; records a problem when unprivileged."""
        logger.debug("creating special file", special_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mknod")

        if self.has_root_permissions:
            self._ensure_parent_dir(safe_path)
            os.mknod(safe_path, mode=mode, device=device)
        else:
            problem = SpecialFileExtractionProblem(
                problem="Root privileges are required to create block and char devices.",
                resolution="Skipped.",
                path=str(path),
                mode=mode,
                device=device,
            )
            self.record_problem(problem)

    def _get_checked_link(self, src: Path, dst: Path) -> _FSLink | None:
        """Return a validated link, or None (recording a problem) when unsafe."""
        link = _FSLink(root=self.root, src=src, dst=dst)
        if link.is_safe:
            return link

        self.record_problem(link.format_report("Potential path traversal through link"))
        return None

    def _path_to_root(self, from_dir: Path) -> Path:
        # This version does not look at the existing symlinks, so while it looks cleaner it is also
        # somewhat less precise:
        #
        # os.path.relpath(self.root, start=self.root / chop_root(from_dir))
        #
        # In contrast, the below version looks like a kludge, but using .resolve() actually
        # calculates the correct path in more cases, even if it can still give a bad result due
        # to ordering of symlink creation and resolve defaulting to non-strict checking.
        # Calculation unfortunately might fall back to the potentially wrong string interpretation,
        # which is the same as os.path.relpath, sharing the same failure case.
        # Ultimately we can not easily catch all symlink based path traversals here, so there
        # still remains work for `unblob.extractor.fix_symlink()`
        #
        absolute_from_dir = (self.root / chop_root(from_dir)).resolve()
        ups = len(absolute_from_dir.parts) - len(self.root.parts)
        return Path("/".join(["."] + [".."] * ups))

    def create_symlink(self, src: Path, dst: Path):
        """Create a symlink dst with the link/content/target src."""
        logger.debug("creating symlink", file_path=dst, link_target=src, _verbosity=3)

        if src.is_absolute():
            # convert absolute paths to dst relative paths
            # these would point to the same path if self.root would be the real root "/"
            # but they are relocatable
            src = self._path_to_root(dst.parent) / chop_root(src)

        safe_link = self._get_checked_link(src=dst.parent / src, dst=dst)

        if safe_link:
            dst = safe_link.dst.absolute_path
            self._ensure_parent_dir(dst)
            dst.symlink_to(src)

    def create_hardlink(self, src: Path, dst: Path):
        """Create a new hardlink dst to the existing file src."""
        logger.debug("creating hardlink", file_path=dst, link_target=src, _verbosity=3)
        safe_link = self._get_checked_link(src=src, dst=dst)

        if safe_link:
            try:
                src = safe_link.src.absolute_path
                dst = safe_link.dst.absolute_path
                self._ensure_parent_dir(dst)
                # Path.hardlink_to (available from Python 3.10, which this
                # module already requires) takes its argument in the same
                # order as Path.symlink_to, keeping this consistent with
                # create_symlink and resolving the old os.link FIXME.
                dst.hardlink_to(src)
            except FileNotFoundError:
                self.record_problem(
                    safe_link.format_report("Hard link target does not exist.")
                )
            except PermissionError:
                not_enough_privileges = (
                    "Not enough privileges to create hardlink to block/char device."
                )
                self.record_problem(safe_link.format_report(not_enough_privileges))

    def open(
        self, path, mode: Literal["wb+", "rb+", "xb+"] = "wb+"
    ) -> io.BufferedRandom:
        """Create/open binary file for random access read-writing.

        There is no intention in supporting anything other than binary files opened for random access.
        """
        logger.debug("create/open binary file for writing", file_path=path)
        safe_path = self._get_extraction_path(path, "open")

        self._ensure_parent_dir(safe_path)
        return safe_path.open(mode)

    def unlink(self, path):
        """Delete file within extraction path."""
        logger.debug("unlink file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "unlink")

        safe_path.unlink(missing_ok=True)