Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/file.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

185 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import ast 

21import hashlib 

22import logging 

23import os 

24import zipfile 

25from io import TextIOWrapper 

26from pathlib import Path 

27from typing import Generator, NamedTuple, Pattern, Protocol, overload 

28 

29import re2 

30from pathspec.patterns import GitWildMatchPattern 

31 

32from airflow.configuration import conf 

33from airflow.exceptions import RemovedInAirflow3Warning 

34 

# Module-level logger following the standard per-module logging convention.
log = logging.getLogger(__name__)

# Template used to build a unique module name for a DAG file, so two DAG files
# with the same stem (e.g. two "dag.py" in different folders) do not collide in
# sys.modules; path_hash is derived from the file path (see
# get_unique_dag_module_name below).
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"

38 

39 

class _IgnoreRule(Protocol):
    """Interface for ignore rules for structural subtyping."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied pattern.

        ``base_dir`` and ``definition_file`` should be absolute paths.

        Implementations return ``None`` (after logging a warning) when the
        pattern is invalid or a no-op.
        """

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a candidate absolute path against a list of rules."""

54 

55 

class _RegexpIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for regexp ignore rules."""

    pattern: Pattern
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid."""
        try:
            compiled = re2.compile(pattern)
        except re2.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a list of ignore rules against the supplied path."""
        for rule in rules:
            if not isinstance(rule, _RegexpIgnoreRule):
                raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
            # Regexps are matched against the path relative to the rule's base dir.
            candidate = str(path.relative_to(rule.base_dir))
            if rule.pattern.search(candidate) is not None:
                return True
        return False

80 

81 

class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob ignore rules."""

    pattern: Pattern
    raw_pattern: str
    include: bool | None = None
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None:
        """Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid."""
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        relative_to: Path | None = None
        # See https://git-scm.com/docs/gitignore
        # > If there is a separator at the beginning or middle (or both) of the pattern, then the
        # > pattern is relative to the directory level of the particular .gitignore file itself.
        # > Otherwise the pattern may also match at any level below the .gitignore level.
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            relative_to = definition_file.parent
        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a list of ignore rules against the supplied path; later rules override earlier ones."""
        last_decision = False
        for candidate_rule in rules:
            if not isinstance(candidate_rule, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(candidate_rule)}")
            if candidate_rule.relative_to is not None:
                test_path = str(path.relative_to(candidate_rule.relative_to))
            else:
                test_path = str(path.name)
            if candidate_rule.raw_pattern.endswith("/") and path.is_dir():
                # ensure the test path will potentially match a directory pattern if it is a directory
                test_path += "/"
            if candidate_rule.include is not None and candidate_rule.pattern.match(test_path) is not None:
                last_decision = candidate_rule.include
        return last_decision

122 

123 

def TemporaryDirectory(*args, **kwargs):
    """Use `tempfile.TemporaryDirectory`, this function is deprecated."""
    # Deferred imports: this deprecated shim only pays the import cost when called.
    import tempfile
    import warnings

    warnings.warn(
        "This function is deprecated. Please use `tempfile.TemporaryDirectory`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    return tempfile.TemporaryDirectory(*args, **kwargs)

136 

137 

def mkdirs(path, mode):
    """
    Create the directory specified by path, creating intermediate directories as necessary.

    If directory already exists, this is a no-op.

    :param path: The directory to create
    :param mode: The mode to give to the directory e.g. 0o755, ignores umask
    """
    import warnings

    warnings.warn(
        f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # parents=True creates intermediate dirs; exist_ok=True makes this a no-op
    # when the directory is already present.
    target = Path(path)
    target.mkdir(mode=mode, parents=True, exist_ok=True)

155 

156 

# Splits a location of the form "<archive>.zip<sep><inner path>": group 2 is
# the ".zip" archive path when present, group 3 is the remainder (or the whole
# path when no archive prefix exists).
ZIP_REGEX = re2.compile(rf"((.*\.zip){re2.escape(os.sep)})?(.*)")

158 

159 

@overload
def correct_maybe_zipped(fileloc: None) -> None: ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """If the path contains a folder with a .zip suffix, treat it as a zip archive and return path."""
    if not fileloc:
        return fileloc
    matched = ZIP_REGEX.search(str(fileloc))
    if not matched:
        return fileloc
    # Group 2 of ZIP_REGEX captures the ".zip" archive prefix, if any.
    archive = matched.group(2)
    if archive and zipfile.is_zipfile(archive):
        return archive
    return fileloc

180 

181 

def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file.

    If the path contains a folder with a .zip suffix, then the folder
    is treated as a zip archive, opening the file inside the archive.

    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
    if not (archive and zipfile.is_zipfile(archive)):
        # Plain filesystem path: defer to the builtin open().
        return open(fileloc, mode=mode)
    # Wrap the binary stream from the archive so callers get text, as with open().
    return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))

196 

197 

