Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/file_utils.py: 63%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

365 statements  

1import enum 

2import functools 

3import hashlib 

4import io 

5import math 

6import mmap 

7import os 

8import re 

9import shutil 

10import struct 

11import sys 

12import unicodedata 

13from collections.abc import Iterable, Iterator 

14from pathlib import Path 

15from typing import Literal, Optional, Protocol, Union, overload 

16 

17from dissect.cstruct import cstruct 

18from structlog import get_logger 

19 

20from .logging import format_hex 

21from .report import ( 

22 ExtractionProblem, 

23 LinkExtractionProblem, 

24 PathTraversalProblem, 

25 Report, 

26 SpecialFileExtractionProblem, 

27) 

28 

# Chunk size used for buffered reads in this module, taken from shutil's copy buffer size.
DEFAULT_BUFSIZE = shutil.COPY_BUFSIZE  # type: ignore
# Module-level structlog logger.
logger = get_logger()

31 

32 

def is_safe_path(basedir: Path, path: Path) -> bool:
    """Tell whether *path*, joined onto *basedir*, still lies under *basedir*.

    Both the joined path and *basedir* are resolved before they are compared.
    Returns False when the resolved path is not relative to the resolved
    base directory.
    """
    try:
        candidate = basedir.joinpath(path).resolve()
        candidate.relative_to(basedir.resolve())
    except ValueError:
        return False
    else:
        return True

39 

40 

class SeekError(ValueError):
    """Raised by File.seek when the underlying seek is rejected.

    It is a ValueError subclass, so handlers for ValueError also catch it.
    """

43 

44 

class File(mmap.mmap):
    """Memory-mapped file with a few file-object style helpers.

    Instances are meant to be created through ``from_bytes`` or
    ``from_path``; both record the mapping's access mode in ``access``,
    which ``readable`` and ``writable`` consult.  The object can be used as a
    context manager, which closes the mapping on exit.
    """

    # mmap.ACCESS_* constant recorded by the factory methods.
    access: int

    @classmethod
    def from_bytes(cls, content: Union[bytes, bytearray]):
        """Create an anonymous (not file-backed) File holding *content*.

        The position is reset to 0 after the content is written.

        Raises ValueError when *content* is empty.
        """
        if not content:
            raise ValueError("Can't create File from empty bytes.")
        m = cls(-1, len(content))
        m.write(content)
        m.seek(0)
        m.access = mmap.ACCESS_WRITE
        return m

    @classmethod
    def from_path(cls, path: Path, access=mmap.ACCESS_READ):
        """Create File.

        Needs a valid non-empty file,
        raises ValueError on empty files.
        """
        # Write access needs the underlying file opened for updating.
        mode = "r+b" if access == mmap.ACCESS_WRITE else "rb"
        with path.open(mode) as base_file:
            m = cls(base_file.fileno(), 0, access=access)
            m.access = access
            return m

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:  # pyright: ignore[reportIncompatibleMethodOverride]
        """Seek like mmap.seek and return the new position.

        Raises SeekError (a ValueError) when mmap rejects the position.
        """
        try:
            super().seek(pos, whence)
        except ValueError as e:
            raise SeekError from e
        return self.tell()

    def size(self) -> int:
        """Return the size of the mapped data, leaving the position unchanged."""
        size = 0
        try:
            size = super().size()
        except OSError:
            # the file was built with from_bytes() so it's not on disk,
            # triggering an OSError on fstat() call
            current_offset = self.tell()
            self.seek(0, io.SEEK_END)
            size = self.tell()
            self.seek(current_offset, io.SEEK_SET)

        return size

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def readable(self) -> bool:
        """Tell whether the mapping can be read.

        ACCESS_WRITE mappings are read-write, so they count as readable too;
        this is also the mode recorded by ``from_bytes``.
        """
        return self.access in (
            mmap.ACCESS_READ,
            mmap.ACCESS_WRITE,
            mmap.ACCESS_COPY,
        )

    def writable(self) -> bool:
        """Tell whether the mapping accepts writes (ACCESS_WRITE or ACCESS_COPY)."""
        return self.access in (mmap.ACCESS_WRITE, mmap.ACCESS_COPY)

    if sys.version_info < (3, 13):

        def seekable(self) -> Literal[True]:
            return True  # Memory-mapped files are always seekable

