Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/state.py: 88%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from enum import Enum
class JobState(str, Enum):
    """All possible states that a Job can be in."""

    RUNNING = "running"
    SUCCESS = "success"
    RESTARTING = "restarting"
    FAILED = "failed"

    def __str__(self) -> str:
        """Return the raw state value (e.g. ``"running"``) instead of ``JobState.RUNNING``."""
        return self.value
class TaskInstanceState(str, Enum):
    """All possible states that a Task Instance can be in.

    Note that None is also allowed, so always use this in a type hint with Optional.
    """

    # The scheduler sets a TaskInstance state to None when it's created but not
    # yet run, but we don't list it here since TaskInstance is a string enum.
    # Use None instead if you need this state.

    # Set by the scheduler
    REMOVED = "removed"  # Task vanished from DAG before it ran
    SCHEDULED = "scheduled"  # Task should run and will be handed to executor soon

    # Set by the task instance itself
    QUEUED = "queued"  # Executor has enqueued the task
    RUNNING = "running"  # Task is executing
    SUCCESS = "success"  # Task completed
    RESTARTING = "restarting"  # External request to restart (e.g. cleared when running)
    FAILED = "failed"  # Task errored out
    UP_FOR_RETRY = "up_for_retry"  # Task failed but has retries left
    UP_FOR_RESCHEDULE = "up_for_reschedule"  # A waiting `reschedule` sensor
    UPSTREAM_FAILED = "upstream_failed"  # One or more upstream deps failed
    SKIPPED = "skipped"  # Skipped by branching or some other mechanism
    DEFERRED = "deferred"  # Deferrable operator waiting on a trigger

    # Not used anymore, kept for compatibility.
    # TODO: Remove in Airflow 3.0.
    SHUTDOWN = "shutdown"
    """The task instance is being shut down.

    :meta private:
    """

    def __str__(self) -> str:
        """Return the raw state value (e.g. ``"queued"``) instead of ``TaskInstanceState.QUEUED``."""
        return self.value
class DagRunState(str, Enum):
    """All possible states that a DagRun can be in.

    These are "shared" with TaskInstanceState in some parts of the code,
    so please ensure that their values always match the ones with the
    same name in TaskInstanceState.
    """

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self) -> str:
        """Return the raw state value (e.g. ``"queued"``) instead of ``DagRunState.QUEUED``."""
        return self.value
class State:
    """Static class with task instance state constants and color methods to avoid hard-coding."""

    # Backwards-compat constants for code that does not yet use the enum
    # These first three are shared by DagRunState and TaskInstanceState
    SUCCESS = TaskInstanceState.SUCCESS
    RUNNING = TaskInstanceState.RUNNING
    FAILED = TaskInstanceState.FAILED

    # These are TaskInstanceState only
    NONE = None
    REMOVED = TaskInstanceState.REMOVED
    SCHEDULED = TaskInstanceState.SCHEDULED
    QUEUED = TaskInstanceState.QUEUED
    RESTARTING = TaskInstanceState.RESTARTING
    UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
    UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
    UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
    SKIPPED = TaskInstanceState.SKIPPED
    DEFERRED = TaskInstanceState.DEFERRED

    # Not used anymore, kept for compatibility.
    # TODO: Remove in Airflow 3.0.
    SHUTDOWN = TaskInstanceState.SHUTDOWN
    """The task instance is being shut down.

    :meta private:
    """

    finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
    unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])

    # None (no state yet) followed by every TaskInstanceState member in definition order.
    task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState)

    dag_states: tuple[DagRunState, ...] = (
        DagRunState.QUEUED,
        DagRunState.SUCCESS,
        DagRunState.RUNNING,
        DagRunState.FAILED,
    )

    # States without an entry here (e.g. SHUTDOWN) fall back to "white" in ``color()``.
    state_color: dict[TaskInstanceState | None, str] = {
        None: "lightblue",
        TaskInstanceState.QUEUED: "gray",
        TaskInstanceState.RUNNING: "lime",
        TaskInstanceState.SUCCESS: "green",
        TaskInstanceState.RESTARTING: "violet",
        TaskInstanceState.FAILED: "red",
        TaskInstanceState.UP_FOR_RETRY: "gold",
        TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
        TaskInstanceState.UPSTREAM_FAILED: "orange",
        TaskInstanceState.SKIPPED: "hotpink",
        TaskInstanceState.REMOVED: "lightgrey",
        TaskInstanceState.SCHEDULED: "tan",
        TaskInstanceState.DEFERRED: "mediumpurple",
    }

    @classmethod
    def color(cls, state: str | None) -> str:
        """Return color for a state.

        :param state: a state value (or None) to look up in ``state_color``
        :return: the mapped color name, or ``"white"`` when the state has no entry
        """
        return cls.state_color.get(state, "white")

    @classmethod
    def color_fg(cls, state: str | None) -> str:
        """Black&white colors for a state.

        :param state: a state value (or None) whose background color is looked up via ``color()``
        :return: ``"white"`` when the background is green or red, ``"black"`` otherwise
        """
        color = cls.color(state)
        if color in ("green", "red"):
            return "white"
        return "black"

    finished: frozenset[TaskInstanceState] = frozenset(
        [
            TaskInstanceState.SUCCESS,
            TaskInstanceState.FAILED,
            TaskInstanceState.SKIPPED,
            TaskInstanceState.UPSTREAM_FAILED,
            TaskInstanceState.REMOVED,
        ]
    )
    """
    A set of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
    further action.

    Note that the attempt could have resulted in failure or have been
    interrupted; or perhaps never run at all (skip, or upstream_failed); in any
    case, it is no longer running.
    """

    unfinished: frozenset[TaskInstanceState | None] = frozenset(
        [
            None,
            TaskInstanceState.SCHEDULED,
            TaskInstanceState.QUEUED,
            TaskInstanceState.RUNNING,
            TaskInstanceState.RESTARTING,
            TaskInstanceState.UP_FOR_RETRY,
            TaskInstanceState.UP_FOR_RESCHEDULE,
            TaskInstanceState.DEFERRED,
        ]
    )
    """
    A set of states indicating that a task either has not completed
    a run or has not even started.
    """

    failed_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
    )
    """
    A set of states indicating that a task or dag is in a failed state.
    """

    success_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
    )
    """
    A set of states indicating that a task or dag is in a success state.
    """

    # Kept for compatibility. DO NOT USE.
    # TODO: Remove in Airflow 3.0.
    terminating_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING]
    )
    """
    A set of states indicating that a task has been terminated.

    :meta private:
    """

    adoptable_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING]
    )
    """
    A set of states indicating that a task can be adopted or reset by a scheduler job
    if it was queued by another scheduler job that is not running anymore.
    """