Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/exceptions.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

123 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18# Note: Any AirflowException raised is expected to cause the TaskInstance 

19# to be marked in an ERROR state 

20"""Exceptions used by Airflow.""" 

21 

22from __future__ import annotations 

23 

24from http import HTTPStatus 

25from typing import TYPE_CHECKING, NamedTuple 

26 

27if TYPE_CHECKING: 

28 from airflow.models import DagRun 

29 

30# Re-export AirflowConfigException from the shared configuration package.

31from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException 

32 

try:
    from airflow.sdk.exceptions import (
        AirflowException,
        AirflowNotFoundException,
        AirflowOptionalProviderFeatureException as AirflowOptionalProviderFeatureException,
        AirflowRescheduleException as AirflowRescheduleException,
        AirflowTimetableInvalid as AirflowTimetableInvalid,
        NodeNotFound as NodeNotFound,
        ParamValidationError as ParamValidationError,
        TaskNotFound as TaskNotFound,
    )
except ModuleNotFoundError:
    # airflow.sdk may not be installed when _AIRFLOW__AS_LIBRARY is set.
    # The classes below are local stand-ins that mirror the SDK exceptions.
    class AirflowException(Exception):  # type: ignore[no-redef]
        """Root of the Airflow exception hierarchy."""

    class AirflowNotFoundException(AirflowException):  # type: ignore[no-redef]
        """Raise when the object that was asked for does not exist."""

    class AirflowTimetableInvalid(AirflowException):  # type: ignore[no-redef]
        """Raise when the timetable of a DAG is invalid."""

    class TaskNotFound(AirflowException):  # type: ignore[no-redef]
        """Raise when a Task cannot be found in the system."""

    class NodeNotFound(TaskNotFound, KeyError):  # type: ignore[no-redef]
        """Raise when [] notation is used to access an invalid node (task or task group)."""

        def __str__(self) -> str:
            # Render the first positional argument as-is; an exception built
            # without arguments renders as the empty string.
            if not self.args:
                return ""
            return str(self.args[0])

    class AirflowRescheduleException(AirflowException):  # type: ignore[no-redef]
        """
        Raise to ask for the task to be re-scheduled at a later time.

        :param reschedule_date: The date when the task should be rescheduled
        """

        def __init__(self, reschedule_date):
            # The base initializer is deliberately called without arguments,
            # so ``args`` stays empty and the date lives only on the attribute.
            super().__init__()
            self.reschedule_date = reschedule_date

        def serialize(self):
            """Return ``(dotted class path, positional args, keyword args)`` for this exception."""
            klass = self.__class__
            dotted_path = f"{klass.__module__}.{klass.__name__}"
            return dotted_path, (), {"reschedule_date": self.reschedule_date}

    class AirflowOptionalProviderFeatureException(AirflowException):  # type: ignore[no-redef]
        """Raise from providers when an optional feature is missing its imports."""

    class ParamValidationError(AirflowException, ValueError):  # type: ignore[no-redef]
        """Raise when validation of DAG params fails."""

85 

86 

class AirflowBadRequest(AirflowException):
    """Raise when the request cannot be handled by the application or server."""

    # HTTP status associated with this error.
    status_code = HTTPStatus.BAD_REQUEST

91 

92 

class InvalidStatsNameException(AirflowException):
    """Raise when a stats name is not valid."""

95 

96 

class AirflowInternalRuntimeError(BaseException):
    """
    Internal runtime error of Airflow.

    Signals that something went badly wrong while Airflow was executing.
    It derives from ``BaseException``, so ``except Exception`` handlers do not catch it.

    :meta private:
    """

105 

106 

class AirflowDagDuplicatedIdException(AirflowException):
    """
    Raise when another DAG already uses the ID of a DAG.

    :param dag_id: The DAG ID that is used more than once
    :param incoming: Where the DAG being ignored comes from
    :param existing: Where the DAG with the same ID was already found
    """

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        # All three values are also passed to the base class, so they end up in ``args``.
        super().__init__(dag_id, incoming, existing)
        self.dag_id, self.incoming, self.existing = dag_id, incoming, existing

    def __str__(self) -> str:
        ignored = f"Ignoring DAG {self.dag_id} from {self.incoming}"
        return f"{ignored} - also found in {self.existing}"

118 

119 

class AirflowClusterPolicyViolation(AirflowException):
    """Raise when a DAG definition violates a Cluster Policy."""

