Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/models/dagbag.py: 18%
349 statements
coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import hashlib
import importlib
import importlib.machinery
import importlib.util
import os
import sys
import textwrap
import traceback
import warnings
import zipfile
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session
from tabulate import tabulate

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
    AirflowClusterPolicyError,
    AirflowClusterPolicyViolation,
    AirflowDagCycleException,
    AirflowDagDuplicatedIdException,
    RemovedInAirflow3Warning,
)
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.docs import get_docs_url
from airflow.utils.file import correct_maybe_zipped, list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
    import pathlib

    from airflow.models.dag import DAG


class FileLoadStat(NamedTuple):
    """Information about a single file."""

    file: str
    duration: timedelta
    dag_num: int
    task_num: int
    dags: str


class DagBag(LoggingMixin):
    """
    A dagbag is a collection of dags, parsed out of a folder tree, with high-level
    configuration settings such as which database to use as a backend and which
    executor to use to fire off tasks. This makes it easier to run distinct
    environments for, say, production and development, tests, or different teams
    or security profiles. What would have been system-level settings are now
    dagbag-level, so that one system can run multiple, independent sets of settings.

    :param dag_folder: the folder to scan to find DAGs
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
        If ``False``, DAGs are read from python files.
    :param load_op_links: Should the extra operator link be loaded via plugins when
        de-serializing the DAG? This flag is set to False in the Scheduler so that extra operator
        links are not loaded, to avoid running user code in the Scheduler.
    """

    def __init__(
        self,
        dag_folder: str | pathlib.Path | None = None,
        include_examples: bool | ArgNotSet = NOTSET,
        safe_mode: bool | ArgNotSet = NOTSET,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool | None = None,
        load_op_links: bool = True,
        collect_dags: bool = True,
    ):
        # Avoid circular import
        from airflow.models.dag import DAG

        super().__init__()

        include_examples = (
            include_examples
            if isinstance(include_examples, bool)
            else conf.getboolean("core", "LOAD_EXAMPLES")
        )
        safe_mode = (
            safe_mode if isinstance(safe_mode, bool) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
        )

        if store_serialized_dags:
            warnings.warn(
                "The store_serialized_dags parameter has been deprecated. "
                "You should pass the read_dags_from_db parameter.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            read_dags_from_db = store_serialized_dags

        dag_folder = dag_folder or settings.DAGS_FOLDER
        self.dag_folder = dag_folder
        self.dags: dict[str, DAG] = {}
        # the file's last modified timestamp when we last read it
        self.file_last_changed: dict[str, datetime] = {}
        self.import_errors: dict[str, str] = {}
        self.has_logged = False
        self.read_dags_from_db = read_dags_from_db
        # Only used by read_dags_from_db=True
        self.dags_last_fetched: dict[str, datetime] = {}
        # Only used by SchedulerJob to compare the dag_hash to identify change in DAGs
        self.dags_hash: dict[str, str] = {}

        self.dagbag_import_error_tracebacks = conf.getboolean("core", "dagbag_import_error_tracebacks")
        self.dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
        if collect_dags:
            self.collect_dags(
                dag_folder=dag_folder,
                include_examples=include_examples,
                safe_mode=safe_mode,
            )
        # Should the extra operator link be loaded via plugins?
        # This flag is set to False in Scheduler so that Extra Operator links are not loaded
        self.load_op_links = load_op_links

    def size(self) -> int:
        """:return: the number of dags contained in this dagbag"""
        return len(self.dags)

    @property
    def store_serialized_dags(self) -> bool:
        """Whether to read dags from DB."""
        warnings.warn(
            "The store_serialized_dags property has been deprecated. Use read_dags_from_db instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return self.read_dags_from_db

    @property
    def dag_ids(self) -> list[str]:
        """
        Get DAG ids.

        :return: a list of DAG IDs in this bag
        """
        return list(self.dags.keys())

    @provide_session
    def get_dag(self, dag_id, session: Session = None):
        """
        Gets the DAG out of the dictionary, and refreshes it if expired.

        :param dag_id: DAG ID
        """
        # Avoid circular import
        from airflow.models.dag import DagModel

        if self.read_dags_from_db:
            # Import here so that serialized dag is only imported when serialization is enabled
            from airflow.models.serialized_dag import SerializedDagModel

            if dag_id not in self.dags:
                # Load from DB if not (yet) in the bag
                self._add_dag_from_db(dag_id=dag_id, session=session)
                return self.dags.get(dag_id)

            # If DAG is in the DagBag, check the following
            # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
            # 2. check the last_updated and hash columns in SerializedDag table to see if
            # Serialized DAG is updated
            # 3. if (2) is yes, fetch the Serialized DAG.
            # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
            # if it exists and return None.
            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
            if (
                dag_id in self.dags_last_fetched
                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
            ):
                sd_latest_version_and_updated_datetime = (
                    SerializedDagModel.get_latest_version_hash_and_updated_datetime(
                        dag_id=dag_id, session=session
                    )
                )
                if not sd_latest_version_and_updated_datetime:
                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
                    del self.dags[dag_id]
                    del self.dags_last_fetched[dag_id]
                    del self.dags_hash[dag_id]
                    return None

                sd_latest_version, sd_last_updated_datetime = sd_latest_version_and_updated_datetime

                if (
                    sd_last_updated_datetime > self.dags_last_fetched[dag_id]
                    or sd_latest_version != self.dags_hash[dag_id]
                ):
                    self._add_dag_from_db(dag_id=dag_id, session=session)

            return self.dags.get(dag_id)

        # If asking for a known subdag, we want to refresh the parent
        dag = None
        root_dag_id = dag_id
        if dag_id in self.dags:
            dag = self.dags[dag_id]
            if dag.parent_dag:
                root_dag_id = dag.parent_dag.dag_id

        # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
        orm_dag = DagModel.get_current(root_dag_id, session=session)
        if not orm_dag:
            return self.dags.get(dag_id)

        # If the dag corresponding to root_dag_id is absent or expired
        is_missing = root_dag_id not in self.dags
        is_expired = orm_dag.last_expired and dag and dag.last_loaded < orm_dag.last_expired
        if is_expired:
            # Remove associated dags so we can re-add them.
            self.dags = {
                key: dag
                for key, dag in self.dags.items()
                if root_dag_id != key and not (dag.parent_dag and root_dag_id == dag.parent_dag.dag_id)
            }
        if is_missing or is_expired:
            # Reprocess source file.
            found_dags = self.process_file(
                filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
            )

            # If the source file no longer exports `dag_id`, delete it from self.dags
            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
                return self.dags[dag_id]
            elif dag_id in self.dags:
                del self.dags[dag_id]
        return self.dags.get(dag_id)

    def _add_dag_from_db(self, dag_id: str, session: Session):
        """Add DAG to DagBag from DB."""
        from airflow.models.serialized_dag import SerializedDagModel

        row = SerializedDagModel.get(dag_id, session)
        if not row:
            return None

        row.load_op_links = self.load_op_links
        dag = row.dag
        for subdag in dag.subdags:
            self.dags[subdag.dag_id] = subdag
        self.dags[dag.dag_id] = dag
        self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
        self.dags_hash[dag.dag_id] = row.dag_hash

    def process_file(self, filepath, only_if_updated=True, safe_mode=True):
        """
        Given a path to a python module or zip file, this method imports
        the module and looks for dag objects within it.
        """
        from airflow.models.dag import DagContext

        # if the source file no longer exists in the DB or in the filesystem,
        # return an empty list
        # todo: raise exception?

        if filepath is None or not os.path.isfile(filepath):
            return []

        try:
            # This failed before in what may have been a git sync
            # race condition
            file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
            if (
                only_if_updated
                and filepath in self.file_last_changed
                and file_last_changed_on_disk == self.file_last_changed[filepath]
            ):
                return []
        except Exception as e:
            self.log.exception(e)
            return []

        # Ensure we don't pick up anything else we didn't mean to
        DagContext.autoregistered_dags.clear()

        if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
            mods = self._load_modules_from_file(filepath, safe_mode)
        else:
            mods = self._load_modules_from_zip(filepath, safe_mode)

        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)

        self.file_last_changed[filepath] = file_last_changed_on_disk
        return found_dags

    def _load_modules_from_file(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        if not might_contain_dag(filepath, safe_mode):
            # Don't want to spam user with skip messages
            if not self.has_logged:
                self.has_logged = True
                self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
            return []

        self.log.debug("Importing %s", filepath)
        org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
        path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
        mod_name = f"unusual_prefix_{path_hash}_{org_mod_name}"
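        # The sha1 of the full path is used as a prefix so that two DAG files with the
        # same basename do not collide in sys.modules; e.g. a hypothetical
        # /dags/example.py would be imported under a name like
        # "unusual_prefix_<sha1-of-path>_example".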

        if mod_name in sys.modules:
            del sys.modules[mod_name]

        DagContext.current_autoregister_module_name = mod_name

        def parse(mod_name, filepath):
            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                new_module = importlib.util.module_from_spec(spec)
                sys.modules[spec.name] = new_module
                loader.exec_module(new_module)
                return [new_module]
            except Exception as e:
                DagContext.autoregistered_dags.clear()
                self.log.exception("Failed to import: %s", filepath)
                if self.dagbag_import_error_tracebacks:
                    self.import_errors[filepath] = traceback.format_exc(
                        limit=-self.dagbag_import_error_traceback_depth
                    )
                else:
                    self.import_errors[filepath] = str(e)
                return []

        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
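        # Illustrative sketch (assumption, not part of this module): the per-file parsing
        # timeout can be customised by defining ``get_dagbag_import_timeout`` in
        # ``airflow_local_settings.py``, for example::
        #
        #     def get_dagbag_import_timeout(dag_file_path: str) -> float:
        #         # hypothetical rule: give files under "slow_dags/" more time
        #         return 90.0 if "slow_dags/" in dag_file_path else 30.0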

        if not isinstance(dagbag_import_timeout, (int, float)):
            raise TypeError(
                f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
            )

        if dagbag_import_timeout <= 0:  # no parsing timeout
            return parse(mod_name, filepath)

        timeout_msg = (
            f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
            "Please take a look at these docs to improve your DAG import time:\n"
            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
        )
        with timeout(dagbag_import_timeout, error_message=timeout_msg):
            return parse(mod_name, filepath)

    def _load_modules_from_zip(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        mods = []
        with zipfile.ZipFile(filepath) as current_zip_file:
            for zip_info in current_zip_file.infolist():
                head, _ = os.path.split(zip_info.filename)
                mod_name, ext = os.path.splitext(zip_info.filename)
                if ext not in [".py", ".pyc"]:
                    continue
                if head:
                    continue

                if mod_name == "__init__":
                    self.log.warning("Found __init__.%s at root of %s", ext, filepath)

                self.log.debug("Reading %s from %s", zip_info.filename, filepath)

                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
                    # todo: create ignore list
                    # Don't want to spam user with skip messages
                    if not self.has_logged:
                        self.has_logged = True
                        self.log.info(
                            "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
                        )
                    continue

                if mod_name in sys.modules:
                    del sys.modules[mod_name]

                DagContext.current_autoregister_module_name = mod_name
                try:
                    sys.path.insert(0, filepath)
                    current_module = importlib.import_module(mod_name)
                    mods.append(current_module)
                except Exception as e:
                    DagContext.autoregistered_dags.clear()
                    fileloc = os.path.join(filepath, zip_info.filename)
                    self.log.exception("Failed to import: %s", fileloc)
                    if self.dagbag_import_error_tracebacks:
                        self.import_errors[fileloc] = traceback.format_exc(
                            limit=-self.dagbag_import_error_traceback_depth
                        )
                    else:
                        self.import_errors[fileloc] = str(e)
                finally:
                    if sys.path[0] == filepath:
                        del sys.path[0]
        return mods

    def _process_modules(self, filepath, mods, file_last_changed_on_disk):
        from airflow.models.dag import DAG, DagContext  # Avoid circular import

        top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}

        top_level_dags.update(DagContext.autoregistered_dags)

        DagContext.current_autoregister_module_name = None
        DagContext.autoregistered_dags.clear()

        found_dags = []

        for (dag, mod) in top_level_dags:
            dag.fileloc = mod.__file__
            try:
                dag.validate()
                self.bag_dag(dag=dag, root_dag=dag)
            except Exception as e:
                self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
                self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
            else:
                found_dags.append(dag)
                found_dags += dag.subdags
        return found_dags

    def bag_dag(self, dag, root_dag):
        """
        Adds the DAG into the bag, recurses into sub dags.

        :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
        :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
        """
        self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)

    def _bag_dag(self, *, dag, root_dag, recursive):
        """Actual implementation of bagging a dag.

        The only purpose of this is to avoid exposing ``recursive`` in ``bag_dag()``,
        intended to only be used by the ``_bag_dag()`` implementation.
        """
        check_cycle(dag)  # throws if a task cycle is found

        dag.resolve_template_files()
        dag.last_loaded = timezone.utcnow()

        try:
            # Check policies
            settings.dag_policy(dag)

            for task in dag.tasks:
                settings.task_policy(task)
        except AirflowClusterPolicyViolation:
            raise
        except Exception as e:
            self.log.exception(e)
            raise AirflowClusterPolicyError(e)

        subdags = dag.subdags

        try:
            # DAG.subdags automatically performs DFS search, so we don't recurse
            # into further _bag_dag() calls.
            if recursive:
                for subdag in subdags:
                    subdag.fileloc = dag.fileloc
                    subdag.parent_dag = dag
                    self._bag_dag(dag=subdag, root_dag=root_dag, recursive=False)

            prev_dag = self.dags.get(dag.dag_id)
            if prev_dag and prev_dag.fileloc != dag.fileloc:
                raise AirflowDagDuplicatedIdException(
                    dag_id=dag.dag_id,
                    incoming=dag.fileloc,
                    existing=self.dags[dag.dag_id].fileloc,
                )
            self.dags[dag.dag_id] = dag
            self.log.debug("Loaded DAG %s", dag)
        except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
            # There was an error in bagging the dag. Remove it from the list of dags
            self.log.exception("Exception bagging dag: %s", dag.dag_id)
            # Only necessary at the root level since DAG.subdags automatically
            # performs DFS to search through all subdags
            if recursive:
                for subdag in subdags:
                    if subdag.dag_id in self.dags:
                        del self.dags[subdag.dag_id]
            raise

    def collect_dags(
        self,
        dag_folder: str | pathlib.Path | None = None,
        only_if_updated: bool = True,
        include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
        safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
    ):
        """
        Given a file path or a folder, this method looks for python modules,
        imports them and adds them to the dagbag collection.

        Note that if a ``.airflowignore`` file is found while processing
        the directory, it will behave much like a ``.gitignore``,
        ignoring files that match any of the patterns specified
        in the file.

        **Note**: The patterns in ``.airflowignore`` are interpreted as either
        un-anchored regexes or gitignore-like glob expressions, depending on
        the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
        """
        if self.read_dags_from_db:
            return

        self.log.info("Filling up the DagBag from %s", dag_folder)
        dag_folder = dag_folder or self.dag_folder
        # Used to store stats around DagBag processing
        stats = []

        # Ensure dag_folder is a str -- it may have been a pathlib.Path
        dag_folder = correct_maybe_zipped(str(dag_folder))
        for filepath in list_py_file_paths(
            dag_folder,
            safe_mode=safe_mode,
            include_examples=include_examples,
        ):
            try:
                file_parse_start_dttm = timezone.utcnow()
                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)

                file_parse_end_dttm = timezone.utcnow()
                stats.append(
                    FileLoadStat(
                        file=filepath.replace(settings.DAGS_FOLDER, ""),
                        duration=file_parse_end_dttm - file_parse_start_dttm,
                        dag_num=len(found_dags),
                        task_num=sum(len(dag.tasks) for dag in found_dags),
                        dags=str([dag.dag_id for dag in found_dags]),
                    )
                )
            except Exception as e:
                self.log.exception(e)

        self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)

    def collect_dags_from_db(self):
        """Collects DAGs from database."""
        from airflow.models.serialized_dag import SerializedDagModel

        with Stats.timer("collect_db_dags"):
            self.log.info("Filling up the DagBag from database")

            # The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
            # from the table by the scheduler job.
            self.dags = SerializedDagModel.read_all_dags()

            # Adds subdags.
            # DAG post-processing steps such as self.bag_dag and croniter are not needed as
            # they are done by scheduler before serialization.
            subdags = {}
            for dag in self.dags.values():
                for subdag in dag.subdags:
                    subdags[subdag.dag_id] = subdag
            self.dags.update(subdags)

    def dagbag_report(self):
        """Prints a report around DagBag loading stats."""
        stats = self.dagbag_stats
        dag_folder = self.dag_folder
        duration = sum((o.duration for o in stats), timedelta()).total_seconds()
        dag_num = sum(o.dag_num for o in stats)
        task_num = sum(o.task_num for o in stats)
        table = tabulate(stats, headers="keys")

        report = textwrap.dedent(
            f"""\n
        -------------------------------------------------------------------
        DagBag loading stats for {dag_folder}
        -------------------------------------------------------------------
        Number of DAGs: {dag_num}
        Total task number: {task_num}
        DagBag parsing time: {duration}\n{table}
        """
        )
        return report

    @classmethod
    @provide_session
    def _sync_to_db(
        cls,
        dags: dict[str, DAG],
        processor_subdir: str | None = None,
        session: Session = NEW_SESSION,
    ):
        """Save attributes of a list of DAGs to the DB."""
        # To avoid circular import - airflow.models.dagbag -> airflow.models.dag -> airflow.models.dagbag
        from airflow.models.dag import DAG
        from airflow.models.serialized_dag import SerializedDagModel

        log = cls.logger()

        def _serialize_dag_capturing_errors(dag, session):
            """
            Try to serialize the dag to the DB, but make a note of any errors.

            We can't place them directly in import_errors, as this may be retried and succeed the next time.
            """
            if dag.is_subdag:
                return []
            try:
                # We can't use bulk_write_to_db as we want to capture each error individually
                dag_was_updated = SerializedDagModel.write_dag(
                    dag,
                    min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
                    session=session,
                )
                if dag_was_updated:
                    DagBag._sync_perm_for_dag(dag, session=session)
                return []
            except OperationalError:
                raise
            except Exception:
                log.exception("Failed to write serialized DAG: %s", dag.fileloc)
                dagbag_import_error_traceback_depth = conf.getint(
                    "core", "dagbag_import_error_traceback_depth"
                )
                return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]

        # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
        # of any Operational Errors
        # In case of failures, provide_session handles rollback
        import_errors = {}
        for attempt in run_with_db_retries(logger=log):
            with attempt:
                serialize_errors = []
                log.debug(
                    "Running dagbag.sync_to_db with retries. Try %d of %d",
                    attempt.retry_state.attempt_number,
                    MAX_DB_RETRIES,
                )
                log.debug("Calling the DAG.bulk_sync_to_db method")
                try:
                    # Write Serialized DAGs to DB, capturing errors
                    for dag in dags.values():
                        serialize_errors.extend(_serialize_dag_capturing_errors(dag, session))

                    DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
                except OperationalError:
                    session.rollback()
                    raise
                # Only now that we are "complete" do we update import_errors - we don't want to record
                # errors from previous failed attempts
                import_errors.update(dict(serialize_errors))

        return import_errors

    @provide_session
    def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW_SESSION):
        import_errors = DagBag._sync_to_db(dags=self.dags, processor_subdir=processor_subdir, session=session)
        self.import_errors.update(import_errors)
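
    # Illustrative usage sketch (not part of the upstream module); the folder below is
    # hypothetical, and a real call site would normally run inside the DAG processor:
    #
    #     bag = DagBag(dag_folder="/path/to/dags", read_dags_from_db=False)
    #     bag.sync_to_db()  # serializes bag.dags and records serialization errors
    #     errors = bag.import_errors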

    @classmethod
    @provide_session
    def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
        """Sync DAG specific permissions."""
        root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

        cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
        from airflow.www.security import ApplessAirflowSecurityManager

        security_manager = ApplessAirflowSecurityManager(session=session)
        security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)