Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/processing.py: 21%
import multiprocessing
import shutil
from collections.abc import Iterable, Sequence
from operator import attrgetter
from pathlib import Path
from typing import Optional, Union

import attrs
import magic
import plotext as plt
from structlog import get_logger

from unblob import math_tools as mt
from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers

from .extractor import carve_unknown_chunk, carve_valid_chunk, fix_extracted_directory
from .file_utils import InvalidInputFormat, iterate_file
from .finder import search_chunks
from .iter_utils import pairwise
from .logging import noformat
from .models import (
    Chunk,
    DirectoryHandler,
    DirectoryHandlers,
    ExtractError,
    File,
    MultiFile,
    PaddingChunk,
    ProcessResult,
    Task,
    TaskResult,
    UnknownChunk,
    ValidChunk,
)
from .pool import make_pool
from .report import (
    CalculateMultiFileExceptionReport,
    CarveDirectoryReport,
    FileMagicReport,
    HashReport,
    MultiFileCollisionReport,
    OutputDirectoryExistsReport,
    RandomnessMeasurements,
    RandomnessReport,
    Report,
    StatReport,
    UnknownError,
)
from .ui import NullProgressReporter, ProgressReporter

logger = get_logger()

DEFAULT_DEPTH = 10
DEFAULT_PROCESS_NUM = multiprocessing.cpu_count()
DEFAULT_SKIP_MAGIC = (
    "BFLT",
    "Erlang BEAM file",
    "GIF",
    "GNU message catalog",
    "HP Printer Job Language",
    "JPEG",
    "Java module image",
    "MPEG",
    "MS Windows icon resource",
    "Macromedia Flash data",
    "Microsoft Excel",
    "Microsoft PowerPoint",
    "Microsoft Word",
    "OpenDocument",
    "PDF document",
    "PNG",
    "SQLite",
    "TrueType Font data",
    "Web Open Font Format",
    "Windows Embedded CE binary image",
    "Xilinx BIT data",
    "compiled Java class",
    "magic binary file",
    "python",  # (e.g. python 2.7 byte-compiled)
)
DEFAULT_SKIP_EXTENSION = (".rlib",)


@attrs.define(kw_only=True)
class ExtractionConfig:
    extract_root: Path = attrs.field(converter=lambda value: value.resolve())
    force_extract: bool = False
    randomness_depth: int
    randomness_plot: bool = False
    max_depth: int = DEFAULT_DEPTH
    skip_magic: Iterable[str] = DEFAULT_SKIP_MAGIC
    skip_extension: Iterable[str] = DEFAULT_SKIP_EXTENSION
    skip_extraction: bool = False
    process_num: int = DEFAULT_PROCESS_NUM
    keep_extracted_chunks: bool = False
    extract_suffix: str = "_extract"
    carve_suffix: str = "_extract"
    handlers: Handlers = BUILTIN_HANDLERS
    dir_handlers: DirectoryHandlers = BUILTIN_DIR_HANDLERS
    verbose: int = 1
    progress_reporter: type[ProgressReporter] = NullProgressReporter

    def _get_output_path(self, path: Path) -> Path:
        """Return path under extract root."""
        try:
            relative_path = path.relative_to(self.extract_root)
        except ValueError:
            # path is not inside root, i.e. it is an input file
            relative_path = Path(path.name)
        return (self.extract_root / relative_path).expanduser().resolve()

    def get_extract_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.extract_suffix))

    def get_carve_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.carve_suffix))
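
# Illustrative usage sketch (not part of the original module): output paths are
# always mapped back under extract_root, so an input file outside the root gets
# a sibling "<name>_extract" directory inside the root, e.g.
#
#   config = ExtractionConfig(extract_root=Path("/tmp/out"), randomness_depth=0)
#   config.get_extract_dir_for(Path("/data/firmware.bin"))
#   # -> Path("/tmp/out/firmware.bin_extract")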


def process_file(
    config: ExtractionConfig, input_path: Path, report_file: Optional[Path] = None
) -> ProcessResult:
    task = Task(
        blob_id="",
        path=input_path,
        depth=0,
    )

    if not input_path.is_file():
        raise ValueError("input_path is not a file", input_path)

    extract_dir = config.get_extract_dir_for(input_path)
    if config.force_extract and extract_dir.exists():
        logger.info("Removing extract dir", path=extract_dir)
        shutil.rmtree(extract_dir)

    carve_dir = config.get_carve_dir_for(input_path)
    if config.force_extract and carve_dir.exists():
        logger.info("Removing carve dir", path=carve_dir)
        shutil.rmtree(carve_dir)

    if not prepare_report_file(config, report_file):
        logger.error(
            "File not processed, as report could not be written", file=input_path
        )
        return ProcessResult()

    process_result = _process_task(config, task)

    if report_file:
        write_json_report(report_file, process_result)

    return process_result


def _process_task(config: ExtractionConfig, task: Task) -> ProcessResult:
    processor = Processor(config)
    aggregated_result = ProcessResult()

    progress_reporter = config.progress_reporter()

    def process_result(pool, result):
        progress_reporter.update(result)

        for new_task in result.subtasks:
            pool.submit(new_task)
        aggregated_result.register(result)

    pool = make_pool(
        process_num=config.process_num,
        handler=processor.process_task,
        result_callback=process_result,
    )

    with pool, progress_reporter:
        pool.submit(task)
        pool.process_until_done()

    return aggregated_result


def prepare_report_file(config: ExtractionConfig, report_file: Optional[Path]) -> bool:
    """Prevent report writing from failing after an expensive extraction.

    Should be called before processing tasks.

    Returns True if there is no foreseen problem,
    False if report writing is known in advance to fail.
    """
    if not report_file:
        # we will not write a report at all
        return True

    if report_file.exists():
        if config.force_extract:
            logger.warning("Overwriting existing report file", path=report_file)
            try:
                report_file.write_text("")
            except OSError as e:
                logger.error(
                    "Can not overwrite existing report file",
                    path=report_file,
                    msg=str(e),
                )
                return False
        else:
            logger.error(
                "Report file exists and --force not specified", path=report_file
            )
            return False
    if not report_file.parent.exists():
        logger.error(
            "Trying to write report file to a non-existent directory", path=report_file
        )
        return False
    return True


def write_json_report(report_file: Path, process_result: ProcessResult):
    try:
        report_file.write_text(process_result.to_json())
    except OSError as e:
        logger.error("Can not write JSON report", path=report_file, msg=str(e))
    except Exception:
        logger.exception("Can not write JSON report", path=report_file)
    else:
        logger.info("JSON report written", path=report_file)


class Processor:
    def __init__(self, config: ExtractionConfig):
        self._config = config
        # libmagic helpers
        # File magic uses a rule-set to guess the file type, but as rules are added they can
        # shadow each other. File magic uses rule priorities to pick the best matching rule,
        # which can itself shadow other valid matches and eventually break any further
        # processing that depends on the magic string.
        # By enabling keep_going (which enables MAGIC_CONTINUE), all matching patterns
        # are included in the magic string. This is a bit slower, but more accurate,
        # since no rule can shadow another.
        self._get_magic = magic.Magic(keep_going=True).from_file
        self._get_mime_type = magic.Magic(mime=True).from_file
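
    # Illustrative note (not part of the original module): with keep_going=True,
    # python-magic returns every matching description (libmagic's MAGIC_CONTINUE),
    # so the skip check in _process_task below,
    #
    #   any(magic.startswith(pattern) for pattern in self._config.skip_magic)
    #
    # sees all matches; e.g. a "python 2.7 byte-compiled" file is skipped because
    # its description starts with the "python" entry of DEFAULT_SKIP_MAGIC.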

    def process_task(self, task: Task) -> TaskResult:
        result = TaskResult(task=task)
        try:
            self._process_task(result, task)
        except Exception as exc:
            self._process_error(result, exc)
        return result

    def _process_error(self, result: TaskResult, exc: Exception):
        error_report = UnknownError(exception=exc)
        result.add_report(error_report)
        logger.exception("Unknown error happened", exc_info=exc)

    def _process_task(self, result: TaskResult, task: Task):
        stat_report = StatReport.from_path(task.path)
        result.add_report(stat_report)
        log = logger.bind(path=task.path)

        if task.depth >= self._config.max_depth:
            # TODO: Use the reporting feature to warn the user (ONLY ONCE) at the end of execution, that this limit was reached.
            log.debug(
                "Reached maximum depth, stop further processing", depth=task.depth
            )
            return

        if stat_report.is_dir:
            if not task.is_multi_file:
                _DirectoryTask(self._config, task, result).process()
            return

        if not stat_report.is_file:
            log.debug(
                "Ignoring special file (link, chrdev, blkdev, fifo, socket, door)."
            )
            return

        magic = self._get_magic(task.path)
        mime_type = self._get_mime_type(task.path)
        logger.debug("Detected file-magic", magic=magic, path=task.path, _verbosity=2)

        magic_report = FileMagicReport(magic=magic, mime_type=mime_type)
        result.add_report(magic_report)

        hash_report = HashReport.from_path(task.path)
        result.add_report(hash_report)

        if task.is_multi_file:
            # The file has been processed as part of a MultiFile, we just run the task to gather reports
            return

        if stat_report.size == 0:
            log.debug("Ignoring empty file")
            return

        should_skip_file = any(
            magic.startswith(pattern) for pattern in self._config.skip_magic
        )
        should_skip_file |= task.path.suffix in self._config.skip_extension

        if should_skip_file:
            log.debug(
                "Ignoring file based on magic or extension.",
                magic=magic,
                extension=task.path.suffix,
            )
            return

        _FileTask(self._config, task, stat_report.size, result).process()


class DirectoryProcessingError(Exception):
    def __init__(self, message: str, report: Report):
        super().__init__()
        self.message = message
        self.report: Report = report


class _DirectoryTask:
    def __init__(self, config: ExtractionConfig, dir_task: Task, result: TaskResult):
        self.config = config
        self.dir_task = dir_task
        self.result = result

    def process(self):
        logger.debug("Processing directory", path=self.dir_task.path)

        try:
            processed_paths, extract_dirs = self._process_directory()
        except DirectoryProcessingError as e:
            logger.error(e.message, report=e.report)
            self.result.add_report(e.report)
            return

        self._iterate_directory(extract_dirs, processed_paths)

        self._iterate_processed_files(processed_paths)

    def _process_directory(self) -> tuple[set[Path], set[Path]]:
        processed_paths: set[Path] = set()
        extract_dirs: set[Path] = set()
        for dir_handler_class in self.config.dir_handlers:
            dir_handler = dir_handler_class()

            for path in dir_handler.PATTERN.get_files(self.dir_task.path):
                multi_file = self._calculate_multifile(dir_handler, path, self.result)

                if multi_file is None:
                    continue

                multi_file.handler = dir_handler

                self._check_conflicting_files(multi_file, processed_paths)

                extract_dir = self._extract_multi_file(multi_file)

                # Process files in extracted directory
                if extract_dir.exists():
                    self.result.add_subtask(
                        Task(
                            blob_id=multi_file.id,
                            path=extract_dir,
                            depth=self.dir_task.depth + 1,
                        )
                    )
                    extract_dirs.add(extract_dir)

                processed_paths.update(multi_file.paths)
        return processed_paths, extract_dirs

    @staticmethod
    def _calculate_multifile(
        dir_handler: DirectoryHandler, path: Path, task_result: TaskResult
    ) -> Optional[MultiFile]:
        try:
            return dir_handler.calculate_multifile(path)
        except InvalidInputFormat as exc:
            logger.debug(
                "Invalid MultiFile format",
                exc_info=exc,
                handler=dir_handler.NAME,
                path=path,
                _verbosity=2,
            )
        except Exception as exc:
            error_report = CalculateMultiFileExceptionReport(
                handler=dir_handler.NAME,
                exception=exc,
                path=path,
            )
            task_result.add_report(error_report)
            logger.warning(
                "Unhandled Exception during multi file calculation",
                **error_report.model_dump(),
            )

    def _check_conflicting_files(
        self, multi_file: MultiFile, processed_paths: set[Path]
    ):
        conflicting_paths = processed_paths.intersection(set(multi_file.paths))
        if conflicting_paths:
            raise DirectoryProcessingError(
                "Conflicting match on files",
                report=MultiFileCollisionReport(
                    paths=conflicting_paths, handler=multi_file.handler.NAME
                ),
            )

    def _extract_multi_file(self, multi_file: MultiFile) -> Path:
        extract_dir = self.config.get_extract_dir_for(
            self.dir_task.path / multi_file.name
        )
        if extract_dir.exists():
            raise DirectoryProcessingError(
                "Skipped: extraction directory exists",
                report=multi_file.as_report(
                    [OutputDirectoryExistsReport(path=extract_dir)]
                ),
            )

        extraction_reports = []
        try:
            if result := multi_file.extract(extract_dir):
                extraction_reports.extend(result.reports)
        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting MultiFile")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(multi_file.as_report(extraction_reports))

        fix_extracted_directory(extract_dir, self.result)

        return extract_dir

    def _iterate_processed_files(self, processed_paths):
        for path in processed_paths:
            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                    is_multi_file=True,
                )
            )

    def _iterate_directory(self, extract_dirs, processed_paths):
        for path in self.dir_task.path.iterdir():
            if path in extract_dirs or path in processed_paths:
                continue

            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                )
            )


def is_padding(file: File, chunk: UnknownChunk):
    chunk_bytes = set()

    for small_chunk in iterate_file(
        file, chunk.start_offset, chunk.end_offset - chunk.start_offset
    ):
        chunk_bytes.update(small_chunk)

        # early return optimization
        if len(chunk_bytes) > 1:
            return False

    return len(chunk_bytes) == 1
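

# Illustrative sketch (not part of the original module): the same "single
# distinct byte value" test expressed on an in-memory buffer; collecting the
# byte values into a set leaves exactly one element for padding-only data,
# e.g. _looks_like_padding(b"\xff" * 4096) is True.
def _looks_like_padding(data: bytes) -> bool:
    return len(set(data)) == 1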


def process_patterns(
    unknown_chunks: list[UnknownChunk], file: File
) -> list[Union[UnknownChunk, PaddingChunk]]:
    processed_chunks = []
    for unknown_chunk in unknown_chunks:
        if is_padding(file, unknown_chunk):
            processed_chunks.append(
                PaddingChunk(
                    start_offset=unknown_chunk.start_offset,
                    end_offset=unknown_chunk.end_offset,
                    id=unknown_chunk.id,
                    file=unknown_chunk.file,
                )
            )
        else:
            processed_chunks.append(unknown_chunk)
    return processed_chunks


class _FileTask:
    def __init__(
        self,
        config: ExtractionConfig,
        task: Task,
        size: int,
        result: TaskResult,
    ):
        self.config = config
        self.task = task
        self.size = size
        self.result = result

    def process(self):
        logger.debug("Processing file", path=self.task.path, size=self.size)

        with File.from_path(self.task.path) as file:
            all_chunks = search_chunks(
                file, self.size, self.config.handlers, self.result
            )
            outer_chunks = remove_inner_chunks(all_chunks)
            unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
            unknown_chunks = process_patterns(unknown_chunks, file)
            assign_file_to_chunks(outer_chunks, file=file)
            assign_file_to_chunks(unknown_chunks, file=file)

            if outer_chunks or unknown_chunks:
                self._process_chunks(file, outer_chunks, unknown_chunks)
            else:
                # we don't consider whole files as unknown chunks, but we still want to
                # calculate randomness for whole files which produced no valid chunks
                randomness = self._calculate_randomness(self.task.path)
                if randomness:
                    self.result.add_report(randomness)

    def _process_chunks(
        self,
        file: File,
        outer_chunks: list[ValidChunk],
        unknown_chunks: list[Union[UnknownChunk, PaddingChunk]],
    ):
        if unknown_chunks:
            logger.warning("Found unknown Chunks", chunks=unknown_chunks)

        if self.config.skip_extraction:
            for chunk in unknown_chunks:
                self.result.add_report(chunk.as_report(randomness=None))
            for chunk in outer_chunks:
                self.result.add_report(chunk.as_report(extraction_reports=[]))
            return

        is_whole_file_chunk = len(outer_chunks) + len(unknown_chunks) == 1
        if is_whole_file_chunk:
            # skip carving, extract the whole file (chunk) directly
            carved_path = self.task.path
            for chunk in outer_chunks:
                self._extract_chunk(
                    carved_path,
                    chunk,
                    self.config.get_extract_dir_for(carved_path),
                    # since we do not carve, we want to keep the input around
                    remove_extracted_input=False,
                )
        else:
            self._carve_then_extract_chunks(file, outer_chunks, unknown_chunks)

    def _carve_then_extract_chunks(self, file, outer_chunks, unknown_chunks):
        assert not self.config.skip_extraction

        carve_dir = self.config.get_carve_dir_for(self.task.path)

        # report the technical carve directory explicitly
        self.result.add_report(CarveDirectoryReport(carve_dir=carve_dir))

        if carve_dir.exists():
            # The carve directory is not supposed to exist; it is usually a simple mistake of running
            # unblob again without cleaning up or using --force.
            # Continuing would cause problems, as it would mix up original and extracted files,
            # and it would introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error("Skipped: carve directory exists", carve_dir=carve_dir)
            self.result.add_report(OutputDirectoryExistsReport(path=carve_dir))
            return

        for chunk in unknown_chunks:
            carved_unknown_path = carve_unknown_chunk(carve_dir, file, chunk)
            randomness = self._calculate_randomness(carved_unknown_path)
            self.result.add_report(chunk.as_report(randomness=randomness))

        for chunk in outer_chunks:
            carved_path = carve_valid_chunk(carve_dir, file, chunk)

            self._extract_chunk(
                carved_path,
                chunk,
                self.config.get_extract_dir_for(carved_path),
                # when a carved chunk is successfully extracted, usually
                # we want to get rid of it, as its data is available in
                # extracted format, and the raw data is still part of
                # the file the chunk belongs to
                remove_extracted_input=not self.config.keep_extracted_chunks,
            )

    def _calculate_randomness(self, path: Path) -> Optional[RandomnessReport]:
        if self.task.depth < self.config.randomness_depth:
            report = calculate_randomness(path)
            if self.config.randomness_plot:
                logger.debug(
                    "Randomness chart",
                    # New line so that the chart title will be aligned correctly in the next line
                    chart="\n" + format_randomness_plot(report),
                    path=path,
                    _verbosity=3,
                )
            return report
        return None

    def _extract_chunk(
        self,
        carved_path: Path,
        chunk: ValidChunk,
        extract_dir: Path,
        *,
        remove_extracted_input: bool,
    ):
        if extract_dir.exists():
            # The extraction directory is not supposed to exist: it mixes up original and extracted files,
            # and it would introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error(
                "Skipped: extraction directory exists",
                extract_dir=extract_dir,
                chunk=chunk,
            )
            self.result.add_report(
                chunk.as_report([OutputDirectoryExistsReport(path=extract_dir)])
            )
            return

        if self.config.skip_extraction:
            fix_extracted_directory(extract_dir, self.result)
            return

        extraction_reports = []
        try:
            if result := chunk.extract(carved_path, extract_dir):
                extraction_reports.extend(result.reports)

            if remove_extracted_input:
                logger.debug("Removing extracted chunk", path=carved_path)
                carved_path.unlink()
        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting chunk")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(chunk.as_report(extraction_reports))

        # we want to get consistent partial output even in case of unforeseen problems
        fix_extracted_directory(extract_dir, self.result)
        delete_empty_extract_dir(extract_dir)

        if extract_dir.exists():
            self.result.add_subtask(
                Task(
                    blob_id=chunk.id,
                    path=extract_dir,
                    depth=self.task.depth + 1,
                )
            )


def assign_file_to_chunks(chunks: Sequence[Chunk], file: File):
    for chunk in chunks:
        assert chunk.file is None
        chunk.file = file


def delete_empty_extract_dir(extract_dir: Path):
    if extract_dir.exists() and not any(extract_dir.iterdir()):
        extract_dir.rmdir()


def remove_inner_chunks(chunks: list[ValidChunk]) -> list[ValidChunk]:
    """Remove all chunks from the list which lie within another, bigger chunk."""
    if not chunks:
        return []

    chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True)
    outer_chunks = [chunks_by_size[0]]
    for chunk in chunks_by_size[1:]:
        if not any(outer.contains(chunk) for outer in outer_chunks):
            outer_chunks.append(chunk)

    outer_count = len(outer_chunks)
    removed_count = len(chunks) - outer_count
    logger.debug(
        "Removed inner chunks",
        outer_chunk_count=noformat(outer_count),
        removed_inner_chunk_count=noformat(removed_count),
        _verbosity=2,
    )
    return outer_chunks
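
# Illustrative example (not part of the original module): given valid chunks
# covering [0, 100), [10, 20) and [150, 200), remove_inner_chunks() keeps
# [0, 100) and [150, 200); the [10, 20) chunk is dropped because it lies
# entirely inside [0, 100).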


def calculate_unknown_chunks(
    chunks: list[ValidChunk], file_size: int
) -> list[UnknownChunk]:
    """Calculate the empty gaps between chunks."""
    if not chunks or file_size == 0:
        return []

    sorted_by_offset = sorted(chunks, key=attrgetter("start_offset"))

    unknown_chunks = []

    first = sorted_by_offset[0]
    if first.start_offset != 0:
        unknown_chunk = UnknownChunk(start_offset=0, end_offset=first.start_offset)
        unknown_chunks.append(unknown_chunk)

    for chunk, next_chunk in pairwise(sorted_by_offset):
        diff = next_chunk.start_offset - chunk.end_offset
        if diff != 0:
            unknown_chunk = UnknownChunk(
                start_offset=chunk.end_offset,
                end_offset=next_chunk.start_offset,
            )
            unknown_chunks.append(unknown_chunk)

    last = sorted_by_offset[-1]
    if last.end_offset < file_size:
        unknown_chunk = UnknownChunk(
            start_offset=last.end_offset,
            end_offset=file_size,
        )
        unknown_chunks.append(unknown_chunk)

    return unknown_chunks
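
# Illustrative example (not part of the original module): for a 100-byte file
# with valid chunks at [10, 30) and [50, 80), the gaps reported as unknown
# chunks are [0, 10), [30, 50) and [80, 100).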


def calculate_randomness(path: Path) -> RandomnessReport:
    """Calculate and log Shannon entropy divided by 8 for the file in chunks.

    Shannon entropy returns the amount of information (in bits) of some numeric
    sequence. We calculate the average entropy of byte chunks, which in theory
    can contain 0-8 bits of entropy. We normalize it for visualization to a
    0-100% scale, to make it easier to interpret the graph.

    The chi square distribution is calculated for the stream of bytes in the
    chunk and expressed as an absolute number and a percentage which indicates
    how frequently a truly random sequence would exceed the value calculated.
    """
    shannon_percentages = []
    chi_square_percentages = []

    # We could use the chunk size instead of another syscall,
    # but we rely on the actual file size written to the disk
    file_size = path.stat().st_size
    logger.debug("Calculating entropy for file", path=path, size=file_size)

    # A smaller chunk size would be very slow to calculate.
    # A 1 MB chunk size takes ~3 s for a 4.5 GB file.
    block_size = calculate_block_size(
        file_size,
        chunk_count=80,
        min_limit=1024,
        max_limit=1024 * 1024,
    )

    shannon_entropy_sum = 0.0
    chisquare_probability_sum = 0.0
    with File.from_path(path) as file:
        for chunk in iterate_file(file, 0, file_size, buffer_size=block_size):
            shannon_entropy = mt.shannon_entropy(chunk)
            shannon_entropy_percentage = round(shannon_entropy / 8 * 100, 2)
            shannon_percentages.append(shannon_entropy_percentage)
            shannon_entropy_sum += shannon_entropy * len(chunk)

            chi_square_probability = mt.chi_square_probability(chunk)
            chisquare_probability_percentage = round(chi_square_probability * 100, 2)
            chi_square_percentages.append(chisquare_probability_percentage)
            chisquare_probability_sum += chi_square_probability * len(chunk)

    report = RandomnessReport(
        shannon=RandomnessMeasurements(
            percentages=shannon_percentages,
            block_size=block_size,
            mean=shannon_entropy_sum / file_size / 8 * 100,
        ),
        chi_square=RandomnessMeasurements(
            percentages=chi_square_percentages,
            block_size=block_size,
            mean=chisquare_probability_sum / file_size * 100,
        ),
    )

    logger.debug(
        "Shannon entropy calculated",
        path=path,
        size=file_size,
        block_size=report.shannon.block_size,
        mean=round(report.shannon.mean, 2),
        highest=round(report.shannon.highest, 2),
        lowest=round(report.shannon.lowest, 2),
    )
    logger.debug(
        "Chi square probability calculated",
        path=path,
        size=file_size,
        block_size=report.chi_square.block_size,
        mean=round(report.chi_square.mean, 2),
        highest=round(report.chi_square.highest, 2),
        lowest=round(report.chi_square.lowest, 2),
    )

    return report
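
# Illustrative example (not part of the original module): a block of
# independent, uniformly random bytes has close to 8 bits of Shannon entropy
# per byte, which normalizes to ~100%, while a block of one repeated byte
# value has 0 bits, i.e. 0%. The per-block values are length-weighted, so the
# reported mean is sum(entropy_i * len(block_i)) / file_size / 8 * 100.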


def calculate_block_size(
    file_size, *, chunk_count: int, min_limit: int, max_limit: int
) -> int:
    """Split the file into even sized chunks, limited by lower and upper values."""
    # We don't care about floating point precision here
    block_size = file_size // chunk_count
    block_size = max(min_limit, block_size)
    block_size = min(block_size, max_limit)
    return block_size  # noqa: RET504
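
# Illustrative example (not part of the original module): with chunk_count=80,
# min_limit=1024 and max_limit=1024 * 1024 (the values used above), a 4 KiB
# file is clamped to the 1024-byte lower limit, a 1 MiB file gets 13107-byte
# blocks, and anything over ~80 MiB is clamped to the 1 MiB upper limit.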


def format_randomness_plot(report: RandomnessReport):
    # start from scratch
    plt.clear_figure()
    # go colorless
    plt.clear_color()
    plt.title("Entropy distribution")
    plt.xlabel(f"{report.shannon.block_size} bytes")

    plt.plot(report.shannon.percentages, label="Shannon entropy (%)", marker="dot")
    plt.plot(
        report.chi_square.percentages,
        label="Chi square probability (%)",
        marker="cross",
    )
    # 16 height leaves no gaps between the lines
    plt.plot_size(100, 16)
    plt.ylim(0, 100)
    # Draw ticks every 1Mb on the x axis.
    plt.xticks(range(len(report.shannon.percentages) + 1))
    # Always show 0% and 100%
    plt.yticks(range(0, 101, 10))

    return plt.build()