Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_interceptor.py: 38%
394 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Interceptors implementation of gRPC Asyncio Python."""
15from abc import ABCMeta
16from abc import abstractmethod
17import asyncio
18import collections
19import functools
20from typing import (AsyncIterable, Awaitable, Callable, Iterator, List,
21 Optional, Sequence, Union)
23import grpc
24from grpc._cython import cygrpc
26from . import _base_call
27from ._call import AioRpcError
28from ._call import StreamStreamCall
29from ._call import StreamUnaryCall
30from ._call import UnaryStreamCall
31from ._call import UnaryUnaryCall
32from ._call import _API_STYLE_ERROR
33from ._call import _RPC_ALREADY_FINISHED_DETAILS
34from ._call import _RPC_HALF_CLOSED_DETAILS
35from ._metadata import Metadata
36from ._typing import DeserializingFunction
37from ._typing import DoneCallbackType
38from ._typing import RequestIterableType
39from ._typing import RequestType
40from ._typing import ResponseIterableType
41from ._typing import ResponseType
42from ._typing import SerializingFunction
43from ._utils import _timeout_to_deadline
45_LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!'
class ServerInterceptor(metaclass=ABCMeta):
    """Hook for intercepting incoming RPCs on the service side.

    This is an EXPERIMENTAL API.
    """

    @abstractmethod
    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails],
                               Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Intercept an incoming RPC before it is handed to a handler.

        Args:
            continuation: A function that takes a HandlerCallDetails and
                invokes the next interceptor in the chain, if any, or the
                RPC handler lookup logic, passing the call details along.
                It returns an RpcMethodHandler if the RPC is considered
                serviced, or None otherwise.
            handler_call_details: A HandlerCallDetails describing the RPC.

        Returns:
            An RpcMethodHandler with which the RPC may be serviced if the
            interceptor chooses to service this RPC, or None otherwise.
        """
class ClientCallDetails(
        collections.namedtuple(
            'ClientCallDetails',
            'method timeout metadata credentials wait_for_ready'),
        grpc.ClientCallDetails):
    """Immutable description of an RPC about to be invoked.

    This is an EXPERIMENTAL API.

    Args:
        method: The method name of the RPC.
        timeout: An optional duration of time in seconds to allow for the RPC.
        metadata: Optional metadata to be transmitted to the service-side of
            the RPC.
        credentials: An optional CallCredentials for the RPC.
        wait_for_ready: An optional flag to enable :term:`wait_for_ready`
            mechanism.
    """

    method: str
    timeout: Optional[float]
    metadata: Optional[Metadata]
    credentials: Optional[grpc.CallCredentials]
    wait_for_ready: Optional[bool]
class ClientInterceptor(metaclass=ABCMeta):
    """Common ancestor of every Aio client interceptor class."""
