Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/retries.py: 24%

42 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import functools 

20import logging 

21from inspect import signature 

22from typing import Any 

23 

24from sqlalchemy.exc import DBAPIError, OperationalError 

25 

26from airflow.configuration import conf 

27 

28MAX_DB_RETRIES = conf.getint("database", "max_db_retries", fallback=3) 

29 

30 

31def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, logger: logging.Logger | None = None, **kwargs): 

32 """Return Tenacity Retrying object with project specific default""" 

33 import tenacity 

34 

35 # Default kwargs 

36 retry_kwargs = dict( 

37 retry=tenacity.retry_if_exception_type(exception_types=(OperationalError, DBAPIError)), 

38 wait=tenacity.wait_random_exponential(multiplier=0.5, max=5), 

39 stop=tenacity.stop_after_attempt(max_retries), 

40 reraise=True, 

41 **kwargs, 

42 ) 

43 if logger and isinstance(logger, logging.Logger): 

44 retry_kwargs["before_sleep"] = tenacity.before_sleep_log(logger, logging.DEBUG, True) 

45 

46 return tenacity.Retrying(**retry_kwargs) 

47 

48 

49def retry_db_transaction(_func: Any = None, retries: int = MAX_DB_RETRIES, **retry_kwargs): 

50 """ 

51 Decorator to retry Class Methods and Functions in case of ``OperationalError`` from DB. 

52 It should not be used with ``@provide_session``. 

53 """ 

54 

55 def retry_decorator(func): 

56 # Get Positional argument for 'session' 

57 func_params = signature(func).parameters 

58 try: 

59 # func_params is an ordered dict -- this is the "recommended" way of getting the position 

60 session_args_idx = tuple(func_params).index("session") 

61 except ValueError: 

62 raise ValueError(f"Function {func.__qualname__} has no `session` argument") 

63 # We don't need this anymore -- ensure we don't keep a reference to it by mistake 

64 del func_params 

65 

66 @functools.wraps(func) 

67 def wrapped_function(*args, **kwargs): 

68 logger = args[0].log if args and hasattr(args[0], "log") else logging.getLogger(func.__module__) 

69 

70 # Get session from args or kwargs 

71 if "session" in kwargs: 

72 session = kwargs["session"] 

73 elif len(args) > session_args_idx: 

74 session = args[session_args_idx] 

75 else: 

76 raise TypeError(f"session is a required argument for {func.__qualname__}") 

77 

78 for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs): 

79 with attempt: 

80 logger.debug( 

81 "Running %s with retries. Try %d of %d", 

82 func.__qualname__, 

83 attempt.retry_state.attempt_number, 

84 retries, 

85 ) 

86 try: 

87 return func(*args, **kwargs) 

88 except OperationalError: 

89 session.rollback() 

90 raise 

91 

92 return wrapped_function 

93 

94 # Allow using decorator with and without arguments 

95 if _func is None: 

96 return retry_decorator 

97 else: 

98 return retry_decorator(_func)