Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_interceptor.py: 38%
394 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Interceptors implementation of gRPC Asyncio Python."""
15from abc import ABCMeta
16from abc import abstractmethod
17import asyncio
18import collections
19import functools
20from typing import (
21 AsyncIterable,
22 Awaitable,
23 Callable,
24 Iterator,
25 List,
26 Optional,
27 Sequence,
28 Union,
29)
31import grpc
32from grpc._cython import cygrpc
34from . import _base_call
35from ._call import AioRpcError
36from ._call import StreamStreamCall
37from ._call import StreamUnaryCall
38from ._call import UnaryStreamCall
39from ._call import UnaryUnaryCall
40from ._call import _API_STYLE_ERROR
41from ._call import _RPC_ALREADY_FINISHED_DETAILS
42from ._call import _RPC_HALF_CLOSED_DETAILS
43from ._metadata import Metadata
44from ._typing import DeserializingFunction
45from ._typing import DoneCallbackType
46from ._typing import RequestIterableType
47from ._typing import RequestType
48from ._typing import ResponseIterableType
49from ._typing import ResponseType
50from ._typing import SerializingFunction
51from ._utils import _timeout_to_deadline
53_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!"
56class ServerInterceptor(metaclass=ABCMeta):
57 """Affords intercepting incoming RPCs on the service-side.
59 This is an EXPERIMENTAL API.
60 """
62 @abstractmethod
63 async def intercept_service(
64 self,
65 continuation: Callable[
66 [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
67 ],
68 handler_call_details: grpc.HandlerCallDetails,
69 ) -> grpc.RpcMethodHandler:
70 """Intercepts incoming RPCs before handing them over to a handler.
72 Args:
73 continuation: A function that takes a HandlerCallDetails and
74 proceeds to invoke the next interceptor in the chain, if any,
75 or the RPC handler lookup logic, with the call details passed
76 as an argument, and returns an RpcMethodHandler instance if
77 the RPC is considered serviced, or None otherwise.
78 handler_call_details: A HandlerCallDetails describing the RPC.
80 Returns:
81 An RpcMethodHandler with which the RPC may be serviced if the
82 interceptor chooses to service this RPC, or None otherwise.
83 """
86class ClientCallDetails(
87 collections.namedtuple(
88 "ClientCallDetails",
89 ("method", "timeout", "metadata", "credentials", "wait_for_ready"),
90 ),
91 grpc.ClientCallDetails,
92):
93 """Describes an RPC to be invoked.
95 This is an EXPERIMENTAL API.
97 Args:
98 method: The method name of the RPC.
99 timeout: An optional duration of time in seconds to allow for the RPC.
100 metadata: Optional metadata to be transmitted to the service-side of
101 the RPC.
102 credentials: An optional CallCredentials for the RPC.
103 wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism.
104 """
106 method: str
107 timeout: Optional[float]
108 metadata: Optional[Metadata]
109 credentials: Optional[grpc.CallCredentials]
110 wait_for_ready: Optional[bool]
113class ClientInterceptor(metaclass=ABCMeta):
114 """Base class used for all Aio Client Interceptor classes"""
117class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
118 """Affords intercepting unary-unary invocations."""
120 @abstractmethod
121 async def intercept_unary_unary(
122 self,
123 continuation: Callable[
124 [ClientCallDetails, RequestType], UnaryUnaryCall
125 ],
126 client_call_details: ClientCallDetails,
127 request: RequestType,
128 ) -> Union[UnaryUnaryCall, ResponseType]:
129 """Intercepts a unary-unary invocation asynchronously.
131 Args:
132 continuation: A coroutine that proceeds with the invocation by
133 executing the next interceptor in the chain or invoking the
134 actual RPC on the underlying Channel. It is the interceptor's
135 responsibility to call it if it decides to move the RPC forward.
136 The interceptor can use
137 `call = await continuation(client_call_details, request)`
138 to continue with the RPC. `continuation` returns the call to the
139 RPC.
140 client_call_details: A ClientCallDetails object describing the
141 outgoing RPC.
142 request: The request value for the RPC.
144 Returns:
145 An object with the RPC response.
147 Raises:
148 AioRpcError: Indicating that the RPC terminated with non-OK status.
149 asyncio.CancelledError: Indicating that the RPC was canceled.
150 """
153class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
154 """Affords intercepting unary-stream invocations."""
156 @abstractmethod
157 async def intercept_unary_stream(
158 self,
159 continuation: Callable[
160 [ClientCallDetails, RequestType], UnaryStreamCall
161 ],
162 client_call_details: ClientCallDetails,
163 request: RequestType,
164 ) -> Union[ResponseIterableType, UnaryStreamCall]:
165 """Intercepts a unary-stream invocation asynchronously.
167 The function could return the call object or an asynchronous
168 iterator, in case of being an asyncrhonous iterator this will
169 become the source of the reads done by the caller.
171 Args:
172 continuation: A coroutine that proceeds with the invocation by
173 executing the next interceptor in the chain or invoking the
174 actual RPC on the underlying Channel. It is the interceptor's
175 responsibility to call it if it decides to move the RPC forward.
176 The interceptor can use
177 `call = await continuation(client_call_details, request)`
178 to continue with the RPC. `continuation` returns the call to the
179 RPC.
180 client_call_details: A ClientCallDetails object describing the
181 outgoing RPC.
182 request: The request value for the RPC.
184 Returns:
185 The RPC Call or an asynchronous iterator.
187 Raises:
188 AioRpcError: Indicating that the RPC terminated with non-OK status.
189 asyncio.CancelledError: Indicating that the RPC was canceled.
190 """
193class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
194 """Affords intercepting stream-unary invocations."""
196 @abstractmethod
197 async def intercept_stream_unary(
198 self,
199 continuation: Callable[
200 [ClientCallDetails, RequestType], StreamUnaryCall
201 ],
202 client_call_details: ClientCallDetails,
203 request_iterator: RequestIterableType,
204 ) -> StreamUnaryCall:
205 """Intercepts a stream-unary invocation asynchronously.
207 Within the interceptor the usage of the call methods like `write` or
208 even awaiting the call should be done carefully, since the caller
209 could be expecting an untouched call, for example for start writing
210 messages to it.
212 Args:
213 continuation: A coroutine that proceeds with the invocation by
214 executing the next interceptor in the chain or invoking the
215 actual RPC on the underlying Channel. It is the interceptor's
216 responsibility to call it if it decides to move the RPC forward.
217 The interceptor can use
218 `call = await continuation(client_call_details, request_iterator)`
219 to continue with the RPC. `continuation` returns the call to the
220 RPC.
221 client_call_details: A ClientCallDetails object describing the
222 outgoing RPC.
223 request_iterator: The request iterator that will produce requests
224 for the RPC.
226 Returns:
227 The RPC Call.
229 Raises:
230 AioRpcError: Indicating that the RPC terminated with non-OK status.
231 asyncio.CancelledError: Indicating that the RPC was canceled.
232 """
235class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
236 """Affords intercepting stream-stream invocations."""
238 @abstractmethod
239 async def intercept_stream_stream(
240 self,
241 continuation: Callable[
242 [ClientCallDetails, RequestType], StreamStreamCall
243 ],
244 client_call_details: ClientCallDetails,
245 request_iterator: RequestIterableType,
246 ) -> Union[ResponseIterableType, StreamStreamCall]:
247 """Intercepts a stream-stream invocation asynchronously.
249 Within the interceptor the usage of the call methods like `write` or
250 even awaiting the call should be done carefully, since the caller
251 could be expecting an untouched call, for example for start writing
252 messages to it.
254 The function could return the call object or an asynchronous
255 iterator, in case of being an asyncrhonous iterator this will
256 become the source of the reads done by the caller.
258 Args:
259 continuation: A coroutine that proceeds with the invocation by
260 executing the next interceptor in the chain or invoking the
261 actual RPC on the underlying Channel. It is the interceptor's
262 responsibility to call it if it decides to move the RPC forward.
263 The interceptor can use
264 `call = await continuation(client_call_details, request_iterator)`
265 to continue with the RPC. `continuation` returns the call to the
266 RPC.
267 client_call_details: A ClientCallDetails object describing the
268 outgoing RPC.
269 request_iterator: The request iterator that will produce requests
270 for the RPC.
272 Returns:
273 The RPC Call or an asynchronous iterator.
275 Raises:
276 AioRpcError: Indicating that the RPC terminated with non-OK status.
277 asyncio.CancelledError: Indicating that the RPC was canceled.
278 """
281class InterceptedCall:
282 """Base implementation for all intercepted call arities.
284 Interceptors might have some work to do before the RPC invocation with
285 the capacity of changing the invocation parameters, and some work to do
286 after the RPC invocation with the capacity for accessing to the wrapped
287 `UnaryUnaryCall`.
289 It handles also early and later cancellations, when the RPC has not even
290 started and the execution is still held by the interceptors or when the
291 RPC has finished but again the execution is still held by the interceptors.
293 Once the RPC is finally executed, all methods are finally done against the
294 intercepted call, being at the same time the same call returned to the
295 interceptors.
297 As a base class for all of the interceptors implements the logic around
298 final status, metadata and cancellation.
299 """
301 _interceptors_task: asyncio.Task
302 _pending_add_done_callbacks: Sequence[DoneCallbackType]
304 def __init__(self, interceptors_task: asyncio.Task) -> None:
305 self._interceptors_task = interceptors_task
306 self._pending_add_done_callbacks = []
307 self._interceptors_task.add_done_callback(
308 self._fire_or_add_pending_done_callbacks
309 )
311 def __del__(self):
312 self.cancel()
314 def _fire_or_add_pending_done_callbacks(
315 self, interceptors_task: asyncio.Task
316 ) -> None:
317 if not self._pending_add_done_callbacks:
318 return
320 call_completed = False
322 try:
323 call = interceptors_task.result()
324 if call.done():
325 call_completed = True
326 except (AioRpcError, asyncio.CancelledError):
327 call_completed = True
329 if call_completed:
330 for callback in self._pending_add_done_callbacks:
331 callback(self)
332 else:
333 for callback in self._pending_add_done_callbacks:
334 callback = functools.partial(
335 self._wrap_add_done_callback, callback
336 )
337 call.add_done_callback(callback)
339 self._pending_add_done_callbacks = []
341 def _wrap_add_done_callback(
342 self, callback: DoneCallbackType, unused_call: _base_call.Call
343 ) -> None:
344 callback(self)
346 def cancel(self) -> bool:
347 if not self._interceptors_task.done():
348 # There is no yet the intercepted call available,
349 # Trying to cancel it by using the generic Asyncio
350 # cancellation method.
351 return self._interceptors_task.cancel()
353 try:
354 call = self._interceptors_task.result()
355 except AioRpcError:
356 return False
357 except asyncio.CancelledError:
358 return False
360 return call.cancel()
362 def cancelled(self) -> bool:
363 if not self._interceptors_task.done():
364 return False
366 try:
367 call = self._interceptors_task.result()
368 except AioRpcError as err:
369 return err.code() == grpc.StatusCode.CANCELLED
370 except asyncio.CancelledError:
371 return True
373 return call.cancelled()
375 def done(self) -> bool:
376 if not self._interceptors_task.done():
377 return False
379 try:
380 call = self._interceptors_task.result()
381 except (AioRpcError, asyncio.CancelledError):
382 return True
384 return call.done()
386 def add_done_callback(self, callback: DoneCallbackType) -> None:
387 if not self._interceptors_task.done():
388 self._pending_add_done_callbacks.append(callback)
389 return
391 try:
392 call = self._interceptors_task.result()
393 except (AioRpcError, asyncio.CancelledError):
394 callback(self)
395 return
397 if call.done():
398 callback(self)
399 else:
400 callback = functools.partial(self._wrap_add_done_callback, callback)
401 call.add_done_callback(callback)
403 def time_remaining(self) -> Optional[float]:
404 raise NotImplementedError()
406 async def initial_metadata(self) -> Optional[Metadata]:
407 try:
408 call = await self._interceptors_task
409 except AioRpcError as err:
410 return err.initial_metadata()
411 except asyncio.CancelledError:
412 return None
414 return await call.initial_metadata()
416 async def trailing_metadata(self) -> Optional[Metadata]:
417 try:
418 call = await self._interceptors_task
419 except AioRpcError as err:
420 return err.trailing_metadata()
421 except asyncio.CancelledError:
422 return None
424 return await call.trailing_metadata()
426 async def code(self) -> grpc.StatusCode:
427 try:
428 call = await self._interceptors_task
429 except AioRpcError as err:
430 return err.code()
431 except asyncio.CancelledError:
432 return grpc.StatusCode.CANCELLED
434 return await call.code()
436 async def details(self) -> str:
437 try:
438 call = await self._interceptors_task
439 except AioRpcError as err:
440 return err.details()
441 except asyncio.CancelledError:
442 return _LOCAL_CANCELLATION_DETAILS
444 return await call.details()
446 async def debug_error_string(self) -> Optional[str]:
447 try:
448 call = await self._interceptors_task
449 except AioRpcError as err:
450 return err.debug_error_string()
451 except asyncio.CancelledError:
452 return ""
454 return await call.debug_error_string()
456 async def wait_for_connection(self) -> None:
457 call = await self._interceptors_task
458 return await call.wait_for_connection()
461class _InterceptedUnaryResponseMixin:
462 def __await__(self):
463 call = yield from self._interceptors_task.__await__()
464 response = yield from call.__await__()
465 return response
468class _InterceptedStreamResponseMixin:
469 _response_aiter: Optional[AsyncIterable[ResponseType]]
471 def _init_stream_response_mixin(self) -> None:
472 # Is initalized later, otherwise if the iterator is not finnally
473 # consumed a logging warning is emmited by Asyncio.
474 self._response_aiter = None
476 async def _wait_for_interceptor_task_response_iterator(
477 self,
478 ) -> ResponseType:
479 call = await self._interceptors_task
480 async for response in call:
481 yield response
483 def __aiter__(self) -> AsyncIterable[ResponseType]:
484 if self._response_aiter is None:
485 self._response_aiter = (
486 self._wait_for_interceptor_task_response_iterator()
487 )
488 return self._response_aiter
490 async def read(self) -> ResponseType:
491 if self._response_aiter is None:
492 self._response_aiter = (
493 self._wait_for_interceptor_task_response_iterator()
494 )
495 return await self._response_aiter.asend(None)
498class _InterceptedStreamRequestMixin:
499 _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
500 _write_to_iterator_queue: Optional[asyncio.Queue]
501 _status_code_task: Optional[asyncio.Task]
503 _FINISH_ITERATOR_SENTINEL = object()
505 def _init_stream_request_mixin(
506 self, request_iterator: Optional[RequestIterableType]
507 ) -> RequestIterableType:
508 if request_iterator is None:
509 # We provide our own request iterator which is a proxy
510 # of the futures writes that will be done by the caller.
511 self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
512 self._write_to_iterator_async_gen = (
513 self._proxy_writes_as_request_iterator()
514 )
515 self._status_code_task = None
516 request_iterator = self._write_to_iterator_async_gen
517 else:
518 self._write_to_iterator_queue = None
520 return request_iterator
522 async def _proxy_writes_as_request_iterator(self):
523 await self._interceptors_task
525 while True:
526 value = await self._write_to_iterator_queue.get()
527 if (
528 value
529 is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL
530 ):
531 break
532 yield value
534 async def _write_to_iterator_queue_interruptible(
535 self, request: RequestType, call: InterceptedCall
536 ):
537 # Write the specified 'request' to the request iterator queue using the
538 # specified 'call' to allow for interruption of the write in the case
539 # of abrupt termination of the call.
540 if self._status_code_task is None:
541 self._status_code_task = self._loop.create_task(call.code())
543 await asyncio.wait(
544 (
545 self._loop.create_task(
546 self._write_to_iterator_queue.put(request)
547 ),
548 self._status_code_task,
549 ),
550 return_when=asyncio.FIRST_COMPLETED,
551 )
553 async def write(self, request: RequestType) -> None:
554 # If no queue was created it means that requests
555 # should be expected through an iterators provided
556 # by the caller.
557 if self._write_to_iterator_queue is None:
558 raise cygrpc.UsageError(_API_STYLE_ERROR)
560 try:
561 call = await self._interceptors_task
562 except (asyncio.CancelledError, AioRpcError):
563 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
565 if call.done():
566 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
567 elif call._done_writing_flag:
568 raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
570 await self._write_to_iterator_queue_interruptible(request, call)
572 if call.done():
573 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
575 async def done_writing(self) -> None:
576 """Signal peer that client is done writing.
578 This method is idempotent.
579 """
580 # If no queue was created it means that requests
581 # should be expected through an iterators provided
582 # by the caller.
583 if self._write_to_iterator_queue is None:
584 raise cygrpc.UsageError(_API_STYLE_ERROR)
586 try:
587 call = await self._interceptors_task
588 except asyncio.CancelledError:
589 raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
591 await self._write_to_iterator_queue_interruptible(
592 _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call
593 )
596class InterceptedUnaryUnaryCall(
597 _InterceptedUnaryResponseMixin, InterceptedCall, _base_call.UnaryUnaryCall
598):
599 """Used for running a `UnaryUnaryCall` wrapped by interceptors.
601 For the `__await__` method is it is proxied to the intercepted call only when
602 the interceptor task is finished.
603 """
605 _loop: asyncio.AbstractEventLoop
606 _channel: cygrpc.AioChannel
608 # pylint: disable=too-many-arguments
609 def __init__(
610 self,
611 interceptors: Sequence[UnaryUnaryClientInterceptor],
612 request: RequestType,
613 timeout: Optional[float],
614 metadata: Metadata,
615 credentials: Optional[grpc.CallCredentials],
616 wait_for_ready: Optional[bool],
617 channel: cygrpc.AioChannel,
618 method: bytes,
619 request_serializer: SerializingFunction,
620 response_deserializer: DeserializingFunction,
621 loop: asyncio.AbstractEventLoop,
622 ) -> None:
623 self._loop = loop
624 self._channel = channel
625 interceptors_task = loop.create_task(
626 self._invoke(
627 interceptors,
628 method,
629 timeout,
630 metadata,
631 credentials,
632 wait_for_ready,
633 request,
634 request_serializer,
635 response_deserializer,
636 )
637 )
638 super().__init__(interceptors_task)
640 # pylint: disable=too-many-arguments
641 async def _invoke(
642 self,
643 interceptors: Sequence[UnaryUnaryClientInterceptor],
644 method: bytes,
645 timeout: Optional[float],
646 metadata: Optional[Metadata],
647 credentials: Optional[grpc.CallCredentials],
648 wait_for_ready: Optional[bool],
649 request: RequestType,
650 request_serializer: SerializingFunction,
651 response_deserializer: DeserializingFunction,
652 ) -> UnaryUnaryCall:
653 """Run the RPC call wrapped in interceptors"""
655 async def _run_interceptor(
656 interceptors: List[UnaryUnaryClientInterceptor],
657 client_call_details: ClientCallDetails,
658 request: RequestType,
659 ) -> _base_call.UnaryUnaryCall:
660 if interceptors:
661 continuation = functools.partial(
662 _run_interceptor, interceptors[1:]
663 )
664 call_or_response = await interceptors[0].intercept_unary_unary(
665 continuation, client_call_details, request
666 )
668 if isinstance(call_or_response, _base_call.UnaryUnaryCall):
669 return call_or_response
670 else:
671 return UnaryUnaryCallResponse(call_or_response)
673 else:
674 return UnaryUnaryCall(
675 request,
676 _timeout_to_deadline(client_call_details.timeout),
677 client_call_details.metadata,
678 client_call_details.credentials,
679 client_call_details.wait_for_ready,
680 self._channel,
681 client_call_details.method,
682 request_serializer,
683 response_deserializer,
684 self._loop,
685 )
687 client_call_details = ClientCallDetails(
688 method, timeout, metadata, credentials, wait_for_ready
689 )
690 return await _run_interceptor(
691 list(interceptors), client_call_details, request
692 )
694 def time_remaining(self) -> Optional[float]:
695 raise NotImplementedError()
698class InterceptedUnaryStreamCall(
699 _InterceptedStreamResponseMixin, InterceptedCall, _base_call.UnaryStreamCall
700):
701 """Used for running a `UnaryStreamCall` wrapped by interceptors."""
703 _loop: asyncio.AbstractEventLoop
704 _channel: cygrpc.AioChannel
705 _last_returned_call_from_interceptors = Optional[_base_call.UnaryStreamCall]
707 # pylint: disable=too-many-arguments
708 def __init__(
709 self,
710 interceptors: Sequence[UnaryStreamClientInterceptor],
711 request: RequestType,
712 timeout: Optional[float],
713 metadata: Metadata,
714 credentials: Optional[grpc.CallCredentials],
715 wait_for_ready: Optional[bool],
716 channel: cygrpc.AioChannel,
717 method: bytes,
718 request_serializer: SerializingFunction,
719 response_deserializer: DeserializingFunction,
720 loop: asyncio.AbstractEventLoop,
721 ) -> None:
722 self._loop = loop
723 self._channel = channel
724 self._init_stream_response_mixin()
725 self._last_returned_call_from_interceptors = None
726 interceptors_task = loop.create_task(
727 self._invoke(
728 interceptors,
729 method,
730 timeout,
731 metadata,
732 credentials,
733 wait_for_ready,
734 request,
735 request_serializer,
736 response_deserializer,
737 )
738 )
739 super().__init__(interceptors_task)
741 # pylint: disable=too-many-arguments
742 async def _invoke(
743 self,
744 interceptors: Sequence[UnaryStreamClientInterceptor],
745 method: bytes,
746 timeout: Optional[float],
747 metadata: Optional[Metadata],
748 credentials: Optional[grpc.CallCredentials],
749 wait_for_ready: Optional[bool],
750 request: RequestType,
751 request_serializer: SerializingFunction,
752 response_deserializer: DeserializingFunction,
753 ) -> UnaryStreamCall:
754 """Run the RPC call wrapped in interceptors"""
756 async def _run_interceptor(
757 interceptors: List[UnaryStreamClientInterceptor],
758 client_call_details: ClientCallDetails,
759 request: RequestType,
760 ) -> _base_call.UnaryStreamCall:
761 if interceptors:
762 continuation = functools.partial(
763 _run_interceptor, interceptors[1:]
764 )
766 call_or_response_iterator = await interceptors[
767 0
768 ].intercept_unary_stream(
769 continuation, client_call_details, request
770 )
772 if isinstance(
773 call_or_response_iterator, _base_call.UnaryStreamCall
774 ):
775 self._last_returned_call_from_interceptors = (
776 call_or_response_iterator
777 )
778 else:
779 self._last_returned_call_from_interceptors = (
780 UnaryStreamCallResponseIterator(
781 self._last_returned_call_from_interceptors,
782 call_or_response_iterator,
783 )
784 )
785 return self._last_returned_call_from_interceptors
786 else:
787 self._last_returned_call_from_interceptors = UnaryStreamCall(
788 request,
789 _timeout_to_deadline(client_call_details.timeout),
790 client_call_details.metadata,
791 client_call_details.credentials,
792 client_call_details.wait_for_ready,
793 self._channel,
794 client_call_details.method,
795 request_serializer,
796 response_deserializer,
797 self._loop,
798 )
800 return self._last_returned_call_from_interceptors
802 client_call_details = ClientCallDetails(
803 method, timeout, metadata, credentials, wait_for_ready
804 )
805 return await _run_interceptor(
806 list(interceptors), client_call_details, request
807 )
809 def time_remaining(self) -> Optional[float]:
810 raise NotImplementedError()
813class InterceptedStreamUnaryCall(
814 _InterceptedUnaryResponseMixin,
815 _InterceptedStreamRequestMixin,
816 InterceptedCall,
817 _base_call.StreamUnaryCall,
818):
819 """Used for running a `StreamUnaryCall` wrapped by interceptors.
821 For the `__await__` method is it is proxied to the intercepted call only when
822 the interceptor task is finished.
823 """
825 _loop: asyncio.AbstractEventLoop
826 _channel: cygrpc.AioChannel
828 # pylint: disable=too-many-arguments
829 def __init__(
830 self,
831 interceptors: Sequence[StreamUnaryClientInterceptor],
832 request_iterator: Optional[RequestIterableType],
833 timeout: Optional[float],
834 metadata: Metadata,
835 credentials: Optional[grpc.CallCredentials],
836 wait_for_ready: Optional[bool],
837 channel: cygrpc.AioChannel,
838 method: bytes,
839 request_serializer: SerializingFunction,
840 response_deserializer: DeserializingFunction,
841 loop: asyncio.AbstractEventLoop,
842 ) -> None:
843 self._loop = loop
844 self._channel = channel
845 request_iterator = self._init_stream_request_mixin(request_iterator)
846 interceptors_task = loop.create_task(
847 self._invoke(
848 interceptors,
849 method,
850 timeout,
851 metadata,
852 credentials,
853 wait_for_ready,
854 request_iterator,
855 request_serializer,
856 response_deserializer,
857 )
858 )
859 super().__init__(interceptors_task)
861 # pylint: disable=too-many-arguments
862 async def _invoke(
863 self,
864 interceptors: Sequence[StreamUnaryClientInterceptor],
865 method: bytes,
866 timeout: Optional[float],
867 metadata: Optional[Metadata],
868 credentials: Optional[grpc.CallCredentials],
869 wait_for_ready: Optional[bool],
870 request_iterator: RequestIterableType,
871 request_serializer: SerializingFunction,
872 response_deserializer: DeserializingFunction,
873 ) -> StreamUnaryCall:
874 """Run the RPC call wrapped in interceptors"""
876 async def _run_interceptor(
877 interceptors: Iterator[StreamUnaryClientInterceptor],
878 client_call_details: ClientCallDetails,
879 request_iterator: RequestIterableType,
880 ) -> _base_call.StreamUnaryCall:
881 if interceptors:
882 continuation = functools.partial(
883 _run_interceptor, interceptors[1:]
884 )
886 return await interceptors[0].intercept_stream_unary(
887 continuation, client_call_details, request_iterator
888 )
889 else:
890 return StreamUnaryCall(
891 request_iterator,
892 _timeout_to_deadline(client_call_details.timeout),
893 client_call_details.metadata,
894 client_call_details.credentials,
895 client_call_details.wait_for_ready,
896 self._channel,
897 client_call_details.method,
898 request_serializer,
899 response_deserializer,
900 self._loop,
901 )
903 client_call_details = ClientCallDetails(
904 method, timeout, metadata, credentials, wait_for_ready
905 )
906 return await _run_interceptor(
907 list(interceptors), client_call_details, request_iterator
908 )
910 def time_remaining(self) -> Optional[float]:
911 raise NotImplementedError()
914class InterceptedStreamStreamCall(
915 _InterceptedStreamResponseMixin,
916 _InterceptedStreamRequestMixin,
917 InterceptedCall,
918 _base_call.StreamStreamCall,
919):
920 """Used for running a `StreamStreamCall` wrapped by interceptors."""
922 _loop: asyncio.AbstractEventLoop
923 _channel: cygrpc.AioChannel
924 _last_returned_call_from_interceptors = Optional[
925 _base_call.StreamStreamCall
926 ]
928 # pylint: disable=too-many-arguments
929 def __init__(
930 self,
931 interceptors: Sequence[StreamStreamClientInterceptor],
932 request_iterator: Optional[RequestIterableType],
933 timeout: Optional[float],
934 metadata: Metadata,
935 credentials: Optional[grpc.CallCredentials],
936 wait_for_ready: Optional[bool],
937 channel: cygrpc.AioChannel,
938 method: bytes,
939 request_serializer: SerializingFunction,
940 response_deserializer: DeserializingFunction,
941 loop: asyncio.AbstractEventLoop,
942 ) -> None:
943 self._loop = loop
944 self._channel = channel
945 self._init_stream_response_mixin()
946 request_iterator = self._init_stream_request_mixin(request_iterator)
947 self._last_returned_call_from_interceptors = None
948 interceptors_task = loop.create_task(
949 self._invoke(
950 interceptors,
951 method,
952 timeout,
953 metadata,
954 credentials,
955 wait_for_ready,
956 request_iterator,
957 request_serializer,
958 response_deserializer,
959 )
960 )
961 super().__init__(interceptors_task)
963 # pylint: disable=too-many-arguments
964 async def _invoke(
965 self,
966 interceptors: Sequence[StreamStreamClientInterceptor],
967 method: bytes,
968 timeout: Optional[float],
969 metadata: Optional[Metadata],
970 credentials: Optional[grpc.CallCredentials],
971 wait_for_ready: Optional[bool],
972 request_iterator: RequestIterableType,
973 request_serializer: SerializingFunction,
974 response_deserializer: DeserializingFunction,
975 ) -> StreamStreamCall:
976 """Run the RPC call wrapped in interceptors"""
978 async def _run_interceptor(
979 interceptors: List[StreamStreamClientInterceptor],
980 client_call_details: ClientCallDetails,
981 request_iterator: RequestIterableType,
982 ) -> _base_call.StreamStreamCall:
983 if interceptors:
984 continuation = functools.partial(
985 _run_interceptor, interceptors[1:]
986 )
988 call_or_response_iterator = await interceptors[
989 0
990 ].intercept_stream_stream(
991 continuation, client_call_details, request_iterator
992 )
994 if isinstance(
995 call_or_response_iterator, _base_call.StreamStreamCall
996 ):
997 self._last_returned_call_from_interceptors = (
998 call_or_response_iterator
999 )
1000 else:
1001 self._last_returned_call_from_interceptors = (
1002 StreamStreamCallResponseIterator(
1003 self._last_returned_call_from_interceptors,
1004 call_or_response_iterator,
1005 )
1006 )
1007 return self._last_returned_call_from_interceptors
1008 else:
1009 self._last_returned_call_from_interceptors = StreamStreamCall(
1010 request_iterator,
1011 _timeout_to_deadline(client_call_details.timeout),
1012 client_call_details.metadata,
1013 client_call_details.credentials,
1014 client_call_details.wait_for_ready,
1015 self._channel,
1016 client_call_details.method,
1017 request_serializer,
1018 response_deserializer,
1019 self._loop,
1020 )
1021 return self._last_returned_call_from_interceptors
1023 client_call_details = ClientCallDetails(
1024 method, timeout, metadata, credentials, wait_for_ready
1025 )
1026 return await _run_interceptor(
1027 list(interceptors), client_call_details, request_iterator
1028 )
1030 def time_remaining(self) -> Optional[float]:
1031 raise NotImplementedError()
1034class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
1035 """Final UnaryUnaryCall class finished with a response."""
1037 _response: ResponseType
1039 def __init__(self, response: ResponseType) -> None:
1040 self._response = response
1042 def cancel(self) -> bool:
1043 return False
1045 def cancelled(self) -> bool:
1046 return False
1048 def done(self) -> bool:
1049 return True
1051 def add_done_callback(self, unused_callback) -> None:
1052 raise NotImplementedError()
1054 def time_remaining(self) -> Optional[float]:
1055 raise NotImplementedError()
1057 async def initial_metadata(self) -> Optional[Metadata]:
1058 return None
1060 async def trailing_metadata(self) -> Optional[Metadata]:
1061 return None
1063 async def code(self) -> grpc.StatusCode:
1064 return grpc.StatusCode.OK
1066 async def details(self) -> str:
1067 return ""
1069 async def debug_error_string(self) -> Optional[str]:
1070 return None
1072 def __await__(self):
1073 if False: # pylint: disable=using-constant-test
1074 # This code path is never used, but a yield statement is needed
1075 # for telling the interpreter that __await__ is a generator.
1076 yield None
1077 return self._response
1079 async def wait_for_connection(self) -> None:
1080 pass
1083class _StreamCallResponseIterator:
1084 _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
1085 _response_iterator: AsyncIterable[ResponseType]
1087 def __init__(
1088 self,
1089 call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall],
1090 response_iterator: AsyncIterable[ResponseType],
1091 ) -> None:
1092 self._response_iterator = response_iterator
1093 self._call = call
1095 def cancel(self) -> bool:
1096 return self._call.cancel()
1098 def cancelled(self) -> bool:
1099 return self._call.cancelled()
1101 def done(self) -> bool:
1102 return self._call.done()
1104 def add_done_callback(self, callback) -> None:
1105 self._call.add_done_callback(callback)
1107 def time_remaining(self) -> Optional[float]:
1108 return self._call.time_remaining()
1110 async def initial_metadata(self) -> Optional[Metadata]:
1111 return await self._call.initial_metadata()
1113 async def trailing_metadata(self) -> Optional[Metadata]:
1114 return await self._call.trailing_metadata()
1116 async def code(self) -> grpc.StatusCode:
1117 return await self._call.code()
1119 async def details(self) -> str:
1120 return await self._call.details()
1122 async def debug_error_string(self) -> Optional[str]:
1123 return await self._call.debug_error_string()
1125 def __aiter__(self):
1126 return self._response_iterator.__aiter__()
1128 async def wait_for_connection(self) -> None:
1129 return await self._call.wait_for_connection()
1132class UnaryStreamCallResponseIterator(
1133 _StreamCallResponseIterator, _base_call.UnaryStreamCall
1134):
1135 """UnaryStreamCall class wich uses an alternative response iterator."""
1137 async def read(self) -> ResponseType:
1138 # Behind the scenes everyting goes through the
1139 # async iterator. So this path should not be reached.
1140 raise NotImplementedError()
1143class StreamStreamCallResponseIterator(
1144 _StreamCallResponseIterator, _base_call.StreamStreamCall
1145):
1146 """StreamStreamCall class wich uses an alternative response iterator."""
1148 async def read(self) -> ResponseType:
1149 # Behind the scenes everyting goes through the
1150 # async iterator. So this path should not be reached.
1151 raise NotImplementedError()
1153 async def write(self, request: RequestType) -> None:
1154 # Behind the scenes everyting goes through the
1155 # async iterator provided by the InterceptedStreamStreamCall.
1156 # So this path should not be reached.
1157 raise NotImplementedError()
1159 async def done_writing(self) -> None:
1160 # Behind the scenes everyting goes through the
1161 # async iterator provided by the InterceptedStreamStreamCall.
1162 # So this path should not be reached.
1163 raise NotImplementedError()
1165 @property
1166 def _done_writing_flag(self) -> bool:
1167 return self._call._done_writing_flag