class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for interceptors of unary-unary invocations."""

    @abstractmethod
    async def intercept_unary_unary(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryUnaryCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[UnaryUnaryCall, ResponseType]:
        """Intercept a unary-unary invocation asynchronously.

        Args:
            continuation: A coroutine that moves the invocation forward by
                running the next interceptor in the chain or by invoking
                the actual RPC on the underlying Channel. Calling it is the
                interceptor's responsibility if it decides to let the RPC
                proceed, typically as
                `call = await continuation(client_call_details, request)`.
                `continuation` returns the call to the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request: The request value for the RPC.

        Returns:
            An object with the RPC response.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Interface for interceptors of unary-stream invocations."""

    @abstractmethod
    async def intercept_unary_stream(
        self,
        continuation: Callable[[ClientCallDetails, RequestType],
                               UnaryStreamCall],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[ResponseIterableType, UnaryStreamCall]:
        """Intercept a unary-stream invocation asynchronously.

        The implementation may return either the call object or an
        asynchronous iterator; when an asynchronous iterator is returned
        it becomes the source of the reads done by the caller.

        Args:
            continuation: A coroutine that moves the invocation forward by
                running the next interceptor in the chain or by invoking
                the actual RPC on the underlying Channel. Calling it is the
                interceptor's responsibility if it decides to let the RPC
                proceed, typically as
                `call = await continuation(client_call_details, request)`.
                `continuation` returns the call to the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request: The request value for the RPC.

        Returns:
            The RPC Call or an asynchronous iterator.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting stream-unary invocations."""

    @abstractmethod
    async def intercept_stream_unary(
        self,
        # The continuation receives the request *iterator*, not a single
        # request message, so it is typed with RequestIterableType.
        continuation: Callable[[ClientCallDetails, RequestIterableType],
                               StreamUnaryCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> StreamUnaryCall:
        """Intercepts a stream-unary invocation asynchronously.

        Within the interceptor, call methods such as `write`, or even
        awaiting the call, should be used carefully: the caller may be
        expecting an untouched call, for example to start writing
        messages to it.

        Args:
            continuation: A coroutine that proceeds with the invocation by
                executing the next interceptor in the chain or invoking the
                actual RPC on the underlying Channel. It is the
                interceptor's responsibility to call it if it decides to
                move the RPC forward. The interceptor can use
                `call = await continuation(client_call_details, request_iterator)`
                to continue with the RPC. `continuation` returns the call
                to the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request_iterator: The request iterator that will produce
                requests for the RPC.

        Returns:
            The RPC Call.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting stream-stream invocations."""

    @abstractmethod
    async def intercept_stream_stream(
        self,
        # The continuation receives the request *iterator*, not a single
        # request message, so it is typed with RequestIterableType.
        continuation: Callable[[ClientCallDetails, RequestIterableType],
                               StreamStreamCall],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> Union[ResponseIterableType, StreamStreamCall]:
        """Intercepts a stream-stream invocation asynchronously.

        Within the interceptor, call methods such as `write`, or even
        awaiting the call, should be used carefully: the caller may be
        expecting an untouched call, for example to start writing
        messages to it.

        The function may return the call object or an asynchronous
        iterator; when an asynchronous iterator is returned it becomes the
        source of the reads done by the caller.

        Args:
            continuation: A coroutine that proceeds with the invocation by
                executing the next interceptor in the chain or invoking the
                actual RPC on the underlying Channel. It is the
                interceptor's responsibility to call it if it decides to
                move the RPC forward. The interceptor can use
                `call = await continuation(client_call_details, request_iterator)`
                to continue with the RPC. `continuation` returns the call
                to the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request_iterator: The request iterator that will produce
                requests for the RPC.

        Returns:
            The RPC Call or an asynchronous iterator.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """
class InterceptedCall:
    """Base implementation for all intercepted call arities.

    Interceptors might have some work to do before the RPC invocation, with
    the capacity of changing the invocation parameters, and some work to do
    after the RPC invocation, with access to the wrapped call.

    The interceptor chain runs inside `_interceptors_task`; the task result
    is the call object finally produced by the chain.

    Early and late cancellations are handled too: when the RPC has not even
    started and execution is still held by the interceptors, or when the
    RPC has finished but execution is again held by the interceptors.

    Once the chain has produced a call, every method is forwarded to that
    intercepted call, which is the same call returned to the interceptors.

    As the base class of all intercepted calls it implements the logic
    around final status, metadata and cancellation.
    """

    _interceptors_task: asyncio.Task
    # A list (not a read-only Sequence): callbacks are appended while the
    # interceptors task is still running.
    _pending_add_done_callbacks: List[DoneCallbackType]

    def __init__(self, interceptors_task: asyncio.Task) -> None:
        """Track `interceptors_task` and hook its completion.

        Args:
            interceptors_task: The task running the interceptor chain; its
                result is the intercepted call.
        """
        self._interceptors_task = interceptors_task
        self._pending_add_done_callbacks = []
        self._interceptors_task.add_done_callback(
            self._fire_or_add_pending_done_callbacks)

    def __del__(self):
        # A subclass constructor may fail before `InterceptedCall.__init__`
        # has run; in that case there is no task to cancel and calling
        # `cancel()` would raise AttributeError during finalization.
        if hasattr(self, '_interceptors_task'):
            self.cancel()

    def _fire_or_add_pending_done_callbacks(
            self, interceptors_task: asyncio.Task) -> None:
        """Flush callbacks registered before the interceptors task finished.

        If the task failed with AioRpcError / CancelledError, or produced a
        call that is already done, the callbacks fire immediately.
        Otherwise they are re-registered on the produced call.
        """
        if not self._pending_add_done_callbacks:
            return

        call_completed = False

        try:
            call = interceptors_task.result()
            if call.done():
                call_completed = True
        except (AioRpcError, asyncio.CancelledError):
            call_completed = True

        if call_completed:
            for callback in self._pending_add_done_callbacks:
                callback(self)
        else:
            for callback in self._pending_add_done_callbacks:
                # Wrap so the callback receives this object rather than
                # the underlying call.
                callback = functools.partial(self._wrap_add_done_callback,
                                             callback)
                call.add_done_callback(callback)

        self._pending_add_done_callbacks = []

    def _wrap_add_done_callback(self, callback: DoneCallbackType,
                                unused_call: _base_call.Call) -> None:
        """Invoke `callback` with this intercepted call as its argument."""
        callback(self)

    def cancel(self) -> bool:
        """Cancel the RPC, or the interceptor chain if it is still running.

        Returns:
            The result of cancelling the interceptors task when it has not
            finished; False when the task ended with AioRpcError or
            CancelledError; otherwise the result of `call.cancel()`.
        """
        if not self._interceptors_task.done():
            # The intercepted call is not available yet, so fall back to
            # the generic asyncio cancellation of the task.
            return self._interceptors_task.cancel()

        try:
            call = self._interceptors_task.result()
        except AioRpcError:
            return False
        except asyncio.CancelledError:
            return False

        return call.cancel()

    def cancelled(self) -> bool:
        """Return True if the chain or the intercepted call was cancelled."""
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except AioRpcError as err:
            return err.code() == grpc.StatusCode.CANCELLED
        except asyncio.CancelledError:
            return True

        return call.cancelled()

    def done(self) -> bool:
        """Return True once the chain failed or the intercepted call is done."""
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            return True

        return call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback` to be invoked with this object on completion.

        While the interceptors task is running the callback is queued; if
        everything has already finished it is invoked right away.
        """
        if not self._interceptors_task.done():
            self._pending_add_done_callbacks.append(callback)
            return

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            callback(self)
            return

        if call.done():
            callback(self)
        else:
            callback = functools.partial(self._wrap_add_done_callback, callback)
            call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return the initial metadata, taken from the error if the chain failed.

        Returns None when the interceptors task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.initial_metadata()
        except asyncio.CancelledError:
            return None

        return await call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return the trailing metadata, taken from the error if the chain failed.

        Returns None when the interceptors task was cancelled.
        """
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.trailing_metadata()
        except asyncio.CancelledError:
            return None

        return await call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Return the status code; CANCELLED if the chain was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.code()
        except asyncio.CancelledError:
            return grpc.StatusCode.CANCELLED

        return await call.code()

    async def details(self) -> str:
        """Return the status details; a local message if the chain was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.details()
        except asyncio.CancelledError:
            return _LOCAL_CANCELLATION_DETAILS

        return await call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; '' if the chain was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.debug_error_string()
        except asyncio.CancelledError:
            return ''

        return await call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Wait for the chain, then delegate to the intercepted call.

        Exceptions raised by the interceptors task propagate unchanged.
        """
        call = await self._interceptors_task
        return await call.wait_for_connection()
class _InterceptedUnaryResponseMixin:
    """Makes an intercepted call awaitable for its single response."""

    def __await__(self):
        # First wait for the interceptor chain to hand over the call ...
        intercepted_call = yield from self._interceptors_task.__await__()
        # ... then await that call itself to obtain the response.
        return (yield from intercepted_call.__await__())
class _InterceptedStreamResponseMixin:
    """Adds response streaming (`__aiter__` / `read`) to an intercepted call."""

    _response_aiter: Optional[AsyncIterable[ResponseType]]

    def _init_stream_response_mixin(self) -> None:
        """Reset the lazily created response iterator."""
        # It is initialized later; otherwise, if the iterator is never
        # consumed, a logging warning is emitted by asyncio.
        self._response_aiter = None

    async def _wait_for_interceptor_task_response_iterator(
            self) -> AsyncIterable[ResponseType]:
        """Yield responses from the call produced by the interceptors task."""
        call = await self._interceptors_task
        async for response in call:
            yield response

    def __aiter__(self) -> AsyncIterable[ResponseType]:
        """Return the shared response iterator, creating it on first use."""
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        return self._response_aiter

    async def read(self) -> ResponseType:
        """Read one response message.

        Returns:
            The next response, or `cygrpc.EOF` once the response stream is
            exhausted.
        """
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator())
        try:
            return await self._response_aiter.asend(None)
        except StopAsyncIteration:
            # `read()` signals end-of-stream with the EOF sentinel rather
            # than leaking the generator's StopAsyncIteration to callers.
            return cygrpc.EOF
class _InterceptedStreamRequestMixin:
    """Adds request streaming (`write` / `done_writing`) to an intercepted call.

    When the caller supplies no request iterator, writes are funnelled
    through a one-slot queue that is exposed to the RPC as an async
    generator.
    """

    _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
    _write_to_iterator_queue: Optional[asyncio.Queue]
    _status_code_task: Optional[asyncio.Task]

    _FINISH_ITERATOR_SENTINEL = object()

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ) -> RequestIterableType:
        """Set up request streaming state.

        Args:
            request_iterator: The caller's request iterator, or None when
                the caller will use `write()` / `done_writing()` instead.

        Returns:
            The iterator the RPC should consume: the caller's one, or the
            internal proxy generator fed by `write()`.
        """
        # Every attribute is initialized on both branches so later reads
        # never hit a missing attribute.
        self._status_code_task = None

        if request_iterator is None:
            # We provide our own request iterator, which is a proxy of the
            # future writes that will be done by the caller.
            self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
            self._write_to_iterator_async_gen = (
                self._proxy_writes_as_request_iterator())
            request_iterator = self._write_to_iterator_async_gen
        else:
            self._write_to_iterator_queue = None
            self._write_to_iterator_async_gen = None

        return request_iterator

    async def _proxy_writes_as_request_iterator(self):
        """Yield queued requests until the finish sentinel is received."""
        await self._interceptors_task

        while True:
            value = await self._write_to_iterator_queue.get()
            if value is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL:
                break
            yield value

    async def _write_to_iterator_queue_interruptible(self, request: RequestType,
                                                     call: InterceptedCall):
        """Enqueue `request`, giving up if `call` terminates first.

        The write is raced against a task awaiting `call.code()` so that an
        abrupt termination of the call interrupts a blocked write.
        """
        if self._status_code_task is None:
            self._status_code_task = self._loop.create_task(call.code())

        put_task = self._loop.create_task(
            self._write_to_iterator_queue.put(request))
        await asyncio.wait((put_task, self._status_code_task),
                           return_when=asyncio.FIRST_COMPLETED)

        if not put_task.done():
            # The call finished first: nothing will drain the queue any
            # more, so the blocked `put` would stay pending forever.
            put_task.cancel()

    async def write(self, request: RequestType) -> None:
        """Write one request message.

        Raises:
            cygrpc.UsageError: If the call was created with a request
                iterator.
            asyncio.InvalidStateError: If the RPC already finished or is
                half-closed.
        """
        # If no queue was created it means that requests are expected
        # through an iterator provided by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except (asyncio.CancelledError, AioRpcError):
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        elif call._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)

        await self._write_to_iterator_queue_interruptible(request, call)

        # The write may have been interrupted by the call terminating.
        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.

        Raises:
            cygrpc.UsageError: If the call was created with a request
                iterator.
            asyncio.InvalidStateError: If the interceptors task was
                cancelled.
        """
        # If no queue was created it means that requests are expected
        # through an iterator provided by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        # NOTE(review): unlike `write()`, an AioRpcError raised by the
        # interceptors task propagates from here unchanged -- confirm this
        # asymmetry is intended before aligning the two.
        try:
            call = await self._interceptors_task
        except asyncio.CancelledError:
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        await self._write_to_iterator_queue_interruptible(
            _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call)
