import multiprocessing
import shutil
from collections.abc import Iterable, Sequence
from operator import attrgetter
from pathlib import Path
from typing import Optional, Union

import attrs
import magic
import plotext as plt
from structlog import get_logger

from unblob import math_tools as mt
from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers

from .extractor import carve_unknown_chunk, carve_valid_chunk, fix_extracted_directory
from .file_utils import InvalidInputFormat, iterate_file
from .finder import search_chunks
from .iter_utils import pairwise
from .logging import noformat
from .models import (
    Chunk,
    DirectoryHandler,
    DirectoryHandlers,
    ExtractError,
    File,
    MultiFile,
    PaddingChunk,
    ProcessResult,
    Task,
    TaskResult,
    UnknownChunk,
    ValidChunk,
)
from .pool import make_pool
from .report import (
    CalculateMultiFileExceptionReport,
    CarveDirectoryReport,
    FileMagicReport,
    HashReport,
    MultiFileCollisionReport,
    OutputDirectoryExistsReport,
    RandomnessMeasurements,
    RandomnessReport,
    Report,
    StatReport,
    UnknownError,
)
from .ui import NullProgressReporter, ProgressReporter

logger = get_logger()

DEFAULT_DEPTH = 10
DEFAULT_PROCESS_NUM = multiprocessing.cpu_count()
DEFAULT_SKIP_MAGIC = (
    "BFLT",
    "Composite Document File V2 Document",
    "Erlang BEAM file",
    "GIF",
    "GNU message catalog",
    "HP Printer Job Language",
    "JPEG",
    "Java module image",
    "MPEG",
    "MS Windows icon resource",
    "Macromedia Flash data",
    "Microsoft Excel",
    "Microsoft PowerPoint",
    "Microsoft Word",
    "OpenDocument",
    "PDF document",
    "PNG",
    "SQLite",
    "TrueType Font data",
    "Web Open Font Format",
    "Windows Embedded CE binary image",
    "Xilinx BIT data",
    "compiled Java class",
    "magic binary file",
80 "python", # # (e.g. python 2.7 byte-compiled)
)
DEFAULT_SKIP_EXTENSION = (".rlib",)


@attrs.define(kw_only=True)
class ExtractionConfig:
    extract_root: Path = attrs.field(converter=lambda value: value.resolve())
    force_extract: bool = False
    randomness_depth: int
    randomness_plot: bool = False
    max_depth: int = DEFAULT_DEPTH
    skip_magic: Iterable[str] = DEFAULT_SKIP_MAGIC
    skip_extension: Iterable[str] = DEFAULT_SKIP_EXTENSION
    skip_extraction: bool = False
    process_num: int = DEFAULT_PROCESS_NUM
    keep_extracted_chunks: bool = False
    extract_suffix: str = "_extract"
    carve_suffix: str = "_extract"
    handlers: Handlers = BUILTIN_HANDLERS
    dir_handlers: DirectoryHandlers = BUILTIN_DIR_HANDLERS
    verbose: int = 1
    progress_reporter: type[ProgressReporter] = NullProgressReporter

    def _get_output_path(self, path: Path) -> Path:
        """Return path under extract root."""
        try:
            relative_path = path.relative_to(self.extract_root)
        except ValueError:
            # path is not inside root, i.e. it is an input file
            relative_path = Path(path.name)
        return (self.extract_root / relative_path).expanduser().resolve()
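
    # For example (illustrative paths): with extract_root=/out, _get_output_path() maps
    # an outside path such as /data/firmware.bin_extract to /out/firmware.bin_extract,
    # while a path already under the root, e.g. /out/firmware.bin_extract/fs, is kept as-is.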

    def get_extract_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.extract_suffix))

    def get_carve_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.carve_suffix))


def process_file(
    config: ExtractionConfig, input_path: Path, report_file: Optional[Path] = None
) -> ProcessResult:
    task = Task(
        blob_id="",
        path=input_path,
        depth=0,
    )

    if not input_path.is_file():
        raise ValueError("input_path is not a file", input_path)

    extract_dir = config.get_extract_dir_for(input_path)
    if config.force_extract and extract_dir.exists():
        logger.info("Removing extract dir", path=extract_dir)
        shutil.rmtree(extract_dir)

    carve_dir = config.get_carve_dir_for(input_path)
    if config.force_extract and carve_dir.exists():
        logger.info("Removing carve dir", path=carve_dir)
        shutil.rmtree(carve_dir)

    if not prepare_report_file(config, report_file):
        logger.error(
            "File not processed, as report could not be written", file=input_path
        )
        return ProcessResult()

    process_result = _process_task(config, task)

    if report_file:
        write_json_report(report_file, process_result)

    return process_result


def _process_task(config: ExtractionConfig, task: Task) -> ProcessResult:
    processor = Processor(config)
    aggregated_result = ProcessResult()

    progress_reporter = config.progress_reporter()

    def process_result(pool, result):
        progress_reporter.update(result)

        for new_task in result.subtasks:
            pool.submit(new_task)
        aggregated_result.register(result)

    pool = make_pool(
        process_num=config.process_num,
        handler=processor.process_task,
        result_callback=process_result,
    )

    with pool, progress_reporter:
        pool.submit(task)
        pool.process_until_done()

    return aggregated_result


def prepare_report_file(config: ExtractionConfig, report_file: Optional[Path]) -> bool:
    """Prevent report writing from failing after an expensive extraction.

    Should be called before processing tasks.

    Returns True if there is no foreseen problem,
    False if report writing is known in advance to fail.
    """
    if not report_file:
        # we will not write a report at all
        return True

    if report_file.exists():
        if config.force_extract:
            logger.warning("Overwriting existing report file", path=report_file)
            try:
                report_file.write_text("")
            except OSError as e:
                logger.error(
                    "Can not overwrite existing report file",
                    path=report_file,
                    msg=str(e),
                )
                return False
        else:
            logger.error(
                "Report file exists and --force not specified", path=report_file
            )
            return False
    if not report_file.parent.exists():
        logger.error(
            "Trying to write report file to a non-existent directory", path=report_file
        )
        return False
    return True


def write_json_report(report_file: Path, process_result: ProcessResult):
    try:
        report_file.write_text(process_result.to_json())
    except OSError as e:
        logger.error("Can not write JSON report", path=report_file, msg=str(e))
    except Exception:
        logger.exception("Can not write JSON report", path=report_file)
    else:
        logger.info("JSON report written", path=report_file)


class Processor:
    def __init__(self, config: ExtractionConfig):
        self._config = config
        # libmagic helpers
        # File magic uses a rule-set to guess the file type, but as rules are added they can
        # shadow each other. File magic uses rule priorities to pick the best matching rule,
        # which can hide other valid matches and eventually break further processing that
        # depends on magic.
        # By enabling keep_going (which eventually enables MAGIC_CONTINUE), all matching
        # patterns are included in the magic string at the cost of being a bit slower, but
        # with better accuracy, since no rule shadows another.
        self._get_magic = magic.Magic(keep_going=True).from_file
        self._get_mime_type = magic.Magic(mime=True).from_file

    def process_task(self, task: Task) -> TaskResult:
        result = TaskResult(task)
        try:
            self._process_task(result, task)
        except Exception as exc:
            self._process_error(result, exc)
        return result

    def _process_error(self, result: TaskResult, exc: Exception):
        error_report = UnknownError(exception=exc)
        result.add_report(error_report)
        logger.exception("Unknown error happened", exc_info=exc)

    def _process_task(self, result: TaskResult, task: Task):
        stat_report = StatReport.from_path(task.path)
        result.add_report(stat_report)
        log = logger.bind(path=task.path)

        if task.depth >= self._config.max_depth:
            # TODO: Use the reporting feature to warn the user (ONLY ONCE) at the end of execution, that this limit was reached.
            log.debug(
                "Reached maximum depth, stop further processing", depth=task.depth
            )
            return

        if stat_report.is_dir:
            if not task.is_multi_file:
                _DirectoryTask(self._config, task, result).process()
            return

        if not stat_report.is_file:
            log.debug(
                "Ignoring special file (link, chrdev, blkdev, fifo, socket, door)."
            )
            return

        magic = self._get_magic(task.path)
        mime_type = self._get_mime_type(task.path)
        logger.debug("Detected file-magic", magic=magic, path=task.path, _verbosity=2)

        magic_report = FileMagicReport(magic=magic, mime_type=mime_type)
        result.add_report(magic_report)

        hash_report = HashReport.from_path(task.path)
        result.add_report(hash_report)

        if task.is_multi_file:
            # The file has been processed as part of a MultiFile, we just run the task to gather reports
            return

        if stat_report.size == 0:
            log.debug("Ignoring empty file")
            return

        should_skip_file = any(
            magic.startswith(pattern) for pattern in self._config.skip_magic
        )
        should_skip_file |= task.path.suffix in self._config.skip_extension

        if should_skip_file:
            log.debug(
                "Ignoring file based on magic or extension.",
                magic=magic,
                extension=task.path.suffix,
            )
            return

        _FileTask(self._config, task, stat_report.size, result).process()


class DirectoryProcessingError(Exception):
    def __init__(self, message: str, report: Report):
        super().__init__()
        self.message = message
        self.report: Report = report


class _DirectoryTask:
    def __init__(self, config: ExtractionConfig, dir_task: Task, result: TaskResult):
        self.config = config
        self.dir_task = dir_task
        self.result = result

    def process(self):
        logger.debug("Processing directory", path=self.dir_task.path)

        try:
            processed_paths, extract_dirs = self._process_directory()
        except DirectoryProcessingError as e:
            logger.error(e.message, report=e.report)
            self.result.add_report(e.report)
            return

        self._iterate_directory(extract_dirs, processed_paths)

        self._iterate_processed_files(processed_paths)

    def _process_directory(self) -> tuple[set[Path], set[Path]]:
        processed_paths: set[Path] = set()
        extract_dirs: set[Path] = set()
        for dir_handler_class in self.config.dir_handlers:
            dir_handler = dir_handler_class()

            for path in dir_handler.PATTERN.get_files(self.dir_task.path):
                multi_file = self._calculate_multifile(dir_handler, path, self.result)

                if multi_file is None:
                    continue

                multi_file.handler = dir_handler

                self._check_conflicting_files(multi_file, processed_paths)

                extract_dir = self._extract_multi_file(multi_file)

                # Process files in extracted directory
                if extract_dir.exists():
                    self.result.add_subtask(
                        Task(
                            blob_id=multi_file.id,
                            path=extract_dir,
                            depth=self.dir_task.depth + 1,
                        )
                    )
                    extract_dirs.add(extract_dir)

                processed_paths.update(multi_file.paths)
        return processed_paths, extract_dirs

    @staticmethod
    def _calculate_multifile(
        dir_handler: DirectoryHandler, path: Path, task_result: TaskResult
    ) -> Optional[MultiFile]:
        try:
            return dir_handler.calculate_multifile(path)
        except InvalidInputFormat as exc:
            logger.debug(
                "Invalid MultiFile format",
                exc_info=exc,
                handler=dir_handler.NAME,
                path=path,
                _verbosity=2,
            )
        except Exception as exc:
            error_report = CalculateMultiFileExceptionReport(
                handler=dir_handler.NAME,
                exception=exc,
                path=path,
            )
            task_result.add_report(error_report)
            logger.warning(
                "Unhandled Exception during multi file calculation",
                **error_report.asdict(),
            )

    def _check_conflicting_files(
        self, multi_file: MultiFile, processed_paths: set[Path]
    ):
        conflicting_paths = processed_paths.intersection(set(multi_file.paths))
        if conflicting_paths:
            raise DirectoryProcessingError(
                "Conflicting match on files",
                report=MultiFileCollisionReport(
                    paths=conflicting_paths, handler=multi_file.handler.NAME
                ),
            )

    def _extract_multi_file(self, multi_file: MultiFile) -> Path:
        extract_dir = self.config.get_extract_dir_for(
            self.dir_task.path / multi_file.name
        )
        if extract_dir.exists():
            raise DirectoryProcessingError(
                "Skipped: extraction directory exists",
                report=multi_file.as_report(
                    [OutputDirectoryExistsReport(path=extract_dir)]
                ),
            )

        extraction_reports = []
        try:
            if result := multi_file.extract(extract_dir):
                extraction_reports.extend(result.reports)
        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting MultiFile")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(multi_file.as_report(extraction_reports))

        fix_extracted_directory(extract_dir, self.result)

        return extract_dir

    def _iterate_processed_files(self, processed_paths):
        for path in processed_paths:
            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                    is_multi_file=True,
                )
            )

    def _iterate_directory(self, extract_dirs, processed_paths):
        for path in self.dir_task.path.iterdir():
            if path in extract_dirs or path in processed_paths:
                continue

            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                )
            )


def is_padding(file: File, chunk: UnknownChunk):
    chunk_bytes = set()

    for small_chunk in iterate_file(
        file, chunk.start_offset, chunk.end_offset - chunk.start_offset
    ):
        chunk_bytes.update(small_chunk)

        # early return optimization
        if len(chunk_bytes) > 1:
            return False

    return len(chunk_bytes) == 1
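
# For example (illustrative): a gap of b"\xff\xff\xff\xff" collapses to the single
# byte value {0xff} and is treated as padding, while b"\x00\xff" contains two
# distinct values and is kept as a regular unknown chunk.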


def process_patterns(
    unknown_chunks: list[UnknownChunk], file: File
) -> list[Union[UnknownChunk, PaddingChunk]]:
    processed_chunks = []
    for unknown_chunk in unknown_chunks:
        if is_padding(file, unknown_chunk):
            processed_chunks.append(
                PaddingChunk(
                    start_offset=unknown_chunk.start_offset,
                    end_offset=unknown_chunk.end_offset,
                    id=unknown_chunk.id,
                    file=unknown_chunk.file,
                )
            )
        else:
            processed_chunks.append(unknown_chunk)
    return processed_chunks


class _FileTask:
    def __init__(
        self,
        config: ExtractionConfig,
        task: Task,
        size: int,
        result: TaskResult,
    ):
        self.config = config
        self.task = task
        self.size = size
        self.result = result

    def process(self):
        logger.debug("Processing file", path=self.task.path, size=self.size)

        with File.from_path(self.task.path) as file:
            all_chunks = search_chunks(
                file, self.size, self.config.handlers, self.result
            )
            outer_chunks = remove_inner_chunks(all_chunks)
            unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
            unknown_chunks = process_patterns(unknown_chunks, file)
            assign_file_to_chunks(outer_chunks, file=file)
            assign_file_to_chunks(unknown_chunks, file=file)

            if outer_chunks or unknown_chunks:
                self._process_chunks(file, outer_chunks, unknown_chunks)
            else:
                # we don't consider whole files as unknown chunks, but we still want to
                # calculate randomness for whole files which produced no valid chunks
                randomness = self._calculate_randomness(self.task.path)
                if randomness:
                    self.result.add_report(randomness)

    def _process_chunks(
        self,
        file: File,
        outer_chunks: list[ValidChunk],
        unknown_chunks: list[Union[UnknownChunk, PaddingChunk]],
    ):
        if unknown_chunks:
            logger.warning("Found unknown Chunks", chunks=unknown_chunks)

        if self.config.skip_extraction:
            for chunk in unknown_chunks:
                self.result.add_report(chunk.as_report(randomness=None))
            for chunk in outer_chunks:
                self.result.add_report(chunk.as_report(extraction_reports=[]))
            return

        is_whole_file_chunk = len(outer_chunks) + len(unknown_chunks) == 1
        if is_whole_file_chunk:
            # skip carving, extract directly the whole file (chunk)
            carved_path = self.task.path
            for chunk in outer_chunks:
                self._extract_chunk(
                    carved_path,
                    chunk,
                    self.config.get_extract_dir_for(carved_path),
                    # since we do not carve, we want to keep the input around
                    remove_extracted_input=False,
                )
        else:
            self._carve_then_extract_chunks(file, outer_chunks, unknown_chunks)

    def _carve_then_extract_chunks(self, file, outer_chunks, unknown_chunks):
        assert not self.config.skip_extraction

        carve_dir = self.config.get_carve_dir_for(self.task.path)

        # report the technical carve directory explicitly
        self.result.add_report(CarveDirectoryReport(carve_dir=carve_dir))

        if carve_dir.exists():
            # Carve directory is not supposed to exist, it is usually a simple mistake of running
            # unblob again without cleaning up or using --force.
            # It would cause problems continuing, as it would mix up original and extracted files,
            # and it would just introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error("Skipped: carve directory exists", carve_dir=carve_dir)
            self.result.add_report(OutputDirectoryExistsReport(path=carve_dir))
            return

        for chunk in unknown_chunks:
            carved_unknown_path = carve_unknown_chunk(carve_dir, file, chunk)
            randomness = self._calculate_randomness(carved_unknown_path)
            self.result.add_report(chunk.as_report(randomness=randomness))

        for chunk in outer_chunks:
            carved_path = carve_valid_chunk(carve_dir, file, chunk)

            self._extract_chunk(
                carved_path,
                chunk,
                self.config.get_extract_dir_for(carved_path),
                # when a carved chunk is successfully extracted, usually
                # we want to get rid of it, as its data is available in
                # extracted format, and the raw data is still part of
                # the file the chunk belongs to
                remove_extracted_input=not self.config.keep_extracted_chunks,
            )

    def _calculate_randomness(self, path: Path) -> Optional[RandomnessReport]:
        if self.task.depth < self.config.randomness_depth:
            report = calculate_randomness(path)
            if self.config.randomness_plot:
                logger.debug(
                    "Randomness chart",
                    # New line so that chart title will be aligned correctly in the next line
                    chart="\n" + format_randomness_plot(report),
                    path=path,
                    _verbosity=3,
                )
            return report
        return None

    def _extract_chunk(
        self,
        carved_path: Path,
        chunk: ValidChunk,
        extract_dir: Path,
        *,
        remove_extracted_input: bool,
    ):
        if extract_dir.exists():
            # Extraction directory is not supposed to exist, it mixes up original and extracted files,
            # and it would just introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error(
                "Skipped: extraction directory exists",
                extract_dir=extract_dir,
                chunk=chunk,
            )
            self.result.add_report(
                chunk.as_report([OutputDirectoryExistsReport(path=extract_dir)])
            )
            return

        if self.config.skip_extraction:
            fix_extracted_directory(extract_dir, self.result)
            return

        extraction_reports = []
        try:
            if result := chunk.extract(carved_path, extract_dir):
                extraction_reports.extend(result.reports)

            if remove_extracted_input:
                logger.debug("Removing extracted chunk", path=carved_path)
                carved_path.unlink()

        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting chunk")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(chunk.as_report(extraction_reports))

        # we want to get consistent partial output even in case of unforeseen problems
        fix_extracted_directory(extract_dir, self.result)
        delete_empty_extract_dir(extract_dir)

        if extract_dir.exists():
            self.result.add_subtask(
                Task(
                    blob_id=chunk.id,
                    path=extract_dir,
                    depth=self.task.depth + 1,
                )
            )


def assign_file_to_chunks(chunks: Sequence[Chunk], file: File):
    for chunk in chunks:
        assert chunk.file is None
        chunk.file = file


def delete_empty_extract_dir(extract_dir: Path):
    if extract_dir.exists() and not any(extract_dir.iterdir()):
        extract_dir.rmdir()


def remove_inner_chunks(chunks: list[ValidChunk]) -> list[ValidChunk]:
    """Remove all chunks from the list which are within another, bigger chunk."""
    if not chunks:
        return []

    chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True)
    outer_chunks = [chunks_by_size[0]]
    for chunk in chunks_by_size[1:]:
        if not any(outer.contains(chunk) for outer in outer_chunks):
            outer_chunks.append(chunk)

    outer_count = len(outer_chunks)
    removed_count = len(chunks) - outer_count
    logger.debug(
        "Removed inner chunks",
        outer_chunk_count=noformat(outer_count),
        removed_inner_chunk_count=noformat(removed_count),
        _verbosity=2,
    )
    return outer_chunks
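
# For example (illustrative offsets): given chunks covering [0, 100), [10, 20) and
# [150, 200), the inner chunk [10, 20) is contained by [0, 100) and is dropped,
# leaving the two outer chunks [0, 100) and [150, 200).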


def calculate_unknown_chunks(
    chunks: list[ValidChunk], file_size: int
) -> list[UnknownChunk]:
    """Calculate the empty gaps between chunks."""
    if not chunks or file_size == 0:
        return []

    sorted_by_offset = sorted(chunks, key=attrgetter("start_offset"))

    unknown_chunks = []

    first = sorted_by_offset[0]
    if first.start_offset != 0:
        unknown_chunk = UnknownChunk(start_offset=0, end_offset=first.start_offset)
        unknown_chunks.append(unknown_chunk)

    for chunk, next_chunk in pairwise(sorted_by_offset):
        diff = next_chunk.start_offset - chunk.end_offset
        if diff != 0:
            unknown_chunk = UnknownChunk(
                start_offset=chunk.end_offset,
                end_offset=next_chunk.start_offset,
            )
            unknown_chunks.append(unknown_chunk)

    last = sorted_by_offset[-1]
    if last.end_offset < file_size:
        unknown_chunk = UnknownChunk(
            start_offset=last.end_offset,
            end_offset=file_size,
        )
        unknown_chunks.append(unknown_chunk)

    return unknown_chunks
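
# For example (illustrative offsets): for a 50-byte file with valid chunks covering
# [10, 20) and [30, 40), the returned unknown chunks cover the gaps [0, 10),
# [20, 30) and [40, 50).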


def calculate_randomness(path: Path) -> RandomnessReport:
    """Calculate and log Shannon entropy divided by 8 for the file in chunks.

    Shannon entropy measures the amount of information (in bits) in a numeric
    sequence. We calculate the average entropy of byte chunks, which in theory
    can contain 0-8 bits of entropy. We normalize it for visualization to a
    0-100% scale, to make it easier to interpret the graph.

    The chi square distribution is calculated for the stream of bytes in the
    chunk and expressed as an absolute number and a percentage which indicates
    how frequently a truly random sequence would exceed the value calculated.
    """
    shannon_percentages = []
    chi_square_percentages = []

    # We could use the chunk size instead of another syscall,
    # but we rely on the actual file size written to the disk
    file_size = path.stat().st_size
    logger.debug("Calculating entropy for file", path=path, size=file_size)

    # Smaller chunk sizes would be very slow to calculate.
    # A 1 MB chunk size takes ~3 sec for a 4.5 GB file.
    block_size = calculate_block_size(
        file_size,
        chunk_count=80,
        min_limit=1024,
        max_limit=1024 * 1024,
    )

    shannon_entropy_sum = 0.0
    chisquare_probability_sum = 0.0
    with File.from_path(path) as file:
        for chunk in iterate_file(file, 0, file_size, buffer_size=block_size):
            shannon_entropy = mt.shannon_entropy(chunk)
            shannon_entropy_percentage = round(shannon_entropy / 8 * 100, 2)
            shannon_percentages.append(shannon_entropy_percentage)
            shannon_entropy_sum += shannon_entropy * len(chunk)

            chi_square_probability = mt.chi_square_probability(chunk)
            chisquare_probability_percentage = round(chi_square_probability * 100, 2)
            chi_square_percentages.append(chisquare_probability_percentage)
            chisquare_probability_sum += chi_square_probability * len(chunk)

    report = RandomnessReport(
        shannon=RandomnessMeasurements(
            percentages=shannon_percentages,
            block_size=block_size,
            mean=shannon_entropy_sum / file_size / 8 * 100,
        ),
        chi_square=RandomnessMeasurements(
            percentages=chi_square_percentages,
            block_size=block_size,
            mean=chisquare_probability_sum / file_size * 100,
        ),
    )

    logger.debug(
        "Shannon entropy calculated",
        path=path,
        size=file_size,
        block_size=report.shannon.block_size,
        mean=round(report.shannon.mean, 2),
        highest=round(report.shannon.highest, 2),
        lowest=round(report.shannon.lowest, 2),
    )
    logger.debug(
        "Chi square probability calculated",
        path=path,
        size=file_size,
        block_size=report.chi_square.block_size,
        mean=round(report.chi_square.mean, 2),
        highest=round(report.chi_square.highest, 2),
        lowest=round(report.chi_square.lowest, 2),
    )

    return report


def calculate_block_size(
    file_size, *, chunk_count: int, min_limit: int, max_limit: int
) -> int:
    """Split the file into evenly sized chunks, limited by lower and upper values."""
    # We don't care about floating point precision here
    block_size = file_size // chunk_count
    block_size = max(min_limit, block_size)
    block_size = min(block_size, max_limit)
    return block_size  # noqa: RET504
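
# Worked example (illustrative sizes): with the chunk_count=80, min_limit=1024 and
# max_limit=1024 * 1024 used by calculate_randomness() above, a 4.5 GB file gives
# 4_500_000_000 // 80 ≈ 56 MB, clamped down to 1 MiB, while a 10 KiB file gives
# 10_240 // 80 == 128, clamped up to 1024 bytes.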


def format_randomness_plot(report: RandomnessReport):
    # start from scratch
    plt.clear_figure()
    # go colorless
    plt.clear_color()
    plt.title("Entropy distribution")
    plt.xlabel(f"{report.shannon.block_size} bytes")

    plt.plot(report.shannon.percentages, label="Shannon entropy (%)", marker="dot")
    plt.plot(
        report.chi_square.percentages,
        label="Chi square probability (%)",
        marker="cross",
    )
    # 16 height leaves no gaps between the lines
    plt.plot_size(100, 16)
    plt.ylim(0, 100)
    # Draw ticks every ~1 MB on the x axis.
    plt.xticks(range(len(report.shannon.percentages) + 1))
    # Always show 0% and 100%
    plt.yticks(range(0, 101, 10))

    return plt.build()