Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/exceptions.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

166 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import enum 

21from http import HTTPStatus 

22from typing import TYPE_CHECKING, Any 

23 

24from airflow.sdk import TriggerRule 

25 

26# Re exporting AirflowConfigException from shared configuration 

27from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Collection 

31 

32 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

33 from airflow.sdk.execution_time.comms import ErrorResponse 

34 

35 

36class AirflowException(Exception): 

37 """ 

38 Base class for all Airflow's errors. 

39 

40 Each custom exception should be derived from this class. 

41 """ 

42 

43 status_code = HTTPStatus.INTERNAL_SERVER_ERROR 

44 

45 def serialize(self): 

46 cls = self.__class__ 

47 return f"{cls.__module__}.{cls.__name__}", (str(self),), {} 

48 

49 

50class AirflowOptionalProviderFeatureException(AirflowException): 

51 """Raise by providers when imports are missing for optional provider features.""" 

52 

53 

54class AirflowNotFoundException(AirflowException): 

55 """Raise when the requested object/resource is not available in the system.""" 

56 

57 status_code = HTTPStatus.NOT_FOUND 

58 

59 

60class AirflowSecretsBackendAccessDenied(PermissionError): 

61 """ 

62 Authoritative deny from a secrets backend; dispatcher must NOT fall through. 

63 

64 Distinct from a generic ``PermissionError`` (e.g. an incidental filesystem 

65 ``OSError``-family raise from inside an unrelated backend) so the 

66 secrets-backend dispatcher loops can re-raise only this signal and keep 

67 treating other exceptions as "try the next backend". 

68 """ 

69 

70 

71class AirflowDagCycleException(AirflowException): 

72 """Raise when there is a cycle in Dag definition.""" 

73 

74 

75class AirflowRuntimeError(Exception): 

76 """Generic Airflow error raised by runtime functions.""" 

77 

78 def __init__(self, error: ErrorResponse): 

79 self.error = error 

80 super().__init__(f"{error.error.value}: {error.detail}") 

81 

82 

83class AirflowTimetableInvalid(AirflowException): 

84 """Raise when a DAG has an invalid timetable.""" 

85 

86 

87class ErrorType(enum.Enum): 

88 """Error types used in the API client.""" 

89 

90 CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND" 

91 VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND" 

92 XCOM_NOT_FOUND = "XCOM_NOT_FOUND" 

93 ASSET_NOT_FOUND = "ASSET_NOT_FOUND" 

94 TASK_STORE_NOT_FOUND = "TASK_STORE_NOT_FOUND" 

95 ASSET_STORE_NOT_FOUND = "ASSET_STORE_NOT_FOUND" 

96 DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS" 

97 # Distinct from API_SERVER_ERROR: signals an explicit 401/403 from the 

98 # Execution API. Callers like ExecutionAPISecretsBackend treat this as 

99 # a deny rather than a "not found" so the secrets-backend dispatcher 

100 # does NOT fall through to a less-restrictive backend (e.g. env vars). 

101 PERMISSION_DENIED = "PERMISSION_DENIED" 

102 GENERIC_ERROR = "GENERIC_ERROR" 

103 API_SERVER_ERROR = "API_SERVER_ERROR" 

104 

105 

106class XComForMappingNotPushed(TypeError): 

107 """Raise when a mapped downstream's dependency fails to push XCom for task mapping.""" 

108 

109 def __str__(self) -> str: 

110 return "did not push XCom for task mapping" 

111 

112 

113class UnmappableXComTypePushed(TypeError): 

114 """Raise when an unmappable type is pushed as a mapped downstream's dependency.""" 

115 

116 def __init__(self, value: Any, *values: Any) -> None: 

117 super().__init__(value, *values) 

118 

119 def __str__(self) -> str: 

120 typename = type(self.args[0]).__qualname__ 

121 for arg in self.args[1:]: 

122 typename = f"{typename}[{type(arg).__qualname__}]" 

123 return f"unmappable return type {typename!r}" 

124 

125 

126class AirflowFailException(AirflowException): 

127 """Raise when the task should be failed without retrying.""" 

128 

129 

130class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException): 

131 main_message: str 

132 

133 def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None: 

134 self.inactive_asset_keys = inactive_asset_keys 

135 

136 @staticmethod 

137 def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str: 

138 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

139 

140 if isinstance(key, AssetUniqueKey): 

141 return f"Asset(name={key.name!r}, uri={key.uri!r})" 

142 if isinstance(key, AssetNameRef): 

143 return f"Asset.ref(name={key.name!r})" 

