Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/_shared/module_loading/file_discovery.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

87 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""File discovery utilities for finding files while respecting ignore patterns.""" 

19 

20from __future__ import annotations 

21 

22import logging 

23import os 

24import re 

25from collections.abc import Generator 

26from pathlib import Path 

27from re import Pattern 

28from typing import NamedTuple, Protocol 

29 

30from pathspec.patterns import GitWildMatchPattern 

31 

32log = logging.getLogger(__name__) 

33 

34 

class _IgnoreRule(Protocol):
    """Structural (duck-typed) interface that ignore-rule implementations satisfy."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Turn the supplied pattern into an ignore rule.

        Both ``base_dir`` and ``definition_file`` are expected to be absolute paths.
        """
        ...

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Check a candidate absolute path against a list of rules."""
        ...

49 

50 

class _RegexpIgnoreRule(NamedTuple):
    """Named tuple pairing a compiled regular expression with the directory it is applied relative to."""

    # Compiled regular expression searched for inside the relative path.
    pattern: Pattern
    # Directory that candidate paths are made relative to before searching.
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Compile ``pattern`` as a regular expression and wrap it in a rule.

        :param pattern: the regular expression source text
        :param base_dir: directory that matched paths are made relative to
        :param definition_file: the ignore file the pattern came from (only used in the warning)

        :return: the new rule, or ``None`` (after logging a warning) when the regexp is invalid.
        """
        try:
            compiled = re.compile(pattern)
        except re.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Report whether any rule's regexp is found in ``path`` relative to that rule's base directory.

        :raises ValueError: when a rule that is not a ``_RegexpIgnoreRule`` is reached before a match.
        """
        for candidate in rules:
            if isinstance(candidate, _RegexpIgnoreRule):
                relative_text = str(path.relative_to(candidate.base_dir))
                if candidate.pattern.search(relative_text) is not None:
                    # The first hit decides; the remaining rules are not consulted.
                    return True
                continue
            raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(candidate)}")
        return False

75 

76 

class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob ignore rules."""

    # Compiled glob pattern; its ``include`` attribute supplies the verdict when the pattern matches.
    wild_match_pattern: GitWildMatchPattern
    # Directory the pattern is anchored to, or None when only the path's final component is tested.
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied glob pattern.

        :param pattern: the glob pattern text
        :param base_dir: accepted for interface compatibility; not used by this implementation
        :param definition_file: the ignore file the pattern came from; its parent directory anchors
            patterns that contain a separator at the beginning or in the middle

        :return: the new rule, or ``None`` (after logging a warning) for the no-op pattern ``/``.
        """
        relative_to: Path | None = None
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            relative_to = definition_file.parent

        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(wild_match_pattern=ignore_pattern, relative_to=relative_to)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Match a list of ignore rules against the supplied path, accounting for exclusion rules and ordering.

        Every rule is evaluated in order and the last rule that matches decides the result, so a
        later pattern can override an earlier one.

        :param path: the candidate path; directories are matched with a trailing ``/``
        :param rules: the rules to evaluate, all of which must be ``_GlobIgnoreRule`` instances

        :return: the ``include`` value of the last matching rule, or ``False`` when nothing matches.
        :raises ValueError: when a rule is not a ``_GlobIgnoreRule``.
        """
        matched = False
        # Whether ``path`` is a directory does not depend on the rule being evaluated, so the
        # filesystem is queried lazily and at most once per call instead of once per rule.
        dir_suffix: str | None = None
        for rule in rules:
            if not isinstance(rule, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(rule)}")
            # Anchored rules see the path relative to their anchor; others see only the name.
            rel_obj = path.relative_to(rule.relative_to) if rule.relative_to else Path(path.name)
            if dir_suffix is None:
                dir_suffix = "/" if path.is_dir() else ""
            rel_path = f"{rel_obj.as_posix()}{dir_suffix}"
            if (
                rule.wild_match_pattern.include is not None
                and rule.wild_match_pattern.match_file(rel_path) is not None
            ):
                matched = rule.wild_match_pattern.include

        return matched

120 

121 

def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Walk the base path recursively and yield every file path that is not ignored.

    Symlinked directories are followed. Rules read from an ignore file apply to the directory
    containing it and are inherited by the sub-directories that are walked below it.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file holding the ignore patterns; one is looked for in
        every directory walked, and the ignore files themselves are never yielded
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    :raises RuntimeError: when a sub-directory resolves to a directory that was already registered.
    """
    # Rules inherited by each directory, keyed by the directory's resolved, absolute path.
    inherited_rules: dict[Path, list[_IgnoreRule]] = {}

    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        current_dir = Path(root)
        active_rules: list[_IgnoreRule] = inherited_rules.get(current_dir.resolve(), [])

        ignore_file_path = current_dir / ignore_file_name
        if ignore_file_path.is_file():
            with open(ignore_file_path) as ifile:
                raw_lines = ifile.read().split("\n")
            # Rules are appended in file order: evaluation order matters with negation, so that
            # later patterns can override earlier ones.
            fresh_rules = []
            for raw_line in raw_lines:
                # Drop a trailing comment together with the whitespace in front of it.
                pattern = re.sub(r"\s*#.*", "", raw_line)
                if not pattern:
                    continue
                compiled = ignore_rule_type.compile(pattern, Path(base_dir_path), ignore_file_path)
                # ``None`` marks a pattern the rule type rejected.
                if compiled is not None:
                    fresh_rules.append(compiled)
            active_rules.extend(fresh_rules)

        # Prune ignored sub-directories in place so that os.walk does not descend into them.
        surviving_dirs = []
        for subdir in dirs:
            if not ignore_rule_type.match(current_dir / subdir, active_rules):
                surviving_dirs.append(subdir)
        dirs[:] = surviving_dirs

        # Explicit check for infinite recursion, needed because symlinks are followed in this walk.
        for subdir in dirs:
            resolved_subdir = (current_dir / subdir).resolve()
            if resolved_subdir in inherited_rules:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {resolved_subdir} has appeared more than once."
                )
            # Each sub-directory receives its own snapshot of the rules active here.
            inherited_rules[resolved_subdir] = list(active_rules)

        for file_name in files:
            if file_name == ignore_file_name:
                continue
            candidate = current_dir / file_name
            if ignore_rule_type.match(candidate, active_rules):
                continue
            yield str(candidate)

177 

178 

def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = "glob",
) -> Generator[str, None, None]:
    """
    Search the base path recursively for file paths that are not ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file that specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob (default: glob);
        an empty value is treated as glob

    :return: a generator of file paths.
    :raises ValueError: when ``ignore_file_syntax`` is neither ``glob`` nor ``regexp``.
    """
    # An empty/falsy syntax falls back to glob.
    syntax = ignore_file_syntax or "glob"
    if syntax not in ("glob", "regexp"):
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")

    rule_type = _GlobIgnoreRule if syntax == "glob" else _RegexpIgnoreRule
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)