Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/exceptions.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import enum 

21from http import HTTPStatus 

22from typing import TYPE_CHECKING, Any 

23 

24from airflow.sdk import TriggerRule 

25 

26# Re exporting AirflowConfigException from shared configuration 

27from airflow.sdk._shared.configuration.exceptions import AirflowConfigException as AirflowConfigException 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Collection 

31 

32 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

33 from airflow.sdk.execution_time.comms import ErrorResponse 

34 

35 

36class AirflowException(Exception): 

37 """ 

38 Base class for all Airflow's errors. 

39 

40 Each custom exception should be derived from this class. 

41 """ 

42 

43 status_code = HTTPStatus.INTERNAL_SERVER_ERROR 

44 

45 def serialize(self): 

46 cls = self.__class__ 

47 return f"{cls.__module__}.{cls.__name__}", (str(self),), {} 

48 

49 

50class AirflowOptionalProviderFeatureException(AirflowException): 

51 """Raise by providers when imports are missing for optional provider features.""" 

52 

53 

54class AirflowNotFoundException(AirflowException): 

55 """Raise when the requested object/resource is not available in the system.""" 

56 

57 status_code = HTTPStatus.NOT_FOUND 

58 

59 

60class AirflowDagCycleException(AirflowException): 

61 """Raise when there is a cycle in Dag definition.""" 

62 

63 

64class AirflowRuntimeError(Exception): 

65 """Generic Airflow error raised by runtime functions.""" 

66 

67 def __init__(self, error: ErrorResponse): 

68 self.error = error 

69 super().__init__(f"{error.error.value}: {error.detail}") 

70 

71 

72class AirflowTimetableInvalid(AirflowException): 

73 """Raise when a DAG has an invalid timetable.""" 

74 

75 

76class ErrorType(enum.Enum): 

77 """Error types used in the API client.""" 

78 

79 CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND" 

80 VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND" 

81 XCOM_NOT_FOUND = "XCOM_NOT_FOUND" 

82 ASSET_NOT_FOUND = "ASSET_NOT_FOUND" 

83 DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS" 

84 GENERIC_ERROR = "GENERIC_ERROR" 

85 API_SERVER_ERROR = "API_SERVER_ERROR" 

86 

87 

88class XComForMappingNotPushed(TypeError): 

89 """Raise when a mapped downstream's dependency fails to push XCom for task mapping.""" 

90 

91 def __str__(self) -> str: 

92 return "did not push XCom for task mapping" 

93 

94 

95class UnmappableXComTypePushed(TypeError): 

96 """Raise when an unmappable type is pushed as a mapped downstream's dependency.""" 

97 

98 def __init__(self, value: Any, *values: Any) -> None: 

99 super().__init__(value, *values) 

100 

101 def __str__(self) -> str: 

102 typename = type(self.args[0]).__qualname__ 

103 for arg in self.args[1:]: 

104 typename = f"{typename}[{type(arg).__qualname__}]" 

105 return f"unmappable return type {typename!r}" 

106 

107 

108class AirflowFailException(AirflowException): 

109 """Raise when the task should be failed without retrying.""" 

110 

111 

112class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException): 

113 main_message: str 

114 

115 def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None: 

116 self.inactive_asset_keys = inactive_asset_keys 

117 

118 @staticmethod 

119 def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str: 

120 from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef 

121 

122 if isinstance(key, AssetUniqueKey): 

123 return f"Asset(name={key.name!r}, uri={key.uri!r})" 

124 if isinstance(key, AssetNameRef): 

125 return f"Asset.ref(name={key.name!r})" 

126 if isinstance(key, AssetUriRef): 

127 return f"Asset.ref(uri={key.uri!r})" 

128 return repr(key) # Should not happen, but let's fails more gracefully in an exception. 

129 

130 def __str__(self) -> str: 

131 return f"{self.main_message}: {self.inactive_assets_message}" 

132 

133 @property 

134 def inactive_assets_message(self) -> str: 

135 return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys) 

136 

137 

138class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption): 

139 """Raise when the task is executed with inactive assets in its inlet or outlet.""" 

140 

141 main_message = "Task has the following inactive assets in its inlets or outlets" 

142 

143 

144class AirflowRescheduleException(AirflowException): 

145 """ 

146 Raise when the task should be re-scheduled at a later time. 

147 

148 :param reschedule_date: The date when the task should be rescheduled 

149 """ 

150 

151 def __init__(self, reschedule_date): 

152 super().__init__() 

153 self.reschedule_date = reschedule_date 

154 

155 def serialize(self): 

156 cls = self.__class__ 

157 return f"{cls.__module__}.{cls.__name__}", (), {"reschedule_date": self.reschedule_date} 

158 

159 

160class AirflowSensorTimeout(AirflowException): 

161 """Raise when there is a timeout on sensor polling.""" 

162 

163 

164class AirflowSkipException(AirflowException): 

165 """Raise when the task should be skipped.""" 

166 

167 

168class AirflowTaskTerminated(BaseException): 

169 """Raise when the task execution is terminated.""" 

170 

171 

172# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used 

173# to explicitly interrupt ongoing task. Code that does normal error-handling should not treat 

174# such interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt) 

175class AirflowTaskTimeout(BaseException): 

176 """Raise when the task execution times-out.""" 

177 

178 

179class TaskDeferred(BaseException): 

