1# Copyright 2023 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15""" 
    16Generator wrapper for retryable streaming RPCs. 
    17""" 
    18from __future__ import annotations 
    19 
    20from typing import ( 
    21    Callable, 
    22    Optional, 
    23    List, 
    24    Tuple, 
    25    Iterable, 
    26    Generator, 
    27    TypeVar, 
    28    Any, 
    29    TYPE_CHECKING, 
    30) 
    31 
    32import sys 
    33import time 
    34import functools 
    35 
    36from google.api_core.retry.retry_base import _BaseRetry 
    37from google.api_core.retry.retry_base import _retry_error_helper 
    38from google.api_core.retry import exponential_sleep_generator 
    39from google.api_core.retry import build_retry_error 
    40from google.api_core.retry import RetryFailureReason 
    41 
    42if TYPE_CHECKING: 
    43    if sys.version_info >= (3, 10): 
    44        from typing import ParamSpec 
    45    else: 
    46        from typing_extensions import ParamSpec 
    47 
    48    _P = ParamSpec("_P")  # target function call parameters 
    49    _Y = TypeVar("_Y")  # yielded values 
    50 
    51 
    52def retry_target_stream( 
    53    target: Callable[_P, Iterable[_Y]], 
    54    predicate: Callable[[Exception], bool], 
    55    sleep_generator: Iterable[float], 
    56    timeout: Optional[float] = None, 
    57    on_error: Optional[Callable[[Exception], None]] = None, 
    58    exception_factory: Callable[ 
    59        [List[Exception], RetryFailureReason, Optional[float]], 
    60        Tuple[Exception, Optional[Exception]], 
    61    ] = build_retry_error, 
    62    init_args: tuple = (), 
    63    init_kwargs: dict = {}, 
    64    **kwargs, 
    65) -> Generator[_Y, Any, None]: 
    66    """Create a generator wrapper that retries the wrapped stream if it fails. 
    67 
    68    This is the lowest-level retry helper. Generally, you'll use the 
    69    higher-level retry helper :class:`Retry`. 
    70 
    71    Args: 
    72        target: The generator function to call and retry. 
    73        predicate: A callable used to determine if an 
    74            exception raised by the target should be considered retryable. 
    75            It should return True to retry or False otherwise. 
    76        sleep_generator: An infinite iterator that determines 
    77            how long to sleep between retries. 
    78        timeout: How long to keep retrying the target. 
    79            Note: timeout is only checked before initiating a retry, so the target may 
    80            run past the timeout value as long as it is healthy. 
    81        on_error: If given, the on_error callback will be called with each 
    82            retryable exception raised by the target. Any error raised by this 
    83            function will *not* be caught. 
    84        exception_factory: A function that is called when the retryable reaches 
    85            a terminal failure state, used to construct an exception to be raised. 
    86            It takes a list of all exceptions encountered, a retry.RetryFailureReason 
    87            enum indicating the failure cause, and the original timeout value 
    88            as arguments. It should return a tuple of the exception to be raised, 
    89            along with the cause exception if any. The default implementation will raise 
    90            a RetryError on timeout, or the last exception encountered otherwise. 
    91        init_args: Positional arguments to pass to the target function. 
    92        init_kwargs: Keyword arguments to pass to the target function. 
    93 
    94    Returns: 
    95        Generator: A retryable generator that wraps the target generator function. 
    96 
    97    Raises: 
    98        ValueError: If the sleep generator stops yielding values. 
    99        Exception: a custom exception specified by the exception_factory if provided. 
    100            If no exception_factory is provided: 
    101                google.api_core.RetryError: If the timeout is exceeded while retrying. 
    102                Exception: If the target raises an error that isn't retryable. 
    103    """ 
    104 
    105    timeout = kwargs.get("deadline", timeout) 
    106    deadline: Optional[float] = ( 
    107        time.monotonic() + timeout if timeout is not None else None 
    108    ) 
    109    error_list: list[Exception] = [] 
    110    sleep_iter = iter(sleep_generator) 
    111 
    112    # continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper 
    113    # TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535 
    114    while True: 
    115        # Start a new retry loop 
    116        try: 
    117            # Note: in the future, we can add a ResumptionStrategy object 
    118            # to generate new args between calls. For now, use the same args 
    119            # for each attempt. 
    120            subgenerator = target(*init_args, **init_kwargs) 
    121            return (yield from subgenerator) 
    122        # handle exceptions raised by the subgenerator 
    123        # pylint: disable=broad-except 
    124        # This function explicitly must deal with broad exceptions. 
    125        except Exception as exc: 
    126            # defer to shared logic for handling errors 
    127            next_sleep = _retry_error_helper( 
    128                exc, 
    129                deadline, 
    130                sleep_iter, 
    131                error_list, 
    132                predicate, 
    133                on_error, 
    134                exception_factory, 
    135                timeout, 
    136            ) 
    137            # if exception not raised, sleep before next attempt 
    138            time.sleep(next_sleep) 
    139 
    140 
    141class StreamingRetry(_BaseRetry): 
    142    """Exponential retry decorator for streaming synchronous RPCs. 
    143 
    144    This class returns a Generator when called, which wraps the target 
    145    stream in retry logic. If any exception is raised by the target, the 
    146    entire stream will be retried within the wrapper. 
    147 
    148    Although the default behavior is to retry transient API errors, a 
    149    different predicate can be provided to retry other exceptions. 
    150 
    151    Important Note: when a stream encounters a retryable error, it will 
    152    silently construct a fresh iterator instance in the background 
    153    and continue yielding (likely duplicate) values as if no error occurred. 
    154    This is the most general way to retry a stream, but it often is not the 
    155    desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...] 
    156 
    157    There are two ways to build more advanced retry logic for streams: 
    158 
    159    1. Wrap the target 
    160        Use a ``target`` that maintains state between retries, and creates a 
    161        different generator on each retry call. For example, you can wrap a 
    162        network call in a function that modifies the request based on what has 
    163        already been returned: 
    164 
    165        .. code-block:: python 
    166 
    167            def attempt_with_modified_request(target, request, seen_items=[]): 
    168                # remove seen items from request on each attempt 
    169                new_request = modify_request(request, seen_items) 
    170                new_generator = target(new_request) 
    171                for item in new_generator: 
    172                    yield item 
    173                    seen_items.append(item) 
    174 
    175            retry_wrapped_fn = StreamingRetry()(attempt_with_modified_request) 
    176            retryable_generator = retry_wrapped_fn(target, request) 
    177 
    178    2. Wrap the retry generator 
    179        Alternatively, you can wrap the retryable generator itself before 
    180        passing it to the end-user to add a filter on the stream. For 
    181        example, you can keep track of the items that were successfully yielded 
    182        in previous retry attempts, and only yield new items when the 
    183        new attempt surpasses the previous ones: 
    184 
    185        .. code-block:: python 
    186 
    187            def retryable_with_filter(target): 
    188                stream_idx = 0 
    189                # reset stream_idx when the stream is retried 
    190                def on_error(e): 
    191                    nonlocal stream_idx 
    192                    stream_idx = 0 
    193                # build retryable 
    194                retryable_gen = StreamingRetry(...)(target) 
    195                # keep track of what has been yielded out of filter 
    196                seen_items = [] 
    197                for item in retryable_gen(): 
    198                    if stream_idx >= len(seen_items): 
    199                        seen_items.append(item) 
    200                        yield item 
    201                    elif item != seen_items[stream_idx]: 
    202                        raise ValueError("Stream differs from last attempt") 
    203                    stream_idx += 1 
    204 
    205            filter_retry_wrapped = retryable_with_filter(target) 
    206 
    207    Args: 
    208        predicate (Callable[Exception]): A callable that should return ``True`` 
    209            if the given exception is retryable. 
    210        initial (float): The minimum amount of time to delay in seconds. This 
    211            must be greater than 0. 
    212        maximum (float): The maximum amount of time to delay in seconds. 
    213        multiplier (float): The multiplier applied to the delay. 
    214        timeout (float): How long to keep retrying, in seconds. 
    215            Note: timeout is only checked before initiating a retry, so the target may 
    216            run past the timeout value as long as it is healthy. 
    217        on_error (Callable[Exception]): A function to call while processing 
    218            a retryable exception. Any error raised by this function will 
    219            *not* be caught. 
    220        deadline (float): DEPRECATED: use `timeout` instead. For backward 
    221            compatibility, if specified it will override the ``timeout`` parameter. 
    222    """ 
    223 
    224    def __call__( 
    225        self, 
    226        func: Callable[_P, Iterable[_Y]], 
    227        on_error: Callable[[Exception], Any] | None = None, 
    228    ) -> Callable[_P, Generator[_Y, Any, None]]: 
    229        """Wrap a callable with retry behavior. 
    230 
    231        Args: 
    232            func (Callable): The callable to add retry behavior to. 
    233            on_error (Optional[Callable[Exception]]): If given, the 
    234                on_error callback will be called with each retryable exception 
    235                raised by the wrapped function. Any error raised by this 
    236                function will *not* be caught. If on_error was specified in the 
    237                constructor, this value will be ignored. 
    238 
    239        Returns: 
    240            Callable: A callable that will invoke ``func`` with retry 
    241                behavior. 
    242        """ 
    243        if self._on_error is not None: 
    244            on_error = self._on_error 
    245 
    246        @functools.wraps(func) 
    247        def retry_wrapped_func( 
    248            *args: _P.args, **kwargs: _P.kwargs 
    249        ) -> Generator[_Y, Any, None]: 
    250            """A wrapper that calls target function with retry.""" 
    251            sleep_generator = exponential_sleep_generator( 
    252                self._initial, self._maximum, multiplier=self._multiplier 
    253            ) 
    254            return retry_target_stream( 
    255                func, 
    256                predicate=self._predicate, 
    257                sleep_generator=sleep_generator, 
    258                timeout=self._timeout, 
    259                on_error=on_error, 
    260                init_args=args, 
    261                init_kwargs=kwargs, 
    262            ) 
    263 
    264        return retry_wrapped_func