Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/exceptions.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18# Note: Any AirflowException raised is expected to cause the TaskInstance 

19# to be marked in an ERROR state 

20"""Exceptions used by Airflow.""" 

21 

22from __future__ import annotations 

23 

24from http import HTTPStatus 

25from typing import TYPE_CHECKING, NamedTuple 

26 

27if TYPE_CHECKING: 

28 from airflow.models import DagRun 

29 

30# Re exporting AirflowConfigException from shared configuration 

31from airflow._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException 

32 

33try: 

34 from airflow.sdk.exceptions import ( 

35 AirflowException, 

36 AirflowNotFoundException, 

37 AirflowRescheduleException as AirflowRescheduleException, 

38 AirflowTimetableInvalid as AirflowTimetableInvalid, 

39 TaskNotFound as TaskNotFound, 

40 ) 

41except ModuleNotFoundError: 

42 # When _AIRFLOW__AS_LIBRARY is set, airflow.sdk may not be installed. 

43 # In that case, we define fallback exception classes that mirror the SDK ones. 

44 class AirflowException(Exception): # type: ignore[no-redef] 

45 """Base exception for Airflow errors.""" 

46 

47 class AirflowNotFoundException(AirflowException): # type: ignore[no-redef] 

48 """Raise when a requested object is not found.""" 

49 

50 class AirflowTimetableInvalid(AirflowException): # type: ignore[no-redef] 

51 """Raise when a DAG has an invalid timetable.""" 

52 

53 class TaskNotFound(AirflowException): # type: ignore[no-redef] 

54 """Raise when a Task is not available in the system.""" 

55 

56 class AirflowRescheduleException(AirflowException): # type: ignore[no-redef] 

57 """ 

58 Raise when the task should be re-scheduled at a later time. 

59 

60 :param reschedule_date: The date when the task should be rescheduled 

61 """ 

62 

63 def __init__(self, reschedule_date): 

64 super().__init__() 

65 self.reschedule_date = reschedule_date 

66 

67 def serialize(self): 

68 cls = self.__class__ 

69 return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date} 

70 

71 

72class AirflowBadRequest(AirflowException): 

73 """Raise when the application or server cannot handle the request.""" 

74 

75 status_code = HTTPStatus.BAD_REQUEST 

76 

77 

78class InvalidStatsNameException(AirflowException): 

79 """Raise when name of the stats is invalid.""" 

80 

81 

82class AirflowOptionalProviderFeatureException(AirflowException): 

83 """Raise by providers when imports are missing for optional provider features.""" 

84 

85 

86class AirflowInternalRuntimeError(BaseException): 

87 """ 

88 Airflow Internal runtime error. 

89 

90 Indicates that something really terrible happens during the Airflow execution. 

91 

92 :meta private: 

93 """ 

94 

95 

96class AirflowDagDuplicatedIdException(AirflowException): 

97 """Raise when a DAG's ID is already used by another DAG.""" 

98 

99 def __init__(self, dag_id: str, incoming: str, existing: str) -> None: 

100 super().__init__(dag_id, incoming, existing) 

101 self.dag_id = dag_id 

102 self.incoming = incoming 

103 self.existing = existing 

104 

105 def __str__(self) -> str: 

106 return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}" 

107 

108 

109class AirflowClusterPolicyViolation(AirflowException): 

110 """Raise when there is a violation of a Cluster Policy in DAG definition.""" 

111 

112 

113class AirflowClusterPolicySkipDag(AirflowException): 

114 """Raise when skipping dag is needed in Cluster Policy.""" 

115 

116 

117class AirflowClusterPolicyError(AirflowException): 

118 """Raise for a Cluster Policy other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag.""" 

119 

120 

121class DagNotFound(AirflowNotFoundException): 

122 """Raise when a DAG is not available in the system.""" 

123 

124 

125class DagCodeNotFound(AirflowNotFoundException): 

126 """Raise when a DAG code is not available in the system.""" 

