# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Any

from airflow.sdk import TriggerRule

# Re-exporting AirflowConfigException from shared configuration
from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException

if TYPE_CHECKING:
    from collections.abc import Collection

    from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
    from airflow.sdk.execution_time.comms import ErrorResponse


class AirflowException(Exception):
    """
    Base class for all Airflow errors.

    Each custom exception should be derived from this class.
    """

    status_code = HTTPStatus.INTERNAL_SERVER_ERROR

    def serialize(self):
        # Return a (dotted classpath, positional args, keyword args) triple;
        # subclasses with extra state override this to preserve it.
        cls = self.__class__
        return f"{cls.__module__}.{cls.__name__}", (str(self),), {}

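# A minimal sketch of the serialized form, assuming this module is importable
# as ``airflow.sdk.exceptions`` (the path actually comes from ``cls.__module__``):
#
#     path, args, kwargs = AirflowException("boom").serialize()
#     # path == "airflow.sdk.exceptions.AirflowException"
#     # args == ("boom",), kwargs == {}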

class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when imports are missing for optional provider features."""


class AirflowNotFoundException(AirflowException):
    """Raise when the requested object/resource is not available in the system."""

    status_code = HTTPStatus.NOT_FOUND


class AirflowDagCycleException(AirflowException):
    """Raise when there is a cycle in a Dag definition."""


class AirflowRuntimeError(Exception):
    """Generic Airflow error raised by runtime functions."""

    def __init__(self, error: ErrorResponse):
        self.error = error
        super().__init__(f"{error.error.value}: {error.detail}")

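# Illustrative only -- this assumes ``ErrorResponse`` exposes ``error`` (an
# ``ErrorType`` member, defined below) and a free-form ``detail``, which is how
# the constructor above reads it:
#
#     resp = ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"key": "foo"})
#     raise AirflowRuntimeError(resp)  # message: "VARIABLE_NOT_FOUND: {'key': 'foo'}"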

class AirflowTimetableInvalid(AirflowException):
    """Raise when a DAG has an invalid timetable."""


class ErrorType(enum.Enum):
    """Error types used in the API client."""

    CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
    VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND"
    XCOM_NOT_FOUND = "XCOM_NOT_FOUND"
    ASSET_NOT_FOUND = "ASSET_NOT_FOUND"
    DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"
    GENERIC_ERROR = "GENERIC_ERROR"
    API_SERVER_ERROR = "API_SERVER_ERROR"


class XComForMappingNotPushed(TypeError):
    """Raise when a mapped downstream's dependency fails to push XCom for task mapping."""

    def __str__(self) -> str:
        return "did not push XCom for task mapping"


class UnmappableXComTypePushed(TypeError):
    """Raise when an unmappable type is pushed as a mapped downstream's dependency."""

    def __init__(self, value: Any, *values: Any) -> None:
        super().__init__(value, *values)

    def __str__(self) -> str:
        typename = type(self.args[0]).__qualname__
        for arg in self.args[1:]:
            typename = f"{typename}[{type(arg).__qualname__}]"
        return f"unmappable return type {typename!r}"

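# For illustration, ``__str__`` nests the element types after the container type:
#
#     str(UnmappableXComTypePushed({"a": 1}, 1))
#     # -> "unmappable return type 'dict[int]'"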

class AirflowFailException(AirflowException):
    """Raise when the task should be failed without retrying."""


class _AirflowExecuteWithInactiveAssetException(AirflowFailException):
    """Base for errors raised when a task executes with inactive assets."""

    main_message: str

    def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None:
        self.inactive_asset_keys = inactive_asset_keys

    @staticmethod
    def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str:
        from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef

        if isinstance(key, AssetUniqueKey):
            return f"Asset(name={key.name!r}, uri={key.uri!r})"
        if isinstance(key, AssetNameRef):
            return f"Asset.ref(name={key.name!r})"
        if isinstance(key, AssetUriRef):
            return f"Asset.ref(uri={key.uri!r})"
        return repr(key)  # Should not happen, but let's fail more gracefully in an exception.

    def __str__(self) -> str:
        return f"{self.main_message}: {self.inactive_assets_message}"

    @property
    def inactive_assets_message(self) -> str:
        return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys)


class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetException):
    """Raise when the task is executed with inactive assets in its inlet or outlet."""

    main_message = "Task has the following inactive assets in its inlets or outlets"

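# A sketch of the rendered message, assuming ``AssetUniqueKey`` takes ``name``
# and ``uri`` keyword arguments, as the renderer above reads them:
#
#     key = AssetUniqueKey(name="a", uri="s3://bucket/a")
#     str(AirflowInactiveAssetInInletOrOutletException([key]))
#     # -> "Task has the following inactive assets in its inlets or outlets: Asset(name='a', uri='s3://bucket/a')"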

class AirflowRescheduleException(AirflowException):
    """
    Raise when the task should be rescheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled.
    """

    def __init__(self, reschedule_date):
        super().__init__()
        self.reschedule_date = reschedule_date

    def serialize(self):
        cls = self.__class__
        return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date}

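# Typical use is inside a sensor's poke/execute logic to come back later
# (hypothetical operator code, not part of this module):
#
#     from datetime import datetime, timedelta, timezone
#
#     raise AirflowRescheduleException(
#         reschedule_date=datetime.now(timezone.utc) + timedelta(minutes=5)
#     )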

class AirflowSensorTimeout(AirflowException):
    """Raise when there is a timeout on sensor polling."""


class AirflowSkipException(AirflowException):
    """Raise when the task should be skipped."""


# It is important that the following two classes inherit from BaseException
# instead of AirflowException->Exception, since they are used to explicitly
# interrupt an ongoing task. Code that does normal error handling should not
# treat such an interrupt as an error that can be handled normally. (Compare
# with KeyboardInterrupt.)
class AirflowTaskTerminated(BaseException):
    """Raise when the task execution is terminated."""


class AirflowTaskTimeout(BaseException):
    """Raise when the task execution times out."""


class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires. Triggers can send execution back to the task
    or end the task instance directly. If the trigger will end the task instance itself,
    ``method_name`` does not matter and can be None; otherwise, provide the name of the
    method that should be used when resuming execution in the task.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout=None,
    ):
        super().__init__()
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout

    def serialize(self):
        cls = self.__class__
        return (
            f"{cls.__module__}.{cls.__name__}",
            (),
            {
                "trigger": self.trigger,
                "method_name": self.method_name,
                "kwargs": self.kwargs,
                "timeout": self.timeout,
            },
        )

    def __repr__(self) -> str:
        return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"

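# A sketch of deferral from an operator's ``execute`` method (``MyTimerTrigger``,
# ``some_future_datetime``, and the resume point ``execute_complete`` are
# hypothetical, not part of this module):
#
#     from datetime import timedelta
#
#     raise TaskDeferred(
#         trigger=MyTimerTrigger(moment=some_future_datetime),
#         method_name="execute_complete",
#         kwargs={"note": "resumed after trigger fired"},
#         timeout=timedelta(hours=1),
#     )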

class TaskDeferralError(AirflowException):
    """Raise when a task fails during deferral for some reason."""


class TaskDeferralTimeout(AirflowException):
    """Raise when there is a timeout on the deferral."""


class DagRunTriggerException(AirflowException):
    """
    Signaled by an operator to trigger a specific Dag Run of a dag.

    Special exception raised to signal that the operator it was raised from wishes to trigger
    a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``.
    """

    def __init__(
        self,
        *,
        trigger_dag_id: str,
        dag_run_id: str,
        conf: dict | None,
        logical_date=None,
        reset_dag_run: bool,
        skip_when_already_exists: bool,
        wait_for_completion: bool,
        allowed_states: list[str],
        failed_states: list[str],
        poke_interval: int,
        deferrable: bool,
    ):
        super().__init__()
        self.trigger_dag_id = trigger_dag_id
        self.dag_run_id = dag_run_id
        self.conf = conf
        self.logical_date = logical_date
        self.reset_dag_run = reset_dag_run
        self.skip_when_already_exists = skip_when_already_exists
        self.wait_for_completion = wait_for_completion
        self.allowed_states = allowed_states
        self.failed_states = failed_states
        self.poke_interval = poke_interval
        self.deferrable = deferrable

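# A sketch of how TriggerDagRunOperator-style code would raise this
# (all values below are illustrative):
#
#     raise DagRunTriggerException(
#         trigger_dag_id="target_dag",
#         dag_run_id="manual__2025-01-01",
#         conf={"param": "value"},
#         logical_date=None,
#         reset_dag_run=False,
#         skip_when_already_exists=True,
#         wait_for_completion=False,
#         allowed_states=["success"],
#         failed_states=["failed"],
#         poke_interval=60,
#         deferrable=False,
#     )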

class DownstreamTasksSkipped(AirflowException):
    """
    Signaled by an operator to skip its downstream tasks.

    Special exception raised to signal that the operator it was raised from wishes to skip
    downstream tasks. This is used in the ``ShortCircuitOperator``.

    :param tasks: List of task_ids to skip, or a list of tuples with task_id and map_index to skip.
    """

    def __init__(self, *, tasks):
        super().__init__()
        self.tasks = tasks

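# Per the docstring, ``tasks`` takes plain task_ids or (task_id, map_index) pairs:
#
#     raise DownstreamTasksSkipped(tasks=["branch_a", ("mapped_task", 0)])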

class XComNotFound(AirflowException):
    """Raise when an XCom reference is being resolved against a non-existent XCom."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'

    def serialize(self):
        cls = self.__class__
        return (
            f"{cls.__module__}.{cls.__name__}",
            (),
            {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key},
        )

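# For illustration:
#
#     str(XComNotFound("my_dag", "push_task", "result"))
#     # -> 'XComArg result from push_task at my_dag with key="result" is not found!'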

class ParamValidationError(AirflowException):
    """Raise when DAG params are invalid."""


class DuplicateTaskIdFound(AirflowException):
    """Raise when a Task with duplicate task_id is defined in the same DAG."""


class TaskAlreadyInTaskGroup(AirflowException):
    """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str):
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        if self.existing_group_id is None:
            existing_group = "the DAG's root group"
        else:
            existing_group = f"group {self.existing_group_id!r}"
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"

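# For illustration:
#
#     str(TaskAlreadyInTaskGroup("t1", "group_a", "group_b"))
#     # -> "cannot add 't1' to 'group_b' (already in group 'group_a')"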

class TaskNotFound(AirflowException):
    """Raise when a Task is not available in the system."""


class FailFastDagInvalidTriggerRule(AirflowException):
    """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule."""

    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
        """
        Check that fail_fast dag tasks have allowable trigger rules.

        :meta private:
        """
        if fail_fast and trigger_rule not in cls._allowed_rules:
            raise cls()

    def __str__(self) -> str:
        allowed_rules = " or ".join(f"{rule}" for rule in self._allowed_rules)
        return f"A 'fail_fast' dag can only have {allowed_rules} trigger rules"

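# For illustration:
#
#     FailFastDagInvalidTriggerRule.check(
#         fail_fast=True, trigger_rule=TriggerRule.ONE_SUCCESS
#     )  # raises FailFastDagInvalidTriggerRule
#     FailFastDagInvalidTriggerRule.check(
#         fail_fast=True, trigger_rule=TriggerRule.ALL_SUCCESS
#     )  # returns None; the rule is allowed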

class RemovedInAirflow4Warning(DeprecationWarning):
    """Issued for usage of deprecated features that will be removed in Airflow 4."""

    deprecated_since: str | None = None
    "Indicates the Airflow version that started raising this deprecation warning."