Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Asyncio Python."""
16import asyncio
17import enum
18from functools import partial
19import inspect
20import logging
21import traceback
22from typing import (
23 Any,
24 AsyncIterator,
25 Generator,
26 Generic,
27 Optional,
28 Tuple,
29 Union,
30)
32import grpc
33from grpc import _common
34from grpc._cython import cygrpc
36from . import _base_call
37from ._metadata import Metadata
38from ._typing import DeserializingFunction
39from ._typing import DoneCallbackType
40from ._typing import EOFType
41from ._typing import MetadataType
42from ._typing import MetadatumType
43from ._typing import RequestIterableType
44from ._typing import RequestType
45from ._typing import ResponseType
46from ._typing import SerializingFunction
# Names exported by a star-import of this module.
__all__ = ("AioRpcError", "Call", "UnaryUnaryCall", "UnaryStreamCall")

# Details strings used for locally generated cancellations and usage errors.
_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!"
_GC_CANCELLATION_DETAILS = "Cancelled upon garbage collection!"
_RPC_ALREADY_FINISHED_DETAILS = "RPC already finished."
_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".'
_API_STYLE_ERROR = (
    "The iterator and read/write APIs may not be mixed "
    "on a single RPC."
)

# Templates for the textual representation of a finished RPC. The
# placeholders are filled positionally by ``str.format``.
_OK_CALL_REPRESENTATION = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

_NON_OK_CALL_REPRESENTATION = "\n".join(
    (
        "<{} of RPC that terminated with:",
        "\tstatus = {}",
        '\tdetails = "{}"',
        '\tdebug_error_string = "{}"',
        ">",
    )
)

_LOGGER = logging.getLogger(__name__)
class AioRpcError(grpc.RpcError):
    """An implementation of RpcError to be used by the asynchronous API.

    A raised AioRpcError is a snapshot of the final status of the RPC: all of
    its values are already determined, so its accessors are plain methods
    rather than coroutines.
    """

    _code: grpc.StatusCode
    _details: Optional[str]
    _initial_metadata: Optional[Metadata]
    _trailing_metadata: Optional[Metadata]
    _debug_error_string: Optional[str]

    def __init__(
        self,
        code: grpc.StatusCode,
        initial_metadata: Metadata,
        trailing_metadata: Metadata,
        details: Optional[str] = None,
        debug_error_string: Optional[str] = None,
    ) -> None:
        """Constructor.

        Args:
          code: The status code with which the RPC has been finalized.
          initial_metadata: Optional initial metadata that could be sent by the
            Server.
          trailing_metadata: Optional metadata that could be sent by the Server.
          details: Optional details explaining the reason of the error.
          debug_error_string: Optional debug string; stored as given and
            returned by `debug_error_string()`.
        """
        super().__init__()
        self._code = code
        self._details = details
        self._initial_metadata = initial_metadata
        self._trailing_metadata = trailing_metadata
        self._debug_error_string = debug_error_string

    def code(self) -> grpc.StatusCode:
        """Accesses the status code sent by the server.

        Returns:
          The `grpc.StatusCode` status code.
        """
        return self._code

    def details(self) -> Optional[str]:
        """Accesses the details sent by the server.

        Returns:
          The description of the error, or None if none was provided.
        """
        return self._details

    def initial_metadata(self) -> Metadata:
        """Accesses the initial metadata sent by the server.

        Returns:
          The initial metadata received.
        """
        return self._initial_metadata

    def trailing_metadata(self) -> Metadata:
        """Accesses the trailing metadata sent by the server.

        Returns:
          The trailing metadata received.
        """
        return self._trailing_metadata

    def debug_error_string(self) -> Optional[str]:
        """Accesses the debug error string sent by the server.

        Returns:
          The debug error string received, or None if the error was built
          without one (the constructor default).
        """
        return self._debug_error_string

    def _repr(self) -> str:
        """Assembles the error string for the RPC error."""
        return _NON_OK_CALL_REPRESENTATION.format(
            self.__class__.__name__,
            self._code,
            self._details,
            self._debug_error_string,
        )

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __reduce__(self):
        # Rebuild through the constructor, passing the arguments in the
        # constructor's positional order.
        return (
            type(self),
            (
                self._code,
                self._initial_metadata,
                self._trailing_metadata,
                self._details,
                self._debug_error_string,
            ),
        )
def _create_rpc_error(
    initial_metadata: MetadataType,
    status: cygrpc.AioRpcStatus,
) -> AioRpcError:
    """Builds an AioRpcError from raw initial metadata and a final status.

    Args:
      initial_metadata: The initial metadata of the RPC, converted with
        `Metadata.from_tuple`.
      status: The final status object; its code, trailing metadata, details
        and debug error string are copied into the error.

    Returns:
      The AioRpcError describing the final status.
    """
    status_code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()]
    initial = Metadata.from_tuple(initial_metadata)
    trailing = Metadata.from_tuple(status.trailing_metadata())
    return AioRpcError(
        status_code,
        initial,
        trailing,
        details=status.details(),
        debug_error_string=status.debug_error_string(),
    )
class Call:
    """Base implementation of client RPC Call object.

    Implements logic around final status, metadata and cancellation by
    delegating to the wrapped Cython call object.
    """

    _loop: asyncio.AbstractEventLoop
    _code: grpc.StatusCode
    _cython_call: cygrpc._AioCall
    _metadata: Tuple[MetadatumType, ...]
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]

    def __init__(
        self,
        cython_call: cygrpc._AioCall,
        metadata: Metadata,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Stores the collaborators of the call.

        The metadata is frozen into a tuple at construction time.
        """
        self._loop = loop
        self._cython_call = cython_call
        self._metadata = tuple(metadata)
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __del__(self) -> None:
        # The '_cython_call' object might be destructed before Call object,
        # hence the attribute check before touching it.
        if hasattr(self, "_cython_call") and not self._cython_call.done():
            self._cancel(_GC_CANCELLATION_DETAILS)

    def cancelled(self) -> bool:
        """Returns whether the underlying call reports being cancelled."""
        return self._cython_call.cancelled()

    def _cancel(self, details: str) -> bool:
        """Forwards the application cancellation reasoning.

        Returns:
          True if the cancellation was forwarded, False if the call was
          already done.
        """
        if self._cython_call.done():
            return False
        self._cython_call.cancel(details)
        return True

    def cancel(self) -> bool:
        """Cancels the call with the local-cancellation details."""
        return self._cancel(_LOCAL_CANCELLATION_DETAILS)

    def done(self) -> bool:
        """Returns whether the underlying call reports being done."""
        return self._cython_call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Registers `callback`, bound to this Call as its first argument."""
        self._cython_call.add_done_callback(partial(callback, self))

    def time_remaining(self) -> Optional[float]:
        """Returns the time remaining as reported by the underlying call."""
        return self._cython_call.time_remaining()

    async def initial_metadata(self) -> Metadata:
        """Awaits the initial metadata and wraps it in a Metadata object."""
        raw_initial = await self._cython_call.initial_metadata()
        return Metadata.from_tuple(raw_initial)

    async def trailing_metadata(self) -> Metadata:
        """Awaits the final status and returns its trailing metadata."""
        status = await self._cython_call.status()
        return Metadata.from_tuple(status.trailing_metadata())

    async def code(self) -> grpc.StatusCode:
        """Awaits the final status and maps its code to `grpc.StatusCode`."""
        status = await self._cython_call.status()
        return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()]

    async def details(self) -> str:
        """Awaits the final status and returns its details."""
        status = await self._cython_call.status()
        return status.details()

    async def debug_error_string(self) -> str:
        """Awaits the final status and returns its debug error string."""
        status = await self._cython_call.status()
        return status.debug_error_string()

    async def _raise_for_status(self) -> None:
        """Raises if the RPC did not finish with an OK status.

        Raises:
          asyncio.CancelledError: If the call was cancelled locally.
          AioRpcError: If the final status code is not OK.
        """
        if self._cython_call.is_locally_cancelled():
            raise asyncio.CancelledError()
        final_code = await self.code()
        if final_code != grpc.StatusCode.OK:
            initial = await self.initial_metadata()
            status = await self._cython_call.status()
            raise _create_rpc_error(initial, status)

    def _repr(self) -> str:
        """Returns the representation of the underlying call object."""
        return repr(self._cython_call)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()
class _APIStyle(enum.IntEnum):
    """API style in use on one side (request or response) of an RPC."""

    # No style has been chosen yet.
    UNKNOWN = 0
    # The iterator-based API.
    ASYNC_GENERATOR = 1
    # The explicit read/write API.
    READER_WRITER = 2
class _UnaryResponseMixin(Call, Generic[ResponseType]):
    """Mixin for calls that resolve to a single response when awaited."""

    _call_response: asyncio.Task

    def _init_unary_response_mixin(self, response_task: asyncio.Task):
        """Records the task whose result is the response of the RPC."""
        self._call_response = response_task

    def cancel(self) -> bool:
        """Cancels the call and, if that took effect, the response task."""
        if not super().cancel():
            return False
        self._call_response.cancel()
        return True

    def __await__(self) -> Generator[Any, None, ResponseType]:
        """Wait till the ongoing RPC request finishes."""
        try:
            response = yield from self._call_response
        except asyncio.CancelledError:
            # Even if we caught all other CancelledError, there is still
            # this corner case. If the application cancels immediately after
            # the Call object is created, we will observe this
            # `CancelledError`.
            if not self.cancelled():
                self.cancel()
            raise

        if response is not cygrpc.EOF:
            return response

        # NOTE(lidiz) If we raise RpcError in the task, and users doesn't
        # 'await' on it. AsyncIO will log 'Task exception was never retrieved'.
        # Instead, if we move the exception raising here, the spam stops.
        # Unfortunately, there can only be one 'yield from' in '__await__'. So,
        # we need to access the private instance variable.
        if self._cython_call.is_locally_cancelled():
            raise asyncio.CancelledError()
        raise _create_rpc_error(
            self._cython_call._initial_metadata,
            self._cython_call._status,
        )
class _StreamResponseMixin(Call):
    """Mixin for calls that receive a stream of response messages.

    Responses can be consumed either by async iteration or by explicit
    `read()` calls; mixing the two styles on one call raises
    `cygrpc.UsageError`.
    """

    # None until the first `__aiter__` call creates the generator.
    _message_aiter: Optional[AsyncIterator[ResponseType]]
    _preparation: asyncio.Task
    _response_style: _APIStyle

    def _init_stream_response_mixin(self, preparation: asyncio.Task):
        """Initializes the response-side state.

        Args:
          preparation: Task awaited before every read from the call.
        """
        self._message_aiter = None
        self._preparation = preparation
        self._response_style = _APIStyle.UNKNOWN

    def _update_response_style(self, style: _APIStyle):
        """Records the response style on first use and rejects later changes.

        Raises:
          cygrpc.UsageError: If a different style was already recorded.
        """
        if self._response_style is _APIStyle.UNKNOWN:
            self._response_style = style
        elif self._response_style is not style:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

    def cancel(self) -> bool:
        """Cancels the call and, if that took effect, the preparation task."""
        if super().cancel():
            self._preparation.cancel()
            return True
        else:
            return False

    async def _fetch_stream_responses(self) -> AsyncIterator[ResponseType]:
        """Yields response messages until EOF, then raises for a bad status."""
        message = await self._read()
        while message is not cygrpc.EOF:
            yield message
            message = await self._read()

        # If the read operation failed, Core should explain why.
        await self._raise_for_status()

    def __aiter__(self) -> AsyncIterator[ResponseType]:
        self._update_response_style(_APIStyle.ASYNC_GENERATOR)
        # The generator is created once and shared by every __aiter__ call.
        if self._message_aiter is None:
            self._message_aiter = self._fetch_stream_responses()
        return self._message_aiter

    async def _read(self) -> ResponseType:
        """Reads one message from the call, deserializing it unless EOF."""
        # Wait for the request being sent
        await self._preparation

        # Reads response message from Core
        try:
            raw_response = await self._cython_call.receive_serialized_message()
        except asyncio.CancelledError:
            # Propagate the cancellation to the RPC before re-raising.
            if not self.cancelled():
                self.cancel()
            raise

        if raw_response is cygrpc.EOF:
            return cygrpc.EOF
        else:
            return _common.deserialize(
                raw_response, self._response_deserializer
            )

    async def read(self) -> Union[EOFType, ResponseType]:
        """Reads one response message using the reader/writer style.

        Returns:
          The next message, or `cygrpc.EOF` once the call has finished with
          an OK status.

        Raises:
          cygrpc.UsageError: If the call is already consumed by iteration.
          asyncio.CancelledError / AioRpcError: Via `_raise_for_status` when
            the call did not finish with an OK status.
        """
        if self.done():
            await self._raise_for_status()
            return cygrpc.EOF
        self._update_response_style(_APIStyle.READER_WRITER)

        response_message = await self._read()

        if response_message is cygrpc.EOF:
            # If the read operation failed, Core should explain why.
            await self._raise_for_status()
        return response_message
class _StreamRequestMixin(Call):
    """Mixin for calls that send a stream of request messages.

    Requests come either from an iterator consumed by a background task or
    from explicit `write()` / `done_writing()` calls; the style is fixed at
    initialization.
    """

    _metadata_sent: asyncio.Event
    _done_writing_flag: bool
    _async_request_poller: Optional[asyncio.Task]
    _request_style: _APIStyle

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ):
        """Initializes the request-side state.

        Args:
          request_iterator: Optional iterable of requests; when given, a task
            is created on the call's loop to consume it.
        """
        self._metadata_sent = asyncio.Event()
        self._done_writing_flag = False

        if request_iterator is None:
            self._async_request_poller = None
            self._request_style = _APIStyle.READER_WRITER
            return

        # If user passes in an async iterator, create a consumer Task.
        self._async_request_poller = self._loop.create_task(
            self._consume_request_iterator(request_iterator)
        )
        self._request_style = _APIStyle.ASYNC_GENERATOR

    def _raise_for_different_style(self, style: _APIStyle):
        """Raises `cygrpc.UsageError` unless `style` is the request style."""
        if self._request_style is not style:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

    def cancel(self) -> bool:
        """Cancels the call and, if that took effect, the request poller."""
        if not super().cancel():
            return False
        poller = self._async_request_poller
        if poller is not None:
            poller.cancel()
        return True

    def _metadata_sent_observer(self):
        """Callback that marks the metadata-sent event as set."""
        self._metadata_sent.set()

    async def _consume_request_iterator(
        self, request_iterator: RequestIterableType
    ) -> None:
        """Writes every request of the iterator, then half-closes the call.

        Stops quietly on the first AioRpcError raised by a write. Any other
        exception is logged at debug level and cancels the RPC.
        """
        write_failure_format = (
            "Exception while consuming the request_iterator: %s"
        )
        try:
            is_async_iterable = inspect.isasyncgen(
                request_iterator
            ) or hasattr(request_iterator, "__aiter__")

            if is_async_iterable:
                async for request in request_iterator:
                    try:
                        await self._write(request)
                    except AioRpcError as rpc_error:
                        _LOGGER.debug(write_failure_format, rpc_error)
                        return
            else:
                for request in request_iterator:
                    try:
                        await self._write(request)
                    except AioRpcError as rpc_error:
                        _LOGGER.debug(write_failure_format, rpc_error)
                        return

            await self._done_writing()
        except:  # pylint: disable=bare-except # noqa: E722
            # Client iterators can raise exceptions, which we should handle by
            # cancelling the RPC and logging the client's error. No exceptions
            # should escape this function.
            _LOGGER.debug(
                "Client request_iterator raised exception:\n%s",
                traceback.format_exc(),
            )
            self.cancel()

    async def _write(self, request: RequestType) -> None:
        """Serializes and sends one request message.

        Raises:
          asyncio.InvalidStateError: If the RPC is already finished or has
            been half closed.
          asyncio.CancelledError / AioRpcError: Via `_raise_for_status` when
            the call finished without an OK status.
        """
        if self.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        if self._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
        if not self._metadata_sent.is_set():
            await self._metadata_sent.wait()
            # The call may have finished while waiting for the metadata.
            if self.done():
                await self._raise_for_status()

        payload = _common.serialize(request, self._request_serializer)
        try:
            await self._cython_call.send_serialized_message(payload)
        except cygrpc.InternalError as err:
            self._cython_call.set_internal_error(str(err))
            await self._raise_for_status()
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def _done_writing(self) -> None:
        """Half-closes the call once; does nothing if finished or closed."""
        # If the RPC is finished, or done writing was already sent, there is
        # nothing left to do.
        if self.done() or self._done_writing_flag:
            return

        self._done_writing_flag = True
        try:
            await self._cython_call.send_receive_close()
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def write(self, request: RequestType) -> None:
        """Sends one request; only valid in the reader/writer style."""
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._write(request)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        """
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._done_writing()

    async def wait_for_connection(self) -> None:
        """Waits until the metadata is sent; raises if the call finished badly."""
        await self._metadata_sent.wait()
        if self.done():
            await self._raise_for_status()
