Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/state.py: 88%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

68 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from enum import Enum 

21 

22 

23class JobState(str, Enum): 

24 """All possible states that a Job can be in.""" 

25 

26 RUNNING = "running" 

27 SUCCESS = "success" 

28 RESTARTING = "restarting" 

29 FAILED = "failed" 

30 

31 def __str__(self) -> str: 

32 return self.value 

33 

34 

35class TaskInstanceState(str, Enum): 

36 """All possible states that a Task Instance can be in. 

37 

38 Note that None is also allowed, so always use this in a type hint with Optional. 

39 """ 

40 

41 # The scheduler sets a TaskInstance state to None when it's created but not

42 # yet run, but we don't list it here since TaskInstanceState is a string enum.

43 # Use None instead if you need this state.

44 

45 # Set by the scheduler 

46 REMOVED = "removed" # Task vanished from DAG before it ran 

47 SCHEDULED = "scheduled" # Task should run and will be handed to executor soon 

48 

49 # Set by the task instance itself 

50 QUEUED = "queued" # Executor has enqueued the task 

51 RUNNING = "running" # Task is executing 

52 SUCCESS = "success" # Task completed 

53 RESTARTING = "restarting" # External request to restart (e.g. cleared when running) 

54 FAILED = "failed" # Task errored out 

55 UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left 

56 UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor 

57 UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed 

58 SKIPPED = "skipped" # Skipped by branching or some other mechanism 

59 DEFERRED = "deferred" # Deferrable operator waiting on a trigger 

60 

61 # Not used anymore, kept for compatibility. 

62 # TODO: Remove in Airflow 3.0. 

63 SHUTDOWN = "shutdown" 

64 """The task instance is being shut down. 

65 

66 :meta private: 

67 """ 

68 

69 def __str__(self) -> str: 

70 return self.value 

71 

72 

73class DagRunState(str, Enum): 

74 """All possible states that a DagRun can be in. 

75 

76 These are "shared" with TaskInstanceState in some parts of the code, 

77 so please ensure that their values always match the ones with the 

78 same name in TaskInstanceState. 

79 """ 

80 

81 QUEUED = "queued" 

82 RUNNING = "running" 

83 SUCCESS = "success" 

84 FAILED = "failed" 

85 

86 def __str__(self) -> str: 

87 return self.value 

88 

89 

90class State: 

91 """Static class with task instance state constants and color methods to avoid hard-coding.""" 

92 

93 # Backwards-compat constants for code that does not yet use the enum 

94 # These first three are shared by DagRunState and TaskInstanceState

95 SUCCESS = TaskInstanceState.SUCCESS 

96 RUNNING = TaskInstanceState.RUNNING 

97 FAILED = TaskInstanceState.FAILED 

98 

99 # These are TaskInstanceState only

100 NONE = None 

101 REMOVED = TaskInstanceState.REMOVED 

102 SCHEDULED = TaskInstanceState.SCHEDULED 

103 QUEUED = TaskInstanceState.QUEUED 

104 RESTARTING = TaskInstanceState.RESTARTING 

105 UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY 

106 UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE 

107 UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED 

108 SKIPPED = TaskInstanceState.SKIPPED 

109 DEFERRED = TaskInstanceState.DEFERRED 

110 

111 # Not used anymore, kept for compatibility. 

112 # TODO: Remove in Airflow 3.0. 

113 SHUTDOWN = TaskInstanceState.SHUTDOWN 

114 """The task instance is being shut down. 

115 

116 :meta private: 

117 """ 

118 

119 finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED]) 

120 unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING]) 

121 

122 task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState) 

123 

124 dag_states: tuple[DagRunState, ...] = ( 

125 DagRunState.QUEUED, 

126 DagRunState.SUCCESS, 

127 DagRunState.RUNNING, 

128 DagRunState.FAILED, 

129 ) 

130 

131 state_color: dict[TaskInstanceState | None, str] = { 

132 None: "lightblue", 

133 TaskInstanceState.QUEUED: "gray", 

134 TaskInstanceState.RUNNING: "lime", 

135 TaskInstanceState.SUCCESS: "green", 

136 TaskInstanceState.RESTARTING: "violet", 

137 TaskInstanceState.FAILED: "red", 

138 TaskInstanceState.UP_FOR_RETRY: "gold", 

139 TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise", 

140 TaskInstanceState.UPSTREAM_FAILED: "orange", 

141 TaskInstanceState.SKIPPED: "hotpink", 

142 TaskInstanceState.REMOVED: "lightgrey", 

143 TaskInstanceState.SCHEDULED: "tan", 

144 TaskInstanceState.DEFERRED: "mediumpurple", 

145 } 

146 

147 @classmethod 

148 def color(cls, state): 

149 """Return color for a state.""" 

150 return cls.state_color.get(state, "white") 

151 

152 @classmethod 

153 def color_fg(cls, state): 

154 """Return black or white for a state, depending on its color."""

155 color = cls.color(state) 

156 if color in ["green", "red"]: 

157 return "white" 

158 return "black" 

159 

160 finished: frozenset[TaskInstanceState] = frozenset( 

161 [ 

162 TaskInstanceState.SUCCESS, 

163 TaskInstanceState.FAILED, 

164 TaskInstanceState.SKIPPED, 

165 TaskInstanceState.UPSTREAM_FAILED, 

166 TaskInstanceState.REMOVED, 

167 ] 

168 ) 

169 """ 

170 A set of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no

171 further action.

172

173 Note that the attempt could have resulted in failure or have been

174 interrupted; or perhaps never run at all (skip, or upstream_failed). In any

175 case, it is no longer running.

176 """ 

177 

178 unfinished: frozenset[TaskInstanceState | None] = frozenset( 

179 [ 

180 None, 

181 TaskInstanceState.SCHEDULED, 

182 TaskInstanceState.QUEUED, 

183 TaskInstanceState.RUNNING, 

184 TaskInstanceState.RESTARTING, 

185 TaskInstanceState.UP_FOR_RETRY, 

186 TaskInstanceState.UP_FOR_RESCHEDULE, 

187 TaskInstanceState.DEFERRED, 

188 ] 

189 ) 

190 """ 

191 A set of states indicating that a task either has not completed

192 a run or has not even started. 

193 """ 

194 

195 failed_states: frozenset[TaskInstanceState] = frozenset( 

196 [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED] 

197 ) 

198 """ 

199 A set of states indicating that a task or dag is in a failed state.

200 """ 

201 

202 success_states: frozenset[TaskInstanceState] = frozenset( 

203 [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED] 

204 ) 

205 """ 

206 A set of states indicating that a task or dag is in a success state.

207 """ 

208 

209 # Kept for compatibility. DO NOT USE. 

210 # TODO: Remove in Airflow 3.0. 

211 terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING]) 

212 """ 

213 A set of states indicating that a task has been terminated.

214 

215 :meta private: 

216 """ 

217 

218 adoptable_states = frozenset( 

219 [TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING] 

220 ) 

221 """ 

222 A set of states indicating that a task can be adopted or reset by a scheduler job

223 if it was queued by another scheduler job that is not running anymore. 

224 """