1"""
This module provides the :mod:`hyperscan` backend for :class:`~pathspec.pathspec.PathSpec`.
3
4WARNING: The *pathspec._backends.hyperscan* package is not part of the public
5API. Its contents and structure are likely to change.
6"""
7from __future__ import annotations
8
9from collections.abc import (
10 Sequence)
11from typing import (
12 Any,
13 Callable, # Replaced by `collections.abc.Callable` in 3.9.2.
14 Optional) # Replaced by `X | None` in 3.10.
15
16try:
17 import hyperscan
18except ModuleNotFoundError:
19 hyperscan = None
20
21from pathspec.backend import (
22 _Backend)
23from pathspec.pattern import (
24 RegexPattern)
25from pathspec._typing import (
26 override) # Added in 3.12.
27
28from .._utils import (
29 enumerate_patterns)
30
31from .base import (
32 hyperscan_error)
33from ._base import (
34 HS_FLAGS,
35 HyperscanExprDat,
36 HyperscanExprDebug)
37
38
class HyperscanPsBackend(_Backend):
    """
    The :class:`HyperscanPsBackend` class is the :mod:`hyperscan` backend that
    :class:`~pathspec.pathspec.PathSpec` uses to match files. Its Hyperscan
    database is created in block mode.
    """

    def __init__(
        self,
        patterns: Sequence[RegexPattern],
        *,
        _debug_exprs: Optional[bool] = None,
        _test_sort: Optional[Callable[[list], None]] = None,
    ) -> None:
        """
        Set up the :class:`HyperscanPsBackend` instance.

        *patterns* (:class:`Sequence` of :class:`.RegexPattern`) contains the
        compiled patterns.

        *_debug_exprs* (:class:`bool` or :data:`None`) is whether to keep
        additional debugging information for each expression. It is coerced
        with :func:`bool`, so :data:`None` means no.

        *_test_sort* (:class:`callable` or :data:`None`) is forwarded to
        :meth:`_init_db` as *sort_ids*; it reorders the expression ids in place.

        Raises the shared *hyperscan_error* exception when the :mod:`hyperscan`
        module could not be imported, and :class:`TypeError` when the first
        pattern is not a :class:`.RegexPattern`.
        """
        # The optional dependency failed to import at module load.
        if hyperscan is None:
            raise hyperscan_error

        # Only the first pattern is inspected as a sanity check.
        if patterns and not isinstance(patterns[0], RegexPattern):
            raise TypeError(f"{patterns[0]=!r} must be a RegexPattern.")

        indexed_patterns = enumerate_patterns(
            patterns, filter=True, reverse=False,
        )
        want_debug = bool(_debug_exprs)

        # WARNING: The hyperscan database cannot be initialized with zero
        # patterns, so start from the "no database" state and only build one
        # when there is something to compile.
        database = None
        expr_data = []
        if indexed_patterns:
            database = self._make_db()
            expr_data = self._init_db(
                db=database,
                debug=want_debug,
                patterns=indexed_patterns,
                sort_ids=_test_sort,
            )

        self._db: Optional[hyperscan.Database] = database
        """
        *_db* (:class:`hyperscan.Database` or :data:`None`) is the Hyperscan
        database. It is :data:`None` when there are no patterns to compile.
        """

        self._debug_exprs = want_debug
        """
        *_debug_exprs* (:class:`bool`) is whether additional debugging
        information is kept for the expressions.
        """

        self._expr_data: list[HyperscanExprDat] = expr_data
        """
        *_expr_data* (:class:`list`) maps expression index (:class:`int`) to
        expression data (:class:`HyperscanExprDat`).
        """

        self._out: tuple[Optional[bool], int] = (None, -1)
        """
        *_out* (:class:`tuple`) holds the match currently being accumulated:

        - *0* (:class:`bool` or :data:`None`) is the match include.

        - *1* (:class:`int`) is the match index; ``-1`` means no match yet.
        """

        self._patterns: dict[int, RegexPattern] = dict(indexed_patterns)
        """
        *_patterns* (:class:`dict`) maps pattern index (:class:`int`) to pattern
        (:class:`RegexPattern`).
        """

    @staticmethod
    def _init_db(
        db: hyperscan.Database,
        debug: bool,
        patterns: list[tuple[int, RegexPattern]],
        sort_ids: Optional[Callable[[list[int]], None]],
    ) -> list[HyperscanExprDat]:
        """
        Compile the given patterns into the Hyperscan database.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database.

        *debug* (:class:`bool`) is whether to include additional debugging
        information for the expressions.

        *patterns* (:class:`list` of :class:`tuple`) contains the patterns as
        pairs of pattern index (:class:`int`) and pattern
        (:class:`.RegexPattern`). It must not be empty.

        *sort_ids* (:class:`callable` or :data:`None`) is a function that
        reorders the list of expression ids in place. This is used during
        testing to ensure the order of expressions is not accidentally relied
        on.

        Returns a :class:`list` indexed by expression id (:class:`int`) to its
        data (:class:`HyperscanExprDat`).
        """
        # WARNING: Hyperscan raises a `hyperscan.error` exception when compiled
        # with zero elements.
        assert patterns, patterns

        # Build the per-expression data and the encoded regular expressions in
        # lockstep: position in both lists is the expression id.
        expr_data: list[HyperscanExprDat] = []
        exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            assert pattern.include is not None, (pattern_index, pattern)
            assert isinstance(pattern, RegexPattern), pattern

            # Hyperscan expressions are bytes; encode text regexes.
            regex = pattern.regex.pattern
            if not isinstance(regex, bytes):
                assert isinstance(regex, str), regex
                encoded = regex.encode('utf8')
            else:
                encoded = regex

            if debug:
                # The debug variant additionally keeps the original regex.
                entry = HyperscanExprDebug(
                    include=pattern.include,
                    index=pattern_index,
                    is_dir_pattern=False,
                    regex=regex,
                )
            else:
                entry = HyperscanExprDat(
                    include=pattern.include,
                    index=pattern_index,
                    is_dir_pattern=False,
                )

            expr_data.append(entry)
            exprs.append(encoded)

        # Optionally reorder the ids, then line the expressions up with them so
        # that each expression is still compiled under its own id.
        ids = list(range(len(exprs)))
        if sort_ids is not None:
            sort_ids(ids)
            exprs = [exprs[expr_id] for expr_id in ids]

        # Compile patterns.
        db.compile(
            expressions=exprs,
            ids=ids,
            elements=len(exprs),
            flags=HS_FLAGS,
        )

        return expr_data

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Match the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file*
        (:class:`bool` or :data:`None`), and the index of the last matched
        pattern (:class:`int` or :data:`None`).
        """
        # NOTICE: According to benchmarking, a method callback is 20% faster
        # than using a closure here.
        database = self._db
        if database is None:
            # Database was not initialized because there were no patterns.
            # Return no match.
            return (None, None)

        # Reset the accumulator, then let the callback fill it in during the
        # scan.
        self._out = (None, -1)
        database.scan(file.encode('utf8'), match_event_handler=self.__on_match)

        # Translate the internal "no match" sentinel (-1) into None.
        include, index = self._out
        return (include, None if index == -1 else index)

    @staticmethod
    def _make_db() -> hyperscan.Database:
        """
        Create the Hyperscan database in block mode.

        Returns the database (:class:`hyperscan.Database`).
        """
        return hyperscan.Database(mode=hyperscan.HS_MODE_BLOCK)

    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called on each match.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        The remaining arguments are ignored.
        """
        # Store match.
        # - WARNING: Hyperscan does not guarantee matches will be produced in
        #   order! Later expressions have higher priority, so only replace the
        #   stored match when this one has a greater pattern index.
        matched = self._expr_data[expr_id]
        pattern_index = matched.index
        if self._out[1] < pattern_index:
            self._out = (matched.include, pattern_index)