Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/exceptions.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18# Note: Any AirflowException raised is expected to cause the TaskInstance
19# to be marked in an ERROR state
20"""Exceptions used by Airflow."""
22from __future__ import annotations
24from http import HTTPStatus
25from typing import TYPE_CHECKING, NamedTuple
27if TYPE_CHECKING:
28 from airflow.models import DagRun
30# Re exporting AirflowConfigException from shared configuration
31from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException
try:
    from airflow.sdk.exceptions import (
        AirflowException,
        AirflowNotFoundException,
        AirflowOptionalProviderFeatureException as AirflowOptionalProviderFeatureException,
        AirflowRescheduleException as AirflowRescheduleException,
        AirflowTimetableInvalid as AirflowTimetableInvalid,
        NodeNotFound as NodeNotFound,
        ParamValidationError as ParamValidationError,
        TaskNotFound as TaskNotFound,
    )
except ModuleNotFoundError:
    # When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed.
    # The classes below are stand-ins with the same names as the SDK exceptions,
    # so the rest of this module can still subclass and raise them.
    class AirflowException(Exception):  # type: ignore[no-redef]
        """Root of the Airflow exception hierarchy (fallback definition)."""

    class AirflowNotFoundException(AirflowException):  # type: ignore[no-redef]
        """Raise when the object that was asked for cannot be found."""

    class AirflowTimetableInvalid(AirflowException):  # type: ignore[no-redef]
        """Raise when the timetable of a DAG is invalid."""

    class TaskNotFound(AirflowException):  # type: ignore[no-redef]
        """Raise when a Task cannot be found in the system."""

    class NodeNotFound(TaskNotFound, KeyError):  # type: ignore[no-redef]
        """Raise when [] notation is used to access a node (task or task group) that is invalid."""

        def __str__(self) -> str:
            # Render only the first positional argument; an argument-less
            # instance renders as the empty string.
            if not self.args:
                return ""
            return str(self.args[0])

    class AirflowRescheduleException(AirflowException):  # type: ignore[no-redef]
        """
        Raise when the task should run again at a later time.

        :param reschedule_date: The date when the task should be rescheduled
        """

        def __init__(self, reschedule_date):
            super().__init__()
            self.reschedule_date = reschedule_date

        def serialize(self):
            """Return ``(dotted class path, positional args, keyword args)`` describing this exception."""
            klass = type(self)
            dotted_path = f"{klass.__module__}.{klass.__name__}"
            init_kwargs = {"reschedule_date": self.reschedule_date}
            return dotted_path, (), init_kwargs

    class AirflowOptionalProviderFeatureException(AirflowException):  # type: ignore[no-redef]
        """Raise by providers when an optional provider feature is missing its imports."""

    class ParamValidationError(AirflowException, ValueError):  # type: ignore[no-redef]
        """Raise when validation of DAG params fails."""
class AirflowBadRequest(AirflowException):
    """Raise when the request is one the application or server cannot handle."""

    # HTTP status code associated with this error.
    status_code = HTTPStatus.BAD_REQUEST
class InvalidStatsNameException(AirflowException):
    """Raise when a stats name is not valid."""
class AirflowInternalRuntimeError(BaseException):
    """
    Airflow internal runtime error.

    Signals that something went seriously wrong during Airflow execution.
    Derives from ``BaseException``, so ``except Exception`` handlers do not catch it.

    :meta private:
    """
class AirflowDagDuplicatedIdException(AirflowException):
    """
    Raise when the ID of a DAG is already in use by another DAG.

    :param dag_id: The duplicated DAG ID
    :param incoming: Where the DAG being ignored was found
    :param existing: Where the DAG with the same ID was already found
    """

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        # All three values are also passed to the base class so they appear in ``args``.
        super().__init__(dag_id, incoming, existing)
        self.dag_id, self.incoming, self.existing = dag_id, incoming, existing

    def __str__(self) -> str:
        description = f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
        return description
class AirflowClusterPolicyViolation(AirflowException):
    """Raise when a DAG definition violates a Cluster Policy."""
class AirflowClusterPolicySkipDag(AirflowException):
    """Raise when a Cluster Policy needs to skip a dag."""
class AirflowClusterPolicyError(AirflowException):
    """Raise for a Cluster Policy error that is neither AirflowClusterPolicyViolation nor AirflowClusterPolicySkipDag."""
