Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/file.py: 29%
188 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import ast
21import io
22import logging
23import os
24import re
25import zipfile
26from collections import OrderedDict
27from pathlib import Path
28from typing import TYPE_CHECKING, Generator, NamedTuple, Pattern, Protocol, overload
30from pathspec.patterns import GitWildMatchPattern
32from airflow.configuration import conf
33from airflow.exceptions import RemovedInAirflow3Warning
35if TYPE_CHECKING:
36 import pathlib
# Module-level logger, named after this module; used by the helpers below for warnings and errors.
log = logging.getLogger(__name__)
class _IgnoreRule(Protocol):
    """Structural interface that every ignore-rule implementation in this module satisfies."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Turn one pattern line into an ignore rule.

        Both ``base_dir`` and ``definition_file`` are expected to be absolute paths.
        """
        ...

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Tell whether the absolute candidate ``path`` is matched by the given ``rules``."""
        ...
class _RegexpIgnoreRule(NamedTuple):
    """Ignore rule backed by a regular expression, stored as a typed namedtuple."""

    # Compiled regular expression to search for in candidate paths.
    pattern: Pattern
    # Directory that candidate paths are made relative to before searching.
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Compile ``pattern`` into a rule.

        An invalid regular expression is reported with a warning naming
        ``definition_file`` and results in ``None`` instead of a rule.
        """
        try:
            compiled = re.compile(pattern)
        except re.error as err:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, err)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Return True as soon as any rule's regexp is found in ``path`` relative to that rule's base dir.

        :raises ValueError: if a rule is not a ``_RegexpIgnoreRule``
        """
        for candidate in rules:
            if not isinstance(candidate, _RegexpIgnoreRule):
                raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(candidate)}")
            relative = str(path.relative_to(candidate.base_dir))
            if candidate.pattern.search(relative) is not None:
                return True
        return False
class _GlobIgnoreRule(NamedTuple):
    """Ignore rule backed by a gitignore-style glob, stored as a typed namedtuple."""

    # Regular expression produced from the glob pattern.
    pattern: Pattern
    # The glob text exactly as written in the ignore file.
    raw_pattern: str
    # Value reported when the rule matches; None disables the rule.
    include: bool | None = None
    # Directory the pattern is anchored to, or None to test only the file name.
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None:
        """
        Compile a glob ``pattern`` into a rule.

        The lone pattern "/" is reported with a warning and yields ``None``.
        """
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None

        # See https://git-scm.com/docs/gitignore
        # > If there is a separator at the beginning or middle (or both) of the pattern, then the
        # > pattern is relative to the directory level of the particular .gitignore file itself.
        # > Otherwise the pattern may also match at any level below the .gitignore level.
        anchored = pattern.startswith("/") or "/" in pattern.rstrip("/")
        anchor_dir: Path | None = definition_file.parent if anchored else None

        wildmatch = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(wildmatch.regex, pattern, wildmatch.include, anchor_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Evaluate every rule in order against ``path``; the last matching rule decides the result.

        :raises ValueError: if a rule is not a ``_GlobIgnoreRule``
        """
        verdict = False
        for entry in rules:
            if not isinstance(entry, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(entry)}")
            glob_rule: _GlobIgnoreRule = entry  # explicit typing to make mypy play nicely
            if glob_rule.relative_to:
                target = str(path.relative_to(glob_rule.relative_to))
            else:
                target = str(path.name)
            if glob_rule.raw_pattern.endswith("/") and path.is_dir():
                # a directory-only pattern can only match a test path that ends in a slash
                target += "/"
            if glob_rule.include is not None and glob_rule.pattern.match(target) is not None:
                verdict = glob_rule.include
        return verdict
