1"""
2This module provides the :module:`hyperscan` backend for :class:`~pathspec.pathspec.PathSpec`.
3
4WARNING: The *pathspec._backends.hyperscan* package is not part of the public
5API. Its contents and structure are likely to change.
6"""
7from __future__ import annotations
8
9import itertools
10from collections.abc import (
11 Sequence)
12from typing import (
13 Any,
14 ClassVar,
15 Optional) # Replaced by `X | None` in 3.10.
16
17try:
18 import hyperscan
19except ModuleNotFoundError:
20 hyperscan = None
21
22from ...pattern import (
23 RegexPattern)
24from ..._typing import (
25 override) # Added in 3.12.
26
27from ..base import (
28 Backend)
29from .._utils import (
30 enumerate_patterns)
31
32from .base import (
33 hyperscan_error)
34from ._base import (
35 HyperscanExprDat)
36
37
class HyperscanPsBackend(Backend):
    """
    The :class:`HyperscanPsBackend` class is the :module:`hyperscan`
    implementation used by :class:`~pathspec.pathspec.PathSpec` for matching
    files. The Hyperscan database uses block mode for matching files.
    """

    _reverse_patterns: ClassVar[bool] = False
    """
    *_reverse_patterns* (:class:`bool`) is whether the patterns are reversed.
    """

    def __init__(
        self,
        patterns: Sequence[RegexPattern],
    ) -> None:
        """
        Initialize the :class:`HyperscanPsBackend` instance.

        *patterns* (:class:`Sequence` of :class:`.RegexPattern`) contains the
        compiled patterns.

        Raises *hyperscan_error* if the :module:`hyperscan` library could not be
        imported.

        Raises :class:`ValueError` if *patterns* is empty.

        Raises :class:`TypeError` if the first pattern is not a
        :class:`.RegexPattern`.
        """
        if hyperscan is None:
            raise hyperscan_error

        if not patterns:
            raise ValueError(f"{patterns=!r} cannot be empty.")
        elif not isinstance(patterns[0], RegexPattern):
            raise TypeError(f"{patterns[0]=!r} must be a RegexPattern.")

        use_patterns = enumerate_patterns(
            patterns, filter=True, reverse=self._reverse_patterns,
        )

        self._db = self._make_db()
        """
        *_db* (:class:`hyperscan.Database`) is the Hyperscan database.
        """

        self._expr_data: list[HyperscanExprDat] = self._init_db(self._db, use_patterns)
        """
        *_expr_data* (:class:`list`) maps expression index (:class:`int`) to
        expression data (:class:`HyperscanExprDat`).
        """

        self._out: tuple[Optional[bool], Optional[int]] = (None, None)
        """
        *_out* (:class:`tuple`) stores the current match.
        """

        self._patterns: dict[int, RegexPattern] = dict(use_patterns)
        """
        *_patterns* (:class:`dict`) maps pattern index (:class:`int`) to pattern
        (:class:`RegexPattern`).
        """

    @staticmethod
    def _init_db(
        db: hyperscan.Database,
        patterns: list[tuple[int, RegexPattern]],
    ) -> list[HyperscanExprDat]:
        """
        Initialize the Hyperscan database from the given patterns.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database.

        *patterns* (:class:`list` of :class:`tuple`) contains each pattern index
        (:class:`int`) paired with its pattern (:class:`.RegexPattern`).

        Returns a :class:`list` indexed by expression id (:class:`int`) to its data
        (:class:`HyperscanExprDat`).
        """
        # Prepare patterns.
        expr_data: list[HyperscanExprDat] = []
        exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            if pattern.include is None:
                # Patterns that neither include nor exclude cannot affect the result.
                continue

            # Encode regex.
            assert isinstance(pattern, RegexPattern), pattern
            regex = pattern.regex.pattern

            if isinstance(regex, bytes):
                regex_bytes = regex
            else:
                assert isinstance(regex, str), regex
                regex_bytes = regex.encode('utf8')

            expr_data.append(HyperscanExprDat(
                include=pattern.include,
                index=pattern_index,
                is_dir_pattern=False,
            ))
            exprs.append(regex_bytes)

        # Expression ids are the positions in *expr_data*, so that the match
        # callback can look up the expression data directly by id.
        ids = list(range(len(exprs)))

        # Compile patterns.
        # NOTE(review): If every pattern was skipped above, this compiles zero
        # expressions. Confirm how hyperscan handles an empty expression list.
        db.compile(
            expressions=exprs,
            ids=ids,
            elements=len(exprs),
            flags=hyperscan.HS_FLAG_UTF8,
        )
        return expr_data

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Check the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file* (:class:`bool`
        or :data:`None`), and the index of the last matched pattern (:class:`int` or
        :data:`None`).
        """
        # NOTICE: According to benchmarking, a method callback is 20% faster than
        # using a closure here.
        # Reset the stored match; the callback fills it in while scanning.
        self._out = (None, None)
        self._db.scan(file.encode('utf8'), match_event_handler=self.__on_match)
        return self._out

    @staticmethod
    def _make_db() -> hyperscan.Database:
        """
        Create the Hyperscan database.

        Returns the database (:class:`hyperscan.Database`).
        """
        return hyperscan.Database(mode=hyperscan.HS_MODE_BLOCK)

    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called on each match.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        Returns :data:`None`.
        """
        expr_dat = self._expr_data[expr_id]
        index = expr_dat.index

        # Store match.
        # - WARNING: Hyperscan does not report matches in pattern order. Only
        #   replace the stored match when this pattern comes later than the one
        #   already stored, so that the last matching pattern wins regardless of
        #   the order in which the callbacks arrive.
        prev_index = self._out[1]
        if prev_index is None or index > prev_index:
            self._out = (expr_dat.include, index)