1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Shared classes and functions for retrying requests.
16
17:class:`_BaseRetry` is the base class for :class:`Retry`,
18:class:`AsyncRetry`, :class:`StreamingRetry`, and :class:`AsyncStreamingRetry`.
19"""
20
21from __future__ import annotations
22
23import logging
24import random
25import time
26
27from enum import Enum
28from typing import Any, Callable, Optional, TYPE_CHECKING
29
30import requests.exceptions
31
32from google.api_core import exceptions
33from google.auth import exceptions as auth_exceptions
34
35if TYPE_CHECKING:
36 import sys
37
38 if sys.version_info >= (3, 11):
39 from typing import Self
40 else:
41 from typing_extensions import Self
42
# Default back-off settings. They are the keyword defaults of
# ``_BaseRetry.__init__`` (and, for the multiplier, of
# ``exponential_sleep_generator``).
_DEFAULT_INITIAL_DELAY = 1.0  # seconds
_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
_DEFAULT_DELAY_MULTIPLIER = 2.0  # factor the delay bound is multiplied by each step
_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds

# Logger that ``_retry_error_helper`` sends its debug-level retry messages to.
_LOGGER = logging.getLogger("google.api_core.retry")
49
50
def if_exception_type(
    *exception_types: type[Exception],
) -> Callable[[Exception], bool]:
    """Build a predicate that matches exceptions by class.

    Args:
        exception_types (Sequence[:func:`type`]): The exception classes the
            predicate should accept. Matching is done with
            :func:`isinstance`, so instances of subclasses match too.

    Returns:
        Callable[Exception]: A one-argument function that returns ``True``
            when the exception it receives is an instance of any of
            ``exception_types`` and ``False`` otherwise.
    """
    # The var-args tuple is already in the form isinstance() expects.
    accepted_types = exception_types

    def if_exception_type_predicate(exception: Exception) -> bool:
        """Bound predicate for checking an exception type."""
        is_match = isinstance(exception, accepted_types)
        return is_match

    return if_exception_type_predicate
70
71
# pylint: disable=invalid-name
# Pylint sees this as a constant, but it is also an alias that should be
# considered a function.
#
# The predicate is built once, when this module is imported, by passing the
# exception classes below to ``if_exception_type``. It is the default
# ``predicate`` argument of ``_BaseRetry.__init__``.
if_transient_error = if_exception_type(
    exceptions.InternalServerError,
    exceptions.TooManyRequests,
    exceptions.ServiceUnavailable,
    requests.exceptions.ConnectionError,
    requests.exceptions.ChunkedEncodingError,
    auth_exceptions.TransportError,
)
83"""A predicate that checks if an exception is a transient API error.
84
85The following server errors are considered transient:
86
87- :class:`google.api_core.exceptions.InternalServerError` - HTTP 500, gRPC
88 ``INTERNAL(13)`` and its subclasses.
89- :class:`google.api_core.exceptions.TooManyRequests` - HTTP 429
90- :class:`google.api_core.exceptions.ServiceUnavailable` - HTTP 503
91- :class:`requests.exceptions.ConnectionError`
92- :class:`requests.exceptions.ChunkedEncodingError` - The server declared
93 chunked encoding but sent an invalid chunk.
94- :class:`google.auth.exceptions.TransportError` - Used to indicate an
95 error occurred during an HTTP request.
96"""
97# pylint: enable=invalid-name
98
99
def exponential_sleep_generator(
    initial: float, maximum: float, multiplier: float = _DEFAULT_DELAY_MULTIPLIER
):
    """Yield an endless series of randomized back-off delays.

    This implements the `Truncated Exponential Back-off`_ algorithm with
    jitter: every delay is drawn uniformly from ``[0, bound]``, where
    ``bound`` starts at ``min(initial, maximum)`` and is multiplied by
    ``multiplier`` after each draw, never exceeding ``maximum``.

    .. _Truncated Exponential Back-off:
        https://cloud.google.com/storage/docs/exponential-backoff

    Args:
        initial (float): Upper bound, in seconds, of the first delay.
        maximum (float): Largest value the upper bound may grow to.
        multiplier (float): Factor applied to the upper bound after each
            delay is produced.

    Yields:
        float: successive sleep intervals.
    """
    upper_bound = min(initial, maximum)
    while True:
        # Draw anywhere between zero and the current bound.
        delay = random.uniform(0.0, upper_bound)
        yield delay
        # Grow the bound for the next draw, truncated at ``maximum``.
        upper_bound = min(upper_bound * multiplier, maximum)
123
124
class RetryFailureReason(Enum):
    """
    Why a retry loop stopped without succeeding.

    The value is handed to the exception factory so it can decide which
    exception to build.
    """

    # The next sleep would have run past the retry deadline.
    TIMEOUT = 0
    # The raised exception was rejected by the retry predicate.
    NON_RETRYABLE_ERROR = 1
132
133
def build_retry_error(
    exc_list: list[Exception],
    reason: RetryFailureReason,
    timeout_val: float | None,
    **kwargs: Any,
) -> tuple[Exception, Exception | None]:
    """
    Default exception_factory implementation.

    On a timeout the result is a ``RetryError`` whose cause is the most
    recent recorded exception. For any other reason the most recent
    exception itself is returned, or a generic ``RetryError`` when nothing
    was recorded.

    Args:
      - exc_list: exceptions collected while retrying, oldest first
      - reason: why retrying stopped.
          Can be TIMEOUT or NON_RETRYABLE_ERROR
      - timeout_val: the original timeout for the retry (in seconds); only
          used in the timeout message and left out of it when None
      - kwargs: accepted but not used by this implementation

    Returns:
      - tuple: ``(exception to raise, cause exception or None)``
    """
    if reason == RetryFailureReason.TIMEOUT:
        # The latest failure, if there was one, becomes the cause.
        src_exc = exc_list[-1] if exc_list else None
        if timeout_val is None:
            timeout_val_str = ""
        else:
            timeout_val_str = f"of {timeout_val:0.1f}s "
        timeout_error = exceptions.RetryError(
            f"Timeout {timeout_val_str}exceeded",
            src_exc,
        )
        return timeout_error, src_exc

    if exc_list:
        # Not a timeout: surface the latest failure unchanged, with no cause.
        return exc_list[-1], None

    # Nothing was recorded, so fall back to a generic RetryError.
    return exceptions.RetryError("Unknown error", None), None
172
173
def _retry_error_helper(
    exc: Exception,
    deadline: float | None,
    next_sleep: float,
    error_list: list[Exception],
    predicate_fn: Callable[[Exception], bool],
    on_error_fn: Callable[[Exception], None] | None,
    exc_factory_fn: Callable[
        [list[Exception], RetryFailureReason, float | None],
        tuple[Exception, Exception | None],
    ],
    original_timeout: float | None,
):
    """
    Error-handling step shared by all retry implementations.

    Steps, in order:

    - records ``exc`` in ``error_list``
    - raises the factory-built exception if ``predicate_fn`` rejects ``exc``
    - calls ``on_error_fn`` with ``exc`` if a callback was provided
    - raises the factory-built exception if sleeping ``next_sleep`` seconds
      would go past ``deadline``
    - otherwise logs a debug message and returns, so the caller can sleep
      and try again

    Args:
      - exc: the exception that was raised
      - deadline: point on the ``time.monotonic()`` clock after which no
          further retry is allowed; None disables the check
      - next_sleep: the next sleep interval, in seconds
      - error_list: the exceptions raised so far; ``exc`` is appended to it
      - predicate_fn: takes `exc` and returns true if the operation should be retried
      - on_error_fn: callback to execute when a retryable error occurs
      - exc_factory_fn: callback used to build the exception to be raised on
          terminal failure; called with ``error_list``, the failure reason
          and ``original_timeout``
      - original_timeout: the original timeout value for the retry (in seconds),
          passed to the exception factory for building an error message

    Raises:
      - the first element of the tuple returned by ``exc_factory_fn``,
        chained (``raise ... from``) to the second element
    """
    error_list.append(exc)

    should_retry = predicate_fn(exc)
    if not should_retry:
        terminal_exc, cause = exc_factory_fn(
            error_list,
            RetryFailureReason.NON_RETRYABLE_ERROR,
            original_timeout,
        )
        raise terminal_exc from cause

    # The callback runs for every retryable error, including the one that
    # turns out to exhaust the deadline just below.
    if on_error_fn is not None:
        on_error_fn(exc)

    if deadline is not None:
        wake_up_at = time.monotonic() + next_sleep
        if wake_up_at > deadline:
            terminal_exc, cause = exc_factory_fn(
                error_list,
                RetryFailureReason.TIMEOUT,
                original_timeout,
            )
            raise terminal_exc from cause

    _LOGGER.debug(
        "Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], next_sleep)
    )
225
226
class _BaseRetry:
    """
    Shared configuration holder for the retry classes.

    It stores the retry and back-off settings that are common to synchronous
    and asynchronous retries, for both unary and streaming RPCs. It is meant
    to be subclassed by the concrete retry classes rather than instantiated
    directly: calling an instance of this base class raises
    ``NotImplementedError``.
    """

    def __init__(
        self,
        predicate: Callable[[Exception], bool] = if_transient_error,
        initial: float = _DEFAULT_INITIAL_DELAY,
        maximum: float = _DEFAULT_MAXIMUM_DELAY,
        multiplier: float = _DEFAULT_DELAY_MULTIPLIER,
        timeout: Optional[float] = _DEFAULT_DEADLINE,
        on_error: Optional[Callable[[Exception], Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Store the retry settings.

        Args:
            predicate (Callable[Exception]): Returns ``True`` for exceptions
                that should be retried.
            initial (float): Initial delay setting, in seconds.
            maximum (float): Maximum delay setting, in seconds.
            multiplier (float): Multiplier applied to the delay.
            timeout (Optional[float]): How long to keep retrying, in seconds.
            on_error (Optional[Callable[Exception]]): Callback stored for use
                when an error occurs.
            kwargs: Only ``deadline`` is read; when present it is used in
                place of ``timeout``.
        """
        # A ``deadline`` keyword, when supplied, takes precedence over
        # ``timeout``.
        effective_timeout = kwargs.get("deadline", timeout)

        self._predicate = predicate
        self._initial = initial
        self._multiplier = multiplier
        self._maximum = maximum
        self._timeout = effective_timeout
        self._deadline = effective_timeout
        self._on_error = on_error

    def __call__(self, *args, **kwargs) -> Any:
        """Not available on the base class; always raises."""
        raise NotImplementedError("Not implemented in base class")

    @property
    def deadline(self) -> float | None:
        """
        DEPRECATED: use ``timeout`` instead. Refer to the ``Retry`` class
        documentation for details.
        """
        return self._timeout

    @property
    def timeout(self) -> float | None:
        """The stored timeout in seconds, or None if none was set."""
        return self._timeout

    def _clone_with(self, **overrides: Any) -> Self:
        """Build a new instance of this object's class from its current
        settings, with ``overrides`` replacing the named ones."""
        settings = {
            "predicate": self._predicate,
            "initial": self._initial,
            "maximum": self._maximum,
            "multiplier": self._multiplier,
            "timeout": self._timeout,
            "on_error": self._on_error,
        }
        settings.update(overrides)
        return type(self)(**settings)

    def with_deadline(self, deadline: float | None) -> Self:
        """Return a copy of this retry with the given timeout.

        DEPRECATED: use :meth:`with_timeout` instead. Refer to the ``Retry`` class
        documentation for details.

        Args:
            deadline (float|None): How long to keep retrying, in seconds. If None,
                no timeout is enforced.

        Returns:
            Retry: A new retry instance with the given timeout.
        """
        return self.with_timeout(deadline)

    def with_timeout(self, timeout: float | None) -> Self:
        """Return a copy of this retry with the given timeout.

        Args:
            timeout (float): How long to keep retrying, in seconds. If None,
                no timeout will be enforced.

        Returns:
            Retry: A new retry instance with the given timeout.
        """
        # None is a meaningful value here, so it is always passed through.
        return self._clone_with(timeout=timeout)

    def with_predicate(self, predicate: Callable[[Exception], bool]) -> Self:
        """Return a copy of this retry with the given predicate.

        Args:
            predicate (Callable[Exception]): A callable that should return
                ``True`` if the given exception is retryable.

        Returns:
            Retry: A new retry instance with the given predicate.
        """
        return self._clone_with(predicate=predicate)

    def with_delay(
        self,
        initial: Optional[float] = None,
        maximum: Optional[float] = None,
        multiplier: Optional[float] = None,
    ) -> Self:
        """Return a copy of this retry with the given delay options.

        Args:
            initial (float): Initial delay setting (in seconds). If None, the
                current value is used.
            maximum (float): The maximum amount of time to delay (in seconds). If None, the
                current value is used.
            multiplier (float): The multiplier applied to the delay. If None, the current
                value is used.

        Returns:
            Retry: A new retry instance with the given delay options.
        """
        requested = {
            "initial": initial,
            "maximum": maximum,
            "multiplier": multiplier,
        }
        # For the delay options None means "keep the current value", so only
        # the values that were actually supplied are overridden.
        supplied = {
            name: value for name, value in requested.items() if value is not None
        }
        return self._clone_with(**supplied)

    def __str__(self) -> str:
        """Render the class name and every stored setting on one line."""
        template = (
            "<{} predicate={}, initial={:.1f}, maximum={:.1f}, "
            "multiplier={:.1f}, timeout={}, on_error={}>"
        )
        # timeout can be None, which is why its placeholder has no {:.1f}
        return template.format(
            type(self).__name__,
            self._predicate,
            self._initial,
            self._maximum,
            self._multiplier,
            self._timeout,
            self._on_error,
        )