Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/exceptions.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

144 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import enum 

21from http import HTTPStatus 

22from typing import TYPE_CHECKING, Any 

23 

24from airflow.sdk import TriggerRule 

25 

# Re-exporting AirflowConfigException from the shared configuration package

27from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Collection 

31 

32 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

33 from airflow.sdk.execution_time.comms import ErrorResponse 

34 

35 

class AirflowException(Exception):
    """
    Root of the Airflow exception hierarchy.

    Every custom Airflow error should be derived from this class.
    """

    # Default HTTP status used when this error is surfaced through an API.
    status_code = HTTPStatus.INTERNAL_SERVER_ERROR

    def serialize(self):
        """Return a ``(classpath, args, kwargs)`` triple for reconstructing the exception."""
        klass = type(self)
        return f"{klass.__module__}.{klass.__name__}", (str(self),), {}

48 

49 

class AirflowNotFoundException(AirflowException):
    """Raised when a requested object/resource does not exist in the system."""

    # Overrides the base 500 with the more specific 404.
    status_code = HTTPStatus.NOT_FOUND

54 

55 

class AirflowDagCycleException(AirflowException):
    """Raised when a cycle is detected in a Dag definition."""

58 

59 

class AirflowRuntimeError(Exception):
    """Generic Airflow error raised by runtime functions."""

    def __init__(self, error: ErrorResponse):
        # Keep the structured error around so callers can inspect it programmatically.
        self.error = error
        message = f"{error.error.value}: {error.detail}"
        super().__init__(message)

66 

67 

class AirflowTimetableInvalid(AirflowException):
    """Raised when a DAG carries an invalid timetable."""

70 

71 

class ErrorType(enum.Enum):
    """Categories of errors reported by the API client."""

    # Lookup failures for specific resource kinds.
    CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
    VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND"
    XCOM_NOT_FOUND = "XCOM_NOT_FOUND"
    ASSET_NOT_FOUND = "ASSET_NOT_FOUND"
    # Conflicts and catch-all failures.
    DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"
    GENERIC_ERROR = "GENERIC_ERROR"
    API_SERVER_ERROR = "API_SERVER_ERROR"

82 

83 

class XComForMappingNotPushed(TypeError):
    """Raised when a mapped downstream's dependency fails to push any XCom for task mapping."""

    def __str__(self) -> str:
        return "did not push XCom for task mapping"

89 

90 

class UnmappableXComTypePushed(TypeError):
    """Raised when a value of an unmappable type is pushed as a mapped downstream's dependency."""

    def __init__(self, value: Any, *values: Any) -> None:
        super().__init__(value, *values)

    def __str__(self) -> str:
        # Render the offending value's type, folding any further args into a
        # bracketed chain, e.g. "dict[int]" for a dict whose values are ints.
        names = [type(arg).__qualname__ for arg in self.args]
        rendered = names[0]
        for name in names[1:]:
            rendered = f"{rendered}[{name}]"
        return f"unmappable return type {rendered!r}"

102 

103 

class AirflowFailException(AirflowException):
    """Raised when the task should fail immediately, without retrying."""

106 

107 

class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException):
    # NOTE(review): the class name misspells "Exception"; kept as-is because
    # subclasses elsewhere in this module reference this exact identifier.

    # Subclasses set this to the leading sentence of the rendered message.
    main_message: str

    def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None:
        self.inactive_asset_keys = inactive_asset_keys

    @staticmethod
    def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str:
        # Imported locally to avoid a module-level import cycle.
        from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef

        if isinstance(key, AssetUniqueKey):
            return f"Asset(name={key.name!r}, uri={key.uri!r})"
        if isinstance(key, AssetNameRef):
            return f"Asset.ref(name={key.name!r})"
        if isinstance(key, AssetUriRef):
            return f"Asset.ref(uri={key.uri!r})"
        # Unknown key type: should not happen, but fail gracefully with its repr.
        return repr(key)

    def __str__(self) -> str:
        return f"{self.main_message}: {self.inactive_assets_message}"

    @property
    def inactive_assets_message(self) -> str:
        """Comma-separated, human-readable rendering of all inactive asset keys."""
        return ", ".join(map(self._render_asset_key, self.inactive_asset_keys))

132 

133 

class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption):
    """Raised when a task executes with inactive assets in its inlet or outlet."""

    main_message = "Task has the following inactive assets in its inlets or outlets"

138 

139 

