Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/finder.py: 96%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""Searching Chunk related functions.

The main "entry point" is search_chunks.
"""
6import sys
7from functools import lru_cache
9import attrs
10from pyperscan import Flag, Pattern, Scan, StreamDatabase
11from structlog import get_logger
13from .file_utils import DEFAULT_BUFSIZE, InvalidInputFormat, SeekError
14from .handlers import Handlers
15from .models import File, Handler, TaskResult, ValidChunk
16from .parser import InvalidHexString
17from .report import CalculateChunkExceptionReport
19logger = get_logger()
@attrs.define
class HyperscanMatchContext:
    """Mutable state shared between the scan loop and the Hyperscan match callback."""

    # File being scanned; the callback seeks in it to calculate chunks.
    file: File
    # Total file size; chunks ending past this are rejected by the callback.
    file_size: int
    # Accumulates ValidChunk results across matches (shared with the caller).
    all_chunks: list
    # Collects CalculateChunkExceptionReport entries for handler failures.
    task_result: TaskResult
    # Absolute offset of the current scan window; updated by the callback
    # when a chunk is found, and read back by stream_scan_chunks on restart.
    start_offset: int
def _calculate_chunk(
    handler: Handler, file: File, real_offset: int, task_result: TaskResult
) -> ValidChunk | None:
    """Run *handler*'s chunk calculation at *real_offset*.

    Returns the ValidChunk on success, or None when the bytes at the
    offset are not a parseable chunk for this handler (or when chunk
    calculation fails in an expected way). Unexpected exceptions are
    recorded on *task_result* rather than propagated, so one broken
    handler does not abort the whole scan.
    """
    file.seek(real_offset)
    try:
        return handler.calculate_chunk(file, real_offset)
    except InvalidInputFormat as exc:
        # Expected: random bytes merely happened to match the pattern.
        logger.debug(
            "File format is invalid",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except EOFError as exc:
        # Expected: the match is too close to the end of the file.
        logger.debug(
            "File ends before header could be read",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except SeekError as exc:
        # Expected: header fields pointed outside the file.
        logger.debug(
            "Seek outside file during chunk calculation",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except Exception as exc:
        # Handler bug: make the failure visible in the task result and the
        # error log, but keep scanning with the other handlers/patterns.
        error_report = CalculateChunkExceptionReport(
            handler=handler.NAME,
            start_offset=real_offset,
            exception=exc,
        )
        task_result.add_report(error_report)
        logger.error(
            "Unhandled Exception during chunk calculation", **error_report.model_dump()
        )
def _hyperscan_match(
    context: HyperscanMatchContext, handler: Handler, offset: int, end: int
) -> Scan:
    """Hyperscan callback: try to turn a pattern hit into a ValidChunk.

    Returns Scan.Continue to keep scanning the current stream, or
    Scan.Terminate once a valid chunk was recorded (the scan loop then
    restarts right after that chunk).
    """
    del end  # unused argument

    # Match offsets are relative to the current stream window.
    offset += context.start_offset
    real_offset = offset + handler.PATTERN_MATCH_OFFSET

    # See https://github.com/vlaci/pyperscan/issues/110
    if not 0 <= real_offset <= sys.maxsize:
        return Scan.Continue

    # Skip chunk calculation if this would start inside another one,
    # similar to remove_inner_chunks, but before we even begin calculating.
    for known_chunk in context.all_chunks:
        if known_chunk.contains_offset(real_offset):
            logger.debug(
                "Skip chunk calculation as pattern is inside an other chunk",
                handler=handler.NAME,
                offset=real_offset,
                _verbosity=2,
            )
            return Scan.Continue

    logger.debug(
        "Calculating chunk for pattern match",
        start_offset=offset,
        real_offset=real_offset,
        _verbosity=2,
        handler=handler.NAME,
    )

    chunk = _calculate_chunk(handler, context.file, real_offset, context.task_result)

    # We found some random bytes this handler couldn't parse
    if chunk is None:
        return Scan.Continue

    if chunk.end_offset > context.file_size:
        logger.debug("Chunk overflows file", chunk=chunk, _verbosity=2)
        return Scan.Continue

    chunk.handler = handler
    logger.debug("Found valid chunk", chunk=chunk, handler=handler.NAME, _verbosity=1)
    context.all_chunks.append(chunk)
    # Tell the scan loop where to resume after the recorded chunk.
    context.start_offset = chunk.end_offset

    return Scan.Terminate
def stream_scan_chunks(scanner, file: File, context: HyperscanMatchContext):
    """Scan the whole file by increment of DEFAULT_BUFSIZE using Hyperscan's streaming mode."""
    position = context.start_offset
    with memoryview(file) as view:
        while position < file.size():
            window = view[position : position + DEFAULT_BUFSIZE]
            if scanner.scan(window) == Scan.Terminate:
                # A chunk was found: reset the stream state and resume
                # scanning right after it (the callback updated start_offset).
                scanner.reset()
                position = context.start_offset
            else:
                position += DEFAULT_BUFSIZE
def search_chunks(
    file: File,
    file_size: int,
    handlers: Handlers,
    task_result: TaskResult,
) -> list[ValidChunk]:
    """Search all ValidChunks within the file.

    Search for patterns and run Handler.calculate_chunk() on the found
    matches. We don't deal with offset within already found
    ValidChunks and invalid chunks are thrown away. If chunk covers
    the whole file we stop any further search and processing.
    """
    found_chunks: list[ValidChunk] = []

    database = build_hyperscan_database(handlers)

    match_context = HyperscanMatchContext(
        file=file,
        file_size=file_size,
        all_chunks=found_chunks,
        task_result=task_result,
        start_offset=0,
    )

    scanner = database.build(match_context, _hyperscan_match)  # type: ignore

    try:
        stream_scan_chunks(scanner, file, match_context)
    except Exception as e:
        # Best effort: log scanner failures but still return what we found.
        logger.error(
            "Error scanning for patterns",
            error=e,
        )

    logger.debug(
        "Ended searching for chunks",
        all_chunks=found_chunks,
    )

    return found_chunks
@lru_cache
def build_hyperscan_database(handlers: Handlers) -> StreamDatabase:
    """Compile every handler pattern into one streaming Hyperscan database.

    Each compiled pattern is tagged with its (instantiated) handler so the
    match callback knows which handler produced the hit. Cached per
    handler collection. Re-raises InvalidHexString after logging it.
    """
    compiled_patterns = []
    for handler_class in handlers:
        handler_instance = handler_class()
        for handler_pattern in handler_instance.PATTERNS:
            try:
                # as_regex() is the only call here that can raise
                # InvalidHexString, so keep the try body minimal.
                regex = handler_pattern.as_regex()
            except InvalidHexString as e:
                logger.error(
                    "Invalid pattern",
                    handler=handler_instance.NAME,
                    pattern=handler_pattern,
                    error=str(e),
                )
                raise
            compiled_patterns.append(
                Pattern(
                    regex,
                    Flag.SOM_LEFTMOST,
                    Flag.DOTALL,
                    tag=handler_instance,
                )
            )
    return StreamDatabase(*compiled_patterns)