Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/file.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

171 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import ast 

21import hashlib 

22import logging 

23import os 

24import re 

25import zipfile 

26from collections.abc import Generator 

27from io import TextIOWrapper 

28from pathlib import Path 

29from re import Pattern 

30from typing import NamedTuple, Protocol, overload 

31 

32from pathspec.patterns import GitWildMatchPattern 

33 

34from airflow.configuration import conf 

35 

# Module-level logger for DAG-file discovery and parsing helpers.
log = logging.getLogger(__name__)

# Template used to build a collision-free module name when a DAG file is
# loaded: get_unique_dag_module_name() fills in a sha1 hash of the file path
# plus the sanitized original module name.
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"

39 

40 

41class _IgnoreRule(Protocol): 

42 """Interface for ignore rules for structural subtyping.""" 

43 

44 @staticmethod 

45 def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None: 

46 """ 

47 Build an ignore rule from the supplied pattern. 

48 

49 ``base_dir`` and ``definition_file`` should be absolute paths. 

50 """ 

51 

52 @staticmethod 

53 def match(path: Path, rules: list[_IgnoreRule]) -> bool: 

54 """Match a candidate absolute path against a list of rules.""" 

55 

56 

57class _RegexpIgnoreRule(NamedTuple): 

58 """Typed namedtuple with utility functions for regexp ignore rules.""" 

59 

60 pattern: Pattern 

61 base_dir: Path 

62 

63 @staticmethod 

64 def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None: 

65 """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid.""" 

66 try: 

67 return _RegexpIgnoreRule(re.compile(pattern), base_dir) 

68 except re.error as e: 

69 log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e) 

70 return None 

71 

72 @staticmethod 

73 def match(path: Path, rules: list[_IgnoreRule]) -> bool: 

74 """Match a list of ignore rules against the supplied path.""" 

75 for rule in rules: 

76 if not isinstance(rule, _RegexpIgnoreRule): 

77 raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}") 

78 if rule.pattern.search(str(path.relative_to(rule.base_dir))) is not None: 

79 return True 

80 return False 

81 

82 

class _GlobIgnoreRule(NamedTuple):
    """Ignore rule backed by a gitignore-style glob pattern."""

    # Compiled gitignore-style pattern.
    wild_match_pattern: GitWildMatchPattern
    # When set, candidate paths are relativized to this directory; otherwise
    # only the file name is matched (pattern may match at any depth).
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """Compile ``pattern`` into a glob rule; log a warning and return None if it is a no-op."""
        anchor: Path | None = None
        if pattern.strip() == "/":
            # A bare "/" matches nothing in gitignore semantics.
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        # Per https://git-scm.com/docs/gitignore: a separator at the beginning
        # or middle of the pattern anchors it to the directory containing the
        # ignore file; otherwise it may match at any level below it.
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            anchor = definition_file.parent

        return _GlobIgnoreRule(wild_match_pattern=GitWildMatchPattern(pattern), relative_to=anchor)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Evaluate rules in order so later (possibly negating) patterns win."""
        result = False
        for candidate in rules:
            if not isinstance(candidate, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(candidate)}")
            if candidate.relative_to:
                rel = path.relative_to(candidate.relative_to).as_posix()
            else:
                rel = Path(path.name).as_posix()
            # Directory candidates carry a trailing slash, as gitignore expects.
            if path.is_dir():
                rel = f"{rel}/"
            pattern = candidate.wild_match_pattern
            if pattern.include is not None and pattern.match_file(rel) is not None:
                result = pattern.include

        return result

126 

127 

# Splits "<.../archive.zip><sep><member path>" into (prefix, archive, member);
# the archive group is None when the path contains no ".zip" component.
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")


@overload
def correct_maybe_zipped(fileloc: None) -> None: ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """Return the enclosing ``.zip`` archive when ``fileloc`` points inside one, else ``fileloc``."""
    if not fileloc:
        return fileloc
    found = ZIP_REGEX.search(str(fileloc))
    if found is None:
        return fileloc
    archive = found.groups()[1]
    # Only rewrite the location when the ".zip" component is a real zip archive.
    if archive and zipfile.is_zipfile(archive):
        return archive
    return fileloc

150 

151 

def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file.

    If the path contains a folder with a .zip suffix, then the folder
    is treated as a zip archive, opening the file inside the archive.

    :param fileloc: path to the file, possibly of the form ``.../archive.zip/member.py``
    :param mode: open mode, passed to ``open`` or ``zipfile.ZipFile``
    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
    if archive and zipfile.is_zipfile(archive):
        zip_handle = zipfile.ZipFile(archive, mode=mode)
        try:
            return TextIOWrapper(zip_handle.open(filename))
        except BaseException:
            # Bug fix: don't leak the archive's file handle when the member
            # cannot be opened (e.g. KeyError for a name missing from the
            # archive) — close it before propagating the error.
            zip_handle.close()
            raise
    return open(fileloc, mode=mode)

165 

166 

167def _find_path_from_directory( 

168 base_dir_path: str | os.PathLike[str], 

169 ignore_file_name: str, 

170 ignore_rule_type: type[_IgnoreRule], 

171) -> Generator[str, None, None]: 

172 """ 

