Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/processing.py: 21%

408 statements  

import multiprocessing
import shutil
from collections.abc import Iterable, Sequence
from operator import attrgetter
from pathlib import Path
from typing import Optional, Union

import attrs
import magic
import plotext as plt
from structlog import get_logger

from unblob import math_tools as mt
from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers

from .extractor import carve_unknown_chunk, carve_valid_chunk, fix_extracted_directory
from .file_utils import InvalidInputFormat, iterate_file
from .finder import search_chunks
from .iter_utils import pairwise
from .logging import noformat
from .models import (
    Chunk,
    DirectoryHandler,
    DirectoryHandlers,
    ExtractError,
    File,
    MultiFile,
    PaddingChunk,
    ProcessResult,
    Task,
    TaskResult,
    UnknownChunk,
    ValidChunk,
)
from .pool import make_pool
from .report import (
    CalculateMultiFileExceptionReport,
    CarveDirectoryReport,
    FileMagicReport,
    HashReport,
    MultiFileCollisionReport,
    OutputDirectoryExistsReport,
    RandomnessMeasurements,
    RandomnessReport,
    Report,
    StatReport,
    UnknownError,
)
from .ui import NullProgressReporter, ProgressReporter

logger = get_logger()

DEFAULT_DEPTH = 10
DEFAULT_PROCESS_NUM = multiprocessing.cpu_count()
DEFAULT_SKIP_MAGIC = (
    "BFLT",
    "Erlang BEAM file",
    "GIF",
    "GNU message catalog",
    "HP Printer Job Language",
    "JPEG",
    "Java module image",
    "MPEG",
    "MS Windows icon resource",
    "Macromedia Flash data",
    "Microsoft Excel",
    "Microsoft PowerPoint",
    "Microsoft Word",
    "OpenDocument",
    "PDF document",
    "PNG",
    "SQLite",
    "TrueType Font data",
    "Web Open Font Format",
    "Windows Embedded CE binary image",
    "Xilinx BIT data",
    "compiled Java class",
    "magic binary file",
    "python",  # (e.g. python 2.7 byte-compiled)
)
DEFAULT_SKIP_EXTENSION = (".rlib",)


@attrs.define(kw_only=True)
class ExtractionConfig:
    extract_root: Path = attrs.field(converter=lambda value: value.resolve())
    force_extract: bool = False
    randomness_depth: int
    randomness_plot: bool = False
    max_depth: int = DEFAULT_DEPTH
    skip_magic: Iterable[str] = DEFAULT_SKIP_MAGIC
    skip_extension: Iterable[str] = DEFAULT_SKIP_EXTENSION
    skip_extraction: bool = False
    process_num: int = DEFAULT_PROCESS_NUM
    keep_extracted_chunks: bool = False
    extract_suffix: str = "_extract"
    carve_suffix: str = "_extract"
    handlers: Handlers = BUILTIN_HANDLERS
    dir_handlers: DirectoryHandlers = BUILTIN_DIR_HANDLERS
    verbose: int = 1
    progress_reporter: type[ProgressReporter] = NullProgressReporter

    def _get_output_path(self, path: Path) -> Path:
        """Return path under extract root."""
        try:
            relative_path = path.relative_to(self.extract_root)
        except ValueError:
            # path is not inside root, i.e. it is an input file
            relative_path = Path(path.name)
        return (self.extract_root / relative_path).expanduser().resolve()

    def get_extract_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.extract_suffix))

    def get_carve_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.carve_suffix))

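# Illustrative path mapping (hypothetical paths, not part of the original
# module): with extract_root=/out, an input file /fw/image.bin maps to
# /out/image.bin_extract via get_extract_dir_for(), because paths outside the
# extract root are reduced to their file name; a file already under the root,
# e.g. /out/image.bin_extract/rootfs.tar, keeps its relative location and maps
# to /out/image.bin_extract/rootfs.tar_extract.
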

def process_file(
    config: ExtractionConfig, input_path: Path, report_file: Optional[Path] = None
) -> ProcessResult:
    task = Task(
        blob_id="",
        path=input_path,
        depth=0,
    )

    if not input_path.is_file():
        raise ValueError("input_path is not a file", input_path)

    extract_dir = config.get_extract_dir_for(input_path)
    if config.force_extract and extract_dir.exists():
        logger.info("Removing extract dir", path=extract_dir)
        shutil.rmtree(extract_dir)

    carve_dir = config.get_carve_dir_for(input_path)
    if config.force_extract and carve_dir.exists():
        logger.info("Removing carve dir", path=carve_dir)
        shutil.rmtree(carve_dir)

    if not prepare_report_file(config, report_file):
        logger.error(
            "File not processed, as report could not be written", file=input_path
        )
        return ProcessResult()

    process_result = _process_task(config, task)

    if report_file:
        write_json_report(report_file, process_result)

    return process_result

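# Minimal usage sketch (illustrative, not part of the original module; the
# paths are hypothetical and the remaining options keep their defaults):
#
#     config = ExtractionConfig(
#         extract_root=Path("/tmp/unblob-out"),
#         randomness_depth=1,
#     )
#     result = process_file(config, Path("/tmp/firmware.bin"),
#                           report_file=Path("/tmp/report.json"))
#
# process_file() returns a ProcessResult aggregating one TaskResult per
# processed task, and also writes the same data to the JSON report file.
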

def _process_task(config: ExtractionConfig, task: Task) -> ProcessResult:
    processor = Processor(config)
    aggregated_result = ProcessResult()

    progress_reporter = config.progress_reporter()

    def process_result(pool, result):
        progress_reporter.update(result)

        for new_task in result.subtasks:
            pool.submit(new_task)
        aggregated_result.register(result)

    pool = make_pool(
        process_num=config.process_num,
        handler=processor.process_task,
        result_callback=process_result,
    )

    with pool, progress_reporter:
        pool.submit(task)
        pool.process_until_done()

    return aggregated_result


def prepare_report_file(config: ExtractionConfig, report_file: Optional[Path]) -> bool:
    """Prevent report writing from failing after an expensive extraction.

    Should be called before processing tasks.

    Returns True if there is no foreseen problem,
    False if report writing is known in advance to fail.
    """
    if not report_file:
        # we will not write a report at all
        return True

    if report_file.exists():
        if config.force_extract:
            logger.warning("Overwriting existing report file", path=report_file)
            try:
                report_file.write_text("")
            except OSError as e:
                logger.error(
                    "Can not overwrite existing report file",
                    path=report_file,
                    msg=str(e),
                )
                return False
        else:
            logger.error(
                "Report file exists and --force not specified", path=report_file
            )
            return False
    if not report_file.parent.exists():
        logger.error(
            "Trying to write report file to a non-existent directory", path=report_file
        )
        return False
    return True


def write_json_report(report_file: Path, process_result: ProcessResult):
    try:
        report_file.write_text(process_result.to_json())
    except OSError as e:
        logger.error("Can not write JSON report", path=report_file, msg=str(e))
    except Exception:
        logger.exception("Can not write JSON report", path=report_file)
    else:
        logger.info("JSON report written", path=report_file)


class Processor:
    def __init__(self, config: ExtractionConfig):
        self._config = config
        # libmagic helpers
        # File magic uses a rule-set to guess the file type, but as rules are added they can
        # shadow each other. File magic uses rule priorities to determine the best matching
        # rule, which can also shadow other valid matches and eventually break
        # any further processing that depends on magic.
        # By enabling keep_going (which eventually enables MAGIC_CONTINUE), all matching patterns
        # are included in the magic string at the cost of being a bit slower, but accuracy
        # increases because rules no longer shadow each other.
        self._get_magic = magic.Magic(keep_going=True).from_file
        self._get_mime_type = magic.Magic(mime=True).from_file

    def process_task(self, task: Task) -> TaskResult:
        result = TaskResult(task=task)
        try:
            self._process_task(result, task)
        except Exception as exc:
            self._process_error(result, exc)
        return result

    def _process_error(self, result: TaskResult, exc: Exception):
        error_report = UnknownError(exception=exc)
        result.add_report(error_report)
        logger.exception("Unknown error happened", exc_info=exc)

    def _process_task(self, result: TaskResult, task: Task):
        stat_report = StatReport.from_path(task.path)
        result.add_report(stat_report)
        log = logger.bind(path=task.path)

        if task.depth >= self._config.max_depth:
            # TODO: Use the reporting feature to warn the user (ONLY ONCE) at the end of execution that this limit was reached.
            log.debug(
                "Reached maximum depth, stop further processing", depth=task.depth
            )
            return

        if stat_report.is_dir:
            if not task.is_multi_file:
                _DirectoryTask(self._config, task, result).process()
            return

        if not stat_report.is_file:
            log.debug(
                "Ignoring special file (link, chrdev, blkdev, fifo, socket, door)."
            )
            return

        magic = self._get_magic(task.path)
        mime_type = self._get_mime_type(task.path)
        logger.debug("Detected file-magic", magic=magic, path=task.path, _verbosity=2)

        magic_report = FileMagicReport(magic=magic, mime_type=mime_type)
        result.add_report(magic_report)

        hash_report = HashReport.from_path(task.path)
        result.add_report(hash_report)

        if task.is_multi_file:
            # The file has been processed as part of a MultiFile; we just run the task to gather reports
            return

        if stat_report.size == 0:
            log.debug("Ignoring empty file")
            return

        should_skip_file = any(
            magic.startswith(pattern) for pattern in self._config.skip_magic
        )
        should_skip_file |= task.path.suffix in self._config.skip_extension

        if should_skip_file:
            log.debug(
                "Ignoring file based on magic or extension.",
                magic=magic,
                extension=task.path.suffix,
            )
            return

        _FileTask(self._config, task, stat_report.size, result).process()

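# Illustrative note on the skip logic above (not part of the original module):
# skip_magic entries are matched as prefixes of the libmagic description, so a
# file described e.g. as "PNG image data, 300 x 300, ..." is skipped by the
# default "PNG" entry, while skip_extension compares the literal suffix, so
# "libfoo.rlib" is skipped by the default ".rlib" entry.
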

class DirectoryProcessingError(Exception):
    def __init__(self, message: str, report: Report):
        super().__init__()
        self.message = message
        self.report: Report = report


class _DirectoryTask:
    def __init__(self, config: ExtractionConfig, dir_task: Task, result: TaskResult):
        self.config = config
        self.dir_task = dir_task
        self.result = result

    def process(self):
        logger.debug("Processing directory", path=self.dir_task.path)

        try:
            processed_paths, extract_dirs = self._process_directory()
        except DirectoryProcessingError as e:
            logger.error(e.message, report=e.report)
            self.result.add_report(e.report)
            return

        self._iterate_directory(extract_dirs, processed_paths)

        self._iterate_processed_files(processed_paths)

    def _process_directory(self) -> tuple[set[Path], set[Path]]:
        processed_paths: set[Path] = set()
        extract_dirs: set[Path] = set()
        for dir_handler_class in self.config.dir_handlers:
            dir_handler = dir_handler_class()

            for path in dir_handler.PATTERN.get_files(self.dir_task.path):
                multi_file = self._calculate_multifile(dir_handler, path, self.result)

                if multi_file is None:
                    continue

                multi_file.handler = dir_handler

                self._check_conflicting_files(multi_file, processed_paths)

                extract_dir = self._extract_multi_file(multi_file)

                # Process files in extracted directory
                if extract_dir.exists():
                    self.result.add_subtask(
                        Task(
                            blob_id=multi_file.id,
                            path=extract_dir,
                            depth=self.dir_task.depth + 1,
                        )
                    )
                    extract_dirs.add(extract_dir)

                processed_paths.update(multi_file.paths)
        return processed_paths, extract_dirs

    @staticmethod
    def _calculate_multifile(
        dir_handler: DirectoryHandler, path: Path, task_result: TaskResult
    ) -> Optional[MultiFile]:
        try:
            return dir_handler.calculate_multifile(path)
        except InvalidInputFormat as exc:
            logger.debug(
                "Invalid MultiFile format",
                exc_info=exc,
                handler=dir_handler.NAME,
                path=path,
                _verbosity=2,
            )
        except Exception as exc:
            error_report = CalculateMultiFileExceptionReport(
                handler=dir_handler.NAME,
                exception=exc,
                path=path,
            )
            task_result.add_report(error_report)
            logger.warning(
                "Unhandled Exception during multi file calculation",
                **error_report.model_dump(),
            )

    def _check_conflicting_files(
        self, multi_file: MultiFile, processed_paths: set[Path]
    ):
        conflicting_paths = processed_paths.intersection(set(multi_file.paths))
        if conflicting_paths:
            raise DirectoryProcessingError(
                "Conflicting match on files",
                report=MultiFileCollisionReport(
                    paths=conflicting_paths, handler=multi_file.handler.NAME
                ),
            )

    def _extract_multi_file(self, multi_file: MultiFile) -> Path:
        extract_dir = self.config.get_extract_dir_for(
            self.dir_task.path / multi_file.name
        )
        if extract_dir.exists():
            raise DirectoryProcessingError(
                "Skipped: extraction directory exists",
                report=multi_file.as_report(
                    [OutputDirectoryExistsReport(path=extract_dir)]
                ),
            )

        extraction_reports = []
        try:
            if result := multi_file.extract(extract_dir):
                extraction_reports.extend(result.reports)
        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting MultiFile")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(multi_file.as_report(extraction_reports))

        fix_extracted_directory(extract_dir, self.result)

        return extract_dir

    def _iterate_processed_files(self, processed_paths):
        for path in processed_paths:
            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                    is_multi_file=True,
                )
            )

    def _iterate_directory(self, extract_dirs, processed_paths):
        for path in self.dir_task.path.iterdir():
            if path in extract_dirs or path in processed_paths:
                continue

            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                )
            )


def is_padding(file: File, chunk: UnknownChunk):
    chunk_bytes = set()

    for small_chunk in iterate_file(
        file, chunk.start_offset, chunk.end_offset - chunk.start_offset
    ):
        chunk_bytes.update(small_chunk)

        # early return optimization
        if len(chunk_bytes) > 1:
            return False

    return len(chunk_bytes) == 1

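# Illustrative example (hypothetical values, not part of the original module):
# an unknown gap consisting solely of 0xFF bytes yields chunk_bytes == {0xFF},
# so is_padding() returns True and process_patterns() below reports it as a
# PaddingChunk; a gap mixing 0x00 and 0xFF bytes returns False as soon as a
# second byte value is seen.
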

def process_patterns(
    unknown_chunks: list[UnknownChunk], file: File
) -> list[Union[UnknownChunk, PaddingChunk]]:
    processed_chunks = []
    for unknown_chunk in unknown_chunks:
        if is_padding(file, unknown_chunk):
            processed_chunks.append(
                PaddingChunk(
                    start_offset=unknown_chunk.start_offset,
                    end_offset=unknown_chunk.end_offset,
                    id=unknown_chunk.id,
                    file=unknown_chunk.file,
                )
            )
        else:
            processed_chunks.append(unknown_chunk)
    return processed_chunks


class _FileTask:
    def __init__(
        self,
        config: ExtractionConfig,
        task: Task,
        size: int,
        result: TaskResult,
    ):
        self.config = config
        self.task = task
        self.size = size
        self.result = result

    def process(self):
        logger.debug("Processing file", path=self.task.path, size=self.size)

        with File.from_path(self.task.path) as file:
            all_chunks = search_chunks(
                file, self.size, self.config.handlers, self.result
            )
            outer_chunks = remove_inner_chunks(all_chunks)
            unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
            unknown_chunks = process_patterns(unknown_chunks, file)
            assign_file_to_chunks(outer_chunks, file=file)
            assign_file_to_chunks(unknown_chunks, file=file)

            if outer_chunks or unknown_chunks:
                self._process_chunks(file, outer_chunks, unknown_chunks)
            else:
                # we don't consider whole files as unknown chunks, but we still want to
                # calculate randomness for whole files which produced no valid chunks
                randomness = self._calculate_randomness(self.task.path)
                if randomness:
                    self.result.add_report(randomness)

    def _process_chunks(
        self,
        file: File,
        outer_chunks: list[ValidChunk],
        unknown_chunks: list[Union[UnknownChunk, PaddingChunk]],
    ):
        if unknown_chunks:
            logger.warning("Found unknown Chunks", chunks=unknown_chunks)

        if self.config.skip_extraction:
            for chunk in unknown_chunks:
                self.result.add_report(chunk.as_report(randomness=None))
            for chunk in outer_chunks:
                self.result.add_report(chunk.as_report(extraction_reports=[]))
            return

        is_whole_file_chunk = len(outer_chunks) + len(unknown_chunks) == 1
        if is_whole_file_chunk:
            # skip carving, extract the whole file (chunk) directly
            carved_path = self.task.path
            for chunk in outer_chunks:
                self._extract_chunk(
                    carved_path,
                    chunk,
                    self.config.get_extract_dir_for(carved_path),
                    # since we do not carve, we want to keep the input around
                    remove_extracted_input=False,
                )
        else:
            self._carve_then_extract_chunks(file, outer_chunks, unknown_chunks)

    def _carve_then_extract_chunks(self, file, outer_chunks, unknown_chunks):
        assert not self.config.skip_extraction

        carve_dir = self.config.get_carve_dir_for(self.task.path)

        # report the technical carve directory explicitly
        self.result.add_report(CarveDirectoryReport(carve_dir=carve_dir))

        if carve_dir.exists():
            # The carve directory is not supposed to exist; it is usually a simple mistake of running
            # unblob again without cleaning up or using --force.
            # It would cause problems to continue, as it would mix up original and extracted files,
            # and it would introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error("Skipped: carve directory exists", carve_dir=carve_dir)
            self.result.add_report(OutputDirectoryExistsReport(path=carve_dir))
            return

        for chunk in unknown_chunks:
            carved_unknown_path = carve_unknown_chunk(carve_dir, file, chunk)
            randomness = self._calculate_randomness(carved_unknown_path)
            self.result.add_report(chunk.as_report(randomness=randomness))

        for chunk in outer_chunks:
            carved_path = carve_valid_chunk(carve_dir, file, chunk)

            self._extract_chunk(
                carved_path,
                chunk,
                self.config.get_extract_dir_for(carved_path),
                # when a carved chunk is successfully extracted, usually
                # we want to get rid of it, as its data is available in
                # extracted format, and the raw data is still part of
                # the file the chunk belongs to
                remove_extracted_input=not self.config.keep_extracted_chunks,
            )

    def _calculate_randomness(self, path: Path) -> Optional[RandomnessReport]:
        if self.task.depth < self.config.randomness_depth:
            report = calculate_randomness(path)
            if self.config.randomness_plot:
                logger.debug(
                    "Randomness chart",
                    # Newline so that the chart title is aligned correctly on the next line
                    chart="\n" + format_randomness_plot(report),
                    path=path,
                    _verbosity=3,
                )
            return report
        return None

    def _extract_chunk(
        self,
        carved_path: Path,
        chunk: ValidChunk,
        extract_dir: Path,
        *,
        remove_extracted_input: bool,
    ):
        if extract_dir.exists():
            # The extraction directory is not supposed to exist: it mixes up original and extracted files,
            # and it would introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error(
                "Skipped: extraction directory exists",
                extract_dir=extract_dir,
                chunk=chunk,
            )
            self.result.add_report(
                chunk.as_report([OutputDirectoryExistsReport(path=extract_dir)])
            )
            return

        if self.config.skip_extraction:
            fix_extracted_directory(extract_dir, self.result)
            return

        extraction_reports = []
        try:
            if result := chunk.extract(carved_path, extract_dir):
                extraction_reports.extend(result.reports)

            if remove_extracted_input:
                logger.debug("Removing extracted chunk", path=carved_path)
                carved_path.unlink()

        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting chunk")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(chunk.as_report(extraction_reports))

        # we want to get consistent partial output even in case of unforeseen problems
        fix_extracted_directory(extract_dir, self.result)
        delete_empty_extract_dir(extract_dir)

        if extract_dir.exists():
            self.result.add_subtask(
                Task(
                    blob_id=chunk.id,
                    path=extract_dir,
                    depth=self.task.depth + 1,
                )
            )


def assign_file_to_chunks(chunks: Sequence[Chunk], file: File):
    for chunk in chunks:
        assert chunk.file is None
        chunk.file = file


def delete_empty_extract_dir(extract_dir: Path):
    if extract_dir.exists() and not any(extract_dir.iterdir()):
        extract_dir.rmdir()


def remove_inner_chunks(chunks: list[ValidChunk]) -> list[ValidChunk]:
    """Remove all chunks from the list which are within another, bigger chunk."""
    if not chunks:
        return []

    chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True)
    outer_chunks = [chunks_by_size[0]]
    for chunk in chunks_by_size[1:]:
        if not any(outer.contains(chunk) for outer in outer_chunks):
            outer_chunks.append(chunk)

    outer_count = len(outer_chunks)
    removed_count = len(chunks) - outer_count
    logger.debug(
        "Removed inner chunks",
        outer_chunk_count=noformat(outer_count),
        removed_inner_chunk_count=noformat(removed_count),
        _verbosity=2,
    )
    return outer_chunks

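# Illustrative example (hypothetical offsets, not part of the original module):
# given a valid chunk covering bytes 0-1000 and another covering bytes 100-200,
# remove_inner_chunks() keeps only the 0-1000 chunk, because the smaller one is
# fully contained in it; chunks that do not fall inside any kept chunk remain
# in the returned outer_chunks list.
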

def calculate_unknown_chunks(
    chunks: list[ValidChunk], file_size: int
) -> list[UnknownChunk]:
    """Calculate the empty gaps between chunks."""
    if not chunks or file_size == 0:
        return []

    sorted_by_offset = sorted(chunks, key=attrgetter("start_offset"))

    unknown_chunks = []

    first = sorted_by_offset[0]
    if first.start_offset != 0:
        unknown_chunk = UnknownChunk(start_offset=0, end_offset=first.start_offset)
        unknown_chunks.append(unknown_chunk)

    for chunk, next_chunk in pairwise(sorted_by_offset):
        diff = next_chunk.start_offset - chunk.end_offset
        if diff != 0:
            unknown_chunk = UnknownChunk(
                start_offset=chunk.end_offset,
                end_offset=next_chunk.start_offset,
            )
            unknown_chunks.append(unknown_chunk)

    last = sorted_by_offset[-1]
    if last.end_offset < file_size:
        unknown_chunk = UnknownChunk(
            start_offset=last.end_offset,
            end_offset=file_size,
        )
        unknown_chunks.append(unknown_chunk)

    return unknown_chunks

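# Worked example (hypothetical numbers, not part of the original module): for a
# 1000-byte file with valid chunks covering [100, 400) and [400, 900),
# calculate_unknown_chunks() returns the gaps [0, 100) and [900, 1000) as
# UnknownChunks; the two valid chunks are adjacent, so no gap is reported
# between them.
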

def calculate_randomness(path: Path) -> RandomnessReport:
    """Calculate and log Shannon entropy divided by 8 for the file, in chunks.

    Shannon entropy measures the amount of information (in bits) of some numeric
    sequence. We calculate the average entropy of byte chunks, which in theory
    can contain 0-8 bits of entropy. We normalize it for visualization to a
    0-100% scale, to make it easier to interpret the graph.

    The chi square distribution is calculated for the stream of bytes in the
    chunk and expressed as an absolute number and a percentage which indicates
    how frequently a truly random sequence would exceed the value calculated.
    """
    shannon_percentages = []
    chi_square_percentages = []

    # We could use the chunk size instead of another syscall,
    # but we rely on the actual file size written to the disk
    file_size = path.stat().st_size
    logger.debug("Calculating entropy for file", path=path, size=file_size)

    # A smaller chunk size would be very slow to calculate:
    # a 1 MB chunk size takes ~3 sec for a 4.5 GB file.
    block_size = calculate_block_size(
        file_size,
        chunk_count=80,
        min_limit=1024,
        max_limit=1024 * 1024,
    )

    shannon_entropy_sum = 0.0
    chisquare_probability_sum = 0.0
    with File.from_path(path) as file:
        for chunk in iterate_file(file, 0, file_size, buffer_size=block_size):
            shannon_entropy = mt.shannon_entropy(chunk)
            shannon_entropy_percentage = round(shannon_entropy / 8 * 100, 2)
            shannon_percentages.append(shannon_entropy_percentage)
            shannon_entropy_sum += shannon_entropy * len(chunk)

            chi_square_probability = mt.chi_square_probability(chunk)
            chisquare_probability_percentage = round(chi_square_probability * 100, 2)
            chi_square_percentages.append(chisquare_probability_percentage)
            chisquare_probability_sum += chi_square_probability * len(chunk)

    report = RandomnessReport(
        shannon=RandomnessMeasurements(
            percentages=shannon_percentages,
            block_size=block_size,
            mean=shannon_entropy_sum / file_size / 8 * 100,
        ),
        chi_square=RandomnessMeasurements(
            percentages=chi_square_percentages,
            block_size=block_size,
            mean=chisquare_probability_sum / file_size * 100,
        ),
    )

    logger.debug(
        "Shannon entropy calculated",
        path=path,
        size=file_size,
        block_size=report.shannon.block_size,
        mean=round(report.shannon.mean, 2),
        highest=round(report.shannon.highest, 2),
        lowest=round(report.shannon.lowest, 2),
    )
    logger.debug(
        "Chi square probability calculated",
        path=path,
        size=file_size,
        block_size=report.chi_square.block_size,
        mean=round(report.chi_square.mean, 2),
        highest=round(report.chi_square.highest, 2),
        lowest=round(report.chi_square.lowest, 2),
    )

    return report

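# Worked example of the normalization above (illustrative, not part of the
# original module): a block of uniformly random bytes has a Shannon entropy of
# about 8 bits per byte and is reported as ~100%, while a block repeating a
# single byte value has 0 bits and is reported as 0%; both means are weighted
# by block length, so a short trailing block does not skew the result.
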

def calculate_block_size(
    file_size, *, chunk_count: int, min_limit: int, max_limit: int
) -> int:
    """Split the file into even sized chunks, limited by lower and upper values."""
    # We don't care about floating point precision here
    block_size = file_size // chunk_count
    block_size = max(min_limit, block_size)
    block_size = min(block_size, max_limit)
    return block_size  # noqa: RET504

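# Worked example (hypothetical sizes, not part of the original module): with
# chunk_count=80, min_limit=1024 and max_limit=1024 * 1024, a 16 MiB file gives
# 16 MiB // 80 = 209,715 bytes, which lies within the limits and is used as-is;
# a 10 KiB file is clamped up to 1024 bytes, and a 1 GiB file is clamped down
# to 1 MiB.
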

def format_randomness_plot(report: RandomnessReport):
    # start from scratch
    plt.clear_figure()
    # go colorless
    plt.clear_color()
    plt.title("Entropy distribution")
    plt.xlabel(f"{report.shannon.block_size} bytes")

    plt.plot(report.shannon.percentages, label="Shannon entropy (%)", marker="dot")
    plt.plot(
        report.chi_square.percentages,
        label="Chi square probability (%)",
        marker="cross",
    )
    # a height of 16 leaves no gaps between the lines
    plt.plot_size(100, 16)
    plt.ylim(0, 100)
    # Draw ticks every 1Mb on the x axis.
    plt.xticks(range(len(report.shannon.percentages) + 1))
    # Always show 0% and 100%
    plt.yticks(range(0, 101, 10))

    return plt.build()
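
# Minimal usage sketch (hypothetical path, not part of the original module):
#
#     report = calculate_randomness(Path("/tmp/firmware.bin"))
#     print(format_randomness_plot(report))
#
# calculate_randomness() produces the RandomnessReport consumed here, and
# plt.build() returns the rendered chart as a string, which is how the chart
# ends up embedded in the debug log above.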