122 

123 

class AirflowClusterPolicySkipDag(AirflowException):
    """Raise from a Cluster Policy when the dag needs to be skipped."""

126 

127 

class AirflowClusterPolicyError(AirflowException):
    """Raise for Cluster Policy errors that are neither AirflowClusterPolicyViolation nor AirflowClusterPolicySkipDag."""

130 

131 

class DagNotFound(AirflowNotFoundException):
    """Raise when the system has no such DAG available."""

134 

135 

class DagCodeNotFound(AirflowNotFoundException):
    """Raise when the system has no such DAG code available."""

138 

139 

class DagRunNotFound(AirflowNotFoundException):
    """Raise when the system has no such DAG Run available."""

142 

143 

class DagRunAlreadyExists(AirflowBadRequest):
    """
    Raise when trying to create a DAG run for a DAG that already has that DAG run entry.

    :param dag_run: The DAG run that already exists
    """

    def __init__(self, dag_run: DagRun) -> None:
        super().__init__(f"A DAG Run already exists for DAG {dag_run.dag_id} with run id {dag_run.run_id}")
        self.dag_run = dag_run

    def serialize(self):
        """Return ``(dotted class path, positional args, keyword args)`` for this exception."""
        # The stored DagRun will be detached at this point and fails
        # serialization, so a new DagRun is built from its fields instead.
        from airflow.models import DagRun

        original = self.dag_run
        fresh_run = DagRun(
            state=original.state,
            dag_id=original.dag_id,
            run_id=original.run_id,
            run_type=original.run_type,
        )
        fresh_run.id = original.id

        klass = self.__class__
        dotted_path = f"{klass.__module__}.{klass.__name__}"
        return dotted_path, (), {"dag_run": fresh_run}

168 

169 

class SerializationError(AirflowException):
    """Raise when serializing something ran into a problem."""

172 

173 

class TaskInstanceNotFound(AirflowNotFoundException):
    """Raise when the system has no such task instance available."""

176 

177 

class NotMapped(Exception):
    """Raise when a task is not mapped and none of its parent groups is mapped either."""

180 

181 

class PoolNotFound(AirflowNotFoundException):
    """Raise when the system has no such Pool available."""

184 

185 

class FileSyntaxError(NamedTuple):
    """Describe one single error found in a file."""

    # Line the error refers to; ``None`` is allowed and is rendered as "None".
    line_no: int | None
    # Description of the error.
    message: str

    def __str__(self):
        # NOTE(review): the trailing comma is part of the current output and is
        # kept as-is; confirm whether it is intentional.
        return f"{self.message}. Line number: {self.line_no},"

194 

195 

class AirflowFileParseException(AirflowException):
    """
    Raise when a connection or variable file cannot be parsed.

    :param msg: Human-readable description of the failure
    :param file_path: The processed file that contains the errors
    :param parse_errors: The syntax errors found in the file
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors

    def __str__(self):
        """Render the message and file name, followed by one section per parse error."""
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        rule = "=" * 20
        pieces = [f"{self.msg}\nFilename: {self.file_path}\n\n"]

        for position, problem in enumerate(self.parse_errors, 1):
            pieces.append(f"{rule} Parse error {position:3} {rule}\n")
            pieces.append(f"{problem.message}\n")
            if not problem.line_no:
                # No line information: nothing more to report for this error.
                continue
            pieces.append(f"Line number: {problem.line_no}\n")
            # The code snippet is only added when is_tty() reports a terminal.
            if is_tty():
                pieces.append("\n" + prepare_code_snippet(self.file_path, problem.line_no) + "\n")

        return "".join(pieces)

226 

227 

class AirflowUnsupportedFileTypeException(AirflowException):
    """Raise when the type of a file is not supported."""

230 

231 

class ConnectionNotUnique(AirflowException):
    """Raise when the same connection ID resolves to multiple values."""

234 

235 

class VariableNotUnique(AirflowException):
    """Raise when the same variable name resolves to multiple values."""

238 

239 

240# The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider 

241# These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need 

242# to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator 

243# and it raises one of those exceptions. The code should be backwards compatible even if you import 

244# and try/except the exception using direct imports from airflow.exceptions. 

245# 1) if you have old provider, both provider and pod generator will throw the "airflow.exceptions" exception. 

246# 2) if you have new provider, both provider and pod generator will throw the 

247# "airflow.providers.cncf.kubernetes" as it will be imported here from the provider. 

try:
    from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException
except ImportError:
    # The provider exception cannot be imported: define the class locally.

    class PodMutationHookException(AirflowException):  # type: ignore[no-redef]
        """Raised when executing the Pod Mutation Hook ends in an exception."""

254 

255 

try:
    from airflow.providers.cncf.kubernetes.exceptions import PodReconciliationError
except ImportError:
    # The provider exception cannot be imported: define the class locally.

    class PodReconciliationError(AirflowException):  # type: ignore[no-redef]
        """Raised when merging pod configs runs into an error."""

262 

263 

class RemovedInAirflow4Warning(DeprecationWarning):
    """Warn about use of a deprecated feature that Airflow4 will remove."""

    deprecated_since: str | None = None
    "Indicates the airflow version that started raising this deprecation warning"

269 

270 

class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Warn about use of a deprecated feature of an Airflow provider."""

    deprecated_provider_since: str | None = None
    "Indicates the provider version that started raising this deprecation warning"

