Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_channel.py: 41%
185 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Asyncio Python."""
16import asyncio
17import sys
18from typing import Any, Iterable, List, Optional, Sequence
20import grpc
21from grpc import _common
22from grpc import _compression
23from grpc import _grpcio_metadata
24from grpc._cython import cygrpc
26from . import _base_call
27from . import _base_channel
28from ._call import StreamStreamCall
29from ._call import StreamUnaryCall
30from ._call import UnaryStreamCall
31from ._call import UnaryUnaryCall
32from ._interceptor import ClientInterceptor
33from ._interceptor import InterceptedStreamStreamCall
34from ._interceptor import InterceptedStreamUnaryCall
35from ._interceptor import InterceptedUnaryStreamCall
36from ._interceptor import InterceptedUnaryUnaryCall
37from ._interceptor import StreamStreamClientInterceptor
38from ._interceptor import StreamUnaryClientInterceptor
39from ._interceptor import UnaryStreamClientInterceptor
40from ._interceptor import UnaryUnaryClientInterceptor
41from ._metadata import Metadata
42from ._typing import ChannelArgumentType
43from ._typing import DeserializingFunction
44from ._typing import RequestIterableType
45from ._typing import SerializingFunction
46from ._utils import _timeout_to_deadline
48_USER_AGENT = 'grpc-python-asyncio/{}'.format(_grpcio_metadata.__version__)
50if sys.version_info[1] < 7:
52 def _all_tasks() -> Iterable[asyncio.Task]:
53 return asyncio.Task.all_tasks()
54else:
56 def _all_tasks() -> Iterable[asyncio.Task]:
57 return asyncio.all_tasks()
60def _augment_channel_arguments(base_options: ChannelArgumentType,
61 compression: Optional[grpc.Compression]):
62 compression_channel_argument = _compression.create_channel_option(
63 compression)
64 user_agent_channel_argument = ((
65 cygrpc.ChannelArgKey.primary_user_agent_string,
66 _USER_AGENT,
67 ),)
68 return tuple(base_options
69 ) + compression_channel_argument + user_agent_channel_argument
72class _BaseMultiCallable:
73 """Base class of all multi callable objects.
75 Handles the initialization logic and stores common attributes.
76 """
77 _loop: asyncio.AbstractEventLoop
78 _channel: cygrpc.AioChannel
79 _method: bytes
80 _request_serializer: SerializingFunction
81 _response_deserializer: DeserializingFunction
82 _interceptors: Optional[Sequence[ClientInterceptor]]
83 _references: List[Any]
84 _loop: asyncio.AbstractEventLoop
86 # pylint: disable=too-many-arguments
87 def __init__(
88 self,
89 channel: cygrpc.AioChannel,
90 method: bytes,
91 request_serializer: SerializingFunction,
92 response_deserializer: DeserializingFunction,
93 interceptors: Optional[Sequence[ClientInterceptor]],
94 references: List[Any],
95 loop: asyncio.AbstractEventLoop,
96 ) -> None:
97 self._loop = loop
98 self._channel = channel
99 self._method = method
100 self._request_serializer = request_serializer
101 self._response_deserializer = response_deserializer
102 self._interceptors = interceptors
103 self._references = references
105 @staticmethod
106 def _init_metadata(
107 metadata: Optional[Metadata] = None,
108 compression: Optional[grpc.Compression] = None) -> Metadata:
109 """Based on the provided values for <metadata> or <compression> initialise the final
110 metadata, as it should be used for the current call.
111 """
112 metadata = metadata or Metadata()
113 if compression:
114 metadata = Metadata(
115 *_compression.augment_metadata(metadata, compression))
116 return metadata
119class UnaryUnaryMultiCallable(_BaseMultiCallable,
120 _base_channel.UnaryUnaryMultiCallable):
122 def __call__(
123 self,
124 request: Any,
125 *,
126 timeout: Optional[float] = None,
127 metadata: Optional[Metadata] = None,
128 credentials: Optional[grpc.CallCredentials] = None,
129 wait_for_ready: Optional[bool] = None,
130 compression: Optional[grpc.Compression] = None
131 ) -> _base_call.UnaryUnaryCall:
133 metadata = self._init_metadata(metadata, compression)
134 if not self._interceptors:
135 call = UnaryUnaryCall(request, _timeout_to_deadline(timeout),
136 metadata, credentials, wait_for_ready,
137 self._channel, self._method,
138 self._request_serializer,
139 self._response_deserializer, self._loop)
140 else:
141 call = InterceptedUnaryUnaryCall(
142 self._interceptors, request, timeout, metadata, credentials,
143 wait_for_ready, self._channel, self._method,
144 self._request_serializer, self._response_deserializer,
145 self._loop)
147 return call
150class UnaryStreamMultiCallable(_BaseMultiCallable,
151 _base_channel.UnaryStreamMultiCallable):
153 def __call__(
154 self,
155 request: Any,
156 *,
157 timeout: Optional[float] = None,
158 metadata: Optional[Metadata] = None,
159 credentials: Optional[grpc.CallCredentials] = None,
160 wait_for_ready: Optional[bool] = None,
161 compression: Optional[grpc.Compression] = None
162 ) -> _base_call.UnaryStreamCall:
164 metadata = self._init_metadata(metadata, compression)
165 deadline = _timeout_to_deadline(timeout)
167 if not self._interceptors:
168 call = UnaryStreamCall(request, deadline, metadata, credentials,
169 wait_for_ready, self._channel, self._method,
170 self._request_serializer,
171 self._response_deserializer, self._loop)
172 else:
173 call = InterceptedUnaryStreamCall(
174 self._interceptors, request, deadline, metadata, credentials,
175 wait_for_ready, self._channel, self._method,
176 self._request_serializer, self._response_deserializer,
177 self._loop)
179 return call
182class StreamUnaryMultiCallable(_BaseMultiCallable,
183 _base_channel.StreamUnaryMultiCallable):
185 def __call__(
186 self,
187 request_iterator: Optional[RequestIterableType] = None,
188 timeout: Optional[float] = None,
189 metadata: Optional[Metadata] = None,
190 credentials: Optional[grpc.CallCredentials] = None,
191 wait_for_ready: Optional[bool] = None,
192 compression: Optional[grpc.Compression] = None
193 ) -> _base_call.StreamUnaryCall:
195 metadata = self._init_metadata(metadata, compression)
196 deadline = _timeout_to_deadline(timeout)
198 if not self._interceptors:
199 call = StreamUnaryCall(request_iterator, deadline, metadata,
200 credentials, wait_for_ready, self._channel,
201 self._method, self._request_serializer,
202 self._response_deserializer, self._loop)
203 else:
204 call = InterceptedStreamUnaryCall(
205 self._interceptors, request_iterator, deadline, metadata,
206 credentials, wait_for_ready, self._channel, self._method,
207 self._request_serializer, self._response_deserializer,
208 self._loop)
210 return call
213class StreamStreamMultiCallable(_BaseMultiCallable,
214 _base_channel.StreamStreamMultiCallable):
216 def __call__(
217 self,
218 request_iterator: Optional[RequestIterableType] = None,
219 timeout: Optional[float] = None,
220 metadata: Optional[Metadata] = None,
221 credentials: Optional[grpc.CallCredentials] = None,
222 wait_for_ready: Optional[bool] = None,
223 compression: Optional[grpc.Compression] = None
224 ) -> _base_call.StreamStreamCall:
226 metadata = self._init_metadata(metadata, compression)
227 deadline = _timeout_to_deadline(timeout)
229 if not self._interceptors:
230 call = StreamStreamCall(request_iterator, deadline, metadata,
231 credentials, wait_for_ready, self._channel,
232 self._method, self._request_serializer,
233 self._response_deserializer, self._loop)
234 else:
235 call = InterceptedStreamStreamCall(
236 self._interceptors, request_iterator, deadline, metadata,
237 credentials, wait_for_ready, self._channel, self._method,
238 self._request_serializer, self._response_deserializer,
239 self._loop)
241 return call
244class Channel(_base_channel.Channel):
245 _loop: asyncio.AbstractEventLoop
246 _channel: cygrpc.AioChannel
247 _unary_unary_interceptors: List[UnaryUnaryClientInterceptor]
248 _unary_stream_interceptors: List[UnaryStreamClientInterceptor]
249 _stream_unary_interceptors: List[StreamUnaryClientInterceptor]
250 _stream_stream_interceptors: List[StreamStreamClientInterceptor]
252 def __init__(self, target: str, options: ChannelArgumentType,
253 credentials: Optional[grpc.ChannelCredentials],
254 compression: Optional[grpc.Compression],
255 interceptors: Optional[Sequence[ClientInterceptor]]):
256 """Constructor.
258 Args:
259 target: The target to which to connect.
260 options: Configuration options for the channel.
261 credentials: A cygrpc.ChannelCredentials or None.
262 compression: An optional value indicating the compression method to be
263 used over the lifetime of the channel.
264 interceptors: An optional list of interceptors that would be used for
265 intercepting any RPC executed with that channel.
266 """
267 self._unary_unary_interceptors = []
268 self._unary_stream_interceptors = []
269 self._stream_unary_interceptors = []
270 self._stream_stream_interceptors = []
272 if interceptors is not None:
273 for interceptor in interceptors:
274 if isinstance(interceptor, UnaryUnaryClientInterceptor):
275 self._unary_unary_interceptors.append(interceptor)
276 elif isinstance(interceptor, UnaryStreamClientInterceptor):
277 self._unary_stream_interceptors.append(interceptor)
278 elif isinstance(interceptor, StreamUnaryClientInterceptor):
279 self._stream_unary_interceptors.append(interceptor)
280 elif isinstance(interceptor, StreamStreamClientInterceptor):
281 self._stream_stream_interceptors.append(interceptor)
282 else:
283 raise ValueError(
284 "Interceptor {} must be ".format(interceptor) +
285 "{} or ".format(UnaryUnaryClientInterceptor.__name__) +
286 "{} or ".format(UnaryStreamClientInterceptor.__name__) +
287 "{} or ".format(StreamUnaryClientInterceptor.__name__) +
288 "{}. ".format(StreamStreamClientInterceptor.__name__))
290 self._loop = cygrpc.get_working_loop()
291 self._channel = cygrpc.AioChannel(
292 _common.encode(target),
293 _augment_channel_arguments(options, compression), credentials,
294 self._loop)
296 async def __aenter__(self):
297 return self
299 async def __aexit__(self, exc_type, exc_val, exc_tb):
300 await self._close(None)
302 async def _close(self, grace): # pylint: disable=too-many-branches
303 if self._channel.closed():
304 return
306 # No new calls will be accepted by the Cython channel.
307 self._channel.closing()
309 # Iterate through running tasks
310 tasks = _all_tasks()
311 calls = []
312 call_tasks = []
313 for task in tasks:
314 try:
315 stack = task.get_stack(limit=1)
316 except AttributeError as attribute_error:
317 # NOTE(lidiz) tl;dr: If the Task is created with a CPython
318 # object, it will trigger AttributeError.
319 #
320 # In the global finalizer, the event loop schedules
321 # a CPython PyAsyncGenAThrow object.
322 # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484
323 #
324 # However, the PyAsyncGenAThrow object is written in C and
325 # failed to include the normal Python frame objects. Hence,
326 # this exception is a false negative, and it is safe to ignore
327 # the failure. It is fixed by https://github.com/python/cpython/pull/18669,
328 # but not available until 3.9 or 3.8.3. So, we have to keep it
329 # for a while.
330 # TODO(lidiz) drop this hack after 3.8 deprecation
331 if 'frame' in str(attribute_error):
332 continue
333 else:
334 raise
336 # If the Task is created by a C-extension, the stack will be empty.
337 if not stack:
338 continue
340 # Locate ones created by `aio.Call`.
341 frame = stack[0]
342 candidate = frame.f_locals.get('self')
343 if candidate:
344 if isinstance(candidate, _base_call.Call):
345 if hasattr(candidate, '_channel'):
346 # For intercepted Call object
347 if candidate._channel is not self._channel:
348 continue
349 elif hasattr(candidate, '_cython_call'):
350 # For normal Call object
351 if candidate._cython_call._channel is not self._channel:
352 continue
353 else:
354 # Unidentified Call object
355 raise cygrpc.InternalError(
356 f'Unrecognized call object: {candidate}')
358 calls.append(candidate)
359 call_tasks.append(task)
361 # If needed, try to wait for them to finish.
362 # Call objects are not always awaitables.
363 if grace and call_tasks:
364 await asyncio.wait(call_tasks, timeout=grace)
366 # Time to cancel existing calls.
367 for call in calls:
368 call.cancel()
370 # Destroy the channel
371 self._channel.close()
373 async def close(self, grace: Optional[float] = None):
374 await self._close(grace)
376 def __del__(self):
377 if hasattr(self, '_channel'):
378 if not self._channel.closed():
379 self._channel.close()
381 def get_state(self,
382 try_to_connect: bool = False) -> grpc.ChannelConnectivity:
383 result = self._channel.check_connectivity_state(try_to_connect)
384 return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]
386 async def wait_for_state_change(
387 self,
388 last_observed_state: grpc.ChannelConnectivity,
389 ) -> None:
390 assert await self._channel.watch_connectivity_state(
391 last_observed_state.value[0], None)
393 async def channel_ready(self) -> None:
394 state = self.get_state(try_to_connect=True)
395 while state != grpc.ChannelConnectivity.READY:
396 await self.wait_for_state_change(state)
397 state = self.get_state(try_to_connect=True)
399 def unary_unary(
400 self,
401 method: str,
402 request_serializer: Optional[SerializingFunction] = None,
403 response_deserializer: Optional[DeserializingFunction] = None
404 ) -> UnaryUnaryMultiCallable:
405 return UnaryUnaryMultiCallable(self._channel, _common.encode(method),
406 request_serializer,
407 response_deserializer,
408 self._unary_unary_interceptors, [self],
409 self._loop)
411 def unary_stream(
412 self,
413 method: str,
414 request_serializer: Optional[SerializingFunction] = None,
415 response_deserializer: Optional[DeserializingFunction] = None
416 ) -> UnaryStreamMultiCallable:
417 return UnaryStreamMultiCallable(self._channel, _common.encode(method),
418 request_serializer,
419 response_deserializer,
420 self._unary_stream_interceptors, [self],
421 self._loop)
423 def stream_unary(
424 self,
425 method: str,
426 request_serializer: Optional[SerializingFunction] = None,
427 response_deserializer: Optional[DeserializingFunction] = None
428 ) -> StreamUnaryMultiCallable:
429 return StreamUnaryMultiCallable(self._channel, _common.encode(method),
430 request_serializer,
431 response_deserializer,
432 self._stream_unary_interceptors, [self],
433 self._loop)
435 def stream_stream(
436 self,
437 method: str,
438 request_serializer: Optional[SerializingFunction] = None,
439 response_deserializer: Optional[DeserializingFunction] = None
440 ) -> StreamStreamMultiCallable:
441 return StreamStreamMultiCallable(self._channel, _common.encode(method),
442 request_serializer,
443 response_deserializer,
444 self._stream_stream_interceptors,
445 [self], self._loop)
448def insecure_channel(
449 target: str,
450 options: Optional[ChannelArgumentType] = None,
451 compression: Optional[grpc.Compression] = None,
452 interceptors: Optional[Sequence[ClientInterceptor]] = None):
453 """Creates an insecure asynchronous Channel to a server.
455 Args:
456 target: The server address
457 options: An optional list of key-value pairs (:term:`channel_arguments`
458 in gRPC Core runtime) to configure the channel.
459 compression: An optional value indicating the compression method to be
460 used over the lifetime of the channel.
461 interceptors: An optional sequence of interceptors that will be executed for
462 any call executed with this channel.
464 Returns:
465 A Channel.
466 """
467 return Channel(target, () if options is None else options, None,
468 compression, interceptors)
471def secure_channel(target: str,
472 credentials: grpc.ChannelCredentials,
473 options: Optional[ChannelArgumentType] = None,
474 compression: Optional[grpc.Compression] = None,
475 interceptors: Optional[Sequence[ClientInterceptor]] = None):
476 """Creates a secure asynchronous Channel to a server.
478 Args:
479 target: The server address.
480 credentials: A ChannelCredentials instance.
481 options: An optional list of key-value pairs (:term:`channel_arguments`
482 in gRPC Core runtime) to configure the channel.
483 compression: An optional value indicating the compression method to be
484 used over the lifetime of the channel.
485 interceptors: An optional sequence of interceptors that will be executed for
486 any call executed with this channel.
488 Returns:
489 An aio.Channel.
490 """
491 return Channel(target, () if options is None else options,
492 credentials._credentials, compression, interceptors)