Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/report.py: 86%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import hashlib
2import stat
3import traceback
4from enum import Enum
5from pathlib import Path
6from typing import Optional, Union, final
8import attrs
@attrs.define(kw_only=True, frozen=True)
class Report:
    """A common base class for different reports.

    Reports are frozen, keyword-only ``attrs`` classes.  Right after
    construction every field whose value is an instance of an ``int``
    subclass is replaced by a plain built-in ``int``; ``bool`` values are
    left untouched so that fields annotated as ``bool`` stay booleans.
    """

    def __attrs_post_init__(self):
        """Normalize integer-like field values to the built-in ``int``."""
        for field in attrs.fields(type(self)):
            value = getattr(self, field.name)
            # `bool` is a subclass of `int`: without the explicit exclusion,
            # True/False would be silently rewritten to 1/0.
            if isinstance(value, int) and not isinstance(value, bool):
                # The class is frozen, so the regular attribute assignment
                # is blocked; go through `object.__setattr__` instead.
                object.__setattr__(self, field.name, int(value))

    def asdict(self) -> dict:
        """Return the report converted to a ``dict`` via ``attrs.asdict``."""
        return attrs.asdict(self)
class Severity(Enum):
    """Severity levels of the problems that can be reported during execution."""

    # Each member's value is identical to its name.
    ERROR = "ERROR"
    WARNING = "WARNING"
@attrs.define(kw_only=True, frozen=True)
class ErrorReport(Report):
    """A report that carries a `Severity`."""

    # No default here; the subclasses in this module supply one.
    severity: Severity
37def _convert_exception_to_str(obj: Union[str, Exception]) -> str:
38 if isinstance(obj, str):
39 return obj
40 if isinstance(obj, Exception):
41 e: Exception = obj
42 return "".join(traceback.format_exception(type(e), e, e.__traceback__))
43 raise ValueError("Invalid exception object", obj)
@attrs.define(kw_only=True, frozen=True)
class UnknownError(ErrorReport):
    """Describes an exception raised during file processing."""

    severity: Severity = attrs.field(default=Severity.ERROR)

    # The exception is converted to its string form at construction time by
    # `_convert_exception_to_str`, so the stored value is always a `str`.
    #
    # `attrs` is not integrated enough with the type checker/LSP provider
    # `pyright` for it to take converters into account, hence the ignore.
    #
    # See: https://www.attrs.org/en/stable/types.html#pyright
    exception: Union[str, Exception] = attrs.field(  # pyright: ignore[reportGeneralTypeIssues]
        converter=_convert_exception_to_str
    )
@attrs.define(kw_only=True, frozen=True)
class CalculateChunkExceptionReport(UnknownError):
    """Describes an exception raised during calculate_chunk execution."""

    start_offset: int

    # Kept as a `str` rather than a `Handler`, because otherwise the pickle
    # picks up structs from `C_DEFINITIONS`.
    handler: str
@attrs.define(kw_only=True, frozen=True)
class CalculateMultiFileExceptionReport(UnknownError):
    """Describes an exception raised while calculating a multi-file."""

    path: Path

    # Kept as a `str` rather than a `Handler`, because otherwise the pickle
    # picks up structs from `C_DEFINITIONS`.
    handler: str
@attrs.define(kw_only=True, frozen=True)
class ExtractCommandFailedReport(ErrorReport):
    """Describes a failure to run the extraction command.

    Reported with `Severity.WARNING` unless a different severity is given.
    """

    severity: Severity = Severity.WARNING

    command: str
    stdout: bytes
    stderr: bytes
    exit_code: int
@attrs.define(kw_only=True, frozen=True)
class OutputDirectoryExistsReport(ErrorReport):
    """Error report about an output directory, identified by `path`.

    Reported with `Severity.ERROR` unless a different severity is given.
    """

    severity: Severity = Severity.ERROR

    path: Path
@attrs.define(kw_only=True, frozen=True)
class ExtractorDependencyNotFoundReport(ErrorReport):
    """Describes an error when the dependency of an extractor doesn't exist.

    Reported with `Severity.ERROR` unless a different severity is given.
    """

    severity: Severity = Severity.ERROR

    dependencies: list[str]
@attrs.define(kw_only=True, frozen=True)
class ExtractorTimedOut(ErrorReport):
    """Describes an error when the execution of an extractor timed out.

    Reported with `Severity.ERROR` unless a different severity is given.
    """

    severity: Severity = Severity.ERROR

    cmd: str
    timeout: float
@attrs.define(kw_only=True, frozen=True)
class MaliciousSymlinkRemoved(ErrorReport):
    """Describes the removal of a malicious symlink from disk.

    Reported with `Severity.WARNING` unless a different severity is given.
    """

    severity: Severity = Severity.WARNING

    link: str
    target: str
@attrs.define(kw_only=True, frozen=True)
class MultiFileCollisionReport(ErrorReport):
    """Describes an error when MultiFiles collide on the same file.

    Reported with `Severity.ERROR` unless a different severity is given.
    """

    severity: Severity = Severity.ERROR

    paths: set[Path]
    handler: str
