Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_interceptor.py: 38%
398 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Interceptors implementation of gRPC Asyncio Python."""
15from abc import ABCMeta
16from abc import abstractmethod
17import asyncio
18import collections
19import functools
20from typing import (AsyncIterable, Awaitable, Callable, Iterator, Optional,
21 Sequence, Union)
23import grpc
24from grpc._cython import cygrpc
26from . import _base_call
27from ._call import AioRpcError
28from ._call import StreamStreamCall
29from ._call import StreamUnaryCall
30from ._call import UnaryStreamCall
31from ._call import UnaryUnaryCall
32from ._call import _API_STYLE_ERROR
33from ._call import _RPC_ALREADY_FINISHED_DETAILS
34from ._call import _RPC_HALF_CLOSED_DETAILS
35from ._metadata import Metadata
36from ._typing import DeserializingFunction
37from ._typing import DoneCallbackType
38from ._typing import RequestIterableType
39from ._typing import RequestType
40from ._typing import ResponseIterableType
41from ._typing import ResponseType
42from ._typing import SerializingFunction
43from ._utils import _timeout_to_deadline
# Details text reported by `InterceptedCall.details()` when the interceptors
# task itself was cancelled before producing an intercepted call.
_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!'
class ServerInterceptor(metaclass=ABCMeta):
    """Abstract hook for intercepting incoming RPCs on the service-side.

    This is an EXPERIMENTAL API.
    """

    @abstractmethod
    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails],
                               Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Intercept an incoming RPC before it is handed to a handler.

        Args:
          continuation: A function accepting a HandlerCallDetails. Calling
            it invokes the next interceptor in the chain, if any, or else
            the RPC handler lookup logic, with the given call details as
            argument. It returns an RpcMethodHandler instance when the RPC
            is considered serviced, or None otherwise.
          handler_call_details: The HandlerCallDetails describing the RPC.

        Returns:
          The RpcMethodHandler with which the RPC may be serviced when this
          interceptor chooses to service it, or None otherwise.
        """
class ClientCallDetails(
        collections.namedtuple(
            'ClientCallDetails',
            ('method', 'timeout', 'metadata', 'credentials',
             'wait_for_ready'),
        ),
        grpc.ClientCallDetails,
):
    """Immutable description of an RPC that is about to be invoked.

    This is an EXPERIMENTAL API.

    Args:
      method: Name of the RPC method.
      timeout: Optional number of seconds the RPC is allowed to take.
      metadata: Optional metadata sent to the service-side of the RPC.
      credentials: Optional CallCredentials used for the RPC.
      wait_for_ready: This is an EXPERIMENTAL argument. Optional flag that
        enables the :term:`wait_for_ready` mechanism.
    """

    method: str
    timeout: Optional[float]
    metadata: Optional[Metadata]
    credentials: Optional[grpc.CallCredentials]
    wait_for_ready: Optional[bool]
class ClientInterceptor(metaclass=ABCMeta):
    """Common base class of every Aio client interceptor class."""
