1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for wrapping low-level gRPC methods with common functionality.
16
17This is used by gapic clients to provide common error mapping, retry, timeout,
18compression, pagination, and long-running operations to gRPC methods.
19"""
20
21import enum
22import functools
23
24from google.api_core import grpc_helpers
25from google.api_core.gapic_v1 import client_info
26from google.api_core.timeout import TimeToDeadlineTimeout
27
28USE_DEFAULT_METADATA = object()
29
30
31class _MethodDefault(enum.Enum):
32 # Uses enum so that pytype/mypy knows that this is the only possible value.
33 # https://stackoverflow.com/a/60605919/101923
34 #
35 # Literal[_DEFAULT_VALUE] is an alternative, but only added in Python 3.8.
36 # https://docs.python.org/3/library/typing.html#typing.Literal
37 _DEFAULT_VALUE = object()
38
39
40DEFAULT = _MethodDefault._DEFAULT_VALUE
41"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
42so the default should be used."""
43
44
45def _is_not_none_or_false(value):
46 return value is not None and value is not False
47
48
49def _apply_decorators(func, decorators):
50 """Apply a list of decorators to a given function.
51
52 ``decorators`` may contain items that are ``None`` or ``False`` which will
53 be ignored.
54 """
55 filtered_decorators = filter(_is_not_none_or_false, reversed(decorators))
56
57 for decorator in filtered_decorators:
58 func = decorator(func)
59
60 return func
61
62
63class _GapicCallable(object):
64 """Callable that applies retry, timeout, and metadata logic.
65
66 Args:
67 target (Callable): The low-level RPC method.
68 retry (google.api_core.retry.Retry): The default retry for the
69 callable. If ``None``, this callable will not retry by default
70 timeout (google.api_core.timeout.Timeout): The default timeout for the
71 callable (i.e. duration of time within which an RPC must terminate
72 after its start, not to be confused with deadline). If ``None``,
73 this callable will not specify a timeout argument to the low-level
74 RPC method.
75 compression (grpc.Compression): The default compression for the callable.
76 If ``None``, this callable will not specify a compression argument
77 to the low-level RPC method.
78 metadata (Sequence[Tuple[str, str]]): Additional metadata that is
79 provided to the RPC method on every invocation. This is merged with
80 any metadata specified during invocation. If ``None``, no
81 additional metadata will be passed to the RPC method.
82 """
83
84 def __init__(
85 self,
86 target,
87 retry,
88 timeout,
89 compression,
90 metadata=None,
91 ):
92 self._target = target
93 self._retry = retry
94 self._timeout = timeout
95 self._compression = compression
96 self._metadata = metadata
97
98 def __call__(
99 self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
100 ):
101 """Invoke the low-level RPC with retry, timeout, compression, and metadata."""
102
103 if retry is DEFAULT:
104 retry = self._retry
105
106 if timeout is DEFAULT:
107 timeout = self._timeout
108
109 if compression is DEFAULT:
110 compression = self._compression
111
112 if isinstance(timeout, (int, float)):
113 timeout = TimeToDeadlineTimeout(timeout=timeout)
114
115 # Apply all applicable decorators.
116 wrapped_func = _apply_decorators(self._target, [retry, timeout])
117
118 # Add the user agent metadata to the call.
119 if self._metadata is not None:
120 metadata = kwargs.get("metadata", [])
121 # Due to the nature of invocation, None should be treated the same
122 # as not specified.
123 if metadata is None:
124 metadata = []
125 metadata = list(metadata)
126 metadata.extend(self._metadata)
127 kwargs["metadata"] = metadata
128 if self._compression is not None:
129 kwargs["compression"] = compression
130
131 return wrapped_func(*args, **kwargs)
132
133
134def wrap_method(
135 func,
136 default_retry=None,
137 default_timeout=None,
138 default_compression=None,
139 client_info=client_info.DEFAULT_CLIENT_INFO,
140 *,
141 with_call=False,
142):
143 """Wrap an RPC method with common behavior.
144
145 This applies common error wrapping, retry, timeout, and compression behavior to a function.
146 The wrapped function will take optional ``retry``, ``timeout``, and ``compression``
147 arguments.
148
149 For example::
150
151 import google.api_core.gapic_v1.method
152 from google.api_core import retry
153 from google.api_core import timeout
154 from grpc import Compression
155
156 # The original RPC method.
157 def get_topic(name, timeout=None):
158 request = publisher_v2.GetTopicRequest(name=name)
159 return publisher_stub.GetTopic(request, timeout=timeout)
160
161 default_retry = retry.Retry(deadline=60)
162 default_timeout = timeout.Timeout(deadline=60)
163 default_compression = Compression.NoCompression
164 wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
165 get_topic, default_retry)
166
167 # Execute get_topic with default retry and timeout:
168 response = wrapped_get_topic()
169
170 # Execute get_topic without doing any retying but with the default
171 # timeout:
172 response = wrapped_get_topic(retry=None)
173
174 # Execute get_topic but only retry on 5xx errors:
175 my_retry = retry.Retry(retry.if_exception_type(
176 exceptions.InternalServerError))
177 response = wrapped_get_topic(retry=my_retry)
178
179 The way this works is by late-wrapping the given function with the retry
180 and timeout decorators. Essentially, when ``wrapped_get_topic()`` is
181 called:
182
183 * ``get_topic()`` is first wrapped with the ``timeout`` into
184 ``get_topic_with_timeout``.
185 * ``get_topic_with_timeout`` is wrapped with the ``retry`` into
186 ``get_topic_with_timeout_and_retry()``.
187 * The final ``get_topic_with_timeout_and_retry`` is called passing through
188 the ``args`` and ``kwargs``.
189
190 The callstack is therefore::
191
192 method.__call__() ->
193 Retry.__call__() ->
194 Timeout.__call__() ->
195 wrap_errors() ->
196 get_topic()
197
198 Note that if ``timeout`` or ``retry`` is ``None``, then they are not
199 applied to the function. For example,
200 ``wrapped_get_topic(timeout=None, retry=None)`` is more or less
201 equivalent to just calling ``get_topic`` but with error re-mapping.
202
203 Args:
204 func (Callable[Any]): The function to wrap. It should accept an
205 optional ``timeout`` argument. If ``metadata`` is not ``None``, it
206 should accept a ``metadata`` argument.
207 default_retry (Optional[google.api_core.Retry]): The default retry
208 strategy. If ``None``, the method will not retry by default.
209 default_timeout (Optional[google.api_core.Timeout]): The default
210 timeout strategy. Can also be specified as an int or float. If
211 ``None``, the method will not have timeout specified by default.
212 default_compression (Optional[grpc.Compression]): The default
213 grpc.Compression. If ``None``, the method will not have
214 compression specified by default.
215 client_info
216 (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
217 Client information used to create a user-agent string that's
218 passed as gRPC metadata to the method. If unspecified, then
219 a sane default will be used. If ``None``, then no user agent
220 metadata will be provided to the RPC method.
221 with_call (bool): If True, wrapped grpc.UnaryUnaryMulticallables will
222 return a tuple of (response, grpc.Call) instead of just the response.
223 This is useful for extracting trailing metadata from unary calls.
224 Defaults to False.
225
226 Returns:
227 Callable: A new callable that takes optional ``retry``, ``timeout``,
228 and ``compression``
229 arguments and applies the common error mapping, retry, timeout, compression,
230 and metadata behavior to the low-level RPC method.
231 """
232 if with_call:
233 try:
234 func = func.with_call
235 except AttributeError as exc:
236 raise ValueError(
237 "with_call=True is only supported for unary calls."
238 ) from exc
239 func = grpc_helpers.wrap_errors(func)
240 if client_info is not None:
241 user_agent_metadata = [client_info.to_grpc_metadata()]
242 else:
243 user_agent_metadata = None
244
245 return functools.wraps(func)(
246 _GapicCallable(
247 func,
248 default_retry,
249 default_timeout,
250 default_compression,
251 metadata=user_agent_metadata,
252 )
253 )