class InterceptedUnaryUnaryCall(_InterceptedUnaryResponseMixin, InterceptedCall,
                                _base_call.UnaryUnaryCall):
    """Runs a `UnaryUnaryCall` wrapped by interceptors.

    Awaiting this object is proxied to the intercepted call, but only once
    the interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain for this RPC on `loop`."""
        self._loop = loop
        self._channel = channel
        chain_coro = self._invoke(interceptors, method, timeout, metadata,
                                  credentials, wait_for_ready, request,
                                  request_serializer, response_deserializer)
        super().__init__(loop.create_task(chain_coro))

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryUnaryCall:
        """Run the RPC call wrapped in interceptors."""

        async def _run_interceptor(
                remaining: List[UnaryUnaryClientInterceptor],
                client_call_details: ClientCallDetails,
                request: RequestType) -> _base_call.UnaryUnaryCall:
            # End of the chain: start the real RPC on the channel.
            if not remaining:
                return UnaryUnaryCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

            # The continuation handed to the first interceptor runs the
            # rest of the chain.
            continuation = functools.partial(_run_interceptor, remaining[1:])
            outcome = await remaining[0].intercept_unary_unary(
                continuation, client_call_details, request)

            if isinstance(outcome, _base_call.UnaryUnaryCall):
                return outcome
            # A plain response was returned: wrap it so callers still get
            # a call object.
            return UnaryUnaryCallResponse(outcome)

        initial_details = ClientCallDetails(method, timeout, metadata,
                                            credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), initial_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()
