Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pathspec/_backends/hyperscan/gitignore.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

75 statements  

1""" 

2This module provides the :module:`hyperscan` backend for :class:`~pathspec.gitignore.GitIgnoreSpec`. 

3 

4WARNING: The *pathspec._backends.hyperscan* package is not part of the public 

5API. Its contents and structure are likely to change. 

6""" 

7from __future__ import annotations 

8 

9import itertools 

10from collections.abc import ( 

11 Sequence) 

12from typing import ( 

13 Any, 

14 Optional, # Replaced by `X | None` in 3.10. 

15 Union) # Replaced by `X | Y` in 3.10. 

16 

17try: 

18 import hyperscan 

19except ModuleNotFoundError: 

20 hyperscan = None 

21 

22from ...pattern import ( 

23 RegexPattern) 

24from ...patterns.gitwildmatch import ( 

25 GitWildMatchPattern, 

26 _BYTES_ENCODING, 

27 _DIR_MARK) 

28from ..._typing import ( 

29 override) # Added in 3.12. 

30 

31from ._base import ( 

32 HyperscanExprDat) 

33from .pathspec import ( 

34 HyperscanPsBackend) 

35 

# Hyperscan cannot compile capture groups, so the backend searches regexes for
# these exact fragments and rewrites them before compilation.

_DIR_MARK_CG = f'(?P<{_DIR_MARK}>/)'
"""
The named capture group (named by ``_DIR_MARK``) that matches the directory
marker, i.e. a single ``/``.
"""

_DIR_MARK_OPT = f'(?:{_DIR_MARK_CG}.*)?$'
"""
The regex suffix that matches an optional directory marker followed by any
sub-path, anchored at the end of the input.
"""

45 

46 

