Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 74%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import abc
4import dataclasses
5import itertools
6import json
7from enum import Enum
8from pathlib import Path # noqa: TC003
9from typing import TYPE_CHECKING, Generic, TypeVar
11import attrs
12from pydantic import BaseModel, TypeAdapter, field_validator
13from structlog import get_logger
15from .file_utils import Endian, File, InvalidInputFormat, StructParser
16from .identifiers import new_id
17from .parser import hexstring2regex
18from .report import (
19 CarveDirectoryReport,
20 ChunkReport,
21 ErrorReport,
22 MultiFileReport,
23 RandomnessReport,
24 Report,
25 UnknownChunkReport,
26 validate_report_list,
27)
29if TYPE_CHECKING:
30 from collections.abc import Iterable
32logger = get_logger()
34# The state transitions are:
35#
36# file ──► pattern match ──► ValidChunk
37#
class HandlerType(Enum):
    """Category of a format handler; referenced by ``HandlerDoc.handler_type``."""

    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"
@dataclasses.dataclass(frozen=True)
class Reference:
    """External documentation link (title + URL), used in ``HandlerDoc.references``."""

    title: str
    url: str
@dataclasses.dataclass
class HandlerDoc:
    """Human-readable documentation of a format handler: name, caveats, links."""

    name: str
    description: str | None
    vendor: str | None
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    # NOTE(review): presumably private handlers are excluded from public docs
    # -- confirm against the documentation generator.
    private: bool = False
    # Derived in __post_init__ from ``limitations``; not settable by callers.
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        # A handler counts as fully supported only when it has no known limitations.
        self.fully_supported = len(self.limitations) == 0
class Task(BaseModel):
    """Unit of work: process the blob at ``path``.

    ``depth`` is presumably the extraction recursion depth and ``blob_id``
    the id of the originating blob -- TODO confirm against the scheduler.
    """

    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = False
@attrs.define
class Blob:
    """Base object carrying a unique, auto-generated identifier."""

    # Generated via new_id() unless explicitly provided.
    id: str = attrs.field(
        factory=new_id,
    )
@attrs.define
class Chunk(Blob):
    """Byte range ``[start_offset, end_offset)`` of a file; may still be invalid.

    Given a byte array ``b``, a chunk ``c`` corresponds to the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    # Index of the first byte of the chunk (inclusive).
    start_offset: int = attrs.field(kw_only=True)

    # Index of the first byte after the end of the chunk (exclusive).
    end_offset: int = attrs.field(kw_only=True)

    # Backing file, when known (required by ``is_whole_file``).
    file: File | None = None

    def __attrs_post_init__(self):
        # Reject impossible ranges right at construction time.
        if min(self.start_offset, self.end_offset) < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if not self.start_offset < self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        """Number of bytes covered by this chunk."""
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        """``0xstart-0xend`` rendering of the chunk's range."""
        return "0x{:x}-0x{:x}".format(self.start_offset, self.end_offset)

    @property
    def is_whole_file(self):
        """Whether the chunk spans its backing file entirely."""
        assert self.file
        return (self.start_offset, self.end_offset) == (0, self.file.size())

    def contains(self, other: Chunk) -> bool:
        """Whether ``other`` lies within this chunk and is strictly smaller."""
        covers = (
            self.start_offset <= other.start_offset
            and other.end_offset <= self.end_offset
        )
        identical = (
            self.start_offset == other.start_offset
            and self.end_offset == other.end_offset
        )
        return covers and not identical

    def contains_offset(self, offset: int) -> bool:
        """Whether ``offset`` falls inside the chunk's half-open range."""
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex
@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Chunk that matched a known format; extractable with an external program."""

    # Assigned after construction; excluded from equality checks.
    handler: Handler = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Delegate extraction to the handler; encrypted chunks are refused."""
        if not self.is_encrypted:
            return self.handler.extract(inpath, outdir)

        logger.warning(
            "Encrypted file is not extracted",
            path=inpath,
            chunk=self,
        )
        raise ExtractError

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        """Summarize this chunk and its extraction results as a ChunkReport."""
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )
@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Chunk not claimed by any handler: a gap between valid chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are never extracted, just logged for information purposes and
    further analysis, like most common bytes (like \x00 and \xFF), ASCII
    strings, high randomness, etc.
    """

    def as_report(self, randomness: RandomnessReport | None) -> UnknownChunkReport:
        """Summarize this chunk (with optional randomness stats) as a report."""
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )
@attrs.define(repr=False)
class PaddingChunk(Chunk):
    """Chunk consisting of padding between valid chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: RandomnessReport | None,  # noqa: ARG002
    ) -> ChunkReport:
        """Report the chunk under the fixed "padding" pseudo handler name."""
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )
@attrs.define
class MultiFile(Blob):
    """A logical blob made up of several files, handled as one unit."""

    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    # Assigned after construction; excluded from equality checks.
    handler: DirectoryHandler = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> ExtractResult | None:
        """Extract all member paths via the directory handler."""
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        """Summarize this multi-file blob and its extraction results."""
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )
# Type variable constraining TaskResult.filter_reports() results to a Report subclass.
ReportType = TypeVar("ReportType", bound=Report)
class TaskResult(BaseModel):
    """Reports gathered while processing one Task, plus follow-up tasks."""

    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    @field_validator("reports", mode="before")
    @classmethod
    def validate_reports(cls, value):
        # Reports are polymorphic; delegate to the shared validation helper.
        return validate_report_list(value)

    def add_report(self, report: Report):
        """Record one report for this task."""
        self.reports.append(report)

    def add_subtask(self, task: Task):
        """Queue a follow-up task discovered while processing this one."""
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        """Return only the reports that are instances of ``report_class``."""
        matching = [item for item in self.reports if isinstance(item, report_class)]
        return matching
