Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/exceptions.py: 59%

157 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18# Note: Any AirflowException raised is expected to cause the TaskInstance 

19# to be marked in an ERROR state 

20"""Exceptions used by Airflow.""" 

21from __future__ import annotations 

22 

23import datetime 

24import warnings 

25from http import HTTPStatus 

26from typing import TYPE_CHECKING, Any, NamedTuple, Sized 

27 

28if TYPE_CHECKING: 

29 from airflow.models import DAG, DagRun 

30 

31 

class AirflowException(Exception):
    """
    Root of the Airflow exception hierarchy.

    Custom Airflow exceptions are expected to inherit from this class.
    """

    # HTTP status associated with this kind of error.
    status_code = HTTPStatus.INTERNAL_SERVER_ERROR

40 

41 

class AirflowBadRequest(AirflowException):
    """Raised when the request cannot be handled by the application or server."""

    # Reported as HTTP 400 rather than the base class's status.
    status_code = HTTPStatus.BAD_REQUEST

46 

47 

class AirflowNotFoundException(AirflowException):
    """Raised when a requested object or resource does not exist in the system."""

    # Reported as HTTP 404 rather than the base class's status.
    status_code = HTTPStatus.NOT_FOUND

52 

53 

class AirflowConfigException(AirflowException):
    """Raised when a configuration problem is encountered."""

56 

57 

class AirflowSensorTimeout(AirflowException):
    """Raised when polling by a sensor times out."""

60 

61 

class AirflowRescheduleException(AirflowException):
    """
    Signal that the task should be re-scheduled to run at a later time.

    :param reschedule_date: The date at which the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        # The date is kept only as an attribute; it is not forwarded to the
        # base exception, so ``args`` stays empty.
        self.reschedule_date = reschedule_date
        super().__init__()

72 

73 

class InvalidStatsNameException(AirflowException):
    """Raised when a stats name is not valid."""

76 

77 

class AirflowTaskTimeout(AirflowException):
    """Raised when execution of a task times out."""

80 

81 

class AirflowWebServerTimeout(AirflowException):
    """Raised on a web server timeout."""

84 

85 

class AirflowSkipException(AirflowException):
    """Raised to indicate that the task should be skipped."""

88 

89 

class AirflowFailException(AirflowException):
    """Raised to indicate that the task should fail without being retried."""

92 

93 

class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when an optional provider feature is missing its imports."""

96 

97 

class XComNotFound(AirflowException):
    """Raised when resolving an XCom reference whose XCom does not exist."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        # Nothing is forwarded to the base exception; the message is built
        # on demand in ``__str__`` from the attributes stored here.
        super().__init__()
        self.dag_id, self.task_id, self.key = dag_id, task_id, key

    def __str__(self) -> str:
        """Describe which task, DAG and key the missing XCom was looked up with."""
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'

109 

110 

class UnmappableOperator(AirflowException):
    """Raised when an operator has not been implemented to be mappable."""

113 

114 

class XComForMappingNotPushed(AirflowException):
    """Raised when a dependency of a mapped downstream task pushes no XCom for task mapping."""

    def __str__(self) -> str:
        """Return the fixed message; constructor arguments do not affect it."""
        return "did not push XCom for task mapping"

120 

121 

class UnmappableXComTypePushed(AirflowException):
    """Raised when a dependency of a mapped downstream task pushes a value of an unmappable type."""

    def __init__(self, value: Any, *values: Any) -> None:
        # At least one value is required; all of them end up in ``self.args``.
        super().__init__(value, *values)

    def __str__(self) -> str:
        """
        Render the types of the stored values as a nested type name.

        The first value's type is the outer name and each further value's
        type is appended in square brackets, e.g. ``list[int]``.
        """
        names = [type(arg).__qualname__ for arg in self.args]
        typename = names[0] + "".join(f"[{inner}]" for inner in names[1:])
        return f"unmappable return type {typename!r}"

133 

134 

class UnmappableXComLengthPushed(AirflowException):
    """Raised when a pushed value is too large to be mapped as a downstream's dependency."""

    def __init__(self, value: Sized, max_length: int) -> None:
        self.value = value
        self.max_length = max_length
        # Only the value itself is forwarded to the base exception.
        super().__init__(value)

    def __str__(self) -> str:
        """Report the length of the pushed value next to the allowed maximum."""
        actual_length = len(self.value)
        return f"unmappable return value length: {actual_length} > {self.max_length}"

145 

146 

class AirflowDagCycleException(AirflowException):
    """Raised when a DAG definition contains a cycle."""