173 Recursively search the base path and return the list of file paths that should not be ignored. 

174 

175 :param base_dir_path: the base path to be searched 

176 :param ignore_file_name: the file name containing regular expressions for files that should be ignored. 

177 :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface. 

178 

179 :return: a generator of file paths which should not be ignored. 

180 """ 

181 # A Dict of patterns, keyed using resolved, absolute paths 

182 patterns_by_dir: dict[Path, list[_IgnoreRule]] = {} 

183 

184 for root, dirs, files in os.walk(base_dir_path, followlinks=True): 

185 patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), []) 

186 

187 ignore_file_path = Path(root) / ignore_file_name 

188 if ignore_file_path.is_file(): 

189 with open(ignore_file_path) as ifile: 

190 patterns_to_match_excluding_comments = [ 

191 re.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n") 

192 ] 

193 # append new patterns and filter out "None" objects, which are invalid patterns 

194 patterns += [ 

195 p 

196 for p in [ 

197 ignore_rule_type.compile(pattern, Path(base_dir_path), ignore_file_path) 

198 for pattern in patterns_to_match_excluding_comments 

199 if pattern 

200 ] 

201 if p is not None 

202 ] 

203 # evaluation order of patterns is important with negation 

204 # so that later patterns can override earlier patterns 

205 

206 dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)] 

207 # explicit loop for infinite recursion detection since we are following symlinks in this walk 

208 for sd in dirs: 

209 dirpath = (Path(root) / sd).resolve() 

210 if dirpath in patterns_by_dir: 

211 raise RuntimeError( 

212 "Detected recursive loop when walking DAG directory " 

213 f"{base_dir_path}: {dirpath} has appeared more than once." 

214 ) 

215 patterns_by_dir.update({dirpath: patterns.copy()}) 

216 

217 for file in files: 

218 if file != ignore_file_name: 

219 abs_file_path = Path(root) / file 

220 if not ignore_rule_type.match(abs_file_path, patterns): 

221 yield str(abs_file_path) 

222 

223 

def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob"),
) -> Generator[str, None, None]:
    """
    Recursively search the base path for a list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: name of the file that specifies patterns of files/dirs to ignore
    :param ignore_file_syntax: syntax of patterns in the ignore file: "regexp" or "glob"
    :return: a generator of file paths.
    :raises ValueError: if ``ignore_file_syntax`` is not a supported value
    """
    # NOTE: the default syntax is read from config once, when the module is imported.
    syntax = ignore_file_syntax or "glob"
    if syntax == "glob":
        return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule)
    if syntax == "regexp":
        return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
    raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")

243 

244 

def list_py_file_paths(
    directory: str | os.PathLike[str] | None,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
) -> list[str]:
    """
    Traverse a directory and look for Python files.

    :param directory: the directory to traverse
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :return: a list of paths to Python files in the specified directory
    """
    if directory is None:
        return []
    if os.path.isfile(directory):
        # A direct file path is returned as-is without DAG-content filtering.
        return [str(directory)]
    if os.path.isdir(directory):
        return list(find_dag_file_paths(directory, safe_mode))
    # Anything else (e.g. a nonexistent path) yields no files.
    return []

267 

268 

def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
    """Find file paths of all DAG files under ``directory``, honoring .airflowignore rules."""
    dag_file_paths = []

    for candidate in find_path_from_directory(directory, ".airflowignore"):
        candidate_path = Path(candidate)
        try:
            # Only regular .py files and zip archives can carry DAG definitions.
            looks_like_dag_file = candidate_path.is_file() and (
                candidate_path.suffix == ".py" or zipfile.is_zipfile(candidate_path)
            )
            if looks_like_dag_file and might_contain_dag(candidate, safe_mode):
                dag_file_paths.append(candidate)
        except Exception:
            # A single unreadable file must not abort DAG discovery.
            log.exception("Error while examining %s", candidate)

    return dag_file_paths

283 

284 

# Matches an inline "#" comment and any whitespace before it.
# NOTE(review): not referenced elsewhere in this module (the ignore-file walker
# inlines the same pattern) — presumably imported by other modules; confirm
# before removing.
COMMENT_PATTERN = re.compile(r"\s*#.*")

286 

287 

def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Check whether a Python file contains Airflow DAGs.

    When safe_mode is off (with False value), this function always returns True.

    If might_contain_dag_callable isn't specified, it uses airflow default heuristic
    """
    if not safe_mode:
        # With safe mode disabled, every file is considered a DAG candidate.
        return True

    # The heuristic is pluggable via core.might_contain_dag_callable.
    heuristic = conf.getimport(
        "core",
        "might_contain_dag_callable",
        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
    )
    return heuristic(file_path=file_path, zip_file=zip_file)

