Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/retry/retry_unary_async.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

46 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for retrying coroutine functions with exponential back-off. 

16 

17The :class:`AsyncRetry` decorator shares most functionality and behavior with 

18:class:`Retry`, but supports coroutine functions. Please refer to description 

19of :class:`Retry` for more details. 

20 

21By default, this decorator will retry transient 

22API errors (see :func:`if_transient_error`). For example: 

23 

24.. code-block:: python 

25 

26 @retry_async.AsyncRetry() 

27 async def call_flaky_rpc(): 

28 return await client.flaky_rpc() 

29 

30 # Will retry flaky_rpc() if it raises transient API errors. 

31 result = await call_flaky_rpc() 

32 

33You can pass a custom predicate to retry on different exceptions, such as 

34waiting for an eventually consistent item to be available: 

35 

36.. code-block:: python 

37 

38 @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound)) 

39 async def check_if_exists(): 

40 return await client.does_thing_exist() 

41 

42 is_available = await check_if_exists() 

43 

44Some client library methods apply retry automatically. These methods can accept 

45a ``retry`` parameter that allows you to configure the behavior: 

46 

47.. code-block:: python 

48 

49 my_retry = retry_async.AsyncRetry(timeout=60) 

50 result = await client.some_method(retry=my_retry) 

51 

