Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/file_utils.py: 54%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import enum
2import functools
3import hashlib
4import io
5import math
6import mmap
7import os
8import re
9import shutil
10import struct
11import sys
12import unicodedata
13from collections.abc import Iterable, Iterator
14from pathlib import Path
15from typing import Literal, Protocol, overload
17from dissect.cstruct import cstruct
18from structlog import get_logger
20from .logging import format_hex
21from .report import (
22 ExtendedAttributeExtractionProblem,
23 ExtractionProblem,
24 LinkExtractionProblem,
25 PathTraversalProblem,
26 Report,
27 SpecialFileExtractionProblem,
28)
30DEFAULT_BUFSIZE = shutil.COPY_BUFSIZE # type: ignore
31logger = get_logger()
def is_safe_path(basedir: Path, path: Path) -> bool:
    """Tell whether *path*, joined onto *basedir*, stays inside *basedir*.

    Both sides are resolved before the comparison, so ``..`` components and
    already existing symlinks are taken into account.  An absolute *path*
    replaces *basedir* when joined and is therefore only safe when it
    points below *basedir* itself.
    """
    candidate = basedir.joinpath(path).resolve()
    return candidate.is_relative_to(basedir.resolve())
class SeekError(ValueError):
    """Raised by ``File.seek`` in place of the plain ``ValueError`` of mmap.

    It derives from ``ValueError`` so handlers written for the mmap
    exception keep working.
    """
46class File(mmap.mmap):
47 access: int
49 @classmethod
50 def from_bytes(cls, content: bytes | bytearray):
51 if not content:
52 raise ValueError("Can't create File from empty bytes.")
53 m = cls(-1, len(content))
54 m.write(content)
55 m.seek(0)
56 m.access = mmap.ACCESS_WRITE
57 m.madvise(mmap.MADV_SEQUENTIAL)
58 return m
60 @classmethod
61 def from_path(cls, path: Path, access=mmap.ACCESS_READ):
62 """Create File.
64 Needs a valid non-empty file,
65 raises ValueError on empty files.
66 """
67 mode = "r+b" if access == mmap.ACCESS_WRITE else "rb"
68 with path.open(mode) as base_file:
69 m = cls(base_file.fileno(), 0, access=access)
70 m.access = access
71 m.madvise(mmap.MADV_SEQUENTIAL)
72 return m
74 def seek(self, pos: int, whence: int = os.SEEK_SET) -> int: # pyright: ignore[reportIncompatibleMethodOverride]
75 try:
76 super().seek(pos, whence) # pyright: ignore[reportArgumentType]
77 except ValueError as e:
78 raise SeekError from e
79 return self.tell()
81 def size(self) -> int:
82 return len(self)
84 def __enter__(self):
85 return self
87 def __exit__(self, *args):
88 self.close()
90 def readable(self) -> bool:
91 return self.access in (mmap.ACCESS_READ, mmap.ACCESS_COPY)
93 def writable(self) -> bool:
94 return self.access in (mmap.ACCESS_WRITE, mmap.ACCESS_COPY)
96 if sys.version_info < (3, 13):
98 def seekable(self) -> Literal[True]:
99 return True # Memory-mapped files are always seekable
class OffsetFile:
    """Wrap a File so that positions are expressed relative to *offset*.

    Creating the wrapper moves the underlying file to *offset*, which then
    acts as position 0 for ``seek`` (with ``os.SEEK_SET``) and ``tell``.
    """

    def __init__(self, file: File, offset: int):
        self._file = file
        self._offset = offset
        self._file.seek(offset)

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:
        """Seek the wrapped file and return the new relative position.

        Only ``os.SEEK_SET`` positions are shifted by the offset; other
        *whence* values are passed to the wrapped file unchanged.
        """
        target = pos + self._offset if whence == os.SEEK_SET else pos
        self._file.seek(target, whence)
        return self._file.tell() - self._offset

    def read(self, n=None):
        """Read from the wrapped file at its current position."""
        return self._file.read(n)

    def tell(self):
        """Return the current position relative to the offset."""
        absolute_position = self._file.tell()
        return absolute_position - self._offset
class InvalidInputFormat(Exception):
    """Raised when input data cannot be decoded as the expected format."""
