Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/exceptions.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

153 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import enum 

21from http import HTTPStatus 

22from typing import TYPE_CHECKING, Any 

23 

24from airflow.sdk import TriggerRule 

25 

26# Re-exporting AirflowConfigException from shared configuration 

27from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Collection 

31 

32 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

33 from airflow.sdk.execution_time.comms import ErrorResponse 

34 

35 

class AirflowException(Exception):
    """
    Root of the Airflow exception hierarchy.

    Custom Airflow exceptions are expected to subclass this type.
    """

    # HTTP status associated with this error.
    status_code = HTTPStatus.INTERNAL_SERVER_ERROR

    def serialize(self):
        """
        Describe this exception as a ``(qualified class name, args, kwargs)`` triple.

        The positional part holds only ``str(self)``; the keyword part is always empty.
        """
        klass = self.__class__
        qualified_name = f"{klass.__module__}.{klass.__name__}"
        return qualified_name, (str(self),), {}

48 

49 

class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when an import needed for an optional provider feature is missing."""

52 

53 

class AirflowNotFoundException(AirflowException):
    """Signal that the requested object or resource is not available in the system."""

    # Replaces the status code inherited from the base class with HTTP 404.
    status_code = HTTPStatus.NOT_FOUND

58 

59 

class AirflowDagCycleException(AirflowException):
    """Signal that a Dag definition contains a cycle."""

62 

63 

class AirflowRuntimeError(Exception):
    """General-purpose Airflow error raised from runtime functions."""

    def __init__(self, error: ErrorResponse):
        """
        Wrap an error response.

        :param error: Response object. It is kept on ``self.error``, and its
            ``error.value`` and ``detail`` attributes form the exception message.
        """
        self.error = error
        message = f"{error.error.value}: {error.detail}"
        super().__init__(message)

70 

71 

class AirflowTimetableInvalid(AirflowException):
    """Signal that a Dag has an invalid timetable."""

74 

75 

class ErrorType(enum.Enum):
    """
    Error categories used in the API client.

    Every member's value is the same string as its name.
    """

    # "<something> not found" categories.
    CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
    VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND"
    XCOM_NOT_FOUND = "XCOM_NOT_FOUND"
    ASSET_NOT_FOUND = "ASSET_NOT_FOUND"
    TASK_STATE_NOT_FOUND = "TASK_STATE_NOT_FOUND"
    ASSET_STATE_NOT_FOUND = "ASSET_STATE_NOT_FOUND"

    # Remaining categories.
    DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"
    GENERIC_ERROR = "GENERIC_ERROR"
    API_SERVER_ERROR = "API_SERVER_ERROR"

88 

89 

class XComForMappingNotPushed(TypeError):
    """Signal that a mapped downstream's dependency failed to push the XCom needed for task mapping."""

    def __str__(self) -> str:
        """Return a fixed message, whatever arguments the exception was built with."""
        return "did not push XCom for task mapping"

95 

96 

class UnmappableXComTypePushed(TypeError):
    """Signal that a mapped downstream's dependency pushed a value of an unmappable type."""

    def __init__(self, value: Any, *values: Any) -> None:
        """
        Record the offending value(s) as the exception ``args``.

        :param value: First value; required, so ``args`` is never empty after construction.
        :param values: Any further values; ``__str__`` renders their types as trailing subscripts.
        """
        super().__init__(value, *values)

    def __str__(self) -> str:
        """Render the type names of ``args`` as ``first[second][third]...`` inside the message."""
        names = [type(arg).__qualname__ for arg in self.args]
        typename = names[0] + "".join(f"[{name}]" for name in names[1:])
        return f"unmappable return type {typename!r}"

108 

109 

class AirflowFailException(AirflowException):
    """Signal that the task should be failed without retrying."""

112 

113 

