Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/file_utils.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

360 statements  

1import enum 

2import functools 

3import hashlib 

4import io 

5import math 

6import mmap 

7import os 

8import re 

9import shutil 

10import struct 

11import sys 

12import unicodedata 

13from collections.abc import Iterable, Iterator 

14from pathlib import Path 

15from typing import Literal, Protocol, overload 

16 

17from dissect.cstruct import cstruct 

18from structlog import get_logger 

19 

20from .logging import format_hex 

21from .report import ( 

22 ExtractionProblem, 

23 LinkExtractionProblem, 

24 PathTraversalProblem, 

25 Report, 

26 SpecialFileExtractionProblem, 

27) 

28 

# Chunk size used for buffered reads and scans throughout this module.
# Reuses shutil's platform-tuned copy buffer size (a non-public attribute,
# hence the type: ignore).
DEFAULT_BUFSIZE = shutil.COPY_BUFSIZE  # type: ignore
# Module-level structlog logger shared by all helpers below.
logger = get_logger()

31 

32 

def is_safe_path(basedir: Path, path: Path) -> bool:
    """Return True if *path*, joined onto *basedir*, stays inside *basedir*.

    The joined path is resolved (symlinks and ".." components collapsed);
    if the result is not relative to the resolved *basedir*,
    `relative_to` raises ValueError and the path is considered unsafe.
    """
    try:
        basedir.joinpath(path).resolve().relative_to(basedir.resolve())
    except ValueError:
        return False
    else:
        return True

39 

40 

class SeekError(ValueError):
    """Specific ValueError raised by File.seek when the underlying seek fails."""

43 

44 

45class File(mmap.mmap): 

46 access: int 

47 

48 @classmethod 

49 def from_bytes(cls, content: bytes | bytearray): 

50 if not content: 

51 raise ValueError("Can't create File from empty bytes.") 

52 m = cls(-1, len(content)) 

53 m.write(content) 

54 m.seek(0) 

55 m.access = mmap.ACCESS_WRITE 

56 m.madvise(mmap.MADV_SEQUENTIAL) 

57 return m 

58 

59 @classmethod 

60 def from_path(cls, path: Path, access=mmap.ACCESS_READ): 

61 """Create File. 

62 

63 Needs a valid non-empty file, 

64 raises ValueError on empty files. 

65 """ 

66 mode = "r+b" if access == mmap.ACCESS_WRITE else "rb" 

67 with path.open(mode) as base_file: 

68 m = cls(base_file.fileno(), 0, access=access) 

69 m.access = access 

70 m.madvise(mmap.MADV_SEQUENTIAL) 

71 return m 

72 

73 def seek(self, pos: int, whence: int = os.SEEK_SET) -> int: # pyright: ignore[reportIncompatibleMethodOverride] 

74 try: 

75 super().seek(pos, whence) # pyright: ignore[reportArgumentType] 

76 except ValueError as e: 

77 raise SeekError from e 

78 return self.tell() 

79 

80 def size(self) -> int: 

81 return len(self) 

82 

83 def __enter__(self): 

84 return self 

85 

86 def __exit__(self, *args): 

87 self.close() 

88 

89 def readable(self) -> bool: 

90 return self.access in (mmap.ACCESS_READ, mmap.ACCESS_COPY) 

91 

92 def writable(self) -> bool: 

93 return self.access in (mmap.ACCESS_WRITE, mmap.ACCESS_COPY) 

94 

95 if sys.version_info < (3, 13): 

96 

97 def seekable(self) -> Literal[True]: 

98 return True # Memory-mapped files are always seekable 

99 

100 

class OffsetFile:
    """Expose a File as if its data started at a fixed offset.

    All positions reported to and accepted from the caller are relative
    to the offset given at construction time.
    """

    def __init__(self, file: File, offset: int):
        self._file = file
        self._offset = offset
        self._file.seek(offset)

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:
        # Only absolute positions need translation; relative/end-based
        # seeks are forwarded unchanged.
        target = pos + self._offset if whence == os.SEEK_SET else pos
        self._file.seek(target, whence)
        return self._file.tell() - self._offset

    def read(self, n=None):
        return self._file.read(n)

    def tell(self):
        return self._file.tell() - self._offset

118 

119 

class InvalidInputFormat(Exception):
    """Raised when input bytes cannot be decoded in the expected format."""

    pass

122 

123 

