Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/finder.py: 96%

85 statements  

1"""Searching Chunk related functions. 

2 

3The main "entry point" is search_chunks_by_priority. 

4""" 

import sys
from functools import lru_cache

import attrs
from pyperscan import Flag, Pattern, Scan, StreamDatabase
from structlog import get_logger

from .file_utils import DEFAULT_BUFSIZE, InvalidInputFormat, SeekError
from .handlers import Handlers
from .models import File, Handler, TaskResult, ValidChunk
from .parser import InvalidHexString
from .report import CalculateChunkExceptionReport

logger = get_logger()


@attrs.define
class HyperscanMatchContext:
    file: File
    file_size: int
    all_chunks: list
    task_result: TaskResult
    start_offset: int
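    # Note: start_offset does double duty: it is the base offset of the
    # window currently being scanned, and _hyperscan_match advances it to
    # the end of each accepted chunk so stream_scan_chunks resumes there.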


def _calculate_chunk(
    handler: Handler, file: File, real_offset: int, task_result: TaskResult
) -> ValidChunk | None:
    file.seek(real_offset)
    try:
        return handler.calculate_chunk(file, real_offset)
    except InvalidInputFormat as exc:
        logger.debug(
            "File format is invalid",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except EOFError as exc:
        logger.debug(
            "File ends before header could be read",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except SeekError as exc:
        logger.debug(
            "Seek outside file during chunk calculation",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except Exception as exc:
        error_report = CalculateChunkExceptionReport(
            handler=handler.NAME,
            start_offset=real_offset,
            exception=exc,
        )
        task_result.add_report(error_report)
        logger.error(
            "Unhandled Exception during chunk calculation", **error_report.model_dump()
        )
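# _calculate_chunk returns None both on anticipated parse failures
# (InvalidInputFormat, EOFError, SeekError) and on unexpected exceptions;
# only the latter are additionally reported on the TaskResult.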


def _hyperscan_match(
    context: HyperscanMatchContext, handler: Handler, offset: int, end: int
) -> Scan:
    del end  # unused argument
    offset += context.start_offset
    real_offset = offset + handler.PATTERN_MATCH_OFFSET

    # See https://github.com/vlaci/pyperscan/issues/110
    if real_offset < 0 or real_offset > sys.maxsize:
        return Scan.Continue

    # Skip chunk calculation if this would start inside another one,
    # similar to remove_inner_chunks, but before we even begin calculating.
    if any(chunk.contains_offset(real_offset) for chunk in context.all_chunks):
        logger.debug(
            "Skip chunk calculation as pattern is inside another chunk",
            handler=handler.NAME,
            offset=real_offset,
            _verbosity=2,
        )
        return Scan.Continue

    logger.debug(
        "Calculating chunk for pattern match",
        start_offset=offset,
        real_offset=real_offset,
        _verbosity=2,
        handler=handler.NAME,
    )

    chunk = _calculate_chunk(handler, context.file, real_offset, context.task_result)

    # We found some random bytes this handler couldn't parse
    if chunk is None:
        return Scan.Continue

    if chunk.end_offset > context.file_size:
        logger.debug("Chunk overflows file", chunk=chunk, _verbosity=2)
        return Scan.Continue

    chunk.handler = handler
    logger.debug("Found valid chunk", chunk=chunk, handler=handler.NAME, _verbosity=1)
    context.all_chunks.append(chunk)
    context.start_offset = chunk.end_offset

    return Scan.Terminate
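# The return value drives pyperscan: Scan.Continue keeps the current scan
# going, while Scan.Terminate aborts it. stream_scan_chunks below treats
# termination as "a chunk was found, restart scanning after it".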


def stream_scan_chunks(scanner, file: File, context: HyperscanMatchContext):
    """Scan the whole file in increments of DEFAULT_BUFSIZE using Hyperscan's streaming mode."""
    i = context.start_offset
    with memoryview(file) as data:
        while i < file.size():
            if scanner.scan(data[i : i + DEFAULT_BUFSIZE]) == Scan.Terminate:
                scanner.reset()
                i = context.start_offset
            else:
                i += DEFAULT_BUFSIZE
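# A worked illustration of the restart logic (offsets are hypothetical):
# if a match in the window starting at i yields a chunk ending at offset E,
# _hyperscan_match sets context.start_offset = E and returns Scan.Terminate;
# the loop then resets the stream scanner and continues from i = E,
# skipping the interior of the found chunk entirely.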


def search_chunks(
    file: File,
    file_size: int,
    handlers: Handlers,
    task_result: TaskResult,
) -> list[ValidChunk]:
    """Search for all ValidChunks within the file.

    Search for patterns and run Handler.calculate_chunk() on the found
    matches. Offsets that fall within an already found ValidChunk are
    skipped, and invalid chunks are thrown away. If a chunk covers the
    whole file, any further search and processing stops.
    """
    all_chunks = []

    hyperscan_db = build_hyperscan_database(handlers)

    hyperscan_context = HyperscanMatchContext(
        file=file,
        file_size=file_size,
        all_chunks=all_chunks,
        task_result=task_result,
        start_offset=0,
    )

    scanner = hyperscan_db.build(hyperscan_context, _hyperscan_match)  # type: ignore

    try:
        stream_scan_chunks(scanner, file, hyperscan_context)
    except Exception as e:
        logger.error(
            "Error scanning for patterns",
            error=e,
        )

    logger.debug(
        "Ended searching for chunks",
        all_chunks=all_chunks,
    )

    return all_chunks
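# A minimal usage sketch (hypothetical caller; File.from_path and
# TaskResult(task) are assumptions, not verified against unblob's API):
#
#     file = File.from_path(path)
#     task_result = TaskResult(task)
#     chunks = search_chunks(file, file.size(), handlers, task_result)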


@lru_cache
def build_hyperscan_database(handlers: Handlers) -> StreamDatabase:
    patterns = []
    for handler_class in handlers:
        handler = handler_class()
        for pattern in handler.PATTERNS:
            try:
                patterns.append(
                    Pattern(
                        pattern.as_regex(),
                        Flag.SOM_LEFTMOST,
                        Flag.DOTALL,
                        tag=handler,
                    )
                )
            except InvalidHexString as e:
                logger.error(
                    "Invalid pattern",
                    handler=handler.NAME,
                    pattern=pattern,
                    error=str(e),
                )
                raise
    return StreamDatabase(*patterns)
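# Because build_hyperscan_database is wrapped in lru_cache, pattern
# compilation runs once per distinct Handlers value; this relies on
# Handlers being hashable and on callers passing the same handlers
# object to get cache hits.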