Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/exceptions.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18# Note: Any AirflowException raised is expected to cause the TaskInstance
19# to be marked in an ERROR state
20"""Exceptions used by Airflow."""
22from __future__ import annotations
24from http import HTTPStatus
25from typing import TYPE_CHECKING, NamedTuple
27if TYPE_CHECKING:
28 from airflow.models import DagRun
30# Re exporting AirflowConfigException from shared configuration
31from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException
try:
    from airflow.sdk.exceptions import (
        AirflowException,
        AirflowNotFoundException,
        AirflowRescheduleException as AirflowRescheduleException,
        AirflowTimetableInvalid as AirflowTimetableInvalid,
        TaskNotFound as TaskNotFound,
    )
except ModuleNotFoundError:
    # When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed.
    # Define local stand-ins mirroring the SDK classes so that everything in
    # this module (and any caller importing from airflow.exceptions) keeps
    # working without the SDK package.

    class AirflowException(Exception):  # type: ignore[no-redef]
        """Base exception for Airflow errors."""

    class AirflowNotFoundException(AirflowException):  # type: ignore[no-redef]
        """Raise when a requested object is not found."""

    class AirflowTimetableInvalid(AirflowException):  # type: ignore[no-redef]
        """Raise when a DAG has an invalid timetable."""

    class TaskNotFound(AirflowException):  # type: ignore[no-redef]
        """Raise when a Task is not available in the system."""

    class AirflowRescheduleException(AirflowException):  # type: ignore[no-redef]
        """
        Raise when the task should be re-scheduled at a later time.

        :param reschedule_date: The date when the task should be rescheduled
        """

        def __init__(self, reschedule_date):
            super().__init__()
            # Stored for the scheduler/serializer to pick up later.
            self.reschedule_date = reschedule_date

        def serialize(self):
            """Return the (classpath, args, kwargs) triple used for serialization."""
            klass = type(self)
            classpath = f"{klass.__module__}.{klass.__name__}"
            return classpath, (), {"reschedule_date": self.reschedule_date}
class AirflowBadRequest(AirflowException):
    """Raise when the application or server cannot handle the request."""

    # HTTP status associated with this exception (400 Bad Request).
    status_code: HTTPStatus = HTTPStatus.BAD_REQUEST
# Metrics-related error: invalid stat name passed to the stats subsystem.
class InvalidStatsNameException(AirflowException):
    """Raise when name of the stats is invalid."""
# Raised by provider packages, not core Airflow, when an optional extra
# (and thus its imports) is missing.
class AirflowOptionalProviderFeatureException(AirflowException):
    """Raise by providers when imports are missing for optional provider features."""
class AirflowInternalRuntimeError(BaseException):
    """
    Airflow Internal runtime error.

    Indicates that something really terrible happens during the Airflow execution.

    Deliberately derives from ``BaseException`` (not ``AirflowException``) so
    broad ``except Exception`` handlers do not swallow it.

    :meta private:
    """
class AirflowDagDuplicatedIdException(AirflowException):
    """
    Raise when a DAG's ID is already used by another DAG.

    :param dag_id: the duplicated dag id
    :param incoming: source of the DAG being ignored
    :param existing: source where the dag id was already found
    """

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        # Keep the raw values on the instance for programmatic access.
        self.dag_id = dag_id
        self.incoming = incoming
        self.existing = existing
        super().__init__(dag_id, incoming, existing)

    def __str__(self) -> str:
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
# Cluster-policy hooks raise this to reject a DAG definition outright.
class AirflowClusterPolicyViolation(AirflowException):
    """Raise when there is a violation of a Cluster Policy in DAG definition."""
# Cluster-policy hooks raise this to skip (rather than fail) a DAG.
class AirflowClusterPolicySkipDag(AirflowException):
    """Raise when skipping dag is needed in Cluster Policy."""
# Catch-all for cluster-policy failures that are neither a violation nor a skip.
class AirflowClusterPolicyError(AirflowException):
    """Raise for a Cluster Policy other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag."""
