Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/retry.py: 52%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import socket
3from time import sleep
4from typing import (
5 TYPE_CHECKING,
6 Any,
7 Callable,
8 Generic,
9 Iterable,
10 Optional,
11 Tuple,
12 Type,
13 TypeVar,
14)
16from redis.exceptions import ConnectionError, TimeoutError
18T = TypeVar("T")
19E = TypeVar("E", bound=Exception, covariant=True)
21if TYPE_CHECKING:
22 from redis.backoff import AbstractBackoff
25class AbstractRetry(Generic[E], abc.ABC):
26 """Retry a specific number of times after a failure"""
28 _supported_errors: Tuple[Type[E], ...]
30 def __init__(
31 self,
32 backoff: "AbstractBackoff",
33 retries: int,
34 supported_errors: Tuple[Type[E], ...],
35 ):
36 """
37 Initialize a `Retry` object with a `Backoff` object
38 that retries a maximum of `retries` times.
39 `retries` can be negative to retry forever.
40 You can specify the types of supported errors which trigger
41 a retry with the `supported_errors` parameter.
42 """
43 self._backoff = backoff
44 self._retries = retries
45 self._supported_errors = supported_errors
47 @abc.abstractmethod
48 def __eq__(self, other: Any) -> bool:
49 return NotImplemented
51 def __hash__(self) -> int:
52 return hash((self._backoff, self._retries, frozenset(self._supported_errors)))
54 def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None:
55 """
56 Updates the supported errors with the specified error types
57 """
58 self._supported_errors = tuple(
59 set(self._supported_errors + tuple(specified_errors))
60 )
62 def get_retries(self) -> int:
63 """
64 Get the number of retries.
65 """
66 return self._retries
68 def update_retries(self, value: int) -> None:
69 """
70 Set the number of retries.
71 """
72 self._retries = value
75class Retry(AbstractRetry[Exception]):
76 __hash__ = AbstractRetry.__hash__
78 def __init__(
79 self,
80 backoff: "AbstractBackoff",
81 retries: int,
82 supported_errors: Tuple[Type[Exception], ...] = (
83 ConnectionError,
84 TimeoutError,
85 socket.timeout,
86 ),
87 ):
88 super().__init__(backoff, retries, supported_errors)
90 def __eq__(self, other: Any) -> bool:
91 if not isinstance(other, Retry):
92 return NotImplemented
94 return (
95 self._backoff == other._backoff
96 and self._retries == other._retries
97 and set(self._supported_errors) == set(other._supported_errors)
98 )
100 def call_with_retry(
101 self,
102 do: Callable[[], T],
103 fail: Callable[[Exception], Any],
104 is_retryable: Optional[Callable[[Exception], bool]] = None,
105 ) -> T:
106 """
107 Execute an operation that might fail and returns its result, or
108 raise the exception that was thrown depending on the `Backoff` object.
109 `do`: the operation to call. Expects no argument.
110 `fail`: the failure handler, expects the last error that was thrown
111 """
112 self._backoff.reset()
113 failures = 0
114 while True:
115 try:
116 return do()
117 except self._supported_errors as error:
118 if is_retryable and not is_retryable(error):
119 raise
120 failures += 1
121 fail(error)
122 if self._retries >= 0 and failures > self._retries:
123 raise error
124 backoff = self._backoff.compute(failures)
125 if backoff > 0:
126 sleep(backoff)