Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_call.py: 36%
346 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Asyncio Python."""
16import asyncio
17import enum
18from functools import partial
19import inspect
20import logging
21import traceback
22from typing import Any, AsyncIterator, Generator, Generic, Optional, Tuple
24import grpc
25from grpc import _common
26from grpc._cython import cygrpc
28from . import _base_call
29from ._metadata import Metadata
30from ._typing import DeserializingFunction
31from ._typing import DoneCallbackType
32from ._typing import MetadatumType
33from ._typing import RequestIterableType
34from ._typing import RequestType
35from ._typing import ResponseType
36from ._typing import SerializingFunction
# Names exported by a star-import of this module.
__all__ = ("AioRpcError", "Call", "UnaryUnaryCall", "UnaryStreamCall")

# Detail strings attached to cancellations and to API misuse errors.
_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!"
_GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!"
_RPC_ALREADY_FINISHED_DETAILS = "RPC already finished."
_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".'
_API_STYLE_ERROR = (
    "The iterator and read/write APIs "
    "may not be mixed on a single RPC."
)

# `str.format` templates: class name, status, details (and, for the non-OK
# variant, the debug error string).
_OK_CALL_REPRESENTATION = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

_NON_OK_CALL_REPRESENTATION = "\n".join(
    (
        "<{} of RPC that terminated with:",
        "\tstatus = {}",
        '\tdetails = "{}"',
        '\tdebug_error_string = "{}"',
        ">",
    )
)

_LOGGER = logging.getLogger(__name__)
class AioRpcError(grpc.RpcError):
    """RpcError flavour raised by the asynchronous API.

    An instance is a snapshot of the final status of an RPC: every value is
    already determined when the error is built, so the accessors are plain
    methods instead of coroutines.
    """

    _code: grpc.StatusCode
    _details: Optional[str]
    _initial_metadata: Optional[Metadata]
    _trailing_metadata: Optional[Metadata]
    _debug_error_string: Optional[str]

    def __init__(
        self,
        code: grpc.StatusCode,
        initial_metadata: Metadata,
        trailing_metadata: Metadata,
        details: Optional[str] = None,
        debug_error_string: Optional[str] = None,
    ) -> None:
        """Store the final status values of the RPC.

        Args:
          code: The status code with which the RPC has been finalized.
          initial_metadata: Initial metadata that could be sent by the Server.
          trailing_metadata: Trailing metadata that could be sent by the
            Server.
          details: Optional details explaining the reason of the error.
          debug_error_string: Optional debug error string for the RPC.
        """
        super().__init__()
        self._code = code
        self._initial_metadata = initial_metadata
        self._trailing_metadata = trailing_metadata
        self._details = details
        self._debug_error_string = debug_error_string

    def code(self) -> grpc.StatusCode:
        """Return the stored `grpc.StatusCode` of the RPC."""
        return self._code

    def details(self) -> Optional[str]:
        """Return the stored description of the error, if any."""
        return self._details

    def initial_metadata(self) -> Metadata:
        """Return the stored initial metadata."""
        return self._initial_metadata

    def trailing_metadata(self) -> Metadata:
        """Return the stored trailing metadata."""
        return self._trailing_metadata

    def debug_error_string(self) -> str:
        """Return the stored debug error string."""
        return self._debug_error_string

    def _repr(self) -> str:
        """Render class name, code, details and debug string as text."""
        fields = (
            self.__class__.__name__,
            self._code,
            self._details,
            self._debug_error_string,
        )
        return _NON_OK_CALL_REPRESENTATION.format(*fields)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __reduce__(self):
        # Pickle support: rebuild through the constructor, passing the
        # arguments in its positional order.
        constructor_args = (
            self._code,
            self._initial_metadata,
            self._trailing_metadata,
            self._details,
            self._debug_error_string,
        )
        return type(self), constructor_args
def _create_rpc_error(
    initial_metadata: Metadata, status: cygrpc.AioRpcStatus
) -> AioRpcError:
    """Build an `AioRpcError` from raw initial metadata and a final status.

    Args:
      initial_metadata: Initial metadata, converted with
        `Metadata.from_tuple`.
      status: Final status object supplying the code, trailing metadata,
        details and debug error string.

    Returns:
      The populated `AioRpcError` (it is returned, not raised).
    """
    status_code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()]
    initial = Metadata.from_tuple(initial_metadata)
    trailing = Metadata.from_tuple(status.trailing_metadata())
    return AioRpcError(
        status_code,
        initial,
        trailing,
        details=status.details(),
        debug_error_string=status.debug_error_string(),
    )
class Call:
    """Common base of the client-side RPC call objects.

    Provides the logic around final status, metadata and cancellation by
    delegating to the wrapped Cython call.
    """

    _loop: asyncio.AbstractEventLoop
    _code: grpc.StatusCode
    _cython_call: cygrpc._AioCall
    _metadata: Tuple[MetadatumType, ...]
    _request_serializer: SerializingFunction
    _response_deserializer: DeserializingFunction

    def __init__(
        self,
        cython_call: cygrpc._AioCall,
        metadata: Metadata,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Keep references to the Cython call, (de)serializers and loop."""
        self._cython_call = cython_call
        self._loop = loop
        # The metadata is frozen into a tuple at construction time.
        self._metadata = tuple(metadata)
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __del__(self) -> None:
        # The '_cython_call' attribute may already be gone by the time the
        # Call object is finalized, hence the hasattr() probe.
        if hasattr(self, "_cython_call") and not self._cython_call.done():
            self._cancel(_GC_CANCELLATION_DETAILS)

    def cancelled(self) -> bool:
        """Report whether the underlying call was cancelled."""
        return self._cython_call.cancelled()

    def _cancel(self, details: str) -> bool:
        """Cancel the underlying call with `details` unless it is done.

        Returns:
          True if a cancellation was issued, False if the call had already
          finished.
        """
        if self._cython_call.done():
            return False
        self._cython_call.cancel(details)
        return True

    def cancel(self) -> bool:
        """Cancel the RPC on behalf of the application."""
        return self._cancel(_LOCAL_CANCELLATION_DETAILS)

    def done(self) -> bool:
        """Report whether the underlying call has finished."""
        return self._cython_call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback`; it is bound to this Call as first argument."""
        self._cython_call.add_done_callback(partial(callback, self))

    def time_remaining(self) -> Optional[float]:
        """Return the remaining time as reported by the underlying call."""
        return self._cython_call.time_remaining()

    async def initial_metadata(self) -> Metadata:
        """Await the initial metadata and wrap it in `Metadata`."""
        raw_initial = await self._cython_call.initial_metadata()
        return Metadata.from_tuple(raw_initial)

    async def trailing_metadata(self) -> Metadata:
        """Await the final status and wrap its trailing metadata."""
        status = await self._cython_call.status()
        return Metadata.from_tuple(status.trailing_metadata())

    async def code(self) -> grpc.StatusCode:
        """Await the final status and map its code to `grpc.StatusCode`."""
        status = await self._cython_call.status()
        return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()]

    async def details(self) -> str:
        """Await the final status and return its details."""
        status = await self._cython_call.status()
        return status.details()

    async def debug_error_string(self) -> str:
        """Await the final status and return its debug error string."""
        status = await self._cython_call.status()
        return status.debug_error_string()

    async def _raise_for_status(self) -> None:
        """Raise if the RPC was locally cancelled or ended with a non-OK code.

        Raises:
          asyncio.CancelledError: The call was cancelled locally.
          AioRpcError: The final status code is not `grpc.StatusCode.OK`.
        """
        if self._cython_call.is_locally_cancelled():
            raise asyncio.CancelledError()

        if await self.code() == grpc.StatusCode.OK:
            return

        initial_metadata = await self.initial_metadata()
        final_status = await self._cython_call.status()
        raise _create_rpc_error(initial_metadata, final_status)

    def _repr(self) -> str:
        """Use the representation of the underlying Cython call."""
        return repr(self._cython_call)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()
class _APIStyle(enum.IntEnum):
    """API style an RPC is using: not yet known, iterator, or read/write."""

    # Values are 0, 1 and 2 respectively, in declaration order.
    UNKNOWN, ASYNC_GENERATOR, READER_WRITER = range(3)
class _UnaryResponseMixin(Call, Generic[ResponseType]):
    """Mixin that makes a call awaitable on a single response task."""

    _call_response: asyncio.Task

    def _init_unary_response_mixin(self, response_task: asyncio.Task):
        """Remember the task whose result is the RPC response."""
        self._call_response = response_task

    def cancel(self) -> bool:
        """Cancel the RPC and, if that took effect, the response task too."""
        if not super().cancel():
            return False
        self._call_response.cancel()
        return True

    def __await__(self) -> Generator[Any, None, ResponseType]:
        """Wait till the ongoing RPC request finishes.

        Raises:
          asyncio.CancelledError: The wait or the call was cancelled.
          AioRpcError: The task produced EOF without a local cancellation.
        """
        try:
            response = yield from self._call_response
        except asyncio.CancelledError:
            # Corner case: the application cancelled right after the Call
            # object was created, so the cancellation surfaces here rather
            # than inside the task. Make sure the RPC is cancelled as well.
            if not self.cancelled():
                self.cancel()
            raise

        if response is not cygrpc.EOF:
            return response

        # NOTE(lidiz) Raising RpcError inside the task would make asyncio
        # log 'Task exception was never retrieved' when users never await
        # it; raising here avoids that. Only one 'yield from' is possible in
        # '__await__', so the private attributes of the Cython call are read
        # directly instead of awaiting its accessors.
        if self._cython_call.is_locally_cancelled():
            raise asyncio.CancelledError()
        raise _create_rpc_error(
            self._cython_call._initial_metadata,
            self._cython_call._status,
        )
class _StreamResponseMixin(Call):
    """Mixin providing the response-streaming side of a call.

    Responses can be consumed either by async iteration or through
    `read()`; `_update_response_style` rejects mixing the two on one RPC.
    """

    # None until `__aiter__` is first called.
    _message_aiter: Optional[AsyncIterator[ResponseType]]
    _preparation: asyncio.Task
    _response_style: _APIStyle

    def _init_stream_response_mixin(self, preparation: asyncio.Task):
        """Reset streaming state; `preparation` is awaited before each read."""
        self._message_aiter = None
        self._preparation = preparation
        self._response_style = _APIStyle.UNKNOWN

    def _update_response_style(self, style: _APIStyle):
        """Record the style on first use; reject a different one later.

        Raises:
          cygrpc.UsageError: A different style was already recorded.
        """
        if self._response_style is _APIStyle.UNKNOWN:
            self._response_style = style
        elif self._response_style is not style:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

    def cancel(self) -> bool:
        """Cancel the RPC and, if that took effect, the preparation task."""
        if not super().cancel():
            return False
        self._preparation.cancel()
        return True

    async def _fetch_stream_responses(self) -> AsyncIterator[ResponseType]:
        """Async generator yielding responses until EOF is read.

        Once EOF is reached the final status is checked, so a failed RPC
        surfaces as an exception from the iteration.
        """
        while True:
            message = await self._read()
            if message is cygrpc.EOF:
                break
            yield message

        # If the read operation failed, Core should explain why.
        await self._raise_for_status()

    def __aiter__(self) -> AsyncIterator[ResponseType]:
        """Return the response generator, creating it on first use."""
        self._update_response_style(_APIStyle.ASYNC_GENERATOR)
        if self._message_aiter is None:
            self._message_aiter = self._fetch_stream_responses()
        return self._message_aiter

    async def _read(self) -> ResponseType:
        """Read one message; return it deserialized, or `cygrpc.EOF`."""
        # Wait for the request being sent
        await self._preparation

        # Reads response message from Core
        try:
            raw_response = await self._cython_call.receive_serialized_message()
        except asyncio.CancelledError:
            # Propagate the cancellation to the RPC before re-raising.
            if not self.cancelled():
                self.cancel()
            raise

        if raw_response is cygrpc.EOF:
            return cygrpc.EOF
        return _common.deserialize(raw_response, self._response_deserializer)

    async def read(self) -> ResponseType:
        """Read one response using the reader/writer API style.

        Returns:
          The next response, or `cygrpc.EOF` when none is returned.

        Raises:
          asyncio.CancelledError / AioRpcError: via `_raise_for_status` when
            the RPC is finished or EOF is read.
          cygrpc.UsageError: The iterator style is already in use.
        """
        if self.done():
            await self._raise_for_status()
            return cygrpc.EOF
        self._update_response_style(_APIStyle.READER_WRITER)

        response_message = await self._read()

        if response_message is cygrpc.EOF:
            # If the read operation failed, Core should explain why.
            await self._raise_for_status()
        return response_message
class _StreamRequestMixin(Call):
    """Mixin providing the request-streaming side of a call.

    Requests come either from a user-supplied iterator, drained by a
    background task, or from explicit `write()` / `done_writing()` calls.
    """

    _metadata_sent: asyncio.Event
    _done_writing_flag: bool
    _async_request_poller: Optional[asyncio.Task]
    _request_style: _APIStyle

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ):
        """Set up the writing state and pick the request API style.

        Args:
          request_iterator: Sync or async iterable of requests, or None when
            the application will call `write()` itself.
        """
        self._metadata_sent = asyncio.Event()
        self._done_writing_flag = False

        if request_iterator is None:
            self._async_request_poller = None
            self._request_style = _APIStyle.READER_WRITER
            return

        # An iterator was supplied: create a consumer Task that drains it.
        self._async_request_poller = self._loop.create_task(
            self._consume_request_iterator(request_iterator)
        )
        self._request_style = _APIStyle.ASYNC_GENERATOR

    def _raise_for_different_style(self, style: _APIStyle):
        """Raise `cygrpc.UsageError` unless `style` is the style in use."""
        if self._request_style is not style:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

    def cancel(self) -> bool:
        """Cancel the RPC and, if that took effect, the request poller."""
        if not super().cancel():
            return False
        if self._async_request_poller is not None:
            self._async_request_poller.cancel()
        return True

    def _metadata_sent_observer(self):
        """Callback that sets the `_metadata_sent` event."""
        self._metadata_sent.set()

    async def _write_from_iterator(self, request: RequestType) -> bool:
        """Write one iterator-supplied request, logging any `AioRpcError`.

        Returns:
          True if the request was written, False if `_write` raised
          `AioRpcError` (consumption of the iterator should then stop).
        """
        try:
            await self._write(request)
        except AioRpcError as rpc_error:
            _LOGGER.debug(
                "Exception while consuming the request_iterator: %s",
                rpc_error,
            )
            return False
        return True

    async def _consume_request_iterator(
        self, request_iterator: RequestIterableType
    ) -> None:
        """Send every request of the iterator, then half-close the stream.

        Consumption stops early, without half-closing, when a write fails
        with `AioRpcError`. Any other exception cancels the RPC and is
        logged; nothing propagates out of this coroutine.
        """
        try:
            if inspect.isasyncgen(request_iterator) or hasattr(
                request_iterator, "__aiter__"
            ):
                async for request in request_iterator:
                    if not await self._write_from_iterator(request):
                        return
            else:
                for request in request_iterator:
                    if not await self._write_from_iterator(request):
                        return

            await self._done_writing()
        except:  # pylint: disable=bare-except
            # Client iterators can raise exceptions, which we should handle by
            # cancelling the RPC and logging the client's error. No exceptions
            # should escape this function.
            _LOGGER.debug(
                "Client request_iterator raised exception:\n%s",
                traceback.format_exc(),
            )
            self.cancel()

    async def _write(self, request: RequestType) -> None:
        """Serialize and send one request.

        Raises:
          asyncio.InvalidStateError: The RPC is finished or half closed.
          asyncio.CancelledError / AioRpcError: via `_raise_for_status`, or
            when the send itself is cancelled.
        """
        if self.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        if self._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
        if not self._metadata_sent.is_set():
            await self._metadata_sent.wait()
            # The RPC may have finished while waiting for the metadata.
            if self.done():
                await self._raise_for_status()

        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        try:
            await self._cython_call.send_serialized_message(serialized_request)
        except cygrpc.InternalError:
            await self._raise_for_status()
        except asyncio.CancelledError:
            # Propagate the cancellation to the RPC before re-raising.
            if not self.cancelled():
                self.cancel()
            raise

    async def _done_writing(self) -> None:
        """Half-close the stream once; no-op if finished or already closed."""
        if self.done() or self._done_writing_flag:
            return

        # The flag is set before the await so later calls become no-ops.
        self._done_writing_flag = True
        try:
            await self._cython_call.send_receive_close()
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def write(self, request: RequestType) -> None:
        """Send one request using the reader/writer API style.

        Raises:
          cygrpc.UsageError: The call was created with a request iterator.
        """
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._write(request)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        """
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._done_writing()

    async def wait_for_connection(self) -> None:
        """Wait until the metadata-sent event is set, then check the status."""
        await self._metadata_sent.wait()
        if self.done():
            await self._raise_for_status()
class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
    """Call object for unary-unary RPCs.

    Returned when an instance of `UnaryUnaryMultiCallable` object is called.
    """

    _request: RequestType
    _invocation_task: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request: RequestType,
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and schedule the invocation task."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._request = request
        invocation = loop.create_task(self._invoke())
        self._invocation_task = invocation
        self._init_unary_response_mixin(invocation)

    async def _invoke(self) -> ResponseType:
        """Send the request and return the response, or `cygrpc.EOF`.

        EOF is returned instead of raising when the call did not end OK;
        `__await__` of the response mixin turns it into an exception.
        """
        serialized_request = _common.serialize(
            self._request, self._request_serializer
        )

        # NOTE(lidiz) asyncio.CancelledError is not a good transport for status,
        # because the asyncio.Task class do not cache the exception object.
        # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
        try:
            serialized_response = await self._cython_call.unary_unary(
                serialized_request, self._metadata
            )
        except asyncio.CancelledError:
            # Deliberately not re-raised (see the note above); the status
            # check below reports the outcome.
            if not self.cancelled():
                self.cancel()

        if not self._cython_call.is_ok():
            return cygrpc.EOF
        return _common.deserialize(
            serialized_response, self._response_deserializer
        )

    async def wait_for_connection(self) -> None:
        """Await the invocation task, then raise if the RPC failed."""
        await self._invocation_task
        if self.done():
            await self._raise_for_status()
class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall):
    """Call object for unary-stream RPCs.

    Returned when an instance of `UnaryStreamMultiCallable` object is called.
    """

    _request: RequestType
    _send_unary_request_task: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request: RequestType,
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and schedule sending of the request."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._request = request
        send_task = loop.create_task(self._send_unary_request())
        self._send_unary_request_task = send_task
        # Reads wait on this task before receiving any message.
        self._init_stream_response_mixin(send_task)

    async def _send_unary_request(self) -> None:
        """Serialize the request and initiate the unary-stream call.

        Raises:
          asyncio.CancelledError: Re-raised after cancelling the RPC.
        """
        serialized_request = _common.serialize(
            self._request, self._request_serializer
        )
        try:
            await self._cython_call.initiate_unary_stream(
                serialized_request, self._metadata
            )
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def wait_for_connection(self) -> None:
        """Await the send task, then raise if the RPC already failed."""
        await self._send_unary_request_task
        if self.done():
            await self._raise_for_status()
# pylint: disable=too-many-ancestors
class StreamUnaryCall(
    _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall
):
    """Call object for stream-unary RPCs.

    Returned when an instance of `StreamUnaryMultiCallable` object is called.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request_iterator: Optional[RequestIterableType],
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and start both request and RPC tasks."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )

        self._init_stream_request_mixin(request_iterator)
        rpc_task = loop.create_task(self._conduct_rpc())
        self._init_unary_response_mixin(rpc_task)

    async def _conduct_rpc(self) -> ResponseType:
        """Run the stream-unary call; return the response or `cygrpc.EOF`.

        Raises:
          asyncio.CancelledError: Re-raised after cancelling the RPC.
        """
        try:
            serialized_response = await self._cython_call.stream_unary(
                self._metadata, self._metadata_sent_observer
            )
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

        if not self._cython_call.is_ok():
            return cygrpc.EOF
        return _common.deserialize(
            serialized_response, self._response_deserializer
        )
class StreamStreamCall(
    _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall
):
    """Call object for stream-stream RPCs.

    Returned when an instance of `StreamStreamMultiCallable` object is called.
    """

    _initializer: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request_iterator: Optional[RequestIterableType],
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Create the underlying call and schedule its preparation task."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        preparation = self._loop.create_task(self._prepare_rpc())
        self._initializer = preparation
        self._init_stream_request_mixin(request_iterator)
        self._init_stream_response_mixin(preparation)

    async def _prepare_rpc(self):
        """Prepare the RPC for receiving/sending messages.

        All other operations around the stream should only happen after the
        completion of this method.
        """
        try:
            await self._cython_call.initiate_stream_stream(
                self._metadata, self._metadata_sent_observer
            )
        except asyncio.CancelledError:
            # Swallowed on purpose after cancelling the RPC.
            if not self.cancelled():
                self.cancel()
            # No need to raise RpcError here, because no one will `await` this task.