149 

150 

class AirflowDagDuplicatedIdException(AirflowException):
    """Raised when the ID of a DAG is already in use by another DAG."""

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        self.dag_id, self.incoming, self.existing = dag_id, incoming, existing
        # All three values are also forwarded so they appear in ``args``.
        super().__init__(dag_id, incoming, existing)

    def __str__(self) -> str:
        """Name the duplicated DAG ID together with the incoming and existing locations."""
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"

162 

163 

class AirflowDagInconsistent(AirflowException):
    """Raised when the attributes of a DAG are inconsistent."""

166 

167 

class AirflowClusterPolicyViolation(AirflowException):
    """Raised when a DAG definition violates a Cluster Policy."""

170 

171 

class AirflowClusterPolicyError(AirflowException):
    """Raised when a Cluster Policy fails with an error other than AirflowClusterPolicyViolation."""

174 

175 

class AirflowTimetableInvalid(AirflowException):
    """Raised when the timetable of a DAG is invalid."""

178 

179 

class DagNotFound(AirflowNotFoundException):
    """Raised when a DAG cannot be found in the system."""

182 

183 

class DagCodeNotFound(AirflowNotFoundException):
    """Raised when the code of a DAG cannot be found in the system."""

186 

187 

class DagRunNotFound(AirflowNotFoundException):
    """Raised when a DAG Run cannot be found in the system."""

190 

191 

class DagRunAlreadyExists(AirflowBadRequest):
    """Raised when a DAG run is created for a DAG that already has a DAG run entry."""

    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
        message = f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
        super().__init__(message)
        # Keep the conflicting run available to whoever handles the error.
        self.dag_run = dag_run

200 

201 

class DagFileExists(AirflowBadRequest):
    """Raised when a DAG ID is still in DagBag, i.e. the DAG file is in the DAG folder."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # stacklevel=2 attributes the warning to the code constructing the
        # exception rather than to this method.
        deprecation_message = "DagFileExists is deprecated and will be removed."
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)

208 

209 

class DagInvalidTriggerRule(AirflowException):
    """Raised when a dag with 'fail_stop' enabled has a trigger rule other than the default."""

    @classmethod
    def check(cls, dag: DAG | None, trigger_rule: str):
        """
        Raise this exception if ``trigger_rule`` is not allowed for ``dag``.

        Nothing happens when ``dag`` is None, when ``dag.fail_stop`` is falsy,
        or when ``trigger_rule`` equals the default trigger rule.
        """
        from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE

        if dag is None:
            return
        if dag.fail_stop and trigger_rule != DEFAULT_TRIGGER_RULE:
            raise cls()

    def __str__(self) -> str:
        """Return a message naming the only trigger rule a 'fail-stop' dag may use."""
        from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE

        return f"A 'fail-stop' dag can only have {DEFAULT_TRIGGER_RULE} trigger rule"

224 

225 

class DuplicateTaskIdFound(AirflowException):
    """Raised when a Task is defined with a task_id that duplicates another in the same DAG."""

228 

229 

class TaskAlreadyInTaskGroup(AirflowException):
    """Raised when a Task cannot join a TaskGroup because it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str) -> None:
        # Only the task ID and the new group ID are forwarded to the base
        # exception; the existing group ID is kept as an attribute only.
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        """Describe the rejected move; an existing group ID of None means the DAG's root group."""
        existing_group = (
            "the DAG's root group"
            if self.existing_group_id is None
            else f"group {self.existing_group_id!r}"
        )
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"

245 

246 

class SerializationError(AirflowException):
    """Indicates a problem encountered while trying to serialize something."""

249 

250 

class ParamValidationError(AirflowException):
    """Raised when DAG params are invalid."""

253 

254 

class TaskNotFound(AirflowNotFoundException):
    """Raised when a Task cannot be found in the system."""

257 

258 

class TaskInstanceNotFound(AirflowNotFoundException):
    """Raised when a task instance cannot be found in the system."""

261 

262 

class PoolNotFound(AirflowNotFoundException):
    """Raised when a Pool cannot be found in the system."""

265 

266 

class NoAvailablePoolSlot(AirflowException):
    """Raised when a pool does not have enough slots."""

269 

270 

class DagConcurrencyLimitReached(AirflowException):
    """Raised when the max_active_tasks limit of a DAG is reached."""

273 

274 

class TaskConcurrencyLimitReached(AirflowException):
    """Raised when the max_active_tasks limit of a task is reached."""

277 

278 

