Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

82 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING 

21 

22from sqlalchemy import func, or_, select 

23 

24from airflow.models.dagrun import DagRun 

25from airflow.models.taskinstance import PAST_DEPENDS_MET, TaskInstance as TI 

26from airflow.ti_deps.deps.base_ti_dep import BaseTIDep 

27from airflow.utils.db import exists_query 

28from airflow.utils.session import provide_session 

29from airflow.utils.state import TaskInstanceState 

30 

31if TYPE_CHECKING: 

32 from sqlalchemy.orm import Session 

33 

34 from airflow.models.operator import Operator 

35 

# Task instance states treated as "successful" by the depends_on_past checks below.
# Every other state -- including unfinished ones such as RUNNING, and a NULL state --
# counts as unsuccessful.
_SUCCESSFUL_STATES = (TaskInstanceState.SKIPPED, TaskInstanceState.SUCCESS)


class PrevDagrunDep(BaseTIDep):
    """
    Is the past dagrun in a state that allows this task instance to run.

    For example, did this task instance's task in the previous dagrun complete
    if we are depending on past?

    The dependency passes (without a failing status) when any of these holds:

    * the dep context says ``depends_on_past`` may be ignored;
    * the task does not set ``depends_on_past``;
    * the task instance has no DAG run, or there is no previous DAG run;
    * (catchup only) the previous DAG run predates the task's ``start_date``;
    * the previous run has no instance of this task, ``ignore_first_depends_on_past``
      is set, and the task has no instance with an earlier execution date;
    * every instance of the task in the previous run is SUCCESS or SKIPPED and, when
      ``wait_for_downstream`` is set, so is every instance of the tasks listed in
      ``downstream_task_ids`` in that run.
    """

    NAME = "Previous Dagrun State"
    IGNORABLE = True
    IS_TASK_DEP = True

    @staticmethod
    def _push_past_deps_met_xcom_if_needed(ti: TI, dep_context):
        """Record PAST_DEPENDS_MET=True as an XCom when the context asks for it.

        The XCom is only pushed when ``dep_context.wait_for_past_depends_before_skipping``
        is truthy; otherwise this is a no-op.
        """
        if dep_context.wait_for_past_depends_before_skipping:
            ti.xcom_push(key=PAST_DEPENDS_MET, value=True)

    @staticmethod
    def _has_tis(dagrun: DagRun, task_id: str, *, session: Session) -> bool:
        """Check if a task has presence in the specified DAG run.

        This function exists for easy mocking in tests.

        :param dagrun: DAG run to look in (matched on ``dag_id`` and ``run_id``).
        :param task_id: task to look for.
        :param session: database session used for the query.
        :return: True if at least one task instance row matches.
        """
        return exists_query(
            TI.dag_id == dagrun.dag_id,
            TI.task_id == task_id,
            TI.run_id == dagrun.run_id,
            session=session,
        )

    @staticmethod
    def _has_any_prior_tis(ti: TI, *, session: Session) -> bool:
        """Check if a task has ever been run before.

        "Before" means a task instance of the same DAG and task with a strictly
        earlier ``execution_date`` than ``ti``.

        This function exists for easy mocking in tests.
        """
        return exists_query(
            TI.dag_id == ti.dag_id,
            TI.task_id == ti.task_id,
            TI.execution_date < ti.execution_date,
            session=session,
        )

    @staticmethod
    def _count_unsuccessful_tis(dagrun: DagRun, task_id: str, *, session: Session) -> int:
        """Get a count of unsuccessful task instances in a given run.

        Due to historical design considerations, "unsuccessful" here means the
        task instance is not in either SUCCESS or SKIPPED state. This means that
        unfinished states such as RUNNING are considered unsuccessful.

        This function exists for easy mocking in tests.
        """
        return session.scalar(
            select(func.count())
            # Name the FROM table explicitly rather than relying on it being
            # inferred from the WHERE clause.
            .select_from(TI)
            .where(
                TI.dag_id == dagrun.dag_id,
                TI.task_id == task_id,
                TI.run_id == dagrun.run_id,
                # SQL "NOT IN" never matches NULL, so a NULL state must be
                # included explicitly to be counted as unsuccessful.
                or_(TI.state.is_(None), TI.state.not_in(_SUCCESSFUL_STATES)),
            )
        )

    @staticmethod
    def _has_unsuccessful_dependants(dagrun: DagRun, task: Operator, *, session: Session) -> bool:
        """Check if any of the task's dependants are unsuccessful in a given run.

        Due to historical design considerations, "unsuccessful" here means the
        task instance is not in either SUCCESS or SKIPPED state. This means that
        unfinished states such as RUNNING are considered unsuccessful.

        This function exists for easy mocking in tests.
        """
        # No downstream tasks: nothing can be unsuccessful, and this also avoids
        # issuing a query with an empty IN clause.
        if not task.downstream_task_ids:
            return False
        return exists_query(
            TI.dag_id == dagrun.dag_id,
            TI.task_id.in_(task.downstream_task_ids),
            TI.run_id == dagrun.run_id,
            or_(TI.state.is_(None), TI.state.not_in(_SUCCESSFUL_STATES)),
            session=session,
        )

    @provide_session
    def _get_dep_statuses(self, ti: TI, session: Session, dep_context):
        """Yield the status of the depends_on_past dependency for ``ti``.

        Every early exit yields exactly one passing or failing status. If all checks
        pass, no status is yielded at all (presumably treated as "met" by the
        BaseTIDep machinery -- confirm). Every passing path pushes the
        PAST_DEPENDS_MET XCom when the context requests it; failing paths never do.
        """
        if TYPE_CHECKING:
            assert ti.task
        if dep_context.ignore_depends_on_past:
            self._push_past_deps_met_xcom_if_needed(ti, dep_context)
            reason = "The context specified that the state of past DAGs could be ignored."
            yield self._passing_status(reason=reason)
            return

        if not ti.task.depends_on_past:
            self._push_past_deps_met_xcom_if_needed(ti, dep_context)
            yield self._passing_status(reason="The task did not have depends_on_past set.")
            return

        dr = ti.get_dagrun(session=session)
        if not dr:
            self._push_past_deps_met_xcom_if_needed(ti, dep_context)
            yield self._passing_status(reason="This task instance does not belong to a DAG.")
            return

        # Don't depend on the previous task instance if we are the first task.
        # With catchup the predecessor is the previous *scheduled* run; otherwise
        # it is simply the previous run.
        catchup = ti.task.dag and ti.task.dag.catchup
        if catchup:
            last_dagrun = DagRun.get_previous_scheduled_dagrun(dr.id, session)
        else:
            last_dagrun = DagRun.get_previous_dagrun(dr, session=session)

        # First ever run for this DAG.
        if not last_dagrun:
            self._push_past_deps_met_xcom_if_needed(ti, dep_context)
            yield self._passing_status(reason="This task instance was the first task instance for its task.")
            return

        # There was a DAG run, but the task wasn't active back then. A task without a
        # start_date cannot be compared (it would raise TypeError), so in that case
        # fall through to the regular checks against the previous run.
        task_start_date = ti.task.start_date
        if catchup and task_start_date is not None and last_dagrun.execution_date < task_start_date:
            self._push_past_deps_met_xcom_if_needed(ti, dep_context)
            yield self._passing_status(reason="This task instance was the first task instance for its task.")
            return

        if not self._has_tis(last_dagrun, ti.task_id, session=session):
            # The previous run has no instance of this task. That is acceptable only
            # when the task opts in via ignore_first_depends_on_past AND it has never
            # run before at all.
            if ti.task.ignore_first_depends_on_past:
                if not self._has_any_prior_tis(ti, session=session):
                    self._push_past_deps_met_xcom_if_needed(ti, dep_context)
                    yield self._passing_status(
                        reason="ignore_first_depends_on_past is true for this task "
                        "and it is the first task instance for its task."
                    )
                    return

            yield self._failing_status(
                reason="depends_on_past is true for this task's DAG, but the previous "
                "task instance has not run yet."
            )
            return

        unsuccessful_tis_count = self._count_unsuccessful_tis(last_dagrun, ti.task_id, session=session)
        if unsuccessful_tis_count > 0:
            reason = (
                f"depends_on_past is true for this task, but {unsuccessful_tis_count} "
                f"previous task instance(s) are not in a successful state."
            )
            yield self._failing_status(reason=reason)
            return

        if ti.task.wait_for_downstream and self._has_unsuccessful_dependants(
            last_dagrun, ti.task, session=session
        ):
            yield self._failing_status(
                reason=(
                    "The tasks downstream of the previous task instance(s) "
                    "haven't completed, and wait_for_downstream is True."
                )
            )
            return

        # All checks passed; no explicit status is yielded on this path.
        self._push_past_deps_met_xcom_if_needed(ti, dep_context)