Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py: 42%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Implementation of gRPC Python interceptors."""
16import collections
17import sys
18import types
19from typing import Any, Callable, Optional, Sequence, Tuple, Union
21import grpc
23from ._typing import DeserializingFunction
24from ._typing import DoneCallbackType
25from ._typing import MetadataType
26from ._typing import RequestIterableType
27from ._typing import SerializingFunction
class _ServicePipeline(object):
    """Runs a handler lookup through an ordered chain of server interceptors.

    Interceptors are invoked in the order they were supplied. Each one is
    handed a continuation that resumes the chain at the following position;
    once every interceptor has been passed, the original ``thunk`` is called
    with the handler call details.
    """

    # Variable-length tuple; a bare ``Tuple[X]`` would describe a 1-tuple.
    interceptors: Tuple[grpc.ServerInterceptor, ...]

    def __init__(self, interceptors: Sequence[grpc.ServerInterceptor]):
        """Store an immutable snapshot of ``interceptors``."""
        self.interceptors = tuple(interceptors)

    def _continuation(self, thunk: Callable, index: int) -> Callable:
        """Return a callable that resumes the chain at position ``index``."""
        return lambda context: self._intercept_at(thunk, index, context)

    def _intercept_at(
        self, thunk: Callable, index: int, context: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """Run the interceptor at ``index``, or ``thunk`` when none remain.

        Args:
            thunk: Callable invoked with ``context`` after the last
                interceptor.
            index: Position in ``self.interceptors`` to run next.
            context: Handler call details forwarded along the chain.

        Returns:
            Whatever the interceptor (or, at the end of the chain, ``thunk``)
            returns.
        """
        if index >= len(self.interceptors):
            # Past the last interceptor: fall through to the real lookup.
            return thunk(context)
        interceptor = self.interceptors[index]
        next_step = self._continuation(thunk, index + 1)
        return interceptor.intercept_service(next_step, context)

    def execute(
        self, thunk: Callable, context: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """Run the whole chain, starting from the first interceptor."""
        return self._intercept_at(thunk, 0, context)
def service_pipeline(
    interceptors: Optional[Sequence[grpc.ServerInterceptor]],
) -> Optional[_ServicePipeline]:
    """Build a pipeline for ``interceptors``, or return None when there are none.

    A falsy argument (``None`` or an empty sequence) yields ``None``.
    """
    if not interceptors:
        return None
    return _ServicePipeline(interceptors)
class _ClientCallDetails(
    collections.namedtuple(
        "_ClientCallDetails",
        "method timeout metadata credentials wait_for_ready compression",
    ),
    grpc.ClientCallDetails,
):
    """Immutable record of the per-call settings handed to interceptors.

    Fields, in positional order: ``method``, ``timeout``, ``metadata``,
    ``credentials``, ``wait_for_ready``, ``compression``.
    """
def _unwrap_client_call_details(
    call_details: grpc.ClientCallDetails,
    default_details: grpc.ClientCallDetails,
) -> Tuple[
    str,
    Optional[float],
    Optional[MetadataType],
    Optional[grpc.CallCredentials],
    Optional[bool],
    Optional[grpc.Compression],
]:
    """Resolve the effective call settings from possibly partial details.

    Each field is read from ``call_details``; when that object does not
    expose the attribute, the value is taken from ``default_details``
    instead. The fallback is only consulted when it is actually needed.

    Args:
        call_details: Details object supplied by an interceptor; it may lack
            some of the attributes.
        default_details: Details object used to fill in missing attributes.

    Returns:
        A 6-tuple ``(method, timeout, metadata, credentials, wait_for_ready,
        compression)``. Every element except ``method`` may be ``None``
        because the values are passed through unchanged.

    Raises:
        AttributeError: If an attribute is missing from both objects.
    """

    def _resolve(name: str) -> Any:
        # Look the attribute up lazily so ``default_details`` is untouched
        # whenever ``call_details`` already provides the value.
        try:
            return getattr(call_details, name)
        except AttributeError:
            return getattr(default_details, name)

    return (
        _resolve("method"),
        _resolve("timeout"),
        _resolve("metadata"),
        _resolve("credentials"),
        _resolve("wait_for_ready"),
        _resolve("compression"),
    )
class _FailureOutcome(
    grpc.RpcError, grpc.Future, grpc.Call
):  # pylint: disable=too-many-ancestors
    """Already-finished outcome wrapping an exception raised while intercepting.

    The object presents itself as a completed, non-cancellable call/future:
    asking for the result (or iterating it) re-raises the stored exception.
    """

    _exception: Exception
    _traceback: types.TracebackType

    def __init__(self, exception: Exception, traceback: types.TracebackType):
        """Remember the exception and the traceback it was raised with."""
        super().__init__()
        self._traceback = traceback
        self._exception = exception

    # --- grpc.Call surface: fixed values, there is no underlying call ---

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return None; no metadata is available."""
        return None

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return None; no metadata is available."""
        return None

    def code(self) -> Optional[grpc.StatusCode]:
        """Return ``grpc.StatusCode.INTERNAL``."""
        return grpc.StatusCode.INTERNAL

    def details(self) -> Optional[str]:
        """Return a fixed description of the failure."""
        return "Exception raised while intercepting the RPC"

    def is_active(self) -> bool:
        """Return False."""
        return False

    def time_remaining(self) -> Optional[float]:
        """Return None."""
        return None

    def add_callback(self, unused_callback) -> bool:
        """Ignore the callback and return False."""
        return False

    # --- grpc.Future surface: the outcome is always already done ---

    def cancel(self) -> bool:
        """Return False; nothing can be cancelled."""
        return False

    def cancelled(self) -> bool:
        """Return False."""
        return False

    def running(self) -> bool:
        """Return False."""
        return False

    def done(self) -> bool:
        """Return True."""
        return True

    def result(self, ignored_timeout: Optional[float] = None):
        """Raise the stored exception; the timeout is ignored."""
        raise self._exception

    def exception(
        self, ignored_timeout: Optional[float] = None
    ) -> Optional[Exception]:
        """Return the stored exception; the timeout is ignored."""
        return self._exception

    def traceback(
        self, ignored_timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Return the stored traceback; the timeout is ignored."""
        return self._traceback

    def add_done_callback(self, fn: DoneCallbackType) -> None:
        """Call ``fn`` immediately with this outcome."""
        fn(self)

    # --- iterator surface: the first step re-raises the failure ---

    def __iter__(self):
        return self

    def __next__(self):
        raise self._exception

    def next(self):
        """Alias that forwards to ``__next__``."""
        return self.__next__()
