Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/retry.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

50 statements  

1import abc 

2import socket 

3from time import sleep 

4from typing import ( 

5 TYPE_CHECKING, 

6 Any, 

7 Callable, 

8 Generic, 

9 Iterable, 

10 Optional, 

11 Tuple, 

12 Type, 

13 TypeVar, 

14) 

15 

16from redis.exceptions import ConnectionError, TimeoutError 

17 

18T = TypeVar("T") 

19E = TypeVar("E", bound=Exception, covariant=True) 

20 

21if TYPE_CHECKING: 

22 from redis.backoff import AbstractBackoff 

23 

24 

25class AbstractRetry(Generic[E], abc.ABC): 

26 """Retry a specific number of times after a failure""" 

27 

28 _supported_errors: Tuple[Type[E], ...] 

29 

30 def __init__( 

31 self, 

32 backoff: "AbstractBackoff", 

33 retries: int, 

34 supported_errors: Tuple[Type[E], ...], 

35 ): 

36 """ 

37 Initialize a `Retry` object with a `Backoff` object 

38 that retries a maximum of `retries` times. 

39 `retries` can be negative to retry forever. 

40 You can specify the types of supported errors which trigger 

41 a retry with the `supported_errors` parameter. 

42 """ 

43 self._backoff = backoff 

44 self._retries = retries 

45 self._supported_errors = supported_errors 

46 

47 @abc.abstractmethod 

48 def __eq__(self, other: Any) -> bool: 

49 return NotImplemented 

50 

51 def __hash__(self) -> int: 

52 return hash((self._backoff, self._retries, frozenset(self._supported_errors))) 

53 

54 def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None: 

55 """ 

56 Updates the supported errors with the specified error types 

57 """ 

58 self._supported_errors = tuple( 

59 set(self._supported_errors + tuple(specified_errors)) 

60 ) 

61 

62 def get_retries(self) -> int: 

63 """ 

64 Get the number of retries. 

65 """ 

66 return self._retries 

67 

68 def update_retries(self, value: int) -> None: 

69 """ 

70 Set the number of retries. 

71 """ 

72 self._retries = value 

73 

74 

75class Retry(AbstractRetry[Exception]): 

76 __hash__ = AbstractRetry.__hash__ 

77 

78 def __init__( 

79 self, 

80 backoff: "AbstractBackoff", 

81 retries: int, 

82 supported_errors: Tuple[Type[Exception], ...] = ( 

83 ConnectionError, 

84 TimeoutError, 

85 socket.timeout, 

86 ), 

87 ): 

88 super().__init__(backoff, retries, supported_errors) 

89 

90 def __eq__(self, other: Any) -> bool: 

91 if not isinstance(other, Retry): 

92 return NotImplemented 

93 

94 return ( 

95 self._backoff == other._backoff 

96 and self._retries == other._retries 

97 and set(self._supported_errors) == set(other._supported_errors) 

98 ) 

99 

100 def call_with_retry( 

101 self, 

102 do: Callable[[], T], 

103 fail: Callable[[Exception], Any], 

104 is_retryable: Optional[Callable[[Exception], bool]] = None, 

105 ) -> T: 

106 """ 

107 Execute an operation that might fail and returns its result, or 

108 raise the exception that was thrown depending on the `Backoff` object. 

109 `do`: the operation to call. Expects no argument. 

110 `fail`: the failure handler, expects the last error that was thrown 

111 """ 

112 self._backoff.reset() 

113 failures = 0 

114 while True: 

115 try: 

116 return do() 

117 except self._supported_errors as error: 

118 if is_retryable and not is_retryable(error): 

119 raise 

120 failures += 1 

121 fail(error) 

122 if self._retries >= 0 and failures > self._retries: 

123 raise error 

124 backoff = self._backoff.compute(failures) 

125 if backoff > 0: 

126 sleep(backoff)