Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/extractor.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""File extraction related functions."""
3import errno
4import os
5from pathlib import Path
6from typing import Union
8from structlog import get_logger
10from .file_utils import carve, is_safe_path
11from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk
12from .report import MaliciousSymlinkRemoved
14logger = get_logger()
# Permission bits OR-ed into the mode of extracted regular files by
# fix_permission (0o644 == rw-r--r--).
FILE_PERMISSION_MASK = 0o644
# Permission bits OR-ed into the mode of extracted directories by
# fix_permission (0o775 == rwxrwxr-x).
DIR_PERMISSION_MASK = 0o775
def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk):
    """Carve the region described by ``chunk`` out of ``file`` into ``carve_path``.

    The carved file is what is subsequently handed to another tool for the
    actual extraction.
    """
    logger.debug("Carving chunk", path=carve_path)
    offset, length = chunk.start_offset, chunk.size
    carve(carve_path, file, offset, length)
def fix_permission(path: Path):
    """Widen the permission bits of ``path`` with the module's masks.

    Symlinks and non-existent paths are left untouched. Regular files get
    ``FILE_PERMISSION_MASK`` OR-ed into their current mode, directories get
    ``DIR_PERMISSION_MASK``; any other file type is re-chmod'ed with its
    current mode unchanged.
    """
    if path.is_symlink() or not path.exists():
        return

    current_mode = path.stat().st_mode

    if path.is_file():
        extra_bits = FILE_PERMISSION_MASK
    elif path.is_dir():
        extra_bits = DIR_PERMISSION_MASK
    else:
        extra_bits = 0

    # Bits are only ever added, never removed, from the existing mode.
    path.chmod(current_mode | extra_bits)
def is_recursive_link(path: Path) -> bool:
    """Return True when following ``path`` ends up in a symlink loop.

    Up to Python 3.12 ``Path.resolve()`` raises ``RuntimeError`` when it hits
    a symlink loop. Starting with Python 3.13 the non-strict ``resolve()`` no
    longer raises in that case, so the loop is additionally detected by
    ``stat()`` failing with ``ELOOP``. Any other OS error from ``stat()``
    (e.g. a dangling link or a missing path) is not a loop and yields False.
    """
    try:
        path.resolve()
    except RuntimeError:
        return True

    try:
        # stat() follows the link chain; a chain that never terminates makes
        # the OS report ELOOP regardless of the Python version.
        path.stat()
    except OSError as e:
        return e.errno == errno.ELOOP
    return False
def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
    """Sanitize the symlink at ``path`` relative to the extraction directory.

    The link is handled as follows:

    * A symlink loop is logged, reported on ``task_result`` as
      ``MaliciousSymlinkRemoved`` and the link is deleted.
    * A link with an empty target is logged and deleted (no report is added).
    * An absolute target has its leading ``/`` stripped, so that it is
      interpreted relative to ``outdir``.
    * A relative target is replaced by the fully resolved location of the link.

    The resulting target is then checked with ``is_safe_path(outdir, target)``.
    If that check fails, the link is removed and reported as
    ``MaliciousSymlinkRemoved``. Otherwise the link is recreated with a
    relative target (computed from the link's parent directory), so that the
    link value does not mention the extraction directory.

    Returns ``path`` in every case, including when the link has been removed.
    """
    if is_recursive_link(path):
        logger.error("Symlink loop identified, removing", path=path)
        # The raw link value is read before the link is deleted below.
        error_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=path.readlink().as_posix()
        )
        task_result.add_report(error_report)
        path.unlink()
        return path

    raw_target = os.readlink(path)  # noqa: PTH115
    if not raw_target:
        logger.error("Symlink with empty target, removing.")
        path.unlink()
        return path

    target = Path(raw_target)
    if target.is_absolute():
        # Drop the leading slash(es) so the target becomes relative and is
        # later joined onto outdir instead of pointing at the host filesystem.
        target = Path(target.as_posix().lstrip("/"))
    else:
        # Relative links are judged by where they actually end up.
        target = path.resolve()

    # NOTE(review): presumably verifies that target stays within outdir;
    # confirm against file_utils.is_safe_path.
    safe = is_safe_path(outdir, target)

    if not safe:
        logger.error("Path traversal attempt through symlink, removing", target=target)
        error_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=target.as_posix()
        )
        task_result.add_report(error_report)
        path.unlink()
    else:
        # joinpath() keeps an already absolute target (the resolved relative
        # case) as is, and anchors a stripped absolute target under outdir.
        relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
        path.unlink()
        path.symlink_to(relative_target)
    return path
def fix_extracted_directory(outdir: Path, task_result: TaskResult):
    """Fix permissions and symlinks of everything below ``outdir``.

    ``outdir`` itself gets its permissions fixed first, then the tree is
    walked depth-first: every entry has its permissions fixed, symlinks are
    passed to ``fix_symlink`` (and never descended into), and real
    directories are processed recursively.
    """

    def _walk_and_fix(current_dir: Path):
        if not current_dir.exists():
            return

        for entry in current_dir.iterdir():
            try:
                fix_permission(entry)
                if entry.is_symlink():
                    fix_symlink(entry, outdir, task_result)
                elif entry.is_dir():
                    _walk_and_fix(entry)
            except OSError as e:
                # Entries whose name is too long for the OS are skipped;
                # every other OS error is propagated to the caller.
                if e.errno != errno.ENAMETOOLONG:
                    raise e from None

    fix_permission(outdir)
    _walk_and_fix(outdir)
def carve_unknown_chunk(
    extract_dir: Path, file: File, chunk: Union[UnknownChunk, PaddingChunk]
) -> Path:
    """Carve an unknown or padding chunk of ``file`` into ``extract_dir``.

    The output file is named ``<start_offset>-<end_offset>.<extension>``,
    where the extension is ``padding`` for a ``PaddingChunk`` and ``unknown``
    otherwise. Returns the path of the carved file.
    """
    suffix = "padding" if isinstance(chunk, PaddingChunk) else "unknown"
    carve_path = extract_dir / f"{chunk.start_offset}-{chunk.end_offset}.{suffix}"

    logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path
def carve_valid_chunk(extract_dir: Path, file: File, chunk: ValidChunk) -> Path:
    """Carve a valid chunk of ``file`` into ``extract_dir``.

    The output file is named ``<start_offset>-<end_offset>.<handler NAME>``.
    Returns the path of the carved file.
    """
    offset_span = f"{chunk.start_offset}-{chunk.end_offset}"
    carve_path = extract_dir.joinpath(f"{offset_span}.{chunk.handler.NAME}")

    logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path