Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 75%


import abc
import dataclasses
import itertools
import json
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Generic, Optional, TypeVar, Union

import attrs
from pydantic import BaseModel, TypeAdapter
from structlog import get_logger

from .file_utils import Endian, File, InvalidInputFormat, StructParser
from .identifiers import new_id
from .parser import hexstring2regex
from .report import (
    CarveDirectoryReport,
    ChunkReport,
    ErrorReport,
    MultiFileReport,
    RandomnessReport,
    Report,
    UnknownChunkReport,
)

logger = get_logger()

# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#


class HandlerType(Enum):
    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"


@dataclasses.dataclass(frozen=True)
class Reference:
    title: str
    url: str


@dataclasses.dataclass
class HandlerDoc:
    name: str
    description: Union[str, None]
    vendor: Union[str, None]
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        self.fully_supported = len(self.limitations) == 0


class Task(BaseModel):
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = False


@attrs.define
class Blob:
    id: str = attrs.field(
        factory=new_id,
    )


@attrs.define
class Chunk(Blob):
    """File chunk with a start and an end offset; it may still turn out to be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk."""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk."""

    file: Optional[File] = None

    def __attrs_post_init__(self):
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has start_offset greater than or equal to end_offset: {self}"
            )

    @property
    def size(self) -> int:
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: "Chunk") -> bool:
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex

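# --- Editor's example (not part of the original module) ---------------------------
# A minimal sketch of the half-open [start_offset, end_offset) semantics documented
# on Chunk above; the offsets below are invented purely for illustration.
def _example_chunk_semantics() -> None:
    chunk = Chunk(start_offset=0x10, end_offset=0x40)
    assert chunk.size == 0x30
    assert chunk.range_hex == "0x10-0x40"
    assert chunk.contains_offset(0x3F)       # the last byte is still inside the chunk
    assert not chunk.contains_offset(0x40)   # end_offset itself is excluded
    inner = Chunk(start_offset=0x20, end_offset=0x30)
    assert chunk.contains(inner)             # strictly contained on at least one side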
@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Chunk of a File known to be valid; it can be extracted with an external program."""

    handler: "Handler" = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]:
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )


@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks, or chunks that are otherwise unknown.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for informational purposes and further
    analysis, like the most common bytes (e.g. \x00 and \xFF), ASCII strings,
    high randomness, etc.
    """

    def as_report(self, randomness: Optional[RandomnessReport]) -> UnknownChunkReport:
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )


@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Gaps between valid chunks, or chunks that are otherwise unknown.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: Optional[RandomnessReport],  # noqa: ARG002
    ) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )


@attrs.define
class MultiFile(Blob):
    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: "DirectoryHandler" = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> Optional["ExtractResult"]:
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )


ReportType = TypeVar("ReportType", bound=Report)


class TaskResult(BaseModel):
    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    def add_report(self, report: Report):
        self.reports.append(report)

    def add_subtask(self, task: Task):
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        return [report for report in self.reports if isinstance(report, report_class)]


class ProcessResult(BaseModel):
    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        reports = itertools.chain.from_iterable(r.reports for r in self.results)
        interesting_reports = (
            r for r in reports if isinstance(r, (ErrorReport, ChunkReport))
        )
        errors = []
        for report in interesting_reports:
            if isinstance(report, ErrorReport):
                errors.append(report)
            else:
                errors.extend(
                    r for r in report.extraction_reports if isinstance(r, ErrorReport)
                )
        return errors

    def register(self, result: TaskResult):
        self.results.append(result)

    def to_json(self, indent=" "):
        return json.dumps(
            [result.model_dump(mode="json") for result in self.results], indent=indent
        )

    def get_output_dir(self) -> Optional[Path]:
        try:
            top_result = self.results[0]
            if carves := top_result.filter_reports(CarveDirectoryReport):
                # we have a top level carve
                return carves[0].carve_dir

            # or we have an extraction, with the extraction directory
            # registered as a subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or there was no extraction at all
            return None

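# --- Editor's example (not part of the original module) ---------------------------
# A sketch of how a caller might consume a ProcessResult: walk the flattened error
# list (top-level ErrorReports plus errors nested in ChunkReport.extraction_reports)
# and return the whole report as JSON. The function name is hypothetical.
def _example_report_summary(results: ProcessResult) -> str:
    for error in results.errors:
        logger.error("Extraction problem", report=error)
    return results.to_json()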
ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Use this for deserialization of the JSON report (importing the JSON report back
into Python objects).

For example:

    with open('report.json', 'r') as f:
        data = f.read()
    report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""


class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports


@attrs.define(kw_only=True)
class ExtractResult:
    reports: list[Report]


class Extractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        """Extract the carved-out chunk.

        Raises ExtractError on failure.
        """

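# --- Editor's example (not part of the original module) ---------------------------
# A hypothetical Extractor that shells out to an external tool. The command name
# "examplar" and its flags are invented; a real extractor would also translate
# failures into Reports attached to the ExtractError.
class _ExampleCommandExtractor(Extractor):
    def get_dependencies(self) -> list[str]:
        return ["examplar"]

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        import subprocess

        completed = subprocess.run(
            ["examplar", "--output", str(outdir), str(inpath)],
            capture_output=True,
            check=False,
        )
        if completed.returncode != 0:
            raise ExtractError()
        return None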
class DirectoryExtractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        """Extract from a multi-file path list.

        Raises ExtractError on failure.
        """


class Pattern(str):
    def as_regex(self) -> bytes:
        raise NotImplementedError


class HexString(Pattern):
    """YARA-rule-like hexadecimal string pattern.

    It is useful for simplifying the definition of binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hex strings are
    converted to hyperscan-compatible PCRE regexes.

    See the YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match the first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 ) The
      above can be combined and alternatives nested: 01 02 ( 03 04
      | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        return hexstring2regex(self)

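# --- Editor's example (not part of the original module) ---------------------------
# A sketch of a hex string pattern using the syntax documented above: plain bytes,
# a wild-carded byte, a bounded jump and an alternative. The bytes are invented.
def _example_hexstring_pattern() -> bytes:
    pattern = HexString("de ad ?? ff [2-4] ( 01 | 02 )")
    # as_regex() produces hyperscan-compatible PCRE bytes via hexstring2regex().
    return pattern.as_regex()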
class Regex(Pattern):
    """Byte PCRE regex.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        return self.encode()


class DirectoryPattern:
    def get_files(self, directory: Path) -> Iterable[Path]:
        raise NotImplementedError


class Glob(DirectoryPattern):
    def __init__(self, pattern):
        self._pattern = pattern

    def get_files(self, directory: Path) -> Iterable[Path]:
        return directory.glob(self._pattern)


class SingleFile(DirectoryPattern):
    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        path = directory / self._filename
        return [path] if path.exists() else []

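# --- Editor's example (not part of the original module) ---------------------------
# A sketch of the two DirectoryPattern flavours: Glob yields every match in the
# searched directory, SingleFile yields at most one. The file names are invented.
def _example_directory_patterns(directory: Path) -> list[Path]:
    matched = list(Glob("*.header").get_files(directory))
    matched += SingleFile("manifest.json").get_files(directory)
    return matched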
DExtractor = TypeVar("DExtractor", bound=Union[None, DirectoryExtractor])


class DirectoryHandler(abc.ABC, Generic[DExtractor]):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DExtractor

    PATTERN: DirectoryPattern

    DOC: Union[HandlerDoc, None]

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once; it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)

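# --- Editor's example (not part of the original module) ---------------------------
# A hypothetical DirectoryHandler for split files such as "firmware.bin.001",
# "firmware.bin.002", ... The name, pattern and logic are invented purely to show
# how PATTERN and calculate_multifile cooperate; a real handler would also plug in
# a DirectoryExtractor.
class _ExampleSplitFileHandler(DirectoryHandler):
    NAME = "example_split"
    PATTERN = Glob("*.001")  # a file matching this is the starting point
    EXTRACTOR = None
    DOC = None

    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        # Collect every sibling part sharing the same stem: *.001, *.002, ...
        parts = sorted(file.parent.glob(f"{file.stem}.*"))
        if len(parts) < 2:
            return None
        return MultiFile(name=file.stem, paths=parts)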
TExtractor = TypeVar("TExtractor", bound=Union[None, Extractor])


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this because not every match reflects the actual chunk start
    # (e.g. the tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: Union[HandlerDoc, None]

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once; it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)

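# --- Editor's example (not part of the original module) ---------------------------
# A minimal, hypothetical Handler for a fixed-size record format, showing how
# PATTERNS, PATTERN_MATCH_OFFSET and calculate_chunk fit together. The magic bytes
# and the record size are invented; a real handler would validate header fields
# read from `file` and bound-check against the file size.
class _ExampleFixedSizeHandler(Handler):
    NAME = "example_fixed"
    PATTERNS = [HexString("45 58 4d 50")]  # hypothetical 4-byte magic at the chunk start
    PATTERN_MATCH_OFFSET = 0  # the pattern match already points at the chunk start
    EXTRACTOR = None
    DOC = None

    _RECORD_SIZE = 0x200  # invented fixed chunk size

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + self._RECORD_SIZE,
        )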
class StructHandler(Handler):
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header

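# --- Editor's example (not part of the original module) ---------------------------
# A hypothetical StructHandler whose header carries the total chunk length. The
# format, magic and field names are invented; the parse_header/len(header) usage
# is only a sketch of how a struct-based handler might build on this class.
class _ExampleStructHandler(StructHandler):
    NAME = "example_struct"
    PATTERNS = [HexString("4d 41 47 31")]  # hypothetical magic "MAG1"
    C_DEFINITIONS = r"""
        typedef struct example_header {
            char magic[4];
            uint32 total_size;   // length of the whole chunk, including this header
        } example_header_t;
    """
    HEADER_STRUCT = "example_header_t"
    EXTRACTOR = None
    DOC = None

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        header = self.parse_header(file, endian=Endian.LITTLE)
        if header.total_size < len(header):
            raise InvalidInputFormat("Header describes an impossibly small chunk")
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + header.total_size,
        )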
Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]