Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/ti_deps/deps/prev_dagrun_dep.py: 23%

60 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from sqlalchemy import func 

21 

22from airflow.models.taskinstance import PAST_DEPENDS_MET, TaskInstance as TI 

23from airflow.ti_deps.deps.base_ti_dep import BaseTIDep 

24from airflow.utils.session import provide_session 

25from airflow.utils.state import State 

26 

27 

28class PrevDagrunDep(BaseTIDep): 

29 """ 

30 Is the past dagrun in a state that allows this task instance to run. 

31 

32 For example, did this task instance's task in the previous dagrun complete 

33 if we are depending on past? 

34 """ 

35 

36 NAME = "Previous Dagrun State" 

37 IGNORABLE = True 

38 IS_TASK_DEP = True 

39 

40 @staticmethod 

41 def _push_past_deps_met_xcom_if_needed(ti: TI, dep_context): 

42 if dep_context.wait_for_past_depends_before_skipping: 

43 ti.xcom_push(key=PAST_DEPENDS_MET, value=True) 

44 

45 @provide_session 

46 def _get_dep_statuses(self, ti: TI, session, dep_context): 

47 if dep_context.ignore_depends_on_past: 

48 self._push_past_deps_met_xcom_if_needed(ti, dep_context) 

49 reason = "The context specified that the state of past DAGs could be ignored." 

50 yield self._passing_status(reason=reason) 

51 return 

52 

53 if not ti.task.depends_on_past: 

54 self._push_past_deps_met_xcom_if_needed(ti, dep_context) 

55 yield self._passing_status(reason="The task did not have depends_on_past set.") 

56 return 

57 

58 dr = ti.get_dagrun(session=session) 

59 if not dr: 

60 self._push_past_deps_met_xcom_if_needed(ti, dep_context) 

61 yield self._passing_status(reason="This task instance does not belong to a DAG.") 

62 return 

63 

64 # Don't depend on the previous task instance if we are the first task. 

65 catchup = ti.task.dag and ti.task.dag.catchup 

66 if catchup: 

67 last_dagrun = dr.get_previous_scheduled_dagrun(session) 

68 else: 

69 last_dagrun = dr.get_previous_dagrun(session=session) 

70 

71 # First ever run for this DAG. 

72 if not last_dagrun: 

73 self._push_past_deps_met_xcom_if_needed(ti, dep_context) 

74 yield self._passing_status(reason="This task instance was the first task instance for its task.") 

75 return 

76 

77 # There was a DAG run, but the task wasn't active back then. 

78 if catchup and last_dagrun.execution_date < ti.task.start_date: 

79 self._push_past_deps_met_xcom_if_needed(ti, dep_context) 

80 yield self._passing_status(reason="This task instance was the first task instance for its task.") 

81 return 

82 

83 previous_ti = last_dagrun.get_task_instance(ti.task_id, map_index=ti.map_index, session=session) 

84 if not previous_ti: 

85 if ti.task.ignore_first_depends_on_past: 

86 has_historical_ti = ( 

87 session.query(func.count(TI.dag_id)) 

88 .filter( 

89 TI.dag_id == ti.dag_id, 

90 TI.task_id == ti.task_id, 

91 TI.execution_date < ti.execution_date, 

92 ) 

93 .scalar() 

94 > 0 

95 ) 

96 if not has_historical_ti: 

97 self._push_past_deps_met_xcom_if_needed(ti, dep_context) 

98 yield self._passing_status( 

99 reason="ignore_first_depends_on_past is true for this task " 

100 "and it is the first task instance for its task." 

101 ) 

102 return 

103 

104 yield self._failing_status( 

105 reason="depends_on_past is true for this task's DAG, but the previous " 

106 "task instance has not run yet." 

107 ) 

108 return 

109 

110 if previous_ti.state not in {State.SKIPPED, State.SUCCESS}: 

111 yield self._failing_status( 

112 reason=( 

113 f"depends_on_past is true for this task, but the previous task instance {previous_ti} " 

114 f"is in the state '{previous_ti.state}' which is not a successful state." 

115 ) 

116 ) 

117 return 

118 

119 previous_ti.task = ti.task 

120 if ti.task.wait_for_downstream and not previous_ti.are_dependents_done(session=session): 

121 yield self._failing_status( 

122 reason=( 

123 f"The tasks downstream of the previous task instance {previous_ti} haven't completed " 

124 f"(and wait_for_downstream is True)." 

125 ) 

126 ) 

127 return 

128 self._push_past_deps_met_xcom_if_needed(ti, dep_context)