Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/exceptions.py: 58%

144 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Note: Any AirflowException raised is expected to cause the TaskInstance
# to be marked in an ERROR state

20"""Exceptions used by Airflow.""" 

21from __future__ import annotations 

22 

23import datetime 

24import warnings 

25from http import HTTPStatus 

26from typing import TYPE_CHECKING, Any, NamedTuple, Sized 

27 

28if TYPE_CHECKING: 

29 from airflow.models import DagRun 

30 

31 

class AirflowException(Exception):
    """
    Root of the Airflow error hierarchy.

    Every custom Airflow exception should derive from this class.
    """

    # Default HTTP status used when this error surfaces through the API layer.
    status_code = HTTPStatus.INTERNAL_SERVER_ERROR

class AirflowBadRequest(AirflowException):
    """Raised when the application or server cannot handle the request."""

    # Surface as HTTP 400 at the API layer.
    status_code = HTTPStatus.BAD_REQUEST

class AirflowNotFoundException(AirflowException):
    """Raised when a requested object or resource does not exist in the system."""

    # Surface as HTTP 404 at the API layer.
    status_code = HTTPStatus.NOT_FOUND

class AirflowConfigException(AirflowException):
    """Raised when the Airflow configuration is invalid or cannot be processed."""

class AirflowSensorTimeout(AirflowException):
    """Raised when sensor polling exceeds its allotted time."""

class AirflowRescheduleException(AirflowException):
    """
    Raised to request that the task be re-run at a later point in time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        super().__init__()
        # Consumers (the scheduler) read this to know when to retry the poke.
        self.reschedule_date = reschedule_date

class InvalidStatsNameException(AirflowException):
    """Raised when a stats metric name fails validation."""

class AirflowTaskTimeout(AirflowException):
    """Raised when a task's execution exceeds its time limit."""

class AirflowWebServerTimeout(AirflowException):
    """Raised when the web server does not respond in time."""

class AirflowSkipException(AirflowException):
    """Raised to mark the current task as skipped rather than failed."""

class AirflowFailException(AirflowException):
    """Raised to fail the task immediately, without any retries."""

class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when imports needed for an optional feature are missing."""

class XComNotFound(AirflowException):
    """Raised when an XCom reference resolves to an XCom entry that does not exist.

    :param dag_id: DAG that should have produced the XCom
    :param task_id: task that should have pushed the XCom
    :param key: key the XCom was expected under
    """

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        # Keep the full coordinates of the missing XCom for the error message.
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'

class UnmappableOperator(AirflowException):
    """Raised when an operator does not implement support for task mapping."""

class XComForMappingNotPushed(AirflowException):
    """Raised when a mapped task's upstream dependency did not push an XCom to map over."""

    def __str__(self) -> str:
        return "did not push XCom for task mapping"

class UnmappableXComTypePushed(AirflowException):
    """Raised when a value of an unmappable type is pushed as a mapped task's dependency.

    All offending values are retained in ``args`` for rendering in the message.
    """

    def __init__(self, value: Any, *values: Any) -> None:
        # At least one value is required; all are forwarded to the base args.
        super().__init__(value, *values)

    def __str__(self) -> str:
        # Render nested container types as e.g. "list[dict]".
        names = [type(arg).__qualname__ for arg in self.args]
        typename = names[0] + "".join(f"[{inner}]" for inner in names[1:])
        return f"unmappable return type {typename!r}"

class UnmappableXComLengthPushed(AirflowException):
    """Raised when a pushed value is too long to be used for task mapping.

    :param value: the sized value that was pushed
    :param max_length: the configured maximum mapping length
    """

    def __init__(self, value: Sized, max_length: int) -> None:
        super().__init__(value)
        self.value = value
        self.max_length = max_length

    def __str__(self) -> str:
        return f"unmappable return value length: {len(self.value)} > {self.max_length}"

class AirflowDagCycleException(AirflowException):
    """Raised when a cycle is detected in a DAG definition."""

class AirflowDagDuplicatedIdException(AirflowException):
    """Raised when a DAG's ID collides with one that is already registered.

    :param dag_id: the duplicated DAG ID
    :param incoming: origin of the DAG being rejected
    :param existing: origin of the DAG that already owns the ID
    """

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        super().__init__(dag_id, incoming, existing)
        self.dag_id = dag_id
        self.incoming = incoming
        self.existing = existing

    def __str__(self) -> str:
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"

class AirflowDagInconsistent(AirflowException):
    """Raised when a DAG is defined with inconsistent attributes."""

class AirflowClusterPolicyViolation(AirflowException):
    """Raised when a DAG definition violates a configured cluster policy."""

class AirflowTimetableInvalid(AirflowException):
    """Raised when a DAG declares an invalid timetable."""

class DagNotFound(AirflowNotFoundException):
    """Raised when the requested DAG does not exist in the system."""

class DagCodeNotFound(AirflowNotFoundException):
    """Raised when the source code for a DAG is not available in the system."""