class Endian(enum.Enum):
    """Byte order; each value is the matching ``struct`` format prefix."""

    LITTLE = "<"
    BIG = ">"
def iterbits(file: File) -> Iterator[int]:
    """Yield the content of *file* one bit at a time.

    The file is consumed from its current position in DEFAULT_BUFSIZE sized
    reads.  Bytes come in file order and the bits of each byte are yielded
    from bit 7 down to bit 0.
    """
    while True:
        cur_bytes = file.read(DEFAULT_BUFSIZE)
        if not cur_bytes:
            return
        for byte in cur_bytes:
            yield from ((byte >> shift) & 1 for shift in reversed(range(8)))
def snull(content: bytes):
    """Return *content* without its trailing null bytes.

    Null bytes at the start or in the middle are kept.
    """
    null_byte = b"\x00"
    return content.rstrip(null_byte)
def round_down(size: int, alignment: int):
    """Round down size to the alignment boundary.

    Integer floor division is used rather than ``math.floor(size / alignment)``:
    the float division loses precision for values above 2**53 and would
    return a wrong boundary for very large sizes.

    Raises ZeroDivisionError when *alignment* is 0.
    """
    return alignment * (size // alignment)
def round_up(size: int, alignment: int):
    """Round up size to the alignment boundary.

    The ceiling is computed with integer arithmetic (negated floor
    division) rather than ``math.ceil(size / alignment)``: the float
    division loses precision for values above 2**53 and would return a
    wrong boundary for very large sizes.

    Raises ZeroDivisionError when *alignment* is 0.
    """
    return alignment * -(-size // alignment)
def convert_int8(value: bytes, endian: Endian) -> int:
    """Unpack *value* as an unsigned 1 byte integer.

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}B"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number
def convert_int16(value: bytes, endian: Endian) -> int:
    """Unpack *value* as an unsigned 2 byte integer in the given byte order.

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}H"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number
def convert_int32(value: bytes, endian: Endian) -> int:
    """Unpack *value* as an unsigned 4 byte integer in the given byte order.

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}I"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number
def convert_int64(value: bytes, endian: Endian) -> int:
    """Unpack *value* as an unsigned 8 byte integer in the given byte order.

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}Q"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number
def decode_int(value, base: int) -> int:
    """Parse *value* as an integer written in *base*.

    Raises InvalidInputFormat when ``int()`` rejects it with a ValueError.
    """
    try:
        number = int(value, base)
    except ValueError as exc:
        raise InvalidInputFormat from exc
    return number
192def decode_multibyte_integer(data: bytes | bytearray) -> tuple[int, int]:
193 """Decode multi-bytes integer into integer size and integer value.
195 Multibyte integers of static length are stored in little endian byte order.
197 When smaller values are more likely than bigger values (for example file sizes),
198 multibyte integers are encoded in a variable-length representation:
199 - Numbers in the range [0, 127] are copied as is, and take one byte of space.
200 - Bigger numbers will occupy two or more bytes. All but the last byte of the multibyte
201 representation have the highest (eighth) bit set.
202 """
203 value = 0
204 for size, byte in enumerate(data):
205 value |= (byte & 0x7F) << (size * 7)
206 if not byte & 0x80:
207 return (size + 1, value)
208 raise InvalidInputFormat("Multibyte integer decoding failed.")
def iterate_patterns(
    file: File, pattern: bytes, chunk_size: int = 0x1000
) -> Iterator[int]:
    """Yield the offset of every occurrence of *pattern* found in *file*.

    The search starts at the current file position and reads *chunk_size*
    bytes at a time.  While the generator is suspended at a yield, the file
    is positioned at the position it had when the search started.  When the
    generator finishes or is closed, the file is put back to that position
    as well.

    Raises ValueError when *chunk_size* is shorter than *pattern*.
    """
    pattern_len = len(pattern)
    if chunk_size < pattern_len:
        chunk_hex = format_hex(chunk_size)
        raise ValueError(
            f"Chunk size ({chunk_hex}) shouldn't be shorter than pattern's ({pattern}) length ({len(pattern)})!"
        )

    initial_position = file.tell()

    # Stepping back this many bytes before the next read makes consecutive
    # reads overlap, so a pattern straddling a chunk boundary is still found.
    overlap = pattern_len - 1
    try:
        while True:
            chunk_start = file.tell()
            data = file.read(chunk_size)

            # Either the end of the stream was reached, or what is left is
            # too short to contain the pattern.
            if data == b"" or len(data) < pattern_len:
                return

            search_from = 0
            while (marker := data.find(pattern, search_from)) != -1:
                found_pos = chunk_start + marker
                # Reset the file pointer so that calling code cannot
                # depend on the side-effect of this iterator advancing it.
                file.seek(initial_position)
                yield found_pos
                # Continue right after the match that was just reported.
                file.seek(found_pos + pattern_len)
                search_from = marker + pattern_len

            file.seek(-overlap, os.SEEK_CUR)
    finally:
        file.seek(initial_position)
