Coverage for /pythoncovmergedfiles/medio/medio/src/unblob/fuzzing/search_chunks_fuzzer.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
###### Coverage stub
import atexit
import coverage

# Start collecting coverage immediately; data is written to ".coverage" and
# cover_pylib=True is passed so library code is measured as well.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops coverage collection and saves the data.
@atexit.register
def exit_handler():
    """Stop the module-level coverage recorder and save its data."""
    cov.stop()
    cov.save()
####### End of coverage stub
12#!/usr/bin/env python3
13import logging
14import sys
15from pathlib import Path
17import atheris.import_hook
18import atheris.instrument_bytecode
19import structlog
def set_unblob_log_level(level=logging.CRITICAL):
    """Point structlog at the stdlib ``"unblob"`` logger and set its level.

    Args:
        level: Logging level applied to the ``"unblob"`` logger
            (defaults to ``logging.CRITICAL``).
    """
    unblob_logger = logging.getLogger("unblob")
    unblob_logger.setLevel(level)

    # Every logger structlog creates is this one stdlib logger, so the level
    # set above governs all of them.
    structlog.configure(logger_factory=lambda: unblob_logger)
def extract(inpath: Path, outpath: Path):  # noqa: ARG001
    """Do nothing and return ``None``.

    No-op replacement that this file assigns to ``Command.extract``; both
    arguments are accepted and ignored.
    """
# Import the unblob modules under Atheris' import hook, restricted to the
# "unblob" package and excluding "unblob._rust".
with atheris.import_hook.instrument_imports(
    include=["unblob"],
    exclude=["unblob._rust"],
):
    from unblob.extractors.command import Command
    from unblob.file_utils import File
    from unblob.finder import search_chunks
    from unblob.models import Task, TaskResult
    from unblob.processing import ExtractionConfig

# NOTE: monkey-patch the Command extractor with the no-op `extract` so we
# don't lose time executing subprocesses while fuzzing.
Command.extract = classmethod(extract)  # type: ignore
@atheris.instrument_bytecode.instrument_func
def test_search_chunks(data):
    """Fuzz entry point: run ``search_chunks`` over one fuzzer-supplied input.

    Args:
        data: Input buffer handed over by the fuzzer. It is wrapped with
            ``File.from_bytes`` and its length is passed as the file size.

    Returns:
        None. A fresh ``TaskResult`` is passed to ``search_chunks`` and is not
        inspected afterwards.
    """
    # Nothing to scan in an empty input; bail out before building any state.
    if not data:
        return

    config = ExtractionConfig(
        extract_root=Path("/dev/shm"),  # noqa: S108
        force_extract=True,
        randomness_depth=0,
        randomness_plot=False,
        skip_magic=[],
        skip_extension=[],
        skip_extraction=False,
        process_num=1,
        keep_extracted_chunks=True,
        verbose=0,
    )

    with File.from_bytes(data) as file:
        task = Task(
            path=Path("/dev/shm/nonexistent"),  # noqa: S108
            depth=0,
            blob_id="",
        )
        result = TaskResult(task)
        search_chunks(file, len(data), config.handlers, result)
if __name__ == "__main__":
    # Apply the default (CRITICAL) level to the "unblob" logger before fuzzing.
    set_unblob_log_level()

    # Hand the command-line arguments and the test function to Atheris, then
    # start fuzzing.
    atheris.Setup(sys.argv, test_search_chunks)
    atheris.Fuzz()