127 

128 

129class DagRunNotFound(AirflowNotFoundException): 

130 """Raise when a DAG Run is not available in the system.""" 

131 

132 

133class DagRunAlreadyExists(AirflowBadRequest): 

134 """Raise when creating a DAG run for DAG which already has DAG run entry.""" 

135 

136 def __init__(self, dag_run: DagRun) -> None: 

137 super().__init__(f"A DAG Run already exists for DAG {dag_run.dag_id} with run id {dag_run.run_id}") 

138 self.dag_run = dag_run 

139 

140 def serialize(self): 

141 cls = self.__class__ 

142 # Note the DagRun object will be detached here and fails serialization, we need to create a new one 

143 from airflow.models import DagRun 

144 

145 dag_run = DagRun( 

146 state=self.dag_run.state, 

147 dag_id=self.dag_run.dag_id, 

148 run_id=self.dag_run.run_id, 

149 run_type=self.dag_run.run_type, 

150 ) 

151 dag_run.id = self.dag_run.id 

152 return ( 

153 f"{cls.__module__}.{cls.__name__}", 

154 (), 

155 {"dag_run": dag_run}, 

156 ) 

157 

158 

159class SerializationError(AirflowException): 

160 """A problem occurred when trying to serialize something.""" 

161 

162 

163class TaskInstanceNotFound(AirflowNotFoundException): 

164 """Raise when a task instance is not available in the system.""" 

165 

166 

167class NotMapped(Exception): 

168 """Raise if a task is neither mapped nor has any parent mapped groups.""" 

169 

170 

171class PoolNotFound(AirflowNotFoundException): 

172 """Raise when a Pool is not available in the system.""" 

173 

174 

175class FileSyntaxError(NamedTuple): 

176 """Information about a single error in a file.""" 

177 

178 line_no: int | None 

179 message: str 

180 

181 def __str__(self): 

182 return f"{self.message}. Line number: s{str(self.line_no)}," 

183 

184 

185class AirflowFileParseException(AirflowException): 

186 """ 

187 Raises when connection or variable file can not be parsed. 

188 

189 :param msg: The human-readable description of the exception 

190 :param file_path: A processed file that contains errors 

191 :param parse_errors: File syntax errors 

192 """ 

193 

194 def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None: 

195 super().__init__(msg) 

196 self.msg = msg 

197 self.file_path = file_path 

198 self.parse_errors = parse_errors 

199 

200 def __str__(self): 

201 from airflow.utils.code_utils import prepare_code_snippet 

202 from airflow.utils.platform import is_tty 

203 

204 result = f"{self.msg}\nFilename: {self.file_path}\n\n" 

205 

206 for error_no, parse_error in enumerate(self.parse_errors, 1): 

207 result += "=" * 20 + f" Parse error {error_no:3} " + "=" * 20 + "\n" 

208 result += f"{parse_error.message}\n" 

209 if parse_error.line_no: 

210 result += f"Line number: {parse_error.line_no}\n" 

211 if parse_error.line_no and is_tty(): 

212 result += "\n" + prepare_code_snippet(self.file_path, parse_error.line_no) + "\n" 

213 

214 return result 

215 

216 

217class AirflowUnsupportedFileTypeException(AirflowException): 

218 """Raise when a file type is not supported.""" 

219 

220 

221class ConnectionNotUnique(AirflowException): 

222 """Raise when multiple values are found for the same connection ID.""" 

223 

224 

225class VariableNotUnique(AirflowException): 

226 """Raise when multiple values are found for the same variable name.""" 

227 

228 

229# The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider 

230# These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need 

231# to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator 

232# and it raises one of those exceptions. The code should be backwards compatible even if you import 

233# and try/except the exception using direct imports from airflow.exceptions. 

234# 1) if you have old provider, both provider and pod generator will throw the "airflow.exceptions" exception. 

235# 2) if you have new provider, both provider and pod generator will throw the 

