1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Generator wrapper for retryable async streaming RPCs.
17"""
18from __future__ import annotations
19
20from typing import (
21 cast,
22 Any,
23 Callable,
24 Iterable,
25 AsyncIterator,
26 AsyncIterable,
27 Awaitable,
28 TypeVar,
29 AsyncGenerator,
30 TYPE_CHECKING,
31)
32
33import asyncio
34import time
35import sys
36import functools
37
38from google.api_core.retry.retry_base import _BaseRetry
39from google.api_core.retry.retry_base import _retry_error_helper
40from google.api_core.retry import exponential_sleep_generator
41from google.api_core.retry import build_retry_error
42from google.api_core.retry import RetryFailureReason
43
44
45if TYPE_CHECKING:
46 if sys.version_info >= (3, 10):
47 from typing import ParamSpec
48 else:
49 from typing_extensions import ParamSpec
50
51 _P = ParamSpec("_P") # target function call parameters
52 _Y = TypeVar("_Y") # yielded values
53
54
async def retry_target_stream(
    target: Callable[_P, AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]]],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: float | None = None,
    on_error: Callable[[Exception], None] | None = None,
    exception_factory: Callable[
        [list[Exception], RetryFailureReason, float | None],
        tuple[Exception, Exception | None],
    ] = build_retry_error,
    # NOTE(review): mutable defaults are safe here because both are only
    # unpacked into the target call below, never mutated.
    init_args: _P.args = (),
    init_kwargs: _P.kwargs = {},
    **kwargs,
) -> AsyncGenerator[_Y, None]:
    """Create a generator wrapper that retries the wrapped stream if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`AsyncRetry`.

    Args:
        target: The generator function to call and retry.
        predicate: A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
        sleep_generator: An infinite iterator that determines
            how long to sleep between retries.
        timeout: How long to keep retrying the target.
            Note: timeout is only checked before initiating a retry, so the target may
            run past the timeout value as long as it is healthy.
        on_error: If given, the on_error callback will be called with each
            retryable exception raised by the target. Any error raised by this
            function will *not* be caught.
        exception_factory: A function that is called when the retryable reaches
            a terminal failure state, used to construct an exception to be raised.
            It takes a list of all exceptions encountered, a retry.RetryFailureReason
            enum indicating the failure cause, and the original timeout value
            as arguments. It should return a tuple of the exception to be raised,
            along with the cause exception if any. The default implementation will raise
            a RetryError on timeout, or the last exception encountered otherwise.
        init_args: Positional arguments to pass to the target function.
        init_kwargs: Keyword arguments to pass to the target function.

    Returns:
        AsyncGenerator: A retryable generator that wraps the target generator function.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: a custom exception specified by the exception_factory if provided.
            If no exception_factory is provided:
                google.api_core.RetryError: If the timeout is exceeded while retrying.
                Exception: If the target raises an error that isn't retryable.
    """
    # the current attempt's iterator; tracked outside the loop so `finally`
    # can close it after a failed attempt
    target_iterator: AsyncIterator[_Y] | None = None
    # support the deprecated "deadline" kwarg as an override for timeout
    timeout = kwargs.get("deadline", timeout)
    # a falsy timeout (None or 0) disables the deadline check entirely
    deadline = time.monotonic() + timeout if timeout else None
    # keep track of retryable exceptions we encounter to pass in to exception_factory
    error_list: list[Exception] = []
    # lazily detected on the first attempt; None means "not yet checked"
    target_is_generator: bool | None = None

    for sleep in sleep_generator:
        # Start a new retry loop
        try:
            # Note: in the future, we can add a ResumptionStrategy object
            # to generate new args between calls. For now, use the same args
            # for each attempt.
            target_output: AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]] = target(
                *init_args, **init_kwargs
            )
            try:
                # gapic functions return the generator behind an awaitable
                # unwrap the awaitable so we can work with the generator directly
                target_output = await target_output  # type: ignore
            except TypeError:
                # was not awaitable, continue
                pass
            target_iterator = cast(AsyncIterable["_Y"], target_output).__aiter__()

            if target_is_generator is None:
                # Check if target supports generator features (asend, athrow, aclose)
                target_is_generator = bool(getattr(target_iterator, "asend", None))

            sent_in = None
            while True:
                ## Read from target_iterator
                # If the target is a generator, we will advance it with `asend`
                # otherwise, we will use `anext`
                if target_is_generator:
                    next_value = await target_iterator.asend(sent_in)  # type: ignore
                else:
                    next_value = await target_iterator.__anext__()
                ## Yield from Wrapper to caller
                try:
                    # yield latest value from target
                    # exceptions from `athrow` and `aclose` are injected here
                    sent_in = yield next_value
                except GeneratorExit:
                    # if wrapper received `aclose` while waiting on yield,
                    # it will raise GeneratorExit here
                    if target_is_generator:
                        # pass to inner target_iterator for handling
                        await cast(AsyncGenerator["_Y", None], target_iterator).aclose()
                    else:
                        raise
                    return
                except:  # noqa: E722
                    # bare except catches any exception passed to `athrow`
                    if target_is_generator:
                        # delegate error handling to target_iterator
                        await cast(AsyncGenerator["_Y", None], target_iterator).athrow(
                            cast(BaseException, sys.exc_info()[1])
                        )
                    else:
                        raise
                    return
        except StopAsyncIteration:
            # if iterator exhausted, return
            return
        # handle exceptions raised by the target_iterator
        # pylint: disable=broad-except
        # This function explicitly must deal with broad exceptions.
        except Exception as exc:
            # defer to shared logic for handling errors
            # raises a terminal exception if exc is non-retryable or the
            # deadline has been exceeded; otherwise records it and returns
            _retry_error_helper(
                exc,
                deadline,
                sleep,
                error_list,
                predicate,
                on_error,
                exception_factory,
                timeout,
            )
            # if exception not raised, sleep before next attempt
            await asyncio.sleep(sleep)
        finally:
            # ensure each attempt's generator is closed before retrying or exiting
            if target_is_generator and target_iterator is not None:
                await cast(AsyncGenerator["_Y", None], target_iterator).aclose()
    # sleep_generator is documented as infinite; reaching here is a caller bug
    raise ValueError("Sleep generator stopped yielding sleep values.")
193
194
class AsyncStreamingRetry(_BaseRetry):
    """Exponential retry decorator for async streaming rpcs.

    This class returns an AsyncGenerator when called, which wraps the target
    stream in retry logic. If any exception is raised by the target, the
    entire stream will be retried within the wrapper.

    Although the default behavior is to retry transient API errors, a
    different predicate can be provided to retry other exceptions.

    Important Note: when a stream encounters a retryable error, it will
    silently construct a fresh iterator instance in the background
    and continue yielding (likely duplicate) values as if no error occurred.
    This is the most general way to retry a stream, but it often is not the
    desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]

    There are two ways to build more advanced retry logic for streams:

    1. Wrap the target
        Use a ``target`` that maintains state between retries, and creates a
        different generator on each retry call. For example, you can wrap a
        grpc call in a function that modifies the request based on what has
        already been returned:

        .. code-block:: python

            async def attempt_with_modified_request(target, request, seen_items=[]):
                # remove seen items from request on each attempt
                new_request = modify_request(request, seen_items)
                new_generator = await target(new_request)
                async for item in new_generator:
                    yield item
                    seen_items.append(item)

            retry_wrapped = AsyncRetry(is_stream=True,...)(attempt_with_modified_request, target, request, [])

    2. Wrap the retry generator
        Alternatively, you can wrap the retryable generator itself before
        passing it to the end-user to add a filter on the stream. For
        example, you can keep track of the items that were successfully yielded
        in previous retry attempts, and only yield new items when the
        new attempt surpasses the previous ones:

        .. code-block:: python

            async def retryable_with_filter(target):
                stream_idx = 0
                # reset stream_idx when the stream is retried
                def on_error(e):
                    nonlocal stream_idx
                    stream_idx = 0
                # build retryable
                retryable_gen = AsyncRetry(is_stream=True, ...)(target)
                # keep track of what has been yielded out of filter
                seen_items = []
                async for item in retryable_gen:
                    if stream_idx >= len(seen_items):
                        yield item
                        seen_items.append(item)
                    elif item != seen_items[stream_idx]:
                        raise ValueError("Stream differs from last attempt")
                    stream_idx += 1

            filter_retry_wrapped = retryable_with_filter(target)

    Args:
        predicate (Callable[Exception]): A callable that should return ``True``
            if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds. This
            must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (Optional[float]): How long to keep retrying in seconds.
            Note: timeout is only checked before initiating a retry, so the target may
            run past the timeout value as long as it is healthy.
        on_error (Optional[Callable[Exception]]): A function to call while processing
            a retryable exception. Any error raised by this function will
            *not* be caught.
        is_stream (bool): Indicates whether the input function
            should be treated as a stream function (i.e. an AsyncGenerator,
            or function or coroutine that returns an AsyncIterable).
            If True, the iterable will be wrapped with retry logic, and any
            failed outputs will restart the stream. If False, only the input
            function call itself will be retried. Defaults to False.
            To avoid duplicate values, retryable streams should typically be
            wrapped in additional filter logic before use.
        deadline (float): DEPRECATED use ``timeout`` instead. If set it will
            override ``timeout`` parameter.
    """

    def __call__(
        self,
        func: Callable[..., AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]]],
        on_error: Callable[[Exception], Any] | None = None,
    ) -> Callable[_P, Awaitable[AsyncGenerator[_Y, None]]]:
        """Wrap a callable with retry behavior.

        Args:
            func (Callable): The callable or stream to add retry behavior to.
            on_error (Optional[Callable[Exception]]): If given, the
                on_error callback will be called with each retryable exception
                raised by the wrapped function. Any error raised by this
                function will *not* be caught. If on_error was specified in the
                constructor, this value will be ignored.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
            behavior.
        """
        # the constructor-provided callback always wins over the call-site one
        resolved_on_error = on_error if self._on_error is None else self._on_error

        @functools.wraps(func)
        async def retry_wrapped_func(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> AsyncGenerator[_Y, None]:
            """A wrapper that calls target function with retry."""
            return retry_target_stream(
                func,
                self._predicate,
                # fresh exponential backoff schedule for each invocation
                exponential_sleep_generator(
                    self._initial, self._maximum, multiplier=self._multiplier
                ),
                self._timeout,
                resolved_on_error,
                init_args=args,
                init_kwargs=kwargs,
            )

        return retry_wrapped_func