Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/report.py: 80%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# ruff: noqa: UP007,UP045
3from __future__ import annotations
5import base64
6import hashlib
7import stat
8import traceback
9from enum import Enum
10from pathlib import Path
11from typing import Annotated, Any, Optional, Union
13from pydantic import (
14 BaseModel,
15 ConfigDict,
16 Discriminator,
17 Tag,
18 computed_field,
19 field_serializer,
20 field_validator,
21)
class ReportBase(BaseModel):
    """A common base class for different reports. This will enable easy pydantic configuration of all models from a single point in the future if desired."""

    # Declared as a pydantic computed field so that the name of the concrete
    # report class travels with the report; `_get_report_type` reads the same
    # "__typename__" key/attribute to pick the matching model.
    @computed_field
    @property
    def __typename__(self) -> str:
        return type(self).__name__
class Severity(Enum):
    """Represents possible problems encountered during execution."""

    # Every member's value is the same text as its name.
    ERROR = "ERROR"
    WARNING = "WARNING"
class ErrorReport(ReportBase):
    # Base model of every report that describes a problem. The field has no
    # default here; the subclasses in this module each supply their own.
    severity: Severity
class UnknownError(ErrorReport):
    """Describes an exception raised during file processing."""

    severity: Severity = Severity.ERROR
    # Accepts either an already formatted traceback string or a live exception
    # object; the latter is replaced by its formatted form in model_post_init.
    exception: Union[str, Exception]

    # Necessary to support Exception type
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def model_post_init(self, _: Any) -> None:
        """Exceptions are also formatted at construct time."""
        exc = self.exception
        if not isinstance(exc, Exception):
            # Already a string: nothing to format.
            return
        formatted_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
        self.exception = "".join(formatted_lines)
class CalculateChunkExceptionReport(UnknownError):
    """Describes an exception raised during calculate_chunk execution."""

    start_offset: int
    # Stored as `str` rather than `Handler`, because pickle picks up structs
    # from `C_DEFINITIONS`
    handler: str
class CalculateMultiFileExceptionReport(UnknownError):
    """Describes an exception raised during calculate_chunk execution."""

    # NOTE(review): the docstring above is word-for-word the one used by
    # CalculateChunkExceptionReport, while this report carries a `path`
    # instead of a `start_offset` -- it looks copy-pasted; confirm and reword.
    path: Path
    # Stored as `str` rather than `Handler`, because pickle picks up structs
    # from `C_DEFINITIONS`
    handler: str
class ExtractCommandFailedReport(ErrorReport):
    """Describes an error when failed to run the extraction command."""

    severity: Severity = Severity.WARNING
    command: str
    stdout: bytes
    stderr: bytes
    exit_code: int

    # The captured output may contain non-standard characters, so both byte
    # fields are written out as base64 text and decoded again when read back.
    @field_serializer("stdout", "stderr")
    def encode_bytes(self, v: bytes, _):
        encoded = base64.b64encode(v)
        return encoded.decode("ascii")

    @field_validator("stdout", "stderr", mode="before")
    @classmethod
    def decode_bytes(cls, v: Any):
        # Only `str` input is treated as base64; any other value is handed on
        # unchanged.
        return base64.b64decode(v) if isinstance(v, str) else v
class OutputDirectoryExistsReport(ErrorReport):
    # Error-severity report that carries the path of the output directory
    # named by the class.
    severity: Severity = Severity.ERROR
    path: Path
class ExtractorDependencyNotFoundReport(ErrorReport):
    """Describes an error when the dependency of an extractor doesn't exist."""

    severity: Severity = Severity.ERROR
    # Names of the dependencies this report is about.
    dependencies: list[str]
class ExtractorTimedOut(ErrorReport):
    """Describes an error when the extractor execution timed out."""

    severity: Severity = Severity.ERROR
    # The command and the timeout value associated with it.
    # NOTE(review): the unit of `timeout` is not visible here -- presumably
    # seconds; confirm against the caller.
    cmd: str
    timeout: float
class MaliciousSymlinkRemoved(ErrorReport):
    """Describes an error when malicious symlinks have been removed from disk."""

    # Reported as a warning, unlike most other error reports in this module.
    severity: Severity = Severity.WARNING
    # Both ends of the symlink, kept as plain strings.
    link: str
    target: str
class MultiFileCollisionReport(ErrorReport):
    """Describes an error when MultiFiles collide on the same file."""

    severity: Severity = Severity.ERROR
    # Held as a set, so the paths are unique and carry no order.
    paths: set[Path]
    # Handler identified by a plain string.
    handler: str