236# "airflow.providers.cncf.kubernetes" as it will be imported here from the provider. 

237try: 

238 from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException 

239except ImportError: 

240 

241 class PodMutationHookException(AirflowException): # type: ignore[no-redef] 

242 """Raised when exception happens during Pod Mutation Hook execution.""" 

243 

244 

245try: 

246 from airflow.providers.cncf.kubernetes.exceptions import PodReconciliationError 

247except ImportError: 

248 

249 class PodReconciliationError(AirflowException): # type: ignore[no-redef] 

250 """Raised when an error is encountered while trying to merge pod configs.""" 

251 

252 

253class RemovedInAirflow4Warning(DeprecationWarning): 

254 """Issued for usage of deprecated features that will be removed in Airflow4.""" 

255 

256 deprecated_since: str | None = None 

257 "Indicates the airflow version that started raising this deprecation warning" 

258 

259 

260class AirflowProviderDeprecationWarning(DeprecationWarning): 

261 """Issued for usage of deprecated features of Airflow provider.""" 

262 

263 deprecated_provider_since: str | None = None 

264 "Indicates the provider version that started raising this deprecation warning" 

265 

266 

267class DeserializingResultError(ValueError): 

268 """Raised when an error is encountered while a pickling library deserializes a pickle file.""" 

269 

270 def __str__(self): 

271 return ( 

272 "Error deserializing result. Note that result deserialization " 

273 "is not supported across major Python versions. Cause: " + str(self.__cause__) 

274 ) 

275 

276 

277class UnknownExecutorException(ValueError): 

278 """Raised when an attempt is made to load an executor which is not configured.""" 

279 

280 

281class DeserializationError(Exception): 

282 """ 

283 Raised when a Dag cannot be deserialized. 

284 

285 This exception should be raised using exception chaining: 

286 `raise DeserializationError(dag_id) from original_exception` 

287 """ 

288 

289 def __init__(self, dag_id: str | None = None, message: str | None = None): 

290 self.dag_id = dag_id 

291 if message: 

292 # Use custom message if provided 

293 super().__init__(message) 

294 elif dag_id is None: 

295 super().__init__("Missing Dag ID in serialized Dag") 

296 else: 

297 super().__init__(f"An unexpected error occurred while trying to deserialize Dag '{dag_id}'") 

298 

299 

300class AirflowClearRunningTaskException(AirflowException): 

301 """Raise when the user attempts to clear currently running tasks.""" 

302 

303 

304_DEPRECATED_EXCEPTIONS = { 

305 "AirflowDagCycleException", 

306 "AirflowFailException", 

307 "AirflowInactiveAssetInInletOrOutletException", 

308 "AirflowSensorTimeout", 

309 "AirflowSkipException", 

310 "AirflowTaskTerminated", 

311 "AirflowTaskTimeout", 

312 "DagRunTriggerException", 

313 "DownstreamTasksSkipped", 

314 "DuplicateTaskIdFound", 

315 "FailFastDagInvalidTriggerRule", 

316 "ParamValidationError", 

317 "TaskAlreadyInTaskGroup", 

318 "TaskDeferralError", 

319 "TaskDeferralTimeout", 

320 "TaskDeferred", 

321 "XComNotFound", 

322} 

323 

324 

325def __getattr__(name: str): 

326 """Provide backward compatibility for moved exceptions.""" 

327 if name in _DEPRECATED_EXCEPTIONS: 

328 import warnings 

329 

330 from airflow import DeprecatedImportWarning 

331 from airflow._shared.module_loading import import_string 

332 

333 target_path = f"airflow.sdk.exceptions.{name}" 

334 warnings.warn( 

335 f"airflow.exceptions.{name} is deprecated and will be removed in a future version. Use {target_path} instead.", 

336 DeprecatedImportWarning, 

337 stacklevel=2, 

338 ) 

339 return import_string(target_path) 

340 raise AttributeError(f"module '{__name__}' has no attribute '{name}'")