1"""
This module provides the :mod:`hyperscan` backend for :class:`~pathspec.pathspec.PathSpec`.
3
4WARNING: The *pathspec._backends.hyperscan* package is not part of the public
5API. Its contents and structure are likely to change.
6"""
7from __future__ import annotations
8
9from collections.abc import (
10 Callable,
11 Sequence)
12from typing import (
13 Any,
14 Optional) # Replaced by `X | None` in 3.10.
15
16try:
17 import hyperscan
18except ModuleNotFoundError:
19 hyperscan = None
20
21from ...pattern import (
22 RegexPattern)
23from ..._typing import (
24 override) # Added in 3.12.
25
26from ..base import (
27 Backend)
28from .._utils import (
29 enumerate_patterns)
30
31from .base import (
32 hyperscan_error)
33from ._base import (
34 HS_FLAGS,
35 HyperscanExprDat,
36 HyperscanExprDebug)
37
38
class HyperscanPsBackend(Backend):
    """
    The :class:`HyperscanPsBackend` class is the :mod:`hyperscan` backend that
    :class:`~pathspec.pathspec.PathSpec` uses to match files. Files are scanned
    against a Hyperscan database created in block mode.
    """

    def __init__(
        self,
        patterns: Sequence[RegexPattern],
        *,
        _debug_exprs: Optional[bool] = None,
        _test_sort: Optional[Callable[[list], None]] = None,
    ) -> None:
        """
        Initialize the :class:`HyperscanPsBackend` instance.

        *patterns* (:class:`~collections.abc.Sequence` of :class:`.RegexPattern`)
        contains the compiled patterns. It must not be empty.

        *_debug_exprs* (:class:`bool` or :data:`None`) is whether to keep
        additional debugging information for each expression. Default is
        :data:`None` for :data:`False`.

        *_test_sort* (:class:`callable` or :data:`None`) is forwarded to
        :meth:`_init_db` to reorder the expression ids. It is used during
        testing.

        Raises the shared *hyperscan_error* exception when the :mod:`hyperscan`
        library could not be imported, :class:`ValueError` when *patterns* is
        empty, and :class:`TypeError` when the first pattern is not a
        :class:`.RegexPattern`.
        """
        if hyperscan is None:
            raise hyperscan_error

        # Reject an empty sequence first so that indexing the first pattern
        # below is safe. Only the first pattern is type checked.
        if not patterns:
            raise ValueError(f"{patterns=!r} cannot be empty.")

        if not isinstance(patterns[0], RegexPattern):
            raise TypeError(f"{patterns[0]=!r} must be a RegexPattern.")

        indexed_patterns = enumerate_patterns(
            patterns, filter=True, reverse=False,
        )

        self._db = self._make_db()
        """
        *_db* (:class:`hyperscan.Database`) is the Hyperscan database.
        """

        self._debug_exprs = bool(_debug_exprs)
        """
        *_debug_exprs* (:class:`bool`) is whether additional debugging
        information is kept for the expressions.
        """

        self._expr_data: list[HyperscanExprDat] = self._init_db(
            db=self._db,
            debug=self._debug_exprs,
            patterns=indexed_patterns,
            sort_ids=_test_sort,
        )
        """
        *_expr_data* (:class:`list`) maps expression id (:class:`int`) to
        expression data (:class:`HyperscanExprDat`).
        """

        self._out: tuple[Optional[bool], int] = (None, -1)
        """
        *_out* (:class:`tuple`) stores the best match found by the current scan:

        - *0* (:class:`bool` or :data:`None`) is the match include.

        - *1* (:class:`int`) is the match index, or ``-1`` when nothing has
          matched yet.
        """

        self._patterns: dict[int, RegexPattern] = dict(indexed_patterns)
        """
        *_patterns* (:class:`dict`) maps pattern index (:class:`int`) to pattern
        (:class:`RegexPattern`).
        """

    @staticmethod
    def _init_db(
        db: hyperscan.Database,
        debug: bool,
        patterns: list[tuple[int, RegexPattern]],
        sort_ids: Optional[Callable[[list[int]], None]],
    ) -> list[HyperscanExprDat]:
        """
        Compile the given patterns into the Hyperscan database.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database to compile
        the expressions into.

        *debug* (:class:`bool`) is whether to record the regex of each
        expression along with its data.

        *patterns* (:class:`list` of :class:`tuple`) contains each pattern index
        (:class:`int`) paired with its pattern (:class:`.RegexPattern`).

        *sort_ids* (:class:`callable` or :data:`None`) is a function that
        reorders the list of expression ids in place. This is used during
        testing to ensure the order of expressions is not accidentally relied
        on.

        Returns a :class:`list` indexed by expression id (:class:`int`) holding
        the data of each expression (:class:`HyperscanExprDat`).
        """
        # Collect the data and the encoded regex of every usable pattern. Both
        # lists grow together, so an expression id is a position in each.
        expr_data: list[HyperscanExprDat] = []
        encoded_exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            include = pattern.include
            if include is None:
                # Patterns without an include value are left out.
                continue

            assert isinstance(pattern, RegexPattern), pattern
            regex = pattern.regex.pattern

            # The expressions are compiled as bytes: a bytes regex is used
            # as-is and a string regex is encoded.
            if not isinstance(regex, bytes):
                assert isinstance(regex, str), regex
                regex_bytes = regex.encode('utf8')
            else:
                regex_bytes = regex

            if debug:
                dat = HyperscanExprDebug(
                    include=include,
                    index=pattern_index,
                    is_dir_pattern=False,
                    regex=regex,
                )
            else:
                dat = HyperscanExprDat(
                    include=include,
                    index=pattern_index,
                    is_dir_pattern=False,
                )

            expr_data.append(dat)
            encoded_exprs.append(regex_bytes)

        # Optionally reorder the ids. The expressions are rearranged to line up
        # position by position with the reordered ids, while *expr_data* stays
        # indexed by expression id.
        ids = list(range(len(encoded_exprs)))
        if sort_ids is not None:
            sort_ids(ids)
            encoded_exprs = [encoded_exprs[expr_id] for expr_id in ids]

        # Compile the expressions.
        db.compile(
            expressions=encoded_exprs,
            ids=ids,
            elements=len(encoded_exprs),
            flags=HS_FLAGS,
        )
        return expr_data

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Check the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file* (:class:`bool`
        or :data:`None`), and the index of the last matched pattern (:class:`int` or
        :data:`None`).
        """
        # NOTICE: According to benchmarking, a method callback is 20% faster than
        # using a closure here.
        # Reset the stored match; the callback fills it in during the scan.
        self._out = (None, -1)
        self._db.scan(file.encode('utf8'), match_event_handler=self.__on_match)

        # An index of -1 means no expression matched.
        include, index = self._out
        return (include, None if index == -1 else index)

    @staticmethod
    def _make_db() -> hyperscan.Database:
        """
        Create the Hyperscan database in block mode.

        Returns the database (:class:`hyperscan.Database`).
        """
        block_mode = hyperscan.HS_MODE_BLOCK
        return hyperscan.Database(mode=block_mode)

    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called on each match.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        *_from*, *_to*, *_flags* and *_context* are part of the callback
        signature and are not used.

        Nothing is returned explicitly, so the result is always :data:`None`.
        """
        # Keep the match with the highest pattern index.
        # - WARNING: Hyperscan does not guarantee matches will be produced in order!
        #   Later expressions have higher priority.
        matched = self._expr_data[expr_id]
        matched_index = matched.index
        if matched_index > self._out[1]:
            self._out = (matched.include, matched_index)