class StatReport(ReportBase):
    # File-system metadata of a single path; see `from_path` for how each
    # field is derived.
    path: Path
    size: int
    is_dir: bool
    is_file: bool
    is_link: bool
    # Must always be supplied, but may be None.
    link_target: Optional[Path]

    @classmethod
    def from_path(cls, path: Path) -> StatReport:
        """Build a report from the `lstat()` result of *path*.

        `lstat()` is used, so a symlink is described itself rather than the
        file it points to.
        """
        info = path.lstat()
        file_mode = info.st_mode

        # readlink is attempted unconditionally; any OSError it raises is
        # taken to mean that there is no target to report.
        try:
            target = Path.readlink(path)
        except OSError:
            target = None

        directory = stat.S_ISDIR(file_mode)
        regular_file = stat.S_ISREG(file_mode)
        symlink = stat.S_ISLNK(file_mode)

        return cls(
            path=path,
            size=info.st_size,
            is_dir=directory,
            is_file=regular_file,
            is_link=symlink,
            link_target=target,
        )
class HashReport(ReportBase):
    # Hexadecimal digests of a file's content, one per algorithm.
    md5: str
    sha1: str
    sha256: str

    @classmethod
    def from_path(cls, path: Path) -> HashReport:
        """Hash the content of *path* with MD5, SHA-1 and SHA-256.

        The file is read once, in 64 KiB chunks, and every chunk is fed to all
        three hash objects, so memory use does not grow with the file size.

        Errors raised while opening or reading the file are not caught here.
        """
        chunk_size = 1024 * 64
        # `usedforsecurity=False` keeps the two weak algorithms usable on
        # restricted (e.g. FIPS) builds, where the plain constructors raise.
        # NOTE(review): this assumes the digests only serve to identify file
        # content, not as a security control -- confirm.
        md5 = hashlib.md5(usedforsecurity=False)
        sha1 = hashlib.sha1(usedforsecurity=False)
        sha256 = hashlib.sha256()

        with path.open("rb") as f:
            while chunk := f.read(chunk_size):
                md5.update(chunk)
                sha1.update(chunk)
                sha256.update(chunk)

        return cls(
            md5=md5.hexdigest(),
            sha1=sha1.hexdigest(),
            sha256=sha256.hexdigest(),
        )
class FileMagicReport(ReportBase):
    # Two string fields describing a file's detected type.
    # NOTE(review): presumably the libmagic description and MIME type of the
    # file -- the producer is not visible here; confirm.
    magic: str
    mime_type: str
class RandomnessMeasurements(BaseModel):
    # Derives from BaseModel directly, not from ReportBase, so it carries no
    # `__typename__` computed field.
    percentages: list[float]
    block_size: int
    mean: float

    # `highest` and `lowest` are plain properties, not model fields. Both
    # raise ValueError when `percentages` is empty, because max()/min() do.
    @property
    def highest(self) -> float:
        values = self.percentages
        return max(values)

    @property
    def lowest(self) -> float:
        values = self.percentages
        return min(values)
class RandomnessReport(ReportBase):
    # One set of measurements per randomness metric.
    shannon: RandomnessMeasurements
    chi_square: RandomnessMeasurements
class ChunkReport(ReportBase):
    # Identity and handler of the chunk.
    id: str
    handler_name: str
    # Position of the chunk.
    # NOTE(review): presumably byte offsets into the containing file with
    # size == end_offset - start_offset; this model does not enforce it.
    start_offset: int
    end_offset: int
    size: int
    is_encrypted: bool
    # `Report` is the discriminated union defined at the bottom of this
    # module; it is referenced here before its definition.
    extraction_reports: list[Report]
class UnknownChunkReport(ReportBase):
    # Same identity/position fields as ChunkReport, but without a handler.
    id: str
    start_offset: int
    end_offset: int
    size: int
    # Must always be supplied, but may be None.
    randomness: Optional[RandomnessReport]
class CarveDirectoryReport(ReportBase):
    # Carries a single directory path.
    carve_dir: Path
class MultiFileReport(ReportBase):
    # Identity and handler of the multi-file.
    id: str
    handler_name: str
    name: str
    # Unlike MultiFileCollisionReport.paths this is a list, so order and
    # duplicates are preserved.
    paths: list[Path]
    # `Report` is the discriminated union defined at the bottom of this
    # module; it is referenced here before its definition.
    extraction_reports: list[Report]
