Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/report.py: 72%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import base64
4import hashlib
5import stat
6import traceback
7from enum import Enum
8from pathlib import Path
9from typing import TYPE_CHECKING, Annotated, Any
11if TYPE_CHECKING:
12 from collections.abc import Iterable
14from pydantic import (
15 BaseModel,
16 BeforeValidator,
17 ConfigDict,
18 computed_field,
19 field_serializer,
20 field_validator,
21)
24def ensure_bytes(value: Any) -> bytes:
25 if isinstance(value, bytes):
26 return value
27 if isinstance(value, str):
28 return value.encode()
29 if value is None:
30 return b""
31 raise ValueError(f"Unsupported type in ensure_bytes: {type(value)}")
class Report(BaseModel):
    """Common base class of every report model.

    Sharing a single root makes it possible to configure all report models
    through pydantic from one place, should that become desirable.
    """

    # Exposed as a computed field; `parse_report` reads the same key from
    # plain mappings to pick the concrete class to validate against.
    @computed_field
    @property
    def __typename__(self) -> str:
        return type(self).__name__
class Severity(Enum):
    """Severity levels of the problems encountered during execution."""

    ERROR = "ERROR"
    WARNING = "WARNING"
class ErrorReport(Report):
    """Base class of the reports that describe a problem and carry its severity."""

    # Required here; the subclasses defined in this module each provide a default.
    severity: Severity
class UnknownError(ErrorReport):
    """Describes an exception raised during file processing."""

    severity: Severity = Severity.ERROR
    # Either already-formatted text or a live exception object; the latter is
    # replaced by its formatted traceback in `model_post_init`.
    exception: str | Exception

    # Necessary to support the Exception type in the annotation above.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def model_post_init(self, _: Any) -> None:
        """Format an exception instance into traceback text at construct time."""
        exc = self.exception
        if not isinstance(exc, Exception):
            return
        formatted = traceback.format_exception(type(exc), exc, exc.__traceback__)
        self.exception = "".join(formatted)
class CalculateChunkExceptionReport(UnknownError):
    """Describes an exception raised while calculate_chunk was executing."""

    start_offset: int
    # Kept as `str` instead of `Handler`, because pickle would otherwise pick up
    # structs from `C_DEFINITIONS`.
    handler: str
class CalculateMultiFileExceptionReport(UnknownError):
    """Describes an exception raised during multi-file calculation.

    NOTE(review): presumably the multi-file counterpart of calculate_chunk;
    confirm the exact entry point against the caller.
    """

    path: Path
    # Kept as `str` instead of `Handler`, because pickle would otherwise pick up
    # structs from `C_DEFINITIONS`.
    handler: str
class ExtractCommandFailedReport(ErrorReport):
    """Describes a failure to run the extraction command."""

    severity: Severity = Severity.WARNING
    command: str
    stdout: Annotated[bytes, BeforeValidator(ensure_bytes)]
    stderr: Annotated[bytes, BeforeValidator(ensure_bytes)]
    exit_code: int

    # The captured output is serialized as base64 text, in case it contains
    # non-standard characters.
    @field_serializer("stdout", "stderr")
    def encode_bytes(self, v: bytes, _):
        """Serialize the raw bytes as an ASCII base64 string."""
        encoded = base64.b64encode(v)
        return encoded.decode("ascii")

    @field_validator("stdout", "stderr", mode="before")
    @classmethod
    def decode_bytes(cls, v: Any):
        """Base64-decode string input; pass every other value through unchanged."""
        return base64.b64decode(v) if isinstance(v, str) else v
class OutputDirectoryExistsReport(ErrorReport):
    """Error report carrying the path of an output directory.

    NOTE(review): going by the class name, this is emitted when the directory
    already exists; the producing code is not part of this module.
    """

    severity: Severity = Severity.ERROR
    path: Path
class ExtractorDependencyNotFoundReport(ErrorReport):
    """Describes an error when a dependency of an extractor does not exist."""

    severity: Severity = Severity.ERROR
    # Presumably the names of the missing dependencies; verify against the caller.
    dependencies: list[str]
class ExtractorTimedOut(ErrorReport):
    """Describes an error when the execution of an extractor timed out."""

    severity: Severity = Severity.ERROR
    cmd: str
    # The unit is not stated in this module; presumably seconds (TODO confirm).
    timeout: float
class MaliciousSymlinkRemoved(ErrorReport):
    """Describes the removal of malicious symlinks from disk."""

    severity: Severity = Severity.WARNING
    # Both values are stored as plain strings rather than `Path` objects.
    link: str
    target: str
class MultiFileCollisionReport(ErrorReport):
    """Describes an error when MultiFiles collide on the same file."""

    severity: Severity = Severity.ERROR
    paths: set[Path]
    # Stored as `str`, like the handler fields of the other reports in this module.
    handler: str
