Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 26%
916 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:45 +0000
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (
25 Any,
26 Callable,
27 Iterator,
28 List,
29 Optional,
30 Sequence,
31 Set,
32 Tuple,
33 Union,
34)
36import grpc # pytype: disable=pyi-error
37from grpc import _common # pytype: disable=pyi-error
38from grpc import _compression # pytype: disable=pyi-error
39from grpc import _grpcio_metadata # pytype: disable=pyi-error
40from grpc import _observability # pytype: disable=pyi-error
41from grpc._cython import cygrpc
42from grpc._typing import ChannelArgumentType
43from grpc._typing import DeserializingFunction
44from grpc._typing import IntegratedCallFactory
45from grpc._typing import MetadataType
46from grpc._typing import NullaryCallbackType
47from grpc._typing import ResponseType
48from grpc._typing import SerializingFunction
49from grpc._typing import UserTag
50import grpc.experimental # pytype: disable=pyi-error
# Module-wide logger for the invocation side of gRPC Python.
_LOGGER = logging.getLogger(__name__)

# User-agent string advertising the installed grpcio version.
_USER_AGENT = f"grpc-python/{_grpcio_metadata.__version__}"

# Flag value used for operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever the variable is present in the environment, whatever its value.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# Operation types initially recorded as due, one tuple per RPC arity.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() templates for rendezvous objects; the non-OK form additionally
# carries the debug error string.
_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
108def _deadline(timeout: Optional[float]) -> Optional[float]:
109 return None if timeout is None else time.time() + timeout
112def _unknown_code_details(
113 unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
114) -> str:
115 return 'Server sent unknown code {} and details "{}"'.format(
116 unknown_cygrpc_code, details
117 )
class _RPCState(object):
    """Mutable record of everything known about one RPC on the client side."""

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        # Every member of this object is protected by `condition`; whoever
        # changes the RPC's state calls `notify_all` on it afterwards.
        self.condition = threading.Condition()

        # Operation types whose completion events are still expected from the
        # RPC's completion queue. Membership in `due` implies `operate()` was
        # called for a matching operation. The reverse does not hold: after a
        # failed `operate()` call an entry may linger briefly without any
        # operation having been submitted to Core.
        self.due = set(due)

        # Outcome of the RPC as observed so far.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.response = None
        self.code = code
        self.details = details
        self.debug_error_string = None

        # Observability-only fields; changing them does not notify
        # `self.condition`.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None

        # grpc.Future.cancel and grpc.Future.cancelled have slightly odd
        # semantics, so cancellation is tracked apart from the rest of the
        # RPC's result: this records whether cancellation was requested
        # before the RPC terminated.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Install a fresh condition object on this state."""
        self.condition = threading.Condition()
179def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
180 if state.code is None:
181 state.code = code
182 state.details = details
183 if state.initial_metadata is None:
184 state.initial_metadata = ()
185 state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into `state`.

    Each batch operation carried by the event is removed from `state.due` and
    its payload (initial metadata, message, or final status) is recorded on
    `state`. The function itself does not acquire `state.condition`.

    Args:
      event: The event whose `batch_operations` are processed.
      state: The RPC state to update.
      response_deserializer: Passed to `_common.deserialize` for received
        messages.

    Returns:
      The callbacks taken from `state.callbacks` when a
      receive_status_on_client operation was processed (otherwise an empty
      list). They are only collected here; invoking them is left to the
      caller.

    Raises:
      KeyError: If an operation type in the event is not present in
        `state.due`.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A locally set code (e.g. from cancellation or _abort) takes
            # precedence over whatever status arrives afterwards.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code that failed to map; the mapped
                    # `code` is always None on this branch.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None marks that the final status has been processed;
            # _Rendezvous.add_callback refuses new callbacks from here on.
            state.callbacks = None
    return callbacks
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the per-RPC event callback for `state`.

    Args:
      state: The RPC state that incoming events are applied to.
      response_deserializer: Forwarded to `_handle_event` for received
        messages.

    Returns:
      A function taking a single event. It applies the event to `state` while
      holding `state.condition`, notifies waiters, runs the collected
      callbacks after releasing the condition, and returns a truthy value
      only when no operations remain due and `state.fork_epoch` is not older
      than the current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks registered via add_done_callback are
                # functools.partial objects, but add_callback accepts plain
                # callables that have no `func` attribute; fall back to the
                # callback itself so that logging can never raise here.
                logging.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
254# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
255# pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and submits it on `call` as a
    send-message operation, waiting for that operation to leave `state.due`
    (or for the RPC to terminate) before fetching the next request. When the
    iterator is exhausted and the RPC is still active, a
    send-close-from-client operation is submitted.

    Args:
      request_iterator: Iterator yielding the user's request objects.
      state: The RPC state shared with the rest of the call machinery.
      call: The call object on which operations are started and cancelled.
      request_serializer: Passed to `_common.serialize` for every request.
      event_handler: Tag passed along with each `call.operate` invocation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except-branch below already paired the
            # enter_user_request_generator() call, so that `finally` does not
            # call return_from_user_request_generator() a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Normal exhaustion: fall through to the half-close below.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # _common.serialize signalled failure by returning None.
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Record the operation as due before submitting it and
                        # take it back out if submission fails.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Finished waiting once the RPC has terminated or
                            # the send is no longer outstanding.
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        if state.code is not None:
                            return
                else:
                    # The RPC already terminated or was cancelled; stop
                    # consuming requests.
                    return
        # Only reached when the request iterator was exhausted normally.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
351def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
352 """Calculates error string for RPC."""
353 with rpc_state.condition:
354 if rpc_state.code is None:
355 return "<{} object>".format(class_name)
356 elif rpc_state.code is grpc.StatusCode.OK:
357 return _OK_RENDEZVOUS_REPR_FORMAT.format(
358 class_name, rpc_state.code, rpc_state.details
359 )
360 else:
361 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
362 class_name,
363 rpc_state.code,
364 rpc_state.details,
365 rpc_state.debug_error_string,
366 )
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error that is detached from any running RPC.

    The state it is built from must describe an RPC that is neither
    in-progress nor cancelled.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        # Snapshot the source state under its lock so that this error owns an
        # independent copy.
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            self._state = snapshot
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the initial metadata captured at construction."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the trailing metadata captured at construction."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the status code captured at construction."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the captured details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the captured debug string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError:
            _, _, current_traceback = sys.exc_info()
            return current_traceback

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # This object is always done, so the callback runs right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """Base class for iterator-style RPC objects.

    Subclasses supply `_next` and `debug_error_string`.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative amount of remaining time.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # Already terminated; nothing left to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that is still running when its handle is collected.
        rpc_state = self._state
        with rpc_state.condition:
            if rpc_state.code is None:
                rpc_state.code = grpc.StatusCode.CANCELLED
                rpc_state.details = "Cancelled upon garbage collection!"
                rpc_state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[rpc_state.code],
                    rpc_state.details,
                )
                rpc_state.condition.notify_all()
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        # Does not take the condition itself; the callers in this class
        # invoke it while holding self._state.condition.
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the"
                    " RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                # Raise and catch ourselves to obtain a traceback object.
                try:
                    raise self
                except grpc.RpcError:
                    return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn` to be called with this object once the RPC is done.

        If the RPC already has a status code, `fn` is invoked immediately,
        after the condition has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            if self._state.trailing_metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            if self._state.code is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            if self._state.details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(self._state.details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch one event from the call, apply it to the state, and return it.

        Callbacks returned by `_handle_event` are invoked here, while the
        condition is still held.
        """
        event = self._call.next_event()
        with self._state.condition:
            callbacks = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response or a terminal status is available.

        Returns the next response, raises StopIteration when the RPC ended
        with OK, and raises this object when it ended with any other code.
        """
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    # Hand the response over and clear the slot for the next
                    # message.
                    response = self._state.response
                    self._state.response = None
                    return response
                elif (
                    cygrpc.OperationType.receive_message not in self._state.due
                ):
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    elif self._state.code is not None:
                        raise self

    def _next(self) -> Any:
        """Request the next message and return it via `_next_response`.

        If the RPC has already terminated, raises StopIteration for an OK
        status and raises this object otherwise.
        """
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string once one has been recorded.

        Raises grpc.experimental.UsageError while no debug error string is
        available on the state.
        """
        with self._state.condition:
            if self._state.debug_error_string is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(self._state.debug_error_string)
class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:

            def _done():
                return self._state.initial_metadata is not None

            # Wait on the condition until initial metadata has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:

            def _done():
                return self._state.trailing_metadata is not None

            # Wait on the condition until trailing metadata has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:

            def _done():
                return self._state.code is not None

            # Wait on the condition until a status code has been recorded.
            _common.wait(self._state.condition.wait, _done)
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:

            def _done():
                return self._state.details is not None

            # Wait on the condition until details have been recorded.
            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for the debug error string and return it decoded."""
        with self._state.condition:

            def _done():
                return self._state.debug_error_string is not None

            # NOTE(review): this waits until debug_error_string is non-None;
            # confirm every termination path eventually sets it.
            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        # Used as the wait predicate below; it does not take the condition
        # itself.
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return self._state.response
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    # Raise and catch ourselves to obtain a traceback object.
                    try:
                        raise self
                    except grpc.RpcError:
                        return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn` to be called with this object once the RPC is done.

        If the RPC already has a status code, `fn` is invoked immediately,
        after the condition has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def _next(self) -> Any:
        """Request the next message and wait for it or for termination.

        Returns the next response, raises StopIteration when the RPC ended
        with OK, and raises this object when it ended with any other code.
        """
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                # Record the receive as due before submitting it and take it
                # back out if submission fails.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                # Ready when a response is available, or when the RPC has
                # terminated and no receive is outstanding any more.
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                # Hand the response over and clear the slot for the next
                # message.
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
                    raise self
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary-request RPC.

    Returns:
      A `(deadline, serialized_request, error)` triple. Exactly one of the
      last two entries is None: on serialization failure the request slot is
      None and the error slot holds an INTERNAL `_InactiveRpcError`.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    # _common.serialize reported failure by returning None.
    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Produce the return value of a finished blocking unary-response RPC.

    Raises `_InactiveRpcError` unless the RPC ended with OK. Otherwise returns
    the response, paired with a `_MultiThreadedRendezvous` when `with_call`
    is set.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two initial operation batches of a stream-unary RPC.

    The first batch sends initial metadata and receives the message and the
    final status; the second batch receives initial metadata.
    """
    send_and_receive_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive_batch, receive_initial_metadata_batch)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary initial operation batch with a None tag."""
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple([(operations, None) for operations in batches])
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combines the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None when neither deadline is set, the one that is set when only one
      is, and the smaller of the two when both are.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers both "neither set" (returns None) and "only user set".
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response method on a cygrpc channel.

    Blocking invocations (__call__ and with_call) run on a segregated call of
    the channel; future() goes through the managed-call factory instead.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serializes the request and assembles the operations of the RPC.

        Returns:
          (state, operations, deadline, None) on success, or
          (None, None, None, error) when the request could not be serialized.
        """
        deadline, serialized_request, error = _start_unary_request(
            request, timeout, self._request_serializer
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, error

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # The whole unary-unary exchange is issued as one batch.
        operations = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call and handles its single event.

        Returns:
          The (state, call) pair after the event has been handled.

        Raises:
          grpc.RpcError: The error produced by _prepare when the request
            could not be serialized.
        """
        state, operations, deadline, error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise error  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        operations_and_tags = ((operations, None),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations_and_tags,
            self._context,
        )
        _handle_event(call.next_event(), state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns only the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns (response, call)."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous.

        Raises:
          grpc.RpcError: The error produced by _prepare when the request
            could not be serialized.
        """
        state, operations, deadline, error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise error  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline - confirm whether intentional.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            None if credentials is None else credentials._credentials,
            (operations,),
            event_handler,
            self._context,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream method on a segregated call of the channel.

    The result is a _SingleThreadedRendezvous built on that call.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Starts the RPC and returns the rendezvous for its responses.

        Raises:
          _InactiveRpcError: With StatusCode.INTERNAL when _common.serialize
            returns None for the request.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            failed_state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        if credentials is None:
            call_credentials = None
        else:
            call_credentials = credentials._credentials
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: the sends, the status receive, and the
        # initial-metadata receive. Each is paired with a None tag.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations_and_tags = tuple(
            (batch, None)
            for batch in (send_batch, status_batch, initial_metadata_batch)
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream method through the managed-call factory.

    The result is a _MultiThreadedRendezvous built on the managed call.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC and returns the rendezvous for its responses.

        Raises:
          grpc.RpcError: The error returned by _start_unary_request when the
            request could not be serialized.
        """
        deadline, serialized_request, error = _start_unary_request(
            request, timeout, self._request_serializer
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        if serialized_request is None:
            raise error  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: sends plus the status receive, then the
        # initial-metadata receive.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (send_and_status_batch, initial_metadata_batch)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response method on a cygrpc channel.

    Blocking invocations (__call__ and with_call) run on a segregated call of
    the channel; future() goes through the managed-call factory instead.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call until no operation is due.

        Returns:
          The (state, call) pair once state.due is empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Drain events from the call until every due operation has completed.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns only the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns (response, call)."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The send-initial-metadata operation must carry the augmented
            # metadata, as in _blocking; passing the raw metadata here would
            # drop the per-call compression entry.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-stream method through the managed-call factory.

    The result is a _MultiThreadedRendezvous built on the managed call.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC, begins consuming requests, returns the rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: initial-metadata send plus the status receive, then
        # the initial-metadata receive.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata, flags),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (send_and_status_batch, initial_metadata_batch)
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _InitialMetadataFlags(int):
    """An immutable int holding initial metadata flags.

    The value is masked with cygrpc.InitialMetadataFlags.used_mask on
    construction.
    """

    def __new__(cls, value: int = _EMPTY_FLAGS):
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super(_InitialMetadataFlags, cls).__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Returns flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: True to set the wait_for_ready bit, False to clear
            it, or None to leave the flags untouched.

        Returns:
          self when wait_for_ready is None; otherwise a new instance with the
          wait_for_ready bit set or cleared and the
          wait_for_ready_explicitly_set bit set.
        """
        if wait_for_ready is None:
            return self

        wait_bit = cygrpc.InitialMetadataFlags.wait_for_ready
        explicit_bit = cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
        if wait_for_ready:
            updated = self | wait_bit | explicit_bit
        else:
            updated = (self & ~wait_bit) | explicit_bit
        return self.__class__(updated)
class _ChannelCallState(object):
    """Bookkeeping for the managed calls of one cygrpc channel.

    Holds the channel, a lock, and the count of managed calls in flight.
    """

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Zeroes the managed-call count."""
        self.managed_calls = 0

    def __del__(self):
        # Best-effort close; TypeError and AttributeError are deliberately
        # ignored here.
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Starts a daemon thread that dispatches the channel's call events.

    The thread pulls events with state.channel.next_call_event(), invokes
    each event's tag with the event, and exits once the managed-call count
    drops to zero.

    Args:
      state: The call state whose channel is polled and whose managed_calls
        counter is decremented.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                continue
            # A truthy result from the tag means a managed call completed.
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Returns a factory of managed (integrated) calls bound to state.

    Args:
      state: The _ChannelCallState whose channel creates the calls and whose
        managed_calls counter tracks them.

    Returns:
      The create function described below.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context,
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant
            from the operations on the call.
          context: Context object for distributed tracing.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch of operations is tagged with the same event handler.
        operations_and_tags = tuple(
            (batch, event_handler) for batch in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                operations_and_tags,
                context,
            )
            spin_thread_needed = state.managed_calls == 0
            state.managed_calls += 1
            if spin_thread_needed:
                # First managed call: start the event-dispatching thread.
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState(object):
    """Mutable state shared by the connectivity subscription machinery.

    Holds the channel, a re-entrant lock, the last known connectivity, the
    subscribed callbacks, and the polling / delivering flags.
    """

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out in the same state a reset leaves.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Restores every field except lock and channel to its initial value."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collects the callbacks not yet told about the current connectivity.

    Each collected entry is updated in place to record state.connectivity as
    its last-seen connectivity.

    Args:
      state: The connectivity state to scan.

    Returns:
      The callbacks whose recorded connectivity differed from
      state.connectivity.
    """
    current = state.connectivity
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        callback, last_seen = entry
        if last_seen is current:
            continue
        stale_callbacks.append(callback)
        entry[1] = current
    return stale_callbacks
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invokes callbacks with connectivity until none are out of date.

    After each round, the callbacks that still need an update are recomputed
    under state.lock. When there are none, state.delivering is cleared and
    the function returns.

    Args:
      state: The shared connectivity state.
      initial_connectivity: The connectivity passed in the first round.
      initial_callbacks: The callbacks invoked in the first round.
    """
    pending = initial_callbacks
    connectivity = initial_connectivity
    while True:
        for callback in pending:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing subscriber must not stop the remaining deliveries.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            pending = _deliveries(state)
            if not pending:
                state.delivering = False
                return
            connectivity = state.connectivity
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Starts a daemon thread running _deliver and marks state as delivering.

    Args:
      state: The shared connectivity state; its current connectivity is the
        value passed to the thread.
      callbacks: The callbacks handed to the thread.
    """
    delivery_args = (state, state.connectivity, callbacks)
    delivery_thread = cygrpc.ForkManagedThread(
        target=_deliver, args=delivery_args
    )
    delivery_thread.setDaemon(True)
    delivery_thread.start()
    state.delivering = True
1820# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls channel connectivity and spawns deliveries to subscribers.

    Records the initial connectivity and delivers it to all callbacks, then
    loops on watch_connectivity_state with a 0.2-second deadline per
    iteration. The loop ends, clearing state.polling and state.connectivity,
    once there are no callbacks and no pending connection request.

    Args:
      state: The shared connectivity state.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first check_connectivity_state.
    """
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                connectivity
            ]
        )
        callbacks = tuple(
            callback for callback, _ in state.callbacks_and_connectivities
        )
        # Every current subscriber is about to be told this connectivity.
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)

    while True:
        event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (
                state.callbacks_and_connectivities or state.try_to_connect
            ):
                state.polling = False
                state.connectivity = None
                break
            # Consume the pending connection request, if any.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = (
                _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                    connectivity
                ]
            )
            if state.delivering:
                continue
            callbacks = _deliveries(state)
            if callbacks:
                _spawn_delivery(state, callbacks)
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers callback for connectivity updates.

    Starts the polling thread when this is the first subscription and no
    polling is in progress. Otherwise, if a connectivity is already known and
    no delivery is running, the callback is delivered that connectivity
    immediately.

    Args:
      state: The shared connectivity state.
      callback: The callable to register.
      try_to_connect: Whether a connection attempt is requested; coerced with
        bool().
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        if not subscriptions and not state.polling:
            polling_thread = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect),
            )
            polling_thread.setDaemon(True)
            polling_thread.start()
            state.polling = True
            subscriptions.append([callback, None])
            return

        if not state.delivering and state.connectivity is not None:
            _spawn_delivery(state, (callback,))
            state.try_to_connect |= wants_connect
            # Recorded as already informed of the current connectivity.
            subscriptions.append([callback, state.connectivity])
            return

        state.try_to_connect |= wants_connect
        subscriptions.append([callback, None])
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription whose callback equals callback.

    Does nothing when no subscription matches.

    Args:
      state: The shared connectivity state.
      callback: The callable to remove.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, (subscribed_callback, _) in enumerate(subscriptions):
            if callback == subscribed_callback:
                del subscriptions[position]
                break
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression and user-agent options to base_options.

    Args:
      base_options: The caller-supplied channel options.
      compression: Passed to _compression.create_channel_option.

    Returns:
      A tuple of base_options, then the compression option(s), then the
      primary user-agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    augmented = tuple(base_options) + compression_option
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return augmented + (user_agent_option,)
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    Args:
      options: The (key, value) channel options to split.

    Returns:
      A (python_options, core_options) pair of lists. An option is a Python
      option when its key equals
      grpc.experimental.ChannelOptions.SingleThreadedUnaryStream.
    """
    python_only_key = grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
    python_options, core_options = [], []
    for option in options:
        bucket = python_options if option[0] == python_only_key else core_options
        bucket.append(option)
    return python_options, core_options
1943class Channel(grpc.Channel):
1944 """A cygrpc.Channel-backed implementation of grpc.Channel."""
1946 _single_threaded_unary_stream: bool
1947 _channel: cygrpc.Channel
1948 _call_state: _ChannelCallState
1949 _connectivity_state: _ChannelConnectivityState
1951 def __init__(
1952 self,
1953 target: str,
1954 options: Sequence[ChannelArgumentType],
1955 credentials: Optional[grpc.ChannelCredentials],
1956 compression: Optional[grpc.Compression],
1957 ):
1958 """Constructor.
1960 Args:
1961 target: The target to which to connect.
1962 options: Configuration options for the channel.
1963 credentials: A cygrpc.ChannelCredentials or None.
1964 compression: An optional value indicating the compression method to be
1965 used over the lifetime of the channel.
1966 """
1967 python_options, core_options = _separate_channel_options(options)
1968 self._single_threaded_unary_stream = (
1969 _DEFAULT_SINGLE_THREADED_UNARY_STREAM
1970 )
1971 self._process_python_options(python_options)
1972 self._channel = cygrpc.Channel(
1973 _common.encode(target),
1974 _augment_options(core_options, compression),
1975 credentials,
1976 )
1977 self._call_state = _ChannelCallState(self._channel)
1978 self._connectivity_state = _ChannelConnectivityState(self._channel)
1979 cygrpc.fork_register_channel(self)
1980 if cygrpc.g_gevent_activated:
1981 cygrpc.gevent_increment_channel_count()
1983 def _process_python_options(
1984 self, python_options: Sequence[ChannelArgumentType]
1985 ) -> None:
1986 """Sets channel attributes according to python-only channel options."""
1987 for pair in python_options:
1988 if (
1989 pair[0]
1990 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
1991 ):
1992 self._single_threaded_unary_stream = True
    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Registers callback for connectivity updates of this channel.

        Delegates to _subscribe with this channel's connectivity state.

        Args:
          callback: The callable to register.
          try_to_connect: Forwarded to _subscribe, which coerces it with
            bool().
        """
        _subscribe(self._connectivity_state, callback, try_to_connect)
    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Removes callback from this channel's connectivity subscriptions.

        Delegates to _unsubscribe with this channel's connectivity state.
        """
        _unsubscribe(self._connectivity_state, callback)
2006 def unary_unary(
2007 self,
2008 method: str,
2009 request_serializer: Optional[SerializingFunction] = None,
2010 response_deserializer: Optional[DeserializingFunction] = None,
2011 ) -> grpc.UnaryUnaryMultiCallable:
2012 return _UnaryUnaryMultiCallable(
2013 self._channel,
2014 _channel_managed_call_management(self._call_state),
2015 _common.encode(method),
2016 request_serializer,
2017 response_deserializer,
2018 )
2020 def unary_stream(
2021 self,
2022 method: str,
2023 request_serializer: Optional[SerializingFunction] = None,
2024 response_deserializer: Optional[DeserializingFunction] = None,
2025 ) -> grpc.UnaryStreamMultiCallable:
2026 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
2027 # on a single Python thread results in an appreciable speed-up. However,
2028 # due to slight differences in capability, the multi-threaded variant
2029 # remains the default.
2030 if self._single_threaded_unary_stream:
2031 return _SingleThreadedUnaryStreamMultiCallable(
2032 self._channel,
2033 _common.encode(method),
2034 request_serializer,
2035 response_deserializer,
2036 )
2037 else:
2038 return _UnaryStreamMultiCallable(
2039 self._channel,
2040 _channel_managed_call_management(self._call_state),
2041 _common.encode(method),
2042 request_serializer,
2043 response_deserializer,
2044 )
def stream_unary(
    self,
    method: str,
    request_serializer: Optional[SerializingFunction] = None,
    response_deserializer: Optional[DeserializingFunction] = None,
) -> grpc.StreamUnaryMultiCallable:
    """Create a stream-unary multi-callable for ``method``.

    Args:
      method: Name of the RPC method; passed through ``_common.encode``
        before being handed to the multi-callable.
      request_serializer: Optional serializer, forwarded unchanged.
      response_deserializer: Optional deserializer, forwarded unchanged.

    Returns:
      A ``_StreamUnaryMultiCallable`` built from the underlying channel, a
      managed-call helper derived from ``self._call_state``, and the
      encoded method name.
    """
    channel = self._channel
    managed_call = _channel_managed_call_management(self._call_state)
    encoded_method = _common.encode(method)
    return _StreamUnaryMultiCallable(
        channel,
        managed_call,
        encoded_method,
        request_serializer,
        response_deserializer,
    )
def stream_stream(
    self,
    method: str,
    request_serializer: Optional[SerializingFunction] = None,
    response_deserializer: Optional[DeserializingFunction] = None,
) -> grpc.StreamStreamMultiCallable:
    """Create a stream-stream multi-callable for ``method``.

    Args:
      method: Name of the RPC method; passed through ``_common.encode``
        before being handed to the multi-callable.
      request_serializer: Optional serializer, forwarded unchanged.
      response_deserializer: Optional deserializer, forwarded unchanged.

    Returns:
      A ``_StreamStreamMultiCallable`` built from the underlying channel, a
      managed-call helper derived from ``self._call_state``, and the
      encoded method name.
    """
    channel = self._channel
    managed_call = _channel_managed_call_management(self._call_state)
    encoded_method = _common.encode(method)
    return _StreamStreamMultiCallable(
        channel,
        managed_call,
        encoded_method,
        request_serializer,
        response_deserializer,
    )
def _unsubscribe_all(self) -> None:
    """Empty the connectivity state's callback list in place.

    Does nothing when ``self._connectivity_state`` is falsy. Otherwise the
    list is cleared while holding the state's lock; the list object itself
    is kept, only its contents are removed.
    """
    connectivity_state = self._connectivity_state
    if not connectivity_state:
        return
    with connectivity_state.lock:
        del connectivity_state.callbacks_and_connectivities[:]
def _close(self) -> None:
    """Tear this channel down.

    Steps, in order:
      1. Drop every connectivity subscription.
      2. Close the underlying channel with a ``cancelled`` status and the
         message "Channel closed!".
      3. Unregister this object from cygrpc's fork bookkeeping.
      4. If gevent support is active, decrement cygrpc's channel count.
    """
    self._unsubscribe_all()

    cancelled = cygrpc.StatusCode.cancelled
    self._channel.close(cancelled, "Channel closed!")

    cygrpc.fork_unregister_channel(self)

    # The channel count is only adjusted when gevent is active.
    if cygrpc.g_gevent_activated:
        cygrpc.gevent_decrement_channel_count()
def _close_on_fork(self) -> None:
    """Close the underlying channel through its fork-specific path.

    Connectivity subscriptions are dropped first; the underlying channel's
    ``close_on_fork`` is then invoked with a ``cancelled`` status and the
    message "Channel closed due to fork".
    """
    self._unsubscribe_all()

    cancelled = cygrpc.StatusCode.cancelled
    self._channel.close_on_fork(cancelled, "Channel closed due to fork")
def __enter__(self):
    """Enter a ``with`` block; the channel itself is the bound value."""
    channel = self
    return channel
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave a ``with`` block by closing the channel.

    The exception arguments are ignored. ``False`` is always returned, so
    an exception raised inside the ``with`` body is not suppressed.
    """
    suppress_exception = False
    self._close()
    return suppress_exception
def close(self) -> None:
    """Close the channel; public entry point that delegates to ``_close``."""
    shutdown = self._close
    shutdown()
def __del__(self):
    """Best-effort cleanup on garbage collection.

    Only the connectivity subscriptions are dropped; the underlying channel
    is deliberately NOT closed here (see the TODO below). Any error raised
    during cleanup is swallowed.
    """
    # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
    # after 1.12 (1.16 or thereabouts?), add a "self._channel.close" call
    # here (or, more likely, call self._close() here). This is not done
    # today because many valid use cases allow the channel to be deleted
    # immediately after its stubs are created. Once enough time has passed
    # that all users can be trusted to hold on to their channels for as
    # long as they are in use, and to close them afterwards, deletion of
    # this grpc._channel.Channel instance can be made to effect closure of
    # the underlying cygrpc.Channel instance.
    try:
        self._unsubscribe_all()
    except:  # pylint: disable=bare-except
        # Python ignores exceptions escaping __del__ anyway, but reports
        # them each time, which spams logs. Silence them instead.
        pass