Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers_async.py: 36%
122 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""AsyncIO helpers for :mod:`grpc` supporting 3.7+.
17Please combine more detailed docstring in grpc_helpers.py to use following
18functions. This module is implementing the same surface with AsyncIO semantics.
19"""
21import asyncio
22import functools
24import grpc
25from grpc import aio
27from google.api_core import exceptions, grpc_helpers
30# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform
31# automatic patching for us. But that means the overhead of creating an
32# extra Python function spreads to every single send and receive.
35class _WrappedCall(aio.Call):
36 def __init__(self):
37 self._call = None
39 def with_call(self, call):
40 """Supplies the call object separately to keep __init__ clean."""
41 self._call = call
42 return self
44 async def initial_metadata(self):
45 return await self._call.initial_metadata()
47 async def trailing_metadata(self):
48 return await self._call.trailing_metadata()
50 async def code(self):
51 return await self._call.code()
53 async def details(self):
54 return await self._call.details()
56 def cancelled(self):
57 return self._call.cancelled()
59 def done(self):
60 return self._call.done()
62 def time_remaining(self):
63 return self._call.time_remaining()
65 def cancel(self):
66 return self._call.cancel()
68 def add_done_callback(self, callback):
69 self._call.add_done_callback(callback)
71 async def wait_for_connection(self):
72 try:
73 await self._call.wait_for_connection()
74 except grpc.RpcError as rpc_error:
75 raise exceptions.from_grpc_error(rpc_error) from rpc_error
78class _WrappedUnaryResponseMixin(_WrappedCall):
79 def __await__(self):
80 try:
81 response = yield from self._call.__await__()
82 return response
83 except grpc.RpcError as rpc_error:
84 raise exceptions.from_grpc_error(rpc_error) from rpc_error
87class _WrappedStreamResponseMixin(_WrappedCall):
88 def __init__(self):
89 self._wrapped_async_generator = None
91 async def read(self):
92 try:
93 return await self._call.read()
94 except grpc.RpcError as rpc_error:
95 raise exceptions.from_grpc_error(rpc_error) from rpc_error
97 async def _wrapped_aiter(self):
98 try:
99 # NOTE(lidiz) coverage doesn't understand the exception raised from
100 # __anext__ method. It is covered by test case:
101 # test_wrap_stream_errors_aiter_non_rpc_error
102 async for response in self._call: # pragma: no branch
103 yield response
104 except grpc.RpcError as rpc_error:
105 raise exceptions.from_grpc_error(rpc_error) from rpc_error
107 def __aiter__(self):
108 if not self._wrapped_async_generator:
109 self._wrapped_async_generator = self._wrapped_aiter()
110 return self._wrapped_async_generator
113class _WrappedStreamRequestMixin(_WrappedCall):
114 async def write(self, request):
115 try:
116 await self._call.write(request)
117 except grpc.RpcError as rpc_error:
118 raise exceptions.from_grpc_error(rpc_error) from rpc_error
120 async def done_writing(self):
121 try:
122 await self._call.done_writing()
123 except grpc.RpcError as rpc_error:
124 raise exceptions.from_grpc_error(rpc_error) from rpc_error
127# NOTE(lidiz) Implementing each individual class separately, so we don't
128# expose any API that should not be seen. E.g., __aiter__ in unary-unary
129# RPC, or __await__ in stream-stream RPC.
130class _WrappedUnaryUnaryCall(_WrappedUnaryResponseMixin, aio.UnaryUnaryCall):
131 """Wrapped UnaryUnaryCall to map exceptions."""
134class _WrappedUnaryStreamCall(_WrappedStreamResponseMixin, aio.UnaryStreamCall):
135 """Wrapped UnaryStreamCall to map exceptions."""
138class _WrappedStreamUnaryCall(
139 _WrappedUnaryResponseMixin, _WrappedStreamRequestMixin, aio.StreamUnaryCall
140):
141 """Wrapped StreamUnaryCall to map exceptions."""
144class _WrappedStreamStreamCall(
145 _WrappedStreamRequestMixin, _WrappedStreamResponseMixin, aio.StreamStreamCall
146):
147 """Wrapped StreamStreamCall to map exceptions."""
150def _wrap_unary_errors(callable_):
151 """Map errors for Unary-Unary async callables."""
152 grpc_helpers._patch_callable_name(callable_)
154 @functools.wraps(callable_)
155 def error_remapped_callable(*args, **kwargs):
156 call = callable_(*args, **kwargs)
157 return _WrappedUnaryUnaryCall().with_call(call)
159 return error_remapped_callable
162def _wrap_stream_errors(callable_):
163 """Map errors for streaming RPC async callables."""
164 grpc_helpers._patch_callable_name(callable_)
166 @functools.wraps(callable_)
167 async def error_remapped_callable(*args, **kwargs):
168 call = callable_(*args, **kwargs)
170 if isinstance(call, aio.UnaryStreamCall):
171 call = _WrappedUnaryStreamCall().with_call(call)
172 elif isinstance(call, aio.StreamUnaryCall):
173 call = _WrappedStreamUnaryCall().with_call(call)
174 elif isinstance(call, aio.StreamStreamCall):
175 call = _WrappedStreamStreamCall().with_call(call)
176 else:
177 raise TypeError("Unexpected type of call %s" % type(call))
179 await call.wait_for_connection()
180 return call
182 return error_remapped_callable
185def wrap_errors(callable_):
186 """Wrap a gRPC async callable and map :class:`grpc.RpcErrors` to
187 friendly error classes.
189 Errors raised by the gRPC callable are mapped to the appropriate
190 :class:`google.api_core.exceptions.GoogleAPICallError` subclasses. The
191 original `grpc.RpcError` (which is usually also a `grpc.Call`) is
192 available from the ``response`` property on the mapped exception. This
193 is useful for extracting metadata from the original error.
195 Args:
196 callable_ (Callable): A gRPC callable.
198 Returns: Callable: The wrapped gRPC callable.
199 """
200 if isinstance(callable_, aio.UnaryUnaryMultiCallable):
201 return _wrap_unary_errors(callable_)
202 else:
203 return _wrap_stream_errors(callable_)
206def create_channel(
207 target,
208 credentials=None,
209 scopes=None,
210 ssl_credentials=None,
211 credentials_file=None,
212 quota_project_id=None,
213 default_scopes=None,
214 default_host=None,
215 **kwargs
216):
217 """Create an AsyncIO secure channel with credentials.
219 Args:
220 target (str): The target service address in the format 'hostname:port'.
221 credentials (google.auth.credentials.Credentials): The credentials. If
222 not specified, then this function will attempt to ascertain the
223 credentials from the environment using :func:`google.auth.default`.
224 scopes (Sequence[str]): A optional list of scopes needed for this
225 service. These are only used when credentials are not specified and
226 are passed to :func:`google.auth.default`.
227 ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
228 credentials. This can be used to specify different certificates.
229 credentials_file (str): A file with credentials that can be loaded with
230 :func:`google.auth.load_credentials_from_file`. This argument is
231 mutually exclusive with credentials.
232 quota_project_id (str): An optional project to use for billing and quota.
233 default_scopes (Sequence[str]): Default scopes passed by a Google client
234 library. Use 'scopes' for user-defined scopes.
235 default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
236 kwargs: Additional key-word args passed to :func:`aio.secure_channel`.
238 Returns:
239 aio.Channel: The created channel.
241 Raises:
242 google.api_core.DuplicateCredentialArgs: If both a credentials object and credentials_file are passed.
243 """
245 composite_credentials = grpc_helpers._create_composite_credentials(
246 credentials=credentials,
247 credentials_file=credentials_file,
248 scopes=scopes,
249 default_scopes=default_scopes,
250 ssl_credentials=ssl_credentials,
251 quota_project_id=quota_project_id,
252 default_host=default_host,
253 )
255 return aio.secure_channel(target, composite_credentials, **kwargs)
258class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall):
259 """Fake implementation for unary-unary RPCs.
261 It is a dummy object for response message. Supply the intended response
262 upon the initialization, and the coroutine will return the exact response
263 message.
264 """
266 def __init__(self, response=object()):
267 self.response = response
268 self._future = asyncio.get_event_loop().create_future()
269 self._future.set_result(self.response)
271 def __await__(self):
272 response = yield from self._future.__await__()
273 return response
276class FakeStreamUnaryCall(_WrappedStreamUnaryCall):
277 """Fake implementation for stream-unary RPCs.
279 It is a dummy object for response message. Supply the intended response
280 upon the initialization, and the coroutine will return the exact response
281 message.
282 """
284 def __init__(self, response=object()):
285 self.response = response
286 self._future = asyncio.get_event_loop().create_future()
287 self._future.set_result(self.response)
289 def __await__(self):
290 response = yield from self._future.__await__()
291 return response
293 async def wait_for_connection(self):
294 pass