108 

109 

class OffsetFile:
    """View on a File whose positions are reported relative to a fixed offset."""

    def __init__(self, file: File, offset: int):
        self._file = file
        self._offset = offset
        # Start at the view's origin.
        self._file.seek(offset)

    def seek(self, pos: int, whence: int = os.SEEK_SET) -> int:
        """Seek the wrapped file and return the new offset-relative position.

        Only SEEK_SET positions are shifted by the offset; other *whence*
        values are passed through unchanged.
        """
        target = pos + self._offset if whence == os.SEEK_SET else pos
        self._file.seek(target, whence)
        return self._file.tell() - self._offset

    def read(self, n=None):
        """Delegate to the wrapped file's ``read``."""
        return self._file.read(n)

    def tell(self):
        """Return the current position relative to the offset."""
        absolute_position = self._file.tell()
        return absolute_position - self._offset

127 

128 

class InvalidInputFormat(Exception):
    """Raised when input data cannot be decoded in the expected format."""

131 

132 

class Endian(enum.Enum):
    """Byte order; each value is the matching ``struct`` format prefix."""

    LITTLE = "<"
    BIG = ">"

136 

137 

def iterbits(file: File) -> Iterator[int]:
    """Yield the bits of *file* one by one, from the current position to the end.

    Bytes are consumed in order, DEFAULT_BUFSIZE bytes per read.  Within each
    byte the bits come out from the most significant (bit 7) down to the
    least significant (bit 0).
    """
    # NOTE(review): this was previously described as "little-endian mode";
    # the order produced is most-significant-bit first within each byte.
    shifts = range(7, -1, -1)
    while True:
        chunk = file.read(DEFAULT_BUFSIZE)
        if not chunk:
            break
        for byte in chunk:
            for shift in shifts:
                yield (byte >> shift) & 1

144 

145 

def snull(content: bytes):
    """Return *content* without its trailing null bytes.

    Null bytes at the start or in the middle are left alone.
    """
    stripped = content.rstrip(b"\x00")
    return stripped

149 

150 

