Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/tenacity/wait.py: 46%

82 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1# Copyright 2016–2021 Julien Danjou 

2# Copyright 2016 Joshua Harlow 

3# Copyright 2013-2014 Ray Holder 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17import abc 

18import random 

19import typing 

20 

21from pip._vendor.tenacity import _utils 

22 

23if typing.TYPE_CHECKING: 

24 from pip._vendor.tenacity import RetryCallState 

25 

26 

class wait_base(abc.ABC):
    """Common interface shared by every wait strategy.

    A strategy is a callable that receives the current retry state and
    returns a number.  Strategies can be added together with ``+`` or with
    the built-in ``sum``; the result is a ``wait_combine``.
    """

    @abc.abstractmethod
    def __call__(self, retry_state: "RetryCallState") -> float:
        """Return the wait computed for ``retry_state``; subclasses must override."""
        pass

    def __add__(self, other: "wait_base") -> "wait_combine":
        """Return a ``wait_combine`` built from this strategy and ``other``."""
        return wait_combine(self, other)

    def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]:
        """Support ``sum([...])`` over strategies.

        ``sum`` starts from the integer ``0``, so ``0 + strategy`` must hand
        the strategy back unchanged; any other left operand is combined
        through ``__add__``.
        """
        return self if other == 0 else self.__add__(other)  # type: ignore[comparison-overlap]

42 

43 

# Anything accepted as a wait strategy: a ``wait_base`` instance, or a plain
# callable that takes the retry state and returns a float or an int.
WaitBaseT = typing.Union[
    wait_base,
    typing.Callable[["RetryCallState"], typing.Union[float, int]],
]

45 

46 

class wait_fixed(wait_base):
    """Wait strategy that returns the same delay before every retry.

    The delay given to the constructor is converted once with
    ``_utils.to_seconds`` and stored in ``self.wait_fixed``.
    """

    def __init__(self, wait: _utils.time_unit_type) -> None:
        delay = _utils.to_seconds(wait)
        self.wait_fixed = delay

    def __call__(self, retry_state: "RetryCallState") -> float:
        # The retry state is ignored: the delay never changes.
        return self.wait_fixed

55 

56 

class wait_none(wait_fixed):
    """Wait strategy that retries immediately (a fixed wait of zero)."""

    def __init__(self) -> None:
        # A fixed wait of 0 means "do not wait at all".
        super().__init__(0)

62 

63 

class wait_random(wait_base):
    """Wait strategy that picks a random delay between ``min`` and ``max``.

    Both bounds are converted with ``_utils.to_seconds`` when the strategy is
    created.  Each call draws a fresh value with ``random.random()``.
    """

    def __init__(self, min: _utils.time_unit_type = 0, max: _utils.time_unit_type = 1) -> None:  # noqa
        self.wait_random_min = _utils.to_seconds(min)
        self.wait_random_max = _utils.to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        # Scale a draw from [0, 1) to the width of the interval, then shift
        # it so that it starts at the lower bound.
        lower = self.wait_random_min
        width = self.wait_random_max - lower
        return lower + (random.random() * width)

73 

74 

class wait_combine(wait_base):
    """Wait strategy whose delay is the sum of several other strategies."""

    def __init__(self, *strategies: wait_base) -> None:
        # Stored as received (a tuple); every strategy is consulted on each call.
        self.wait_funcs = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        delays = (strategy(retry_state=retry_state) for strategy in self.wait_funcs)
        return sum(delays)

83 

84 

class wait_chain(wait_base):
    """Chain two or more waiting strategies.

    The strategy at position ``attempt_number`` (1-based) is used for each
    attempt.  If all strategies are exhausted, the very last strategy is used
    thereafter.

    For example::

        @retry(wait=wait_chain(*[wait_fixed(1) for i in range(3)] +
                                [wait_fixed(2) for j in range(5)] +
                                [wait_fixed(5) for k in range(4)]))
        def wait_chained():
            print("Wait 1s for 3 attempts, 2s for 5 attempts and 5s "
                  "thereafter.")
    """

    def __init__(self, *strategies: wait_base) -> None:
        self.strategies = strategies

    def __call__(self, retry_state: "RetryCallState") -> float:
        # Clamp the 1-based attempt number to [1, len(strategies)]: attempt
        # numbers below 1 select the first strategy, and attempts past the
        # end of the chain keep re-using the last one.
        wait_func_no = min(max(retry_state.attempt_number, 1), len(self.strategies))
        wait_func = self.strategies[wait_func_no - 1]
        return wait_func(retry_state=retry_state)

