1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20import enum
21from http import HTTPStatus
22from typing import TYPE_CHECKING, Any
23
24from airflow.sdk import TriggerRule
25
26if TYPE_CHECKING:
27 from collections.abc import Collection
28
29 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
30 from airflow.sdk.execution_time.comms import ErrorResponse
31
32
class AirflowException(Exception):
    """
    Base class for all Airflow's errors.

    Each custom exception should be derived from this class.
    """

    # Default HTTP status for this error family; subclasses override as needed.
    status_code = HTTPStatus.INTERNAL_SERVER_ERROR

    def serialize(self):
        """Return ``(dotted class path, positional args, keyword args)`` for transport."""
        klass = type(self)
        dotted_path = f"{klass.__module__}.{klass.__name__}"
        return dotted_path, (str(self),), {}
45
46
class AirflowNotFoundException(AirflowException):
    """Raise when a requested object/resource cannot be found in the system."""

    # Surfaced to API clients as an HTTP 404 instead of the base-class 500.
    status_code = HTTPStatus.NOT_FOUND
51
52
class AirflowDagCycleException(AirflowException):
    """Raise when a cycle is detected in a Dag's definition."""
55
56
class AirflowRuntimeError(Exception):
    """Generic Airflow error raised by runtime functions."""

    def __init__(self, error: ErrorResponse):
        # Keep the structured response so callers can still inspect
        # error.error / error.detail after catching this exception.
        self.error = error
        message = f"{error.error.value}: {error.detail}"
        super().__init__(message)
63
64
class AirflowTimetableInvalid(AirflowException):
    """Raise when a DAG's timetable is invalid."""
67
68
class ErrorType(enum.Enum):
    """Error types used in the API client."""

    # Each value mirrors its member name so it round-trips cleanly as a string
    # (e.g. in ErrorResponse payloads — see AirflowRuntimeError's message format).
    CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
    VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND"
    XCOM_NOT_FOUND = "XCOM_NOT_FOUND"
    ASSET_NOT_FOUND = "ASSET_NOT_FOUND"
    DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"
    # Catch-all buckets for otherwise-unclassified client/server failures.
    GENERIC_ERROR = "GENERIC_ERROR"
    API_SERVER_ERROR = "API_SERVER_ERROR"
79
80
class XComForMappingNotPushed(TypeError):
    """Raise when a mapped downstream's dependency fails to push XCom for task mapping."""

    def __str__(self) -> str:
        # Fixed message; the task context is supplied by whoever raises/logs this.
        return "did not push XCom for task mapping"
86
87
class UnmappableXComTypePushed(TypeError):
    """Raise when an unmappable type is pushed as a mapped downstream's dependency."""

    def __init__(self, value: Any, *values: Any) -> None:
        super().__init__(value, *values)

    def __str__(self) -> str:
        # Render the pushed values' types as a nested generic-like string,
        # e.g. values (list, str) become "list[str]".
        names = [type(arg).__qualname__ for arg in self.args]
        rendered = names[0]
        for name in names[1:]:
            rendered = f"{rendered}[{name}]"
        return f"unmappable return type {rendered!r}"
99
100
class AirflowFailException(AirflowException):
    """Raise to fail the task immediately, without any remaining retries."""
103
104
class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException):
    """
    Base for failures caused by a task referencing inactive assets.

    Subclasses set ``main_message``; ``__str__`` appends a rendering of the
    offending asset keys after it.
    """

    # Message prefix, provided by concrete subclasses.
    main_message: str

    def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None:
        self.inactive_asset_keys = inactive_asset_keys

    @staticmethod
    def _render_asset_key(key: AssetUniqueKey | AssetUniqueKey | AssetUriRef) -> str:
        # Local import: at module level these names are TYPE_CHECKING-only.
        from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef

        if isinstance(key, AssetUniqueKey):
            return f"Asset(name={key.name!r}, uri={key.uri!r})"
        if isinstance(key, AssetNameRef):
            return f"Asset.ref(name={key.name!r})"
        if isinstance(key, AssetUriRef):
            return f"Asset.ref(uri={key.uri!r})"
        return repr(key)  # Should not happen, but fail more gracefully inside an exception.

    def __str__(self) -> str:
        return f"{self.main_message}: {self.inactive_assets_message}"

    @property
    def inactive_assets_message(self) -> str:
        # Comma-separated, human-readable rendering of every inactive asset key.
        return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys)
129
130
class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption):
    """Raise when the task is executed with inactive assets in its inlet or outlet."""

    # Prefix rendered before the comma-separated list of inactive asset keys.
    main_message = "Task has the following inactive assets in its inlets or outlets"
