Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/state.py: 86%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

87 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from enum import Enum 

21 

22 

class TerminalTIState(str, Enum):
    """Task Instance states which indicate that the task instance has reached a terminal state."""

    SUCCESS = "success"
    FAILED = "failed"
    # A task can raise an AirflowSkipException, in which case it is marked as skipped.
    SKIPPED = "skipped"
    UPSTREAM_FAILED = "upstream_failed"
    REMOVED = "removed"

    def __str__(self) -> str:
        # Render as the bare value (e.g. "success"), not "TerminalTIState.SUCCESS".
        return self._value_

34 

35 

class IntermediateTIState(str, Enum):
    """Task Instance states which indicate it is neither running nor in a terminal state yet."""

    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RESTARTING = "restarting"
    UP_FOR_RETRY = "up_for_retry"
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    DEFERRED = "deferred"

    def __str__(self) -> str:
        # Render as the bare value (e.g. "queued"), not "IntermediateTIState.QUEUED".
        return self._value_

48 

49 

class TaskInstanceState(str, Enum):
    """
    Every (non-None) state that a Task Instance can be in.

    None is a valid state as well, so type hints using this enum should be Optional.
    """

    # The scheduler gives a TaskInstance the state None when it has been created
    # but not yet run. That state is not listed here because this is a string
    # enum; use None directly wherever it is needed.
    #
    # Members reuse the values declared on TerminalTIState / IntermediateTIState.
    # ``.value`` hands over the plain string, so the resulting member values do
    # not depend on how those enums implement ``__str__``.

    # Set by the scheduler
    REMOVED = TerminalTIState.REMOVED.value  # Task vanished from DAG before it ran
    SCHEDULED = IntermediateTIState.SCHEDULED.value  # Task should run and will be handed to executor soon

    # Set by the task instance itself
    QUEUED = IntermediateTIState.QUEUED.value  # Executor has enqueued the task
    RUNNING = "running"  # Task is executing
    SUCCESS = TerminalTIState.SUCCESS.value  # Task completed
    RESTARTING = IntermediateTIState.RESTARTING.value  # External request to restart (e.g. cleared when running)
    FAILED = TerminalTIState.FAILED.value  # Task errored out
    UP_FOR_RETRY = IntermediateTIState.UP_FOR_RETRY.value  # Task failed but has retries left
    UP_FOR_RESCHEDULE = IntermediateTIState.UP_FOR_RESCHEDULE.value  # A waiting `reschedule` sensor
    UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED.value  # One or more upstream deps failed
    SKIPPED = TerminalTIState.SKIPPED.value  # Skipped by branching or some other mechanism
    DEFERRED = IntermediateTIState.DEFERRED.value  # Deferrable operator waiting on a trigger

    def __str__(self) -> str:
        # Render as the bare value (e.g. "running"), not "TaskInstanceState.RUNNING".
        return self._value_

79 

80 

class DagRunState(str, Enum):
    """
    Every state that a DagRun can be in.

    Some parts of the code treat these as interchangeable with TaskInstanceState,
    so each value here must stay identical to the value of the same-named
    member of TaskInstanceState.
    """

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self) -> str:
        # Render as the bare value (e.g. "queued"), not "DagRunState.QUEUED".
        return self._value_

97 

98 

