Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 74%

246 statements  

from __future__ import annotations

import abc
import dataclasses
import itertools
import json
from enum import Enum
from pathlib import Path  # noqa: TC003
from typing import TYPE_CHECKING, Generic, TypeVar

import attrs
from pydantic import BaseModel, TypeAdapter, field_validator
from structlog import get_logger

from .file_utils import Endian, File, InvalidInputFormat, StructParser
from .identifiers import new_id
from .parser import hexstring2regex
from .report import (
    CarveDirectoryReport,
    ChunkReport,
    ErrorReport,
    MultiFileReport,
    RandomnessReport,
    Report,
    UnknownChunkReport,
    validate_report_list,
)

if TYPE_CHECKING:
    from collections.abc import Iterable

logger = get_logger()

# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#


class HandlerType(Enum):
    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"


@dataclasses.dataclass(frozen=True)
class Reference:
    title: str
    url: str


@dataclasses.dataclass
class HandlerDoc:
    name: str
    description: str | None
    vendor: str | None
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        self.fully_supported = len(self.limitations) == 0


class Task(BaseModel):
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = False


@attrs.define
class Blob:
    id: str = attrs.field(
        factory=new_id,
    )


@attrs.define
class Chunk(Blob):
    """File chunk with start and end offsets; it may still turn out to be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk"""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk"""

    file: File | None = None

    def __attrs_post_init__(self):
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: Chunk) -> bool:
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex

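# Illustrative sketch, not part of unblob's models.py: how Chunk offsets and
# containment behave. The offsets below are made up; note that end_offset is
# exclusive and contains() requires strictly larger coverage on at least one side.
def _chunk_containment_example() -> None:
    outer = Chunk(start_offset=0x00, end_offset=0x100)
    inner = Chunk(start_offset=0x10, end_offset=0x20)
    assert outer.size == 0x100
    assert outer.contains(inner)
    assert not outer.contains(outer)          # an identical range is not contained
    assert outer.contains_offset(0x00)
    assert not outer.contains_offset(0x100)   # end_offset itself is outside the chunk
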

@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Chunk of a File that is known to be valid and can be extracted with an external program."""

    handler: Handler = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )


@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for information purposes and further analysis,
    like most common bytes (like \x00 and \xFF), ASCII strings, high randomness, etc.
    """

    def as_report(self, randomness: RandomnessReport | None) -> UnknownChunkReport:
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )


@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis, and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: RandomnessReport | None,  # noqa: ARG002
    ) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )


@attrs.define
class MultiFile(Blob):
    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: DirectoryHandler = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> ExtractResult | None:
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )


ReportType = TypeVar("ReportType", bound=Report)


class TaskResult(BaseModel):
    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    @field_validator("reports", mode="before")
    @classmethod
    def validate_reports(cls, value):
        return validate_report_list(value)

    def add_report(self, report: Report):
        self.reports.append(report)

    def add_subtask(self, task: Task):
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        return [report for report in self.reports if isinstance(report, report_class)]

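# Illustrative sketch, not part of unblob's models.py: collecting reports on a
# TaskResult and filtering them by type. The task path and the report values
# below are made up.
def _task_result_example() -> list[ChunkReport]:
    task = Task(path=Path("/tmp/firmware.bin"), depth=0, blob_id=new_id())
    result = TaskResult(task=task)
    result.add_report(
        ChunkReport(
            id=new_id(),
            start_offset=0,
            end_offset=512,
            size=512,
            handler_name="example",
            is_encrypted=False,
            extraction_reports=[],
        )
    )
    # Only instances of the requested report type are returned.
    return result.filter_reports(ChunkReport)
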

class ProcessResult(BaseModel):
    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        reports = itertools.chain.from_iterable(r.reports for r in self.results)
        interesting_reports = (
            r for r in reports if isinstance(r, ErrorReport | ChunkReport)
        )
        errors = []
        for report in interesting_reports:
            if isinstance(report, ErrorReport):
                errors.append(report)
            else:
                errors.extend(
                    r for r in report.extraction_reports if isinstance(r, ErrorReport)
                )
        return errors

    def register(self, result: TaskResult):
        self.results.append(result)

    def to_json(self, indent=" "):
        return json.dumps(
            [
                result.model_dump(mode="json", serialize_as_any=True)
                for result in self.results
            ],
            indent=indent,
        )

    def get_output_dir(self) -> Path | None:
        try:
            top_result = self.results[0]
            if carves := top_result.filter_reports(CarveDirectoryReport):
                # we have a top level carve
                return carves[0].carve_dir

            # we either have an extraction,
            # with the extraction directory registered as a subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or no extraction at all
            return None


ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Use this for deserialization of the JSON report, i.e. to import the JSON
report back into Python objects.

For example:

    with open('report.json', 'r') as f:
        data = f.read()
        report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""


class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports


@attrs.define(kw_only=True)
class ExtractResult:
    reports: list[Report]


class Extractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """


class DirectoryExtractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        """Extract from a multi file path list.

        Raises ExtractError on failure.
        """


class Pattern(str):
    def as_regex(self) -> bytes:
        raise NotImplementedError


class HexString(Pattern):
    """Hex string pattern, similar to a YARA rule hexadecimal string.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hexstrings are
    converted to Hyperscan compatible PCRE regex.

    See YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 ) The
      above can be combined and alternatives nested: 01 02 ( 03 04
      | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        return hexstring2regex(self)


class Regex(Pattern):
    """Byte PCRE regex.

    See the Hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        return self.encode()

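# Illustrative sketch, not part of unblob's models.py: hypothetical pattern
# definitions. HexString accepts YARA-like wildcards, bounded jumps and
# alternatives, while Regex is a raw byte-oriented PCRE; both compile to
# Hyperscan-compatible regex bytes via as_regex().
_EXAMPLE_PATTERNS: list[Pattern] = [
    HexString("de ad c0 de"),                 # plain hex bytes
    HexString("01 ?? 02 [2-5] ( aa | bb )"),  # wildcard, bounded jump, alternatives
    Regex(r"\x7fELF"),                        # raw PCRE over bytes
]


def _example_compiled_patterns() -> list[bytes]:
    return [pattern.as_regex() for pattern in _EXAMPLE_PATTERNS]
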

class DirectoryPattern:
    def get_files(self, directory: Path) -> Iterable[Path]:
        raise NotImplementedError


class Glob(DirectoryPattern):
    def __init__(self, *patterns):
        if not patterns:
            raise ValueError("At least one pattern must be provided")
        self._patterns = patterns

    def get_files(self, directory: Path) -> Iterable[Path]:
        for pattern in self._patterns:
            yield from directory.glob(pattern)


class SingleFile(DirectoryPattern):
    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        path = directory / self._filename
        return [path] if path.exists() else []

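# Illustrative sketch, not part of unblob's models.py: how DirectoryPattern
# implementations pick the files a DirectoryHandler starts from. The directory
# argument and the file names below are made up.
def _directory_pattern_example(directory: Path) -> list[Path]:
    multi_part = Glob("*.7z.001", "*.zip.001")  # first members of split archives
    manifest = SingleFile("manifest.json")      # one well-known file name
    return [*multi_part.get_files(directory), *manifest.get_files(directory)]
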

DExtractor = TypeVar("DExtractor", bound=None | DirectoryExtractor)


class DirectoryHandler(abc.ABC, Generic[DExtractor]):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DExtractor

    PATTERN: DirectoryPattern

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> MultiFile | None:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> ExtractResult | None:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once; it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)


TExtractor = TypeVar("TExtractor", bound=None | Extractor)


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: HandlerDoc | None

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult | None:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once; it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)

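# Illustrative sketch, not part of unblob's models.py: a minimal, hypothetical
# Handler subclass. The "EXMP" magic and the 8-byte header layout are invented,
# and the example assumes File's mmap-like seek()/read() interface.
class _ExampleHandler(Handler):
    NAME = "example"
    PATTERNS = [HexString("45 58 4d 50")]  # b"EXMP", a made-up magic
    EXTRACTOR = None
    DOC = None

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        # Hypothetical header: 4 magic bytes followed by a 4-byte little-endian
        # total size that covers the whole chunk.
        file.seek(start_offset + 4)
        total_size = int.from_bytes(file.read(4), "little")
        if total_size < 8:
            raise InvalidInputFormat("Example header reports an impossible size")
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + total_size,
        )
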

class StructHandler(Handler):
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header

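# Illustrative sketch, not part of unblob's models.py: a hypothetical
# StructHandler subclass. The C struct below is made up; parse_header() parses
# HEADER_STRUCT from the file in the requested endianness, and the file cursor
# is assumed to sit at start_offset when calculate_chunk() is called.
class _ExampleStructHandler(StructHandler):
    NAME = "example_struct"
    PATTERNS = [HexString("45 58 4d 50")]  # b"EXMP", a made-up magic
    EXTRACTOR = None
    DOC = None

    C_DEFINITIONS = r"""
        typedef struct example_header {
            char magic[4];
            uint32 size;
        } example_header_t;
    """
    HEADER_STRUCT = "example_header_t"

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        header = self.parse_header(file, Endian.LITTLE)
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + header.size,
        )
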

Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]