class Endian(enum.Enum):
    """Byte order, with values matching the ``struct`` module format prefixes."""

    LITTLE = "<"
    BIG = ">"

127 

128 

def iterbits(file: File) -> Iterator[int]:
    """Yield the file's bits one at a time, most significant bit first within each byte."""
    while chunk := file.read(DEFAULT_BUFSIZE):
        for byte in chunk:
            for shift in (7, 6, 5, 4, 3, 2, 1, 0):
                yield (byte >> shift) & 1

135 

136 

def snull(content: bytes):
    """Strip null bytes from the end of the string."""
    terminator = b"\x00"
    return content.rstrip(terminator)

140 

141 

def round_down(size: int, alignment: int):
    """Round down size to the alignment boundary.

    Uses integer floor division instead of float math so the result stays
    exact for sizes beyond 2**53 (where float division loses precision).
    """
    return alignment * (size // alignment)

145 

146 

def round_up(size: int, alignment: int):
    """Round up size to the alignment boundary.

    Uses ceiling integer division (-(-size // alignment)) instead of float
    math so the result stays exact for sizes beyond 2**53 (where float
    division loses precision).
    """
    return alignment * -(-size // alignment)

150 

151 

def convert_int8(value: bytes, endian: Endian) -> int:
    """Convert 1 byte unsigned integer to a Python int.

    Raises InvalidInputFormat when *value* is not exactly one byte.
    """
    try:
        (result,) = struct.unpack(f"{endian.value}B", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result

158 

159 

def convert_int16(value: bytes, endian: Endian) -> int:
    """Convert 2 byte unsigned integer to a Python int.

    Raises InvalidInputFormat when *value* is not exactly two bytes.
    """
    try:
        (result,) = struct.unpack(f"{endian.value}H", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result

166 

167 

def convert_int32(value: bytes, endian: Endian) -> int:
    """Convert 4 byte unsigned integer to a Python int.

    Raises InvalidInputFormat when *value* is not exactly four bytes.
    """
    try:
        (result,) = struct.unpack(f"{endian.value}I", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result

174 

175 

def convert_int64(value: bytes, endian: Endian) -> int:
    """Convert 8 byte unsigned integer to a Python int.

    Raises InvalidInputFormat when *value* is not exactly eight bytes.
    """
    try:
        (result,) = struct.unpack(f"{endian.value}Q", value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return result

182 

183 

def decode_int(value, base: int) -> int:
    """Parse *value* as an integer in the given base.

    Raises InvalidInputFormat instead of ValueError on malformed input.
    """
    try:
        result = int(value, base)
    except ValueError as exc:
        raise InvalidInputFormat from exc
    return result

189 

190 

191def decode_multibyte_integer(data: bytes | bytearray) -> tuple[int, int]: 

192 """Decode multi-bytes integer into integer size and integer value. 

193 

194 Multibyte integers of static length are stored in little endian byte order. 

195 

196 When smaller values are more likely than bigger values (for example file sizes), 

197 multibyte integers are encoded in a variable-length representation: 

198 - Numbers in the range [0, 127] are copied as is, and take one byte of space. 

199 - Bigger numbers will occupy two or more bytes. All but the last byte of the multibyte 

200 representation have the highest (eighth) bit set. 

201 """ 

202 value = 0 

203 for size, byte in enumerate(data): 

204 value |= (byte & 0x7F) << (size * 7) 

205 if not byte & 0x80: 

206 return (size + 1, value) 

207 raise InvalidInputFormat("Multibyte integer decoding failed.") 

208 

209 

def iterate_patterns(
    file: File, pattern: bytes, chunk_size: int = 0x1000
) -> Iterator[int]:
    """Iterate on the file searching for pattern until all occurences has been found.

    Yields the absolute offset of each occurrence of ``pattern`` in ``file``,
    reading the file in chunks of ``chunk_size`` bytes.

    Seek the file pointer to the next byte of where we found the pattern or
    seek back to the initial position when the iterator is exhausted.

    Raises ValueError if ``chunk_size`` is smaller than the pattern length
    (a match could then never fit inside a single chunk).
    """
    if chunk_size < len(pattern):
        chunk_hex = format_hex(chunk_size)
        raise ValueError(
            f"Chunk size ({chunk_hex}) shouldn't be shorter than pattern's ({pattern}) length ({len(pattern)})!"
        )

    initial_position = file.tell()

    # Overlap kept between consecutive chunks so a match straddling a chunk
    # boundary is still found (len(pattern) - 1 bytes is the longest possible
    # partial match at a chunk's tail).
    compensation = len(pattern) - 1
    try:
        while True:
            current_position = file.tell()

            # Prepend the padding from the last chunk, to make sure that we find the pattern,
            # even if it straddles the chunk boundary.
            data = file.read(chunk_size)
            if data == b"":
                # We've reached the end of the stream.
                return

            if len(data) < len(pattern):
                # The length that we read from the file is the same
                # length or less than as the pattern we're looking
                # for, and we didn't find the pattern in there.
                return

            marker = data.find(pattern)
            while marker != -1:
                found_pos = current_position + marker
                # Reset the file pointer so that calling code cannot
                # depend on the side-effect of this iterator advancing
                # it.
                file.seek(initial_position)
                yield found_pos
                # We want to seek past the found position to the next byte,
                # so we can call find_first again without extra seek
                # This might seek past the actual end of the file
                file.seek(found_pos + len(pattern))
                marker = data.find(pattern, marker + len(pattern))

            # Rewind by the overlap amount (relative to the current position,
            # which after any match is just past the last found pattern) so the
            # next chunk re-reads the tail of this one.
            file.seek(-compensation, os.SEEK_CUR)
    finally:
        # Leave the file position untouched for the caller, even on early exit.
        file.seek(initial_position)

261 

262 

class RandomReader(Protocol):
    """Structural type for random-access readable objects.

    Anything with compatible ``read``/``seek`` methods satisfies it.
    """

    # File implements this interface

    @overload
    def read(self) -> bytes: ...
    @overload
    def read(self, n: int, /) -> bytes: ...
    def seek(self, pos: int, /, whence: int = io.SEEK_SET) -> int: ...

271 

272 

def iterate_file(
    file: RandomReader,
    start_offset: int,
    size: int,
    # default buffer size in shutil for unix based systems
    buffer_size: int = DEFAULT_BUFSIZE,
) -> Iterator[bytes]:
    """Yield up to ``size`` bytes of ``file`` starting at ``start_offset``.

    Data is yielded in chunks of at most ``buffer_size`` bytes; iteration
    stops early when the file is exhausted before ``size`` bytes were read.

    Raises ValueError when ``buffer_size`` is not positive.
    """
    if buffer_size <= 0:
        raise ValueError(
            "The file needs to be read until a specific size, so buffer_size must be greater than 0"
        )

    read_bytes = 0
    file.seek(start_offset)
    file_read = file.read
    while read_bytes < size:
        data = file_read(min(size - read_bytes, buffer_size))

        if data == b"":
            # We've reached the end of the stream.
            break

        # Count the bytes actually returned: a RandomReader may legally
        # return fewer bytes than requested (short read) without being at
        # EOF; counting the requested amount would terminate too early and
        # yield less than ``size`` bytes.
        read_bytes += len(data)
        yield data

299 

300 

def carve(carve_path: Path, file: RandomReader, start_offset: int, size: int):
    """Extract part of a file."""
    carve_path.parent.mkdir(parents=True, exist_ok=True)

    # "x" mode: fail loudly instead of silently overwriting an existing
    # extraction target.
    with carve_path.open("xb") as output:
        output.writelines(iterate_file(file, start_offset, size))

308 

309 

def stream_scan(scanner, file: File):
    """Scan the whole file by increment of DEFAULT_BUFSIZE using Hyperscan's streaming mode.

    ``scanner`` must expose a ``scan(data, chunk_size)`` method.
    """
    scanner.scan(file, DEFAULT_BUFSIZE)

313 

314 

class StructParser:
    """Wrapper for dissect.cstruct to handle different endianness parsing dynamically.

    Parsers for each byte order are built lazily from the shared definitions
    the first time they are needed.
    """

    def __init__(self, definitions: str):
        self._definitions = definitions
        self._le_parser = None
        self._be_parser = None

    @property
    def cparser_le(self):
        if self._le_parser is None:
            # cstruct defaults to little-endian.
            parser = cstruct()
            parser.load(self._definitions)
            self._le_parser = parser
        return self._le_parser

    @property
    def cparser_be(self):
        if self._be_parser is None:
            parser = cstruct(endian=">")
            parser.load(self._definitions)
            self._be_parser = parser
        return self._be_parser

    def parse(
        self,
        struct_name: str,
        file: File | bytes,
        endian: Endian,
    ):
        """Parse ``struct_name`` from ``file`` with the parser matching ``endian``."""
        parser = self.cparser_le if endian is Endian.LITTLE else self.cparser_be
        return getattr(parser, struct_name)(file)

347 

348 

def get_endian(file: File, big_endian_magic: int) -> Endian:
    """Read a four bytes magic and derive endianness from it.

    It compares the read data with the big endian magic and then seeks back
    the amount of read bytes.
    """
    if big_endian_magic > 0xFF_FF_FF_FF:
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    read_back = file.read(4)
    file.seek(-len(read_back), io.SEEK_CUR)
    matches_big = convert_int32(read_back, Endian.BIG) == big_endian_magic
    return Endian.BIG if matches_big else Endian.LITTLE

361 

362 

def get_endian_short(file: File, big_endian_magic: int) -> Endian:
    """Read a two bytes magic and derive endianness from it.

    It compares the read data with the big endian magic and then seeks back
    the amount of read bytes.
    """
    if big_endian_magic > 0xFF_FF:
        raise ValueError("big_endian_magic is larger than a 16 bit integer.")
    read_back = file.read(2)
    file.seek(-len(read_back), io.SEEK_CUR)
    matches_big = convert_int16(read_back, Endian.BIG) == big_endian_magic
    return Endian.BIG if matches_big else Endian.LITTLE

375 

376 

def get_endian_multi(file: File, big_endian_magics: list[int]) -> Endian:
    """Read a four bytes magic and derive endianness from it.

    It compares the read data against every big endian magic. It reads
    four bytes and seeks back after that.
    """
    if any(magic_value > 0xFF_FF_FF_FF for magic_value in big_endian_magics):
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    read_back = file.read(4)
    file.seek(-len(read_back), io.SEEK_CUR)
    magic = convert_int32(read_back, Endian.BIG)
    return Endian.BIG if magic in big_endian_magics else Endian.LITTLE

393 

394 

def read_until_past(file: File, pattern: bytes):
    """Advance the file one byte at a time past any byte contained in *pattern*.

    Returns the offset of the first byte not in *pattern*, or the EOF
    position when the file ends inside the padding.
    """
    while next_byte := file.read(1):
        if next_byte not in pattern:
            return file.tell() - 1
    # We've hit the EoF.
    return file.tell()

404 

405 

def chop_root(path: Path):
    """Make absolute paths relative by chopping off the root."""
    if path.is_absolute():
        # parts[0] is the root ("/"); rejoin everything after it.
        return Path("/".join(path.parts[1:]))
    return path

413 

414 

def make_lost_and_found_path(path: Path) -> Path:
    """Make a human readable, safe path.

    The redirected path encodes the original parent directory twice: as a
    readable ASCII slug and as an unambiguous SHA-224 hash, with the
    sanitized original filename as the final component.
    """
    dir_path = path.parent

    # . and .. would not be a valid filename, but they would lead to confusion
    filename = {".": "dot", "..": "dot-dot"}.get(path.name, path.name)
    dir_hash = hashlib.sha224(str(dir_path).encode(errors="ignore")).hexdigest()

    # adapted from https://stackoverflow.com/questions/5574042/string-slugification-in-python
    dir_slug = str(dir_path)
    dir_slug = unicodedata.normalize("NFKD", dir_slug)
    dir_slug = dir_slug.encode("ascii", "ignore").lower()
    dir_slug = re.sub(rb"[^a-z0-9]+", b"-", dir_slug).strip(b"-")
    dir_slug = re.sub(rb"[-]+", b"-", dir_slug).decode()

    # Use the sanitized filename computed above; it was previously computed
    # but unused, with a hard-coded "(unknown)" literal in its place.
    return Path(f".unblob-lost+found/{dir_slug}_{dir_hash}/{filename}")

431 

432 

class _FSPath:
    """Resolve a path inside *root*, redirecting unsafe paths to a lost+found location."""

    def __init__(self, *, root: Path, path: Path) -> None:
        self.root = root
        self.relative_path = chop_root(path)
        candidate = root / self.relative_path
        self.is_safe = is_safe_path(root, candidate)

        if not self.is_safe:
            # Path would escape root: redirect to a deterministic safe location.
            self.safe_relative_path = make_lost_and_found_path(path)
            self.absolute_path = root / self.safe_relative_path
            assert is_safe_path(root, self.absolute_path)
        else:
            self.safe_relative_path = self.relative_path
            self.absolute_path = candidate

447 

448 

class _FSLink:
    """Source/destination path pair of a link, each checked against the extraction root."""

    def __init__(self, *, root: Path, src: Path, dst: Path) -> None:
        self.dst = _FSPath(root=root, path=dst)
        self.src = _FSPath(root=root, path=src)
        # The link is only safe when both endpoints stay inside root.
        self.is_safe = self.dst.is_safe and self.src.is_safe

    def format_report(
        self, description, resolution="Skipped."
    ) -> LinkExtractionProblem:
        """Build a LinkExtractionProblem report describing this link."""
        return LinkExtractionProblem(
            problem=description,
            resolution=resolution,
            path=str(self.dst.relative_path),
            link_path=str(self.src.relative_path),
        )

464 

465 

class FileSystem:
    """Restricts file system operations to a directory.

    Path traversal violations are collected as a list of :ExtractionProblem:-s
    and not reported immediately - violating operations looks like successful for the caller.

    All input paths are interpreted as relative to the root directory.
    Absolute paths are converted to relative paths by dropping the root /.
    There is one exception to this universal base: symlink targets,
    which are relative to the directory containing the symbolic link, because
    this is how symlinks work.
    """

    # Problems recorded during extraction; inspected by callers afterwards.
    problems: list[Report]

    def __init__(self, root: Path):
        self.root = root.resolve()
        self.problems = []

    def record_problem(self, problem: ExtractionProblem):
        """Store *problem* and log it immediately."""
        self.problems.append(problem)
        problem.log_with(logger)

    @functools.cached_property
    def has_root_permissions(self):
        # Effective UID 0 - needed for mknod of block/char devices.
        return os.geteuid() == 0

    def _fs_path(self, path: Path) -> _FSPath:
        """Wrap *path* in an _FSPath anchored at this file system's root."""
        return _FSPath(root=self.root, path=path)

    def _ensure_parent_dir(self, path: Path):
        """Create the parent directory of *path* (and any ancestors) if missing."""
        path.parent.mkdir(parents=True, exist_ok=True)

    def _get_extraction_path(self, path: Path, path_use_description: str) -> Path:
        """Resolve *path* inside root, recording a problem if it was redirected.

        Returns the absolute path to operate on - either the requested one
        or its lost+found replacement when it would escape the root.
        """
        fs_path = self._fs_path(path)

        if not fs_path.is_safe:
            report = PathTraversalProblem(
                path=str(fs_path.relative_path),
                extraction_path=str(fs_path.safe_relative_path),
                problem=f"Potential path traversal through {path_use_description}",
                resolution="Redirected.",
            )
            self.record_problem(report)

        return fs_path.absolute_path

    def write_bytes(self, path: Path, content: bytes):
        """Write *content* to a new/overwritten file at *path* (root-relative)."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_bytes")

        self._ensure_parent_dir(safe_path)
        safe_path.write_bytes(content)

    def write_chunks(self, path: Path, chunks: Iterable[bytes]):
        """Write an iterable of byte chunks to the file at *path* (root-relative)."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "write_chunks")

        self._ensure_parent_dir(safe_path)
        with safe_path.open("wb") as f:
            for chunk in chunks:
                f.write(chunk)

    def carve(self, path: Path, file: File, start_offset: int, size: int):
        """Carve *size* bytes of *file* from *start_offset* into *path*."""
        logger.debug("carving file", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "carve")

        self._ensure_parent_dir(safe_path)
        carve(safe_path, file, start_offset, size)

    def mkdir(self, path: Path, *, mode=0o777, parents=False, exist_ok=False):
        """Create a directory at *path* (root-relative)."""
        logger.debug("creating directory", dir_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkdir")

        # Directories with restrictive permission bits (e.g. 0o000) immediately
        # block creation of nested entries, so force owner rwx during extraction.
        safe_mode = mode | 0o700
        safe_path.mkdir(mode=safe_mode, parents=parents, exist_ok=exist_ok)

    def mkfifo(self, path: Path, mode=0o666):
        """Create a FIFO (named pipe) at *path* (root-relative)."""
        logger.debug("creating fifo", path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mkfifo")

        self._ensure_parent_dir(safe_path)
        os.mkfifo(safe_path, mode=mode)

    def mknod(self, path: Path, mode=0o600, device=0):
        """Create a device/special file at *path*; records a problem instead
        when the process lacks root privileges."""
        logger.debug("creating special file", special_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "mknod")

        if self.has_root_permissions:
            self._ensure_parent_dir(safe_path)
            os.mknod(safe_path, mode=mode, device=device)
        else:
            problem = SpecialFileExtractionProblem(
                problem="Root privileges are required to create block and char devices.",
                resolution="Skipped.",
                path=str(path),
                mode=mode,
                device=device,
            )
            self.record_problem(problem)

    def _get_checked_link(self, src: Path, dst: Path) -> _FSLink | None:
        """Return a safe _FSLink, or None (recording a problem) when either
        endpoint would escape the root."""
        link = _FSLink(root=self.root, src=src, dst=dst)
        if link.is_safe:
            return link

        self.record_problem(link.format_report("Potential path traversal through link"))
        return None

    def _path_to_root(self, from_dir: Path) -> Path:
        """Return a relative path (./../.. ...) leading from *from_dir* up to root."""
        # This version does not look at the existing symlinks, so while it looks cleaner it is also
        # somewhat less precise:
        #
        # os.path.relpath(self.root, start=self.root / chop_root(from_dir))
        #
        # In contrast, the below version looks like a kludge, but using .resolve() actually
        # calculates the correct path in more cases, even if it can still give a bad result due
        # to ordering of symlink creation and resolve defaulting to non-strict checking.
        # Calculation unfortunately might fall back to the potentially wrong string interpretation,
        # which is the same as os.path.relpath, sharing the same failure case.
        # Ultimately we can not easily catch all symlink based path traversals here, so there
        # still remains work for `unblob.extractor.fix_symlink()`
        #
        absolute_from_dir = (self.root / chop_root(from_dir)).resolve()
        ups = len(absolute_from_dir.parts) - len(self.root.parts)
        return Path("/".join(["."] + [".."] * ups))

    def create_symlink(self, src: Path, dst: Path):
        """Create a symlink dst with the link/content/target src."""
        logger.debug("creating symlink", file_path=dst, link_target=src, _verbosity=3)

        if src.is_absolute():
            # convert absolute paths to dst relative paths
            # these would point to the same path if self.root would be the real root "/"
            # but they are relocatable
            src = self._path_to_root(dst.parent) / chop_root(src)

        # Safety is checked on the target as seen from the symlink's directory.
        safe_link = self._get_checked_link(src=dst.parent / src, dst=dst)

        if safe_link:
            dst = safe_link.dst.absolute_path
            self._ensure_parent_dir(dst)
            dst.symlink_to(src)

    def create_hardlink(self, src: Path, dst: Path):
        """Create a new hardlink dst to the existing file src."""
        logger.debug("creating hardlink", file_path=dst, link_target=src, _verbosity=3)
        safe_link = self._get_checked_link(src=src, dst=dst)

        if safe_link:
            try:
                src = safe_link.src.absolute_path
                dst = safe_link.dst.absolute_path
                self._ensure_parent_dir(dst)
                os.link(src, dst)
                # FIXME: from python 3.10 change the above to
                # dst.hardlink_to(src)
                # so as to make it consistent with create_symlink
                # (see Path.link_to vs Path.hardlink_to parameter order mess up)
            except FileNotFoundError:
                self.record_problem(
                    safe_link.format_report("Hard link target does not exist.")
                )
            except PermissionError:
                not_enough_privileges = (
                    "Not enough privileges to create hardlink to block/char device."
                )
                self.record_problem(safe_link.format_report(not_enough_privileges))

    def open(
        self, path, mode: Literal["wb+", "rb+", "xb+"] = "wb+"
    ) -> io.BufferedRandom:
        """Create/open binary file for random access read-writing.

        There is no intention in supporting anything other than binary files opened for random access.
        """
        logger.debug("create/open binary file for writing", file_path=path)
        safe_path = self._get_extraction_path(path, "open")

        self._ensure_parent_dir(safe_path)
        return safe_path.open(mode)

    def unlink(self, path):
        """Delete file within extraction path."""
        logger.debug("unlink file", file_path=path, _verbosity=3)
        safe_path = self._get_extraction_path(path, "unlink")

        safe_path.unlink(missing_ok=True)