276 

277 

class DeserializingResultError(ValueError):
    """Raised when a pickling library hits an error while deserializing a pickle file."""

    def __str__(self):
        # The chained cause (``raise ... from err``) is appended to the fixed
        # text; with no cause set, this renders as "None".
        explanation = (
            "Error deserializing result. Note that result deserialization "
            "is not supported across major Python versions."
        )
        return f"{explanation} Cause: {str(self.__cause__)}"

286 

287 

class UnknownExecutorException(ValueError):
    """Raised when trying to load an executor that has not been configured."""

290 

291 

class DeserializationError(Exception):
    """
    Raised when deserializing a Dag fails.

    Raise this exception with exception chaining:
    `raise DeserializationError(dag_id) from original_exception`

    :param dag_id: ID of the Dag that could not be deserialized, if known
    :param message: Custom message; when given (and non-empty) it replaces the default text
    """

    def __init__(self, dag_id: str | None = None, message: str | None = None):
        self.dag_id = dag_id
        if not message:
            # No custom message: derive one from whether the Dag ID is known.
            if dag_id is None:
                message = "Missing Dag ID in serialized Dag"
            else:
                message = f"An unexpected error occurred while trying to deserialize Dag '{dag_id}'"
        super().__init__(message)

309 

310 

class DagRunTypeNotAllowed(AirflowException):
    """Raised when the requested run type is not allowed by the Dag."""

313 

314 

class AirflowClearRunningTaskException(AirflowException):
    """Raise when a user tries to clear tasks that are currently running."""

317 

318 

# Names that are resolved lazily by the module-level ``__getattr__`` below.
# NOTE(review): "ParamValidationError" is also bound at module level earlier in
# this file, so ``__getattr__`` should never be reached for it - confirm whether
# the entry is still needed.
_DEPRECATED_EXCEPTIONS = {
    "AirflowDagCycleException",
    "AirflowFailException",
    "AirflowInactiveAssetInInletOrOutletException",
    "AirflowSensorTimeout",
    "AirflowSkipException",
    "AirflowTaskTerminated",
    "AirflowTaskTimeout",
    "DagRunTriggerException",
    "DownstreamTasksSkipped",
    "DuplicateTaskIdFound",
    "FailFastDagInvalidTriggerRule",
    "ParamValidationError",
    "TaskAlreadyInTaskGroup",
    "TaskDeferralError",
    "TaskDeferralTimeout",
    "TaskDeferred",
    "XComNotFound",
}


def __getattr__(name: str):
    """
    Keep imports of moved exceptions working.

    A name listed in ``_DEPRECATED_EXCEPTIONS`` triggers a ``DeprecatedImportWarning``
    and is then imported from ``airflow.sdk.exceptions``. Any other name raises
    ``AttributeError``.
    """
    if name not in _DEPRECATED_EXCEPTIONS:
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    import warnings

    from airflow import DeprecatedImportWarning
    from airflow._shared.module_loading import import_string

    new_location = f"airflow.sdk.exceptions.{name}"
    warnings.warn(
        f"airflow.exceptions.{name} is deprecated and will be removed in a future version. "
        f"Use {new_location} instead.",
        DeprecatedImportWarning,
        stacklevel=2,
    )
    return import_string(new_location)