Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/retry.py: 54%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import abc
2import socket
3from time import sleep
4from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Tuple, Type, TypeVar
6from redis.exceptions import ConnectionError, TimeoutError
# Return type of the callable handed to `Retry.call_with_retry`.
T = TypeVar("T")
# Exception type a retry policy reacts to; bounded by Exception and covariant.
E = TypeVar("E", bound=Exception, covariant=True)
11if TYPE_CHECKING:
12 from redis.backoff import AbstractBackoff
class AbstractRetry(Generic[E], abc.ABC):
    """Retry a specific number of times after a failure.

    Holds the shared configuration of a retry policy: the backoff object,
    the retry budget and the exception types that trigger a retry.
    Subclasses must provide `__eq__`.
    """

    _supported_errors: Tuple[Type[E], ...]

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[E], ...],
    ):
        """
        Initialize a `Retry` object with a `Backoff` object
        that retries a maximum of `retries` times.
        `retries` can be negative to retry forever.
        You can specify the types of supported errors which trigger
        a retry with the `supported_errors` parameter.
        """
        self._backoff = backoff
        self._retries = retries
        # Always store a real tuple: `except` clauses and the tuple
        # concatenation done elsewhere reject lists and sets, so a caller
        # passing another iterable would otherwise fail much later.
        self._supported_errors = tuple(supported_errors)

    @abc.abstractmethod
    def __eq__(self, other: Any) -> bool:
        """Compare two retry policies; concrete subclasses must override."""
        return NotImplemented

    def __hash__(self) -> int:
        """Hash on backoff, retry count and the *set* of supported errors.

        A frozenset is used so the hash does not depend on the order in
        which the error types are stored.
        """
        return hash((self._backoff, self._retries, frozenset(self._supported_errors)))

    def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None:
        """
        Updates the supported errors with the specified error types

        Duplicates are dropped. Existing types keep their position and new
        types are appended in the order given, so the result is deterministic.
        """
        # dict.fromkeys de-duplicates while preserving first-seen order,
        # unlike set(), whose iteration order is arbitrary.
        self._supported_errors = tuple(
            dict.fromkeys((*self._supported_errors, *specified_errors))
        )

    def get_retries(self) -> int:
        """
        Get the number of retries.
        """
        return self._retries

    def update_retries(self, value: int) -> None:
        """
        Set the number of retries.
        """
        self._retries = value
class Retry(AbstractRetry[Exception]):
    """Synchronous retry policy that re-runs a callable after supported errors."""

    # Defining __eq__ in a class body resets __hash__ to None, so the
    # inherited implementation has to be re-bound explicitly.
    __hash__ = AbstractRetry.__hash__

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[Exception], ...] = (
            ConnectionError,
            TimeoutError,
            socket.timeout,
        ),
    ):
        """
        Build a retry policy from a backoff object and a retry budget.
        `supported_errors` lists the exception types that trigger a retry;
        it defaults to connection and timeout errors.
        """
        super().__init__(backoff, retries, supported_errors)

    def __eq__(self, other: Any) -> bool:
        """
        Two `Retry` objects are equal when their backoff, retry count and
        set of supported errors match (error order is ignored).
        """
        if isinstance(other, Retry):
            return (
                self._backoff == other._backoff
                and self._retries == other._retries
                and set(self._supported_errors) == set(other._supported_errors)
            )
        return NotImplemented

    def call_with_retry(
        self,
        do: Callable[[], T],
        fail: Callable[[Exception], Any],
    ) -> T:
        """
        Run `do` until it succeeds and return its result, or re-raise the
        last supported error once the retry budget is used up.
        `do`: the operation to call. Expects no argument.
        `fail`: the failure handler, called with each supported error that
        was thrown, including the one that is finally re-raised.
        A negative retry count never exhausts, so the loop retries forever.
        """
        self._backoff.reset()
        failed_attempts = 0
        while True:
            try:
                return do()
            except self._supported_errors as exc:
                failed_attempts += 1
                # The handler runs before the budget check, so it also sees
                # the final error.
                fail(exc)
                if 0 <= self._retries < failed_attempts:
                    raise exc
                delay = self._backoff.compute(failed_attempts)
                if delay > 0:
                    sleep(delay)