Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/models/dagbag.py: 18%

349 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import hashlib
import importlib
import importlib.machinery
import importlib.util
import os
import sys
import textwrap
import traceback
import warnings
import zipfile
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session
from tabulate import tabulate

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
    AirflowClusterPolicyError,
    AirflowClusterPolicyViolation,
    AirflowDagCycleException,
    AirflowDagDuplicatedIdException,
    RemovedInAirflow3Warning,
)
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.docs import get_docs_url
from airflow.utils.file import correct_maybe_zipped, list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
    import pathlib

    from airflow.models.dag import DAG


class FileLoadStat(NamedTuple):
    """Information about a single parsed file."""

    file: str
    duration: timedelta
    dag_num: int
    task_num: int
    dags: str


class DagBag(LoggingMixin):
    """
    A dagbag is a collection of dags, parsed out of a folder tree, and has high
    level configuration settings, like what database to use as a backend and
    what executor to use to fire off tasks. This makes it easier to run
    distinct environments for, say, production and development, tests, or for
    different teams or security profiles. What would have been system-level
    settings are now dagbag-level, so that one system can run multiple,
    independent settings sets.

    :param dag_folder: the folder to scan to find DAGs
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
        If ``False``, DAGs are read from python files.
    :param load_op_links: Should the extra operator link be loaded via plugins when
        de-serializing the DAG? This flag is set to False in the scheduler so that extra
        operator links are not loaded and user code is not run in the scheduler.
    """

    def __init__(
        self,
        dag_folder: str | pathlib.Path | None = None,
        include_examples: bool | ArgNotSet = NOTSET,
        safe_mode: bool | ArgNotSet = NOTSET,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool | None = None,
        load_op_links: bool = True,
        collect_dags: bool = True,
    ):
        # Avoid circular import
        from airflow.models.dag import DAG

        super().__init__()

        include_examples = (
            include_examples
            if isinstance(include_examples, bool)
            else conf.getboolean("core", "LOAD_EXAMPLES")
        )
        safe_mode = (
            safe_mode if isinstance(safe_mode, bool) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
        )

        if store_serialized_dags:
            warnings.warn(
                "The store_serialized_dags parameter has been deprecated. "
                "You should pass the read_dags_from_db parameter.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            read_dags_from_db = store_serialized_dags

        dag_folder = dag_folder or settings.DAGS_FOLDER
        self.dag_folder = dag_folder
        self.dags: dict[str, DAG] = {}
        # the file's last modified timestamp when we last read it
        self.file_last_changed: dict[str, datetime] = {}
        self.import_errors: dict[str, str] = {}
        self.has_logged = False
        self.read_dags_from_db = read_dags_from_db
        # Only used by read_dags_from_db=True
        self.dags_last_fetched: dict[str, datetime] = {}
        # Only used by SchedulerJob to compare the dag_hash to identify change in DAGs
        self.dags_hash: dict[str, str] = {}

        self.dagbag_import_error_tracebacks = conf.getboolean("core", "dagbag_import_error_tracebacks")
        self.dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
        if collect_dags:
            self.collect_dags(
                dag_folder=dag_folder,
                include_examples=include_examples,
                safe_mode=safe_mode,
            )
        # Should the extra operator link be loaded via plugins?
        # This flag is set to False in Scheduler so that Extra Operator links are not loaded
        self.load_op_links = load_op_links
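    # Illustrative usage sketch (editorial addition, not part of the original module):
    # constructing a bag from a folder of DAG files and inspecting the result. The
    # path below is a hypothetical placeholder.
    #
    #     from airflow.models.dagbag import DagBag
    #
    #     bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
    #     print(f"{bag.size()} DAGs parsed, {len(bag.import_errors)} files failed to import")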

    def size(self) -> int:
        """:return: the number of DAGs contained in this dagbag"""
        return len(self.dags)

    @property
    def store_serialized_dags(self) -> bool:
        """Whether to read dags from DB."""
        warnings.warn(
            "The store_serialized_dags property has been deprecated. Use read_dags_from_db instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return self.read_dags_from_db

    @property
    def dag_ids(self) -> list[str]:
        """
        Get DAG ids.

        :return: a list of DAG IDs in this bag
        """
        return list(self.dags.keys())

    @provide_session
    def get_dag(self, dag_id, session: Session = None):
        """
        Gets the DAG out of the dictionary, and refreshes it if expired.

        :param dag_id: DAG ID
        """
        # Avoid circular import
        from airflow.models.dag import DagModel

        if self.read_dags_from_db:
            # Import here so that serialized dag is only imported when serialization is enabled
            from airflow.models.serialized_dag import SerializedDagModel

            if dag_id not in self.dags:
                # Load from DB if not (yet) in the bag
                self._add_dag_from_db(dag_id=dag_id, session=session)
                return self.dags.get(dag_id)

            # If DAG is in the DagBag, check the following
            # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
            # 2. check the last_updated and hash columns in SerializedDag table to see if
            #    Serialized DAG is updated
            # 3. if (2) is yes, fetch the Serialized DAG.
            # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
            #    if it exists and return None.
            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
            if (
                dag_id in self.dags_last_fetched
                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
            ):
                sd_latest_version_and_updated_datetime = (
                    SerializedDagModel.get_latest_version_hash_and_updated_datetime(
                        dag_id=dag_id, session=session
                    )
                )
                if not sd_latest_version_and_updated_datetime:
                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
                    del self.dags[dag_id]
                    del self.dags_last_fetched[dag_id]
                    del self.dags_hash[dag_id]
                    return None

                sd_latest_version, sd_last_updated_datetime = sd_latest_version_and_updated_datetime

                if (
                    sd_last_updated_datetime > self.dags_last_fetched[dag_id]
                    or sd_latest_version != self.dags_hash[dag_id]
                ):
                    self._add_dag_from_db(dag_id=dag_id, session=session)

            return self.dags.get(dag_id)

        # If asking for a known subdag, we want to refresh the parent
        dag = None
        root_dag_id = dag_id
        if dag_id in self.dags:
            dag = self.dags[dag_id]
            if dag.parent_dag:
                root_dag_id = dag.parent_dag.dag_id

        # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
        orm_dag = DagModel.get_current(root_dag_id, session=session)
        if not orm_dag:
            return self.dags.get(dag_id)

        # If the dag corresponding to root_dag_id is absent or expired
        is_missing = root_dag_id not in self.dags
        is_expired = orm_dag.last_expired and dag and dag.last_loaded < orm_dag.last_expired
        if is_expired:
            # Remove associated dags so we can re-add them.
            self.dags = {
                key: dag
                for key, dag in self.dags.items()
                if root_dag_id != key and not (dag.parent_dag and root_dag_id == dag.parent_dag.dag_id)
            }
        if is_missing or is_expired:
            # Reprocess source file.
            found_dags = self.process_file(
                filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
            )

            # If the source file no longer exports `dag_id`, delete it from self.dags
            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
                return self.dags[dag_id]
            elif dag_id in self.dags:
                del self.dags[dag_id]
        return self.dags.get(dag_id)
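    # Illustrative sketch (editorial addition): with ``read_dags_from_db=True`` the bag
    # serves serialized DAGs and re-fetches one only when the fetch interval has elapsed
    # and the hash or last-updated timestamp in the serialized_dag table has changed.
    # "my_dag" is a hypothetical DAG id and an initialized Airflow database is assumed.
    #
    #     bag = DagBag(read_dags_from_db=True)
    #     dag = bag.get_dag("my_dag")  # loaded from the DB on first access
    #     dag = bag.get_dag("my_dag")  # served from memory until the fetch interval expires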

    def _add_dag_from_db(self, dag_id: str, session: Session):
        """Add DAG to DagBag from DB."""
        from airflow.models.serialized_dag import SerializedDagModel

        row = SerializedDagModel.get(dag_id, session)
        if not row:
            return None

        row.load_op_links = self.load_op_links
        dag = row.dag
        for subdag in dag.subdags:
            self.dags[subdag.dag_id] = subdag
        self.dags[dag.dag_id] = dag
        self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
        self.dags_hash[dag.dag_id] = row.dag_hash

    def process_file(self, filepath, only_if_updated=True, safe_mode=True):
        """
        Given a path to a python module or zip file, this method imports
        the module and looks for dag objects within it.
        """
        from airflow.models.dag import DagContext

        # if the source file no longer exists in the DB or in the filesystem,
        # return an empty list
        # todo: raise exception?

        if filepath is None or not os.path.isfile(filepath):
            return []

        try:
            # This failed before in what may have been a git sync
            # race condition
            file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
            if (
                only_if_updated
                and filepath in self.file_last_changed
                and file_last_changed_on_disk == self.file_last_changed[filepath]
            ):
                return []
        except Exception as e:
            self.log.exception(e)
            return []

        # Ensure we don't pick up anything else we didn't mean to
        DagContext.autoregistered_dags.clear()

        if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
            mods = self._load_modules_from_file(filepath, safe_mode)
        else:
            mods = self._load_modules_from_zip(filepath, safe_mode)

        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)

        self.file_last_changed[filepath] = file_last_changed_on_disk
        return found_dags
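    # Illustrative sketch (editorial addition): process_file() can be used to parse a
    # single DAG file without scanning a whole folder; the path is a placeholder.
    #
    #     bag = DagBag(collect_dags=False)
    #     found = bag.process_file("/opt/airflow/dags/my_dag.py", only_if_updated=False)
    #     print([dag.dag_id for dag in found], bag.import_errors)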

    def _load_modules_from_file(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        if not might_contain_dag(filepath, safe_mode):
            # Don't want to spam user with skip messages
            if not self.has_logged:
                self.has_logged = True
                self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
            return []

        self.log.debug("Importing %s", filepath)
        org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
        path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
        mod_name = f"unusual_prefix_{path_hash}_{org_mod_name}"

        if mod_name in sys.modules:
            del sys.modules[mod_name]

        DagContext.current_autoregister_module_name = mod_name

        def parse(mod_name, filepath):
            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                new_module = importlib.util.module_from_spec(spec)
                sys.modules[spec.name] = new_module
                loader.exec_module(new_module)
                return [new_module]
            except Exception as e:
                DagContext.autoregistered_dags.clear()
                self.log.exception("Failed to import: %s", filepath)
                if self.dagbag_import_error_tracebacks:
                    self.import_errors[filepath] = traceback.format_exc(
                        limit=-self.dagbag_import_error_traceback_depth
                    )
                else:
                    self.import_errors[filepath] = str(e)
                return []

        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)

        if not isinstance(dagbag_import_timeout, (int, float)):
            raise TypeError(
                f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
            )

        if dagbag_import_timeout <= 0:  # no parsing timeout
            return parse(mod_name, filepath)

        timeout_msg = (
            f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
            "Please take a look at these docs to improve your DAG import time:\n"
            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
        )
        with timeout(dagbag_import_timeout, error_message=timeout_msg):
            return parse(mod_name, filepath)
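    # Editorial note (sketch of the naming scheme above, not original code): each file is
    # imported under a module name derived from the SHA-1 of its full path, so files with
    # the same basename in different folders do not collide in sys.modules. For a
    # hypothetical path:
    #
    #     import hashlib
    #     filepath = "/opt/airflow/dags/my_dag.py"
    #     path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
    #     mod_name = f"unusual_prefix_{path_hash}_my_dag"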

    def _load_modules_from_zip(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        mods = []
        with zipfile.ZipFile(filepath) as current_zip_file:
            for zip_info in current_zip_file.infolist():
                head, _ = os.path.split(zip_info.filename)
                mod_name, ext = os.path.splitext(zip_info.filename)
                if ext not in [".py", ".pyc"]:
                    continue
                if head:
                    continue

                if mod_name == "__init__":
                    self.log.warning("Found __init__.%s at root of %s", ext, filepath)

                self.log.debug("Reading %s from %s", zip_info.filename, filepath)

                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
                    # todo: create ignore list
                    # Don't want to spam user with skip messages
                    if not self.has_logged:
                        self.has_logged = True
                        self.log.info(
                            "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
                        )
                    continue

                if mod_name in sys.modules:
                    del sys.modules[mod_name]

                DagContext.current_autoregister_module_name = mod_name
                try:
                    sys.path.insert(0, filepath)
                    current_module = importlib.import_module(mod_name)
                    mods.append(current_module)
                except Exception as e:
                    DagContext.autoregistered_dags.clear()
                    fileloc = os.path.join(filepath, zip_info.filename)
                    self.log.exception("Failed to import: %s", fileloc)
                    if self.dagbag_import_error_tracebacks:
                        self.import_errors[fileloc] = traceback.format_exc(
                            limit=-self.dagbag_import_error_traceback_depth
                        )
                    else:
                        self.import_errors[fileloc] = str(e)
                finally:
                    if sys.path[0] == filepath:
                        del sys.path[0]
        return mods
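    # Editorial note (illustrative, not original code): only top-level .py/.pyc entries of
    # a packaged DAG archive are imported directly; nested entries are skipped by the loop
    # above. A minimal layout for a hypothetical my_dags.zip:
    #
    #     my_dags.zip
    #     ├── my_dag.py          # imported as a DAG module
    #     └── helpers/util.py    # skipped here (not at the archive root)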

    def _process_modules(self, filepath, mods, file_last_changed_on_disk):
        from airflow.models.dag import DAG, DagContext  # Avoid circular import

        top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}

        top_level_dags.update(DagContext.autoregistered_dags)

        DagContext.current_autoregister_module_name = None
        DagContext.autoregistered_dags.clear()

        found_dags = []

        for (dag, mod) in top_level_dags:
            dag.fileloc = mod.__file__
            try:
                dag.validate()
                self.bag_dag(dag=dag, root_dag=dag)
            except Exception as e:
                self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
                self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
            else:
                found_dags.append(dag)
                found_dags += dag.subdags
        return found_dags

    def bag_dag(self, dag, root_dag):
        """
        Adds the DAG into the bag and recurses into sub-DAGs.

        :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
        :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
        """
        self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
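    # Illustrative sketch (editorial addition): bag_dag() also works for a DAG built in
    # code. A task cycle raises AirflowDagCycleException, and re-bagging the same dag_id
    # from a different file raises AirflowDagDuplicatedIdException. "adhoc_dag" is a
    # hypothetical id.
    #
    #     from airflow.models.dag import DAG
    #
    #     dag = DAG(dag_id="adhoc_dag")
    #     bag = DagBag(collect_dags=False)
    #     bag.bag_dag(dag=dag, root_dag=dag)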

    def _bag_dag(self, *, dag, root_dag, recursive):
        """Actual implementation of bagging a dag.

        This exists only to avoid exposing ``recursive`` in ``bag_dag()``; the flag is
        intended to be used only by the ``_bag_dag()`` implementation itself.
        """
        check_cycle(dag)  # throws if a task cycle is found

        dag.resolve_template_files()
        dag.last_loaded = timezone.utcnow()

        try:
            # Check policies
            settings.dag_policy(dag)

            for task in dag.tasks:
                settings.task_policy(task)
        except AirflowClusterPolicyViolation:
            raise
        except Exception as e:
            self.log.exception(e)
            raise AirflowClusterPolicyError(e)

        subdags = dag.subdags

        try:
            # DAG.subdags automatically performs DFS search, so we don't recurse
            # into further _bag_dag() calls.
            if recursive:
                for subdag in subdags:
                    subdag.fileloc = dag.fileloc
                    subdag.parent_dag = dag
                    self._bag_dag(dag=subdag, root_dag=root_dag, recursive=False)

            prev_dag = self.dags.get(dag.dag_id)
            if prev_dag and prev_dag.fileloc != dag.fileloc:
                raise AirflowDagDuplicatedIdException(
                    dag_id=dag.dag_id,
                    incoming=dag.fileloc,
                    existing=self.dags[dag.dag_id].fileloc,
                )
            self.dags[dag.dag_id] = dag
            self.log.debug("Loaded DAG %s", dag)
        except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
            # There was an error in bagging the dag. Remove it from the list of dags
            self.log.exception("Exception bagging dag: %s", dag.dag_id)
            # Only necessary at the root level since DAG.subdags automatically
            # performs DFS to search through all subdags
            if recursive:
                for subdag in subdags:
                    if subdag.dag_id in self.dags:
                        del self.dags[subdag.dag_id]
            raise

    def collect_dags(
        self,
        dag_folder: str | pathlib.Path | None = None,
        only_if_updated: bool = True,
        include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
        safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
    ):
        """
        Given a file path or a folder, this method looks for python modules,
        imports them and adds them to the dagbag collection.

        Note that if a ``.airflowignore`` file is found while processing
        the directory, it will behave much like a ``.gitignore``,
        ignoring files that match any of the patterns specified
        in the file.

        **Note**: The patterns in ``.airflowignore`` are interpreted as either
        un-anchored regexes or gitignore-like glob expressions, depending on
        the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
        """
        if self.read_dags_from_db:
            return

        self.log.info("Filling up the DagBag from %s", dag_folder)
        dag_folder = dag_folder or self.dag_folder
        # Used to store stats around DagBag processing
        stats = []

        # Ensure dag_folder is a str -- it may have been a pathlib.Path
        dag_folder = correct_maybe_zipped(str(dag_folder))
        for filepath in list_py_file_paths(
            dag_folder,
            safe_mode=safe_mode,
            include_examples=include_examples,
        ):
            try:
                file_parse_start_dttm = timezone.utcnow()
                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)

                file_parse_end_dttm = timezone.utcnow()
                stats.append(
                    FileLoadStat(
                        file=filepath.replace(settings.DAGS_FOLDER, ""),
                        duration=file_parse_end_dttm - file_parse_start_dttm,
                        dag_num=len(found_dags),
                        task_num=sum(len(dag.tasks) for dag in found_dags),
                        dags=str([dag.dag_id for dag in found_dags]),
                    )
                )
            except Exception as e:
                self.log.exception(e)

        self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)
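    # Illustrative sketch (editorial addition): collect_dags() honours ``.airflowignore``
    # and records per-file parse statistics in ``dagbag_stats`` (sorted slowest first),
    # which dagbag_report() below formats. The folder path is a placeholder.
    #
    #     bag = DagBag(collect_dags=False)
    #     bag.collect_dags(dag_folder="/opt/airflow/dags", include_examples=False)
    #     for stat in bag.dagbag_stats[:3]:  # three slowest files
    #         print(stat.file, stat.duration, stat.dag_num, stat.task_num)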

    def collect_dags_from_db(self):
        """Collects DAGs from the database."""
        from airflow.models.serialized_dag import SerializedDagModel

        with Stats.timer("collect_db_dags"):
            self.log.info("Filling up the DagBag from database")

            # The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
            # from the table by the scheduler job.
            self.dags = SerializedDagModel.read_all_dags()

            # Adds subdags.
            # DAG post-processing steps such as self.bag_dag and croniter are not needed as
            # they are done by scheduler before serialization.
            subdags = {}
            for dag in self.dags.values():
                for subdag in dag.subdags:
                    subdags[subdag.dag_id] = subdag
            self.dags.update(subdags)

    def dagbag_report(self):
        """Return a formatted report of DagBag loading stats."""
        stats = self.dagbag_stats
        dag_folder = self.dag_folder
        duration = sum((o.duration for o in stats), timedelta()).total_seconds()
        dag_num = sum(o.dag_num for o in stats)
        task_num = sum(o.task_num for o in stats)
        table = tabulate(stats, headers="keys")

        report = textwrap.dedent(
            f"""\n
        -------------------------------------------------------------------
        DagBag loading stats for {dag_folder}
        -------------------------------------------------------------------
        Number of DAGs: {dag_num}
        Total task number: {task_num}
        DagBag parsing time: {duration}\n{table}
        """
        )
        return report
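    # Editorial note (illustrative): the report is returned as a string rather than
    # printed, so a caller emits it explicitly, e.g.:
    #
    #     print(bag.dagbag_report())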

    @classmethod
    @provide_session
    def _sync_to_db(
        cls,
        dags: dict[str, DAG],
        processor_subdir: str | None = None,
        session: Session = NEW_SESSION,
    ):
        """Save attributes about a list of DAGs to the DB."""
        # To avoid circular import - airflow.models.dagbag -> airflow.models.dag -> airflow.models.dagbag
        from airflow.models.dag import DAG
        from airflow.models.serialized_dag import SerializedDagModel

        log = cls.logger()

        def _serialize_dag_capturing_errors(dag, session):
            """
            Try to serialize the dag to the DB, but make a note of any errors.

            We can't place them directly in import_errors, as this may be retried, and work the next time
            """
            if dag.is_subdag:
                return []
            try:
                # We can't use bulk_write_to_db as we want to capture each error individually
                dag_was_updated = SerializedDagModel.write_dag(
                    dag,
                    min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
                    session=session,
                )
                if dag_was_updated:
                    DagBag._sync_perm_for_dag(dag, session=session)
                return []
            except OperationalError:
                raise
            except Exception:
                log.exception("Failed to write serialized DAG: %s", dag.fileloc)
                dagbag_import_error_traceback_depth = conf.getint(
                    "core", "dagbag_import_error_traceback_depth"
                )
                return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]

        # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
        # of any Operational Errors
        # In case of failures, provide_session handles rollback
        import_errors = {}
        for attempt in run_with_db_retries(logger=log):
            with attempt:
                serialize_errors = []
                log.debug(
                    "Running dagbag.sync_to_db with retries. Try %d of %d",
                    attempt.retry_state.attempt_number,
                    MAX_DB_RETRIES,
                )
                log.debug("Calling the DAG.bulk_sync_to_db method")
                try:
                    # Write Serialized DAGs to DB, capturing errors
                    for dag in dags.values():
                        serialize_errors.extend(_serialize_dag_capturing_errors(dag, session))

                    DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
                except OperationalError:
                    session.rollback()
                    raise
                # Only now that we are "complete" do we update import_errors - don't want to record
                # errors from previous failed attempts
                import_errors.update(dict(serialize_errors))

        return import_errors

    @provide_session
    def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW_SESSION):
        import_errors = DagBag._sync_to_db(dags=self.dags, processor_subdir=processor_subdir, session=session)
        self.import_errors.update(import_errors)
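    # Illustrative sketch (editorial addition): after parsing, sync_to_db() persists DAG
    # and serialized-DAG rows, retrying on OperationalError; per-DAG serialization
    # failures are collected into ``import_errors`` instead of aborting the whole sync.
    # The path is a placeholder and an initialized Airflow database is assumed.
    #
    #     bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
    #     bag.sync_to_db()
    #     print(bag.import_errors)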

    @classmethod
    @provide_session
    def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
        """Sync DAG specific permissions."""
        root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

        cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
        from airflow.www.security import ApplessAirflowSecurityManager

        security_manager = ApplessAirflowSecurityManager(session=session)
        security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)