Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tenacity/wait.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016–2021 Julien Danjou
2# Copyright 2016 Joshua Harlow
3# Copyright 2013-2014 Ray Holder
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17import abc
18import random
19import typing
21from tenacity import _utils
23if typing.TYPE_CHECKING:
24 from tenacity import RetryCallState
27class wait_base(abc.ABC):
28 """Abstract base class for wait strategies."""
30 @abc.abstractmethod
31 def __call__(self, retry_state: "RetryCallState") -> float:
32 pass
34 def __add__(self, other: "wait_base") -> "wait_combine":
35 return wait_combine(self, other)
37 def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]:
38 # make it possible to use multiple waits with the built-in sum function
39 if other == 0: # type: ignore[comparison-overlap]
40 return self
41 return self.__add__(other)
44WaitBaseT = typing.Union[
45 wait_base, typing.Callable[["RetryCallState"], typing.Union[float, int]]
46]
49class wait_fixed(wait_base):
50 """Wait strategy that waits a fixed amount of time between each retry."""
52 def __init__(self, wait: _utils.time_unit_type) -> None:
53 self.wait_fixed = _utils.to_seconds(wait)
55 def __call__(self, retry_state: "RetryCallState") -> float:
56 return self.wait_fixed
59class wait_none(wait_fixed):
60 """Wait strategy that doesn't wait at all before retrying."""
62 def __init__(self) -> None:
63 super().__init__(0)
66class wait_random(wait_base):
67 """Wait strategy that waits a random amount of time between min/max."""
69 def __init__(
70 self, min: _utils.time_unit_type = 0, max: _utils.time_unit_type = 1
71 ) -> None: # noqa
72 self.wait_random_min = _utils.to_seconds(min)
73 self.wait_random_max = _utils.to_seconds(max)
75 def __call__(self, retry_state: "RetryCallState") -> float:
76 return self.wait_random_min + (
77 random.random() * (self.wait_random_max - self.wait_random_min)
78 )
81class wait_combine(wait_base):
82 """Combine several waiting strategies."""
84 def __init__(self, *strategies: wait_base) -> None:
85 self.wait_funcs = strategies
87 def __call__(self, retry_state: "RetryCallState") -> float:
88 return sum(x(retry_state=retry_state) for x in self.wait_funcs)
91class wait_chain(wait_base):
92 """Chain two or more waiting strategies.
94 If all strategies are exhausted, the very last strategy is used
95 thereafter.
97 For example::
99 @retry(wait=wait_chain(*[wait_fixed(1) for i in range(3)] +
100 [wait_fixed(2) for j in range(5)] +
101 [wait_fixed(5) for k in range(4)))
102 def wait_chained():
103 print("Wait 1s for 3 attempts, 2s for 5 attempts and 5s
104 thereafter.")
105 """
107 def __init__(self, *strategies: wait_base) -> None:
108 self.strategies = strategies
110 def __call__(self, retry_state: "RetryCallState") -> float:
111 wait_func_no = min(max(retry_state.attempt_number, 1), len(self.strategies))
112 wait_func = self.strategies[wait_func_no - 1]
113 return wait_func(retry_state=retry_state)
116class wait_incrementing(wait_base):
117 """Wait an incremental amount of time after each attempt.
119 Starting at a starting value and incrementing by a value for each attempt
120 (and restricting the upper limit to some maximum value).
121 """
123 def __init__(
124 self,
125 start: _utils.time_unit_type = 0,
126 increment: _utils.time_unit_type = 100,
127 max: _utils.time_unit_type = _utils.MAX_WAIT, # noqa
128 ) -> None:
129 self.start = _utils.to_seconds(start)
130 self.increment = _utils.to_seconds(increment)
131 self.max = _utils.to_seconds(max)
133 def __call__(self, retry_state: "RetryCallState") -> float:
134 result = self.start + (self.increment * (retry_state.attempt_number - 1))
135 return max(0, min(result, self.max))
138class wait_exponential(wait_base):
139 """Wait strategy that applies exponential backoff.
141 It allows for a customized multiplier and an ability to restrict the
142 upper and lower limits to some maximum and minimum value.
144 The intervals are fixed (i.e. there is no jitter), so this strategy is
145 suitable for balancing retries against latency when a required resource is
146 unavailable for an unknown duration, but *not* suitable for resolving
147 contention between multiple processes for a shared resource. Use
148 wait_random_exponential for the latter case.
149 """
151 def __init__(
152 self,
153 multiplier: typing.Union[int, float] = 1,
154 max: _utils.time_unit_type = _utils.MAX_WAIT, # noqa
155 exp_base: typing.Union[int, float] = 2,
156 min: _utils.time_unit_type = 0, # noqa
157 ) -> None:
158 self.multiplier = multiplier
159 self.min = _utils.to_seconds(min)
160 self.max = _utils.to_seconds(max)
161 self.exp_base = exp_base
163 def __call__(self, retry_state: "RetryCallState") -> float:
164 try:
165 exp = self.exp_base ** (retry_state.attempt_number - 1)
166 result = self.multiplier * exp
167 except OverflowError:
168 return self.max
169 return max(max(0, self.min), min(result, self.max))
172class wait_random_exponential(wait_exponential):
173 """Random wait with exponentially widening window.
175 An exponential backoff strategy used to mediate contention between multiple
176 uncoordinated processes for a shared resource in distributed systems. This
177 is the sense in which "exponential backoff" is meant in e.g. Ethernet
178 networking, and corresponds to the "Full Jitter" algorithm described in
179 this blog post:
181 https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
183 Each retry occurs at a random time in a geometrically expanding interval.
184 It allows for a custom multiplier and an ability to restrict the upper
185 limit of the random interval to some maximum value.
187 Example::
189 wait_random_exponential(multiplier=0.5, # initial window 0.5s
190 max=60) # max 60s timeout
192 When waiting for an unavailable resource to become available again, as
193 opposed to trying to resolve contention for a shared resource, the
194 wait_exponential strategy (which uses a fixed interval) may be preferable.
196 """
198 def __call__(self, retry_state: "RetryCallState") -> float:
199 high = super().__call__(retry_state=retry_state)
200 return random.uniform(0, high)
203class wait_exponential_jitter(wait_base):
204 """Wait strategy that applies exponential backoff and jitter.
206 It allows for a customized initial wait, maximum wait and jitter.
208 This implements the strategy described here:
209 https://cloud.google.com/storage/docs/retry-strategy
211 The wait time is min(initial * 2**n + random.uniform(0, jitter), maximum)
212 where n is the retry count.
213 """
215 def __init__(
216 self,
217 initial: float = 1,
218 max: float = _utils.MAX_WAIT, # noqa
219 exp_base: float = 2,
220 jitter: float = 1,
221 ) -> None:
222 self.initial = initial
223 self.max = max
224 self.exp_base = exp_base
225 self.jitter = jitter
227 def __call__(self, retry_state: "RetryCallState") -> float:
228 jitter = random.uniform(0, self.jitter)
229 try:
230 exp = self.exp_base ** (retry_state.attempt_number - 1)
231 result = self.initial * exp + jitter
232 except OverflowError:
233 result = self.max
234 return max(0, min(result, self.max))