1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for wrapping low-level gRPC methods with common functionality.
16
17This is used by gapic clients to provide common error mapping, retry, timeout,
18compression, pagination, and long-running operations to gRPC methods.
19"""
20
21import enum
22import functools
23
24from google.api_core import grpc_helpers
25from google.api_core.gapic_v1 import client_info
26from google.api_core.timeout import TimeToDeadlineTimeout
27
28USE_DEFAULT_METADATA = object()
29
30
31class _MethodDefault(enum.Enum):
32 # Uses enum so that pytype/mypy knows that this is the only possible value.
33 # https://stackoverflow.com/a/60605919/101923
34 _DEFAULT_VALUE = object()
35
36
37DEFAULT = _MethodDefault._DEFAULT_VALUE
38"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
39so the default should be used."""
40
41
42def _is_not_none_or_false(value):
43 return value is not None and value is not False
44
45
46def _apply_decorators(func, decorators):
47 """Apply a list of decorators to a given function.
48
49 ``decorators`` may contain items that are ``None`` or ``False`` which will
50 be ignored.
51 """
52 filtered_decorators = filter(_is_not_none_or_false, reversed(decorators))
53
54 for decorator in filtered_decorators:
55 func = decorator(func)
56
57 return func
58
59
60class _GapicCallable(object):
61 """Callable that applies retry, timeout, and metadata logic.
62
63 Args:
64 target (Callable): The low-level RPC method.
65 retry (google.api_core.retry.Retry): The default retry for the
66 callable. If ``None``, this callable will not retry by default
67 timeout (google.api_core.timeout.Timeout): The default timeout for the
68 callable (i.e. duration of time within which an RPC must terminate
69 after its start, not to be confused with deadline). If ``None``,
70 this callable will not specify a timeout argument to the low-level
71 RPC method.
72 compression (grpc.Compression): The default compression for the callable.
73 If ``None``, this callable will not specify a compression argument
74 to the low-level RPC method.
75 metadata (Sequence[Tuple[str, str]]): Additional metadata that is
76 provided to the RPC method on every invocation. This is merged with
77 any metadata specified during invocation. If ``None``, no
78 additional metadata will be passed to the RPC method.
79 """
80
81 def __init__(
82 self,
83 target,
84 retry,
85 timeout,
86 compression,
87 metadata=None,
88 ):
89 self._target = target
90 self._retry = retry
91 self._timeout = timeout
92 self._compression = compression
93 self._metadata = metadata
94
95 def __call__(
96 self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
97 ):
98 """Invoke the low-level RPC with retry, timeout, compression, and metadata."""
99
100 if retry is DEFAULT:
101 retry = self._retry
102
103 if timeout is DEFAULT:
104 timeout = self._timeout
105
106 if compression is DEFAULT:
107 compression = self._compression
108
109 if isinstance(timeout, (int, float)):
110 timeout = TimeToDeadlineTimeout(timeout=timeout)
111
112 # Apply all applicable decorators.
113 wrapped_func = _apply_decorators(self._target, [retry, timeout])
114
115 # Add the user agent metadata to the call.
116 if self._metadata is not None:
117 metadata = kwargs.get("metadata", [])
118 # Due to the nature of invocation, None should be treated the same
119 # as not specified.
120 if metadata is None:
121 metadata = []
122 metadata = list(metadata)
123 metadata.extend(self._metadata)
124 kwargs["metadata"] = metadata
125 if self._compression is not None:
126 kwargs["compression"] = compression
127
128 return wrapped_func(*args, **kwargs)
129
130
131def wrap_method(
132 func,
133 default_retry=None,
134 default_timeout=None,
135 default_compression=None,
136 client_info=client_info.DEFAULT_CLIENT_INFO,
137 *,
138 with_call=False,
139):
140 """Wrap an RPC method with common behavior.
141
142 This applies common error wrapping, retry, timeout, and compression behavior to a function.
143 The wrapped function will take optional ``retry``, ``timeout``, and ``compression``
144 arguments.
145
146 For example::
147
148 import google.api_core.gapic_v1.method
149 from google.api_core import retry
150 from google.api_core import timeout
151 from grpc import Compression
152
153 # The original RPC method.
154 def get_topic(name, timeout=None):
155 request = publisher_v2.GetTopicRequest(name=name)
156 return publisher_stub.GetTopic(request, timeout=timeout)
157
158 default_retry = retry.Retry(deadline=60)
159 default_timeout = timeout.Timeout(deadline=60)
160 default_compression = Compression.NoCompression
161 wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
162 get_topic, default_retry)
163
164 # Execute get_topic with default retry and timeout:
165 response = wrapped_get_topic()
166
167 # Execute get_topic without doing any retying but with the default
168 # timeout:
169 response = wrapped_get_topic(retry=None)
170
171 # Execute get_topic but only retry on 5xx errors:
172 my_retry = retry.Retry(retry.if_exception_type(
173 exceptions.InternalServerError))
174 response = wrapped_get_topic(retry=my_retry)
175
176 The way this works is by late-wrapping the given function with the retry
177 and timeout decorators. Essentially, when ``wrapped_get_topic()`` is
178 called:
179
180 * ``get_topic()`` is first wrapped with the ``timeout`` into
181 ``get_topic_with_timeout``.
182 * ``get_topic_with_timeout`` is wrapped with the ``retry`` into
183 ``get_topic_with_timeout_and_retry()``.
184 * The final ``get_topic_with_timeout_and_retry`` is called passing through
185 the ``args`` and ``kwargs``.
186
187 The callstack is therefore::
188
189 method.__call__() ->
190 Retry.__call__() ->
191 Timeout.__call__() ->
192 wrap_errors() ->
193 get_topic()
194
195 Note that if ``timeout`` or ``retry`` is ``None``, then they are not
196 applied to the function. For example,
197 ``wrapped_get_topic(timeout=None, retry=None)`` is more or less
198 equivalent to just calling ``get_topic`` but with error re-mapping.
199
200 Args:
201 func (Callable[Any]): The function to wrap. It should accept an
202 optional ``timeout`` argument. If ``metadata`` is not ``None``, it
203 should accept a ``metadata`` argument.
204 default_retry (Optional[google.api_core.Retry]): The default retry
205 strategy. If ``None``, the method will not retry by default.
206 default_timeout (Optional[google.api_core.Timeout]): The default
207 timeout strategy. Can also be specified as an int or float. If
208 ``None``, the method will not have timeout specified by default.
209 default_compression (Optional[grpc.Compression]): The default
210 grpc.Compression. If ``None``, the method will not have
211 compression specified by default.
212 client_info
213 (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
214 Client information used to create a user-agent string that's
215 passed as gRPC metadata to the method. If unspecified, then
216 a sane default will be used. If ``None``, then no user agent
217 metadata will be provided to the RPC method.
218 with_call (bool): If True, wrapped grpc.UnaryUnaryMulticallables will
219 return a tuple of (response, grpc.Call) instead of just the response.
220 This is useful for extracting trailing metadata from unary calls.
221 Defaults to False.
222
223 Returns:
224 Callable: A new callable that takes optional ``retry``, ``timeout``,
225 and ``compression``
226 arguments and applies the common error mapping, retry, timeout, compression,
227 and metadata behavior to the low-level RPC method.
228 """
229 if with_call:
230 try:
231 func = func.with_call
232 except AttributeError as exc:
233 raise ValueError(
234 "with_call=True is only supported for unary calls."
235 ) from exc
236 func = grpc_helpers.wrap_errors(func)
237 if client_info is not None:
238 user_agent_metadata = [client_info.to_grpc_metadata()]
239 else:
240 user_agent_metadata = None
241
242 return functools.wraps(func)(
243 _GapicCallable(
244 func,
245 default_retry,
246 default_timeout,
247 default_compression,
248 metadata=user_agent_metadata,
249 )
250 )