Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/retry/retry_unary_async.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

45 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for retrying coroutine functions with exponential back-off. 

16 

17The :class:`AsyncRetry` decorator shares most functionality and behavior with 

18:class:`Retry`, but supports coroutine functions. Please refer to description 

19of :class:`Retry` for more details. 

20 

21By default, this decorator will retry transient 

22API errors (see :func:`if_transient_error`). For example: 

23 

24.. code-block:: python 

25 

26 @retry_async.AsyncRetry() 

27 async def call_flaky_rpc(): 

28 return await client.flaky_rpc() 

29 

30 # Will retry flaky_rpc() if it raises transient API errors. 

31 result = await call_flaky_rpc() 

32 

33You can pass a custom predicate to retry on different exceptions, such as 

34waiting for an eventually consistent item to be available: 

35 

36.. code-block:: python 

37 

38 @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound)) 

39 async def check_if_exists(): 

40 return await client.does_thing_exist() 

41 

42 is_available = await check_if_exists() 

43 

44Some client library methods apply retry automatically. These methods can accept 

45a ``retry`` parameter that allows you to configure the behavior: 

46 

47.. code-block:: python 

48 

49 my_retry = retry_async.AsyncRetry(timeout=60) 

50 result = await client.some_method(retry=my_retry) 

51 

