Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py: 34%
158 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for :mod:`grpc`."""
16from typing import Generic, TypeVar, Iterator
18import collections
19import functools
20import logging
21import warnings
23import grpc
25from google.api_core import exceptions
26import google.auth
27import google.auth.credentials
28import google.auth.transport.grpc
29import google.auth.transport.requests
30import google.protobuf
PROTOBUF_VERSION = google.protobuf.__version__

# The grpcio-gcp package only has support for protobuf < 4, so the optional
# import is attempted only on a protobuf 3.x runtime. HAS_GRPC_GCP records
# whether ``grpc_gcp`` ended up importable.
HAS_GRPC_GCP = False
if PROTOBUF_VERSION.startswith("3."):  # pragma: NO COVER
    try:
        import grpc_gcp
    except ImportError:
        pass
    else:
        warnings.warn(
            """Support for grpcio-gcp is deprecated. This feature will be
            removed from `google-api-core` after January 1, 2024. If you need to
            continue to use this feature, please pin to a specific version of
            `google-api-core`.""",
            DeprecationWarning,
        )
        HAS_GRPC_GCP = True


# The gRPC callable interfaces whose invocation returns an iterator.
_STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable)

_LOGGER = logging.getLogger(__name__)

# Type variable denoting the proto response type for grpc calls.
P = TypeVar("P")
62def _patch_callable_name(callable_):
63 """Fix-up gRPC callable attributes.
65 gRPC callable lack the ``__name__`` attribute which causes
66 :func:`functools.wraps` to error. This adds the attribute if needed.
67 """
68 if not hasattr(callable_, "__name__"):
69 callable_.__name__ = callable_.__class__.__name__
def _wrap_unary_errors(callable_):
    """Add error mapping to a Unary-Unary or Stream-Unary gRPC callable.

    Args:
        callable_ (Callable): The gRPC callable to wrap.

    Returns:
        Callable: A wrapper that forwards every argument to ``callable_``
        and re-raises any :class:`grpc.RpcError` as the exception built by
        :func:`google.api_core.exceptions.from_grpc_error`, chained to the
        original error.
    """
    _patch_callable_name(callable_)

    def error_remapped_callable(*args, **kwargs):
        try:
            return callable_(*args, **kwargs)
        except grpc.RpcError as rpc_error:
            raise exceptions.from_grpc_error(rpc_error) from rpc_error

    # Equivalent to decorating the inner function with functools.wraps.
    return functools.wraps(callable_)(error_remapped_callable)
class _StreamingResponseIterator(Generic[P], grpc.Call):
    """Iterator over a streaming gRPC result that re-maps RPC errors.

    Iterating yields the responses of the wrapped object; a
    :class:`grpc.RpcError` raised while fetching a response is re-raised as
    the exception built by :func:`google.api_core.exceptions.from_grpc_error`.
    The ``grpc.Call`` / ``grpc.RpcContext`` methods are delegated to the
    wrapped object.

    Args:
        wrapped: The object returned by the streaming gRPC callable.
        prefetch_first_result (bool): Whether to fetch the first response
            while constructing the iterator.
    """

    def __init__(self, wrapped, prefetch_first_result=True):
        self._wrapped = wrapped

        if not prefetch_first_result:
            return

        # This iterator is used in a retry context and handed back to the
        # caller after construction. gRPC does not raise until the stream is
        # consumed, so the first result is fetched here to surface a failure
        # early enough to trigger a retry.
        try:
            self._stored_first_result = next(self._wrapped)
        except TypeError:
            # The wrapped object may not be an iterator (a grpc.Call, for
            # instance); in that case no first result is stored.
            pass
        except StopIteration:
            # An exhausted stream is ignored here; it is handled outside of
            # retry when the caller iterates.
            pass

    def __iter__(self) -> Iterator[P]:
        """Return ``self``; this iterator is also its own iterable."""
        return self

    def __next__(self) -> P:
        """Get the next response from the stream.

        Returns:
            protobuf.Message: A single response from the stream.
        """
        # Hand out the prefetched response first, exactly once.
        if hasattr(self, "_stored_first_result"):
            first_result = self._stored_first_result
            del self._stored_first_result
            return first_result

        try:
            return next(self._wrapped)
        except grpc.RpcError as rpc_error:
            # If the stream has already returned data, we cannot recover here.
            raise exceptions.from_grpc_error(rpc_error) from rpc_error

    # grpc.Call & grpc.RpcContext interface, delegated to the wrapped object.

    def add_callback(self, callback):
        """Delegate to ``add_callback`` of the wrapped object."""
        return self._wrapped.add_callback(callback)

    def cancel(self):
        """Delegate to ``cancel`` of the wrapped object."""
        return self._wrapped.cancel()

    def code(self):
        """Delegate to ``code`` of the wrapped object."""
        return self._wrapped.code()

    def details(self):
        """Delegate to ``details`` of the wrapped object."""
        return self._wrapped.details()

    def initial_metadata(self):
        """Delegate to ``initial_metadata`` of the wrapped object."""
        return self._wrapped.initial_metadata()

    def is_active(self):
        """Delegate to ``is_active`` of the wrapped object."""
        return self._wrapped.is_active()

    def time_remaining(self):
        """Delegate to ``time_remaining`` of the wrapped object."""
        return self._wrapped.time_remaining()

    def trailing_metadata(self):
        """Delegate to ``trailing_metadata`` of the wrapped object."""
        return self._wrapped.trailing_metadata()


# Public type alias denoting the return type of streaming gapic calls.
GrpcStream = _StreamingResponseIterator[P]
def _wrap_stream_errors(callable_):
    """Add error mapping to a Unary-Stream or Stream-Stream gRPC callable.

    Callables that return iterators need error mapping in two places: the
    initial invocation and the iteration over the returned stream. The
    invocation is guarded here; the returned stream is wrapped in
    :class:`_StreamingResponseIterator`, which guards the iteration.

    Args:
        callable_ (Callable): The streaming gRPC callable to wrap.

    Returns:
        Callable: A wrapper that returns a
        :class:`_StreamingResponseIterator` around the callable's result.
    """
    _patch_callable_name(callable_)

    @functools.wraps(callable_)
    def error_remapped_callable(*args, **kwargs):
        try:
            stream = callable_(*args, **kwargs)
            # Auto-fetching the first result causes the PubSub client's
            # streaming pull to hang when re-opening the stream, so the
            # hidden ``_prefetch_first_result_`` flag on the callable is
            # consulted to see whether pre-fetching has been disabled.
            # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
            should_prefetch = getattr(callable_, "_prefetch_first_result_", True)
            return _StreamingResponseIterator(
                stream, prefetch_first_result=should_prefetch
            )
        except grpc.RpcError as rpc_error:
            raise exceptions.from_grpc_error(rpc_error) from rpc_error

    return error_remapped_callable
def wrap_errors(callable_):
    """Wrap a gRPC callable so :class:`grpc.RpcError` becomes a friendly error.

    Errors raised by the gRPC callable are mapped to the appropriate
    :class:`google.api_core.exceptions.GoogleAPICallError` subclasses.
    The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
    available from the ``response`` property on the mapped exception. This
    is useful for extracting metadata from the original error.

    Args:
        callable_ (Callable): A gRPC callable.

    Returns:
        Callable: The wrapped gRPC callable.
    """
    # Streaming callables also need their returned iterator wrapped; every
    # other callable only needs the invocation itself guarded.
    wrapper_factory = (
        _wrap_stream_errors
        if isinstance(callable_, _STREAM_WRAP_CLASSES)
        else _wrap_unary_errors
    )
    return wrapper_factory(callable_)
def _create_composite_credentials(
    credentials=None,
    credentials_file=None,
    default_scopes=None,
    scopes=None,
    ssl_credentials=None,
    quota_project_id=None,
    default_host=None,
):
    """Build the composite channel credentials used for secure channels.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials. If
            neither this nor ``credentials_file`` is given, the credentials
            are obtained from :func:`google.auth.default`.
        credentials_file (str): A file with credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument is
            mutually exclusive with credentials.
        default_scopes (Sequence[str]): Optional scopes, forwarded as
            ``default_scopes`` to whichever :mod:`google.auth` call resolves
            the credentials.
        scopes (Sequence[str]): Optional scopes, forwarded as ``scopes`` to
            whichever :mod:`google.auth` call resolves the credentials.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            Defaults to :func:`grpc.ssl_channel_credentials`.
        quota_project_id (str): An optional project to use for billing and quota.
        default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".

    Returns:
        grpc.ChannelCredentials: The composed channel credentials object.

    Raises:
        google.api_core.exceptions.DuplicateCredentialArgs: If both a
            credentials object and credentials_file are passed.
    """
    if credentials and credentials_file:
        raise exceptions.DuplicateCredentialArgs(
            "'credentials' and 'credentials_file' are mutually exclusive."
        )

    # Resolve the credentials from exactly one source.
    if credentials_file:
        credentials, _project_id = google.auth.load_credentials_from_file(
            credentials_file, scopes=scopes, default_scopes=default_scopes
        )
    elif credentials:
        credentials = google.auth.credentials.with_scopes_if_required(
            credentials, scopes=scopes, default_scopes=default_scopes
        )
    else:
        credentials, _project_id = google.auth.default(
            scopes=scopes, default_scopes=default_scopes
        )

    # A quota project can only be attached to credentials that support it.
    supports_quota_project = isinstance(
        credentials, google.auth.credentials.CredentialsWithQuotaProject
    )
    if quota_project_id and supports_quota_project:
        credentials = credentials.with_quota_project(quota_project_id)

    # Metadata plugin that inserts the authorization header on each call.
    auth_request = google.auth.transport.requests.Request()
    auth_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
        credentials,
        auth_request,
        default_host=default_host,
    )

    # grpc.CallCredentials backed by the metadata plugin.
    call_credentials = grpc.metadata_call_credentials(auth_plugin)

    if ssl_credentials is None:
        ssl_credentials = grpc.ssl_channel_credentials()

    # Combine the ssl credentials and the authorization credentials.
    return grpc.composite_channel_credentials(ssl_credentials, call_credentials)
def create_channel(
    target,
    credentials=None,
    scopes=None,
    ssl_credentials=None,
    credentials_file=None,
    quota_project_id=None,
    default_scopes=None,
    default_host=None,
    compression=None,
    **kwargs,
):
    """Create a secure channel with credentials.

    Args:
        target (str): The target service address in the format 'hostname:port'.
        credentials (google.auth.credentials.Credentials): The credentials. If
            not specified, then this function will attempt to ascertain the
            credentials from the environment using :func:`google.auth.default`.
        scopes (Sequence[str]): An optional list of user-defined scopes used
            when resolving the credentials.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
        credentials_file (str): A file with credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument is
            mutually exclusive with credentials.
        quota_project_id (str): An optional project to use for billing and quota.
        default_scopes (Sequence[str]): Default scopes passed by a Google client
            library. Use 'scopes' for user-defined scopes.
        default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
        compression (grpc.Compression): An optional value indicating the
            compression method to be used over the lifetime of the channel.
            It is ignored (with a debug log entry) when the channel is
            created through ``grpc_gcp``.
        kwargs: Additional key-word args passed to
            :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
            Note: `grpc_gcp` is only supported in environments with protobuf < 4.0.0.

    Returns:
        grpc.Channel: The created channel.

    Raises:
        google.api_core.exceptions.DuplicateCredentialArgs: If both a
            credentials object and credentials_file are passed.
    """
    channel_credentials = _create_composite_credentials(
        credentials=credentials,
        credentials_file=credentials_file,
        default_scopes=default_scopes,
        scopes=scopes,
        ssl_credentials=ssl_credentials,
        quota_project_id=quota_project_id,
        default_host=default_host,
    )

    if HAS_GRPC_GCP:  # pragma: NO COVER
        # grpc_gcp.secure_channel is not given the compression argument, so
        # a real compression request is only logged.
        compression_requested = (
            compression is not None and compression != grpc.Compression.NoCompression
        )
        if compression_requested:
            _LOGGER.debug(
                "Compression argument is being ignored for grpc_gcp.secure_channel creation."
            )
        return grpc_gcp.secure_channel(target, channel_credentials, **kwargs)
    else:
        return grpc.secure_channel(
            target, channel_credentials, compression=compression, **kwargs
        )
346_MethodCall = collections.namedtuple(
347 "_MethodCall", ("request", "timeout", "metadata", "credentials", "compression")
348)
350_ChannelRequest = collections.namedtuple("_ChannelRequest", ("method", "request"))
353class _CallableStub(object):
354 """Stub for the grpc.*MultiCallable interfaces."""
356 def __init__(self, method, channel):
357 self._method = method
358 self._channel = channel
359 self.response = None
360 """Union[protobuf.Message, Callable[protobuf.Message], exception]:
361 The response to give when invoking this callable. If this is a
362 callable, it will be invoked with the request protobuf. If it's an
363 exception, the exception will be raised when this is invoked.
364 """
365 self.responses = None
366 """Iterator[
367 Union[protobuf.Message, Callable[protobuf.Message], exception]]:
368 An iterator of responses. If specified, self.response will be populated
369 on each invocation by calling ``next(self.responses)``."""
370 self.requests = []
371 """List[protobuf.Message]: All requests sent to this callable."""
372 self.calls = []
373 """List[Tuple]: All invocations of this callable. Each tuple is the
374 request, timeout, metadata, compression, and credentials."""
376 def __call__(
377 self, request, timeout=None, metadata=None, credentials=None, compression=None
378 ):
379 self._channel.requests.append(_ChannelRequest(self._method, request))
380 self.calls.append(
381 _MethodCall(request, timeout, metadata, credentials, compression)
382 )
383 self.requests.append(request)
385 response = self.response
386 if self.responses is not None:
387 if response is None:
388 response = next(self.responses)
389 else:
390 raise ValueError(
391 "{method}.response and {method}.responses are mutually "
392 "exclusive.".format(method=self._method)
393 )
395 if callable(response):
396 return response(request)
398 if isinstance(response, Exception):
399 raise response
401 if response is not None:
402 return response
404 raise ValueError('Method stub for "{}" has no response.'.format(self._method))
407def _simplify_method_name(method):
408 """Simplifies a gRPC method name.
410 When gRPC invokes the channel to create a callable, it gives a full
411 method name like "/google.pubsub.v1.Publisher/CreateTopic". This
412 returns just the name of the method, in this case "CreateTopic".
414 Args:
415 method (str): The name of the method.
417 Returns:
418 str: The simplified name of the method.
419 """
420 return method.rsplit("/", 1).pop()
class ChannelStub(grpc.Channel):
    """A testing stub for the grpc.Channel interface.

    This can be used to test any client that eventually uses a gRPC channel
    to communicate. By passing in a channel stub, you can configure which
    responses are returned and track which requests are made.

    For example:

    .. code-block:: python

        channel_stub = grpc_helpers.ChannelStub()
        client = FooClient(channel=channel_stub)

        channel_stub.GetFoo.response = foo_pb2.Foo(name='bar')

        foo = client.get_foo(labels=['baz'])

        assert foo.name == 'bar'
        assert channel_stub.GetFoo.requests[0].labels == ['baz']

    Each method on the stub can be accessed and configured on the channel.
    Here's some examples of various configurations:

    .. code-block:: python

        # Return a basic response:

        channel_stub.GetFoo.response = foo_pb2.Foo(name='bar')
        assert client.get_foo().name == 'bar'

        # Raise an exception:
        channel_stub.GetFoo.response = NotFound('...')

        with pytest.raises(NotFound):
            client.get_foo()

        # Use a sequence of responses:
        channel_stub.GetFoo.responses = iter([
            foo_pb2.Foo(name='bar'),
            foo_pb2.Foo(name='baz'),
        ])

        assert client.get_foo().name == 'bar'
        assert client.get_foo().name == 'baz'

        # Use a callable

        def on_get_foo(request):
            return foo_pb2.Foo(name='bar' + request.id)

        channel_stub.GetFoo.response = on_get_foo

        assert client.get_foo(id='123').name == 'bar123'
    """

    def __init__(self, responses=None):
        """Create an empty channel stub.

        Args:
            responses: Accepted for backward compatibility; the stub does not
                read it. The default is ``None`` rather than a list literal
                so that no mutable object is shared between calls.
        """
        # Sequence[Tuple[str, protobuf.Message]]: A list of all requests made
        # on this channel in order. The tuple is of method name, request
        # message.
        self.requests = []
        # Maps a simplified method name to its _CallableStub.
        self._method_stubs = {}

    def _stub_for_method(self, method):
        """Create, register and return a fresh stub for ``method``.

        Any stub previously registered under the same simplified name is
        replaced.
        """
        method = _simplify_method_name(method)
        self._method_stubs[method] = _CallableStub(method, self)
        return self._method_stubs[method]

    def __getattr__(self, key):
        """Expose registered method stubs as attributes of the channel.

        Raises:
            AttributeError: If no stub named ``key`` has been registered.
        """
        # Read the instance dict directly. Going through
        # ``self._method_stubs`` would re-enter __getattr__ without bound on
        # an instance whose __init__ has not run (e.g. while being copied or
        # unpickled), ending in RecursionError instead of AttributeError.
        try:
            return self.__dict__["_method_stubs"][key]
        except KeyError:
            raise AttributeError(
                "'{}' object has no attribute '{}'".format(type(self).__name__, key)
            ) from None

    def unary_unary(self, method, request_serializer=None, response_deserializer=None):
        """grpc.Channel.unary_unary implementation."""
        return self._stub_for_method(method)

    def unary_stream(self, method, request_serializer=None, response_deserializer=None):
        """grpc.Channel.unary_stream implementation."""
        return self._stub_for_method(method)

    def stream_unary(self, method, request_serializer=None, response_deserializer=None):
        """grpc.Channel.stream_unary implementation."""
        return self._stub_for_method(method)

    def stream_stream(
        self, method, request_serializer=None, response_deserializer=None
    ):
        """grpc.Channel.stream_stream implementation."""
        return self._stub_for_method(method)

    def subscribe(self, callback, try_to_connect=False):
        """grpc.Channel.subscribe implementation."""
        pass

    def unsubscribe(self, callback):
        """grpc.Channel.unsubscribe implementation."""
        pass

    def close(self):
        """grpc.Channel.close implementation."""
        pass