class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException):
    """
    Shared base for failures caused by executing with inactive assets.

    ``str()`` on an instance yields ``main_message`` followed by the rendered asset keys.
    """

    # Message prefix; only declared here, so it must be assigned elsewhere (e.g. by a subclass).
    main_message: str

    def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None:
        """
        Remember which asset keys were inactive.

        :param inactive_asset_keys: Keys or references of the inactive assets.
        """
        self.inactive_asset_keys = inactive_asset_keys

    @staticmethod
    def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str:
        """Format a single asset key or reference as a human-readable string."""
        # Imported at call time rather than at module import time.
        from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef

        if isinstance(key, AssetUniqueKey):
            rendered = f"Asset(name={key.name!r}, uri={key.uri!r})"
        elif isinstance(key, AssetNameRef):
            rendered = f"Asset.ref(name={key.name!r})"
        elif isinstance(key, AssetUriRef):
            rendered = f"Asset.ref(uri={key.uri!r})"
        else:
            # Not expected to happen; fall back to repr() so building the message never fails.
            rendered = repr(key)
        return rendered

    @property
    def inactive_assets_message(self) -> str:
        """Comma-separated rendering of every inactive asset key."""
        return ", ".join(map(self._render_asset_key, self.inactive_asset_keys))

    def __str__(self) -> str:
        """Combine ``main_message`` with the rendered inactive asset keys."""
        return f"{self.main_message}: {self.inactive_assets_message}"

138 

139 

class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption):
    """Signal that a task was executed while its inlets or outlets contain inactive assets."""

    # Prefix placed before the rendered asset list in the exception message.
    main_message = "Task has the following inactive assets in its inlets or outlets"

144 

145 

class AirflowRescheduleException(AirflowException):
    """
    Raise when the task should be re-scheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        # No message is passed on, so ``args`` is empty and ``str()`` is "".
        super().__init__()
        self.reschedule_date = reschedule_date

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy``.

        The default exception reduction rebuilds the object from ``self.args``, which is
        empty here, so reconstruction would fail on the missing ``reschedule_date``.
        """
        return self.__class__, (self.reschedule_date,)

    def serialize(self):
        """Return ``(qualified class name, (), kwargs)`` with ``reschedule_date`` as the only kwarg."""
        cls = self.__class__
        return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date}

160 

161 

class AirflowSensorTimeout(AirflowException):
    """Signal that sensor polling timed out."""

164 

165 

class AirflowSkipException(AirflowException):
    """Signal that the task should be skipped."""

168 

169 

class AirflowTaskTerminated(BaseException):
    """Signal that the task execution was terminated."""

172 

173 

# Deliberately derived from BaseException rather than AirflowException (and thus Exception):
# this exception is used to explicitly interrupt an ongoing task, so code doing ordinary
# error handling must not treat it as a normally handleable error. KeyboardInterrupt is
# the comparable built-in.
class AirflowTaskTimeout(BaseException):
    """Signal that the task execution timed out."""

179 

180 

181class TaskDeferred(BaseException): 

182 """ 

183 Signal an operator moving to deferred state. 

184 

185 Special exception raised to signal that the operator it was raised from 

186 wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance 

187 directly. If the trigger should end the task instance itself, ``method_name`` does not matter, 

188 and can be None; otherwise, provide the name of the method that should be used when 

189 resuming execution in the task. 

190 """ 

191 

192 def __init__( 

193 self, 

194 *, 

195 trigger, 

196 method_name: str, 

197 kwargs: dict[str, Any] | None = None, 

198 timeout=None, 

199 ): 

200 super().__init__() 

201 self.trigger = trigger 

202 self.method_name = method_name 

203 self.kwargs = kwargs 

204 self.timeout = timeout 

205 

206 def serialize(self): 

207 cls = self.__class__ 

208 return ( 

209 f"{cls.__module__}.{cls.__name__}", 

210 (), 

211 { 

212 "trigger": self.trigger, 

213 "method_name": self.method_name, 

214 "kwargs": self.kwargs, 

215 "timeout": self.timeout, 

216 }, 

217 ) 

218 

219 def __repr__(self) -> str: 

220 return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>" 

221 

222 

class TaskDeferralError(AirflowException):
    """Signal that a task failed during deferral for some reason."""

225 

226 

class TaskDeferralTimeout(AirflowException):
    """Signal that the deferral timed out."""

229 

230 

class DagRunTriggerException(AirflowException):
    """
    Signal from an operator that a specific Dag Run of a dag should be triggered.

    Raised by an operator that wishes to trigger a specific Dag Run of a dag; this is
    used in the ``TriggerDagRunOperator``. Every keyword argument is stored unchanged on
    the instance under an attribute of the same name.
    """

    def __init__(
        self,
        *,
        trigger_dag_id: str,
        dag_run_id: str,
        conf: dict | None,
        logical_date=None,
        run_after=None,
        reset_dag_run: bool,
        skip_when_already_exists: bool,
        wait_for_completion: bool,
        allowed_states: list[str],
        failed_states: list[str],
        poke_interval: int,
        deferrable: bool,
        note: str | None = None,
    ):
        # No message is passed on; the payload lives in the attributes below.
        super().__init__()

        self.trigger_dag_id = trigger_dag_id
        self.dag_run_id = dag_run_id
        self.conf = conf
        self.logical_date = logical_date
        self.run_after = run_after

        self.reset_dag_run = reset_dag_run
        self.skip_when_already_exists = skip_when_already_exists
        self.wait_for_completion = wait_for_completion

        self.allowed_states = allowed_states
        self.failed_states = failed_states
        self.poke_interval = poke_interval
        self.deferrable = deferrable
        self.note = note

