Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/file_utils.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

421 statements  

1import enum 

2import functools 

3import hashlib 

4import io 

5import math 

6import mmap 

7import os 

8import re 

9import shutil 

10import struct 

11import sys 

12import unicodedata 

13from collections.abc import Iterable, Iterator 

14from pathlib import Path 

15from typing import Literal, Protocol, overload 

16 

17from dissect.cstruct import cstruct 

18from structlog import get_logger 

19 

20from .logging import format_hex 

21from .report import ( 

22 ExtendedAttributeExtractionProblem, 

23 ExtractionProblem, 

24 LinkExtractionProblem, 

25 PathTraversalProblem, 

26 Report, 

27 SpecialFileExtractionProblem, 

28) 

29 

# Chunk size used for buffered reads throughout this module; taken from the
# buffer size shutil uses for its own copy operations.
DEFAULT_BUFSIZE = shutil.COPY_BUFSIZE  # type: ignore

# Module-level structured logger shared by the helpers below.
logger = get_logger()

32 

33 

def is_safe_path(basedir: Path, path: Path) -> bool:
    """Tell whether *path*, joined onto *basedir*, stays inside *basedir*.

    Both sides are resolved before the comparison, so ``..`` components
    and already existing symlinks are taken into account.
    """
    try:
        target = basedir.joinpath(path).resolve()
        target.relative_to(basedir.resolve())
    except ValueError:
        # relative_to() raises ValueError when target is outside basedir.
        return False
    else:
        return True

40 

41 

class SeekError(ValueError):
    """ValueError subclass raised when ``File.seek`` fails."""

44 

45 

46class File(mmap.mmap): 

47 access: int 

48 

49 @classmethod 

50 def from_bytes(cls, content: bytes | bytearray): 

51 if not content: 

52 raise ValueError("Can't create File from empty bytes.") 

53 m = cls(-1, len(content)) 

54 m.write(content) 

55 m.seek(0) 

56 m.access = mmap.ACCESS_WRITE 

57 m.madvise(mmap.MADV_SEQUENTIAL) 

58 return m 

59 

60 @classmethod 

61 def from_path(cls, path: Path, access=mmap.ACCESS_READ): 

62 """Create File. 

63 

64 Needs a valid non-empty file, 

65 raises ValueError on empty files. 

66 """ 

67 mode = "r+b" if access == mmap.ACCESS_WRITE else "rb" 

68 with path.open(mode) as base_file: 

69 m = cls(base_file.fileno(), 0, access=access) 

70 m.access = access 

71 m.madvise(mmap.MADV_SEQUENTIAL) 

72 return m 

73 

74 def seek(self, pos: int, whence: int = os.SEEK_SET) -> int: # pyright: ignore[reportIncompatibleMethodOverride] 

75 try: 

76 super().seek(pos, whence) # pyright: ignore[reportArgumentType] 

77 except ValueError as e: 

78 raise SeekError from e 

79 return self.tell() 

80 

81 def size(self) -> int: 

82 return len(self) 

83 

84 def __enter__(self): 

85 return self 

86 

87 def __exit__(self, *args): 

88 self.close() 

89 

90 def readable(self) -> bool: 

91 return self.access in (mmap.ACCESS_READ, mmap.ACCESS_COPY) 

92 

93 def writable(self) -> bool: 

94 return self.access in (mmap.ACCESS_WRITE, mmap.ACCESS_COPY) 

95 

96 if sys.version_info < (3, 13): 

97 

98 def seekable(self) -> Literal[True]: 

99 return True # Memory-mapped files are always seekable 

100 

101 

class OffsetFile:
    """View of a File whose position 0 lies *offset* bytes into the file."""

    def __init__(self, file: File, offset: int):
        self._file = file
        self._offset = offset
        # Place the underlying pointer at the logical start of the view.
        file.seek(offset)

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:
        """Seek in the underlying file and return the view-relative position.

        Only absolute (SEEK_SET) positions are shifted by the offset;
        other ``whence`` values are forwarded unchanged.
        """
        target = pos + self._offset if whence == os.SEEK_SET else pos
        self._file.seek(target, whence)
        return self._file.tell() - self._offset

    def read(self, n=None):
        """Read from the underlying file at its current position."""
        return self._file.read(n)

    def tell(self):
        """Return the current position relative to the offset."""
        return self._file.tell() - self._offset

