Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pathspec/_backends/hyperscan/pathspec.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

84 statements  

1""" 

2This module provides the :mod:`hyperscan` backend for :class:`~pathspec.pathspec.PathSpec`. 

3 

4WARNING: The *pathspec._backends.hyperscan* package is not part of the public 

5API. Its contents and structure are likely to change. 

6""" 

7from __future__ import annotations 

8 

9from collections.abc import ( 

10 Sequence) 

11from typing import ( 

12 Any, 

13 Callable, # Replaced by `collections.abc.Callable` in 3.9.2. 

14 Optional) # Replaced by `X | None` in 3.10. 

15 

16try: 

17 import hyperscan 

18except ModuleNotFoundError: 

19 hyperscan = None # type: ignore[assignment] 

20 

21from pathspec.backend import ( 

22 _Backend) 

23from pathspec.pattern import ( 

24 RegexPattern) 

25from pathspec._typing import ( 

26 override) # Added in 3.12. 

27 

28from .._utils import ( 

29 enumerate_patterns) 

30 

31from .base import ( 

32 hyperscan_error) 

33from ._base import ( 

34 HS_FLAGS, 

35 HyperscanExprDat, 

36 HyperscanExprDebug) 

37 

38 

class HyperscanPsBackend(_Backend):
    """
    The :class:`HyperscanPsBackend` class is the :mod:`hyperscan` backend that
    :class:`~pathspec.pathspec.PathSpec` uses to match files. Its Hyperscan
    database is created in block mode.
    """

    def __init__(
        self,
        patterns: Sequence[RegexPattern],
        *,
        _debug_exprs: Optional[bool] = None,
        _test_sort: Optional[Callable[[list], None]] = None,
    ) -> None:
        """
        Initialize the :class:`HyperscanPsBackend` instance.

        *patterns* (:class:`Sequence` of :class:`.RegexPattern`) contains the
        compiled patterns.

        *_debug_exprs* (:class:`bool` or :data:`None`) is coerced to
        :class:`bool` and controls whether debugging data is recorded for each
        expression.

        *_test_sort* (:class:`callable` or :data:`None`) is forwarded to
        :meth:`_init_db` as its *sort_ids* argument.

        Raises the stored hyperscan import error when the :mod:`hyperscan`
        module is unavailable, and :class:`TypeError` when the first pattern is
        not a :class:`.RegexPattern`.
        """
        if hyperscan is None:
            assert hyperscan_error is not None, (hyperscan, hyperscan_error)
            raise hyperscan_error

        # Only the first pattern is inspected.
        if patterns and not isinstance(patterns[0], RegexPattern):
            raise TypeError(f"{patterns[0]=!r} must be a RegexPattern.")

        indexed_patterns = enumerate_patterns(
            patterns, filter=True, reverse=False,
        )
        want_debug = bool(_debug_exprs)

        # WARNING: The hyperscan database cannot be initialized with zero
        # patterns, so it is only created when there is something to compile.
        database = None
        expr_table = []
        if indexed_patterns:
            database = self._make_db()
            expr_table = self._init_db(
                db=database,
                debug=want_debug,
                patterns=indexed_patterns,
                sort_ids=_test_sort,
            )

        self._db: Optional[hyperscan.Database] = database  # type: ignore
        """
        *_db* (:class:`hyperscan.Database` or :data:`None`) is the Hyperscan
        database. It is :data:`None` when there are no patterns to compile.
        """

        self._debug_exprs = want_debug
        """
        *_debug_exprs* (:class:`bool`) is whether to include additional
        debugging information for the expressions.
        """

        self._expr_data: list[HyperscanExprDat] = expr_table
        """
        *_expr_data* (:class:`list`) maps expression index (:class:`int`) to
        expression data (:class:`HyperscanExprDat`).
        """

        self._out: tuple[Optional[bool], int] = (None, -1)
        """
        *_out* (:class:`tuple`) stores the current match:

        - *0* (:class:`bool` or :data:`None`) is the match include.

        - *1* (:class:`int`) is the match index; ``-1`` means no match yet.
        """

        self._patterns: dict[int, RegexPattern] = dict(indexed_patterns)
        """
        *_patterns* (:class:`dict`) maps pattern index (:class:`int`) to
        pattern (:class:`RegexPattern`).
        """

    @staticmethod
    def _init_db(
        db: hyperscan.Database,  # type: ignore
        debug: bool,
        patterns: list[tuple[int, RegexPattern]],
        sort_ids: Optional[Callable[[list[int]], None]],
    ) -> list[HyperscanExprDat]:
        """
        Compile the given patterns into the Hyperscan database.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database.

        *debug* (:class:`bool`) is whether to record the regex alongside each
        expression's data (:class:`HyperscanExprDebug` is used instead of
        :class:`HyperscanExprDat`).

        *patterns* (:class:`list` of :class:`tuple`) contains each pattern
        index (:class:`int`) paired with its pattern (:class:`.RegexPattern`).
        It must not be empty.

        *sort_ids* (:class:`callable` or :data:`None`) is called with the list
        of expression ids and is expected to reorder it in place. This is used
        during testing to ensure the order of expressions is not accidentally
        relied on.

        Returns a :class:`list` indexed by expression id (:class:`int`) to its
        data (:class:`HyperscanExprDat`).
        """
        # WARNING: Hyperscan raises a `hyperscan.error` exception when compiled
        # with zero elements.
        assert patterns, patterns

        # Build the per-expression data and the encoded regexes in lockstep, so
        # that position N in both lists describes expression id N.
        expr_table: list[HyperscanExprDat] = []
        encoded_exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            assert pattern.include is not None, (pattern_index, pattern)
            assert pattern.regex is not None, (pattern_index, pattern)
            assert isinstance(pattern, RegexPattern), pattern

            # Hyperscan is given bytes; str regexes are encoded as UTF-8.
            regex = pattern.regex.pattern
            if not isinstance(regex, bytes):
                assert isinstance(regex, str), regex
                encoded = regex.encode('utf8')
            else:
                encoded = regex

            if debug:
                entry = HyperscanExprDebug(
                    include=pattern.include,
                    index=pattern_index,
                    is_dir_pattern=False,
                    regex=regex,
                )
            else:
                entry = HyperscanExprDat(
                    include=pattern.include,
                    index=pattern_index,
                    is_dir_pattern=False,
                )

            expr_table.append(entry)
            encoded_exprs.append(encoded)

        # Optionally reorder the ids, keeping every expression paired with its
        # original id.
        ids = list(range(len(encoded_exprs)))
        if sort_ids is not None:
            sort_ids(ids)
            encoded_exprs = [encoded_exprs[expr_id] for expr_id in ids]

        # Compile patterns.
        db.compile(
            expressions=encoded_exprs,
            ids=ids,
            elements=len(encoded_exprs),
            flags=HS_FLAGS,
        )

        return expr_table

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Check the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file*
        (:class:`bool` or :data:`None`), and the index of the last matched
        pattern (:class:`int` or :data:`None`).
        """
        # NOTICE: According to benchmarking, a method callback is 20% faster
        # than using a closure here.
        database = self._db
        if database is None:
            # Database was not initialized because there were no patterns.
            # Return no match.
            return (None, None)

        # Reset the shared match slot; the scan callback fills it in.
        self._out = (None, -1)
        database.scan(file.encode('utf8'), match_event_handler=self.__on_match)

        include, index = self._out
        # The internal "no match" sentinel (-1) is reported as None.
        return (include, None if index == -1 else index)

    @staticmethod
    def _make_db() -> hyperscan.Database:  # type: ignore
        """
        Create the Hyperscan database in block mode.

        Returns the database (:class:`hyperscan.Database`).
        """
        assert hyperscan is not None, (hyperscan, hyperscan_error)
        return hyperscan.Database(mode=hyperscan.HS_MODE_BLOCK)

    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called on each match.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        The remaining arguments are unused. Always returns :data:`None`.
        """
        # WARNING: Hyperscan does not guarantee matches will be produced in
        # order! Later expressions have higher priority, so a match is only
        # stored when its pattern index exceeds the one already recorded.
        matched = self._expr_data[expr_id]
        matched_index = matched.index
        if matched_index > self._out[1]:
            self._out = (matched.include, matched_index)

        return None

251 expr_dat = self._expr_data[expr_id] 

252 index = expr_dat.index 

253 prev_index = self._out[1] 

254 if index > prev_index: 

255 self._out = (expr_dat.include, index) 

256 

257 return None