class State:
    """
    Static class with state constants, state groupings and color helpers.

    Lets callers refer to task instance / DAG run states and their colors
    without hard-coding the underlying strings.
    """

    # Backwards-compat constants for code that does not yet use the enums.
    # These first three names exist on both DagRunState and TaskInstanceState.
    SUCCESS = TaskInstanceState.SUCCESS
    RUNNING = TaskInstanceState.RUNNING
    FAILED = TaskInstanceState.FAILED

    # These are task instance states only.
    NONE = None
    REMOVED = TaskInstanceState.REMOVED
    SCHEDULED = TaskInstanceState.SCHEDULED
    QUEUED = TaskInstanceState.QUEUED
    RESTARTING = TaskInstanceState.RESTARTING
    UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
    UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
    UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
    SKIPPED = TaskInstanceState.SKIPPED
    DEFERRED = TaskInstanceState.DEFERRED

    finished_dr_states: frozenset[DagRunState] = frozenset({DagRunState.SUCCESS, DagRunState.FAILED})
    unfinished_dr_states: frozenset[DagRunState] = frozenset({DagRunState.QUEUED, DagRunState.RUNNING})

    # None first, then every TaskInstanceState member in definition order.
    task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState)

    dag_states: tuple[DagRunState, ...] = (
        DagRunState.QUEUED,
        DagRunState.SUCCESS,
        DagRunState.RUNNING,
        DagRunState.FAILED,
    )

    # Color name for each task instance state, including the None state.
    state_color: dict[TaskInstanceState | None, str] = {
        None: "lightblue",
        TaskInstanceState.QUEUED: "gray",
        TaskInstanceState.RUNNING: "lime",
        TaskInstanceState.SUCCESS: "green",
        TaskInstanceState.RESTARTING: "violet",
        TaskInstanceState.FAILED: "red",
        TaskInstanceState.UP_FOR_RETRY: "gold",
        TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
        TaskInstanceState.UPSTREAM_FAILED: "orange",
        TaskInstanceState.SKIPPED: "hotpink",
        TaskInstanceState.REMOVED: "lightgrey",
        TaskInstanceState.SCHEDULED: "tan",
        TaskInstanceState.DEFERRED: "mediumpurple",
    }

    @classmethod
    def color(cls, state) -> str:
        """
        Return the color registered for ``state`` in ``state_color``.

        :param state: key to look up in ``state_color``
        :return: the mapped color name, or ``"white"`` when ``state`` has no entry
        """
        return cls.state_color.get(state, "white")

    @classmethod
    def color_fg(cls, state) -> str:
        """
        Return a black or white color for ``state``, chosen from its ``color()``.

        :param state: key to look up via ``color()``
        :return: ``"white"`` when the state's color is green or red, otherwise ``"black"``
        """
        # NOTE(review): presumably the text color drawn on top of ``color(state)`` -- confirm with callers.
        if cls.color(state) in ("green", "red"):
            return "white"
        return "black"

    finished: frozenset[TaskInstanceState] = frozenset(
        {
            TaskInstanceState.SUCCESS,
            TaskInstanceState.FAILED,
            TaskInstanceState.SKIPPED,
            TaskInstanceState.UPSTREAM_FAILED,
            TaskInstanceState.REMOVED,
        }
    )
    """
    A set of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
    further action.

    Note that the attempt could have resulted in failure or have been
    interrupted; or perhaps never run at all (skip, or upstream_failed). In any
    case, it is no longer running.
    """

    unfinished: frozenset[TaskInstanceState | None] = frozenset(
        {
            None,
            TaskInstanceState.SCHEDULED,
            TaskInstanceState.QUEUED,
            TaskInstanceState.RUNNING,
            TaskInstanceState.RESTARTING,
            TaskInstanceState.UP_FOR_RETRY,
            TaskInstanceState.UP_FOR_RESCHEDULE,
            TaskInstanceState.DEFERRED,
        }
    )
    """
    A set of states indicating that a task either has not completed
    a run or has not even started.
    """

    failed_states: frozenset[TaskInstanceState] = frozenset(
        {TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED}
    )
    """
    A set of states indicating that a task or dag is in a failed state.
    """

    success_states: frozenset[TaskInstanceState] = frozenset(
        {TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED}
    )
    """
    A set of states indicating that a task or dag is in a success state.
    """

    adoptable_states: frozenset[TaskInstanceState] = frozenset(
        {TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING}
    )
    """
    A set of states indicating that a task can be adopted or reset by a scheduler job
    if it was queued by another scheduler job that is not running anymore.
    """

217 

218 

def __getattr__(name: str):
    """
    Provide backward compatibility for classes that moved out of this module.

    Python calls this module-level hook (PEP 562) for attribute names that are
    not found in the module.

    :param name: the attribute name being looked up
    :return: ``airflow.jobs.job.JobState`` when ``name`` is ``"JobState"``
    :raises AttributeError: for every other name
    """
    # Guard clause: anything but the one moved name is a genuine miss.
    if name != "JobState":
        raise AttributeError(f"module 'airflow.utils.state' has no attribute '{name}'")

    # Imported lazily, only when the deprecated name is actually requested.
    import warnings

    from airflow.jobs.job import JobState

    # stacklevel=2 attributes the warning to the frame that called this hook.
    warnings.warn(
        "The `airflow.utils.state.JobState` attribute is deprecated and will be removed in a future version. "
        "Please use `airflow.jobs.job.JobState` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return JobState