Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/exceptions.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

143 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import enum 

21from http import HTTPStatus 

22from typing import TYPE_CHECKING, Any 

23 

24from airflow.sdk import TriggerRule 

25 

26if TYPE_CHECKING: 

27 from collections.abc import Collection 

28 

29 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

30 from airflow.sdk.execution_time.comms import ErrorResponse 

31 

32 

33class AirflowException(Exception): 

34 """ 

35 Base class for all Airflow's errors. 

36 

37 Each custom exception should be derived from this class. 

38 """ 

39 

40 status_code = HTTPStatus.INTERNAL_SERVER_ERROR 

41 

42 def serialize(self): 

43 cls = self.__class__ 

44 return f"{cls.__module__}.{cls.__name__}", (str(self),), {} 

45 

46 

47class AirflowNotFoundException(AirflowException): 

48 """Raise when the requested object/resource is not available in the system.""" 

49 

50 status_code = HTTPStatus.NOT_FOUND 

51 

52 

53class AirflowDagCycleException(AirflowException): 

54 """Raise when there is a cycle in Dag definition.""" 

55 

56 

57class AirflowRuntimeError(Exception): 

58 """Generic Airflow error raised by runtime functions.""" 

59 

60 def __init__(self, error: ErrorResponse): 

61 self.error = error 

62 super().__init__(f"{error.error.value}: {error.detail}") 

63 

64 

65class AirflowTimetableInvalid(AirflowException): 

66 """Raise when a DAG has an invalid timetable.""" 

67 

68 

69class ErrorType(enum.Enum): 

70 """Error types used in the API client.""" 

71 

72 CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND" 

73 VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND" 

74 XCOM_NOT_FOUND = "XCOM_NOT_FOUND" 

75 ASSET_NOT_FOUND = "ASSET_NOT_FOUND" 

76 DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS" 

77 GENERIC_ERROR = "GENERIC_ERROR" 

78 API_SERVER_ERROR = "API_SERVER_ERROR" 

79 

80 

81class XComForMappingNotPushed(TypeError): 

82 """Raise when a mapped downstream's dependency fails to push XCom for task mapping.""" 

83 

84 def __str__(self) -> str: 

85 return "did not push XCom for task mapping" 

86 

87 

88class UnmappableXComTypePushed(TypeError): 

89 """Raise when an unmappable type is pushed as a mapped downstream's dependency.""" 

90 

91 def __init__(self, value: Any, *values: Any) -> None: 

92 super().__init__(value, *values) 

93 

94 def __str__(self) -> str: 

95 typename = type(self.args[0]).__qualname__ 

96 for arg in self.args[1:]: 

97 typename = f"{typename}[{type(arg).__qualname__}]" 

98 return f"unmappable return type {typename!r}" 

99 

100 

101class AirflowFailException(AirflowException): 

102 """Raise when the task should be failed without retrying.""" 

103 

104 

105class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException): 

106 main_message: str 

107 

108 def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None: 

109 self.inactive_asset_keys = inactive_asset_keys 

110 

111 @staticmethod 

112 def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str: 

113 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

114 

115 if isinstance(key, AssetUniqueKey): 

116 return f"Asset(name={key.name!r}, uri={key.uri!r})" 

117 if isinstance(key, AssetNameRef): 

118 return f"Asset.ref(name={key.name!r})" 

119 if isinstance(key, AssetUriRef): 

120 return f"Asset.ref(uri={key.uri!r})" 

121 return repr(key) # Should not happen, but let's fails more gracefully in an exception. 

122 

123 def __str__(self) -> str: 

124 return f"{self.main_message}: {self.inactive_assets_message}" 

125 

126 @property 

127 def inactive_assets_message(self) -> str: 

128 return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys) 

129 

130 

131class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption): 

132 """Raise when the task is executed with inactive assets in its inlet or outlet.""" 

133 

134 main_message = "Task has the following inactive assets in its inlets or outlets" 

135 

136 

137class AirflowRescheduleException(AirflowException): 

138 """ 

139 Raise when the task should be re-scheduled at a later time. 

140 

141 :param reschedule_date: The date when the task should be rescheduled 

142 """ 

143 

144 def __init__(self, reschedule_date): 

145 super().__init__() 

146 self.reschedule_date = reschedule_date 

147 

148 def serialize(self): 

149 cls = self.__class__ 

150 return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date} 

151 

152 

153class AirflowSensorTimeout(AirflowException): 

154 """Raise when there is a timeout on sensor polling.""" 

155 

156 

157class AirflowSkipException(AirflowException): 

158 """Raise when the task should be skipped.""" 

159 

160 

161class AirflowTaskTerminated(BaseException): 

162 """Raise when the task execution is terminated.""" 

163 

164 

165# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used 

166# to explicitly interrupt ongoing task. Code that does normal error-handling should not treat 

167# such interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt) 

168class AirflowTaskTimeout(BaseException): 

169 """Raise when the task execution times-out.""" 

170 

171 

172class TaskDeferred(BaseException): 