119 

120 

class InvalidInputFormat(Exception):
    """Raised when input data cannot be interpreted in the expected format."""

123 

124 

class Endian(enum.Enum):
    """Byte order; each value is the matching ``struct`` format prefix."""

    LITTLE = "<"
    BIG = ">"

128 

129 

def iterbits(file: File) -> Iterator[int]:
    """Yield every bit of *file*, from its current position, as 0 or 1.

    The file is consumed in DEFAULT_BUFSIZE sized chunks. Within each
    byte the most significant bit is yielded first.
    """
    shifts = range(7, -1, -1)
    while True:
        chunk = file.read(DEFAULT_BUFSIZE)
        if not chunk:
            return
        for byte in chunk:
            for shift in shifts:
                yield (byte >> shift) & 1

136 

137 

def snull(content: bytes):
    """Return *content* without its trailing NUL (0x00) bytes."""
    nul = b"\x00"
    return content.rstrip(nul)

141 

142 

def round_down(size: int, alignment: int):
    """Round down size to the alignment boundary.

    Uses integer floor division, so the result stays exact for values
    beyond the 53 bits of precision a float division would offer.

    Raises ZeroDivisionError when *alignment* is 0.
    """
    return alignment * (size // alignment)

146 

147 

def round_up(size: int, alignment: int):
    """Round up size to the alignment boundary.

    Uses integer arithmetic only (ceiling division written as a negated
    floor division), so the result stays exact for values beyond the
    53 bits of precision a float division would offer.

    Raises ZeroDivisionError when *alignment* is 0.
    """
    return alignment * -(-size // alignment)

151 

152 

def convert_int8(value: bytes, endian: Endian) -> int:
    """Interpret *value* as one unsigned byte and return it as a Python int.

    Raises InvalidInputFormat when ``struct`` cannot unpack *value*.
    """
    try:
        (number,) = struct.unpack(f"{endian.value}B", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

159 

160 

def convert_int16(value: bytes, endian: Endian) -> int:
    """Interpret *value* as a 2 byte unsigned integer in the given byte order.

    Raises InvalidInputFormat when ``struct`` cannot unpack *value*.
    """
    try:
        (number,) = struct.unpack(f"{endian.value}H", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

167 

168 

def convert_int32(value: bytes, endian: Endian) -> int:
    """Interpret *value* as a 4 byte unsigned integer in the given byte order.

    Raises InvalidInputFormat when ``struct`` cannot unpack *value*.
    """
    try:
        (number,) = struct.unpack(f"{endian.value}I", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

175 

176 

def convert_int64(value: bytes, endian: Endian) -> int:
    """Interpret *value* as an 8 byte unsigned integer in the given byte order.

    Raises InvalidInputFormat when ``struct`` cannot unpack *value*.
    """
    try:
        (number,) = struct.unpack(f"{endian.value}Q", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

183 

184 

def decode_int(value, base: int) -> int:
    """Parse *value* as an integer written in *base*.

    Raises InvalidInputFormat when ``int()`` rejects the value with a
    ValueError.
    """
    try:
        number = int(value, base)
    except ValueError as exc:
        raise InvalidInputFormat from exc
    return number

190 

191 

192def decode_multibyte_integer(data: bytes | bytearray) -> tuple[int, int]: 

193 """Decode multi-bytes integer into integer size and integer value. 

194 

195 Multibyte integers of static length are stored in little endian byte order. 

196 

197 When smaller values are more likely than bigger values (for example file sizes), 

198 multibyte integers are encoded in a variable-length representation: 

199 - Numbers in the range [0, 127] are copied as is, and take one byte of space. 

200 - Bigger numbers will occupy two or more bytes. All but the last byte of the multibyte 

201 representation have the highest (eighth) bit set. 

202 """ 

203 value = 0 

204 for size, byte in enumerate(data): 

205 value |= (byte & 0x7F) << (size * 7) 

206 if not byte & 0x80: 

207 return (size + 1, value) 

208 raise InvalidInputFormat("Multibyte integer decoding failed.") 

209 

210 

def iterate_patterns(
    file: File, pattern: bytes, chunk_size: int = 0x1000
) -> Iterator[int]:
    """Yield the absolute offset of each occurrence of *pattern* in *file*.

    The search starts at the current file position and reads *chunk_size*
    bytes at a time. Whenever the iterator hands out an offset, the file
    pointer sits at the position it had when the iteration started; that
    same position is restored once the iterator is exhausted or closed.

    Raises ValueError (on first iteration) when *pattern* is empty or
    longer than *chunk_size*.
    """
    if not pattern:
        # An empty pattern matches at every offset without ever advancing,
        # so the search loop below would yield the same offset forever.
        raise ValueError("Pattern must not be empty!")

    if chunk_size < len(pattern):
        chunk_hex = format_hex(chunk_size)
        raise ValueError(
            f"Chunk size ({chunk_hex}) shouldn't be shorter than pattern's ({pattern}) length ({len(pattern)})!"
        )

    initial_position = file.tell()

    # After each chunk we step back by this many bytes, so a pattern that
    # straddles a chunk boundary is seen in full by the next read.
    compensation = len(pattern) - 1
    try:
        while True:
            current_position = file.tell()

            data = file.read(chunk_size)
            if data == b"":
                # We've reached the end of the stream.
                return

            if len(data) < len(pattern):
                # What is left of the file is shorter than the pattern,
                # so it cannot contain another occurrence.
                return

            marker = data.find(pattern)
            while marker != -1:
                found_pos = current_position + marker
                # Reset the file pointer so that calling code cannot
                # depend on the side-effect of this iterator advancing
                # it.
                file.seek(initial_position)
                yield found_pos
                # Continue right after the match; the caller may have
                # moved the file pointer while we were suspended.
                file.seek(found_pos + len(pattern))
                marker = data.find(pattern, marker + len(pattern))

            file.seek(-compensation, os.SEEK_CUR)
    finally:
        file.seek(initial_position)

262 

263 

class RandomReader(Protocol):
    """Structural type for objects offering ``read`` and ``seek``.

    File implements this interface.
    """

    @overload
    def read(self) -> bytes: ...
    @overload
    def read(self, n: int, /) -> bytes: ...
    def seek(self, pos: int, /, whence: int = io.SEEK_SET) -> int: ...

272 

273 

def iterate_file(
    file: RandomReader,
    start_offset: int,
    size: int,
    # default buffer size in shutil for unix based systems
    buffer_size: int = DEFAULT_BUFSIZE,
) -> Iterator[bytes]:
    """Yield up to *size* bytes of *file*, starting at *start_offset*.

    The data is handed out in pieces of at most *buffer_size* bytes.
    Iteration stops early when the reader reports end of stream by
    returning ``b""``.

    Raises ValueError (on first iteration) when *buffer_size* is not
    positive.
    """
    if buffer_size <= 0:
        raise ValueError(
            "The file needs to be read until a specific size, so buffer_size must be greater than 0"
        )

    file.seek(start_offset)
    file_read = file.read
    remaining = size
    while remaining > 0:
        data = file_read(min(remaining, buffer_size))

        if data == b"":
            # We've reached the end of the stream.
            break

        # Account for what the reader actually returned rather than what
        # was requested, so a short read does not cut the output short.
        remaining -= len(data)
        yield data

300 

301 

def carve(carve_path: Path, file: RandomReader, start_offset: int, size: int):
    """Copy a slice of *file* into a newly created file at *carve_path*.

    Missing parent directories are created first. The output file is
    opened in exclusive-creation mode ("xb"), so an already existing
    *carve_path* is an error.
    """
    carve_path.parent.mkdir(parents=True, exist_ok=True)

    chunks = iterate_file(file, start_offset, size)
    with carve_path.open("xb") as out:
        out.writelines(chunks)

309 

310 

def stream_scan(scanner, file: File):
    """Run ``scanner.scan`` over *file* with DEFAULT_BUFSIZE as chunk size.

    NOTE(review): the scanner is presumably a Hyperscan scanner working in
    streaming mode, as the original docstring stated - confirm at callers.
    """
    chunk_size = DEFAULT_BUFSIZE
    scanner.scan(file, chunk_size)

314 

315 

class StructParser:
    """Wrapper for dissect.cstruct to handle different endianness parsing dynamically.

    One parser per byte order is created lazily from the same definitions
    and then reused.
    """

    def __init__(self, definitions: str):
        self._definitions = definitions
        # Parsers are built on first use, one per byte order.
        self.__cparser_le = None
        self.__cparser_be = None

    @property
    def cparser_le(self):
        """Little-endian parser, created and loaded on first access."""
        if self.__cparser_le is None:
            # Default endianness is little
            parser = cstruct()
            parser.load(self._definitions)
            # Cache only after a successful load, so a failing load is not
            # followed by silently handing out a parser without definitions.
            self.__cparser_le = parser
        return self.__cparser_le

    @property
    def cparser_be(self):
        """Big-endian parser, created and loaded on first access."""
        if self.__cparser_be is None:
            parser = cstruct(endian=">")
            parser.load(self._definitions)
            # Cache only after a successful load (see cparser_le).
            self.__cparser_be = parser
        return self.__cparser_be

    def parse(
        self,
        struct_name: str,
        file: File | bytes,
        endian: Endian,
    ):
        """Parse *file* as the structure called *struct_name*.

        The little-endian parser is used when *endian* is Endian.LITTLE,
        the big-endian one otherwise. The structure type is looked up by
        name on the parser and called with *file*.
        """
        cparser = self.cparser_le if endian is Endian.LITTLE else self.cparser_be
        struct_parser = getattr(cparser, struct_name)
        return struct_parser(file)

348 

349 

def get_endian(file: File, big_endian_magic: int) -> Endian:
    """Derive the endianness from the four byte magic at the current position.

    The bytes are interpreted as a big endian integer; Endian.BIG is
    returned when that equals *big_endian_magic*, Endian.LITTLE otherwise.
    The file pointer is moved back by the number of bytes read.

    Raises ValueError when *big_endian_magic* does not fit in 32 bits.
    """
    if big_endian_magic > 0xFF_FF_FF_FF:
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    raw_magic = file.read(4)
    # Rewind by what was really read, which may be less than four bytes.
    file.seek(-len(raw_magic), io.SEEK_CUR)
    if convert_int32(raw_magic, Endian.BIG) == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE

362 

363 

def get_endian_short(file: File, big_endian_magic: int) -> Endian:
    """Derive the endianness from the two byte magic at the current position.

    The bytes are interpreted as a big endian integer; Endian.BIG is
    returned when that equals *big_endian_magic*, Endian.LITTLE otherwise.
    The file pointer is moved back by the number of bytes read.

    Raises ValueError when *big_endian_magic* does not fit in 16 bits.
    """
    if big_endian_magic > 0xFF_FF:
        raise ValueError("big_endian_magic is larger than a 16 bit integer.")
    raw_magic = file.read(2)
    # Rewind by what was really read, which may be less than two bytes.
    file.seek(-len(raw_magic), io.SEEK_CUR)
    if convert_int16(raw_magic, Endian.BIG) == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE

376 

377 

def get_endian_multi(file: File, big_endian_magics: list[int]) -> Endian:
    """Derive the endianness from a four byte magic with several candidates.

    The bytes at the current position are interpreted as a big endian
    integer; Endian.BIG is returned when that equals any entry of
    *big_endian_magics*, Endian.LITTLE otherwise. The file pointer is
    moved back by the number of bytes read.

    Raises ValueError when any candidate does not fit in 32 bits.
    """
    for candidate in big_endian_magics:
        if candidate > 0xFF_FF_FF_FF:
            raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    raw_magic = file.read(4)
    # Rewind by what was really read, which may be less than four bytes.
    file.seek(-len(raw_magic), io.SEEK_CUR)
    magic = convert_int32(raw_magic, Endian.BIG)
    if magic in big_endian_magics:
        return Endian.BIG
    return Endian.LITTLE

394 

395 

def read_until_past(file: File, pattern: bytes):
    """Skip over bytes contained in *pattern* and return where they stop.

    Reads one byte at a time from the current position. Returns the
    offset of the first byte that is not in *pattern*, or the end of file
    offset when no such byte exists. The file pointer is left right after
    the last byte that was read.
    """
    next_byte = file.read(1)
    while next_byte != b"" and next_byte in pattern:
        next_byte = file.read(1)

    position = file.tell()
    if next_byte == b"":
        # We've hit the EoF
        return position
    # One byte too far was consumed: report the offset of that byte.
    return position - 1

405 

406 

def chop_root(path: Path):
    """Return *path* unchanged when relative, otherwise without its root part."""
    if path.is_absolute():
        # parts[0] is the root of an absolute path; rebuild from the rest.
        return Path(*path.parts[1:])
    return path

414 

415 

def make_lost_and_found_path(path: Path) -> Path:
    """Build a readable replacement path under ``.unblob-lost+found``.

    The result has the form ``.unblob-lost+found/<slug>_<hash>/<name>``:
    <slug> is an ASCII slug of the parent directory, <hash> the SHA-224
    hex digest of the parent directory string and <name> the file name.
    """
    parent_text = str(path.parent)

    # . and .. would not be a valid filename, but they would lead to confusion
    special_names = {".": "dot", "..": "dot-dot"}
    filename = special_names.get(path.name, path.name)

    digest = hashlib.sha224(parent_text.encode(errors="ignore")).hexdigest()

    # adapted from https://stackoverflow.com/questions/5574042/string-slugification-in-python
    ascii_dir = (
        unicodedata.normalize("NFKD", parent_text).encode("ascii", "ignore").lower()
    )
    slug = re.sub(rb"[^a-z0-9]+", b"-", ascii_dir).strip(b"-")
    slug = re.sub(rb"[-]+", b"-", slug).decode()

    return Path(f".unblob-lost+found/{slug}_{digest}/{filename}")

432 

433 

class _FSPath:
    """A path placed under an extraction root, redirected when it would escape.

    Attributes set by the constructor:
    - root: the root that was passed in
    - relative_path: *path* with its root chopped off
    - is_safe: whether root / relative_path stays inside root
    - safe_relative_path / absolute_path: the location to really use; for
      unsafe input this is a lost+found path instead of the requested one
    """

    def __init__(self, *, root: Path, path: Path) -> None:
        self.root = root
        self.relative_path = chop_root(path)
        requested = root / self.relative_path
        self.is_safe = is_safe_path(root, requested)

        if not self.is_safe:
            # Redirect the offending path to a location known to be inside root.
            self.safe_relative_path = make_lost_and_found_path(path)
            self.absolute_path = root / self.safe_relative_path
            assert is_safe_path(root, self.absolute_path)
        else:
            self.safe_relative_path = self.relative_path
            self.absolute_path = requested

448 

449 

class _FSLink:
    """Pair of checked paths describing a link (*dst*) and its target (*src*)."""

    def __init__(self, *, root: Path, src: Path, dst: Path) -> None:
        self.dst = _FSPath(root=root, path=dst)
        self.src = _FSPath(root=root, path=src)
        # A link is only safe when both of its ends stay inside the root.
        self.is_safe = self.dst.is_safe and self.src.is_safe

    def format_report(
        self, description, resolution="Skipped."
    ) -> LinkExtractionProblem:
        """Describe a problem with this link as a LinkExtractionProblem."""
        link_location = str(self.dst.relative_path)
        link_target = str(self.src.relative_path)
        return LinkExtractionProblem(
            problem=description,
            resolution=resolution,
            path=link_location,
            link_path=link_target,
        )

465 

466 

class FileSystem:
    """Restricts file system operations to a directory.

    Path traversal violations are collected as a list of :ExtractionProblem:-s
    and not reported immediately - violating operations look successful to the caller.

    All input paths are interpreted as relative to the root directory.
    Absolute paths are converted to relative paths by dropping the root /.
    There is one exception to this universal base: symlink targets,
    which are relative to the directory containing the symbolic link, because
    this is how symlinks work.
    """

    # Problems recorded so far, in the order they were encountered.
    problems: list[Report]

    def __init__(self, root: Path):
        self.root = root.resolve()
        self.problems = []

    def record_problem(self, problem: ExtractionProblem):
        """Remember *problem* and log it through the module logger."""
        self.problems.append(problem)
        problem.log_with(logger)

    @functools.cached_property
    def has_root_permissions(self):
        """Whether the effective user id is 0 (computed once, then cached)."""
        return os.geteuid() == 0

    def _fs_path(self, path: Path) -> _FSPath:
        return _FSPath(root=self.root, path=path)

    def _ensure_parent_dir(self, path: Path):
        path.parent.mkdir(parents=True, exist_ok=True)

    def _get_extraction_path(self, path: Path, path_use_description: str) -> Path:
        """Return the absolute path to use for *path* inside the root.

        Unsafe paths are redirected (see _FSPath) and a PathTraversalProblem
        mentioning *path_use_description* is recorded.
        """
        fs_path = self._fs_path(path)

        if not fs_path.is_safe:
            report = PathTraversalProblem(
                path=str(fs_path.relative_path),
                extraction_path=str(fs_path.safe_relative_path),
                problem=f"Potential path traversal through {path_use_description}",
                resolution="Redirected.",
            )
            self.record_problem(report)

        return fs_path.absolute_path

    def write_bytes(self, path: Path, content: bytes):
        """Write *content* to *path*, creating parent directories as needed."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_bytes")

        self._ensure_parent_dir(safe_path)
        safe_path.write_bytes(content)

    def write_chunks(self, path: Path, chunks: Iterable[bytes]):
        """Write all *chunks*, in order, to *path*."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_chunks")

        self._ensure_parent_dir(safe_path)
        with safe_path.open("wb") as f:
            for chunk in chunks:
                f.write(chunk)

    def carve(self, path: Path, file: File, start_offset: int, size: int):
        """Carve *size* bytes of *file* from *start_offset* into *path*."""
        logger.debug("carving file", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "carve")

        self._ensure_parent_dir(safe_path)
        carve(safe_path, file, start_offset, size)

    def mkdir(self, path: Path, *, mode=0o777, parents=False, exist_ok=False):
        """Create the directory *path*; owner rwx bits are always added to *mode*."""
        logger.debug("creating directory", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkdir")

        # Directories with restrictive permission bits (e.g. 0o000) immediately
        # block creation of nested entries, so force owner rwx during extraction.
        safe_mode = mode | 0o700
        safe_path.mkdir(mode=safe_mode, parents=parents, exist_ok=exist_ok)

    def mkfifo(self, path: Path, mode=0o666):
        """Create a FIFO at *path*."""
        logger.debug("creating fifo", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkfifo")

        self._ensure_parent_dir(safe_path)
        os.mkfifo(safe_path, mode=mode)

    def mknod(self, path: Path, mode=0o600, device=0):
        """Create a special file at *path*.

        Without root privileges nothing is created and a
        SpecialFileExtractionProblem is recorded instead.
        """
        logger.debug("creating special file", special_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mknod")

        if self.has_root_permissions:
            self._ensure_parent_dir(safe_path)
            os.mknod(safe_path, mode=mode, device=device)
        else:
            problem = SpecialFileExtractionProblem(
                problem="Root privileges are required to create block and char devices.",
                resolution="Skipped.",
                path=str(path),
                mode=mode,
                device=device,
            )
            self.record_problem(problem)

    def _get_checked_link(self, src: Path, dst: Path) -> _FSLink | None:
        """Return the link when both ends are safe, else record a problem and return None."""
        link = _FSLink(root=self.root, src=src, dst=dst)
        if link.is_safe:
            return link

        self.record_problem(link.format_report("Potential path traversal through link"))
        return None

    def _path_to_root(self, from_dir: Path) -> Path:
        # This version does not look at the existing symlinks, so while it looks cleaner it is also
        # somewhat less precise:
        #
        # os.path.relpath(self.root, start=self.root / chop_root(from_dir))
        #
        # In contrast, the below version looks like a kludge, but using .resolve() actually
        # calculates the correct path in more cases, even if it can still give a bad result due
        # to ordering of symlink creation and resolve defaulting to non-strict checking.
        # Calculation unfortunately might fall back to the potentially wrong string interpretation,
        # which is the same as os.path.relpath, sharing the same failure case.
        # Ultimately we can not easily catch all symlink based path traversals here, so there
        # still remains work for `unblob.extractor.fix_symlink()`
        #
        absolute_from_dir = (self.root / chop_root(from_dir)).resolve()
        ups = len(absolute_from_dir.parts) - len(self.root.parts)
        return Path("/".join(["."] + [".."] * ups))

    def create_symlink(self, src: Path, dst: Path):
        """Create a symlink dst with the link/content/target src."""
        logger.debug("creating symlink", file_path=dst, link_target=src, _verbosity=3)

        if src.is_absolute():
            # convert absolute paths to dst relative paths
            # these would point to the same path if self.root would be the real root "/"
            # but they are relocatable
            src = self._path_to_root(dst.parent) / chop_root(src)

        safe_link = self._get_checked_link(src=dst.parent / src, dst=dst)

        if safe_link:
            dst = safe_link.dst.absolute_path
            self._ensure_parent_dir(dst)
            dst.symlink_to(src)

    def create_hardlink(self, src: Path, dst: Path):
        """Create a new hardlink dst to the existing file src."""
        logger.debug("creating hardlink", file_path=dst, link_target=src, _verbosity=3)
        safe_link = self._get_checked_link(src=src, dst=dst)

        if safe_link:
            try:
                src = safe_link.src.absolute_path
                dst = safe_link.dst.absolute_path
                self._ensure_parent_dir(dst)
                os.link(src, dst)
                # FIXME: from python 3.10 change the above to
                #   dst.hardlink_to(src)
                # so as to make it consistent with create_symlink
                # (see Path.link_to vs Path.hardlink_to parameter order mess up)
            except FileNotFoundError:
                self.record_problem(
                    safe_link.format_report("Hard link target does not exist.")
                )
            except PermissionError:
                not_enough_privileges = (
                    "Not enough privileges to create hardlink to block/char device."
                )
                self.record_problem(safe_link.format_report(not_enough_privileges))

    def open(
        self, path, mode: Literal["wb+", "rb+", "xb+"] = "wb+"
    ) -> io.BufferedRandom:
        """Create/open binary file for random access read-writing.

        There is no intention in supporting anything other than binary files opened for random access.
        """
        logger.debug("create/open binary file for writing", file_path=path)
        safe_path = self._get_extraction_path(path, "open")

        self._ensure_parent_dir(safe_path)
        return safe_path.open(mode)

    def unlink(self, path):
        """Delete file within extraction path."""
        logger.debug("unlink file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "unlink")

        safe_path.unlink(missing_ok=True)

    def rmdir(self, path: Path):
        """Remove an empty directory."""
        logger.debug("removing directory", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "rmdir")
        try:
            safe_path.rmdir()
        except FileNotFoundError:
            self.record_problem(
                ExtractionProblem(
                    problem=f"{safe_path} not found", resolution="Skipped"
                )
            )

    def rename(self, src: Path, dst: Path):
        """Rename a file or directory.

        A missing *src* is recorded as an ExtractionProblem instead of
        raising.
        """
        logger.debug("renaming", src_path=src, dst_path=dst, _verbosity=3)
        safe_src = self._get_extraction_path(src, "rename src")
        safe_dst = self._get_extraction_path(dst, "rename dst")
        self._ensure_parent_dir(safe_dst)
        try:
            safe_src.rename(safe_dst)
        except FileNotFoundError:
            # The parent of the destination was created just above, so the
            # path that is missing here is the source.
            self.record_problem(
                ExtractionProblem(problem=f"{safe_src} not found", resolution="Skipped")
            )

    def truncate(self, path: Path, size: int):
        """Truncate a file to the specified size."""
        logger.debug("Truncate file", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "truncate")
        try:
            os.truncate(safe_path, size)
        except FileNotFoundError:
            self.record_problem(
                ExtractionProblem(
                    problem=f"{safe_path} not found", resolution="Skipped"
                )
            )

    def set_xattr(self, path: Path, attribute: str, data: bytes):
        """Set an extended attribute for a file.

        Failures (no platform support, PermissionError, other OSError) are
        recorded as ExtendedAttributeExtractionProblem instead of raising.
        """
        logger.debug("set extented attribute", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "set xattr")
        if not hasattr(os, "setxattr"):
            self.record_problem(
                ExtendedAttributeExtractionProblem(
                    problem="Extended attributes are not supported on this platform, only available on linux",
                    resolution="Skipped",
                    path=str(safe_path),
                    attribute=attribute,
                )
            )
            return
        try:
            os.setxattr(safe_path, attribute, data)
        except PermissionError:
            self.record_problem(
                ExtendedAttributeExtractionProblem(
                    problem="Extended attributes are blocked by unblob sandbox",
                    resolution="Skipped",
                    path=str(safe_path),
                    attribute=attribute,
                )
            )
        except OSError:
            self.record_problem(
                ExtendedAttributeExtractionProblem(
                    problem="This extended attribute is not supported on this filesystem",
                    resolution="Skipped",
                    path=str(safe_path),
                    attribute=attribute,
                )
            )

    def remove_xattr(self, path: Path, attribute: str):
        """Remove an extended attribute from a file.

        Missing platform support, PermissionError and a missing file are
        recorded as problems instead of raising.
        """
        logger.debug("remove extented attribute", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "rm xattr")
        if not hasattr(os, "removexattr"):
            self.record_problem(
                ExtendedAttributeExtractionProblem(
                    problem="Extended attributes are not supported on this platform, only available on linux",
                    resolution="Skipped",
                    path=str(safe_path),
                    attribute=attribute,
                )
            )
            return
        try:
            os.removexattr(safe_path, attribute)
        except PermissionError:
            self.record_problem(
                ExtendedAttributeExtractionProblem(
                    problem="Extended attributes are blocked by unblob sandbox",
                    resolution="Skipped",
                    path=str(safe_path),
                    attribute=attribute,
                )
            )
        except FileNotFoundError:
            self.record_problem(
                ExtractionProblem(
                    problem=f"{safe_path} not found", resolution="Skipped"
                )
            )

    def utime(self, path: Path, times: tuple[float, float] | tuple[int, int]):
        """Set the access and modification times of a file."""
        logger.debug("time attribution", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "utime")
        try:
            os.utime(safe_path, times)
        except FileNotFoundError:
            self.record_problem(
                ExtractionProblem(
                    problem=f"{safe_path} not found", resolution="Skipped"
                )
            )

    def chmod(self, path: Path, mode: int):
        """Set the permission bits of a file."""
        logger.debug("change file mode bits")
        safe_path = self._get_extraction_path(path, "chmod")
        try:
            safe_path.chmod(mode)
        except FileNotFoundError:
            self.record_problem(
                ExtractionProblem(
                    problem=f"{safe_path} not found", resolution="Skipped"
                )
            )

788 )