class ProcessResult(BaseModel):
    """Aggregated results of a whole processing run."""

    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        """Every ErrorReport, including ones nested inside chunk reports."""
        collected = []
        for task_result in self.results:
            for report in task_result.reports:
                if isinstance(report, ErrorReport):
                    collected.append(report)
                elif isinstance(report, ChunkReport):
                    collected.extend(
                        sub
                        for sub in report.extraction_reports
                        if isinstance(sub, ErrorReport)
                    )
        return collected

    def register(self, result: TaskResult):
        """Append one task's result to the run."""
        self.results.append(result)

    def to_json(self, indent=" "):
        """Serialize every task result into one JSON document."""
        dumped = [
            result.model_dump(mode="json", serialize_as_any=True)
            for result in self.results
        ]
        return json.dumps(dumped, indent=indent)

    def get_output_dir(self) -> Path | None:
        """Best-effort top-level extraction output directory, or None."""
        try:
            top_result = self.results[0]
        except IndexError:
            # nothing was processed at all
            return None

        if carves := top_result.filter_reports(CarveDirectoryReport):
            # we have a top level carve
            return carves[0].carve_dir

        try:
            # we either have an extraction,
            # and the extract directory registered as subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or no extraction
            return None
# The JSON report's top-level shape: a flat list of task results.
ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Use this for deserialization (import JSON report back into Python
objects) of the JSON report.

For example:

with open('report.json', 'r') as f:
    data = f.read()
    report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""
class ExtractError(Exception):
    """Raised when extraction fails; may carry explanatory reports."""

    def __init__(self, *reports: Report):
        super().__init__()
        # Keep the reports so callers can attach them to the task result.
        self.reports: tuple[Report, ...] = tuple(reports)
@attrs.define(kw_only=True)
class ExtractResult:
    """Reports produced by a (successful) extraction."""

    reports: list[Report]
class Extractor(abc.ABC):
    """Interface for extracting a single carved-out chunk."""

    def get_dependencies(self) -> list[str]:
        """Names of the external commands this extractor needs (none by default)."""
        dependencies: list[str] = []
        return dependencies

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """
class DirectoryExtractor(abc.ABC):
    """Interface for extracting a blob that spans multiple files."""

    def get_dependencies(self) -> list[str]:
        """Names of the external commands this extractor needs (none by default)."""
        return list()

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Extract from a multi file path list.

        Raises ExtractError on failure.
        """
class Pattern(str):
    """A search pattern string; subclasses compile it into a PCRE bytes regex."""

    def as_regex(self) -> bytes:
        # Subclasses must provide the actual conversion.
        raise NotImplementedError
class HexString(Pattern):
    """Hex string can be a YARA rule like hexadecimal string.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hexstrings are
    converted to hyperscan compatible PCRE regex.

    See YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 ) The
      above can be combined and alternatives nested: 01 02 ( 03 04
      | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        # Delegates to the project parser that compiles hexstrings to PCRE bytes.
        return hexstring2regex(self)
class Regex(Pattern):
    """Byte PCRE regex.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        # The pattern text already is a valid PCRE; just encode it to bytes.
        return str.encode(self)
class DirectoryPattern:
    """Strategy for locating candidate files within a directory."""

    def get_files(self, directory: Path) -> Iterable[Path]:
        # Subclasses decide which files in ``directory`` are candidates.
        raise NotImplementedError
class Glob(DirectoryPattern):
    """Match files in a directory with one or more glob patterns."""

    def __init__(self, *patterns):
        if not patterns:
            raise ValueError("At least one pattern must be provided")
        self._patterns = patterns

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Yield every path matching any configured pattern."""
        for glob_pattern in self._patterns:
            yield from directory.glob(glob_pattern)
class SingleFile(DirectoryPattern):
    """Match exactly one well-known filename inside a directory."""

    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Return the single path when present, otherwise an empty list."""
        candidate = directory / self._filename
        if candidate.exists():
            return [candidate]
        return []
# Type variable for DirectoryHandler.EXTRACTOR: a DirectoryExtractor, or None.
DExtractor = TypeVar("DExtractor", bound=None | DirectoryExtractor)
class DirectoryHandler(abc.ABC, Generic[DExtractor]):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    # Unique handler name used in reports.
    NAME: str

    # Extractor instance, or None for formats without extraction support.
    EXTRACTOR: DExtractor

    # Locates the candidate starting files inside a directory.
    PATTERN: DirectoryPattern

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        extractor = cls.EXTRACTOR
        return [] if extractor is None else extractor.get_dependencies()

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Run the configured extractor on ``paths``, creating ``outdir``."""
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)
# Type variable for Handler.EXTRACTOR: an Extractor, or None.
TExtractor = TypeVar("TExtractor", bound=None | Extractor)
class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    # Unique handler name used in reports.
    NAME: str

    # Byte patterns whose matches trigger this handler.
    PATTERNS: list[Pattern]

    # We need this, because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    # Extractor instance, or None for formats without extraction support.
    EXTRACTOR: TExtractor

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        extractor = cls.EXTRACTOR
        if extractor is None:
            return []
        return extractor.get_dependencies()

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Run the configured extractor on ``inpath``, creating ``outdir``."""
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)
class StructHandler(Handler):
    """Handler whose file header is parsed through C struct definitions."""

    # C source text with the struct declarations used for parsing.
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        """Little-endian parser built from C_DEFINITIONS."""
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        """Big-endian parser built from C_DEFINITIONS."""
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        """Parse HEADER_STRUCT from ``file`` and return the parsed header."""
        parsed = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=parsed, _verbosity=3)
        return parsed
# Convenience aliases for registries of handler classes.
Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]