class DagRunNotFound(AirflowNotFoundException):
    """Raised when the requested DAG run does not exist in the system."""

class DagRunAlreadyExists(AirflowBadRequest):
    """Raised when creating a DAG run for a DAG that already has a matching run entry.

    :param dag_run: the pre-existing run that caused the conflict
    :param execution_date: logical date of the attempted run
    :param run_id: run ID of the attempted run
    """

    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
        message = f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
        super().__init__(message)
        # Expose the conflicting run so callers can inspect it.
        self.dag_run = dag_run

class DagFileExists(AirflowBadRequest):
    """Raised when a DAG ID is still in the DagBag, i.e. its file remains in the DAG folder."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # This exception type is deprecated; warn anyone still constructing it.
        warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2)

class DuplicateTaskIdFound(AirflowException):
    """Raised when two tasks in the same DAG are defined with the same task_id."""

class TaskAlreadyInTaskGroup(AirflowException):
    """Raised when adding a task to a TaskGroup while it already belongs to another TaskGroup.

    :param task_id: ID of the task being added
    :param existing_group_id: group the task already belongs to (None means the DAG root group)
    :param new_group_id: group the task was being added to
    """

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str) -> None:
        # Only task_id and new_group_id are forwarded to the base Exception args.
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        existing = (
            "the DAG's root group"
            if self.existing_group_id is None
            else f"group {self.existing_group_id!r}"
        )
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing})"

class SerializationError(AirflowException):
    """Raised when something could not be serialized."""

class ParamValidationError(AirflowException):
    """Raised when DAG params fail validation."""

class TaskNotFound(AirflowNotFoundException):
    """Raised when the requested task does not exist in the system."""

class TaskInstanceNotFound(AirflowNotFoundException):
    """Raised when the requested task instance does not exist in the system."""

class PoolNotFound(AirflowNotFoundException):
    """Raised when the requested pool does not exist in the system."""

class NoAvailablePoolSlot(AirflowException):
    """Raised when a pool does not have enough free slots."""

class DagConcurrencyLimitReached(AirflowException):
    """Raised when a DAG hits its max_active_tasks limit."""

class TaskConcurrencyLimitReached(AirflowException):
    """Raised when a task hits its max_active_tasks limit."""

class BackfillUnfinished(AirflowException):
    """
    Raised when a backfill completes with at least one task not in a success state.

    :param message: The human-readable description of the exception
    :param ti_status: The information about all task statuses
    """

    def __init__(self, message, ti_status):
        super().__init__(message)
        # Keep the per-task status summary for callers that report details.
        self.ti_status = ti_status

class FileSyntaxError(NamedTuple):
    """Information about a single syntax error found in a file.

    :param line_no: one-based line number of the error, or None when unknown
    :param message: human-readable description of the error
    """

    line_no: int | None
    message: str

    def __str__(self):
        # Fixed: the previous format string rendered a stray "s" prefix and a
        # trailing comma, e.g. "bad token. Line number: s3,".
        return f"{self.message}. Line number: {self.line_no}"

class AirflowFileParseException(AirflowException):
    """
    Raised when a connection or variable file cannot be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors

    def __str__(self):
        # Imported lazily to avoid import cycles at module load time.
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        pieces = [f"{self.msg}\nFilename: {self.file_path}\n\n"]
        for error_no, parse_error in enumerate(self.parse_errors, 1):
            pieces.append("=" * 20 + f" Parse error {error_no:3} " + "=" * 20 + "\n")
            pieces.append(f"{parse_error.message}\n")
            if parse_error.line_no:
                pieces.append(f"Line number: {parse_error.line_no}\n")
                # Only render the code snippet when attached to a terminal.
                if is_tty():
                    pieces.append("\n" + prepare_code_snippet(self.file_path, parse_error.line_no) + "\n")
        return "".join(pieces)

class ConnectionNotUnique(AirflowException):
    """Raised when several connections share the same connection ID."""

class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout: datetime.timedelta | None = None,
    ):
        super().__init__()
        # Runtime guard: accept anything timedelta-like, reject the rest.
        if timeout is not None and not hasattr(timeout, "total_seconds"):
            raise ValueError("Timeout value must be a timedelta")
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout

    def __repr__(self) -> str:
        return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"

class TaskDeferralError(AirflowException):
    """Raised when a task fails for some reason while being deferred."""

class PodMutationHookException(AirflowException):
    """Raised when the Pod Mutation Hook fails during execution."""

class PodReconciliationError(AirflowException):
    """Raised when merging pod configurations fails."""

class RemovedInAirflow3Warning(DeprecationWarning):
    """Warning category for deprecated features scheduled for removal in Airflow 3."""

    # Airflow version that started raising this deprecation warning, if known.
    deprecated_since: str | None = None
    "Indicates the airflow version that started raising this deprecation warning"

class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Warning category for deprecated features of an Airflow provider."""

    # Provider version that started raising this deprecation warning, if known.
    deprecated_provider_since: str | None = None
    "Indicates the provider version that started raising this deprecation warning"