class DagNotFound(AirflowNotFoundException):
    """Raise when a DAG cannot be found in the system."""
class DagCodeNotFound(AirflowNotFoundException):
    """Raise when the code of a DAG cannot be found in the system."""
class DagRunNotFound(AirflowNotFoundException):
    """Raise when a DAG Run cannot be found in the system."""
class DagRunAlreadyExists(AirflowBadRequest):
    """
    Raise when a DAG run is created for a DAG that already has a DAG run entry.

    :param dag_run: The DAG run that already exists
    """

    def __init__(self, dag_run: DagRun) -> None:
        message = f"A DAG Run already exists for DAG {dag_run.dag_id} with run id {dag_run.run_id}"
        super().__init__(message)
        self.dag_run = dag_run

    def serialize(self):
        """Return ``(dotted class path, positional args, keyword args)`` describing this exception."""
        # At module level DagRun is imported only under TYPE_CHECKING, so the
        # runtime import has to happen here.
        from airflow.models import DagRun

        existing_run = self.dag_run
        # Note the DagRun object will be detached here and fails serialization,
        # so a new one is built from the fields of the stored run.
        fresh_run = DagRun(
            state=existing_run.state,
            dag_id=existing_run.dag_id,
            run_id=existing_run.run_id,
            run_type=existing_run.run_type,
        )
        fresh_run.id = existing_run.id

        klass = type(self)
        dotted_path = f"{klass.__module__}.{klass.__name__}"
        return dotted_path, (), {"dag_run": fresh_run}
class SerializationError(AirflowException):
    """Raise when something could not be serialized."""
class TaskInstanceNotFound(AirflowNotFoundException):
    """Raise when a task instance cannot be found in the system."""
class NotMapped(Exception):
    """Raise if a task is not mapped and none of its parent groups are mapped either."""
class PoolNotFound(AirflowNotFoundException):
    """Raise when a Pool cannot be found in the system."""
class FileSyntaxError(NamedTuple):
    """Describe one error found in a file: its message and, when available, its line number."""

    line_no: int | None
    message: str

    def __str__(self):
        # The output deliberately keeps the exact original layout, including
        # the trailing comma and "None" for a missing line number.
        line_no, message = self
        return f"{message}. Line number: {line_no!s},"
class AirflowFileParseException(AirflowException):
    """
    Raise when a connection or variable file cannot be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors

    def __str__(self):
        """Render the message, the file name, and one numbered section per parse error."""
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        rule = "=" * 20
        pieces = [f"{self.msg}\nFilename: {self.file_path}\n\n"]

        for position, error in enumerate(self.parse_errors, start=1):
            pieces.append(f"{rule} Parse error {position:3} {rule}\n")
            pieces.append(f"{error.message}\n")
            if not error.line_no:
                # Without a line number there is nothing further to show for this error.
                continue
            pieces.append(f"Line number: {error.line_no}\n")
            # The code snippet is added only when is_tty() is true.
            if is_tty():
                pieces.append("\n" + prepare_code_snippet(self.file_path, error.line_no) + "\n")

        return "".join(pieces)
class AirflowUnsupportedFileTypeException(AirflowException):
    """Raise when the type of a file is not supported."""
class ConnectionNotUnique(AirflowException):
    """Raise when the same connection ID has more than one value."""
class VariableNotUnique(AirflowException):
    """Raise when the same variable name has more than one value."""
# The try/except handling is needed since all k8s classes were moved to the cncf.kubernetes provider.
# These two exceptions are used internally by the Kubernetes Executor but also by PodGenerator, so we need
# to leave them here in case an older version of the cncf.kubernetes provider is used to run
# KubernetesPodOperator and it raises one of those exceptions. The code should be backwards compatible
# even if you import and try/except the exception using direct imports from airflow.exceptions.
# 1) If you have an old provider, both the provider and the pod generator will throw the
#    "airflow.exceptions" exception.
# 2) If you have a new provider, both the provider and the pod generator will throw the
#    "airflow.providers.cncf.kubernetes" exception, as it will be imported here from the provider.
try:
    from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException
except ImportError:
    # The provider's exception could not be imported; define a local one with the same name.
    class PodMutationHookException(AirflowException):  # type: ignore[no-redef]
        """Raised when the Pod Mutation Hook fails with an exception while executing."""
try:
    from airflow.providers.cncf.kubernetes.exceptions import PodReconciliationError
except ImportError:
    # The provider's exception could not be imported; define a local one with the same name.
    class PodReconciliationError(AirflowException):  # type: ignore[no-redef]
        """Raised when merging pod configs runs into an error."""
class RemovedInAirflow4Warning(DeprecationWarning):
    """Issued when a deprecated feature that will be removed in Airflow4 is used."""

    deprecated_since: str | None = None
    "Indicates the airflow version that started raising this deprecation warning"
class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Issued when a deprecated feature of an Airflow provider is used."""

    deprecated_provider_since: str | None = None
    "Indicates the provider version that started raising this deprecation warning"
