Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py: 86%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for wrapping low-level gRPC methods with common functionality. 

16 

17This is used by gapic clients to provide common error mapping, retry, timeout, 

18compression, pagination, and long-running operations to gRPC methods. 

19""" 

20 

21import enum 

22import functools 

23 

24from google.api_core import grpc_helpers 

25from google.api_core.gapic_v1 import client_info 

26from google.api_core.timeout import TimeToDeadlineTimeout 

27 

# Module-level sentinel object. Nothing in the visible part of this module
# reads it.
# NOTE(review): presumably kept so external callers can still import it --
# confirm before removing.
USE_DEFAULT_METADATA = object()

29 

30 

class _MethodDefault(enum.Enum):
    """Single-member enum whose only member is exposed as the module's ``DEFAULT`` sentinel."""

    # Uses enum so that pytype/mypy knows that this is the only possible value.
    # https://stackoverflow.com/a/60605919/101923
    _DEFAULT_VALUE = object()

35 

36 

# Module-level alias for the sole ``_MethodDefault`` member.
# ``_GapicCallable.__call__`` uses it as the default for its ``retry``,
# ``timeout`` and ``compression`` keyword arguments and checks for it with
# ``is``, so that an explicit ``None`` can be told apart from "not passed".
DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
so the default should be used."""

40 

41 

42def _is_not_none_or_false(value): 

43 return value is not None and value is not False 

44 

45 

def _apply_decorators(func, decorators):
    """Wrap ``func`` with each usable decorator in ``decorators``.

    Entries that are ``None`` or ``False`` are skipped. The list is walked
    from last to first, so the last entry wraps ``func`` directly and the
    first entry ends up as the outermost wrapper.
    """
    wrapped = func

    for candidate in reversed(decorators):
        if _is_not_none_or_false(candidate):
            wrapped = candidate(wrapped)

    return wrapped

58 

59 

class _GapicCallable(object):
    """Callable that applies retry, timeout, compression, and metadata logic.

    Args:
        target (Callable): The low-level RPC method.
        retry (google.api_core.retry.Retry): The default retry for the
            callable. If ``None``, this callable will not retry by default.
        timeout (google.api_core.timeout.Timeout): The default timeout for the
            callable (i.e. duration of time within which an RPC must terminate
            after its start, not to be confused with deadline). If ``None``,
            this callable will not specify a timeout argument to the low-level
            RPC method.
        compression (grpc.Compression): The default compression for the callable.
            If ``None``, this callable will not specify a compression argument
            to the low-level RPC method unless one is given at call time.
        metadata (Sequence[Tuple[str, str]]): Additional metadata that is
            provided to the RPC method on every invocation. This is merged with
            any metadata specified during invocation. If ``None``, no
            additional metadata will be passed to the RPC method.
    """

    def __init__(
        self,
        target,
        retry,
        timeout,
        compression,
        metadata=None,
    ):
        self._target = target
        self._retry = retry
        self._timeout = timeout
        self._compression = compression
        self._metadata = metadata

    def __call__(
        self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
    ):
        """Invoke the low-level RPC with retry, timeout, compression, and metadata.

        Args:
            *args: Positional arguments forwarded to the wrapped target.
            timeout: Per-call timeout. ``DEFAULT`` selects the timeout given at
                construction; an ``int``/``float`` is converted to a
                ``TimeToDeadlineTimeout``; ``None`` applies no timeout
                decorator.
            retry: Per-call retry decorator. ``DEFAULT`` selects the retry
                given at construction; ``None`` applies no retry decorator.
            compression: Per-call compression. ``DEFAULT`` selects the
                compression given at construction. If the resulting value is
                ``None``, no ``compression`` keyword is passed to the target.
            **kwargs: Keyword arguments forwarded to the wrapped target. A
                ``metadata`` entry is merged with the construction-time
                metadata.

        Returns:
            Whatever the (decorated) target returns.
        """

        # Resolve each per-call argument against its construction-time default.
        if retry is DEFAULT:
            retry = self._retry

        if timeout is DEFAULT:
            timeout = self._timeout

        if compression is DEFAULT:
            compression = self._compression

        # Bare numbers are shorthand for a time-to-deadline timeout decorator.
        if isinstance(timeout, (int, float)):
            timeout = TimeToDeadlineTimeout(timeout=timeout)

        # Apply all applicable decorators. ``retry`` is listed first so that it
        # becomes the outermost wrapper around the timeout-wrapped target.
        wrapped_func = _apply_decorators(self._target, [retry, timeout])

        # Add the user agent metadata to the call.
        if self._metadata is not None:
            metadata = kwargs.get("metadata", [])
            # Due to the nature of invocation, None should be treated the same
            # as not specified.
            if metadata is None:
                metadata = []
            # Copy before extending so the caller's sequence is never mutated.
            metadata = list(metadata)
            metadata.extend(self._metadata)
            kwargs["metadata"] = metadata

        # Gate on the *effective* compression rather than on the stored
        # default: a per-call value must be forwarded even when no default was
        # configured, and an explicit per-call ``None`` must suppress the
        # keyword even when a default exists.
        if compression is not None:
            kwargs["compression"] = compression

        return wrapped_func(*args, **kwargs)

129 

130 

def wrap_method(
    func,
    default_retry=None,
    default_timeout=None,
    default_compression=None,
    client_info=client_info.DEFAULT_CLIENT_INFO,
    *,
    with_call=False,
):
    """Wrap an RPC method with common behavior.

    The returned callable adds error mapping, retry, timeout, compression, and
    user-agent metadata handling on top of ``func``. It accepts optional
    ``retry``, ``timeout``, and ``compression`` keyword arguments that
    override the defaults given here on a per-call basis.

    For example::

        import google.api_core.gapic_v1.method
        from google.api_core import exceptions
        from google.api_core import retry
        from google.api_core import timeout
        from grpc import Compression

        # The original RPC method.
        def get_topic(name, timeout=None):
            request = publisher_v2.GetTopicRequest(name=name)
            return publisher_stub.GetTopic(request, timeout=timeout)

        default_retry = retry.Retry(deadline=60)
        default_timeout = timeout.TimeToDeadlineTimeout(timeout=60)
        default_compression = Compression.NoCompression
        wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
            get_topic, default_retry, default_timeout, default_compression)

        # Execute get_topic with the default retry and timeout:
        response = wrapped_get_topic()

        # Execute get_topic without doing any retrying but with the default
        # timeout:
        response = wrapped_get_topic(retry=None)

        # Execute get_topic but only retry on 5xx errors:
        my_retry = retry.Retry(retry.if_exception_type(
            exceptions.InternalServerError))
        response = wrapped_get_topic(retry=my_retry)

    The decorators are applied late, at call time. When
    ``wrapped_get_topic()`` is called:

    * ``get_topic()`` is first wrapped with the ``timeout`` into
      ``get_topic_with_timeout``.
    * ``get_topic_with_timeout`` is wrapped with the ``retry`` into
      ``get_topic_with_timeout_and_retry()``.
    * The final ``get_topic_with_timeout_and_retry`` is called passing through
      the ``args`` and ``kwargs``.

    The callstack is therefore::

        method.__call__() ->
            Retry.__call__() ->
                Timeout.__call__() ->
                    wrap_errors() ->
                        get_topic()

    If ``timeout`` or ``retry`` is ``None``, the corresponding decorator is
    not applied. For example,
    ``wrapped_get_topic(timeout=None, retry=None)`` is more or less
    equivalent to just calling ``get_topic`` but with error re-mapping.

    Args:
        func (Callable[Any]): The function to wrap. It should accept an
            optional ``timeout`` argument. If ``metadata`` is not ``None``, it
            should accept a ``metadata`` argument.
        default_retry (Optional[google.api_core.Retry]): The default retry
            strategy. If ``None``, the method will not retry by default.
        default_timeout (Optional[google.api_core.Timeout]): The default
            timeout strategy. Can also be specified as an int or float. If
            ``None``, the method will not have timeout specified by default.
        default_compression (Optional[grpc.Compression]): The default
            grpc.Compression. If ``None``, the method will not have
            compression specified by default.
        client_info
            (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
            Client information used to create a user-agent string that's
            passed as gRPC metadata to the method. If unspecified, then
            a sane default will be used. If ``None``, then no user agent
            metadata will be provided to the RPC method.
        with_call (bool): If True, wrapped grpc.UnaryUnaryMulticallables will
            return a tuple of (response, grpc.Call) instead of just the response.
            This is useful for extracting trailing metadata from unary calls.
            Defaults to False.

    Returns:
        Callable: A new callable that takes optional ``retry``, ``timeout``,
            and ``compression`` arguments and applies the common error
            mapping, retry, timeout, compression, and metadata behavior to
            the low-level RPC method.

    Raises:
        ValueError: If ``with_call`` is true and ``func`` has no ``with_call``
            attribute.
    """
    rpc = func
    if with_call:
        # Swap in the variant of the callable that also returns the call object.
        try:
            rpc = func.with_call
        except AttributeError as exc:
            raise ValueError(
                "with_call=True is only supported for unary calls."
            ) from exc

    error_mapped_rpc = grpc_helpers.wrap_errors(rpc)

    # ``client_info=None`` means "send no user-agent metadata at all".
    user_agent_metadata = (
        None if client_info is None else [client_info.to_grpc_metadata()]
    )

    gapic_callable = _GapicCallable(
        error_mapped_rpc,
        default_retry,
        default_timeout,
        default_compression,
        metadata=user_agent_metadata,
    )
    # Copy the wrapped function's identifying attributes onto the callable object.
    return functools.wraps(error_mapped_rpc)(gapic_callable)