class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
    """Object for managing unary-unary RPC calls.

    Returned when an instance of `UnaryUnaryMultiCallable` object is called.
    """

    _request: RequestType
    _invocation_task: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request: RequestType,
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Creates the underlying call and schedules the invocation task."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._request = request
        self._context = cygrpc.build_census_context()
        invocation = loop.create_task(self._invoke())
        self._invocation_task = invocation
        self._init_unary_response_mixin(invocation)

    async def _invoke(self) -> ResponseType:
        """Performs the RPC; returns the response, or EOF if it was not OK."""
        payload = _common.serialize(self._request, self._request_serializer)

        # NOTE(lidiz) asyncio.CancelledError is not a good transport for status,
        # because the asyncio.Task class do not cache the exception object.
        # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
        try:
            serialized_response = await self._cython_call.unary_unary(
                payload, self._metadata, self._context
            )
        except asyncio.CancelledError:
            # Not re-raised: the outcome is reported through the EOF return
            # value below instead of through the exception.
            if not self.cancelled():
                self.cancel()

        if not self._cython_call.is_ok():
            return cygrpc.EOF
        return _common.deserialize(
            serialized_response, self._response_deserializer
        )

    async def wait_for_connection(self) -> None:
        """Awaits the invocation task; raises if the call finished badly."""
        await self._invocation_task
        if self.done():
            await self._raise_for_status()
