Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/retry.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

48 statements  

1import abc 

2import socket 

3from time import sleep 

4from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Tuple, Type, TypeVar 

5 

6from redis.exceptions import ConnectionError, TimeoutError 

7 

8T = TypeVar("T") 

9E = TypeVar("E", bound=Exception, covariant=True) 

10 

11if TYPE_CHECKING: 

12 from redis.backoff import AbstractBackoff 

13 

14 

class AbstractRetry(Generic[E], abc.ABC):
    """Base class holding the retry policy: backoff, retry limit, error types."""

    # Exception classes that trigger a retry.
    _supported_errors: Tuple[Type[E], ...]

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[E], ...],
    ):
        """
        Store the retry policy.

        `backoff` is the `Backoff` object kept for use between attempts.
        `retries` is the maximum number of retries; a negative value
        means retry forever.
        `supported_errors` lists the error types which trigger a retry.
        """
        self._supported_errors = supported_errors
        self._retries = retries
        self._backoff = backoff

    @abc.abstractmethod
    def __eq__(self, other: Any) -> bool:
        # Concrete subclasses have to supply their own comparison.
        return NotImplemented

    def __hash__(self) -> int:
        # A frozenset makes the hash independent of the order of the errors.
        error_types = frozenset(self._supported_errors)
        identity = (self._backoff, self._retries, error_types)
        return hash(identity)

    def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None:
        """
        Add the specified error types to the supported errors,
        dropping duplicates.
        """
        combined = self._supported_errors + tuple(specified_errors)
        self._supported_errors = tuple(set(combined))

    def get_retries(self) -> int:
        """
        Return the configured number of retries.
        """
        return self._retries

    def update_retries(self, value: int) -> None:
        """
        Replace the configured number of retries with `value`.
        """
        self._retries = value

63 

64 

class Retry(AbstractRetry[Exception]):
    """Synchronous retry policy that sleeps between failed attempts."""

    # Defining __eq__ below would otherwise leave the class unhashable.
    __hash__ = AbstractRetry.__hash__

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[Exception], ...] = (
            ConnectionError,
            TimeoutError,
            socket.timeout,
        ),
    ):
        """
        Build a retry policy; by default connection errors, timeouts and
        socket timeouts trigger a retry.
        """
        super().__init__(backoff, retries, supported_errors)

    def __eq__(self, other: Any) -> bool:
        """
        Two `Retry` objects are equal when backoff, retry count and the
        set of supported errors (order ignored) all match.
        """
        if isinstance(other, Retry):
            return (
                self._backoff == other._backoff
                and self._retries == other._retries
                and set(self._supported_errors) == set(other._supported_errors)
            )
        return NotImplemented

    def call_with_retry(
        self,
        do: Callable[[], T],
        fail: Callable[[Exception], Any],
    ) -> T:
        """
        Run `do` until it succeeds and return its result, or re-raise the
        last supported error once the retry limit is exceeded.

        `do`: the operation to call. Expects no argument.
        `fail`: the failure handler; called with every supported error
        that is caught, including the one that is finally re-raised.
        """
        self._backoff.reset()
        attempt = 0
        while True:
            try:
                return do()
            except self._supported_errors as error:
                attempt += 1
                fail(error)
                # Read the limit only after the handler has run.
                limit = self._retries
                # A negative limit never gives up.
                if limit >= 0 and attempt > limit:
                    raise error
                delay = self._backoff.compute(attempt)
                if delay > 0:
                    sleep(delay)