52""" 

53 

54from __future__ import annotations 

55 

56import asyncio 

57import time 

58import functools 

59from typing import ( 

60 Awaitable, 

61 Any, 

62 Callable, 

63 Iterable, 

64 TypeVar, 

65 TYPE_CHECKING, 

66) 

67 

68from google.api_core.retry.retry_base import _BaseRetry 

69from google.api_core.retry.retry_base import _retry_error_helper 

70from google.api_core.retry.retry_base import exponential_sleep_generator 

71from google.api_core.retry.retry_base import build_retry_error 

72from google.api_core.retry.retry_base import RetryFailureReason 

73 

74# for backwards compatibility, expose helpers in this module 

75from google.api_core.retry.retry_base import if_exception_type # noqa 

76from google.api_core.retry.retry_base import if_transient_error # noqa 

77 

if TYPE_CHECKING:
    import sys

    # ParamSpec is taken from the standard ``typing`` module on Python 3.10
    # and newer, and from ``typing_extensions`` on older interpreters.
    if sys.version_info < (3, 10):
        from typing_extensions import ParamSpec
    else:
        from typing import ParamSpec

    # Call parameters accepted by the wrapped target function.
    _P = ParamSpec("_P")
    # Value produced by the wrapped target function.
    _R = TypeVar("_R")

# Module-level defaults; every duration below is expressed in seconds.
_DEFAULT_INITIAL_DELAY = 1.0
_DEFAULT_MAXIMUM_DELAY = 60.0
# Unitless multiplier.
_DEFAULT_DELAY_MULTIPLIER = 2.0
_DEFAULT_DEADLINE = 60.0 * 2.0
_DEFAULT_TIMEOUT = 60.0 * 2.0

94 

95 

async def retry_target(
    target: Callable[_P, Awaitable[_R]],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: float | None = None,
    on_error: Callable[[Exception], None] | None = None,
    exception_factory: Callable[
        [list[Exception], RetryFailureReason, float | None],
        tuple[Exception, Exception | None],
    ] = build_retry_error,
    **kwargs,
):
    """Await ``target`` repeatedly until it succeeds or retrying must stop.

    This is the low-level coroutine retry loop; most callers should use the
    :class:`AsyncRetry` decorator, which builds on it.

    Args:
        target (Callable[[], Awaitable[Any]]): Zero-argument callable that
            returns an awaitable. Bind any arguments beforehand with
            ``functools.partial``.
        predicate (Callable[[Exception], bool]): Decides whether an exception
            raised by ``target`` is retryable. It should return True to retry
            or False otherwise.
        sleep_generator (Iterable[float]): Supplies the delay, in seconds, to
            wait after each failed attempt. One value is drawn per attempt,
            so it is expected to be infinite.
        timeout (Optional[float]): Retry budget in seconds; ``None`` means no
            limit. The budget is only checked before initiating a retry, so a
            healthy target may run past it.
        on_error (Optional[Callable[[Exception], None]]): If given, called
            with each retryable exception raised by ``target``. Errors raised
            by this callback are *not* caught.
        exception_factory: Called when retrying reaches a terminal failure
            state, to build the exception that gets raised. It takes the list
            of all exceptions encountered, a ``RetryFailureReason`` describing
            the failure cause, and the original timeout value, and returns a
            tuple of the exception to raise and its cause (if any). The
            default raises a RetryError on timeout, or the last exception
            encountered otherwise.
        deadline (float): DEPRECATED, use ``timeout`` instead. Accepted via
            ``**kwargs`` for backward compatibility; when present it
            overrides ``timeout``.

    Returns:
        Any: Whatever the awaited ``target`` returns.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: A custom exception built by ``exception_factory`` if one
            is provided. With the default factory:
            google.api_core.RetryError: If the timeout is exceeded while
                retrying.
            Exception: If the target raises an error that isn't retryable.
    """
    # The deprecated ``deadline`` keyword, when supplied, replaces ``timeout``.
    if "deadline" in kwargs:
        timeout = kwargs["deadline"]

    # Turn the relative budget into an absolute point on the monotonic clock.
    if timeout is None:
        cutoff = None
    else:
        cutoff = time.monotonic() + timeout
    failures: list[Exception] = []

    for delay in sleep_generator:
        try:
            return await target()
        # pylint: disable=broad-except
        # Catching broadly is intentional: any failure may be retryable.
        except Exception as err:
            # The shared helper decides what happens to this failure; if it
            # returns instead of raising, the loop goes on to another attempt.
            _retry_error_helper(
                err,
                cutoff,
                delay,
                failures,
                predicate,
                on_error,
                exception_factory,
                timeout,
            )
            # Back off before the next attempt.
            await asyncio.sleep(delay)

    raise ValueError("Sleep generator stopped yielding sleep values.")

174 

175 

class AsyncRetry(_BaseRetry):
    """Decorator that retries coroutine functions with exponential back-off.

    Wrapping an RPC-style coroutine function with an instance of this class
    makes each call go through a retry loop. Transient API errors are retried
    by default; supply a different predicate to retry other exceptions.

    Args:
        predicate (Callable[Exception]): Returns ``True`` when the given
            exception should be retried.
        initial (float): The minimum delay, in seconds. Must be greater
            than 0.
        maximum (float): The maximum delay, in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (Optional[float]): How long to keep retrying, in seconds.
            Note: the timeout is only checked before initiating a retry, so
            the target may run past it as long as it is healthy.
        on_error (Optional[Callable[Exception]]): Called while processing a
            retryable exception. Any error raised by this function will
            *not* be caught.
        deadline (float): DEPRECATED, use ``timeout`` instead. If set it
            overrides the ``timeout`` parameter.
    """

    def __call__(
        self,
        func: Callable[..., Awaitable[_R]],
        on_error: Callable[[Exception], Any] | None = None,
    ) -> Callable[_P, Awaitable[_R]]:
        """Return a coroutine function that invokes ``func`` with retries.

        Args:
            func (Callable): The coroutine function to add retry behavior to.
            on_error (Optional[Callable[Exception]]): If given, called with
                each retryable exception raised by the wrapped function. Any
                error raised by this callback will *not* be caught. Ignored
                when ``on_error`` was already specified in the constructor.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
                behavior.
        """
        # A callback configured on the instance wins over the per-call one.
        configured_callback = self._on_error
        error_callback = on_error if configured_callback is None else configured_callback

        @functools.wraps(func)
        async def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            """A wrapper that calls target function with retry."""
            # Every invocation gets its own back-off sequence.
            delays = exponential_sleep_generator(
                self._initial, self._maximum, multiplier=self._multiplier
            )
            # retry_target calls its target with no arguments, so bind them now.
            bound_target = functools.partial(func, *args, **kwargs)
            return await retry_target(
                bound_target,
                predicate=self._predicate,
                sleep_generator=delays,
                timeout=self._timeout,
                on_error=error_callback,
            )

        return retry_wrapped_func