class _UnaryOutcome(grpc.Call, grpc.Future):
    """Completed outcome pairing a response with the call that produced it.

    Call-related queries are forwarded to the wrapped call; future-related
    queries report an already-finished, successful future.
    """

    _response: Any
    _call: grpc.Call

    def __init__(self, response: Any, call: grpc.Call):
        """Store the response and the call it came from."""
        self._call = call
        self._response = response

    # --- grpc.Call surface: forwarded to the wrapped call ---

    def initial_metadata(self) -> Optional[MetadataType]:
        """Forward to the wrapped call."""
        return self._call.initial_metadata()

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Forward to the wrapped call."""
        return self._call.trailing_metadata()

    def code(self) -> Optional[grpc.StatusCode]:
        """Forward to the wrapped call."""
        return self._call.code()

    def details(self) -> Optional[str]:
        """Forward to the wrapped call."""
        return self._call.details()

    def is_active(self) -> bool:
        """Forward to the wrapped call."""
        return self._call.is_active()

    def time_remaining(self) -> Optional[float]:
        """Forward to the wrapped call."""
        return self._call.time_remaining()

    def cancel(self) -> bool:
        """Forward to the wrapped call."""
        return self._call.cancel()

    def add_callback(self, callback) -> bool:
        """Forward to the wrapped call."""
        return self._call.add_callback(callback)

    # --- grpc.Future surface: always finished with a response ---

    def cancelled(self) -> bool:
        """Return False."""
        return False

    def running(self) -> bool:
        """Return False."""
        return False

    def done(self) -> bool:
        """Return True."""
        return True

    def result(self, ignored_timeout: Optional[float] = None):
        """Return the stored response; the timeout is ignored."""
        return self._response

    def exception(self, ignored_timeout: Optional[float] = None):
        """Return None; the timeout is ignored."""
        return None

    def traceback(self, ignored_timeout: Optional[float] = None):
        """Return None; the timeout is ignored."""
        return None

    def add_done_callback(self, fn: DoneCallbackType) -> None:
        """Call ``fn`` immediately with this outcome."""
        fn(self)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Unary-unary multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable; the
    interceptor receives a continuation that performs the real invocation
    with whatever call details the interceptor decides to pass on.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.UnaryUnaryClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.UnaryUnaryClientInterceptor,
    ):
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC through the interceptor; return only the response."""
        outcome = self._with_call(
            request,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )
        return outcome[0]

    def _with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC through the interceptor.

        Returns:
            ``(call.result(), call)`` where ``call`` is whatever the
            interceptor returned; ``result()`` may raise.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            # Anything the interceptor left off ``new_details`` is taken from
            # the details of the original invocation.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, original_details)
            )
            try:
                response, call = self._thunk(resolved.method).with_call(
                    request,
                    timeout=resolved.timeout,
                    metadata=resolved.metadata,
                    credentials=resolved.credentials,
                    wait_for_ready=resolved.wait_for_ready,
                    compression=resolved.compression,
                )
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                # The RPC error itself is handed back as the outcome.
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, exception.__traceback__)

        call = self._interceptor.intercept_unary_unary(
            continuation, original_details, request
        )
        return call.result(), call

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC through the interceptor; return response and call."""
        return self._with_call(
            request,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Start the RPC through the interceptor and return its outcome.

        An exception raised by the interceptor is not propagated; it is
        returned wrapped in a ``_FailureOutcome``.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, original_details)
            )
            return self._thunk(resolved.method).future(
                request,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_unary_unary(
                continuation, original_details, request
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, exception.__traceback__)
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.UnaryStreamClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.UnaryStreamClientInterceptor,
    ):
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ):
        """Invoke the RPC through the interceptor and return what it returns.

        An exception raised by the interceptor is not propagated; it is
        returned wrapped in a ``_FailureOutcome``.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            # Anything the interceptor left off ``new_details`` is taken from
            # the details of the original invocation.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, original_details)
            )
            return self._thunk(resolved.method)(
                request,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_unary_stream(
                continuation, original_details, request
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, exception.__traceback__)
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-unary multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable; the
    interceptor receives a continuation that performs the real invocation
    with whatever call details the interceptor decides to pass on.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.StreamUnaryClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.StreamUnaryClientInterceptor,
    ):
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC through the interceptor; return only the response."""
        outcome = self._with_call(
            request_iterator,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )
        return outcome[0]

    def _with_call(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC through the interceptor.

        Returns:
            ``(call.result(), call)`` where ``call`` is whatever the
            interceptor returned; ``result()`` may raise.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            # Anything the interceptor left off ``new_details`` is taken from
            # the details of the original invocation.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, original_details)
            )
            try:
                response, call = self._thunk(resolved.method).with_call(
                    request_iterator,
                    timeout=resolved.timeout,
                    metadata=resolved.metadata,
                    credentials=resolved.credentials,
                    wait_for_ready=resolved.wait_for_ready,
                    compression=resolved.compression,
                )
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                # The RPC error itself is handed back as the outcome.
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, exception.__traceback__)

        call = self._interceptor.intercept_stream_unary(
            continuation, original_details, request_iterator
        )
        return call.result(), call

    def with_call(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC through the interceptor; return response and call."""
        return self._with_call(
            request_iterator,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

    def future(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Start the RPC through the interceptor and return its outcome.

        An exception raised by the interceptor is not propagated; it is
        returned wrapped in a ``_FailureOutcome``.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, original_details)
            )
            return self._thunk(resolved.method).future(
                request_iterator,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_stream_unary(
                continuation, original_details, request_iterator
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, exception.__traceback__)
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-stream multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.StreamStreamClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.StreamStreamClientInterceptor,
    ):
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ):
        """Invoke the RPC through the interceptor and return what it returns.

        An exception raised by the interceptor is not propagated; it is
        returned wrapped in a ``_FailureOutcome``.
        """
        original_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            # Anything the interceptor left off ``new_details`` is taken from
            # the details of the original invocation.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, original_details)
            )
            return self._thunk(resolved.method)(
                request_iterator,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_stream_stream(
                continuation, original_details, request_iterator
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, exception.__traceback__)
class _Channel(grpc.Channel):
    """Channel wrapper that applies one client interceptor.

    Multi-callables for the RPC arity the interceptor implements are wrapped
    so invocations pass through it; for every other arity the wrapped
    channel's own multi-callable is returned unchanged. Subscription and
    closing are forwarded to the wrapped channel.
    """

    _channel: grpc.Channel
    _interceptor: Union[
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
    ]

    def __init__(
        self,
        channel: grpc.Channel,
        interceptor: Union[
            grpc.UnaryUnaryClientInterceptor,
            grpc.UnaryStreamClientInterceptor,
            grpc.StreamStreamClientInterceptor,
            grpc.StreamUnaryClientInterceptor,
        ],
    ):
        self._interceptor = interceptor
        self._channel = channel

    def subscribe(
        self, callback: Callable, try_to_connect: Optional[bool] = False
    ):
        """Forward the subscription to the wrapped channel."""
        self._channel.subscribe(callback, try_to_connect=try_to_connect)

    def unsubscribe(self, callback: Callable):
        """Forward the unsubscription to the wrapped channel."""
        self._channel.unsubscribe(callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable, intercepted when applicable."""

        # pytype: disable=wrong-arg-count
        def thunk(m):
            # Build the underlying multi-callable for method name ``m``.
            return self._channel.unary_unary(
                m,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor):
            return thunk(method)
        return _UnaryUnaryMultiCallable(thunk, method, self._interceptor)

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable, intercepted when applicable."""

        # pytype: disable=wrong-arg-count
        def thunk(m):
            # Build the underlying multi-callable for method name ``m``.
            return self._channel.unary_stream(
                m,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor):
            return thunk(method)
        return _UnaryStreamMultiCallable(thunk, method, self._interceptor)

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable, intercepted when applicable."""

        # pytype: disable=wrong-arg-count
        def thunk(m):
            # Build the underlying multi-callable for method name ``m``.
            return self._channel.stream_unary(
                m,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor):
            return thunk(method)
        return _StreamUnaryMultiCallable(thunk, method, self._interceptor)

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable, intercepted when applicable."""

        # pytype: disable=wrong-arg-count
        def thunk(m):
            # Build the underlying multi-callable for method name ``m``.
            return self._channel.stream_stream(
                m,
                request_serializer,
                response_deserializer,
                _registered_method,
            )

        # pytype: enable=wrong-arg-count
        if not isinstance(self._interceptor, grpc.StreamStreamClientInterceptor):
            return thunk(method)
        return _StreamStreamMultiCallable(thunk, method, self._interceptor)

    def _close(self):
        """Close the wrapped channel."""
        self._channel.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close on exit; returning False lets any exception propagate.
        self._close()
        return False

    def close(self):
        """Close the wrapped channel."""
        self._channel.close()
def intercept_channel(
    channel: grpc.Channel,
    *interceptors: Union[
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
    ],
) -> grpc.Channel:
    """Wrap ``channel`` so its RPCs pass through the given interceptors.

    Interceptors are applied in reverse argument order, so the first
    interceptor given ends up as the outermost wrapper. With no interceptors
    the channel is returned unchanged.

    Args:
        channel: The channel to wrap.
        *interceptors: Zero or more client interceptors, each an instance of
            at least one of the four client interceptor classes.

    Returns:
        The wrapped channel, or ``channel`` itself if no interceptors were
        given.

    Raises:
        TypeError: If an argument is not an instance of any supported client
            interceptor class.
    """
    supported_kinds = (
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
        grpc.StreamStreamClientInterceptor,
    )
    # ``interceptors`` is already a tuple, so it can be reversed directly.
    for interceptor in reversed(interceptors):
        if not isinstance(interceptor, supported_kinds):
            error_msg = (
                "interceptor must be "
                "grpc.UnaryUnaryClientInterceptor or "
                "grpc.UnaryStreamClientInterceptor or "
                "grpc.StreamUnaryClientInterceptor or "
                "grpc.StreamStreamClientInterceptor"
            )
            raise TypeError(error_msg)
        channel = _Channel(channel, interceptor)
    return channel