Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/retry.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

38 statements  

1import socket 

2from time import sleep 

3from typing import TYPE_CHECKING, Any, Callable, Iterable, Tuple, Type, TypeVar 

4 

5from redis.exceptions import ConnectionError, TimeoutError 

6 

7T = TypeVar("T") 

8 

9if TYPE_CHECKING: 

10 from redis.backoff import AbstractBackoff 

11 

12 

13class Retry: 

14 """Retry a specific number of times after a failure""" 

15 

16 def __init__( 

17 self, 

18 backoff: "AbstractBackoff", 

19 retries: int, 

20 supported_errors: Tuple[Type[Exception], ...] = ( 

21 ConnectionError, 

22 TimeoutError, 

23 socket.timeout, 

24 ), 

25 ): 

26 """ 

27 Initialize a `Retry` object with a `Backoff` object 

28 that retries a maximum of `retries` times. 

29 `retries` can be negative to retry forever. 

30 You can specify the types of supported errors which trigger 

31 a retry with the `supported_errors` parameter. 

32 """ 

33 self._backoff = backoff 

34 self._retries = retries 

35 self._supported_errors = supported_errors 

36 

37 def __eq__(self, other: Any) -> bool: 

38 if not isinstance(other, Retry): 

39 return NotImplemented 

40 

41 return ( 

42 self._backoff == other._backoff 

43 and self._retries == other._retries 

44 and set(self._supported_errors) == set(other._supported_errors) 

45 ) 

46 

47 def __hash__(self) -> int: 

48 return hash((self._backoff, self._retries, frozenset(self._supported_errors))) 

49 

50 def update_supported_errors( 

51 self, specified_errors: Iterable[Type[Exception]] 

52 ) -> None: 

53 """ 

54 Updates the supported errors with the specified error types 

55 """ 

56 self._supported_errors = tuple( 

57 set(self._supported_errors + tuple(specified_errors)) 

58 ) 

59 

60 def get_retries(self) -> int: 

61 """ 

62 Get the number of retries. 

63 """ 

64 return self._retries 

65 

66 def update_retries(self, value: int) -> None: 

67 """ 

68 Set the number of retries. 

69 """ 

70 self._retries = value 

71 

72 def call_with_retry( 

73 self, 

74 do: Callable[[], T], 

75 fail: Callable[[Exception], Any], 

76 ) -> T: 

77 """ 

78 Execute an operation that might fail and returns its result, or 

79 raise the exception that was thrown depending on the `Backoff` object. 

80 `do`: the operation to call. Expects no argument. 

81 `fail`: the failure handler, expects the last error that was thrown 

82 """ 

83 self._backoff.reset() 

84 failures = 0 

85 while True: 

86 try: 

87 return do() 

88 except self._supported_errors as error: 

89 failures += 1 

90 fail(error) 

91 if self._retries >= 0 and failures > self._retries: 

92 raise error 

93 backoff = self._backoff.compute(failures) 

94 if backoff > 0: 

95 sleep(backoff)