Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/models/dagbag.py: 20%
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import hashlib
import importlib
import importlib.machinery
import importlib.util
import os
import sys
import textwrap
import traceback
import warnings
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy import (
    Column,
    String,
)
from sqlalchemy.exc import OperationalError
from tabulate import tabulate

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
    AirflowClusterPolicyError,
    AirflowClusterPolicySkipDag,
    AirflowClusterPolicyViolation,
    AirflowDagCycleException,
    AirflowDagDuplicatedIdException,
    RemovedInAirflow3Warning,
)
from airflow.models.base import Base
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.docs import get_docs_url
from airflow.utils.file import (
    correct_maybe_zipped,
    get_unique_dag_module_name,
    list_py_file_paths,
    might_contain_dag,
)
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET
from airflow.utils.warnings import capture_with_reraise

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.models.dag import DAG
    from airflow.utils.types import ArgNotSet


class FileLoadStat(NamedTuple):
    """
    Information about a single processed file.

    :param file: Loaded file.
    :param duration: Time spent processing the file.
    :param dag_num: Total number of DAGs loaded from this file.
    :param task_num: Total number of Tasks loaded from this file.
    :param dags: Names of the DAGs loaded from this file.
    :param warning_num: Total number of warnings captured while processing this file.
    """

    file: str
    duration: timedelta
    dag_num: int
    task_num: int
    dags: str
    warning_num: int


class DagBag(LoggingMixin):
    """
    A dagbag is a collection of DAGs, parsed out of a folder tree, with high-level configuration settings.

    Some possible settings are the database to use as a backend and which executor
    to use to fire off tasks. This makes it easier to run distinct environments,
    e.g. production and development, tests, or different teams or security
    profiles. What would have been system-level settings are now dagbag-level, so
    that one system can run multiple, independent sets of settings.

    :param dag_folder: the folder to scan to find DAGs
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :param safe_mode: when ``False``, scans all python modules for dags.
        When ``True``, uses heuristics (files containing ``DAG`` and ``airflow`` strings)
        to filter the python modules to scan for dags.
    :param read_dags_from_db: Read DAGs from the DB if ``True`` is passed.
        If ``False``, DAGs are read from python files.
    :param store_serialized_dags: deprecated parameter, same effect as ``read_dags_from_db``
    :param load_op_links: Should the extra operator link be loaded via plugins when
        de-serializing the DAG? This flag is set to ``False`` in the scheduler so that extra
        operator links are not loaded, to avoid running user code in the scheduler.
    :param collect_dags: when ``True``, collects dags during class initialization.
    """

    def __init__(
        self,
        dag_folder: str | Path | None = None,
        include_examples: bool | ArgNotSet = NOTSET,
        safe_mode: bool | ArgNotSet = NOTSET,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool | None = None,
        load_op_links: bool = True,
        collect_dags: bool = True,
    ):
        # Avoid circular import
        super().__init__()

        include_examples = (
            include_examples
            if isinstance(include_examples, bool)
            else conf.getboolean("core", "LOAD_EXAMPLES")
        )
        safe_mode = (
            safe_mode if isinstance(safe_mode, bool) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
        )

        if store_serialized_dags:
            warnings.warn(
                "The store_serialized_dags parameter has been deprecated. "
                "You should pass the read_dags_from_db parameter.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            read_dags_from_db = store_serialized_dags

        dag_folder = dag_folder or settings.DAGS_FOLDER
        self.dag_folder = dag_folder
        self.dags: dict[str, DAG] = {}
        # the file's last modified timestamp when we last read it
        self.file_last_changed: dict[str, datetime] = {}
        self.import_errors: dict[str, str] = {}
        self.captured_warnings: dict[str, tuple[str, ...]] = {}
        self.has_logged = False
        self.read_dags_from_db = read_dags_from_db
        # Only used by read_dags_from_db=True
        self.dags_last_fetched: dict[str, datetime] = {}
        # Only used by SchedulerJob to compare the dag_hash to identify change in DAGs
        self.dags_hash: dict[str, str] = {}

        self.dagbag_import_error_tracebacks = conf.getboolean("core", "dagbag_import_error_tracebacks")
        self.dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
        if collect_dags:
            self.collect_dags(
                dag_folder=dag_folder,
                include_examples=include_examples,
                safe_mode=safe_mode,
            )
        # Should the extra operator link be loaded via plugins?
        # This flag is set to False in Scheduler so that Extra Operator links are not loaded
        self.load_op_links = load_op_links
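
    # Hedged usage sketch: a DagBag is usually created implicitly by Airflow
    # components, but it can be built directly for inspection or testing. The
    # folder path and dag_id below are illustrative assumptions, not defaults.
    #
    #   from airflow.models.dagbag import DagBag
    #
    #   dagbag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
    #   print(dagbag.size())            # number of DAGs parsed
    #   print(dagbag.import_errors)     # {fileloc: error message} for broken files
    #   dag = dagbag.get_dag("my_dag_id")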

    def size(self) -> int:
        """:return: the number of DAGs contained in this dagbag"""
        return len(self.dags)

    @property
    def store_serialized_dags(self) -> bool:
        """Whether to read dags from DB."""
        warnings.warn(
            "The store_serialized_dags property has been deprecated. Use read_dags_from_db instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return self.read_dags_from_db

    @property
    def dag_ids(self) -> list[str]:
        """
        Get DAG ids.

        :return: a list of DAG IDs in this bag
        """
        return list(self.dags)

    @provide_session
    def get_dag(self, dag_id, session: Session = None):
        """
        Get the DAG out of the dictionary, and refresh it if expired.

        :param dag_id: DAG ID
        """
        # Avoid circular import
        from airflow.models.dag import DagModel

        if self.read_dags_from_db:
            # Import here so that serialized dag is only imported when serialization is enabled
            from airflow.models.serialized_dag import SerializedDagModel

            if dag_id not in self.dags:
                # Load from DB if not (yet) in the bag
                self._add_dag_from_db(dag_id=dag_id, session=session)
                return self.dags.get(dag_id)

            # If DAG is in the DagBag, check the following
            # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
            # 2. check the last_updated and hash columns in SerializedDag table to see if
            #    Serialized DAG is updated
            # 3. if (2) is yes, fetch the Serialized DAG.
            # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
            #    if it exists and return None.
            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
            if (
                dag_id in self.dags_last_fetched
                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
            ):
                sd_latest_version_and_updated_datetime = (
                    SerializedDagModel.get_latest_version_hash_and_updated_datetime(
                        dag_id=dag_id, session=session
                    )
                )
                if not sd_latest_version_and_updated_datetime:
                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
                    del self.dags[dag_id]
                    del self.dags_last_fetched[dag_id]
                    del self.dags_hash[dag_id]
                    return None

                sd_latest_version, sd_last_updated_datetime = sd_latest_version_and_updated_datetime

                if (
                    sd_last_updated_datetime > self.dags_last_fetched[dag_id]
                    or sd_latest_version != self.dags_hash[dag_id]
                ):
                    self._add_dag_from_db(dag_id=dag_id, session=session)

            return self.dags.get(dag_id)

        # If asking for a known subdag, we want to refresh the parent
        dag = None
        root_dag_id = dag_id
        if dag_id in self.dags:
            dag = self.dags[dag_id]
            if dag.parent_dag:
                root_dag_id = dag.parent_dag.dag_id

        # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
        orm_dag = DagModel.get_current(root_dag_id, session=session)
        if not orm_dag:
            return self.dags.get(dag_id)

        # If the dag corresponding to root_dag_id is absent or expired
        is_missing = root_dag_id not in self.dags
        is_expired = orm_dag.last_expired and dag and dag.last_loaded < orm_dag.last_expired
        if is_expired:
            # Remove associated dags so we can re-add them.
            self.dags = {
                key: dag
                for key, dag in self.dags.items()
                if root_dag_id != key and not (dag.parent_dag and root_dag_id == dag.parent_dag.dag_id)
            }
        if is_missing or is_expired:
            # Reprocess source file.
            found_dags = self.process_file(
                filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
            )

            # If the source file no longer exports `dag_id`, delete it from self.dags
            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
                return self.dags[dag_id]
            elif dag_id in self.dags:
                del self.dags[dag_id]
        return self.dags.get(dag_id)
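
    # Hedged usage sketch: reading serialized DAGs from the metadata database
    # instead of parsing files. The dag_id below is an illustrative assumption.
    #
    #   dagbag = DagBag(read_dags_from_db=True)
    #   dag = dagbag.get_dag("my_dag_id")  # fetched (and refreshed) from the serialized_dag table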

    def _add_dag_from_db(self, dag_id: str, session: Session):
        """Add DAG to DagBag from DB."""
        from airflow.models.serialized_dag import SerializedDagModel

        row = SerializedDagModel.get(dag_id, session)
        if not row:
            return None

        row.load_op_links = self.load_op_links
        dag = row.dag
        for subdag in dag.subdags:
            self.dags[subdag.dag_id] = subdag
        self.dags[dag.dag_id] = dag
        self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
        self.dags_hash[dag.dag_id] = row.dag_hash

    def process_file(self, filepath, only_if_updated=True, safe_mode=True):
        """Given a path to a python module or zip file, import the module and look for dag objects within."""
        from airflow.models.dag import DagContext

        # if the source file no longer exists in the DB or in the filesystem,
        # return an empty list
        # todo: raise exception?
        if filepath is None or not os.path.isfile(filepath):
            return []

        try:
            # This failed before in what may have been a git sync
            # race condition
            file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
            if (
                only_if_updated
                and filepath in self.file_last_changed
                and file_last_changed_on_disk == self.file_last_changed[filepath]
            ):
                return []
        except Exception as e:
            self.log.exception(e)
            return []

        # Ensure we don't pick up anything else we didn't mean to
        DagContext.autoregistered_dags.clear()

        self.captured_warnings.pop(filepath, None)
        with capture_with_reraise() as captured_warnings:
            if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
                mods = self._load_modules_from_file(filepath, safe_mode)
            else:
                mods = self._load_modules_from_zip(filepath, safe_mode)

        if captured_warnings:
            formatted_warnings = []
            for msg in captured_warnings:
                category = msg.category.__name__
                if (module := msg.category.__module__) != "builtins":
                    category = f"{module}.{category}"
                formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}")
            self.captured_warnings[filepath] = tuple(formatted_warnings)

        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)

        self.file_last_changed[filepath] = file_last_changed_on_disk
        return found_dags
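
    # Hedged usage sketch: parsing a single DAG file without rescanning the whole
    # folder. The path below is an illustrative assumption.
    #
    #   dagbag = DagBag(collect_dags=False)
    #   dags = dagbag.process_file("/opt/airflow/dags/my_dag.py", only_if_updated=False)
    #   # `dags` is the list of DAG objects found; import problems land in dagbag.import_errors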

    def _load_modules_from_file(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        if not might_contain_dag(filepath, safe_mode):
            # Don't want to spam user with skip messages
            if not self.has_logged:
                self.has_logged = True
                self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
            return []

        self.log.debug("Importing %s", filepath)
        mod_name = get_unique_dag_module_name(filepath)

        if mod_name in sys.modules:
            del sys.modules[mod_name]

        DagContext.current_autoregister_module_name = mod_name

        def parse(mod_name, filepath):
            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                new_module = importlib.util.module_from_spec(spec)
                sys.modules[spec.name] = new_module
                loader.exec_module(new_module)
                return [new_module]
            except Exception as e:
                DagContext.autoregistered_dags.clear()
                self.log.exception("Failed to import: %s", filepath)
                if self.dagbag_import_error_tracebacks:
                    self.import_errors[filepath] = traceback.format_exc(
                        limit=-self.dagbag_import_error_traceback_depth
                    )
                else:
                    self.import_errors[filepath] = str(e)
                return []

        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)

        if not isinstance(dagbag_import_timeout, (int, float)):
            raise TypeError(
                f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
            )

        if dagbag_import_timeout <= 0:  # no parsing timeout
            return parse(mod_name, filepath)

        timeout_msg = (
            f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
            "Please take a look at these docs to improve your DAG import time:\n"
            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
        )
        with timeout(dagbag_import_timeout, error_message=timeout_msg):
            return parse(mod_name, filepath)

    def _load_modules_from_zip(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        mods = []
        with zipfile.ZipFile(filepath) as current_zip_file:
            for zip_info in current_zip_file.infolist():
                zip_path = Path(zip_info.filename)
                if zip_path.suffix not in [".py", ".pyc"] or len(zip_path.parts) > 1:
                    continue

                if zip_path.stem == "__init__":
                    self.log.warning("Found %s at root of %s", zip_path.name, filepath)

                self.log.debug("Reading %s from %s", zip_info.filename, filepath)

                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
                    # todo: create ignore list
                    # Don't want to spam user with skip messages
                    if not self.has_logged:
                        self.has_logged = True
                        self.log.info(
                            "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
                        )
                    continue

                mod_name = zip_path.stem
                if mod_name in sys.modules:
                    del sys.modules[mod_name]

                DagContext.current_autoregister_module_name = mod_name
                try:
                    sys.path.insert(0, filepath)
                    current_module = importlib.import_module(mod_name)
                    mods.append(current_module)
                except Exception as e:
                    DagContext.autoregistered_dags.clear()
                    fileloc = os.path.join(filepath, zip_info.filename)
                    self.log.exception("Failed to import: %s", fileloc)
                    if self.dagbag_import_error_tracebacks:
                        self.import_errors[fileloc] = traceback.format_exc(
                            limit=-self.dagbag_import_error_traceback_depth
                        )
                    else:
                        self.import_errors[fileloc] = str(e)
                finally:
                    if sys.path[0] == filepath:
                        del sys.path[0]
        return mods
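
    # Hedged illustration of the packaged-DAG layout this method handles: only
    # .py/.pyc entries at the zip root are imported; nested entries are skipped
    # because len(zip_path.parts) > 1. The file names below are assumptions.
    #
    #   my_dags.zip
    #   ├── my_dag.py          <- imported (root-level module)
    #   └── helpers/util.py    <- skipped (not at the zip root)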

    def _process_modules(self, filepath, mods, file_last_changed_on_disk):
        from airflow.models.dag import DAG, DagContext  # Avoid circular import

        top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}

        top_level_dags.update(DagContext.autoregistered_dags)

        DagContext.current_autoregister_module_name = None
        DagContext.autoregistered_dags.clear()

        found_dags = []

        for dag, mod in top_level_dags:
            dag.fileloc = mod.__file__
            try:
                dag.validate()
                self.bag_dag(dag=dag, root_dag=dag)
            except AirflowClusterPolicySkipDag:
                pass
            except Exception as e:
                self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
                self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
            else:
                found_dags.append(dag)
                found_dags += dag.subdags
        return found_dags

    def bag_dag(self, dag, root_dag):
        """
        Add the DAG into the bag, recursing into sub-DAGs.

        :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
        :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exist in the bag.
        """
        self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)

    def _bag_dag(self, *, dag, root_dag, recursive):
        """Actual implementation of bagging a dag.

        The only purpose of this is to avoid exposing ``recursive`` in ``bag_dag()``,
        since it is intended to be used only by the ``_bag_dag()`` implementation.
        """
        check_cycle(dag)  # throws if a task cycle is found

        dag.resolve_template_files()
        dag.last_loaded = timezone.utcnow()

        try:
            # Check policies
            settings.dag_policy(dag)

            for task in dag.tasks:
                settings.task_policy(task)
        except (AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag):
            raise
        except Exception as e:
            self.log.exception(e)
            raise AirflowClusterPolicyError(e)

        subdags = dag.subdags

        try:
            # DAG.subdags automatically performs DFS search, so we don't recurse
            # into further _bag_dag() calls.
            if recursive:
                for subdag in subdags:
                    subdag.fileloc = dag.fileloc
                    subdag.parent_dag = dag
                    self._bag_dag(dag=subdag, root_dag=root_dag, recursive=False)

            prev_dag = self.dags.get(dag.dag_id)
            if prev_dag and prev_dag.fileloc != dag.fileloc:
                raise AirflowDagDuplicatedIdException(
                    dag_id=dag.dag_id,
                    incoming=dag.fileloc,
                    existing=self.dags[dag.dag_id].fileloc,
                )
            self.dags[dag.dag_id] = dag
            self.log.debug("Loaded DAG %s", dag)
        except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
            # There was an error in bagging the dag. Remove it from the list of dags
            self.log.exception("Exception bagging dag: %s", dag.dag_id)
            # Only necessary at the root level since DAG.subdags automatically
            # performs DFS to search through all subdags
            if recursive:
                for subdag in subdags:
                    if subdag.dag_id in self.dags:
                        del self.dags[subdag.dag_id]
            raise

    def collect_dags(
        self,
        dag_folder: str | Path | None = None,
        only_if_updated: bool = True,
        include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
        safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
    ):
        """
        Look for python modules in a given path, import them, and add them to the dagbag collection.

        Note that if a ``.airflowignore`` file is found while processing
        the directory, it will behave much like a ``.gitignore``,
        ignoring files that match any of the patterns specified
        in the file.

        **Note**: The patterns in ``.airflowignore`` are interpreted as either
        un-anchored regexes or gitignore-like glob expressions, depending on
        the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
        """
        if self.read_dags_from_db:
            return

        self.log.info("Filling up the DagBag from %s", dag_folder)
        dag_folder = dag_folder or self.dag_folder
        # Used to store stats around DagBag processing
        stats = []

        # Ensure dag_folder is a str -- it may have been a pathlib.Path
        dag_folder = correct_maybe_zipped(str(dag_folder))
        for filepath in list_py_file_paths(
            dag_folder,
            safe_mode=safe_mode,
            include_examples=include_examples,
        ):
            try:
                file_parse_start_dttm = timezone.utcnow()
                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)

                file_parse_end_dttm = timezone.utcnow()
                stats.append(
                    FileLoadStat(
                        file=filepath.replace(settings.DAGS_FOLDER, ""),
                        duration=file_parse_end_dttm - file_parse_start_dttm,
                        dag_num=len(found_dags),
                        task_num=sum(len(dag.tasks) for dag in found_dags),
                        dags=str([dag.dag_id for dag in found_dags]),
                        warning_num=len(self.captured_warnings.get(filepath, [])),
                    )
                )
            except Exception as e:
                self.log.exception(e)

        self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)
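
    # Hedged example of an ``.airflowignore`` placed in the DAGs folder, matching
    # the docstring above. With the regex syntax each line is an un-anchored regex;
    # with the glob syntax the same intent is written as gitignore-style patterns.
    # The directory names are illustrative assumptions.
    #
    #   # regex syntax
    #   project_a
    #   tenant_[\d]
    #
    #   # glob syntax
    #   project_a/*
    #   tenant_[0-9]*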

    def collect_dags_from_db(self):
        """Collect DAGs from database."""
        from airflow.models.serialized_dag import SerializedDagModel

        with Stats.timer("collect_db_dags"):
            self.log.info("Filling up the DagBag from database")

            # The dagbag contains all rows in the serialized_dag table. Deleted DAGs are deleted
            # from the table by the scheduler job.
            self.dags = SerializedDagModel.read_all_dags()

            # Adds subdags.
            # DAG post-processing steps such as self.bag_dag and croniter are not needed as
            # they are done by the scheduler before serialization.
            subdags = {}
            for dag in self.dags.values():
                for subdag in dag.subdags:
                    subdags[subdag.dag_id] = subdag
            self.dags.update(subdags)

    def dagbag_report(self):
        """Return a report around DagBag loading stats."""
        stats = self.dagbag_stats
        dag_folder = self.dag_folder
        duration = sum((o.duration for o in stats), timedelta()).total_seconds()
        dag_num = sum(o.dag_num for o in stats)
        task_num = sum(o.task_num for o in stats)
        table = tabulate(stats, headers="keys")

        report = textwrap.dedent(
            f"""\n
        -------------------------------------------------------------------
        DagBag loading stats for {dag_folder}
        -------------------------------------------------------------------
        Number of DAGs: {dag_num}
        Total task number: {task_num}
        DagBag parsing time: {duration}\n{table}
        """
        )
        return report
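
    # Hedged usage sketch: the report is a plain string, so callers typically print it.
    #
    #   dagbag = DagBag(include_examples=True)
    #   print(dagbag.dagbag_report())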

    @classmethod
    @provide_session
    def _sync_to_db(
        cls,
        dags: dict[str, DAG],
        processor_subdir: str | None = None,
        session: Session = NEW_SESSION,
    ):
        """Save attributes about a list of DAGs to the DB."""
        # To avoid circular import - airflow.models.dagbag -> airflow.models.dag -> airflow.models.dagbag
        from airflow.models.dag import DAG
        from airflow.models.serialized_dag import SerializedDagModel

        log = cls.logger()

        def _serialize_dag_capturing_errors(dag, session, processor_subdir):
            """
            Try to serialize the dag to the DB, but make a note of any errors.

            We can't place them directly in import_errors, as this may be retried and work the next time.
            """
            if dag.is_subdag:
                return []
            try:
                # We can't use bulk_write_to_db as we want to capture each error individually
                dag_was_updated = SerializedDagModel.write_dag(
                    dag,
                    min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
                    session=session,
                    processor_subdir=processor_subdir,
                )
                if dag_was_updated:
                    DagBag._sync_perm_for_dag(dag, session=session)
                return []
            except OperationalError:
                raise
            except Exception:
                log.exception("Failed to write serialized DAG: %s", dag.fileloc)
                dagbag_import_error_traceback_depth = conf.getint(
                    "core", "dagbag_import_error_traceback_depth"
                )
                return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]

        # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
        # of any Operational Errors
        # In case of failures, provide_session handles rollback
        import_errors = {}
        for attempt in run_with_db_retries(logger=log):
            with attempt:
                serialize_errors = []
                log.debug(
                    "Running dagbag.sync_to_db with retries. Try %d of %d",
                    attempt.retry_state.attempt_number,
                    MAX_DB_RETRIES,
                )
                log.debug("Calling the DAG.bulk_sync_to_db method")
                try:
                    # Write Serialized DAGs to DB, capturing errors
                    for dag in dags.values():
                        serialize_errors.extend(
                            _serialize_dag_capturing_errors(dag, session, processor_subdir)
                        )

                    DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
                except OperationalError:
                    session.rollback()
                    raise
                # Only now that we are "complete" do we update import_errors - we don't want to record
                # errors from previous failed attempts
                import_errors.update(dict(serialize_errors))

        return import_errors

    @provide_session
    def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW_SESSION):
        """Save attributes about the DAGs in this bag to the DB, recording any serialization errors."""
        import_errors = DagBag._sync_to_db(dags=self.dags, processor_subdir=processor_subdir, session=session)
        self.import_errors.update(import_errors)

    @classmethod
    @provide_session
    def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
        """Sync DAG specific permissions."""
        root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

        cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
        from airflow.www.security_appless import ApplessAirflowSecurityManager

        security_manager = ApplessAirflowSecurityManager(session=session)
        security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)


def generate_md5_hash(context):
    """Generate an md5 hash of the row's fileloc, used as the primary key of DagPriorityParsingRequest."""
    fileloc = context.get_current_parameters()["fileloc"]
    return hashlib.md5(fileloc.encode()).hexdigest()


class DagPriorityParsingRequest(Base):
    """Model to store the dag parsing requests that will be prioritized when parsing files."""

    __tablename__ = "dag_priority_parsing_request"

    # Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
    # on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
    # size consistent with other tables. This is a workaround to enforce the unique constraint.
    id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)

    # The location of the file containing the DAG object
    # Note: Do not depend on fileloc pointing to a file; in the case of a
    # packaged DAG, it will point to the subpath of the DAG within the
    # associated zip.
    fileloc = Column(String(2000), nullable=False)

    def __init__(self, fileloc: str) -> None:
        super().__init__()
        self.fileloc = fileloc

    def __repr__(self) -> str:
        return f"<DagPriorityParsingRequest: fileloc={self.fileloc}>"
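
# Hedged usage sketch: queueing a file for prioritized parsing by adding a row to
# the table above. The session handling and path are illustrative assumptions.
#
#   from airflow.utils.session import create_session
#
#   with create_session() as session:
#       session.add(DagPriorityParsingRequest(fileloc="/opt/airflow/dags/my_dag.py"))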