@attrs.define(kw_only=True, frozen=True)
class StatReport(Report):
    """File-system metadata of a path, gathered without following symlinks."""

    path: Path
    size: int
    is_dir: bool
    is_file: bool
    is_link: bool
    link_target: Optional[Path]

    @classmethod
    def from_path(cls, path: Path):
        """Build the report from `path.lstat()` and the link target of `path`.

        `link_target` is `None` whenever reading the link raises `OSError`.
        """
        lstat_result = path.lstat()

        try:
            target = Path.readlink(path)
        except OSError:
            # Nothing to report as a target (e.g. the path is not a symlink).
            target = None

        file_mode = lstat_result.st_mode
        directory = stat.S_ISDIR(file_mode)
        regular_file = stat.S_ISREG(file_mode)
        symlink = stat.S_ISLNK(file_mode)

        return cls(
            path=path,
            size=lstat_result.st_size,
            is_dir=directory,
            is_file=regular_file,
            is_link=symlink,
            link_target=target,
        )
@attrs.define(kw_only=True, frozen=True)
class HashReport(Report):
    """Hex-encoded MD5, SHA-1 and SHA-256 digests of a file's content."""

    md5: str
    sha1: str
    sha256: str

    @classmethod
    def from_path(cls, path: Path):
        """Hash the file at `path`, reading it once in 64 KiB blocks."""
        read_size = 64 * 1024
        hashers = {
            "md5": hashlib.md5(),  # noqa: S324
            "sha1": hashlib.sha1(),  # noqa: S324
            "sha256": hashlib.sha256(),
        }

        with path.open("rb") as stream:
            # An empty read marks the end of the file and stops the iteration.
            for block in iter(lambda: stream.read(read_size), b""):
                for hasher in hashers.values():
                    hasher.update(block)

        return cls(
            md5=hashers["md5"].hexdigest(),
            sha1=hashers["sha1"].hexdigest(),
            sha256=hashers["sha256"].hexdigest(),
        )
@attrs.define(kw_only=True, frozen=True)
class FileMagicReport(Report):
    """Holds the `magic` and `mime_type` strings reported for a file."""

    magic: str
    mime_type: str
@attrs.define(kw_only=True, frozen=True)
class RandomnessMeasurements:
    """A list of percentages together with the block size and the mean."""

    percentages: list[float]
    block_size: int
    mean: float

    @property
    def highest(self):
        """The largest value in `percentages`."""
        values = self.percentages
        return max(values)

    @property
    def lowest(self):
        """The smallest value in `percentages`."""
        values = self.percentages
        return min(values)
@attrs.define(kw_only=True, frozen=True)
class RandomnessReport(Report):
    """Groups two `RandomnessMeasurements`: `shannon` and `chi_square`."""

    shannon: RandomnessMeasurements
    chi_square: RandomnessMeasurements
@final
@attrs.define(kw_only=True, frozen=True)
class ChunkReport(Report):
    """Describes a chunk: its id, handler name, offsets, size and encryption flag.

    `extraction_reports` holds further `Report` objects attached to the chunk.
    """

    id: str
    handler_name: str

    start_offset: int
    end_offset: int
    size: int

    is_encrypted: bool
    extraction_reports: list[Report]
@final
@attrs.define(kw_only=True, frozen=True)
class UnknownChunkReport(Report):
    """Describes an unknown chunk: its id, offsets, size and optional randomness."""

    id: str

    start_offset: int
    end_offset: int
    size: int

    # `None` when no randomness report is attached.
    randomness: Optional[RandomnessReport]
@attrs.define(kw_only=True, frozen=True)
class CarveDirectoryReport(Report):
    """Holds the path of a carve directory."""

    carve_dir: Path
@final
@attrs.define(kw_only=True, frozen=True)
class MultiFileReport(Report):
    """Describes a multi-file: its id, handler name, name and member paths.

    `extraction_reports` holds further `Report` objects attached to it.
    """

    id: str
    handler_name: str
    name: str

    paths: list[Path]
    extraction_reports: list[Report]
@attrs.define(kw_only=True, frozen=True)
class ExtractionProblem(Report):
    """A non-fatal problem discovered during extraction.

    Such a report still means that the extraction was successful, but that
    problems were encountered and resolved along the way.
    The output is expected to be complete, except for the reported path.

    Examples
    --------
    - duplicate entries for certain archive formats (tar, zip)
    - unsafe symlinks pointing outside of extraction directory

    """

    problem: str
    resolution: str
    path: Optional[str] = None

    @property
    def log_msg(self):
        """The problem and its resolution, separated by a single space."""
        message = f"{self.problem} {self.resolution}"
        return message

    def log_with(self, logger):
        """Emit `log_msg` as a warning on `logger`, with `path` as context."""
        logger.warning(
            self.log_msg,
            path=self.path,
        )
@attrs.define(kw_only=True, frozen=True)
class PathTraversalProblem(ExtractionProblem):
    """An `ExtractionProblem` that additionally records `extraction_path`."""

    extraction_path: str

    def log_with(self, logger):
        """Emit `log_msg` as a warning with `path` and `extraction_path`."""
        context = {
            "path": self.path,
            "extraction_path": self.extraction_path,
        }
        logger.warning(self.log_msg, **context)
@attrs.define(kw_only=True, frozen=True)
class LinkExtractionProblem(ExtractionProblem):
    """An `ExtractionProblem` that additionally records `link_path`."""

    link_path: str

    def log_with(self, logger):
        """Emit `log_msg` as a warning with `path` and `link_path`."""
        logger.warning(
            self.log_msg,
            path=self.path,
            link_path=self.link_path,
        )
@attrs.define(kw_only=True, frozen=True)
class SpecialFileExtractionProblem(ExtractionProblem):
    """An `ExtractionProblem` that additionally records `mode` and `device`."""

    mode: int
    device: int

    def log_with(self, logger):
        """Emit `log_msg` as a warning with `path`, `mode` and `device`."""
        logger.warning(
            self.log_msg,
            path=self.path,
            mode=self.mode,
            device=self.device,
        )