305 

306 

def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Heuristic that guesses whether a Python file contains an Airflow DAG definition.

    The content is read as bytes and, case-insensitively, must mention
    "airflow" plus either "dag" or "asset" to count as a candidate.

    :param file_path: Path to the file to be checked.
    :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
    :return: True, if file might contain DAGs.
    """
    if zip_file is not None:
        with zip_file.open(file_path) as member:
            content = member.read()
    elif zipfile.is_zipfile(file_path):
        # A whole archive on the local filesystem is always a candidate;
        # its members are inspected individually elsewhere.
        return True
    else:
        with open(file_path, "rb") as dag_file:
            content = dag_file.read()
    lowered = content.lower()
    if b"airflow" not in lowered:
        return False
    return b"dag" in lowered or b"asset" in lowered

327 

328 

329def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]: 

330 for st in module.body: 

331 if isinstance(st, ast.Import): 

332 for n in st.names: 

333 yield n.name 

334 elif isinstance(st, ast.ImportFrom) and st.module is not None: 

335 yield st.module 

336 

337 

def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Yield the names of ``airflow.*`` modules imported in the given file."""
    try:
        tree = ast.parse(Path(file_path).read_bytes())
    except Exception:
        # Unreadable or unparseable files simply contribute nothing.
        return
    yield from (name for name in _find_imported_modules(tree) if name.startswith("airflow."))

347 

348 

def get_unique_dag_module_name(file_path: str) -> str:
    """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
    if not isinstance(file_path, str):
        raise ValueError("file_path should be a string to generate unique module name")
    # sha1 is used purely for uniqueness of the name, not for security.
    path_hash = hashlib.sha1(file_path.encode("utf-8"), usedforsecurity=False).hexdigest()
    # Dots and dashes are not valid in Python module names.
    module_name = re.sub(r"[.-]", "_", Path(file_path).stem)
    return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=module_name)