1"""
2This module provides the :module:`hyperscan` backend for :class:`~pathspec.pathspec.PathSpec`.
3
4WARNING: The *pathspec._backends.hyperscan* package is not part of the public
5API. Its contents and structure are likely to change.
6"""
7from __future__ import annotations
8
9from collections.abc import (
10 Sequence)
11from typing import (
12 Any,
13 Callable, # Replaced by `collections.abc.Callable` in 3.9.2.
14 Optional) # Replaced by `X | None` in 3.10.
15
16try:
17 import hyperscan
18except ModuleNotFoundError:
19 hyperscan = None # type: ignore[assignment]
20
21from pathspec.backend import (
22 _Backend)
23from pathspec.pattern import (
24 RegexPattern)
25from pathspec._typing import (
26 override) # Added in 3.12.
27
28from .._utils import (
29 enumerate_patterns)
30
31from .base import (
32 hyperscan_error)
33from ._base import (
34 HS_FLAGS,
35 HyperscanExprDat,
36 HyperscanExprDebug)
37
38
class HyperscanPsBackend(_Backend):
    """
    The :class:`HyperscanPsBackend` class matches files for
    :class:`~pathspec.pathspec.PathSpec` using the :module:`hyperscan` library.
    The Hyperscan database it builds is created in block mode.
    """

    def __init__(
        self,
        patterns: Sequence[RegexPattern],
        *,
        _debug_exprs: Optional[bool] = None,
        _test_sort: Optional[Callable[[list], None]] = None,
    ) -> None:
        """
        Initialize the :class:`HyperscanPsBackend` instance.

        *patterns* (:class:`~collections.abc.Sequence` of :class:`.RegexPattern`)
        contains the compiled patterns.

        *_debug_exprs* (:class:`bool` or :data:`None`) is whether to keep
        additional debugging information for each expression. :data:`None` is
        treated as :data:`False`.

        *_test_sort* (:class:`callable` or :data:`None`) is forwarded to
        :meth:`_init_db` as *sort_ids*. It is only intended for testing.

        Raises the stored *hyperscan_error* when the :module:`hyperscan` module
        could not be imported, and :class:`TypeError` when the first pattern is
        not a :class:`.RegexPattern`.
        """
        if hyperscan is None:
            assert hyperscan_error is not None, (hyperscan, hyperscan_error)
            raise hyperscan_error

        # NOTICE: Only the first pattern is type checked here.
        if patterns and not isinstance(patterns[0], RegexPattern):
            raise TypeError(f"{patterns[0]=!r} must be a RegexPattern.")

        indexed_patterns = enumerate_patterns(patterns, filter=True, reverse=False)
        debug = bool(_debug_exprs)

        # WARNING: The hyperscan database cannot be initialized with zero
        # patterns, so start from "no database" and only build one when there is
        # at least one usable pattern.
        db = None
        expr_data: list[HyperscanExprDat] = []
        if indexed_patterns:
            db = self._make_db()
            expr_data = self._init_db(
                db=db,
                debug=debug,
                patterns=indexed_patterns,
                sort_ids=_test_sort,
            )

        self._db: Optional[hyperscan.Database] = db  # type: ignore
        """
        *_db* (:class:`hyperscan.Database` or :data:`None`) is the Hyperscan
        database. It is :data:`None` when there are no patterns to compile.
        """

        self._debug_exprs = debug
        """
        *_debug_exprs* (:class:`bool`) is whether additional debugging
        information is kept for the expressions.
        """

        self._expr_data: list[HyperscanExprDat] = expr_data
        """
        *_expr_data* (:class:`list`) maps expression index (:class:`int`) to
        expression data (:class:`HyperscanExprDat`).
        """

        self._out: tuple[Optional[bool], int] = (None, -1)
        """
        *_out* (:class:`tuple`) holds the best match of the current scan:

        - *0* (:class:`bool` or :data:`None`) is the match include.

        - *1* (:class:`int`) is the match index, or ``-1`` for no match.
        """

        self._patterns: dict[int, RegexPattern] = dict(indexed_patterns)
        """
        *_patterns* (:class:`dict`) maps pattern index (:class:`int`) to pattern
        (:class:`RegexPattern`).
        """

    @staticmethod
    def _init_db(
        db: hyperscan.Database,  # type: ignore
        debug: bool,
        patterns: list[tuple[int, RegexPattern]],
        sort_ids: Optional[Callable[[list[int]], None]],
    ) -> list[HyperscanExprDat]:
        """
        Compile the given patterns into the Hyperscan database.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database.

        *debug* (:class:`bool`) is whether to keep additional debugging
        information (the regex) for the expressions.

        *patterns* (:class:`list` of :class:`tuple`) contains each pattern index
        (:class:`int`) paired with its pattern (:class:`.RegexPattern`). It must
        not be empty.

        *sort_ids* (:class:`callable` or :data:`None`) is a function which
        reorders the list of expression ids in place. This is used during testing
        to ensure the order of expressions is not accidentally relied on.

        Returns a :class:`list` indexed by expression id (:class:`int`) to its data
        (:class:`HyperscanExprDat`).
        """
        # WARNING: Hyperscan raises a `hyperscan.error` exception when compiled with
        # zero elements.
        assert patterns, patterns

        # Build the per-expression data and the encoded expressions in lockstep:
        # the position of an entry in both lists is its expression id.
        expr_data: list[HyperscanExprDat] = []
        exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            assert pattern.include is not None, (pattern_index, pattern)
            assert pattern.regex is not None, (pattern_index, pattern)
            assert isinstance(pattern, RegexPattern), pattern

            # Hyperscan is given bytes, so encode string regexes as UTF-8.
            regex = pattern.regex.pattern
            if not isinstance(regex, bytes):
                assert isinstance(regex, str), regex
                encoded = regex.encode('utf8')
            else:
                encoded = regex

            if not debug:
                dat = HyperscanExprDat(
                    include=pattern.include,
                    index=pattern_index,
                    is_dir_pattern=False,
                )
            else:
                # The debug variant additionally records the original regex.
                dat = HyperscanExprDebug(
                    include=pattern.include,
                    index=pattern_index,
                    is_dir_pattern=False,
                    regex=regex,
                )

            expr_data.append(dat)
            exprs.append(encoded)

        # Optionally reorder the ids, and reorder the expressions to match so
        # each expression is still compiled under its own id. *expr_data* keeps
        # its original order because it is looked up by id.
        ids = list(range(len(exprs)))
        if sort_ids is not None:
            sort_ids(ids)
            exprs = [exprs[expr_id] for expr_id in ids]

        db.compile(expressions=exprs, ids=ids, elements=len(exprs), flags=HS_FLAGS)
        return expr_data

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Match the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file* (:class:`bool`
        or :data:`None`), and the index of the last matched pattern (:class:`int` or
        :data:`None`). Both are :data:`None` when nothing matched.
        """
        # NOTICE: According to benchmarking, a method callback is 20% faster than
        # using a closure here.
        db = self._db
        if db is None:
            # There were no patterns, so there is no database to scan with.
            return (None, None)

        # Reset the stored match, then let the match callback fill it in while
        # scanning.
        self._out = (None, -1)
        db.scan(file.encode('utf8'), match_event_handler=self.__on_match)

        # Translate the internal "no match" sentinel (-1) to None.
        include, index = self._out
        return (include, None if index == -1 else index)

    @staticmethod
    def _make_db() -> hyperscan.Database:  # type: ignore
        """
        Create a new, empty Hyperscan database in block mode.

        Returns the database (:class:`hyperscan.Database`).
        """
        assert hyperscan is not None, (hyperscan, hyperscan_error)
        return hyperscan.Database(mode=hyperscan.HS_MODE_BLOCK)

    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called by the scan on each match. The remaining callback arguments are
        unused.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        Always returns :data:`None`.
        """
        # WARNING: Hyperscan does not guarantee matches will be produced in order!
        # Later patterns have higher priority, so only replace the stored match
        # when this one has a higher pattern index.
        matched = self._expr_data[expr_id]
        matched_index = matched.index
        if matched_index > self._out[1]:
            self._out = (matched.include, matched_index)

        # NOTE(review): returning None presumably lets the scan continue;
        # confirm against the python-hyperscan callback contract.
        return None