Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py: 47%
153 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for :mod:`grpc`."""
17import collections
18import functools
19import warnings
21import grpc
23from google.api_core import exceptions
24import google.auth
25import google.auth.credentials
26import google.auth.transport.grpc
27import google.auth.transport.requests
28import google.protobuf
# Version string of the installed protobuf runtime; inspected below to decide
# whether the optional grpcio-gcp integration may be enabled.
PROTOBUF_VERSION = google.protobuf.__version__

# The grpcio-gcp package only has support for protobuf < 4
if PROTOBUF_VERSION.startswith("3."):  # pragma: NO COVER
    try:
        import grpc_gcp
    except ImportError:
        # grpcio-gcp is not installed; plain grpc channels will be used.
        HAS_GRPC_GCP = False
    else:
        warnings.warn(
            """Support for grpcio-gcp is deprecated. This feature will be
            removed from `google-api-core` after January 1, 2024. If you need to
            continue to use this feature, please pin to a specific version of
            `google-api-core`.""",
            DeprecationWarning,
        )
        HAS_GRPC_GCP = True
else:
    HAS_GRPC_GCP = False


# The list of gRPC Callable interfaces that return iterators.
_STREAM_WRAP_CLASSES = (
    grpc.UnaryStreamMultiCallable,
    grpc.StreamStreamMultiCallable,
)
def _patch_callable_name(callable_):
    """Give a gRPC callable a ``__name__`` attribute if it has none.

    gRPC callables lack ``__name__``, which causes :func:`functools.wraps`
    to error. When the attribute is missing it is set to the name of the
    callable's class; an existing ``__name__`` is left untouched.

    Args:
        callable_ (Callable): The object to patch in place.
    """
    if hasattr(callable_, "__name__"):
        # Already usable with functools.wraps; nothing to do.
        return

    callable_.__name__ = callable_.__class__.__name__
def _wrap_unary_errors(callable_):
    """Map errors for Unary-Unary and Stream-Unary gRPC callables.

    Args:
        callable_ (Callable): The gRPC callable to wrap.

    Returns:
        Callable: A wrapper that forwards all arguments to ``callable_`` and
        re-raises any ``grpc.RpcError`` as the exception produced by
        :func:`google.api_core.exceptions.from_grpc_error`.
    """
    # functools.wraps needs ``__name__`` to be present on the callable.
    _patch_callable_name(callable_)

    @functools.wraps(callable_)
    def error_remapped_callable(*args, **kwargs):
        try:
            response = callable_(*args, **kwargs)
        except grpc.RpcError as rpc_error:
            # Keep the original RpcError attached as the cause.
            raise exceptions.from_grpc_error(rpc_error) from rpc_error
        return response

    return error_remapped_callable
class _StreamingResponseIterator(grpc.Call):
    """Iterator over a gRPC response stream that re-maps ``grpc.RpcError``.

    Wraps the object returned by a streaming gRPC callable. Errors raised
    while iterating are converted with
    :func:`google.api_core.exceptions.from_grpc_error`, and the
    ``grpc.Call`` / ``grpc.RpcContext`` methods are forwarded to the wrapped
    object.

    Args:
        wrapped: The object returned by the streaming gRPC callable.
        prefetch_first_result (bool): Whether to pull the first response
            from ``wrapped`` eagerly during construction.
    """

    def __init__(self, wrapped, prefetch_first_result=True):
        self._wrapped = wrapped

        if not prefetch_first_result:
            return

        # This iterator is used in a retry context and is handed back to the
        # caller right after construction. gRPC does not raise until the
        # stream is consumed, so the first result is fetched here to surface
        # a failure early enough to trigger a retry.
        try:
            self._stored_first_result = next(self._wrapped)
        except (TypeError, StopIteration):
            # TypeError: the wrapped object may not be an iterator (a plain
            # grpc.Call, for instance), so there is no first result to store.
            # StopIteration: an empty stream is ignored at this time and is
            # expected to be handled outside of retry.
            pass

    def __iter__(self):
        """This iterator is also an iterable that returns itself."""
        return self

    def __next__(self):
        """Get the next response from the stream.

        Returns:
            protobuf.Message: A single response from the stream.
        """
        # Hand out the prefetched response (if any) exactly once.
        if hasattr(self, "_stored_first_result"):
            first_result = self._stored_first_result
            del self._stored_first_result
            return first_result

        try:
            return next(self._wrapped)
        except grpc.RpcError as rpc_error:
            # If the stream has already returned data, we cannot recover here.
            raise exceptions.from_grpc_error(rpc_error) from rpc_error

    # grpc.Call & grpc.RpcContext interface

    def add_callback(self, callback):
        """Forward to ``add_callback`` on the wrapped object."""
        return self._wrapped.add_callback(callback)

    def cancel(self):
        """Forward to ``cancel`` on the wrapped object."""
        return self._wrapped.cancel()

    def code(self):
        """Forward to ``code`` on the wrapped object."""
        return self._wrapped.code()

    def details(self):
        """Forward to ``details`` on the wrapped object."""
        return self._wrapped.details()

    def initial_metadata(self):
        """Forward to ``initial_metadata`` on the wrapped object."""
        return self._wrapped.initial_metadata()

    def is_active(self):
        """Forward to ``is_active`` on the wrapped object."""
        return self._wrapped.is_active()

    def time_remaining(self):
        """Forward to ``time_remaining`` on the wrapped object."""
        return self._wrapped.time_remaining()

    def trailing_metadata(self):
        """Forward to ``trailing_metadata`` on the wrapped object."""
        return self._wrapped.trailing_metadata()
def _wrap_stream_errors(callable_):
    """Wrap errors for Unary-Stream and Stream-Stream gRPC callables.

    Callables that return iterators need extra care: both the initial
    invocation and the iteration of the returned value are wrapped so that
    ``grpc.RpcError`` is re-mapped in either place.

    Args:
        callable_ (Callable): The streaming gRPC callable to wrap.

    Returns:
        Callable: A wrapper whose result is a
        :class:`_StreamingResponseIterator` around the original result.
    """
    # functools.wraps needs ``__name__`` to be present on the callable.
    _patch_callable_name(callable_)

    @functools.wraps(callable_)
    def error_remapped_callable(*args, **kwargs):
        try:
            stream = callable_(*args, **kwargs)
            # Auto-fetching the first result causes the PubSub client's
            # streaming pull to hang when re-opening the stream, so a hidden
            # flag on the callable is consulted to see whether pre-fetching
            # has been disabled.
            # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
            should_prefetch = getattr(callable_, "_prefetch_first_result_", True)
            return _StreamingResponseIterator(
                stream, prefetch_first_result=should_prefetch
            )
        except grpc.RpcError as rpc_error:
            raise exceptions.from_grpc_error(rpc_error) from rpc_error

    return error_remapped_callable
def wrap_errors(callable_):
    """Wrap a gRPC callable and map :class:`grpc.RpcErrors` to friendly error
    classes.

    Errors raised by the gRPC callable are mapped to the appropriate
    :class:`google.api_core.exceptions.GoogleAPICallError` subclasses.
    The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
    available from the ``response`` property on the mapped exception. This
    is useful for extracting metadata from the original error.

    Args:
        callable_ (Callable): A gRPC callable.

    Returns:
        Callable: The wrapped gRPC callable.
    """
    # Callables that return iterators need the stream-aware wrapper; all
    # others use the unary wrapper.
    returns_stream = isinstance(callable_, _STREAM_WRAP_CLASSES)
    wrapper = _wrap_stream_errors if returns_stream else _wrap_unary_errors
    return wrapper(callable_)
def _create_composite_credentials(
    credentials=None,
    credentials_file=None,
    default_scopes=None,
    scopes=None,
    ssl_credentials=None,
    quota_project_id=None,
    default_host=None,
):
    """Create the composite credentials for secure channels.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials. If
            neither this nor ``credentials_file`` is given, credentials are
            obtained from the environment using :func:`google.auth.default`.
        credentials_file (str): A file with credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument is
            mutually exclusive with credentials.
        default_scopes (Sequence[str]): An optional list of default scopes.
            Forwarded as ``default_scopes`` to whichever of
            :func:`google.auth.load_credentials_from_file`,
            :func:`google.auth.credentials.with_scopes_if_required` or
            :func:`google.auth.default` is used.
        scopes (Sequence[str]): An optional list of scopes needed for this
            service. Forwarded as ``scopes`` to the same function as
            ``default_scopes``.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            Defaults to :func:`grpc.ssl_channel_credentials` when ``None``.
        quota_project_id (str): An optional project to use for billing and
            quota. Applied only when the credentials are an instance of
            ``google.auth.credentials.CredentialsWithQuotaProject``.
        default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".

    Returns:
        grpc.ChannelCredentials: The composed channel credentials object.

    Raises:
        google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed.
    """
    if credentials and credentials_file:
        raise exceptions.DuplicateCredentialArgs(
            "'credentials' and 'credentials_file' are mutually exclusive."
        )

    # Resolve the credentials object from exactly one source.
    if credentials_file:
        credentials, _ = google.auth.load_credentials_from_file(
            credentials_file, scopes=scopes, default_scopes=default_scopes
        )
    elif credentials:
        credentials = google.auth.credentials.with_scopes_if_required(
            credentials, scopes=scopes, default_scopes=default_scopes
        )
    else:
        credentials, _ = google.auth.default(
            scopes=scopes, default_scopes=default_scopes
        )

    supports_quota_project = isinstance(
        credentials, google.auth.credentials.CredentialsWithQuotaProject
    )
    if quota_project_id and supports_quota_project:
        credentials = credentials.with_quota_project(quota_project_id)

    # Create the metadata plugin for inserting the authorization header.
    auth_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
        credentials,
        google.auth.transport.requests.Request(),
        default_host=default_host,
    )

    # Create a set of grpc.CallCredentials using the metadata plugin.
    call_credentials = grpc.metadata_call_credentials(auth_plugin)

    if ssl_credentials is None:
        ssl_credentials = grpc.ssl_channel_credentials()

    # Combine the ssl credentials and the authorization credentials.
    return grpc.composite_channel_credentials(ssl_credentials, call_credentials)
def create_channel(
    target,
    credentials=None,
    scopes=None,
    ssl_credentials=None,
    credentials_file=None,
    quota_project_id=None,
    default_scopes=None,
    default_host=None,
    **kwargs
):
    """Create a secure channel with credentials.

    Args:
        target (str): The target service address in the format 'hostname:port'.
        credentials (google.auth.credentials.Credentials): The credentials. If
            not specified, then this function will attempt to ascertain the
            credentials from the environment using :func:`google.auth.default`.
        scopes (Sequence[str]): An optional list of scopes needed for this
            service.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
        credentials_file (str): A file with credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument is
            mutually exclusive with credentials.
        quota_project_id (str): An optional project to use for billing and quota.
        default_scopes (Sequence[str]): Default scopes passed by a Google client
            library. Use 'scopes' for user-defined scopes.
        default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
        kwargs: Additional key-word args passed to
            :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
            Note: `grpc_gcp` is only supported in environments with protobuf < 4.0.0.

    Returns:
        grpc.Channel: The created channel.

    Raises:
        google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed.
    """
    channel_credentials = _create_composite_credentials(
        credentials=credentials,
        credentials_file=credentials_file,
        default_scopes=default_scopes,
        scopes=scopes,
        ssl_credentials=ssl_credentials,
        quota_project_id=quota_project_id,
        default_host=default_host,
    )

    # Use the grpcio-gcp channel factory when that package was importable,
    # otherwise the standard grpc one.
    secure_channel = grpc.secure_channel
    if HAS_GRPC_GCP:  # pragma: NO COVER
        secure_channel = grpc_gcp.secure_channel

    return secure_channel(target, channel_credentials, **kwargs)
# Record of one invocation of a stubbed method: the request plus the
# per-call options it was invoked with.
_MethodCall = collections.namedtuple(
    "_MethodCall",
    ["request", "timeout", "metadata", "credentials"],
)

# Record of one request seen by a channel stub: method name and request.
_ChannelRequest = collections.namedtuple(
    "_ChannelRequest",
    ["method", "request"],
)
class _CallableStub(object):
    """Stub for the grpc.*MultiCallable interfaces."""

    def __init__(self, method, channel):
        self._method = method
        self._channel = channel
        self.response = None
        """Union[protobuf.Message, Callable[protobuf.Message], exception]:
        The response to give when invoking this callable. If this is a
        callable, it will be invoked with the request protobuf. If it's an
        exception, the exception will be raised when this is invoked.
        """
        self.responses = None
        """Iterator[
            Union[protobuf.Message, Callable[protobuf.Message], exception]]:
        An iterator of responses. If specified, self.response will be populated
        on each invocation by calling ``next(self.responses)``."""
        self.requests = []
        """List[protobuf.Message]: All requests sent to this callable."""
        self.calls = []
        """List[Tuple]: All invocations of this callable. Each tuple is the
        request, timeout, metadata, and credentials."""

    def __call__(self, request, timeout=None, metadata=None, credentials=None):
        """Record the invocation and produce the configured response.

        Args:
            request: The request passed by the caller.
            timeout: Recorded in ``self.calls``; not otherwise used.
            metadata: Recorded in ``self.calls``; not otherwise used.
            credentials: Recorded in ``self.calls``; not otherwise used.

        Returns:
            The configured response, or the result of calling it with
            ``request`` when the configured response is callable.

        Raises:
            ValueError: If both ``response`` and ``responses`` are set, or if
                no response is available.
            Exception: The configured response itself, when it is an
                exception instance.
        """
        # Bookkeeping: record on the owning channel and on this stub.
        self._channel.requests.append(_ChannelRequest(self._method, request))
        self.calls.append(_MethodCall(request, timeout, metadata, credentials))
        self.requests.append(request)

        reply = self.response
        if self.responses is not None:
            if reply is not None:
                raise ValueError(
                    "{method}.response and {method}.responses are mutually "
                    "exclusive.".format(method=self._method)
                )
            reply = next(self.responses)

        if callable(reply):
            return reply(request)

        if isinstance(reply, Exception):
            raise reply

        if reply is None:
            raise ValueError(
                'Method stub for "{}" has no response.'.format(self._method)
            )

        return reply
def _simplify_method_name(method):
    """Simplifies a gRPC method name.

    gRPC hands the channel a fully-qualified method name such as
    "/google.pubsub.v1.Publisher/CreateTopic". Only the part after the last
    "/" is returned, "CreateTopic" in that example. A name without any "/"
    is returned unchanged.

    Args:
        method (str): The name of the method.

    Returns:
        str: The simplified name of the method.
    """
    _prefix, _separator, short_name = method.rpartition("/")
    return short_name
class ChannelStub(grpc.Channel):
    """A testing stub for the grpc.Channel interface.

    This can be used to test any client that eventually uses a gRPC channel
    to communicate. By passing in a channel stub, you can configure which
    responses are returned and track which requests are made.

    For example:

    .. code-block:: python

        channel_stub = grpc_helpers.ChannelStub()
        client = FooClient(channel=channel_stub)

        channel_stub.GetFoo.response = foo_pb2.Foo(name='bar')

        foo = client.get_foo(labels=['baz'])

        assert foo.name == 'bar'
        assert channel_stub.GetFoo.requests[0].labels == ['baz']

    Each method on the stub can be accessed and configured on the channel.
    Here's some examples of various configurations:

    .. code-block:: python

        # Return a basic response:

        channel_stub.GetFoo.response = foo_pb2.Foo(name='bar')
        assert client.get_foo().name == 'bar'

        # Raise an exception:
        channel_stub.GetFoo.response = NotFound('...')

        with pytest.raises(NotFound):
            client.get_foo()

        # Use a sequence of responses:
        channel_stub.GetFoo.responses = iter([
            foo_pb2.Foo(name='bar'),
            foo_pb2.Foo(name='baz'),
        ])

        assert client.get_foo().name == 'bar'
        assert client.get_foo().name == 'baz'

        # Use a callable

        def on_get_foo(request):
            return foo_pb2.Foo(name='bar' + request.id)

        channel_stub.GetFoo.response = on_get_foo

        assert client.get_foo(id='123').name == 'bar123'
    """

    def __init__(self, responses=None):
        # ``responses`` is accepted for backward compatibility but is not
        # used. The default is ``None`` rather than a list literal so that no
        # mutable object is shared between instances.
        self.requests = []
        """Sequence[Tuple[str, protobuf.Message]]: A list of all requests made
        on this channel in order. The tuple is of method name, request
        message."""
        self._method_stubs = {}

    def _stub_for_method(self, method):
        """Create, register and return a new stub for ``method``.

        The stub is stored under the simplified method name, replacing any
        stub previously registered under that name.
        """
        method = _simplify_method_name(method)
        stub = _CallableStub(method, self)
        self._method_stubs[method] = stub
        return stub

    def __getattr__(self, key):
        """Expose registered method stubs as attributes of the channel.

        Raises:
            AttributeError: If no stub is registered under ``key``.
        """
        # Read the registry through ``__dict__``: if ``_method_stubs`` is not
        # set (for example on an instance created without ``__init__``), a
        # plain ``self._method_stubs`` would re-enter ``__getattr__`` and
        # recurse without bound.
        stubs = self.__dict__.get("_method_stubs", {})
        try:
            return stubs[key]
        except KeyError:
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(type(self).__name__, key)
            ) from None

    def unary_unary(self, method, request_serializer=None, response_deserializer=None):
        """grpc.Channel.unary_unary implementation."""
        return self._stub_for_method(method)

    def unary_stream(self, method, request_serializer=None, response_deserializer=None):
        """grpc.Channel.unary_stream implementation."""
        return self._stub_for_method(method)

    def stream_unary(self, method, request_serializer=None, response_deserializer=None):
        """grpc.Channel.stream_unary implementation."""
        return self._stub_for_method(method)

    def stream_stream(
        self, method, request_serializer=None, response_deserializer=None
    ):
        """grpc.Channel.stream_stream implementation."""
        return self._stub_for_method(method)

    def subscribe(self, callback, try_to_connect=False):
        """grpc.Channel.subscribe implementation."""
        pass

    def unsubscribe(self, callback):
        """grpc.Channel.unsubscribe implementation."""
        pass

    def close(self):
        """grpc.Channel.close implementation."""
        pass