144 if isinstance(key, AssetUriRef): 

145 return f"Asset.ref(uri={key.uri!r})" 

146 return repr(key) # Should not happen, but let's fails more gracefully in an exception. 

147 

148 def __str__(self) -> str: 

149 return f"{self.main_message}: {self.inactive_assets_message}" 

150 

151 @property 

152 def inactive_assets_message(self) -> str: 

153 return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys) 

154 

155 

156class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption): 

157 """Raise when the task is executed with inactive assets in its inlet or outlet.""" 

158 

159 main_message = "Task has the following inactive assets in its inlets or outlets" 

160 

161 

162class AirflowRescheduleException(AirflowException): 

163 """ 

164 Raise when the task should be re-scheduled at a later time. 

165 

166 :param reschedule_date: The date when the task should be rescheduled 

167 """ 

168 

169 def __init__(self, reschedule_date): 

170 super().__init__() 

171 self.reschedule_date = reschedule_date 

172 

173 def serialize(self): 

174 cls = self.__class__ 

175 return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date} 

176 

177 

178class AirflowSensorTimeout(AirflowException): 

179 """Raise when there is a timeout on sensor polling.""" 

180 

181 

182class AirflowSkipException(AirflowException): 

183 """Raise when the task should be skipped.""" 

184 

185 

186class AirflowTaskTerminated(BaseException): 

187 """Raise when the task execution is terminated.""" 

188 

189 

190# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used 

191# to explicitly interrupt ongoing task. Code that does normal error-handling should not treat 

192# such interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt) 

193class AirflowTaskTimeout(BaseException): 

194 """Raise when the task execution times-out.""" 

195 

196 

197class TaskDeferred(BaseException): 

198 """ 

199 Signal an operator moving to deferred state. 

200 

201 Special exception raised to signal that the operator it was raised from 

202 wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance 

203 directly. If the trigger should end the task instance itself, ``method_name`` does not matter, 

204 and can be None; otherwise, provide the name of the method that should be used when 

205 resuming execution in the task. 

206 """ 

207 

208 def __init__( 

209 self, 

210 *, 

211 trigger, 

212 method_name: str, 

213 kwargs: dict[str, Any] | None = None, 

214 timeout=None, 

215 ): 

216 super().__init__() 

217 self.trigger = trigger 

218 self.method_name = method_name 

219 self.kwargs = kwargs 

220 self.timeout = timeout 

221 

222 def serialize(self): 

223 cls = self.__class__ 

224 return ( 

225 f"{cls.__module__}.{cls.__name__}", 

226 (), 

227 { 

228 "trigger": self.trigger, 

229 "method_name": self.method_name, 

230 "kwargs": self.kwargs, 

231 "timeout": self.timeout, 

232 }, 

233 ) 

234 

235 def __repr__(self) -> str: 

236 return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>" 

237 

238 

239class TaskAwaitingInput(BaseException): 

240 """ 

241 Signal an operator parking the task in awaiting_input state (Human-in-the-loop). 

242 

243 Raised to signal that the operator wishes to pause until external human input arrives, 

244 WITHOUT creating a trigger or involving the triggerer. Resumption is driven by the Core API 

245 response handler (or the scheduler timeout sweep) flipping the task instance back to SCHEDULED 

246 with ``next_method`` / ``next_kwargs`` intact, after which the worker calls 

247 ``resume_execution(method_name, kwargs)``. 

248 

249 Subclasses ``BaseException`` (like ``TaskDeferred``) so that a user ``except Exception`` in 

250 ``execute()`` cannot accidentally swallow the park signal. 

251 """ 

252 

253 def __init__( 

254 self, 

255 *, 

256 method_name: str, 

257 kwargs: dict[str, Any] | None = None, 

258 timeout=None, 

259 ): 

260 super().__init__() 

261 self.method_name = method_name 

262 self.kwargs = kwargs 

263 self.timeout = timeout 

264 

265 def serialize(self): 

266 cls = self.__class__ 

267 return ( 

268 f"{cls.__module__}.{cls.__name__}", 

269 (), 

270 { 

271 "method_name": self.method_name, 

272 "kwargs": self.kwargs, 

273 "timeout": self.timeout, 

274 }, 

275 ) 

276 

277 def __repr__(self) -> str: 

278 return f"<TaskAwaitingInput method={self.method_name}>" 

279 

280 

281class TaskDeferralError(AirflowException): 

282 """Raised when a task failed during deferral for some reason.""" 

283 

284 

285class TaskDeferralTimeout(AirflowException): 

