Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tenacity/wait.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1# Copyright 2016–2021 Julien Danjou 

2# Copyright 2016 Joshua Harlow 

3# Copyright 2013-2014 Ray Holder 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17import abc 

18import random 

19import typing 

20 

21from tenacity import _utils 

22 

23if typing.TYPE_CHECKING: 

24 from tenacity import RetryCallState 

25 

26 

class wait_base(abc.ABC):
    """Common abstract parent of every wait strategy in this module.

    A strategy is a callable that receives the current retry state and
    returns how long to wait. Strategies can be added together with ``+``
    (or the built-in ``sum``), which yields a ``wait_combine``.
    """

    @abc.abstractmethod
    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the wait for ``retry_state``; subclasses must override."""

    def __add__(self, other: "wait_base") -> "wait_combine":
        """Return a ``wait_combine`` built from ``self`` and ``other``."""
        return wait_combine(self, other)

    def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]:
        """Reflected addition.

        The built-in ``sum`` starts from the integer ``0``, so ``0 + strategy``
        must hand the strategy back unchanged; any other left operand is
        delegated to ``__add__``.
        """
        return self if other == 0 else self.__add__(other)  # type: ignore[comparison-overlap]

42 

43 

# Type alias: either a ``wait_base`` instance or any callable that takes a
# ``RetryCallState`` and returns a float or an int.
WaitBaseT = typing.Union[
    wait_base, typing.Callable[["RetryCallState"], typing.Union[float, int]]
]

47 

48 

class wait_fixed(wait_base):
    """Wait strategy that returns the same wait before every retry."""

    def __init__(self, wait: _utils.time_unit_type) -> None:
        """Store ``wait`` after passing it through ``_utils.to_seconds``."""
        seconds = _utils.to_seconds(wait)
        self.wait_fixed = seconds

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the stored wait; ``retry_state`` is not consulted."""
        return self.wait_fixed

57 

58 

class wait_none(wait_fixed):
    """Wait strategy that retries immediately: a ``wait_fixed`` of zero."""

    def __init__(self) -> None:
        """Initialise the parent ``wait_fixed`` with a wait of ``0``."""
        super().__init__(wait=0)

64 

65 

class wait_random(wait_base):
    """Wait strategy that picks a random wait between ``min`` and ``max``."""

    def __init__(
        self, min: _utils.time_unit_type = 0, max: _utils.time_unit_type = 1
    ) -> None:  # noqa
        """Store both bounds after passing them through ``_utils.to_seconds``."""
        lower = _utils.to_seconds(min)
        upper = _utils.to_seconds(max)
        self.wait_random_min = lower
        self.wait_random_max = upper

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return ``min`` plus a random fraction of the ``max - min`` span.

        ``retry_state`` is not consulted.
        """
        span = self.wait_random_max - self.wait_random_min
        offset = random.random() * span
        return self.wait_random_min + offset

79 

80 

class wait_combine(wait_base):
    """Wait strategy whose result is the sum of several other strategies."""

    def __init__(self, *strategies: wait_base) -> None:
        """Remember the strategies to add together, in the order given."""
        self.wait_funcs = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Call every stored strategy with ``retry_state`` and sum the results."""
        # Collect first, then hand the list to the built-in ``sum`` so the
        # arithmetic stays exactly what ``sum`` performs.
        delays = [strategy(retry_state=retry_state) for strategy in self.wait_funcs]
        return sum(delays)

89 

90 

class wait_chain(wait_base):
    """Use a different waiting strategy for each successive attempt.

    Attempt 1 uses the first strategy, attempt 2 the second, and so on. Once
    the attempt number exceeds the number of strategies, the last strategy
    keeps being used.

    For example::

        @retry(wait=wait_chain(*[wait_fixed(1) for i in range(3)] +
                               [wait_fixed(2) for j in range(5)] +
                               [wait_fixed(5) for k in range(4)]))
        def wait_chained():
            print("Wait 1s for 3 attempts, 2s for 5 attempts and 5s "
                  "thereafter.")

    Note: calling a chain built with no strategies raises ``IndexError``,
    because there is no element to select.
    """

    def __init__(self, *strategies: wait_base) -> None:
        """Remember the strategies in the order they should be applied."""
        self.strategies = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Delegate to the strategy matching ``retry_state.attempt_number``."""
        # Clamp the 1-based attempt number into [1, len(strategies)].
        position = max(retry_state.attempt_number, 1)
        position = min(position, len(self.strategies))
        chosen = self.strategies[position - 1]
        return chosen(retry_state=retry_state)

114 

115 

class wait_exception(wait_base):
    """Wait strategy whose wait is computed from the raised exception.

    The predicate receives the exception object taken from the retry state's
    outcome and returns the time to wait before the next attempt.

    For example::

        def http_error(exception: BaseException) -> float:
            if (
                isinstance(exception, requests.HTTPError)
                and exception.response.status_code == requests.codes.too_many_requests
            ):
                return float(exception.response.headers.get("Retry-After", "1"))
            return 60.0


        @retry(
            stop=stop_after_attempt(3),
            wait=wait_exception(http_error),
        )
        def http_get_request(url: str) -> None:
            response = requests.get(url)
            response.raise_for_status()
    """

    def __init__(self, predicate: typing.Callable[[BaseException], float]) -> None:
        """Store the callable that maps an exception to a wait."""
        self.predicate = predicate

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return ``predicate(exception)`` for the outcome's exception.

        Raises:
            RuntimeError: if ``retry_state.outcome`` is ``None``, or if the
                outcome reports no exception.
        """
        outcome = retry_state.outcome
        if outcome is None:
            raise RuntimeError("__call__() called before outcome was set")

        error = outcome.exception()
        if error is None:
            raise RuntimeError("outcome failed but the exception is None")

        return self.predicate(error)