class AirflowRescheduleException(AirflowException):
    """
    Raised to request that the task be re-scheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        super().__init__()
        self.reschedule_date = reschedule_date

    def serialize(self):
        """Serialize to ``(classpath, args, kwargs)`` with the reschedule date as a kwarg."""
        klass = type(self)
        return f"{klass.__module__}.{klass.__name__}", (), {"reschedule_date": self.reschedule_date}

154 

155 

class AirflowSensorTimeout(AirflowException):
    """Raised when sensor polling exceeds its timeout."""

158 

159 

class AirflowSkipException(AirflowException):
    """Raised when the current task should be skipped."""

162 

163 

class AirflowTaskTerminated(BaseException):
    """Raised when the task execution is terminated."""

166 

167 

# Deliberately inherits BaseException instead of AirflowException->Exception:
# this exception is raised to forcibly interrupt a running task, so ordinary
# error-handling code must not catch it as a regular error (cf. KeyboardInterrupt).
class AirflowTaskTimeout(BaseException):
    """Raised when the task execution times out."""

173 

174 

class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance
    directly. If the trigger should end the task instance itself, ``method_name`` does not matter,
    and can be None; otherwise, provide the name of the method that should be used when
    resuming execution in the task.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout=None,
    ):
        super().__init__()
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout

    def serialize(self):
        """Serialize to ``(classpath, args, kwargs)`` carrying the full deferral payload."""
        payload = {
            "trigger": self.trigger,
            "method_name": self.method_name,
            "kwargs": self.kwargs,
            "timeout": self.timeout,
        }
        klass = type(self)
        return f"{klass.__module__}.{klass.__name__}", (), payload

    def __repr__(self) -> str:
        return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"

215 

216 

class TaskDeferralError(AirflowException):
    """Raised when a task fails during deferral for some reason."""

219 

220 

class TaskDeferralTimeout(AirflowException):
    """Raised when the deferral exceeds its timeout."""

223 

224 

class DagRunTriggerException(AirflowException):
    """
    Signal by an operator to trigger a specific Dag Run of a dag.

    Special exception raised to signal that the operator it was raised from wishes to trigger
    a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``.
    """

    def __init__(
        self,
        *,
        trigger_dag_id: str,
        dag_run_id: str,
        conf: dict | None,
        logical_date=None,
        reset_dag_run: bool,
        skip_when_already_exists: bool,
        wait_for_completion: bool,
        allowed_states: list[str],
        failed_states: list[str],
        poke_interval: int,
        deferrable: bool,
    ):
        super().__init__()
        # Identity of the dag run to create.
        self.trigger_dag_id = trigger_dag_id
        self.dag_run_id = dag_run_id
        self.conf = conf
        self.logical_date = logical_date
        # Behavior flags controlling how an existing run is handled.
        self.reset_dag_run = reset_dag_run
        self.skip_when_already_exists = skip_when_already_exists
        # Options controlling whether/how the triggering task waits for the run.
        self.wait_for_completion = wait_for_completion
        self.allowed_states = allowed_states
        self.failed_states = failed_states
        self.poke_interval = poke_interval
        self.deferrable = deferrable

260 

261 

class DownstreamTasksSkipped(AirflowException):
    """
    Signal by an operator to skip its downstream tasks.

    Special exception raised to signal that the operator it was raised from wishes to skip
    downstream tasks. This is used in the ShortCircuitOperator.

    :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip.
    """

    def __init__(self, *, tasks):
        super().__init__()
        # task_ids (or (task_id, map_index) tuples) the runtime should skip.
        self.tasks = tasks

275 

276 

class XComNotFound(AirflowException):
    """Raised when an XCom reference resolves against a non-existent XCom."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'

    def serialize(self):
        """Serialize to ``(classpath, args, kwargs)`` identifying the missing XCom."""
        klass = type(self)
        payload = {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key}
        return f"{klass.__module__}.{klass.__name__}", (), payload

296 

297 

class ParamValidationError(AirflowException):
    """Raised when DAG params fail validation."""

300 

301 

class DuplicateTaskIdFound(AirflowException):
    """Raised when two tasks with the same task_id are defined in one DAG."""

304 

305 

class TaskAlreadyInTaskGroup(AirflowException):
    """Raised when a task cannot join a TaskGroup because it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str):
        # Only the task id and the target group feed the base-class args.
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        existing_group = (
            "the DAG's root group"
            if self.existing_group_id is None
            else f"group {self.existing_group_id!r}"
        )
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"

321 

322 

class TaskNotFound(AirflowException):
    """Raised when a task is not available in the system."""

325 

326 

class FailFastDagInvalidTriggerRule(AirflowException):
    """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule."""

    # Trigger rules that remain valid when fail_fast is enabled.
    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
        """
        Check that fail_fast dag tasks have allowable trigger rules.

        :param fail_fast: whether the dag has ``fail_fast`` enabled
        :param trigger_rule: the task's trigger rule to validate
        :raises FailFastDagInvalidTriggerRule: if the rule is not in ``_allowed_rules``
        :meta private:
        """
        if fail_fast and trigger_rule not in cls._allowed_rules:
            raise cls()

    def __str__(self) -> str:
        # Bug fix: the previous message claimed only ALL_SUCCESS was allowed,
        # but ``check`` also accepts ALL_DONE_SETUP_SUCCESS — list them all.
        allowed = " or ".join(str(rule) for rule in self._allowed_rules)
        return f"A 'fail_fast' dag can only have {allowed} trigger rules"

344 

345 

class RemovedInAirflow4Warning(DeprecationWarning):
    """Issued for usage of deprecated features that will be removed in Airflow4."""

    # Airflow version in which this deprecation warning first started being raised.
    deprecated_since: str | None = None