Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py: 36%
45 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for wrapping low-level gRPC methods with common functionality.
17This is used by gapic clients to provide common error mapping, retry, timeout,
18pagination, and long-running operations to gRPC methods.
19"""
21import enum
22import functools
24from google.api_core import grpc_helpers
25from google.api_core.gapic_v1 import client_info
26from google.api_core.timeout import TimeToDeadlineTimeout
# Module-level sentinel object. Nothing in this module's visible code reads it;
# presumably retained for callers that import it -- TODO confirm before removing.
USE_DEFAULT_METADATA = object()
class _MethodDefault(enum.Enum):
    """Single-member enum whose only value backs the module-level ``DEFAULT``."""

    # Uses enum so that pytype/mypy knows that this is the only possible value.
    # https://stackoverflow.com/a/60605919/101923
    #
    # Literal[_DEFAULT_VALUE] is an alternative, but only added in Python 3.8.
    # https://docs.python.org/3/library/typing.html#typing.Literal
    _DEFAULT_VALUE = object()
# Compared by identity (``is DEFAULT``) in ``_GapicCallable.__call__`` to detect
# a ``retry`` or ``timeout`` argument that the caller did not pass.
DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry or timeout argument was unspecified,
so the default should be used."""
45def _is_not_none_or_false(value):
46 return value is not None and value is not False
def _apply_decorators(func, decorators):
    """Wrap ``func`` with each usable decorator in ``decorators``.

    Entries that are ``None`` or ``False`` are skipped. The remaining
    decorators are applied from the last entry to the first, so the first
    entry in ``decorators`` ends up as the outermost wrapper.

    Args:
        func (Callable): The function to wrap.
        decorators (Sequence[Optional[Callable]]): Decorators to apply; must
            support ``reversed()``.

    Returns:
        Callable: ``func`` wrapped by every non-skipped decorator, or ``func``
        itself when none apply.
    """
    wrapped = func
    for candidate in reversed(decorators):
        if _is_not_none_or_false(candidate):
            wrapped = candidate(wrapped)
    return wrapped
class _GapicCallable:
    """Callable wrapper that layers retry, timeout, and metadata onto an RPC.

    Args:
        target (Callable): The low-level RPC method.
        retry (google.api_core.retry.Retry): Retry used when the caller does
            not pass one. If ``None``, calls are not retried by default.
        timeout (google.api_core.timeout.Timeout): Timeout used when the
            caller does not pass one (i.e. the duration within which an RPC
            must terminate after it starts, not to be confused with a
            deadline). If ``None``, no timeout decorator is applied to the
            low-level RPC method by default.
        metadata (Sequence[Tuple[str, str]]): Extra metadata appended to the
            ``metadata`` keyword argument on every invocation. If ``None``,
            the ``metadata`` keyword argument is left untouched.
    """

    def __init__(self, target, retry, timeout, metadata=None):
        self._target = target
        self._retry = retry
        self._timeout = timeout
        self._metadata = metadata

    def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs):
        """Invoke the low-level RPC with retry, timeout, and metadata."""
        # Fall back to the configured defaults only when the caller left the
        # argument unspecified; an explicit ``None`` disables the behavior.
        effective_retry = self._retry if retry is DEFAULT else retry
        effective_timeout = self._timeout if timeout is DEFAULT else timeout

        # A bare number is shorthand for a time-to-deadline timeout.
        if isinstance(effective_timeout, (int, float)):
            effective_timeout = TimeToDeadlineTimeout(timeout=effective_timeout)

        # Timeout wraps the target first, so retry is the outermost layer.
        rpc = _apply_decorators(
            self._target, [effective_retry, effective_timeout]
        )

        # Append the configured metadata (e.g. user agent) after the caller's.
        if self._metadata is not None:
            supplied = kwargs.get("metadata")
            # ``None`` is treated the same as "not specified".
            merged = [] if supplied is None else list(supplied)
            merged.extend(self._metadata)
            kwargs["metadata"] = merged

        return rpc(*args, **kwargs)
def wrap_method(
    func,
    default_retry=None,
    default_timeout=None,
    client_info=client_info.DEFAULT_CLIENT_INFO,
):
    """Wrap an RPC method with common behavior.

    This applies common error wrapping, retry, and timeout behavior to a
    function. The wrapped function takes optional ``retry`` and ``timeout``
    keyword arguments.

    For example::

        import google.api_core.gapic_v1.method
        from google.api_core import exceptions
        from google.api_core import retry

        # The original RPC method.
        def get_topic(name, timeout=None):
            request = publisher_v2.GetTopicRequest(name=name)
            return publisher_stub.GetTopic(request, timeout=timeout)

        default_retry = retry.Retry()
        default_timeout = 60
        wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
            get_topic, default_retry, default_timeout)

        # Execute get_topic with the default retry and timeout:
        response = wrapped_get_topic(name)

        # Execute get_topic without any retrying but with the default
        # timeout:
        response = wrapped_get_topic(name, retry=None)

        # Execute get_topic but only retry on 5xx errors:
        my_retry = retry.Retry(retry.if_exception_type(
            exceptions.InternalServerError))
        response = wrapped_get_topic(name, retry=my_retry)

    The retry and timeout decorators are applied late, at call time. When
    ``wrapped_get_topic()`` is called:

    * ``get_topic()`` is first wrapped with the ``timeout`` into
      ``get_topic_with_timeout``.
    * ``get_topic_with_timeout`` is wrapped with the ``retry`` into
      ``get_topic_with_timeout_and_retry()``.
    * The final ``get_topic_with_timeout_and_retry`` is called, passing
      through the ``args`` and ``kwargs``.

    The callstack is therefore::

        method.__call__() ->
            Retry.__call__() ->
                Timeout.__call__() ->
                    wrap_errors() ->
                        get_topic()

    If ``timeout`` or ``retry`` is ``None``, the corresponding decorator is
    not applied. For example, ``wrapped_get_topic(timeout=None, retry=None)``
    is more or less equivalent to calling ``get_topic`` directly, but with
    error re-mapping.

    Args:
        func (Callable[Any]): The function to wrap. It should accept an
            optional ``timeout`` argument. If ``client_info`` is not
            ``None``, it should also accept a ``metadata`` argument.
        default_retry (Optional[google.api_core.Retry]): The default retry
            strategy. If ``None``, the method will not retry by default.
        default_timeout (Optional[google.api_core.Timeout]): The default
            timeout strategy. Can also be specified as an int or float. If
            ``None``, the method will not have a timeout specified by default.
        client_info
            (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
            Client information used to create a user-agent string that's
            passed as gRPC metadata to the method. If unspecified, a default
            is used. If ``None``, no user agent metadata is provided to the
            RPC method.

    Returns:
        Callable: A new callable that takes optional ``retry`` and ``timeout``
            arguments and applies the common error mapping, retry, timeout,
            and metadata behavior to the low-level RPC method.
    """
    error_mapped = grpc_helpers.wrap_errors(func)

    # A single-entry metadata list carrying the user agent, or None to opt out.
    user_agent_metadata = (
        None if client_info is None else [client_info.to_grpc_metadata()]
    )

    gapic_callable = _GapicCallable(
        error_mapped, default_retry, default_timeout, metadata=user_agent_metadata
    )
    # Copy name/docstring and related attributes onto the callable instance.
    return functools.wraps(error_mapped)(gapic_callable)