Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 74%
from __future__ import annotations

import abc
import dataclasses
import itertools
import json
from enum import Enum
from pathlib import Path  # noqa: TC003
from typing import TYPE_CHECKING, Generic, TypeVar

import attrs
from pydantic import BaseModel, TypeAdapter, field_validator
from structlog import get_logger

from .file_utils import Endian, File, InvalidInputFormat, StructParser
from .identifiers import new_id
from .parser import hexstring2regex
from .report import (
    CarveDirectoryReport,
    ChunkReport,
    ErrorReport,
    MultiFileReport,
    RandomnessReport,
    Report,
    UnknownChunkReport,
    validate_report_list,
)

if TYPE_CHECKING:
    from collections.abc import Iterable

logger = get_logger()

# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#


class HandlerType(Enum):
    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"


@dataclasses.dataclass(frozen=True)
class Reference:
    title: str
    url: str


@dataclasses.dataclass
class HandlerDoc:
    name: str
    description: str | None
    vendor: str | None
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        self.fully_supported = len(self.limitations) == 0


class Task(BaseModel):
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = False


@attrs.define
class Blob:
    id: str = attrs.field(
        factory=new_id,
    )


@attrs.define
class Chunk(Blob):
    """File chunk: it has a start and end offset, but may still be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk"""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk"""

    file: File | None = None

    def __attrs_post_init__(self):
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: Chunk) -> bool:
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex
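

# Illustrative sketch (not part of the module): how a Chunk's offsets map onto a
# byte buffer, following the slice semantics documented above.  The helper name
# and the sample data are hypothetical; the function is never called here.
def _example_chunk_offsets() -> None:
    data = b"\x00\x00\x00\x00HEADERPAYLOAD\xff\xff"
    chunk = Chunk(start_offset=4, end_offset=17)
    # The chunk covers data[4:17], i.e. the 13 "HEADERPAYLOAD" bytes.
    assert data[chunk.start_offset : chunk.end_offset] == b"HEADERPAYLOAD"
    assert chunk.size == 13
    assert chunk.range_hex == "0x4-0x11"
    assert chunk.contains_offset(4)
    assert not chunk.contains_offset(17)  # end_offset is exclusive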


@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Known to be valid chunk of a File, can be extracted with an external program."""

    handler: Handler = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )


@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for information purposes and further analysis,
    like most common bytes (like \x00 and \xFF), ASCII strings, high randomness, etc.
    """

    def as_report(self, randomness: RandomnessReport | None) -> UnknownChunkReport:
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )


@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: RandomnessReport | None,  # noqa: ARG002
    ) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )


@attrs.define
class MultiFile(Blob):
    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: DirectoryHandler = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> ExtractResult | None:
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )


ReportType = TypeVar("ReportType", bound=Report)


class TaskResult(BaseModel):
    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    @field_validator("reports", mode="before")
    @classmethod
    def validate_reports(cls, value):
        return validate_report_list(value)

    def add_report(self, report: Report):
        self.reports.append(report)

    def add_subtask(self, task: Task):
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        return [report for report in self.reports if isinstance(report, report_class)]
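

# Illustrative sketch (not part of the module): filter_reports narrows the mixed
# report list down to one report type.  The Task values and the report below are
# hypothetical and only demonstrate the call shape.
def _example_filter_reports() -> None:
    task = Task(path=Path("/tmp/firmware.bin"), depth=0, blob_id="")
    result = TaskResult(task=task)
    result.add_report(
        UnknownChunkReport(
            id="1", start_offset=0, end_offset=4, size=4, randomness=None
        )
    )
    # Only reports of the requested class are returned.
    assert len(result.filter_reports(UnknownChunkReport)) == 1
    assert result.filter_reports(ChunkReport) == []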


class ProcessResult(BaseModel):
    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        reports = itertools.chain.from_iterable(r.reports for r in self.results)
        interesting_reports = (
            r for r in reports if isinstance(r, ErrorReport | ChunkReport)
        )
        errors = []
        for report in interesting_reports:
            if isinstance(report, ErrorReport):
                errors.append(report)
            else:
                errors.extend(
                    r for r in report.extraction_reports if isinstance(r, ErrorReport)
                )
        return errors

    def register(self, result: TaskResult):
        self.results.append(result)

    def to_json(self, indent=" "):
        return json.dumps(
            [
                result.model_dump(mode="json", serialize_as_any=True)
                for result in self.results
            ],
            indent=indent,
        )

    def get_output_dir(self) -> Path | None:
        try:
            top_result = self.results[0]
            if carves := top_result.filter_reports(CarveDirectoryReport):
                # we have a top level carve
                return carves[0].carve_dir

            # either we have an extraction, with the extract directory
            # registered as a subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or there was no extraction at all
            return None


ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Use this for deserialization (import JSON report back into Python
objects) of the JSON report.

For example:

    with open('report.json', 'r') as f:
        data = f.read()
        report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""


class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports


@attrs.define(kw_only=True)
class ExtractResult:
    reports: list[Report]


class Extractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """


class DirectoryExtractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Extract from a list of paths that make up a multi-file input.

        Raises ExtractError on failure.
        """


class Pattern(str):
    def as_regex(self) -> bytes:
        raise NotImplementedError


class HexString(Pattern):
    """Hex string pattern, similar to a YARA rule hexadecimal string.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hexstrings are
    converted to Hyperscan-compatible PCRE regexes.

    See the YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 )
      The above can be combined and alternatives nested:
      01 02 ( 03 04 | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        return hexstring2regex(self)
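

# Illustrative sketch (not part of the module): a HexString for a hypothetical
# 4-byte "UBLB" magic, two wildcard version bytes and a one-byte flag with two
# allowed values, converted to the PCRE bytes handed to Hyperscan.
def _example_hexstring() -> bytes:
    pattern = HexString(
        """
        // hypothetical magic 'UBLB', any two version bytes, then 0x01 or 0x02
        55 42 4c 42 ?? ?? ( 01 | 02 )
        """
    )
    return pattern.as_regex()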


class Regex(Pattern):
    """Byte PCRE regex.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        return self.encode()


class DirectoryPattern:
    def get_files(self, directory: Path) -> Iterable[Path]:
        raise NotImplementedError


class Glob(DirectoryPattern):
    def __init__(self, *patterns):
        if not patterns:
            raise ValueError("At least one pattern must be provided")
        self._patterns = patterns

    def get_files(self, directory: Path) -> Iterable[Path]:
        for pattern in self._patterns:
            yield from directory.glob(pattern)


class SingleFile(DirectoryPattern):
    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        path = directory / self._filename
        return [path] if path.exists() else []


DExtractor = TypeVar("DExtractor", bound=None | DirectoryExtractor)


class DirectoryHandler(abc.ABC, Generic[DExtractor]):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DExtractor

    PATTERN: DirectoryPattern

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)
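

# Illustrative sketch (not part of unblob): a minimal DirectoryHandler for a
# hypothetical split archive ("firmware.7z.001", "firmware.7z.002", ...).  The
# name, pattern and the missing extractor are invented for demonstration only.
class _ExampleSplitArchiveHandler(DirectoryHandler):
    NAME = "example_split_archive"
    PATTERN = Glob("*.7z.001")
    EXTRACTOR = None  # a DirectoryExtractor subclass would normally go here
    DOC = None

    def calculate_multifile(self, file: Path) -> MultiFile | None:
        # Collect every part belonging to the first volume matched by PATTERN.
        stem = file.name.removesuffix(".001")
        parts = sorted(file.parent.glob(f"{stem}.*"))
        if not parts:
            return None
        return MultiFile(name=stem, paths=parts)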


TExtractor = TypeVar("TExtractor", bound=None | Extractor)


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this, because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)
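

# Illustrative sketch (not part of unblob): a minimal Handler for a hypothetical
# format starting with the magic b"UBLB" followed by a little-endian uint32 total
# length.  The name, magic and layout are invented; EXTRACTOR is left empty.
class _ExampleHandler(Handler):
    NAME = "example"
    PATTERNS = [HexString("55 42 4c 42")]  # b"UBLB"
    EXTRACTOR = None
    DOC = None

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        # Assumes File supports seek/read; skip the 4-byte magic, read the length.
        file.seek(start_offset + 4)
        total_size = int.from_bytes(file.read(4), "little")
        return ValidChunk(
            start_offset=start_offset, end_offset=start_offset + total_size
        )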


class StructHandler(Handler):
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header
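

# Illustrative sketch (not part of unblob): the same hypothetical "UBLB" format
# expressed as a StructHandler, so the header is parsed via C-style struct
# definitions instead of manual byte slicing.  All names and fields are invented.
class _ExampleStructHandler(StructHandler):
    NAME = "example_struct"
    PATTERNS = [HexString("55 42 4c 42")]
    EXTRACTOR = None
    DOC = None

    C_DEFINITIONS = r"""
        typedef struct example_header {
            char magic[4];
            uint32 total_size;
        } example_header_t;
    """
    HEADER_STRUCT = "example_header_t"

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        header = self.parse_header(file, endian=Endian.LITTLE)
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + header.total_size,
        )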


Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]