class InterceptedUnaryStreamCall(_InterceptedStreamResponseMixin,
                                 InterceptedCall, _base_call.UnaryStreamCall):
    """Used for running a `UnaryStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Annotation only: assigning `Optional[...]` here would create a class
    # attribute holding a typing object instead of declaring the type.
    _last_returned_call_from_interceptors: Optional[_base_call.UnaryStreamCall]

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[UnaryStreamClientInterceptor],
                 request: RequestType, timeout: Optional[float],
                 metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain for this RPC on `loop`."""
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[UnaryStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool], request: RequestType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> UnaryStreamCall:
        """Run the RPC call wrapped in interceptors."""

        async def _run_interceptor(
            interceptors: List[UnaryStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> _base_call.UnaryStreamCall:

            if interceptors:
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                call_or_response_iterator = await interceptors[
                    0].intercept_unary_stream(continuation, client_call_details,
                                              request)

                if isinstance(call_or_response_iterator,
                              _base_call.UnaryStreamCall):
                    self._last_returned_call_from_interceptors = (
                        call_or_response_iterator)
                else:
                    # The interceptor returned a bare async iterator: pair
                    # it with the most recent call so status and metadata
                    # queries still have a call to go to.
                    # NOTE(review): if the interceptor never invoked the
                    # continuation, the wrapped call is still None here.
                    self._last_returned_call_from_interceptors = (
                        UnaryStreamCallResponseIterator(
                            self._last_returned_call_from_interceptors,
                            call_or_response_iterator))
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the real RPC on the channel.
                self._last_returned_call_from_interceptors = UnaryStreamCall(
                    request, _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request)

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()
class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
                                 _InterceptedStreamRequestMixin,
                                 InterceptedCall, _base_call.StreamUnaryCall):
    """Used for running a `StreamUnaryCall` wrapped by interceptors.

    Awaiting this object is proxied to the intercepted call, but only once
    the interceptors task has finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain for this RPC on `loop`."""
        self._loop = loop
        self._channel = channel
        # Passing None selects the write()/done_writing() API; the mixin
        # then substitutes its own proxy iterator.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamUnaryClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamUnaryCall:
        """Run the RPC call wrapped in interceptors."""

        async def _run_interceptor(
            # Indexed and sliced below, so this is a list (not an
            # iterator) of stream-unary interceptors.
            interceptors: List[StreamUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamUnaryCall:

            if interceptors:
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                return await interceptors[0].intercept_stream_unary(
                    continuation, client_call_details, request_iterator)
            else:
                # End of the chain: start the real RPC on the channel.
                return StreamUnaryCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()
class InterceptedStreamStreamCall(_InterceptedStreamResponseMixin,
                                  _InterceptedStreamRequestMixin,
                                  InterceptedCall, _base_call.StreamStreamCall):
    """Used for running a `StreamStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Annotation only: assigning `Optional[...]` here would create a class
    # attribute holding a typing object instead of declaring the type.
    _last_returned_call_from_interceptors: Optional[
        _base_call.StreamStreamCall]

    # pylint: disable=too-many-arguments
    def __init__(self, interceptors: Sequence[StreamStreamClientInterceptor],
                 request_iterator: Optional[RequestIterableType],
                 timeout: Optional[float], metadata: Metadata,
                 credentials: Optional[grpc.CallCredentials],
                 wait_for_ready: Optional[bool], channel: cygrpc.AioChannel,
                 method: bytes, request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 loop: asyncio.AbstractEventLoop) -> None:
        """Schedule the interceptor chain for this RPC on `loop`."""
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        # Passing None selects the write()/done_writing() API; the mixin
        # then substitutes its own proxy iterator.
        request_iterator = self._init_stream_request_mixin(request_iterator)
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(interceptors, method, timeout, metadata, credentials,
                         wait_for_ready, request_iterator, request_serializer,
                         response_deserializer))
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
            self, interceptors: Sequence[StreamStreamClientInterceptor],
            method: bytes, timeout: Optional[float],
            metadata: Optional[Metadata],
            credentials: Optional[grpc.CallCredentials],
            wait_for_ready: Optional[bool],
            request_iterator: RequestIterableType,
            request_serializer: SerializingFunction,
            response_deserializer: DeserializingFunction) -> StreamStreamCall:
        """Run the RPC call wrapped in interceptors."""

        async def _run_interceptor(
            interceptors: List[StreamStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType
        ) -> _base_call.StreamStreamCall:

            if interceptors:
                continuation = functools.partial(_run_interceptor,
                                                 interceptors[1:])

                call_or_response_iterator = await interceptors[
                    0].intercept_stream_stream(continuation,
                                               client_call_details,
                                               request_iterator)

                if isinstance(call_or_response_iterator,
                              _base_call.StreamStreamCall):
                    self._last_returned_call_from_interceptors = (
                        call_or_response_iterator)
                else:
                    # The interceptor returned a bare async iterator: pair
                    # it with the most recent call so status and metadata
                    # queries still have a call to go to.
                    # NOTE(review): if the interceptor never invoked the
                    # continuation, the wrapped call is still None here.
                    self._last_returned_call_from_interceptors = (
                        StreamStreamCallResponseIterator(
                            self._last_returned_call_from_interceptors,
                            call_or_response_iterator))
                return self._last_returned_call_from_interceptors
            else:
                # End of the chain: start the real RPC on the channel.
                self._last_returned_call_from_interceptors = StreamStreamCall(
                    request_iterator,
                    _timeout_to_deadline(client_call_details.timeout),
                    client_call_details.metadata,
                    client_call_details.credentials,
                    client_call_details.wait_for_ready, self._channel,
                    client_call_details.method, request_serializer,
                    response_deserializer, self._loop)
                return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(method, timeout, metadata,
                                                credentials, wait_for_ready)
        return await _run_interceptor(list(interceptors), client_call_details,
                                      request_iterator)

    def time_remaining(self) -> Optional[float]:
        """Not implemented for intercepted calls."""
        raise NotImplementedError()
class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
    """An already-finished UnaryUnaryCall that only carries a response."""

    _response: ResponseType

    def __init__(self, response: ResponseType) -> None:
        self._response = response

    # --- State queries: the call is complete from the start. ---

    def cancel(self) -> bool:
        return False

    def cancelled(self) -> bool:
        return False

    def done(self) -> bool:
        return True

    def add_done_callback(self, unused_callback) -> None:
        raise NotImplementedError()

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

    # --- Status and metadata: fixed values, nothing came off the wire. ---

    async def initial_metadata(self) -> Optional[Metadata]:
        return None

    async def trailing_metadata(self) -> Optional[Metadata]:
        return None

    async def code(self) -> grpc.StatusCode:
        return grpc.StatusCode.OK

    async def details(self) -> str:
        return ''

    async def debug_error_string(self) -> Optional[str]:
        return None

    def __await__(self):
        return self._response
        # Never executed: the presence of a `yield` is what tells the
        # interpreter that __await__ is a generator.
        yield None  # pylint: disable=unreachable

    async def wait_for_connection(self) -> None:
        pass
class _StreamCallResponseIterator:
    """Pairs a streaming call with a substitute response iterator.

    Every call-level query is delegated to the wrapped call; only
    iteration is served by the substitute iterator.
    """

    _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
    _response_iterator: AsyncIterable[ResponseType]

    def __init__(self, call: Union[_base_call.UnaryStreamCall,
                                   _base_call.StreamStreamCall],
                 response_iterator: AsyncIterable[ResponseType]) -> None:
        self._response_iterator = response_iterator
        self._call = call

    # --- Synchronous delegation to the wrapped call. ---

    def cancel(self) -> bool:
        return self._call.cancel()

    def cancelled(self) -> bool:
        return self._call.cancelled()

    def done(self) -> bool:
        return self._call.done()

    def add_done_callback(self, callback) -> None:
        self._call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        return self._call.time_remaining()

    # --- Asynchronous delegation to the wrapped call. ---

    async def initial_metadata(self) -> Optional[Metadata]:
        return await self._call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        return await self._call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        return await self._call.code()

    async def details(self) -> str:
        return await self._call.details()

    async def debug_error_string(self) -> Optional[str]:
        return await self._call.debug_error_string()

    def __aiter__(self):
        # Responses come from the substitute iterator, not from the call.
        return self._response_iterator.__aiter__()

    async def wait_for_connection(self) -> None:
        return await self._call.wait_for_connection()
class UnaryStreamCallResponseIterator(_StreamCallResponseIterator,
                                      _base_call.UnaryStreamCall):
    """UnaryStreamCall variant which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        # All reads are served through the async iterator behind the
        # scenes, so this path is not expected to be reached.
        raise NotImplementedError()
class StreamStreamCallResponseIterator(_StreamCallResponseIterator,
                                       _base_call.StreamStreamCall):
    """StreamStreamCall variant which uses an alternative response iterator."""

    async def read(self) -> ResponseType:
        # All reads are served through the async iterator behind the
        # scenes, so this path is not expected to be reached.
        raise NotImplementedError()

    async def write(self, request: RequestType) -> None:
        # Writes travel through the async iterator provided by the
        # InterceptedStreamStreamCall, so this path is not expected to be
        # reached.
        raise NotImplementedError()

    async def done_writing(self) -> None:
        # Half-closing also goes through the async iterator provided by
        # the InterceptedStreamStreamCall, so this path is not expected to
        # be reached.
        raise NotImplementedError()

    @property
    def _done_writing_flag(self) -> bool:
        # Mirror the wrapped call's half-closed state.
        return self._call._done_writing_flag