Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/exceptions.py: 60%

171 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Note: Any AirflowException raised is expected to cause the TaskInstance
# to be marked in an ERROR state
"""Exceptions used by Airflow."""


from __future__ import annotations

import warnings
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, NamedTuple

from airflow.utils.trigger_rule import TriggerRule

if TYPE_CHECKING:
    import datetime
    from collections.abc import Sized

    from airflow.models import DAG, DagRun



class AirflowException(Exception):
    """
    Base class for all Airflow's errors.

    Each custom exception should be derived from this class.
    """

    status_code = HTTPStatus.INTERNAL_SERVER_ERROR



class AirflowBadRequest(AirflowException):
    """Raise when the application or server cannot handle the request."""

    status_code = HTTPStatus.BAD_REQUEST


class AirflowNotFoundException(AirflowException):
    """Raise when the requested object/resource is not available in the system."""

    status_code = HTTPStatus.NOT_FOUND



class AirflowConfigException(AirflowException):
    """Raise when there is a configuration problem."""


class AirflowSensorTimeout(AirflowException):
    """Raise when there is a timeout on sensor polling."""



class AirflowRescheduleException(AirflowException):
    """
    Raise when the task should be re-scheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        super().__init__()
        self.reschedule_date = reschedule_date

    def serialize(self):
        return "AirflowRescheduleException", (), {"reschedule_date": self.reschedule_date}


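# Usage sketch (illustrative, not part of the original module): a sensor-style
# check that asks to be woken up later instead of holding a worker slot. The
# helper name and its argument are hypothetical.
def _example_poke(data_ready: bool) -> None:
    import datetime as dt  # local import; the module imports datetime only for typing

    if not data_ready:
        raise AirflowRescheduleException(
            reschedule_date=dt.datetime.now(dt.timezone.utc) + dt.timedelta(minutes=5)
        )

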

class InvalidStatsNameException(AirflowException):
    """Raise when the name of the stat is invalid."""



# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used
# to explicitly interrupt an ongoing task. Code that does normal error handling should not treat
# such an interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt)
class AirflowTaskTimeout(BaseException):
    """Raise when the task execution times out."""


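# Illustrative sketch (not part of the original module): because
# AirflowTaskTimeout derives from BaseException, a broad ``except Exception``
# in task code cannot accidentally swallow the interrupt. The helper below is
# hypothetical, for demonstration only.
def _example_timeout_is_not_swallowed() -> str:
    try:
        raise AirflowTaskTimeout("hard deadline hit")
    except Exception:
        return "swallowed"  # never reached: AirflowTaskTimeout is not an Exception
    except AirflowTaskTimeout:
        return "intercepted"  # an explicit handler is still able to catch it

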

class AirflowTaskTerminated(BaseException):
    """Raise when the task execution is terminated."""


class AirflowWebServerTimeout(AirflowException):
    """Raise when the web server times out."""



class AirflowSkipException(AirflowException):
    """Raise when the task should be skipped."""


class AirflowFailException(AirflowException):
    """Raise when the task should be failed without retrying."""


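# Usage sketch (illustrative, not part of the original module): task code can
# steer its own terminal state by raising these exceptions. The function and
# its argument are hypothetical.
def _example_task_outcome(records: list) -> int:
    if not records:
        raise AirflowSkipException("nothing to process; marking the task as skipped")
    if any(r is None for r in records):
        raise AirflowFailException("corrupt input; failing immediately, without retries")
    return len(records)

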

class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when imports are missing for optional provider features."""



class AirflowInternalRuntimeError(BaseException):
    """
    Airflow Internal runtime error.

    Indicates that something really terrible happened during Airflow execution.

    :meta private:
    """



class XComNotFound(AirflowException):
    """Raise when an XCom reference is being resolved against a non-existent XCom."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'



class UnmappableOperator(AirflowException):
    """Raise when an operator is not implemented to be mappable."""


class XComForMappingNotPushed(AirflowException):
    """Raise when a mapped downstream's dependency fails to push XCom for task mapping."""

    def __str__(self) -> str:
        return "did not push XCom for task mapping"



class UnmappableXComTypePushed(AirflowException):
    """Raise when an unmappable type is pushed as a mapped downstream's dependency."""

    def __init__(self, value: Any, *values: Any) -> None:
        super().__init__(value, *values)

    def __str__(self) -> str:
        # Render the offending types as a nested chain, e.g. a list containing
        # an unmappable int is reported as "list[int]".
        typename = type(self.args[0]).__qualname__
        for arg in self.args[1:]:
            typename = f"{typename}[{type(arg).__qualname__}]"
        return f"unmappable return type {typename!r}"



class UnmappableXComLengthPushed(AirflowException):
    """Raise when the pushed value is too large to map as a downstream's dependency."""

    def __init__(self, value: Sized, max_length: int) -> None:
        super().__init__(value)
        self.value = value
        self.max_length = max_length

    def __str__(self) -> str:
        return f"unmappable return value length: {len(self.value)} > {self.max_length}"



class AirflowDagCycleException(AirflowException):
    """Raise when there is a cycle in the DAG definition."""


class AirflowDagDuplicatedIdException(AirflowException):
    """Raise when a DAG's ID is already used by another DAG."""

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        super().__init__(dag_id, incoming, existing)
        self.dag_id = dag_id
        self.incoming = incoming
        self.existing = existing

    def __str__(self) -> str:
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"



class AirflowDagInconsistent(AirflowException):
    """Raise when a DAG has inconsistent attributes."""



class AirflowClusterPolicyViolation(AirflowException):
    """Raise when there is a violation of a Cluster Policy in a DAG definition."""


class AirflowClusterPolicySkipDag(AirflowException):
    """Raise when a Cluster Policy requires a DAG to be skipped."""


class AirflowClusterPolicyError(AirflowException):
    """Raise for Cluster Policy errors other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag."""



class AirflowTimetableInvalid(AirflowException):
    """Raise when a DAG has an invalid timetable."""



class DagNotFound(AirflowNotFoundException):
    """Raise when a DAG is not available in the system."""


class DagCodeNotFound(AirflowNotFoundException):
    """Raise when DAG code is not available in the system."""


class DagRunNotFound(AirflowNotFoundException):
    """Raise when a DAG Run is not available in the system."""



class DagRunAlreadyExists(AirflowBadRequest):
    """Raise when creating a DAG run for a DAG that already has a DAG run entry."""

    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
        super().__init__(
            f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
        )
        self.dag_run = dag_run



class DagFileExists(AirflowBadRequest):
    """Raise when a DAG ID is still in the DagBag, i.e. the DAG file is still in the DAG folder."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2)



class FailStopDagInvalidTriggerRule(AirflowException):
    """Raise when a DAG has 'fail_stop' enabled yet has a non-default trigger rule."""

    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, dag: DAG | None, trigger_rule: TriggerRule):
        """
        Check that fail_stop dag tasks have allowable trigger rules.

        :meta private:
        """
        if dag is not None and dag.fail_stop and trigger_rule not in cls._allowed_rules:
            raise cls()

    def __str__(self) -> str:
        # Report every allowed rule, matching _allowed_rules above.
        return f"A 'fail-stop' dag can only have one of these trigger rules: {', '.join(self._allowed_rules)}"


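# Usage sketch (illustrative, not part of the original module): validating a
# task's trigger rule on a fail-stop DAG. A SimpleNamespace stands in for a
# real DAG object here, since check() only reads the ``fail_stop`` attribute.
def _example_fail_stop_check() -> None:
    from types import SimpleNamespace

    fake_dag = SimpleNamespace(fail_stop=True)  # hypothetical stand-in, illustration only
    # Raises FailStopDagInvalidTriggerRule: ONE_SUCCESS is not an allowed rule.
    FailStopDagInvalidTriggerRule.check(dag=fake_dag, trigger_rule=TriggerRule.ONE_SUCCESS)

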

class DuplicateTaskIdFound(AirflowException):
    """Raise when a Task with duplicate task_id is defined in the same DAG."""



class TaskAlreadyInTaskGroup(AirflowException):
    """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str) -> None:
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        if self.existing_group_id is None:
            existing_group = "the DAG's root group"
        else:
            existing_group = f"group {self.existing_group_id!r}"
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"



class SerializationError(AirflowException):
    """A problem occurred when trying to serialize something."""


class ParamValidationError(AirflowException):
    """Raise when DAG params are invalid."""



class TaskNotFound(AirflowNotFoundException):
    """Raise when a Task is not available in the system."""


class TaskInstanceNotFound(AirflowNotFoundException):
    """Raise when a task instance is not available in the system."""


class PoolNotFound(AirflowNotFoundException):
    """Raise when a Pool is not available in the system."""



class NoAvailablePoolSlot(AirflowException):
    """Raise when there are not enough slots in the pool."""


class DagConcurrencyLimitReached(AirflowException):
    """Raise when the DAG max_active_tasks limit is reached."""


class TaskConcurrencyLimitReached(AirflowException):
    """Raise when the task max_active_tasks limit is reached."""



class BackfillUnfinished(AirflowException):
    """
    Raise when not all tasks succeed in a backfill.

    :param message: The human-readable description of the exception
    :param ti_status: The information about all task statuses
    """

    def __init__(self, message, ti_status):
        super().__init__(message)
        self.ti_status = ti_status



class FileSyntaxError(NamedTuple):
    """Information about a single error in a file."""

    line_no: int | None
    message: str

    def __str__(self):
        return f"{self.message}. Line number: {self.line_no}."



class AirflowFileParseException(AirflowException):
    """
    Raise when a connection or variable file cannot be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors

    def __str__(self):
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        result = f"{self.msg}\nFilename: {self.file_path}\n\n"

        for error_no, parse_error in enumerate(self.parse_errors, 1):
            result += "=" * 20 + f" Parse error {error_no:3} " + "=" * 20 + "\n"
            result += f"{parse_error.message}\n"
            if parse_error.line_no:
                result += f"Line number: {parse_error.line_no}\n"
            if parse_error.line_no and is_tty():
                result += "\n" + prepare_code_snippet(self.file_path, parse_error.line_no) + "\n"

        return result


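# Usage sketch (illustrative, not part of the original module): how a file
# parser might collect syntax errors and raise them in one exception. The
# function name and message text are hypothetical.
def _example_report_parse_errors(path: str) -> None:
    errors = [
        FileSyntaxError(line_no=3, message="Invalid line format"),
        FileSyntaxError(line_no=None, message="The file is empty"),
    ]
    raise AirflowFileParseException(
        "Failed to load the variable file.", file_path=path, parse_errors=errors
    )

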

class ConnectionNotUnique(AirflowException):
    """Raise when multiple values are found for the same connection ID."""



class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout: datetime.timedelta | None = None,
    ):
        super().__init__()
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout
        # Check timeout type at runtime
        if self.timeout is not None and not hasattr(self.timeout, "total_seconds"):
            raise ValueError("Timeout value must be a timedelta")

    def serialize(self):
        return (
            self.__class__.__name__,
            (),
            {
                "trigger": self.trigger,
                "method_name": self.method_name,
                "kwargs": self.kwargs,
                "timeout": self.timeout,
            },
        )

    def __repr__(self) -> str:
        return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"


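# Usage sketch (illustrative, not part of the original module): an operator
# typically defers via a helper that raises this exception; raising it
# directly is shown here with a hypothetical trigger object.
def _example_defer(trigger: Any) -> None:
    import datetime as dt  # local import; the module imports datetime only for typing

    raise TaskDeferred(
        trigger=trigger,
        method_name="execute_complete",  # method to resume in once the trigger fires
        timeout=dt.timedelta(hours=1),
    )

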

class TaskDeferralError(AirflowException):
    """Raised when a task fails during deferral."""



# The try/except handling is needed because we moved all k8s classes to the cncf.kubernetes provider.
# These two exceptions are used internally by the Kubernetes Executor but also by PodGenerator, so we need
# to leave them here in case an older version of the cncf.kubernetes provider is used to run
# KubernetesPodOperator and it raises one of those exceptions. The code should be backwards compatible even
# if you import and try/except the exception using direct imports from airflow.exceptions:
# 1) if you have an old provider, both the provider and the pod generator will throw the
#    "airflow.exceptions" exception.
# 2) if you have a new provider, both the provider and the pod generator will throw the
#    "airflow.providers.cncf.kubernetes" exception, as it is imported here from the provider.
try:
    from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException
except ImportError:

    class PodMutationHookException(AirflowException):  # type: ignore[no-redef]
        """Raised when exception happens during Pod Mutation Hook execution."""


try:
    from airflow.providers.cncf.kubernetes.pod_generator import PodReconciliationError
except ImportError:

    class PodReconciliationError(AirflowException):  # type: ignore[no-redef]
        """Raised when an error is encountered while trying to merge pod configs."""


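# Compatibility sketch (illustrative, not part of the original module): code
# that catches these names through airflow.exceptions handles the error no
# matter which definition is in effect, because both the provider import and
# the fallback class resolve to the same name here.
def _example_catch_pod_mutation_error() -> str:
    try:
        raise PodMutationHookException("mutation hook failed")  # hypothetical failure
    except PodMutationHookException as err:
        return f"handled: {err}"

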

class RemovedInAirflow3Warning(DeprecationWarning):
    """Issued for usage of deprecated features that will be removed in Airflow 3."""

    deprecated_since: str | None = None
    "Indicates the Airflow version that started raising this deprecation warning"


class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Issued for usage of deprecated features of an Airflow provider."""

    deprecated_provider_since: str | None = None
    "Indicates the provider version that started raising this deprecation warning"



class DeserializingResultError(ValueError):
    """Raised when an error is encountered while a pickling library deserializes a pickle file."""

    def __str__(self):
        return (
            "Error deserializing result. Note that result deserialization "
            "is not supported across major Python versions. Cause: " + str(self.__cause__)
        )
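

# Usage sketch (illustrative, not part of the original module): the message
# above is built from ``__cause__``, so this exception is meant to be raised
# with explicit chaining.
def _example_deserialize(blob: bytes):
    import pickle

    try:
        return pickle.loads(blob)
    except Exception as err:
        raise DeserializingResultError() from err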