Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/file.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

104 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import ast 

21import hashlib 

22import logging 

23import os 

24import re 

25import zipfile 

26from collections.abc import Generator 

27from io import TextIOWrapper 

28from pathlib import Path 

29from typing import overload 

30 

31from airflow.configuration import conf 

32 

# Module-level logger, named after this module per Airflow convention.
log = logging.getLogger(__name__)

# Template for the synthetic module name DAG files are imported under;
# the path hash keeps modules loaded from different file paths from
# colliding in sys.modules (see get_unique_dag_module_name).
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"


# Splits a path into an optional "<archive>.zip<sep>" prefix (group 2 is the
# archive path) and the remainder inside the archive (group 3). Both parts
# are optional, so the pattern matches any string.
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")

39 

40 

@overload
def correct_maybe_zipped(fileloc: None) -> None: ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """
    Normalize a path that may point inside a zip archive.

    If the path contains a component with a ``.zip`` suffix that is a real
    zip archive on disk, return the archive path; otherwise return the
    input unchanged (including ``None``/empty values).
    """
    if not fileloc:
        return fileloc
    match = ZIP_REGEX.search(str(fileloc))
    if match is None:
        return fileloc
    # Group 2 is the "<...>.zip" prefix captured by ZIP_REGEX, if any.
    archive = match.group(2)
    if archive and zipfile.is_zipfile(archive):
        return archive
    return fileloc

60 

61 

def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file.

    If the path contains a folder with a .zip suffix, then the folder
    is treated as a zip archive, opening the file inside the archive.

    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    groups = ZIP_REGEX.search(fileloc).groups()
    archive, filename = groups[1], groups[2]
    if archive and zipfile.is_zipfile(archive):
        # ZipFile.open yields a binary stream; wrap it so callers get text,
        # matching what plain open(..., "r") would return.
        return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))
    return open(fileloc, mode=mode)

75 

76 

def list_py_file_paths(
    directory: str | os.PathLike[str] | None,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
) -> list[str]:
    """
    Traverse a directory and look for Python files.

    :param directory: the directory to traverse
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :return: a list of paths to Python files in the specified directory
    """
    # NOTE: the safe_mode default is evaluated once at import time, as in the
    # original definition; callers can always pass safe_mode explicitly.
    if directory is None:
        return []
    if os.path.isfile(directory):
        # A single file is returned as-is without DAG-content filtering.
        return [str(directory)]
    if os.path.isdir(directory):
        return list(find_dag_file_paths(directory, safe_mode))
    # Neither a file nor a directory (e.g. missing path): nothing to list.
    return []

99 

100 

def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
    """Find file paths of all DAG files.

    Walks *directory* honoring ``.airflowignore`` files, keeping regular files
    that are either ``.py`` sources or zip archives and that pass the
    :func:`might_contain_dag` check. Errors examining an individual path are
    logged and that path is skipped.
    """
    from airflow._shared.module_loading.file_discovery import find_path_from_directory

    ignore_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
    dag_files: list[str] = []

    for candidate in find_path_from_directory(directory, ".airflowignore", ignore_syntax):
        candidate_path = Path(candidate)
        try:
            loadable = candidate_path.is_file() and (
                candidate_path.suffix == ".py" or zipfile.is_zipfile(candidate_path)
            )
            if loadable and might_contain_dag(candidate, safe_mode):
                dag_files.append(candidate)
        except Exception:
            # A single unreadable/odd file must not abort the whole scan.
            log.exception("Error while examining %s", candidate)

    return dag_files

118 

119 

# Matches a "#" comment (with any leading whitespace) to end of line.
# NOTE(review): not referenced anywhere in this module's visible code —
# presumably kept for external importers; confirm before removing.
COMMENT_PATTERN = re.compile(r"\s*#.*")

121 

122 

def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Check whether a Python file contains Airflow DAGs.

    When safe_mode is off (with False value), this function always returns True.

    If might_contain_dag_callable isn't specified, it uses airflow default heuristic
    """
    if not safe_mode:
        # Safe mode disabled: every file is considered a potential DAG file.
        return True

    # The heuristic is pluggable via [core] might_contain_dag_callable.
    heuristic = conf.getimport(
        "core",
        "might_contain_dag_callable",
        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
    )
    return heuristic(file_path=file_path, zip_file=zip_file)

140 

141 

def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Heuristic that guesses whether a Python file contains an Airflow DAG definition.

    :param file_path: Path to the file to be checked.
    :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
    :return: True, if file might contain DAGs.
    """
    if zip_file is not None:
        with zip_file.open(file_path) as member:
            raw = member.read()
    elif zipfile.is_zipfile(file_path):
        # A zip archive on local disk is always treated as potentially
        # containing DAGs; its members are inspected elsewhere.
        return True
    else:
        with open(file_path, "rb") as source:
            raw = source.read()
    # Case-insensitive substring scan: must mention "airflow" plus either
    # "dag" or "asset" to be considered a candidate.
    lowered = raw.lower()
    return b"airflow" in lowered and any(token in lowered for token in (b"dag", b"asset"))

162 

163 

def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
    """Yield dotted module names imported at the top level of the parsed module."""
    for node in module.body:
        if isinstance(node, ast.Import):
            yield from (alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module is not None:
            # Relative imports ("from . import x") have module=None and are skipped.
            yield node.module


def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Find Airflow modules imported in the given file."""
    try:
        tree = ast.parse(Path(file_path).read_bytes())
    except Exception:
        # Unreadable or syntactically invalid files simply yield nothing.
        return
    yield from (name for name in _find_imported_modules(tree) if name.startswith("airflow."))

182 

183 

def get_unique_dag_module_name(file_path: str) -> str:
    """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
    if not isinstance(file_path, str):
        raise ValueError("file_path should be a string to generate unique module name")
    # SHA-1 is used purely for uniqueness, not security (usedforsecurity=False).
    path_hash = hashlib.sha1(file_path.encode("utf-8"), usedforsecurity=False).hexdigest()
    # Dots and dashes are invalid in module names; replace them with underscores.
    module_name = re.sub(r"[.-]", "_", Path(file_path).stem)
    return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=module_name)

191 

192 

def __getattr__(name: str):
    """Module-level attribute hook: serve the deprecated ``find_path_from_directory`` with a warning."""
    if name != "find_path_from_directory":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    import warnings

    from airflow._shared.module_loading import find_path_from_directory
    from airflow.utils.deprecation_tools import DeprecatedImportWarning

    # stacklevel=2 attributes the warning to the importing caller, not here.
    warnings.warn(
        "Importing find_path_from_directory from airflow.utils.file is deprecated "
        "and will be removed in a future version. "
        "Use airflow._shared.module_loading.find_path_from_directory instead.",
        DeprecatedImportWarning,
        stacklevel=2,
    )
    return find_path_from_directory