Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

248 statements  

1from __future__ import annotations 

2 

3import abc 

4import dataclasses 

5import itertools 

6import json 

7from enum import Enum 

8from pathlib import Path # noqa: TC003 

9from typing import TYPE_CHECKING, Generic, TypeVar 

10 

11import attrs 

12from pydantic import BaseModel, TypeAdapter, field_validator 

13from structlog import get_logger 

14 

15from .file_utils import Endian, File, InvalidInputFormat, StructParser 

16from .identifiers import new_id 

17from .parser import hexstring2regex 

18from .report import ( 

19 CarveDirectoryReport, 

20 ChunkReport, 

21 ErrorReport, 

22 MultiFileReport, 

23 RandomnessReport, 

24 Report, 

25 UnknownChunkReport, 

26 validate_report_list, 

27) 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Iterable 

31 

# Names exported by ``from <this module> import *``.  Several of them
# (Endian, File, InvalidInputFormat, StructParser) are imported from
# ``.file_utils`` above and re-exported here.
__all__ = [
    "Blob",
    "Chunk",
    "DExtractor",
    "DirectoryExtractor",
    "DirectoryHandler",
    "DirectoryHandlers",
    "DirectoryPattern",
    "Endian",
    "ExtractError",
    "ExtractResult",
    "Extractor",
    "File",
    "Glob",
    "Handler",
    "HandlerDoc",
    "HandlerType",
    "Handlers",
    "HexString",
    "InvalidInputFormat",
    "MultiFile",
    "PaddingChunk",
    "Pattern",
    "ProcessResult",
    "Reference",
    "Regex",
    "ReportModel",
    "ReportModelAdapter",
    "SingleFile",
    "StructHandler",
    "StructParser",
    "TExtractor",
    "Task",
    "TaskResult",
    "UnknownChunk",
    "ValidChunk",
]

# Module-level structlog logger shared by the classes below.
logger = get_logger()

71 

72# The state transitions are: 

73# 

74# file ──► pattern match ──► ValidChunk 

75# 

76 

77 

class HandlerType(Enum):
    """Category label stored in ``HandlerDoc.handler_type``.

    Each member's value is the human-readable spelling of the category.
    """

    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"

86 

87 

@dataclasses.dataclass(frozen=True)
class Reference:
    """Immutable (frozen) pair of a title and a URL.

    Instances are listed in ``HandlerDoc.references``.
    """

    title: str
    url: str

92 

93 

@dataclasses.dataclass
class HandlerDoc:
    """Descriptive metadata record for a handler.

    ``fully_supported`` is not accepted by the constructor; it is derived
    in ``__post_init__`` and is true exactly when ``limitations`` is empty.
    """

    name: str
    description: str | None
    vendor: str | None
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    private: bool = False
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        # No listed limitation means the format counts as fully supported.
        limitation_count = len(self.limitations)
        self.fully_supported = limitation_count == 0

107 

108 

class Task(BaseModel):
    # Pydantic model describing one task: the path it refers to, its depth,
    # the id of the blob it is associated with and whether it is a
    # multi-file task (off by default).
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = False

114 

115 

@attrs.define
class Blob:
    """Base class that only carries an identifier.

    When ``id`` is not supplied, ``new_id`` is called to produce one.
    """

    id: str = attrs.field(factory=new_id)

121 

122 

@attrs.define
class Chunk(Blob):
    """A byte range of a file; having offsets does not make it valid data.

    For an array ``b``, a chunk ``c`` stands for the slice::

        b[c.start_offset:c.end_offset]

    Construction fails with ``InvalidInputFormat`` when an offset is
    negative or when ``start_offset`` is not strictly below ``end_offset``.
    """

    # Index of the first byte of the chunk.
    start_offset: int = attrs.field(kw_only=True)

    # Index of the first byte after the end of the chunk.
    end_offset: int = attrs.field(kw_only=True)

    file: File | None = None

    def __attrs_post_init__(self):
        """Validate the offsets right after attrs has set the fields."""
        offsets = (self.start_offset, self.end_offset)
        if any(offset < 0 for offset in offsets):
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        # Empty (start == end) and inverted ranges are both rejected.
        if self.end_offset <= self.start_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        """Number of bytes covered by the chunk."""
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        """The range rendered as ``0x<start>-0x<end>`` in lowercase hex."""
        return "0x{:x}-0x{:x}".format(self.start_offset, self.end_offset)

    @property
    def is_whole_file(self):
        """Whether the chunk starts at 0 and ends at ``self.file.size()``.

        ``self.file`` must be set; this is enforced with an ``assert``.
        """
        assert self.file
        if self.start_offset != 0:
            return False
        return self.end_offset == self.file.size()

    def contains(self, other: Chunk) -> bool:
        """Return True when ``other`` lies inside this chunk and is smaller.

        A chunk with exactly the same start and end is *not* contained.
        """
        covers_other = (
            self.start_offset <= other.start_offset
            and self.end_offset >= other.end_offset
        )
        strictly_larger = (
            self.start_offset < other.start_offset
            or self.end_offset > other.end_offset
        )
        return covers_other and strictly_larger

    def contains_offset(self, offset: int) -> bool:
        """Return True when ``start_offset <= offset < end_offset``."""
        return offset >= self.start_offset and offset < self.end_offset

    def __repr__(self) -> str:
        # The hex range is the whole representation of a chunk.
        return self.range_hex