class RandomReader(Protocol):
    """Structural type for objects that can be read and repositioned.

    File implements this interface.
    """

    @overload
    def read(self) -> bytes: ...

    @overload
    def read(self, n: int, /) -> bytes: ...

    def seek(self, pos: int, /, whence: int = io.SEEK_SET) -> int: ...
def iterate_file(
    file: RandomReader,
    start_offset: int,
    size: int,
    # default buffer size in shutil for unix based systems
    buffer_size: int = DEFAULT_BUFSIZE,
) -> Iterator[bytes]:
    """Yield up to *size* bytes of *file*, starting at *start_offset*.

    The data is produced in chunks of at most *buffer_size* bytes and the
    iteration stops early when the end of the stream is reached.

    Progress is tracked by the number of bytes actually received, so a
    reader that returns fewer bytes than requested does not cut the output
    short of *size*.

    Raises ValueError (on first iteration) when *buffer_size* is not
    positive.
    """
    if buffer_size <= 0:
        raise ValueError(
            "The file needs to be read until a specific size, so buffer_size must be greater than 0"
        )

    file.seek(start_offset)
    file_read = file.read
    remaining = size
    while remaining > 0:
        data = file_read(min(remaining, buffer_size))

        if data == b"":
            # We've reached the end of the stream.
            break

        remaining -= len(data)
        yield data
def carve(carve_path: Path, file: RandomReader, start_offset: int, size: int):
    """Write *size* bytes of *file*, starting at *start_offset*, to *carve_path*.

    Missing parent directories are created.  The output is opened in
    exclusive creation mode, so an already existing *carve_path* is an
    error.
    """
    output_dir = carve_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    with carve_path.open("xb") as output:
        output.writelines(iterate_file(file, start_offset, size))
def stream_scan(scanner, file: File):
    """Scan the whole file by increment of DEFAULT_BUFSIZE using Hyperscan's streaming mode."""
    chunk_size = DEFAULT_BUFSIZE
    scanner.scan(file, chunk_size)
class StructParser:
    """Wrapper for dissect.cstruct to handle different endianness parsing dynamically.

    The definitions are loaded into one cstruct parser per byte order, and
    each parser is only created the first time it is needed.
    """

    def __init__(self, definitions: str):
        self._definitions = definitions
        self.__cparser_le = None
        self.__cparser_be = None

    @property
    def cparser_le(self):
        """Little endian parser, created on first access."""
        parser = self.__cparser_le
        if parser is None:
            # Default endianness is little
            parser = cstruct()
            self.__cparser_le = parser
            parser.load(self._definitions)
        return parser

    @property
    def cparser_be(self):
        """Big endian parser, created on first access."""
        parser = self.__cparser_be
        if parser is None:
            parser = cstruct(endian=">")
            self.__cparser_be = parser
            parser.load(self._definitions)
        return parser

    def parse(
        self,
        struct_name: str,
        file: File | bytes,
        endian: Endian,
    ):
        """Parse *file* as the structure called *struct_name*.

        The little endian parser is used for ``Endian.LITTLE``, the big
        endian one for anything else.
        """
        if endian is Endian.LITTLE:
            cparser = self.cparser_le
        else:
            cparser = self.cparser_be
        return getattr(cparser, struct_name)(file)
