1"""
This module provides the :mod:`hyperscan` backend for :class:`~pathspec.gitignore.GitIgnoreSpec`.
3
4WARNING: The *pathspec._backends.hyperscan* package is not part of the public
5API. Its contents and structure are likely to change.
6"""
7from __future__ import annotations
8
9from collections.abc import (
10 Sequence)
11from typing import (
12 Any,
13 Callable, # Replaced by `collections.abc.Callable` in 3.9.2.
14 Optional, # Replaced by `X | None` in 3.10.
15 Union) # Replaced by `X | Y` in 3.10.
16
17try:
18 import hyperscan
19except ModuleNotFoundError:
20 hyperscan = None
21
22from pathspec.pattern import (
23 RegexPattern)
24from pathspec.patterns.gitignore.spec import (
25 GitIgnoreSpecPattern,
26 _BYTES_ENCODING,
27 _DIR_MARK_CG,
28 _DIR_MARK_OPT)
29from pathspec._typing import (
30 override) # Added in 3.12.
31
32from ._base import (
33 HS_FLAGS,
34 HyperscanExprDat,
35 HyperscanExprDebug)
36from .pathspec import (
37 HyperscanPsBackend)
38
39
class HyperscanGiBackend(HyperscanPsBackend):
    """
    The :class:`HyperscanGiBackend` class is the :mod:`hyperscan`
    implementation used by :class:`~pathspec.gitignore.GitIgnoreSpec`. The
    Hyperscan database uses block mode for matching files.
    """

    # Change type hint.
    _out: tuple[Optional[bool], int, int]

    def __init__(
        self,
        patterns: Sequence[RegexPattern],
        *,
        _debug_exprs: Optional[bool] = None,
        _test_sort: Optional[Callable[[list], None]] = None,
    ) -> None:
        """
        Initialize the :class:`HyperscanGiBackend` instance.

        *patterns* (:class:`Sequence` of :class:`.RegexPattern`) contains the
        compiled patterns.

        *_debug_exprs* (:class:`bool` or :data:`None`) and *_test_sort*
        (:class:`callable` or :data:`None`) are passed through unchanged to the
        parent class initializer.
        """
        super().__init__(patterns, _debug_exprs=_debug_exprs, _test_sort=_test_sort)

        self._out = (None, -1, 0)
        """
        *_out* (:class:`tuple`) stores the current match:

        - *0* (:class:`bool` or :data:`None`) is the match include.

        - *1* (:class:`int`) is the match index.

        - *2* (:class:`int`) is the match priority.
        """

    @override
    @staticmethod
    def _init_db(
        db: hyperscan.Database,
        debug: bool,
        patterns: list[tuple[int, RegexPattern]],
        sort_ids: Optional[Callable[[list[int]], None]],
    ) -> list[HyperscanExprDat]:
        """
        Create the Hyperscan database from the given patterns.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database.

        *debug* (:class:`bool`) is whether to include additional debugging
        information for the expressions.

        *patterns* (:class:`list` of :class:`tuple`) contains each pattern as its
        index (:class:`int`) paired with the pattern (:class:`.RegexPattern`).
        It must not be empty.

        *sort_ids* (:class:`callable` or :data:`None`) is a function used to sort
        the compiled expression ids. This is used during testing to ensure the order
        of expressions is not accidentally relied on.

        Returns a :class:`list` indexed by expression id (:class:`int`) to its data
        (:class:`HyperscanExprDat`).
        """
        # WARNING: Hyperscan raises a `hyperscan.error` exception when compiled with
        # zero elements.
        assert patterns, patterns

        # Prepare patterns.
        expr_data: list[HyperscanExprDat] = []
        exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            assert pattern.include is not None, (pattern_index, pattern)

            # Encode regex.
            assert isinstance(pattern, RegexPattern), pattern
            regex = pattern.regex.pattern

            # Encoding used to turn a (possibly rewritten) str regex into bytes.
            str_encoding = 'utf8'

            use_regexes: list[tuple[Union[str, bytes], bool]] = []
            if isinstance(pattern, GitIgnoreSpecPattern):
                # GitIgnoreSpecPattern uses capture groups for its directory marker but
                # Hyperscan does not support capture groups. Handle this scenario.
                regex_str: str
                if isinstance(regex, str):
                    regex_str = regex
                else:
                    assert isinstance(regex, bytes), regex
                    regex_str = regex.decode(_BYTES_ENCODING)
                    # The regex started as bytes: re-encode rewritten variants with the
                    # same codec used to decode them so the untouched parts of the
                    # expression are reproduced byte-for-byte, consistent with the
                    # fallback below which passes the original bytes through as-is.
                    str_encoding = _BYTES_ENCODING

                if _DIR_MARK_CG in regex_str:
                    # Found directory marker.
                    if regex_str.endswith(_DIR_MARK_OPT):
                        # Regex has optional directory marker. Split regex into directory
                        # and file variants.
                        base_regex = regex_str[:-len(_DIR_MARK_OPT)]
                        use_regexes.append((f'{base_regex}/', True))
                        use_regexes.append((f'{base_regex}$', False))
                    else:
                        # Remove capture group.
                        base_regex = regex_str.replace(_DIR_MARK_CG, '/')
                        use_regexes.append((base_regex, True))

            if not use_regexes:
                # No special case for regex.
                use_regexes.append((regex, False))

            for use_regex, is_dir_pattern in use_regexes:
                if isinstance(use_regex, bytes):
                    regex_bytes = use_regex
                else:
                    assert isinstance(use_regex, str), use_regex
                    regex_bytes = use_regex.encode(str_encoding)

                # One data entry is appended per expression, so its position in
                # *expr_data* is the expression id.
                if debug:
                    expr_data.append(HyperscanExprDebug(
                        include=pattern.include,
                        index=pattern_index,
                        is_dir_pattern=is_dir_pattern,
                        regex=use_regex,
                    ))
                else:
                    expr_data.append(HyperscanExprDat(
                        include=pattern.include,
                        index=pattern_index,
                        is_dir_pattern=is_dir_pattern,
                    ))

                exprs.append(regex_bytes)

        # Sort expressions. Only the submission order changes: each expression is
        # still paired with its original id, so *expr_data* stays valid.
        ids = list(range(len(exprs)))
        if sort_ids is not None:
            sort_ids(ids)
            exprs = [exprs[expr_id] for expr_id in ids]

        # Compile patterns.
        db.compile(
            expressions=exprs,
            ids=ids,
            elements=len(exprs),
            flags=HS_FLAGS,
        )
        return expr_data

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Check the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file* (:class:`bool`
        or :data:`None`), and the index of the last matched pattern (:class:`int` or
        :data:`None`).
        """
        # NOTICE: According to benchmarking, a method callback is 13% faster than
        # using a closure here.
        db = self._db
        if db is None:
            # Database was not initialized because there were no patterns. Return no
            # match.
            return (None, None)

        # Reset the match state; the callback updates it during the scan.
        self._out = (None, -1, 0)
        db.scan(file.encode('utf8'), match_event_handler=self.__on_match)

        out_include, out_index = self._out[:2]
        if out_index == -1:
            # The sentinel index was never replaced: nothing matched.
            out_index = None

        return (out_include, out_index)

    @override
    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called on each match.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        The remaining arguments are unused.

        Updates *_out* when the matched expression should replace the currently
        stored match. Always returns :data:`None`.
        """
        expr_dat = self._expr_data[expr_id]

        is_dir_pattern = expr_dat.is_dir_pattern
        if is_dir_pattern:
            # Pattern matched by a directory pattern.
            priority = 1
        else:
            # Pattern matched by a file pattern.
            priority = 2

        # WARNING: Hyperscan does not guarantee matches will be produced in order!
        include = expr_dat.include
        index = expr_dat.index
        prev_index = self._out[1]
        prev_priority = self._out[2]
        if (
            (include and is_dir_pattern and index > prev_index)
            or (priority == prev_priority and index > prev_index)
            or priority > prev_priority
        ):
            self._out = (include, index, priority)