class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Abstract interceptor for unary-unary invocations."""

    @abstractmethod
    async def intercept_unary_unary(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryUnaryCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[UnaryUnaryCall, ResponseType]:
        """Intercept a unary-unary invocation asynchronously.

        Args:
          continuation: A coroutine that moves the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility when it decides to let the
            RPC proceed, e.g. with
            `call = await continuation(client_call_details, request)`.
            `continuation` returns the call to the RPC.
          client_call_details: The ClientCallDetails describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          An object with the RPC response.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Abstract interceptor for unary-stream invocations."""

    @abstractmethod
    async def intercept_unary_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryStreamCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[ResponseIterableType, UnaryStreamCall]:
        """Intercept a unary-stream invocation asynchronously.

        Either the call object or an asynchronous iterator may be returned.
        When an asynchronous iterator is returned, it becomes the source of
        the reads done by the caller.

        Args:
          continuation: A coroutine that moves the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility when it decides to let the
            RPC proceed, e.g. with
            `call = await continuation(client_call_details, request)`.
            `continuation` returns the call to the RPC.
          client_call_details: The ClientCallDetails describing the
            outgoing RPC.
          request: The request value for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Abstract interceptor for stream-unary invocations."""

    @abstractmethod
    async def intercept_stream_unary(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               StreamUnaryCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> StreamUnaryCall:
        """Intercept a stream-unary invocation asynchronously.

        Be careful when using call methods such as `write`, or when
        awaiting the call, from inside the interceptor: the caller may be
        expecting an untouched call, for instance to start writing
        messages to it.

        Args:
          continuation: A coroutine that moves the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility when it decides to let the
            RPC proceed, e.g. with
            `call = await continuation(client_call_details, request_iterator)`.
            `continuation` returns the call to the RPC.
          client_call_details: The ClientCallDetails describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Abstract interceptor for stream-stream invocations."""

    @abstractmethod
    async def intercept_stream_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               StreamStreamCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> Union[ResponseIterableType, StreamStreamCall]:
        """Intercept a stream-stream invocation asynchronously.

        Be careful when using call methods such as `write`, or when
        awaiting the call, from inside the interceptor: the caller may be
        expecting an untouched call, for instance to start writing
        messages to it.

        Either the call object or an asynchronous iterator may be returned.
        When an asynchronous iterator is returned, it becomes the source of
        the reads done by the caller.

        Args:
          continuation: A coroutine that moves the invocation forward,
            either by running the next interceptor in the chain or by
            invoking the actual RPC on the underlying Channel. Calling it
            is the interceptor's responsibility when it decides to let the
            RPC proceed, e.g. with
            `call = await continuation(client_call_details, request_iterator)`.
            `continuation` returns the call to the RPC.
          client_call_details: The ClientCallDetails describing the
            outgoing RPC.
          request_iterator: The request iterator that will produce requests
            for the RPC.

        Returns:
          The RPC Call or an asynchronous iterator.

        Raises:
          AioRpcError: Indicating that the RPC terminated with non-OK status.
          asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class InterceptedCall:
    """Shared implementation for intercepted calls of every arity.

    Interceptors may do work before the RPC invocation, where they can
    change the invocation parameters, and after it, where they can access
    the wrapped call.

    Early and late cancellations are handled too: the RPC may not have
    started yet while execution is still held by the interceptors, or it
    may already have finished while the interceptors still hold execution.

    Once the RPC is finally executed, every method is forwarded to the
    intercepted call, which is the same call that was returned to the
    interceptors.

    As the base class of all intercepted calls it implements the logic
    around final status, metadata and cancellation.
    """

    _interceptors_task: asyncio.Task
    _pending_add_done_callbacks: Sequence[DoneCallbackType]

    def __init__(self, interceptors_task: asyncio.Task) -> None:
        """Store the interceptors task and hook its completion.

        Args:
          interceptors_task: Task running the interceptor chain; its result
            is the intercepted call.
        """
        self._interceptors_task = interceptors_task
        self._pending_add_done_callbacks = []
        interceptors_task.add_done_callback(
            self._fire_or_add_pending_done_callbacks)

    def __del__(self):
        self.cancel()

    def _fire_or_add_pending_done_callbacks(
            self, interceptors_task: asyncio.Task) -> None:
        """Flush the done callbacks queued while the task was still running.

        Callbacks fire immediately when the task failed, was cancelled, or
        produced a call that is already done; otherwise they are attached
        to the produced call.
        """
        queued = self._pending_add_done_callbacks
        if not queued:
            return

        try:
            intercepted = interceptors_task.result()
            finished = bool(intercepted.done())
        except (AioRpcError, asyncio.CancelledError):
            finished = True

        for pending_callback in queued:
            if finished:
                pending_callback(self)
            else:
                # The underlying call reports itself to its callbacks; wrap
                # so the user callback receives this object instead.
                intercepted.add_done_callback(
                    functools.partial(self._wrap_add_done_callback,
                                      pending_callback))

        self._pending_add_done_callbacks = []

    def _wrap_add_done_callback(self, callback: DoneCallbackType,
                                unused_call: _base_call.Call) -> None:
        """Invoke `callback` with this object, ignoring the underlying call."""
        callback(self)

    def cancel(self) -> bool:
        """Cancel the interceptors task or, once available, the call."""
        task = self._interceptors_task
        if not task.done():
            # The intercepted call is not available yet, so fall back to
            # the generic Asyncio cancellation of the task.
            return task.cancel()

        try:
            intercepted = task.result()
        except (AioRpcError, asyncio.CancelledError):
            return False

        return intercepted.cancel()

    def cancelled(self) -> bool:
        """Tell whether the task or the intercepted call was cancelled."""
        task = self._interceptors_task
        if not task.done():
            return False

        try:
            intercepted = task.result()
        except AioRpcError as rpc_error:
            return rpc_error.code() == grpc.StatusCode.CANCELLED
        except asyncio.CancelledError:
            return True

        return intercepted.cancelled()

    def done(self) -> bool:
        """Tell whether the task failed or the intercepted call is done."""
        task = self._interceptors_task
        if not task.done():
            return False

        try:
            intercepted = task.result()
        except (AioRpcError, asyncio.CancelledError):
            return True

        return intercepted.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback` to be called with this object when done.

        While the interceptors task is still running the callback is only
        queued; it is dispatched when the task completes.
        """
        task = self._interceptors_task
        if not task.done():
            self._pending_add_done_callbacks.append(callback)
            return

        try:
            intercepted = task.result()
        except (AioRpcError, asyncio.CancelledError):
            callback(self)
            return

        if intercepted.done():
            callback(self)
            return

        intercepted.add_done_callback(
            functools.partial(self._wrap_add_done_callback, callback))

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return the initial metadata, or None if the task was cancelled."""
        try:
            intercepted = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.initial_metadata()
        except asyncio.CancelledError:
            return None

        return await intercepted.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return the trailing metadata, or None if the task was cancelled."""
        try:
            intercepted = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.trailing_metadata()
        except asyncio.CancelledError:
            return None

        return await intercepted.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Return the status code; CANCELLED if the task was cancelled."""
        try:
            intercepted = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.code()
        except asyncio.CancelledError:
            return grpc.StatusCode.CANCELLED

        return await intercepted.code()

    async def details(self) -> str:
        """Return the status details of the RPC."""
        try:
            intercepted = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.details()
        except asyncio.CancelledError:
            return _LOCAL_CANCELLATION_DETAILS

        return await intercepted.details()

    async def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; '' if the task was cancelled."""
        try:
            intercepted = await self._interceptors_task
        except AioRpcError as rpc_error:
            return rpc_error.debug_error_string()
        except asyncio.CancelledError:
            return ''

        return await intercepted.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Await the interceptors task, then the call's connection."""
        intercepted = await self._interceptors_task
        return await intercepted.wait_for_connection()
class _InterceptedUnaryResponseMixin:
    """Makes an intercepted call awaitable for its single response."""

    def __await__(self):
        # First wait for the interceptor chain to produce the call, then
        # delegate to that call to obtain the response.
        intercepted_call = yield from self._interceptors_task.__await__()
        return (yield from intercepted_call.__await__())
class _InterceptedStreamResponseMixin:
    """Adds response streaming (`__aiter__` / `read`) to an intercepted call.

    Relies on the host class providing `_interceptors_task`.
    """

    _response_aiter: Optional[AsyncIterable[ResponseType]]

    def _init_stream_response_mixin(self) -> None:
        # Initialized lazily: if the iterator were created here and never
        # fully consumed, Asyncio would emit a logging warning.
        self._response_aiter = None

    async def _wait_for_interceptor_task_response_iterator(
            self) -> AsyncIterable[ResponseType]:
        """Yield every response of the call produced by the interceptors."""
        call = await self._interceptors_task
        async for response in call:
            yield response

    def __aiter__(self) -> AsyncIterable[ResponseType]:
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        return self._response_aiter

    async def read(self) -> ResponseType:
        """Read one response message from the stream.

        Returns:
          The next response message, or the `cygrpc.EOF` sentinel once the
          response stream is exhausted.
        """
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        try:
            return await self._response_aiter.asend(None)
        except StopAsyncIteration:
            # An exhausted generator means end of stream; report it with
            # the EOF sentinel instead of leaking the iterator protocol
            # exception to the caller.
            return cygrpc.EOF
class _InterceptedStreamRequestMixin:
    """Adds request streaming (`write` / `done_writing`) to an intercepted call.

    When the caller supplies no request iterator, one is created that
    proxies the caller's `write` calls through a single-slot queue. Relies
    on the host class providing `_interceptors_task` and `_loop`.
    """

    _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
    _write_to_iterator_queue: Optional[asyncio.Queue]
    _status_code_task: Optional[asyncio.Task]

    # Queued by `done_writing` to terminate the proxy request iterator.
    _FINISH_ITERATOR_SENTINEL = object()

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ) -> RequestIterableType:
        """Return the request iterator to use for the RPC.

        Args:
          request_iterator: Iterator given by the caller, or None when the
            caller will use `write` / `done_writing` instead.

        Returns:
          The caller's iterator, or the internal proxy iterator when
          `request_iterator` is None.
        """
        if request_iterator is None:
            # We provide our own request iterator which is a proxy
            # of the future writes that will be done by the caller.
            self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
            self._write_to_iterator_async_gen = (
                self._proxy_writes_as_request_iterator())
            self._status_code_task = None
            request_iterator = self._write_to_iterator_async_gen
        else:
            self._write_to_iterator_queue = None

        return request_iterator

    async def _proxy_writes_as_request_iterator(self):
        """Yield queued requests until the finish sentinel is received."""
        await self._interceptors_task

        while True:
            value = await self._write_to_iterator_queue.get()
            if value is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL:
                break
            yield value

    async def _write_to_iterator_queue_interruptible(self, request: RequestType,
                                                     call: InterceptedCall):
        """Queue `request`, giving up as soon as `call` terminates.

        Args:
          request: Value to put on the request iterator queue.
          call: Call whose status code is awaited so the write can be
            interrupted in the case of abrupt termination of the call.
        """
        if self._status_code_task is None:
            self._status_code_task = self._loop.create_task(call.code())

        put_task = self._loop.create_task(
            self._write_to_iterator_queue.put(request))
        await asyncio.wait((put_task, self._status_code_task),
                           return_when=asyncio.FIRST_COMPLETED)

        if not put_task.done():
            # The status code task finished first. `asyncio.wait` does not
            # cancel the other awaitable, so cancel the put explicitly
            # rather than leaving a task pending forever.
            put_task.cancel()

    async def write(self, request: RequestType) -> None:
        """Write one request message to the RPC.

        Raises:
          cygrpc.UsageError: If the caller supplied a request iterator.
          asyncio.InvalidStateError: If the RPC already finished or is
            half-closed.
        """
        # If no queue was created it means that requests
        # should be expected through an iterator provided
        # by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except (asyncio.CancelledError, AioRpcError):
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        elif call._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)

        await self._write_to_iterator_queue_interruptible(request, call)

        # The write may have been interrupted by the call terminating.
        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.

        Raises:
          cygrpc.UsageError: If the caller supplied a request iterator.
          asyncio.InvalidStateError: If the interceptors task was cancelled.
        """
        # If no queue was created it means that requests
        # should be expected through an iterator provided
        # by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except asyncio.CancelledError:
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        await self._write_to_iterator_queue_interruptible(
            _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call)
class InterceptedUnaryUnaryCall(_InterceptedUnaryResponseMixin, InterceptedCall,
                                _base_call.UnaryUnaryCall):
    """Runs a `UnaryUnaryCall` wrapped by interceptors.

    `__await__` is proxied to the intercepted call only once the
    interceptor task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        invocation = self._invoke(interceptors, method, timeout, metadata,
                                  credentials, wait_for_ready, request,
                                  request_serializer, response_deserializer)
        super().__init__(loop.create_task(invocation))

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
                interceptors: Iterator[UnaryUnaryClientInterceptor],
                client_call_details: ClientCallDetails,
                request: RequestType) -> _base_call.UnaryUnaryCall:
            interceptor = next(interceptors, None)

            if not interceptor:
                # End of the chain: start the actual RPC.
                return UnaryUnaryCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

            # The continuation shares the iterator, so every invocation
            # advances to the following interceptor.
            continuation = functools.partial(_run_interceptor, interceptors)
            outcome = await interceptor.intercept_unary_unary(
                continuation, client_call_details, request)

            if isinstance(outcome, _base_call.UnaryUnaryCall):
                return outcome
            # A bare response was returned; wrap it in a finished call.
            return UnaryUnaryCallResponse(outcome)

        call_details = ClientCallDetails(method, timeout, metadata,
                                         credentials, wait_for_ready)
        return await _run_interceptor(iter(interceptors), call_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()
class InterceptedUnaryStreamCall(_InterceptedStreamResponseMixin,
                                 InterceptedCall, _base_call.UnaryStreamCall):
    """Used for running a `UnaryStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Declared as an annotation; an assignment here would bind the typing
    # object itself as a class attribute.
    _last_returned_call_from_interceptors: Optional[_base_call.UnaryStreamCall]

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryStreamClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryStreamCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: Iterator[UnaryStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> _base_call.UnaryStreamCall:

            interceptor = next(interceptors, None)

            if interceptor:
                continuation = functools.partial(_run_interceptor, interceptors)

                call_or_response_iterator = await interceptor.intercept_unary_stream(
                    continuation, client_call_details, request)

                if isinstance(call_or_response_iterator,
                              _base_call.UnaryStreamCall):
                    self._last_returned_call_from_interceptors = call_or_response_iterator
                else:
                    # The interceptor returned a plain async iterator: pair
                    # it with the last call seen so the call API keeps
                    # working while reads come from the iterator.
                    self._last_returned_call_from_interceptors = UnaryStreamCallResponseIterator(
                        self._last_returned_call_from_interceptors,
                        call_or_response_iterator)
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the actual RPC.
                self._last_returned_call_from_interceptors = UnaryStreamCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(iter(interceptors), client_call_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()
class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
                                 _InterceptedStreamRequestMixin,
                                 InterceptedCall, _base_call.StreamUnaryCall):
    """Used for running a `StreamUnaryCall` wrapped by interceptors.

    The `__await__` method is proxied to the intercepted call only when
    the interceptor task is finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        # May swap a None iterator for the internal write-proxy iterator.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: Iterator[StreamUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamUnaryCall:

            interceptor = next(interceptors, None)

            if interceptor:
                continuation = functools.partial(_run_interceptor, interceptors)

                return await interceptor.intercept_stream_unary(
                    continuation, client_call_details, request_iterator)
            else:
                # End of the chain: start the actual RPC.
                return StreamUnaryCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(iter(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()
class InterceptedStreamStreamCall(_InterceptedStreamResponseMixin,
                                  _InterceptedStreamRequestMixin,
                                  InterceptedCall, _base_call.StreamStreamCall):
    """Used for running a `StreamStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Declared as an annotation; an assignment here would bind the typing
    # object itself as a class attribute.
    _last_returned_call_from_interceptors: Optional[_base_call.StreamStreamCall]

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamStreamClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain as a task on `loop`."""
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        # May swap a None iterator for the internal write-proxy iterator.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamStreamCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: Iterator[StreamStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamStreamCall:

            interceptor = next(interceptors, None)

            if interceptor:
                continuation = functools.partial(_run_interceptor, interceptors)

                call_or_response_iterator = await interceptor.intercept_stream_stream(
                    continuation, client_call_details, request_iterator)

                if isinstance(call_or_response_iterator,
                              _base_call.StreamStreamCall):
                    self._last_returned_call_from_interceptors = call_or_response_iterator
                else:
                    # The interceptor returned a plain async iterator: pair
                    # it with the last call seen so the call API keeps
                    # working while reads come from the iterator.
                    self._last_returned_call_from_interceptors = StreamStreamCallResponseIterator(
                        self._last_returned_call_from_interceptors,
                        call_or_response_iterator)
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the actual RPC.
                self._last_returned_call_from_interceptors = StreamStreamCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)
                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(iter(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()
class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
    """A UnaryUnaryCall that is already finished and holds its response.

    It reports itself as done, not cancelled, with OK status and no
    metadata; awaiting it returns the stored response immediately.
    """
    _response: ResponseType

    def __init__(self, response: ResponseType) -> None:
        self._response = response

    def cancel(self) -> bool:
        # Already finished, so there is nothing to cancel.
        return False

    def cancelled(self) -> bool:
        return False

    def done(self) -> bool:
        return True

    def add_done_callback(self, unused_callback) -> None:
        raise NotImplementedError()

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        return None

    async def trailing_metadata(self) -> Optional[Metadata]:
        return None

    async def code(self) -> grpc.StatusCode:
        return grpc.StatusCode.OK

    async def details(self) -> str:
        return ''

    async def debug_error_string(self) -> Optional[str]:
        return None

    def __await__(self):
        # The branch below never executes; the `yield` only exists so the
        # interpreter compiles `__await__` as a generator.
        if False:  # pylint: disable=using-constant-test
            yield None
        return self._response

    async def wait_for_connection(self) -> None:
        pass
class _StreamCallResponseIterator:
    """Pairs a streaming call with an alternative response iterator.

    Status, metadata and cancellation are delegated to the wrapped call,
    while iteration is served by the supplied response iterator.
    """

    _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
    _response_iterator: AsyncIterable[ResponseType]

    def __init__(self, call: Union[_base_call.UnaryStreamCall,
                                   _base_call.StreamStreamCall],
                 response_iterator: AsyncIterable[ResponseType]) -> None:
        """Store the call to delegate to and the iterator to read from."""
        self._response_iterator = response_iterator
        self._call = call

    # --- Delegated to the wrapped call -------------------------------------

    def cancel(self) -> bool:
        return self._call.cancel()

    def cancelled(self) -> bool:
        return self._call.cancelled()

    def done(self) -> bool:
        return self._call.done()

    def add_done_callback(self, callback) -> None:
        self._call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        return self._call.time_remaining()

    async def initial_metadata(self) -> Optional[Metadata]:
        return await self._call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        return await self._call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        return await self._call.code()

    async def details(self) -> str:
        return await self._call.details()

    async def debug_error_string(self) -> Optional[str]:
        return await self._call.debug_error_string()

    # --- Served by the alternative iterator --------------------------------

    def __aiter__(self):
        return self._response_iterator.__aiter__()

    async def wait_for_connection(self) -> None:
        return await self._call.wait_for_connection()
class UnaryStreamCallResponseIterator(_StreamCallResponseIterator,
                                      _base_call.UnaryStreamCall):
    """UnaryStreamCall class which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        # Reads are expected to go through the async iterator behind the
        # scenes, so this path should not be reached.
        raise NotImplementedError()
class StreamStreamCallResponseIterator(_StreamCallResponseIterator,
                                       _base_call.StreamStreamCall):
    """StreamStreamCall class which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        # Reads are expected to go through the async iterator behind the
        # scenes, so this path should not be reached.
        raise NotImplementedError()

    async def write(self, request: RequestType) -> None:
        # Writes are expected to go through the async iterator provided by
        # the InterceptedStreamStreamCall, so this path should not be
        # reached.
        raise NotImplementedError()

    async def done_writing(self) -> None:
        # Writes are expected to go through the async iterator provided by
        # the InterceptedStreamStreamCall, so this path should not be
        # reached.
        raise NotImplementedError()

    @property
    def _done_writing_flag(self) -> bool:
        # Mirrors the wrapped call's flag.
        return self._call._done_writing_flag