class BackfillUnfinished(AirflowException):
    """
    Raised when a backfill ends without every task having succeeded.

    :param message: Human-readable description of the exception
    :param ti_status: Information about the statuses of all tasks
    """

    def __init__(self, message, ti_status):
        # The status information is kept as an attribute; only the message
        # is forwarded to the base exception.
        self.ti_status = ti_status
        super().__init__(message)

290 

291 

class FileSyntaxError(NamedTuple):
    """
    Information about a single error in a file.

    Despite its name this is a plain ``NamedTuple`` record, not an exception.
    """

    # Line the error was found on, or None when no line is known.
    line_no: int | None
    # Human-readable description of the error.
    message: str

    def __str__(self) -> str:
        """
        Return the message followed by the line number.

        A ``line_no`` of None is rendered as the text ``None``. The trailing
        comma is kept from the original output format.
        """
        # The original emitted a stray literal "s" before the line number
        # ("Line number: s3,"); it is removed here.
        return f"{self.message}. Line number: {self.line_no},"

300 

301 

class AirflowFileParseException(AirflowException):
    """
    Raised when a connection or variable file cannot be parsed.

    :param msg: Human-readable description of the exception
    :param file_path: The processed file in which the errors were found
    :param parse_errors: The syntax errors found in the file
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg, self.file_path, self.parse_errors = msg, file_path, parse_errors

    def __str__(self):
        """
        Build a multi-line report with one section per parse error.

        Each section shows the error message and, when the error has a line
        number, that number. A code snippet is added only when the line
        number is set and ``is_tty()`` returns true.
        """
        # Imported inside the method, as in the original
        # (presumably to avoid import cycles; confirm before hoisting).
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        rule = "=" * 20
        chunks = [f"{self.msg}\nFilename: {self.file_path}\n\n"]

        for error_no, parse_error in enumerate(self.parse_errors, 1):
            chunks.append(f"{rule} Parse error {error_no:3} {rule}\n")
            chunks.append(f"{parse_error.message}\n")
            if not parse_error.line_no:
                # No line number: nothing to report and no snippet to render.
                continue
            chunks.append(f"Line number: {parse_error.line_no}\n")
            if is_tty():
                snippet = prepare_code_snippet(self.file_path, parse_error.line_no)
                chunks.append("\n" + snippet + "\n")

        return "".join(chunks)

332 

333 

class ConnectionNotUnique(AirflowException):
    """Raised when more than one value is found for a single connection ID."""

336 

337 

class TaskDeferred(BaseException):
    """
    Signal that an operator is moving to the deferred state.

    This special exception is raised by an operator that wishes to defer
    until a trigger fires. It derives from ``BaseException``, so an
    ``except Exception`` clause does not catch it.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout: datetime.timedelta | None = None,
    ):
        super().__init__()
        self.trigger, self.method_name = trigger, method_name
        self.kwargs, self.timeout = kwargs, timeout
        # Runtime check of the timeout: anything exposing ``total_seconds``
        # is accepted, not only genuine timedelta instances.
        timeout_is_acceptable = self.timeout is None or hasattr(self.timeout, "total_seconds")
        if not timeout_is_acceptable:
            raise ValueError("Timeout value must be a timedelta")

    def __repr__(self) -> str:
        """Return a short representation showing the trigger and the method name."""
        return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"

365 

366 

class TaskDeferralError(AirflowException):
    """Raised when, for some reason, a task fails during deferral."""

369 

370 

class PodMutationHookException(AirflowException):
    """Raised when an exception occurs while the Pod Mutation Hook is executing."""

373 

374 

class PodReconciliationError(AirflowException):
    """Raised when merging pod configs runs into an error."""

377 

378 

class RemovedInAirflow3Warning(DeprecationWarning):
    """Warning issued when a deprecated feature that will be removed in Airflow3 is used."""

    #: Indicates the airflow version that started raising this deprecation warning
    deprecated_since: str | None = None

384 

385 

class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Warning issued when a deprecated feature of an Airflow provider is used."""

    #: Indicates the provider version that started raising this deprecation warning
    deprecated_provider_since: str | None = None

391 

392 

class DeserializingResultError(ValueError):
    """Raised when a pickling library hits an error while deserializing a pickle file."""

    def __str__(self):
        """Return a fixed explanation followed by the text of ``__cause__``."""
        # ``__cause__`` is None unless the error was raised with ``raise ... from``,
        # in which case the message ends with "Cause: None".
        explanation = (
            "Error deserializing result. Note that result deserialization "
            "is not supported across major Python versions."
        )
        return f"{explanation} Cause: {self.__cause__!s}"