Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/processing.py: 21%

408 statements  

import multiprocessing
import shutil
from collections.abc import Iterable, Sequence
from operator import attrgetter
from pathlib import Path
from typing import Optional, Union

import attrs
import magic
import plotext as plt
from structlog import get_logger

from unblob import math_tools as mt
from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers

from .extractor import carve_unknown_chunk, carve_valid_chunk, fix_extracted_directory
from .file_utils import InvalidInputFormat, iterate_file
from .finder import search_chunks
from .iter_utils import pairwise
from .logging import noformat
from .models import (
    Chunk,
    DirectoryHandler,
    DirectoryHandlers,
    ExtractError,
    File,
    MultiFile,
    PaddingChunk,
    ProcessResult,
    Task,
    TaskResult,
    UnknownChunk,
    ValidChunk,
)
from .pool import make_pool
from .report import (
    CalculateMultiFileExceptionReport,
    CarveDirectoryReport,
    FileMagicReport,
    HashReport,
    MultiFileCollisionReport,
    OutputDirectoryExistsReport,
    RandomnessMeasurements,
    RandomnessReport,
    Report,
    StatReport,
    UnknownError,
)
from .ui import NullProgressReporter, ProgressReporter

logger = get_logger()

DEFAULT_DEPTH = 10
DEFAULT_PROCESS_NUM = multiprocessing.cpu_count()
DEFAULT_SKIP_MAGIC = (
    "BFLT",
    "Composite Document File V2 Document",
    "Erlang BEAM file",
    "GIF",
    "GNU message catalog",
    "HP Printer Job Language",
    "JPEG",
    "Java module image",
    "MPEG",
    "MS Windows icon resource",
    "Macromedia Flash data",
    "Microsoft Excel",
    "Microsoft PowerPoint",
    "Microsoft Word",
    "OpenDocument",
    "PDF document",
    "PNG",
    "SQLite",
    "TrueType Font data",
    "Web Open Font Format",
    "Windows Embedded CE binary image",
    "Xilinx BIT data",
    "compiled Java class",
    "magic binary file",
    "python",  # (e.g. python 2.7 byte-compiled)
)
DEFAULT_SKIP_EXTENSION = (".rlib",)


@attrs.define(kw_only=True)
class ExtractionConfig:
    extract_root: Path = attrs.field(converter=lambda value: value.resolve())
    force_extract: bool = False
    randomness_depth: int
    randomness_plot: bool = False
    max_depth: int = DEFAULT_DEPTH
    skip_magic: Iterable[str] = DEFAULT_SKIP_MAGIC
    skip_extension: Iterable[str] = DEFAULT_SKIP_EXTENSION
    skip_extraction: bool = False
    process_num: int = DEFAULT_PROCESS_NUM
    keep_extracted_chunks: bool = False
    extract_suffix: str = "_extract"
    carve_suffix: str = "_extract"
    handlers: Handlers = BUILTIN_HANDLERS
    dir_handlers: DirectoryHandlers = BUILTIN_DIR_HANDLERS
    verbose: int = 1
    progress_reporter: type[ProgressReporter] = NullProgressReporter

    def _get_output_path(self, path: Path) -> Path:
        """Return path under extract root."""
        try:
            relative_path = path.relative_to(self.extract_root)
        except ValueError:
            # path is not inside root, i.e. it is an input file
            relative_path = Path(path.name)
        return (self.extract_root / relative_path).expanduser().resolve()

    def get_extract_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.extract_suffix))

    def get_carve_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.carve_suffix))
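
# Example of the path mapping above (a sketch with hypothetical paths, on a
# POSIX system where extract_root resolves to /data/output):
#
#   >>> config = ExtractionConfig(extract_root=Path("/data/output"), randomness_depth=1)
#   >>> config.get_extract_dir_for(Path("/home/user/firmware.bin"))
#   PosixPath('/data/output/firmware.bin_extract')
#
# Inputs outside extract_root fall back to their bare file name, so every
# output directory ends up below extract_root with the configured suffix.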


def process_file(
    config: ExtractionConfig, input_path: Path, report_file: Optional[Path] = None
) -> ProcessResult:
    task = Task(
        blob_id="",
        path=input_path,
        depth=0,
    )

    if not input_path.is_file():
        raise ValueError("input_path is not a file", input_path)

    extract_dir = config.get_extract_dir_for(input_path)
    if config.force_extract and extract_dir.exists():
        logger.info("Removing extract dir", path=extract_dir)
        shutil.rmtree(extract_dir)

    carve_dir = config.get_carve_dir_for(input_path)
    if config.force_extract and carve_dir.exists():
        logger.info("Removing carve dir", path=carve_dir)
        shutil.rmtree(carve_dir)

    if not prepare_report_file(config, report_file):
        logger.error(
            "File not processed, as report could not be written", file=input_path
        )
        return ProcessResult()

    process_result = _process_task(config, task)

    if report_file:
        write_json_report(report_file, process_result)

    return process_result
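
# Minimal usage sketch (hypothetical paths; in normal use the unblob CLI builds
# an equivalent config from its command-line options):
#
#   config = ExtractionConfig(extract_root=Path("/data/output"), randomness_depth=1)
#   result = process_file(config, Path("/data/firmware.bin"), report_file=Path("/data/report.json"))
#
# process_file() rejects anything that is not a regular file, removes stale
# extract/carve directories only when force_extract is set, and refuses to
# start if the report file is known in advance to be unwritable.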


def _process_task(config: ExtractionConfig, task: Task) -> ProcessResult:
    processor = Processor(config)
    aggregated_result = ProcessResult()

    progress_reporter = config.progress_reporter()

    def process_result(pool, result):
        progress_reporter.update(result)

        for new_task in result.subtasks:
            pool.submit(new_task)
        aggregated_result.register(result)

    pool = make_pool(
        process_num=config.process_num,
        handler=processor.process_task,
        result_callback=process_result,
    )

    with pool, progress_reporter:
        pool.submit(task)
        pool.process_until_done()

    return aggregated_result


def prepare_report_file(config: ExtractionConfig, report_file: Optional[Path]) -> bool:
    """Prevent report writing from failing after an expensive extraction.

    Should be called before processing tasks.

    Returns True if there is no foreseen problem,
    False if report writing is known in advance to fail.
    """
    if not report_file:
        # we will not write a report at all
        return True

    if report_file.exists():
        if config.force_extract:
            logger.warning("Overwriting existing report file", path=report_file)
            try:
                report_file.write_text("")
            except OSError as e:
                logger.error(
                    "Can not overwrite existing report file",
                    path=report_file,
                    msg=str(e),
                )
                return False
        else:
            logger.error(
                "Report file exists and --force not specified", path=report_file
            )
            return False
    if not report_file.parent.exists():
        logger.error(
            "Trying to write report file to a non-existent directory", path=report_file
        )
        return False
    return True


def write_json_report(report_file: Path, process_result: ProcessResult):
    try:
        report_file.write_text(process_result.to_json())
    except OSError as e:
        logger.error("Can not write JSON report", path=report_file, msg=str(e))
    except Exception:
        logger.exception("Can not write JSON report", path=report_file)
    else:
        logger.info("JSON report written", path=report_file)


class Processor:
    def __init__(self, config: ExtractionConfig):
        self._config = config
        # libmagic helpers
        # file magic uses a rule-set to guess the file type, however as rules are added they could
        # shadow each other. File magic uses rule priorities to determine which is the best matching
        # rule, however this could shadow other valid matches as well, which could eventually break
        # any further processing that depends on magic.
        # By enabling keep_going (which eventually enables MAGIC_CONTINUE) all matching patterns
        # will be included in the magic string at the cost of being a bit slower, but increasing
        # accuracy by not shadowing rules.
        self._get_magic = magic.Magic(keep_going=True).from_file
        self._get_mime_type = magic.Magic(mime=True).from_file

    def process_task(self, task: Task) -> TaskResult:
        result = TaskResult(task)
        try:
            self._process_task(result, task)
        except Exception as exc:
            self._process_error(result, exc)
        return result

    def _process_error(self, result: TaskResult, exc: Exception):
        error_report = UnknownError(exception=exc)
        result.add_report(error_report)
        logger.exception("Unknown error happened", exc_info=exc)

    def _process_task(self, result: TaskResult, task: Task):
        stat_report = StatReport.from_path(task.path)
        result.add_report(stat_report)
        log = logger.bind(path=task.path)

        if task.depth >= self._config.max_depth:
            # TODO: Use the reporting feature to warn the user (ONLY ONCE) at the end of execution, that this limit was reached.
            log.debug(
                "Reached maximum depth, stop further processing", depth=task.depth
            )
            return

        if stat_report.is_dir:
            if not task.is_multi_file:
                _DirectoryTask(self._config, task, result).process()
            return

        if not stat_report.is_file:
            log.debug(
                "Ignoring special file (link, chrdev, blkdev, fifo, socket, door)."
            )
            return

        magic = self._get_magic(task.path)
        mime_type = self._get_mime_type(task.path)
        logger.debug("Detected file-magic", magic=magic, path=task.path, _verbosity=2)

        magic_report = FileMagicReport(magic=magic, mime_type=mime_type)
        result.add_report(magic_report)

        hash_report = HashReport.from_path(task.path)
        result.add_report(hash_report)

        if task.is_multi_file:
            # The file has been processed as part of a MultiFile, we just run the task to gather reports
            return

        if stat_report.size == 0:
            log.debug("Ignoring empty file")
            return

        should_skip_file = any(
            magic.startswith(pattern) for pattern in self._config.skip_magic
        )
        should_skip_file |= task.path.suffix in self._config.skip_extension

        if should_skip_file:
            log.debug(
                "Ignoring file based on magic or extension.",
                magic=magic,
                extension=task.path.suffix,
            )
            return

        _FileTask(self._config, task, stat_report.size, result).process()
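
# The skip check above is a plain prefix match against the libmagic description
# (illustrative):
#
#   >>> any("PDF document, version 1.4".startswith(p) for p in DEFAULT_SKIP_MAGIC)
#   True
#   >>> any("POSIX tar archive (GNU)".startswith(p) for p in DEFAULT_SKIP_MAGIC)
#   False
#
# Skipped files still get stat, file-magic and hash reports, but they are never
# searched for chunks.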


class DirectoryProcessingError(Exception):
    def __init__(self, message: str, report: Report):
        super().__init__()
        self.message = message
        self.report: Report = report


class _DirectoryTask:
    def __init__(self, config: ExtractionConfig, dir_task: Task, result: TaskResult):
        self.config = config
        self.dir_task = dir_task
        self.result = result

    def process(self):
        logger.debug("Processing directory", path=self.dir_task.path)

        try:
            processed_paths, extract_dirs = self._process_directory()
        except DirectoryProcessingError as e:
            logger.error(e.message, report=e.report)
            self.result.add_report(e.report)
            return

        self._iterate_directory(extract_dirs, processed_paths)

        self._iterate_processed_files(processed_paths)

    def _process_directory(self) -> tuple[set[Path], set[Path]]:
        processed_paths: set[Path] = set()
        extract_dirs: set[Path] = set()
        for dir_handler_class in self.config.dir_handlers:
            dir_handler = dir_handler_class()

            for path in dir_handler.PATTERN.get_files(self.dir_task.path):
                multi_file = self._calculate_multifile(dir_handler, path, self.result)

                if multi_file is None:
                    continue

                multi_file.handler = dir_handler

                self._check_conflicting_files(multi_file, processed_paths)

                extract_dir = self._extract_multi_file(multi_file)

                # Process files in extracted directory
                if extract_dir.exists():
                    self.result.add_subtask(
                        Task(
                            blob_id=multi_file.id,
                            path=extract_dir,
                            depth=self.dir_task.depth + 1,
                        )
                    )
                    extract_dirs.add(extract_dir)

                processed_paths.update(multi_file.paths)
        return processed_paths, extract_dirs

    @staticmethod
    def _calculate_multifile(
        dir_handler: DirectoryHandler, path: Path, task_result: TaskResult
    ) -> Optional[MultiFile]:
        try:
            return dir_handler.calculate_multifile(path)
        except InvalidInputFormat as exc:
            logger.debug(
                "Invalid MultiFile format",
                exc_info=exc,
                handler=dir_handler.NAME,
                path=path,
                _verbosity=2,
            )
        except Exception as exc:
            error_report = CalculateMultiFileExceptionReport(
                handler=dir_handler.NAME,
                exception=exc,
                path=path,
            )
            task_result.add_report(error_report)
            logger.warning(
                "Unhandled Exception during multi file calculation",
                **error_report.asdict(),
            )

    def _check_conflicting_files(
        self, multi_file: MultiFile, processed_paths: set[Path]
    ):
        conflicting_paths = processed_paths.intersection(set(multi_file.paths))
        if conflicting_paths:
            raise DirectoryProcessingError(
                "Conflicting match on files",
                report=MultiFileCollisionReport(
                    paths=conflicting_paths, handler=multi_file.handler.NAME
                ),
            )

    def _extract_multi_file(self, multi_file: MultiFile) -> Path:
        extract_dir = self.config.get_extract_dir_for(
            self.dir_task.path / multi_file.name
        )
        if extract_dir.exists():
            raise DirectoryProcessingError(
                "Skipped: extraction directory exists",
                report=multi_file.as_report(
                    [OutputDirectoryExistsReport(path=extract_dir)]
                ),
            )

        extraction_reports = []
        try:
            if result := multi_file.extract(extract_dir):
                extraction_reports.extend(result.reports)
        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting MultiFile")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(multi_file.as_report(extraction_reports))

        fix_extracted_directory(extract_dir, self.result)

        return extract_dir

    def _iterate_processed_files(self, processed_paths):
        for path in processed_paths:
            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                    is_multi_file=True,
                )
            )

    def _iterate_directory(self, extract_dirs, processed_paths):
        for path in self.dir_task.path.iterdir():
            if path in extract_dirs or path in processed_paths:
                continue

            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                )
            )


def is_padding(file: File, chunk: UnknownChunk):
    chunk_bytes = set()

    for small_chunk in iterate_file(
        file, chunk.start_offset, chunk.end_offset - chunk.start_offset
    ):
        chunk_bytes.update(small_chunk)

        # early return optimization
        if len(chunk_bytes) > 1:
            return False

    return len(chunk_bytes) == 1
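
# Example: a gap filled with a single repeated byte (e.g. 0xFF padding) yields
# exactly one distinct byte value and is classified as padding, while a gap
# mixing byte values returns False as soon as a second value is seen:
#
#   >>> len(set(b"\xff" * 4096))
#   1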


def process_patterns(
    unknown_chunks: list[UnknownChunk], file: File
) -> list[Union[UnknownChunk, PaddingChunk]]:
    processed_chunks = []
    for unknown_chunk in unknown_chunks:
        if is_padding(file, unknown_chunk):
            processed_chunks.append(
                PaddingChunk(
                    start_offset=unknown_chunk.start_offset,
                    end_offset=unknown_chunk.end_offset,
                    id=unknown_chunk.id,
                    file=unknown_chunk.file,
                )
            )
        else:
            processed_chunks.append(unknown_chunk)
    return processed_chunks


class _FileTask:
    def __init__(
        self,
        config: ExtractionConfig,
        task: Task,
        size: int,
        result: TaskResult,
    ):
        self.config = config
        self.task = task
        self.size = size
        self.result = result

    def process(self):
        logger.debug("Processing file", path=self.task.path, size=self.size)

        with File.from_path(self.task.path) as file:
            all_chunks = search_chunks(
                file, self.size, self.config.handlers, self.result
            )
            outer_chunks = remove_inner_chunks(all_chunks)
            unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
            unknown_chunks = process_patterns(unknown_chunks, file)
            assign_file_to_chunks(outer_chunks, file=file)
            assign_file_to_chunks(unknown_chunks, file=file)

            if outer_chunks or unknown_chunks:
                self._process_chunks(file, outer_chunks, unknown_chunks)
            else:
                # we don't consider whole files as unknown chunks, but we still want to
                # calculate randomness for whole files which produced no valid chunks
                randomness = self._calculate_randomness(self.task.path)
                if randomness:
                    self.result.add_report(randomness)

    def _process_chunks(
        self,
        file: File,
        outer_chunks: list[ValidChunk],
        unknown_chunks: list[Union[UnknownChunk, PaddingChunk]],
    ):
        if unknown_chunks:
            logger.warning("Found unknown Chunks", chunks=unknown_chunks)

        if self.config.skip_extraction:
            for chunk in unknown_chunks:
                self.result.add_report(chunk.as_report(randomness=None))
            for chunk in outer_chunks:
                self.result.add_report(chunk.as_report(extraction_reports=[]))
            return

        is_whole_file_chunk = len(outer_chunks) + len(unknown_chunks) == 1
        if is_whole_file_chunk:
            # skip carving, extract directly the whole file (chunk)
            carved_path = self.task.path
            for chunk in outer_chunks:
                self._extract_chunk(
                    carved_path,
                    chunk,
                    self.config.get_extract_dir_for(carved_path),
                    # since we do not carve, we want to keep the input around
                    remove_extracted_input=False,
                )
        else:
            self._carve_then_extract_chunks(file, outer_chunks, unknown_chunks)

    def _carve_then_extract_chunks(self, file, outer_chunks, unknown_chunks):
        assert not self.config.skip_extraction

        carve_dir = self.config.get_carve_dir_for(self.task.path)

        # report the technical carve directory explicitly
        self.result.add_report(CarveDirectoryReport(carve_dir=carve_dir))

        if carve_dir.exists():
            # Carve directory is not supposed to exist, it is usually a simple mistake of running
            # unblob again without cleaning up or using --force.
            # It would cause problems continuing, as it would mix up original and extracted files,
            # and it would just introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error("Skipped: carve directory exists", carve_dir=carve_dir)
            self.result.add_report(OutputDirectoryExistsReport(path=carve_dir))
            return

        for chunk in unknown_chunks:
            carved_unknown_path = carve_unknown_chunk(carve_dir, file, chunk)
            randomness = self._calculate_randomness(carved_unknown_path)
            self.result.add_report(chunk.as_report(randomness=randomness))

        for chunk in outer_chunks:
            carved_path = carve_valid_chunk(carve_dir, file, chunk)

            self._extract_chunk(
                carved_path,
                chunk,
                self.config.get_extract_dir_for(carved_path),
                # when a carved chunk is successfully extracted, usually
                # we want to get rid of it, as its data is available in
                # extracted format, and the raw data is still part of
                # the file the chunk belongs to
                remove_extracted_input=not self.config.keep_extracted_chunks,
            )

    def _calculate_randomness(self, path: Path) -> Optional[RandomnessReport]:
        if self.task.depth < self.config.randomness_depth:
            report = calculate_randomness(path)
            if self.config.randomness_plot:
                logger.debug(
                    "Randomness chart",
                    # New line so that chart title will be aligned correctly in the next line
                    chart="\n" + format_randomness_plot(report),
                    path=path,
                    _verbosity=3,
                )
            return report
        return None

    def _extract_chunk(
        self,
        carved_path: Path,
        chunk: ValidChunk,
        extract_dir: Path,
        *,
        remove_extracted_input: bool,
    ):
        if extract_dir.exists():
            # Extraction directory is not supposed to exist, it mixes up original and extracted files,
            # and it would just introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error(
                "Skipped: extraction directory exists",
                extract_dir=extract_dir,
                chunk=chunk,
            )
            self.result.add_report(
                chunk.as_report([OutputDirectoryExistsReport(path=extract_dir)])
            )
            return

        if self.config.skip_extraction:
            fix_extracted_directory(extract_dir, self.result)
            return

        extraction_reports = []
        try:
            if result := chunk.extract(carved_path, extract_dir):
                extraction_reports.extend(result.reports)

            if remove_extracted_input:
                logger.debug("Removing extracted chunk", path=carved_path)
                carved_path.unlink()

        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting chunk")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(chunk.as_report(extraction_reports))

        # we want to get consistent partial output even in case of unforeseen problems
        fix_extracted_directory(extract_dir, self.result)
        delete_empty_extract_dir(extract_dir)

        if extract_dir.exists():
            self.result.add_subtask(
                Task(
                    blob_id=chunk.id,
                    path=extract_dir,
                    depth=self.task.depth + 1,
                )
            )


def assign_file_to_chunks(chunks: Sequence[Chunk], file: File):
    for chunk in chunks:
        assert chunk.file is None
        chunk.file = file


def delete_empty_extract_dir(extract_dir: Path):
    if extract_dir.exists() and not any(extract_dir.iterdir()):
        extract_dir.rmdir()


def remove_inner_chunks(chunks: list[ValidChunk]) -> list[ValidChunk]:
    """Remove all chunks from the list which are within another, bigger chunk."""
    if not chunks:
        return []

    chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True)
    outer_chunks = [chunks_by_size[0]]
    for chunk in chunks_by_size[1:]:
        if not any(outer.contains(chunk) for outer in outer_chunks):
            outer_chunks.append(chunk)

    outer_count = len(outer_chunks)
    removed_count = len(chunks) - outer_count
    logger.debug(
        "Removed inner chunks",
        outer_chunk_count=noformat(outer_count),
        removed_inner_chunk_count=noformat(removed_count),
        _verbosity=2,
    )
    return outer_chunks
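
# Worked example (hypothetical offsets): for chunks covering [0, 100), [10, 20)
# and [150, 200), the largest chunk [0, 100) is kept, [150, 200) lies outside
# it and is also kept, while [10, 20) is contained in [0, 100) and is dropped
# as an inner chunk.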


def calculate_unknown_chunks(
    chunks: list[ValidChunk], file_size: int
) -> list[UnknownChunk]:
    """Calculate the empty gaps between chunks."""
    if not chunks or file_size == 0:
        return []

    sorted_by_offset = sorted(chunks, key=attrgetter("start_offset"))

    unknown_chunks = []

    first = sorted_by_offset[0]
    if first.start_offset != 0:
        unknown_chunk = UnknownChunk(start_offset=0, end_offset=first.start_offset)
        unknown_chunks.append(unknown_chunk)

    for chunk, next_chunk in pairwise(sorted_by_offset):
        diff = next_chunk.start_offset - chunk.end_offset
        if diff != 0:
            unknown_chunk = UnknownChunk(
                start_offset=chunk.end_offset,
                end_offset=next_chunk.start_offset,
            )
            unknown_chunks.append(unknown_chunk)

    last = sorted_by_offset[-1]
    if last.end_offset < file_size:
        unknown_chunk = UnknownChunk(
            start_offset=last.end_offset,
            end_offset=file_size,
        )
        unknown_chunks.append(unknown_chunk)

    return unknown_chunks
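
# Worked example (hypothetical offsets): with file_size=100 and valid chunks
# covering [10, 30) and [40, 80), the gaps [0, 10), [30, 40) and [80, 100) are
# returned as UnknownChunks.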


def calculate_randomness(path: Path) -> RandomnessReport:
    """Calculate and log Shannon entropy divided by 8 for the file in chunks.

    Shannon entropy returns the amount of information (in bits) of some numeric
    sequence. We calculate the average entropy of byte chunks, which in theory
    can contain 0-8 bits of entropy. We normalize it for visualization to a
    0-100% scale, to make it easier to interpret the graph.

    The chi square distribution is calculated for the stream of bytes in the
    chunk and expressed as an absolute number and a percentage which indicates
    how frequently a truly random sequence would exceed the value calculated.
    """
    shannon_percentages = []
    chi_square_percentages = []

    # We could use the chunk size instead of another syscall,
    # but we rely on the actual file size written to the disk
    file_size = path.stat().st_size
    logger.debug("Calculating entropy for file", path=path, size=file_size)

    # A smaller chunk size would be very slow to calculate.
    # A 1 MB chunk size takes ~3 sec for a 4.5 GB file.
    block_size = calculate_block_size(
        file_size,
        chunk_count=80,
        min_limit=1024,
        max_limit=1024 * 1024,
    )

    shannon_entropy_sum = 0.0
    chisquare_probability_sum = 0.0
    with File.from_path(path) as file:
        for chunk in iterate_file(file, 0, file_size, buffer_size=block_size):
            shannon_entropy = mt.shannon_entropy(chunk)
            shannon_entropy_percentage = round(shannon_entropy / 8 * 100, 2)
            shannon_percentages.append(shannon_entropy_percentage)
            shannon_entropy_sum += shannon_entropy * len(chunk)

            chi_square_probability = mt.chi_square_probability(chunk)
            chisquare_probability_percentage = round(chi_square_probability * 100, 2)
            chi_square_percentages.append(chisquare_probability_percentage)
            chisquare_probability_sum += chi_square_probability * len(chunk)

    report = RandomnessReport(
        shannon=RandomnessMeasurements(
            percentages=shannon_percentages,
            block_size=block_size,
            mean=shannon_entropy_sum / file_size / 8 * 100,
        ),
        chi_square=RandomnessMeasurements(
            percentages=chi_square_percentages,
            block_size=block_size,
            mean=chisquare_probability_sum / file_size * 100,
        ),
    )

    logger.debug(
        "Shannon entropy calculated",
        path=path,
        size=file_size,
        block_size=report.shannon.block_size,
        mean=round(report.shannon.mean, 2),
        highest=round(report.shannon.highest, 2),
        lowest=round(report.shannon.lowest, 2),
    )
    logger.debug(
        "Chi square probability calculated",
        path=path,
        size=file_size,
        block_size=report.chi_square.block_size,
        mean=round(report.chi_square.mean, 2),
        highest=round(report.chi_square.highest, 2),
        lowest=round(report.chi_square.lowest, 2),
    )

    return report
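
# Scale of the reported values: uniformly random bytes carry close to 8 bits of
# Shannon entropy per byte and show up as ~100%, while a block of one repeated
# byte value carries 0 bits and shows up as 0%. Both means above are weighted
# by block length, so a short trailing block does not skew the result.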


def calculate_block_size(
    file_size, *, chunk_count: int, min_limit: int, max_limit: int
) -> int:
    """Split the file into evenly sized chunks, limited by lower and upper bounds."""
    # We don't care about floating point precision here
    block_size = file_size // chunk_count
    block_size = max(min_limit, block_size)
    block_size = min(block_size, max_limit)
    return block_size  # noqa: RET504
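
# For example, with the limits used by calculate_randomness (pure arithmetic):
#
#   >>> calculate_block_size(4_500_000_000, chunk_count=80, min_limit=1024, max_limit=1024 * 1024)
#   1048576
#   >>> calculate_block_size(10_000, chunk_count=80, min_limit=1024, max_limit=1024 * 1024)
#   1024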


def format_randomness_plot(report: RandomnessReport):
    # start from scratch
    plt.clear_figure()
    # go colorless
    plt.clear_color()
    plt.title("Entropy distribution")
    plt.xlabel(f"{report.shannon.block_size} bytes")

    plt.plot(report.shannon.percentages, label="Shannon entropy (%)", marker="dot")
    plt.plot(
        report.chi_square.percentages,
        label="Chi square probability (%)",
        marker="cross",
    )
    # 16 height leaves no gaps between the lines
    plt.plot_size(100, 16)
    plt.ylim(0, 100)
    # Draw ticks every 1Mb on the x axis.
    plt.xticks(range(len(report.shannon.percentages) + 1))
    # Always show 0% and 100%
    plt.yticks(range(0, 101, 10))

    return plt.build()