Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/ti_deps/deps/base_ti_dep.py: 57%

47 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING, Any, Iterator, NamedTuple 

21 

22from airflow.ti_deps.dep_context import DepContext 

23from airflow.utils.session import provide_session 

24 

25if TYPE_CHECKING: 

26 from sqlalchemy.orm import Session 

27 

28 from airflow.models.taskinstance import TaskInstance 

29 

30 

class BaseTIDep:
    """
    Abstract base class for task instance dependencies.

    All dependencies must be satisfied in order for task instances to run.
    For example, a task that can only run if a certain number of its upstream tasks succeed.
    This is an abstract class and must be subclassed to be used: subclasses
    implement ``_get_dep_statuses``, callers use ``get_dep_statuses``,
    ``is_met`` and ``get_failure_reasons``.
    """

    # If this dependency can be ignored by a context in which it is added to. Needed
    # because some dependencies should never be ignorable in their contexts.
    IGNORABLE = False

    # Whether this dependency is not a global task instance dependency but specific
    # to some tasks (e.g. depends_on_past is not specified by all tasks).
    IS_TASK_DEP = False

    def __eq__(self, other: Any) -> bool:
        """Return True when ``other`` is an instance of exactly the same class.

        An exact type comparison keeps equality symmetric and in agreement with
        ``__hash__`` (which hashes the class): two objects that compare equal
        always share a hash, and comparing against an unrelated object such as
        ``object()`` is never True.
        """
        return type(self) is type(other)

    def __hash__(self) -> int:
        """Hash by class, so every instance of one dependency class collapses in sets/dicts."""
        return hash(type(self))

    def __repr__(self) -> str:
        return f"<TIDep({self.name})>"

    @property
    def name(self) -> str:
        """The human-readable name for the dependency.

        Use the class name as the default if ``NAME`` is not provided.
        """
        return getattr(self, "NAME", self.__class__.__name__)

    def _get_dep_statuses(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext,
    ) -> Iterator[TIDepStatus]:
        """
        Abstract method that returns an iterable of TIDepStatus objects.

        Each object describes whether the given task instance has this dependency met.

        For example a subclass could return an iterable of TIDepStatus objects, each one
        representing if each of the passed in task's upstream tasks succeeded or not.

        :param ti: the task instance to get the dependency status for
        :param session: database session
        :param dep_context: the context for which this dependency should be evaluated for
        :raises NotImplementedError: always, unless overridden by a subclass
        """
        raise NotImplementedError

    # NOTE(review): this is a generator function wrapped by ``provide_session``; if the
    # decorator scopes an auto-created session to the call itself, that session may be
    # finished before the generator body runs -- confirm against ``provide_session``.
    @provide_session
    def get_dep_statuses(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext | None = None,
    ) -> Iterator[TIDepStatus]:
        """
        Wrapper around the private _get_dep_statuses method.

        Contains some global checks for all dependencies: when the context asks
        to ignore this kind of dependency, a single passing status is yielded
        and the subclass check is skipped entirely.

        :param ti: the task instance to get the dependency status for
        :param session: database session
        :param dep_context: the context for which this dependency should be evaluated for;
            a default ``DepContext()`` is used when omitted
        """
        cxt = DepContext() if dep_context is None else dep_context

        # Only dependencies that opted in via IGNORABLE honour "ignore all deps".
        if self.IGNORABLE and cxt.ignore_all_deps:
            yield self._passing_status(reason="Context specified all dependencies should be ignored.")
            return

        # Task-specific dependencies can be switched off as a group.
        if self.IS_TASK_DEP and cxt.ignore_task_deps:
            yield self._passing_status(reason="Context specified all task dependencies should be ignored.")
            return

        yield from self._get_dep_statuses(ti, session, cxt)

    @provide_session
    def is_met(self, ti: TaskInstance, session: Session, dep_context: DepContext | None = None) -> bool:
        """
        Returns whether a dependency is met for a given task instance.

        A dependency is considered met if all the dependency statuses it reports are passing
        (which includes the case where it reports no statuses at all).

        :param ti: the task instance to see if this dependency is met for
        :param session: database session
        :param dep_context: The context this dependency is being checked under that stores
            state that can be used by this dependency.
        """
        return all(status.passed for status in self.get_dep_statuses(ti, session, dep_context))

    @provide_session
    def get_failure_reasons(
        self,
        ti: TaskInstance,
        session: Session,
        dep_context: DepContext | None = None,
    ) -> Iterator[str]:
        """
        Returns an iterable of strings that explain why this dependency wasn't met.

        Only the reasons of non-passing statuses are yielded, in the order reported.

        :param ti: the task instance to see if this dependency is met for
        :param session: database session
        :param dep_context: The context this dependency is being checked under that stores
            state that can be used by this dependency.
        """
        for dep_status in self.get_dep_statuses(ti, session, dep_context):
            if not dep_status.passed:
                yield dep_status.reason

    def _failing_status(self, reason: str = "") -> TIDepStatus:
        """Build a failed ``TIDepStatus`` for this dependency carrying ``reason``."""
        return TIDepStatus(self.name, False, reason)

    def _passing_status(self, reason: str = "") -> TIDepStatus:
        """Build a passed ``TIDepStatus`` for this dependency carrying ``reason``."""
        return TIDepStatus(self.name, True, reason)

151 

152 

class TIDepStatus(NamedTuple):
    """Outcome of evaluating one dependency for a task instance: which dependency, pass/fail, and why."""

    # Name of the dependency that produced this status.
    dep_name: str
    # True when the task instance satisfied the dependency.
    passed: bool
    # Human-readable explanation accompanying the outcome.
    reason: str