#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import exc
from sqlalchemy.orm.session import Session

from airflow.configuration import conf
from airflow.datasets import Dataset
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
    from airflow.models.taskinstance import TaskInstance


class DatasetManager(LoggingMixin):
    """
    A pluggable class that manages operations for datasets.

    The intent is to have one place to handle all Dataset-related operations, so different
    Airflow deployments can use plugins that broadcast dataset events to each other.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def register_dataset_change(
        self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
    ) -> None:
        """
        Register dataset-related changes.

        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
        the dataset event.
        """
        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
        if not dataset_model:
            self.log.warning("DatasetModel %s not found", dataset)
            return
        session.add(
            DatasetEvent(
                dataset_id=dataset_model.id,
                source_task_id=task_instance.task_id,
                source_dag_id=task_instance.dag_id,
                source_run_id=task_instance.run_id,
                source_map_index=task_instance.map_index,
                extra=extra,
            )
        )
        session.flush()
        Stats.incr("dataset.updates")
        if dataset_model.consuming_dags:
            self._queue_dagruns(dataset_model, session)
        session.flush()
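
    # Illustrative usage sketch (not part of the original module): Airflow's
    # task-completion path invokes this for each outlet dataset, roughly as
    # below. The names ``ti`` and ``session`` are hypothetical stand-ins for
    # the current TaskInstance and an active ORM session.
    #
    #     dataset_manager.register_dataset_change(
    #         task_instance=ti,
    #         dataset=Dataset("s3://bucket/key"),
    #         extra={"source": "example"},
    #         session=session,
    #     )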

    def _queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        # Possible race condition: if multiple dags or multiple (usually
        # mapped) tasks update the same dataset, this can fail with a unique
        # constraint violation.
        #
        # If the dialect supports it, use ON CONFLICT to do nothing; otherwise
        # fall back to running this in a nested transaction. This is needed
        # so that the adding of these rows happens in the same transaction
        # where `ti.state` is changed.

        if session.bind.dialect.name == "postgresql":
            return self._postgres_queue_dagruns(dataset, session)
        return self._slow_path_queue_dagruns(dataset, session)

    def _slow_path_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
        self.log.debug("consuming dag ids %s", consuming_dag_ids)

        # Don't error whole transaction when a single RunQueue item conflicts.
        # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
        for dag_id in consuming_dag_ids:
            item = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset.id)
            try:
                with session.begin_nested():
                    session.merge(item)
            except exc.IntegrityError:
                self.log.debug("Skipping record %s", item, exc_info=True)
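
    # Illustrative sketch (not part of the original module): each
    # ``begin_nested()`` above emits a SAVEPOINT, so a duplicate-key failure
    # rolls back only that one merge rather than the whole transaction. Per
    # item, the database sees roughly (savepoint name is hypothetical):
    #
    #     SAVEPOINT sa_savepoint_1;
    #     INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id)
    #         VALUES (?, ?);
    #     -- on conflict: ROLLBACK TO SAVEPOINT sa_savepoint_1;
    #     -- otherwise:   RELEASE SAVEPOINT sa_savepoint_1;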

    def _postgres_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        from sqlalchemy.dialects.postgresql import insert

        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()
        session.execute(
            stmt,
            [{"target_dag_id": target_dag.dag_id} for target_dag in dataset.consuming_dags],
        )
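
    # Illustrative sketch (not part of the original module): the executemany
    # above compiles to roughly the following SQL, run once per consuming dag
    # (exact parameter style depends on the driver):
    #
    #     INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id)
    #     VALUES (%(dataset_id)s, %(target_dag_id)s)
    #     ON CONFLICT DO NOTHING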


def resolve_dataset_manager() -> DatasetManager:
    """Retrieve the dataset manager."""
    _dataset_manager_class = conf.getimport(
        section="core",
        key="dataset_manager_class",
        fallback="airflow.datasets.manager.DatasetManager",
    )
    _dataset_manager_kwargs = conf.getjson(
        section="core",
        key="dataset_manager_kwargs",
        fallback={},
    )
    return _dataset_manager_class(**_dataset_manager_kwargs)


dataset_manager = resolve_dataset_manager()
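
# Illustrative sketch (not part of the original module): a deployment can
# point ``[core] dataset_manager_class`` at a subclass to broadcast dataset
# events elsewhere. The class name, module path, and ``broker_url`` kwarg
# below are hypothetical.
#
#     class BroadcastDatasetManager(DatasetManager):
#         def __init__(self, broker_url: str, **kwargs):
#             super().__init__(**kwargs)
#             self.broker_url = broker_url
#
#         def register_dataset_change(self, **kwargs) -> None:
#             super().register_dataset_change(**kwargs)
#             # publish the event to ``self.broker_url`` here
#
# enabled via airflow.cfg:
#
#     [core]
#     dataset_manager_class = my_plugin.manager.BroadcastDatasetManager
#     dataset_manager_kwargs = {"broker_url": "amqp://example"}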