1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Generator wrapper for retryable streaming RPCs.
17"""
18from __future__ import annotations
19
20from typing import (
21 Callable,
22 Optional,
23 List,
24 Tuple,
25 Iterable,
26 Generator,
27 TypeVar,
28 Any,
29 TYPE_CHECKING,
30)
31
32import sys
33import time
34import functools
35
36from google.api_core.retry.retry_base import _BaseRetry
37from google.api_core.retry.retry_base import _retry_error_helper
38from google.api_core.retry import exponential_sleep_generator
39from google.api_core.retry import build_retry_error
40from google.api_core.retry import RetryFailureReason
41
42if TYPE_CHECKING:
43 if sys.version_info >= (3, 10):
44 from typing import ParamSpec
45 else:
46 from typing_extensions import ParamSpec
47
48 _P = ParamSpec("_P") # target function call parameters
49 _Y = TypeVar("_Y") # yielded values
50
51
52def retry_target_stream(
53 target: Callable[_P, Iterable[_Y]],
54 predicate: Callable[[Exception], bool],
55 sleep_generator: Iterable[float],
56 timeout: Optional[float] = None,
57 on_error: Optional[Callable[[Exception], None]] = None,
58 exception_factory: Callable[
59 [List[Exception], RetryFailureReason, Optional[float]],
60 Tuple[Exception, Optional[Exception]],
61 ] = build_retry_error,
62 init_args: _P.args = (),
63 init_kwargs: _P.kwargs = {},
64 **kwargs,
65) -> Generator[_Y, Any, None]:
66 """Create a generator wrapper that retries the wrapped stream if it fails.
67
68 This is the lowest-level retry helper. Generally, you'll use the
69 higher-level retry helper :class:`Retry`.
70
71 Args:
72 target: The generator function to call and retry.
73 predicate: A callable used to determine if an
74 exception raised by the target should be considered retryable.
75 It should return True to retry or False otherwise.
76 sleep_generator: An infinite iterator that determines
77 how long to sleep between retries.
78 timeout: How long to keep retrying the target.
79 Note: timeout is only checked before initiating a retry, so the target may
80 run past the timeout value as long as it is healthy.
81 on_error: If given, the on_error callback will be called with each
82 retryable exception raised by the target. Any error raised by this
83 function will *not* be caught.
84 exception_factory: A function that is called when the retryable reaches
85 a terminal failure state, used to construct an exception to be raised.
86 It takes a list of all exceptions encountered, a retry.RetryFailureReason
87 enum indicating the failure cause, and the original timeout value
88 as arguments. It should return a tuple of the exception to be raised,
89 along with the cause exception if any. The default implementation will raise
90 a RetryError on timeout, or the last exception encountered otherwise.
91 init_args: Positional arguments to pass to the target function.
92 init_kwargs: Keyword arguments to pass to the target function.
93
94 Returns:
95 Generator: A retryable generator that wraps the target generator function.
96
97 Raises:
98 ValueError: If the sleep generator stops yielding values.
99 Exception: a custom exception specified by the exception_factory if provided.
100 If no exception_factory is provided:
101 google.api_core.RetryError: If the timeout is exceeded while retrying.
102 Exception: If the target raises an error that isn't retryable.
103 """
104
105 timeout = kwargs.get("deadline", timeout)
106 deadline: Optional[float] = (
107 time.monotonic() + timeout if timeout is not None else None
108 )
109 error_list: list[Exception] = []
110
111 for sleep in sleep_generator:
112 # Start a new retry loop
113 try:
114 # Note: in the future, we can add a ResumptionStrategy object
115 # to generate new args between calls. For now, use the same args
116 # for each attempt.
117 subgenerator = target(*init_args, **init_kwargs)
118 return (yield from subgenerator)
119 # handle exceptions raised by the subgenerator
120 # pylint: disable=broad-except
121 # This function explicitly must deal with broad exceptions.
122 except Exception as exc:
123 # defer to shared logic for handling errors
124 _retry_error_helper(
125 exc,
126 deadline,
127 sleep,
128 error_list,
129 predicate,
130 on_error,
131 exception_factory,
132 timeout,
133 )
134 # if exception not raised, sleep before next attempt
135 time.sleep(sleep)
136
137 raise ValueError("Sleep generator stopped yielding sleep values.")
138
139
140class StreamingRetry(_BaseRetry):
141 """Exponential retry decorator for streaming synchronous RPCs.
142
143 This class returns a Generator when called, which wraps the target
144 stream in retry logic. If any exception is raised by the target, the
145 entire stream will be retried within the wrapper.
146
147 Although the default behavior is to retry transient API errors, a
148 different predicate can be provided to retry other exceptions.
149
150 Important Note: when a stream encounters a retryable error, it will
151 silently construct a fresh iterator instance in the background
152 and continue yielding (likely duplicate) values as if no error occurred.
153 This is the most general way to retry a stream, but it often is not the
154 desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
155
156 There are two ways to build more advanced retry logic for streams:
157
158 1. Wrap the target
159 Use a ``target`` that maintains state between retries, and creates a
160 different generator on each retry call. For example, you can wrap a
161 network call in a function that modifies the request based on what has
162 already been returned:
163
164 .. code-block:: python
165
166 def attempt_with_modified_request(target, request, seen_items=[]):
167 # remove seen items from request on each attempt
168 new_request = modify_request(request, seen_items)
169 new_generator = target(new_request)
170 for item in new_generator:
171 yield item
172 seen_items.append(item)
173
174 retry_wrapped_fn = StreamingRetry()(attempt_with_modified_request)
175 retryable_generator = retry_wrapped_fn(target, request)
176
177 2. Wrap the retry generator
178 Alternatively, you can wrap the retryable generator itself before
179 passing it to the end-user to add a filter on the stream. For
180 example, you can keep track of the items that were successfully yielded
181 in previous retry attempts, and only yield new items when the
182 new attempt surpasses the previous ones:
183
184 .. code-block:: python
185
186 def retryable_with_filter(target):
187 stream_idx = 0
188 # reset stream_idx when the stream is retried
189 def on_error(e):
190 nonlocal stream_idx
191 stream_idx = 0
192 # build retryable
193 retryable_gen = StreamingRetry(...)(target)
194 # keep track of what has been yielded out of filter
195 seen_items = []
196 for item in retryable_gen():
197 if stream_idx >= len(seen_items):
198 seen_items.append(item)
199 yield item
200 elif item != seen_items[stream_idx]:
201 raise ValueError("Stream differs from last attempt")
202 stream_idx += 1
203
204 filter_retry_wrapped = retryable_with_filter(target)
205
206 Args:
207 predicate (Callable[Exception]): A callable that should return ``True``
208 if the given exception is retryable.
209 initial (float): The minimum amount of time to delay in seconds. This
210 must be greater than 0.
211 maximum (float): The maximum amount of time to delay in seconds.
212 multiplier (float): The multiplier applied to the delay.
213 timeout (float): How long to keep retrying, in seconds.
214 Note: timeout is only checked before initiating a retry, so the target may
215 run past the timeout value as long as it is healthy.
216 on_error (Callable[Exception]): A function to call while processing
217 a retryable exception. Any error raised by this function will
218 *not* be caught.
219 deadline (float): DEPRECATED: use `timeout` instead. For backward
220 compatibility, if specified it will override the ``timeout`` parameter.
221 """
222
223 def __call__(
224 self,
225 func: Callable[_P, Iterable[_Y]],
226 on_error: Callable[[Exception], Any] | None = None,
227 ) -> Callable[_P, Generator[_Y, Any, None]]:
228 """Wrap a callable with retry behavior.
229
230 Args:
231 func (Callable): The callable to add retry behavior to.
232 on_error (Optional[Callable[Exception]]): If given, the
233 on_error callback will be called with each retryable exception
234 raised by the wrapped function. Any error raised by this
235 function will *not* be caught. If on_error was specified in the
236 constructor, this value will be ignored.
237
238 Returns:
239 Callable: A callable that will invoke ``func`` with retry
240 behavior.
241 """
242 if self._on_error is not None:
243 on_error = self._on_error
244
245 @functools.wraps(func)
246 def retry_wrapped_func(
247 *args: _P.args, **kwargs: _P.kwargs
248 ) -> Generator[_Y, Any, None]:
249 """A wrapper that calls target function with retry."""
250 sleep_generator = exponential_sleep_generator(
251 self._initial, self._maximum, multiplier=self._multiplier
252 )
253 return retry_target_stream(
254 func,
255 predicate=self._predicate,
256 sleep_generator=sleep_generator,
257 timeout=self._timeout,
258 on_error=on_error,
259 init_args=args,
260 init_kwargs=kwargs,
261 )
262
263 return retry_wrapped_func