Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/models.py: 70%


256 statements  

import abc
import dataclasses
import itertools
import json
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Generic, Optional, TypeVar, Union

import attrs
from structlog import get_logger

from .file_utils import Endian, File, InvalidInputFormat, StructParser
from .identifiers import new_id
from .parser import hexstring2regex
from .report import (
    CarveDirectoryReport,
    ChunkReport,
    ErrorReport,
    MultiFileReport,
    RandomnessReport,
    Report,
    UnknownChunkReport,
)

logger = get_logger()

# The state transitions are:
#
# file ──► pattern match ──► ValidChunk
#


class HandlerType(Enum):
    ARCHIVE = "Archive"
    COMPRESSION = "Compression"
    FILESYSTEM = "FileSystem"
    EXECUTABLE = "Executable"
    BAREMETAL = "Baremetal"
    BOOTLOADER = "Bootloader"
    ENCRYPTION = "Encryption"


@dataclasses.dataclass(frozen=True)
class Reference:
    title: str
    url: str


@dataclasses.dataclass
class HandlerDoc:
    name: str
    description: Union[str, None]
    vendor: Union[str, None]
    references: list[Reference]
    limitations: list[str]
    handler_type: HandlerType
    fully_supported: bool = dataclasses.field(init=False)

    def __post_init__(self):
        self.fully_supported = len(self.limitations) == 0


@attrs.define(frozen=True)
class Task:
    path: Path
    depth: int
    blob_id: str
    is_multi_file: bool = attrs.field(default=False)


@attrs.define
class Blob:
    id: str = attrs.field(
        factory=new_id,
    )

@attrs.define
class Chunk(Blob):
    """File chunk that has a start and end offset, but may still be invalid.

    For an array ``b``, a chunk ``c`` represents the slice:
    ::

        b[c.start_offset:c.end_offset]
    """

    start_offset: int = attrs.field(kw_only=True)
    """The index of the first byte of the chunk"""

    end_offset: int = attrs.field(kw_only=True)
    """The index of the first byte after the end of the chunk"""

    file: Optional[File] = None

    def __attrs_post_init__(self):
        if self.start_offset < 0 or self.end_offset < 0:
            raise InvalidInputFormat(f"Chunk has negative offset: {self}")
        if self.start_offset >= self.end_offset:
            raise InvalidInputFormat(
                f"Chunk has higher start_offset than end_offset: {self}"
            )

    @property
    def size(self) -> int:
        return self.end_offset - self.start_offset

    @property
    def range_hex(self) -> str:
        return f"0x{self.start_offset:x}-0x{self.end_offset:x}"

    @property
    def is_whole_file(self):
        assert self.file
        return self.start_offset == 0 and self.end_offset == self.file.size()

    def contains(self, other: "Chunk") -> bool:
        return (
            self.start_offset < other.start_offset
            and self.end_offset >= other.end_offset
        ) or (
            self.start_offset <= other.start_offset
            and self.end_offset > other.end_offset
        )

    def contains_offset(self, offset: int) -> bool:
        return self.start_offset <= offset < self.end_offset

    def __repr__(self) -> str:
        return self.range_hex
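
# --- Editorial note: illustrative sketch, not part of the original module. ---
# It shows how a Chunk's offsets map onto the slice described in its docstring;
# the buffer and offsets below are hypothetical.
def _example_chunk_slice(data: bytes) -> bytes:
    chunk = Chunk(start_offset=4, end_offset=12)
    # For a buffer ``data``, the chunk covers data[4:12] and has size 8.
    assert chunk.size == 8
    assert chunk.contains_offset(4) and not chunk.contains_offset(12)
    return data[chunk.start_offset : chunk.end_offset]
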

@attrs.define(repr=False)
class ValidChunk(Chunk):
    """A chunk of a File that is known to be valid and can be extracted with an external program."""

    handler: "Handler" = attrs.field(init=False, eq=False)
    is_encrypted: bool = attrs.field(default=False)

    def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]:
        if self.is_encrypted:
            logger.warning(
                "Encrypted file is not extracted",
                path=inpath,
                chunk=self,
            )
            raise ExtractError

        return self.handler.extract(inpath, outdir)

    def as_report(self, extraction_reports: list[Report]) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            handler_name=self.handler.NAME,
            is_encrypted=self.is_encrypted,
            extraction_reports=extraction_reports,
        )

@attrs.define(repr=False)
class UnknownChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.

    These are not extracted, just logged for information purposes and further analysis,
    such as most common bytes (like \x00 and \xFF), ASCII strings, high randomness, etc.
    """

    def as_report(self, randomness: Optional[RandomnessReport]) -> UnknownChunkReport:
        return UnknownChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            randomness=randomness,
        )

@attrs.define(repr=False)
class PaddingChunk(Chunk):
    r"""Gaps between valid chunks or otherwise unknown chunks.

    Important for manual analysis and analytical certainty: for example
    randomness, other chunks inside it, metadata, etc.
    """

    def as_report(
        self,
        randomness: Optional[RandomnessReport],  # noqa: ARG002
    ) -> ChunkReport:
        return ChunkReport(
            id=self.id,
            start_offset=self.start_offset,
            end_offset=self.end_offset,
            size=self.size,
            is_encrypted=False,
            handler_name="padding",
            extraction_reports=[],
        )

@attrs.define
class MultiFile(Blob):
    name: str = attrs.field(kw_only=True)
    paths: list[Path] = attrs.field(kw_only=True)

    handler: "DirectoryHandler" = attrs.field(init=False, eq=False)

    def extract(self, outdir: Path) -> Optional["ExtractResult"]:
        return self.handler.extract(self.paths, outdir)

    def as_report(self, extraction_reports: list[Report]) -> MultiFileReport:
        return MultiFileReport(
            id=self.id,
            name=self.name,
            paths=self.paths,
            handler_name=self.handler.NAME,
            extraction_reports=extraction_reports,
        )

ReportType = TypeVar("ReportType", bound=Report)


@attrs.define
class TaskResult:
    task: Task
    reports: list[Report] = attrs.field(factory=list)
    subtasks: list[Task] = attrs.field(factory=list)

    def add_report(self, report: Report):
        self.reports.append(report)

    def add_subtask(self, task: Task):
        self.subtasks.append(task)

    def filter_reports(self, report_class: type[ReportType]) -> list[ReportType]:
        return [report for report in self.reports if isinstance(report, report_class)]
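
# --- Editorial note: illustrative sketch, not part of the original module. ---
# It demonstrates the generic filter_reports() API; the helper name is hypothetical.
def _example_filter_reports(result: TaskResult) -> list[ErrorReport]:
    # Returns only the ErrorReport instances collected for this task.
    return result.filter_reports(ErrorReport)
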

@attrs.define
class ProcessResult:
    results: list[TaskResult] = attrs.field(factory=list)

    @property
    def errors(self) -> list[ErrorReport]:
        reports = itertools.chain.from_iterable(r.reports for r in self.results)
        interesting_reports = (
            r for r in reports if isinstance(r, (ErrorReport, ChunkReport))
        )
        errors = []
        for report in interesting_reports:
            if isinstance(report, ErrorReport):
                errors.append(report)
            else:
                errors.extend(
                    r for r in report.extraction_reports if isinstance(r, ErrorReport)
                )
        return errors

    def register(self, result: TaskResult):
        self.results.append(result)

    def to_json(self, indent=" "):
        return to_json(self.results, indent=indent)

    def get_output_dir(self) -> Optional[Path]:
        try:
            top_result = self.results[0]
            if carves := top_result.filter_reports(CarveDirectoryReport):
                # we have a top level carve
                return carves[0].carve_dir

            # we either have an extraction,
            # and the extract directory registered as subtask
            return top_result.subtasks[0].path
        except IndexError:
            # or no extraction
            return None

class _JSONEncoder(json.JSONEncoder):
    def default(self, o):
        obj = o
        if attrs.has(type(obj)):
            extend_attr_output = True
            attr_output = attrs.asdict(obj, recurse=not extend_attr_output)
            attr_output["__typename__"] = obj.__class__.__name__
            return attr_output

        if isinstance(obj, Enum):
            return obj.name

        if isinstance(obj, Path):
            return str(obj)

        if isinstance(obj, bytes):
            try:
                return obj.decode()
            except UnicodeDecodeError:
                return str(obj)

        logger.error("JSONEncoder met a non-JSON encodable value", obj=obj)
        # the usual fail path of custom JSONEncoders is to call the parent and let it fail
        # return json.JSONEncoder.default(self, obj)
        # instead of failing, just return something usable
        return f"Non-JSON encodable value: {obj}"

def to_json(obj, indent=" ") -> str:
    """Encode any UnBlob object as serialized JSON."""
    return json.dumps(obj, cls=_JSONEncoder, indent=indent)
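
# --- Editorial note: illustrative sketch, not part of the original module. ---
# It shows how to_json() serializes attrs-based objects: _JSONEncoder adds a
# "__typename__" marker and stringifies Paths. The path below is hypothetical.
def _example_to_json() -> str:
    task = Task(path=Path("firmware.bin"), depth=0, blob_id="")
    # Produces a JSON object containing "__typename__": "Task".
    return to_json(task)
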

class ExtractError(Exception):
    """There was an error during extraction."""

    def __init__(self, *reports: Report):
        super().__init__()
        self.reports: tuple[Report, ...] = reports


@attrs.define(kw_only=True)
class ExtractResult:
    reports: list[Report]


class Extractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        """Extract the carved out chunk.

        Raises ExtractError on failure.
        """


class DirectoryExtractor(abc.ABC):
    def get_dependencies(self) -> list[str]:
        """Return the external command dependencies."""
        return []

    @abc.abstractmethod
    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        """Extract from a multi file path list.

        Raises ExtractError on failure.
        """


class Pattern(str):
    def as_regex(self) -> bytes:
        raise NotImplementedError

class HexString(Pattern):
    """Hexadecimal string pattern in a YARA-rule-like syntax.

    It is useful to simplify defining binary strings using hex
    encoding, wild-cards, jumps and alternatives. Hexstrings are
    converted to Hyperscan-compatible PCRE regexes.

    See the YARA & Hyperscan documentation for more details:

    - https://yara.readthedocs.io/en/stable/writingrules.html#hexadecimal-strings

    - https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support

    You can specify the following:

    - normal bytes using hexadecimals: 01 de ad c0 de ff

    - wild-cards can match single bytes and can be mixed with
      normal hex: 01 ?? 02

    - wild-cards can also match the first and second nibbles: 0? ?0

    - jumps can be specified for multiple wildcard bytes: [3]
      [2-5]

    - alternatives can be specified as well: ( 01 02 | 03 04 ) The
      above can be combined and alternatives nested: 01 02 ( 03 04
      | (0? | 03 | ?0) | 05 ?? ) 06

    Single line comments can be specified using //

    We do NOT support the following YARA syntax:

    - comments using /* */ notation

    - infinite jumps: [-]

    - unbounded jumps: [3-] or [-4] (use [0-4] instead)
    """

    def as_regex(self) -> bytes:
        return hexstring2regex(self)
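
# --- Editorial note: illustrative sketch, not part of the original module. ---
# It shows the HexString pattern syntax described above; the pattern is a
# hypothetical magic with two wild-carded bytes.
def _example_hexstring_pattern() -> bytes:
    # Matches 0xDE 0xAD, any two bytes, then 0xBE 0xEF.
    return HexString("de ad ?? ?? be ef").as_regex()
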

class Regex(Pattern):
    """Byte PCRE regex.

    See hyperscan documentation for more details:
    https://intel.github.io/hyperscan/dev-reference/compilation.html#pattern-support.
    """

    def as_regex(self) -> bytes:
        return self.encode()


class DirectoryPattern:
    def get_files(self, directory: Path) -> Iterable[Path]:
        raise NotImplementedError


class Glob(DirectoryPattern):
    def __init__(self, pattern):
        self._pattern = pattern

    def get_files(self, directory: Path) -> Iterable[Path]:
        return directory.glob(self._pattern)


class SingleFile(DirectoryPattern):
    def __init__(self, filename):
        self._filename = filename

    def get_files(self, directory: Path) -> Iterable[Path]:
        path = directory / self._filename
        return [path] if path.exists() else []
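
# --- Editorial note: illustrative sketch, not part of the original module. ---
# It shows the two DirectoryPattern flavors; the directory contents and file
# names are hypothetical.
def _example_directory_patterns(directory: Path) -> list[Path]:
    by_glob = list(Glob("*.header").get_files(directory))
    by_name = list(SingleFile("payload.bin").get_files(directory))
    return by_glob + by_name
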

class DirectoryHandler(abc.ABC):
    """A directory type handler is responsible for searching, validating and "unblobbing" files from multiple files in a directory."""

    NAME: str

    EXTRACTOR: DirectoryExtractor

    PATTERN: DirectoryPattern

    DOC: HandlerDoc

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_multifile(self, file: Path) -> Optional[MultiFile]:
        """Calculate the MultiFile in a directory, using a file matched by the pattern as a starting point."""

    def extract(self, paths: list[Path], outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", paths=paths)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(paths, outdir)

TExtractor = TypeVar("TExtractor", bound=Union[None, Extractor])


class Handler(abc.ABC, Generic[TExtractor]):
    """A file type handler is responsible for searching, validating and "unblobbing" files from Blobs."""

    NAME: str
    PATTERNS: list[Pattern]
    # We need this, because not every match reflects the actual start
    # (e.g. tar magic is in the middle of the header)
    PATTERN_MATCH_OFFSET: int = 0

    EXTRACTOR: TExtractor

    DOC: HandlerDoc

    @classmethod
    def get_dependencies(cls):
        """Return external command dependencies needed for this handler to work."""
        if cls.EXTRACTOR is not None:
            return cls.EXTRACTOR.get_dependencies()
        return []

    @abc.abstractmethod
    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Calculate the Chunk offsets from the File and the file type headers."""

    def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]:
        if self.EXTRACTOR is None:
            logger.debug("Skipping file: no extractor.", path=inpath)
            raise ExtractError

        # We only extract every blob once, it's a mistake to extract the same blob again
        outdir.mkdir(parents=True, exist_ok=False)

        return self.EXTRACTOR.extract(inpath, outdir)

class StructHandler(Handler):
    C_DEFINITIONS: str
    # A struct from the C_DEFINITIONS used to parse the file's header
    HEADER_STRUCT: str

    def __init__(self):
        self._struct_parser = StructParser(self.C_DEFINITIONS)

    @property
    def cparser_le(self):
        return self._struct_parser.cparser_le

    @property
    def cparser_be(self):
        return self._struct_parser.cparser_be

    def parse_header(self, file: File, endian=Endian.LITTLE):
        header = self._struct_parser.parse(self.HEADER_STRUCT, file, endian)
        logger.debug("Header parsed", header=header, _verbosity=3)
        return header
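
# --- Editorial note: illustrative sketch, not part of the original module. ---
# It shows the shape of a minimal StructHandler subclass: a pattern to search
# for, C definitions for the header, and calculate_chunk() turning a match into
# a ValidChunk. The format, magic, and header layout are entirely hypothetical,
# and the definition assumes StructParser accepts this cstruct-style syntax and
# that len(header) gives the parsed header size.
class _ExampleHandler(StructHandler):
    NAME = "example"
    PATTERNS = [HexString("de ad ?? ?? be ef")]
    C_DEFINITIONS = """
        struct example_header {
            char magic[6];
            uint32 body_size;
        };
    """
    HEADER_STRUCT = "example_header"
    EXTRACTOR = None

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        file.seek(start_offset)
        header = self.parse_header(file)
        end_offset = start_offset + len(header) + header.body_size
        return ValidChunk(start_offset=start_offset, end_offset=end_offset)
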

Handlers = tuple[type[Handler], ...]
DirectoryHandlers = tuple[type[DirectoryHandler], ...]