Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/retry.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import socket
3from time import sleep
4from typing import (
5 TYPE_CHECKING,
6 Any,
7 Callable,
8 Generic,
9 Iterable,
10 Optional,
11 Tuple,
12 Type,
13 TypeVar,
14 Union,
15)
17from redis.exceptions import ConnectionError, TimeoutError
19T = TypeVar("T")
20E = TypeVar("E", bound=Exception, covariant=True)
22if TYPE_CHECKING:
23 from redis.backoff import AbstractBackoff
26class AbstractRetry(Generic[E], abc.ABC):
27 """Retry a specific number of times after a failure"""
29 _supported_errors: Tuple[Type[E], ...]
31 def __init__(
32 self,
33 backoff: "AbstractBackoff",
34 retries: int,
35 supported_errors: Tuple[Type[E], ...],
36 ):
37 """
38 Initialize a `Retry` object with a `Backoff` object
39 that retries a maximum of `retries` times.
40 `retries` can be negative to retry forever.
41 You can specify the types of supported errors which trigger
42 a retry with the `supported_errors` parameter.
43 """
44 self._backoff = backoff
45 self._retries = retries
46 self._supported_errors = supported_errors
48 @abc.abstractmethod
49 def __eq__(self, other: Any) -> bool:
50 return NotImplemented
52 def __hash__(self) -> int:
53 return hash((self._backoff, self._retries, frozenset(self._supported_errors)))
55 def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None:
56 """
57 Updates the supported errors with the specified error types
58 """
59 self._supported_errors = tuple(
60 set(self._supported_errors + tuple(specified_errors))
61 )
63 def get_retries(self) -> int:
64 """
65 Get the number of retries.
66 """
67 return self._retries
69 def update_retries(self, value: int) -> None:
70 """
71 Set the number of retries.
72 """
73 self._retries = value
76class Retry(AbstractRetry[Exception]):
77 __hash__ = AbstractRetry.__hash__
79 def __init__(
80 self,
81 backoff: "AbstractBackoff",
82 retries: int,
83 supported_errors: Tuple[Type[Exception], ...] = (
84 ConnectionError,
85 TimeoutError,
86 socket.timeout,
87 ),
88 ):
89 super().__init__(backoff, retries, supported_errors)
91 def __eq__(self, other: Any) -> bool:
92 if not isinstance(other, Retry):
93 return NotImplemented
95 return (
96 self._backoff == other._backoff
97 and self._retries == other._retries
98 and set(self._supported_errors) == set(other._supported_errors)
99 )
101 def call_with_retry(
102 self,
103 do: Callable[[], T],
104 fail: Union[Callable[[Exception], Any], Callable[[Exception, int], Any]],
105 is_retryable: Optional[Callable[[Exception], bool]] = None,
106 with_failure_count: bool = False,
107 ) -> T:
108 """
109 Execute an operation that might fail and returns its result, or
110 raise the exception that was thrown depending on the `Backoff` object.
111 `do`: the operation to call. Expects no argument.
112 `fail`: the failure handler, expects the last error that was thrown
113 ``is_retryable``: optional function to determine if an error is retryable
114 ``with_failure_count``: if True, the failure count is passed to the failure handler
115 """
116 self._backoff.reset()
117 failures = 0
118 while True:
119 try:
120 return do()
121 except self._supported_errors as error:
122 if is_retryable and not is_retryable(error):
123 raise
124 failures += 1
126 if with_failure_count:
127 fail(error, failures)
128 else:
129 fail(error)
131 if self._retries >= 0 and failures > self._retries:
132 raise error
133 backoff = self._backoff.compute(failures)
134 if backoff > 0:
135 sleep(backoff)