Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 75%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import abc
4import dataclasses
5import itertools
6import json
7from enum import Enum
8from pathlib import Path # noqa: TC003
9from typing import TYPE_CHECKING, Generic, TypeVar
11import attrs
12from pydantic import BaseModel, TypeAdapter, field_validator
13from structlog import get_logger
15from .file_utils import Endian, File, InvalidInputFormat, StructParser
16from .identifiers import new_id
17from .parser import hexstring2regex
18from .report import (
19 CarveDirectoryReport,
20 ChunkReport,
21 ErrorReport,
22 MultiFileReport,
23 RandomnessReport,
24 Report,
25 UnknownChunkReport,
26 validate_report_list,
27)
29if TYPE_CHECKING:
30 from collections.abc import Iterable
# Names exported by ``from <this module> import *``.  ``Endian``, ``File``,
# ``InvalidInputFormat`` and ``StructParser`` are imported from
# ``.file_utils`` and re-exported here.
__all__ = [
    "Blob",
    "Chunk",
    "DExtractor",
    "DirectoryExtractor",
    "DirectoryHandler",
    "DirectoryHandlers",
    "DirectoryPattern",
    "Endian",
    "ExtractError",
    "ExtractResult",
    "Extractor",
    "File",
    "Glob",
    "Handler",
    "HandlerDoc",
    "HandlerType",
    "Handlers",
    "HexString",
    "InvalidInputFormat",
    "MultiFile",
    "PaddingChunk",
    "Pattern",
    "ProcessResult",
    "Reference",
    "Regex",
    "ReportModel",
    "ReportModelAdapter",
    "SingleFile",
    "StructHandler",
    "StructParser",
    "TExtractor",
    "Task",
    "TaskResult",
    "UnknownChunk",
    "ValidChunk",
]

# Module-wide structlog logger shared by the classes below.
logger = get_logger()
# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#
class HandlerType(Enum):
    """Closed set of categories a handler can be documented under.

    Each member's value is the human-readable label of the category.
    """

    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"
@dataclasses.dataclass(frozen=True)
class Reference:
    """Immutable (title, URL) pair, used as an entry of ``HandlerDoc.references``."""

    # Display text of the reference.
    title: str
    # Location the reference points to.
    url: str
@dataclasses.dataclass
class HandlerDoc:
    """Documentation metadata attached to a handler (see ``Handler.DOC``).

    ``fully_supported`` is not a constructor argument: it is derived in
    ``__post_init__`` and is true exactly when no limitations are listed.
    """

    name: str
    description: str | None
    vendor: str | None
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    private: bool = False
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        """Derive ``fully_supported`` from the list of limitations."""
        # An empty limitations list means there is nothing known to be
        # unsupported.
        self.fully_supported = not self.limitations
class Task(BaseModel):
    """Pydantic model describing one unit of work.

    Instances are stored as ``TaskResult.task`` and in ``TaskResult.subtasks``.
    """

    # Filesystem path the task refers to.
    path: Path
    # Depth counter of the task -- presumably the nesting level of extraction;
    # confirm against the code that creates tasks.
    depth: int
    # Identifier of the blob the task is associated with.
    blob_id: str
    # Whether the task is a multi-file one; defaults to False.
    is_multi_file: bool = False
@attrs.define
class Blob:
    """Base class of identifiable objects; every instance carries an ``id``."""

    # Filled in by ``new_id`` when the caller does not pass an id explicitly.
    id: str = attrs.field(factory=new_id)
@attrs.define
class Chunk(Blob):
    """A byte range of a file; having offsets does not make it valid content.

    The range is half-open. For an array ``b``, a chunk ``c`` stands for the
    slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    # Index of the first byte of the chunk.
    start_offset: int = attrs.field(kw_only=True)

    # Index of the first byte after the end of the chunk.
    end_offset: int = attrs.field(kw_only=True)

    file: File | None = None

    def __attrs_post_init__(self):
        """Validate the offsets right after construction.

        Raises InvalidInputFormat when either offset is negative or when
        ``start_offset`` is not strictly below ``end_offset``.
        """
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")

        if self.end_offset <= self.start_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        """Number of bytes covered by the chunk."""
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        """The range rendered as ``0x<start>-0x<end>`` in lowercase hex."""
        first, past_last = self.start_offset, self.end_offset
        return f"0x{first:x}-0x{past_last:x}"

    @property
    def is_whole_file(self):
        """Whether the chunk spans ``file`` from offset 0 to its size.

        ``file`` must be set; this is asserted.
        """
        assert self.file
        if self.start_offset != 0:
            return False
        return self.end_offset == self.file.size()

    def contains(self, other: Chunk) -> bool:
        """Return True when ``other`` lies inside this chunk but is not the same range."""
        encloses = (
            self.start_offset <= other.start_offset
            and self.end_offset >= other.end_offset
        )
        # Enclosing alone is not enough: an identical range is not "contained".
        differs = (
            self.start_offset < other.start_offset
            or self.end_offset > other.end_offset
        )
        return encloses and differs

    def contains_offset(self, offset: int) -> bool:
        """Return True when ``offset`` falls inside the half-open range."""
        return self.start_offset <= offset and offset < self.end_offset

    def __repr__(self) -> str:
        """Represent the chunk by its hexadecimal range only."""
        return self.range_hex
@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Known to be valid chunk of a File, can be extracted with an external program."""

    # Not an ``__init__`` argument and ignored by equality; it has to be
    # assigned on the instance after construction.
    handler: Handler = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Delegate extraction of ``inpath`` into ``outdir`` to the handler.

        Raises ExtractError, after logging a warning, when the chunk is
        flagged as encrypted.
        """
        if not self.is_encrypted:
            return self.handler.extract(inpath, outdir)

        logger.warning(
            "Encrypted file is not extracted",
            path=inpath,
            chunk=self,
        )
        raise ExtractError

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        """Build the ``ChunkReport`` for this chunk, embedding ``extraction_reports``."""
        report = ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )
        return report
@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    They matter for manual analysis and for analytical certainty: for example
    randomness, other chunks inside them, metadata, etc.

    Such chunks are not extracted, only logged for information purposes and
    further analysis, like most common bytes (like \x00 and \xFF), ASCII
    strings, high randomness, etc.
    """

    def as_report(self, randomness: RandomnessReport | None) -> UnknownChunkReport:
        """Build the ``UnknownChunkReport``, attaching ``randomness`` as given."""
        report = UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )
        return report
