1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""File discovery utilities for finding files while respecting ignore patterns."""
19
20from __future__ import annotations
21
22import logging
23import os
24import re
25from collections.abc import Generator
26from pathlib import Path
27from re import Pattern
28from typing import NamedTuple, Protocol
29
30from pathspec.patterns import GitWildMatchPattern
31
32log = logging.getLogger(__name__)
33
34
class _IgnoreRule(Protocol):
    """Structural (duck-typed) interface that every ignore-rule implementation provides."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Create an ignore rule out of ``pattern``.

        Both ``base_dir`` and ``definition_file`` are expected to be absolute paths.
        """
        ...

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Check an absolute candidate path against a list of rules."""
        ...
49
50
class _RegexpIgnoreRule(NamedTuple):
    """Ignore rule backed by a compiled regular expression, stored as a typed namedtuple."""

    # Compiled regular expression searched against the path relative to ``base_dir``.
    pattern: Pattern
    # Directory that candidate paths are made relative to before matching.
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Turn a regexp string into an ignore rule.

        Returns ``None``, after logging a warning that names ``definition_file``,
        when the regexp does not compile.
        """
        try:
            compiled = re.compile(pattern)
        except re.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Tell whether any of the given regexp rules matches ``path``.

        Each rule is searched against ``path`` expressed relative to that rule's
        ``base_dir``; the first hit wins.

        :raises ValueError: if ``rules`` contains something that is not a ``_RegexpIgnoreRule``.
        """
        for candidate in rules:
            if not isinstance(candidate, _RegexpIgnoreRule):
                raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(candidate)}")
            relative = str(path.relative_to(candidate.base_dir))
            if candidate.pattern.search(relative) is not None:
                return True
        return False
75
76
class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob ignore rules."""

    # Compiled glob pattern; ``match`` reads its ``include`` flag and calls ``match_file``.
    wild_match_pattern: GitWildMatchPattern
    # Directory the pattern is anchored to, or None when only the path's final
    # component is matched (pattern may apply at any level).
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied glob pattern.

        The no-op pattern ``/`` is skipped: a warning naming ``definition_file`` is
        logged and ``None`` is returned. No other validation happens here, so any
        error raised by ``GitWildMatchPattern`` propagates to the caller.

        ``base_dir`` is accepted for interface compatibility but is not used;
        anchored patterns are relative to the directory of ``definition_file``.
        """
        relative_to: Path | None = None
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            relative_to = definition_file.parent

        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(wild_match_pattern=ignore_pattern, relative_to=relative_to)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Match a list of ignore rules against the supplied path, accounting for exclusion rules and ordering.

        Rules are evaluated in order and the last rule that matches decides the
        outcome, so a later rule can override an earlier one.

        :param path: candidate path; directories are matched with a trailing ``/``.
        :param rules: rules to evaluate, all of which must be ``_GlobIgnoreRule`` instances.
        :return: the ``include`` flag of the last matching rule, or ``False`` if none matched.
        :raises ValueError: if ``rules`` contains something that is not a ``_GlobIgnoreRule``.
        """
        matched = False
        if not rules:
            return matched

        # Whether ``path`` is a directory does not depend on the rule, so hit the
        # filesystem once here instead of once per rule inside the loop.
        dir_suffix = "/" if path.is_dir() else ""

        for rule in rules:
            if not isinstance(rule, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(rule)}")
            # Anchored rules see the path relative to their directory; unanchored
            # rules only see the final path component.
            rel_obj = path.relative_to(rule.relative_to) if rule.relative_to else Path(path.name)
            rel_path = f"{rel_obj.as_posix()}{dir_suffix}"
            wild_match_pattern = rule.wild_match_pattern
            if wild_match_pattern.include is not None and wild_match_pattern.match_file(rel_path) is not None:
                matched = wild_match_pattern.include

        return matched
120
121
def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Walk the base path recursively and yield every file path that is not ignored.

    Symlinked directories are followed. Ignore rules found in a directory apply to
    that directory and are inherited by its sub-directories.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file holding the ignore patterns; ``#`` starts a comment.
        Files with this name are never yielded themselves.
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    :raises RuntimeError: if a sub-directory resolves to a path that was already registered
        during this walk (e.g. a symlink loop).
    """
    # Rules each directory inherits from its parent, keyed by resolved absolute path.
    patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}

    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        current_dir = Path(root)
        patterns: list[_IgnoreRule] = patterns_by_dir.get(current_dir.resolve(), [])

        ignore_file_path = current_dir / ignore_file_name
        if ignore_file_path.is_file():
            with open(ignore_file_path) as ifile:
                raw_lines = ifile.read().split("\n")
            # Rules are appended in file order: with negation, later patterns
            # must be able to override earlier ones.
            for raw_line in raw_lines:
                pattern = re.sub(r"\s*#.*", "", raw_line)
                if not pattern:
                    continue
                rule = ignore_rule_type.compile(pattern, Path(base_dir_path), ignore_file_path)
                # ``None`` signals an invalid pattern that must be dropped.
                if rule is not None:
                    patterns.append(rule)

        # Prune ignored sub-directories in place so os.walk does not descend into them.
        kept_dirs = []
        for subdir in dirs:
            if not ignore_rule_type.match(current_dir / subdir, patterns):
                kept_dirs.append(subdir)
        dirs[:] = kept_dirs

        # Because symlinks are followed, guard against walking the same real directory twice.
        for sd in dirs:
            dirpath = (current_dir / sd).resolve()
            if dirpath in patterns_by_dir:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {dirpath} has appeared more than once."
                )
            # Each child gets its own copy so siblings do not share later additions.
            patterns_by_dir[dirpath] = patterns.copy()

        for file_name in files:
            if file_name == ignore_file_name:
                continue
            abs_file_path = current_dir / file_name
            if ignore_rule_type.match(abs_file_path, patterns):
                continue
            yield str(abs_file_path)
177
178
def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = "glob",
) -> Generator[str, None, None]:
    """
    Search the base path recursively and produce the file paths that are not ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file that lists the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob (default: glob);
        an empty or ``None`` value is treated as glob

    :return: a generator of file paths.
    :raises ValueError: if ``ignore_file_syntax`` is neither ``glob`` nor ``regexp``.
    """
    # A falsy syntax falls back to the default glob behaviour.
    syntax = ignore_file_syntax or "glob"
    if syntax == "glob":
        rule_type = _GlobIgnoreRule
    elif syntax == "regexp":
        rule_type = _RegexpIgnoreRule
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)