Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/ti_deps/deps/base_ti_dep.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

47 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING, Any, Iterator, NamedTuple 

21 

22from airflow.ti_deps.dep_context import DepContext 

23from airflow.utils.session import provide_session 

24 

25if TYPE_CHECKING: 

26 from sqlalchemy.orm import Session 

27 

28 from airflow.models.taskinstance import TaskInstance 

29 

30 

class BaseTIDep:
    """
    Abstract base class for task instances dependencies.

    All dependencies must be satisfied in order for task instances to run.
    For example, a task that can only run if a certain number of its upstream tasks succeed.
    This is an abstract class and must be subclassed to be used.

    Subclasses implement ``_get_dep_statuses``; callers use ``get_dep_statuses``,
    ``is_met`` and ``get_failure_reasons``. Subclasses may set a ``NAME`` attribute
    to override the human-readable name (see ``name``).
    """

    # If this dependency can be ignored by a context in which it is added to. Needed
    # because some dependencies should never be ignorable in their contexts.
    IGNORABLE = False

    # Whether this dependency is not a global task instance dependency but specific
    # to some tasks (e.g. depends_on_past is not specified by all tasks).
    IS_TASK_DEP = False

    def __eq__(self, other: Any) -> bool:
        """
        Check if two task instance dependencies are equal by comparing their types.

        Two dependencies are equal only when they are instances of exactly the same
        class. An exact match (rather than ``isinstance``) keeps equality symmetric
        and consistent with ``__hash__``, which hashes ``type(self)``; it also stops
        a dependency from comparing equal to an arbitrary ``object()``.
        """
        return type(self) is type(other)

    def __hash__(self) -> int:
        """Compute the hash value based on the task instance dependency type."""
        return hash(type(self))

    def __repr__(self) -> str:
        """Return a string representation of the task instance dependency."""
        return f"<TIDep({self.name})>"

    @property
    def name(self) -> str:
        """The human-readable name for the dependency.

        Use the class name as the default if ``NAME`` is not provided.
        """
        return getattr(self, "NAME", self.__class__.__name__)

    def _get_dep_statuses(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext,
    ) -> Iterator[TIDepStatus]:
        """
        Abstract method that returns an iterable of TIDepStatus objects.

        Each object describes whether the given task instance has this dependency met.

        For example a subclass could return an iterable of TIDepStatus objects, each one
        representing if each of the passed in task's upstream tasks succeeded or not.

        :param ti: the task instance to get the dependency status for
        :param session: database session
        :param dep_context: the context for which this dependency should be evaluated for
        :raises NotImplementedError: always, unless overridden by a subclass
        """
        raise NotImplementedError

    @provide_session
    def get_dep_statuses(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext | None = None,
    ) -> Iterator[TIDepStatus]:
        """
        Wrap around the private _get_dep_statuses method.

        Contains some global checks for all dependencies: when the context asks to
        ignore this kind of dependency, a single passing status is yielded and the
        subclass implementation is not consulted.

        :param ti: the task instance to get the dependency status for
        :param session: database session
        :param dep_context: the context for which this dependency should be evaluated for;
            a default ``DepContext()`` is used when omitted
        :return: an iterator of TIDepStatus objects
        """
        cxt = DepContext() if dep_context is None else dep_context

        # Only dependencies that opted in via IGNORABLE honour ignore_all_deps.
        if self.IGNORABLE and cxt.ignore_all_deps:
            yield self._passing_status(reason="Context specified all dependencies should be ignored.")
            return

        # Task-specific dependencies can additionally be switched off as a group.
        if self.IS_TASK_DEP and cxt.ignore_task_deps:
            yield self._passing_status(reason="Context specified all task dependencies should be ignored.")
            return

        yield from self._get_dep_statuses(ti, session, cxt)

    @provide_session
    def is_met(self, ti: TaskInstance, session: Session, dep_context: DepContext | None = None) -> bool:
        """
        Return whether a dependency is met for a given task instance.

        A dependency is considered met if all the dependency statuses it reports are passing.
        A dependency that reports no statuses at all is therefore considered met.

        :param ti: the task instance to see if this dependency is met for
        :param session: database session
        :param dep_context: The context this dependency is being checked under that stores
            state that can be used by this dependency.
        :return: True when every reported status passed, False otherwise
        """
        return all(status.passed for status in self.get_dep_statuses(ti, session, dep_context))

    @provide_session
    def get_failure_reasons(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext | None = None,
    ) -> Iterator[str]:
        """
        Return an iterable of strings that explain why this dependency wasn't met.

        :param ti: the task instance to see if this dependency is met for
        :param session: database session
        :param dep_context: The context this dependency is being checked under that stores
            state that can be used by this dependency.
        :return: an iterator over the ``reason`` of every status that did not pass
        """
        for dep_status in self.get_dep_statuses(ti, session, dep_context):
            if not dep_status.passed:
                yield dep_status.reason

    def _failing_status(self, reason: str = "") -> TIDepStatus:
        """
        Build a failing TIDepStatus labelled with this dependency's name.

        :param reason: explanation of why the dependency is not met
        """
        return TIDepStatus(self.name, False, reason)

    def _passing_status(self, reason: str = "") -> TIDepStatus:
        """
        Build a passing TIDepStatus labelled with this dependency's name.

        :param reason: explanation of why the dependency is met
        """
        return TIDepStatus(self.name, True, reason)

154 

155 

class TIDepStatus(NamedTuple):
    """Outcome of evaluating one dependency for a task instance: which dependency, pass/fail, and why."""

    # Name of the dependency that produced this status.
    dep_name: str
    # True when the task instance satisfied the dependency, False otherwise.
    passed: bool
    # Explanation accompanying the pass/fail outcome.
    reason: str

161 reason: str