# Lookup failure for a DAG; subclasses the generic not-found exception.
class DagNotFound(AirflowNotFoundException):
    """Raise when a DAG is not available in the system."""
# Lookup failure for a DAG's source code.
class DagCodeNotFound(AirflowNotFoundException):
    """Raise when a DAG code is not available in the system."""
# Lookup failure for a DAG Run.
class DagRunNotFound(AirflowNotFoundException):
    """Raise when a DAG Run is not available in the system."""
class DagRunAlreadyExists(AirflowBadRequest):
    """Raise when creating a DAG run for DAG which already has DAG run entry."""

    def __init__(self, dag_run: DagRun) -> None:
        message = f"A DAG Run already exists for DAG {dag_run.dag_id} with run id {dag_run.run_id}"
        super().__init__(message)
        self.dag_run = dag_run

    def serialize(self):
        """Return the (classpath, args, kwargs) triple used for serialization."""
        from airflow.models import DagRun

        # The stored DagRun object will be detached here and fails
        # serialization, so rebuild a fresh instance carrying the same fields.
        replacement = DagRun(
            state=self.dag_run.state,
            dag_id=self.dag_run.dag_id,
            run_id=self.dag_run.run_id,
            run_type=self.dag_run.run_type,
        )
        replacement.id = self.dag_run.id
        klass = type(self)
        classpath = f"{klass.__module__}.{klass.__name__}"
        return classpath, (), {"dag_run": replacement}
# Generic wrapper for serialization failures.
class SerializationError(AirflowException):
    """A problem occurred when trying to serialize something."""
# Lookup failure for a task instance.
class TaskInstanceNotFound(AirflowNotFoundException):
    """Raise when a task instance is not available in the system."""
# NOTE: intentionally derives from plain Exception, not AirflowException.
class NotMapped(Exception):
    """Raise if a task is neither mapped nor has any parent mapped groups."""
# Lookup failure for a Pool.
class PoolNotFound(AirflowNotFoundException):
    """Raise when a Pool is not available in the system."""
175class FileSyntaxError(NamedTuple):
176 """Information about a single error in a file."""
178 line_no: int | None
179 message: str
181 def __str__(self):
182 return f"{self.message}. Line number: s{str(self.line_no)},"
class AirflowFileParseException(AirflowException):
    """
    Raises when connection or variable file can not be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors

    def __str__(self):
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        parts = [f"{self.msg}\nFilename: {self.file_path}\n\n"]
        for error_no, parse_error in enumerate(self.parse_errors, 1):
            parts.append("=" * 20 + f" Parse error {error_no:3} " + "=" * 20 + "\n")
            parts.append(f"{parse_error.message}\n")
            if parse_error.line_no:
                parts.append(f"Line number: {parse_error.line_no}\n")
                # Only attach the code snippet when attached to a terminal.
                if is_tty():
                    parts.append("\n" + prepare_code_snippet(self.file_path, parse_error.line_no) + "\n")
        return "".join(parts)
# Raised for files whose type/extension is not handled.
class AirflowUnsupportedFileTypeException(AirflowException):
    """Raise when a file type is not supported."""
# Uniqueness violation for connection IDs.
class ConnectionNotUnique(AirflowException):
    """Raise when multiple values are found for the same connection ID."""
# Uniqueness violation for variable names.
class VariableNotUnique(AirflowException):
    """Raise when multiple values are found for the same variable name."""
229# The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider
230# These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need
231# to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator
232# and it raises one of those exceptions. The code should be backwards compatible even if you import
233# and try/except the exception using direct imports from airflow.exceptions.
234# 1) if you have old provider, both provider and pod generator will throw the "airflow.exceptions" exception.
235# 2) if you have new provider, both provider and pod generator will throw the
236# "airflow.providers.cncf.kubernetes" as it will be imported here from the provider.
try:
    from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException
except ImportError:
    # Older cncf.kubernetes providers do not expose this exception; define a
    # local fallback so code catching it via airflow.exceptions keeps working.
    class PodMutationHookException(AirflowException):  # type: ignore[no-redef]
        """Raised when exception happens during Pod Mutation Hook execution."""
try:
    from airflow.providers.cncf.kubernetes.exceptions import PodReconciliationError
except ImportError:
    # Older cncf.kubernetes providers do not expose this exception; define a
    # local fallback so code catching it via airflow.exceptions keeps working.
    class PodReconciliationError(AirflowException):  # type: ignore[no-redef]
        """Raised when an error is encountered while trying to merge pod configs."""
# Derives from DeprecationWarning so standard warning filters apply.
class RemovedInAirflow4Warning(DeprecationWarning):
    """Issued for usage of deprecated features that will be removed in Airflow4."""

    deprecated_since: str | None = None
    "Indicates the airflow version that started raising this deprecation warning"
# Derives from DeprecationWarning so standard warning filters apply.
class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Issued for usage of deprecated features of Airflow provider."""

    deprecated_provider_since: str | None = None
    "Indicates the provider version that started raising this deprecation warning"