def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """Recursively search the base path and return the list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name containing regular expressions for files that should be ignored.
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    """
    # A Dict of patterns, keyed using resolved, absolute paths
    patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}

    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        # Start from the rules inherited from the parent directory (stored when
        # the parent was visited below); the walk root itself inherits nothing.
        patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), [])

        ignore_file_path = Path(root) / ignore_file_name
        if ignore_file_path.is_file():
            with open(ignore_file_path) as ifile:
                # Strip trailing "#" comments before compiling each line.
                lines_no_comments = [re2.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")]
                # append new patterns and filter out "None" objects, which are invalid patterns
                patterns += [
                    p
                    for p in [
                        ignore_rule_type.compile(line, Path(base_dir_path), ignore_file_path)
                        for line in lines_no_comments
                        if line
                    ]
                    if p is not None
                ]
                # evaluation order of patterns is important with negation
                # so that later patterns can override earlier patterns
                patterns = list(dict.fromkeys(patterns))

        # Prune ignored subdirectories in place so os.walk does not descend into them.
        dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)]

        # explicit loop for infinite recursion detection since we are following symlinks in this walk
        for sd in dirs:
            dirpath = (Path(root) / sd).resolve()
            if dirpath in patterns_by_dir:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {dirpath} has appeared more than once."
                )
            # Record this directory's effective patterns so its children inherit them.
            patterns_by_dir.update({dirpath: patterns.copy()})

        for file in files:
            # The ignore file itself is never yielded as a result.
            if file != ignore_file_name:
                abs_file_path = Path(root) / file
                if not ignore_rule_type.match(abs_file_path, patterns):
                    yield str(abs_file_path)

252 

253 

def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"),
) -> Generator[str, None, None]:
    """Recursively search the base path for a list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob

    :return: a generator of file paths.
    """
    # Resolve the syntax name to its concrete rule implementation first, then
    # delegate the actual walk to the shared worker.
    rule_type: type[_IgnoreRule]
    if ignore_file_syntax == "glob":
        rule_type = _GlobIgnoreRule
    elif ignore_file_syntax == "regexp" or not ignore_file_syntax:
        # Regexp is the historical default and also covers an empty setting.
        rule_type = _RegexpIgnoreRule
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)

273 

274 

def list_py_file_paths(
    directory: str | os.PathLike[str] | None,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
    include_examples: bool | None = None,
) -> list[str]:
    """Traverse a directory and look for Python files.

    :param directory: the directory to traverse
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :param include_examples: include example DAGs
    :return: a list of paths to Python files in the specified directory
    """
    if include_examples is None:
        include_examples = conf.getboolean("core", "LOAD_EXAMPLES")
    file_paths: list[str] = []
    if directory is not None:
        if os.path.isfile(directory):
            # A single file is returned as-is, without DAG-heuristic filtering.
            file_paths = [str(directory)]
        elif os.path.isdir(directory):
            file_paths.extend(find_dag_file_paths(directory, safe_mode))
    if include_examples:
        from airflow import example_dags

        example_dag_folder = next(iter(example_dags.__path__))
        # Recurse once into the bundled examples; suppress further example
        # inclusion so the recursion terminates.
        file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False))
    return file_paths

305 

306 

def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
    """Find file paths of all DAG files."""
    dag_file_paths = []

    for file_path in find_path_from_directory(directory, ".airflowignore"):
        path = Path(file_path)
        try:
            # Candidates are regular files that are either Python sources or zip archives.
            looks_like_dag_file = path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path))
            if looks_like_dag_file and might_contain_dag(file_path, safe_mode):
                dag_file_paths.append(file_path)
        except Exception:
            # A single unreadable file should not abort the whole scan.
            log.exception("Error while examining %s", file_path)

    return dag_file_paths

321 

322 

# Matches a "#"-style comment (and any whitespace before it) to the end of a line.
COMMENT_PATTERN = re2.compile(r"\s*#.*")

324 

325 

def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Check whether a Python file contains Airflow DAGs.

    When safe_mode is off (with False value), this function always returns True.

    If might_contain_dag_callable isn't specified, it uses airflow default heuristic
    """
    # Without safe mode every candidate file is treated as a potential DAG file.
    if not safe_mode:
        return True

    # The heuristic is pluggable via the core.might_contain_dag_callable setting;
    # the Airflow default heuristic is used as the fallback.
    checker = conf.getimport(
        "core",
        "might_contain_dag_callable",
        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
    )
    return checker(file_path=file_path, zip_file=zip_file)

343 

344 

345def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool: 

346 """ 

347 Heuristic that guesses whether a Python file contains an Airflow DAG definition. 

348 

349 :param file_path: Path to the file to be checked. 

350 :param zip_file: if passed, checks the archive. Otherwise, check local filesystem. 

351 :return: True, if file might contain DAGs. 

352 """ 

353 if zip_file: 

354 with zip_file.open(file_path) as current_file: 

355 content = current_file.read() 

356 else: 

357 if zipfile.is_zipfile(file_path): 

358 return True 

359 with open(file_path, "rb") as dag_file: 

360 content = dag_file.read() 

361 content = content.lower() 

362 return all(s in content for s in (b"dag", b"airflow")) 

363 

364 

365def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]: 

366 for st in module.body: 

367 if isinstance(st, ast.Import): 

368 for n in st.names: 

369 yield n.name 

370 elif isinstance(st, ast.ImportFrom) and st.module is not None: 

371 yield st.module 

372 

373 

def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Find Airflow modules imported in the given file."""
    try:
        tree = ast.parse(Path(file_path).read_bytes())
    except Exception:
        # Files that cannot be read or parsed contribute no imports.
        return
    yield from (name for name in _find_imported_modules(tree) if name.startswith("airflow."))

383 

384 

def get_unique_dag_module_name(file_path: str) -> str:
    """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
    # Guard clause: only string paths are hashable into a module name here.
    if not isinstance(file_path, str):
        raise ValueError("file_path should be a string to generate unique module name")
    path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest()
    return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=Path(file_path).stem)