def get_endian(file: File, big_endian_magic: int) -> Endian:
    """Derive the endianness from a four bytes magic at the current position.

    The bytes are read and the file is moved back by the amount that was
    read.  The result is ``Endian.BIG`` when the bytes, interpreted as big
    endian, equal *big_endian_magic*, and ``Endian.LITTLE`` otherwise.

    Raises ValueError when *big_endian_magic* does not fit in 32 bits.
    """
    if big_endian_magic > 0xFF_FF_FF_FF:
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")

    magic_bytes = file.read(4)
    # Rewind before decoding, so the position is restored even if the
    # conversion fails.
    file.seek(-len(magic_bytes), io.SEEK_CUR)

    if convert_int32(magic_bytes, Endian.BIG) == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE
def get_endian_short(file: File, big_endian_magic: int) -> Endian:
    """Derive the endianness from a two bytes magic at the current position.

    The bytes are read and the file is moved back by the amount that was
    read.  The result is ``Endian.BIG`` when the bytes, interpreted as big
    endian, equal *big_endian_magic*, and ``Endian.LITTLE`` otherwise.

    Raises ValueError when *big_endian_magic* does not fit in 16 bits.
    """
    if big_endian_magic > 0xFF_FF:
        raise ValueError("big_endian_magic is larger than a 16 bit integer.")

    magic_bytes = file.read(2)
    # Rewind before decoding, so the position is restored even if the
    # conversion fails.
    file.seek(-len(magic_bytes), io.SEEK_CUR)

    if convert_int16(magic_bytes, Endian.BIG) == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE
def get_endian_multi(file: File, big_endian_magics: list[int]) -> Endian:
    """Derive the endianness from a four bytes magic, given several candidates.

    The bytes are read and the file is moved back by the amount that was
    read.  The result is ``Endian.BIG`` when the bytes, interpreted as big
    endian, equal any of *big_endian_magics*, and ``Endian.LITTLE``
    otherwise.

    Raises ValueError when any candidate does not fit in 32 bits.
    """
    for big_endian_magic in big_endian_magics:
        if big_endian_magic > 0xFF_FF_FF_FF:
            raise ValueError("big_endian_magic is larger than a 32 bit integer.")

    magic_bytes = file.read(4)
    # Rewind before decoding, so the position is restored even if the
    # conversion fails.
    file.seek(-len(magic_bytes), io.SEEK_CUR)

    magic = convert_int32(magic_bytes, Endian.BIG)
    if magic in big_endian_magics:
        return Endian.BIG
    return Endian.LITTLE
def read_until_past(file: File, pattern: bytes):
    """Skip bytes contained in *pattern* and return where they end.

    The file is read one byte at a time from its current position.  The
    return value is the offset of the first byte that is not in *pattern*,
    or the end of file offset when every remaining byte is in *pattern*.
    The offending byte is consumed: the file is left positioned after it.
    """
    while (next_byte := file.read(1)) != b"":
        if next_byte not in pattern:
            return file.tell() - 1
    # We've hit the EoF
    return file.tell()
def chop_root(path: Path):
    """Turn an absolute path into a relative one by dropping its root.

    Relative paths are returned unchanged.
    """
    if path.is_absolute():
        # The first part of an absolute path is its root.
        return Path(*path.parts[1:])
    return path
def make_lost_and_found_path(path: Path) -> Path:
    """Map *path* to a readable location under ``.unblob-lost+found``.

    The directory part is replaced by an ASCII slug of itself followed by
    its SHA-224 hex digest; the file name is kept, except for the special
    names ``.`` and ``..``.
    """
    parent_text = str(path.parent)

    # . and .. would not be a valid filename, but they would lead to confusion
    special_names = {".": "dot", "..": "dot-dot"}
    filename = special_names.get(path.name, path.name)

    dir_hash = hashlib.sha224(parent_text.encode(errors="ignore")).hexdigest()

    # adapted from https://stackoverflow.com/questions/5574042/string-slugification-in-python
    ascii_text = unicodedata.normalize("NFKD", parent_text).encode("ascii", "ignore")
    dashed = re.sub(rb"[^a-z0-9]+", b"-", ascii_text.lower()).strip(b"-")
    dir_slug = re.sub(rb"[-]+", b"-", dashed).decode()

    return Path(f".unblob-lost+found/{dir_slug}_{dir_hash}/{filename}")