def TemporaryDirectory(*args, **kwargs):
    """Deprecated shim: warn, then forward every argument to `tempfile.TemporaryDirectory`."""
    import tempfile
    import warnings

    warnings.warn(
        "This function is deprecated. Please use `tempfile.TemporaryDirectory`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )

    return tempfile.TemporaryDirectory(*args, **kwargs)
def mkdirs(path, mode):
    """
    Create the directory specified by path, creating intermediate directories as necessary.

    If the directory already exists, this is a no-op. This function is deprecated and
    emits a ``RemovedInAirflow3Warning``; call ``pathlib.Path(path).mkdir`` directly instead.

    :param path: The directory to create
    :param mode: The mode requested for the directory, e.g. 0o755. It is passed straight to
        ``pathlib.Path.mkdir``, so it is still combined with the process umask, and any
        intermediate directories are created with default permissions rather than ``mode``.
    """
    import warnings

    warnings.warn(
        f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # parents=True creates missing ancestors; exist_ok=True makes an existing directory a no-op.
    Path(path).mkdir(mode=mode, parents=True, exist_ok=True)
# Splits a path at a "<something>.zip<os.sep>" component: group 2 is the path up to and
# including ".zip" (None when there is no such component), group 3 is the remainder.
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")


@overload
def correct_maybe_zipped(fileloc: None) -> None:
    ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path:
    ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """
    Return the path of the enclosing zip archive when ``fileloc`` points inside one.

    If the path contains a folder with a .zip suffix and that folder is an actual zip
    archive, the archive path is returned; in every other case ``fileloc`` is handed
    back untouched (including when it is ``None`` or empty).
    """
    if not fileloc:
        return fileloc

    found = ZIP_REGEX.search(str(fileloc))
    if found is None:
        return fileloc

    archive = found.group(2)
    return archive if archive and zipfile.is_zipfile(archive) else fileloc
def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file, reading it from inside a zip archive when the path points into one.

    If the path contains a folder with a .zip suffix and that folder is an actual zip
    archive, the rest of the path is opened as a member of the archive. Otherwise the
    path is opened as a regular file.

    :param fileloc: path of the file to open, as a ``str`` or an ``os.PathLike`` object
    :param mode: mode passed to ``zipfile.ZipFile`` for archives, or to ``open`` otherwise
    :return: a text wrapper around the archive member, or a file object as in `open`.
    """
    # ZIP_REGEX only operates on text; normalise path objects first so that a
    # pathlib.Path is accepted here just as it is by correct_maybe_zipped.
    fileloc = os.fspath(fileloc)
    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
    if archive and zipfile.is_zipfile(archive):
        zip_handle = zipfile.ZipFile(archive, mode=mode)
        try:
            return io.TextIOWrapper(zip_handle.open(filename))
        except BaseException:
            # Opening the member failed (e.g. it does not exist): do not leak the archive handle.
            zip_handle.close()
            raise
    return open(fileloc, mode=mode)
def _find_path_from_directory(
    base_dir_path: str,
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Walk the base path recursively and yield every file path that no ignore rule excludes.

    Rules are read from the ignore file found in each directory and are inherited by
    all of its sub-directories. Symlinks are followed during the walk.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: name of the per-directory file listing the patterns to ignore
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    :raises RuntimeError: if the same resolved directory is reached twice during the walk
    """
    base_dir = Path(base_dir_path)
    # Rules inherited by each directory, keyed by its resolved, absolute path
    inherited_rules: dict[Path, list[_IgnoreRule]] = {}

    for current, subdirs, filenames in os.walk(base_dir_path, followlinks=True):
        current_dir = Path(current)
        active_rules: list[_IgnoreRule] = inherited_rules.get(current_dir.resolve(), [])

        ignore_file = current_dir / ignore_file_name
        if ignore_file.is_file():
            with open(ignore_file) as handle:
                raw_lines = handle.read().split("\n")
            for raw in raw_lines:
                # drop trailing comments; lines that end up empty carry no pattern
                text = re.sub(r"\s*#.*", "", raw)
                if not text:
                    continue
                compiled = ignore_rule_type.compile(text, base_dir, ignore_file)
                # compile() returns None for invalid patterns, which are skipped
                if compiled is not None:
                    active_rules.append(compiled)
            # de-duplicate while keeping first-seen order: evaluation order matters with
            # negation, so that later patterns can override earlier patterns
            active_rules = list(dict.fromkeys(active_rules))

        # prune ignored sub-directories in place so os.walk does not descend into them
        subdirs[:] = [
            name for name in subdirs if not ignore_rule_type.match(current_dir / name, active_rules)
        ]

        # explicit loop for infinite recursion detection since we are following symlinks in this walk
        for name in subdirs:
            resolved = (current_dir / name).resolve()
            if resolved in inherited_rules:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {resolved} has appeared more than once."
                )
            inherited_rules[resolved] = active_rules.copy()

        for filename in filenames:
            if filename == ignore_file_name:
                continue
            candidate = current_dir / filename
            if not ignore_rule_type.match(candidate, active_rules):
                yield str(candidate)
def find_path_from_directory(
    base_dir_path: str,
    ignore_file_name: str,
    ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"),
) -> Generator[str, None, None]:
    """
    Walk the base path recursively and produce the file paths that are not ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: name of the file that lists the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob;
        an empty value is treated as regexp

    :return: a generator of file paths.
    :raises ValueError: if ``ignore_file_syntax`` is neither "glob", "regexp" nor empty
    """
    rule_type: type[_IgnoreRule]
    if ignore_file_syntax == "glob":
        rule_type = _GlobIgnoreRule
    elif not ignore_file_syntax or ignore_file_syntax == "regexp":
        rule_type = _RegexpIgnoreRule
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)
def list_py_file_paths(
    directory: str | pathlib.Path,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
    include_examples: bool | None = None,
) -> list[str]:
    """
    Collect the Python file paths found under a directory.

    :param directory: the directory to traverse; a path to a single file is returned as-is,
        and ``None`` contributes no paths
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :param include_examples: include example DAGs; when ``None`` the core.LOAD_EXAMPLES
        configuration setting decides
    :return: a list of paths to Python files in the specified directory
    """
    if include_examples is None:
        include_examples = conf.getboolean("core", "LOAD_EXAMPLES")

    collected: list[str]
    if directory is None:
        collected = []
    elif os.path.isfile(directory):
        collected = [str(directory)]
    elif os.path.isdir(directory):
        collected = list(find_dag_file_paths(directory, safe_mode))
    else:
        collected = []

    if include_examples:
        from airflow import example_dags

        examples_root = example_dags.__path__[0]  # type: ignore
        # include_examples=False stops the nested call from adding the examples again
        collected.extend(list_py_file_paths(examples_root, safe_mode, include_examples=False))
    return collected