class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall):
    """Object for managing unary-stream RPC calls.

    Returned when an instance of `UnaryStreamMultiCallable` object is called.
    """

    _request: RequestType
    _send_unary_request_task: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request: RequestType,
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Creates the underlying call and schedules sending the request.

        The send task doubles as the preparation task that every read awaits.
        """
        super().__init__(
            channel.call(method, deadline, credentials, wait_for_ready),
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._request = request
        self._context = cygrpc.build_census_context()
        self._send_unary_request_task = loop.create_task(
            self._send_unary_request()
        )
        self._init_stream_response_mixin(self._send_unary_request_task)

    async def _send_unary_request(self) -> None:
        """Serializes the single request and initiates the unary-stream RPC.

        Returns nothing; responses are read separately from the call.
        """
        serialized_request = _common.serialize(
            self._request, self._request_serializer
        )
        try:
            await self._cython_call.initiate_unary_stream(
                serialized_request, self._metadata, self._context
            )
        except asyncio.CancelledError:
            # Propagate the cancellation to the RPC before re-raising.
            if not self.cancelled():
                self.cancel()
            raise

    async def wait_for_connection(self) -> None:
        """Awaits the send task; raises if the call finished badly."""
        await self._send_unary_request_task
        if self.done():
            await self._raise_for_status()
# pylint: disable=too-many-ancestors
class StreamUnaryCall(
    _StreamRequestMixin, _UnaryResponseMixin, Call, _base_call.StreamUnaryCall
):
    """Object for managing stream-unary RPC calls.

    Returned when an instance of `StreamUnaryMultiCallable` object is called.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request_iterator: Optional[RequestIterableType],
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Creates the underlying call, then the request and response state."""
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )

        self._context = cygrpc.build_census_context()
        self._init_stream_request_mixin(request_iterator)
        response_task = loop.create_task(self._conduct_rpc())
        self._init_unary_response_mixin(response_task)

    async def _conduct_rpc(self) -> ResponseType:
        """Runs the RPC; returns the response, or EOF if it was not OK."""
        try:
            serialized_response = await self._cython_call.stream_unary(
                self._metadata, self._metadata_sent_observer, self._context
            )
        except asyncio.CancelledError:
            # Propagate the cancellation to the RPC before re-raising.
            if not self.cancelled():
                self.cancel()
            raise

        if not self._cython_call.is_ok():
            return cygrpc.EOF
        return _common.deserialize(
            serialized_response, self._response_deserializer
        )
class StreamStreamCall(
    _StreamRequestMixin, _StreamResponseMixin, Call, _base_call.StreamStreamCall
):
    """Object for managing stream-stream RPC calls.

    Returned when an instance of `StreamStreamMultiCallable` object is called.
    """

    _initializer: asyncio.Task

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        request_iterator: Optional[RequestIterableType],
        deadline: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Creates the underlying call and schedules its preparation task.

        The preparation task is the one every read awaits before touching
        the call.
        """
        cython_call = channel.call(
            method, deadline, credentials, wait_for_ready
        )
        super().__init__(
            cython_call,
            metadata,
            request_serializer,
            response_deserializer,
            loop,
        )
        self._context = cygrpc.build_census_context()
        self._initializer = self._loop.create_task(self._prepare_rpc())
        self._init_stream_request_mixin(request_iterator)
        self._init_stream_response_mixin(self._initializer)

    async def _prepare_rpc(self):
        """Prepares the RPC for receiving/sending messages.

        All other operations around the stream should only happen after the
        completion of this method.
        """
        try:
            await self._cython_call.initiate_stream_stream(
                self._metadata, self._metadata_sent_observer, self._context
            )
        except asyncio.CancelledError:
            # The cancellation is forwarded to the RPC but not re-raised.
            if not self.cancelled():
                self.cancel()
            # No need to raise RpcError here, because no one will `await` this task.