Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_channel.py: 42%
187 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-16 06:17 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-16 06:17 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Asyncio Python."""
16import asyncio
17import sys
18from typing import Any, Iterable, List, Optional, Sequence
20import grpc
21from grpc import _common
22from grpc import _compression
23from grpc import _grpcio_metadata
24from grpc._cython import cygrpc
26from . import _base_call
27from . import _base_channel
28from ._call import StreamStreamCall
29from ._call import StreamUnaryCall
30from ._call import UnaryStreamCall
31from ._call import UnaryUnaryCall
32from ._interceptor import ClientInterceptor
33from ._interceptor import InterceptedStreamStreamCall
34from ._interceptor import InterceptedStreamUnaryCall
35from ._interceptor import InterceptedUnaryStreamCall
36from ._interceptor import InterceptedUnaryUnaryCall
37from ._interceptor import StreamStreamClientInterceptor
38from ._interceptor import StreamUnaryClientInterceptor
39from ._interceptor import UnaryStreamClientInterceptor
40from ._interceptor import UnaryUnaryClientInterceptor
41from ._metadata import Metadata
42from ._typing import ChannelArgumentType
43from ._typing import DeserializingFunction
44from ._typing import RequestIterableType
45from ._typing import RequestType
46from ._typing import ResponseType
47from ._typing import SerializingFunction
48from ._utils import _timeout_to_deadline
# User-agent value for the asyncio client; it embeds the grpcio version
# reported by `_grpcio_metadata`.
_USER_AGENT = f"grpc-python-asyncio/{_grpcio_metadata.__version__}"
# Compare the full (major, minor) pair rather than only the minor component,
# so a future major release whose minor number is below 7 does not select
# the legacy `asyncio.Task.all_tasks()` spelling.
if sys.version_info < (3, 7):

    def _all_tasks() -> Iterable[asyncio.Task]:
        """Return the event loop's tasks using the pre-3.7 asyncio API."""
        return asyncio.Task.all_tasks()  # pylint: disable=no-member

else:

    def _all_tasks() -> Iterable[asyncio.Task]:
        """Return the running event loop's tasks via `asyncio.all_tasks()`."""
        return asyncio.all_tasks()
def _augment_channel_arguments(
    base_options: ChannelArgumentType, compression: Optional[grpc.Compression]
):
    """Append the compression and user-agent arguments to the channel options.

    Args:
      base_options: The caller-supplied channel arguments; converted to a
        tuple before being extended.
      compression: Forwarded to `_compression.create_channel_option`.

    Returns:
      A tuple made of `base_options`, followed by whatever
      `_compression.create_channel_option(compression)` produced, followed by
      one `(primary_user_agent_string, _USER_AGENT)` pair.
    """
    compression_args = _compression.create_channel_option(compression)
    user_agent_pair = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )

    augmented = tuple(base_options)
    augmented = augmented + compression_args
    augmented = augmented + (user_agent_pair,)
    return augmented
