Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/file.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import ast
21import hashlib
22import logging
23import os
24import re
25import zipfile
26from collections.abc import Generator
27from io import TextIOWrapper
28from pathlib import Path
29from typing import overload
31from airflow.configuration import conf
# Module-level logger (standard Airflow convention: one logger per module).
33log = logging.getLogger(__name__)
# Template for the synthetic module name assigned to dynamically imported DAG
# files; filled in by get_unique_dag_module_name with a path hash and the
# original module name so equal-named DAG files do not collide in sys.modules.
35MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
# Splits a path into three groups: (1) optional prefix ending in ".zip" plus
# the path separator, (2) the archive path itself, (3) the remainder of the
# path. Groups 1/2 are None when no ".zip" component is present; the pattern
# matches any string, so .search() never returns None for a str input.
38ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")
41@overload
42def correct_maybe_zipped(fileloc: None) -> None: ...
45@overload
46def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...
49def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
50 """If the path contains a folder with a .zip suffix, treat it as a zip archive and return path."""
51 if not fileloc:
52 return fileloc
53 search_ = ZIP_REGEX.search(str(fileloc))
54 if not search_:
55 return fileloc
56 _, archive, _ = search_.groups()
57 if archive and zipfile.is_zipfile(archive):
58 return archive
59 return fileloc
def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file, transparently looking inside zip archives.

    If the path contains a folder with a .zip suffix, then the folder
    is treated as a zip archive, opening the file inside the archive.

    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    match = ZIP_REGEX.search(fileloc)
    _, archive, filename = match.groups()
    if not (archive and zipfile.is_zipfile(archive)):
        # Plain file on the local filesystem.
        return open(fileloc, mode=mode)
    # Wrap the binary member stream so callers get text, like plain open().
    return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))
def list_py_file_paths(
    directory: str | os.PathLike[str] | None,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
) -> list[str]:
    """
    Traverse a directory and collect paths to Python (and zip) DAG files.

    :param directory: the directory to traverse
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :return: a list of paths to Python files in the specified directory
    """
    # NOTE: the safe_mode default is evaluated once, at module import time.
    if directory is None:
        return []
    if os.path.isfile(directory):
        # A single file is returned as-is without applying the heuristic.
        return [str(directory)]
    if os.path.isdir(directory):
        return find_dag_file_paths(directory, safe_mode)
    # Nonexistent path: nothing to list.
    return []
def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
    """Find file paths of all DAG files."""
    from airflow._shared.module_loading.file_discovery import find_path_from_directory

    ignore_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
    dag_files: list[str] = []
    # Walk the directory, honoring .airflowignore files along the way.
    for candidate in find_path_from_directory(directory, ".airflowignore", ignore_syntax):
        path = Path(candidate)
        try:
            # Consider regular .py files and zip archives, then apply the
            # (possibly configured) DAG-detection heuristic.
            if (
                path.is_file()
                and (path.suffix == ".py" or zipfile.is_zipfile(path))
                and might_contain_dag(candidate, safe_mode)
            ):
                dag_files.append(candidate)
        except Exception:
            # Best effort: a broken file must not abort discovery of the rest.
            log.exception("Error while examining %s", candidate)
    return dag_files
# Matches an entire line consisting of optional whitespace followed by a '#'
# comment. NOTE(review): not referenced anywhere in this module — presumably
# kept for external importers; verify before removing.
120COMMENT_PATTERN = re.compile(r"\s*#.*")
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Check whether a Python file contains Airflow DAGs.

    When safe_mode is off (with False value), this function always returns True.

    If might_contain_dag_callable isn't specified, it uses airflow default heuristic
    """
    if not safe_mode:
        # With safe mode disabled every file is treated as a DAG candidate.
        return True
    # The heuristic is pluggable via configuration; fall back to the default.
    checker = conf.getimport(
        "core",
        "might_contain_dag_callable",
        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
    )
    return checker(file_path=file_path, zip_file=zip_file)
142def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
143 """
144 Heuristic that guesses whether a Python file contains an Airflow DAG definition.
146 :param file_path: Path to the file to be checked.
147 :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
148 :return: True, if file might contain DAGs.
149 """
150 if zip_file:
151 with zip_file.open(file_path) as current_file:
152 content = current_file.read()
153 else:
154 if zipfile.is_zipfile(file_path):
155 return True
156 with open(file_path, "rb") as dag_file:
157 content = dag_file.read()
158 content = content.lower()
159 if b"airflow" not in content:
160 return False
161 return any(s in content for s in (b"dag", b"asset"))
def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
    """Yield the dotted module names imported at the top level of *module*."""
    for node in module.body:
        if isinstance(node, ast.Import):
            # "import a, b" carries multiple aliases on one statement.
            yield from (alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module is not None:
            # node.module is None for bare relative imports ("from . import x").
            yield node.module
def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Find Airflow modules imported in the given file."""
    try:
        tree = ast.parse(Path(file_path).read_bytes())
    except Exception:
        # Unreadable or unparseable files simply yield nothing.
        return
    yield from (name for name in _find_imported_modules(tree) if name.startswith("airflow."))
def get_unique_dag_module_name(file_path: str) -> str:
    """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
    if not isinstance(file_path, str):
        raise ValueError("file_path should be a string to generate unique module name")
    # SHA-1 of the path is used purely for uniqueness, not security.
    path_hash = hashlib.sha1(file_path.encode("utf-8"), usedforsecurity=False).hexdigest()
    # Dots and dashes are not valid in module names; normalize them to '_'.
    module_name = re.sub(r"[.-]", "_", Path(file_path).stem)
    return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=module_name)
def __getattr__(name: str):
    """Lazily resolve deprecated module attributes (PEP 562 module __getattr__)."""
    if name != "find_path_from_directory":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    # Imported lazily so the deprecation machinery only loads when used.
    import warnings

    from airflow._shared.module_loading import find_path_from_directory
    from airflow.utils.deprecation_tools import DeprecatedImportWarning

    warnings.warn(
        "Importing find_path_from_directory from airflow.utils.file is deprecated "
        "and will be removed in a future version. "
        "Use airflow._shared.module_loading.find_path_from_directory instead.",
        DeprecatedImportWarning,
        stacklevel=2,
    )
    return find_path_from_directory