1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for retrying coroutine functions with exponential back-off.
16
17The :class:`AsyncRetry` decorator shares most functionality and behavior with
18:class:`Retry`, but supports coroutine functions. Please refer to description
19of :class:`Retry` for more details.
20
21By default, this decorator will retry transient
22API errors (see :func:`if_transient_error`). For example:
23
24.. code-block:: python
25
26 @retry_async.AsyncRetry()
27 async def call_flaky_rpc():
28 return await client.flaky_rpc()
29
30 # Will retry flaky_rpc() if it raises transient API errors.
31 result = await call_flaky_rpc()
32
33You can pass a custom predicate to retry on different exceptions, such as
34waiting for an eventually consistent item to be available:
35
36.. code-block:: python
37
38 @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound))
39 async def check_if_exists():
40 return await client.does_thing_exist()
41
42 is_available = await check_if_exists()
43
44Some client library methods apply retry automatically. These methods can accept
45a ``retry`` parameter that allows you to configure the behavior:
46
47.. code-block:: python
48
49 my_retry = retry_async.AsyncRetry(timeout=60)
50 result = await client.some_method(retry=my_retry)
51
52"""
53
54from __future__ import annotations
55
56import asyncio
57import time
58import functools
59from typing import (
60 Awaitable,
61 Any,
62 Callable,
63 Iterable,
64 TypeVar,
65 TYPE_CHECKING,
66)
67
68from google.api_core.retry.retry_base import _BaseRetry
69from google.api_core.retry.retry_base import _retry_error_helper
70from google.api_core.retry.retry_base import exponential_sleep_generator
71from google.api_core.retry.retry_base import build_retry_error
72from google.api_core.retry.retry_base import RetryFailureReason
73
74# for backwards compatibility, expose helpers in this module
75from google.api_core.retry.retry_base import if_exception_type # noqa
76from google.api_core.retry.retry_base import if_transient_error # noqa
77
78if TYPE_CHECKING:
79 import sys
80
81 if sys.version_info >= (3, 10):
82 from typing import ParamSpec
83 else:
84 from typing_extensions import ParamSpec
85
86 _P = ParamSpec("_P") # target function call parameters
87 _R = TypeVar("_R") # target function returned value
88
89_DEFAULT_INITIAL_DELAY = 1.0 # seconds
90_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds
91_DEFAULT_DELAY_MULTIPLIER = 2.0
92_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds
93_DEFAULT_TIMEOUT = 60.0 * 2.0 # seconds
94
95
96async def retry_target(
97 target: Callable[[], Awaitable[_R]],
98 predicate: Callable[[Exception], bool],
99 sleep_generator: Iterable[float],
100 timeout: float | None = None,
101 on_error: Callable[[Exception], None] | None = None,
102 exception_factory: Callable[
103 [list[Exception], RetryFailureReason, float | None],
104 tuple[Exception, Exception | None],
105 ] = build_retry_error,
106 **kwargs,
107):
108 """Await a coroutine and retry if it fails.
109
110 This is the lowest-level retry helper. Generally, you'll use the
111 higher-level retry helper :class:`Retry`.
112
113 Args:
114 target(Callable[[], Any]): The function to call and retry. This must be a
115 nullary function - apply arguments with `functools.partial`.
116 predicate (Callable[Exception]): A callable used to determine if an
117 exception raised by the target should be considered retryable.
118 It should return True to retry or False otherwise.
119 sleep_generator (Iterable[float]): An infinite iterator that determines
120 how long to sleep between retries.
121 timeout (Optional[float]): How long to keep retrying the target, in seconds.
122 Note: timeout is only checked before initiating a retry, so the target may
123 run past the timeout value as long as it is healthy.
124 on_error (Optional[Callable[Exception]]): If given, the on_error
125 callback will be called with each retryable exception raised by the
126 target. Any error raised by this function will *not* be caught.
127 exception_factory: A function that is called when the retryable reaches
128 a terminal failure state, used to construct an exception to be raised.
129 It takes a list of all exceptions encountered, a retry.RetryFailureReason
130 enum indicating the failure cause, and the original timeout value
131 as arguments. It should return a tuple of the exception to be raised,
132 along with the cause exception if any. The default implementation will raise
133 a RetryError on timeout, or the last exception encountered otherwise.
134 deadline (float): DEPRECATED use ``timeout`` instead. For backward
135 compatibility, if set it will override the ``timeout`` parameter.
136
137 Returns:
138 Any: the return value of the target function.
139
140 Raises:
141 ValueError: If the sleep generator stops yielding values.
142 Exception: a custom exception specified by the exception_factory if provided.
143 If no exception_factory is provided:
144 google.api_core.RetryError: If the timeout is exceeded while retrying.
145 Exception: If the target raises an error that isn't retryable.
146 """
147
148 timeout = kwargs.get("deadline", timeout)
149
150 deadline = time.monotonic() + timeout if timeout is not None else None
151 error_list: list[Exception] = []
152 sleep_iter = iter(sleep_generator)
153
154 # continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper
155 # TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535
156 while True:
157 try:
158 return await target()
159 # pylint: disable=broad-except
160 # This function explicitly must deal with broad exceptions.
161 except Exception as exc:
162 # defer to shared logic for handling errors
163 next_sleep = _retry_error_helper(
164 exc,
165 deadline,
166 sleep_iter,
167 error_list,
168 predicate,
169 on_error,
170 exception_factory,
171 timeout,
172 )
173 # if exception not raised, sleep before next attempt
174 await asyncio.sleep(next_sleep)
175
176
177class AsyncRetry(_BaseRetry):
178 """Exponential retry decorator for async coroutines.
179
180 This class is a decorator used to add exponential back-off retry behavior
181 to an RPC call.
182
183 Although the default behavior is to retry transient API errors, a
184 different predicate can be provided to retry other exceptions.
185
186 Args:
187 predicate (Callable[Exception]): A callable that should return ``True``
188 if the given exception is retryable.
189 initial (float): The minimum amount of time to delay in seconds. This
190 must be greater than 0.
191 maximum (float): The maximum amount of time to delay in seconds.
192 multiplier (float): The multiplier applied to the delay.
193 timeout (Optional[float]): How long to keep retrying in seconds.
194 Note: timeout is only checked before initiating a retry, so the target may
195 run past the timeout value as long as it is healthy.
196 on_error (Optional[Callable[Exception]]): A function to call while processing
197 a retryable exception. Any error raised by this function will
198 *not* be caught.
199 deadline (float): DEPRECATED use ``timeout`` instead. If set it will
200 override ``timeout`` parameter.
201 """
202
203 def __call__(
204 self,
205 func: Callable[..., Awaitable[_R]],
206 on_error: Callable[[Exception], Any] | None = None,
207 ) -> Callable[_P, Awaitable[_R]]:
208 """Wrap a callable with retry behavior.
209
210 Args:
211 func (Callable): The callable or stream to add retry behavior to.
212 on_error (Optional[Callable[Exception]]): If given, the
213 on_error callback will be called with each retryable exception
214 raised by the wrapped function. Any error raised by this
215 function will *not* be caught. If on_error was specified in the
216 constructor, this value will be ignored.
217
218 Returns:
219 Callable: A callable that will invoke ``func`` with retry
220 behavior.
221 """
222 if self._on_error is not None:
223 on_error = self._on_error
224
225 @functools.wraps(func)
226 async def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
227 """A wrapper that calls target function with retry."""
228 sleep_generator = exponential_sleep_generator(
229 self._initial, self._maximum, multiplier=self._multiplier
230 )
231 return await retry_target(
232 functools.partial(func, *args, **kwargs),
233 predicate=self._predicate,
234 sleep_generator=sleep_generator,
235 timeout=self._timeout,
236 on_error=on_error,
237 )
238
239 return retry_wrapped_func