class _FSPath:
    """A path interpreted relative to *root*, together with its safe location.

    Attributes set by the constructor:
    - ``relative_path``: *path* with its root chopped off.
    - ``is_safe``: whether ``root / relative_path`` stays inside *root*.
    - ``safe_relative_path`` / ``absolute_path``: the location to use.  This
      is the path itself when it is safe, and a lost+found replacement
      under *root* otherwise.
    """

    def __init__(self, *, root: Path, path: Path) -> None:
        self.root = root
        self.relative_path = chop_root(path)
        candidate = root / self.relative_path
        self.is_safe = is_safe_path(root, candidate)

        if not self.is_safe:
            self.safe_relative_path = make_lost_and_found_path(path)
            self.absolute_path = root / self.safe_relative_path
            assert is_safe_path(root, self.absolute_path)
            return

        self.safe_relative_path = self.relative_path
        self.absolute_path = candidate
class _FSLink:
    """Pair of link source and destination paths checked against *root*.

    The link is safe only when both of its ends are safe.
    """

    def __init__(self, *, root: Path, src: Path, dst: Path) -> None:
        self.dst = _FSPath(root=root, path=dst)
        self.src = _FSPath(root=root, path=src)
        self.is_safe = all((self.dst.is_safe, self.src.is_safe))

    def format_report(
        self, description, resolution="Skipped."
    ) -> LinkExtractionProblem:
        """Build a LinkExtractionProblem describing this link."""
        reported_path = str(self.dst.relative_path)
        reported_link_path = str(self.src.relative_path)
        return LinkExtractionProblem(
            problem=description,
            resolution=resolution,
            path=reported_path,
            link_path=reported_link_path,
        )
