Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/retry_async.py: 47%
70 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for retrying coroutine functions with exponential back-off.
17The :class:`AsyncRetry` decorator shares most functionality and behavior with
18:class:`Retry`, but supports coroutine functions. Please refer to description
19of :class:`Retry` for more details.
21By default, this decorator will retry transient
22API errors (see :func:`if_transient_error`). For example:
24.. code-block:: python
26 @retry_async.AsyncRetry()
27 async def call_flaky_rpc():
28 return await client.flaky_rpc()
30 # Will retry flaky_rpc() if it raises transient API errors.
31 result = await call_flaky_rpc()
33You can pass a custom predicate to retry on different exceptions, such as
34waiting for an eventually consistent item to be available:
36.. code-block:: python
38 @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound))
39 async def check_if_exists():
40 return await client.does_thing_exist()
42 is_available = await check_if_exists()
44Some client library methods apply retry automatically. These methods can accept
45a ``retry`` parameter that allows you to configure the behavior:
47.. code-block:: python
49 my_retry = retry_async.AsyncRetry(timeout=60)
50 result = await client.some_method(retry=my_retry)
52"""
54import asyncio
55import datetime
56import functools
57import logging
59from google.api_core import datetime_helpers
60from google.api_core import exceptions
61from google.api_core.retry import exponential_sleep_generator
62from google.api_core.retry import if_exception_type # noqa: F401
63from google.api_core.retry import if_transient_error
# Module-scoped logger; retry attempts are reported at DEBUG level.
_LOGGER = logging.getLogger(__name__)

# Defaults used by AsyncRetry when the caller does not override them.
_DEFAULT_INITIAL_DELAY = 1.0  # seconds
_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
_DEFAULT_DELAY_MULTIPLIER = 2.0
_DEFAULT_DEADLINE = 120.0  # seconds (two minutes)
_DEFAULT_TIMEOUT = 120.0  # seconds (two minutes)
async def retry_target(
    target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs
):
    """Await a coroutine function and retry it while it fails retryably.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`AsyncRetry`.

    Args:
        target(Callable): The function to call and retry. This must be a
            nullary function returning an awaitable - apply arguments with
            `functools.partial`.
        predicate (Callable[Exception]): A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
            ``asyncio.TimeoutError`` is always treated as retryable,
            regardless of the predicate.
        sleep_generator (Iterable[float]): An infinite iterator that determines
            how long to sleep between retries.
        timeout (float): How long to keep retrying the target, in seconds.
            A falsy value (``None`` or ``0``) means no time limit is applied.
        on_error (Callable[Exception]): A function to call while processing a
            retryable exception. Any error raised by this function will *not*
            be caught.
        deadline (float): DEPRECATED use ``timeout`` instead. For backward
            compatibility, if set it will override the ``timeout`` parameter.

    Returns:
        Any: the return value of the target function.

    Raises:
        google.api_core.exceptions.RetryError: If the timeout is exceeded
            while retrying. The last retryable exception is attached as the
            cause.
        ValueError: If the sleep generator stops yielding values.
        Exception: If the target raises an exception that isn't retryable.
    """
    # ``deadline`` is the deprecated spelling of ``timeout``; when the caller
    # supplies it, it takes precedence.
    timeout = kwargs.get("deadline", timeout)

    # Absolute point in time after which no further attempt is made, or None
    # when no (truthy) timeout was given.
    deadline_dt = (
        (datetime_helpers.utcnow() + datetime.timedelta(seconds=timeout))
        if timeout
        else None
    )

    last_exc = None

    for sleep in sleep_generator:
        try:
            if not deadline_dt:
                return await target()
            else:
                # Bound the attempt itself by the time left until the deadline.
                return await asyncio.wait_for(
                    target(),
                    timeout=(deadline_dt - datetime_helpers.utcnow()).total_seconds(),
                )
        # pylint: disable=broad-except
        # This function explicitly must deal with broad exceptions.
        except Exception as exc:
            if not predicate(exc) and not isinstance(exc, asyncio.TimeoutError):
                raise
            last_exc = exc
            if on_error is not None:
                on_error(exc)

        now = datetime_helpers.utcnow()

        if deadline_dt:
            if deadline_dt <= now:
                # Chains the raising RetryError with the root cause error,
                # which helps observability and debugability.
                raise exceptions.RetryError(
                    "Timeout of {:.1f}s exceeded while calling target function".format(
                        timeout
                    ),
                    last_exc,
                ) from last_exc
            else:
                # Never sleep past the deadline.
                time_to_deadline = (deadline_dt - now).total_seconds()
                sleep = min(time_to_deadline, sleep)

        # Lazy %-style arguments: the message is only rendered when DEBUG
        # logging is actually enabled.
        _LOGGER.debug("Retrying due to %s, sleeping %.1fs ...", last_exc, sleep)
        await asyncio.sleep(sleep)

    raise ValueError("Sleep generator stopped yielding sleep values.")
class AsyncRetry:
    """Exponential retry decorator for async functions.

    This class is a decorator used to add exponential back-off retry behavior
    to an RPC call.

    Although the default behavior is to retry transient API errors, a
    different predicate can be provided to retry other exceptions.

    Args:
        predicate (Callable[Exception]): A callable that should return ``True``
            if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds. This
            must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (float): How long to keep retrying in seconds.
        on_error (Callable[Exception]): A function to call while processing
            a retryable exception. Any error raised by this function will
            *not* be caught.
        deadline (float): DEPRECATED use ``timeout`` instead. If set it will
            override ``timeout`` parameter.
    """

    # Marker meaning "timeout argument not supplied" in ``_replace``. A plain
    # ``None`` cannot be used for that because ``None`` is itself a meaningful
    # timeout value (no time limit).
    _UNSET = object()

    def __init__(
        self,
        predicate=if_transient_error,
        initial=_DEFAULT_INITIAL_DELAY,
        maximum=_DEFAULT_MAXIMUM_DELAY,
        multiplier=_DEFAULT_DELAY_MULTIPLIER,
        timeout=_DEFAULT_TIMEOUT,
        on_error=None,
        **kwargs
    ):
        self._predicate = predicate
        self._initial = initial
        self._multiplier = multiplier
        self._maximum = maximum
        # The deprecated ``deadline`` keyword, when given, wins over ``timeout``.
        self._timeout = kwargs.get("deadline", timeout)
        # Kept as an alias of the timeout under its legacy name.
        self._deadline = self._timeout
        self._on_error = on_error

    def __call__(self, func, on_error=None):
        """Wrap a callable with retry behavior.

        Args:
            func (Callable): The callable to add retry behavior to.
            on_error (Callable[Exception]): A function to call while processing
                a retryable exception. Any error raised by this function will
                *not* be caught. It is only used when this instance was
                created without its own ``on_error``; otherwise the
                instance's handler takes precedence.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
                behavior.
        """
        if self._on_error is not None:
            on_error = self._on_error

        @functools.wraps(func)
        async def retry_wrapped_func(*args, **kwargs):
            """A wrapper that calls target function with retry."""
            # Bind the call arguments so the retry helper gets a nullary target.
            target = functools.partial(func, *args, **kwargs)
            # A fresh generator per call, so every invocation starts its
            # back-off from ``initial``.
            sleep_generator = exponential_sleep_generator(
                self._initial, self._maximum, multiplier=self._multiplier
            )
            return await retry_target(
                target,
                self._predicate,
                sleep_generator,
                self._timeout,
                on_error=on_error,
            )

        return retry_wrapped_func

    def _replace(
        self,
        predicate=None,
        initial=None,
        maximum=None,
        multiplier=None,
        timeout=_UNSET,
        on_error=None,
    ):
        """Return a copy of this retry with the supplied settings overridden.

        Arguments left at their defaults keep this instance's current value.
        ``timeout`` uses a private sentinel as its default so that an explicit
        ``None`` (no time limit) is honored rather than ignored.
        """
        return AsyncRetry(
            predicate=self._predicate if predicate is None else predicate,
            initial=self._initial if initial is None else initial,
            maximum=self._maximum if maximum is None else maximum,
            multiplier=self._multiplier if multiplier is None else multiplier,
            timeout=self._timeout if timeout is AsyncRetry._UNSET else timeout,
            on_error=self._on_error if on_error is None else on_error,
        )

    def with_deadline(self, deadline):
        """Return a copy of this retry with the given deadline.
        DEPRECATED: use :meth:`with_timeout` instead.

        Args:
            deadline (float): How long to keep retrying. ``None`` removes
                the time limit.

        Returns:
            AsyncRetry: A new retry instance with the given deadline.
        """
        return self._replace(timeout=deadline)

    def with_timeout(self, timeout):
        """Return a copy of this retry with the given timeout.

        Args:
            timeout (float): How long to keep retrying, in seconds. ``None``
                removes the time limit.

        Returns:
            AsyncRetry: A new retry instance with the given timeout.
        """
        return self._replace(timeout=timeout)

    def with_predicate(self, predicate):
        """Return a copy of this retry with the given predicate.

        Args:
            predicate (Callable[Exception]): A callable that should return
                ``True`` if the given exception is retryable.

        Returns:
            AsyncRetry: A new retry instance with the given predicate.
        """
        return self._replace(predicate=predicate)

    def with_delay(self, initial=None, maximum=None, multiplier=None):
        """Return a copy of this retry with the given delay options.

        Args:
            initial (float): The minimum amount of time to delay. This must
                be greater than 0.
            maximum (float): The maximum amount of time to delay.
            multiplier (float): The multiplier applied to the delay.

        Returns:
            AsyncRetry: A new retry instance with the given delay options.
        """
        return self._replace(initial=initial, maximum=maximum, multiplier=multiplier)

    def __str__(self):
        # A ``None`` timeout (no time limit) cannot be rendered with a float
        # format spec, so it is spelled out instead of raising TypeError.
        if self._timeout is None:
            timeout_text = "None"
        else:
            timeout_text = "{:.1f}".format(self._timeout)
        return (
            "<AsyncRetry predicate={}, initial={:.1f}, maximum={:.1f}, "
            "multiplier={:.1f}, timeout={}, on_error={}>".format(
                self._predicate,
                self._initial,
                self._maximum,
                self._multiplier,
                timeout_text,
                self._on_error,
            )
        )