1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""File discovery utilities for finding files while respecting ignore patterns."""
19
20from __future__ import annotations
21
22import logging
23import os
24import re
25from collections.abc import Generator
26from pathlib import Path
27from re import Pattern
28from typing import NamedTuple, Protocol
29
30from pathspec.patterns import GitWildMatchPattern
31
32log = logging.getLogger(__name__)
33
34
class _IgnoreRule(Protocol):
    """Structural (duck-typed) interface shared by the ignore-rule implementations."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Turn one pattern string into an ignore rule.

        Both ``base_dir`` and ``definition_file`` are expected to be absolute paths.
        """
        ...

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Check an absolute candidate ``path`` against the supplied list of ``rules``."""
        ...
49
50
class _RegexpIgnoreRule(NamedTuple):
    """Ignore rule backed by a compiled regular expression, stored as a typed namedtuple."""

    # Compiled regular expression searched against the candidate's relative path.
    pattern: Pattern
    # Directory that candidate paths are made relative to before searching.
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Compile ``pattern`` into a regexp ignore rule.

        An invalid expression is reported with a warning that names ``definition_file``,
        and ``None`` is returned instead of a rule.
        """
        try:
            compiled = re.compile(pattern)
        except re.error as exc:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, exc)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Return ``True`` as soon as any rule's regexp is found in ``path``.

        Each rule searches the string form of ``path`` taken relative to that rule's
        ``base_dir``. A rule that is not a ``_RegexpIgnoreRule`` raises ``ValueError``.
        """
        for candidate in rules:
            if isinstance(candidate, _RegexpIgnoreRule):
                relative = str(path.relative_to(candidate.base_dir))
                if candidate.pattern.search(relative) is not None:
                    return True
                continue
            raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(candidate)}")
        return False
75
76
class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob (gitignore-style) ignore rules."""

    # Compiled gitignore-style pattern. ``match`` returns its ``include`` flag as the verdict
    # and skips the rule entirely when ``include`` is None.
    wild_match_pattern: GitWildMatchPattern
    # Directory the pattern is anchored to; None means only the candidate's name is matched.
    relative_to: Path | None = None
    # True when the pattern ended with a separator and therefore applies to directories only.
    dir_only: bool = False

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied glob pattern.

        :param pattern: a single gitignore-style pattern line
        :param base_dir: accepted for interface compatibility; not used by glob rules
        :param definition_file: the ignore file the pattern came from; anchored patterns
            are resolved relative to its parent directory

        :return: the compiled rule, or ``None`` (after logging a warning) for the no-op pattern "/"
        """
        relative_to: Path | None = None
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            relative_to = definition_file.parent

        # See https://git-scm.com/docs/gitignore
        # > If there is a separator at the end of the pattern then the pattern will only match
        # > directories, otherwise the pattern can match both files and directories.
        # Strip the negation prefix before checking for trailing separator.
        raw_pattern = pattern.lstrip("!").rstrip()
        dir_only = raw_pattern.endswith("/")

        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(wild_match_pattern=ignore_pattern, relative_to=relative_to, dir_only=dir_only)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Match a list of ignore rules against the supplied path.

        Rules are applied in order and the last matching rule wins, so a later
        negated pattern can re-include a path that an earlier pattern ignored.

        :param path: the candidate path; it must lie below ``relative_to`` for anchored rules
        :param rules: the rules to apply, in definition order

        :return: ``True`` when the path should be ignored
        :raises ValueError: if a rule is not a ``_GlobIgnoreRule``
        """
        matched = False
        # Whether ``path`` is a directory does not depend on the rule, so the filesystem
        # is queried at most once per path (and not at all when there are no rules).
        is_dir: bool | None = None
        for rule in rules:
            if not isinstance(rule, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(rule)}")
            if is_dir is None:
                is_dir = path.is_dir()
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the end of the pattern then the pattern will only match
            # > directories, otherwise the pattern can match both files and directories.
            if rule.dir_only and not is_dir:
                continue
            rel_obj = path.relative_to(rule.relative_to) if rule.relative_to else Path(path.name)
            # Directories get a trailing separator so that directory-only patterns can match them.
            rel_path = f"{rel_obj.as_posix()}/" if is_dir else rel_obj.as_posix()
            if (
                rule.wild_match_pattern.include is not None
                and rule.wild_match_pattern.match_file(rel_path) is not None
            ):
                matched = rule.wild_match_pattern.include

        return matched
134
135
def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Walk ``base_dir_path`` recursively and yield every file path that is not ignored.

    Ignore files found along the way add rules for their own directory and for all
    directories below it. Symlinked directories are followed.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file that holds the ignore patterns, one per line
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    :raises RuntimeError: if a directory is reached a second time during the walk
    """
    # Rules inherited by each directory still to be visited, keyed by resolved absolute path.
    inherited_rules: dict[Path, list[_IgnoreRule]] = {}

    for current, subdirs, filenames in os.walk(base_dir_path, followlinks=True):
        current_dir = Path(current)
        active_rules: list[_IgnoreRule] = inherited_rules.get(current_dir.resolve(), [])

        ignore_file = current_dir / ignore_file_name
        if ignore_file.is_file():
            with open(ignore_file) as handle:
                fresh_rules: list[_IgnoreRule] = []
                for raw_line in handle.read().split("\n"):
                    # Drop "#" comments together with the whitespace that precedes them.
                    text = re.sub(r"\s*#.*", "", raw_line)
                    if not text:
                        continue
                    compiled = ignore_rule_type.compile(text, Path(base_dir_path), ignore_file)
                    # ``None`` marks a pattern that was rejected as invalid.
                    if compiled is not None:
                        fresh_rules.append(compiled)
                # Definition order is kept: with negation, later patterns must be able
                # to override earlier ones.
                active_rules.extend(fresh_rules)

        # Prune ignored directories in place so that os.walk does not descend into them.
        kept_subdirs = []
        for name in subdirs:
            if not ignore_rule_type.match(current_dir / name, active_rules):
                kept_subdirs.append(name)
        subdirs[:] = kept_subdirs

        # Symlinks are followed, so a directory seen twice means the walk is looping.
        for name in subdirs:
            resolved = (current_dir / name).resolve()
            if resolved in inherited_rules:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {resolved} has appeared more than once."
                )
            inherited_rules[resolved] = active_rules.copy()

        for filename in filenames:
            if filename == ignore_file_name:
                continue
            candidate = current_dir / filename
            if not ignore_rule_type.match(candidate, active_rules):
                yield str(candidate)
191
192
def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = "glob",
) -> Generator[str, None, None]:
    """
    Recursively search the base path for file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file that specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: ``regexp`` or ``glob``
        (default: ``glob``; an empty or otherwise falsy value is treated as ``glob``)

    :return: a generator of file paths.
    :raises ValueError: if ``ignore_file_syntax`` is neither ``glob`` nor ``regexp``
    """
    syntax = ignore_file_syntax or "glob"
    rule_type: type[_IgnoreRule]
    if syntax == "glob":
        rule_type = _GlobIgnoreRule
    elif syntax == "regexp":
        rule_type = _RegexpIgnoreRule
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)