1"""
This module provides the :mod:`hyperscan` backend for :class:`~pathspec.gitignore.GitIgnoreSpec`.
3
4WARNING: The *pathspec._backends.hyperscan* package is not part of the public
5API. Its contents and structure are likely to change.
6"""
7from __future__ import annotations
8
9from collections.abc import (
10 Sequence)
11from typing import (
12 Any,
13 Callable, # Replaced by `collections.abc.Callable` in 3.9.2.
14 Optional, # Replaced by `X | None` in 3.10.
15 Union) # Replaced by `X | Y` in 3.10.
16
17try:
18 import hyperscan
19except ModuleNotFoundError:
20 hyperscan = None # type: ignore[assignment]
21
22from pathspec.pattern import (
23 RegexPattern)
24from pathspec.patterns.gitignore.spec import (
25 GitIgnoreSpecPattern,
26 _BYTES_ENCODING,
27 _DIR_MARK_CG,
28 _DIR_MARK_OPT)
29from pathspec._typing import (
30 override) # Added in 3.12.
31
32from ._base import (
33 HS_FLAGS,
34 HyperscanExprDat,
35 HyperscanExprDebug)
36from .pathspec import (
37 HyperscanPsBackend)
38
39
class HyperscanGiBackend(HyperscanPsBackend):
    """
    The :class:`HyperscanGiBackend` class is the :mod:`hyperscan`
    implementation used by :class:`~pathspec.gitignore.GitIgnoreSpec`. The
    Hyperscan database uses block mode for matching files.
    """

    # Change type hint.
    _out: tuple[Optional[bool], int, int]  # type: ignore[assignment]

    def __init__(
        self,
        patterns: Sequence[RegexPattern],
        *,
        _debug_exprs: Optional[bool] = None,
        _test_sort: Optional[Callable[[list], None]] = None,
    ) -> None:
        """
        Initialize the :class:`HyperscanGiBackend` instance.

        *patterns* (:class:`~collections.abc.Sequence` of
        :class:`.RegexPattern`) contains the compiled patterns.

        *_debug_exprs* and *_test_sort* are private options which are passed
        unchanged to the parent initializer.
        """
        super().__init__(patterns, _debug_exprs=_debug_exprs, _test_sort=_test_sort)

        self._out = (None, -1, 0)
        """
        *_out* (:class:`tuple`) stores the current match:

        - *0* (:class:`bool` or :data:`None`) is the match include.

        - *1* (:class:`int`) is the match index. The value ``-1`` means no
          pattern has matched; :meth:`match_file` reports it as :data:`None`.

        - *2* (:class:`int`) is the match priority. The initial ``0`` is lower
          than any priority assigned by the match callback.
        """

    @override
    @staticmethod
    def _init_db(
        db: hyperscan.Database,  # type: ignore
        debug: bool,
        patterns: list[tuple[int, RegexPattern]],
        sort_ids: Optional[Callable[[list[int]], None]],
    ) -> list[HyperscanExprDat]:
        """
        Create the Hyperscan database from the given patterns.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database.

        *debug* (:class:`bool`) is whether to include additional debugging
        information for the expressions.

        *patterns* (:class:`list` of :class:`tuple`) contains each pattern as
        a pair of its index (:class:`int`) and the pattern
        (:class:`.RegexPattern`). It must not be empty.

        *sort_ids* (:class:`callable` or :data:`None`) is a function used to
        sort the compiled expression ids. This is used during testing to
        ensure the order of expressions is not accidentally relied on.

        Returns a :class:`list` indexed by expression id (:class:`int`) to its
        data (:class:`HyperscanExprDat`).
        """
        # WARNING: Hyperscan raises a `hyperscan.error` exception when compiled
        # with zero elements.
        assert patterns, patterns

        # Prepare patterns. A single pattern may produce more than one
        # expression, so expression ids are not the same as pattern indices.
        expr_data: list[HyperscanExprDat] = []
        exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            # Narrow the type before reading the pattern's attributes.
            assert isinstance(pattern, RegexPattern), pattern
            assert pattern.include is not None, (pattern_index, pattern)
            assert pattern.regex is not None, (pattern_index, pattern)

            regex = pattern.regex.pattern

            use_regexes: list[tuple[Union[str, bytes], bool]] = []
            if isinstance(pattern, GitIgnoreSpecPattern):
                # GitIgnoreSpecPattern uses capture groups for its directory
                # marker but Hyperscan does not support capture groups. Handle
                # this scenario.
                regex_str: str
                if isinstance(regex, str):
                    regex_str = regex
                else:
                    assert isinstance(regex, bytes), regex
                    regex_str = regex.decode(_BYTES_ENCODING)

                if _DIR_MARK_CG in regex_str:
                    # Found directory marker.
                    if regex_str.endswith(_DIR_MARK_OPT):
                        # Regex has optional directory marker. Split regex into
                        # directory and file variants.
                        base_regex = regex_str[:-len(_DIR_MARK_OPT)]
                        use_regexes.append((f'{base_regex}/', True))
                        use_regexes.append((f'{base_regex}$', False))
                    else:
                        # Remove capture group.
                        base_regex = regex_str.replace(_DIR_MARK_CG, '/')
                        use_regexes.append((base_regex, True))

            if not use_regexes:
                # No special case for regex. Use it as-is (str or bytes).
                use_regexes.append((regex, False))

            for use_regex, is_dir_pattern in use_regexes:
                if isinstance(use_regex, bytes):
                    regex_bytes = use_regex
                else:
                    assert isinstance(use_regex, str), use_regex
                    # NOTE(review): bytes regexes are decoded above with
                    # `_BYTES_ENCODING` but rewritten regexes are encoded here
                    # as UTF-8; confirm this is intended for non-ASCII
                    # patterns.
                    regex_bytes = use_regex.encode('utf8')

                if debug:
                    expr_data.append(HyperscanExprDebug(
                        include=pattern.include,
                        index=pattern_index,
                        is_dir_pattern=is_dir_pattern,
                        regex=use_regex,
                    ))
                else:
                    expr_data.append(HyperscanExprDat(
                        include=pattern.include,
                        index=pattern_index,
                        is_dir_pattern=is_dir_pattern,
                    ))

                exprs.append(regex_bytes)

        # Sort expressions. The ids keep pointing at the matching entries of
        # *expr_data*; only the order they are handed to Hyperscan changes.
        ids = list(range(len(exprs)))
        if sort_ids is not None:
            sort_ids(ids)
            exprs = [exprs[expr_id] for expr_id in ids]

        # Compile patterns.
        db.compile(
            expressions=exprs,
            ids=ids,
            elements=len(exprs),
            flags=HS_FLAGS,
        )
        return expr_data

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Check the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file*
        (:class:`bool` or :data:`None`), and the index of the last matched
        pattern (:class:`int` or :data:`None`).
        """
        # NOTICE: According to benchmarking, a method callback is 13% faster
        # than using a closure here.
        db = self._db
        if db is None:
            # Database was not initialized because there were no patterns.
            # Return no match.
            return (None, None)

        # Reset the shared match state, then let the callback fill it in.
        # `self.__on_match` is name-mangled, so it always refers to the
        # callback defined on this class.
        self._out = (None, -1, 0)
        db.scan(file.encode('utf8'), match_event_handler=self.__on_match)

        out_index: Optional[int]
        out_include, out_index = self._out[:2]
        if out_index == -1:
            # The sentinel index was never replaced: nothing matched.
            out_index = None

        return (out_include, out_index)

    @override
    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called on each match. Updates *_out* when the matched expression takes
        precedence over the match recorded so far.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        *_from*, *_to*, *_flags* and *_context* are not used.

        Always returns :data:`None`.
        """
        expr_dat = self._expr_data[expr_id]

        is_dir_pattern = expr_dat.is_dir_pattern
        if is_dir_pattern:
            # Pattern matched by a directory pattern.
            priority = 1
        else:
            # Pattern matched by a file pattern.
            priority = 2

        # WARNING: Hyperscan does not guarantee matches will be produced in
        # order!
        # NOTE(review): when a file pattern with a lower index and an include
        # directory pattern with a higher index both match, the stored result
        # depends on which callback fires first; confirm this is intended.
        include = expr_dat.include
        index = expr_dat.index
        prev_index = self._out[1]
        prev_priority = self._out[2]
        if (
            (include and is_dir_pattern and index > prev_index)
            or (priority == prev_priority and index > prev_index)
            or priority > prev_priority
        ):
            self._out = (include, index, priority)  # type: ignore

        # NOTE(review): presumably returning None lets the scan continue;
        # confirm against the python-hyperscan callback contract.
        return None