class StatReport(Report):
    """File-system metadata of a single path, as obtained by `lstat`."""

    path: Path
    size: int
    is_dir: bool
    is_file: bool
    is_link: bool
    link_target: Path | None

    @classmethod
    def from_path(cls, path: Path):
        """Build the report from *path* without following a final symlink.

        `link_target` is `None` whenever reading the link fails with `OSError`.
        """
        info = path.lstat()
        try:
            target = path.readlink()
        except OSError:
            target = None

        file_mode = info.st_mode
        return cls(
            path=path,
            size=info.st_size,
            is_dir=stat.S_ISDIR(file_mode),
            is_file=stat.S_ISREG(file_mode),
            is_link=stat.S_ISLNK(file_mode),
            link_target=target,
        )
class HashReport(Report):
    """Hex digests of a file's content: MD5, SHA-1 and SHA-256."""

    md5: str
    sha1: str
    sha256: str

    @classmethod
    def from_path(cls, path: Path):
        """Hash the file at *path* in a single pass over 64 KiB blocks."""
        # MD5 and SHA-1 are flagged as not being used for security purposes.
        hashers = {
            "md5": hashlib.md5(usedforsecurity=False),
            "sha1": hashlib.sha1(usedforsecurity=False),
            "sha256": hashlib.sha256(),
        }
        block_len = 64 * 1024

        with path.open("rb") as stream:
            # The empty read at end-of-file ends the iteration.
            for block in iter(lambda: stream.read(block_len), b""):
                for hasher in hashers.values():
                    hasher.update(block)

        return cls(**{name: hasher.hexdigest() for name, hasher in hashers.items()})
class FileMagicReport(Report):
    """File type identification results: a magic string and a MIME type.

    NOTE(review): the producer of these values is not visible in this module.
    """

    magic: str
    mime_type: str
class RandomnessMeasurements(BaseModel):
    """A series of percentage values with the block size and mean they belong to.

    NOTE(review): presumably one percentage per block of `block_size` bytes;
    confirm against the code that produces the measurements.
    """

    percentages: list[float]
    block_size: int
    mean: float

    # Plain properties (not declared as computed fields); `max`/`min` raise
    # ValueError when `percentages` is empty.
    @property
    def highest(self):
        """Largest value in `percentages`."""
        return max(self.percentages)

    @property
    def lowest(self):
        """Smallest value in `percentages`."""
        return min(self.percentages)
class RandomnessReport(Report):
    """Pairs two sets of randomness measurements: Shannon and chi-square."""

    shannon: RandomnessMeasurements
    chi_square: RandomnessMeasurements
class ChunkReport(Report):
    """Report of a chunk attributed to a handler, with its extraction reports."""

    id: str
    handler_name: str
    start_offset: int
    end_offset: int
    size: int
    is_encrypted: bool
    extraction_reports: list[Report]

    @field_validator("extraction_reports", mode="before")
    @classmethod
    def validate_extraction_reports(cls, value: Any) -> list[Report]:
        """Turn raw report mappings into concrete `Report` instances."""
        # Runs before pydantic's own validation, so each item is resolved to the
        # class named by its `__typename__` key by `validate_report_list`.
        return validate_report_list(value)
class UnknownChunkReport(Report):
    """Report of a chunk without a handler, optionally with randomness data."""

    id: str
    start_offset: int
    end_offset: int
    size: int
    randomness: RandomnessReport | None

    @field_validator("randomness", mode="before")
    @classmethod
    def validate_randomness(cls, value: Any) -> RandomnessReport | None:
        """Parse raw randomness data, insisting on a `RandomnessReport` result.

        `None` is passed through; any other parsed report type raises TypeError.
        """
        if value is None:
            return None
        report = parse_report(value)
        if isinstance(report, RandomnessReport):
            return report
        raise TypeError("Randomness must be a RandomnessReport.")
class CarveDirectoryReport(Report):
    """Report carrying the path of a carve directory."""

    carve_dir: Path
class MultiFileReport(Report):
    """Report of a multi-file: its handler, name, member paths and extraction reports."""

    id: str
    handler_name: str
    name: str
    paths: list[Path]
    extraction_reports: list[Report]

    @field_validator("extraction_reports", mode="before")
    @classmethod
    def validate_extraction_reports(cls, value: Any) -> list[Report]:
        """Turn raw report mappings into concrete `Report` instances."""
        # Runs before pydantic's own validation, so each item is resolved to the
        # class named by its `__typename__` key by `validate_report_list`.
        return validate_report_list(value)
class ExtractedFileDeletedReport(Report):
    """Report carrying a path and a handler name.

    NOTE(review): going by the class name, it records that an extracted file
    was deleted; confirm against the code that emits it.
    """

    path: Path
    handler_name: str
