1# Copyright 2017 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Shared classes and functions for retrying requests. 
    16 
    17:class:`_BaseRetry` is the base class for :class:`Retry`, 
    18:class:`AsyncRetry`, :class:`StreamingRetry`, and :class:`AsyncStreamingRetry`. 
    19""" 
    20 
    21from __future__ import annotations 
    22 
    23import logging 
    24import random 
    25import time 
    26 
    27from enum import Enum 
    28from typing import Any, Callable, Optional, Iterator, TYPE_CHECKING 
    29 
    30import requests.exceptions 
    31 
    32from google.api_core import exceptions 
    33from google.auth import exceptions as auth_exceptions 
    34 
    35if TYPE_CHECKING: 
    36    import sys 
    37 
    38    if sys.version_info >= (3, 11): 
    39        from typing import Self 
    40    else: 
    41        from typing_extensions import Self 
    42 
    43_DEFAULT_INITIAL_DELAY = 1.0  # seconds 
    44_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds 
    45_DEFAULT_DELAY_MULTIPLIER = 2.0 
    46_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds 
    47 
    48_LOGGER = logging.getLogger("google.api_core.retry") 
    49 
    50 
    51def if_exception_type( 
    52    *exception_types: type[Exception], 
    53) -> Callable[[Exception], bool]: 
    54    """Creates a predicate to check if the exception is of a given type. 
    55 
    56    Args: 
    57        exception_types (Sequence[:func:`type`]): The exception types to check 
    58            for. 
    59 
    60    Returns: 
    61        Callable[Exception]: A predicate that returns True if the provided 
    62            exception is of the given type(s). 
    63    """ 
    64 
    65    def if_exception_type_predicate(exception: Exception) -> bool: 
    66        """Bound predicate for checking an exception type.""" 
    67        return isinstance(exception, exception_types) 
    68 
    69    return if_exception_type_predicate 
    70 
    71 
    72# pylint: disable=invalid-name 
    73# Pylint sees this as a constant, but it is also an alias that should be 
    74# considered a function. 
    75if_transient_error = if_exception_type( 
    76    exceptions.InternalServerError, 
    77    exceptions.TooManyRequests, 
    78    exceptions.ServiceUnavailable, 
    79    requests.exceptions.ConnectionError, 
    80    requests.exceptions.ChunkedEncodingError, 
    81    auth_exceptions.TransportError, 
    82) 
    83"""A predicate that checks if an exception is a transient API error. 
    84 
    85The following server errors are considered transient: 
    86 
    87- :class:`google.api_core.exceptions.InternalServerError` - HTTP 500, gRPC 
    88    ``INTERNAL(13)`` and its subclasses. 
    89- :class:`google.api_core.exceptions.TooManyRequests` - HTTP 429 
    90- :class:`google.api_core.exceptions.ServiceUnavailable` - HTTP 503 
    91- :class:`requests.exceptions.ConnectionError` 
    92- :class:`requests.exceptions.ChunkedEncodingError` - The server declared 
    93    chunked encoding but sent an invalid chunk. 
    94- :class:`google.auth.exceptions.TransportError` - Used to indicate an 
    95    error occurred during an HTTP request. 
    96""" 
    97# pylint: enable=invalid-name 
    98 
    99 
    100def exponential_sleep_generator( 
    101    initial: float, maximum: float, multiplier: float = _DEFAULT_DELAY_MULTIPLIER 
    102): 
    103    """Generates sleep intervals based on the exponential back-off algorithm. 
    104 
    105    This implements the `Truncated Exponential Back-off`_ algorithm. 
    106 
    107    .. _Truncated Exponential Back-off: 
    108        https://cloud.google.com/storage/docs/exponential-backoff 
    109 
    110    Args: 
    111        initial (float): The minimum amount of time to delay. This must 
    112            be greater than 0. 
    113        maximum (float): The maximum amount of time to delay. 
    114        multiplier (float): The multiplier applied to the delay. 
    115 
    116    Yields: 
    117        float: successive sleep intervals. 
    118    """ 
    119    max_delay = min(initial, maximum) 
    120    while True: 
    121        yield random.uniform(0.0, max_delay) 
    122        max_delay = min(max_delay * multiplier, maximum) 
    123 
    124 
    125class RetryFailureReason(Enum): 
    126    """ 
    127    The cause of a failed retry, used when building exceptions 
    128    """ 
    129 
    130    TIMEOUT = 0 
    131    NON_RETRYABLE_ERROR = 1 
    132 
    133 
    134def build_retry_error( 
    135    exc_list: list[Exception], 
    136    reason: RetryFailureReason, 
    137    timeout_val: float | None, 
    138    **kwargs: Any, 
    139) -> tuple[Exception, Exception | None]: 
    140    """ 
    141    Default exception_factory implementation. 
    142 
    143    Returns a RetryError if the failure is due to a timeout, otherwise 
    144    returns the last exception encountered. 
    145 
    146    Args: 
    147      - exc_list: list of exceptions that occurred during the retry 
    148      - reason: reason for the retry failure. 
    149            Can be TIMEOUT or NON_RETRYABLE_ERROR 
    150      - timeout_val: the original timeout value for the retry (in seconds), for use in the exception message 
    151 
    152    Returns: 
    153      - tuple: a tuple of the exception to be raised, and the cause exception if any 
    154    """ 
    155    if reason == RetryFailureReason.TIMEOUT: 
    156        # return RetryError with the most recent exception as the cause 
    157        src_exc = exc_list[-1] if exc_list else None 
    158        timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else "" 
    159        return ( 
    160            exceptions.RetryError( 
    161                f"Timeout {timeout_val_str}exceeded", 
    162                src_exc, 
    163            ), 
    164            src_exc, 
    165        ) 
    166    elif exc_list: 
    167        # return most recent exception encountered 
    168        return exc_list[-1], None 
    169    else: 
    170        # no exceptions were given in exc_list. Raise generic RetryError 
    171        return exceptions.RetryError("Unknown error", None), None 
    172 
    173 
    174def _retry_error_helper( 
    175    exc: Exception, 
    176    deadline: float | None, 
    177    sleep_iterator: Iterator[float], 
    178    error_list: list[Exception], 
    179    predicate_fn: Callable[[Exception], bool], 
    180    on_error_fn: Callable[[Exception], None] | None, 
    181    exc_factory_fn: Callable[ 
    182        [list[Exception], RetryFailureReason, float | None], 
    183        tuple[Exception, Exception | None], 
    184    ], 
    185    original_timeout: float | None, 
    186) -> float: 
    187    """ 
    188    Shared logic for handling an error for all retry implementations 
    189 
    190    - Raises an error on timeout or non-retryable error 
    191    - Calls on_error_fn if provided 
    192    - Logs the error 
    193 
    194    Args: 
    195       - exc: the exception that was raised 
    196       - deadline: the deadline for the retry, calculated as a diff from time.monotonic() 
    197       - sleep_iterator: iterator to draw the next backoff value from 
    198       - error_list: the list of exceptions that have been raised so far 
    199       - predicate_fn: takes `exc` and returns true if the operation should be retried 
    200       - on_error_fn: callback to execute when a retryable error occurs 
    201       - exc_factory_fn: callback used to build the exception to be raised on terminal failure 
    202       - original_timeout_val: the original timeout value for the retry (in seconds), 
    203           to be passed to the exception factory for building an error message 
    204    Returns: 
    205        - the sleep value chosen before the next attempt 
    206    """ 
    207    error_list.append(exc) 
    208    if not predicate_fn(exc): 
    209        final_exc, source_exc = exc_factory_fn( 
    210            error_list, 
    211            RetryFailureReason.NON_RETRYABLE_ERROR, 
    212            original_timeout, 
    213        ) 
    214        raise final_exc from source_exc 
    215    if on_error_fn is not None: 
    216        on_error_fn(exc) 
    217    # next_sleep is fetched after the on_error callback, to allow clients 
    218    # to update sleep_iterator values dynamically in response to errors 
    219    try: 
    220        next_sleep = next(sleep_iterator) 
    221    except StopIteration: 
    222        raise ValueError("Sleep generator stopped yielding sleep values.") from exc 
    223    if deadline is not None and time.monotonic() + next_sleep > deadline: 
    224        final_exc, source_exc = exc_factory_fn( 
    225            error_list, 
    226            RetryFailureReason.TIMEOUT, 
    227            original_timeout, 
    228        ) 
    229        raise final_exc from source_exc 
    230    _LOGGER.debug( 
    231        "Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], next_sleep) 
    232    ) 
    233    return next_sleep 
    234 
    235 
    236class _BaseRetry(object): 
    237    """ 
    238    Base class for retry configuration objects. This class is intended to capture retry 
    239    and backoff configuration that is common to both synchronous and asynchronous retries, 
    240    for both unary and streaming RPCs. It is not intended to be instantiated directly, 
    241    but rather to be subclassed by the various retry configuration classes. 
    242    """ 
    243 
    244    def __init__( 
    245        self, 
    246        predicate: Callable[[Exception], bool] = if_transient_error, 
    247        initial: float = _DEFAULT_INITIAL_DELAY, 
    248        maximum: float = _DEFAULT_MAXIMUM_DELAY, 
    249        multiplier: float = _DEFAULT_DELAY_MULTIPLIER, 
    250        timeout: Optional[float] = _DEFAULT_DEADLINE, 
    251        on_error: Optional[Callable[[Exception], Any]] = None, 
    252        **kwargs: Any, 
    253    ) -> None: 
    254        self._predicate = predicate 
    255        self._initial = initial 
    256        self._multiplier = multiplier 
    257        self._maximum = maximum 
    258        self._timeout = kwargs.get("deadline", timeout) 
    259        self._deadline = self._timeout 
    260        self._on_error = on_error 
    261 
    262    def __call__(self, *args, **kwargs) -> Any: 
    263        raise NotImplementedError("Not implemented in base class") 
    264 
    265    @property 
    266    def deadline(self) -> float | None: 
    267        """ 
    268        DEPRECATED: use ``timeout`` instead.  Refer to the ``Retry`` class 
    269        documentation for details. 
    270        """ 
    271        return self._timeout 
    272 
    273    @property 
    274    def timeout(self) -> float | None: 
    275        return self._timeout 
    276 
    277    def with_deadline(self, deadline: float | None) -> Self: 
    278        """Return a copy of this retry with the given timeout. 
    279 
    280        DEPRECATED: use :meth:`with_timeout` instead. Refer to the ``Retry`` class 
    281        documentation for details. 
    282 
    283        Args: 
    284            deadline (float|None): How long to keep retrying, in seconds. If None, 
    285                no timeout is enforced. 
    286 
    287        Returns: 
    288            Retry: A new retry instance with the given timeout. 
    289        """ 
    290        return self.with_timeout(deadline) 
    291 
    292    def with_timeout(self, timeout: float | None) -> Self: 
    293        """Return a copy of this retry with the given timeout. 
    294 
    295        Args: 
    296            timeout (float): How long to keep retrying, in seconds. If None, 
    297                no timeout will be enforced. 
    298 
    299        Returns: 
    300            Retry: A new retry instance with the given timeout. 
    301        """ 
    302        return type(self)( 
    303            predicate=self._predicate, 
    304            initial=self._initial, 
    305            maximum=self._maximum, 
    306            multiplier=self._multiplier, 
    307            timeout=timeout, 
    308            on_error=self._on_error, 
    309        ) 
    310 
    311    def with_predicate(self, predicate: Callable[[Exception], bool]) -> Self: 
    312        """Return a copy of this retry with the given predicate. 
    313 
    314        Args: 
    315            predicate (Callable[Exception]): A callable that should return 
    316                ``True`` if the given exception is retryable. 
    317 
    318        Returns: 
    319            Retry: A new retry instance with the given predicate. 
    320        """ 
    321        return type(self)( 
    322            predicate=predicate, 
    323            initial=self._initial, 
    324            maximum=self._maximum, 
    325            multiplier=self._multiplier, 
    326            timeout=self._timeout, 
    327            on_error=self._on_error, 
    328        ) 
    329 
    330    def with_delay( 
    331        self, 
    332        initial: Optional[float] = None, 
    333        maximum: Optional[float] = None, 
    334        multiplier: Optional[float] = None, 
    335    ) -> Self: 
    336        """Return a copy of this retry with the given delay options. 
    337 
    338        Args: 
    339            initial (float): The minimum amount of time to delay (in seconds). This must 
    340                be greater than 0. If None, the current value is used. 
    341            maximum (float): The maximum amount of time to delay (in seconds). If None, the 
    342                current value is used. 
    343            multiplier (float): The multiplier applied to the delay. If None, the current 
    344                value is used. 
    345 
    346        Returns: 
    347            Retry: A new retry instance with the given delay options. 
    348        """ 
    349        return type(self)( 
    350            predicate=self._predicate, 
    351            initial=initial if initial is not None else self._initial, 
    352            maximum=maximum if maximum is not None else self._maximum, 
    353            multiplier=multiplier if multiplier is not None else self._multiplier, 
    354            timeout=self._timeout, 
    355            on_error=self._on_error, 
    356        ) 
    357 
    358    def __str__(self) -> str: 
    359        return ( 
    360            "<{} predicate={}, initial={:.1f}, maximum={:.1f}, " 
    361            "multiplier={:.1f}, timeout={}, on_error={}>".format( 
    362                type(self).__name__, 
    363                self._predicate, 
    364                self._initial, 
    365                self._maximum, 
    366                self._multiplier, 
    367                self._timeout,  # timeout can be None, thus no {:.1f} 
    368                self._on_error, 
    369            ) 
    370        )