Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pathspec/_backends/hyperscan/pathspec.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1""" 

2This module provides the :mod:`hyperscan` backend for :class:`~pathspec.pathspec.PathSpec`. 

3 

4WARNING: The *pathspec._backends.hyperscan* package is not part of the public 

5API. Its contents and structure are likely to change. 

6""" 

7from __future__ import annotations 

8 

9import itertools 

10from collections.abc import ( 

11 Sequence) 

12from typing import ( 

13 Any, 

14 ClassVar, 

15 Optional) # Replaced by `X | None` in 3.10. 

16 

17try: 

18 import hyperscan 

19except ModuleNotFoundError: 

20 hyperscan = None 

21 

22from ...pattern import ( 

23 RegexPattern) 

24from ..._typing import ( 

25 override) # Added in 3.12. 

26 

27from ..base import ( 

28 Backend) 

29from .._utils import ( 

30 enumerate_patterns) 

31 

32from .base import ( 

33 hyperscan_error) 

34from ._base import ( 

35 HyperscanExprDat) 

36 

37 

38class HyperscanPsBackend(Backend): 

39 """ 

40 The :class:`HyperscanPsBackend` class is the :mod:`hyperscan` 

41 implementation used by :class:`~pathspec.pathspec.PathSpec` for matching 

42 files. The Hyperscan database uses block mode for matching files. 

43 """ 

44 

45 _reverse_patterns: ClassVar[bool] = False 

46 """ 

47 *_reverse_patterns* (:class:`bool`) is whether the patterns are reversed. 

48 """ 

49 

50 def __init__( 

51 self, 

52 patterns: Sequence[RegexPattern], 

53 ) -> None: 

54 """ 

55 Initialize the :class:`HyperscanPsBackend` instance. 

56 

57 *patterns* (:class:`Sequence` of :class:`.RegexPattern`) contains the compiled 

58 patterns. 

59 """ 

60 if hyperscan is None: 

61 raise hyperscan_error 

62 

63 if not patterns: 

64 raise ValueError(f"{patterns=!r} cannot be empty.") 

65 elif not isinstance(patterns[0], RegexPattern): 

66 raise TypeError(f"{patterns[0]=!r} must be a RegexPattern.") 

67 

68 use_patterns = enumerate_patterns( 

69 patterns, filter=True, reverse=self._reverse_patterns, 

70 ) 

71 

72 self._db = self._make_db() 

73 """ 

74 *_db* (:class:`hyperscan.Database`) is the Hyperscan database. 

75 """ 

76 

77 self._expr_data: list[HyperscanExprDat] = self._init_db(self._db, use_patterns) 

78 """ 

79 *_expr_data* (:class:`list`) maps expression index (:class:`int`) to 

80 expression data (:class:`HyperscanExprDat`). 

81 """ 

82 

83 self._out: tuple[Optional[bool], Optional[int]] = (None, None) 

84 """ 

85 *_out* (:class:`tuple`) stores the current match. 

86 """ 

87 

88 self._patterns: dict[int, RegexPattern] = dict(use_patterns) 

89 """ 

90 *_patterns* (:class:`dict`) maps pattern index (:class:`int`) to pattern 

91 (:class:`RegexPattern`). 

92 """ 

93 

94 @staticmethod 

95 def _init_db( 

96 db: hyperscan.Database, 

97 patterns: list[tuple[int, RegexPattern]], 

98 ) -> list[HyperscanExprDat]: 

99 """ 

100 Initialize the Hyperscan database from the given patterns. 

101 

102 *db* (:class:`hyperscan.Database`) is the Hyperscan database. 

103 

104 *patterns* (:class:`list` of :class:`tuple`) pairs each pattern index 

105 (:class:`int`) with its pattern (:class:`.RegexPattern`). 

106 

107 Returns a :class:`list` that maps each expression id (:class:`int`) to its data 

108 (:class:`HyperscanExprDat`). 

109 """ 

110 # Prepare patterns. 

111 expr_data: list[HyperscanExprDat] = [] 

112 exprs: list[bytes] = [] 

113 id_counter = itertools.count(0) 

114 ids: list[int] = [] 

115 for pattern_index, pattern in patterns: 

116 if pattern.include is None: 

117 continue 

118 

119 # Encode regex. 

120 assert isinstance(pattern, RegexPattern), pattern 

121 regex = pattern.regex.pattern 

122 

123 if isinstance(regex, bytes): 

124 regex_bytes = regex 

125 else: 

126 assert isinstance(regex, str), regex 

127 regex_bytes = regex.encode('utf8') 

128 

129 expr_data.append(HyperscanExprDat( 

130 include=pattern.include, 

131 index=pattern_index, 

132 is_dir_pattern=False, 

133 )) 

134 exprs.append(regex_bytes) 

135 ids.append(next(id_counter)) 

136 

137 # Compile patterns. 

138 db.compile( 

139 expressions=exprs, 

140 ids=ids, 

141 elements=len(exprs), 

142 flags=hyperscan.HS_FLAG_UTF8, 

143 ) 

144 return expr_data 

145 

146 @override 

147 def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]: 

148 """ 

149 Check the file against the patterns. 

150 

151 *file* (:class:`str`) is the normalized file path to check. 

152 

153 Returns a :class:`tuple` containing whether to include *file* (:class:`bool` 

154 or :data:`None`), and the index of the last matched pattern (:class:`int` or 

155 :data:`None`). 

156 """ 

157 # NOTICE: According to benchmarking, a method callback is 20% faster than 

158 # using a closure here. 

159 self._out = (None, None) 

160 self._db.scan(file.encode('utf8'), match_event_handler=self.__on_match) 

161 return self._out 

162 

163 @staticmethod 

164 def _make_db() -> hyperscan.Database: 

165 """ 

166 Create the Hyperscan database. 

167 

168 Returns the database (:class:`hyperscan.Database`). 

169 """ 

170 return hyperscan.Database(mode=hyperscan.HS_MODE_BLOCK) 

171 

172 def __on_match( 

173 self, 

174 expr_id: int, 

175 _from: int, 

176 _to: int, 

177 _flags: int, 

178 _context: Any, 

179 ) -> Optional[bool]: 

180 """ 

181 Called on each match. 

182 

183 *expr_id* (:class:`int`) is the expression id (index) of the matched 

184 pattern. 

185 """ 

186 expr_dat = self._expr_data[expr_id] 

187 

188 # Store match. 

189 self._out = (expr_dat.include, expr_dat.index)