class ExtractionProblem(Report):
    """A non-fatal problem discovered during extraction.

    Such a report still means that the extraction succeeded: there were
    problems, but they got resolved. The output is expected to be complete,
    with the exception of the reported path.

    Examples
    --------
    - duplicate entries for certain archive formats (tar, zip)
    - unsafe symlinks pointing outside of extraction directory

    """

    problem: str
    resolution: str
    path: str | None = None

    @property
    def log_msg(self):
        """The problem and its resolution joined by a single space."""
        return f"{self.problem} {self.resolution}"

    def log_with(self, logger):
        """Emit the problem as a warning on *logger*, with the path as context."""
        message = self.log_msg
        logger.warning(message, path=self.path)
class PathTraversalProblem(ExtractionProblem):
    """An extraction problem that additionally records an extraction path."""

    extraction_path: str

    def log_with(self, logger):
        """Emit the problem as a warning, adding `extraction_path` to the context."""
        message = self.log_msg
        logger.warning(message, path=self.path, extraction_path=self.extraction_path)
class LinkExtractionProblem(ExtractionProblem):
    """An extraction problem that additionally records a link path."""

    link_path: str

    def log_with(self, logger):
        """Emit the problem as a warning, adding `link_path` to the context."""
        message = self.log_msg
        logger.warning(message, path=self.path, link_path=self.link_path)
class SpecialFileExtractionProblem(ExtractionProblem):
    """An extraction problem that additionally records a mode and a device number."""

    mode: int
    device: int

    def log_with(self, logger):
        """Emit the problem as a warning, adding `mode` and `device` to the context."""
        message = self.log_msg
        logger.warning(message, path=self.path, mode=self.mode, device=self.device)
# Every report class defined in this module; all of them are registered when
# the module is imported.
BUILTIN_REPORT_TYPES: tuple[type[Report], ...] = (
    # Error reports
    ErrorReport,
    UnknownError,
    CalculateChunkExceptionReport,
    CalculateMultiFileExceptionReport,
    ExtractCommandFailedReport,
    OutputDirectoryExistsReport,
    ExtractorDependencyNotFoundReport,
    ExtractedFileDeletedReport,
    ExtractorTimedOut,
    MaliciousSymlinkRemoved,
    MultiFileCollisionReport,
    # File and chunk level reports
    StatReport,
    HashReport,
    FileMagicReport,
    RandomnessReport,
    ChunkReport,
    UnknownChunkReport,
    CarveDirectoryReport,
    MultiFileReport,
    # Non-fatal extraction problems
    ExtractionProblem,
    PathTraversalProblem,
    LinkExtractionProblem,
    SpecialFileExtractionProblem,
)

# Maps a report class name (the `__typename__` value) to the class itself.
_REPORT_REGISTRY: dict[str, type[Report]] = {}
def register_report_type(report_type: type[Report]) -> None:
    """Make *report_type* resolvable by its class name.

    Registering the same class again is a no-op.

    Raises
    ------
    ValueError
        If a different class is already registered under the same name.

    """
    name = report_type.__name__
    # `setdefault` stores the class only when the name is still free and hands
    # back whichever class ends up owning the name.
    owner = _REPORT_REGISTRY.setdefault(name, report_type)
    if owner is not report_type:
        raise ValueError(f"Report type name conflict: {name}")
def register_report_types(report_types: Iterable[type[Report]]) -> None:
    """Register each class in *report_types*, in iteration order.

    A name conflict raises ValueError at the offending class; the classes
    registered before it stay registered.
    """
    for candidate in report_types:
        register_report_type(candidate)
def get_report_type(typename: str) -> type[Report] | None:
    """Return the report class registered as *typename*, or `None` if there is none."""
    registered = _REPORT_REGISTRY.get(typename)
    return registered
def parse_report(report: Report | dict[str, Any]) -> Report:
    """Return *report* as a `Report` instance.

    Instances are returned unchanged. A dict is validated against the class
    registered under the name found in its ``__typename__`` key.

    Raises
    ------
    TypeError
        If *report* is neither a `Report` nor a dict.
    ValueError
        If ``__typename__`` is missing or empty, or does not name a
        registered report type.

    """
    if isinstance(report, Report):
        return report
    if not isinstance(report, dict):
        raise TypeError("Report data must be a mapping or Report instance.")

    typename = report.get("__typename__")
    if not typename:
        raise ValueError("Report data is missing __typename__.")

    # Registry keys are class names, so only a string can match. Skipping the
    # lookup for anything else keeps an unhashable value (e.g. a list coming
    # from malformed data) from surfacing as "TypeError: unhashable type".
    report_type = get_report_type(typename) if isinstance(typename, str) else None
    if report_type is None:
        raise ValueError(f"Unknown report type: {typename}")
    return report_type.model_validate(report)
def validate_report_list(value: Any) -> list[Report]:
    """Parse every item of the list *value* with `parse_report`.

    Raises
    ------
    TypeError
        If *value* is not a list.

    """
    if isinstance(value, list):
        return [parse_report(entry) for entry in value]
    raise TypeError("Report list must be a list.")
# Import-time side effect: make every built-in report class resolvable by name.
register_report_types(BUILTIN_REPORT_TYPES)