class DeserializingResultError(ValueError):
    """Raised when an error is encountered while a pickling library deserializes a pickle file."""

    def __str__(self):
        # __cause__ carries the original deserialization failure when this
        # error is raised via exception chaining.
        cause = str(self.__cause__)
        return (
            "Error deserializing result. Note that result deserialization "
            "is not supported across major Python versions. Cause: " + cause
        )
# NOTE: derives from ValueError (not AirflowException) — callers catch ValueError.
class UnknownExecutorException(ValueError):
    """Raised when an attempt is made to load an executor which is not configured."""
281class DeserializationError(Exception):
282 """
283 Raised when a Dag cannot be deserialized.
285 This exception should be raised using exception chaining:
286 `raise DeserializationError(dag_id) from original_exception`
287 """
289 def __init__(self, dag_id: str | None = None, message: str | None = None):
290 self.dag_id = dag_id
291 if message:
292 # Use custom message if provided
293 super().__init__(message)
294 elif dag_id is None:
295 super().__init__("Missing Dag ID in serialized Dag")
296 else:
297 super().__init__(f"An unexpected error occurred while trying to deserialize Dag '{dag_id}'")
# Raised when clearing tasks that are currently running is not allowed.
class AirflowClearRunningTaskException(AirflowException):
    """Raise when the user attempts to clear currently running tasks."""
# Names that moved to airflow.sdk.exceptions. Importing any of them from this
# module still works via the module-level ``__getattr__`` below, but emits a
# DeprecatedImportWarning.
_DEPRECATED_EXCEPTIONS = {
    "AirflowDagCycleException",
    "AirflowFailException",
    "AirflowInactiveAssetInInletOrOutletException",
    "AirflowSensorTimeout",
    "AirflowSkipException",
    "AirflowTaskTerminated",
    "AirflowTaskTimeout",
    "DagRunTriggerException",
    "DownstreamTasksSkipped",
    "DuplicateTaskIdFound",
    "FailFastDagInvalidTriggerRule",
    "ParamValidationError",
    "TaskAlreadyInTaskGroup",
    "TaskDeferralError",
    "TaskDeferralTimeout",
    "TaskDeferred",
    "XComNotFound",
}
def __getattr__(name: str):
    """Provide backward compatibility for moved exceptions."""
    # PEP 562 module-level __getattr__: anything not in the deprecated set is
    # a genuine missing attribute.
    if name not in _DEPRECATED_EXCEPTIONS:
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    import warnings

    from airflow import DeprecatedImportWarning
    from airflow._shared.module_loading import import_string

    target_path = f"airflow.sdk.exceptions.{name}"
    warnings.warn(
        f"airflow.exceptions.{name} is deprecated and will be removed in a future version. Use {target_path} instead.",
        DeprecatedImportWarning,
        stacklevel=2,
    )
    return import_string(target_path)