270 

271 

class DownstreamTasksSkipped(AirflowException):
    """
    Signal from an operator that its downstream tasks should be skipped.

    Raised by an operator that wishes to skip downstream tasks; this is used in the
    ShortCircuitOperator.

    :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip.
    """

    def __init__(self, *, tasks):
        # No message is passed on; the payload is the ``tasks`` attribute.
        super().__init__()
        self.tasks = tasks

285 

286 

class XComNotFound(AirflowException):
    """
    Raise when an XCom reference is being resolved against a non-existent XCom.

    :param dag_id: Dag id used in the error message.
    :param task_id: Task id used in the error message.
    :param key: XCom key used in the error message.
    """

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        # No message is passed on; ``__str__`` builds it from the attributes below.
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy``.

        The default exception reduction rebuilds the object from ``self.args``, which is
        empty here, so reconstruction would fail on the three missing arguments.
        """
        return self.__class__, (self.dag_id, self.task_id, self.key)

    def serialize(self):
        """Return ``(qualified class name, (), kwargs)`` with the three constructor arguments as kwargs."""
        cls = self.__class__
        return (
            f"{cls.__module__}.{cls.__name__}",
            (),
            {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key},
        )

306 

307 

class ParamValidationError(AirflowException, ValueError):
    """Signal that Dag params are invalid; can also be caught as ``ValueError``."""

310 

311 

class DuplicateTaskIdFound(AirflowException):
    """Signal that a Task with a duplicate task_id is defined in the same Dag."""

314 

315 

class TaskAlreadyInTaskGroup(AirflowException):
    """
    Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup.

    :param task_id: Id of the task being added.
    :param existing_group_id: Id of the group the task already belongs to; None is
        reported as the DAG's root group.
    :param new_group_id: Id of the group the task was being added to.
    """

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str):
        # ``args`` intentionally stays (task_id, new_group_id) for backward compatibility.
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy``.

        The default exception reduction rebuilds the object from ``self.args``, which holds
        only two of the three required arguments, so reconstruction would fail.
        """
        return self.__class__, (self.task_id, self.existing_group_id, self.new_group_id)

    def __str__(self) -> str:
        if self.existing_group_id is None:
            existing_group = "the DAG's root group"
        else:
            existing_group = f"group {self.existing_group_id!r}"
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"

331 

332 

class TaskNotFound(AirflowException):
    """Signal that a Task is not available in the system."""

335 

336 

class NodeNotFound(TaskNotFound, KeyError):
    """Signal an attempt to access an invalid node (task or task group) using [] notation."""

    def __str__(self) -> str:
        """Return the first argument as a string, or an empty string when there are no arguments."""
        # Presumably overridden because KeyError's own __str__ shows the repr of a single
        # argument (quotes included) instead of the plain text.
        if not self.args:
            return ""
        return str(self.args[0])

342 

343 

class TaskAlreadyRunningError(AirflowException):
    """Signal that a task is already running on another worker."""

346 

347 

class FailFastDagInvalidTriggerRule(AirflowException):
    """Signal that a dag has 'fail_fast' enabled yet uses a non-default trigger rule."""

    # Trigger rules accepted by ``check`` when fail_fast is enabled.
    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
        """
        Raise this exception if a fail_fast dag task has a disallowed trigger rule.

        :meta private:
        """
        if not fail_fast:
            return
        if trigger_rule in cls._allowed_rules:
            return
        raise cls()

    def __str__(self) -> str:
        """Return a fixed message naming the ``ALL_SUCCESS`` trigger rule."""
        return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"

365 

366 

367class RemovedInAirflow4Warning(DeprecationWarning): 

368 """Issued for usage of deprecated features that will be removed in Airflow4.""" 

369 

370 deprecated_since: str | None = None 

371 "Indicates the airflow version that started raising this deprecation warning"