Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/retry_async.py: 47%

70 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for retrying coroutine functions with exponential back-off. 

16 

17The :class:`AsyncRetry` decorator shares most functionality and behavior with 

18:class:`Retry`, but supports coroutine functions. Please refer to the description 

19of :class:`Retry` for more details. 

20 

21By default, this decorator will retry transient 

22API errors (see :func:`if_transient_error`). For example: 

23 

24.. code-block:: python 

25 

26 @retry_async.AsyncRetry() 

27 async def call_flaky_rpc(): 

28 return await client.flaky_rpc() 

29 

30 # Will retry flaky_rpc() if it raises transient API errors. 

31 result = await call_flaky_rpc() 

32 

33You can pass a custom predicate to retry on different exceptions, such as 

34waiting for an eventually consistent item to be available: 

35 

36.. code-block:: python 

37 

38 @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound)) 

39 async def check_if_exists(): 

40 return await client.does_thing_exist() 

41 

42 is_available = await check_if_exists() 

43 

44Some client library methods apply retry automatically. These methods can accept 

45a ``retry`` parameter that allows you to configure the behavior: 

46 

47.. code-block:: python 

48 

49 my_retry = retry_async.AsyncRetry(timeout=60) 

50 result = await client.some_method(retry=my_retry) 

51 

