Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/retry/retry_streaming.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

39 statements  

1# Copyright 2023 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Generator wrapper for retryable streaming RPCs. 

17""" 

18from __future__ import annotations 

19 

20from typing import ( 

21 Callable, 

22 Optional, 

23 List, 

24 Tuple, 

25 Iterable, 

26 Generator, 

27 TypeVar, 

28 Any, 

29 TYPE_CHECKING, 

30) 

31 

32import sys 

33import time 

34import functools 

35 

36from google.api_core.retry.retry_base import _BaseRetry 

37from google.api_core.retry.retry_base import _retry_error_helper 

38from google.api_core.retry import exponential_sleep_generator 

39from google.api_core.retry import build_retry_error 

40from google.api_core.retry import RetryFailureReason 

41 

42if TYPE_CHECKING: 

43 if sys.version_info >= (3, 10): 

44 from typing import ParamSpec 

45 else: 

46 from typing_extensions import ParamSpec 

47 

48 _P = ParamSpec("_P") # target function call parameters 

49 _Y = TypeVar("_Y") # yielded values 

50 

51 

52def retry_target_stream( 

53 target: Callable[_P, Iterable[_Y]], 

54 predicate: Callable[[Exception], bool], 

55 sleep_generator: Iterable[float], 

56 timeout: Optional[float] = None, 

57 on_error: Optional[Callable[[Exception], None]] = None, 

58 exception_factory: Callable[ 

59 [List[Exception], RetryFailureReason, Optional[float]], 

60 Tuple[Exception, Optional[Exception]], 

61 ] = build_retry_error, 

62 init_args: tuple = (), 

63 init_kwargs: dict = {}, 

64 **kwargs, 

65) -> Generator[_Y, Any, None]: 

66 """Create a generator wrapper that retries the wrapped stream if it fails. 

67 

68 This is the lowest-level retry helper. Generally, you'll use the 

69 higher-level retry helper :class:`Retry`. 

70 

71 Args: 

72 target: The generator function to call and retry. 

73 predicate: A callable used to determine if an 

74 exception raised by the target should be considered retryable. 

75 It should return True to retry or False otherwise. 

76 sleep_generator: An infinite iterator that determines 

77 how long to sleep between retries. 

78 timeout: How long to keep retrying the target. 

79 Note: timeout is only checked before initiating a retry, so the target may 

80 run past the timeout value as long as it is healthy. 

81 on_error: If given, the on_error callback will be called with each 

82 retryable exception raised by the target. Any error raised by this 

83 function will *not* be caught. 

84 exception_factory: A function that is called when the retryable reaches 

85 a terminal failure state, used to construct an exception to be raised. 

86 It takes a list of all exceptions encountered, a retry.RetryFailureReason 

87 enum indicating the failure cause, and the original timeout value 

88 as arguments. It should return a tuple of the exception to be raised, 

89 along with the cause exception if any. The default implementation will raise 

90 a RetryError on timeout, or the last exception encountered otherwise. 

91 init_args: Positional arguments to pass to the target function. 

92 init_kwargs: Keyword arguments to pass to the target function. 

93 

94 Returns: 

95 Generator: A retryable generator that wraps the target generator function. 

96 

97 Raises: 

98 ValueError: If the sleep generator stops yielding values. 

99 Exception: a custom exception specified by the exception_factory if provided. 

100 If no exception_factory is provided: 

101 google.api_core.RetryError: If the timeout is exceeded while retrying. 

102 Exception: If the target raises an error that isn't retryable. 

103 """ 

104 

105 timeout = kwargs.get("deadline", timeout) 

106 deadline: Optional[float] = ( 

107 time.monotonic() + timeout if timeout is not None else None 

108 ) 

109 error_list: list[Exception] = [] 

110 sleep_iter = iter(sleep_generator) 

111 

112 # continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper 

113 # TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535 

114 while True: 

115 # Start a new retry loop 

116 try: 

117 # Note: in the future, we can add a ResumptionStrategy object 

118 # to generate new args between calls. For now, use the same args 

119 # for each attempt. 

120 subgenerator = target(*init_args, **init_kwargs) 

121 return (yield from subgenerator) 

122 # handle exceptions raised by the subgenerator 

123 # pylint: disable=broad-except 

124 # This function explicitly must deal with broad exceptions. 

125 except Exception as exc: 

126 # defer to shared logic for handling errors 

127 next_sleep = _retry_error_helper( 

128 exc, 

129 deadline, 

130 sleep_iter, 

131 error_list, 

132 predicate, 

133 on_error, 

134 exception_factory, 

135 timeout, 

136 ) 

137 # if exception not raised, sleep before next attempt 

138 time.sleep(next_sleep) 

139 

140 

141class StreamingRetry(_BaseRetry): 

142 """Exponential retry decorator for streaming synchronous RPCs. 