108 

109 

class wait_incrementing(wait_base):
    """Wait strategy whose delay grows linearly with the attempt number.

    The first attempt waits ``start``; each later attempt adds ``increment``
    once more.  The result is capped at ``max`` and never drops below zero.
    All three settings are converted with ``_utils.to_seconds``.
    """

    def __init__(
        self,
        start: _utils.time_unit_type = 0,
        increment: _utils.time_unit_type = 100,
        max: _utils.time_unit_type = _utils.MAX_WAIT,  # noqa
    ) -> None:
        self.start = _utils.to_seconds(start)
        self.increment = _utils.to_seconds(increment)
        self.max = _utils.to_seconds(max)

    def __call__(self, retry_state: "RetryCallState") -> float:
        steps = retry_state.attempt_number - 1
        delay = self.start + (self.increment * steps)
        capped = min(delay, self.max)
        # A negative start or increment must not produce a negative wait.
        return max(0, capped)

130 

131 

class wait_exponential(wait_base):
    """Wait strategy implementing exponential backoff without jitter.

    The delay is ``multiplier * exp_base ** (attempt_number - 1)``, kept
    between ``min`` and ``max``.

    Because the intervals are deterministic, this strategy suits waiting for
    a resource that is unavailable for an unknown duration.  It is *not*
    suitable for resolving contention between multiple processes for a
    shared resource; use wait_random_exponential for that case.
    """

    def __init__(
        self,
        multiplier: typing.Union[int, float] = 1,
        max: _utils.time_unit_type = _utils.MAX_WAIT,  # noqa
        exp_base: typing.Union[int, float] = 2,
        min: _utils.time_unit_type = 0,  # noqa
    ) -> None:
        self.multiplier = multiplier
        self.min = _utils.to_seconds(min)
        self.max = _utils.to_seconds(max)
        self.exp_base = exp_base

    def __call__(self, retry_state: "RetryCallState") -> float:
        try:
            growth = self.exp_base ** (retry_state.attempt_number - 1)
            raw_delay = self.multiplier * growth
        except OverflowError:
            # The backoff grew beyond what can be represented: use the cap.
            return self.max
        else:
            lower_bound = max(0, self.min)
            capped = min(raw_delay, self.max)
            return max(lower_bound, capped)

164 

165 

class wait_random_exponential(wait_exponential):
    """Random wait drawn from an exponentially widening window.

    This backoff strategy mediates contention between multiple uncoordinated
    processes for a shared resource in distributed systems.  It is the sense
    in which "exponential backoff" is meant in e.g. Ethernet networking, and
    corresponds to the "Full Jitter" algorithm described in this blog post:

    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

    Each retry happens at a random time inside a geometrically expanding
    interval.  The multiplier is configurable, and the upper limit of the
    random interval can be restricted to some maximum value.

    Example::

        wait_random_exponential(multiplier=0.5,  # initial window 0.5s
                                max=60)          # max 60s timeout

    When waiting for an unavailable resource to become available again, as
    opposed to resolving contention for a shared resource, the
    wait_exponential strategy (which uses a fixed interval) may be preferable.
    """

    def __call__(self, retry_state: "RetryCallState") -> float:
        # The parent's deterministic backoff is the upper edge of the window.
        upper_bound = super().__call__(retry_state=retry_state)
        return random.uniform(0, upper_bound)

195 

196 

class wait_exponential_jitter(wait_base):
    """Wait strategy that applies exponential backoff and jitter.

    It allows for a customized initial wait, maximum wait, exponent base and
    jitter.

    This implements the strategy described here:
    https://cloud.google.com/storage/docs/retry-strategy

    The wait time is
    ``min(initial * exp_base**n + random.uniform(0, jitter), max)``
    where ``n`` is ``attempt_number - 1``.  The result is never negative, and
    ``max`` is used if the computation overflows.
    """

    def __init__(
        self,
        initial: float = 1,
        max: float = _utils.MAX_WAIT,  # noqa
        exp_base: float = 2,
        jitter: float = 1,
    ) -> None:
        self.initial = initial
        self.max = max
        self.exp_base = exp_base
        self.jitter = jitter

    def __call__(self, retry_state: "RetryCallState") -> float:
        jitter = random.uniform(0, self.jitter)
        try:
            exp = self.exp_base ** (retry_state.attempt_number - 1)
            result = self.initial * exp + jitter
        except OverflowError:
            # The backoff no longer fits in a float: fall back to the cap.
            result = self.max
        # Cap at ``max`` first, then floor at zero.
        return max(0, min(result, self.max))