Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/finder.py: 94%
1"""Searching Chunk related functions.
3The main "entry point" is search_chunks_by_priority.
4"""
6from functools import lru_cache
7from typing import Optional
9import attrs
10from pyperscan import Flag, Pattern, Scan, StreamDatabase
11from structlog import get_logger
13from .file_utils import DEFAULT_BUFSIZE, InvalidInputFormat, SeekError
14from .handlers import Handlers
15from .models import File, Handler, TaskResult, ValidChunk
16from .parser import InvalidHexString
17from .report import CalculateChunkExceptionReport
19logger = get_logger()


@attrs.define
class HyperscanMatchContext:
    file: File
    file_size: int
    all_chunks: list
    task_result: TaskResult
    start_offset: int
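
# Note: start_offset is mutated during scanning. When _hyperscan_match
# (below) accepts a chunk, it advances start_offset to the chunk's end so
# stream_scan_chunks can restart the scan right after the chunk.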


def _calculate_chunk(
    handler: Handler, file: File, real_offset: int, task_result: TaskResult
) -> Optional[ValidChunk]:
    file.seek(real_offset)
    try:
        return handler.calculate_chunk(file, real_offset)
    except InvalidInputFormat as exc:
        logger.debug(
            "File format is invalid",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except EOFError as exc:
        logger.debug(
            "File ends before header could be read",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except SeekError as exc:
        logger.debug(
            "Seek outside file during chunk calculation",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except Exception as exc:
        error_report = CalculateChunkExceptionReport(
            handler=handler.NAME,
            start_offset=real_offset,
            exception=exc,
        )
        task_result.add_report(error_report)
        logger.error(
            "Unhandled Exception during chunk calculation", **error_report.asdict()
        )
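
# A minimal sketch of the Handler side of the contract above (illustrative
# only; "ZipExampleHandler", the HexString pattern type, and the
# find_end_of_central_directory helper are hypothetical here, real handlers
# live in unblob.handlers):
#
#     class ZipExampleHandler(Handler):
#         NAME = "zip_example"
#         PATTERNS = [HexString("50 4b 03 04")]  # local file header magic
#
#         def calculate_chunk(
#             self, file: File, start_offset: int
#         ) -> Optional[ValidChunk]:
#             # Parse headers from start_offset; raising InvalidInputFormat
#             # here is logged and swallowed by _calculate_chunk, yielding
#             # no chunk for this pattern match.
#             end = find_end_of_central_directory(file, start_offset)
#             return ValidChunk(start_offset=start_offset, end_offset=end)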


def _hyperscan_match(
    context: HyperscanMatchContext, handler: Handler, offset: int, end: int
) -> Scan:
    del end  # unused argument
    offset += context.start_offset
    real_offset = offset + handler.PATTERN_MATCH_OFFSET

    if real_offset < 0:
        return Scan.Continue

    # Skip chunk calculation if this would start inside another one,
    # similar to remove_inner_chunks, but before we even begin calculating.
    if any(chunk.contains_offset(real_offset) for chunk in context.all_chunks):
        logger.debug(
            "Skip chunk calculation as pattern is inside another chunk",
            handler=handler.NAME,
            offset=real_offset,
            _verbosity=2,
        )
        return Scan.Continue

    logger.debug(
        "Calculating chunk for pattern match",
        start_offset=offset,
        real_offset=real_offset,
        _verbosity=2,
        handler=handler.NAME,
    )

    chunk = _calculate_chunk(handler, context.file, real_offset, context.task_result)

    # We found some random bytes this handler couldn't parse
    if chunk is None:
        return Scan.Continue

    if chunk.end_offset > context.file_size:
        logger.debug("Chunk overflows file", chunk=chunk, _verbosity=2)
        return Scan.Continue

    chunk.handler = handler
    logger.debug("Found valid chunk", chunk=chunk, handler=handler.NAME, _verbosity=1)
    context.all_chunks.append(chunk)
    context.start_offset = chunk.end_offset

    return Scan.Terminate
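
# Returning Scan.Terminate does not abort the whole search: it only ends the
# current hyperscan pass so stream_scan_chunks can reset the scanner and
# resume from the updated context.start_offset.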


def stream_scan_chunks(scanner, file: File, context: HyperscanMatchContext):
    """Scan the whole file in increments of DEFAULT_BUFSIZE using Hyperscan's streaming mode."""
    i = context.start_offset
    with memoryview(file) as data:
        while i < file.size():
            if scanner.scan(data[i : i + DEFAULT_BUFSIZE]) == Scan.Terminate:
                scanner.reset()
                i = context.start_offset
            else:
                i += DEFAULT_BUFSIZE
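
# Worked illustration of the restart logic (all numbers hypothetical):
# suppose a valid chunk is reported at [0x100, 0x30000). The match callback
# sets context.start_offset = 0x30000 and returns Scan.Terminate; the loop
# above then resets the scanner and resumes with i = 0x30000, so bytes
# inside the chunk are never scanned again.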


def search_chunks(
    file: File,
    file_size: int,
    handlers: Handlers,
    task_result: TaskResult,
) -> list[ValidChunk]:
    """Search for all ValidChunks within the file.

    Search for patterns and run Handler.calculate_chunk() on the
    found matches. Offsets inside already identified ValidChunks are
    skipped, and invalid chunks are thrown away. If a chunk covers
    the whole file, we stop any further search and processing.
    """
    all_chunks = []

    hyperscan_db = build_hyperscan_database(handlers)

    hyperscan_context = HyperscanMatchContext(
        file=file,
        file_size=file_size,
        all_chunks=all_chunks,
        task_result=task_result,
        start_offset=0,
    )

    scanner = hyperscan_db.build(hyperscan_context, _hyperscan_match)  # type: ignore

    try:
        stream_scan_chunks(scanner, file, hyperscan_context)
    except Exception as e:
        logger.error(
            "Error scanning for patterns",
            error=e,
        )

    logger.debug(
        "Ended searching for chunks",
        all_chunks=all_chunks,
    )

    return all_chunks
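
# A minimal usage sketch, not part of this module. File.from_path and
# BUILTIN_HANDLERS are recalled from unblob's API and may differ between
# versions; task_result is a TaskResult prepared by the caller:
#
#     from unblob.file_utils import File
#     from unblob.handlers import BUILTIN_HANDLERS
#
#     with File.from_path(path) as f:
#         chunks = search_chunks(f, f.size(), BUILTIN_HANDLERS, task_result)
#     # chunks: list[ValidChunk], each with .handler set to the Handler
#     # that found it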


@lru_cache
def build_hyperscan_database(handlers: Handlers) -> StreamDatabase:
    patterns = []
    for handler_class in handlers:
        handler = handler_class()
        for pattern in handler.PATTERNS:
            try:
                patterns.append(
                    Pattern(
                        pattern.as_regex(),
                        Flag.SOM_LEFTMOST,
                        Flag.DOTALL,
                        tag=handler,
                    )
                )
            except InvalidHexString as e:
                logger.error(
                    "Invalid pattern",
                    handler=handler.NAME,
                    pattern=pattern,
                    error=str(e),
                )
                raise
    return StreamDatabase(*patterns)
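
# Each Pattern carries its instantiated handler as the tag, which is how a
# match is routed back to the right Handler in _hyperscan_match. Because of
# @lru_cache, the database is compiled only once per distinct Handlers
# value, which therefore must be hashable.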