Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/state.py: 88%

60 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from enum import Enum 

21 

22 

23class TaskInstanceState(str, Enum): 

24 """ 

25 Enum that represents all possible states that a Task Instance can be in. 

26 

27 Note that None is also allowed, so always use this in a type hint with Optional. 

28 """ 

29 

30 # The scheduler sets a TaskInstance state to None when it's created but not 

31 # yet run, but we don't list it here since TaskInstance is a string enum. 

32 # Use None instead if need this state. 

33 

34 # Set by the scheduler 

35 REMOVED = "removed" # Task vanished from DAG before it ran 

36 SCHEDULED = "scheduled" # Task should run and will be handed to executor soon 

37 

38 # Set by the task instance itself 

39 QUEUED = "queued" # Executor has enqueued the task 

40 RUNNING = "running" # Task is executing 

41 SUCCESS = "success" # Task completed 

42 SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running) 

43 RESTARTING = "restarting" # External request to restart (e.g. cleared when running) 

44 FAILED = "failed" # Task errored out 

45 UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left 

46 UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor 

47 UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed 

48 SKIPPED = "skipped" # Skipped by branching or some other mechanism 

49 DEFERRED = "deferred" # Deferrable operator waiting on a trigger 

50 

51 def __str__(self) -> str: 

52 return self.value 

53 

54 

55class DagRunState(str, Enum): 

56 """ 

57 Enum that represents all possible states that a DagRun can be in. 

58 

59 These are "shared" with TaskInstanceState in some parts of the code, 

60 so please ensure that their values always match the ones with the 

61 same name in TaskInstanceState. 

62 """ 

63 

64 QUEUED = "queued" 

65 RUNNING = "running" 

66 SUCCESS = "success" 

67 FAILED = "failed" 

68 

69 def __str__(self) -> str: 

70 return self.value 

71 

72 

73class State: 

74 """ 

75 Static class with task instance state constants and color methods to 

76 avoid hardcoding. 

77 """ 

78 

79 # Backwards-compat constants for code that does not yet use the enum 

80 # These first three are shared by DagState and TaskState 

81 SUCCESS = TaskInstanceState.SUCCESS 

82 RUNNING = TaskInstanceState.RUNNING 

83 FAILED = TaskInstanceState.FAILED 

84 

85 # These are TaskState only 

86 NONE = None 

87 REMOVED = TaskInstanceState.REMOVED 

88 SCHEDULED = TaskInstanceState.SCHEDULED 

89 QUEUED = TaskInstanceState.QUEUED 

90 SHUTDOWN = TaskInstanceState.SHUTDOWN 

91 RESTARTING = TaskInstanceState.RESTARTING 

92 UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY 

93 UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE 

94 UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED 

95 SKIPPED = TaskInstanceState.SKIPPED 

96 DEFERRED = TaskInstanceState.DEFERRED 

97 

98 finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED]) 

99 unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING]) 

100 

101 task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState) 

102 

103 dag_states: tuple[DagRunState, ...] = ( 

104 DagRunState.QUEUED, 

105 DagRunState.SUCCESS, 

106 DagRunState.RUNNING, 

107 DagRunState.FAILED, 

108 ) 

109 

110 state_color: dict[TaskInstanceState | None, str] = { 

111 None: "lightblue", 

112 TaskInstanceState.QUEUED: "gray", 

113 TaskInstanceState.RUNNING: "lime", 

114 TaskInstanceState.SUCCESS: "green", 

115 TaskInstanceState.SHUTDOWN: "blue", 

116 TaskInstanceState.RESTARTING: "violet", 

117 TaskInstanceState.FAILED: "red", 

118 TaskInstanceState.UP_FOR_RETRY: "gold", 

119 TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise", 

120 TaskInstanceState.UPSTREAM_FAILED: "orange", 

121 TaskInstanceState.SKIPPED: "hotpink", 

122 TaskInstanceState.REMOVED: "lightgrey", 

123 TaskInstanceState.SCHEDULED: "tan", 

124 TaskInstanceState.DEFERRED: "mediumpurple", 

125 } 

126 

127 @classmethod 

128 def color(cls, state): 

129 """Returns color for a state.""" 

130 return cls.state_color.get(state, "white") 

131 

132 @classmethod 

133 def color_fg(cls, state): 

134 """Black&white colors for a state.""" 

135 color = cls.color(state) 

136 if color in ["green", "red"]: 

137 return "white" 

138 return "black" 

139 

140 finished: frozenset[TaskInstanceState] = frozenset( 

141 [ 

142 TaskInstanceState.SUCCESS, 

143 TaskInstanceState.FAILED, 

144 TaskInstanceState.SKIPPED, 

145 TaskInstanceState.UPSTREAM_FAILED, 

146 TaskInstanceState.REMOVED, 

147 ] 

148 ) 

149 """ 

150 A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no 

151 further action. 

152 

153 Note that the attempt could have resulted in failure or have been 

154 interrupted; or perhaps never run at all (skip, or upstream_failed) in any 

155 case, it is no longer running. 

156 """ 

157 

158 unfinished: frozenset[TaskInstanceState | None] = frozenset( 

159 [ 

160 None, 

161 TaskInstanceState.SCHEDULED, 

162 TaskInstanceState.QUEUED, 

163 TaskInstanceState.RUNNING, 

164 TaskInstanceState.SHUTDOWN, 

165 TaskInstanceState.RESTARTING, 

166 TaskInstanceState.UP_FOR_RETRY, 

167 TaskInstanceState.UP_FOR_RESCHEDULE, 

168 TaskInstanceState.DEFERRED, 

169 ] 

170 ) 

171 """ 

172 A list of states indicating that a task either has not completed 

173 a run or has not even started. 

174 """ 

175 

176 failed_states: frozenset[TaskInstanceState] = frozenset( 

177 [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED] 

178 ) 

179 """ 

180 A list of states indicating that a task or dag is a failed state. 

181 """ 

182 

183 success_states: frozenset[TaskInstanceState] = frozenset( 

184 [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED] 

185 ) 

186 """ 

187 A list of states indicating that a task or dag is a success state. 

188 """ 

189 

190 terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING]) 

191 """ 

192 A list of states indicating that a task has been terminated. 

193 """