Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 70%

import abc
import dataclasses
import itertools
import json
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Generic, Optional, TypeVar, Union

import attrs
from structlog import get_logger

from .file_utils import Endian, File, InvalidInputFormat, StructParser
from .identifiers import new_id
from .parser import hexstring2regex
from .report import (
    CarveDirectoryReport,
    ChunkReport,
    ErrorReport,
    MultiFileReport,
    RandomnessReport,
    Report,
    UnknownChunkReport,
)

logger = get_logger()

# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#


class HandlerType(Enum):
    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"


@dataclasses.dataclass(frozen=True)
class Reference:
    title: str
    url: str


@dataclasses.dataclass
class HandlerDoc:
    name: str
    description: Union[str, None]
    vendor: Union[str, None]
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        self.fully_supported = len(self.limitations) == 0
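

# Illustrative sketch (not part of the original module): ``fully_supported`` is
# derived from ``limitations`` in ``__post_init__``, so callers never pass it
# explicitly. The handler name and reference below are hypothetical.
#
#     doc = HandlerDoc(
#         name="example",
#         description=None,
#         vendor=None,
#         references=[Reference(title="Example format spec", url="https://example.com/spec")],
#         limitations=[],
#         handler_type=HandlerType.ARCHIVE,
#     )
#     assert doc.fully_supported  # True, because there are no limitations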


@attrs.define(frozen=True)
class Task:
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = attrs.field(default=False)


@attrs.define
class Blob:
    id: str = attrs.field(
        factory=new_id,
    )


@attrs.define
class Chunk(Blob):
    """File chunk with start and end offsets; it may still turn out to be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk"""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk"""

    file: Optional[File] = None

    def __attrs_post_init__(self):
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: "Chunk") -> bool:
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex
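

# Illustrative sketch (not part of the original module): chunk offsets form a
# half-open interval [start_offset, end_offset), mirroring Python slicing:
#
#     c = Chunk(start_offset=0x10, end_offset=0x20)
#     assert c.size == 0x10
#     assert c.contains_offset(0x1F) and not c.contains_offset(0x20)
#     assert c.range_hex == "0x10-0x20"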


@attrs.define(repr=False)
class ValidChunk(Chunk):
    """A chunk of a File known to be valid; it can be extracted with an external program."""

    handler: "Handler" = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]:
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )


@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gap between valid chunks, or an otherwise unknown chunk.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for informational purposes and further
    analysis, such as the most common bytes (like \x00 and \xFF), ASCII strings,
    high randomness, etc.
    """

    def as_report(self, randomness: Optional[RandomnessReport]) -> UnknownChunkReport:
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )


@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Gap between valid chunks, or an otherwise unknown chunk.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: Optional[RandomnessReport],  # noqa: ARG002
    ) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )


@attrs.define
class MultiFile(Blob):
    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: "DirectoryHandler" = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> Optional["ExtractResult"]:
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )


ReportType = TypeVar("ReportType", bound=Report)


@attrs.define
class TaskResult:
    task: Task
    reports: list[Report] = attrs.field(factory=list)
    subtasks: list[Task] = attrs.field(factory=list)

    def add_report(self, report: Report):
        self.reports.append(report)

    def add_subtask(self, task: Task):
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        return [report for report in self.reports if isinstance(report, report_class)]
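

# Illustrative sketch (not part of the original module): ``filter_reports`` narrows
# the mixed ``reports`` list by report class; the ``ReportType`` type variable lets
# type checkers infer the element type of the result. ``task_result`` is assumed to
# be an already populated TaskResult.
#
#     error_reports: list[ErrorReport] = task_result.filter_reports(ErrorReport)
#     chunk_reports: list[ChunkReport] = task_result.filter_reports(ChunkReport)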


@attrs.define
class ProcessResult:
    results: list[TaskResult] = attrs.field(factory=list)

    @property
    def errors(self) -> list[ErrorReport]:
        reports = itertools.chain.from_iterable(r.reports for r in self.results)
        interesting_reports = (
            r for r in reports if isinstance(r, (ErrorReport, ChunkReport))
        )
        errors = []
        for report in interesting_reports:
            if isinstance(report, ErrorReport):
                errors.append(report)
            else:
                errors.extend(
                    r for r in report.extraction_reports if isinstance(r, ErrorReport)
                )
        return errors

    def register(self, result: TaskResult):
        self.results.append(result)

    def to_json(self, indent="  "):
        return to_json(self.results, indent=indent)

    def get_output_dir(self) -> Optional[Path]:
        try:
            top_result = self.results[0]
            if carves := top_result.filter_reports(CarveDirectoryReport):
                # we have a top level carve
                return carves[0].carve_dir

            # otherwise we have an extraction, with the extraction
            # directory registered as a subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or there was no extraction at all
            return None


class _JSONEncoder(json.JSONEncoder):
    def default(self, o):
        obj = o
        if attrs.has(type(obj)):
            extend_attr_output = True
            attr_output = attrs.asdict(obj, recurse=not extend_attr_output)
            attr_output["__typename__"] = obj.__class__.__name__
            return attr_output

        if isinstance(obj, Enum):
            return obj.name

        if isinstance(obj, Path):
            return str(obj)

        if isinstance(obj, bytes):
            try:
                return obj.decode()
            except UnicodeDecodeError:
                return str(obj)

        logger.error("JSONEncoder met a non-JSON encodable value", obj=obj)
        # the usual fail path of custom JSONEncoders is to call the parent and let it fail
        # return json.JSONEncoder.default(self, obj)
        # instead of failing, just return something usable
        return f"Non-JSON encodable value: {obj}"


def to_json(obj, indent="  ") -> str:
    """Encode any unblob object as serialized JSON."""
    return json.dumps(obj, cls=_JSONEncoder, indent=indent)
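

# Illustrative sketch (not part of the original module): attrs-based objects are
# serialized with a ``__typename__`` marker, while enums, paths and bytes fall back
# to readable strings. The path below is a hypothetical input file.
#
#     task = Task(path=Path("/tmp/firmware.bin"), depth=0, blob_id=new_id())
#     print(to_json(task))
#     # {
#     #   "path": "/tmp/firmware.bin",
#     #   "depth": 0,
#     #   "blob_id": "...",
#     #   "is_multi_file": false,
#     #   "__typename__": "Task"
#     # }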


class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports


@attrs.define(kw_only=True)
class ExtractResult:
    reports: list[Report]


class Extractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """


class DirectoryExtractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        """Extract from a multi-file path list.

        Raises ExtractError on failure.
        """


class Pattern(str):
    def as_regex(self) -> bytes:
        raise NotImplementedError


class HexString(Pattern):
    """Hex string pattern, resembling a YARA hexadecimal string.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hex strings are
    converted to hyperscan-compatible PCRE regexes.

    See the YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 )
      The above can be combined and alternatives nested: 01 02 ( 03 04
      | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        return hexstring2regex(self)
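

# Illustrative sketch (not part of the original module): the pattern below is a
# hypothetical magic sequence mixing literal bytes, a wildcard, an alternative
# group and a bounded jump; ``as_regex`` turns it into a hyperscan-compatible
# PCRE byte regex.
#
#     magic = HexString("01 02 ?? ( 03 | 04 ) [2-4] ff")
#     regex_bytes = magic.as_regex()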


class Regex(Pattern):
    """Byte PCRE regex.

    See the hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        return self.encode()


class DirectoryPattern:
    def get_files(self, directory: Path) -> Iterable[Path]:
        raise NotImplementedError


class Glob(DirectoryPattern):
    def __init__(self, pattern):
        self._pattern = pattern

    def get_files(self, directory: Path) -> Iterable[Path]:
        return directory.glob(self._pattern)


class SingleFile(DirectoryPattern):
    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        path = directory / self._filename
        return [path] if path.exists() else []


class DirectoryHandler(abc.ABC):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DirectoryExtractor

    PATTERN: DirectoryPattern

    DOC: HandlerDoc

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once; it is a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)
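

# Illustrative sketch (not part of the original module): a minimal DirectoryHandler
# for a hypothetical split archive named "<name>.partNN"; the handler name, pattern
# and extractor are made up.
#
#     class SplitArchiveHandler(DirectoryHandler):
#         NAME = "split_archive"
#         PATTERN = Glob("*.part01")
#         EXTRACTOR = SplitArchiveExtractor()  # hypothetical DirectoryExtractor subclass
#
#         def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
#             parts = sorted(file.parent.glob(file.stem + ".part*"))
#             if not parts:
#                 return None
#             return MultiFile(name=file.stem, paths=parts)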


TExtractor = TypeVar("TExtractor", bound=Union[None, Extractor])


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this because not every match reflects the actual start
    # (e.g. the tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: HandlerDoc

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once; it is a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)


class StructHandler(Handler):
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header
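

# Illustrative sketch (not part of the original module): a minimal StructHandler for
# a hypothetical "FOO1" container. The magic, the header layout and the assumption
# that the ``size`` field covers the whole chunk are all made up for illustration.
#
#     class FooHandler(StructHandler):
#         NAME = "foo"
#         PATTERNS = [HexString("46 4f 4f 31")]  # b"FOO1"
#         C_DEFINITIONS = r"""
#             typedef struct foo_header {
#                 char magic[4];
#                 uint32 size;   // hypothetical: total chunk size, header included
#             } foo_header_t;
#         """
#         HEADER_STRUCT = "foo_header_t"
#         EXTRACTOR = None  # carve only, nothing to extract
#
#         def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
#             header = self.parse_header(file, endian=Endian.LITTLE)
#             return ValidChunk(
#                 start_offset=start_offset,
#                 end_offset=start_offset + header.size,
#             )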


Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]