Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/module_loading/file_discovery.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""File discovery utilities for finding files while respecting ignore patterns.""" 

19 

20from __future__ import annotations 

21 

22import logging 

23import os 

24import re 

25from collections.abc import Generator 

26from pathlib import Path 

27from re import Pattern 

28from typing import NamedTuple, Protocol 

29 

30from pathspec.patterns import GitWildMatchPattern 

31 

32log = logging.getLogger(__name__) 

33 

34 

class _IgnoreRule(Protocol):
    """Structural-typing interface shared by the ignore-rule implementations in this module."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Turn a single pattern string into an ignore rule.

        ``base_dir`` and ``definition_file`` should be absolute paths.

        :param pattern: one pattern read from an ignore file
        :param base_dir: the directory the search started from
        :param definition_file: the ignore file the pattern was read from
        :return: the rule, or ``None`` when no rule is produced for the pattern
        """
        ...

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Check a candidate absolute path against a list of rules.

        :param path: the candidate path
        :param rules: the rules to test the path against
        :return: whether the path is matched (i.e. should be ignored)
        """
        ...

49 

50 

class _RegexpIgnoreRule(NamedTuple):
    """Regular-expression ignore rule: a compiled pattern plus the directory it is evaluated against."""

    # Compiled regular expression searched for in the relative path.
    pattern: Pattern
    # Directory that candidate paths are made relative to before searching.
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Compile ``pattern`` as a regular expression and wrap it in a rule.

        :param pattern: the regular expression source text
        :param base_dir: directory that matched paths are made relative to
        :param definition_file: ignore file the pattern came from (only used in the warning)
        :return: the rule, or ``None`` (after logging a warning) when the expression is invalid
        """
        try:
            compiled = re.compile(pattern)
        except re.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Report whether any rule's expression is found in ``path`` relative to that rule's ``base_dir``.

        :param path: the candidate path; it must be located under each rule's ``base_dir``
        :param rules: the rules to try, in order; the first hit wins
        :return: ``True`` as soon as one rule matches, ``False`` if none do
        :raises ValueError: if a rule that is not a ``_RegexpIgnoreRule`` is reached
        """
        for candidate in rules:
            if isinstance(candidate, _RegexpIgnoreRule):
                relative = str(path.relative_to(candidate.base_dir))
                # ``search`` (not ``match``): the expression may hit anywhere in the relative path.
                if candidate.pattern.search(relative) is not None:
                    return True
                continue
            raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(candidate)}")
        return False

75 

76 