class _BaseMultiCallable:
    """Base class of all multi callable objects.

    Handles the initialization logic and stores common attributes.
    """

    # Each attribute is declared once; `__init__` assigns all of them.
    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    _method: bytes
    _request_serializer: SerializingFunction
    _response_deserializer: DeserializingFunction
    _interceptors: Optional[Sequence[ClientInterceptor]]
    _references: List[Any]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        interceptors: Optional[Sequence[ClientInterceptor]],
        references: List[Any],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Store the collaborators shared by every multi-callable.

        Args:
          channel: The Cython channel on which calls are created.
          method: The encoded method name.
          request_serializer: Passed on to every call that is created.
          response_deserializer: Passed on to every call that is created.
          interceptors: Interceptors for this kind of RPC; when falsy the
            subclasses create a plain (non-intercepted) call.
          references: Objects held by this instance. It is only stored here;
            presumably it keeps the owning channel alive - TODO confirm.
          loop: The event loop handed to every call that is created.
        """
        self._loop = loop
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._interceptors = interceptors
        self._references = references

    @staticmethod
    def _init_metadata(
        metadata: Optional[Metadata] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Metadata:
        """Build the final metadata to be used for the current call.

        Args:
          metadata: The caller-supplied metadata. Any falsy value (including
            None) is replaced by a fresh, empty `Metadata()`.
          compression: When truthy, the metadata is rebuilt from the result of
            `_compression.augment_metadata(metadata, compression)`.

        Returns:
          The `Metadata` instance to send with the call.
        """
        metadata = metadata or Metadata()
        if compression:
            metadata = Metadata(
                *_compression.augment_metadata(metadata, compression)
            )
        return metadata
class UnaryUnaryMultiCallable(
    _BaseMultiCallable, _base_channel.UnaryUnaryMultiCallable
):
    """Multi-callable that creates unary-request / unary-response calls."""

    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]:
        """Create the call object for a single request.

        Returns an `InterceptedUnaryUnaryCall` when this multi-callable has
        interceptors, otherwise a plain `UnaryUnaryCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)

        # Trailing positional arguments common to both call constructors.
        shared_args = (
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            # NOTE(review): this path hands over the raw `timeout`, whereas
            # the other multi-callables in this file pass a deadline to their
            # intercepted calls - confirm against the interceptor module.
            return InterceptedUnaryUnaryCall(
                self._interceptors, request, timeout, *shared_args
            )

        return UnaryUnaryCall(
            request, _timeout_to_deadline(timeout), *shared_args
        )
class UnaryStreamMultiCallable(
    _BaseMultiCallable, _base_channel.UnaryStreamMultiCallable
):
    """Multi-callable that creates unary-request / stream-response calls."""

    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]:
        """Create the call object for a single request.

        Returns an `InterceptedUnaryStreamCall` when this multi-callable has
        interceptors, otherwise a plain `UnaryStreamCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)
        # NOTE(review): unlike the unary-unary multi-callable, the deadline
        # (not the raw timeout) is what the intercepted call receives here -
        # confirm against the interceptor module.
        deadline = _timeout_to_deadline(timeout)

        # Positional arguments common to both call constructors.
        shared_args = (
            request,
            deadline,
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            return InterceptedUnaryStreamCall(self._interceptors, *shared_args)
        return UnaryStreamCall(*shared_args)
class StreamUnaryMultiCallable(
    _BaseMultiCallable, _base_channel.StreamUnaryMultiCallable
):
    """Multi-callable that creates stream-request / unary-response calls."""

    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamUnaryCall:
        """Create the call object for a stream of requests.

        Returns an `InterceptedStreamUnaryCall` when this multi-callable has
        interceptors, otherwise a plain `StreamUnaryCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)
        deadline = _timeout_to_deadline(timeout)

        # Positional arguments common to both call constructors.
        shared_args = (
            request_iterator,
            deadline,
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            return InterceptedStreamUnaryCall(self._interceptors, *shared_args)
        return StreamUnaryCall(*shared_args)
class StreamStreamMultiCallable(
    _BaseMultiCallable, _base_channel.StreamStreamMultiCallable
):
    """Multi-callable that creates stream-request / stream-response calls."""

    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[Metadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamStreamCall:
        """Create the call object for a stream of requests.

        Returns an `InterceptedStreamStreamCall` when this multi-callable has
        interceptors, otherwise a plain `StreamStreamCall`.
        """
        call_metadata = self._init_metadata(metadata, compression)
        deadline = _timeout_to_deadline(timeout)

        # Positional arguments common to both call constructors.
        shared_args = (
            request_iterator,
            deadline,
            call_metadata,
            credentials,
            wait_for_ready,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
            self._loop,
        )

        if self._interceptors:
            return InterceptedStreamStreamCall(
                self._interceptors, *shared_args
            )
        return StreamStreamCall(*shared_args)
class Channel(_base_channel.Channel):
    """Asyncio client channel wrapping a `cygrpc.AioChannel`.

    Interceptors supplied at construction are grouped by RPC kind and handed
    to the multi-callables returned by `unary_unary`, `unary_stream`,
    `stream_unary` and `stream_stream`.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    _unary_unary_interceptors: List[UnaryUnaryClientInterceptor]
    _unary_stream_interceptors: List[UnaryStreamClientInterceptor]
    _stream_unary_interceptors: List[StreamUnaryClientInterceptor]
    _stream_stream_interceptors: List[StreamStreamClientInterceptor]

    def __init__(
        self,
        target: str,
        options: ChannelArgumentType,
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
        interceptors: Optional[Sequence[ClientInterceptor]],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
          interceptors: An optional list of interceptors that would be used for
            intercepting any RPC executed with that channel.

        Raises:
          ValueError: If an interceptor is not an instance of one of the four
            supported client interceptor classes.
        """
        self._unary_unary_interceptors = []
        self._unary_stream_interceptors = []
        self._stream_unary_interceptors = []
        self._stream_stream_interceptors = []

        if interceptors is not None:
            for interceptor in interceptors:
                # The first matching class wins, so an interceptor is
                # registered for exactly one RPC kind.
                if isinstance(interceptor, UnaryUnaryClientInterceptor):
                    self._unary_unary_interceptors.append(interceptor)
                elif isinstance(interceptor, UnaryStreamClientInterceptor):
                    self._unary_stream_interceptors.append(interceptor)
                elif isinstance(interceptor, StreamUnaryClientInterceptor):
                    self._stream_unary_interceptors.append(interceptor)
                elif isinstance(interceptor, StreamStreamClientInterceptor):
                    self._stream_stream_interceptors.append(interceptor)
                else:
                    raise ValueError(
                        "Interceptor {} must be ".format(interceptor)
                        + "{} or ".format(UnaryUnaryClientInterceptor.__name__)
                        + "{} or ".format(UnaryStreamClientInterceptor.__name__)
                        + "{} or ".format(StreamUnaryClientInterceptor.__name__)
                        + "{}. ".format(StreamStreamClientInterceptor.__name__)
                    )

        self._loop = cygrpc.get_working_loop()
        self._channel = cygrpc.AioChannel(
            _common.encode(target),
            _augment_channel_arguments(options, compression),
            credentials,
            self._loop,
        )

    async def __aenter__(self):
        """Return this channel for use in an `async with` block."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the channel without a grace period on leaving the block."""
        await self._close(None)

    async def _close(self, grace):  # pylint: disable=too-many-branches
        """Close the channel, cancelling the calls that belong to it.

        Args:
          grace: Timeout handed to `asyncio.wait` while waiting for the
            tasks of outstanding calls before they are cancelled. A falsy
            value skips the wait.

        Raises:
          cygrpc.InternalError: If a call object is found that exposes
            neither `_channel` nor `_cython_call`.
        """
        if self._channel.closed():
            return

        # No new calls will be accepted by the Cython channel.
        self._channel.closing()

        # Iterate through running tasks
        tasks = _all_tasks()
        calls = []
        call_tasks = []
        for task in tasks:
            try:
                stack = task.get_stack(limit=1)
            except AttributeError as attribute_error:
                # NOTE(lidiz) tl;dr: If the Task is created with a CPython
                # object, it will trigger AttributeError.
                #
                # In the global finalizer, the event loop schedules
                # a CPython PyAsyncGenAThrow object.
                # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484
                #
                # However, the PyAsyncGenAThrow object is written in C and
                # failed to include the normal Python frame objects. Hence,
                # this exception is a false negative, and it is safe to ignore
                # the failure. It is fixed by https://github.com/python/cpython/pull/18669,
                # but not available until 3.9 or 3.8.3. So, we have to keep it
                # for a while.
                # TODO(lidiz) drop this hack after 3.8 deprecation
                if "frame" in str(attribute_error):
                    continue
                else:
                    raise

            # If the Task is created by a C-extension, the stack will be empty.
            if not stack:
                continue

            # Locate ones created by `aio.Call`.
            frame = stack[0]
            candidate = frame.f_locals.get("self")
            # `candidate` is an arbitrary object taken from another task's
            # frame. Use only an isinstance check (None fails it as well) and
            # never its truth value: a user-defined `__bool__`/`__len__` may
            # raise, or report a live call as falsy.
            if not isinstance(candidate, _base_call.Call):
                continue

            if hasattr(candidate, "_channel"):
                # For intercepted Call object
                if candidate._channel is not self._channel:
                    continue
            elif hasattr(candidate, "_cython_call"):
                # For normal Call object
                if candidate._cython_call._channel is not self._channel:
                    continue
            else:
                # Unidentified Call object
                raise cygrpc.InternalError(
                    f"Unrecognized call object: {candidate}"
                )

            calls.append(candidate)
            call_tasks.append(task)

        # If needed, try to wait for them to finish.
        # Call objects are not always awaitables.
        if grace and call_tasks:
            await asyncio.wait(call_tasks, timeout=grace)

        # Time to cancel existing calls.
        for call in calls:
            call.cancel()

        # Destroy the channel
        self._channel.close()

    async def close(self, grace: Optional[float] = None):
        """Close the channel; `grace` is forwarded unchanged to `_close`."""
        await self._close(grace)

    def __del__(self):
        """Close the Cython channel if it is still open at finalization."""
        # `_channel` is missing when `__init__` raised before assigning it.
        if hasattr(self, "_channel"):
            if not self._channel.closed():
                self._channel.close()

    def get_state(
        self, try_to_connect: bool = False
    ) -> grpc.ChannelConnectivity:
        """Return the channel's current connectivity state.

        Args:
          try_to_connect: Forwarded to the Cython channel's
            `check_connectivity_state`.
        """
        result = self._channel.check_connectivity_state(try_to_connect)
        return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]

    async def wait_for_state_change(
        self,
        last_observed_state: grpc.ChannelConnectivity,
    ) -> None:
        """Wait until the Cython channel reports a connectivity change.

        Args:
          last_observed_state: The state to watch from; the first element of
            the enum member's value is what is passed to the Cython channel.
        """
        # Await outside the `assert`: under `python -O` assert statements are
        # stripped, which would otherwise remove the wait itself.
        state_changed = await self._channel.watch_connectivity_state(
            last_observed_state.value[0], None
        )
        assert state_changed

    async def channel_ready(self) -> None:
        """Return once the channel's state is READY, connecting if needed."""
        state = self.get_state(try_to_connect=True)
        while state != grpc.ChannelConnectivity.READY:
            await self.wait_for_state_change(state)
            state = self.get_state(try_to_connect=True)

    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> UnaryUnaryMultiCallable:
        """Create a `UnaryUnaryMultiCallable` for `method` on this channel."""
        return UnaryUnaryMultiCallable(
            self._channel,
            _common.encode(method),
            request_serializer,
            response_deserializer,
            self._unary_unary_interceptors,
            [self],
            self._loop,
        )

    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> UnaryStreamMultiCallable:
        """Create a `UnaryStreamMultiCallable` for `method` on this channel."""
        return UnaryStreamMultiCallable(
            self._channel,
            _common.encode(method),
            request_serializer,
            response_deserializer,
            self._unary_stream_interceptors,
            [self],
            self._loop,
        )

    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> StreamUnaryMultiCallable:
        """Create a `StreamUnaryMultiCallable` for `method` on this channel."""
        return StreamUnaryMultiCallable(
            self._channel,
            _common.encode(method),
            request_serializer,
            response_deserializer,
            self._stream_unary_interceptors,
            [self],
            self._loop,
        )

    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ) -> StreamStreamMultiCallable:
        """Create a `StreamStreamMultiCallable` for `method` on this channel."""
        return StreamStreamMultiCallable(
            self._channel,
            _common.encode(method),
            request_serializer,
            response_deserializer,
            self._stream_stream_interceptors,
            [self],
            self._loop,
        )
