Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/finder.py: 94%

85 statements  

1"""Searching Chunk related functions. 

2 

3The main "entry point" is search_chunks_by_priority. 

4""" 

5 

6from functools import lru_cache 

7from typing import Optional 

8 

9import attrs 

10from pyperscan import Flag, Pattern, Scan, StreamDatabase 

11from structlog import get_logger 

12 

13from .file_utils import DEFAULT_BUFSIZE, InvalidInputFormat, SeekError 

14from .handlers import Handlers 

15from .models import File, Handler, TaskResult, ValidChunk 

16from .parser import InvalidHexString 

17from .report import CalculateChunkExceptionReport 

18 

19logger = get_logger() 

20 

21 
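# Shared state for one scan pass: _hyperscan_match mutates all_chunks and
# start_offset, and stream_scan_chunks reads start_offset to resume scanning.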

@attrs.define
class HyperscanMatchContext:
    file: File
    file_size: int
    all_chunks: list
    task_result: TaskResult
    start_offset: int

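# Expected parse failures are logged at debug level and yield None; unexpected
# exceptions also yield None, but are additionally reported on the task result
# so they surface in the final report.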

def _calculate_chunk(
    handler: Handler, file: File, real_offset: int, task_result: TaskResult
) -> Optional[ValidChunk]:
    file.seek(real_offset)
    try:
        return handler.calculate_chunk(file, real_offset)
    except InvalidInputFormat as exc:
        logger.debug(
            "File format is invalid",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except EOFError as exc:
        logger.debug(
            "File ends before header could be read",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except SeekError as exc:
        logger.debug(
            "Seek outside file during chunk calculation",
            exc_info=exc,
            handler=handler.NAME,
            _verbosity=2,
        )
    except Exception as exc:
        error_report = CalculateChunkExceptionReport(
            handler=handler.NAME,
            start_offset=real_offset,
            exception=exc,
        )
        task_result.add_report(error_report)
        logger.error(
            "Unhandled Exception during chunk calculation", **error_report.asdict()
        )

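# Match callback registered with pyperscan. Match offsets are relative to the
# start of the current stream, which begins at context.start_offset in the
# file, so they are rebased to absolute file offsets first.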

def _hyperscan_match(
    context: HyperscanMatchContext, handler: Handler, offset: int, end: int
) -> Scan:
    del end  # unused argument
    offset += context.start_offset
    real_offset = offset + handler.PATTERN_MATCH_OFFSET

    if real_offset < 0:
        return Scan.Continue

    # Skip chunk calculation if this would start inside another one,
    # similar to remove_inner_chunks, but before we even begin calculating.
    if any(chunk.contains_offset(real_offset) for chunk in context.all_chunks):
        logger.debug(
            "Skip chunk calculation as pattern is inside another chunk",
            handler=handler.NAME,
            offset=real_offset,
            _verbosity=2,
        )
        return Scan.Continue

    logger.debug(
        "Calculating chunk for pattern match",
        start_offset=offset,
        real_offset=real_offset,
        _verbosity=2,
        handler=handler.NAME,
    )

    chunk = _calculate_chunk(handler, context.file, real_offset, context.task_result)

    # We found some random bytes this handler couldn't parse
    if chunk is None:
        return Scan.Continue

    if chunk.end_offset > context.file_size:
        logger.debug("Chunk overflows file", chunk=chunk, _verbosity=2)
        return Scan.Continue

    chunk.handler = handler
    logger.debug("Found valid chunk", chunk=chunk, handler=handler.NAME, _verbosity=1)
    context.all_chunks.append(chunk)
    context.start_offset = chunk.end_offset
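    # Returning Terminate makes stream_scan_chunks reset the stream and resume
    # scanning from context.start_offset, i.e. right after this chunk.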

    return Scan.Terminate


def stream_scan_chunks(scanner, file: File, context: HyperscanMatchContext):
    """Scan the whole file in increments of DEFAULT_BUFSIZE using Hyperscan's streaming mode."""
    i = context.start_offset
    with memoryview(file) as data:
        while i < file.size():
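            # Terminate means the callback found a chunk and advanced
            # context.start_offset past it, so restart the stream there
            # instead of scanning bytes inside the found chunk.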

            if scanner.scan(data[i : i + DEFAULT_BUFSIZE]) == Scan.Terminate:
                scanner.reset()
                i = context.start_offset
            else:
                i += DEFAULT_BUFSIZE


def search_chunks(
    file: File,
    file_size: int,
    handlers: Handlers,
    task_result: TaskResult,
) -> list[ValidChunk]:
    """Search all ValidChunks within the file.

    Search for patterns and run Handler.calculate_chunk() on the
    matches. Matches that fall inside an already found ValidChunk are
    skipped, invalid chunks are thrown away, and if a chunk covers the
    whole file, any further search and processing stops.
    """
    all_chunks = []

    hyperscan_db = build_hyperscan_database(handlers)

    hyperscan_context = HyperscanMatchContext(
        file=file,
        file_size=file_size,
        all_chunks=all_chunks,
        task_result=task_result,
        start_offset=0,
    )

    scanner = hyperscan_db.build(hyperscan_context, _hyperscan_match)  # type: ignore

    try:
        stream_scan_chunks(scanner, file, hyperscan_context)
    except Exception as e:
        logger.error(
            "Error scanning for patterns",
            error=e,
        )

    logger.debug(
        "Ended searching for chunks",
        all_chunks=all_chunks,
    )

    return all_chunks

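# Database compilation is expensive, so results are cached with lru_cache and
# reused across calls with the same handlers. Each Pattern is tagged with its
# handler instance; pyperscan passes the tag back to the match callback.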

@lru_cache
def build_hyperscan_database(handlers: Handlers) -> StreamDatabase:
    patterns = []
    for handler_class in handlers:
        handler = handler_class()
        for pattern in handler.PATTERNS:
            try:
                patterns.append(
                    Pattern(
                        pattern.as_regex(),
                        Flag.SOM_LEFTMOST,
                        Flag.DOTALL,
                        tag=handler,
                    )
                )
            except InvalidHexString as e:
                logger.error(
                    "Invalid pattern",
                    handler=handler.NAME,
                    pattern=pattern,
                    error=str(e),
                )
                raise
    return StreamDatabase(*patterns)
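
For orientation, a minimal sketch of how search_chunks might be driven end to end. File.from_path, BUILTIN_HANDLERS, and the Task/TaskResult fields used below are assumptions about the surrounding unblob API rather than anything this module defines.

# Hypothetical driver for search_chunks. File.from_path, BUILTIN_HANDLERS,
# and the Task/TaskResult fields below are assumptions about the surrounding
# unblob API, not something this module guarantees.
from pathlib import Path

from unblob.file_utils import File
from unblob.finder import search_chunks
from unblob.handlers import BUILTIN_HANDLERS
from unblob.models import Task, TaskResult

path = Path("firmware.bin")
task = Task(path=path, depth=0, blob_id="")  # assumed constructor fields
task_result = TaskResult(task=task)

with File.from_path(path) as file:  # assumed: File is an mmap usable as a context manager
    for chunk in search_chunks(file, file.size(), BUILTIN_HANDLERS, task_result):
        print(chunk.start_offset, chunk.end_offset, chunk.handler.NAME)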