143 

144 This class returns a Generator when called, which wraps the target 

145 stream in retry logic. If any exception is raised by the target, the 

146 entire stream will be retried within the wrapper. 

147 

148 Although the default behavior is to retry transient API errors, a 

149 different predicate can be provided to retry other exceptions. 

150 

151 Important Note: when a stream encounters a retryable error, it will 

152 silently construct a fresh iterator instance in the background 

153 and continue yielding (likely duplicate) values as if no error occurred. 

154 This is the most general way to retry a stream, but it often is not the 

155 desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...] 

156 

157 There are two ways to build more advanced retry logic for streams: 

158 

159 1. Wrap the target 

160 Use a ``target`` that maintains state between retries, and creates a 

161 different generator on each retry call. For example, you can wrap a 

162 network call in a function that modifies the request based on what has 

163 already been returned: 

164 

165 .. code-block:: python 

166 

167 def attempt_with_modified_request(target, request, seen_items=[]): 

168 # remove seen items from request on each attempt 

169 new_request = modify_request(request, seen_items) 

170 new_generator = target(new_request) 

171 for item in new_generator: 

172 yield item 

173 seen_items.append(item) 

174 

175 retry_wrapped_fn = StreamingRetry()(attempt_with_modified_request) 

176 retryable_generator = retry_wrapped_fn(target, request) 

177 

178 2. Wrap the retry generator 

179 Alternatively, you can wrap the retryable generator itself before 

180 passing it to the end-user to add a filter on the stream. For 

181 example, you can keep track of the items that were successfully yielded 

182 in previous retry attempts, and only yield new items when the 

183 new attempt surpasses the previous ones: 

184 

185 .. code-block:: python 

186 

187 def retryable_with_filter(target): 

188 stream_idx = 0 

189 # reset stream_idx when the stream is retried 

190 def on_error(e): 

191 nonlocal stream_idx 

192 stream_idx = 0 

193 # build retryable 

194 retryable_gen = StreamingRetry(...)(target) 

195 # keep track of what has been yielded out of filter 

196 seen_items = [] 

197 for item in retryable_gen(): 

198 if stream_idx >= len(seen_items): 

199 seen_items.append(item) 

200 yield item 

201 elif item != seen_items[stream_idx]: 

202 raise ValueError("Stream differs from last attempt") 

203 stream_idx += 1 

204 

205 filter_retry_wrapped = retryable_with_filter(target) 

206 

207 Args: 

208 predicate (Callable[Exception]): A callable that should return ``True`` 

209 if the given exception is retryable. 

210 initial (float): The minimum amount of time to delay in seconds. This 

211 must be greater than 0. 

212 maximum (float): The maximum amount of time to delay in seconds. 

213 multiplier (float): The multiplier applied to the delay. 

214 timeout (float): How long to keep retrying, in seconds. 

215 Note: timeout is only checked before initiating a retry, so the target may 

216 run past the timeout value as long as it is healthy. 

217 on_error (Callable[Exception]): A function to call while processing 

218 a retryable exception. Any error raised by this function will 

219 *not* be caught. 

220 deadline (float): DEPRECATED: use `timeout` instead. For backward 

221 compatibility, if specified it will override the ``timeout`` parameter. 

222 """ 

223 

224 def __call__( 

225 self, 

226 func: Callable[_P, Iterable[_Y]], 

227 on_error: Callable[[Exception], Any] | None = None, 

228 ) -> Callable[_P, Generator[_Y, Any, None]]: 

229 """Wrap a callable with retry behavior. 

230 

231 Args: 

232 func (Callable): The callable to add retry behavior to. 

233 on_error (Optional[Callable[Exception]]): If given, the 

234 on_error callback will be called with each retryable exception 

235 raised by the wrapped function. Any error raised by this 

236 function will *not* be caught. If on_error was specified in the 

237 constructor, this value will be ignored. 

238 

239 Returns: 

240 Callable: A callable that will invoke ``func`` with retry 

241 behavior. 

242 """ 

243 if self._on_error is not None: 

244 on_error = self._on_error 

245 

246 @functools.wraps(func) 

247 def retry_wrapped_func( 

248 *args: _P.args, **kwargs: _P.kwargs 

249 ) -> Generator[_Y, Any, None]: 

250 """A wrapper that calls target function with retry.""" 

251 sleep_generator = exponential_sleep_generator( 

252 self._initial, self._maximum, multiplier=self._multiplier 

253 ) 

254 return retry_target_stream( 

255 func, 

256 predicate=self._predicate, 

257 sleep_generator=sleep_generator, 

258 timeout=self._timeout, 

259 on_error=on_error, 

260 init_args=args, 

261 init_kwargs=kwargs, 

262 ) 

263 

264 return retry_wrapped_func