@attrs.define(repr=False)
class PaddingChunk(Chunk):
    """Chunk that is reported as padding.

    Unlike ``UnknownChunk``, it is reported as a regular ``ChunkReport`` whose
    handler name is the fixed string ``"padding"``. The report is never marked
    as encrypted and never carries extraction reports.
    """

    def as_report(
        self,
        randomness: RandomnessReport | None,  # noqa: ARG002
    ) -> ChunkReport:
        """Build the ``ChunkReport`` describing this padding.

        ``randomness`` is accepted so the signature matches
        ``UnknownChunk.as_report``, but it is not used.
        """
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )
@attrs.define
class MultiFile(Blob):
    """A named blob made up of several files, processed by a ``DirectoryHandler``."""

    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    # Not an ``__init__`` argument and ignored by equality; it has to be
    # assigned on the instance after construction.
    handler: DirectoryHandler = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> ExtractResult | None:
        """Hand ``paths`` to the handler for extraction into ``outdir``."""
        member_paths = self.paths
        return self.handler.extract(member_paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        """Build the ``MultiFileReport``, embedding ``extraction_reports``."""
        report = MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )
        return report
# Lets ``TaskResult.filter_reports`` return a list typed by the requested class.
ReportType = TypeVar("ReportType", bound=Report)


class TaskResult(BaseModel):
    """Outcome of one ``Task``: the reports it produced and the subtasks it spawned."""

    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    @field_validator("reports", mode="before")
    @classmethod
    def validate_reports(cls, value):
        """Pass the raw ``reports`` input through ``validate_report_list`` before validation."""
        return validate_report_list(value)

    def add_report(self, report: Report):
        """Append ``report`` to this result."""
        self.reports.append(report)

    def add_subtask(self, task: Task):
        """Append ``task`` to the subtasks of this result."""
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        """Return the reports that are instances of ``report_class``, in stored order."""
        matching = []
        for candidate in self.reports:
            if isinstance(candidate, report_class):
                matching.append(candidate)
        return matching
class ProcessResult(BaseModel):
    """Ordered collection of the ``TaskResult`` objects of one processing run."""

    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        """Collect every ``ErrorReport`` of the run.

        Covers error reports stored directly on a task result and those nested
        in the ``extraction_reports`` of a ``ChunkReport``.
        """
        collected = []
        for task_result in self.results:
            for report in task_result.reports:
                if isinstance(report, ErrorReport):
                    collected.append(report)
                elif isinstance(report, ChunkReport):
                    collected.extend(
                        nested
                        for nested in report.extraction_reports
                        if isinstance(nested, ErrorReport)
                    )
        return collected

    def register(self, result: TaskResult):
        """Append ``result`` to the collection."""
        self.results.append(result)

    def to_json(self, indent=" "):
        """Serialize all task results as a JSON array, indented with ``indent``."""
        dumped = [
            task_result.model_dump(mode="json", serialize_as_any=True)
            for task_result in self.results
        ]
        return json.dumps(dumped, indent=indent)

    def get_output_dir(self) -> Path | None:
        """Return the output directory of the first result, or None if there is none."""
        if not self.results:
            return None

        top_result = self.results[0]
        carves = top_result.filter_reports(CarveDirectoryReport)
        if carves:
            # we have a top level carve
            return carves[0].carve_dir

        if top_result.subtasks:
            # we have an extraction,
            # and the extract directory registered as subtask
            return top_result.subtasks[0].path

        # no extraction
        return None
# Shape of the JSON report: a list of task results.
ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Adapter for deserializing the JSON report back into Python objects.

For example:

    with open('report.json', 'r') as f:
        data = f.read()
        report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""
