Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_call.py: 36%
343 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Asyncio Python."""
16import asyncio
17import enum
18from functools import partial
19import inspect
20import logging
21import traceback
22from typing import AsyncIterable, Optional, Tuple
24import grpc
25from grpc import _common
26from grpc._cython import cygrpc
28from . import _base_call
29from ._metadata import Metadata
30from ._typing import DeserializingFunction
31from ._typing import DoneCallbackType
32from ._typing import MetadatumType
33from ._typing import RequestIterableType
34from ._typing import RequestType
35from ._typing import ResponseType
36from ._typing import SerializingFunction
38__all__ = 'AioRpcError', 'Call', 'UnaryUnaryCall', 'UnaryStreamCall'
40_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!'
41_GC_CANCELLATION_DETAILS = 'Cancelled upon garbage collection!'
42_RPC_ALREADY_FINISHED_DETAILS = 'RPC already finished.'
43_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".'
44_API_STYLE_ERROR = 'The iterator and read/write APIs may not be mixed on a single RPC.'
46_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n'
47 '\tstatus = {}\n'
48 '\tdetails = "{}"\n'
49 '>')
51_NON_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n'
52 '\tstatus = {}\n'
53 '\tdetails = "{}"\n'
54 '\tdebug_error_string = "{}"\n'
55 '>')
57_LOGGER = logging.getLogger(__name__)
60class AioRpcError(grpc.RpcError):
61 """An implementation of RpcError to be used by the asynchronous API.
63 Raised RpcError is a snapshot of the final status of the RPC, values are
64 determined. Hence, its methods no longer needs to be coroutines.
65 """
67 _code: grpc.StatusCode
68 _details: Optional[str]
69 _initial_metadata: Optional[Metadata]
70 _trailing_metadata: Optional[Metadata]
71 _debug_error_string: Optional[str]
73 def __init__(self,
74 code: grpc.StatusCode,
75 initial_metadata: Metadata,
76 trailing_metadata: Metadata,
77 details: Optional[str] = None,
78 debug_error_string: Optional[str] = None) -> None:
79 """Constructor.
81 Args:
82 code: The status code with which the RPC has been finalized.
83 details: Optional details explaining the reason of the error.
84 initial_metadata: Optional initial metadata that could be sent by the
85 Server.
86 trailing_metadata: Optional metadata that could be sent by the Server.
87 """
89 super().__init__(self)
90 self._code = code
91 self._details = details
92 self._initial_metadata = initial_metadata
93 self._trailing_metadata = trailing_metadata
94 self._debug_error_string = debug_error_string
96 def code(self) -> grpc.StatusCode:
97 """Accesses the status code sent by the server.
99 Returns:
100 The `grpc.StatusCode` status code.
101 """
102 return self._code
104 def details(self) -> Optional[str]:
105 """Accesses the details sent by the server.
107 Returns:
108 The description of the error.
109 """
110 return self._details
112 def initial_metadata(self) -> Metadata:
113 """Accesses the initial metadata sent by the server.
115 Returns:
116 The initial metadata received.
117 """
118 return self._initial_metadata
120 def trailing_metadata(self) -> Metadata:
121 """Accesses the trailing metadata sent by the server.
123 Returns:
124 The trailing metadata received.
125 """
126 return self._trailing_metadata
128 def debug_error_string(self) -> str:
129 """Accesses the debug error string sent by the server.
131 Returns:
132 The debug error string received.
133 """
134 return self._debug_error_string
136 def _repr(self) -> str:
137 """Assembles the error string for the RPC error."""
138 return _NON_OK_CALL_REPRESENTATION.format(self.__class__.__name__,
139 self._code, self._details,
140 self._debug_error_string)
142 def __repr__(self) -> str:
143 return self._repr()
145 def __str__(self) -> str:
146 return self._repr()
149def _create_rpc_error(initial_metadata: Metadata,
150 status: cygrpc.AioRpcStatus) -> AioRpcError:
151 return AioRpcError(
152 _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()],
153 Metadata.from_tuple(initial_metadata),
154 Metadata.from_tuple(status.trailing_metadata()),
155 details=status.details(),
156 debug_error_string=status.debug_error_string(),
157 )
160class Call:
161 """Base implementation of client RPC Call object.
163 Implements logic around final status, metadata and cancellation.
164 """
165 _loop: asyncio.AbstractEventLoop
166 _code: grpc.StatusCode
167 _cython_call: cygrpc._AioCall
168 _metadata: Tuple[MetadatumType, ...]
169 _request_serializer: SerializingFunction
170 _response_deserializer: DeserializingFunction
172 def __init__(self, cython_call: cygrpc._AioCall, metadata: Metadata,
173 request_serializer: SerializingFunction,
174 response_deserializer: DeserializingFunction,
175 loop: asyncio.AbstractEventLoop) -> None:
176 self._loop = loop
177 self._cython_call = cython_call
178 self._metadata = tuple(metadata)
179 self._request_serializer = request_serializer
180 self._response_deserializer = response_deserializer
182 def __del__(self) -> None:
183 # The '_cython_call' object might be destructed before Call object
184 if hasattr(self, '_cython_call'):
185 if not self._cython_call.done():
186 self._cancel(_GC_CANCELLATION_DETAILS)
188 def cancelled(self) -> bool:
189 return self._cython_call.cancelled()
191 def _cancel(self, details: str) -> bool:
192 """Forwards the application cancellation reasoning."""
193 if not self._cython_call.done():
194 self._cython_call.cancel(details)
195 return True
196 else:
197 return False
199 def cancel(self) -> bool:
200 return self._cancel(_LOCAL_CANCELLATION_DETAILS)
202 def done(self) -> bool:
203 return self._cython_call.done()
205 def add_done_callback(self, callback: DoneCallbackType) -> None:
206 cb = partial(callback, self)
207 self._cython_call.add_done_callback(cb)
209 def time_remaining(self) -> Optional[float]:
210 return self._cython_call.time_remaining()
212 async def initial_metadata(self) -> Metadata:
213 raw_metadata_tuple = await self._cython_call.initial_metadata()
214 return Metadata.from_tuple(raw_metadata_tuple)
216 async def trailing_metadata(self) -> Metadata:
217 raw_metadata_tuple = (await
218 self._cython_call.status()).trailing_metadata()
219 return Metadata.from_tuple(raw_metadata_tuple)
221 async def code(self) -> grpc.StatusCode:
222 cygrpc_code = (await self._cython_call.status()).code()
223 return _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code]
225 async def details(self) -> str:
226 return (await self._cython_call.status()).details()
228 async def debug_error_string(self) -> str:
229 return (await self._cython_call.status()).debug_error_string()
231 async def _raise_for_status(self) -> None:
232 if self._cython_call.is_locally_cancelled():
233 raise asyncio.CancelledError()
234 code = await self.code()
235 if code != grpc.StatusCode.OK:
236 raise _create_rpc_error(await self.initial_metadata(), await
237 self._cython_call.status())
239 def _repr(self) -> str:
240 return repr(self._cython_call)
242 def __repr__(self) -> str:
243 return self._repr()
245 def __str__(self) -> str:
246 return self._repr()
249class _APIStyle(enum.IntEnum):
250 UNKNOWN = 0
251 ASYNC_GENERATOR = 1
252 READER_WRITER = 2
255class _UnaryResponseMixin(Call):
256 _call_response: asyncio.Task
258 def _init_unary_response_mixin(self, response_task: asyncio.Task):
259 self._call_response = response_task
261 def cancel(self) -> bool:
262 if super().cancel():
263 self._call_response.cancel()
264 return True
265 else:
266 return False
268 def __await__(self) -> ResponseType:
269 """Wait till the ongoing RPC request finishes."""
270 try:
271 response = yield from self._call_response
272 except asyncio.CancelledError:
273 # Even if we caught all other CancelledError, there is still
274 # this corner case. If the application cancels immediately after
275 # the Call object is created, we will observe this
276 # `CancelledError`.
277 if not self.cancelled():
278 self.cancel()
279 raise
281 # NOTE(lidiz) If we raise RpcError in the task, and users doesn't
282 # 'await' on it. AsyncIO will log 'Task exception was never retrieved'.
283 # Instead, if we move the exception raising here, the spam stops.
284 # Unfortunately, there can only be one 'yield from' in '__await__'. So,
285 # we need to access the private instance variable.
286 if response is cygrpc.EOF:
287 if self._cython_call.is_locally_cancelled():
288 raise asyncio.CancelledError()
289 else:
290 raise _create_rpc_error(self._cython_call._initial_metadata,
291 self._cython_call._status)
292 else:
293 return response
296class _StreamResponseMixin(Call):
297 _message_aiter: AsyncIterable[ResponseType]
298 _preparation: asyncio.Task
299 _response_style: _APIStyle
301 def _init_stream_response_mixin(self, preparation: asyncio.Task):
302 self._message_aiter = None
303 self._preparation = preparation
304 self._response_style = _APIStyle.UNKNOWN
306 def _update_response_style(self, style: _APIStyle):
307 if self._response_style is _APIStyle.UNKNOWN:
308 self._response_style = style
309 elif self._response_style is not style:
310 raise cygrpc.UsageError(_API_STYLE_ERROR)
312 def cancel(self) -> bool:
313 if super().cancel():
314 self._preparation.cancel()
315 return True
316 else:
317 return False
319 async def _fetch_stream_responses(self) -> ResponseType:
320 message = await self._read()
321 while message is not cygrpc.EOF:
322 yield message
323 message = await self._read()
325 # If the read operation failed, Core should explain why.
326 await self._raise_for_status()
328 def __aiter__(self) -> AsyncIterable[ResponseType]:
329 self._update_response_style(_APIStyle.ASYNC_GENERATOR)
330 if self._message_aiter is None:
331 self._message_aiter = self._fetch_stream_responses()
332 return self._message_aiter
334 async def _read(self) -> ResponseType:
335 # Wait for the request being sent
336 await self._preparation
338 # Reads response message from Core
339 try:
340 raw_response = await self._cython_call.receive_serialized_message()
341 except asyncio.CancelledError:
342 if not self.cancelled():
343 self.cancel()
344 await self._raise_for_status()
346 if raw_response is cygrpc.EOF:
347 return cygrpc.EOF
348 else:
349 return _common.deserialize(raw_response,
350 self._response_deserializer)
352 async def read(self) -> ResponseType:
353 if self.done():
354 await self._raise_for_status()
355 return cygrpc.EOF
356 self._update_response_style(_APIStyle.READER_WRITER)
358 response_message = await self._read()
360 if response_message is cygrpc.EOF:
361 # If the read operation failed, Core should explain why.
362 await self._raise_for_status()
363 return response_message
366class _StreamRequestMixin(Call):
367 _metadata_sent: asyncio.Event
368 _done_writing_flag: bool
369 _async_request_poller: Optional[asyncio.Task]
370 _request_style: _APIStyle
372 def _init_stream_request_mixin(
373 self, request_iterator: Optional[RequestIterableType]):
374 self._metadata_sent = asyncio.Event()
375 self._done_writing_flag = False
377 # If user passes in an async iterator, create a consumer Task.
378 if request_iterator is not None:
379 self._async_request_poller = self._loop.create_task(
380 self._consume_request_iterator(request_iterator))
381 self._request_style = _APIStyle.ASYNC_GENERATOR
382 else:
383 self._async_request_poller = None
384 self._request_style = _APIStyle.READER_WRITER
386 def _raise_for_different_style(self, style: _APIStyle):
387 if self._request_style is not style:
388 raise cygrpc.UsageError(_API_STYLE_ERROR)
390 def cancel(self) -> bool:
391 if super().cancel():
392 if self._async_request_poller is not None:
393 self._async_request_poller.cancel()
394 return True
395 else:
396 return False
398 def _metadata_sent_observer(self):
399 self._metadata_sent.set()
401 async def _consume_request_iterator(
402 self, request_iterator: RequestIterableType) -> None:
403 try:
404 if inspect.isasyncgen(request_iterator) or hasattr(
405 request_iterator, '__aiter__'):
406 async for request in request_iterator:
407 try:
408 await self._write(request)
409 except AioRpcError as rpc_error:
410 _LOGGER.debug(
411 'Exception while consuming the request_iterator: %s',
412 rpc_error)
413 return
414 else:
415 for request in request_iterator:
416 try:
417 await self._write(request)
418 except AioRpcError as rpc_error:
419 _LOGGER.debug(
420 'Exception while consuming the request_iterator: %s',
421 rpc_error)
422 return
424 await self._done_writing()
425 except: # pylint: disable=bare-except
426 # Client iterators can raise exceptions, which we should handle by
427 # cancelling the RPC and logging the client's error. No exceptions
428 # should escape this function.
429 _LOGGER.debug('Client request_iterator raised exception:\n%s',
430 traceback.format_exc())
431 self.cancel()
433 async def _write(self, request: RequestType) -> None:
434 if self.done():
435 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
436 if self._done_writing_flag:
437 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
438 if not self._metadata_sent.is_set():
439 await self._metadata_sent.wait()
440 if self.done():
441 await self._raise_for_status()
443 serialized_request = _common.serialize(request,
444 self._request_serializer)
445 try:
446 await self._cython_call.send_serialized_message(serialized_request)
447 except cygrpc.InternalError:
448 await self._raise_for_status()
449 except asyncio.CancelledError:
450 if not self.cancelled():
451 self.cancel()
452 await self._raise_for_status()
454 async def _done_writing(self) -> None:
455 if self.done():
456 # If the RPC is finished, do nothing.
457 return
458 if not self._done_writing_flag:
459 # If the done writing is not sent before, try to send it.
460 self._done_writing_flag = True
461 try:
462 await self._cython_call.send_receive_close()
463 except asyncio.CancelledError:
464 if not self.cancelled():
465 self.cancel()
466 await self._raise_for_status()
468 async def write(self, request: RequestType) -> None:
469 self._raise_for_different_style(_APIStyle.READER_WRITER)
470 await self._write(request)
472 async def done_writing(self) -> None:
473 """Signal peer that client is done writing.
475 This method is idempotent.
476 """
477 self._raise_for_different_style(_APIStyle.READER_WRITER)
478 await self._done_writing()
480 async def wait_for_connection(self) -> None:
481 await self._metadata_sent.wait()
482 if self.done():
483 await self._raise_for_status()
486class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
487 """Object for managing unary-unary RPC calls.
489 Returned when an instance of `UnaryUnaryMultiCallable` object is called.
490 """
491 _request: RequestType
492 _invocation_task: asyncio.Task
494 # pylint: disable=too-many-arguments
495 def __init__(self, request: RequestType, deadline: Optional[float],
496 metadata: Metadata,
497 credentials: Optional[grpc.CallCredentials],
498 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
499 method: bytes, request_serializer: SerializingFunction,
500 response_deserializer: DeserializingFunction,
501 loop: asyncio.AbstractEventLoop) -> None:
502 super().__init__(
503 channel.call(method, deadline, credentials, wait_for_ready),
504 metadata, request_serializer, response_deserializer, loop)
505 self._request = request
506 self._invocation_task = loop.create_task(self._invoke())
507 self._init_unary_response_mixin(self._invocation_task)
509 async def _invoke(self) -> ResponseType:
510 serialized_request = _common.serialize(self._request,
511 self._request_serializer)
513 # NOTE(lidiz) asyncio.CancelledError is not a good transport for status,
514 # because the asyncio.Task class do not cache the exception object.
515 # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
516 try:
517 serialized_response = await self._cython_call.unary_unary(
518 serialized_request, self._metadata)
519 except asyncio.CancelledError:
520 if not self.cancelled():
521 self.cancel()
523 if self._cython_call.is_ok():
524 return _common.deserialize(serialized_response,
525 self._response_deserializer)
526 else:
527 return cygrpc.EOF
529 async def wait_for_connection(self) -> None:
530 await self._invocation_task
531 if self.done():
532 await self._raise_for_status()
535class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall):
536 """Object for managing unary-stream RPC calls.
538 Returned when an instance of `UnaryStreamMultiCallable` object is called.
539 """
540 _request: RequestType
541 _send_unary_request_task: asyncio.Task
543 # pylint: disable=too-many-arguments
544 def __init__(self, request: RequestType, deadline: Optional[float],
545 metadata: Metadata,
546 credentials: Optional[grpc.CallCredentials],
547 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
548 method: bytes, request_serializer: SerializingFunction,
549 response_deserializer: DeserializingFunction,
550 loop: asyncio.AbstractEventLoop) -> None:
551 super().__init__(
552 channel.call(method, deadline, credentials, wait_for_ready),
553 metadata, request_serializer, response_deserializer, loop)
554 self._request = request
555 self._send_unary_request_task = loop.create_task(
556 self._send_unary_request())
557 self._init_stream_response_mixin(self._send_unary_request_task)
559 async def _send_unary_request(self) -> ResponseType:
560 serialized_request = _common.serialize(self._request,
561 self._request_serializer)
562 try:
563 await self._cython_call.initiate_unary_stream(
564 serialized_request, self._metadata)
565 except asyncio.CancelledError:
566 if not self.cancelled():
567 self.cancel()
568 raise
570 async def wait_for_connection(self) -> None:
571 await self._send_unary_request_task
572 if self.done():
573 await self._raise_for_status()
576class StreamUnaryCall(_StreamRequestMixin, _UnaryResponseMixin, Call,
577 _base_call.StreamUnaryCall):
578 """Object for managing stream-unary RPC calls.
580 Returned when an instance of `StreamUnaryMultiCallable` object is called.
581 """
583 # pylint: disable=too-many-arguments
584 def __init__(self, request_iterator: Optional[RequestIterableType],
585 deadline: Optional[float], metadata: Metadata,
586 credentials: Optional[grpc.CallCredentials],
587 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
588 method: bytes, request_serializer: SerializingFunction,
589 response_deserializer: DeserializingFunction,
590 loop: asyncio.AbstractEventLoop) -> None:
591 super().__init__(
592 channel.call(method, deadline, credentials, wait_for_ready),
593 metadata, request_serializer, response_deserializer, loop)
595 self._init_stream_request_mixin(request_iterator)
596 self._init_unary_response_mixin(loop.create_task(self._conduct_rpc()))
598 async def _conduct_rpc(self) -> ResponseType:
599 try:
600 serialized_response = await self._cython_call.stream_unary(
601 self._metadata, self._metadata_sent_observer)
602 except asyncio.CancelledError:
603 if not self.cancelled():
604 self.cancel()
606 if self._cython_call.is_ok():
607 return _common.deserialize(serialized_response,
608 self._response_deserializer)
609 else:
610 return cygrpc.EOF
613class StreamStreamCall(_StreamRequestMixin, _StreamResponseMixin, Call,
614 _base_call.StreamStreamCall):
615 """Object for managing stream-stream RPC calls.
617 Returned when an instance of `StreamStreamMultiCallable` object is called.
618 """
619 _initializer: asyncio.Task
621 # pylint: disable=too-many-arguments
622 def __init__(self, request_iterator: Optional[RequestIterableType],
623 deadline: Optional[float], metadata: Metadata,
624 credentials: Optional[grpc.CallCredentials],
625 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
626 method: bytes, request_serializer: SerializingFunction,
627 response_deserializer: DeserializingFunction,
628 loop: asyncio.AbstractEventLoop) -> None:
629 super().__init__(
630 channel.call(method, deadline, credentials, wait_for_ready),
631 metadata, request_serializer, response_deserializer, loop)
632 self._initializer = self._loop.create_task(self._prepare_rpc())
633 self._init_stream_request_mixin(request_iterator)
634 self._init_stream_response_mixin(self._initializer)
636 async def _prepare_rpc(self):
637 """This method prepares the RPC for receiving/sending messages.
639 All other operations around the stream should only happen after the
640 completion of this method.
641 """
642 try:
643 await self._cython_call.initiate_stream_stream(
644 self._metadata, self._metadata_sent_observer)
645 except asyncio.CancelledError:
646 if not self.cancelled():
647 self.cancel()
648 # No need to raise RpcError here, because no one will `await` this task.