Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/datasets/manager.py: 46% of 46 statements (coverage.py v7.0.1, created at 2022-12-25 06:11 +0000)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import exc
from sqlalchemy.orm.session import Session

from airflow.configuration import conf
from airflow.datasets import Dataset
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
    # TaskInstance is needed only for type annotations, so the import stays
    # out of the runtime path.
    from airflow.models.taskinstance import TaskInstance


class DatasetManager(LoggingMixin):
    """
    A pluggable class that manages operations for datasets.

    The intent is to have one place to handle all Dataset-related operations, so different
    Airflow deployments can use plugins that broadcast dataset events to each other. An
    illustrative sketch of one such plugin follows this class definition.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def register_dataset_change(
        self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
    ) -> None:
        """
        Register dataset-related changes.

        For local datasets, look them up, record the dataset event, queue dagruns, and
        broadcast the dataset event.
        """
        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
        if not dataset_model:
            self.log.warning("DatasetModel %s not found", dataset)
            return
        session.add(
            DatasetEvent(
                dataset_id=dataset_model.id,
                source_task_id=task_instance.task_id,
                source_dag_id=task_instance.dag_id,
                source_run_id=task_instance.run_id,
                source_map_index=task_instance.map_index,
                extra=extra,
            )
        )
        session.flush()
        if dataset_model.consuming_dags:
            self._queue_dagruns(dataset_model, session)
        session.flush()

    def _queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        # Possible race condition: if multiple dags or multiple (usually
        # mapped) tasks update the same dataset, this can fail with a unique
        # constraint violation.
        #
        # If we support it, use ON CONFLICT to do nothing; otherwise fall
        # back to running this in a nested transaction. This is needed
        # so that the adding of these rows happens in the same transaction
        # where `ti.state` is changed. (A runnable demonstration of the
        # nested-transaction fallback appears at the end of this file.)
        if session.bind.dialect.name == "postgresql":
            return self._postgres_queue_dagruns(dataset, session)
        return self._slow_path_queue_dagruns(dataset, session)

    def _slow_path_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
        self.log.debug("consuming dag ids %s", consuming_dag_ids)

        # Don't error the whole transaction when a single RunQueue item conflicts.
        # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
        for dag_id in consuming_dag_ids:
            item = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset.id)
            try:
                with session.begin_nested():
                    session.merge(item)
            except exc.IntegrityError:
                self.log.debug("Skipping record %s", item, exc_info=True)

    def _postgres_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        from sqlalchemy.dialects.postgresql import insert

        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()
        session.execute(
            stmt,
            [{"target_dag_id": target_dag.dag_id} for target_dag in dataset.consuming_dags],
        )
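

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: one way a deployment
# might plug in a custom manager, as the class docstring above suggests.
# ``BroadcastingDatasetManager`` and ``publish_to_bus`` are hypothetical names
# invented here for illustration; only the ``DatasetManager`` API above is real.
def publish_to_bus(payload: dict) -> None:
    """Stand-in for a real message-bus client (e.g. Kafka); this one only prints."""
    print(f"dataset event: {payload}")


class BroadcastingDatasetManager(DatasetManager):
    """A hypothetical subclass that broadcasts each dataset event after recording it."""

    def register_dataset_change(
        self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
    ) -> None:
        # Reuse the stock bookkeeping: the DatasetEvent row and queued dag runs.
        super().register_dataset_change(
            task_instance=task_instance, dataset=dataset, extra=extra, session=session, **kwargs
        )
        # The plugin-specific step: tell other deployments about the change.
        publish_to_bus({"uri": dataset.uri, "extra": extra})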


def resolve_dataset_manager() -> DatasetManager:
    """Retrieve the dataset manager."""
    _dataset_manager_class = conf.getimport(
        section="core",
        key="dataset_manager_class",
        fallback="airflow.datasets.manager.DatasetManager",
    )
    _dataset_manager_kwargs = conf.getjson(
        section="core",
        key="dataset_manager_kwargs",
        fallback={},
    )
    return _dataset_manager_class(**_dataset_manager_kwargs)
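

# Illustrative note: with an ``airflow.cfg`` along these lines (the section and
# keys match the two lookups above; the class path is hypothetical), the
# resolver would import and instantiate the subclass sketched earlier instead
# of the default, and the module-level ``dataset_manager`` below would be that
# instance:
#
#   [core]
#   dataset_manager_class = my_plugin.manager.BroadcastingDatasetManager
#   dataset_manager_kwargs = {}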
dataset_manager = resolve_dataset_manager()
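

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: a self-contained
# demonstration of the nested-transaction (SAVEPOINT) fallback that
# _slow_path_queue_dagruns relies on, referenced from the comment in
# _queue_dagruns. The table and dag ids are invented; SQLite is used only so
# the demo runs anywhere, and the two event hooks are the workaround the
# SQLAlchemy docs describe for making SAVEPOINT work with the stdlib sqlite3
# driver.
def _demo_savepoint_dedup() -> None:
    from sqlalchemy import Column, String, create_engine, event
    from sqlalchemy.orm import Session as SASession, declarative_base

    Base = declarative_base()

    class DemoQueue(Base):
        __tablename__ = "demo_queue"
        target_dag_id = Column(String, primary_key=True)

    engine = create_engine("sqlite://")

    @event.listens_for(engine, "connect")
    def _do_connect(dbapi_connection, connection_record):
        # Disable pysqlite's implicit transaction handling so SAVEPOINT works.
        dbapi_connection.isolation_level = None

    @event.listens_for(engine, "begin")
    def _do_begin(conn):
        conn.exec_driver_sql("BEGIN")

    Base.metadata.create_all(engine)

    # Simulate the competing writer: the row for dag "a" is already committed.
    with SASession(engine) as writer:
        writer.add(DemoQueue(target_dag_id="a"))
        writer.commit()

    with SASession(engine) as demo_session:
        for dag_id in ("a", "b"):  # "a" collides with the existing row
            try:
                # begin_nested() opens a SAVEPOINT, so the duplicate-key
                # failure rolls back only this insert, not the transaction.
                with demo_session.begin_nested():
                    demo_session.add(DemoQueue(target_dag_id=dag_id))
            except exc.IntegrityError:
                pass  # skip the duplicate, as the manager does
        demo_session.commit()
    # After the run, the table holds exactly one row per dag id.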