Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 74%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

247 statements  

1from __future__ import annotations 

2 

3import abc 

4import dataclasses 

5import itertools 

6import json 

7from enum import Enum 

8from pathlib import Path # noqa: TC003 

9from typing import TYPE_CHECKING, Generic, TypeVar 

10 

11import attrs 

12from pydantic import BaseModel, TypeAdapter, field_validator 

13from structlog import get_logger 

14 

15from .file_utils import Endian, File, InvalidInputFormat, StructParser 

16from .identifiers import new_id 

17from .parser import hexstring2regex 

18from .report import ( 

19 CarveDirectoryReport, 

20 ChunkReport, 

21 ErrorReport, 

22 MultiFileReport, 

23 RandomnessReport, 

24 Report, 

25 UnknownChunkReport, 

26 validate_report_list, 

27) 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Iterable 

31 

# Module-level structlog logger shared by all classes below.
logger = get_logger()

33 

34# The state transitions are: 

35# 

36# file ──► pattern match ──► ValidChunk 

37# 

38 

39 

class HandlerType(Enum):
    """Broad category of the format a handler recognizes.

    Used to group handler documentation (see ``HandlerDoc.handler_type``).
    """

    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"

48 

49 

@dataclasses.dataclass(frozen=True)
class Reference:
    """A titled URL pointing at external documentation, used in HandlerDoc.references."""

    # Human readable title of the referenced document.
    title: str
    # Link to the referenced document.
    url: str

54 

55 

@dataclasses.dataclass
class HandlerDoc:
    """User-facing documentation of a single handler.

    ``fully_supported`` is not passed in by callers; it is derived after
    construction: a handler counts as fully supported exactly when it has
    no documented limitations.
    """

    name: str
    description: str | None
    vendor: str | None
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    private: bool = False
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        # Derived flag: an empty limitations list means full support.
        self.fully_supported = not self.limitations

69 

70 

class Task(BaseModel):
    """A unit of work for the processing pipeline: one path to examine.

    New Tasks are scheduled recursively for extracted content
    (see ``TaskResult.subtasks``).
    """

    # Filesystem path this task will process.
    path: Path
    # Recursion depth of this task in the extraction tree.
    # NOTE(review): presumably 0 for the top-level input and +1 per nested
    # extraction — confirm against the scheduler.
    depth: int
    # Id of the Blob (chunk / multi-file group) this task originates from.
    blob_id: str
    # True when the task processes a MultiFile group instead of a single path.
    is_multi_file: bool = False

76 

77 

@attrs.define
class Blob:
    """Base class for anything identified inside a file, carrying a unique id.

    The id links chunks / multi-file groups to their reports
    (see the ``as_report`` methods of the subclasses).
    """

    # Unique identifier, generated by new_id() unless explicitly provided.
    id: str = attrs.field(
        factory=new_id,
    )

83 

84 

@attrs.define
class Chunk(Blob):
    """File chunk, have start and end offset, but still can be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk"""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk"""

    # Backing file of the chunk; optional, but is_whole_file requires it.
    file: File | None = None

    def __attrs_post_init__(self):
        # Enforce chunk invariants at construction time: offsets must be
        # non-negative and the chunk must have a strictly positive size.
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        """Number of bytes covered by the chunk."""
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        """Chunk range formatted as '0x<start>-0x<end>'."""
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        """True when the chunk spans its backing file entirely (file must be set)."""
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: Chunk) -> bool:
        """Proper containment: ``other`` lies within self and self is strictly larger.

        A chunk does NOT contain an identical chunk — at least one of the
        two boundary comparisons must be strict.
        """
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        """True when ``offset`` falls inside the half-open range [start, end)."""
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex

138 

139 

@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Known to be valid chunk of a File, can be extracted with an external program."""

    # Handler that recognized this chunk; assigned after construction
    # (init=False) and excluded from equality comparison (eq=False).
    handler: Handler = attrs.field(init=False, eq=False)
    # Encrypted chunks are reported but never extracted.
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract this chunk from ``inpath`` into ``outdir`` via its handler.

        Raises:
            ExtractError: when the chunk is encrypted, it is never extracted.
        """
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        """Convert this chunk to a ChunkReport embedding ``extraction_reports``."""
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )

168 

169 

@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for information purposes and further analysis,
    like most common bytes (like \x00 and \xFF), ASCII strings, high randomness, etc.
    """

    def as_report(self, randomness: RandomnessReport | None) -> UnknownChunkReport:
        """Convert to an UnknownChunkReport with an optional randomness measurement."""
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )

189 

190 

@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: RandomnessReport | None,  # noqa: ARG002
    ) -> ChunkReport:
        """Convert to a ChunkReport with the fixed handler name "padding".

        ``randomness`` is accepted only for interface compatibility with the
        other chunk types and is ignored.
        """
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )

212 

213 

@attrs.define
class MultiFile(Blob):
    """A named group of related files that a DirectoryHandler extracts together."""

    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    # Handler that identified this group; assigned after construction
    # (init=False) and excluded from equality comparison (eq=False).
    handler: DirectoryHandler = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> ExtractResult | None:
        """Extract every path of the group into ``outdir`` via the handler."""
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        """Convert this group to a MultiFileReport embedding ``extraction_reports``."""
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )

232 

233 

# Type variable bound to Report subclasses; used by TaskResult.filter_reports
# to preserve the concrete report type in its return value.
ReportType = TypeVar("ReportType", bound=Report)

235 

236 

class TaskResult(BaseModel):
    """Everything produced while processing one Task: reports and follow-up tasks."""

    task: Task
    # Mutable [] defaults are safe here: pydantic copies field defaults per instance.
    reports: list[Report] = []
    subtasks: list[Task] = []

    @field_validator("reports", mode="before")
    @classmethod
    def validate_reports(cls, value):
        # Reports are polymorphic; delegate to the shared helper from
        # unblob.report so deserialized input is validated consistently.
        return validate_report_list(value)

    def add_report(self, report: Report):
        """Append one report to this result."""
        self.reports.append(report)

    def add_subtask(self, task: Task):
        """Schedule a follow-up task discovered while processing."""
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        """Return only the reports that are instances of ``report_class``."""
        return [report for report in self.reports if isinstance(report, report_class)]

255 

256 

class ProcessResult(BaseModel):
    """Aggregated outcome of a processing run: one TaskResult per finished task."""

    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        """Every ErrorReport, including ones nested inside chunk extraction reports."""
        collected: list[ErrorReport] = []
        every_report = itertools.chain.from_iterable(
            result.reports for result in self.results
        )
        for candidate in every_report:
            if isinstance(candidate, ErrorReport):
                collected.append(candidate)
            elif isinstance(candidate, ChunkReport):
                collected.extend(
                    nested
                    for nested in candidate.extraction_reports
                    if isinstance(nested, ErrorReport)
                )
        return collected

    def register(self, result: TaskResult):
        """Record the result of one finished task."""
        self.results.append(result)

    def to_json(self, indent=" "):
        """Serialize all task results to a JSON string."""
        dumped = [
            result.model_dump(mode="json", serialize_as_any=True)
            for result in self.results
        ]
        return json.dumps(dumped, indent=indent)

    def get_output_dir(self) -> Path | None:
        """Return the top-level output directory, or None when nothing was extracted."""
        if not self.results:
            return None
        top_result = self.results[0]
        if carves := top_result.filter_reports(CarveDirectoryReport):
            # we have a top level carve
            return carves[0].carve_dir
        if top_result.subtasks:
            # we have an extraction, whose directory is registered as a subtask
            return top_result.subtasks[0].path
        # no extraction happened
        return None

301 

302 

# Shape of the on-disk JSON report: a flat list of task results.
ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Use this for deserialization (import JSON report back into Python
objects) of the JSON report.

For example:

with open('report.json', 'r') as f:
    data = f.read()
    report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""

317 

318 

class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        """Capture the reports describing why extraction failed."""
        super().__init__()
        # Keep the reports as an immutable tuple for later inspection.
        self.reports: tuple[Report, ...] = tuple(reports)

