Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_channel.py: 42%
184 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Asyncio Python."""
16import asyncio
17import sys
18from typing import Any, Iterable, List, Optional, Sequence
20import grpc
21from grpc import _common
22from grpc import _compression
23from grpc import _grpcio_metadata
24from grpc._cython import cygrpc
26from . import _base_call
27from . import _base_channel
28from ._call import StreamStreamCall
29from ._call import StreamUnaryCall
30from ._call import UnaryStreamCall
31from ._call import UnaryUnaryCall
32from ._interceptor import ClientInterceptor
33from ._interceptor import InterceptedStreamStreamCall
34from ._interceptor import InterceptedStreamUnaryCall
35from ._interceptor import InterceptedUnaryStreamCall
36from ._interceptor import InterceptedUnaryUnaryCall
37from ._interceptor import StreamStreamClientInterceptor
38from ._interceptor import StreamUnaryClientInterceptor
39from ._interceptor import UnaryStreamClientInterceptor
40from ._interceptor import UnaryUnaryClientInterceptor
41from ._metadata import Metadata
42from ._typing import ChannelArgumentType
43from ._typing import DeserializingFunction
44from ._typing import RequestIterableType
45from ._typing import RequestType
46from ._typing import ResponseType
47from ._typing import SerializingFunction
48from ._utils import _timeout_to_deadline
50_USER_AGENT = "grpc-python-asyncio/{}".format(_grpcio_metadata.__version__)
52if sys.version_info[1] < 7:
54 def _all_tasks() -> Iterable[asyncio.Task]:
55 return asyncio.Task.all_tasks() # pylint: disable=no-member
57else:
59 def _all_tasks() -> Iterable[asyncio.Task]:
60 return asyncio.all_tasks()
63def _augment_channel_arguments(
64 base_options: ChannelArgumentType, compression: Optional[grpc.Compression]
65):
66 compression_channel_argument = _compression.create_channel_option(
67 compression
68 )
69 user_agent_channel_argument = (
70 (
71 cygrpc.ChannelArgKey.primary_user_agent_string,
72 _USER_AGENT,
73 ),
74 )
75 return (
76 tuple(base_options)
77 + compression_channel_argument
78 + user_agent_channel_argument
79 )
82class _BaseMultiCallable:
83 """Base class of all multi callable objects.
85 Handles the initialization logic and stores common attributes.
86 """
88 _loop: asyncio.AbstractEventLoop
89 _channel: cygrpc.AioChannel
90 _method: bytes
91 _request_serializer: SerializingFunction
92 _response_deserializer: DeserializingFunction
93 _interceptors: Optional[Sequence[ClientInterceptor]]
94 _references: List[Any]
95 _loop: asyncio.AbstractEventLoop
97 # pylint: disable=too-many-arguments
98 def __init__(
99 self,
100 channel: cygrpc.AioChannel,
101 method: bytes,
102 request_serializer: SerializingFunction,
103 response_deserializer: DeserializingFunction,
104 interceptors: Optional[Sequence[ClientInterceptor]],
105 references: List[Any],
106 loop: asyncio.AbstractEventLoop,
107 ) -> None:
108 self._loop = loop
109 self._channel = channel
110 self._method = method
111 self._request_serializer = request_serializer
112 self._response_deserializer = response_deserializer
113 self._interceptors = interceptors
114 self._references = references
116 @staticmethod
117 def _init_metadata(
118 metadata: Optional[Metadata] = None,
119 compression: Optional[grpc.Compression] = None,
120 ) -> Metadata:
121 """Based on the provided values for <metadata> or <compression> initialise the final
122 metadata, as it should be used for the current call.
123 """
124 metadata = metadata or Metadata()
125 if compression:
126 metadata = Metadata(
127 *_compression.augment_metadata(metadata, compression)
128 )
129 return metadata
132class UnaryUnaryMultiCallable(
133 _BaseMultiCallable, _base_channel.UnaryUnaryMultiCallable
134):
135 def __call__(
136 self,
137 request: RequestType,
138 *,
139 timeout: Optional[float] = None,
140 metadata: Optional[Metadata] = None,
141 credentials: Optional[grpc.CallCredentials] = None,
142 wait_for_ready: Optional[bool] = None,
143 compression: Optional[grpc.Compression] = None,
144 ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]:
145 metadata = self._init_metadata(metadata, compression)
146 if not self._interceptors:
147 call = UnaryUnaryCall(
148 request,
149 _timeout_to_deadline(timeout),
150 metadata,
151 credentials,
152 wait_for_ready,
153 self._channel,
154 self._method,
155 self._request_serializer,
156 self._response_deserializer,
157 self._loop,
158 )
159 else:
160 call = InterceptedUnaryUnaryCall(
161 self._interceptors,
162 request,
163 timeout,
164 metadata,
165 credentials,
166 wait_for_ready,
167 self._channel,
168 self._method,
169 self._request_serializer,
170 self._response_deserializer,
171 self._loop,
172 )
174 return call
177class UnaryStreamMultiCallable(
178 _BaseMultiCallable, _base_channel.UnaryStreamMultiCallable
179):
180 def __call__(
181 self,
182 request: RequestType,
183 *,
184 timeout: Optional[float] = None,
185 metadata: Optional[Metadata] = None,
186 credentials: Optional[grpc.CallCredentials] = None,
187 wait_for_ready: Optional[bool] = None,
188 compression: Optional[grpc.Compression] = None,
189 ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]:
190 metadata = self._init_metadata(metadata, compression)
192 if not self._interceptors:
193 call = UnaryStreamCall(
194 request,
195 _timeout_to_deadline(timeout),
196 metadata,
197 credentials,
198 wait_for_ready,
199 self._channel,
200 self._method,
201 self._request_serializer,
202 self._response_deserializer,
203 self._loop,
204 )
205 else:
206 call = InterceptedUnaryStreamCall(
207 self._interceptors,
208 request,
209 timeout,
210 metadata,
211 credentials,
212 wait_for_ready,
213 self._channel,
214 self._method,
215 self._request_serializer,
216 self._response_deserializer,
217 self._loop,
218 )
220 return call
223class StreamUnaryMultiCallable(
224 _BaseMultiCallable, _base_channel.StreamUnaryMultiCallable
225):
226 def __call__(
227 self,
228 request_iterator: Optional[RequestIterableType] = None,
229 timeout: Optional[float] = None,
230 metadata: Optional[Metadata] = None,
231 credentials: Optional[grpc.CallCredentials] = None,
232 wait_for_ready: Optional[bool] = None,
233 compression: Optional[grpc.Compression] = None,
234 ) -> _base_call.StreamUnaryCall:
235 metadata = self._init_metadata(metadata, compression)
237 if not self._interceptors:
238 call = StreamUnaryCall(
239 request_iterator,
240 _timeout_to_deadline(timeout),
241 metadata,
242 credentials,
243 wait_for_ready,
244 self._channel,
245 self._method,
246 self._request_serializer,
247 self._response_deserializer,
248 self._loop,
249 )
250 else:
251 call = InterceptedStreamUnaryCall(
252 self._interceptors,
253 request_iterator,
254 timeout,
255 metadata,
256 credentials,
257 wait_for_ready,
258 self._channel,
259 self._method,
260 self._request_serializer,
261 self._response_deserializer,
262 self._loop,
263 )
265 return call
268class StreamStreamMultiCallable(
269 _BaseMultiCallable, _base_channel.StreamStreamMultiCallable
270):
271 def __call__(
272 self,
273 request_iterator: Optional[RequestIterableType] = None,
274 timeout: Optional[float] = None,
275 metadata: Optional[Metadata] = None,
276 credentials: Optional[grpc.CallCredentials] = None,
277 wait_for_ready: Optional[bool] = None,
278 compression: Optional[grpc.Compression] = None,
279 ) -> _base_call.StreamStreamCall:
280 metadata = self._init_metadata(metadata, compression)
282 if not self._interceptors:
283 call = StreamStreamCall(
284 request_iterator,
285 _timeout_to_deadline(timeout),
286 metadata,
287 credentials,
288 wait_for_ready,
289 self._channel,
290 self._method,
291 self._request_serializer,
292 self._response_deserializer,
293 self._loop,
294 )
295 else:
296 call = InterceptedStreamStreamCall(
297 self._interceptors,
298 request_iterator,
299 timeout,
300 metadata,
301 credentials,
302 wait_for_ready,
303 self._channel,
304 self._method,
305 self._request_serializer,
306 self._response_deserializer,
307 self._loop,
308 )
310 return call
313class Channel(_base_channel.Channel):
314 _loop: asyncio.AbstractEventLoop
315 _channel: cygrpc.AioChannel
316 _unary_unary_interceptors: List[UnaryUnaryClientInterceptor]
317 _unary_stream_interceptors: List[UnaryStreamClientInterceptor]
318 _stream_unary_interceptors: List[StreamUnaryClientInterceptor]
319 _stream_stream_interceptors: List[StreamStreamClientInterceptor]
321 def __init__(
322 self,
323 target: str,
324 options: ChannelArgumentType,
325 credentials: Optional[grpc.ChannelCredentials],
326 compression: Optional[grpc.Compression],
327 interceptors: Optional[Sequence[ClientInterceptor]],
328 ):
329 """Constructor.
331 Args:
332 target: The target to which to connect.
333 options: Configuration options for the channel.
334 credentials: A cygrpc.ChannelCredentials or None.
335 compression: An optional value indicating the compression method to be
336 used over the lifetime of the channel.
337 interceptors: An optional list of interceptors that would be used for
338 intercepting any RPC executed with that channel.
339 """
340 self._unary_unary_interceptors = []
341 self._unary_stream_interceptors = []
342 self._stream_unary_interceptors = []
343 self._stream_stream_interceptors = []
345 if interceptors is not None:
346 for interceptor in interceptors:
347 if isinstance(interceptor, UnaryUnaryClientInterceptor):
348 self._unary_unary_interceptors.append(interceptor)
349 elif isinstance(interceptor, UnaryStreamClientInterceptor):
350 self._unary_stream_interceptors.append(interceptor)
351 elif isinstance(interceptor, StreamUnaryClientInterceptor):
352 self._stream_unary_interceptors.append(interceptor)
353 elif isinstance(interceptor, StreamStreamClientInterceptor):
354 self._stream_stream_interceptors.append(interceptor)
355 else:
356 raise ValueError(
357 "Interceptor {} must be ".format(interceptor)
358 + "{} or ".format(UnaryUnaryClientInterceptor.__name__)
359 + "{} or ".format(UnaryStreamClientInterceptor.__name__)
360 + "{} or ".format(StreamUnaryClientInterceptor.__name__)
361 + "{}. ".format(StreamStreamClientInterceptor.__name__)
362 )
364 self._loop = cygrpc.get_working_loop()
365 self._channel = cygrpc.AioChannel(
366 _common.encode(target),
367 _augment_channel_arguments(options, compression),
368 credentials,
369 self._loop,
370 )
372 async def __aenter__(self):
373 return self
375 async def __aexit__(self, exc_type, exc_val, exc_tb):
376 await self._close(None)
378 async def _close(self, grace): # pylint: disable=too-many-branches
379 if self._channel.closed():
380 return
382 # No new calls will be accepted by the Cython channel.
383 self._channel.closing()
385 # Iterate through running tasks
386 tasks = _all_tasks()
387 calls = []
388 call_tasks = []
389 for task in tasks:
390 try:
391 stack = task.get_stack(limit=1)
392 except AttributeError as attribute_error:
393 # NOTE(lidiz) tl;dr: If the Task is created with a CPython
394 # object, it will trigger AttributeError.
395 #
396 # In the global finalizer, the event loop schedules
397 # a CPython PyAsyncGenAThrow object.
398 # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484
399 #
400 # However, the PyAsyncGenAThrow object is written in C and
401 # failed to include the normal Python frame objects. Hence,
402 # this exception is a false negative, and it is safe to ignore
403 # the failure. It is fixed by https://github.com/python/cpython/pull/18669,
404 # but not available until 3.9 or 3.8.3. So, we have to keep it
405 # for a while.
406 # TODO(lidiz) drop this hack after 3.8 deprecation
407 if "frame" in str(attribute_error):
408 continue
409 else:
410 raise
412 # If the Task is created by a C-extension, the stack will be empty.
413 if not stack:
414 continue
416 # Locate ones created by `aio.Call`.
417 frame = stack[0]
418 candidate = frame.f_locals.get("self")
419 if candidate:
420 if isinstance(candidate, _base_call.Call):
421 if hasattr(candidate, "_channel"):
422 # For intercepted Call object
423 if candidate._channel is not self._channel:
424 continue
425 elif hasattr(candidate, "_cython_call"):
426 # For normal Call object
427 if candidate._cython_call._channel is not self._channel:
428 continue
429 else:
430 # Unidentified Call object
431 raise cygrpc.InternalError(
432 f"Unrecognized call object: {candidate}"
433 )
435 calls.append(candidate)
436 call_tasks.append(task)
438 # If needed, try to wait for them to finish.
439 # Call objects are not always awaitables.
440 if grace and call_tasks:
441 await asyncio.wait(call_tasks, timeout=grace)
443 # Time to cancel existing calls.
444 for call in calls:
445 call.cancel()
447 # Destroy the channel
448 self._channel.close()
450 async def close(self, grace: Optional[float] = None):
451 await self._close(grace)
453 def __del__(self):
454 if hasattr(self, "_channel"):
455 if not self._channel.closed():
456 self._channel.close()
458 def get_state(
459 self, try_to_connect: bool = False
460 ) -> grpc.ChannelConnectivity:
461 result = self._channel.check_connectivity_state(try_to_connect)
462 return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]
464 async def wait_for_state_change(
465 self,
466 last_observed_state: grpc.ChannelConnectivity,
467 ) -> None:
468 assert await self._channel.watch_connectivity_state(
469 last_observed_state.value[0], None
470 )
472 async def channel_ready(self) -> None:
473 state = self.get_state(try_to_connect=True)
474 while state != grpc.ChannelConnectivity.READY:
475 await self.wait_for_state_change(state)
476 state = self.get_state(try_to_connect=True)
478 def unary_unary(
479 self,
480 method: str,
481 request_serializer: Optional[SerializingFunction] = None,
482 response_deserializer: Optional[DeserializingFunction] = None,
483 ) -> UnaryUnaryMultiCallable:
484 return UnaryUnaryMultiCallable(
485 self._channel,
486 _common.encode(method),
487 request_serializer,
488 response_deserializer,
489 self._unary_unary_interceptors,
490 [self],
491 self._loop,
492 )
494 def unary_stream(
495 self,
496 method: str,
497 request_serializer: Optional[SerializingFunction] = None,
498 response_deserializer: Optional[DeserializingFunction] = None,
499 ) -> UnaryStreamMultiCallable:
500 return UnaryStreamMultiCallable(
501 self._channel,
502 _common.encode(method),
503 request_serializer,
504 response_deserializer,
505 self._unary_stream_interceptors,
506 [self],
507 self._loop,
508 )
510 def stream_unary(
511 self,
512 method: str,
513 request_serializer: Optional[SerializingFunction] = None,
514 response_deserializer: Optional[DeserializingFunction] = None,
515 ) -> StreamUnaryMultiCallable:
516 return StreamUnaryMultiCallable(
517 self._channel,
518 _common.encode(method),
519 request_serializer,
520 response_deserializer,
521 self._stream_unary_interceptors,
522 [self],
523 self._loop,
524 )
526 def stream_stream(
527 self,
528 method: str,
529 request_serializer: Optional[SerializingFunction] = None,
530 response_deserializer: Optional[DeserializingFunction] = None,
531 ) -> StreamStreamMultiCallable:
532 return StreamStreamMultiCallable(
533 self._channel,
534 _common.encode(method),
535 request_serializer,
536 response_deserializer,
537 self._stream_stream_interceptors,
538 [self],
539 self._loop,
540 )
543def insecure_channel(
544 target: str,
545 options: Optional[ChannelArgumentType] = None,
546 compression: Optional[grpc.Compression] = None,
547 interceptors: Optional[Sequence[ClientInterceptor]] = None,
548):
549 """Creates an insecure asynchronous Channel to a server.
551 Args:
552 target: The server address
553 options: An optional list of key-value pairs (:term:`channel_arguments`
554 in gRPC Core runtime) to configure the channel.
555 compression: An optional value indicating the compression method to be
556 used over the lifetime of the channel.
557 interceptors: An optional sequence of interceptors that will be executed for
558 any call executed with this channel.
560 Returns:
561 A Channel.
562 """
563 return Channel(
564 target,
565 () if options is None else options,
566 None,
567 compression,
568 interceptors,
569 )
572def secure_channel(
573 target: str,
574 credentials: grpc.ChannelCredentials,
575 options: Optional[ChannelArgumentType] = None,
576 compression: Optional[grpc.Compression] = None,
577 interceptors: Optional[Sequence[ClientInterceptor]] = None,
578):
579 """Creates a secure asynchronous Channel to a server.
581 Args:
582 target: The server address.
583 credentials: A ChannelCredentials instance.
584 options: An optional list of key-value pairs (:term:`channel_arguments`
585 in gRPC Core runtime) to configure the channel.
586 compression: An optional value indicating the compression method to be
587 used over the lifetime of the channel.
588 interceptors: An optional sequence of interceptors that will be executed for
589 any call executed with this channel.
591 Returns:
592 An aio.Channel.
593 """
594 return Channel(
595 target,
596 () if options is None else options,
597 credentials._credentials,
598 compression,
599 interceptors,
600 )