153 

154 

class wait_incrementing(wait_base):
    """Wait strategy that grows linearly with the attempt number.

    The first attempt waits ``start``; each later attempt adds ``increment``
    on top. The result is capped at ``max`` and never drops below zero.
    """

    def __init__(
        self,
        start: _utils.time_unit_type = 0,
        increment: _utils.time_unit_type = 100,
        max: _utils.time_unit_type = _utils.MAX_WAIT,  # noqa
    ) -> None:
        """Store the three settings after ``_utils.to_seconds`` conversion."""
        self.start = _utils.to_seconds(start)
        self.increment = _utils.to_seconds(increment)
        self.max = _utils.to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return ``start + increment * (attempt_number - 1)`` clamped to [0, max]."""
        steps = retry_state.attempt_number - 1
        uncapped = self.start + (self.increment * steps)
        capped = min(uncapped, self.max)
        return max(0, capped)

175 

176 

class wait_exponential(wait_base):
    """Wait strategy that applies exponential backoff.

    The wait is ``multiplier * exp_base ** (attempt_number - 1)``, limited to
    at most ``max`` and at least ``min`` (and never less than zero).

    The intervals are fixed (there is no jitter). That makes this strategy a
    good fit for balancing retries against latency when a required resource
    is unavailable for an unknown duration, but *not* for resolving
    contention between multiple processes for a shared resource; use
    wait_random_exponential in that case.
    """

    def __init__(
        self,
        multiplier: typing.Union[int, float] = 1,
        max: _utils.time_unit_type = _utils.MAX_WAIT,  # noqa
        exp_base: typing.Union[int, float] = 2,
        min: _utils.time_unit_type = 0,  # noqa
    ) -> None:
        """Store the settings; ``min``/``max`` go through ``_utils.to_seconds``."""
        self.multiplier = multiplier
        self.exp_base = exp_base
        self.min = _utils.to_seconds(min)
        self.max = _utils.to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the backoff for ``retry_state.attempt_number``."""
        try:
            growth = self.exp_base ** (retry_state.attempt_number - 1)
            uncapped = self.multiplier * growth
        except OverflowError:
            # The value is too large to represent; fall back to the ceiling.
            return self.max

        floor = max(0, self.min)
        capped = min(uncapped, self.max)
        return max(floor, capped)

209 

210 

class wait_random_exponential(wait_exponential):
    """Random wait drawn from an exponentially widening window.

    This is exponential backoff in the sense used to mediate contention
    between multiple uncoordinated processes for a shared resource in
    distributed systems (as in e.g. Ethernet networking). It corresponds to
    the "Full Jitter" algorithm described in this blog post:

    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

    Each retry happens at a random time inside a geometrically expanding
    interval. The multiplier can be customised and the upper limit of the
    random interval can be restricted to some maximum value.

    Example::

        wait_random_exponential(multiplier=0.5,  # initial window 0.5s
                                max=60)          # max 60s timeout

    When waiting for an unavailable resource to become available again, as
    opposed to trying to resolve contention for a shared resource, the
    wait_exponential strategy (which uses a fixed interval) may be preferable.

    """

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return a uniform random value between ``self.min`` and the backoff.

        The upper bound is whatever ``wait_exponential.__call__`` returns for
        the same ``retry_state``.
        """
        upper_bound = super().__call__(retry_state=retry_state)
        return random.uniform(self.min, upper_bound)

240 

241 

class wait_exponential_jitter(wait_base):
    """Wait strategy that applies exponential backoff and jitter.

    It allows for a customized initial wait, maximum wait, exponent base and
    jitter.

    This implements the strategy described here:
    https://cloud.google.com/storage/docs/retry-strategy

    The wait time is
    ``min(initial * exp_base**n + random.uniform(0, jitter), max)``, floored
    at 0, where ``n`` is ``attempt_number - 1``.

    ``initial``, ``max`` and ``jitter`` are passed through
    ``_utils.to_seconds``, the same helper the other strategies in this
    module use for their durations, so they accept whatever that helper
    accepts (presumably plain numbers or timedeltas -- confirm against
    ``_utils.time_unit_type``).
    """

    def __init__(
        self,
        initial: _utils.time_unit_type = 1,
        max: _utils.time_unit_type = _utils.MAX_WAIT,  # noqa
        exp_base: float = 2,
        jitter: _utils.time_unit_type = 1,
    ) -> None:
        """Store the settings.

        Args:
            initial: wait used for the first attempt, before jitter.
            max: upper limit applied to the final result.
            exp_base: base of the exponential growth per attempt.
            jitter: upper bound of the random amount added to each wait.
        """
        self.initial = _utils.to_seconds(initial)
        self.max = _utils.to_seconds(max)
        self.exp_base = exp_base
        self.jitter = _utils.to_seconds(jitter)

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the jittered backoff for ``retry_state.attempt_number``."""
        jitter = random.uniform(0, self.jitter)
        try:
            exp = self.exp_base ** (retry_state.attempt_number - 1)
            result = self.initial * exp + jitter
        except OverflowError:
            # The value is too large to represent; use the ceiling instead.
            result = self.max
        # Clamp into [0, max].
        return max(0, min(result, self.max))