Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/file.py: 30%

170 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import io 

21import logging 

22import os 

23import re 

24import zipfile 

25from collections import OrderedDict 

26from pathlib import Path 

27from typing import TYPE_CHECKING, Generator, NamedTuple, Pattern, overload 

28 

29from pathspec.patterns import GitWildMatchPattern 

30from typing_extensions import Protocol 

31 

32from airflow.configuration import conf 

33from airflow.exceptions import RemovedInAirflow3Warning 

34 

35if TYPE_CHECKING: 

36 import pathlib 

37 

38log = logging.getLogger(__name__) 

39 

40 

class _IgnoreRule(Protocol):
    """Structural (duck-typed) interface that every ignore-rule class provides."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Create an ignore rule out of ``pattern``.

        :param pattern: the textual pattern to turn into a rule
        :param base_dir: absolute path of the directory being searched
        :param definition_file: absolute path of the file the pattern was read from
        :return: the rule, or ``None`` when no rule is produced for the pattern
        """
        ...

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Test a candidate absolute path against a list of rules.

        :param path: the absolute path to test
        :param rules: the rules to test the path against
        """
        ...

54 

55 

class _RegexpIgnoreRule(NamedTuple):
    """Ignore rule backed by a compiled regular expression."""

    # Compiled regular expression searched for in the relative path.
    pattern: Pattern
    # Directory that candidate paths are made relative to before searching.
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Compile ``pattern`` as a regular expression and wrap it in a rule.

        :param pattern: the regular expression source text
        :param base_dir: directory that matched paths are made relative to
        :param definition_file: file the pattern came from; only used in the warning
        :return: the rule, or ``None`` (after logging a warning) if the regex is invalid
        """
        try:
            compiled = re.compile(pattern)
        except re.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Report whether any rule's regex is found in ``path`` relative to the rule's base dir.

        :param path: the path to test
        :param rules: rules to test against; each must be a ``_RegexpIgnoreRule``
        :raises ValueError: when a rule of a different type is encountered
        """
        for rule in rules:
            if isinstance(rule, _RegexpIgnoreRule):
                relative = str(path.relative_to(rule.base_dir))
                if rule.pattern.search(relative) is not None:
                    # the first hit decides; later rules are not inspected
                    return True
                continue
            raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
        return False

80 

81 

class _GlobIgnoreRule(NamedTuple):
    """Ignore rule backed by a gitignore-style glob pattern."""

    # Regular expression taken from the GitWildMatchPattern built for the glob.
    pattern: Pattern
    # The glob exactly as it was written in the ignore file.
    raw_pattern: str
    # ``include`` flag taken from the GitWildMatchPattern; rules with None are skipped in match().
    include: bool | None = None
    # Directory the pattern is anchored to, or None to test only the file name.
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None:
        """
        Build a rule from a glob pattern.

        :param pattern: the glob pattern text
        :param _: unused; present to keep the signature parallel to the other rule types
        :param definition_file: the ignore file the pattern was read from
        :return: the rule, or ``None`` (after logging a warning) for the no-op pattern ``/``
        """
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None

        # See https://git-scm.com/docs/gitignore
        # > If there is a separator at the beginning or middle (or both) of the pattern, then the
        # > pattern is relative to the directory level of the particular .gitignore file itself.
        # > Otherwise the pattern may also match at any level below the .gitignore level.
        anchored = pattern.startswith("/") or "/" in pattern.rstrip("/")
        anchor_dir: Path | None = definition_file.parent if anchored else None

        wild_match = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(
            pattern=wild_match.regex,
            raw_pattern=pattern,
            include=wild_match.include,
            relative_to=anchor_dir,
        )

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Evaluate every rule in order; the last rule that matches decides the outcome.

        :param path: the path to test
        :param rules: rules to test against; each must be a ``_GlobIgnoreRule``
        :return: the ``include`` value of the last matching rule, or False if none matched
        :raises ValueError: when a rule of a different type is encountered
        """
        outcome = False
        for r in rules:
            if not isinstance(r, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(r)}")

            if r.relative_to:
                candidate = str(path.relative_to(r.relative_to))
            else:
                candidate = str(path.name)

            if r.raw_pattern.endswith("/") and path.is_dir():
                # give a directory path the trailing slash a directory pattern expects
                candidate += "/"

            if r.include is None:
                continue
            if r.pattern.match(candidate) is not None:
                outcome = r.include
        return outcome

122 

123 

