Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/exceptions.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

125 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18# Note: Any AirflowException raised is expected to cause the TaskInstance 

19# to be marked in an ERROR state 

20"""Exceptions used by Airflow.""" 

21 

22from __future__ import annotations 

23 

24from http import HTTPStatus 

25from typing import TYPE_CHECKING, NamedTuple 

26 

27if TYPE_CHECKING: 

28 from airflow.models import DagRun 

29 

30# Re exporting AirflowConfigException from shared configuration 

31from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException 

32 

try:
    from airflow.sdk.exceptions import (
        AirflowException,
        AirflowNotFoundException,
        AirflowOptionalProviderFeatureException as AirflowOptionalProviderFeatureException,
        AirflowRescheduleException as AirflowRescheduleException,
        AirflowTimetableInvalid as AirflowTimetableInvalid,
        NodeNotFound as NodeNotFound,
        ParamValidationError as ParamValidationError,
        TaskNotFound as TaskNotFound,
    )
except ModuleNotFoundError:
    # airflow.sdk may be absent (e.g. when _AIRFLOW__AS_LIBRARY is set), so the
    # names imported above are re-created locally as stand-ins for the SDK classes.
    class AirflowException(Exception):  # type: ignore[no-redef]
        """Root of the Airflow exception hierarchy."""

    class AirflowNotFoundException(AirflowException):  # type: ignore[no-redef]
        """Signal that a requested object could not be found."""

    class AirflowTimetableInvalid(AirflowException):  # type: ignore[no-redef]
        """Signal that a DAG's timetable is invalid."""

    class TaskNotFound(AirflowException):  # type: ignore[no-redef]
        """Signal that a Task is not available in the system."""

    class NodeNotFound(TaskNotFound, KeyError):  # type: ignore[no-redef]
        """Signal an invalid node (task or task group) lookup made with [] notation."""

        def __str__(self) -> str:
            # Show only the first positional argument, or nothing when there is none.
            if not self.args:
                return ""
            return str(self.args[0])

    class AirflowRescheduleException(AirflowException):  # type: ignore[no-redef]
        """
        Signal that the task should be re-scheduled at a later time.

        :param reschedule_date: The date when the task should be rescheduled
        """

        def __init__(self, reschedule_date):
            super().__init__()
            self.reschedule_date = reschedule_date

        def serialize(self):
            """Return ``(dotted class path, positional args, keyword args)`` for this exception."""
            klass = self.__class__
            dotted_path = f"{klass.__module__}.{klass.__name__}"
            return dotted_path, (), {"reschedule_date": self.reschedule_date}

    class AirflowOptionalProviderFeatureException(AirflowException):  # type: ignore[no-redef]
        """Raised by providers when imports for an optional provider feature are missing."""

    class ParamValidationError(AirflowException, ValueError):  # type: ignore[no-redef]
        """Signal that DAG params failed validation."""

85 

86 

class AirflowBadRequest(AirflowException):
    """Signal that the application or server cannot handle the request."""

    # Status attached to this error class.
    status_code = HTTPStatus.BAD_REQUEST

91 

92 

class InvalidStatsNameException(AirflowException):
    """Signal that the name of a stat is invalid."""

95 

96 

class AirflowInternalRuntimeError(BaseException):
    """
    Internal runtime error of Airflow.

    Signals that something went badly wrong during Airflow execution. It derives
    from ``BaseException``, so ``except Exception`` handlers do not catch it.

    :meta private:
    """

105 

106 

class AirflowDagDuplicatedIdException(AirflowException):
    """
    Signal that a DAG's ID is already used by another DAG.

    :param dag_id: The duplicated DAG ID
    :param incoming: Where the newly seen DAG comes from
    :param existing: Where the DAG with the same ID was already found
    """

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        # All three values are forwarded to the base class so they end up in ``args``.
        super().__init__(dag_id, incoming, existing)
        self.dag_id, self.incoming, self.existing = dag_id, incoming, existing

    def __str__(self) -> str:
        duplicate, new_source, known_source = self.dag_id, self.incoming, self.existing
        return f"Ignoring DAG {duplicate} from {new_source} - also found in {known_source}"

118 

119 

class AirflowClusterPolicyViolation(AirflowException):
    """Signal that a DAG definition violates a Cluster Policy."""

122 

123 

class AirflowClusterPolicySkipDag(AirflowException):
    """Signal from a Cluster Policy that the dag needs to be skipped."""

126 

127 

class AirflowClusterPolicyError(AirflowException):
    """Cluster Policy error other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag."""

130 

131 

class DagNotFound(AirflowNotFoundException):
    """Signal that a DAG is not available in the system."""

134 

135 

