Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/datasets/manager.py: 46%

48 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import exc
from sqlalchemy.orm.session import Session

from airflow.configuration import conf
from airflow.datasets import Dataset
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
    from airflow.models.taskinstance import TaskInstance



class DatasetManager(LoggingMixin):
    """
    A pluggable class that manages operations for datasets.

    The intent is to have one place to handle all Dataset-related operations, so different
    Airflow deployments can use plugins that broadcast dataset events to each other
    (a minimal subclass sketch follows the class definition below).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)


    def register_dataset_change(
        self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
    ) -> None:
        """
        Register dataset-related changes.

        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
        the dataset event.
        """
        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
        if not dataset_model:
            self.log.warning("DatasetModel %s not found", dataset)
            return
        session.add(
            DatasetEvent(
                dataset_id=dataset_model.id,
                source_task_id=task_instance.task_id,
                source_dag_id=task_instance.dag_id,
                source_run_id=task_instance.run_id,
                source_map_index=task_instance.map_index,
                extra=extra,
            )
        )
        session.flush()
        Stats.incr("dataset.updates")
        if dataset_model.consuming_dags:
            self._queue_dagruns(dataset_model, session)
        session.flush()


    def _queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        # Possible race condition: if multiple dags or multiple (usually
        # mapped) tasks update the same dataset, this can fail with a unique
        # constraint violation.
        #
        # If the database supports it, use ON CONFLICT to do nothing; otherwise
        # fall back to running this in a nested transaction. This is needed
        # so that adding these rows happens in the same transaction
        # where `ti.state` is changed.

        if session.bind.dialect.name == "postgresql":
            return self._postgres_queue_dagruns(dataset, session)
        return self._slow_path_queue_dagruns(dataset, session)


    def _slow_path_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
        self.log.debug("consuming dag ids %s", consuming_dag_ids)

        # Don't fail the whole transaction when a single RunQueue item conflicts.
        # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
        for dag_id in consuming_dag_ids:
            item = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset.id)
            try:
                with session.begin_nested():
                    session.merge(item)
            except exc.IntegrityError:
                self.log.debug("Skipping record %s", item, exc_info=True)


    def _postgres_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
        from sqlalchemy.dialects.postgresql import insert

        stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()
        session.execute(
            stmt,
            [{"target_dag_id": target_dag.dag_id} for target_dag in dataset.consuming_dags],
        )
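

# Illustrative sketch (not part of the original module): the class docstring
# above describes a pluggable design in which a deployment swaps in a subclass
# that broadcasts dataset events elsewhere. A minimal sketch, assuming a
# hypothetical message-bus client `my_message_bus`; only
# DatasetManager.register_dataset_change() is real Airflow API here.
#
# class BroadcastingDatasetManager(DatasetManager):
#     def register_dataset_change(self, *, task_instance, dataset, extra=None, session, **kwargs):
#         # Record the event locally exactly as the base class does ...
#         super().register_dataset_change(
#             task_instance=task_instance, dataset=dataset, extra=extra, session=session, **kwargs
#         )
#         # ... then broadcast it to other deployments (hypothetical client).
#         my_message_bus.publish("dataset-events", {"uri": dataset.uri, "extra": extra})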




def resolve_dataset_manager() -> DatasetManager:
    """Retrieve the dataset manager."""
    _dataset_manager_class = conf.getimport(
        section="core",
        key="dataset_manager_class",
        fallback="airflow.datasets.manager.DatasetManager",
    )
    _dataset_manager_kwargs = conf.getjson(
        section="core",
        key="dataset_manager_kwargs",
        fallback={},
    )
    return _dataset_manager_class(**_dataset_manager_kwargs)
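
# The two [core] options read above can be set in airflow.cfg to swap in a
# custom manager; dataset_manager_kwargs is parsed as JSON and passed to the
# class constructor. An illustrative snippet (the class path is hypothetical):
#
#   [core]
#   dataset_manager_class = my_plugin.managers.BroadcastingDatasetManager
#   dataset_manager_kwargs = {"topic": "dataset-events"}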



dataset_manager = resolve_dataset_manager()
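
For orientation, a sketch of a call site, assuming an open SQLAlchemy session and a finished TaskInstance `ti` supplied by Airflow's normal ORM plumbing; the helper name and dataset URI are made up for illustration, while the module-level `dataset_manager` singleton and the register_dataset_change() signature come straight from the code above:

from airflow.datasets import Dataset
from airflow.datasets.manager import dataset_manager

def record_outlet_event(ti, session):  # hypothetical helper
    # Records a DatasetEvent for the task's outlet and queues downstream dag runs.
    dataset_manager.register_dataset_change(
        task_instance=ti,
        dataset=Dataset("s3://example-bucket/example-table"),
        extra={"rows": 42},  # optional metadata stored on the DatasetEvent
        session=session,
    )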