Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 75%
import abc
import dataclasses
import itertools
import json
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Generic, Optional, TypeVar, Union

import attrs
from pydantic import BaseModel, TypeAdapter
from structlog import get_logger

from .file_utils import Endian, File, InvalidInputFormat, StructParser
from .identifiers import new_id
from .parser import hexstring2regex
from .report import (
    CarveDirectoryReport,
    ChunkReport,
    ErrorReport,
    MultiFileReport,
    RandomnessReport,
    Report,
    UnknownChunkReport,
)

logger = get_logger()

# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#


class HandlerType(Enum):
    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"

@dataclasses.dataclass(frozen=True)
class Reference:
    title: str
    url: str


@dataclasses.dataclass
class HandlerDoc:
    name: str
    description: Union[str, None]
    vendor: Union[str, None]
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        self.fully_supported = len(self.limitations) == 0

class Task(BaseModel):
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = False


@attrs.define
class Blob:
    id: str = attrs.field(
        factory=new_id,
    )

@attrs.define
class Chunk(Blob):
    """File chunk; it has a start and end offset, but it can still be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk"""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk"""

    file: Optional[File] = None

    def __attrs_post_init__(self):
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: "Chunk") -> bool:
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex
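
# Example (a minimal sketch, not part of the module) illustrating the slice
# semantics documented on Chunk; the offsets below are made-up values.
#
# >>> chunk = Chunk(start_offset=0x10, end_offset=0x40)
# >>> chunk.size
# 48
# >>> chunk.range_hex
# '0x10-0x40'
# >>> chunk.contains_offset(0x3F), chunk.contains_offset(0x40)
# (True, False)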

@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Chunk of a File that is known to be valid and can be extracted with an external program."""

    handler: "Handler" = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]:
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )

@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for information purposes and further analysis,
    like most common bytes (like \x00 and \xFF), ASCII strings, high randomness, etc.
    """

    def as_report(self, randomness: Optional[RandomnessReport]) -> UnknownChunkReport:
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )

@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: Optional[RandomnessReport],  # noqa: ARG002
    ) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )

@attrs.define
class MultiFile(Blob):
    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: "DirectoryHandler" = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> Optional["ExtractResult"]:
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )

ReportType = TypeVar("ReportType", bound=Report)


class TaskResult(BaseModel):
    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    def add_report(self, report: Report):
        self.reports.append(report)

    def add_subtask(self, task: Task):
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        return [report for report in self.reports if isinstance(report, report_class)]
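
# Example (a minimal sketch, not part of the module): filter_reports returns
# only the reports of the requested type, preserving their order.  `task` and
# `chunk_report` are assumed to be previously constructed Task and ChunkReport
# instances.
#
# >>> result = TaskResult(task=task)
# >>> result.add_report(chunk_report)
# >>> result.filter_reports(ChunkReport) == [chunk_report]
# True
# >>> result.filter_reports(ErrorReport)
# []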

class ProcessResult(BaseModel):
    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        reports = itertools.chain.from_iterable(r.reports for r in self.results)
        interesting_reports = (
            r for r in reports if isinstance(r, (ErrorReport, ChunkReport))
        )
        errors = []
        for report in interesting_reports:
            if isinstance(report, ErrorReport):
                errors.append(report)
            else:
                errors.extend(
                    r for r in report.extraction_reports if isinstance(r, ErrorReport)
                )
        return errors

    def register(self, result: TaskResult):
        self.results.append(result)

    def to_json(self, indent=" "):
        return json.dumps(
            [result.model_dump(mode="json") for result in self.results], indent=indent
        )

    def get_output_dir(self) -> Optional[Path]:
        try:
            top_result = self.results[0]
            if carves := top_result.filter_reports(CarveDirectoryReport):
                # we have a top level carve
                return carves[0].carve_dir

            # or we have an extraction, with the extraction
            # directory registered as a subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or no extraction happened at all
            return None

ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Use this for deserialization (import JSON report back into Python
objects) of the JSON report.

For example:

    with open('report.json', 'r') as f:
        data = f.read()
        report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""

class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports


@attrs.define(kw_only=True)
class ExtractResult:
    reports: list[Report]


class Extractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """

class DirectoryExtractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        """Extract from a multi file path list.

        Raises ExtractError on failure.
        """


class Pattern(str):
    def as_regex(self) -> bytes:
        raise NotImplementedError

class HexString(Pattern):
    """Hex string can be a YARA-rule-like hexadecimal string.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hexstrings are
    converted to Hyperscan-compatible PCRE regexes.

    See the YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 ). The
      above can be combined and alternatives nested: 01 02 ( 03 04
      | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        return hexstring2regex(self)
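
# Example (a minimal sketch, not part of the module): a pattern that matches a
# 4-byte magic, any two bytes, then one of two version bytes; the byte values
# here are made up for illustration.
#
# >>> pattern = HexString("de ad c0 de ?? ?? ( 01 | 02 )")
# >>> isinstance(pattern.as_regex(), bytes)
# True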

class Regex(Pattern):
    """Byte PCRE regex.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        return self.encode()


class DirectoryPattern:
    def get_files(self, directory: Path) -> Iterable[Path]:
        raise NotImplementedError

class Glob(DirectoryPattern):
    def __init__(self, pattern):
        self._pattern = pattern

    def get_files(self, directory: Path) -> Iterable[Path]:
        return directory.glob(self._pattern)


class SingleFile(DirectoryPattern):
    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        path = directory / self._filename
        return [path] if path.exists() else []

class DirectoryHandler(abc.ABC):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DirectoryExtractor

    PATTERN: DirectoryPattern

    DOC: Union[HandlerDoc, None]

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)
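
# A minimal sketch (not part of the module) of how a DirectoryHandler subclass
# ties NAME, PATTERN and calculate_multifile together; the "split" handler,
# its *.001 naming scheme and the missing extractor are all hypothetical.
#
#     class SplitFileHandler(DirectoryHandler):
#         NAME = "split"
#         PATTERN = Glob("*.001")
#         EXTRACTOR = None
#         DOC = None
#
#         def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
#             # collect the sibling parts (name.001, name.002, ...) next to the match
#             parts = sorted(file.parent.glob(file.stem + ".*"))
#             return MultiFile(name=file.stem, paths=parts)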

TExtractor = TypeVar("TExtractor", bound=Union[None, Extractor])


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this, because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: Union[HandlerDoc, None]

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)
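
# A minimal sketch (not part of the module) of a Handler subclass; the "foo"
# format, its magic bytes and the fixed 0x100 chunk size are hypothetical.
#
#     class FooHandler(Handler):
#         NAME = "foo"
#         PATTERNS = [HexString("f0 0f c0 de")]
#         EXTRACTOR = None
#         DOC = None
#
#         def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
#             # a real handler would parse the format's header to find the end offset
#             return ValidChunk(start_offset=start_offset, end_offset=start_offset + 0x100)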

class StructHandler(Handler):
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header
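
# A minimal sketch (not part of the module) of a StructHandler subclass; the
# "bar" format, its C struct layout and magic are hypothetical.  parse_header
# parses HEADER_STRUCT from the file, so calculate_chunk can derive the end
# offset from a length field in the header.
#
#     class BarHandler(StructHandler):
#         NAME = "bar"
#         PATTERNS = [HexString("42 41 52 00")]
#         C_DEFINITIONS = """
#             struct bar_header {
#                 char magic[4];
#                 uint32 size;
#             };
#         """
#         HEADER_STRUCT = "bar_header"
#         EXTRACTOR = None
#         DOC = None
#
#         def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
#             header = self.parse_header(file)
#             return ValidChunk(
#                 start_offset=start_offset, end_offset=start_offset + header.size
#             )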

Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]