Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/state.py: 86%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from enum import Enum
23class TerminalTIState(str, Enum):
24 """States that a Task Instance can be in that indicate it has reached a terminal state."""
26 SUCCESS = "success"
27 FAILED = "failed"
28 SKIPPED = "skipped" # A user can raise a AirflowSkipException from a task & it will be marked as skipped
29 UPSTREAM_FAILED = "upstream_failed"
30 REMOVED = "removed"
32 def __str__(self) -> str:
33 return self.value
36class IntermediateTIState(str, Enum):
37 """States that a Task Instance can be in that indicate it is not yet in a terminal or running state."""
39 SCHEDULED = "scheduled"
40 QUEUED = "queued"
41 RESTARTING = "restarting"
42 UP_FOR_RETRY = "up_for_retry"
43 UP_FOR_RESCHEDULE = "up_for_reschedule"
44 DEFERRED = "deferred"
46 def __str__(self) -> str:
47 return self.value
50class TaskInstanceState(str, Enum):
51 """
52 All possible states that a Task Instance can be in.
54 Note that None is also allowed, so always use this in a type hint with Optional.
55 """
57 # The scheduler sets a TaskInstance state to None when it's created but not
58 # yet run, but we don't list it here since TaskInstance is a string enum.
59 # Use None instead if need this state.
61 # Set by the scheduler
62 REMOVED = TerminalTIState.REMOVED # Task vanished from DAG before it ran
63 SCHEDULED = IntermediateTIState.SCHEDULED # Task should run and will be handed to executor soon
65 # Set by the task instance itself
66 QUEUED = IntermediateTIState.QUEUED # Executor has enqueued the task
67 RUNNING = "running" # Task is executing
68 SUCCESS = TerminalTIState.SUCCESS # Task completed
69 RESTARTING = IntermediateTIState.RESTARTING # External request to restart (e.g. cleared when running)
70 FAILED = TerminalTIState.FAILED # Task errored out
71 UP_FOR_RETRY = IntermediateTIState.UP_FOR_RETRY # Task failed but has retries left
72 UP_FOR_RESCHEDULE = IntermediateTIState.UP_FOR_RESCHEDULE # A waiting `reschedule` sensor
73 UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED # One or more upstream deps failed
74 SKIPPED = TerminalTIState.SKIPPED # Skipped by branching or some other mechanism
75 DEFERRED = IntermediateTIState.DEFERRED # Deferrable operator waiting on a trigger
77 def __str__(self) -> str:
78 return self.value
81class DagRunState(str, Enum):
82 """
83 All possible states that a DagRun can be in.
85 These are "shared" with TaskInstanceState in some parts of the code,
86 so please ensure that their values always match the ones with the
87 same name in TaskInstanceState.
88 """
90 QUEUED = "queued"
91 RUNNING = "running"
92 SUCCESS = "success"
93 FAILED = "failed"
95 def __str__(self) -> str:
96 return self.value
99class State:
100 """Static class with task instance state constants and color methods to avoid hard-coding."""
102 # Backwards-compat constants for code that does not yet use the enum
103 # These first three are shared by DagState and TaskState
104 SUCCESS = TaskInstanceState.SUCCESS
105 RUNNING = TaskInstanceState.RUNNING
106 FAILED = TaskInstanceState.FAILED
108 # These are TaskState only
109 NONE = None
110 REMOVED = TaskInstanceState.REMOVED
111 SCHEDULED = TaskInstanceState.SCHEDULED
112 QUEUED = TaskInstanceState.QUEUED
113 RESTARTING = TaskInstanceState.RESTARTING
114 UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
115 UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
116 UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
117 SKIPPED = TaskInstanceState.SKIPPED
118 DEFERRED = TaskInstanceState.DEFERRED
120 finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
121 unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])
123 task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState)
125 dag_states: tuple[DagRunState, ...] = (
126 DagRunState.QUEUED,
127 DagRunState.SUCCESS,
128 DagRunState.RUNNING,
129 DagRunState.FAILED,
130 )
132 state_color: dict[TaskInstanceState | None, str] = {
133 None: "lightblue",
134 TaskInstanceState.QUEUED: "gray",
135 TaskInstanceState.RUNNING: "lime",
136 TaskInstanceState.SUCCESS: "green",
137 TaskInstanceState.RESTARTING: "violet",
138 TaskInstanceState.FAILED: "red",
139 TaskInstanceState.UP_FOR_RETRY: "gold",
140 TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
141 TaskInstanceState.UPSTREAM_FAILED: "orange",
142 TaskInstanceState.SKIPPED: "hotpink",
143 TaskInstanceState.REMOVED: "lightgrey",
144 TaskInstanceState.SCHEDULED: "tan",
145 TaskInstanceState.DEFERRED: "mediumpurple",
146 }
148 @classmethod
149 def color(cls, state):
150 """Return color for a state."""
151 return cls.state_color.get(state, "white")
153 @classmethod
154 def color_fg(cls, state):
155 """Black&white colors for a state."""
156 color = cls.color(state)
157 if color in ["green", "red"]:
158 return "white"
159 return "black"
161 finished: frozenset[TaskInstanceState] = frozenset(
162 [
163 TaskInstanceState.SUCCESS,
164 TaskInstanceState.FAILED,
165 TaskInstanceState.SKIPPED,
166 TaskInstanceState.UPSTREAM_FAILED,
167 TaskInstanceState.REMOVED,
168 ]
169 )
170 """
171 A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
172 further action.
174 Note that the attempt could have resulted in failure or have been
175 interrupted; or perhaps never run at all (skip, or upstream_failed) in any
176 case, it is no longer running.
177 """
179 unfinished: frozenset[TaskInstanceState | None] = frozenset(
180 [
181 None,
182 TaskInstanceState.SCHEDULED,
183 TaskInstanceState.QUEUED,
184 TaskInstanceState.RUNNING,
185 TaskInstanceState.RESTARTING,
186 TaskInstanceState.UP_FOR_RETRY,
187 TaskInstanceState.UP_FOR_RESCHEDULE,
188 TaskInstanceState.DEFERRED,
189 ]
190 )
191 """
192 A list of states indicating that a task either has not completed
193 a run or has not even started.
194 """
196 failed_states: frozenset[TaskInstanceState] = frozenset(
197 [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
198 )
199 """
200 A list of states indicating that a task or dag is a failed state.
201 """
203 success_states: frozenset[TaskInstanceState] = frozenset(
204 [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
205 )
206 """
207 A list of states indicating that a task or dag is a success state.
208 """
210 adoptable_states = frozenset(
211 [TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING]
212 )
213 """
214 A list of states indicating that a task can be adopted or reset by a scheduler job
215 if it was queued by another scheduler job that is not running anymore.
216 """
219def __getattr__(name: str):
220 """Provide backward compatibility for moved classes."""
221 if name == "JobState":
222 import warnings
224 from airflow.jobs.job import JobState
226 warnings.warn(
227 "The `airflow.utils.state.JobState` attribute is deprecated and will be removed in a future version. "
228 "Please use `airflow.jobs.job.JobState` instead.",
229 DeprecationWarning,
230 stacklevel=2,
231 )
232 return JobState
234 raise AttributeError(f"module 'airflow.utils.state' has no attribute '{name}'")