class ExtractError(Exception):
    """Raised when an extraction fails.

    The reports passed to the constructor are kept, in order, on ``reports``;
    they are not forwarded to ``Exception`` as arguments.
    """

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports
@attrs.define(kw_only=True)
class ExtractResult:
    """Value an extractor may return: the reports it produced."""

    # Keyword-only, like every field of this class.
    reports: list[Report]
class Extractor(abc.ABC):
    """Interface of objects that extract a single carved-out file."""

    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies.

        The base implementation reports none.
        """
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the carved out chunk at ``inpath`` into ``outdir``.

        Raises ExtractError on failure.
        """
class DirectoryExtractor(abc.ABC):
    """Interface of objects that extract from a list of files at once."""

    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies.

        The base implementation reports none.
        """
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Extract from the multi file path list ``paths`` into ``outdir``.

        Raises ExtractError on failure.
        """
class Pattern(str):
    """String subclass for match patterns; subclasses decide how to turn it into a regex."""

    def as_regex(self) -> bytes:
        """Return the pattern as a bytes regex; not implemented on the base class."""
        raise NotImplementedError
class HexString(Pattern):
    """Hex string can be a YARA rule like hexadecimal string.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hexstrings are
    converted to hyperscan compatible PCRE regex.

    See YARA & Hyperscan documentation for more details:

        - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

        - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

        - normal bytes using hexadecimals: 01 de ad c0 de ff

        - wild-cards can match single bytes and can be mixed with
          normal hex: 01 ?? 02

        - wild-cards can also match first and second nibbles: 0? ?0

        - jumps can be specified for multiple wildcard bytes: [3]
          [2-5]

        - alternatives can be specified as well: ( 01 02 | 03 04 ) The
          above can be combined and alternatives nested: 01 02 ( 03 04
          | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

        - comments using /* */ notation

        - infinite jumps: [-]

        - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        """Return the regex produced by ``hexstring2regex`` for this hex string."""
        return hexstring2regex(self)
class Regex(Pattern):
    """Byte PCRE regex, written as a string.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        """Return the pattern text encoded to bytes (UTF-8, the ``str.encode`` default)."""
        return self.encode("utf-8")
class DirectoryPattern:
    """Base class of strategies that pick files out of a directory."""

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Return the selected paths under ``directory``; not implemented on the base class."""
        raise NotImplementedError
class Glob(DirectoryPattern):
    """Select files with one or more glob patterns."""

    def __init__(self, *patterns):
        """Store ``patterns``; raises ValueError when none is given."""
        if not patterns:
            raise ValueError("At least one pattern must be provided")
        self._patterns = patterns

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Lazily yield the matches of each pattern in turn, in pattern order.

        A path matched by several patterns is yielded once per match.
        """
        for glob_pattern in self._patterns:
            for match in directory.glob(glob_pattern):
                yield match
class SingleFile(DirectoryPattern):
    """Select one file by its name relative to the directory."""

    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Return a one-element list with the file's path, or an empty list if it is absent."""
        candidate = directory / self._filename
        if candidate.exists():
            return [candidate]
        return []
# Type of ``DirectoryHandler.EXTRACTOR``: a DirectoryExtractor, or None when
# the handler has no extractor.
DExtractor = TypeVar("DExtractor", bound=None | DirectoryExtractor)


class DirectoryHandler(abc.ABC, Generic[DExtractor]):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DExtractor

    PATTERN: DirectoryPattern

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        extractor = cls.EXTRACTOR
        if extractor is None:
            return []
        return extractor.get_dependencies()

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Create ``outdir`` and run the extractor on ``paths``.

        Raises ExtractError when the handler has no extractor. Creating
        ``outdir`` fails if it already exists.
        """
        extractor = self.EXTRACTOR
        if extractor is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(exist_ok=False, parents=True)

        return extractor.extract(paths, outdir)
# Type of ``Handler.EXTRACTOR``: an Extractor, or None when the handler has
# no extractor.
TExtractor = TypeVar("TExtractor", bound=None | Extractor)


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this, because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        extractor = cls.EXTRACTOR
        if extractor is None:
            return []
        return extractor.get_dependencies()

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Create ``outdir`` and run the extractor on ``inpath``.

        Raises ExtractError when the handler has no extractor. Creating
        ``outdir`` fails if it already exists.
        """
        extractor = self.EXTRACTOR
        if extractor is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(exist_ok=False, parents=True)

        return extractor.extract(inpath, outdir)
class StructHandler(Handler):
    """Handler whose file header is described by C struct definitions."""

    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        # One parser per handler instance, built from the class's definitions.
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        """The ``cparser_le`` of the underlying struct parser."""
        parser = self._struct_parser
        return parser.cparser_le

    @property
    def cparser_be(self):
        """The ``cparser_be`` of the underlying struct parser."""
        parser = self._struct_parser
        return parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        """Parse ``HEADER_STRUCT`` from ``file`` with the given endianness and return it.

        The parsed header is also logged at debug level.
        """
        parsed = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=parsed, _verbosity=3)
        return parsed
# Type aliases for collections of handler classes (classes, not instances).
Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]