Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/processing.py: 21%
import enum
import multiprocessing
import shutil
from collections.abc import Iterable, Sequence
from operator import attrgetter
from pathlib import Path

import attrs
import magic
import plotext as plt
from structlog import get_logger

from unblob import math_tools as mt
from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers

from .extractor import carve_unknown_chunk, carve_valid_chunk, fix_extracted_directory
from .file_utils import InvalidInputFormat, iterate_file
from .finder import search_chunks
from .iter_utils import pairwise
from .logging import noformat
from .models import (
    Chunk,
    DirectoryHandler,
    DirectoryHandlers,
    ExtractError,
    File,
    MultiFile,
    PaddingChunk,
    ProcessResult,
    Task,
    TaskResult,
    UnknownChunk,
    ValidChunk,
)
from .pool import make_pool
from .report import (
    CalculateMultiFileExceptionReport,
    CarveDirectoryReport,
    ErrorReport,
    ExtractedFileDeletedReport,
    FileMagicReport,
    HashReport,
    MultiFileCollisionReport,
    OutputDirectoryExistsReport,
    RandomnessMeasurements,
    RandomnessReport,
    Report,
    StatReport,
    UnknownError,
)
from .ui import NullProgressReporter, ProgressReporter

logger = get_logger()

DEFAULT_DEPTH = 10
DEFAULT_PROCESS_NUM = multiprocessing.cpu_count()
DEFAULT_SKIP_MAGIC = (
    "BFLT",
    "Erlang BEAM file",
    "GIF",
    "GNU message catalog",
    "HP Printer Job Language",
    "JPEG",
    "Java module image",
    "MPEG",
    "MS Windows icon resource",
    "Macromedia Flash data",
    "Microsoft Excel",
    "Microsoft PowerPoint",
    "Microsoft Word",
    "OpenDocument",
    "PDF document",
    "PNG",
    "SQLite",
    "TrueType Font data",
    "Web Open Font Format",
    "Windows Embedded CE binary image",
    "Xilinx BIT data",
    "compiled Java class",
    "magic binary file",
    "python",  # e.g. python 2.7 byte-compiled
)
DEFAULT_SKIP_EXTENSION = (".rlib",)


class ExtractedFileDeletionMode(enum.Enum):
    NONE = "none"
    SELECTED = "selected"
    ALL = "all"


@attrs.define(kw_only=True)
class ExtractionConfig:
    extract_root: Path = attrs.field(converter=lambda value: value.resolve())
    force_extract: bool = False
    randomness_depth: int
    randomness_plot: bool = False
    max_depth: int = DEFAULT_DEPTH
    skip_magic: Iterable[str] = DEFAULT_SKIP_MAGIC
    skip_extension: Iterable[str] = DEFAULT_SKIP_EXTENSION
    skip_extraction: bool = False
    process_num: int = DEFAULT_PROCESS_NUM
    keep_extracted_chunks: bool = False
    extract_suffix: str = "_extract"
    carve_suffix: str = "_extract"
    handlers: Handlers = BUILTIN_HANDLERS
    dir_handlers: DirectoryHandlers = BUILTIN_DIR_HANDLERS
    verbose: int = 1
    progress_reporter: type[ProgressReporter] = NullProgressReporter
    extracted_file_deletion: ExtractedFileDeletionMode = ExtractedFileDeletionMode.NONE
    extracted_file_handler_filter: Iterable[str] = attrs.field(
        default=(),
        converter=lambda values: tuple(str(value) for value in values),
    )

    def _get_output_path(self, path: Path) -> Path:
        """Return path under extract root."""
        try:
            relative_path = path.relative_to(self.extract_root)
        except ValueError:
            # path is not inside root, i.e. it is an input file
            relative_path = Path(path.name)
        return (self.extract_root / relative_path).expanduser().resolve()

    def get_extract_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.extract_suffix))

    def get_carve_dir_for(self, path: Path) -> Path:
        return self._get_output_path(path.with_name(path.name + self.carve_suffix))


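# Editor's note: a minimal sketch (not part of unblob) of how the path helpers above
# map inputs under extract_root; the concrete paths are made up for illustration.
#
#   config = ExtractionConfig(extract_root=Path("/tmp/out"), randomness_depth=1)
#
#   # An input file outside extract_root keeps only its name:
#   config.get_extract_dir_for(Path("/firmware/image.bin"))
#   # -> Path("/tmp/out/image.bin_extract")
#
#   # A file already produced under extract_root keeps its relative location:
#   config.get_carve_dir_for(Path("/tmp/out/image.bin_extract/part0"))
#   # -> Path("/tmp/out/image.bin_extract/part0_extract")

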
def process_file(
    config: ExtractionConfig, input_path: Path, report_file: Path | None = None
) -> ProcessResult:
    task = Task(
        blob_id="",
        path=input_path,
        depth=0,
    )

    if not input_path.is_file():
        raise ValueError("input_path is not a file", input_path)

    extract_dir = config.get_extract_dir_for(input_path)
    if config.force_extract and extract_dir.exists():
        logger.info("Removing extract dir", path=extract_dir)
        shutil.rmtree(extract_dir)

    carve_dir = config.get_carve_dir_for(input_path)
    if config.force_extract and carve_dir.exists():
        logger.info("Removing carve dir", path=carve_dir)
        shutil.rmtree(carve_dir)

    if not prepare_report_file(config, report_file):
        logger.error(
            "File not processed, as report could not be written", file=input_path
        )
        return ProcessResult()

    process_result = _process_task(config, task)

    if report_file:
        write_json_report(report_file, process_result)

    return process_result


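# Editor's note: a hedged usage sketch for process_file(); the paths and option values
# below are illustrative only and not taken from the unblob documentation.
#
#   config = ExtractionConfig(
#       extract_root=Path("/tmp/unblob-out"),
#       randomness_depth=1,
#       max_depth=5,
#   )
#   result = process_file(config, Path("firmware.bin"), report_file=Path("report.json"))
#   # `result` is the ProcessResult aggregating every processed task's TaskResult.

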
def _process_task(config: ExtractionConfig, task: Task) -> ProcessResult:
    processor = Processor(config)
    aggregated_result = ProcessResult()

    progress_reporter = config.progress_reporter()

    def process_result(pool, result):
        progress_reporter.update(result)

        for new_task in result.subtasks:
            pool.submit(new_task)
        aggregated_result.register(result)

    pool = make_pool(
        process_num=config.process_num,
        handler=processor.process_task,
        result_callback=process_result,
    )

    with pool, progress_reporter:
        pool.submit(task)
        pool.process_until_done()

    return aggregated_result


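# Editor's note: the pool above feeds every result back through `process_result`, which
# resubmits the discovered subtasks, so processing continues until the task tree is
# exhausted. The self-contained sketch below illustrates that feedback loop with a plain
# queue instead of unblob's worker pool; `toy_handler` is made up for illustration.
def _scheduling_sketch() -> int:
    from collections import deque

    def toy_handler(depth: int) -> list[int]:
        # pretend every task shallower than depth 2 produces two subtasks
        return [depth + 1, depth + 1] if depth < 2 else []

    queue = deque([0])  # the root task, like pool.submit(task)
    processed = 0
    while queue:  # analogous to pool.process_until_done()
        subtasks = toy_handler(queue.popleft())
        processed += 1
        queue.extend(subtasks)  # the result callback resubmits subtasks
    return processed  # 7 tasks for a binary task tree of depth 2

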
def prepare_report_file(config: ExtractionConfig, report_file: Path | None) -> bool:
    """Prevent report writing from failing after an expensive extraction.

    Should be called before processing tasks.

    Returns True if there is no foreseen problem,
    False if report writing is known in advance to fail.
    """
    if not report_file:
        # we will not write a report at all
        return True

    if report_file.exists():
        if config.force_extract:
            logger.warning("Overwriting existing report file", path=report_file)
            try:
                report_file.write_text("")
            except OSError as e:
                logger.error(
                    "Can not overwrite existing report file",
                    path=report_file,
                    msg=str(e),
                )
                return False
        else:
            logger.error(
                "Report file exists and --force not specified", path=report_file
            )
            return False
    if not report_file.parent.exists():
        logger.error(
            "Trying to write report file to a non-existent directory", path=report_file
        )
        return False
    return True


def write_json_report(report_file: Path, process_result: ProcessResult):
    try:
        report_file.write_text(process_result.to_json())
    except OSError as e:
        logger.error("Can not write JSON report", path=report_file, msg=str(e))
    except Exception:
        logger.exception("Can not write JSON report", path=report_file)
    else:
        logger.info("JSON report written", path=report_file)


class Processor:
    def __init__(self, config: ExtractionConfig):
        self._config = config
        # libmagic helpers
        # File magic uses a rule-set to guess the file type, but as rules are added they can
        # shadow each other. File magic uses rule priorities to determine the best matching
        # rule, which can also shadow other valid matches and eventually break
        # any further processing that depends on magic.
        # By enabling keep_going (which eventually enables MAGIC_CONTINUE), all matching
        # patterns are included in the magic string at the cost of being a bit slower,
        # but accuracy increases because rules no longer shadow each other.
        self._get_magic = magic.Magic(keep_going=True).from_file
        self._get_mime_type = magic.Magic(mime=True).from_file

    def process_task(self, task: Task) -> TaskResult:
        result = TaskResult(task=task)
        try:
            self._process_task(result, task)
        except Exception as exc:
            self._process_error(result, exc)
        return result

    def _process_error(self, result: TaskResult, exc: Exception):
        error_report = UnknownError(exception=exc)
        result.add_report(error_report)
        logger.exception("Unknown error happened", exc_info=exc)

    def _process_task(self, result: TaskResult, task: Task):
        stat_report = StatReport.from_path(task.path)
        result.add_report(stat_report)
        log = logger.bind(path=task.path)

        if task.depth >= self._config.max_depth:
            # TODO: Use the reporting feature to warn the user (ONLY ONCE) at the end of execution, that this limit was reached.
            log.debug(
                "Reached maximum depth, stop further processing", depth=task.depth
            )
            return

        if stat_report.is_dir:
            if not task.is_multi_file:
                _DirectoryTask(self._config, task, result).process()
            return

        if not stat_report.is_file:
            log.debug(
                "Ignoring special file (link, chrdev, blkdev, fifo, socket, door)."
            )
            return

        magic = self._get_magic(task.path)
        mime_type = self._get_mime_type(task.path)
        logger.debug("Detected file-magic", magic=magic, path=task.path, _verbosity=2)

        magic_report = FileMagicReport(magic=magic, mime_type=mime_type)
        result.add_report(magic_report)

        hash_report = HashReport.from_path(task.path)
        result.add_report(hash_report)

        if task.is_multi_file:
            # The file has been processed as part of a MultiFile; we just run the task to gather reports
            return

        if stat_report.size == 0:
            log.debug("Ignoring empty file")
            return

        should_skip_file = any(
            magic.startswith(pattern) for pattern in self._config.skip_magic
        )
        should_skip_file |= task.path.suffix in self._config.skip_extension

        if should_skip_file:
            log.debug(
                "Ignoring file based on magic or extension.",
                magic=magic,
                extension=task.path.suffix,
            )
            return

        _FileTask(self._config, task, stat_report.size, result).process()


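# Editor's note: an illustrative sketch of the two libmagic helpers built in
# Processor.__init__ above; the sample path and outputs are made up. With
# keep_going=True (MAGIC_CONTINUE) every matching rule is reported in one string
# instead of only the highest-priority match.
#
#   describe = magic.Magic(keep_going=True).from_file
#   mime = magic.Magic(mime=True).from_file
#   describe("/tmp/sample.jar")  # e.g. "Zip archive data, ...\n- Java archive data (JAR)"
#   mime("/tmp/sample.jar")      # e.g. "application/zip"

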
class DirectoryProcessingError(Exception):
    def __init__(self, message: str, report: Report):
        super().__init__()
        self.message = message
        self.report: Report = report


class _DirectoryTask:
    def __init__(self, config: ExtractionConfig, dir_task: Task, result: TaskResult):
        self.config = config
        self.dir_task = dir_task
        self.result = result

    def process(self):
        logger.debug("Processing directory", path=self.dir_task.path)

        try:
            processed_paths, extract_dirs = self._process_directory()
        except DirectoryProcessingError as e:
            logger.error(e.message, report=e.report)
            self.result.add_report(e.report)
            return

        self._iterate_directory(extract_dirs, processed_paths)

        self._iterate_processed_files(processed_paths)

    def _process_directory(self) -> tuple[set[Path], set[Path]]:
        processed_paths: set[Path] = set()
        extract_dirs: set[Path] = set()
        for dir_handler_class in self.config.dir_handlers:
            dir_handler = dir_handler_class()

            for path in dir_handler.PATTERN.get_files(self.dir_task.path):
                multi_file = self._calculate_multifile(dir_handler, path, self.result)

                if multi_file is None:
                    continue

                multi_file.handler = dir_handler

                self._check_conflicting_files(multi_file, processed_paths)

                extract_dir = self._extract_multi_file(multi_file)

                # Process files in extracted directory
                if extract_dir.exists():
                    self.result.add_subtask(
                        Task(
                            blob_id=multi_file.id,
                            path=extract_dir,
                            depth=self.dir_task.depth + 1,
                        )
                    )
                    extract_dirs.add(extract_dir)

                processed_paths.update(multi_file.paths)
        return processed_paths, extract_dirs

    @staticmethod
    def _calculate_multifile(
        dir_handler: DirectoryHandler, path: Path, task_result: TaskResult
    ) -> MultiFile | None:
        try:
            return dir_handler.calculate_multifile(path)
        except InvalidInputFormat as exc:
            logger.debug(
                "Invalid MultiFile format",
                exc_info=exc,
                handler=dir_handler.NAME,
                path=path,
                _verbosity=2,
            )
        except Exception as exc:
            error_report = CalculateMultiFileExceptionReport(
                handler=dir_handler.NAME,
                exception=exc,
                path=path,
            )
            task_result.add_report(error_report)
            logger.warning(
                "Unhandled Exception during multi file calculation",
                **error_report.model_dump(),
            )

    def _check_conflicting_files(
        self, multi_file: MultiFile, processed_paths: set[Path]
    ):
        conflicting_paths = processed_paths.intersection(set(multi_file.paths))
        if conflicting_paths:
            raise DirectoryProcessingError(
                "Conflicting match on files",
                report=MultiFileCollisionReport(
                    paths=conflicting_paths, handler=multi_file.handler.NAME
                ),
            )

    def _extract_multi_file(self, multi_file: MultiFile) -> Path:
        extract_dir = self.config.get_extract_dir_for(
            self.dir_task.path / multi_file.name
        )
        if extract_dir.exists():
            raise DirectoryProcessingError(
                "Skipped: extraction directory exists",
                report=multi_file.as_report(
                    [OutputDirectoryExistsReport(path=extract_dir)]
                ),
            )

        extraction_reports = []
        try:
            if result := multi_file.extract(extract_dir):
                extraction_reports.extend(result.reports)
        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting MultiFile")
            extraction_reports.append(UnknownError(exception=exc))

        self.result.add_report(multi_file.as_report(extraction_reports))

        fix_extracted_directory(extract_dir, self.result)

        return extract_dir

    def _iterate_processed_files(self, processed_paths):
        for path in processed_paths:
            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                    is_multi_file=True,
                )
            )

    def _iterate_directory(self, extract_dirs, processed_paths):
        for path in self.dir_task.path.iterdir():
            if path in extract_dirs or path in processed_paths:
                continue

            self.result.add_subtask(
                Task(
                    blob_id=self.dir_task.blob_id,
                    path=path,
                    depth=self.dir_task.depth,
                )
            )


def is_padding(file: File, chunk: UnknownChunk):
    chunk_bytes = set()

    for small_chunk in iterate_file(
        file, chunk.start_offset, chunk.end_offset - chunk.start_offset
    ):
        chunk_bytes.update(small_chunk)

        # early return optimization
        if len(chunk_bytes) > 1:
            return False

    return len(chunk_bytes) == 1


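# Editor's note: a standalone sketch (not unblob API) of the padding test above: a chunk
# counts as padding exactly when all of its bytes share a single value.
def _is_padding_bytes(data: bytes) -> bool:
    # mirrors is_padding(): collect the distinct byte values and require exactly one
    return len(set(data)) == 1

# _is_padding_bytes(b"\xff" * 4096)  -> True   (typical flash padding)
# _is_padding_bytes(b"\x00\x00\x01") -> False  (two distinct byte values)
# _is_padding_bytes(b"")             -> False  (an empty chunk is not padding)

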
def process_patterns(
    unknown_chunks: list[UnknownChunk], file: File
) -> list[UnknownChunk | PaddingChunk]:
    processed_chunks = []
    for unknown_chunk in unknown_chunks:
        if is_padding(file, unknown_chunk):
            processed_chunks.append(
                PaddingChunk(
                    start_offset=unknown_chunk.start_offset,
                    end_offset=unknown_chunk.end_offset,
                    id=unknown_chunk.id,
                    file=unknown_chunk.file,
                )
            )
        else:
            processed_chunks.append(unknown_chunk)
    return processed_chunks


class _FileTask:
    def __init__(
        self,
        config: ExtractionConfig,
        task: Task,
        size: int,
        result: TaskResult,
    ):
        self.config = config
        self.task = task
        self.size = size
        self.result = result

    def process(self):
        logger.debug("Processing file", path=self.task.path, size=self.size)

        with File.from_path(self.task.path) as file:
            all_chunks = search_chunks(
                file, self.size, self.config.handlers, self.result
            )
            outer_chunks = remove_inner_chunks(all_chunks)
            unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
            unknown_chunks = process_patterns(unknown_chunks, file)
            assign_file_to_chunks(outer_chunks, file=file)
            assign_file_to_chunks(unknown_chunks, file=file)

            if outer_chunks or unknown_chunks:
                self._process_chunks(file, outer_chunks, unknown_chunks)
            else:
                # we don't consider whole files as unknown chunks, but we still want to
                # calculate randomness for whole files which produced no valid chunks
                randomness = self._calculate_randomness(self.task.path)
                if randomness:
                    self.result.add_report(randomness)

    def _process_chunks(
        self,
        file: File,
        outer_chunks: list[ValidChunk],
        unknown_chunks: list[UnknownChunk | PaddingChunk],
    ):
        if unknown_chunks:
            logger.warning("Found unknown Chunks", chunks=unknown_chunks)

        if self.config.skip_extraction:
            for chunk in unknown_chunks:
                self.result.add_report(chunk.as_report(randomness=None))
            for chunk in outer_chunks:
                self.result.add_report(chunk.as_report(extraction_reports=[]))
            return

        is_whole_file_chunk = len(outer_chunks) + len(unknown_chunks) == 1
        if is_whole_file_chunk:
            # skip carving, extract directly the whole file (chunk)
            carved_path = self.task.path
            for chunk in outer_chunks:
                extraction_successful = self._extract_chunk(
                    carved_path,
                    chunk,
                    self.config.get_extract_dir_for(carved_path),
                    # since we do not carve, we want to keep the input around
                    remove_extracted_input=False,
                )
                if extraction_successful:
                    self._delete_extracted_file_if_needed(self.task.path, chunk)
        else:
            self._carve_then_extract_chunks(file, outer_chunks, unknown_chunks)

    def _carve_then_extract_chunks(self, file, outer_chunks, unknown_chunks):
        assert not self.config.skip_extraction

        carve_dir = self.config.get_carve_dir_for(self.task.path)

        # report the technical carve directory explicitly
        self.result.add_report(CarveDirectoryReport(carve_dir=carve_dir))

        if carve_dir.exists():
            # Carve directory is not supposed to exist, it is usually a simple mistake of running
            # unblob again without cleaning up or using --force.
            # It would cause problems continuing, as it would mix up original and extracted files,
            # and it would just introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error("Skipped: carve directory exists", carve_dir=carve_dir)
            self.result.add_report(OutputDirectoryExistsReport(path=carve_dir))
            return

        for chunk in unknown_chunks:
            carved_unknown_path = carve_unknown_chunk(carve_dir, file, chunk)
            randomness = self._calculate_randomness(carved_unknown_path)
            self.result.add_report(chunk.as_report(randomness=randomness))

        for chunk in outer_chunks:
            carved_path = carve_valid_chunk(carve_dir, file, chunk)
            self._extract_chunk(
                carved_path,
                chunk,
                self.config.get_extract_dir_for(carved_path),
                # when a carved chunk is successfully extracted, usually
                # we want to get rid of it, as its data is available in
                # extracted format, and the raw data is still part of
                # the file the chunk belongs to
                remove_extracted_input=not self.config.keep_extracted_chunks,
            )

    def _calculate_randomness(self, path: Path) -> RandomnessReport | None:
        if self.task.depth < self.config.randomness_depth:
            report = calculate_randomness(path)
            if self.config.randomness_plot:
                logger.debug(
                    "Randomness chart",
                    # New line so that chart title will be aligned correctly in the next line
                    chart="\n" + format_randomness_plot(report),
                    path=path,
                    _verbosity=3,
                )
            return report
        return None

    def _extract_chunk(
        self,
        carved_path: Path,
        chunk: ValidChunk,
        extract_dir: Path,
        *,
        remove_extracted_input: bool,
    ) -> bool:
        extraction_successful = False
        if extract_dir.exists():
            # Extraction directory is not supposed to exist, it mixes up original and extracted files,
            # and it would just introduce weird, non-deterministic problems due to interference on paths
            # by multiple workers (parallel processing, modifying content (fix_symlink),
            # and `mmap` + open for write with O_TRUNC).
            logger.error(
                "Skipped: extraction directory exists",
                extract_dir=extract_dir,
                chunk=chunk,
            )
            self.result.add_report(
                chunk.as_report([OutputDirectoryExistsReport(path=extract_dir)])
            )
            return False

        if self.config.skip_extraction:
            fix_extracted_directory(extract_dir, self.result)
            return False

        extraction_reports = []
        try:
            if result := chunk.extract(carved_path, extract_dir):
                extraction_reports.extend(result.reports)

            if remove_extracted_input:
                logger.debug("Removing extracted chunk", path=carved_path)
                carved_path.unlink()

        except ExtractError as e:
            extraction_reports.extend(e.reports)
        except Exception as exc:
            logger.exception("Unknown error happened while extracting chunk")
            extraction_reports.append(UnknownError(exception=exc))

        extraction_successful = not any(
            isinstance(report, ErrorReport) for report in extraction_reports
        )
        self.result.add_report(chunk.as_report(extraction_reports))

        # we want to get consistent partial output even in case of unforeseen problems
        fix_extracted_directory(extract_dir, self.result)
        delete_empty_extract_dir(extract_dir)

        if extract_dir.exists():
            self.result.add_subtask(
                Task(
                    blob_id=chunk.id,
                    path=extract_dir,
                    depth=self.task.depth + 1,
                )
            )
        return extraction_successful

    def _delete_extracted_file_if_needed(
        self, delete_candidate_path: Path, chunk: ValidChunk
    ) -> None:
        filter_set = set(self.config.extracted_file_handler_filter)
        if not self._should_delete_extracted_file(chunk, filter_set):
            return

        if self.task.depth == 0:
            return

        if not delete_candidate_path.exists() or delete_candidate_path.is_dir():
            return

        try:
            delete_candidate_path.unlink()
            self.result.add_report(
                ExtractedFileDeletedReport(
                    path=delete_candidate_path,
                    handler_name=chunk.handler.NAME,
                )
            )
            logger.debug(
                "Removed extracted file after extraction",
                path=delete_candidate_path,
                handler=chunk.handler.NAME,
            )
        except OSError:
            logger.warning(
                "Failed to remove extracted file after extraction",
                path=delete_candidate_path,
            )

    def _should_delete_extracted_file(
        self, chunk: ValidChunk, filter_set: set[str]
    ) -> bool:
        if not chunk.is_whole_file:
            return False

        deletion_mode = self.config.extracted_file_deletion

        if deletion_mode is ExtractedFileDeletionMode.NONE:
            return False

        if deletion_mode is ExtractedFileDeletionMode.SELECTED:
            return chunk.handler.NAME in filter_set

        return True


def assign_file_to_chunks(chunks: Sequence[Chunk], file: File):
    for chunk in chunks:
        assert chunk.file is None
        chunk.file = file


def delete_empty_extract_dir(extract_dir: Path):
    if extract_dir.exists() and not any(extract_dir.iterdir()):
        extract_dir.rmdir()


def remove_inner_chunks(chunks: list[ValidChunk]) -> list[ValidChunk]:
    """Remove all chunks from the list which are contained within another, bigger chunk."""
    if not chunks:
        return []

    chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True)
    outer_chunks = [chunks_by_size[0]]
    for chunk in chunks_by_size[1:]:
        if not any(outer.contains(chunk) for outer in outer_chunks):
            outer_chunks.append(chunk)

    outer_count = len(outer_chunks)
    removed_count = len(chunks) - outer_count
    logger.debug(
        "Removed inner chunks",
        outer_chunk_count=noformat(outer_count),
        removed_inner_chunk_count=noformat(removed_count),
        _verbosity=2,
    )
    return outer_chunks


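# Editor's note: a standalone sketch of the "keep only outermost chunks" idea above,
# using plain (start, end) tuples instead of ValidChunk objects; the helper name is
# made up for illustration.
def _outer_intervals(intervals: list[tuple[int, int]]) -> list[tuple[int, int]]:
    def contains(outer: tuple[int, int], inner: tuple[int, int]) -> bool:
        return outer[0] <= inner[0] and inner[1] <= outer[1]

    # biggest first, so an interval is dropped only if a bigger one already covers it
    by_size = sorted(intervals, key=lambda i: i[1] - i[0], reverse=True)
    outer: list[tuple[int, int]] = []
    for interval in by_size:
        if not any(contains(kept, interval) for kept in outer):
            outer.append(interval)
    return outer

# _outer_intervals([(0, 100), (10, 20), (150, 200)]) -> [(0, 100), (150, 200)]

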
def calculate_unknown_chunks(
    chunks: list[ValidChunk], file_size: int
) -> list[UnknownChunk]:
    """Calculate the empty gaps between chunks."""
    if not chunks or file_size == 0:
        return []

    sorted_by_offset = sorted(chunks, key=attrgetter("start_offset"))

    unknown_chunks = []

    first = sorted_by_offset[0]
    if first.start_offset != 0:
        unknown_chunk = UnknownChunk(start_offset=0, end_offset=first.start_offset)
        unknown_chunks.append(unknown_chunk)

    for chunk, next_chunk in pairwise(sorted_by_offset):
        diff = next_chunk.start_offset - chunk.end_offset
        if diff != 0:
            unknown_chunk = UnknownChunk(
                start_offset=chunk.end_offset,
                end_offset=next_chunk.start_offset,
            )
            unknown_chunks.append(unknown_chunk)

    last = sorted_by_offset[-1]
    if last.end_offset < file_size:
        unknown_chunk = UnknownChunk(
            start_offset=last.end_offset,
            end_offset=file_size,
        )
        unknown_chunks.append(unknown_chunk)

    return unknown_chunks


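# Editor's note: a standalone sketch of the gap calculation above on plain (start, end)
# tuples; it assumes the intervals do not overlap, which holds for the outer chunks
# passed to calculate_unknown_chunks().
def _gaps(intervals: list[tuple[int, int]], file_size: int) -> list[tuple[int, int]]:
    gaps = []
    previous_end = 0
    for start, end in sorted(intervals):
        if start > previous_end:  # a hole before this chunk
            gaps.append((previous_end, start))
        previous_end = end
    if previous_end < file_size:  # trailing hole up to the end of the file
        gaps.append((previous_end, file_size))
    return gaps

# _gaps([(10, 30), (30, 60)], file_size=100) -> [(0, 10), (60, 100)]

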
def calculate_randomness(path: Path) -> RandomnessReport:
    """Calculate and log Shannon entropy divided by 8 for the file in chunks.

    Shannon entropy returns the amount of information (in bits) of some numeric
    sequence. We calculate the average entropy of byte chunks, which in theory
    can contain 0-8 bits of entropy. We normalize it for visualization to a
    0-100% scale, to make it easier to interpret the graph.

    The chi square distribution is calculated for the stream of bytes in the
    chunk and expressed as an absolute number and a percentage which indicates
    how frequently a truly random sequence would exceed the value calculated.
    """
    shannon_percentages = []
    chi_square_percentages = []

    # We could use the chunk size instead of another syscall,
    # but we rely on the actual file size written to the disk
    file_size = path.stat().st_size
    logger.debug("Calculating entropy for file", path=path, size=file_size)

    # A smaller chunk size would be very slow to calculate.
    # A 1 MiB chunk size takes ~3 sec for a 4.5 GB file.
    block_size = calculate_block_size(
        file_size,
        chunk_count=80,
        min_limit=1024,
        max_limit=1024 * 1024,
    )

    shannon_entropy_sum = 0.0
    chisquare_probability_sum = 0.0
    with File.from_path(path) as file:
        for chunk in iterate_file(file, 0, file_size, buffer_size=block_size):
            shannon_entropy = mt.shannon_entropy(chunk)
            shannon_entropy_percentage = round(shannon_entropy / 8 * 100, 2)
            shannon_percentages.append(shannon_entropy_percentage)
            shannon_entropy_sum += shannon_entropy * len(chunk)

            chi_square_probability = mt.chi_square_probability(chunk)
            chisquare_probability_percentage = round(chi_square_probability * 100, 2)
            chi_square_percentages.append(chisquare_probability_percentage)
            chisquare_probability_sum += chi_square_probability * len(chunk)

    report = RandomnessReport(
        shannon=RandomnessMeasurements(
            percentages=shannon_percentages,
            block_size=block_size,
            mean=shannon_entropy_sum / file_size / 8 * 100,
        ),
        chi_square=RandomnessMeasurements(
            percentages=chi_square_percentages,
            block_size=block_size,
            mean=chisquare_probability_sum / file_size * 100,
        ),
    )

    logger.debug(
        "Shannon entropy calculated",
        path=path,
        size=file_size,
        block_size=report.shannon.block_size,
        mean=round(report.shannon.mean, 2),
        highest=round(report.shannon.highest, 2),
        lowest=round(report.shannon.lowest, 2),
    )
    logger.debug(
        "Chi square probability calculated",
        path=path,
        size=file_size,
        block_size=report.chi_square.block_size,
        mean=round(report.chi_square.mean, 2),
        highest=round(report.chi_square.highest, 2),
        lowest=round(report.chi_square.lowest, 2),
    )

    return report


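# Editor's note: a self-contained sketch of the normalization described in the docstring
# above. It recomputes a byte-wise Shannon entropy directly instead of calling
# unblob.math_tools, so treat it as illustrative only.
def _shannon_percentage(block: bytes) -> float:
    import math
    from collections import Counter

    counts = Counter(block)
    entropy_bits = -sum(
        (n / len(block)) * math.log2(n / len(block)) for n in counts.values()
    )
    # a byte carries at most 8 bits of entropy, so scale to a 0-100% range
    return round(entropy_bits / 8 * 100, 2)

# _shannon_percentage(bytes(range(256)) * 16) -> 100.0  (uniform data)
# _shannon_percentage(b"\x00\x01" * 2048)     -> 12.5   (two equally likely values = 1 bit)

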
def calculate_block_size(
    file_size, *, chunk_count: int, min_limit: int, max_limit: int
) -> int:
    """Split the file into even sized chunks, limited by lower and upper values."""
    # We don't care about floating point precision here
    block_size = file_size // chunk_count
    block_size = max(min_limit, block_size)
    block_size = min(block_size, max_limit)
    return block_size  # noqa: RET504


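# Editor's note: worked examples for the clamping above, with the same limits that
# calculate_randomness() passes in (the file sizes are arbitrary):
#
#   calculate_block_size(10_000, chunk_count=80, min_limit=1024, max_limit=1024 * 1024)
#   # 10_000 // 80 = 125 -> clamped up to the 1024-byte minimum -> 1024
#
#   calculate_block_size(4_500_000_000, chunk_count=80, min_limit=1024, max_limit=1024 * 1024)
#   # 4_500_000_000 // 80 = 56_250_000 -> clamped down to the 1 MiB maximum -> 1_048_576

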
def format_randomness_plot(report: RandomnessReport):
    # start from scratch
    plt.clear_figure()
    # go colorless
    plt.clear_color()
    plt.title("Entropy distribution")
    plt.xlabel(f"{report.shannon.block_size} bytes")

    plt.plot(report.shannon.percentages, label="Shannon entropy (%)", marker="dot")
    plt.plot(
        report.chi_square.percentages,
        label="Chi square probability (%)",
        marker="cross",
    )
    # 16 height leaves no gaps between the lines
    plt.plot_size(100, 16)
    plt.ylim(0, 100)
    # Draw ticks every 1Mb on the x axis.
    plt.xticks(range(len(report.shannon.percentages) + 1))
    # Always show 0% and 100%
    plt.yticks(range(0, 101, 10))

    return plt.build()


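# Editor's note: a hedged usage sketch for format_randomness_plot(); the measurement
# values are synthetic, but the report objects are constructed with the same fields
# that calculate_randomness() uses.
#
#   measurements = RandomnessMeasurements(
#       percentages=[10.0, 95.0, 99.0, 40.0], block_size=1024, mean=61.0
#   )
#   report = RandomnessReport(shannon=measurements, chi_square=measurements)
#   print(format_randomness_plot(report))  # prints the plotext chart as text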