def insecure_channel(
    target: str,
    options: Optional[ChannelArgumentType] = None,
    compression: Optional[grpc.Compression] = None,
    interceptors: Optional[Sequence[ClientInterceptor]] = None,
):
    """Creates an insecure asynchronous Channel to a server.

    Args:
      target: The server address
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel. None is replaced by
        an empty tuple.
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel.
      interceptors: An optional sequence of interceptors that will be executed for
        any call executed with this channel.

    Returns:
      A Channel.
    """
    channel_options = options if options is not None else ()
    # No credentials are passed for an insecure channel.
    return Channel(
        target=target,
        options=channel_options,
        credentials=None,
        compression=compression,
        interceptors=interceptors,
    )
def secure_channel(
    target: str,
    credentials: grpc.ChannelCredentials,
    options: Optional[ChannelArgumentType] = None,
    compression: Optional[grpc.Compression] = None,
    interceptors: Optional[Sequence[ClientInterceptor]] = None,
):
    """Creates a secure asynchronous Channel to a server.

    Args:
      target: The server address.
      credentials: A ChannelCredentials instance; its `_credentials`
        attribute is what the Channel receives.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel. None is replaced by
        an empty tuple.
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel.
      interceptors: An optional sequence of interceptors that will be executed for
        any call executed with this channel.

    Returns:
      An aio.Channel.
    """
    channel_options = options if options is not None else ()
    return Channel(
        target=target,
        options=channel_options,
        credentials=credentials._credentials,
        compression=compression,
        interceptors=interceptors,
    )