1"""File extraction related functions."""
2
3import errno
4import os
5from pathlib import Path
6from typing import Union
7
8from structlog import get_logger
9
10from .file_utils import carve, is_safe_path
11from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk
12from .report import MaliciousSymlinkRemoved
13
# Module-level structlog logger.
logger = get_logger()

# Permission bits that fix_permission() ORs into the existing mode of an
# extracted entry; bits already set on the entry are never cleared.
FILE_PERMISSION_MASK = 0o644  # rw-r--r-- for regular files
DIR_PERMISSION_MASK = 0o775  # rwxrwxr-x for directories
18
19
def carve_chunk_to_file(carve_path: Path, file: File, chunk: Chunk):
    """Carve ``chunk`` out of ``file`` into a standalone file at ``carve_path``.

    The work is delegated to ``carve``, which receives the chunk's
    ``start_offset`` and ``size``. The carved file is what later gets handed
    to another tool for extraction.
    """
    logger.debug("Carving chunk", path=carve_path)
    offset, length = chunk.start_offset, chunk.size
    carve(carve_path, file, offset, length)
24
25
def fix_permission(path: Path):
    """Add the baseline permission bits to ``path`` without clearing any.

    Symlinks and non-existent paths are left untouched. Regular files gain
    the bits in ``FILE_PERMISSION_MASK`` and directories gain the bits in
    ``DIR_PERMISSION_MASK``; any other kind of entry is re-chmodded with its
    current mode unchanged.
    """
    if path.is_symlink() or not path.exists():
        return

    current = path.stat().st_mode
    if path.is_file():
        wanted = current | FILE_PERMISSION_MASK
    elif path.is_dir():
        wanted = current | DIR_PERMISSION_MASK
    else:
        wanted = current

    path.chmod(wanted)
41
42
def is_recursive_link(path: Path) -> bool:
    """Return True when resolving ``path`` runs into a symlink loop.

    ``Path.resolve()`` signals loops differently depending on the Python
    version: before 3.13 it raises ``RuntimeError``; from 3.13 on it raises
    ``OSError`` (``ELOOP``) in strict mode and raises nothing at all in
    non-strict mode. Resolving strictly and accepting both signals keeps the
    detection working on every version.

    Any other resolution failure (for example a dangling link or a missing
    path) is not a loop and yields False.
    """
    try:
        path.resolve(strict=True)
    except RuntimeError:
        # Python < 3.13: pathlib converts ELOOP into RuntimeError.
        return True
    except OSError as e:
        # Python >= 3.13: the raw OSError surfaces; only ELOOP means a loop.
        return e.errno == errno.ELOOP
    return False
49
50
def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
    """Sanitize the symlink at ``path`` relative to the extraction dir ``outdir``.

    Outcomes, checked in this order:

    * The link is part of a symlink loop: a ``MaliciousSymlinkRemoved``
      report is added to ``task_result`` and the link is deleted.
    * The link has an empty target: the link is deleted (logged only, no
      report is added).
    * ``is_safe_path(outdir, target)`` rejects the target: a
      ``MaliciousSymlinkRemoved`` report is added and the link is deleted.
    * Otherwise the link is recreated with a target expressed relative to
      the link's own directory, so the link value is portable and does not
      mention the extraction directory.

    An absolute target is re-rooted under ``outdir`` by stripping its leading
    slashes; for a relative target the fully resolved location of the link
    is used instead.

    Returns ``path`` in every case, whether or not the link still exists.
    """
    if is_recursive_link(path):
        logger.error("Symlink loop identified, removing", path=path)
        loop_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=path.readlink().as_posix()
        )
        task_result.add_report(loop_report)
        path.unlink()
        return path

    link_value = os.readlink(path)  # noqa: PTH115
    if not link_value:
        logger.error("Symlink with empty target, removing.")
        path.unlink()
        return path

    declared = Path(link_value)
    # Absolute targets are reinterpreted as paths below outdir; relative ones
    # are replaced by wherever the link actually resolves to.
    target = (
        Path(declared.as_posix().lstrip("/"))
        if declared.is_absolute()
        else path.resolve()
    )

    if not is_safe_path(outdir, target):
        logger.error("Path traversal attempt through symlink, removing", target=target)
        traversal_report = MaliciousSymlinkRemoved(
            link=path.as_posix(), target=target.as_posix()
        )
        task_result.add_report(traversal_report)
        path.unlink()
        return path

    # Compute the new value before unlinking, then recreate the link in place.
    portable_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
    path.unlink()
    path.symlink_to(portable_target)
    return path
94
95
def fix_extracted_directory(outdir: Path, task_result: TaskResult):
    """Fix permissions and symlinks of ``outdir`` and everything below it.

    ``outdir`` itself gets its permissions fixed first. Then every entry is
    visited depth-first: its permissions are fixed, symlinks are passed to
    ``fix_symlink`` (and never descended into), and real directories are
    walked recursively. Reports produced by ``fix_symlink`` are collected in
    ``task_result``.

    An ``OSError`` with ``ENAMETOOLONG`` raised while handling an entry
    (including from anywhere inside a nested directory) makes the walk skip
    that entry; every other ``OSError`` propagates to the caller.
    """

    def _walk(current: Path):
        if not current.exists():
            return
        for entry in current.iterdir():
            try:
                fix_permission(entry)
                if entry.is_symlink():
                    fix_symlink(entry, outdir, task_result)
                elif entry.is_dir():
                    _walk(entry)
            except OSError as e:
                # Over-long paths are skipped; anything else is fatal.
                if e.errno != errno.ENAMETOOLONG:
                    raise e from None

    fix_permission(outdir)
    _walk(outdir)
115
116
def carve_unknown_chunk(
    extract_dir: Path, file: File, chunk: Union[UnknownChunk, PaddingChunk]
) -> Path:
    """Carve an unknown or padding chunk into ``extract_dir``.

    The output file is named ``<start_offset>-<end_offset>.<extension>``,
    where the extension is ``padding`` for a ``PaddingChunk`` and ``unknown``
    otherwise. Returns the path of the carved file.
    """
    suffix = "padding" if isinstance(chunk, PaddingChunk) else "unknown"
    carve_path = extract_dir.joinpath(
        f"{chunk.start_offset}-{chunk.end_offset}.{suffix}"
    )
    logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path
129
130
def carve_valid_chunk(extract_dir: Path, file: File, chunk: ValidChunk) -> Path:
    """Carve a valid chunk into ``extract_dir``.

    The output file is named ``<start_offset>-<end_offset>.<handler NAME>``,
    using the ``NAME`` of the chunk's handler as the extension. Returns the
    path of the carved file.
    """
    byte_range = f"{chunk.start_offset}-{chunk.end_offset}"
    carve_path = extract_dir.joinpath(f"{byte_range}.{chunk.handler.NAME}")
    logger.info("Extracting valid chunk", path=carve_path, chunk=chunk)
    carve_chunk_to_file(carve_path, file, chunk)
    return carve_path