286 """Raise when there is a timeout on the deferral.""" 

287 

288 

289class DagRunTriggerException(AirflowException): 

290 """ 

291 Signal by an operator to trigger a specific Dag Run of a dag. 

292 

293 Special exception raised to signal that the operator it was raised from wishes to trigger 

294 a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``. 

295 """ 

296 

297 def __init__( 

298 self, 

299 *, 

300 trigger_dag_id: str, 

301 dag_run_id: str, 

302 conf: dict | None, 

303 logical_date=None, 

304 run_after=None, 

305 reset_dag_run: bool, 

306 skip_when_already_exists: bool, 

307 wait_for_completion: bool, 

308 allowed_states: list[str], 

309 failed_states: list[str], 

310 poke_interval: int, 

311 deferrable: bool, 

312 note: str | None = None, 

313 ): 

314 super().__init__() 

315 self.trigger_dag_id = trigger_dag_id 

316 self.dag_run_id = dag_run_id 

317 self.conf = conf 

318 self.logical_date = logical_date 

319 self.run_after = run_after 

320 self.reset_dag_run = reset_dag_run 

321 self.skip_when_already_exists = skip_when_already_exists 

322 self.wait_for_completion = wait_for_completion 

323 self.allowed_states = allowed_states 

324 self.failed_states = failed_states 

325 self.poke_interval = poke_interval 

326 self.deferrable = deferrable 

327 self.note = note 

328 

329 

330class DownstreamTasksSkipped(AirflowException): 

331 """ 

332 Signal by an operator to skip its downstream tasks. 

333 

334 Special exception raised to signal that the operator it was raised from wishes to skip 

335 downstream tasks. This is used in the ShortCircuitOperator. 

336 

337 :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip. 

338 """ 

339 

340 def __init__(self, *, tasks): 

341 super().__init__() 

342 self.tasks = tasks 

343 

344 

345class XComNotFound(AirflowException): 

346 """Raise when an XCom reference is being resolved against a non-existent XCom.""" 

347 

348 def __init__(self, dag_id: str, task_id: str, key: str) -> None: 

349 super().__init__() 

350 self.dag_id = dag_id 

351 self.task_id = task_id 

352 self.key = key 

353 

354 def __str__(self) -> str: 

355 return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!' 

356 

357 def serialize(self): 

358 cls = self.__class__ 

359 return ( 

360 f"{cls.__module__}.{cls.__name__}", 

361 (), 

362 {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key}, 

363 ) 

364 

365 

366class ParamValidationError(AirflowException, ValueError): 

367 """Raise when DAG params is invalid.""" 

368 

369 

370class DuplicateTaskIdFound(AirflowException): 

371 """Raise when a Task with duplicate task_id is defined in the same DAG.""" 

372 

373 

374class TaskAlreadyInTaskGroup(AirflowException): 

375 """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup.""" 

376 

377 def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str): 

378 super().__init__(task_id, new_group_id) 

379 self.task_id = task_id 

380 self.existing_group_id = existing_group_id 

381 self.new_group_id = new_group_id 

382 

383 def __str__(self) -> str: 

384 if self.existing_group_id is None: 

385 existing_group = "the DAG's root group" 

386 else: 

387 existing_group = f"group {self.existing_group_id!r}" 

388 return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})" 

389 

390 

391class TaskNotFound(AirflowException): 

392 """Raise when a Task is not available in the system.""" 

393 

394 

395class NodeNotFound(TaskNotFound, KeyError): 

396 """Raise when attempting to access an invalid node (task or task group) using [] notation.""" 

397 

398 def __str__(self) -> str: 

399 return str(self.args[0]) if self.args else "" 

400 

401 

402class TaskAlreadyRunningError(AirflowException): 

403 """Raised when a task is already running on another worker.""" 

404 

405 

406class FailFastDagInvalidTriggerRule(AirflowException): 

407 """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule.""" 

408 

409 _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS) 

410 

411 @classmethod 

412 def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule): 

413 """ 

414 Check that fail_fast dag tasks have allowable trigger rules. 

415 

416 :meta private: 

417 """ 

418 if fail_fast and trigger_rule not in cls._allowed_rules: 

419 raise cls() 

420 

421 def __str__(self) -> str: 

422 return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule" 

423 

424 

425class RemovedInAirflow4Warning(DeprecationWarning): 

426 """Issued for usage of deprecated features that will be removed in Airflow4.""" 

427 

428 deprecated_since: str | None = None 

429 "Indicates the airflow version that started raising this deprecation warning"