176 

177 

@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Known to be valid chunk of a File, can be extracted with an external program.

    ``handler`` is excluded from ``__init__`` and from equality, so it has
    to be assigned on the instance after construction.
    """

    handler: Handler = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Delegate extraction to the handler, refusing encrypted chunks.

        Raises ``ExtractError`` (after logging a warning) when the chunk is
        flagged as encrypted.
        """
        if not self.is_encrypted:
            return self.handler.extract(inpath, outdir)

        logger.warning(
            "Encrypted file is not extracted",
            path=inpath,
            chunk=self,
        )
        raise ExtractError

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        """Build the ``ChunkReport`` for this chunk with the given sub-reports."""
        report_fields = {
            "id": self.id,
            "start_offset": self.start_offset,
            "end_offset": self.end_offset,
            "size": self.size,
            "handler_name": self.handler.NAME,
            "is_encrypted": self.is_encrypted,
            "extraction_reports": extraction_reports,
        }
        return ChunkReport(**report_fields)

206 

207 

@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gap between valid chunks, or any other chunk of unknown content.

    These matter for manual analysis and for analytical certainty: for
    example randomness, other chunks inside it, metadata, etc.

    They are not extracted, only logged for information and further
    analysis, such as most common bytes (like \x00 and \xFF), ASCII
    strings, high randomness, etc.
    """

    def as_report(self, randomness: RandomnessReport | None) -> UnknownChunkReport:
        """Build the ``UnknownChunkReport``, attaching ``randomness`` as given."""
        report_fields = {
            "id": self.id,
            "start_offset": self.start_offset,
            "end_offset": self.end_offset,
            "size": self.size,
            "randomness": randomness,
        }
        return UnknownChunkReport(**report_fields)

227 

228 

@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Chunk that is reported under the fixed handler name ``"padding"``.

    Its report is a ``ChunkReport`` that is never marked as encrypted and
    never carries extraction reports.
    """

    def as_report(
        self,
        randomness: RandomnessReport | None,  # noqa: ARG002
    ) -> ChunkReport:
        """Build the ``ChunkReport`` for this chunk.

        ``randomness`` is accepted but not used.
        """
        report_fields = {
            "id": self.id,
            "start_offset": self.start_offset,
            "end_offset": self.end_offset,
            "size": self.size,
            "is_encrypted": False,
            "handler_name": "padding",
            "extraction_reports": [],
        }
        return ChunkReport(**report_fields)

250 

251 

@attrs.define
class MultiFile(Blob):
    """A named group of paths that is handled together by one directory handler.

    ``handler`` is excluded from ``__init__`` and from equality, so it has
    to be assigned on the instance after construction.
    """

    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: DirectoryHandler = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> ExtractResult | None:
        """Hand all ``paths`` to the handler for extraction into ``outdir``."""
        directory_handler = self.handler
        return directory_handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        """Build the ``MultiFileReport`` with the given sub-reports."""
        report_fields = {
            "id": self.id,
            "name": self.name,
            "paths": self.paths,
            "handler_name": self.handler.NAME,
            "extraction_reports": extraction_reports,
        }
        return MultiFileReport(**report_fields)

270 

271 

# Type variable for ``TaskResult.filter_reports``: the returned list is typed
# with the same Report subclass that was asked for.
ReportType = TypeVar("ReportType", bound=Report)