class ExtractionProblem(ReportBase):
    """A non-fatal problem discovered during extraction.

    A report like this still means, that the extraction was successful,
    but there were problems that got resolved.
    The output is expected to be complete, with the exception of
    the reported path.

    Examples
    --------
    - duplicate entries for certain archive formats (tar, zip)
    - unsafe symlinks pointing outside of extraction directory

    """

    problem: str
    resolution: str
    path: Optional[str] = None

    # Log message: the problem followed by its resolution, separated by one
    # space.
    @property
    def log_msg(self):
        problem, resolution = self.problem, self.resolution
        return f"{problem} {resolution}"

    # Emits the message as a warning through `logger`, passing the affected
    # path as a keyword argument. Subclasses override this to attach their
    # extra fields.
    def log_with(self, logger):
        message = self.log_msg
        logger.warning(message, path=self.path)
class PathTraversalProblem(ExtractionProblem):
    extraction_path: str

    # Same warning as the base class, with `extraction_path` added as a
    # further keyword argument.
    def log_with(self, logger):
        details = {
            "path": self.path,
            "extraction_path": self.extraction_path,
        }
        logger.warning(self.log_msg, **details)
class LinkExtractionProblem(ExtractionProblem):
    link_path: str

    # Same warning as the base class, with `link_path` added as a further
    # keyword argument.
    def log_with(self, logger):
        details = {"path": self.path, "link_path": self.link_path}
        logger.warning(self.log_msg, **details)
class SpecialFileExtractionProblem(ExtractionProblem):
    mode: int
    device: int

    # Same warning as the base class, with `mode` and `device` added as
    # further keyword arguments.
    def log_with(self, logger):
        details = {
            "path": self.path,
            "mode": self.mode,
            "device": self.device,
        }
        logger.warning(self.log_msg, **details)
def _get_report_type(report: dict | ReportBase) -> Optional[str]:
    """Return the tag that selects the member of the `Report` union.

    A dict is looked up by its "__typename__" key; any other object by its
    `__typename__` attribute.

    Returns None when the key or attribute is absent, so that the
    discriminated union can reject the input with a regular validation error
    instead of an AttributeError escaping from this function.
    """
    if isinstance(report, dict):
        return report.get("__typename__")
    return getattr(report, "__typename__", None)
# Discriminated union of every concrete report model in this module. Each
# member is tagged with its own class name, and `_get_report_type` extracts
# that tag from the value being validated.
Report = Annotated[
    Union[
        # Error reports
        Annotated[ErrorReport, Tag("ErrorReport")],
        Annotated[UnknownError, Tag("UnknownError")],
        Annotated[CalculateChunkExceptionReport, Tag("CalculateChunkExceptionReport")],
        Annotated[
            CalculateMultiFileExceptionReport, Tag("CalculateMultiFileExceptionReport")
        ],
        Annotated[ExtractCommandFailedReport, Tag("ExtractCommandFailedReport")],
        Annotated[OutputDirectoryExistsReport, Tag("OutputDirectoryExistsReport")],
        Annotated[
            ExtractorDependencyNotFoundReport, Tag("ExtractorDependencyNotFoundReport")
        ],
        Annotated[ExtractorTimedOut, Tag("ExtractorTimedOut")],
        Annotated[MaliciousSymlinkRemoved, Tag("MaliciousSymlinkRemoved")],
        Annotated[MultiFileCollisionReport, Tag("MultiFileCollisionReport")],
        # File-level reports
        Annotated[StatReport, Tag("StatReport")],
        Annotated[HashReport, Tag("HashReport")],
        Annotated[FileMagicReport, Tag("FileMagicReport")],
        Annotated[RandomnessReport, Tag("RandomnessReport")],
        # Chunk, carve-directory and multi-file reports
        Annotated[ChunkReport, Tag("ChunkReport")],
        Annotated[UnknownChunkReport, Tag("UnknownChunkReport")],
        Annotated[CarveDirectoryReport, Tag("CarveDirectoryReport")],
        Annotated[MultiFileReport, Tag("MultiFileReport")],
        # Extraction problems
        Annotated[ExtractionProblem, Tag("ExtractionProblem")],
        Annotated[PathTraversalProblem, Tag("PathTraversalProblem")],
        Annotated[LinkExtractionProblem, Tag("LinkExtractionProblem")],
        Annotated[SpecialFileExtractionProblem, Tag("SpecialFileExtractionProblem")],
    ],
    Discriminator(_get_report_type),
]