Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/ti_deps/deps/prev_dagrun_dep.py: 26%
47 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from sqlalchemy import func
22from airflow.models.taskinstance import TaskInstance as TI
23from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
24from airflow.utils.session import provide_session
25from airflow.utils.state import State
28class PrevDagrunDep(BaseTIDep):
29 """
30 Is the past dagrun in a state that allows this task instance to run.
32 For example, did this task instance's task in the previous dagrun complete
33 if we are depending on past?
34 """
36 NAME = "Previous Dagrun State"
37 IGNORABLE = True
38 IS_TASK_DEP = True
40 @provide_session
41 def _get_dep_statuses(self, ti: TI, session, dep_context):
42 if dep_context.ignore_depends_on_past:
43 reason = "The context specified that the state of past DAGs could be ignored."
44 yield self._passing_status(reason=reason)
45 return
47 if not ti.task.depends_on_past:
48 yield self._passing_status(reason="The task did not have depends_on_past set.")
49 return
51 dr = ti.get_dagrun(session=session)
52 if not dr:
53 yield self._passing_status(reason="This task instance does not belong to a DAG.")
54 return
56 # Don't depend on the previous task instance if we are the first task.
57 catchup = ti.task.dag and ti.task.dag.catchup
58 if catchup:
59 last_dagrun = dr.get_previous_scheduled_dagrun(session)
60 else:
61 last_dagrun = dr.get_previous_dagrun(session=session)
63 # First ever run for this DAG.
64 if not last_dagrun:
65 yield self._passing_status(reason="This task instance was the first task instance for its task.")
66 return
68 # There was a DAG run, but the task wasn't active back then.
69 if catchup and last_dagrun.execution_date < ti.task.start_date:
70 yield self._passing_status(reason="This task instance was the first task instance for its task.")
71 return
73 previous_ti = last_dagrun.get_task_instance(ti.task_id, map_index=ti.map_index, session=session)
74 if not previous_ti:
75 if ti.task.ignore_first_depends_on_past:
76 has_historical_ti = (
77 session.query(func.count(TI.dag_id))
78 .filter(
79 TI.dag_id == ti.dag_id,
80 TI.task_id == ti.task_id,
81 TI.execution_date < ti.execution_date,
82 )
83 .scalar()
84 > 0
85 )
86 if not has_historical_ti:
87 yield self._passing_status(
88 reason="ignore_first_depends_on_past is true for this task "
89 "and it is the first task instance for its task."
90 )
91 return
93 yield self._failing_status(
94 reason="depends_on_past is true for this task's DAG, but the previous "
95 "task instance has not run yet."
96 )
97 return
99 if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:
100 yield self._failing_status(
101 reason=(
102 f"depends_on_past is true for this task, but the previous task instance {previous_ti} "
103 f"is in the state '{previous_ti.state}' which is not a successful state."
104 )
105 )
107 previous_ti.task = ti.task
108 if ti.task.wait_for_downstream and not previous_ti.are_dependents_done(session=session):
109 yield self._failing_status(
110 reason=(
111 f"The tasks downstream of the previous task instance {previous_ti} haven't completed "
112 f"(and wait_for_downstream is True)."
113 )
114 )