1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20import enum
21from http import HTTPStatus
22from typing import TYPE_CHECKING, Any
23
24from airflow.sdk import TriggerRule
25
# Re-export AirflowConfigException from the shared configuration package.
27from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException
28
29if TYPE_CHECKING:
30 from collections.abc import Collection
31
32 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
33 from airflow.sdk.execution_time.comms import ErrorResponse
34
35
class AirflowException(Exception):
    """
    Root of the Airflow exception hierarchy.

    Custom Airflow exceptions are expected to subclass this type.
    """

    # HTTP status associated with this error type; subclasses may override it.
    status_code = HTTPStatus.INTERNAL_SERVER_ERROR

    def serialize(self):
        """
        Describe this exception as a ``(path, args, kwargs)`` triple.

        :return: The dotted ``module.ClassName`` path of the concrete class, a
            one-element tuple holding ``str(self)``, and an empty kwargs dict.
        """
        exc_type = type(self)
        dotted_path = f"{exc_type.__module__}.{exc_type.__name__}"
        return dotted_path, (str(self),), {}
48
49
class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when the imports needed for an optional provider feature are missing."""
52
53
class AirflowNotFoundException(AirflowException):
    """Raised when a requested object or resource is not available in the system."""

    # Replaces the status code inherited from AirflowException with 404 Not Found.
    status_code = HTTPStatus.NOT_FOUND
58
59
class AirflowDagCycleException(AirflowException):
    """Raised when a Dag definition contains a cycle."""
62
63
class AirflowRuntimeError(Exception):
    """
    Generic Airflow error raised by runtime functions.

    The exception message is ``"<error.error.value>: <error.detail>"`` and the
    originating response stays available as ``self.error``.
    """

    def __init__(self, error: ErrorResponse):
        """
        :param error: Error response to wrap; its ``error.value`` and ``detail``
            fields are used to build the message.
        """
        message = f"{error.error.value}: {error.detail}"
        super().__init__(message)
        self.error = error
70
71
class AirflowTimetableInvalid(AirflowException):
    """Raised when the timetable of a DAG is invalid."""
74
75
class ErrorType(enum.Enum):
    """
    Error types used in the API client.

    Every member's value is the same string as its name.
    """

    # Lookups that found nothing.
    CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
    VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND"
    XCOM_NOT_FOUND = "XCOM_NOT_FOUND"
    ASSET_NOT_FOUND = "ASSET_NOT_FOUND"
    TASK_STATE_NOT_FOUND = "TASK_STATE_NOT_FOUND"
    ASSET_STATE_NOT_FOUND = "ASSET_STATE_NOT_FOUND"

    # Conflicts and catch-all categories.
    DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"
    GENERIC_ERROR = "GENERIC_ERROR"
    API_SERVER_ERROR = "API_SERVER_ERROR"
88
89
class XComForMappingNotPushed(TypeError):
    """Raised when a dependency of a mapped downstream task did not push the XCom needed for task mapping."""

    def __str__(self) -> str:
        """Return a fixed message, regardless of any constructor arguments."""
        message = "did not push XCom for task mapping"
        return message
95
96
class UnmappableXComTypePushed(TypeError):
    """
    Raised when a mapped downstream's dependency pushes a value of an unmappable type.

    The message names the type of the first argument, followed by the type of
    each further argument in square brackets, e.g. ``'dict[str]'``.
    """

    def __init__(self, value: Any, *values: Any) -> None:
        """
        :param value: The offending value (at least one is required).
        :param values: Further values whose types are appended to the message.
        """
        super().__init__(value, *values)

    def __str__(self) -> str:
        """Render ``unmappable return type '<Type>[<Type>]...'`` from the stored args."""
        outer = type(self.args[0]).__qualname__
        nested = "".join(f"[{type(item).__qualname__}]" for item in self.args[1:])
        typename = f"{outer}{nested}"
        return f"unmappable return type {typename!r}"
108
109
class AirflowFailException(AirflowException):
    """Raised when the task should be failed without being retried."""
112
113
class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException):
    """
    Base for failures caused by a task that references inactive assets.

    Subclasses provide ``main_message``; ``str()`` of the exception is that
    message, a colon, and a comma-separated rendering of every inactive key.
    """

    # Leading part of the error text; concrete subclasses must define it.
    main_message: str

    def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None:
        """
        :param inactive_asset_keys: Keys or references identifying the inactive assets.
        """
        # Forward the keys to the base initializer so ``args`` always holds them,
        # also when the argument is passed by keyword. ``args`` is what the default
        # copy/pickle support of exceptions uses to rebuild the instance.
        super().__init__(inactive_asset_keys)
        self.inactive_asset_keys = inactive_asset_keys

    @staticmethod
    def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str:
        """Format one key the way the matching asset expression would be written."""
        # Imported lazily; at module level these names are only imported under TYPE_CHECKING.
        from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef

        if isinstance(key, AssetUniqueKey):
            return f"Asset(name={key.name!r}, uri={key.uri!r})"
        if isinstance(key, AssetNameRef):
            return f"Asset.ref(name={key.name!r})"
        if isinstance(key, AssetUriRef):
            return f"Asset.ref(uri={key.uri!r})"
        return repr(key)  # Should not happen, but let's fail more gracefully in an exception.

    def __str__(self) -> str:
        return f"{self.main_message}: {self.inactive_assets_message}"

    @property
    def inactive_assets_message(self) -> str:
        """Comma-separated rendering of all inactive asset keys."""
        return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys)
138
139
class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption):
    """Raised when a task is executed while its inlets or outlets contain inactive assets."""

    # Prefix of the rendered error text; the base class appends the asset list.
    main_message = "Task has the following inactive assets in its inlets or outlets"
144
145
class AirflowRescheduleException(AirflowException):
    """
    Raised when the task should be run again at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        super().__init__()
        self.reschedule_date = reschedule_date

    def serialize(self):
        """
        Describe this exception as a ``(path, args, kwargs)`` triple.

        :return: The dotted class path, an empty args tuple, and a kwargs dict
            carrying ``reschedule_date``.
        """
        exc_type = type(self)
        init_kwargs = {"reschedule_date": self.reschedule_date}
        return f"{exc_type.__module__}.{exc_type.__name__}", (), init_kwargs
