Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/processing.py: 21%


import enum
import multiprocessing
import shutil
from collections.abc import Iterable, Sequence
from operator import attrgetter
from pathlib import Path

import attrs
import magic
import plotext as plt
from structlog import get_logger

from unblob import math_tools as mt
from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers

from .extractor import carve_unknown_chunk, carve_valid_chunk, fix_extracted_directory
from .file_utils import InvalidInputFormat, iterate_file
from .finder import search_chunks
from .iter_utils import pairwise
from .logging import noformat
from .models import (
    Chunk,
    DirectoryHandler,
    DirectoryHandlers,
    ExtractError,
    File,
    MultiFile,
    PaddingChunk,
    ProcessResult,
    Task,
    TaskResult,
    UnknownChunk,
    ValidChunk,
)
from .pool import make_pool
from .report import (
    CalculateMultiFileExceptionReport,
    CarveDirectoryReport,
    ErrorReport,
    ExtractedFileDeletedReport,
    FileMagicReport,
    HashReport,
    MultiFileCollisionReport,
    OutputDirectoryExistsReport,
    RandomnessMeasurements,
    RandomnessReport,
    Report,
    StatReport,
    UnknownError,
)
from .ui import NullProgressReporter, ProgressReporter

logger = get_logger()

DEFAULT_DEPTH = 10
DEFAULT_PROCESS_NUM = multiprocessing.cpu_count()
DEFAULT_SKIP_MAGIC = (
    "BFLT",
    "Erlang BEAM file",
    "GIF",
    "GNU message catalog",
    "HP Printer Job Language",
    "JPEG",
    "Java module image",
    "MPEG",
    "MS Windows icon resource",
    "Macromedia Flash data",
    "Microsoft Excel",
    "Microsoft PowerPoint",
    "Microsoft Word",
    "OpenDocument",
    "PDF document",
    "PNG",
    "SQLite",
    "TrueType Font data",
    "Web Open Font Format",
    "Windows Embedded CE binary image",
    "Xilinx BIT data",
    "compiled Java class",
    "magic binary file",
    "python",  # (e.g. python 2.7 byte-compiled)
)
DEFAULT_SKIP_EXTENSION = (".rlib",)


class ExtractedFileDeletionMode(enum.Enum):
    NONE = "none"
    SELECTED = "selected"
    ALL = "all"


@attrs.define(kw_only=True)
class ExtractionConfig:
    extract_root: Path = attrs.field(converter=lambda value: value.resolve())
    force_extract: bool = False
    randomness_depth: int
    randomness_plot: bool = False
    max_depth: int = DEFAULT_DEPTH
    skip_magic: Iterable[str] = DEFAULT_SKIP_MAGIC
    skip_extension: Iterable[str] = DEFAULT_SKIP_EXTENSION
    skip_extraction: bool = False
    process_num: int = DEFAULT_PROCESS_NUM
    keep_extracted_chunks: bool = False
    extract_suffix: str = "_extract"
    carve_suffix: str = "_extract"
    handlers: Handlers = BUILTIN_HANDLERS
    dir_handlers: DirectoryHandlers = BUILTIN_DIR_HANDLERS
    verbose: int = 1
    progress_reporter: type[ProgressReporter] = NullProgressReporter
    extracted_file_deletion: ExtractedFileDeletionMode = ExtractedFileDeletionMode.NONE
    extracted_file_handler_filter: Iterable[str] = attrs.field(
        default=(),
        converter=lambda values: tuple(str(value) for value in values),
    )

    def _get_output_path(self, path: Path) -> Path:
        """Return path under extract root."""
        try:
            relative_path = path.relative_to(self.extract_root)
        except ValueError:
            # path is not inside root, i.e. it is an input file
            relative_path = Path(path.name)
        return (self.extract_root / relative_path).expanduser().resolve()

    def get_extract_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.extract_suffix))

    def get_carve_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.carve_suffix))
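
# For example (illustrative): with extract_root=Path("/out"), an input file
# /data/fw.bin maps to /out/fw.bin, while a path already under /out keeps its
# relative subpath; get_extract_dir_for(Path("/data/fw.bin")) then yields
# /out/fw.bin_extract.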


def process_file(
    config: ExtractionConfig, input_path: Path, report_file: Path | None = None
) -> ProcessResult:
    task = Task(
        blob_id="",
        path=input_path,
        depth=0,
    )

    if not input_path.is_file():
        raise ValueError("input_path is not a file", input_path)

    extract_dir = config.get_extract_dir_for(input_path)
    if config.force_extract and extract_dir.exists():
        logger.info("Removing extract dir", path=extract_dir)
        shutil.rmtree(extract_dir)

    carve_dir = config.get_carve_dir_for(input_path)
    if config.force_extract and carve_dir.exists():
        logger.info("Removing carve dir", path=carve_dir)
        shutil.rmtree(carve_dir)

    if not prepare_report_file(config, report_file):
        logger.error(
            "File not processed, as report could not be written", file=input_path
        )
        return ProcessResult()

    process_result = _process_task(config, task)

    if report_file:
        write_json_report(report_file, process_result)

    return process_result
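
# A minimal usage sketch (illustrative; extract_root and randomness_depth are
# the only ExtractionConfig fields without defaults):
#
#     config = ExtractionConfig(extract_root=Path("/out"), randomness_depth=1)
#     result = process_file(config, Path("firmware.bin"))
#     # result is a ProcessResult aggregating the reports of every (sub)task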


def _process_task(config: ExtractionConfig, task: Task) -> ProcessResult:
    processor = Processor(config)
    aggregated_result = ProcessResult()

    progress_reporter = config.progress_reporter()

    def process_result(pool, result):
        progress_reporter.update(result)

        for new_task in result.subtasks:
            pool.submit(new_task)
        aggregated_result.register(result)

    pool = make_pool(
        process_num=config.process_num,
        handler=processor.process_task,
        result_callback=process_result,
    )

    with pool, progress_reporter:
        pool.submit(task)
        pool.process_until_done()

    return aggregated_result


def prepare_report_file(config: ExtractionConfig, report_file: Path | None) -> bool:
    """Prevent report writing from failing after an expensive extraction.

    Should be called before processing tasks.

    Returns True if there is no foreseen problem,
    False if report writing is known in advance to fail.
    """
    if not report_file:
        # we will not write a report at all
        return True

    if report_file.exists():
        if config.force_extract:
            logger.warning("Overwriting existing report file", path=report_file)
            try:
                report_file.write_text("")
            except OSError as e:
                logger.error(
                    "Can not overwrite existing report file",
                    path=report_file,
                    msg=str(e),
                )
                return False
        else:
            logger.error(
                "Report file exists and --force not specified", path=report_file
            )
            return False
    if not report_file.parent.exists():
        logger.error(
            "Trying to write report file to a non-existent directory", path=report_file
        )
        return False
    return True


def write_json_report(report_file: Path, process_result: ProcessResult):
    try:
        report_file.write_text(process_result.to_json())
    except OSError as e:
        logger.error("Can not write JSON report", path=report_file, msg=str(e))
    except Exception:
        logger.exception("Can not write JSON report", path=report_file)
    else:
        logger.info("JSON report written", path=report_file)


class Processor:
    def __init__(self, config: ExtractionConfig):
        self._config = config
        # libmagic helpers
        # File magic uses a rule-set to guess the file type; as rules are added, they
        # can shadow each other. File magic uses rule priorities to determine the best
        # matching rule, but this can shadow other valid matches as well, which could
        # eventually break further processing that depends on magic.
        # By enabling keep_going (which eventually enables MAGIC_CONTINUE), all matching
        # patterns are included in the magic string at the cost of being a bit slower,
        # but accuracy increases because rules no longer shadow each other.
        self._get_magic = magic.Magic(keep_going=True).from_file
        self._get_mime_type = magic.Magic(mime=True).from_file
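
        # Note (illustrative): because keep_going includes every matching rule
        # in the returned description, the skip_magic prefixes are later
        # matched against this combined string with str.startswith().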

    def process_task(self, task: Task) -> TaskResult:
        result = TaskResult(task=task)
        try:
            self._process_task(result, task)
        except Exception as exc:
            self._process_error(result, exc)
        return result

    def _process_error(self, result: TaskResult, exc: Exception):
        error_report = UnknownError(exception=exc)
        result.add_report(error_report)
        logger.exception("Unknown error happened", exc_info=exc)

    def _process_task(self, result: TaskResult, task: Task):
        stat_report = StatReport.from_path(task.path)
        result.add_report(stat_report)
        log = logger.bind(path=task.path)

        if task.depth >= self._config.max_depth:
            # TODO: Use the reporting feature to warn the user (ONLY ONCE) at the
            # end of execution that this limit was reached.
            log.debug(
                "Reached maximum depth, stop further processing", depth=task.depth
            )
            return

        if stat_report.is_dir:
            if not task.is_multi_file:
                _DirectoryTask(self._config, task, result).process()
            return

        if not stat_report.is_file:
            log.debug(
                "Ignoring special file (link, chrdev, blkdev, fifo, socket, door)."
            )
            return

        magic = self._get_magic(task.path)
        mime_type = self._get_mime_type(task.path)
        logger.debug("Detected file-magic", magic=magic, path=task.path, _verbosity=2)

        magic_report = FileMagicReport(magic=magic, mime_type=mime_type)
        result.add_report(magic_report)

        hash_report = HashReport.from_path(task.path)
        result.add_report(hash_report)

        if task.is_multi_file:
            # The file has been processed as part of a MultiFile; we just run
            # the task to gather reports
            return

        if stat_report.size == 0:
            log.debug("Ignoring empty file")
            return

        should_skip_file = any(
            magic.startswith(pattern) for pattern in self._config.skip_magic
        )
        should_skip_file |= task.path.suffix in self._config.skip_extension

        if should_skip_file:
            log.debug(
                "Ignoring file based on magic or extension.",
                magic=magic,
                extension=task.path.suffix,
            )
            return

        _FileTask(self._config, task, stat_report.size, result).process()


class DirectoryProcessingError(Exception):
    def __init__(self, message: str, report: Report):
        super().__init__()
        self.message = message
        self.report: Report = report


class _DirectoryTask:
    def __init__(self, config: ExtractionConfig, dir_task: Task, result: TaskResult):
        self.config = config
        self.dir_task = dir_task
        self.result = result

    def process(self):
        logger.debug("Processing directory", path=self.dir_task.path)

        try:
            processed_paths, extract_dirs = self._process_directory()
        except DirectoryProcessingError as e:
            logger.error(e.message, report=e.report)
            self.result.add_report(e.report)
            return

        self._iterate_directory(extract_dirs, processed_paths)

        self._iterate_processed_files(processed_paths)

    def _process_directory(self) -> tuple[set[Path], set[Path]]:
        processed_paths: set[Path] = set()
        extract_dirs: set[Path] = set()
        for dir_handler_class in self.config.dir_handlers:
            dir_handler = dir_handler_class()

            for path in dir_handler.PATTERN.get_files(self.dir_task.path):
                multi_file = self._calculate_multifile(dir_handler, path, self.result)

                if multi_file is None:
                    continue

                multi_file.handler = dir_handler

                self._check_conflicting_files(multi_file, processed_paths)

                extract_dir = self._extract_multi_file(multi_file)

                # Process files in extracted directory
                if extract_dir.exists():
                    self.result.add_subtask(
                        Task(
                            blob_id=multi_file.id,
                            path=extract_dir,
                            depth=self.dir_task.depth + 1,
                        )
                    )
                    extract_dirs.add(extract_dir)

                processed_paths.update(multi_file.paths)
        return processed_paths, extract_dirs

    @staticmethod
    def _calculate_multifile(
        dir_handler: DirectoryHandler, path: Path, task_result: TaskResult
    ) -> MultiFile | None:
        try:
            return dir_handler.calculate_multifile(path)
        except InvalidInputFormat as exc:
            logger.debug(
                "Invalid MultiFile format",
                exc_info=exc,
                handler=dir_handler.NAME,
                path=path,
                _verbosity=2,
            )
        except Exception as exc:
            error_report = CalculateMultiFileExceptionReport(
                handler=dir_handler.NAME,
                exception=exc,
                path=path,
            )
            task_result.add_report(error_report)
            logger.warning(
                "Unhandled Exception during multi file calculation",
                **error_report.model_dump(),
            )

    def _check_conflicting_files(
        self, multi_file: MultiFile, processed_paths: set[Path]
    ):
        conflicting_paths = processed_paths.intersection(set(multi_file.paths))
        if conflicting_paths:
            raise DirectoryProcessingError(
                "Conflicting match on files",
                report=MultiFileCollisionReport(
                    paths=conflicting_paths, handler=multi_file.handler.NAME
                ),
            )

    def _extract_multi_file(self, multi_file: MultiFile) -> Path:
        extract_dir = self.config.get_extract_dir_for(
            self.dir_task.path / multi_file.name
        )
        if extract_dir.exists():
            raise DirectoryProcessingError(
                "Skipped: extraction directory exists",
                report=multi_file.as_report(
                    [OutputDirectoryExistsReport(path=extract_dir)]
                ),
            )

        extraction_reports = []
        try:
            if result := multi_file.extract(extract_dir):
                extraction_reports.extend(result.reports)
        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting MultiFile")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(multi_file.as_report(extraction_reports))

        fix_extracted_directory(extract_dir, self.result)

        return extract_dir

    def _iterate_processed_files(self, processed_paths):
        for path in processed_paths:
            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                    is_multi_file=True,
                )
            )

    def _iterate_directory(self, extract_dirs, processed_paths):
        for path in self.dir_task.path.iterdir():
            if path in extract_dirs or path in processed_paths:
                continue

            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                )
            )


def is_padding(file: File, chunk: UnknownChunk):
    chunk_bytes = set()

    for small_chunk in iterate_file(
        file, chunk.start_offset, chunk.end_offset - chunk.start_offset
    ):
        chunk_bytes.update(small_chunk)

        # early return optimization
        if len(chunk_bytes) > 1:
            return False

    return len(chunk_bytes) == 1
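
# For example (illustrative): a gap filled entirely with 0x00 or 0xff bytes
# yields a single-element chunk_bytes set and counts as padding, while any mix
# of byte values triggers the early False return.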


def process_patterns(
    unknown_chunks: list[UnknownChunk], file: File
) -> list[UnknownChunk | PaddingChunk]:
    processed_chunks = []
    for unknown_chunk in unknown_chunks:
        if is_padding(file, unknown_chunk):
            processed_chunks.append(
                PaddingChunk(
                    start_offset=unknown_chunk.start_offset,
                    end_offset=unknown_chunk.end_offset,
                    id=unknown_chunk.id,
                    file=unknown_chunk.file,
                )
            )
        else:
            processed_chunks.append(unknown_chunk)
    return processed_chunks
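
# Net effect (illustrative): identical-byte runs between valid chunks are
# reclassified from UnknownChunk to PaddingChunk, so they are reported as
# padding rather than as unidentified data.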


class _FileTask:
    def __init__(
        self,
        config: ExtractionConfig,
        task: Task,
        size: int,
        result: TaskResult,
    ):
        self.config = config
        self.task = task
        self.size = size
        self.result = result

    def process(self):
        logger.debug("Processing file", path=self.task.path, size=self.size)

        with File.from_path(self.task.path) as file:
            all_chunks = search_chunks(
                file, self.size, self.config.handlers, self.result
            )
            outer_chunks = remove_inner_chunks(all_chunks)
            unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
            unknown_chunks = process_patterns(unknown_chunks, file)
            assign_file_to_chunks(outer_chunks, file=file)
            assign_file_to_chunks(unknown_chunks, file=file)

            if outer_chunks or unknown_chunks:
                self._process_chunks(file, outer_chunks, unknown_chunks)
            else:
                # We don't consider whole files as unknown chunks, but we still want
                # to calculate randomness for whole files which produced no valid chunks.
                randomness = self._calculate_randomness(self.task.path)
                if randomness:
                    self.result.add_report(randomness)

    def _process_chunks(
        self,
        file: File,
        outer_chunks: list[ValidChunk],
        unknown_chunks: list[UnknownChunk | PaddingChunk],
    ):
        if unknown_chunks:
            logger.warning("Found unknown Chunks", chunks=unknown_chunks)

        if self.config.skip_extraction:
            for chunk in unknown_chunks:
                self.result.add_report(chunk.as_report(randomness=None))
            for chunk in outer_chunks:
                self.result.add_report(chunk.as_report(extraction_reports=[]))
            return

        is_whole_file_chunk = len(outer_chunks) + len(unknown_chunks) == 1
        if is_whole_file_chunk:
            # Skip carving; extract the whole file (chunk) directly.
            carved_path = self.task.path
            for chunk in outer_chunks:
                extraction_successful = self._extract_chunk(
                    carved_path,
                    chunk,
                    self.config.get_extract_dir_for(carved_path),
                    # since we do not carve, we want to keep the input around
                    remove_extracted_input=False,
                )
                if extraction_successful:
                    self._delete_extracted_file_if_needed(self.task.path, chunk)
        else:
            self._carve_then_extract_chunks(file, outer_chunks, unknown_chunks)

    def _carve_then_extract_chunks(self, file, outer_chunks, unknown_chunks):
        assert not self.config.skip_extraction

        carve_dir = self.config.get_carve_dir_for(self.task.path)

        # report the technical carve directory explicitly
        self.result.add_report(CarveDirectoryReport(carve_dir=carve_dir))

        if carve_dir.exists():
            # The carve directory is not supposed to exist; it is usually a simple
            # mistake of running unblob again without cleaning up or using --force.
            # Continuing would cause problems, as it would mix up original and
            # extracted files, and it would introduce weird, non-deterministic
            # problems due to interference on paths by multiple workers (parallel
            # processing, modifying content (fix_symlink), and `mmap` + open for
            # write with O_TRUNC).
            logger.error("Skipped: carve directory exists", carve_dir=carve_dir)
            self.result.add_report(OutputDirectoryExistsReport(path=carve_dir))
            return

        for chunk in unknown_chunks:
            carved_unknown_path = carve_unknown_chunk(carve_dir, file, chunk)
            randomness = self._calculate_randomness(carved_unknown_path)
            self.result.add_report(chunk.as_report(randomness=randomness))

        for chunk in outer_chunks:
            carved_path = carve_valid_chunk(carve_dir, file, chunk)
            self._extract_chunk(
                carved_path,
                chunk,
                self.config.get_extract_dir_for(carved_path),
                # when a carved chunk is successfully extracted, we usually
                # want to get rid of it, as its data is available in
                # extracted form, and the raw data is still part of
                # the file the chunk belongs to
                remove_extracted_input=not self.config.keep_extracted_chunks,
            )

    def _calculate_randomness(self, path: Path) -> RandomnessReport | None:
        if self.task.depth < self.config.randomness_depth:
            report = calculate_randomness(path)
            if self.config.randomness_plot:
                logger.debug(
                    "Randomness chart",
                    # Newline so that the chart title is aligned correctly on the next line
                    chart="\n" + format_randomness_plot(report),
                    path=path,
                    _verbosity=3,
                )
            return report
        return None

    def _extract_chunk(
        self,
        carved_path: Path,
        chunk: ValidChunk,
        extract_dir: Path,
        *,
        remove_extracted_input: bool,
    ) -> bool:
        extraction_successful = False
        if extract_dir.exists():
            # The extraction directory is not supposed to exist; it mixes up original
            # and extracted files, and it would introduce weird, non-deterministic
            # problems due to interference on paths by multiple workers (parallel
            # processing, modifying content (fix_symlink), and `mmap` + open for
            # write with O_TRUNC).
            logger.error(
                "Skipped: extraction directory exists",
                extract_dir=extract_dir,
                chunk=chunk,
            )
            self.result.add_report(
                chunk.as_report([OutputDirectoryExistsReport(path=extract_dir)])
            )
            return False

        if self.config.skip_extraction:
            fix_extracted_directory(extract_dir, self.result)
            return False

        extraction_reports = []
        try:
            if result := chunk.extract(carved_path, extract_dir):
                extraction_reports.extend(result.reports)

            if remove_extracted_input:
                logger.debug("Removing extracted chunk", path=carved_path)
                carved_path.unlink()

        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting chunk")
            extraction_reports.append(UnknownError(exception=exc))

        extraction_successful = not any(
            isinstance(report, ErrorReport) for report in extraction_reports
        )
        self.result.add_report(chunk.as_report(extraction_reports))

        # we want to get consistent partial output even in case of unforeseen problems
        fix_extracted_directory(extract_dir, self.result)
        delete_empty_extract_dir(extract_dir)

        if extract_dir.exists():
            self.result.add_subtask(
                Task(
                    blob_id=chunk.id,
                    path=extract_dir,
                    depth=self.task.depth + 1,
                )
            )
        return extraction_successful

    def _delete_extracted_file_if_needed(
        self, delete_candidate_path: Path, chunk: ValidChunk
    ) -> None:
        filter_set = set(self.config.extracted_file_handler_filter)
        if not self._should_delete_extracted_file(chunk, filter_set):
            return

        if self.task.depth == 0:
            return

        if not delete_candidate_path.exists() or delete_candidate_path.is_dir():
            return

        try:
            delete_candidate_path.unlink()
            self.result.add_report(
                ExtractedFileDeletedReport(
                    path=delete_candidate_path,
                    handler_name=chunk.handler.NAME,
                )
            )
            logger.debug(
                "Removed extracted file after extraction",
                path=delete_candidate_path,
                handler=chunk.handler.NAME,
            )
        except OSError:
            logger.warning(
                "Failed to remove extracted file after extraction",
                path=delete_candidate_path,
            )

    def _should_delete_extracted_file(
        self, chunk: ValidChunk, filter_set: set[str]
    ) -> bool:
        if not chunk.is_whole_file:
            return False

        deletion_mode = self.config.extracted_file_deletion

        if deletion_mode is ExtractedFileDeletionMode.NONE:
            return False

        if deletion_mode is ExtractedFileDeletionMode.SELECTED:
            return chunk.handler.NAME in filter_set

        return True

739 

740def assign_file_to_chunks(chunks: Sequence[Chunk], file: File): 

741 for chunk in chunks: 

742 assert chunk.file is None 

743 chunk.file = file 

744 

745 

746def delete_empty_extract_dir(extract_dir: Path): 

747 if extract_dir.exists() and not any(extract_dir.iterdir()): 

748 extract_dir.rmdir() 

749 

750 

751def remove_inner_chunks(chunks: list[ValidChunk]) -> list[ValidChunk]: 

752 """Remove all chunks from the list which are within another bigger chunks.""" 

753 if not chunks: 

754 return [] 

755 

756 chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True) 

757 outer_chunks = [chunks_by_size[0]] 

758 for chunk in chunks_by_size[1:]: 

759 if not any(outer.contains(chunk) for outer in outer_chunks): 

760 outer_chunks.append(chunk) 

761 

762 outer_count = len(outer_chunks) 

763 removed_count = len(chunks) - outer_count 

764 logger.debug( 

765 "Removed inner chunks", 

766 outer_chunk_count=noformat(outer_count), 

767 removed_inner_chunk_count=noformat(removed_count), 

768 _verbosity=2, 

769 ) 

770 return outer_chunks 
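
# For example (illustrative): given valid chunks covering [0, 100) and
# [10, 20), the inner [10, 20) chunk is dropped and only [0, 100) is kept;
# chunks that overlap without full containment are both kept.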


def calculate_unknown_chunks(
    chunks: list[ValidChunk], file_size: int
) -> list[UnknownChunk]:
    """Calculate the empty gaps between chunks."""
    if not chunks or file_size == 0:
        return []

    sorted_by_offset = sorted(chunks, key=attrgetter("start_offset"))

    unknown_chunks = []

    first = sorted_by_offset[0]
    if first.start_offset != 0:
        unknown_chunk = UnknownChunk(start_offset=0, end_offset=first.start_offset)
        unknown_chunks.append(unknown_chunk)

    for chunk, next_chunk in pairwise(sorted_by_offset):
        diff = next_chunk.start_offset - chunk.end_offset
        if diff != 0:
            unknown_chunk = UnknownChunk(
                start_offset=chunk.end_offset,
                end_offset=next_chunk.start_offset,
            )
            unknown_chunks.append(unknown_chunk)

    last = sorted_by_offset[-1]
    if last.end_offset < file_size:
        unknown_chunk = UnknownChunk(
            start_offset=last.end_offset,
            end_offset=file_size,
        )
        unknown_chunks.append(unknown_chunk)

    return unknown_chunks
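
# For example (illustrative): in a 100-byte file with valid chunks at
# [10, 20) and [40, 60), the gaps become UnknownChunks covering [0, 10),
# [20, 40) and [60, 100).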


def calculate_randomness(path: Path) -> RandomnessReport:
    """Calculate and log Shannon entropy divided by 8 for the file in chunks.

    Shannon entropy returns the amount of information (in bits) of some numeric
    sequence. We calculate the average entropy of byte chunks, which in theory
    can contain 0-8 bits of entropy. We normalize it for visualization to a
    0-100% scale, to make it easier to interpret the graph.

    The chi square distribution is calculated for the stream of bytes in the
    chunk and expressed as an absolute number and a percentage which indicates
    how frequently a truly random sequence would exceed the value calculated.
    """
    shannon_percentages = []
    chi_square_percentages = []

    # We could use the chunk size instead of another syscall,
    # but we rely on the actual file size written to disk.
    file_size = path.stat().st_size
    logger.debug("Calculating entropy for file", path=path, size=file_size)

    # A smaller chunk size would be very slow to calculate.
    # A 1 MB chunk size takes ~3 seconds for a 4.5 GB file.
    block_size = calculate_block_size(
        file_size,
        chunk_count=80,
        min_limit=1024,
        max_limit=1024 * 1024,
    )

    shannon_entropy_sum = 0.0
    chisquare_probability_sum = 0.0
    with File.from_path(path) as file:
        for chunk in iterate_file(file, 0, file_size, buffer_size=block_size):
            shannon_entropy = mt.shannon_entropy(chunk)
            shannon_entropy_percentage = round(shannon_entropy / 8 * 100, 2)
            shannon_percentages.append(shannon_entropy_percentage)
            shannon_entropy_sum += shannon_entropy * len(chunk)

            chi_square_probability = mt.chi_square_probability(chunk)
            chisquare_probability_percentage = round(chi_square_probability * 100, 2)
            chi_square_percentages.append(chisquare_probability_percentage)
            chisquare_probability_sum += chi_square_probability * len(chunk)

    report = RandomnessReport(
        shannon=RandomnessMeasurements(
            percentages=shannon_percentages,
            block_size=block_size,
            mean=shannon_entropy_sum / file_size / 8 * 100,
        ),
        chi_square=RandomnessMeasurements(
            percentages=chi_square_percentages,
            block_size=block_size,
            mean=chisquare_probability_sum / file_size * 100,
        ),
    )

    logger.debug(
        "Shannon entropy calculated",
        path=path,
        size=file_size,
        block_size=report.shannon.block_size,
        mean=round(report.shannon.mean, 2),
        highest=round(report.shannon.highest, 2),
        lowest=round(report.shannon.lowest, 2),
    )
    logger.debug(
        "Chi square probability calculated",
        path=path,
        size=file_size,
        block_size=report.chi_square.block_size,
        mean=round(report.chi_square.mean, 2),
        highest=round(report.chi_square.highest, 2),
        lowest=round(report.chi_square.lowest, 2),
    )

    return report
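
# Sanity check (illustrative): uniformly random bytes carry ~8 bits of Shannon
# entropy per byte, normalizing to ~100%, while a constant-filled block
# carries 0 bits, i.e. 0%.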


def calculate_block_size(
    file_size, *, chunk_count: int, min_limit: int, max_limit: int
) -> int:
    """Split the file into evenly sized chunks, limited by lower and upper values."""
    # We don't care about floating point precision here
    block_size = file_size // chunk_count
    block_size = max(min_limit, block_size)
    block_size = min(block_size, max_limit)
    return block_size  # noqa: RET504
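
# Worked example (illustrative): a 200 MiB file gives 200 MiB // 80 = 2.5 MiB,
# clamped down to the 1 MiB max_limit; a 16 KiB file gives 16384 // 80 = 204,
# clamped up to the 1024-byte min_limit.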


def format_randomness_plot(report: RandomnessReport):
    # start from scratch
    plt.clear_figure()
    # go colorless
    plt.clear_color()
    plt.title("Entropy distribution")
    plt.xlabel(f"{report.shannon.block_size} bytes")

    plt.plot(report.shannon.percentages, label="Shannon entropy (%)", marker="dot")
    plt.plot(
        report.chi_square.percentages,
        label="Chi square probability (%)",
        marker="cross",
    )
    # a height of 16 leaves no gaps between the lines
    plt.plot_size(100, 16)
    plt.ylim(0, 100)
    # Draw a tick for every block (each at most 1 MB) on the x axis.
    plt.xticks(range(len(report.shannon.percentages) + 1))
    # Always show 0% and 100%
    plt.yticks(range(0, 101, 10))

    return plt.build()