class HyperscanGiBackend(HyperscanPsBackend):
    """
    The :class:`HyperscanGiBackend` class is the :module:`hyperscan`
    implementation used by :class:`~pathspec.gitignore.GitIgnoreSpec`. The
    Hyperscan database uses block mode for matching files.
    """

    # Change type hint. While a scan is in progress this holds the best match
    # seen so far as ``(include, pattern index, priority)``.
    _out: tuple[Optional[bool], Optional[int], int]

    def __init__(self, patterns: Sequence[RegexPattern]) -> None:
        """
        Initialize the :class:`HyperscanGiBackend` instance.

        *patterns* (:class:`~collections.abc.Sequence` of :class:`.RegexPattern`)
        contains the compiled patterns.
        """
        super().__init__(patterns)
        # No match yet: no include decision, no pattern index, lowest priority.
        self._out = (None, None, 0)

    @override
    @staticmethod
    def _init_db(
        db: hyperscan.Database,
        patterns: list[tuple[int, RegexPattern]],
    ) -> list[HyperscanExprDat]:
        """
        Create the Hyperscan database from the given patterns.

        *db* (:class:`hyperscan.Database`) is the Hyperscan database.

        *patterns* (:class:`list` of :class:`tuple`) contains each pattern as
        its index (:class:`int`) and the pattern itself
        (:class:`.RegexPattern`). Patterns whose *include* is :data:`None` are
        skipped.

        Returns a :class:`list` indexed by expression id (:class:`int`) to its
        data (:class:`HyperscanExprDat`).
        """
        # Prepare patterns. *expr_data* and *exprs* are kept in lockstep so
        # that the position of an expression is also its Hyperscan id.
        expr_data: list[HyperscanExprDat] = []
        exprs: list[bytes] = []
        for pattern_index, pattern in patterns:
            if pattern.include is None:
                continue

            # Encode regex.
            assert isinstance(pattern, RegexPattern), pattern
            regex = pattern.regex.pattern

            # Each entry is (regex, is_dir_pattern). One pattern may expand
            # into two expressions (directory variant and file variant).
            use_regexes: list[tuple[Union[str, bytes], bool]] = []
            if isinstance(pattern, GitWildMatchPattern):
                # GitWildMatch uses capture groups for its directory marker
                # but Hyperscan does not support capture groups. Check for
                # this scenario.
                if isinstance(regex, str):
                    regex_str = regex
                else:
                    assert isinstance(regex, bytes), regex
                    # NOTE(review): the rewritten regex is re-encoded as UTF-8
                    # below, not with _BYTES_ENCODING. Confirm this is the
                    # intended handling for non-ASCII bytes patterns.
                    regex_str = regex.decode(_BYTES_ENCODING)

                if _DIR_MARK_CG in regex_str:
                    # Found directory marker.
                    if regex_str.endswith(_DIR_MARK_OPT):
                        # Regex has optional directory marker. Split regex
                        # into directory and file variants.
                        base_regex = regex_str[:-len(_DIR_MARK_OPT)]
                        use_regexes.append((f'{base_regex}/.*$', True))
                        use_regexes.append((f'{base_regex}$', False))
                    else:
                        # Remove capture group.
                        base_regex = regex_str.replace(_DIR_MARK_CG, '/')
                        use_regexes.append((base_regex, True))

            if not use_regexes:
                # No special case for regex. Use it unmodified.
                use_regexes.append((regex, False))

            for use_regex, is_dir_pattern in use_regexes:
                if isinstance(use_regex, bytes):
                    regex_bytes = use_regex
                else:
                    assert isinstance(use_regex, str), use_regex
                    regex_bytes = use_regex.encode('utf8')

                expr_data.append(HyperscanExprDat(
                    include=pattern.include,
                    index=pattern_index,
                    is_dir_pattern=is_dir_pattern,
                ))
                exprs.append(regex_bytes)

        # Compile patterns. Ids are sequential so that an expression id can be
        # used directly as an index into the returned list.
        db.compile(
            expressions=exprs,
            ids=list(range(len(exprs))),
            elements=len(exprs),
            flags=hyperscan.HS_FLAG_UTF8,
        )
        return expr_data

    @override
    def match_file(self, file: str) -> tuple[Optional[bool], Optional[int]]:
        """
        Check the file against the patterns.

        *file* (:class:`str`) is the normalized file path to check.

        Returns a :class:`tuple` containing whether to include *file*
        (:class:`bool` or :data:`None`), and the index of the last matched
        pattern (:class:`int` or :data:`None`).
        """
        # NOTICE: According to benchmarking, a method callback is 13% faster
        # than using a closure here.
        # Reset the per-scan state; the match callback accumulates into it.
        self._out = (None, None, 0)
        self._db.scan(file.encode('utf8'), match_event_handler=self.__on_match)
        # Drop the internal priority; callers only get (include, index).
        return self._out[:2]

    @override
    def __on_match(
        self,
        expr_id: int,
        _from: int,
        _to: int,
        _flags: int,
        _context: Any,
    ) -> Optional[bool]:
        """
        Called on each match.

        *expr_id* (:class:`int`) is the expression id (index) of the matched
        pattern.

        The remaining arguments are supplied by the scan callback and are
        unused here.

        Always returns :data:`None` (there is no explicit return).
        NOTE(review): presumably this lets the scan continue; confirm against
        the hyperscan callback contract.
        """
        expr_dat = self._expr_data[expr_id]

        is_dir_pattern = expr_dat.is_dir_pattern
        if is_dir_pattern:
            # Pattern matched by a directory pattern.
            priority = 1
        else:
            # Pattern matched by a file pattern.
            priority = 2

        # An including directory pattern always replaces the stored result.
        # Any other match replaces it only when its priority is at least the
        # priority of the stored result.
        include = expr_dat.include
        if (include and is_dir_pattern) or priority >= self._out[2]:
            self._out = (include, expr_dat.index, priority)

186 else: 

187 # Pattern matched by a file pattern. 

188 priority = 2 

189 

190 include = expr_dat.include 

191 if (include and is_dir_pattern) or priority >= self._out[2]: 

192 self._out = (include, expr_dat.index, priority)