Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/retry.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

52 statements  

1import abc 

2import socket 

3from time import sleep 

4from typing import ( 

5 TYPE_CHECKING, 

6 Any, 

7 Callable, 

8 Generic, 

9 Iterable, 

10 Optional, 

11 Tuple, 

12 Type, 

13 TypeVar, 

14 Union, 

15) 

16 

17from redis.exceptions import ConnectionError, TimeoutError 

18 

# Generic result type of a retried operation.
T = TypeVar("T")
# Exception type a retry policy reacts to.
E = TypeVar("E", bound=Exception, covariant=True)

if TYPE_CHECKING:
    # Only needed for type annotations; never imported at runtime.
    from redis.backoff import AbstractBackoff


class AbstractRetry(Generic[E], abc.ABC):
    """Retry a specific number of times after a failure"""

    # Exception classes that trigger a retry.
    _supported_errors: Tuple[Type[E], ...]

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[E], ...],
    ):
        """
        Initialize a `Retry` object with a `Backoff` object
        that retries a maximum of `retries` times.
        `retries` can be negative to retry forever.
        You can specify the types of supported errors which trigger
        a retry with the `supported_errors` parameter.
        """
        self._backoff = backoff
        self._retries = retries
        self._supported_errors = supported_errors

    @abc.abstractmethod
    def __eq__(self, other: Any) -> bool:
        """
        Equality must be provided by concrete subclasses; this base
        implementation only returns `NotImplemented`.
        """
        return NotImplemented

    def __hash__(self) -> int:
        """
        Hash on the backoff, the retry count and the supported errors.
        The errors are hashed as a frozenset, so their order is irrelevant.
        """
        return hash((self._backoff, self._retries, frozenset(self._supported_errors)))

    def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None:
        """
        Updates the supported errors with the specified error types.

        Already supported errors keep their position, new ones are appended
        in the order given, and duplicates are dropped.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # resulting tuple is deterministic (going through set() is not).
        # Unpacking also accepts any iterable on either side.
        self._supported_errors = tuple(
            dict.fromkeys((*self._supported_errors, *specified_errors))
        )

    def get_retries(self) -> int:
        """
        Get the number of retries.
        """
        return self._retries

    def update_retries(self, value: int) -> None:
        """
        Set the number of retries.
        """
        self._retries = value

74 

75 

class Retry(AbstractRetry[Exception]):
    """
    Blocking retry policy: runs an operation, sleeping between failed
    attempts for as long as the `Backoff` object dictates.
    """

    # Defining __eq__ below would otherwise reset __hash__ to None.
    __hash__ = AbstractRetry.__hash__

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[Exception], ...] = (
            ConnectionError,
            TimeoutError,
            socket.timeout,
        ),
    ):
        """
        Create a retry policy; unless `supported_errors` is given, it reacts
        to `ConnectionError`, `TimeoutError` and `socket.timeout`.
        """
        super().__init__(backoff, retries, supported_errors)

    def __eq__(self, other: Any) -> bool:
        """
        Two `Retry` objects are equal when their backoff, retry count and
        set of supported errors (order ignored) all match.
        """
        if isinstance(other, Retry):
            return (
                self._backoff == other._backoff
                and self._retries == other._retries
                and set(self._supported_errors) == set(other._supported_errors)
            )
        return NotImplemented

    def call_with_retry(
        self,
        do: Callable[[], T],
        fail: Union[Callable[[Exception], Any], Callable[[Exception, int], Any]],
        is_retryable: Optional[Callable[[Exception], bool]] = None,
        with_failure_count: bool = False,
    ) -> T:
        """
        Run `do` until it succeeds and return its result, or re-raise the
        last error once the allowed number of retries is used up.
        `do`: the operation to call. Expects no argument.
        `fail`: the failure handler, called with the error that was thrown
        before the retry budget is checked
        ``is_retryable``: optional predicate; an error it rejects is re-raised
        at once, without calling `fail`
        ``with_failure_count``: if True, `fail` also receives the number of
        failures so far
        """
        self._backoff.reset()
        failure_count = 0
        while True:
            try:
                return do()
            except self._supported_errors as exc:
                if is_retryable and not is_retryable(exc):
                    raise
                failure_count += 1

                handler_args = (exc, failure_count) if with_failure_count else (exc,)
                fail(*handler_args)

                # A negative retry count means "retry forever".
                if 0 <= self._retries < failure_count:
                    raise exc
                delay = self._backoff.compute(failure_count)
                if delay > 0:
                    sleep(delay)