class TaskResult(BaseModel):
    # Outcome of a single task: the task itself, the reports collected for
    # it and the subtasks registered for it.
    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    @field_validator("reports", mode="before")
    @classmethod
    def validate_reports(cls, value):
        # "before" mode: the raw input is passed through validate_report_list
        # ahead of pydantic's own validation of the field.
        return validate_report_list(value)

    def add_report(self, report: Report):
        """Append ``report`` to ``reports``."""
        self.reports.append(report)

    def add_subtask(self, task: Task):
        """Append ``task`` to ``subtasks``."""
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        """Return, in order, the reports that are instances of ``report_class``."""
        matching = []
        for candidate in self.reports:
            if isinstance(candidate, report_class):
                matching.append(candidate)
        return matching

293 

294 

class ProcessResult(BaseModel):
    # Ordered collection of the TaskResult objects registered so far.
    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        """Collect every ``ErrorReport`` found in the registered results.

        Both top-level error reports and error reports nested in the
        ``extraction_reports`` of a ``ChunkReport`` are returned, in the
        order they are encountered.
        """
        collected = []
        for task_result in self.results:
            for report in task_result.reports:
                if isinstance(report, ErrorReport):
                    collected.append(report)
                elif isinstance(report, ChunkReport):
                    collected.extend(
                        nested
                        for nested in report.extraction_reports
                        if isinstance(nested, ErrorReport)
                    )
        return collected

    def register(self, result: TaskResult):
        """Append ``result`` to ``results``."""
        self.results.append(result)

    def to_json(self, indent=" "):
        """Serialize all results as a JSON array string."""
        dumped_results = []
        for result in self.results:
            dumped_results.append(
                result.model_dump(mode="json", serialize_as_any=True)
            )
        return json.dumps(dumped_results, indent=indent)

    def get_output_dir(self) -> Path | None:
        """Return the output directory recorded in the first result, if any.

        A ``CarveDirectoryReport`` on the first result wins; otherwise the
        path of its first subtask is used.  ``None`` is returned when there
        is no result, or neither a carve report nor a subtask.
        """
        if not self.results:
            # nothing was registered at all
            return None

        top_result = self.results[0]
        carves = top_result.filter_reports(CarveDirectoryReport)
        if carves:
            # we have a top level carve
            return carves[0].carve_dir

        if not top_result.subtasks:
            # no extraction
            return None

        # we have an extraction,
        # and the extract directory registered as subtask
        return top_result.subtasks[0].path

339 

340 

# Shape of the JSON report: a list of task results.
ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Adapter for deserializing the JSON report back into Python objects.

Example::

    with open('report.json', 'r') as f:
        data = f.read()
        report_data = ReportModelAdapter.validate_json(data)

Another example can be found in:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""

355 

356 

class ExtractError(Exception):
    """There was an error during extraction.

    Any reports given to the constructor are kept, as a tuple, in
    ``self.reports``.
    """

    def __init__(self, *reports: Report):
        self.reports: tuple[Report, ...] = reports
        # Nothing is forwarded to Exception, so ``args`` stays empty.
        super().__init__()

363 

364 

@attrs.define(kw_only=True)
class ExtractResult:
    """Value returned by an extraction; holds the reports it produced.

    ``reports`` has to be passed by keyword.
    """

    reports: list[Report]

368 

369 

class Extractor(abc.ABC):
    """Abstract interface for extracting a single input path.

    Subclasses must implement ``extract``.
    """

    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies (none by default)."""
        no_dependencies: list[str] = []
        return no_dependencies

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the carved out chunk at ``inpath`` into ``outdir``.

        Raises ExtractError on failure.
        """

381 

382 

class DirectoryExtractor(abc.ABC):
    """Abstract interface for extracting from a list of paths.

    Subclasses must implement ``extract``.
    """

    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies (none by default)."""
        no_dependencies: list[str] = []
        return no_dependencies

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Extract from the multi file path list ``paths`` into ``outdir``.

        Raises ExtractError on failure.
        """

394 

395 

class Pattern(str):
    """String subclass acting as the base of the pattern types.

    ``as_regex`` is left unimplemented here; subclasses provide it.
    """

    def as_regex(self) -> bytes:
        """Return the pattern as a bytes regex (not implemented on the base)."""
        raise NotImplementedError

399 

400 

