Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/state.py: 89%
61 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from enum import Enum
22from airflow.settings import STATE_COLORS
25class TaskInstanceState(str, Enum):
26 """
27 Enum that represents all possible states that a Task Instance can be in.
29 Note that None is also allowed, so always use this in a type hint with Optional.
30 """
32 # The scheduler sets a TaskInstance state to None when it's created but not
33 # yet run, but we don't list it here since TaskInstance is a string enum.
34 # Use None instead if need this state.
36 # Set by the scheduler
37 REMOVED = "removed" # Task vanished from DAG before it ran
38 SCHEDULED = "scheduled" # Task should run and will be handed to executor soon
40 # Set by the task instance itself
41 QUEUED = "queued" # Executor has enqueued the task
42 RUNNING = "running" # Task is executing
43 SUCCESS = "success" # Task completed
44 SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running)
45 RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
46 FAILED = "failed" # Task errored out
47 UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
48 UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
49 UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
50 SKIPPED = "skipped" # Skipped by branching or some other mechanism
51 DEFERRED = "deferred" # Deferrable operator waiting on a trigger
53 def __str__(self) -> str:
54 return self.value
57class DagRunState(str, Enum):
58 """
59 Enum that represents all possible states that a DagRun can be in.
61 These are "shared" with TaskInstanceState in some parts of the code,
62 so please ensure that their values always match the ones with the
63 same name in TaskInstanceState.
64 """
66 QUEUED = "queued"
67 RUNNING = "running"
68 SUCCESS = "success"
69 FAILED = "failed"
71 def __str__(self) -> str:
72 return self.value
75class State:
76 """
77 Static class with task instance state constants and color methods to
78 avoid hardcoding.
79 """
81 # Backwards-compat constants for code that does not yet use the enum
82 # These first three are shared by DagState and TaskState
83 SUCCESS = TaskInstanceState.SUCCESS
84 RUNNING = TaskInstanceState.RUNNING
85 FAILED = TaskInstanceState.FAILED
87 # These are TaskState only
88 NONE = None
89 REMOVED = TaskInstanceState.REMOVED
90 SCHEDULED = TaskInstanceState.SCHEDULED
91 QUEUED = TaskInstanceState.QUEUED
92 SHUTDOWN = TaskInstanceState.SHUTDOWN
93 RESTARTING = TaskInstanceState.RESTARTING
94 UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
95 UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
96 UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
97 SKIPPED = TaskInstanceState.SKIPPED
98 DEFERRED = TaskInstanceState.DEFERRED
100 task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState)
102 dag_states: tuple[DagRunState, ...] = (
103 DagRunState.QUEUED,
104 DagRunState.SUCCESS,
105 DagRunState.RUNNING,
106 DagRunState.FAILED,
107 )
109 state_color: dict[TaskInstanceState | None, str] = {
110 None: "lightblue",
111 TaskInstanceState.QUEUED: "gray",
112 TaskInstanceState.RUNNING: "lime",
113 TaskInstanceState.SUCCESS: "green",
114 TaskInstanceState.SHUTDOWN: "blue",
115 TaskInstanceState.RESTARTING: "violet",
116 TaskInstanceState.FAILED: "red",
117 TaskInstanceState.UP_FOR_RETRY: "gold",
118 TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
119 TaskInstanceState.UPSTREAM_FAILED: "orange",
120 TaskInstanceState.SKIPPED: "hotpink",
121 TaskInstanceState.REMOVED: "lightgrey",
122 TaskInstanceState.SCHEDULED: "tan",
123 TaskInstanceState.DEFERRED: "mediumpurple",
124 }
125 state_color.update(STATE_COLORS) # type: ignore
127 @classmethod
128 def color(cls, state):
129 """Returns color for a state."""
130 return cls.state_color.get(state, "white")
132 @classmethod
133 def color_fg(cls, state):
134 """Black&white colors for a state."""
135 color = cls.color(state)
136 if color in ["green", "red"]:
137 return "white"
138 return "black"
140 running: frozenset[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED])
141 """
142 A list of states indicating that a task is being executed.
143 """
145 finished: frozenset[TaskInstanceState] = frozenset(
146 [
147 TaskInstanceState.SUCCESS,
148 TaskInstanceState.FAILED,
149 TaskInstanceState.SKIPPED,
150 TaskInstanceState.UPSTREAM_FAILED,
151 TaskInstanceState.REMOVED,
152 ]
153 )
154 """
155 A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
156 further action.
158 Note that the attempt could have resulted in failure or have been
159 interrupted; or perhaps never run at all (skip, or upstream_failed) in any
160 case, it is no longer running.
161 """
163 unfinished: frozenset[TaskInstanceState | None] = frozenset(
164 [
165 None,
166 TaskInstanceState.SCHEDULED,
167 TaskInstanceState.QUEUED,
168 TaskInstanceState.RUNNING,
169 TaskInstanceState.SHUTDOWN,
170 TaskInstanceState.RESTARTING,
171 TaskInstanceState.UP_FOR_RETRY,
172 TaskInstanceState.UP_FOR_RESCHEDULE,
173 TaskInstanceState.DEFERRED,
174 ]
175 )
176 """
177 A list of states indicating that a task either has not completed
178 a run or has not even started.
179 """
181 failed_states: frozenset[TaskInstanceState] = frozenset(
182 [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
183 )
184 """
185 A list of states indicating that a task or dag is a failed state.
186 """
188 success_states: frozenset[TaskInstanceState] = frozenset(
189 [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
190 )
191 """
192 A list of states indicating that a task or dag is a success state.
193 """
195 terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
196 """
197 A list of states indicating that a task has been terminated.
198 """