180 """ 

181 Signal an operator moving to deferred state. 

182 

183 Special exception raised to signal that the operator it was raised from 

184 wishes to defer until a trigger fires. Triggers can send execution back to task or end the task instance 

185 directly. If the trigger should end the task instance itself, ``method_name`` does not matter, 

186 and can be None; otherwise, provide the name of the method that should be used when 

187 resuming execution in the task. 

188 """ 

189 

190 def __init__( 

191 self, 

192 *, 

193 trigger, 

194 method_name: str, 

195 kwargs: dict[str, Any] | None = None, 

196 timeout=None, 

197 ): 

198 super().__init__() 

199 self.trigger = trigger 

200 self.method_name = method_name 

201 self.kwargs = kwargs 

202 self.timeout = timeout 

203 

204 def serialize(self): 

205 cls = self.__class__ 

206 return ( 

207 f"{cls.__module__}.{cls.__name__}", 

208 (), 

209 { 

210 "trigger": self.trigger, 

211 "method_name": self.method_name, 

212 "kwargs": self.kwargs, 

213 "timeout": self.timeout, 

214 }, 

215 ) 

216 

217 def __repr__(self) -> str: 

218 return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>" 

219 

220 

221class TaskDeferralError(AirflowException): 

222 """Raised when a task failed during deferral for some reason.""" 

223 

224 

225class TaskDeferralTimeout(AirflowException): 

226 """Raise when there is a timeout on the deferral.""" 

227 

228 

229class DagRunTriggerException(AirflowException): 

230 """ 

231 Signal by an operator to trigger a specific Dag Run of a dag. 

232 

233 Special exception raised to signal that the operator it was raised from wishes to trigger 

234 a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``. 

235 """ 

236 

237 def __init__( 

238 self, 

239 *, 

240 trigger_dag_id: str, 

241 dag_run_id: str, 

242 conf: dict | None, 

243 logical_date=None, 

244 reset_dag_run: bool, 

245 skip_when_already_exists: bool, 

246 wait_for_completion: bool, 

247 allowed_states: list[str], 

248 failed_states: list[str], 

249 poke_interval: int, 

250 deferrable: bool, 

251 note: str | None = None, 

252 ): 

253 super().__init__() 

254 self.trigger_dag_id = trigger_dag_id 

255 self.dag_run_id = dag_run_id 

256 self.conf = conf 

257 self.logical_date = logical_date 

258 self.reset_dag_run = reset_dag_run 

259 self.skip_when_already_exists = skip_when_already_exists 

260 self.wait_for_completion = wait_for_completion 

261 self.allowed_states = allowed_states 

262 self.failed_states = failed_states 

263 self.poke_interval = poke_interval 

264 self.deferrable = deferrable 

265 self.note = note 

266 

267 

268class DownstreamTasksSkipped(AirflowException): 

269 """ 

270 Signal by an operator to skip its downstream tasks. 

271 

272 Special exception raised to signal that the operator it was raised from wishes to skip 

273 downstream tasks. This is used in the ShortCircuitOperator. 

274 

275 :param tasks: List of task_ids to skip or a list of tuples with task_id and map_index to skip. 

276 """ 

277 

278 def __init__(self, *, tasks): 

279 super().__init__() 

280 self.tasks = tasks 

281 

282 

283class XComNotFound(AirflowException): 

284 """Raise when an XCom reference is being resolved against a non-existent XCom.""" 

285 

286 def __init__(self, dag_id: str, task_id: str, key: str) -> None: 

287 super().__init__() 

288 self.dag_id = dag_id 

289 self.task_id = task_id 

290 self.key = key 

291 

292 def __str__(self) -> str: 

293 return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!' 

294 

295 def serialize(self): 

296 cls = self.__class__ 

297 return ( 

298 f"{cls.__module__}.{cls.__name__}", 

299 (), 

300 {"dag_id": self.dag_id, "task_id": self.task_id, "key": self.key}, 

301 ) 

302 

303 

304class ParamValidationError(AirflowException): 

305 """Raise when DAG params is invalid.""" 

306 

307 

308class DuplicateTaskIdFound(AirflowException): 

309 """Raise when a Task with duplicate task_id is defined in the same DAG.""" 

310 

311 

312class TaskAlreadyInTaskGroup(AirflowException): 

313 """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup.""" 

314 

315 def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str): 

316 super().__init__(task_id, new_group_id) 

317 self.task_id = task_id 

318 self.existing_group_id = existing_group_id 

319 self.new_group_id = new_group_id 

320 

321 def __str__(self) -> str: 

322 if self.existing_group_id is None: 

323 existing_group = "the DAG's root group" 

324 else: 

325 existing_group = f"group {self.existing_group_id!r}" 

326 return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})" 

327 

328 

329class TaskNotFound(AirflowException): 

330 """Raise when a Task is not available in the system.""" 

331 

332 

333class TaskAlreadyRunningError(AirflowException): 

334 """Raised when a task is already running on another worker.""" 

335 

336 

337class FailFastDagInvalidTriggerRule(AirflowException): 

338 """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger rule.""" 

339 

340 _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS) 

341 

342 @classmethod 

343 def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule): 

344 """ 

345 Check that fail_fast dag tasks have allowable trigger rules. 

346 

347 :meta private: 

348 """ 

349 if fail_fast and trigger_rule not in cls._allowed_rules: 

350 raise cls() 

351 

352 def __str__(self) -> str: 

353 return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule" 

354 

355 

356class RemovedInAirflow4Warning(DeprecationWarning): 

357 """Issued for usage of deprecated features that will be removed in Airflow4.""" 

358 

359 deprecated_since: str | None = None 

360 "Indicates the airflow version that started raising this deprecation warning"