325 

326 

@attrs.define(kw_only=True)
class ExtractResult:
    """Outcome of a successful extraction."""

    # Reports emitted by the extractor during extraction.
    reports: list[Report]

330 

331 

class Extractor(abc.ABC):
    """Extraction strategy for a single carved chunk."""

    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """

343 

344 

class DirectoryExtractor(abc.ABC):
    """Extraction strategy for a multi-file group (see MultiFile)."""

    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Extract from a multi file path list.

        Raises ExtractError on failure.
        """

356 

357 

class Pattern(str):
    """Base class for search patterns; subclasses compile to a hyperscan-compatible byte regex."""

    def as_regex(self) -> bytes:
        """Return the pattern compiled to a PCRE byte regex."""
        raise NotImplementedError

361 

362 

class HexString(Pattern):
    """Hex string can be a YARA rule like hexadecimal string.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hexstrings are
    converted to hyperscan compatible PCRE regex.

    See YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 ) The
      above can be combined and alternatives nested: 01 02 ( 03 04
      | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        """Compile the hexstring to a hyperscan-compatible PCRE byte regex."""
        return hexstring2regex(self)

405 

406 

class Regex(Pattern):
    """Byte PCRE regex.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        """Return the regex encoded to bytes; the string itself is already a regex."""
        return self.encode()

416 

417 

class DirectoryPattern:
    """Strategy for selecting candidate files inside a directory."""

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Yield the files in ``directory`` matched by this pattern."""
        raise NotImplementedError


class Glob(DirectoryPattern):
    """Select files by one or more shell-style glob patterns."""

    def __init__(self, *patterns):
        if not patterns:
            raise ValueError("At least one pattern must be provided")
        self._patterns = patterns

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Lazily yield every path under ``directory`` matching any pattern."""
        return itertools.chain.from_iterable(
            directory.glob(pattern) for pattern in self._patterns
        )


class SingleFile(DirectoryPattern):
    """Select exactly one fixed file name, when it exists."""

    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        """Return the single matching path, or an empty list when it is absent."""
        candidate = directory / self._filename
        if candidate.exists():
            return [candidate]
        return []

441 

442 

# Type variable for a DirectoryHandler's extractor; None marks handlers
# that cannot extract.
DExtractor = TypeVar("DExtractor", bound=None | DirectoryExtractor)

444 

445 

class DirectoryHandler(abc.ABC, Generic[DExtractor]):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    # Unique handler name, used in reports (see MultiFile.as_report).
    NAME: str

    # Extractor instance, or None when this handler cannot extract.
    EXTRACTOR: DExtractor

    # Pattern selecting the file(s) that may start a multi-file group.
    PATTERN: DirectoryPattern

    # Optional user-facing documentation of this handler.
    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Extract the multi-file group into ``outdir`` with the configured extractor.

        Raises ExtractError when no extractor is configured.
        """
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)

477 

478 

# Type variable for a Handler's extractor; None marks handlers that cannot extract.
TExtractor = TypeVar("TExtractor", bound=None | Extractor)

480 

481 

class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    # Unique handler name, used in reports (see ValidChunk.as_report).
    NAME: str
    # Patterns whose matches make this handler a candidate for a chunk.
    PATTERNS: list[Pattern]
    # We need this, because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    # Extractor instance, or None when this handler cannot extract.
    EXTRACTOR: TExtractor

    # Optional user-facing documentation of this handler.
    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the chunk at ``inpath`` into ``outdir`` with the configured extractor.

        Raises ExtractError when no extractor is configured.
        """
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)

515 

516 

class StructHandler(Handler):
    """Handler whose header parsing is driven by C struct definitions."""

    # C source with the struct definitions used to build the parser.
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        # Compile the struct parser once per handler instance.
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        """Little-endian struct parser built from C_DEFINITIONS."""
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        """Big-endian struct parser built from C_DEFINITIONS."""
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        """Parse HEADER_STRUCT from ``file`` with the given endianness and return it."""
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header

537 

538 

# Convenience aliases for immutable collections of handler classes.
Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]