52""" 

53 

54import asyncio 

55import datetime 

56import functools 

57import logging 

58 

59from google.api_core import datetime_helpers 

60from google.api_core import exceptions 

61from google.api_core.retry import exponential_sleep_generator 

62from google.api_core.retry import if_exception_type # noqa: F401 

63from google.api_core.retry import if_transient_error 

64 

65 

# Module-level logger; retry_target() emits a debug record before each sleep.
66_LOGGER = logging.getLogger(__name__) 

# Default ``initial`` argument of AsyncRetry.__init__.
67_DEFAULT_INITIAL_DELAY = 1.0 # seconds 

# Default ``maximum`` argument of AsyncRetry.__init__.
68_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds 

# Default ``multiplier`` argument of AsyncRetry.__init__.
69_DEFAULT_DELAY_MULTIPLIER = 2.0 

# NOTE(review): not referenced anywhere else in the visible source; presumably
# kept for backward compatibility with the deprecated ``deadline`` name.
70_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds 

# Default ``timeout`` argument of AsyncRetry.__init__.
71_DEFAULT_TIMEOUT = 60.0 * 2.0 # seconds 

72 

73 

async def retry_target(
    target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs
):
    """Await a coroutine function and retry it if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`AsyncRetry`.

    Args:
        target(Callable): The coroutine function to call and retry. This must
            be a nullary function - apply arguments with `functools.partial`.
        predicate (Callable[Exception]): A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
        sleep_generator (Iterable[float]): An infinite iterator that determines
            how long to sleep between retries.
        timeout (float): How long to keep retrying the target, in seconds.
            A falsy value (``None`` or ``0``) disables the time limit.
        on_error (Callable[Exception]): A function to call while processing a
            retryable exception. Any error raised by this function will *not*
            be caught.
        deadline (float): DEPRECATED use ``timeout`` instead. For backward
            compatibility, if set it will override the ``timeout`` parameter.

    Returns:
        Any: the return value of the target function.

    Raises:
        google.api_core.RetryError: If the timeout is exceeded while retrying.
        ValueError: If the sleep generator stops yielding values.
        Exception: If the target raises an exception that isn't retryable.
    """

    # The legacy ``deadline`` keyword, when present, wins over ``timeout``.
    timeout = kwargs.get("deadline", timeout)

    deadline_dt = (
        datetime_helpers.utcnow() + datetime.timedelta(seconds=timeout)
        if timeout
        else None
    )

    last_exc = None

    for sleep in sleep_generator:
        try:
            if deadline_dt is None:
                return await target()
            # Bound each attempt by whatever time is left before the deadline.
            remaining = (deadline_dt - datetime_helpers.utcnow()).total_seconds()
            return await asyncio.wait_for(target(), timeout=remaining)
        # pylint: disable=broad-except
        # This function explicitly must deal with broad exceptions.
        except Exception as exc:
            retryable = predicate(exc)
            # A TimeoutError only bypasses the predicate when a deadline is in
            # force; the deadline check below then decides whether to give up.
            # Without a deadline, a timeout raised by the target is an ordinary
            # exception and must honour the predicate, otherwise a
            # non-retryable timeout would be retried indefinitely.
            hit_deadline = deadline_dt is not None and isinstance(
                exc, asyncio.TimeoutError
            )
            if not retryable and not hit_deadline:
                raise
            last_exc = exc
            if on_error is not None:
                on_error(exc)

        if deadline_dt is not None:
            now = datetime_helpers.utcnow()
            if deadline_dt <= now:
                # Chains the raising RetryError with the root cause error,
                # which helps observability and debugability.
                raise exceptions.RetryError(
                    "Timeout of {:.1f}s exceeded while calling target function".format(
                        timeout
                    ),
                    last_exc,
                ) from last_exc
            # Never sleep past the deadline.
            sleep = min((deadline_dt - now).total_seconds(), sleep)

        # Lazy %-style arguments: the message is only rendered when DEBUG
        # logging is enabled.
        _LOGGER.debug("Retrying due to %s, sleeping %.1fs ...", last_exc, sleep)
        await asyncio.sleep(sleep)

    raise ValueError("Sleep generator stopped yielding sleep values.")

156 

157 

class AsyncRetry:
    """Exponential retry decorator for async functions.

    This class is a decorator used to add exponential back-off retry behavior
    to an RPC call.

    Although the default behavior is to retry transient API errors, a
    different predicate can be provided to retry other exceptions.

    Args:
        predicate (Callable[Exception]): A callable that should return ``True``
            if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds. This
            must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (float): How long to keep retrying in seconds.
        on_error (Callable[Exception]): A function to call while processing
            a retryable exception. Any error raised by this function will
            *not* be caught.
        deadline (float): DEPRECATED use ``timeout`` instead. If set it will
            override ``timeout`` parameter.
    """

    def __init__(
        self,
        predicate=if_transient_error,
        initial=_DEFAULT_INITIAL_DELAY,
        maximum=_DEFAULT_MAXIMUM_DELAY,
        multiplier=_DEFAULT_DELAY_MULTIPLIER,
        timeout=_DEFAULT_TIMEOUT,
        on_error=None,
        **kwargs
    ):
        self._predicate = predicate
        self._initial = initial
        self._multiplier = multiplier
        self._maximum = maximum
        # The legacy ``deadline`` keyword, when present, wins over ``timeout``.
        self._timeout = kwargs.get("deadline", timeout)
        # Mirror of ``_timeout`` kept under the deprecated attribute name.
        self._deadline = self._timeout
        self._on_error = on_error

    def __call__(self, func, on_error=None):
        """Wrap a callable with retry behavior.

        Args:
            func (Callable): The callable to add retry behavior to.
            on_error (Callable[Exception]): A function to call while processing
                a retryable exception. Any error raised by this function will
                *not* be caught. Ignored when the instance was constructed
                with its own ``on_error``.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
            behavior.
        """
        # The handler configured on the instance takes precedence over the
        # one supplied at decoration time.
        if self._on_error is not None:
            on_error = self._on_error

        @functools.wraps(func)
        async def retry_wrapped_func(*args, **kwargs):
            """A wrapper that calls target function with retry."""
            target = functools.partial(func, *args, **kwargs)
            # Build a fresh generator per call so every invocation starts its
            # delay sequence from the beginning.
            sleep_generator = exponential_sleep_generator(
                self._initial, self._maximum, multiplier=self._multiplier
            )
            return await retry_target(
                target,
                self._predicate,
                sleep_generator,
                self._timeout,
                on_error=on_error,
            )

        return retry_wrapped_func

    def _replace(
        self,
        predicate=None,
        initial=None,
        maximum=None,
        multiplier=None,
        timeout=None,
        on_error=None,
    ):
        """Return a copy of this retry with the given settings overridden.

        ``None`` means "keep the current value". Any other value, including
        falsy ones such as ``0``, replaces the current setting, so the result
        matches what the constructor would produce for the same argument.
        """

        def pick(new, current):
            # Compare against None explicitly; ``new or current`` would drop
            # legitimate falsy overrides.
            return current if new is None else new

        return AsyncRetry(
            predicate=pick(predicate, self._predicate),
            initial=pick(initial, self._initial),
            maximum=pick(maximum, self._maximum),
            multiplier=pick(multiplier, self._multiplier),
            timeout=pick(timeout, self._timeout),
            on_error=pick(on_error, self._on_error),
        )

    def with_deadline(self, deadline):
        """Return a copy of this retry with the given deadline.
        DEPRECATED: use :meth:`with_timeout` instead.

        Args:
            deadline (float): How long to keep retrying.

        Returns:
            AsyncRetry: A new retry instance with the given deadline.
        """
        return self._replace(timeout=deadline)

    def with_timeout(self, timeout):
        """Return a copy of this retry with the given timeout.

        Args:
            timeout (float): How long to keep retrying, in seconds. ``None``
                leaves the current timeout unchanged.

        Returns:
            AsyncRetry: A new retry instance with the given timeout.
        """
        return self._replace(timeout=timeout)

    def with_predicate(self, predicate):
        """Return a copy of this retry with the given predicate.

        Args:
            predicate (Callable[Exception]): A callable that should return
                ``True`` if the given exception is retryable.

        Returns:
            AsyncRetry: A new retry instance with the given predicate.
        """
        return self._replace(predicate=predicate)

    def with_delay(self, initial=None, maximum=None, multiplier=None):
        """Return a copy of this retry with the given delay options.

        Args:
            initial (float): The minimum amount of time to delay. This must
                be greater than 0.
            maximum (float): The maximum amount of time to delay.
            multiplier (float): The multiplier applied to the delay.

        Returns:
            AsyncRetry: A new retry instance with the given delay options.
        """
        return self._replace(initial=initial, maximum=maximum, multiplier=multiplier)

    def __str__(self):
        # ``timeout`` may legitimately be None (no time limit), which cannot
        # be rendered with a float format spec; numeric values keep the
        # original one-decimal rendering.
        if self._timeout is None:
            timeout_text = "None"
        else:
            timeout_text = "{:.1f}".format(self._timeout)
        return (
            "<AsyncRetry predicate={}, initial={:.1f}, maximum={:.1f}, "
            "multiplier={:.1f}, timeout={}, on_error={}>".format(
                self._predicate,
                self._initial,
                self._maximum,
                self._multiplier,
                timeout_text,
                self._on_error,
            )
        )