160
161
class AirflowSensorTimeout(AirflowException):
    """Raised when sensor polling times out."""
164
165
class AirflowSkipException(AirflowException):
    """Raised to indicate that the task should be skipped."""
168
169
class AirflowTaskTerminated(BaseException):
    """
    Raised when the task execution is terminated.

    Derives from ``BaseException``, so ``except Exception`` handlers do not catch it.
    """
172
173
174# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used
175# to explicitly interrupt ongoing task. Code that does normal error-handling should not treat
176# such interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt)
class AirflowTaskTimeout(BaseException):
    """
    Raised when the task execution times out.

    Derives from ``BaseException``, so ``except Exception`` handlers do not catch it.
    """
179
180
181class TaskDeferred(BaseException):
182 """
183 Signal an operator moving to deferred state.
184
185 Special exception raised to signal that the operator it was raised from
186 wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance
187 directly. If the trigger should end the task instance itself, ``method_name`` does not matter,
188 and can be None; otherwise, provide the name of the method that should be used when
189 resuming execution in the task.
190 """
191
192 def __init__(
193 self,
194 *,
195 trigger,
196 method_name: str,
197 kwargs: dict[str, Any] | None = None,
198 timeout=None,
199 ):
200 super().__init__()
201 self.trigger = trigger
202 self.method_name = method_name
203 self.kwargs = kwargs
204 self.timeout = timeout
205
206 def serialize(self):
207 cls = self.__class__
208 return (
209 f"{cls.__module__}.{cls.__name__}",
210 (),
211 {
212 "trigger": self.trigger,
213 "method_name": self.method_name,
214 "kwargs": self.kwargs,
215 "timeout": self.timeout,
216 },
217 )
218
219 def __repr__(self) -> str:
220 return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"
221
222
class TaskDeferralError(AirflowException):
    """Raised when a task fails during deferral for some reason."""
225
226
class TaskDeferralTimeout(AirflowException):
    """Raised when a deferral times out."""
229
230
class DagRunTriggerException(AirflowException):
    """
    Signal from an operator that a specific Dag Run of a dag should be triggered.

    Raised by an operator that wants a particular Dag Run to be triggered; this is
    used in the ``TriggerDagRunOperator``.

    Every constructor argument is keyword-only and is stored unchanged as an
    attribute of the same name. Only ``logical_date``, ``run_after`` and ``note``
    have defaults (``None``).
    """

    def __init__(
        self,
        *,
        trigger_dag_id: str,
        dag_run_id: str,
        conf: dict | None,
        logical_date=None,
        run_after=None,
        reset_dag_run: bool,
        skip_when_already_exists: bool,
        wait_for_completion: bool,
        allowed_states: list[str],
        failed_states: list[str],
        poke_interval: int,
        deferrable: bool,
        note: str | None = None,
    ):
        super().__init__()

        # Identity and payload of the run to trigger.
        self.trigger_dag_id = trigger_dag_id
        self.dag_run_id = dag_run_id
        self.conf = conf
        self.logical_date = logical_date
        self.run_after = run_after

        # Flags for an already existing run.
        self.reset_dag_run = reset_dag_run
        self.skip_when_already_exists = skip_when_already_exists

        # Waiting / polling options.
        self.wait_for_completion = wait_for_completion
        self.allowed_states = allowed_states
        self.failed_states = failed_states
        self.poke_interval = poke_interval
        self.deferrable = deferrable

        self.note = note