52""" 

53 

54from __future__ import annotations 

55 

56import asyncio 

57import time 

58import functools 

59from typing import ( 

60 Awaitable, 

61 Any, 

62 Callable, 

63 Iterable, 

64 TypeVar, 

65 TYPE_CHECKING, 

66) 

67 

68from google.api_core.retry.retry_base import _BaseRetry 

69from google.api_core.retry.retry_base import _retry_error_helper 

70from google.api_core.retry.retry_base import exponential_sleep_generator 

71from google.api_core.retry.retry_base import build_retry_error 

72from google.api_core.retry.retry_base import RetryFailureReason 

73 

74# for backwards compatibility, expose helpers in this module 

75from google.api_core.retry.retry_base import if_exception_type # noqa 

76from google.api_core.retry.retry_base import if_transient_error # noqa 

77 

78if TYPE_CHECKING: 

79 import sys 

80 

81 if sys.version_info >= (3, 10): 

82 from typing import ParamSpec 

83 else: 

84 from typing_extensions import ParamSpec 

85 

86 _P = ParamSpec("_P") # target function call parameters 

87 _R = TypeVar("_R") # target function returned value 

88 

# Module-level retry constants. None of them is referenced in the visible part
# of this module; presumably they are kept for importers and backward
# compatibility -- confirm before removing any of them.

# Back-off shape: first delay, delay ceiling, and per-retry growth factor.
_DEFAULT_INITIAL_DELAY = 1.0  # seconds
_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
_DEFAULT_DELAY_MULTIPLIER = 2.0

# Overall retry budget (two minutes); "deadline" is the deprecated spelling
# of "timeout" elsewhere in this module.
_DEFAULT_DEADLINE = 2.0 * 60.0  # seconds
_DEFAULT_TIMEOUT = 2.0 * 60.0  # seconds

94 

95 

async def retry_target(
    target: Callable[[], Awaitable[_R]],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: float | None = None,
    on_error: Callable[[Exception], None] | None = None,
    exception_factory: Callable[
        [list[Exception], RetryFailureReason, float | None],
        tuple[Exception, Exception | None],
    ] = build_retry_error,
    **kwargs,
):
    """Await ``target`` repeatedly until an attempt succeeds or retrying ends.

    This is the lowest-level async retry helper. Most callers should use the
    higher-level :class:`AsyncRetry` decorator, which builds the arguments
    for this function.

    Args:
        target (Callable[[], Awaitable[Any]]): The coroutine function to call
            and retry. It is invoked with no arguments, so bind any arguments
            beforehand with ``functools.partial``.
        predicate (Callable[Exception]): Decides whether an exception raised
            by the target is retryable. It should return True to retry and
            False otherwise.
        sleep_generator (Iterable[float]): An infinite iterable that yields
            the number of seconds to sleep between attempts.
        timeout (Optional[float]): How long to keep retrying the target, in
            seconds. The timeout is only checked before initiating a retry,
            so a healthy target may run past it.
        on_error (Optional[Callable[Exception]]): If given, called with each
            retryable exception raised by the target. Any error raised by
            this callback will *not* be caught.
        exception_factory: Called when retrying reaches a terminal failure
            state, to construct the exception to raise. It receives the list
            of all exceptions encountered, a ``retry.RetryFailureReason``
            describing the failure cause, and the original timeout value.
            It should return a tuple of the exception to raise and its cause
            (if any). The default raises a ``RetryError`` on timeout, or the
            last exception encountered otherwise.
        deadline (float): DEPRECATED, use ``timeout`` instead. For backward
            compatibility, if passed it overrides the ``timeout`` parameter.

    Returns:
        Any: the value produced by the first successful ``await target()``.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: a custom exception specified by the exception_factory if
            provided. If no exception_factory is provided:
            google.api_core.RetryError: If the timeout is exceeded while
                retrying.
            Exception: If the target raises an error that isn't retryable.
    """

    # The deprecated "deadline" keyword, when present, wins over "timeout".
    timeout = kwargs.get("deadline", timeout)

    # Absolute cut-off on the monotonic clock, or None for "no time limit".
    if timeout is None:
        cutoff = None
    else:
        cutoff = time.monotonic() + timeout
    failures: list[Exception] = []
    delays = iter(sleep_generator)

    # Loop until an attempt completes; terminal failures leave the loop via
    # an exception raised from _retry_error_helper.
    # TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535
    while True:
        try:
            return await target()
        # pylint: disable=broad-except
        # Catching broadly is intentional: the predicate decides what is retryable.
        except Exception as exc:
            # Shared error handling: either raises a terminal exception or
            # hands back how long to wait before the next attempt.
            pause = _retry_error_helper(
                exc,
                cutoff,
                delays,
                failures,
                predicate,
                on_error,
                exception_factory,
                timeout,
            )
            # Nothing was raised, so back off and try again.
            await asyncio.sleep(pause)

175 

176 

class AsyncRetry(_BaseRetry):
    """Decorator that retries async coroutines with exponential back-off.

    Wrapping an RPC coroutine function with an instance of this class makes
    each call retry on failure, sleeping for exponentially growing intervals
    between attempts.

    Transient API errors are retried by default; supply a different
    predicate to retry on other exceptions.

    Args:
        predicate (Callable[Exception]): A callable that should return
            ``True`` if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds.
            This must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (Optional[float]): How long to keep retrying in seconds.
            The timeout is only checked before initiating a retry, so a
            healthy target may run past it.
        on_error (Optional[Callable[Exception]]): A function to call while
            processing a retryable exception. Any error raised by this
            function will *not* be caught.
        deadline (float): DEPRECATED, use ``timeout`` instead. If set it
            overrides the ``timeout`` parameter.
    """

    def __call__(
        self,
        func: Callable[..., Awaitable[_R]],
        on_error: Callable[[Exception], Any] | None = None,
    ) -> Callable[_P, Awaitable[_R]]:
        """Return a coroutine function that invokes ``func`` with retries.

        Args:
            func (Callable): The coroutine function to add retry behavior to.
            on_error (Optional[Callable[Exception]]): If given, called with
                each retryable exception raised by the wrapped function.
                Any error raised by this callback will *not* be caught.
                Ignored when ``on_error`` was specified in the constructor.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
                behavior.
        """
        # A callback configured on the instance takes precedence over the
        # one supplied for this particular wrapping.
        error_callback = on_error if self._on_error is None else self._on_error

        @functools.wraps(func)
        async def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            """A wrapper that calls target function with retry."""
            # Build a fresh delay sequence for every invocation.
            backoff_delays = exponential_sleep_generator(
                self._initial, self._maximum, multiplier=self._multiplier
            )
            # retry_target calls its target with no arguments, so bind them here.
            bound_call = functools.partial(func, *args, **kwargs)
            return await retry_target(
                bound_call,
                predicate=self._predicate,
                sleep_generator=backoff_delays,
                timeout=self._timeout,
                on_error=error_callback,
            )

        return retry_wrapped_func