Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tenacity/wait.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

82 statements  

1# Copyright 2016–2021 Julien Danjou 

2# Copyright 2016 Joshua Harlow 

3# Copyright 2013-2014 Ray Holder 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17import abc 

18import random 

19import typing 

20 

21from tenacity import _utils 

22 

23if typing.TYPE_CHECKING: 

24 from tenacity import RetryCallState 

25 

26 

class wait_base(abc.ABC):
    """Common interface implemented by every wait strategy.

    A strategy is a callable that receives the current retry state and
    returns a wait duration. Strategies can be added together with ``+``
    (or folded with the built-in ``sum``); the result is a ``wait_combine``
    built from the operands.
    """

    @abc.abstractmethod
    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the wait for ``retry_state``; subclasses must override."""

    def __add__(self, other: "wait_base") -> "wait_combine":
        """Return a ``wait_combine`` of this strategy and ``other``."""
        return wait_combine(self, other)

    def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]:
        """Handle ``other + self`` when ``other`` does not know how to add.

        Returns ``self`` unchanged when ``other`` equals ``0``; otherwise
        delegates to ``__add__``.
        """
        # The built-in sum() seeds its accumulator with 0, so the first
        # addition it performs is ``0 + strategy``; treating 0 as a neutral
        # element lets sum() work over a sequence of wait strategies.
        is_sum_seed = other == 0  # type: ignore[comparison-overlap]
        return self if is_sum_seed else self.__add__(other)

42 

43 

# A wait strategy value: either a ``wait_base`` instance or a callable that
# maps a ``RetryCallState`` to a float or int.
WaitBaseT = typing.Union[
    wait_base,
    typing.Callable[["RetryCallState"], typing.Union[float, int]],
]

47 

48 

class wait_fixed(wait_base):
    """Wait strategy that returns the same wait for every retry."""

    def __init__(self, wait: _utils.time_unit_type) -> None:
        """Store ``wait`` after passing it through ``_utils.to_seconds``."""
        # Convert once here so that __call__ is a plain attribute read.
        fixed_wait = _utils.to_seconds(wait)
        self.wait_fixed = fixed_wait

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the stored wait; ``retry_state`` is not consulted."""
        return self.wait_fixed

57 

58 

class wait_none(wait_fixed):
    """Wait strategy with a zero wait, i.e. retry without pausing."""

    def __init__(self) -> None:
        """Initialise the parent ``wait_fixed`` with a wait of ``0``."""
        super(wait_none, self).__init__(0)

64 

65 

class wait_random(wait_base):
    """Wait strategy that picks a random wait between ``min`` and ``max``."""

    def __init__(
        self, min: _utils.time_unit_type = 0, max: _utils.time_unit_type = 1
    ) -> None:  # noqa
        """Store both bounds after passing them through ``_utils.to_seconds``.

        Defaults give a range of 0 to 1.
        """
        self.wait_random_min = _utils.to_seconds(min)
        self.wait_random_max = _utils.to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return ``min`` plus a random fraction of ``max - min``.

        ``retry_state`` is not consulted.
        """
        lower = self.wait_random_min
        width = self.wait_random_max - lower
        # random.random() is in [0.0, 1.0), so the offset is a fraction of
        # the width of the configured range.
        return lower + random.random() * width

79 

80 

class wait_combine(wait_base):
    """Wait strategy whose wait is the sum of several other strategies."""

    def __init__(self, *strategies: wait_base) -> None:
        """Keep the given strategies, in order, as ``self.wait_funcs``."""
        self.wait_funcs = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Call every stored strategy with ``retry_state`` and add the results."""
        individual_waits = [
            strategy(retry_state=retry_state) for strategy in self.wait_funcs
        ]
        return sum(individual_waits)

89 

90 

class wait_chain(wait_base):
    """Chain two or more waiting strategies.

    The strategy used for a retry is selected by the attempt number: the
    first strategy for attempt 1, the second for attempt 2, and so on. Once
    the attempt number exceeds the number of strategies, the very last
    strategy is used thereafter.

    For example::

        @retry(wait=wait_chain(*[wait_fixed(1) for i in range(3)] +
                               [wait_fixed(2) for j in range(5)] +
                               [wait_fixed(5) for k in range(4)]))
        def wait_chained():
            print("Wait 1s for 3 attempts, 2s for 5 attempts and 5s "
                  "thereafter.")
    """

    def __init__(self, *strategies: wait_base) -> None:
        """Keep the given strategies, in order, as ``self.strategies``."""
        self.strategies = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Delegate to the strategy selected by ``retry_state.attempt_number``."""
        # Clamp the attempt number into 1..len(strategies): values below 1
        # select the first strategy, values past the end select the last.
        position = max(retry_state.attempt_number, 1)
        position = min(position, len(self.strategies))
        selected = self.strategies[position - 1]
        return selected(retry_state=retry_state)

114 

115 

class wait_incrementing(wait_base):
    """Wait strategy that grows linearly with the attempt number.

    The wait is ``start + increment * (attempt_number - 1)``, capped at
    ``max`` and never allowed to go below 0.
    """

    def __init__(
        self,
        start: _utils.time_unit_type = 0,
        increment: _utils.time_unit_type = 100,
        max: _utils.time_unit_type = _utils.MAX_WAIT,  # noqa
    ) -> None:
        """Store all three settings after ``_utils.to_seconds`` conversion."""
        self.start = _utils.to_seconds(start)
        self.increment = _utils.to_seconds(increment)
        self.max = _utils.to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the linear wait for this attempt, clamped to ``[0, max]``."""
        completed_steps = retry_state.attempt_number - 1
        uncapped = self.start + (self.increment * completed_steps)
        capped = min(uncapped, self.max)
        # A negative start or increment could push the value below zero.
        return max(0, capped)