def TemporaryDirectory(*args, **kwargs):
    """
    Deprecated alias for ``tempfile.TemporaryDirectory``.

    Emits a ``RemovedInAirflow3Warning`` attributed to the caller and then
    forwards every argument unchanged.

    :return: whatever ``tempfile.TemporaryDirectory(*args, **kwargs)`` returns
    """
    # Imported locally: this function's own name shadows the stdlib class at module level.
    import tempfile
    import warnings

    message = "This function is deprecated. Please use `tempfile.TemporaryDirectory`"
    warnings.warn(message, RemovedInAirflow3Warning, stacklevel=2)
    return tempfile.TemporaryDirectory(*args, **kwargs)

136 

137 

def mkdirs(path, mode):
    """
    Deprecated helper that creates ``path`` together with any missing parents.

    Nothing happens if the directory already exists. A
    ``RemovedInAirflow3Warning`` attributed to the caller is emitted first.

    :param path: The directory to create
    :param mode: The mode forwarded to ``pathlib.Path.mkdir``, e.g. 0o755.
        NOTE(review): the previous docstring said this "ignores umask"; pathlib
        documents ``mode`` as being combined with the process umask -- confirm.
    """
    import warnings

    message = f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`"
    warnings.warn(message, RemovedInAirflow3Warning, stacklevel=2)

    target = Path(path)
    target.mkdir(mode=mode, parents=True, exist_ok=True)

154 

155 

# Splits a location into an optional "<something>.zip<sep>" prefix (group 2 is
# the archive path without the separator) and the remainder (group 3).
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")


@overload
def correct_maybe_zipped(fileloc: None) -> None:
    ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path:
    ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """
    Map a location inside a zip archive to the archive itself.

    When the path has a component ending in ``.zip`` and that component is a
    real zip file, the path of the archive is returned as a string. In every
    other case, including falsy input, ``fileloc`` is returned unchanged.

    :param fileloc: the location to inspect; may be ``None``
    :return: the archive path, or ``fileloc`` itself
    """
    if not fileloc:
        return fileloc

    found = ZIP_REGEX.search(str(fileloc))
    if found is None:
        return fileloc

    archive = found.group(2)
    return archive if archive and zipfile.is_zipfile(archive) else fileloc

184 

185 

def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file, looking inside a zip archive when the path points into one.

    If the path contains a component with a ``.zip`` suffix and that component
    is a real zip file, the remainder of the path is opened as a member of the
    archive and wrapped for text access. Otherwise the path is opened directly.

    :param fileloc: location of the file, as a ``str`` or an ``os.PathLike``
        such as ``pathlib.Path``
    :param mode: mode passed to ``zipfile.ZipFile`` for archives, or to ``open``
        for regular files
    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    # A compiled str pattern cannot search a Path object; normalise first so
    # Path input works here just as it does in correct_maybe_zipped.
    location = os.fspath(fileloc)
    _, archive, filename = ZIP_REGEX.search(location).groups()
    if archive and zipfile.is_zipfile(archive):
        return io.TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))
    return open(fileloc, mode=mode)

199 

200 

def _find_path_from_directory(
    base_dir_path: str,
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Walk ``base_dir_path`` recursively and yield the files that no ignore rule excludes.

    Each directory may hold an ignore file; its rules apply to that directory
    and are inherited by every directory below it. Symbolic links are followed.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: name of the file that lists patterns of paths to ignore
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    :raises RuntimeError: when the same resolved directory is reached twice during the walk
    """
    # Rules each directory inherits from its ancestors, keyed by resolved absolute path.
    inherited: dict[Path, list[_IgnoreRule]] = {}

    for current, subdirs, filenames in os.walk(base_dir_path, followlinks=True):
        here = Path(current)
        active: list[_IgnoreRule] = inherited.get(here.resolve(), [])

        ignore_file = here / ignore_file_name
        if ignore_file.is_file():
            with open(ignore_file) as handle:
                stripped = [re.sub(r"\s*#.*", "", raw) for raw in handle.read().split("\n")]
            for text in stripped:
                if not text:
                    continue
                compiled = ignore_rule_type.compile(text, Path(base_dir_path), ignore_file)
                # compile() returns None for an invalid pattern; drop those
                if compiled is not None:
                    active.append(compiled)
            # De-duplicate while keeping first-seen order: with negation, evaluation
            # order matters so that later patterns can override earlier ones.
            active = list(OrderedDict.fromkeys(active))

        # Prune in place so os.walk does not descend into ignored directories.
        subdirs[:] = [name for name in subdirs if not ignore_rule_type.match(here / name, active)]

        # Symlinks are followed, so detect a loop explicitly instead of walking forever.
        for name in subdirs:
            resolved = (here / name).resolve()
            if resolved in inherited:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {resolved} has appeared more than once."
                )
            inherited[resolved] = active.copy()

        for name in filenames:
            if name == ignore_file_name:
                continue
            candidate = here / name
            if not ignore_rule_type.match(candidate, active):
                yield str(candidate)

258 

259 

def find_path_from_directory(
    base_dir_path: str,
    ignore_file_name: str,
    ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"),
) -> Generator[str, None, None]:
    """
    Recursively search the base path and yield the file paths that are not ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: name of the file that specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob.
        An empty value is treated as regexp.

    :return: a generator of file paths.
    :raises ValueError: when ``ignore_file_syntax`` is neither "glob", "regexp" nor empty
    """
    rule_type: type[_IgnoreRule]
    if not ignore_file_syntax or ignore_file_syntax == "regexp":
        rule_type = _RegexpIgnoreRule
    elif ignore_file_syntax == "glob":
        rule_type = _GlobIgnoreRule
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)

279 

280 

def list_py_file_paths(
    directory: str | pathlib.Path,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
    include_examples: bool | None = None,
) -> list[str]:
    """
    Collect the DAG file paths found at ``directory``.

    :param directory: the directory to traverse; a path to a single file is
        returned as-is, and ``None`` contributes no paths
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :param include_examples: include example DAGs; when ``None`` the
        core.LOAD_EXAMPLES configuration setting is used
    :return: a list of paths to Python files in the specified directory
    """
    if include_examples is None:
        include_examples = conf.getboolean("core", "LOAD_EXAMPLES")

    collected: list[str] = []
    if directory is not None:
        if os.path.isfile(directory):
            collected = [str(directory)]
        elif os.path.isdir(directory):
            collected.extend(find_dag_file_paths(directory, safe_mode))

    if include_examples:
        from airflow import example_dags

        examples_root = example_dags.__path__[0]  # type: ignore
        # include_examples=False stops the nested call from adding the examples again
        collected.extend(list_py_file_paths(examples_root, safe_mode, include_examples=False))

    return collected

312 

313 

def find_dag_file_paths(directory: str | pathlib.Path, safe_mode: bool) -> list[str]:
    """
    Find the paths of all DAG files below ``directory``.

    Paths excluded by ``.airflowignore`` files are skipped. A remaining path is
    kept when it is a regular file, is either a ``.py`` file or a zip archive,
    and passes ``might_contain_dag``. Any error raised while examining a path
    is logged and that path is skipped.

    :param directory: the directory to search
    :param safe_mode: forwarded to ``might_contain_dag``
    :return: the list of matching file paths
    """
    dag_files = []

    for candidate in find_path_from_directory(str(directory), ".airflowignore"):
        try:
            if not os.path.isfile(candidate):
                continue
            extension = os.path.splitext(os.path.basename(candidate))[1]
            loadable = extension == ".py" or zipfile.is_zipfile(candidate)
            if loadable and might_contain_dag(candidate, safe_mode):
                dag_files.append(candidate)
        except Exception:
            log.exception("Error while examining %s", candidate)

    return dag_files

333 

334 

# Matches a "#" comment together with any whitespace directly in front of it.
COMMENT_PATTERN = re.compile(r"\s*#.*")


def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None):
    """
    Guess whether a file contains an Airflow DAG definition.

    The content is lower-cased and must contain both ``dag`` and ``airflow``.

    :param file_path: Path to the file to be checked (the member name when ``zip_file`` is given).
    :param safe_mode: Is safe mode active? If not, this function always returns True.
    :param zip_file: if passed, the file is read from this archive. Otherwise it is
        read from the local filesystem, where a zip archive always counts as a candidate.
    :return: True, if file might contain DAGs.
    """
    if not safe_mode:
        return True

    if zip_file:
        with zip_file.open(file_path) as member:
            raw = member.read()
    elif zipfile.is_zipfile(file_path):
        return True
    else:
        with open(file_path, "rb") as handle:
            raw = handle.read()

    lowered = raw.lower()
    return b"dag" in lowered and b"airflow" in lowered

357 content = content.lower() 

358 return all(s in content for s in (b"dag", b"airflow"))