270
271
class DownstreamTasksSkipped(AirflowException):
    """
    Signal from an operator that its downstream tasks should be skipped.

    Raised by an operator that wants downstream tasks to be skipped; this is used
    in the ShortCircuitOperator.

    :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip.
    """

    def __init__(self, *, tasks):
        super().__init__()
        # Kept exactly as passed in; no validation or copying happens here.
        self.tasks = tasks
285
286
class XComNotFound(AirflowException):
    """
    Raised when an XCom reference is resolved against an XCom that does not exist.

    :param dag_id: ``dag_id`` of the missing XCom.
    :param task_id: ``task_id`` of the missing XCom.
    :param key: Key of the missing XCom.
    """

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'

    def serialize(self):
        """
        Describe this exception as a ``(path, args, kwargs)`` triple.

        :return: The dotted class path, an empty args tuple, and a kwargs dict
            with ``dag_id``, ``task_id`` and ``key``.
        """
        exc_type = type(self)
        init_kwargs = {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key}
        return f"{exc_type.__module__}.{exc_type.__name__}", (), init_kwargs
306
307
class ParamValidationError(AirflowException, ValueError):
    """
    Raised when DAG params are invalid.

    Also a ``ValueError``, so handlers written for ``ValueError`` catch it too.
    """
310
311
class DuplicateTaskIdFound(AirflowException):
    """Raised when a Task with a duplicate task_id is defined in the same DAG."""
314
315
class TaskAlreadyInTaskGroup(AirflowException):
    """
    Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup.

    :param task_id: ID of the task that was being added.
    :param existing_group_id: ID of the group the task already belongs to;
        ``None`` is rendered as the DAG's root group.
    :param new_group_id: ID of the group the task was being added to.
    """

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str):
        # ``args`` intentionally stays ``(task_id, new_group_id)`` as before.
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy`` by supplying all three constructor arguments.

        The inherited implementation rebuilds the instance as ``cls(*self.args)``.
        ``args`` holds only two values while ``__init__`` requires three, so the
        default reconstruction raises ``TypeError``.
        """
        init_args = (self.task_id, self.existing_group_id, self.new_group_id)
        # The third element restores any extra instance attributes after __init__ ran.
        return type(self), init_args, vars(self)

    def __str__(self) -> str:
        if self.existing_group_id is None:
            existing_group = "the DAG's root group"
        else:
            existing_group = f"group {self.existing_group_id!r}"
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"
331
332
class TaskNotFound(AirflowException):
    """Raised when a Task is not available in the system."""
335
336
class NodeNotFound(TaskNotFound, KeyError):
    """
    Raised when an invalid node (task or task group) is accessed using [] notation.

    Also a ``KeyError``, so handlers written for ``KeyError`` catch it too.
    """

    def __str__(self) -> str:
        """Return the first argument as a string, or an empty string when there is none."""
        if not self.args:
            return ""
        return str(self.args[0])
342
343
class TaskAlreadyRunningError(AirflowException):
    """Raised when a task is already running on another worker."""
346
347
class FailFastDagInvalidTriggerRule(AirflowException):
    """Raised when a dag has 'fail_fast' enabled but a task uses a trigger rule that is not allowed."""

    # Trigger rules accepted for tasks of a 'fail_fast' dag.
    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
        """
        Check that fail_fast dag tasks have allowable trigger rules.

        :param fail_fast: Whether 'fail_fast' is enabled; nothing is checked when falsy.
        :param trigger_rule: The trigger rule to validate against the allowed rules.
        :raises FailFastDagInvalidTriggerRule: If ``fail_fast`` is set and
            ``trigger_rule`` is not one of the allowed rules.

        :meta private:
        """
        if not fail_fast:
            return
        if trigger_rule in cls._allowed_rules:
            return
        raise cls()

    def __str__(self) -> str:
        # NOTE(review): the text names only ALL_SUCCESS although ``_allowed_rules``
        # also admits ALL_DONE_SETUP_SUCCESS - confirm whether that is intended.
        return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"
365
366
367class RemovedInAirflow4Warning(DeprecationWarning):
368 """Issued for usage of deprecated features that will be removed in Airflow4."""
369
370 deprecated_since: str | None = None
371 "Indicates the airflow version that started raising this deprecation warning"