135
136
class AirflowRescheduleException(AirflowException):
    """
    Raise when the task should be re-scheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        super().__init__()
        self.reschedule_date = reschedule_date

    def serialize(self):
        """Return ``(dotted class path, args, kwargs)`` suitable for reconstruction."""
        klass = type(self)
        dotted_path = f"{klass.__module__}.{klass.__name__}"
        return dotted_path, (), {"reschedule_date": self.reschedule_date}
151
152
class AirflowSensorTimeout(AirflowException):
    """Raise when sensor polling exceeds its allotted time."""
155
156
class AirflowSkipException(AirflowException):
    """Raise to mark the current task as skipped."""
159
160
class AirflowTaskTerminated(BaseException):
    """
    Raise when the task execution is terminated.

    Derives from ``BaseException`` so ordinary ``except Exception`` handlers
    do not swallow it.
    """
163
164
# Deliberately inherits BaseException rather than AirflowException -> Exception:
# this exception is used to explicitly interrupt an ongoing task, and code doing
# normal error-handling must not treat the interrupt as a handleable error.
# (Compare with KeyboardInterrupt.)
class AirflowTaskTimeout(BaseException):
    """Raise when the task execution times-out."""
170
171
class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance
    directly. If the trigger should end the task instance itself, ``method_name`` does not matter,
    and can be None; otherwise, provide the name of the method that should be used when
    resuming execution in the task.

    :param trigger: the trigger instance to defer on
    :param method_name: name of the method to resume in, or None when the
        trigger ends the task instance itself
    :param kwargs: keyword arguments passed to the resume method
    :param timeout: optional limit on how long the task may stay deferred
    """

    def __init__(
        self,
        *,
        trigger,
        # Annotation widened from ``str``: the class contract (see docstring)
        # explicitly allows None when the trigger ends the task instance.
        method_name: str | None,
        kwargs: dict[str, Any] | None = None,
        timeout=None,
    ):
        super().__init__()
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout

    def serialize(self):
        """Return ``(dotted class path, args, kwargs)`` for reconstructing the exception."""
        cls = self.__class__
        return (
            f"{cls.__module__}.{cls.__name__}",
            (),
            {
                "trigger": self.trigger,
                "method_name": self.method_name,
                "kwargs": self.kwargs,
                "timeout": self.timeout,
            },
        )

    def __repr__(self) -> str:
        return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"
212
213
class TaskDeferralError(AirflowException):
    """Raised when a task fails during deferral for some reason."""
216
217
class TaskDeferralTimeout(AirflowException):
    """Raise when the deferral itself times out."""
220
221
class DagRunTriggerException(AirflowException):
    """
    Signal by an operator to trigger a specific Dag Run of a dag.

    Special exception raised to signal that the operator it was raised from wishes to trigger
    a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``.
    """

    def __init__(
        self,
        *,
        trigger_dag_id: str,
        dag_run_id: str,
        conf: dict | None,
        logical_date=None,
        reset_dag_run: bool,
        skip_when_already_exists: bool,
        wait_for_completion: bool,
        allowed_states: list[str],
        failed_states: list[str],
        poke_interval: int,
        deferrable: bool,
    ):
        super().__init__()
        # What to trigger.
        self.trigger_dag_id = trigger_dag_id
        self.dag_run_id = dag_run_id
        self.conf = conf
        self.logical_date = logical_date
        # How to treat an already-existing dag run.
        self.reset_dag_run = reset_dag_run
        self.skip_when_already_exists = skip_when_already_exists
        # Whether/how to wait for the triggered run to finish.
        self.wait_for_completion = wait_for_completion
        self.allowed_states = allowed_states
        self.failed_states = failed_states
        self.poke_interval = poke_interval
        self.deferrable = deferrable
257
258
class DownstreamTasksSkipped(AirflowException):
    """
    Signal by an operator to skip its downstream tasks.

    Special exception raised to signal that the operator it was raised from wishes to skip
    downstream tasks. This is used in the ShortCircuitOperator.

    :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip.
    """

    def __init__(self, *, tasks):
        super().__init__()
        # Either plain task_ids or (task_id, map_index) tuples, per the docstring.
        self.tasks = tasks
272
273
class XComNotFound(AirflowException):
    """Raise when an XCom reference is being resolved against a non-existent XCom."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'

    def serialize(self):
        """Return ``(dotted class path, args, kwargs)`` for reconstructing the exception."""
        klass = type(self)
        kwargs = {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key}
        return f"{klass.__module__}.{klass.__name__}", (), kwargs
293
294
class ParamValidationError(AirflowException):
    """Raise when a DAG's params fail validation."""
297
298
class DuplicateTaskIdFound(AirflowException):
    """Raise when two Tasks within the same DAG are defined with the same task_id."""
301
302
class TaskAlreadyInTaskGroup(AirflowException):
    """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str):
        # Only task_id and new_group_id participate in Exception.args.
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        # None means the task currently lives directly under the DAG root.
        location = (
            "the DAG's root group"
            if self.existing_group_id is None
            else f"group {self.existing_group_id!r}"
        )
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {location})"
318
319
class TaskNotFound(AirflowException):
    """Raise when the requested Task does not exist in the system."""
322
323
class FailFastDagInvalidTriggerRule(AirflowException):
    """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule."""

    # Trigger rules that remain compatible with fail_fast semantics.
    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
        """
        Check that fail_fast dag tasks have allowable trigger rules.

        :meta private:
        """
        if fail_fast and trigger_rule not in cls._allowed_rules:
            raise cls()

    def __str__(self) -> str:
        # Derive the message from _allowed_rules so it cannot drift out of sync
        # with check(): the previous message claimed only ALL_SUCCESS was allowed,
        # contradicting the tuple above.
        allowed = " or ".join(str(rule) for rule in self._allowed_rules)
        return f"A 'fail_fast' dag can only have {allowed} trigger rule"
341
342
class RemovedInAirflow4Warning(DeprecationWarning):
    """Issued for usage of deprecated features that will be removed in Airflow4."""

    # Optional marker filled in at the warning site; None when unspecified.
    deprecated_since: str | None = None
    "Indicates the airflow version that started raising this deprecation warning"