Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Helpers for wrapping low-level gRPC methods with common functionality. 

16 

17This is used by gapic clients to provide common error mapping, retry, timeout, 

18compression, pagination, and long-running operations to gRPC methods. 

19""" 

20 

21import enum 

22import functools 

23 

24from google.api_core import grpc_helpers 

25from google.api_core.gapic_v1 import client_info 

26from google.api_core.timeout import TimeToDeadlineTimeout 

27 

# Unique module-level marker object.
# NOTE(review): nothing in the visible part of this module reads it --
# presumably callers use it to request the default metadata; confirm.
USE_DEFAULT_METADATA = object()

29 

30 

class _MethodDefault(enum.Enum):
    """Enum with exactly one member, used as an "argument not given" marker.

    The marker is modelled as an enum member so that pytype/mypy know it is
    the only possible value; see https://stackoverflow.com/a/60605919/101923.

    ``Literal[_DEFAULT_VALUE]`` would be an alternative, but ``typing.Literal``
    was only added in Python 3.8:
    https://docs.python.org/3/library/typing.html#typing.Literal
    """

    _DEFAULT_VALUE = object()

38 

39 

# Public name for the single ``_MethodDefault`` member; compare with ``is``.
DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
so the default should be used."""

43 

44 

def _is_not_none_or_false(value):
    """Return ``True`` unless ``value`` is the ``None`` or ``False`` singleton.

    Both checks are identity checks, so other falsy values (``0``, ``""``,
    empty containers) still yield ``True``.
    """
    return not (value is None or value is False)

47 

48 

def _apply_decorators(func, decorators):
    """Wrap ``func`` with every usable decorator in ``decorators``.

    Entries that are ``None`` or ``False`` are skipped. The remaining
    decorators are applied from the end of the sequence towards the front,
    so the first entry ends up as the outermost wrapper.

    Args:
        func (Callable): The function to wrap.
        decorators (Sequence): Decorators to apply; must support
            ``reversed()``.

    Returns:
        Callable: ``func`` wrapped by the usable decorators, or ``func``
        itself when there are none.
    """
    wrapped = func

    for candidate in reversed(decorators):
        # Identity checks: only the None/False singletons mean "skip".
        if candidate is None or candidate is False:
            continue
        wrapped = candidate(wrapped)

    return wrapped

61 

62 

class _GapicCallable(object):
    """Callable that applies retry, timeout, compression, and metadata logic.

    Args:
        target (Callable): The low-level RPC method.
        retry (google.api_core.retry.Retry): The default retry for the
            callable. If ``None``, this callable will not retry by default.
        timeout (google.api_core.timeout.Timeout): The default timeout for the
            callable (i.e. duration of time within which an RPC must terminate
            after its start, not to be confused with deadline). If ``None``,
            this callable will not specify a timeout argument to the low-level
            RPC method.
        compression (grpc.Compression): The default compression for the callable.
            If ``None``, this callable will not specify a compression argument
            to the low-level RPC method unless one is given at call time.
        metadata (Sequence[Tuple[str, str]]): Additional metadata that is
            provided to the RPC method on every invocation. This is merged with
            any metadata specified during invocation. If ``None``, no
            additional metadata will be passed to the RPC method.
    """

    def __init__(
        self,
        target,
        retry,
        timeout,
        compression,
        metadata=None,
    ):
        self._target = target
        self._retry = retry
        self._timeout = timeout
        self._compression = compression
        self._metadata = metadata

    def __call__(
        self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
    ):
        """Invoke the low-level RPC with retry, timeout, compression, and metadata.

        Args:
            *args: Positional arguments passed through to the target.
            timeout: Per-call timeout. ``DEFAULT`` selects the timeout given
                at construction; ``None`` disables it. An ``int`` or ``float``
                is wrapped in ``TimeToDeadlineTimeout``.
            retry: Per-call retry decorator. ``DEFAULT`` selects the retry
                given at construction; ``None`` disables it.
            compression: Per-call compression. ``DEFAULT`` selects the
                compression given at construction; ``None`` means no
                ``compression`` keyword is passed to the target.
            **kwargs: Keyword arguments passed through to the target. A
                ``metadata`` entry is extended with the construction-time
                metadata when that is not ``None``.

        Returns:
            Whatever the (decorated) target returns.
        """
        # Resolve "not specified" to the defaults captured at construction.
        if retry is DEFAULT:
            retry = self._retry

        if timeout is DEFAULT:
            timeout = self._timeout

        if compression is DEFAULT:
            compression = self._compression

        # A bare number is shorthand for a timeout decorator.
        if isinstance(timeout, (int, float)):
            timeout = TimeToDeadlineTimeout(timeout=timeout)

        # Apply all applicable decorators. The timeout wraps the target first,
        # then the retry wraps the result, so the retry is outermost.
        wrapped_func = _apply_decorators(self._target, [retry, timeout])

        # Add the user agent metadata to the call.
        if self._metadata is not None:
            # Due to the nature of invocation, None should be treated the same
            # as not specified.
            metadata = kwargs.get("metadata")
            # Copy into a fresh list so the caller's sequence is not mutated.
            metadata = [] if metadata is None else list(metadata)
            metadata.extend(self._metadata)
            kwargs["metadata"] = metadata

        # Decide on the effective value rather than the construction-time
        # default, so that a per-call compression is honored even when no
        # default was configured, and an explicit ``compression=None`` really
        # omits the argument.
        if compression is not None:
            kwargs["compression"] = compression

        return wrapped_func(*args, **kwargs)