class DeserializingResultError(ValueError):
    """Raised when a pickling library hits an error while deserializing a pickle file."""

    def __str__(self):
        # The chained cause is appended to the message; with no cause set
        # this renders as "None".
        cause_text = str(self.__cause__)
        prefix = (
            "Error deserializing result. Note that result deserialization "
            "is not supported across major Python versions. Cause: "
        )
        return prefix + cause_text
class UnknownExecutorException(ValueError):
    """Raised on an attempt to load an executor that has not been configured."""
class DeserializationError(Exception):
    """
    Raised when deserializing a Dag fails.

    Raise this exception with exception chaining:
    `raise DeserializationError(dag_id) from original_exception`

    :param dag_id: ID of the Dag that could not be deserialized, if known
    :param message: Custom message; when given and non-empty it replaces the default text
    """

    def __init__(self, dag_id: str | None = None, message: str | None = None):
        self.dag_id = dag_id
        if not message:
            # No custom message (None or empty): fall back to a default text
            # that depends on whether a Dag ID is available.
            if dag_id is None:
                message = "Missing Dag ID in serialized Dag"
            else:
                message = f"An unexpected error occurred while trying to deserialize Dag '{dag_id}'"
        super().__init__(message)
class DagRunTypeNotAllowed(AirflowException):
    """Raised when the requested run type is not allowed by the Dag."""
class AirflowClearRunningTaskException(AirflowException):
    """Raise when a user tries to clear tasks that are currently running."""
# Exception names that the module-level ``__getattr__`` below resolves from
# ``airflow.sdk.exceptions`` (with a deprecation warning) when they are looked
# up on this module.
# NOTE(review): "ParamValidationError" is also bound at import time by the
# try/except block near the top of this module, so ``__getattr__`` is presumably
# never consulted for it; confirm whether the entry is still needed.
_DEPRECATED_EXCEPTIONS = {
    "AirflowDagCycleException",
    "AirflowFailException",
    "AirflowInactiveAssetInInletOrOutletException",
    "AirflowSensorTimeout",
    "AirflowSkipException",
    "AirflowTaskTerminated",
    "AirflowTaskTimeout",
    "DagRunTriggerException",
    "DownstreamTasksSkipped",
    "DuplicateTaskIdFound",
    "FailFastDagInvalidTriggerRule",
    "ParamValidationError",
    "TaskAlreadyInTaskGroup",
    "TaskDeferralError",
    "TaskDeferralTimeout",
    "TaskDeferred",
    "XComNotFound",
}
def __getattr__(name: str):
    """
    Provide backward compatibility for moved exceptions.

    Names listed in ``_DEPRECATED_EXCEPTIONS`` are imported from
    ``airflow.sdk.exceptions`` and returned after a ``DeprecatedImportWarning``
    is emitted.

    :param name: The attribute being looked up on this module
    :return: The object imported from ``airflow.sdk.exceptions``
    :raises AttributeError: If ``name`` is not one of the deprecated exception names
    """
    if name not in _DEPRECATED_EXCEPTIONS:
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    import warnings

    from airflow import DeprecatedImportWarning
    from airflow._shared.module_loading import import_string

    target_path = f"airflow.sdk.exceptions.{name}"
    deprecation_text = (
        f"airflow.exceptions.{name} is deprecated and will be removed in a future version. Use {target_path} instead."
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(deprecation_text, DeprecatedImportWarning, stacklevel=2)
    return import_string(target_path)