Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/state.py: 88%
60 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from enum import Enum
23class TaskInstanceState(str, Enum):
24 """
25 Enum that represents all possible states that a Task Instance can be in.
27 Note that None is also allowed, so always use this in a type hint with Optional.
28 """
30 # The scheduler sets a TaskInstance state to None when it's created but not
31 # yet run, but we don't list it here since TaskInstance is a string enum.
32 # Use None instead if need this state.
34 # Set by the scheduler
35 REMOVED = "removed" # Task vanished from DAG before it ran
36 SCHEDULED = "scheduled" # Task should run and will be handed to executor soon
38 # Set by the task instance itself
39 QUEUED = "queued" # Executor has enqueued the task
40 RUNNING = "running" # Task is executing
41 SUCCESS = "success" # Task completed
42 SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running)
43 RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
44 FAILED = "failed" # Task errored out
45 UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
46 UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
47 UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
48 SKIPPED = "skipped" # Skipped by branching or some other mechanism
49 DEFERRED = "deferred" # Deferrable operator waiting on a trigger
51 def __str__(self) -> str:
52 return self.value
55class DagRunState(str, Enum):
56 """
57 Enum that represents all possible states that a DagRun can be in.
59 These are "shared" with TaskInstanceState in some parts of the code,
60 so please ensure that their values always match the ones with the
61 same name in TaskInstanceState.
62 """
64 QUEUED = "queued"
65 RUNNING = "running"
66 SUCCESS = "success"
67 FAILED = "failed"
69 def __str__(self) -> str:
70 return self.value
73class State:
74 """
75 Static class with task instance state constants and color methods to
76 avoid hardcoding.
77 """
79 # Backwards-compat constants for code that does not yet use the enum
80 # These first three are shared by DagState and TaskState
81 SUCCESS = TaskInstanceState.SUCCESS
82 RUNNING = TaskInstanceState.RUNNING
83 FAILED = TaskInstanceState.FAILED
85 # These are TaskState only
86 NONE = None
87 REMOVED = TaskInstanceState.REMOVED
88 SCHEDULED = TaskInstanceState.SCHEDULED
89 QUEUED = TaskInstanceState.QUEUED
90 SHUTDOWN = TaskInstanceState.SHUTDOWN
91 RESTARTING = TaskInstanceState.RESTARTING
92 UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
93 UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
94 UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
95 SKIPPED = TaskInstanceState.SKIPPED
96 DEFERRED = TaskInstanceState.DEFERRED
98 finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
99 unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])
101 task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState)
103 dag_states: tuple[DagRunState, ...] = (
104 DagRunState.QUEUED,
105 DagRunState.SUCCESS,
106 DagRunState.RUNNING,
107 DagRunState.FAILED,
108 )
110 state_color: dict[TaskInstanceState | None, str] = {
111 None: "lightblue",
112 TaskInstanceState.QUEUED: "gray",
113 TaskInstanceState.RUNNING: "lime",
114 TaskInstanceState.SUCCESS: "green",
115 TaskInstanceState.SHUTDOWN: "blue",
116 TaskInstanceState.RESTARTING: "violet",
117 TaskInstanceState.FAILED: "red",
118 TaskInstanceState.UP_FOR_RETRY: "gold",
119 TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
120 TaskInstanceState.UPSTREAM_FAILED: "orange",
121 TaskInstanceState.SKIPPED: "hotpink",
122 TaskInstanceState.REMOVED: "lightgrey",
123 TaskInstanceState.SCHEDULED: "tan",
124 TaskInstanceState.DEFERRED: "mediumpurple",
125 }
127 @classmethod
128 def color(cls, state):
129 """Returns color for a state."""
130 return cls.state_color.get(state, "white")
132 @classmethod
133 def color_fg(cls, state):
134 """Black&white colors for a state."""
135 color = cls.color(state)
136 if color in ["green", "red"]:
137 return "white"
138 return "black"
140 finished: frozenset[TaskInstanceState] = frozenset(
141 [
142 TaskInstanceState.SUCCESS,
143 TaskInstanceState.FAILED,
144 TaskInstanceState.SKIPPED,
145 TaskInstanceState.UPSTREAM_FAILED,
146 TaskInstanceState.REMOVED,
147 ]
148 )
149 """
150 A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
151 further action.
153 Note that the attempt could have resulted in failure or have been
154 interrupted; or perhaps never run at all (skip, or upstream_failed) in any
155 case, it is no longer running.
156 """
158 unfinished: frozenset[TaskInstanceState | None] = frozenset(
159 [
160 None,
161 TaskInstanceState.SCHEDULED,
162 TaskInstanceState.QUEUED,
163 TaskInstanceState.RUNNING,
164 TaskInstanceState.SHUTDOWN,
165 TaskInstanceState.RESTARTING,
166 TaskInstanceState.UP_FOR_RETRY,
167 TaskInstanceState.UP_FOR_RESCHEDULE,
168 TaskInstanceState.DEFERRED,
169 ]
170 )
171 """
172 A list of states indicating that a task either has not completed
173 a run or has not even started.
174 """
176 failed_states: frozenset[TaskInstanceState] = frozenset(
177 [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
178 )
179 """
180 A list of states indicating that a task or dag is a failed state.
181 """
183 success_states: frozenset[TaskInstanceState] = frozenset(
184 [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
185 )
186 """
187 A list of states indicating that a task or dag is a success state.
188 """
190 terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
191 """
192 A list of states indicating that a task has been terminated.
193 """