class HexString(Pattern):
    """YARA-rule-like hexadecimal string pattern.

    Hex strings simplify the definition of binary strings by offering hex
    encoding, wild-cards, jumps and alternatives.  They are converted to a
    hyperscan compatible PCRE regex.

    See YARA & Hyperscan documentation for more details:

        - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

        - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

        - normal bytes using hexadecimals: 01 de ad c0 de ff

        - wild-cards can match single bytes and can be mixed with
          normal hex: 01 ?? 02

        - wild-cards can also match first and second nibbles: 0? ?0

        - jumps can be specified for multiple wildcard bytes: [3]
          [2-5]

        - alternatives can be specified as well: ( 01 02 | 03 04 ) The
          above can be combined and alternatives nested: 01 02 ( 03 04
          | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

        - comments using /* */ notation

        - infinite jumps: [-]

        - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        """Return the regex produced by ``hexstring2regex`` for this string."""
        converted = hexstring2regex(self)
        return converted

443 

444 

class Regex(Pattern):
    """Byte PCRE regex, written as a string.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        """Return the pattern text encoded to bytes (UTF-8, the str default)."""
        return self.encode("utf-8")

454 

455 

class DirectoryPattern:
    """Base of the patterns that select files inside a directory.

    ``get_files`` is left unimplemented here; subclasses provide it.
    """

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Return the paths selected in ``directory`` (not implemented on the base)."""
        raise NotImplementedError

459 

460 

class Glob(DirectoryPattern):
    """Directory pattern made of one or more glob expressions."""

    def __init__(self, *patterns):
        """Store the glob expressions; at least one is required.

        Raises ValueError when called without any pattern.
        """
        if len(patterns) == 0:
            raise ValueError("At least one pattern must be provided")
        self._patterns = patterns

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Lazily yield the matches of every pattern, in pattern order.

        A path matched by several patterns is yielded once per match.
        """
        for glob_expression in self._patterns:
            for matched_path in directory.glob(glob_expression):
                yield matched_path

470 

471 

class SingleFile(DirectoryPattern):
    """Directory pattern that selects one file by name."""

    def __init__(self, filename):
        """Remember the file name to look for."""
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Return ``[directory / filename]`` if that path exists, else ``[]``."""
        candidate = directory / self._filename
        if candidate.exists():
            return [candidate]
        return []

479 

480 

# Type of the EXTRACTOR slot of a DirectoryHandler: a DirectoryExtractor or None.
DExtractor = TypeVar("DExtractor", bound=None | DirectoryExtractor)


class DirectoryHandler(abc.ABC, Generic[DExtractor]):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DExtractor

    PATTERN: DirectoryPattern

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work.

        The list comes from the extractor; it is empty when EXTRACTOR is None.
        """
        extractor = cls.EXTRACTOR
        if extractor is None:
            return []
        return extractor.get_dependencies()

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Create ``outdir`` and run the extractor on ``paths``.

        Raises ExtractError when the handler has no extractor.  ``outdir``
        must not exist yet: ``mkdir`` is called with ``exist_ok=False``.
        """
        extractor = self.EXTRACTOR
        if extractor is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return extractor.extract(paths, outdir)

515 

516 

# Type of the EXTRACTOR slot of a Handler: an Extractor or None.
TExtractor = TypeVar("TExtractor", bound=None | Extractor)


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this, because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work.

        The list comes from the extractor; it is empty when EXTRACTOR is None.
        """
        extractor = cls.EXTRACTOR
        if extractor is None:
            return []
        return extractor.get_dependencies()

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Create ``outdir`` and run the extractor on ``inpath``.

        Raises ExtractError when the handler has no extractor.  ``outdir``
        must not exist yet: ``mkdir`` is called with ``exist_ok=False``.
        """
        extractor = self.EXTRACTOR
        if extractor is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return extractor.extract(inpath, outdir)

553 

554 

class StructHandler(Handler):
    """Handler whose header is parsed from C struct definitions.

    A ``StructParser`` is built from ``C_DEFINITIONS`` when the handler is
    instantiated.
    """

    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        definitions = self.C_DEFINITIONS
        self._struct_parser = StructParser(definitions)

    @property
    def cparser_le(self):
        """The ``cparser_le`` attribute of the underlying struct parser."""
        parser = self._struct_parser
        return parser.cparser_le

    @property
    def cparser_be(self):
        """The ``cparser_be`` attribute of the underlying struct parser."""
        parser = self._struct_parser
        return parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        """Parse ``HEADER_STRUCT`` from ``file`` and return the parsed header.

        ``endian`` defaults to little endian.  The parsed header is also
        written to the debug log.
        """
        parsed = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=parsed, _verbosity=3)
        return parsed

575 

576 

# Type aliases: tuples of handler *classes* (not instances).
Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]