1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20import enum
21from http import HTTPStatus
22from typing import TYPE_CHECKING, Any
23
24from airflow.sdk import TriggerRule
25
26# Re exporting AirflowConfigException from shared configuration
27from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException
28
29if TYPE_CHECKING:
30 from collections.abc import Collection
31
32 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
33 from airflow.sdk.execution_time.comms import ErrorResponse
34
35
36class AirflowException(Exception):
37 """
38 Base class for all Airflow's errors.
39
40 Each custom exception should be derived from this class.
41 """
42
43 status_code = HTTPStatus.INTERNAL_SERVER_ERROR
44
45 def serialize(self):
46 cls = self.__class__
47 return f"{cls.__module__}.{cls.__name__}", (str(self),), {}
48
49
50class AirflowOptionalProviderFeatureException(AirflowException):
51 """Raise by providers when imports are missing for optional provider features."""
52
53
54class AirflowNotFoundException(AirflowException):
55 """Raise when the requested object/resource is not available in the system."""
56
57 status_code = HTTPStatus.NOT_FOUND
58
59
60class AirflowSecretsBackendAccessDenied(PermissionError):
61 """
62 Authoritative deny from a secrets backend; dispatcher must NOT fall through.
63
64 Distinct from a generic ``PermissionError`` (e.g. an incidental filesystem
65 ``OSError``-family raise from inside an unrelated backend) so the
66 secrets-backend dispatcher loops can re-raise only this signal and keep
67 treating other exceptions as "try the next backend".
68 """
69
70
71class AirflowDagCycleException(AirflowException):
72 """Raise when there is a cycle in Dag definition."""
73
74
75class AirflowRuntimeError(Exception):
76 """Generic Airflow error raised by runtime functions."""
77
78 def __init__(self, error: ErrorResponse):
79 self.error = error
80 super().__init__(f"{error.error.value}: {error.detail}")
81
82
83class AirflowTimetableInvalid(AirflowException):
84 """Raise when a DAG has an invalid timetable."""
85
86
87class ErrorType(enum.Enum):
88 """Error types used in the API client."""
89
90 CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
91 VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND"
92 XCOM_NOT_FOUND = "XCOM_NOT_FOUND"
93 ASSET_NOT_FOUND = "ASSET_NOT_FOUND"
94 TASK_STORE_NOT_FOUND = "TASK_STORE_NOT_FOUND"
95 ASSET_STORE_NOT_FOUND = "ASSET_STORE_NOT_FOUND"
96 DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"
97 # Distinct from API_SERVER_ERROR: signals an explicit 401/403 from the
98 # Execution API. Callers like ExecutionAPISecretsBackend treat this as
99 # a deny rather than a "not found" so the secrets-backend dispatcher
100 # does NOT fall through to a less-restrictive backend (e.g. env vars).
101 PERMISSION_DENIED = "PERMISSION_DENIED"
102 GENERIC_ERROR = "GENERIC_ERROR"
103 API_SERVER_ERROR = "API_SERVER_ERROR"
104
105
106class XComForMappingNotPushed(TypeError):
107 """Raise when a mapped downstream's dependency fails to push XCom for task mapping."""
108
109 def __str__(self) -> str:
110 return "did not push XCom for task mapping"
111
112
113class UnmappableXComTypePushed(TypeError):
114 """Raise when an unmappable type is pushed as a mapped downstream's dependency."""
115
116 def __init__(self, value: Any, *values: Any) -> None:
117 super().__init__(value, *values)
118
119 def __str__(self) -> str:
120 typename = type(self.args[0]).__qualname__
121 for arg in self.args[1:]:
122 typename = f"{typename}[{type(arg).__qualname__}]"
123 return f"unmappable return type {typename!r}"
124
125
126class AirflowFailException(AirflowException):
127 """Raise when the task should be failed without retrying."""
128
129
130class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException):
131 main_message: str
132
133 def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None:
134 self.inactive_asset_keys = inactive_asset_keys
135
136 @staticmethod
137 def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str:
138 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
139
140 if isinstance(key, AssetUniqueKey):
141 return f"Asset(name={key.name!r}, uri={key.uri!r})"
142 if isinstance(key, AssetNameRef):
143 return f"Asset.ref(name={key.name!r})"
144 if isinstance(key, AssetUriRef):
145 return f"Asset.ref(uri={key.uri!r})"
146 return repr(key) # Should not happen, but let's fails more gracefully in an exception.
147
148 def __str__(self) -> str:
149 return f"{self.main_message}: {self.inactive_assets_message}"
150
151 @property
152 def inactive_assets_message(self) -> str:
153 return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys)
154
155
156class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption):
157 """Raise when the task is executed with inactive assets in its inlet or outlet."""
158
159 main_message = "Task has the following inactive assets in its inlets or outlets"
160
161
162class AirflowRescheduleException(AirflowException):
163 """
164 Raise when the task should be re-scheduled at a later time.
165
166 :param reschedule_date: The date when the task should be rescheduled
167 """
168
169 def __init__(self, reschedule_date):
170 super().__init__()
171 self.reschedule_date = reschedule_date
172
173 def serialize(self):
174 cls = self.__class__
175 return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date}
176
177
178class AirflowSensorTimeout(AirflowException):
179 """Raise when there is a timeout on sensor polling."""
180
181
182class AirflowSkipException(AirflowException):
183 """Raise when the task should be skipped."""
184
185
186class AirflowTaskTerminated(BaseException):
187 """Raise when the task execution is terminated."""
188
189
190# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used
191# to explicitly interrupt ongoing task. Code that does normal error-handling should not treat
192# such interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt)
193class AirflowTaskTimeout(BaseException):
194 """Raise when the task execution times-out."""
195
196
197class TaskDeferred(BaseException):
198 """
199 Signal an operator moving to deferred state.
200
201 Special exception raised to signal that the operator it was raised from
202 wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance
203 directly. If the trigger should end the task instance itself, ``method_name`` does not matter,
204 and can be None; otherwise, provide the name of the method that should be used when
205 resuming execution in the task.
206 """
207
208 def __init__(
209 self,
210 *,
211 trigger,
212 method_name: str,
213 kwargs: dict[str, Any] | None = None,
214 timeout=None,
215 ):
216 super().__init__()
217 self.trigger = trigger
218 self.method_name = method_name
219 self.kwargs = kwargs
220 self.timeout = timeout
221
222 def serialize(self):
223 cls = self.__class__
224 return (
225 f"{cls.__module__}.{cls.__name__}",
226 (),
227 {
228 "trigger": self.trigger,
229 "method_name": self.method_name,
230 "kwargs": self.kwargs,
231 "timeout": self.timeout,
232 },
233 )
234
235 def __repr__(self) -> str:
236 return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"
237
238
239class TaskAwaitingInput(BaseException):
240 """
241 Signal an operator parking the task in awaiting_input state (Human-in-the-loop).
242
243 Raised to signal that the operator wishes to pause until external human input arrives,
244 WITHOUT creating a trigger or involving the triggerer. Resumption is driven by the Core API
245 response handler (or the scheduler timeout sweep) flipping the task instance back to SCHEDULED
246 with ``next_method`` / ``next_kwargs`` intact, after which the worker calls
247 ``resume_execution(method_name, kwargs)``.
248
249 Subclasses ``BaseException`` (like ``TaskDeferred``) so that a user ``except Exception`` in
250 ``execute()`` cannot accidentally swallow the park signal.
251 """
252
253 def __init__(
254 self,
255 *,
256 method_name: str,
257 kwargs: dict[str, Any] | None = None,
258 timeout=None,
259 ):
260 super().__init__()
261 self.method_name = method_name
262 self.kwargs = kwargs
263 self.timeout = timeout
264
265 def serialize(self):
266 cls = self.__class__
267 return (
268 f"{cls.__module__}.{cls.__name__}",
269 (),
270 {
271 "method_name": self.method_name,
272 "kwargs": self.kwargs,
273 "timeout": self.timeout,
274 },
275 )
276
277 def __repr__(self) -> str:
278 return f"<TaskAwaitingInput method={self.method_name}>"
279
280
281class TaskDeferralError(AirflowException):
282 """Raised when a task failed during deferral for some reason."""
283
284
285class TaskDeferralTimeout(AirflowException):
286 """Raise when there is a timeout on the deferral."""
287
288
289class DagRunTriggerException(AirflowException):
290 """
291 Signal by an operator to trigger a specific Dag Run of a dag.
292
293 Special exception raised to signal that the operator it was raised from wishes to trigger
294 a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``.
295 """
296
297 def __init__(
298 self,
299 *,
300 trigger_dag_id: str,
301 dag_run_id: str,
302 conf: dict | None,
303 logical_date=None,
304 run_after=None,
305 reset_dag_run: bool,
306 skip_when_already_exists: bool,
307 wait_for_completion: bool,
308 allowed_states: list[str],
309 failed_states: list[str],
310 poke_interval: int,
311 deferrable: bool,
312 note: str | None = None,
313 ):
314 super().__init__()
315 self.trigger_dag_id = trigger_dag_id
316 self.dag_run_id = dag_run_id
317 self.conf = conf
318 self.logical_date = logical_date
319 self.run_after = run_after
320 self.reset_dag_run = reset_dag_run
321 self.skip_when_already_exists = skip_when_already_exists
322 self.wait_for_completion = wait_for_completion
323 self.allowed_states = allowed_states
324 self.failed_states = failed_states
325 self.poke_interval = poke_interval
326 self.deferrable = deferrable
327 self.note = note
328
329
330class DownstreamTasksSkipped(AirflowException):
331 """
332 Signal by an operator to skip its downstream tasks.
333
334 Special exception raised to signal that the operator it was raised from wishes to skip
335 downstream tasks. This is used in the ShortCircuitOperator.
336
337 :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip.
338 """
339
340 def __init__(self, *, tasks):
341 super().__init__()
342 self.tasks = tasks
343
344
345class XComNotFound(AirflowException):
346 """Raise when an XCom reference is being resolved against a non-existent XCom."""
347
348 def __init__(self, dag_id: str, task_id: str, key: str) -> None:
349 super().__init__()
350 self.dag_id = dag_id
351 self.task_id = task_id
352 self.key = key
353
354 def __str__(self) -> str:
355 return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'
356
357 def serialize(self):
358 cls = self.__class__
359 return (
360 f"{cls.__module__}.{cls.__name__}",
361 (),
362 {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key},
363 )
364
365
366class ParamValidationError(AirflowException, ValueError):
367 """Raise when DAG params is invalid."""
368
369
370class DuplicateTaskIdFound(AirflowException):
371 """Raise when a Task with duplicate task_id is defined in the same DAG."""
372
373
374class TaskAlreadyInTaskGroup(AirflowException):
375 """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup."""
376
377 def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str):
378 super().__init__(task_id, new_group_id)
379 self.task_id = task_id
380 self.existing_group_id = existing_group_id
381 self.new_group_id = new_group_id
382
383 def __str__(self) -> str:
384 if self.existing_group_id is None:
385 existing_group = "the DAG's root group"
386 else:
387 existing_group = f"group {self.existing_group_id!r}"
388 return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"
389
390
391class TaskNotFound(AirflowException):
392 """Raise when a Task is not available in the system."""
393
394
395class NodeNotFound(TaskNotFound, KeyError):
396 """Raise when attempting to access an invalid node (task or task group) using [] notation."""
397
398 def __str__(self) -> str:
399 return str(self.args[0]) if self.args else ""
400
401
402class TaskAlreadyRunningError(AirflowException):
403 """Raised when a task is already running on another worker."""
404
405
406class FailFastDagInvalidTriggerRule(AirflowException):
407 """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule."""
408
409 _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)
410
411 @classmethod
412 def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
413 """
414 Check that fail_fast dag tasks have allowable trigger rules.
415
416 :meta private:
417 """
418 if fail_fast and trigger_rule not in cls._allowed_rules:
419 raise cls()
420
421 def __str__(self) -> str:
422 return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"
423
424
425class RemovedInAirflow4Warning(DeprecationWarning):
426 """Issued for usage of deprecated features that will be removed in Airflow4."""
427
428 deprecated_since: str | None = None
429 "Indicates the airflow version that started raising this deprecation warning"