#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
18from __future__ import annotations
19
20from typing import TYPE_CHECKING
21
22from sqlalchemy import func, or_, select
23
24from airflow.models.dagrun import DagRun
25from airflow.models.taskinstance import PAST_DEPENDS_MET, TaskInstance as TI
26from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
27from airflow.utils.db import exists_query
28from airflow.utils.session import provide_session
29from airflow.utils.state import TaskInstanceState
30
31if TYPE_CHECKING:
32 from sqlalchemy.orm import Session
33
34 from airflow.models.operator import Operator
35
# Task instance states that the depends_on_past checks in this module accept as
# "successful". Any other state, and a missing (NULL) state, counts as unsuccessful.
_SUCCESSFUL_STATES = (
    TaskInstanceState.SKIPPED,
    TaskInstanceState.SUCCESS,
)
37
38
class PrevDagrunDep(BaseTIDep):
    """
    Decide whether the previous DAG run permits this task instance to run.

    The check only applies to tasks with ``depends_on_past`` set: the same task
    in the preceding DAG run must have ended in SUCCESS or SKIPPED and, when
    ``wait_for_downstream`` is set, so must the tasks listed in its
    ``downstream_task_ids``.
    """

    NAME = "Previous Dagrun State"
    IGNORABLE = True
    IS_TASK_DEP = True

    @staticmethod
    def _push_past_deps_met_xcom_if_needed(ti: TI, dep_context):
        """Push a ``PAST_DEPENDS_MET`` XCom of ``True`` if the dep context requests it.

        Nothing is pushed unless ``dep_context.wait_for_past_depends_before_skipping``
        is truthy.
        """
        if not dep_context.wait_for_past_depends_before_skipping:
            return
        ti.xcom_push(key=PAST_DEPENDS_MET, value=True)

    @staticmethod
    def _has_tis(dagrun: DagRun, task_id: str, *, session: Session) -> bool:
        """Tell whether the given DAG run contains any task instance of ``task_id``.

        This function exists for easy mocking in tests.
        """
        filters = (
            TI.dag_id == dagrun.dag_id,
            TI.task_id == task_id,
            TI.run_id == dagrun.run_id,
        )
        return exists_query(*filters, session=session)

    @staticmethod
    def _has_any_prior_tis(ti: TI, *, session: Session) -> bool:
        """Tell whether the same task has a task instance with an earlier execution date.

        This function exists for easy mocking in tests.
        """
        filters = (
            TI.dag_id == ti.dag_id,
            TI.task_id == ti.task_id,
            TI.execution_date < ti.execution_date,
        )
        return exists_query(*filters, session=session)

    @staticmethod
    def _count_unsuccessful_tis(dagrun: DagRun, task_id: str, *, session: Session) -> int:
        """Count the task instances of ``task_id`` in a run that are not successful.

        For historical reasons "unsuccessful" means "neither SUCCESS nor
        SKIPPED", so a task instance with no state or in an unfinished state
        such as RUNNING is counted as well.

        This function exists for easy mocking in tests.
        """
        count_stmt = select(func.count()).where(
            TI.dag_id == dagrun.dag_id,
            TI.task_id == task_id,
            TI.run_id == dagrun.run_id,
            or_(TI.state.is_(None), TI.state.not_in(_SUCCESSFUL_STATES)),
        )
        return session.scalar(count_stmt)

    @staticmethod
    def _has_unsuccessful_dependants(dagrun: DagRun, task: Operator, *, session: Session) -> bool:
        """Tell whether any downstream task of ``task`` is unsuccessful in a run.

        For historical reasons "unsuccessful" means "neither SUCCESS nor
        SKIPPED", so a task instance with no state or in an unfinished state
        such as RUNNING qualifies. A task without downstream task ids never has
        unsuccessful dependants.

        This function exists for easy mocking in tests.
        """
        if task.downstream_task_ids:
            filters = (
                TI.dag_id == dagrun.dag_id,
                TI.task_id.in_(task.downstream_task_ids),
                TI.run_id == dagrun.run_id,
                or_(TI.state.is_(None), TI.state.not_in(_SUCCESSFUL_STATES)),
            )
            return exists_query(*filters, session=session)
        return False

    @provide_session
    def _get_dep_statuses(self, ti: TI, session: Session, dep_context):
        """
        Yield the status of the "previous DAG run" dependency for ``ti``.

        A passing status is yielded when the dependency is ignored by the
        context, when the task does not use ``depends_on_past``, when the task
        instance has no DAG run, or when this is effectively the first task
        instance of its task. A failing status is yielded when the previous run
        has no task instance for this task, when such task instances are not
        successful, or when ``wait_for_downstream`` is set and downstream tasks
        of the previous run are not successful. If every check is satisfied,
        nothing is yielded.

        Whenever the past dependency is considered met, the ``PAST_DEPENDS_MET``
        XCom is pushed first if the dep context asks for it.
        """
        if TYPE_CHECKING:
            assert ti.task

        def met(reason: str):
            # Push the XCom (when requested) before the passing status is built,
            # so the side effect always precedes the yield.
            self._push_past_deps_met_xcom_if_needed(ti, dep_context)
            return self._passing_status(reason=reason)

        if dep_context.ignore_depends_on_past:
            yield met("The context specified that the state of past DAGs could be ignored.")
            return

        task = ti.task
        if not task.depends_on_past:
            yield met("The task did not have depends_on_past set.")
            return

        current_run = ti.get_dagrun(session=session)
        if not current_run:
            yield met("This task instance does not belong to a DAG.")
            return

        # Don't depend on the previous task instance if we are the first task.
        catchup = task.dag and task.dag.catchup
        previous_run = (
            DagRun.get_previous_scheduled_dagrun(current_run.id, session)
            if catchup
            else DagRun.get_previous_dagrun(current_run, session=session)
        )

        # Either this is the first ever run for the DAG, or there was a DAG run
        # but the task wasn't active back then.
        if not previous_run or (catchup and previous_run.execution_date < task.start_date):
            yield met("This task instance was the first task instance for its task.")
            return

        if not self._has_tis(previous_run, ti.task_id, session=session):
            # The previous run has no task instance for this task; that is only
            # acceptable when the task opts in and has no earlier task instance.
            if task.ignore_first_depends_on_past and not self._has_any_prior_tis(ti, session=session):
                yield met(
                    "ignore_first_depends_on_past is true for this task "
                    "and it is the first task instance for its task."
                )
                return

            yield self._failing_status(
                reason="depends_on_past is true for this task's DAG, but the previous "
                "task instance has not run yet."
            )
            return

        unsuccessful_tis_count = self._count_unsuccessful_tis(previous_run, ti.task_id, session=session)
        if unsuccessful_tis_count > 0:
            yield self._failing_status(
                reason=(
                    f"depends_on_past is true for this task, but {unsuccessful_tis_count} "
                    f"previous task instance(s) are not in a successful state."
                )
            )
            return

        if task.wait_for_downstream and self._has_unsuccessful_dependants(
            previous_run, task, session=session
        ):
            yield self._failing_status(
                reason=(
                    "The tasks downstream of the previous task instance(s) "
                    "haven't completed, and wait_for_downstream is True."
                )
            )
            return

        # Every check passed: record that past dependencies are met; nothing is yielded.
        self._push_past_deps_met_xcom_if_needed(ti, dep_context)