class FileSystem:
    """Restricts file system operations to a directory.

    Path traversal violations are collected as a list of :ExtractionProblem:-s
    and not reported immediately - violating operations look successful to the caller.

    All input paths are interpreted as relative to the root directory.
    Absolute paths are converted to relative paths by dropping the root /.
    There is one exception to this universal base: symlink targets,
    which are relative to the directory containing the symbolic link, because
    this is how symlinks work.
    """

    # Problems recorded so far, in the order they were encountered.
    problems: list[Report]

    def __init__(self, root: Path):
        self.root = root.resolve()
        self.problems = []

    def record_problem(self, problem: ExtractionProblem):
        """Remember *problem* and log it."""
        self.problems.append(problem)
        problem.log_with(logger)

    def _record_not_found(self, missing_path: Path):
        """Record that an operation was skipped because *missing_path* does not exist."""
        self.record_problem(
            ExtractionProblem(
                problem=f"{missing_path} not found", resolution="Skipped"
            )
        )

    def _record_xattr_problem(self, problem: str, safe_path: Path, attribute: str):
        """Record a skipped extended attribute operation on *safe_path*."""
        self.record_problem(
            ExtendedAttributeExtractionProblem(
                problem=problem,
                resolution="Skipped",
                path=str(safe_path),
                attribute=attribute,
            )
        )

    @functools.cached_property
    def has_root_permissions(self):
        """Whether the effective user id is 0 (computed once)."""
        return os.geteuid() == 0

    def _fs_path(self, path: Path) -> _FSPath:
        return _FSPath(root=self.root, path=path)

    def _ensure_parent_dir(self, path: Path):
        path.parent.mkdir(parents=True, exist_ok=True)

    def _get_extraction_path(self, path: Path, path_use_description: str) -> Path:
        """Return the absolute path to operate on for *path*.

        Unsafe paths are redirected to their lost+found replacement and a
        PathTraversalProblem mentioning *path_use_description* is recorded.
        """
        fs_path = self._fs_path(path)

        if not fs_path.is_safe:
            report = PathTraversalProblem(
                path=str(fs_path.relative_path),
                extraction_path=str(fs_path.safe_relative_path),
                problem=f"Potential path traversal through {path_use_description}",
                resolution="Redirected.",
            )
            self.record_problem(report)

        return fs_path.absolute_path

    def write_bytes(self, path: Path, content: bytes):
        """Write *content* to *path*, creating parent directories as needed."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_bytes")

        self._ensure_parent_dir(safe_path)
        safe_path.write_bytes(content)

    def write_chunks(self, path: Path, chunks: Iterable[bytes]):
        """Write all *chunks* to *path*, creating parent directories as needed."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_chunks")

        self._ensure_parent_dir(safe_path)
        with safe_path.open("wb") as f:
            for chunk in chunks:
                f.write(chunk)

    def carve(self, path: Path, file: File, start_offset: int, size: int):
        """Extract *size* bytes of *file* from *start_offset* into *path*."""
        logger.debug("carving file", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "carve")

        self._ensure_parent_dir(safe_path)
        carve(safe_path, file, start_offset, size)

    def mkdir(self, path: Path, *, mode=0o777, parents=False, exist_ok=False):
        """Create the directory *path*."""
        logger.debug("creating directory", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkdir")

        # Directories with restrictive permission bits (e.g. 0o000) immediately
        # block creation of nested entries, so force owner rwx during extraction.
        safe_mode = mode | 0o700
        safe_path.mkdir(mode=safe_mode, parents=parents, exist_ok=exist_ok)

    def mkfifo(self, path: Path, mode=0o666):
        """Create a FIFO at *path*, creating parent directories as needed."""
        logger.debug("creating fifo", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkfifo")

        self._ensure_parent_dir(safe_path)
        os.mkfifo(safe_path, mode=mode)

    def mknod(self, path: Path, mode=0o600, device=0):
        """Create a special file at *path*.

        Without root permissions nothing is created and a
        SpecialFileExtractionProblem is recorded instead.
        """
        logger.debug("creating special file", special_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mknod")

        if self.has_root_permissions:
            self._ensure_parent_dir(safe_path)
            os.mknod(safe_path, mode=mode, device=device)
        else:
            problem = SpecialFileExtractionProblem(
                problem="Root privileges are required to create block and char devices.",
                resolution="Skipped.",
                path=str(path),
                mode=mode,
                device=device,
            )
            self.record_problem(problem)

    def _get_checked_link(self, src: Path, dst: Path) -> _FSLink | None:
        """Return the link when both ends are safe, else record a problem and return None."""
        link = _FSLink(root=self.root, src=src, dst=dst)
        if link.is_safe:
            return link

        self.record_problem(link.format_report("Potential path traversal through link"))
        return None

    def _path_to_root(self, from_dir: Path) -> Path:
        # This version does not look at the existing symlinks, so while it looks cleaner it is also
        # somewhat less precise:
        #
        # os.path.relpath(self.root, start=self.root / chop_root(from_dir))
        #
        # In contrast, the below version looks like a kludge, but using .resolve() actually
        # calculates the correct path in more cases, even if it can still give a bad result due
        # to ordering of symlink creation and resolve defaulting to non-strict checking.
        # Calculation unfortunately might fall back to the potentially wrong string interpretation,
        # which is the same as os.path.relpath, sharing the same failure case.
        # Ultimately we can not easily catch all symlink based path traversals here, so there
        # still remains work for `unblob.extractor.fix_symlink()`
        #
        absolute_from_dir = (self.root / chop_root(from_dir)).resolve()
        ups = len(absolute_from_dir.parts) - len(self.root.parts)
        return Path("/".join(["."] + [".."] * ups))

    def create_symlink(self, src: Path, dst: Path):
        """Create a symlink dst with the link/content/target src."""
        logger.debug("creating symlink", file_path=dst, link_target=src, _verbosity=3)

        if src.is_absolute():
            # convert absolute paths to dst relative paths
            # these would point to the same path if self.root would be the real root "/"
            # but they are relocatable
            src = self._path_to_root(dst.parent) / chop_root(src)

        safe_link = self._get_checked_link(src=dst.parent / src, dst=dst)

        if safe_link:
            dst = safe_link.dst.absolute_path
            self._ensure_parent_dir(dst)
            dst.symlink_to(src)

    def create_hardlink(self, src: Path, dst: Path):
        """Create a new hardlink dst to the existing file src."""
        logger.debug("creating hardlink", file_path=dst, link_target=src, _verbosity=3)
        safe_link = self._get_checked_link(src=src, dst=dst)

        if safe_link:
            try:
                src = safe_link.src.absolute_path
                dst = safe_link.dst.absolute_path
                self._ensure_parent_dir(dst)
                os.link(src, dst)
                # FIXME: from python 3.10 change the above to
                #   dst.hardlink_to(src)
                # so as to make it consistent with create_symlink
                # (see Path.link_to vs Path.hardlink_to parameter order mess up)
            except FileNotFoundError:
                self.record_problem(
                    safe_link.format_report("Hard link target does not exist.")
                )
            except PermissionError:
                not_enough_privileges = (
                    "Not enough privileges to create hardlink to block/char device."
                )
                self.record_problem(safe_link.format_report(not_enough_privileges))

    def open(
        self, path, mode: Literal["wb+", "rb+", "xb+"] = "wb+"
    ) -> io.BufferedRandom:
        """Create/open binary file for random access read-writing.

        There is no intention in supporting anything other than binary files opened for random access.
        """
        logger.debug("create/open binary file for writing", file_path=path)
        safe_path = self._get_extraction_path(path, "open")

        self._ensure_parent_dir(safe_path)
        return safe_path.open(mode)

    def unlink(self, path):
        """Delete file within extraction path."""
        logger.debug("unlink file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "unlink")

        safe_path.unlink(missing_ok=True)

    def rmdir(self, path: Path):
        """Remove an empty directory."""
        logger.debug("removing directory", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "rmdir")
        try:
            safe_path.rmdir()
        except FileNotFoundError:
            self._record_not_found(safe_path)

    def rename(self, src: Path, dst: Path):
        """Rename a file or directory."""
        logger.debug("renaming", src_path=src, dst_path=dst, _verbosity=3)
        safe_src = self._get_extraction_path(src, "rename src")
        safe_dst = self._get_extraction_path(dst, "rename dst")
        self._ensure_parent_dir(safe_dst)
        try:
            safe_src.rename(safe_dst)
        except FileNotFoundError:
            # The parent of the destination has just been created, so the
            # path that is missing is the source of the rename.
            self._record_not_found(safe_src)

    def truncate(self, path: Path, size: int):
        """Truncate a file to the specified size."""
        logger.debug("Truncate file", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "truncate")
        try:
            os.truncate(safe_path, size)
        except FileNotFoundError:
            self._record_not_found(safe_path)

    def set_xattr(self, path: Path, attribute: str, data: bytes):
        """Set an extended attribute for a file."""
        logger.debug("set extented attribute", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "set xattr")
        if not hasattr(os, "setxattr"):
            self._record_xattr_problem(
                "Extended attributes are not supported on this platform, only available on linux",
                safe_path,
                attribute,
            )
            return
        try:
            os.setxattr(safe_path, attribute, data)
        except PermissionError:
            self._record_xattr_problem(
                "Extended attributes are blocked by unblob sandbox",
                safe_path,
                attribute,
            )
        except OSError:
            self._record_xattr_problem(
                "This extended attribute is not supported on this filesystem",
                safe_path,
                attribute,
            )

    def remove_xattr(self, path: Path, attribute: str):
        """Remove an extended attribute from a file."""
        logger.debug("remove extented attribute", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "rm xattr")
        if not hasattr(os, "removexattr"):
            self._record_xattr_problem(
                "Extended attributes are not supported on this platform, only available on linux",
                safe_path,
                attribute,
            )
            return
        try:
            os.removexattr(safe_path, attribute)
        except PermissionError:
            self._record_xattr_problem(
                "Extended attributes are blocked by unblob sandbox",
                safe_path,
                attribute,
            )
        except FileNotFoundError:
            self._record_not_found(safe_path)

    def utime(self, path: Path, times: tuple[float, float] | tuple[int, int]):
        """Set the access and modification times of a file."""
        logger.debug("time attribution", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "utime")
        try:
            os.utime(safe_path, times)
        except FileNotFoundError:
            self._record_not_found(safe_path)

    def chmod(self, path: Path, mode: int):
        """Set the permission bits of a file."""
        logger.debug("change file mode bits")
        safe_path = self._get_extraction_path(path, "chmod")
        try:
            safe_path.chmod(mode)
        except FileNotFoundError:
            self._record_not_found(safe_path)