Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 75%


import abc
import dataclasses
import itertools
import json
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Generic, Optional, TypeVar, Union

import attrs
from pydantic import BaseModel, TypeAdapter
from structlog import get_logger

from .file_utils import Endian, File, InvalidInputFormat, StructParser
from .identifiers import new_id
from .parser import hexstring2regex
from .report import (
    CarveDirectoryReport,
    ChunkReport,
    ErrorReport,
    MultiFileReport,
    RandomnessReport,
    Report,
    UnknownChunkReport,
)

logger = get_logger()

# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#


class HandlerType(Enum):
    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"


@dataclasses.dataclass(frozen=True)
class Reference:
    title: str
    url: str


@dataclasses.dataclass
class HandlerDoc:
    name: str
    description: Union[str, None]
    vendor: Union[str, None]
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        self.fully_supported = len(self.limitations) == 0

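# Illustrative sketch, not part of the original module: constructing a
# HandlerDoc shows how ``fully_supported`` is derived in __post_init__ from an
# empty ``limitations`` list. All field values below are made-up placeholders.
def _example_handler_doc() -> HandlerDoc:
    doc = HandlerDoc(
        name="example",
        description="Hypothetical format, for illustration only",
        vendor=None,
        references=[
            Reference(title="Example spec", url="https://example.com/spec"),
        ],
        limitations=[],
        handler_type=HandlerType.ARCHIVE,
    )
    assert doc.fully_supported  # no limitations were listed
    return doc
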

class Task(BaseModel):
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = False


@attrs.define
class Blob:
    id: str = attrs.field(
        factory=new_id,
    )


@attrs.define
class Chunk(Blob):
    """File chunk that has a start and an end offset, but may still be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk"""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk"""

    file: Optional[File] = None

    def __attrs_post_init__(self):
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: "Chunk") -> bool:
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex

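# Illustrative sketch, not part of the original module: a Chunk selects the
# half-open slice ``b[start_offset:end_offset]``, so ``size`` and
# ``contains_offset`` follow Python slicing semantics. ``data`` is a made-up buffer.
def _example_chunk_slice() -> None:
    data = b"\x00\x01\x02\x03\x04\x05"
    chunk = Chunk(start_offset=1, end_offset=4)
    assert data[chunk.start_offset : chunk.end_offset] == b"\x01\x02\x03"
    assert chunk.size == 3
    assert chunk.contains_offset(3) and not chunk.contains_offset(4)
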

@attrs.define(repr=False)
class ValidChunk(Chunk):
    """Chunk of a File that is known to be valid and can be extracted with an external program."""

    handler: "Handler" = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]:
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )


@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for information purposes and further analysis,
    like most common bytes (like \x00 and \xFF), ASCII strings, high randomness, etc.
    """

    def as_report(self, randomness: Optional[RandomnessReport]) -> UnknownChunkReport:
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )


@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Padding between valid chunks, typically runs of repeated filler bytes such as \x00 or \xFF.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: Optional[RandomnessReport],  # noqa: ARG002
    ) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )


@attrs.define
class MultiFile(Blob):
    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: "DirectoryHandler" = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> Optional["ExtractResult"]:
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )


ReportType = TypeVar("ReportType", bound=Report)


class TaskResult(BaseModel):
    task: Task
    reports: list[Report] = []
    subtasks: list[Task] = []

    def add_report(self, report: Report):
        self.reports.append(report)

    def add_subtask(self, task: Task):
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        return [report for report in self.reports if isinstance(report, report_class)]


class ProcessResult(BaseModel):
    results: list[TaskResult] = []

    @property
    def errors(self) -> list[ErrorReport]:
        reports = itertools.chain.from_iterable(r.reports for r in self.results)
        interesting_reports = (
            r for r in reports if isinstance(r, (ErrorReport, ChunkReport))
        )
        errors = []
        for report in interesting_reports:
            if isinstance(report, ErrorReport):
                errors.append(report)
            else:
                errors.extend(
                    r for r in report.extraction_reports if isinstance(r, ErrorReport)
                )
        return errors

    def register(self, result: TaskResult):
        self.results.append(result)

    def to_json(self, indent=" "):
        return json.dumps(
            [result.model_dump(mode="json") for result in self.results], indent=indent
        )

    def get_output_dir(self) -> Optional[Path]:
        try:
            top_result = self.results[0]
            if carves := top_result.filter_reports(CarveDirectoryReport):
                # we have a top level carve
                return carves[0].carve_dir

            # or we have an extraction, with the extract
            # directory registered as a subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or no extraction at all
            return None

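# Illustrative sketch, not part of the original module: registering a
# TaskResult and serializing the run. The Task values are minimal
# placeholders; real results are produced by the unblob processing pipeline.
def _example_process_result() -> str:
    task = Task(path=Path("/tmp/firmware.bin"), depth=0, blob_id="")
    process_result = ProcessResult()
    process_result.register(TaskResult(task=task))
    assert process_result.errors == []  # no ErrorReport was recorded
    return process_result.to_json()
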

ReportModel = list[TaskResult]
ReportModelAdapter = TypeAdapter(ReportModel)
"""Use this for deserialization of the JSON report (importing it back into
Python objects).

For example:

    with open('report.json', 'r') as f:
        data = f.read()
        report_data = ReportModelAdapter.validate_json(data)

For another example see:
tests/test_models.py::Test_to_json::test_process_result_deserialization
"""


class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports


@attrs.define(kw_only=True)
class ExtractResult:
    reports: list[Report]


class Extractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """

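# Illustrative sketch, not part of the original module: an Extractor that just
# copies the carved chunk instead of invoking an external tool. The class name
# is hypothetical; real extractors usually shell out to the commands reported
# by get_dependencies().
class _ExampleCopyExtractor(Extractor):
    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        # outdir is created by Handler.extract() before this method is called
        (outdir / inpath.name).write_bytes(inpath.read_bytes())
        return None
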

class DirectoryExtractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        """Extract from a multi file path list.

        Raises ExtractError on failure.
        """


class Pattern(str):
    def as_regex(self) -> bytes:
        raise NotImplementedError


class HexString(Pattern):
    """A YARA-rule-like hexadecimal string pattern.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hex strings are
    converted to Hyperscan-compatible PCRE regexes.

    See the YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards that match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards that match only the first or second nibble: 0? ?0

    - jumps over multiple wildcard bytes: [3] [2-5]

    - alternatives: ( 01 02 | 03 04 ). The above can be combined
      and alternatives nested: 01 02 ( 03 04 | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        return hexstring2regex(self)

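# Illustrative sketch, not part of the original module: a HexString with a
# wild-card and an alternative, converted to the PCRE bytes handed to
# Hyperscan. The pattern bytes are arbitrary example values.
def _example_hexstring_pattern() -> bytes:
    pattern = HexString("01 ?? ( 02 | 03 ) de ad c0 de")
    return pattern.as_regex()
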

class Regex(Pattern):
    """Byte PCRE regex.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        return self.encode()


class DirectoryPattern:
    def get_files(self, directory: Path) -> Iterable[Path]:
        raise NotImplementedError


class Glob(DirectoryPattern):
    def __init__(self, pattern):
        self._pattern = pattern

    def get_files(self, directory: Path) -> Iterable[Path]:
        return directory.glob(self._pattern)


class SingleFile(DirectoryPattern):
    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        path = directory / self._filename
        return [path] if path.exists() else []

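# Illustrative sketch, not part of the original module: the two
# DirectoryPattern flavours select the starting files a DirectoryHandler works
# from. The glob and file name below are made-up placeholders.
def _example_directory_patterns(directory: Path) -> list[Path]:
    split_parts = list(Glob("*.7z.*").get_files(directory))
    manifest = list(SingleFile("manifest.json").get_files(directory))
    return split_parts + manifest
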

class DirectoryHandler(abc.ABC):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DirectoryExtractor

    PATTERN: DirectoryPattern

    DOC: Union[HandlerDoc, None]

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once; it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)


TExtractor = TypeVar("TExtractor", bound=Union[None, Extractor])


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: Union[HandlerDoc, None]

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once; it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)


class StructHandler(Handler):
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header

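# Illustrative sketch, not part of the original module: a minimal StructHandler
# subclass. The name, magic bytes, C definition, and the assumption that the
# header's ``size`` field covers the whole chunk are all hypothetical; real
# handlers live in unblob's handlers packages.
class _ExampleHeaderHandler(StructHandler):
    NAME = "example"
    PATTERNS = [HexString("45 58 41 4d 50 4c")]  # hypothetical magic bytes
    C_DEFINITIONS = r"""
        typedef struct example_header {
            char magic[6];
            uint32 size;
        } example_header_t;
    """
    HEADER_STRUCT = "example_header_t"
    EXTRACTOR = None
    DOC = None

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        header = self.parse_header(file, endian=Endian.LITTLE)
        return ValidChunk(
            start_offset=start_offset,
            end_offset=start_offset + header.size,
        )
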

Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]