class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob (gitignore-style) ignore rules."""

    # Compiled gitignore-style pattern.
    wild_match_pattern: GitWildMatchPattern
    # Directory the pattern is anchored to; ``None`` means only the path's final component is tested.
    relative_to: Path | None = None
    # True when the pattern ends with a separator and may therefore only match directories.
    dir_only: bool = False

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied glob pattern.

        :param pattern: one gitignore-style pattern line
        :param base_dir: unused by this rule type; accepted to match the other rule types' signature
        :param definition_file: ignore file the pattern came from; its parent directory is the
            anchor for patterns that contain a leading or middle separator
        :return: the rule, or ``None`` (after logging a warning) for the no-op pattern ``/``
        """
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None

        relative_to: Path | None = None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            relative_to = definition_file.parent

        # See https://git-scm.com/docs/gitignore
        # > If there is a separator at the end of the pattern then the pattern will only match
        # > directories, otherwise the pattern can match both files and directories.
        # Strip the negation prefix before checking for trailing separator.
        raw_pattern = pattern.lstrip("!")
        dir_only = raw_pattern.rstrip() != raw_pattern.rstrip().rstrip("/")

        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(wild_match_pattern=ignore_pattern, relative_to=relative_to, dir_only=dir_only)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Match a list of ignore rules against the supplied path, accounting for exclusion rules and ordering.

        Every rule is evaluated in order and the last rule that matches decides the outcome,
        so a later pattern can override an earlier one.

        :param path: the candidate path; it must be located under the ``relative_to`` directory
            of every anchored rule
        :param rules: the rules to evaluate, in definition order
        :return: the ``include`` flag of the last matching rule, or ``False`` when nothing matches
        :raises ValueError: if a rule that is not a ``_GlobIgnoreRule`` is reached
        """
        matched = False
        # ``path.is_dir()`` hits the filesystem and does not depend on the rule, so it is
        # evaluated lazily and at most once per call rather than once per rule. An empty
        # rule list still performs no filesystem access at all.
        is_dir: bool | None = None
        for rule in rules:
            if not isinstance(rule, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(rule)}")
            if is_dir is None:
                is_dir = path.is_dir()
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the end of the pattern then the pattern will only match
            # > directories, otherwise the pattern can match both files and directories.
            if rule.dir_only and not is_dir:
                continue
            rel_obj = path.relative_to(rule.relative_to) if rule.relative_to else Path(path.name)
            # Directories are presented to the pattern with a trailing separator.
            rel_path = f"{rel_obj.as_posix()}/" if is_dir else rel_obj.as_posix()
            if (
                rule.wild_match_pattern.include is not None
                and rule.wild_match_pattern.match_file(rel_path) is not None
            ):
                matched = rule.wild_match_pattern.include

        return matched

134 

135 

def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Walk ``base_dir_path`` (following symlinks) and lazily yield the files no ignore rule excludes.

    An ignore file found in a directory adds its rules to those inherited from the parent
    directories; the combined rules apply to that directory and everything below it.
    Ignore files themselves are never yielded.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file that lists the patterns to be ignored
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    :raises RuntimeError: if a kept sub-directory resolves to a directory that was already recorded
    """
    # Rules inherited by each directory, keyed by its resolved, absolute path.
    inherited_rules: dict[Path, list[_IgnoreRule]] = {}

    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        current_dir = Path(root)
        active_rules: list[_IgnoreRule] = inherited_rules.get(current_dir.resolve(), [])

        ignore_file = current_dir / ignore_file_name
        if ignore_file.is_file():
            with open(ignore_file) as handle:
                raw_lines = handle.read().split("\n")
            new_rules: list[_IgnoreRule] = []
            for raw_line in raw_lines:
                # Drop "#" comments together with the whitespace in front of them.
                text = re.sub(r"\s*#.*", "", raw_line)
                if not text:
                    continue
                rule = ignore_rule_type.compile(text, Path(base_dir_path), ignore_file)
                # ``None`` marks a pattern the rule type rejected.
                if rule is not None:
                    new_rules.append(rule)
            # Appended after the inherited rules: evaluation order matters with negation,
            # so that later patterns can override earlier patterns.
            active_rules.extend(new_rules)

        # Prune in place so that os.walk does not descend into ignored directories.
        kept_dirs = []
        for subdir in dirs:
            if not ignore_rule_type.match(current_dir / subdir, active_rules):
                kept_dirs.append(subdir)
        dirs[:] = kept_dirs

        # Explicit infinite-recursion detection, needed because this walk follows symlinks.
        for subdir in dirs:
            resolved = (current_dir / subdir).resolve()
            if resolved in inherited_rules:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {resolved} has appeared more than once."
                )
            # Each sub-directory gets its own copy so siblings do not share later additions.
            inherited_rules[resolved] = active_rules.copy()

        for file in files:
            if file == ignore_file_name:
                continue
            candidate = current_dir / file
            if not ignore_rule_type.match(candidate, active_rules):
                yield str(candidate)

191 

192 

def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = "glob",
) -> Generator[str, None, None]:
    """
    Recursively search the base path for the file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file that specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: ``regexp`` or ``glob``;
        an empty value is treated as ``glob`` (default: glob)

    :return: a generator of file paths.
    :raises ValueError: if ``ignore_file_syntax`` is neither empty, ``glob`` nor ``regexp``
    """
    rule_type: type[_IgnoreRule]
    if ignore_file_syntax == "glob" or not ignore_file_syntax:
        rule_type = _GlobIgnoreRule
    elif ignore_file_syntax == "regexp":
        rule_type = _RegexpIgnoreRule
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)

211 raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")