Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/retry/retry_streaming.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

38 statements  

1# Copyright 2023 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Generator wrapper for retryable streaming RPCs. 

17""" 

18from __future__ import annotations 

19 

20from typing import ( 

21 Callable, 

22 Optional, 

23 List, 

24 Tuple, 

25 Iterable, 

26 Generator, 

27 TypeVar, 

28 Any, 

29 TYPE_CHECKING, 

30) 

31 

32import sys 

33import time 

34import functools 

35 

36from google.api_core.retry.retry_base import _BaseRetry 

37from google.api_core.retry.retry_base import _retry_error_helper 

38from google.api_core.retry import exponential_sleep_generator 

39from google.api_core.retry import build_retry_error 

40from google.api_core.retry import RetryFailureReason 

41 

42if TYPE_CHECKING: 

43 if sys.version_info >= (3, 10): 

44 from typing import ParamSpec 

45 else: 

46 from typing_extensions import ParamSpec 

47 

48 _P = ParamSpec("_P") # target function call parameters 

49 _Y = TypeVar("_Y") # yielded values 

50 

51 

def retry_target_stream(
    target: Callable[_P, Iterable[_Y]],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: Optional[float] = None,
    on_error: Optional[Callable[[Exception], None]] = None,
    exception_factory: Callable[
        [List[Exception], RetryFailureReason, Optional[float]],
        Tuple[Exception, Optional[Exception]],
    ] = build_retry_error,
    init_args: _P.args = (),
    init_kwargs: _P.kwargs = {},
    **kwargs,
) -> Generator[_Y, Any, None]:
    """Yield from ``target``'s stream, restarting the stream when it fails.

    This is the low-level building block for streaming retries; most callers
    are expected to go through the higher-level :class:`Retry` helpers rather
    than calling it directly.

    Each attempt calls ``target(*init_args, **init_kwargs)`` with the same
    arguments and delegates to the resulting iterable. Items already yielded
    by a failed attempt are *not* filtered out of later attempts.

    Args:
        target: The generator function to call and retry.
        predicate: Decides whether an exception raised by the target is
            retryable. Should return True to retry and False otherwise.
        sleep_generator: An infinite iterator of delays (in seconds) to wait
            between attempts.
        timeout: How long to keep retrying the target, in seconds.
            Note: the timeout is only checked before initiating a retry, so
            a healthy target may run past it.
        on_error: Optional callback invoked with each retryable exception
            raised by the target. Errors raised by the callback itself are
            *not* caught.
        exception_factory: Called when retrying reaches a terminal failure
            state, to build the exception that is raised. It receives the
            list of all exceptions encountered, a retry.RetryFailureReason
            enum describing the failure cause, and the original timeout
            value, and returns a tuple of the exception to raise and its
            cause (if any). The default raises a RetryError on timeout, or
            the last exception encountered otherwise.
        init_args: Positional arguments passed to ``target`` on every attempt.
        init_kwargs: Keyword arguments passed to ``target`` on every attempt.
        **kwargs: Only ``deadline`` is consulted; when present it overrides
            ``timeout``.

    Returns:
        Generator: A retryable generator wrapping the target generator
        function.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: A custom exception built by ``exception_factory`` if one
            is provided. With the default factory:
              google.api_core.RetryError: If the timeout is exceeded while
                  retrying.
              Exception: If the target raises an error that isn't retryable.
    """
    # A "deadline" keyword, when supplied, takes precedence over ``timeout``.
    if "deadline" in kwargs:
        timeout = kwargs["deadline"]

    # Absolute cut-off on the monotonic clock; None means retry indefinitely.
    deadline: Optional[float] = None
    if timeout is not None:
        deadline = time.monotonic() + timeout

    # Every exception seen so far, handed to the shared error helper.
    error_list: list[Exception] = []

    for sleep in sleep_generator:
        # Each iteration is one full attempt at consuming the stream.
        try:
            # Note: a future ResumptionStrategy object could produce new
            # arguments between attempts; for now every attempt reuses the
            # same ones.
            stream_result = yield from target(*init_args, **init_kwargs)
        # pylint: disable=broad-except
        # Broad exceptions are intentional: anything the stream raises must
        # be routed through the retry decision.
        except Exception as exc:
            # Shared retry logic decides what happens with this error; if it
            # raises, that exception propagates out of this generator.
            _retry_error_helper(
                exc,
                deadline,
                sleep,
                error_list,
                predicate,
                on_error,
                exception_factory,
                timeout,
            )
            # The helper returned without raising: back off, then try again.
            time.sleep(sleep)
        else:
            # The stream finished cleanly; forward its return value.
            return stream_result

    raise ValueError("Sleep generator stopped yielding sleep values.")

138 

139 

class StreamingRetry(_BaseRetry):
    """Exponential retry decorator for synchronous streaming RPCs.

    Calling an instance with a generator function returns a new callable
    which, when invoked, produces a Generator that wraps the target stream in
    retry logic. If the target raises any exception, the whole stream is
    retried from inside the wrapper.

    By default transient API errors are retried; supply a different
    predicate to retry other exceptions.

    Important Note: when a stream hits a retryable error, the wrapper
    silently builds a fresh iterator in the background and keeps yielding
    (likely duplicate) values as though nothing happened. This is the most
    general way to retry a stream, but it is often not the behavior you want.
    Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]

    There are two ways to build more advanced retry logic for streams:

    1. Wrap the target
        Use a ``target`` that keeps state between retries and creates a
        different generator on each retry call. For example, you can wrap a
        network call in a function that modifies the request based on what
        has already been returned:

        .. code-block:: python

            # NOTE: ``seen_items`` must be the same list object on every
            # attempt for the state to carry over; the default list used
            # here is also shared between unrelated calls.
            def attempt_with_modified_request(target, request, seen_items=[]):
                # remove seen items from request on each attempt
                new_request = modify_request(request, seen_items)
                new_generator = target(new_request)
                for item in new_generator:
                    yield item
                    seen_items.append(item)

            retry_wrapped_fn = StreamingRetry()(attempt_with_modified_request)
            retryable_generator = retry_wrapped_fn(target, request)

    2. Wrap the retry generator
        Alternatively, wrap the retryable generator itself before handing it
        to the end-user, adding a filter on the stream. For example, you can
        remember the items successfully yielded by previous attempts, and
        only yield new items once the new attempt gets past them:

        .. code-block:: python

            def retryable_with_filter(target):
                stream_idx = 0
                # reset stream_idx when the stream is retried
                def on_error(e):
                    nonlocal stream_idx
                    stream_idx = 0
                # build retryable, registering the reset callback
                retryable_gen = StreamingRetry(...)(target, on_error=on_error)
                # keep track of what has been yielded out of filter
                seen_items = []
                for item in retryable_gen():
                    if stream_idx >= len(seen_items):
                        seen_items.append(item)
                        yield item
                    elif item != seen_items[stream_idx]:
                        raise ValueError("Stream differs from last attempt")
                    stream_idx += 1

            filter_retry_wrapped = retryable_with_filter(target)

    Args:
        predicate (Callable[Exception]): A callable that should return
            ``True`` if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds.
            This must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (float): How long to keep retrying, in seconds.
            Note: timeout is only checked before initiating a retry, so the
            target may run past the timeout value as long as it is healthy.
        on_error (Callable[Exception]): A function to call while processing
            a retryable exception. Any error raised by this function will
            *not* be caught.
        deadline (float): DEPRECATED: use `timeout` instead. For backward
            compatibility, if specified it will override the ``timeout``
            parameter.
    """

    def __call__(
        self,
        func: Callable[_P, Iterable[_Y]],
        on_error: Callable[[Exception], Any] | None = None,
    ) -> Callable[_P, Generator[_Y, Any, None]]:
        """Return a version of ``func`` whose stream is retried on failure.

        Args:
            func (Callable): The callable to add retry behavior to.
            on_error (Optional[Callable[Exception]]): If given, called with
                each retryable exception raised by the wrapped function.
                Any error raised by this callback will *not* be caught.
                Ignored when an ``on_error`` was supplied to the
                constructor.

        Returns:
            Callable: A callable that will invoke ``func`` with retry
                behavior.
        """
        # A callback configured on the instance wins over the per-call one.
        error_callback = on_error if self._on_error is None else self._on_error

        @functools.wraps(func)
        def retry_wrapped_func(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> Generator[_Y, Any, None]:
            """A wrapper that calls target function with retry."""
            # A fresh backoff schedule is built for every invocation.
            backoff = exponential_sleep_generator(
                self._initial, self._maximum, multiplier=self._multiplier
            )
            return retry_target_stream(
                func,
                predicate=self._predicate,
                sleep_generator=backoff,
                timeout=self._timeout,
                on_error=error_callback,
                init_args=args,
                init_kwargs=kwargs,
            )

        return retry_wrapped_func