def find_dag_file_paths(directory: str | pathlib.Path, safe_mode: bool) -> list[str]:
    """
    Return the paths under ``directory`` that look like DAG files.

    Candidates come from ``find_path_from_directory`` using ".airflowignore"; a candidate
    is kept when it is a regular file, is either a ".py" file or a zip archive, and passes
    ``might_contain_dag``. A failure while examining one candidate is logged and that
    candidate is skipped.
    """
    dag_files = []

    for candidate in find_path_from_directory(str(directory), ".airflowignore"):
        try:
            if not os.path.isfile(candidate):
                continue
            extension = os.path.splitext(os.path.basename(candidate))[1]
            is_python = extension == ".py"
            # the zip check is only performed for files that are not plain .py files
            if not (is_python or zipfile.is_zipfile(candidate)):
                continue
            if might_contain_dag(candidate, safe_mode):
                dag_files.append(candidate)
        except Exception:
            log.exception("Error while examining %s", candidate)

    return dag_files
# Matches a "#" comment through the end of the line, together with any whitespace before it.
COMMENT_PATTERN = re.compile(r"\s*#.*")
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Decide whether a Python file might contain Airflow DAGs.

    With safe_mode switched off (a False value) the answer is always True. Otherwise the
    decision is delegated to the callable configured as core.might_contain_dag_callable,
    falling back to the Airflow default heuristic when none is specified.
    """
    if safe_mode:
        checker = conf.getimport(
            "core",
            "might_contain_dag_callable",
            fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
        )
        return checker(file_path=file_path, zip_file=zip_file)
    return True
def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Guess whether a Python file holds an Airflow DAG definition.

    The file counts as a possible DAG file when its bytes contain both "dag" and
    "airflow", compared case-insensitively. A path that is itself a zip archive
    (with no ``zip_file`` given) is always reported as a possible DAG file.

    :param file_path: Path to the file to be checked.
    :param zip_file: if passed, ``file_path`` is read from this archive. Otherwise, the local
        filesystem is used.
    :return: True, if file might contain DAGs.
    """
    if zip_file:
        with zip_file.open(file_path) as member:
            raw = member.read()
    elif zipfile.is_zipfile(file_path):
        return True
    else:
        with open(file_path, "rb") as handle:
            raw = handle.read()

    lowered = raw.lower()
    return b"dag" in lowered and b"airflow" in lowered
def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
    """
    Yield the module names imported by the top-level statements of ``module``.

    Only statements directly in the module body are inspected. ``from`` imports
    without a module name (such as ``from . import x``) are skipped.
    """
    for node in module.body:
        if isinstance(node, ast.ImportFrom):
            if node.module is not None:
                yield node.module
        elif isinstance(node, ast.Import):
            yield from (alias.name for alias in node.names)
def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """
    Yield the names of the modules starting with "airflow." that the given file imports.

    A file that cannot be read or parsed yields nothing.
    """
    try:
        tree = ast.parse(Path(file_path).read_bytes())
    except Exception:
        return
    yield from (name for name in _find_imported_modules(tree) if name.startswith("airflow."))