Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/exceptions.py: 52%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18# Note: Any AirflowException raised is expected to cause the TaskInstance
19# to be marked in an ERROR state
20"""Exceptions used by Airflow."""
22from __future__ import annotations
24from http import HTTPStatus
25from typing import TYPE_CHECKING, NamedTuple
27if TYPE_CHECKING:
28 from airflow.models import DagRun
30# Re exporting AirflowConfigException from shared configuration
31from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException
33try:
34 from airflow.sdk.exceptions import (
35 AirflowException,
36 AirflowNotFoundException,
37 AirflowOptionalProviderFeatureException as AirflowOptionalProviderFeatureException,
38 AirflowRescheduleException as AirflowRescheduleException,
39 AirflowTimetableInvalid as AirflowTimetableInvalid,
40 NodeNotFound as NodeNotFound,
41 ParamValidationError as ParamValidationError,
42 TaskNotFound as TaskNotFound,
43 )
44except ModuleNotFoundError:
45 # When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed.
46 # In that case, we define fallback exception classes that mirror the SDK ones.
47 class AirflowException(Exception): # type: ignore[no-redef]
48 """Base exception for Airflow errors."""
50 class AirflowNotFoundException(AirflowException): # type: ignore[no-redef]
51 """Raise when a requested object is not found."""
53 class AirflowTimetableInvalid(AirflowException): # type: ignore[no-redef]
54 """Raise when a DAG has an invalid timetable."""
56 class TaskNotFound(AirflowException): # type: ignore[no-redef]
57 """Raise when a Task is not available in the system."""
59 class NodeNotFound(TaskNotFound, KeyError): # type: ignore[no-redef]
60 """Raise when attempting to access an invalid node (task or task group) using [] notation."""
62 def __str__(self) -> str:
63 return str(self.args[0]) if self.args else ""
65 class AirflowRescheduleException(AirflowException): # type: ignore[no-redef]
66 """
67 Raise when the task should be re-scheduled at a later time.
69 :param reschedule_date: The date when the task should be rescheduled
70 """
72 def __init__(self, reschedule_date):
73 super().__init__()
74 self.reschedule_date = reschedule_date
76 def serialize(self):
77 cls = self.__class__
78 return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date}
80 class AirflowOptionalProviderFeatureException(AirflowException): # type: ignore[no-redef]
81 """Raise by providers when imports are missing for optional provider features."""
83 class ParamValidationError(AirflowException, ValueError): # type: ignore[no-redef]
84 """Raise when DAG params fail validation."""
87class AirflowBadRequest(AirflowException):
88 """Raise when the application or server cannot handle the request."""
90 status_code = HTTPStatus.BAD_REQUEST
93class InvalidStatsNameException(AirflowException):
94 """Raise when name of the stats is invalid."""
97class AirflowInternalRuntimeError(BaseException):
98 """
99 Airflow Internal runtime error.
101 Indicates that something really terrible happens during the Airflow execution.
103 :meta private:
104 """
107class AirflowDagDuplicatedIdException(AirflowException):
108 """Raise when a DAG's ID is already used by another DAG."""
110 def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
111 super().__init__(dag_id, incoming, existing)
112 self.dag_id = dag_id
113 self.incoming = incoming
114 self.existing = existing
116 def __str__(self) -> str:
117 return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
120class AirflowClusterPolicyViolation(AirflowException):
121 """Raise when there is a violation of a Cluster Policy in DAG definition."""
124class AirflowClusterPolicySkipDag(AirflowException):
125 """Raise when skipping dag is needed in Cluster Policy."""
128class AirflowClusterPolicyError(AirflowException):
129 """Raise for a Cluster Policy other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag."""
132class DagNotFound(AirflowNotFoundException):
133 """Raise when a DAG is not available in the system."""
136class DagCodeNotFound(AirflowNotFoundException):
137 """Raise when a DAG code is not available in the system."""
140class DagRunNotFound(AirflowNotFoundException):
141 """Raise when a DAG Run is not available in the system."""
144class DagNotPartitionedError(ValueError):
145 """Raise when a partition_key is supplied for a Dag that is not partitioned."""
148class InvalidPartitionKeyError(ValueError):
149 """
150 Raise when a partition_key value is invalid.
152 1. empty or exceeds the maximum allowed length
153 2. cannot be decoded to a partition_date by the timetable
154 """
157class DagRunAlreadyExists(AirflowBadRequest):
158 """Raise when creating a DAG run for DAG which already has DAG run entry."""
160 def __init__(self, dag_run: DagRun) -> None:
161 super().__init__(f"A DAG Run already exists for DAG {dag_run.dag_id} with run id {dag_run.run_id}")
162 self.dag_run = dag_run
164 def serialize(self):
165 cls = self.__class__
166 # Note the DagRun object will be detached here and fails serialization, we need to create a new one
167 from airflow.models import DagRun
169 dag_run = DagRun(
170 state=self.dag_run.state,
171 dag_id=self.dag_run.dag_id,
172 run_id=self.dag_run.run_id,
173 run_type=self.dag_run.run_type,
174 )
175 dag_run.id = self.dag_run.id
176 return (
177 f"{cls.__module__}.{cls.__name__}",
178 (),
179 {"dag_run": dag_run},
180 )
183class SerializationError(AirflowException):
184 """A problem occurred when trying to serialize something."""
187class TaskInstanceNotFound(AirflowNotFoundException):
188 """Raise when a task instance is not available in the system."""
191class NotMapped(Exception):
192 """Raise if a task is neither mapped nor has any parent mapped groups."""
195class PoolNotFound(AirflowNotFoundException):
196 """Raise when a Pool is not available in the system."""
199class FileSyntaxError(NamedTuple):
200 """Information about a single error in a file."""
202 line_no: int | None
203 message: str
205 def __str__(self):
206 return f"{self.message}. Line number: {str(self.line_no)},"
209class AirflowFileParseException(AirflowException):
210 """
211 Raises when connection or variable file can not be parsed.
213 :param msg: The human-readable description of the exception
214 :param file_path: A processed file that contains errors
215 :param parse_errors: File syntax errors
216 """
218 def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
219 super().__init__(msg)
220 self.msg = msg
221 self.file_path = file_path
222 self.parse_errors = parse_errors
224 def __str__(self):
225 from airflow.utils.code_utils import prepare_code_snippet
226 from airflow.utils.platform import is_tty
228 result = f"{self.msg}\nFilename: {self.file_path}\n\n"
230 for error_no, parse_error in enumerate(self.parse_errors, 1):
231 result += "=" * 20 + f" Parse error {error_no:3} " + "=" * 20 + "\n"
232 result += f"{parse_error.message}\n"
233 if parse_error.line_no:
234 result += f"Line number: {parse_error.line_no}\n"
235 if parse_error.line_no and is_tty():
236 result += "\n" + prepare_code_snippet(self.file_path, parse_error.line_no) + "\n"
238 return result
241class AirflowUnsupportedFileTypeException(AirflowException):
242 """Raise when a file type is not supported."""
245class ConnectionNotUnique(AirflowException):
246 """Raise when multiple values are found for the same connection ID."""
249class VariableNotUnique(AirflowException):
250 """Raise when multiple values are found for the same variable name."""
253# The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider
254# These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need
255# to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator
256# and it raises one of those exceptions. The code should be backwards compatible even if you import
257# and try/except the exception using direct imports from airflow.exceptions.
258# 1) if you have old provider, both provider and pod generator will throw the "airflow.exceptions" exception.
259# 2) if you have new provider, both provider and pod generator will throw the
260# "airflow.providers.cncf.kubernetes" as it will be imported here from the provider.
261try:
262 from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException
263except ImportError:
265 class PodMutationHookException(AirflowException): # type: ignore[no-redef]
266 """Raised when exception happens during Pod Mutation Hook execution."""
269try:
270 from airflow.providers.cncf.kubernetes.exceptions import PodReconciliationError
271except ImportError:
273 class PodReconciliationError(AirflowException): # type: ignore[no-redef]
274 """Raised when an error is encountered while trying to merge pod configs."""
277class RemovedInAirflow4Warning(DeprecationWarning):
278 """Issued for usage of deprecated features that will be removed in Airflow4."""
280 deprecated_since: str | None = None
281 "Indicates the airflow version that started raising this deprecation warning"
284class AirflowProviderDeprecationWarning(DeprecationWarning):
285 """Issued for usage of deprecated features of Airflow provider."""
287 deprecated_provider_since: str | None = None
288 "Indicates the provider version that started raising this deprecation warning"
291class DeserializingResultError(ValueError):
292 """Raised when an error is encountered while a pickling library deserializes a pickle file."""
294 def __str__(self):
295 return (
296 "Error deserializing result. Note that result deserialization "
297 "is not supported across major Python versions. Cause: " + str(self.__cause__)
298 )
301class UnknownExecutorException(ValueError):
302 """Raised when an attempt is made to load an executor which is not configured."""
305class DeserializationError(Exception):
306 """
307 Raised when a Dag cannot be deserialized.
309 This exception should be raised using exception chaining:
310 `raise DeserializationError(dag_id) from original_exception`
311 """
313 def __init__(self, dag_id: str | None = None, message: str | None = None):
314 self.dag_id = dag_id
315 if message:
316 # Use custom message if provided
317 super().__init__(message)
318 elif dag_id is None:
319 super().__init__("Missing Dag ID in serialized Dag")
320 else:
321 super().__init__(f"An unexpected error occurred while trying to deserialize Dag '{dag_id}'")
324class DagRunTypeNotAllowed(AirflowException):
325 """Raised when a Dag does not allow the requested run type."""
328class AirflowClearRunningTaskException(AirflowException):
329 """Raise when the user attempts to clear currently running tasks."""
332_DEPRECATED_EXCEPTIONS = {
333 "AirflowDagCycleException",
334 "AirflowFailException",
335 "AirflowInactiveAssetInInletOrOutletException",
336 "AirflowSensorTimeout",
337 "AirflowSkipException",
338 "AirflowTaskTerminated",
339 "AirflowTaskTimeout",
340 "DagRunTriggerException",
341 "DownstreamTasksSkipped",
342 "DuplicateTaskIdFound",
343 "FailFastDagInvalidTriggerRule",
344 "ParamValidationError",
345 "TaskAlreadyInTaskGroup",
346 "TaskDeferralError",
347 "TaskDeferralTimeout",
348 "TaskDeferred",
349 "XComNotFound",
350}
353def __getattr__(name: str):
354 """Provide backward compatibility for moved exceptions."""
355 if name in _DEPRECATED_EXCEPTIONS:
356 import warnings
358 from airflow import DeprecatedImportWarning
359 from airflow._shared.module_loading import import_string
361 target_path = f"airflow.sdk.exceptions.{name}"
362 warnings.warn(
363 f"airflow.exceptions.{name} is deprecated and will be removed in a future version. Use {target_path} instead.",
364 DeprecatedImportWarning,
365 stacklevel=2,
366 )
367 return import_string(target_path)
368 raise AttributeError(f"module '{__name__}' has no attribute '{name}'")