def round_down(size: int, alignment: int):
    """Round down size to the alignment boundary.

    Integer floor division is used instead of float division, so the result
    stays exact for sizes too large to be represented precisely as a float.

    Raises ZeroDivisionError when alignment is 0.
    """
    return alignment * (size // alignment)

154 

155 

def round_up(size: int, alignment: int):
    """Round up size to the alignment boundary.

    Integer arithmetic is used instead of float division, so the result
    stays exact for sizes too large to be represented precisely as a float.

    Raises ZeroDivisionError when alignment is 0.
    """
    # -(-a // b) is ceiling division on integers.
    return alignment * -(-size // alignment)

159 

160 

def convert_int8(value: bytes, endian: Endian) -> int:
    """Unpack *value* as a 1 byte unsigned integer (struct format ``B``).

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}B"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

167 

168 

def convert_int16(value: bytes, endian: Endian) -> int:
    """Unpack *value* as a 2 byte unsigned integer (struct format ``H``).

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}H"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

175 

176 

def convert_int32(value: bytes, endian: Endian) -> int:
    """Unpack *value* as a 4 byte unsigned integer (struct format ``I``).

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}I"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

183 

184 

def convert_int64(value: bytes, endian: Endian) -> int:
    """Unpack *value* as an 8 byte unsigned integer (struct format ``Q``).

    Raises InvalidInputFormat when struct cannot unpack *value*.
    """
    fmt = f"{endian.value}Q"
    try:
        (number,) = struct.unpack(fmt, value)
    except struct.error as exc:
        raise InvalidInputFormat from exc
    return number

191 

192 

def decode_int(value, base: int) -> int:
    """Parse *value* as an integer written in the given *base*.

    Raises InvalidInputFormat when ``int`` rejects the input with ValueError.
    """
    try:
        number = int(value, base)
    except ValueError as exc:
        raise InvalidInputFormat from exc
    return number

198 

199 

def decode_multibyte_integer(data: Union[bytes, bytearray]) -> tuple[int, int]:
    """Decode a variable-length integer and return ``(size, value)``.

    ``size`` is the number of bytes consumed from the start of *data* and
    ``value`` the decoded integer.

    Encoding: every byte contributes its low 7 bits, least significant group
    first.  All bytes but the last of an encoded integer have the highest
    (eighth) bit set, so values in [0, 127] take a single byte and bigger
    values take two or more.

    Raises InvalidInputFormat when *data* ends before a byte with a clear
    eighth bit is found (this includes empty input).
    """
    decoded = 0
    shift = 0
    consumed = 0
    for byte in data:
        consumed += 1
        decoded |= (byte & 0x7F) << shift
        if (byte & 0x80) == 0:
            # Continuation bit is clear: this was the last byte.
            return (consumed, decoded)
        shift += 7
    raise InvalidInputFormat("Multibyte integer decoding failed.")

217 

218 

def iterate_patterns(
    file: File, pattern: bytes, chunk_size: int = 0x1000
) -> Iterator[int]:
    """Yield the offset of every occurrence of *pattern* found in *file*.

    The search starts at the current file position and reads *chunk_size*
    bytes at a time.  Each time an offset is yielded the file pointer is at
    the initial position; when the iterator is resumed it seeks just past
    that match.  Once the iterator finishes, the file pointer is restored to
    the initial position.

    Raises ValueError when *chunk_size* is smaller than the pattern.
    """
    pattern_len = len(pattern)
    if chunk_size < pattern_len:
        chunk_hex = format_hex(chunk_size)
        raise ValueError(
            f"Chunk size ({chunk_hex}) shouldn't be shorter than pattern's ({pattern}) length ({len(pattern)})!"
        )

    start = file.tell()

    # How far to step back after each chunk, so that a match straddling the
    # chunk boundary is still seen by the next read.
    overlap = pattern_len - 1
    try:
        while True:
            chunk_start = file.tell()
            chunk = file.read(chunk_size)

            if not chunk or len(chunk) < pattern_len:
                # End of the stream, or too little data left to hold the pattern.
                return

            hit = chunk.find(pattern)
            while hit >= 0:
                match_offset = chunk_start + hit
                # Reset the file pointer so that calling code cannot
                # depend on the side-effect of this iterator advancing it.
                file.seek(start)
                yield match_offset
                # Continue right after the match.
                # This might seek past the actual end of the file.
                file.seek(match_offset + pattern_len)
                hit = chunk.find(pattern, hit + pattern_len)

            file.seek(-overlap, os.SEEK_CUR)
    finally:
        file.seek(start)

270 

271 

class RandomReader(Protocol):
    """Structural type for objects offering ``read`` and ``seek``.

    File implements this interface.
    """

    @overload
    def read(self) -> bytes: ...
    @overload
    def read(self, n: int, /) -> bytes: ...
    def seek(self, pos: int, /, whence: int = io.SEEK_SET) -> int: ...

280 

281 

def iterate_file(
    file: RandomReader,
    start_offset: int,
    size: int,
    # default buffer size in shutil for unix based systems
    buffer_size: int = DEFAULT_BUFSIZE,
) -> Iterator[bytes]:
    """Yield up to *size* bytes of *file*, starting at *start_offset*, in chunks.

    Each read asks for at most *buffer_size* bytes and never for more than
    what is still missing.  Progress is measured by the number of bytes
    actually returned, so a reader that returns fewer bytes than requested
    does not cut the output short.  Iteration stops early when the reader
    returns no data.

    Raises ValueError (on first iteration) when *buffer_size* is not positive.
    """
    if buffer_size <= 0:
        raise ValueError(
            "The file needs to be read until a specific size, so buffer_size must be greater than 0"
        )

    file.seek(start_offset)
    file_read = file.read
    read_bytes = 0
    while read_bytes < size:
        data = file_read(min(size - read_bytes, buffer_size))

        if not data:
            # We've reached the end of the stream.
            break

        # Count what was delivered, not what was asked for.
        read_bytes += len(data)
        yield data

308 

309 

def carve(carve_path: Path, file: RandomReader, start_offset: int, size: int):
    """Copy the chunks produced by ``iterate_file(file, start_offset, size)`` to *carve_path*.

    Missing parent directories are created first.  The output file is opened
    in exclusive-creation binary mode (``"xb"``).
    """
    carve_path.parent.mkdir(parents=True, exist_ok=True)

    with carve_path.open("xb") as out:
        for chunk in iterate_file(file, start_offset, size):
            out.write(chunk)

317 

318 

def stream_scan(scanner, file: File):
    """Scan the whole file by increment of DEFAULT_BUFSIZE using Hyperscan's streaming mode.

    Whatever ``scanner.scan`` returns is discarded.
    """
    increment = DEFAULT_BUFSIZE
    scanner.scan(file, increment)

322 

323 

class StructParser:
    """Wrapper for dissect.cstruct to handle different endianness parsing dynamically.

    One parser per byte order is created from the same definitions the first
    time it is needed and reused afterwards.
    """

    def __init__(self, definitions: str):
        self._definitions = definitions
        # Created on first access by the properties below.
        self.__cparser_le = None
        self.__cparser_be = None

    @property
    def cparser_le(self):
        """Little-endian parser, created and loaded on first access."""
        if self.__cparser_le is None:
            # Default endianness is little
            parser = cstruct()
            # Cache the parser only once load() has succeeded; otherwise a
            # failed load would leave an empty parser cached for later calls.
            parser.load(self._definitions)
            self.__cparser_le = parser
        return self.__cparser_le

    @property
    def cparser_be(self):
        """Big-endian parser, created and loaded on first access."""
        if self.__cparser_be is None:
            parser = cstruct(endian=">")
            # Same as above: cache only a successfully loaded parser.
            parser.load(self._definitions)
            self.__cparser_be = parser
        return self.__cparser_be

    def parse(
        self,
        struct_name: str,
        file: Union[File, bytes],
        endian: Endian,
    ):
        """Parse *file* with the structure called *struct_name*.

        The little-endian parser is used for Endian.LITTLE, the big-endian
        one for any other value.
        """
        cparser = self.cparser_le if endian is Endian.LITTLE else self.cparser_be
        struct_parser = getattr(cparser, struct_name)
        return struct_parser(file)

356 

357 

def get_endian(file: File, big_endian_magic: int) -> Endian:
    """Derive the endianness from a four byte magic at the current position.

    The bytes are read as a big endian integer and compared with
    *big_endian_magic*; the file is seeked back by the number of bytes read.

    Raises ValueError when *big_endian_magic* does not fit in 32 bits.
    """
    if big_endian_magic > 0xFF_FF_FF_FF:
        raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    raw = file.read(4)
    # Rewind by what was actually read.
    file.seek(-len(raw), io.SEEK_CUR)
    if convert_int32(raw, Endian.BIG) == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE

370 

371 

def get_endian_short(file: File, big_endian_magic: int) -> Endian:
    """Derive the endianness from a two byte magic at the current position.

    The bytes are read as a big endian integer and compared with
    *big_endian_magic*; the file is seeked back by the number of bytes read.

    Raises ValueError when *big_endian_magic* does not fit in 16 bits.
    """
    if big_endian_magic > 0xFF_FF:
        raise ValueError("big_endian_magic is larger than a 16 bit integer.")
    raw = file.read(2)
    # Rewind by what was actually read.
    file.seek(-len(raw), io.SEEK_CUR)
    if convert_int16(raw, Endian.BIG) == big_endian_magic:
        return Endian.BIG
    return Endian.LITTLE

384 

385 

def get_endian_multi(file: File, big_endian_magics: list[int]) -> Endian:
    """Derive the endianness from a four byte magic, given several candidates.

    The bytes at the current position are read as a big endian integer; the
    result is Endian.BIG when it equals any of *big_endian_magics*, and
    Endian.LITTLE otherwise.  The file is seeked back by the number of bytes
    read.

    Raises ValueError when any candidate does not fit in 32 bits.
    """
    for candidate in big_endian_magics:
        if candidate > 0xFF_FF_FF_FF:
            raise ValueError("big_endian_magic is larger than a 32 bit integer.")
    raw = file.read(4)
    # Rewind by what was actually read.
    file.seek(-len(raw), io.SEEK_CUR)
    magic = convert_int32(raw, Endian.BIG)
    for candidate in big_endian_magics:
        if magic == candidate:
            return Endian.BIG
    return Endian.LITTLE

402 

403 

def read_until_past(file: File, pattern: bytes):
    """Skip over bytes contained in *pattern* and return where the run ends.

    Reads one byte at a time from the current position.  Returns the offset
    of the first byte that is not in *pattern* (the file pointer is then one
    past that byte), or the end-of-file offset when no such byte is found.
    """
    while True:
        current = file.read(1)
        if not current:
            # Nothing left to read: we are at the end of the file.
            return file.tell()
        if current not in pattern:
            # Report the offset of the byte just consumed.
            return file.tell() - 1

413 

414 

def chop_root(path: Path):
    """Return *path* without its root; relative paths are returned unchanged."""
    if path.is_absolute():
        # parts[0] is the root of an absolute path: keep what follows it.
        return Path("/".join(path.parts[1:]))
    return path

422 

423 

def make_lost_and_found_path(path: Path) -> Path:
    """Build a readable replacement path under ``.unblob-lost+found``.

    The result has the form ``.unblob-lost+found/<slug>_<hash>/<name>``:
    ``slug`` is an ASCII slug of the parent directory, ``hash`` the SHA-224
    hex digest of the parent directory string, and ``name`` the original file
    name (``.`` and ``..`` are spelled out as ``dot`` and ``dot-dot``).
    """
    parent_text = str(path.parent)

    # . and .. would not be a valid filename, but they would lead to confusion
    special_names = {".": "dot", "..": "dot-dot"}
    filename = special_names.get(path.name, path.name)

    digest = hashlib.sha224(parent_text.encode(errors="ignore")).hexdigest()

    # adapted from https://stackoverflow.com/questions/5574042/string-slugification-in-python
    normalized = unicodedata.normalize("NFKD", parent_text)
    ascii_lower = normalized.encode("ascii", "ignore").lower()
    dashed = re.sub(rb"[^a-z0-9]+", b"-", ascii_lower).strip(b"-")
    slug = re.sub(rb"[-]+", b"-", dashed).decode()

    return Path(f".unblob-lost+found/{slug}_{digest}/{filename}")

440 

441 

class _FSPath:
    """Where *path* ends up under *root*, redirected when it would escape it.

    Attributes set by the constructor:
    - ``relative_path``: *path* with its root chopped off,
    - ``is_safe``: whether ``root / relative_path`` stays inside *root*,
    - ``safe_relative_path`` / ``absolute_path``: the location to use; for
      unsafe paths this is a lost+found path below *root*.
    """

    def __init__(self, *, root: Path, path: Path) -> None:
        self.root = root
        self.relative_path = chop_root(path)
        candidate = root / self.relative_path
        self.is_safe = is_safe_path(root, candidate)

        if not self.is_safe:
            # Redirect to a generated location that must itself be safe.
            self.safe_relative_path = make_lost_and_found_path(path)
            self.absolute_path = root / self.safe_relative_path
            assert is_safe_path(root, self.absolute_path)
        else:
            self.safe_relative_path = self.relative_path
            self.absolute_path = candidate

456 

457 

class _FSLink:
    """A link described by two _FSPath objects; safe only if both ends are safe."""

    def __init__(self, *, root: Path, src: Path, dst: Path) -> None:
        self.dst = _FSPath(root=root, path=dst)
        self.src = _FSPath(root=root, path=src)
        both_safe = self.dst.is_safe and self.src.is_safe
        self.is_safe = both_safe

    def format_report(
        self, description, resolution="Skipped."
    ) -> LinkExtractionProblem:
        """Build a LinkExtractionProblem from the relative paths of both ends."""
        link_location = str(self.dst.relative_path)
        link_target = str(self.src.relative_path)
        return LinkExtractionProblem(
            problem=description,
            resolution=resolution,
            path=link_location,
            link_path=link_target,
        )

473 

474 

class FileSystem:
    """Restricts file system operations to a directory.

    Path traversal violations are collected as a list of :ExtractionProblem:-s
    and are not reported immediately - a violating operation looks successful
    to the caller.

    All input paths are interpreted as relative to the root directory.
    Absolute paths are converted to relative paths by dropping the root /.
    There is one exception to this universal base: symlink targets,
    which are relative to the directory containing the symbolic link, because
    this is how symlinks work.
    """

    problems: list[Report]

    def __init__(self, root: Path):
        self.root = root.resolve()
        self.problems = []

    def record_problem(self, problem: ExtractionProblem):
        """Remember *problem* and log it."""
        self.problems.append(problem)
        problem.log_with(logger)

    @functools.cached_property
    def has_root_permissions(self):
        """Whether the effective user id is 0 (computed once per instance)."""
        return os.geteuid() == 0

    def _fs_path(self, path: Path) -> _FSPath:
        return _FSPath(root=self.root, path=path)

    def _ensure_parent_dir(self, path: Path):
        """Create the parent directory of *path*, including missing ancestors."""
        path.parent.mkdir(parents=True, exist_ok=True)

    def _get_extraction_path(self, path: Path, path_use_description: str) -> Path:
        """Return the absolute path to use for *path*.

        Unsafe paths are redirected, and a PathTraversalProblem naming the
        operation (*path_use_description*) is recorded.
        """
        fs_path = self._fs_path(path)
        if fs_path.is_safe:
            return fs_path.absolute_path

        self.record_problem(
            PathTraversalProblem(
                path=str(fs_path.relative_path),
                extraction_path=str(fs_path.safe_relative_path),
                problem=f"Potential path traversal through {path_use_description}",
                resolution="Redirected.",
            )
        )
        return fs_path.absolute_path

    def write_bytes(self, path: Path, content: bytes):
        """Write *content* to *path*, creating parent directories as needed."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        target = self._get_extraction_path(path, "write_bytes")

        self._ensure_parent_dir(target)
        target.write_bytes(content)

    def write_chunks(self, path: Path, chunks: Iterable[bytes]):
        """Write every chunk of *chunks* to *path*, in order."""
        logger.debug("creating file", file_path=path, _verbosity=3)
        target = self._get_extraction_path(path, "write_chunks")

        self._ensure_parent_dir(target)
        with target.open("wb") as out:
            for piece in chunks:
                out.write(piece)

    def carve(self, path: Path, file: File, start_offset: int, size: int):
        """Carve a part of *file* into *path* using the module-level ``carve``."""
        logger.debug("carving file", path=path, _verbosity=3)
        target = self._get_extraction_path(path, "carve")

        self._ensure_parent_dir(target)
        carve(target, file, start_offset, size)

    def mkdir(self, path: Path, *, mode=0o777, parents=False, exist_ok=False):
        """Create the directory *path*; keyword arguments go to ``Path.mkdir``."""
        logger.debug("creating directory", dir_path=path, _verbosity=3)
        target = self._get_extraction_path(path, "mkdir")

        target.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)

    def mkfifo(self, path: Path, mode=0o666):
        """Create a FIFO at *path*."""
        logger.debug("creating fifo", path=path, _verbosity=3)
        target = self._get_extraction_path(path, "mkfifo")

        self._ensure_parent_dir(target)
        os.mkfifo(target, mode=mode)

    def mknod(self, path: Path, mode=0o600, device=0):
        """Create a special file at *path*.

        Without root permissions nothing is created and a
        SpecialFileExtractionProblem is recorded instead.
        """
        logger.debug("creating special file", special_path=path, _verbosity=3)
        target = self._get_extraction_path(path, "mknod")

        if not self.has_root_permissions:
            self.record_problem(
                SpecialFileExtractionProblem(
                    problem="Root privileges are required to create block and char devices.",
                    resolution="Skipped.",
                    path=str(path),
                    mode=mode,
                    device=device,
                )
            )
            return

        self._ensure_parent_dir(target)
        os.mknod(target, mode=mode, device=device)

    def _get_checked_link(self, src: Path, dst: Path) -> Optional[_FSLink]:
        """Return the link if both of its ends are safe, else record a problem and return None."""
        link = _FSLink(root=self.root, src=src, dst=dst)
        if not link.is_safe:
            self.record_problem(
                link.format_report("Potential path traversal through link")
            )
            return None
        return link

    def _path_to_root(self, from_dir: Path) -> Path:
        """Return the relative path (``./../..`` style) leading from *from_dir* up to the root."""
        # This version does not look at the existing symlinks, so while it looks cleaner it is also
        # somewhat less precise:
        #
        # os.path.relpath(self.root, start=self.root / chop_root(from_dir))
        #
        # In contrast, the below version looks like a kludge, but using .resolve() actually
        # calculates the correct path in more cases, even if it can still give a bad result due
        # to ordering of symlink creation and resolve defaulting to non-strict checking.
        # Calculation unfortunately might fall back to the potentially wrong string interpretation,
        # which is the same as os.path.relpath, sharing the same failure case.
        # Ultimately we can not easily catch all symlink based path traversals here, so there
        # still remains work for `unblob.extractor.fix_symlink()`
        #
        resolved_dir = (self.root / chop_root(from_dir)).resolve()
        levels_up = len(resolved_dir.parts) - len(self.root.parts)
        components = ["."] + [".."] * levels_up
        return Path("/".join(components))

    def create_symlink(self, src: Path, dst: Path):
        """Create a symlink dst with the link/content/target src."""
        logger.debug("creating symlink", file_path=dst, link_target=src, _verbosity=3)

        if src.is_absolute():
            # convert absolute paths to dst relative paths
            # these would point to the same path if self.root would be the real root "/"
            # but they are relocatable
            src = self._path_to_root(dst.parent) / chop_root(src)

        # The target is interpreted relative to the directory holding the link.
        checked = self._get_checked_link(src=dst.parent / src, dst=dst)
        if not checked:
            return

        dst = checked.dst.absolute_path
        self._ensure_parent_dir(dst)
        dst.symlink_to(src)

    def create_hardlink(self, src: Path, dst: Path):
        """Create a new hardlink dst to the existing file src."""
        logger.debug("creating hardlink", file_path=dst, link_target=src, _verbosity=3)
        checked = self._get_checked_link(src=src, dst=dst)
        if not checked:
            return

        try:
            src = checked.src.absolute_path
            dst = checked.dst.absolute_path
            self._ensure_parent_dir(dst)
            os.link(src, dst)
            # FIXME: from python 3.10 change the above to
            # dst.hardlink_to(src)
            # so as to make it consistent with create_symlink
            # (see Path.link_to vs Path.hardlink_to parameter order mess up)
        except FileNotFoundError:
            self.record_problem(
                checked.format_report("Hard link target does not exist.")
            )
        except PermissionError:
            not_enough_privileges = (
                "Not enough privileges to create hardlink to block/char device."
            )
            self.record_problem(checked.format_report(not_enough_privileges))

    def open(
        self, path, mode: Literal["wb+", "rb+", "xb+"] = "wb+"
    ) -> io.BufferedRandom:
        """Create/open binary file for random access read-writing.

        There is no intention in supporting anything other than binary files opened for random access.
        """
        logger.debug("create/open binary file for writing", file_path=path)
        target = self._get_extraction_path(path, "open")

        self._ensure_parent_dir(target)
        return target.open(mode)

    def unlink(self, path):
        """Delete file within extraction path; a missing file is not an error."""
        logger.debug("unlink file", file_path=path, _verbosity=3)
        target = self._get_extraction_path(path, "unlink")

        target.unlink(missing_ok=True)