class DagCodeNotFound(AirflowNotFoundException):
    """Signal that a DAG's code is not available in the system."""

138 

139 

class DagRunNotFound(AirflowNotFoundException):
    """Signal that a DAG Run is not available in the system."""

142 

143 

class DagNotPartitionedError(ValueError):
    """Signal that a partition_key was supplied for a Dag that is not partitioned."""

146 

147 

class InvalidPartitionKeyError(ValueError):
    """
    Signal that a partition_key value is invalid.

    The key is invalid when it is either:

    1. empty or longer than the maximum allowed length
    2. not decodable to a partition_date by the timetable
    """

155 

156 

class DagRunAlreadyExists(AirflowBadRequest):
    """
    Signal that a DAG run is being created for a DAG that already has that DAG run entry.

    :param dag_run: The DAG run that already exists
    """

    def __init__(self, dag_run: DagRun) -> None:
        super().__init__(f"A DAG Run already exists for DAG {dag_run.dag_id} with run id {dag_run.run_id}")
        self.dag_run = dag_run

    def serialize(self):
        """Return ``(dotted class path, positional args, keyword args)`` for this exception."""
        klass = self.__class__
        # The stored DagRun object will be detached here and fails serialization,
        # so a new one is built from the fields needed.
        from airflow.models import DagRun

        fresh_run = DagRun(
            state=self.dag_run.state,
            dag_id=self.dag_run.dag_id,
            run_id=self.dag_run.run_id,
            run_type=self.dag_run.run_type,
        )
        fresh_run.id = self.dag_run.id
        dotted_path = f"{klass.__module__}.{klass.__name__}"
        return dotted_path, (), {"dag_run": fresh_run}

181 

182 

class SerializationError(AirflowException):
    """Signal that something went wrong while trying to serialize an object."""

185 

186 

class TaskInstanceNotFound(AirflowNotFoundException):
    """Signal that a task instance is not available in the system."""

189 

190 

class NotMapped(Exception):
    """Signal that a task is not mapped and has no mapped parent groups either."""

193 

194 

class PoolNotFound(AirflowNotFoundException):
    """Signal that a Pool is not available in the system."""

197 

198 

class FileSyntaxError(NamedTuple):
    """Describe one error found in a file: its line number (if any) and a message."""

    line_no: int | None
    message: str

    def __str__(self) -> str:
        # The trailing comma is part of the established output format.
        return f"{self.message}. Line number: {self.line_no!s},"

207 

208 

class AirflowFileParseException(AirflowException):
    """
    Raise when a connection or variable file cannot be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors

    def __reduce__(self):
        """
        Support pickling and copying.

        Only ``msg`` is stored in ``args``, so the default reconstruction would call
        ``__init__`` with a single argument and fail with ``TypeError``. All three
        constructor arguments are supplied explicitly instead.
        """
        return type(self), (self.msg, self.file_path, self.parse_errors), dict(vars(self))

    def __str__(self) -> str:
        """Render the message, the file name and one section per parse error."""
        # Imported lazily, only when the exception is actually rendered.
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        # Collect the fragments and join once instead of growing a string with +=.
        parts = [f"{self.msg}\nFilename: {self.file_path}\n\n"]

        for error_no, parse_error in enumerate(self.parse_errors, 1):
            parts.append("=" * 20 + f" Parse error {error_no:3} " + "=" * 20 + "\n")
            parts.append(f"{parse_error.message}\n")
            if parse_error.line_no:
                parts.append(f"Line number: {parse_error.line_no}\n")
                # The code snippet is only added when attached to a terminal.
                if is_tty():
                    parts.append("\n" + prepare_code_snippet(self.file_path, parse_error.line_no) + "\n")

        return "".join(parts)

239 

240 

class AirflowUnsupportedFileTypeException(AirflowException):
    """Signal that a file type is not supported."""

243 

244 

class ConnectionNotUnique(AirflowException):
    """Signal that multiple values were found for the same connection ID."""

247 

248 

class VariableNotUnique(AirflowException):
    """Signal that multiple values were found for the same variable name."""

251 

252 

253# The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider 

254# These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need 

255# to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator 

256# and it raises one of those exceptions. The code should be backwards compatible even if you import 

257# and try/except the exception using direct imports from airflow.exceptions. 

258# 1) if you have old provider, both provider and pod generator will throw the "airflow.exceptions" exception. 

259# 2) if you have new provider, both provider and pod generator will throw the 

260# "airflow.providers.cncf.kubernetes" as it will be imported here from the provider. 

try:
    from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException
except ImportError:
    # The provider's class cannot be imported: define the exception locally.

    class PodMutationHookException(AirflowException):  # type: ignore[no-redef]
        """Signal that an exception happened during Pod Mutation Hook execution."""

267 

268 

try:
    from airflow.providers.cncf.kubernetes.exceptions import PodReconciliationError
except ImportError:
    # The provider's class cannot be imported: define the exception locally.

    class PodReconciliationError(AirflowException):  # type: ignore[no-redef]
        """Signal that an error was encountered while trying to merge pod configs."""

275 

276 

class RemovedInAirflow4Warning(DeprecationWarning):
    """Warning category for deprecated features that will be removed in Airflow4."""

    deprecated_since: str | None = None
    "Indicates the airflow version that started raising this deprecation warning"

282 

283 

class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Warning category for deprecated features of an Airflow provider."""

    deprecated_provider_since: str | None = None
    "Indicates the provider version that started raising this deprecation warning"