173 """ 

174 Signal an operator moving to deferred state. 

175 

176 Special exception raised to signal that the operator it was raised from 

177 wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance 

178 directly. If the trigger should end the task instance itself, ``method_name`` does not matter, 

179 and can be None; otherwise, provide the name of the method that should be used when 

180 resuming execution in the task. 

181 """ 

182 

183 def __init__( 

184 self, 

185 *, 

186 trigger, 

187 method_name: str, 

188 kwargs: dict[str, Any] | None = None, 

189 timeout=None, 

190 ): 

191 super().__init__() 

192 self.trigger = trigger 

193 self.method_name = method_name 

194 self.kwargs = kwargs 

195 self.timeout = timeout 

196 

197 def serialize(self): 

198 cls = self.__class__ 

199 return ( 

200 f"{cls.__module__}.{cls.__name__}", 

201 (), 

202 { 

203 "trigger": self.trigger, 

204 "method_name": self.method_name, 

205 "kwargs": self.kwargs, 

206 "timeout": self.timeout, 

207 }, 

208 ) 

209 

210 def __repr__(self) -> str: 

211 return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>" 

212 

213 

214class TaskDeferralError(AirflowException): 

215 """Raised when a task failed during deferral for some reason.""" 

216 

217 

218class TaskDeferralTimeout(AirflowException): 

219 """Raise when there is a timeout on the deferral.""" 

220 

221 

222class DagRunTriggerException(AirflowException): 

223 """ 

224 Signal by an operator to trigger a specific Dag Run of a dag. 

225 

226 Special exception raised to signal that the operator it was raised from wishes to trigger 

227 a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``. 

228 """ 

229 

230 def __init__( 

231 self, 

232 *, 

233 trigger_dag_id: str, 

234 dag_run_id: str, 

235 conf: dict | None, 

236 logical_date=None, 

237 reset_dag_run: bool, 

238 skip_when_already_exists: bool, 

239 wait_for_completion: bool, 

240 allowed_states: list[str], 

241 failed_states: list[str], 

242 poke_interval: int, 

243 deferrable: bool, 

244 ): 

245 super().__init__() 

246 self.trigger_dag_id = trigger_dag_id 

247 self.dag_run_id = dag_run_id 

248 self.conf = conf 

249 self.logical_date = logical_date 

250 self.reset_dag_run = reset_dag_run 

251 self.skip_when_already_exists = skip_when_already_exists 

252 self.wait_for_completion = wait_for_completion 

253 self.allowed_states = allowed_states 

254 self.failed_states = failed_states 

255 self.poke_interval = poke_interval 

256 self.deferrable = deferrable 

257 

258 

259class DownstreamTasksSkipped(AirflowException): 

260 """ 

261 Signal by an operator to skip its downstream tasks. 

262 

263 Special exception raised to signal that the operator it was raised from wishes to skip 

264 downstream tasks. This is used in the ShortCircuitOperator. 

265 

266 :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip. 

267 """ 

268 

269 def __init__(self, *, tasks): 

270 super().__init__() 

271 self.tasks = tasks 

272 

273 

274class XComNotFound(AirflowException): 

275 """Raise when an XCom reference is being resolved against a non-existent XCom.""" 

276 

277 def __init__(self, dag_id: str, task_id: str, key: str) -> None: 

278 super().__init__() 

279 self.dag_id = dag_id 

280 self.task_id = task_id 

281 self.key = key 

282 

283 def __str__(self) -> str: 

284 return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!' 

285 

286 def serialize(self): 

287 cls = self.__class__ 

288 return ( 

289 f"{cls.__module__}.{cls.__name__}", 

290 (), 

291 {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key}, 

292 ) 

293 

294 

295class ParamValidationError(AirflowException): 

296 """Raise when DAG params is invalid.""" 

297 

298 

299class DuplicateTaskIdFound(AirflowException): 

300 """Raise when a Task with duplicate task_id is defined in the same DAG.""" 

301 

302 

303class TaskAlreadyInTaskGroup(AirflowException): 

304 """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup.""" 

305 

306 def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str): 

307 super().__init__(task_id, new_group_id) 

308 self.task_id = task_id 

309 self.existing_group_id = existing_group_id 

310 self.new_group_id = new_group_id 

311 

312 def __str__(self) -> str: 

313 if self.existing_group_id is None: 

314 existing_group = "the DAG's root group" 

315 else: 

316 existing_group = f"group {self.existing_group_id!r}" 

317 return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})" 

318 

319 

320class TaskNotFound(AirflowException): 

321 """Raise when a Task is not available in the system.""" 

322 

323 

324class FailFastDagInvalidTriggerRule(AirflowException): 

325 """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule.""" 

326 

327 _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS) 

328 

329 @classmethod 

330 def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule): 

331 """ 

332 Check that fail_fast dag tasks have allowable trigger rules. 

333 

334 :meta private: 

335 """ 

336 if fail_fast and trigger_rule not in cls._allowed_rules: 

337 raise cls() 

338 

339 def __str__(self) -> str: 

340 return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule" 

341 

342 

343class RemovedInAirflow4Warning(DeprecationWarning): 

344 """Issued for usage of deprecated features that will be removed in Airflow4.""" 

345 

346 deprecated_since: str | None = None 

347 "Indicates the airflow version that started raising this deprecation warning"