Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/models/dagbag.py: 20%


374 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import hashlib
import importlib
import importlib.machinery
import importlib.util
import os
import sys
import textwrap
import traceback
import warnings
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy import (
    Column,
    String,
)
from sqlalchemy.exc import OperationalError
from tabulate import tabulate

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
    AirflowClusterPolicyError,
    AirflowClusterPolicySkipDag,
    AirflowClusterPolicyViolation,
    AirflowDagCycleException,
    AirflowDagDuplicatedIdException,
    RemovedInAirflow3Warning,
)
from airflow.models.base import Base
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.docs import get_docs_url
from airflow.utils.file import (
    correct_maybe_zipped,
    get_unique_dag_module_name,
    list_py_file_paths,
    might_contain_dag,
)
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET
from airflow.utils.warnings import capture_with_reraise

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.models.dag import DAG
    from airflow.utils.types import ArgNotSet


class FileLoadStat(NamedTuple):
    """
    Information about a single file.

    :param file: Loaded file.
    :param duration: Time spent processing the file.
    :param dag_num: Total number of DAGs loaded from this file.
    :param task_num: Total number of tasks loaded from this file.
    :param dags: Names of the DAGs loaded from this file.
    :param warning_num: Total number of warnings captured while processing this file.
    """

    file: str
    duration: timedelta
    dag_num: int
    task_num: int
    dags: str
    warning_num: int


class DagBag(LoggingMixin):
    """
    A dagbag is a collection of dags, parsed out of a folder tree, with high-level configuration settings.

    Some possible settings are the database to use as a backend and which executor
    to use to fire off tasks. This makes it easier to run distinct environments
    for, say, production and development, tests, or different teams or security
    profiles. What would have been system-level settings are now dagbag-level, so
    that one system can run multiple, independent settings sets.

    :param dag_folder: the folder to scan to find DAGs
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :param safe_mode: when ``False``, scans all python modules for dags.
        When ``True``, uses heuristics (files containing the ``DAG`` and ``airflow`` strings)
        to filter python modules to scan for dags.
    :param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
        If ``False``, DAGs are read from python files.
    :param store_serialized_dags: deprecated parameter, same effect as ``read_dags_from_db``
    :param load_op_links: Should the extra operator link be loaded via plugins when
        de-serializing the DAG? This flag is set to ``False`` in the scheduler so that extra
        operator links are not loaded, to avoid running user code in the scheduler.
    :param collect_dags: when ``True``, collects dags during class initialization.
    """
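    # Illustrative usage (not part of the original module; the path and dag id below are
    # hypothetical): a DagBag is typically built from a dags folder and then queried by dag_id.
    #
    #   bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
    #   dag = bag.get_dag("my_dag_id")
    #   print(bag.size(), bag.import_errors)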

    def __init__(
        self,
        dag_folder: str | Path | None = None,
        include_examples: bool | ArgNotSet = NOTSET,
        safe_mode: bool | ArgNotSet = NOTSET,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool | None = None,
        load_op_links: bool = True,
        collect_dags: bool = True,
    ):
        # Avoid circular import

        super().__init__()

        include_examples = (
            include_examples
            if isinstance(include_examples, bool)
            else conf.getboolean("core", "LOAD_EXAMPLES")
        )
        safe_mode = (
            safe_mode if isinstance(safe_mode, bool) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
        )

        if store_serialized_dags:
            warnings.warn(
                "The store_serialized_dags parameter has been deprecated. "
                "You should pass the read_dags_from_db parameter.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            read_dags_from_db = store_serialized_dags

        dag_folder = dag_folder or settings.DAGS_FOLDER
        self.dag_folder = dag_folder
        self.dags: dict[str, DAG] = {}
        # the file's last modified timestamp when we last read it
        self.file_last_changed: dict[str, datetime] = {}
        self.import_errors: dict[str, str] = {}
        self.captured_warnings: dict[str, tuple[str, ...]] = {}
        self.has_logged = False
        self.read_dags_from_db = read_dags_from_db
        # Only used by read_dags_from_db=True
        self.dags_last_fetched: dict[str, datetime] = {}
        # Only used by SchedulerJob to compare the dag_hash to identify change in DAGs
        self.dags_hash: dict[str, str] = {}

        self.dagbag_import_error_tracebacks = conf.getboolean("core", "dagbag_import_error_tracebacks")
        self.dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
        if collect_dags:
            self.collect_dags(
                dag_folder=dag_folder,
                include_examples=include_examples,
                safe_mode=safe_mode,
            )
        # Should the extra operator link be loaded via plugins?
        # This flag is set to False in Scheduler so that Extra Operator links are not loaded
        self.load_op_links = load_op_links

    def size(self) -> int:
        """:return: the number of dags contained in this dagbag"""
        return len(self.dags)

    @property
    def store_serialized_dags(self) -> bool:
        """Whether to read dags from DB."""
        warnings.warn(
            "The store_serialized_dags property has been deprecated. Use read_dags_from_db instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return self.read_dags_from_db

    @property
    def dag_ids(self) -> list[str]:
        """
        Get DAG ids.

        :return: a list of DAG IDs in this bag
        """
        return list(self.dags)

    @provide_session
    def get_dag(self, dag_id, session: Session = None):
        """
        Get the DAG out of the dictionary, refreshing it if it has expired.

        :param dag_id: DAG ID
        """
        # Avoid circular import
        from airflow.models.dag import DagModel

        if self.read_dags_from_db:
            # Import here so that serialized dag is only imported when serialization is enabled
            from airflow.models.serialized_dag import SerializedDagModel

            if dag_id not in self.dags:
                # Load from DB if not (yet) in the bag
                self._add_dag_from_db(dag_id=dag_id, session=session)
                return self.dags.get(dag_id)

            # If DAG is in the DagBag, check the following
            # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
            # 2. check the last_updated and hash columns in SerializedDag table to see if
            #    the Serialized DAG is updated
            # 3. if (2) is yes, fetch the Serialized DAG.
            # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
            #    if it exists and return None.
            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
            if (
                dag_id in self.dags_last_fetched
                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
            ):
                sd_latest_version_and_updated_datetime = (
                    SerializedDagModel.get_latest_version_hash_and_updated_datetime(
                        dag_id=dag_id, session=session
                    )
                )
                if not sd_latest_version_and_updated_datetime:
                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
                    del self.dags[dag_id]
                    del self.dags_last_fetched[dag_id]
                    del self.dags_hash[dag_id]
                    return None

                sd_latest_version, sd_last_updated_datetime = sd_latest_version_and_updated_datetime

                if (
                    sd_last_updated_datetime > self.dags_last_fetched[dag_id]
                    or sd_latest_version != self.dags_hash[dag_id]
                ):
                    self._add_dag_from_db(dag_id=dag_id, session=session)

            return self.dags.get(dag_id)

        # If asking for a known subdag, we want to refresh the parent
        dag = None
        root_dag_id = dag_id
        if dag_id in self.dags:
            dag = self.dags[dag_id]
            if dag.parent_dag:
                root_dag_id = dag.parent_dag.dag_id

        # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
        orm_dag = DagModel.get_current(root_dag_id, session=session)
        if not orm_dag:
            return self.dags.get(dag_id)

        # If the dag corresponding to root_dag_id is absent or expired
        is_missing = root_dag_id not in self.dags
        is_expired = orm_dag.last_expired and dag and dag.last_loaded < orm_dag.last_expired
        if is_expired:
            # Remove associated dags so we can re-add them.
            self.dags = {
                key: dag
                for key, dag in self.dags.items()
                if root_dag_id != key and not (dag.parent_dag and root_dag_id == dag.parent_dag.dag_id)
            }
        if is_missing or is_expired:
            # Reprocess source file.
            found_dags = self.process_file(
                filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
            )

            # If the source file no longer exports `dag_id`, delete it from self.dags
            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
                return self.dags[dag_id]
            elif dag_id in self.dags:
                del self.dags[dag_id]
        return self.dags.get(dag_id)
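    # Illustrative sketch (hypothetical dag id; not part of the original module): with
    # read_dags_from_db=True, get_dag() serves DAGs from the serialized_dag table and only
    # re-checks the DB after MIN_SERIALIZED_DAG_FETCH_INTERVAL seconds have elapsed.
    #
    #   bag = DagBag(read_dags_from_db=True)
    #   dag = bag.get_dag("my_dag_id")  # fetched from the DB on first access
    #   dag = bag.get_dag("my_dag_id")  # served from the in-memory cache until the interval elapses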

    def _add_dag_from_db(self, dag_id: str, session: Session):
        """Add DAG to DagBag from DB."""
        from airflow.models.serialized_dag import SerializedDagModel

        row = SerializedDagModel.get(dag_id, session)
        if not row:
            return None

        row.load_op_links = self.load_op_links
        dag = row.dag
        for subdag in dag.subdags:
            self.dags[subdag.dag_id] = subdag
        self.dags[dag.dag_id] = dag
        self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
        self.dags_hash[dag.dag_id] = row.dag_hash

    def process_file(self, filepath, only_if_updated=True, safe_mode=True):
        """Given a path to a python module or zip file, import the module and look for dag objects within."""
        from airflow.models.dag import DagContext

        # if the source file no longer exists in the DB or in the filesystem,
        # return an empty list
        # todo: raise exception?

        if filepath is None or not os.path.isfile(filepath):
            return []

        try:
            # This failed before in what may have been a git sync
            # race condition
            file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
            if (
                only_if_updated
                and filepath in self.file_last_changed
                and file_last_changed_on_disk == self.file_last_changed[filepath]
            ):
                return []
        except Exception as e:
            self.log.exception(e)
            return []

        # Ensure we don't pick up anything else we didn't mean to
        DagContext.autoregistered_dags.clear()

        self.captured_warnings.pop(filepath, None)
        with capture_with_reraise() as captured_warnings:
            if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
                mods = self._load_modules_from_file(filepath, safe_mode)
            else:
                mods = self._load_modules_from_zip(filepath, safe_mode)

        if captured_warnings:
            formatted_warnings = []
            for msg in captured_warnings:
                category = msg.category.__name__
                if (module := msg.category.__module__) != "builtins":
                    category = f"{module}.{category}"
                formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}")
            self.captured_warnings[filepath] = tuple(formatted_warnings)

        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)

        self.file_last_changed[filepath] = file_last_changed_on_disk
        return found_dags
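    # Illustrative sketch (hypothetical path; not part of the original module): process_file()
    # can be called directly to parse a single DAG file, returning the DAG objects it defines
    # and recording any import errors or captured warnings on the bag.
    #
    #   bag = DagBag(collect_dags=False)
    #   found = bag.process_file("/opt/airflow/dags/my_dag.py", only_if_updated=False)
    #   print([d.dag_id for d in found], bag.import_errors, bag.captured_warnings)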

    def _load_modules_from_file(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        if not might_contain_dag(filepath, safe_mode):
            # Don't want to spam user with skip messages
            if not self.has_logged:
                self.has_logged = True
                self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
            return []

        self.log.debug("Importing %s", filepath)
        mod_name = get_unique_dag_module_name(filepath)

        if mod_name in sys.modules:
            del sys.modules[mod_name]

        DagContext.current_autoregister_module_name = mod_name

        def parse(mod_name, filepath):
            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                new_module = importlib.util.module_from_spec(spec)
                sys.modules[spec.name] = new_module
                loader.exec_module(new_module)
                return [new_module]
            except Exception as e:
                DagContext.autoregistered_dags.clear()
                self.log.exception("Failed to import: %s", filepath)
                if self.dagbag_import_error_tracebacks:
                    self.import_errors[filepath] = traceback.format_exc(
                        limit=-self.dagbag_import_error_traceback_depth
                    )
                else:
                    self.import_errors[filepath] = str(e)
                return []

        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)

        if not isinstance(dagbag_import_timeout, (int, float)):
            raise TypeError(
                f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
            )

        if dagbag_import_timeout <= 0:  # no parsing timeout
            return parse(mod_name, filepath)

        timeout_msg = (
            f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
            "Please take a look at these docs to improve your DAG import time:\n"
            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
        )
        with timeout(dagbag_import_timeout, error_message=timeout_msg):
            return parse(mod_name, filepath)
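    # Illustrative sketch (assumes a custom airflow_local_settings.py on the Python path; not part
    # of the original module): the per-file parsing timeout used above comes from
    # settings.get_dagbag_import_timeout(), which can be overridden in airflow_local_settings.py,
    # e.g. to give a known-slow file more time:
    #
    #   # airflow_local_settings.py
    #   def get_dagbag_import_timeout(dag_file_path: str) -> float:
    #       if dag_file_path.endswith("slow_dag.py"):
    #           return 90.0
    #       return 30.0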

    def _load_modules_from_zip(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        mods = []
        with zipfile.ZipFile(filepath) as current_zip_file:
            for zip_info in current_zip_file.infolist():
                zip_path = Path(zip_info.filename)
                if zip_path.suffix not in [".py", ".pyc"] or len(zip_path.parts) > 1:
                    continue

                if zip_path.stem == "__init__":
                    self.log.warning("Found %s at root of %s", zip_path.name, filepath)

                self.log.debug("Reading %s from %s", zip_info.filename, filepath)

                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
                    # todo: create ignore list
                    # Don't want to spam user with skip messages
                    if not self.has_logged:
                        self.has_logged = True
                        self.log.info(
                            "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
                        )
                    continue

                mod_name = zip_path.stem
                if mod_name in sys.modules:
                    del sys.modules[mod_name]

                DagContext.current_autoregister_module_name = mod_name
                try:
                    sys.path.insert(0, filepath)
                    current_module = importlib.import_module(mod_name)
                    mods.append(current_module)
                except Exception as e:
                    DagContext.autoregistered_dags.clear()
                    fileloc = os.path.join(filepath, zip_info.filename)
                    self.log.exception("Failed to import: %s", fileloc)
                    if self.dagbag_import_error_tracebacks:
                        self.import_errors[fileloc] = traceback.format_exc(
                            limit=-self.dagbag_import_error_traceback_depth
                        )
                    else:
                        self.import_errors[fileloc] = str(e)
                finally:
                    if sys.path[0] == filepath:
                        del sys.path[0]
        return mods
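    # Illustrative layout (hypothetical file names; not part of the original module): only .py/.pyc
    # entries at the root of a packaged-DAG zip are imported as DAG modules, since the loop above
    # skips any entry with more than one path component.
    #
    #   my_dags.zip
    #     my_dag.py          <- imported: a root-level .py module
    #     helpers/utils.py   <- skipped by this loader: not at the zip root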

    def _process_modules(self, filepath, mods, file_last_changed_on_disk):
        from airflow.models.dag import DAG, DagContext  # Avoid circular import

        top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}

        top_level_dags.update(DagContext.autoregistered_dags)

        DagContext.current_autoregister_module_name = None
        DagContext.autoregistered_dags.clear()

        found_dags = []

        for dag, mod in top_level_dags:
            dag.fileloc = mod.__file__
            try:
                dag.validate()
                self.bag_dag(dag=dag, root_dag=dag)
            except AirflowClusterPolicySkipDag:
                pass
            except Exception as e:
                self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
                self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
            else:
                found_dags.append(dag)
                found_dags += dag.subdags
        return found_dags

    def bag_dag(self, dag, root_dag):
        """
        Add the DAG into the bag, recursing into sub dags.

        :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
        :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
        """
        self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)

    def _bag_dag(self, *, dag, root_dag, recursive):
        """Actual implementation of bagging a dag.

        The only purpose of this is to avoid exposing ``recursive`` in ``bag_dag()``;
        the flag is intended to be used only by ``_bag_dag()`` itself.
        """
        check_cycle(dag)  # throws if a task cycle is found

        dag.resolve_template_files()
        dag.last_loaded = timezone.utcnow()

        try:
            # Check policies
            settings.dag_policy(dag)

            for task in dag.tasks:
                settings.task_policy(task)
        except (AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag):
            raise
        except Exception as e:
            self.log.exception(e)
            raise AirflowClusterPolicyError(e)

        subdags = dag.subdags

        try:
            # DAG.subdags automatically performs DFS search, so we don't recurse
            # into further _bag_dag() calls.
            if recursive:
                for subdag in subdags:
                    subdag.fileloc = dag.fileloc
                    subdag.parent_dag = dag
                    self._bag_dag(dag=subdag, root_dag=root_dag, recursive=False)

            prev_dag = self.dags.get(dag.dag_id)
            if prev_dag and prev_dag.fileloc != dag.fileloc:
                raise AirflowDagDuplicatedIdException(
                    dag_id=dag.dag_id,
                    incoming=dag.fileloc,
                    existing=self.dags[dag.dag_id].fileloc,
                )
            self.dags[dag.dag_id] = dag
            self.log.debug("Loaded DAG %s", dag)
        except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
            # There was an error in bagging the dag. Remove it from the list of dags
            self.log.exception("Exception bagging dag: %s", dag.dag_id)
            # Only necessary at the root level since DAG.subdags automatically
            # performs DFS to search through all subdags
            if recursive:
                for subdag in subdags:
                    if subdag.dag_id in self.dags:
                        del self.dags[subdag.dag_id]
            raise

    def collect_dags(
        self,
        dag_folder: str | Path | None = None,
        only_if_updated: bool = True,
        include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
        safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
    ):
        """
        Look for python modules in a given path, import them, and add them to the dagbag collection.

        Note that if a ``.airflowignore`` file is found while processing
        the directory, it will behave much like a ``.gitignore``,
        ignoring files that match any of the patterns specified
        in the file.

        **Note**: The patterns in ``.airflowignore`` are interpreted as either
        un-anchored regexes or gitignore-like glob expressions, depending on
        the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
        """
        if self.read_dags_from_db:
            return

        self.log.info("Filling up the DagBag from %s", dag_folder)
        dag_folder = dag_folder or self.dag_folder
        # Used to store stats around DagBag processing
        stats = []

        # Ensure dag_folder is a str -- it may have been a pathlib.Path
        dag_folder = correct_maybe_zipped(str(dag_folder))
        for filepath in list_py_file_paths(
            dag_folder,
            safe_mode=safe_mode,
            include_examples=include_examples,
        ):
            try:
                file_parse_start_dttm = timezone.utcnow()
                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)

                file_parse_end_dttm = timezone.utcnow()
                stats.append(
                    FileLoadStat(
                        file=filepath.replace(settings.DAGS_FOLDER, ""),
                        duration=file_parse_end_dttm - file_parse_start_dttm,
                        dag_num=len(found_dags),
                        task_num=sum(len(dag.tasks) for dag in found_dags),
                        dags=str([dag.dag_id for dag in found_dags]),
                        warning_num=len(self.captured_warnings.get(filepath, [])),
                    )
                )
            except Exception as e:
                self.log.exception(e)

        self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)
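    # Illustrative sketch (hypothetical patterns; not part of the original module): a
    # .airflowignore file in the dags folder prunes files from collection. With the "regexp"
    # setting for DAG_IGNORE_FILE_SYNTAX the patterns are treated as un-anchored regexes,
    # with "glob" they behave like .gitignore entries.
    #
    #   # .airflowignore (regexp syntax)
    #   helpers/
    #   .*_backup\.py
    #
    # After collect_dags() runs, per-file timings are available on self.dagbag_stats as
    # FileLoadStat tuples, already sorted slowest-first:
    #
    #   for stat in bag.dagbag_stats[:5]:
    #       print(stat.file, stat.duration, stat.dag_num)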

    def collect_dags_from_db(self):
        """Collect DAGs from database."""
        from airflow.models.serialized_dag import SerializedDagModel

        with Stats.timer("collect_db_dags"):
            self.log.info("Filling up the DagBag from database")

            # The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
            # from the table by the scheduler job.
            self.dags = SerializedDagModel.read_all_dags()

            # Adds subdags.
            # DAG post-processing steps such as self.bag_dag and croniter are not needed as
            # they are done by scheduler before serialization.
            subdags = {}
            for dag in self.dags.values():
                for subdag in dag.subdags:
                    subdags[subdag.dag_id] = subdag
            self.dags.update(subdags)

    def dagbag_report(self):
        """Return a report of DagBag loading stats."""
        stats = self.dagbag_stats
        dag_folder = self.dag_folder
        duration = sum((o.duration for o in stats), timedelta()).total_seconds()
        dag_num = sum(o.dag_num for o in stats)
        task_num = sum(o.task_num for o in stats)
        table = tabulate(stats, headers="keys")

        report = textwrap.dedent(
            f"""\n
        -------------------------------------------------------------------
        DagBag loading stats for {dag_folder}
        -------------------------------------------------------------------
        Number of DAGs: {dag_num}
        Total task number: {task_num}
        DagBag parsing time: {duration}\n{table}
        """
        )
        return report

    @classmethod
    @provide_session
    def _sync_to_db(
        cls,
        dags: dict[str, DAG],
        processor_subdir: str | None = None,
        session: Session = NEW_SESSION,
    ):
        """Save attributes about a list of DAGs to the DB."""
        # To avoid circular import - airflow.models.dagbag -> airflow.models.dag -> airflow.models.dagbag
        from airflow.models.dag import DAG
        from airflow.models.serialized_dag import SerializedDagModel

        log = cls.logger()

        def _serialize_dag_capturing_errors(dag, session, processor_subdir):
            """
            Try to serialize the dag to the DB, but make a note of any errors.

            We can't place them directly in import_errors, as this may be retried, and work the next time.
            """
            if dag.is_subdag:
                return []
            try:
                # We can't use bulk_write_to_db as we want to capture each error individually
                dag_was_updated = SerializedDagModel.write_dag(
                    dag,
                    min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
                    session=session,
                    processor_subdir=processor_subdir,
                )
                if dag_was_updated:
                    DagBag._sync_perm_for_dag(dag, session=session)
                return []
            except OperationalError:
                raise
            except Exception:
                log.exception("Failed to write serialized DAG: %s", dag.fileloc)
                dagbag_import_error_traceback_depth = conf.getint(
                    "core", "dagbag_import_error_traceback_depth"
                )
                return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]

        # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
        # of any Operational Errors
        # In case of failures, provide_session handles rollback
        import_errors = {}
        for attempt in run_with_db_retries(logger=log):
            with attempt:
                serialize_errors = []
                log.debug(
                    "Running dagbag.sync_to_db with retries. Try %d of %d",
                    attempt.retry_state.attempt_number,
                    MAX_DB_RETRIES,
                )
                log.debug("Calling the DAG.bulk_sync_to_db method")
                try:
                    # Write Serialized DAGs to DB, capturing errors
                    for dag in dags.values():
                        serialize_errors.extend(
                            _serialize_dag_capturing_errors(dag, session, processor_subdir)
                        )

                    DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
                except OperationalError:
                    session.rollback()
                    raise
                # Only now that we are "complete" do we update import_errors - don't want to record errors
                # from previous failed attempts
                import_errors.update(dict(serialize_errors))

        return import_errors

    @provide_session
    def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW_SESSION):
        import_errors = DagBag._sync_to_db(dags=self.dags, processor_subdir=processor_subdir, session=session)
        self.import_errors.update(import_errors)

    @classmethod
    @provide_session
    def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
        """Sync DAG specific permissions."""
        root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

        cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
        from airflow.www.security_appless import ApplessAirflowSecurityManager

        security_manager = ApplessAirflowSecurityManager(session=session)
        security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)


def generate_md5_hash(context):
    # Column-default callable: derive the row id from the md5 hex digest of the
    # fileloc being inserted or updated.
    fileloc = context.get_current_parameters()["fileloc"]
    return hashlib.md5(fileloc.encode()).hexdigest()

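# Illustrative check (hypothetical path; not part of the original module): the id of a
# DagPriorityParsingRequest row defaults to the md5 hex digest of its fileloc, so repeated
# requests for the same file map to the same primary key.
#
#   hashlib.md5("/opt/airflow/dags/my_dag.py".encode()).hexdigest()
#   # -> a 32-character hex string, fitting the String(32) primary key below
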

class DagPriorityParsingRequest(Base):
    """Model to store the dag parsing requests that will be prioritized when parsing files."""

    __tablename__ = "dag_priority_parsing_request"

    # Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
    # on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
    # size consistent with other tables. This is a workaround to enforce the unique constraint.
    id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)

    # The location of the file containing the DAG object
    # Note: Do not depend on fileloc pointing to a file; in the case of a
    # packaged DAG, it will point to the subpath of the DAG within the
    # associated zip.
    fileloc = Column(String(2000), nullable=False)

    def __init__(self, fileloc: str) -> None:
        super().__init__()
        self.fileloc = fileloc

    def __repr__(self) -> str:
        return f"<DagPriorityParsingRequest: fileloc={self.fileloc}>"
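

# Illustrative, runnable sketch (not part of the upstream module; requires an initialized Airflow
# environment with a configured dags folder): build a DagBag, print its loading report, and list
# any files that failed to import.
if __name__ == "__main__":
    dagbag = DagBag(include_examples=False)
    print(dagbag.dagbag_report())
    for failed_file, error in dagbag.import_errors.items():
        print(f"Import error in {failed_file}:\n{error}")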