136 

137 

class wait_exponential(wait_base):
    """Wait strategy that applies exponential backoff.

    The wait is ``multiplier * exp_base ** (attempt_number - 1)``, kept
    between a configurable minimum and maximum.

    The intervals are fixed (there is no jitter). That makes this strategy
    a fit for balancing retries against latency while a required resource
    is unavailable for an unknown duration, but *not* for resolving
    contention between multiple processes for a shared resource; use
    wait_random_exponential for the latter case.
    """

    def __init__(
        self,
        multiplier: typing.Union[int, float] = 1,
        max: _utils.time_unit_type = _utils.MAX_WAIT,  # noqa
        exp_base: typing.Union[int, float] = 2,
        min: _utils.time_unit_type = 0,  # noqa
    ) -> None:
        """Store the settings; ``min`` and ``max`` go through ``_utils.to_seconds``."""
        self.multiplier = multiplier
        self.min = _utils.to_seconds(min)
        self.max = _utils.to_seconds(max)
        self.exp_base = exp_base

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the backoff for this attempt, bounded by ``min`` and ``max``.

        If the computation raises ``OverflowError``, ``max`` is returned
        directly.
        """
        try:
            uncapped = self.multiplier * (
                self.exp_base ** (retry_state.attempt_number - 1)
            )
        except OverflowError:
            # The value is too large to represent, so it is above the cap.
            return self.max
        # The lower bound itself is never allowed to be negative.
        lower_bound = max(0, self.min)
        return max(lower_bound, min(uncapped, self.max))

170 

171 

class wait_random_exponential(wait_exponential):
    """Random wait drawn from an exponentially widening window.

    An exponential backoff strategy used to mediate contention between
    multiple uncoordinated processes for a shared resource in distributed
    systems. This is the sense in which "exponential backoff" is meant in
    e.g. Ethernet networking, and corresponds to the "Full Jitter"
    algorithm described in this blog post:

    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

    Each retry occurs at a random time in a geometrically expanding
    interval. A custom multiplier can be supplied, and the upper limit of
    the random interval can be restricted to some maximum value.

    Example::

        wait_random_exponential(multiplier=0.5,  # initial window 0.5s
                                max=60)          # max 60s timeout

    When waiting for an unavailable resource to become available again, as
    opposed to trying to resolve contention for a shared resource, the
    wait_exponential strategy (which uses a fixed interval) may be
    preferable.
    """

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return a uniform random wait between 0 and the parent's backoff."""
        # The deterministic exponential value is only the top of the window.
        window_top = super().__call__(retry_state=retry_state)
        return random.uniform(0, window_top)

201 

202 

class wait_exponential_jitter(wait_base):
    """Wait strategy that applies exponential backoff plus additive jitter.

    The initial wait, maximum wait, exponent base and jitter range are all
    configurable.

    This implements the strategy described here:
    https://cloud.google.com/storage/docs/retry-strategy

    The wait time is
    ``min(initial * exp_base**n + random.uniform(0, jitter), maximum)``
    where ``n`` is ``attempt_number - 1``; the result is never below 0.
    """

    def __init__(
        self,
        initial: float = 1,
        max: float = _utils.MAX_WAIT,  # noqa
        exp_base: float = 2,
        jitter: float = 1,
    ) -> None:
        """Store the four settings exactly as given (no unit conversion)."""
        self.initial = initial
        self.max = max
        self.exp_base = exp_base
        self.jitter = jitter

    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the jittered backoff for this attempt, clamped to ``[0, max]``.

        If the computation raises ``OverflowError``, ``max`` is used as the
        candidate value before clamping.
        """
        # Drawn before the try block, so it is consumed even on overflow.
        jitter_amount = random.uniform(0, self.jitter)
        try:
            backoff = self.initial * (
                self.exp_base ** (retry_state.attempt_number - 1)
            )
            # The addition stays inside the try: adding a float to a very
            # large int can itself raise OverflowError.
            candidate = backoff + jitter_amount
        except OverflowError:
            candidate = self.max
        return max(0, min(candidate, self.max))