289 

290 

class DeserializingResultError(ValueError):
    """Signal an error hit while a pickling library deserializes a pickle file."""

    def __str__(self) -> str:
        # The text always ends with the chained cause (``__cause__``), whatever it is.
        explanation = (
            "Error deserializing result. Note that result deserialization "
            "is not supported across major Python versions. Cause: "
        )
        return f"{explanation}{self.__cause__!s}"

299 

300 

class UnknownExecutorException(ValueError):
    """Signal an attempt to load an executor that is not configured."""

303 

304 

class DeserializationError(Exception):
    """
    Raised when a Dag cannot be deserialized.

    This exception should be raised using exception chaining:
    `raise DeserializationError(dag_id) from original_exception`

    :param dag_id: ID of the Dag that failed to deserialize, if known
    :param message: Custom message; when empty or omitted a default one is
        built from ``dag_id``
    """

    def __init__(self, dag_id: str | None = None, message: str | None = None):
        self.dag_id = dag_id
        if message:
            # Use custom message if provided
            super().__init__(message)
        elif dag_id is None:
            super().__init__("Missing Dag ID in serialized Dag")
        else:
            super().__init__(f"An unexpected error occurred while trying to deserialize Dag '{dag_id}'")

    def __reduce__(self):
        """
        Support pickling and copying without corrupting the message.

        ``args`` holds only the final message, so the default reconstruction would
        pass that message as ``dag_id`` and build a new, nested message from it.
        Passing ``dag_id`` and the final message explicitly round-trips both.
        """
        final_message = self.args[0] if self.args else None
        return type(self), (self.dag_id, final_message), dict(vars(self))

322 

323 

class DagRunTypeNotAllowed(AirflowException):
    """Signal that a Dag does not allow the requested run type."""

326 

327 

class AirflowClearRunningTaskException(AirflowException):
    """Signal that the user attempted to clear currently running tasks."""

330 

331 

# Names that the module-level ``__getattr__`` resolves from
# ``airflow.sdk.exceptions`` after emitting a deprecation warning.
# NOTE(review): ``ParamValidationError`` is also bound directly at module level
# (SDK import / fallback class), so ``__getattr__`` is presumably never reached
# for it - confirm whether the entry is still needed.
_DEPRECATED_EXCEPTIONS = {
    "AirflowDagCycleException",
    "AirflowFailException",
    "AirflowInactiveAssetInInletOrOutletException",
    "AirflowSensorTimeout",
    "AirflowSkipException",
    "AirflowTaskTerminated",
    "AirflowTaskTimeout",
    "DagRunTriggerException",
    "DownstreamTasksSkipped",
    "DuplicateTaskIdFound",
    "FailFastDagInvalidTriggerRule",
    "ParamValidationError",
    "TaskAlreadyInTaskGroup",
    "TaskDeferralError",
    "TaskDeferralTimeout",
    "TaskDeferred",
    "XComNotFound",
}

351 

352 

def __getattr__(name: str):
    """
    Provide backward compatibility for moved exceptions.

    Names listed in ``_DEPRECATED_EXCEPTIONS`` are imported from
    ``airflow.sdk.exceptions`` after a ``DeprecatedImportWarning`` is emitted.

    :param name: Attribute requested on this module
    :raises AttributeError: If ``name`` is not one of the moved exceptions
    """
    if name not in _DEPRECATED_EXCEPTIONS:
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    import warnings

    from airflow import DeprecatedImportWarning
    from airflow._shared.module_loading import import_string

    sdk_path = f"airflow.sdk.exceptions.{name}"
    # stacklevel=2 attributes the warning to the code that accessed the attribute.
    warnings.warn(
        f"airflow.exceptions.{name} is deprecated and will be removed in a future version. Use {sdk_path} instead.",
        DeprecatedImportWarning,
        stacklevel=2,
    )
    return import_string(sdk_path)