Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_streaming_async.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

67 statements  

1# Copyright 2023 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Generator wrapper for retryable async streaming RPCs. 

17""" 

18from __future__ import annotations 

19 

20from typing import ( 

21 cast, 

22 Any, 

23 Callable, 

24 Iterable, 

25 AsyncIterator, 

26 AsyncIterable, 

27 Awaitable, 

28 TypeVar, 

29 AsyncGenerator, 

30 TYPE_CHECKING, 

31) 

32 

33import asyncio 

34import time 

35import sys 

36import functools 

37 

38from google.api_core.retry.retry_base import _BaseRetry 

39from google.api_core.retry.retry_base import _retry_error_helper 

40from google.api_core.retry import exponential_sleep_generator 

41from google.api_core.retry import build_retry_error 

42from google.api_core.retry import RetryFailureReason 

43 

44 

45if TYPE_CHECKING: 

46 if sys.version_info >= (3, 10): 

47 from typing import ParamSpec 

48 else: 

49 from typing_extensions import ParamSpec 

50 

51 _P = ParamSpec("_P") # target function call parameters 

52 _Y = TypeVar("_Y") # yielded values 

53 

54 

async def retry_target_stream(
    target: Callable[_P, AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]]],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: float | None = None,
    on_error: Callable[[Exception], None] | None = None,
    exception_factory: Callable[
        [list[Exception], RetryFailureReason, float | None],
        tuple[Exception, Exception | None],
    ] = build_retry_error,
    init_args: _P.args = (),
    init_kwargs: _P.kwargs = {},
    **kwargs,
) -> AsyncGenerator[_Y, None]:
    """Create a generator wrapper that retries the wrapped stream if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`AsyncRetry`.

    Each attempt calls ``target(*init_args, **init_kwargs)``, awaits the result
    if it is awaitable, and then re-yields the items of the resulting async
    iterable. When the iterable is an async generator (it exposes ``asend``),
    values sent into the wrapper, exceptions thrown into it, and ``aclose``
    calls are forwarded to the inner generator.

    Args:
        target: The generator function to call and retry.
        predicate: A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
        sleep_generator: An infinite iterator that determines
            how long to sleep between retries.
        timeout: How long to keep retrying the target. A falsy value
            (``None`` or ``0``) results in no deadline being computed.
            Note: timeout is only checked before initiating a retry, so the target may
            run past the timeout value as long as it is healthy.
        on_error: If given, the on_error callback will be called with each
            retryable exception raised by the target. Any error raised by this
            function will *not* be caught.
        exception_factory: A function that is called when the retryable reaches
            a terminal failure state, used to construct an exception to be raised.
            It takes a list of all exceptions encountered, a retry.RetryFailureReason
            enum indicating the failure cause, and the original timeout value
            as arguments. It should return a tuple of the exception to be raised,
            along with the cause exception if any. The default implementation will raise
            a RetryError on timeout, or the last exception encountered otherwise.
        init_args: Positional arguments to pass to the target function.
        init_kwargs: Keyword arguments to pass to the target function.
        **kwargs: Only ``deadline`` is read; when present it replaces
            ``timeout``. Any other keyword is ignored.

    Returns:
        AsyncGenerator: A retryable generator that wraps the target generator function.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: a custom exception specified by the exception_factory if provided.
            If no exception_factory is provided:
                google.api_core.RetryError: If the timeout is exceeded while retrying.
                Exception: If the target raises an error that isn't retryable.
    """
    target_iterator: AsyncIterator[_Y] | None = None
    # a "deadline" keyword, when supplied, takes precedence over `timeout`
    timeout = kwargs.get("deadline", timeout)
    # absolute monotonic cutoff; None when timeout is falsy (None or 0)
    deadline = time.monotonic() + timeout if timeout else None
    # keep track of retryable exceptions we encounter to pass in to exception_factory
    error_list: list[Exception] = []
    # decided once, on the first attempt that produces an iterator, then reused
    target_is_generator: bool | None = None

    # one loop iteration per attempt; `sleep` is the delay used if this attempt fails
    for sleep in sleep_generator:
        # Start a new retry loop
        try:
            # Note: in the future, we can add a ResumptionStrategy object
            # to generate new args between calls. For now, use the same args
            # for each attempt.
            target_output: AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]] = target(
                *init_args, **init_kwargs
            )
            try:
                # gapic functions return the generator behind an awaitable
                # unwrap the awaitable so we can work with the generator directly
                target_output = await target_output  # type: ignore
            except TypeError:
                # was not awaitable, continue
                # NOTE(review): a TypeError raised *inside* an awaited coroutine
                # is swallowed here as well -- confirm that is acceptable.
                pass
            target_iterator = cast(AsyncIterable["_Y"], target_output).__aiter__()

            if target_is_generator is None:
                # Check if target supports generator features (asend, athrow, aclose)
                target_is_generator = bool(getattr(target_iterator, "asend", None))

            # value most recently sent into the wrapper by the caller; None
            # before the first yield so the first `asend` starts the generator
            sent_in = None
            while True:
                ## Read from target_iterator
                # If the target is a generator, we will advance it with `asend`
                # otherwise, we will use `anext`
                if target_is_generator:
                    next_value = await target_iterator.asend(sent_in)  # type: ignore
                else:
                    next_value = await target_iterator.__anext__()
                ## Yield from Wrapper to caller
                try:
                    # yield latest value from target
                    # exceptions from `athrow` and `aclose` are injected here
                    sent_in = yield next_value
                except GeneratorExit:
                    # if wrapper received `aclose` while waiting on yield,
                    # it will raise GeneratorExit here
                    if target_is_generator:
                        # pass to inner target_iterator for handling
                        await cast(AsyncGenerator["_Y", None], target_iterator).aclose()
                    else:
                        raise
                    # inner generator closed cleanly: finish the wrapper too
                    return
                except:  # noqa: E722
                    # bare except catches any exception passed to `athrow`
                    if target_is_generator:
                        # delegate error handling to target_iterator
                        # If the inner generator handles the exception and
                        # yields, that yielded value is discarded and the loop
                        # continues with the previous `sent_in`.
                        await cast(AsyncGenerator["_Y", None], target_iterator).athrow(
                            cast(BaseException, sys.exc_info()[1])
                        )
                    else:
                        raise
            # not reachable: the `while True` loop above contains no `break`
            return
        except StopAsyncIteration:
            # if iterator exhausted, return
            return
        # handle exceptions raised by the target_iterator
        # pylint: disable=broad-except
        # This function explicitly must deal with broad exceptions.
        except Exception as exc:
            # defer to shared logic for handling errors
            # (presumably raises when `exc` is not retryable or the deadline
            # would be exceeded -- see _retry_error_helper)
            _retry_error_helper(
                exc,
                deadline,
                sleep,
                error_list,
                predicate,
                on_error,
                exception_factory,
                timeout,
            )
            # if exception not raised, sleep before next attempt
            await asyncio.sleep(sleep)
        finally:
            # Runs after every attempt (success, retry, or terminal error) so
            # the inner generator is closed before a new one is created.
            # NOTE(review): if `target(...)` raised before `target_iterator`
            # was rebound, this refers to the previous attempt's iterator.
            if target_is_generator and target_iterator is not None:
                await cast(AsyncGenerator["_Y", None], target_iterator).aclose()
    # only reached when sleep_generator is exhausted without a terminal result
    raise ValueError("Sleep generator stopped yielding sleep values.")

193 

194 

class AsyncStreamingRetry(_BaseRetry):
    """Exponential retry decorator for async streaming rpcs.

    Calling an instance of this class on a stream function returns a coroutine
    function; awaiting that coroutine function yields an AsyncGenerator that
    wraps the target stream in retry logic. If any exception is raised by the
    target, the entire stream will be retried within the wrapper.

    Although the default behavior is to retry transient API errors, a
    different predicate can be provided to retry other exceptions.

    Important Note: when a stream encounters a retryable error, it will
    silently construct a fresh iterator instance in the background
    and continue yielding (likely duplicate) values as if no error occurred.
    This is the most general way to retry a stream, but it often is not the
    desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]

    There are two ways to build more advanced retry logic for streams:

    1. Wrap the target
        Use a ``target`` that maintains state between retries, and creates a
        different generator on each retry call. For example, you can wrap a
        grpc call in a function that modifies the request based on what has
        already been returned:

        .. code-block:: python

            async def attempt_with_modified_request(target, request, seen_items=[]):
                # remove seen items from request on each attempt
                new_request = modify_request(request, seen_items)
                new_generator = await target(new_request)
                async for item in new_generator:
                    yield item
                    seen_items.append(item)

            retry_wrapped = AsyncRetry(is_stream=True,...)(attempt_with_modified_request, target, request, [])

    2. Wrap the retry generator
        Alternatively, you can wrap the retryable generator itself before
        passing it to the end-user to add a filter on the stream. For
        example, you can keep track of the items that were successfully yielded
        in previous retry attempts, and only yield new items when the
        new attempt surpasses the previous ones:

        .. code-block:: python

            async def retryable_with_filter(target):
                stream_idx = 0
                # reset stream_idx when the stream is retried
                def on_error(e):
                    nonlocal stream_idx
                    stream_idx = 0
                # build retryable
                retryable_gen = AsyncRetry(is_stream=True, on_error=on_error, ...)(target)
                # keep track of what has been yielded out of filter
                seen_items = []
                async for item in retryable_gen:
                    if stream_idx >= len(seen_items):
                        yield item
                        seen_items.append(item)
                    elif item != seen_items[stream_idx]:
                        raise ValueError("Stream differs from last attempt")
                    stream_idx += 1

            filter_retry_wrapped = retryable_with_filter(target)

    Args:
        predicate (Callable[Exception]): A callable that should return ``True``
            if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds. This
            must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (Optional[float]): How long to keep retrying in seconds.
            Note: timeout is only checked before initiating a retry, so the target may
            run past the timeout value as long as it is healthy.
        on_error (Optional[Callable[Exception]]): A function to call while processing
            a retryable exception. Any error raised by this function will
            *not* be caught.
        is_stream (bool): Indicates whether the input function
            should be treated as a stream function (i.e. an AsyncGenerator,
            or function or coroutine that returns an AsyncIterable).
            If True, the iterable will be wrapped with retry logic, and any
            failed outputs will restart the stream. If False, only the input
            function call itself will be retried. Defaults to False.
            To avoid duplicate values, retryable streams should typically be
            wrapped in additional filter logic before use.
        deadline (float): DEPRECATED use ``timeout`` instead. If set it will
            override ``timeout`` parameter.
    """

    def __call__(
        self,
        func: Callable[..., AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]]],
        on_error: Callable[[Exception], Any] | None = None,
    ) -> Callable[_P, Awaitable[AsyncGenerator[_Y, None]]]:
        """Wrap a callable with retry behavior.

        Args:
            func (Callable): The callable or stream to add retry behavior to.
            on_error (Optional[Callable[Exception]]): If given, the
                on_error callback will be called with each retryable exception
                raised by the wrapped function. Any error raised by this
                function will *not* be caught. If on_error was specified in the
                constructor, this value will be ignored.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
                behavior.
        """
        # a callback configured on the instance takes precedence over the
        # one supplied at decoration time
        error_callback = on_error if self._on_error is None else self._on_error

        @functools.wraps(func)
        async def retry_wrapped_func(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> AsyncGenerator[_Y, None]:
            """A wrapper that calls target function with retry."""
            # a fresh backoff sequence is built for every invocation
            backoff = exponential_sleep_generator(
                self._initial, self._maximum, multiplier=self._multiplier
            )
            retrying_stream = retry_target_stream(
                func,
                self._predicate,
                backoff,
                self._timeout,
                error_callback,
                init_args=args,
                init_kwargs=kwargs,
            )
            return retrying_stream

        return retry_wrapped_func