1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Generator wrapper for retryable streaming RPCs.
17"""
18from __future__ import annotations
19
20from typing import (
21 Callable,
22 Optional,
23 List,
24 Tuple,
25 Iterable,
26 Generator,
27 TypeVar,
28 Any,
29 TYPE_CHECKING,
30)
31
32import sys
33import time
34import functools
35
36from google.api_core.retry.retry_base import _BaseRetry
37from google.api_core.retry.retry_base import _retry_error_helper
38from google.api_core.retry import exponential_sleep_generator
39from google.api_core.retry import build_retry_error
40from google.api_core.retry import RetryFailureReason
41
42if TYPE_CHECKING:
43 if sys.version_info >= (3, 10):
44 from typing import ParamSpec
45 else:
46 from typing_extensions import ParamSpec
47
48 _P = ParamSpec("_P") # target function call parameters
49 _Y = TypeVar("_Y") # yielded values
50
51
def retry_target_stream(
    target: Callable[_P, Iterable[_Y]],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: Optional[float] = None,
    on_error: Optional[Callable[[Exception], None]] = None,
    exception_factory: Callable[
        [List[Exception], RetryFailureReason, Optional[float]],
        Tuple[Exception, Optional[Exception]],
    ] = build_retry_error,
    init_args: tuple = (),
    init_kwargs: Optional[dict] = None,
    **kwargs,
) -> Generator[_Y, Any, None]:
    """Create a generator wrapper that retries the wrapped stream if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`Retry`.

    Args:
        target: The generator function to call and retry.
        predicate: A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
        sleep_generator: An infinite iterator that determines
            how long to sleep between retries.
        timeout: How long to keep retrying the target.
            Note: timeout is only checked before initiating a retry, so the target may
            run past the timeout value as long as it is healthy.
        on_error: If given, the on_error callback will be called with each
            retryable exception raised by the target. Any error raised by this
            function will *not* be caught.
        exception_factory: A function that is called when the retryable reaches
            a terminal failure state, used to construct an exception to be raised.
            It takes a list of all exceptions encountered, a retry.RetryFailureReason
            enum indicating the failure cause, and the original timeout value
            as arguments. It should return a tuple of the exception to be raised,
            along with the cause exception if any. The default implementation will raise
            a RetryError on timeout, or the last exception encountered otherwise.
        init_args: Positional arguments to pass to the target function.
        init_kwargs: Keyword arguments to pass to the target function.
            ``None`` (the default) means no keyword arguments.
        **kwargs: Only ``deadline`` is read. If present, it overrides
            ``timeout``. Any other keyword is ignored.

    Returns:
        Generator: A retryable generator that wraps the target generator function.
            When an attempt finishes without raising, the value returned by
            its ``yield from`` delegation becomes this generator's return value.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: a custom exception specified by the exception_factory if provided.
            If no exception_factory is provided:
                google.api_core.RetryError: If the timeout is exceeded while retrying.
                Exception: If the target raises an error that isn't retryable.
    """
    # A ``None`` sentinel replaces the former ``{}`` default so that no
    # mutable object is shared between calls.
    if init_kwargs is None:
        init_kwargs = {}

    # "deadline" overrides "timeout" whenever the key is supplied.
    timeout = kwargs.get("deadline", timeout)
    deadline: Optional[float] = (
        time.monotonic() + timeout if timeout is not None else None
    )
    error_list: list[Exception] = []
    sleep_iter = iter(sleep_generator)

    # continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper
    # TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535
    while True:
        # Start a new retry loop
        try:
            # Note: in the future, we can add a ResumptionStrategy object
            # to generate new args between calls. For now, use the same args
            # for each attempt.
            subgenerator = target(*init_args, **init_kwargs)
            # Delegate to the fresh stream; finishing without an exception
            # ends the retry loop and forwards the stream's return value.
            return (yield from subgenerator)
        # handle exceptions raised by the subgenerator
        # pylint: disable=broad-except
        # This function explicitly must deal with broad exceptions.
        except Exception as exc:
            # defer to shared logic for handling errors
            next_sleep = _retry_error_helper(
                exc,
                deadline,
                sleep_iter,
                error_list,
                predicate,
                on_error,
                exception_factory,
                timeout,
            )
            # if exception not raised, sleep before next attempt
            time.sleep(next_sleep)
139
140
class StreamingRetry(_BaseRetry):
    """Exponential retry decorator for streaming synchronous RPCs.

    Calling an instance with a generator function returns a new callable.
    Invoking that callable produces a generator that wraps the target
    stream in retry logic: if the target raises an exception, the entire
    stream is retried inside the wrapper.

    By default transient API errors are retried; supply a different
    predicate to retry other exceptions.

    Important Note: when a stream hits a retryable error, the wrapper
    silently builds a fresh iterator in the background and keeps yielding
    (likely duplicate) values as though nothing happened. This is the most
    general way to retry a stream, but it is often not the behavior you
    want. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]

    More advanced retry logic for streams can be built in two ways:

    1. Wrap the target
        Provide a ``target`` that keeps state between retries and creates a
        different generator on each retry call. For example, wrap a network
        call in a function that adjusts the request based on what has
        already been returned:

        .. code-block:: python

            def attempt_with_modified_request(target, request, seen_items=[]):
                # remove seen items from request on each attempt
                new_request = modify_request(request, seen_items)
                new_generator = target(new_request)
                for item in new_generator:
                    yield item
                    seen_items.append(item)

            retry_wrapped_fn = StreamingRetry()(attempt_with_modified_request)
            retryable_generator = retry_wrapped_fn(target, request)

    2. Wrap the retry generator
        Alternatively, wrap the retryable generator itself before handing
        it to the end-user, adding a filter on the stream. For example,
        remember the items that were successfully yielded by earlier
        attempts and only yield new items once the current attempt has
        moved past them:

        .. code-block:: python

            def retryable_with_filter(target):
                stream_idx = 0
                # reset stream_idx when the stream is retried
                def on_error(e):
                    nonlocal stream_idx
                    stream_idx = 0
                # build retryable, registering the reset callback
                retryable_gen = StreamingRetry(...)(target, on_error=on_error)
                # keep track of what has been yielded out of filter
                seen_items = []
                for item in retryable_gen():
                    if stream_idx >= len(seen_items):
                        seen_items.append(item)
                        yield item
                    elif item != seen_items[stream_idx]:
                        raise ValueError("Stream differs from last attempt")
                    stream_idx += 1

            filter_retry_wrapped = retryable_with_filter(target)

    Args:
        predicate (Callable[Exception]): A callable that should return ``True``
            if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds. This
            must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (float): How long to keep retrying, in seconds.
            Note: timeout is only checked before initiating a retry, so the target may
            run past the timeout value as long as it is healthy.
        on_error (Callable[Exception]): A function to call while processing
            a retryable exception. Any error raised by this function will
            *not* be caught.
        deadline (float): DEPRECATED: use `timeout` instead. For backward
            compatibility, if specified it will override the ``timeout`` parameter.
    """

    def __call__(
        self,
        func: Callable[_P, Iterable[_Y]],
        on_error: Callable[[Exception], Any] | None = None,
    ) -> Callable[_P, Generator[_Y, Any, None]]:
        """Wrap a callable with retry behavior.

        Args:
            func (Callable): The callable to add retry behavior to.
            on_error (Optional[Callable[Exception]]): If given, this callback
                is invoked with each retryable exception raised by the
                wrapped function. Any error it raises will *not* be caught.
                It is ignored when an ``on_error`` callback was already
                configured on this instance.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
                behavior.
        """
        # A callback configured on the instance wins over the one given here.
        error_callback = on_error if self._on_error is None else self._on_error

        def retry_wrapped_func(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> Generator[_Y, Any, None]:
            """A wrapper that calls target function with retry."""
            # Each invocation gets its own backoff sequence.
            return retry_target_stream(
                func,
                predicate=self._predicate,
                sleep_generator=exponential_sleep_generator(
                    self._initial, self._maximum, multiplier=self._multiplier
                ),
                timeout=self._timeout,
                on_error=error_callback,
                init_args=args,
                init_kwargs=kwargs,
            )

        # Copy func's metadata (name, docstring, ...) onto the wrapper.
        return functools.wraps(func)(retry_wrapped_func)