132 

133 

def wrap_method(
    func,
    default_retry=None,
    default_timeout=None,
    default_compression=None,
    client_info=client_info.DEFAULT_CLIENT_INFO,
    *,
    with_call=False,
):
    """Wrap an RPC method with common behavior.

    The returned callable adds error re-mapping, retry, timeout, compression,
    and user-agent metadata handling around ``func``. It accepts optional
    ``retry``, ``timeout``, and ``compression`` keyword arguments that
    override the defaults supplied here for a single invocation.

    For example::

        import google.api_core.gapic_v1.method
        from google.api_core import exceptions
        from google.api_core import retry
        from google.api_core import timeout

        # The original RPC method.
        def get_topic(name, timeout=None, metadata=None):
            request = publisher_v2.GetTopicRequest(name=name)
            return publisher_stub.GetTopic(
                request, timeout=timeout, metadata=metadata)

        default_retry = retry.Retry(deadline=60)
        default_timeout = timeout.TimeToDeadlineTimeout(timeout=60)
        wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
            get_topic, default_retry, default_timeout)

        # Execute get_topic with default retry and timeout:
        response = wrapped_get_topic(name)

        # Execute get_topic without doing any retrying but with the default
        # timeout:
        response = wrapped_get_topic(name, retry=None)

        # Execute get_topic but only retry on 5xx errors:
        my_retry = retry.Retry(retry.if_exception_type(
            exceptions.InternalServerError))
        response = wrapped_get_topic(name, retry=my_retry)

    The way this works is by late-wrapping the given function with the retry
    and timeout decorators. Essentially, when ``wrapped_get_topic()`` is
    called:

    * ``get_topic()`` is first wrapped with the ``timeout`` into
      ``get_topic_with_timeout``.
    * ``get_topic_with_timeout`` is wrapped with the ``retry`` into
      ``get_topic_with_timeout_and_retry()``.
    * The final ``get_topic_with_timeout_and_retry`` is called passing through
      the ``args`` and ``kwargs``.

    The callstack is therefore::

        method.__call__() ->
            Retry.__call__() ->
                Timeout.__call__() ->
                    wrap_errors() ->
                        get_topic()

    Note that if ``timeout`` or ``retry`` is ``None``, then they are not
    applied to the function. For example,
    ``wrapped_get_topic(timeout=None, retry=None)`` is more or less
    equivalent to just calling ``get_topic`` but with error re-mapping.

    Args:
        func (Callable[Any]): The function to wrap. It should accept an
            optional ``timeout`` argument. If ``client_info`` is not
            ``None``, it should also accept a ``metadata`` argument.
        default_retry (Optional[google.api_core.Retry]): The default retry
            strategy. If ``None``, the method will not retry by default.
        default_timeout (Optional[google.api_core.Timeout]): The default
            timeout strategy. Can also be specified as an int or float. If
            ``None``, the method will not have timeout specified by default.
        default_compression (Optional[grpc.Compression]): The default
            grpc.Compression. If ``None``, the method will not have
            compression specified by default.
        client_info
            (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
            Client information used to create a user-agent string that's
            passed as gRPC metadata to the method. If unspecified, then
            a sane default will be used. If ``None``, then no user agent
            metadata will be provided to the RPC method.
        with_call (bool): If True, ``func.with_call`` is wrapped in place of
            ``func``, so wrapped grpc.UnaryUnaryMulticallables will
            return a tuple of (response, grpc.Call) instead of just the
            response. This is useful for extracting trailing metadata from
            unary calls. Defaults to False.

    Returns:
        Callable: A new callable that takes optional ``retry``, ``timeout``,
            and ``compression`` arguments and applies the common error
            mapping, retry, timeout, compression, and metadata behavior to
            the low-level RPC method.

    Raises:
        ValueError: If ``with_call`` is True but ``func`` has no
            ``with_call`` attribute.
    """
    target = func
    if with_call:
        try:
            target = func.with_call
        except AttributeError as exc:
            raise ValueError(
                "with_call=True is only supported for unary calls."
            ) from exc

    # Error re-mapping is the innermost layer; retry and timeout are added
    # around it at call time by _GapicCallable.
    target = grpc_helpers.wrap_errors(target)

    # Within this function ``client_info`` is the argument, not the module.
    user_agent_metadata = (
        None if client_info is None else [client_info.to_grpc_metadata()]
    )

    gapic_callable = _GapicCallable(
        target,
        default_retry,
        default_timeout,
        default_compression,
        metadata=user_agent_metadata,
    )
    # Copy the wrapped function's metadata attributes onto the callable.
    return functools.wraps(target)(gapic_callable)