Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/retries.py: 24%
42 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import functools
20import logging
21from inspect import signature
22from typing import Any
24from sqlalchemy.exc import DBAPIError, OperationalError
26from airflow.configuration import conf
28MAX_DB_RETRIES = conf.getint("database", "max_db_retries", fallback=3)
31def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, logger: logging.Logger | None = None, **kwargs):
32 """Return Tenacity Retrying object with project specific default"""
33 import tenacity
35 # Default kwargs
36 retry_kwargs = dict(
37 retry=tenacity.retry_if_exception_type(exception_types=(OperationalError, DBAPIError)),
38 wait=tenacity.wait_random_exponential(multiplier=0.5, max=5),
39 stop=tenacity.stop_after_attempt(max_retries),
40 reraise=True,
41 **kwargs,
42 )
43 if logger and isinstance(logger, logging.Logger):
44 retry_kwargs["before_sleep"] = tenacity.before_sleep_log(logger, logging.DEBUG, True)
46 return tenacity.Retrying(**retry_kwargs)
49def retry_db_transaction(_func: Any = None, retries: int = MAX_DB_RETRIES, **retry_kwargs):
50 """
51 Decorator to retry Class Methods and Functions in case of ``OperationalError`` from DB.
52 It should not be used with ``@provide_session``.
53 """
55 def retry_decorator(func):
56 # Get Positional argument for 'session'
57 func_params = signature(func).parameters
58 try:
59 # func_params is an ordered dict -- this is the "recommended" way of getting the position
60 session_args_idx = tuple(func_params).index("session")
61 except ValueError:
62 raise ValueError(f"Function {func.__qualname__} has no `session` argument")
63 # We don't need this anymore -- ensure we don't keep a reference to it by mistake
64 del func_params
66 @functools.wraps(func)
67 def wrapped_function(*args, **kwargs):
68 logger = args[0].log if args and hasattr(args[0], "log") else logging.getLogger(func.__module__)
70 # Get session from args or kwargs
71 if "session" in kwargs:
72 session = kwargs["session"]
73 elif len(args) > session_args_idx:
74 session = args[session_args_idx]
75 else:
76 raise TypeError(f"session is a required argument for {func.__qualname__}")
78 for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
79 with attempt:
80 logger.debug(
81 "Running %s with retries. Try %d of %d",
82 func.__qualname__,
83 attempt.retry_state.attempt_number,
84 retries,
85 )
86 try:
87 return func(*args, **kwargs)
88 except OperationalError:
89 session.rollback()
90 raise
92 return wrapped_function
94 # Allow using decorator with and without arguments
95 if _func is None:
96 return retry_decorator
97 else:
98 return retry_decorator(_func)