Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/state.py: 89%

61 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from enum import Enum 

21 

22from airflow.settings import STATE_COLORS 

23 

24 

class TaskInstanceState(str, Enum):
    """
    Enum that represents all possible states that a Task Instance can be in.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (``TaskInstanceState.RUNNING == "running"``).

    Note that None is also allowed, so always use this in a type hint with Optional.
    """

    # The scheduler sets a TaskInstance state to None when it's created but not
    # yet run, but we don't list it here since TaskInstance is a string enum.
    # Use None instead if you need this state.

    # Set by the scheduler
    REMOVED = "removed"  # Task vanished from DAG before it ran
    SCHEDULED = "scheduled"  # Task should run and will be handed to executor soon

    # Set by the task instance itself
    QUEUED = "queued"  # Executor has enqueued the task
    RUNNING = "running"  # Task is executing
    SUCCESS = "success"  # Task completed
    SHUTDOWN = "shutdown"  # External request to shut down (e.g. marked failed when running)
    RESTARTING = "restarting"  # External request to restart (e.g. cleared when running)
    FAILED = "failed"  # Task errored out
    UP_FOR_RETRY = "up_for_retry"  # Task failed but has retries left
    UP_FOR_RESCHEDULE = "up_for_reschedule"  # A waiting `reschedule` sensor
    UPSTREAM_FAILED = "upstream_failed"  # One or more upstream deps failed
    SKIPPED = "skipped"  # Skipped by branching or some other mechanism
    DEFERRED = "deferred"  # Deferrable operator waiting on a trigger

    def __str__(self) -> str:
        """Return the raw state string (e.g. ``"running"``) instead of the qualified member name."""
        return self.value

55 

56 

class DagRunState(str, Enum):
    """
    Enum that represents all possible states that a DagRun can be in.

    These are "shared" with TaskInstanceState in some parts of the code,
    so please ensure that their values always match the ones with the
    same name in TaskInstanceState.
    """

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self) -> str:
        """Return the raw state string (e.g. ``"queued"``) instead of the qualified member name."""
        return self.value

73 

74 

class State:
    """
    Static class with task instance state constants and color methods to
    avoid hardcoding.

    Everything here is a class-level attribute or classmethod; the class is
    used as a namespace and is not meant to carry instance state.
    """

    # Backwards-compat constants for code that does not yet use the enum
    # These first three are shared by DagState and TaskState
    SUCCESS = TaskInstanceState.SUCCESS
    RUNNING = TaskInstanceState.RUNNING
    FAILED = TaskInstanceState.FAILED

    # These are TaskState only
    NONE = None
    REMOVED = TaskInstanceState.REMOVED
    SCHEDULED = TaskInstanceState.SCHEDULED
    QUEUED = TaskInstanceState.QUEUED
    SHUTDOWN = TaskInstanceState.SHUTDOWN
    RESTARTING = TaskInstanceState.RESTARTING
    UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
    UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
    UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
    SKIPPED = TaskInstanceState.SKIPPED
    DEFERRED = TaskInstanceState.DEFERRED

    # None (no state yet) followed by every enum member in definition order.
    task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState)

    dag_states: tuple[DagRunState, ...] = (
        DagRunState.QUEUED,
        DagRunState.SUCCESS,
        DagRunState.RUNNING,
        DagRunState.FAILED,
    )

    # Default color for each task state; None covers a task with no state yet.
    state_color: dict[TaskInstanceState | None, str] = {
        None: "lightblue",
        TaskInstanceState.QUEUED: "gray",
        TaskInstanceState.RUNNING: "lime",
        TaskInstanceState.SUCCESS: "green",
        TaskInstanceState.SHUTDOWN: "blue",
        TaskInstanceState.RESTARTING: "violet",
        TaskInstanceState.FAILED: "red",
        TaskInstanceState.UP_FOR_RETRY: "gold",
        TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
        TaskInstanceState.UPSTREAM_FAILED: "orange",
        TaskInstanceState.SKIPPED: "hotpink",
        TaskInstanceState.REMOVED: "lightgrey",
        TaskInstanceState.SCHEDULED: "tan",
        TaskInstanceState.DEFERRED: "mediumpurple",
    }
    # Entries from airflow.settings.STATE_COLORS take precedence over the defaults above.
    state_color.update(STATE_COLORS)  # type: ignore

    @classmethod
    def color(cls, state: str | None) -> str:
        """
        Returns color for a state.

        :param state: a state (enum member or its plain string value), or None.
        :return: the color registered in ``state_color``, or ``"white"`` when
            the state has no entry.
        """
        return cls.state_color.get(state, "white")

    @classmethod
    def color_fg(cls, state: str | None) -> str:
        """
        Black&white colors for a state.

        :param state: a state (enum member or its plain string value), or None.
        :return: ``"white"`` when the state's color is ``"green"`` or ``"red"``,
            otherwise ``"black"``.
        """
        color = cls.color(state)
        if color in ("green", "red"):
            return "white"
        return "black"

    running: frozenset[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED])
    """
    A set of states indicating that a task is being executed.
    """

    finished: frozenset[TaskInstanceState] = frozenset(
        [
            TaskInstanceState.SUCCESS,
            TaskInstanceState.FAILED,
            TaskInstanceState.SKIPPED,
            TaskInstanceState.UPSTREAM_FAILED,
            TaskInstanceState.REMOVED,
        ]
    )
    """
    A set of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
    further action.

    Note that the attempt could have resulted in failure or have been
    interrupted; or perhaps never run at all (skip, or upstream_failed). In any
    case, it is no longer running.
    """

    unfinished: frozenset[TaskInstanceState | None] = frozenset(
        [
            None,
            TaskInstanceState.SCHEDULED,
            TaskInstanceState.QUEUED,
            TaskInstanceState.RUNNING,
            TaskInstanceState.SHUTDOWN,
            TaskInstanceState.RESTARTING,
            TaskInstanceState.UP_FOR_RETRY,
            TaskInstanceState.UP_FOR_RESCHEDULE,
            TaskInstanceState.DEFERRED,
        ]
    )
    """
    A set of states indicating that a task either has not completed
    a run or has not even started.
    """

    failed_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
    )
    """
    A set of states indicating that a task or dag is in a failed state.
    """

    success_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
    )
    """
    A set of states indicating that a task or dag is in a success state.
    """

    terminating_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING]
    )
    """
    A set of states indicating that a task has been terminated.
    """