Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/extractor.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""File extraction related functions."""
3import errno
4import os
5from pathlib import Path
7from structlog import get_logger
9from .file_utils import carve, is_safe_path
10from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk
11from .report import MaliciousSymlinkRemoved
# Module-level structlog logger used by every function in this file.
logger = get_logger()

# Permission bits that fix_permission() OR-s into the mode of a regular file
# (rw-r--r--). The bits are added to the existing mode, never substituted.
FILE_PERMISSION_MASK = 0o644
# Permission bits that fix_permission() OR-s into the mode of a directory
# (rwxrwxr-x).
DIR_PERMISSION_MASK = 0o775
def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk):
    """Carve the byte range described by ``chunk`` out of ``file`` into ``carve_path``.

    The carved file is what is subsequently handed to another tool for
    extraction. The range is taken from ``chunk.start_offset`` and
    ``chunk.size``.
    """
    logger.debug("Carving chunk", path=carve_path)
    offset, length = chunk.start_offset, chunk.size
    carve(carve_path, file, offset, length)
def fix_permission(path: Path):
    """Make sure ``path`` carries at least the baseline permission bits.

    Symlinks and paths that do not exist are left untouched. Otherwise the
    current mode is read and OR-ed with ``FILE_PERMISSION_MASK`` (regular
    files) or ``DIR_PERMISSION_MASK`` (directories) before being written
    back, so bits that are already set are never cleared. Any other kind of
    entry gets its current mode re-applied unchanged.
    """
    if path.is_symlink() or not path.exists():
        return

    current_mode = path.stat().st_mode
    if path.is_file():
        new_mode = current_mode | FILE_PERMISSION_MASK
    elif path.is_dir():
        new_mode = current_mode | DIR_PERMISSION_MASK
    else:
        new_mode = current_mode

    path.chmod(new_mode)
def is_recursive_link(path: Path) -> bool:
    """Return True when resolving ``path`` runs into a symlink loop.

    Up to Python 3.12, ``Path.resolve()`` raises ``RuntimeError`` for a
    symlink loop. From Python 3.13 on, non-strict ``resolve()`` no longer
    raises in that situation, so relying on the exception alone would make
    this function always return False. The resolved path is therefore probed
    with ``stat()`` as well, and an ``ELOOP`` failure is treated as a loop.

    Any other ``OSError`` from the probe (for example a dangling link) is not
    a loop and yields False.
    """
    try:
        resolved = path.resolve()
    except RuntimeError:
        # Python < 3.13 reports symlink loops this way.
        return True

    # Python 3.13+: resolve() returned without complaint; check explicitly.
    try:
        resolved.stat()
    except OSError as e:
        return e.errno == errno.ELOOP
    return False
def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
    """Sanitize one extracted symlink relative to the extraction directory.

    The checks are applied in this order:

    * A link whose resolution loops is removed and a
      ``MaliciousSymlinkRemoved`` report is added to ``task_result``.
    * A link with an empty target is removed (no report is added).
    * An absolute target has its leading slashes stripped; for a relative
      target, ``path.resolve()`` is used instead. If ``is_safe_path`` rejects
      the result with respect to ``outdir``, the link is removed and a
      ``MaliciousSymlinkRemoved`` report is added.
    * Otherwise the link is re-created with a target expressed relative to
      the link's own parent directory, so the link value is portable and
      does not mention the extraction directory.

    Returns ``path`` in every case, including when the link was removed.
    """
    if is_recursive_link(path):
        logger.error("Symlink loop identified, removing", path=path)
        link_text = path.as_posix()
        loop_target_text = path.readlink().as_posix()
        task_result.add_report(
            MaliciousSymlinkRemoved(link=link_text, target=loop_target_text)
        )
        path.unlink()
        return path

    link_value = os.readlink(path)  # noqa: PTH115
    if not link_value:
        logger.error("Symlink with empty target, removing.")
        path.unlink()
        return path

    declared_target = Path(link_value)
    target = (
        Path(declared_target.as_posix().lstrip("/"))
        if declared_target.is_absolute()
        else path.resolve()
    )

    if not is_safe_path(outdir, target):
        logger.error("Path traversal attempt through symlink, removing", target=target)
        task_result.add_report(
            MaliciousSymlinkRemoved(link=path.as_posix(), target=target.as_posix())
        )
        path.unlink()
        return path

    # Compute the new link value before removing the old link.
    relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
    path.unlink()
    path.symlink_to(relative_target)
    return path
def fix_extracted_directory(outdir: Path, task_result: TaskResult):
    """Fix permissions and symlinks for ``outdir`` and everything below it.

    ``outdir`` itself gets ``fix_permission`` applied first. Then each
    directory entry is visited: its permissions are fixed, symlinks are
    passed to ``fix_symlink`` (and not descended into), and real directories
    are walked recursively. An ``OSError`` with errno ``ENAMETOOLONG`` skips
    the offending entry; any other ``OSError`` is re-raised.
    """

    def _walk(current: Path):
        if not current.exists():
            return
        for entry in current.iterdir():
            try:
                fix_permission(entry)
                if entry.is_symlink():
                    fix_symlink(entry, outdir, task_result)
                elif entry.is_dir():
                    _walk(entry)
            except OSError as e:
                # Over-long names are tolerated; everything else propagates.
                if e.errno != errno.ENAMETOOLONG:
                    raise e from None

    fix_permission(outdir)
    _walk(outdir)
def carve_unknown_chunk(
    extract_dir: Path, file: File, chunk: UnknownChunk | PaddingChunk
) -> Path:
    """Carve an unknown or padding chunk into ``extract_dir`` and return its path.

    The output file is named ``<start_offset>-<end_offset>.<extension>``,
    where the extension is ``padding`` for a ``PaddingChunk`` and ``unknown``
    otherwise.
    """
    suffix = "padding" if isinstance(chunk, PaddingChunk) else "unknown"
    destination = extract_dir / f"{chunk.start_offset}-{chunk.end_offset}.{suffix}"
    logger.info("Extracting unknown chunk", path=destination, chunk=chunk)
    carve_chunk_to_file(destination, file, chunk)
    return destination
def carve_valid_chunk(extract_dir: Path, file: File, chunk: ValidChunk) -> Path:
    """Carve a valid chunk into ``extract_dir`` and return the carved file's path.

    The output file is named ``<start_offset>-<end_offset>.<handler name>``,
    using ``chunk.handler.NAME`` as the extension.
    """
    handler_name = chunk.handler.NAME
    destination = extract_dir / f"{chunk.start_offset}-{chunk.end_offset}.{handler_name}"
    logger.info("Extracting valid chunk", path=destination, chunk=chunk)
    carve_chunk_to_file(destination, file, chunk)
    return destination