Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_channel.py: 42%
187 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Asyncio Python."""
16import asyncio
17import sys
18from typing import Any, Iterable, List, Optional, Sequence
20import grpc
21from grpc import _common
22from grpc import _compression
23from grpc import _grpcio_metadata
24from grpc._cython import cygrpc
26from . import _base_call
27from . import _base_channel
28from ._call import StreamStreamCall
29from ._call import StreamUnaryCall
30from ._call import UnaryStreamCall
31from ._call import UnaryUnaryCall
32from ._interceptor import ClientInterceptor
33from ._interceptor import InterceptedStreamStreamCall
34from ._interceptor import InterceptedStreamUnaryCall
35from ._interceptor import InterceptedUnaryStreamCall
36from ._interceptor import InterceptedUnaryUnaryCall
37from ._interceptor import StreamStreamClientInterceptor
38from ._interceptor import StreamUnaryClientInterceptor
39from ._interceptor import UnaryStreamClientInterceptor
40from ._interceptor import UnaryUnaryClientInterceptor
41from ._metadata import Metadata
42from ._typing import ChannelArgumentType
43from ._typing import DeserializingFunction
44from ._typing import MetadataType
45from ._typing import RequestIterableType
46from ._typing import RequestType
47from ._typing import ResponseType
48from ._typing import SerializingFunction
49from ._utils import _timeout_to_deadline
51_USER_AGENT = "grpc-python-asyncio/{}".format(_grpcio_metadata.__version__)
53if sys.version_info[1] < 7:
55 def _all_tasks() -> Iterable[asyncio.Task]:
56 return asyncio.Task.all_tasks() # pylint: disable=no-member
58else:
60 def _all_tasks() -> Iterable[asyncio.Task]:
61 return asyncio.all_tasks()
64def _augment_channel_arguments(
65 base_options: ChannelArgumentType, compression: Optional[grpc.Compression]
66):
67 compression_channel_argument = _compression.create_channel_option(
68 compression
69 )
70 user_agent_channel_argument = (
71 (
72 cygrpc.ChannelArgKey.primary_user_agent_string,
73 _USER_AGENT,
74 ),
75 )
76 return (
77 tuple(base_options)
78 + compression_channel_argument
79 + user_agent_channel_argument
80 )
83class _BaseMultiCallable:
84 """Base class of all multi callable objects.
86 Handles the initialization logic and stores common attributes.
87 """
89 _loop: asyncio.AbstractEventLoop
90 _channel: cygrpc.AioChannel
91 _method: bytes
92 _request_serializer: SerializingFunction
93 _response_deserializer: DeserializingFunction
94 _interceptors: Optional[Sequence[ClientInterceptor]]
95 _references: List[Any]
96 _loop: asyncio.AbstractEventLoop
98 # pylint: disable=too-many-arguments
99 def __init__(
100 self,
101 channel: cygrpc.AioChannel,
102 method: bytes,
103 request_serializer: SerializingFunction,
104 response_deserializer: DeserializingFunction,
105 interceptors: Optional[Sequence[ClientInterceptor]],
106 references: List[Any],
107 loop: asyncio.AbstractEventLoop,
108 ) -> None:
109 self._loop = loop
110 self._channel = channel
111 self._method = method
112 self._request_serializer = request_serializer
113 self._response_deserializer = response_deserializer
114 self._interceptors = interceptors
115 self._references = references
117 @staticmethod
118 def _init_metadata(
119 metadata: Optional[MetadataType] = None,
120 compression: Optional[grpc.Compression] = None,
121 ) -> Metadata:
122 """Based on the provided values for <metadata> or <compression> initialise the final
123 metadata, as it should be used for the current call.
124 """
125 metadata = metadata or Metadata()
126 if not isinstance(metadata, Metadata) and isinstance(metadata, tuple):
127 metadata = Metadata.from_tuple(metadata)
128 if compression:
129 metadata = Metadata(
130 *_compression.augment_metadata(metadata, compression)
131 )
132 return metadata
135class UnaryUnaryMultiCallable(
136 _BaseMultiCallable, _base_channel.UnaryUnaryMultiCallable
137):
138 def __call__(
139 self,
140 request: RequestType,
141 *,
142 timeout: Optional[float] = None,
143 metadata: Optional[MetadataType] = None,
144 credentials: Optional[grpc.CallCredentials] = None,
145 wait_for_ready: Optional[bool] = None,
146 compression: Optional[grpc.Compression] = None,
147 ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]:
148 metadata = self._init_metadata(metadata, compression)
149 if not self._interceptors:
150 call = UnaryUnaryCall(
151 request,
152 _timeout_to_deadline(timeout),
153 metadata,
154 credentials,
155 wait_for_ready,
156 self._channel,
157 self._method,
158 self._request_serializer,
159 self._response_deserializer,
160 self._loop,
161 )
162 else:
163 call = InterceptedUnaryUnaryCall(
164 self._interceptors,
165 request,
166 timeout,
167 metadata,
168 credentials,
169 wait_for_ready,
170 self._channel,
171 self._method,
172 self._request_serializer,
173 self._response_deserializer,
174 self._loop,
175 )
177 return call
180class UnaryStreamMultiCallable(
181 _BaseMultiCallable, _base_channel.UnaryStreamMultiCallable
182):
183 def __call__(
184 self,
185 request: RequestType,
186 *,
187 timeout: Optional[float] = None,
188 metadata: Optional[MetadataType] = None,
189 credentials: Optional[grpc.CallCredentials] = None,
190 wait_for_ready: Optional[bool] = None,
191 compression: Optional[grpc.Compression] = None,
192 ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]:
193 metadata = self._init_metadata(metadata, compression)
195 if not self._interceptors:
196 call = UnaryStreamCall(
197 request,
198 _timeout_to_deadline(timeout),
199 metadata,
200 credentials,
201 wait_for_ready,
202 self._channel,
203 self._method,
204 self._request_serializer,
205 self._response_deserializer,
206 self._loop,
207 )
208 else:
209 call = InterceptedUnaryStreamCall(
210 self._interceptors,
211 request,
212 timeout,
213 metadata,
214 credentials,
215 wait_for_ready,
216 self._channel,
217 self._method,
218 self._request_serializer,
219 self._response_deserializer,
220 self._loop,
221 )
223 return call
226class StreamUnaryMultiCallable(
227 _BaseMultiCallable, _base_channel.StreamUnaryMultiCallable
228):
229 def __call__(
230 self,
231 request_iterator: Optional[RequestIterableType] = None,
232 timeout: Optional[float] = None,
233 metadata: Optional[MetadataType] = None,
234 credentials: Optional[grpc.CallCredentials] = None,
235 wait_for_ready: Optional[bool] = None,
236 compression: Optional[grpc.Compression] = None,
237 ) -> _base_call.StreamUnaryCall:
238 metadata = self._init_metadata(metadata, compression)
240 if not self._interceptors:
241 call = StreamUnaryCall(
242 request_iterator,
243 _timeout_to_deadline(timeout),
244 metadata,
245 credentials,
246 wait_for_ready,
247 self._channel,
248 self._method,
249 self._request_serializer,
250 self._response_deserializer,
251 self._loop,
252 )
253 else:
254 call = InterceptedStreamUnaryCall(
255 self._interceptors,
256 request_iterator,
257 timeout,
258 metadata,
259 credentials,
260 wait_for_ready,
261 self._channel,
262 self._method,
263 self._request_serializer,
264 self._response_deserializer,
265 self._loop,
266 )
268 return call
271class StreamStreamMultiCallable(
272 _BaseMultiCallable, _base_channel.StreamStreamMultiCallable
273):
274 def __call__(
275 self,
276 request_iterator: Optional[RequestIterableType] = None,
277 timeout: Optional[float] = None,
278 metadata: Optional[MetadataType] = None,
279 credentials: Optional[grpc.CallCredentials] = None,
280 wait_for_ready: Optional[bool] = None,
281 compression: Optional[grpc.Compression] = None,
282 ) -> _base_call.StreamStreamCall:
283 metadata = self._init_metadata(metadata, compression)
285 if not self._interceptors:
286 call = StreamStreamCall(
287 request_iterator,
288 _timeout_to_deadline(timeout),
289 metadata,
290 credentials,
291 wait_for_ready,
292 self._channel,
293 self._method,
294 self._request_serializer,
295 self._response_deserializer,
296 self._loop,
297 )
298 else:
299 call = InterceptedStreamStreamCall(
300 self._interceptors,
301 request_iterator,
302 timeout,
303 metadata,
304 credentials,
305 wait_for_ready,
306 self._channel,
307 self._method,
308 self._request_serializer,
309 self._response_deserializer,
310 self._loop,
311 )
313 return call
316class Channel(_base_channel.Channel):
317 _loop: asyncio.AbstractEventLoop
318 _channel: cygrpc.AioChannel
319 _unary_unary_interceptors: List[UnaryUnaryClientInterceptor]
320 _unary_stream_interceptors: List[UnaryStreamClientInterceptor]
321 _stream_unary_interceptors: List[StreamUnaryClientInterceptor]
322 _stream_stream_interceptors: List[StreamStreamClientInterceptor]
324 def __init__(
325 self,
326 target: str,
327 options: ChannelArgumentType,
328 credentials: Optional[grpc.ChannelCredentials],
329 compression: Optional[grpc.Compression],
330 interceptors: Optional[Sequence[ClientInterceptor]],
331 ):
332 """Constructor.
334 Args:
335 target: The target to which to connect.
336 options: Configuration options for the channel.
337 credentials: A cygrpc.ChannelCredentials or None.
338 compression: An optional value indicating the compression method to be
339 used over the lifetime of the channel.
340 interceptors: An optional list of interceptors that would be used for
341 intercepting any RPC executed with that channel.
342 """
343 self._unary_unary_interceptors = []
344 self._unary_stream_interceptors = []
345 self._stream_unary_interceptors = []
346 self._stream_stream_interceptors = []
348 if interceptors is not None:
349 for interceptor in interceptors:
350 if isinstance(interceptor, UnaryUnaryClientInterceptor):
351 self._unary_unary_interceptors.append(interceptor)
352 elif isinstance(interceptor, UnaryStreamClientInterceptor):
353 self._unary_stream_interceptors.append(interceptor)
354 elif isinstance(interceptor, StreamUnaryClientInterceptor):
355 self._stream_unary_interceptors.append(interceptor)
356 elif isinstance(interceptor, StreamStreamClientInterceptor):
357 self._stream_stream_interceptors.append(interceptor)
358 else:
359 raise ValueError(
360 "Interceptor {} must be ".format(interceptor)
361 + "{} or ".format(UnaryUnaryClientInterceptor.__name__)
362 + "{} or ".format(UnaryStreamClientInterceptor.__name__)
363 + "{} or ".format(StreamUnaryClientInterceptor.__name__)
364 + "{}. ".format(StreamStreamClientInterceptor.__name__)
365 )
367 self._loop = cygrpc.get_working_loop()
368 self._channel = cygrpc.AioChannel(
369 _common.encode(target),
370 _augment_channel_arguments(options, compression),
371 credentials,
372 self._loop,
373 )
375 async def __aenter__(self):
376 return self
378 async def __aexit__(self, exc_type, exc_val, exc_tb):
379 await self._close(None)
381 async def _close(self, grace): # pylint: disable=too-many-branches
382 if self._channel.closed():
383 return
385 # No new calls will be accepted by the Cython channel.
386 self._channel.closing()
388 # Iterate through running tasks
389 tasks = _all_tasks()
390 calls = []
391 call_tasks = []
392 for task in tasks:
393 try:
394 stack = task.get_stack(limit=1)
395 except AttributeError as attribute_error:
396 # NOTE(lidiz) tl;dr: If the Task is created with a CPython
397 # object, it will trigger AttributeError.
398 #
399 # In the global finalizer, the event loop schedules
400 # a CPython PyAsyncGenAThrow object.
401 # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484
402 #
403 # However, the PyAsyncGenAThrow object is written in C and
404 # failed to include the normal Python frame objects. Hence,
405 # this exception is a false negative, and it is safe to ignore
406 # the failure. It is fixed by https://github.com/python/cpython/pull/18669,
407 # but not available until 3.9 or 3.8.3. So, we have to keep it
408 # for a while.
409 # TODO(lidiz) drop this hack after 3.8 deprecation
410 if "frame" in str(attribute_error):
411 continue
412 else:
413 raise
415 # If the Task is created by a C-extension, the stack will be empty.
416 if not stack:
417 continue
419 # Locate ones created by `aio.Call`.
420 frame = stack[0]
421 candidate = frame.f_locals.get("self")
422 if candidate:
423 if isinstance(candidate, _base_call.Call):
424 if hasattr(candidate, "_channel"):
425 # For intercepted Call object
426 if candidate._channel is not self._channel:
427 continue
428 elif hasattr(candidate, "_cython_call"):
429 # For normal Call object
430 if candidate._cython_call._channel is not self._channel:
431 continue
432 else:
433 # Unidentified Call object
434 raise cygrpc.InternalError(
435 f"Unrecognized call object: {candidate}"
436 )
438 calls.append(candidate)
439 call_tasks.append(task)
441 # If needed, try to wait for them to finish.
442 # Call objects are not always awaitables.
443 if grace and call_tasks:
444 await asyncio.wait(call_tasks, timeout=grace)
446 # Time to cancel existing calls.
447 for call in calls:
448 call.cancel()
450 # Destroy the channel
451 self._channel.close()
453 async def close(self, grace: Optional[float] = None):
454 await self._close(grace)
456 def __del__(self):
457 if hasattr(self, "_channel"):
458 if not self._channel.closed():
459 self._channel.close()
461 def get_state(
462 self, try_to_connect: bool = False
463 ) -> grpc.ChannelConnectivity:
464 result = self._channel.check_connectivity_state(try_to_connect)
465 return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]
467 async def wait_for_state_change(
468 self,
469 last_observed_state: grpc.ChannelConnectivity,
470 ) -> None:
471 assert await self._channel.watch_connectivity_state(
472 last_observed_state.value[0], None
473 )
475 async def channel_ready(self) -> None:
476 state = self.get_state(try_to_connect=True)
477 while state != grpc.ChannelConnectivity.READY:
478 await self.wait_for_state_change(state)
479 state = self.get_state(try_to_connect=True)
481 def unary_unary(
482 self,
483 method: str,
484 request_serializer: Optional[SerializingFunction] = None,
485 response_deserializer: Optional[DeserializingFunction] = None,
486 ) -> UnaryUnaryMultiCallable:
487 return UnaryUnaryMultiCallable(
488 self._channel,
489 _common.encode(method),
490 request_serializer,
491 response_deserializer,
492 self._unary_unary_interceptors,
493 [self],
494 self._loop,
495 )
497 def unary_stream(
498 self,
499 method: str,
500 request_serializer: Optional[SerializingFunction] = None,
501 response_deserializer: Optional[DeserializingFunction] = None,
502 ) -> UnaryStreamMultiCallable:
503 return UnaryStreamMultiCallable(
504 self._channel,
505 _common.encode(method),
506 request_serializer,
507 response_deserializer,
508 self._unary_stream_interceptors,
509 [self],
510 self._loop,
511 )
513 def stream_unary(
514 self,
515 method: str,
516 request_serializer: Optional[SerializingFunction] = None,
517 response_deserializer: Optional[DeserializingFunction] = None,
518 ) -> StreamUnaryMultiCallable:
519 return StreamUnaryMultiCallable(
520 self._channel,
521 _common.encode(method),
522 request_serializer,
523 response_deserializer,
524 self._stream_unary_interceptors,
525 [self],
526 self._loop,
527 )
529 def stream_stream(
530 self,
531 method: str,
532 request_serializer: Optional[SerializingFunction] = None,
533 response_deserializer: Optional[DeserializingFunction] = None,
534 ) -> StreamStreamMultiCallable:
535 return StreamStreamMultiCallable(
536 self._channel,
537 _common.encode(method),
538 request_serializer,
539 response_deserializer,
540 self._stream_stream_interceptors,
541 [self],
542 self._loop,
543 )
546def insecure_channel(
547 target: str,
548 options: Optional[ChannelArgumentType] = None,
549 compression: Optional[grpc.Compression] = None,
550 interceptors: Optional[Sequence[ClientInterceptor]] = None,
551):
552 """Creates an insecure asynchronous Channel to a server.
554 Args:
555 target: The server address
556 options: An optional list of key-value pairs (:term:`channel_arguments`
557 in gRPC Core runtime) to configure the channel.
558 compression: An optional value indicating the compression method to be
559 used over the lifetime of the channel.
560 interceptors: An optional sequence of interceptors that will be executed for
561 any call executed with this channel.
563 Returns:
564 A Channel.
565 """
566 return Channel(
567 target,
568 () if options is None else options,
569 None,
570 compression,
571 interceptors,
572 )
575def secure_channel(
576 target: str,
577 credentials: grpc.ChannelCredentials,
578 options: Optional[ChannelArgumentType] = None,
579 compression: Optional[grpc.Compression] = None,
580 interceptors: Optional[Sequence[ClientInterceptor]] = None,
581):
582 """Creates a secure asynchronous Channel to a server.
584 Args:
585 target: The server address.
586 credentials: A ChannelCredentials instance.
587 options: An optional list of key-value pairs (:term:`channel_arguments`
588 in gRPC Core runtime) to configure the channel.
589 compression: An optional value indicating the compression method to be
590 used over the lifetime of the channel.
591 interceptors: An optional sequence of interceptors that will be executed for
592 any call executed with this channel.
594 Returns:
595 An aio.Channel.
596 """
597 return Channel(
598 target,
599 () if options is None else options,
600 credentials._credentials,
601 compression,
602 interceptors,
603 )