Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 31%
917 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17from datetime import datetime
18import functools
19import logging
20import os
21import sys
22import threading
23import time
24import types
25from typing import (
26 Any,
27 Callable,
28 Iterator,
29 List,
30 Optional,
31 Sequence,
32 Set,
33 Tuple,
34 Union,
35)
37import grpc # pytype: disable=pyi-error
38from grpc import _common # pytype: disable=pyi-error
39from grpc import _compression # pytype: disable=pyi-error
40from grpc import _grpcio_metadata # pytype: disable=pyi-error
41from grpc import _observability # pytype: disable=pyi-error
42from grpc._cython import cygrpc
43from grpc._typing import ChannelArgumentType
44from grpc._typing import DeserializingFunction
45from grpc._typing import IntegratedCallFactory
46from grpc._typing import MetadataType
47from grpc._typing import NullaryCallbackType
48from grpc._typing import ResponseType
49from grpc._typing import SerializingFunction
50from grpc._typing import UserTag
51import grpc.experimental # pytype: disable=pyi-error
53_LOGGER = logging.getLogger(__name__)
55_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)
57_EMPTY_FLAGS = 0
59# NOTE(rbellevi): No guarantees are given about the maintenance of this
60# environment variable.
61_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
62 os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
63)
65_UNARY_UNARY_INITIAL_DUE = (
66 cygrpc.OperationType.send_initial_metadata,
67 cygrpc.OperationType.send_message,
68 cygrpc.OperationType.send_close_from_client,
69 cygrpc.OperationType.receive_initial_metadata,
70 cygrpc.OperationType.receive_message,
71 cygrpc.OperationType.receive_status_on_client,
72)
73_UNARY_STREAM_INITIAL_DUE = (
74 cygrpc.OperationType.send_initial_metadata,
75 cygrpc.OperationType.send_message,
76 cygrpc.OperationType.send_close_from_client,
77 cygrpc.OperationType.receive_initial_metadata,
78 cygrpc.OperationType.receive_status_on_client,
79)
80_STREAM_UNARY_INITIAL_DUE = (
81 cygrpc.OperationType.send_initial_metadata,
82 cygrpc.OperationType.receive_initial_metadata,
83 cygrpc.OperationType.receive_message,
84 cygrpc.OperationType.receive_status_on_client,
85)
86_STREAM_STREAM_INITIAL_DUE = (
87 cygrpc.OperationType.send_initial_metadata,
88 cygrpc.OperationType.receive_initial_metadata,
89 cygrpc.OperationType.receive_status_on_client,
90)
92_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
93 "Exception calling channel subscription callback!"
94)
96_OK_RENDEZVOUS_REPR_FORMAT = (
97 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
98)
100_NON_OK_RENDEZVOUS_REPR_FORMAT = (
101 "<{} of RPC that terminated with:\n"
102 "\tstatus = {}\n"
103 '\tdetails = "{}"\n'
104 '\tdebug_error_string = "{}"\n'
105 ">"
106)
109def _deadline(timeout: Optional[float]) -> Optional[float]:
110 return None if timeout is None else time.time() + timeout
113def _unknown_code_details(
114 unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
115) -> str:
116 return 'Server sent unknown code {} and details "{}"'.format(
117 unknown_cygrpc_code, details
118 )
121class _RPCState(object):
122 condition: threading.Condition
123 due: Set[cygrpc.OperationType]
124 initial_metadata: Optional[MetadataType]
125 response: Any
126 trailing_metadata: Optional[MetadataType]
127 code: Optional[grpc.StatusCode]
128 details: Optional[str]
129 debug_error_string: Optional[str]
130 cancelled: bool
131 callbacks: List[NullaryCallbackType]
132 fork_epoch: Optional[int]
133 rpc_start_time: Optional[datetime]
134 rpc_end_time: Optional[datetime]
135 method: Optional[str]
137 def __init__(
138 self,
139 due: Sequence[cygrpc.OperationType],
140 initial_metadata: Optional[MetadataType],
141 trailing_metadata: Optional[MetadataType],
142 code: Optional[grpc.StatusCode],
143 details: Optional[str],
144 ):
145 # `condition` guards all members of _RPCState. `notify_all` is called on
146 # `condition` when the state of the RPC has changed.
147 self.condition = threading.Condition()
149 # The cygrpc.OperationType objects representing events due from the RPC's
150 # completion queue. If an operation is in `due`, it is guaranteed that
151 # `operate()` has been called on a corresponding operation. But the
152 # converse is not true. That is, in the case of failed `operate()`
153 # calls, there may briefly be events in `due` that do not correspond to
154 # operations submitted to Core.
155 self.due = set(due)
156 self.initial_metadata = initial_metadata
157 self.response = None
158 self.trailing_metadata = trailing_metadata
159 self.code = code
160 self.details = details
161 self.debug_error_string = None
162 # The following three fields are used for observability.
163 # Updates to those fields do not trigger self.condition.
164 self.rpc_start_time = None
165 self.rpc_end_time = None
166 self.method = None
168 # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
169 # slightly wonky, so they have to be tracked separately from the rest of the
170 # result of the RPC. This field tracks whether cancellation was requested
171 # prior to termination of the RPC.
172 self.cancelled = False
173 self.callbacks = []
174 self.fork_epoch = cygrpc.get_fork_epoch()
176 def reset_postfork_child(self):
177 self.condition = threading.Condition()
180def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
181 if state.code is None:
182 state.code = code
183 state.details = details
184 if state.initial_metadata is None:
185 state.initial_metadata = ()
186 state.trailing_metadata = ()
189def _handle_event(
190 event: cygrpc.BaseEvent,
191 state: _RPCState,
192 response_deserializer: Optional[DeserializingFunction],
193) -> List[NullaryCallbackType]:
194 callbacks = []
195 for batch_operation in event.batch_operations:
196 operation_type = batch_operation.type()
197 state.due.remove(operation_type)
198 if operation_type == cygrpc.OperationType.receive_initial_metadata:
199 state.initial_metadata = batch_operation.initial_metadata()
200 elif operation_type == cygrpc.OperationType.receive_message:
201 serialized_response = batch_operation.message()
202 if serialized_response is not None:
203 response = _common.deserialize(
204 serialized_response, response_deserializer
205 )
206 if response is None:
207 details = "Exception deserializing response!"
208 _abort(state, grpc.StatusCode.INTERNAL, details)
209 else:
210 state.response = response
211 elif operation_type == cygrpc.OperationType.receive_status_on_client:
212 state.trailing_metadata = batch_operation.trailing_metadata()
213 if state.code is None:
214 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
215 batch_operation.code()
216 )
217 if code is None:
218 state.code = grpc.StatusCode.UNKNOWN
219 state.details = _unknown_code_details(
220 code, batch_operation.details()
221 )
222 else:
223 state.code = code
224 state.details = batch_operation.details()
225 state.debug_error_string = batch_operation.error_string()
226 state.rpc_end_time = datetime.utcnow()
227 _observability.maybe_record_rpc_latency(state)
228 callbacks.extend(state.callbacks)
229 state.callbacks = None
230 return callbacks
233def _event_handler(
234 state: _RPCState, response_deserializer: Optional[DeserializingFunction]
235) -> UserTag:
236 def handle_event(event):
237 with state.condition:
238 callbacks = _handle_event(event, state, response_deserializer)
239 state.condition.notify_all()
240 done = not state.due
241 for callback in callbacks:
242 try:
243 callback()
244 except Exception as e: # pylint: disable=broad-except
245 # NOTE(rbellevi): We suppress but log errors here so as not to
246 # kill the channel spin thread.
247 logging.error(
248 "Exception in callback %s: %s", repr(callback.func), repr(e)
249 )
250 return done and state.fork_epoch >= cygrpc.get_fork_epoch()
252 return handle_event
255# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
256# pylint: disable=too-many-statements
257def _consume_request_iterator(
258 request_iterator: Iterator,
259 state: _RPCState,
260 call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
261 request_serializer: SerializingFunction,
262 event_handler: Optional[UserTag],
263) -> None:
264 """Consume a request supplied by the user."""
266 def consume_request_iterator(): # pylint: disable=too-many-branches
267 # Iterate over the request iterator until it is exhausted or an error
268 # condition is encountered.
269 while True:
270 return_from_user_request_generator_invoked = False
271 try:
272 # The thread may die in user-code. Do not block fork for this.
273 cygrpc.enter_user_request_generator()
274 request = next(request_iterator)
275 except StopIteration:
276 break
277 except Exception: # pylint: disable=broad-except
278 cygrpc.return_from_user_request_generator()
279 return_from_user_request_generator_invoked = True
280 code = grpc.StatusCode.UNKNOWN
281 details = "Exception iterating requests!"
282 _LOGGER.exception(details)
283 call.cancel(
284 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
285 )
286 _abort(state, code, details)
287 return
288 finally:
289 if not return_from_user_request_generator_invoked:
290 cygrpc.return_from_user_request_generator()
291 serialized_request = _common.serialize(request, request_serializer)
292 with state.condition:
293 if state.code is None and not state.cancelled:
294 if serialized_request is None:
295 code = grpc.StatusCode.INTERNAL
296 details = "Exception serializing request!"
297 call.cancel(
298 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
299 details,
300 )
301 _abort(state, code, details)
302 return
303 else:
304 state.due.add(cygrpc.OperationType.send_message)
305 operations = (
306 cygrpc.SendMessageOperation(
307 serialized_request, _EMPTY_FLAGS
308 ),
309 )
310 operating = call.operate(operations, event_handler)
311 if not operating:
312 state.due.remove(cygrpc.OperationType.send_message)
313 return
315 def _done():
316 return (
317 state.code is not None
318 or cygrpc.OperationType.send_message
319 not in state.due
320 )
322 _common.wait(
323 state.condition.wait,
324 _done,
325 spin_cb=functools.partial(
326 cygrpc.block_if_fork_in_progress, state
327 ),
328 )
329 if state.code is not None:
330 return
331 else:
332 return
333 with state.condition:
334 if state.code is None:
335 state.due.add(cygrpc.OperationType.send_close_from_client)
336 operations = (
337 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
338 )
339 operating = call.operate(operations, event_handler)
340 if not operating:
341 state.due.remove(
342 cygrpc.OperationType.send_close_from_client
343 )
345 consumption_thread = cygrpc.ForkManagedThread(
346 target=consume_request_iterator
347 )
348 consumption_thread.setDaemon(True)
349 consumption_thread.start()
352def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
353 """Calculates error string for RPC."""
354 with rpc_state.condition:
355 if rpc_state.code is None:
356 return "<{} object>".format(class_name)
357 elif rpc_state.code is grpc.StatusCode.OK:
358 return _OK_RENDEZVOUS_REPR_FORMAT.format(
359 class_name, rpc_state.code, rpc_state.details
360 )
361 else:
362 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
363 class_name,
364 rpc_state.code,
365 rpc_state.details,
366 rpc_state.debug_error_string,
367 )
370class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
371 """An RPC error not tied to the execution of a particular RPC.
373 The RPC represented by the state object must not be in-progress or
374 cancelled.
376 Attributes:
377 _state: An instance of _RPCState.
378 """
380 _state: _RPCState
382 def __init__(self, state: _RPCState):
383 with state.condition:
384 self._state = _RPCState(
385 (),
386 copy.deepcopy(state.initial_metadata),
387 copy.deepcopy(state.trailing_metadata),
388 state.code,
389 copy.deepcopy(state.details),
390 )
391 self._state.response = copy.copy(state.response)
392 self._state.debug_error_string = copy.copy(state.debug_error_string)
394 def initial_metadata(self) -> Optional[MetadataType]:
395 return self._state.initial_metadata
397 def trailing_metadata(self) -> Optional[MetadataType]:
398 return self._state.trailing_metadata
400 def code(self) -> Optional[grpc.StatusCode]:
401 return self._state.code
403 def details(self) -> Optional[str]:
404 return _common.decode(self._state.details)
406 def debug_error_string(self) -> Optional[str]:
407 return _common.decode(self._state.debug_error_string)
409 def _repr(self) -> str:
410 return _rpc_state_string(self.__class__.__name__, self._state)
412 def __repr__(self) -> str:
413 return self._repr()
415 def __str__(self) -> str:
416 return self._repr()
418 def cancel(self) -> bool:
419 """See grpc.Future.cancel."""
420 return False
422 def cancelled(self) -> bool:
423 """See grpc.Future.cancelled."""
424 return False
426 def running(self) -> bool:
427 """See grpc.Future.running."""
428 return False
430 def done(self) -> bool:
431 """See grpc.Future.done."""
432 return True
434 def result(
435 self, timeout: Optional[float] = None
436 ) -> Any: # pylint: disable=unused-argument
437 """See grpc.Future.result."""
438 raise self
440 def exception(
441 self, timeout: Optional[float] = None # pylint: disable=unused-argument
442 ) -> Optional[Exception]:
443 """See grpc.Future.exception."""
444 return self
446 def traceback(
447 self, timeout: Optional[float] = None # pylint: disable=unused-argument
448 ) -> Optional[types.TracebackType]:
449 """See grpc.Future.traceback."""
450 try:
451 raise self
452 except grpc.RpcError:
453 return sys.exc_info()[2]
455 def add_done_callback(
456 self,
457 fn: Callable[[grpc.Future], None],
458 timeout: Optional[float] = None, # pylint: disable=unused-argument
459 ) -> None:
460 """See grpc.Future.add_done_callback."""
461 fn(self)
464class _Rendezvous(grpc.RpcError, grpc.RpcContext):
465 """An RPC iterator.
467 Attributes:
468 _state: An instance of _RPCState.
469 _call: An instance of SegregatedCall or IntegratedCall.
470 In either case, the _call object is expected to have operate, cancel,
471 and next_event methods.
472 _response_deserializer: A callable taking bytes and return a Python
473 object.
474 _deadline: A float representing the deadline of the RPC in seconds. Or
475 possibly None, to represent an RPC with no deadline at all.
476 """
478 _state: _RPCState
479 _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
480 _response_deserializer: Optional[DeserializingFunction]
481 _deadline: Optional[float]
483 def __init__(
484 self,
485 state: _RPCState,
486 call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
487 response_deserializer: Optional[DeserializingFunction],
488 deadline: Optional[float],
489 ):
490 super(_Rendezvous, self).__init__()
491 self._state = state
492 self._call = call
493 self._response_deserializer = response_deserializer
494 self._deadline = deadline
496 def is_active(self) -> bool:
497 """See grpc.RpcContext.is_active"""
498 with self._state.condition:
499 return self._state.code is None
501 def time_remaining(self) -> Optional[float]:
502 """See grpc.RpcContext.time_remaining"""
503 with self._state.condition:
504 if self._deadline is None:
505 return None
506 else:
507 return max(self._deadline - time.time(), 0)
509 def cancel(self) -> bool:
510 """See grpc.RpcContext.cancel"""
511 with self._state.condition:
512 if self._state.code is None:
513 code = grpc.StatusCode.CANCELLED
514 details = "Locally cancelled by application!"
515 self._call.cancel(
516 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
517 )
518 self._state.cancelled = True
519 _abort(self._state, code, details)
520 self._state.condition.notify_all()
521 return True
522 else:
523 return False
525 def add_callback(self, callback: NullaryCallbackType) -> bool:
526 """See grpc.RpcContext.add_callback"""
527 with self._state.condition:
528 if self._state.callbacks is None:
529 return False
530 else:
531 self._state.callbacks.append(callback)
532 return True
534 def __iter__(self):
535 return self
537 def next(self):
538 return self._next()
540 def __next__(self):
541 return self._next()
543 def _next(self):
544 raise NotImplementedError()
546 def debug_error_string(self) -> Optional[str]:
547 raise NotImplementedError()
549 def _repr(self) -> str:
550 return _rpc_state_string(self.__class__.__name__, self._state)
552 def __repr__(self) -> str:
553 return self._repr()
555 def __str__(self) -> str:
556 return self._repr()
558 def __del__(self) -> None:
559 with self._state.condition:
560 if self._state.code is None:
561 self._state.code = grpc.StatusCode.CANCELLED
562 self._state.details = "Cancelled upon garbage collection!"
563 self._state.cancelled = True
564 self._call.cancel(
565 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
566 self._state.details,
567 )
568 self._state.condition.notify_all()
571class _SingleThreadedRendezvous(
572 _Rendezvous, grpc.Call, grpc.Future
573): # pylint: disable=too-many-ancestors
574 """An RPC iterator operating entirely on a single thread.
576 The __next__ method of _SingleThreadedRendezvous does not depend on the
577 existence of any other thread, including the "channel spin thread".
578 However, this means that its interface is entirely synchronous. So this
579 class cannot completely fulfill the grpc.Future interface. The result,
580 exception, and traceback methods will never block and will instead raise
581 an exception if calling the method would result in blocking.
583 This means that these methods are safe to call from add_done_callback
584 handlers.
585 """
587 _state: _RPCState
589 def _is_complete(self) -> bool:
590 return self._state.code is not None
592 def cancelled(self) -> bool:
593 with self._state.condition:
594 return self._state.cancelled
596 def running(self) -> bool:
597 with self._state.condition:
598 return self._state.code is None
600 def done(self) -> bool:
601 with self._state.condition:
602 return self._state.code is not None
604 def result(self, timeout: Optional[float] = None) -> Any:
605 """Returns the result of the computation or raises its exception.
607 This method will never block. Instead, it will raise an exception
608 if calling this method would otherwise result in blocking.
610 Since this method will never block, any `timeout` argument passed will
611 be ignored.
612 """
613 del timeout
614 with self._state.condition:
615 if not self._is_complete():
616 raise grpc.experimental.UsageError(
617 "_SingleThreadedRendezvous only supports result() when the"
618 " RPC is complete."
619 )
620 if self._state.code is grpc.StatusCode.OK:
621 return self._state.response
622 elif self._state.cancelled:
623 raise grpc.FutureCancelledError()
624 else:
625 raise self
627 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
628 """Return the exception raised by the computation.
630 This method will never block. Instead, it will raise an exception
631 if calling this method would otherwise result in blocking.
633 Since this method will never block, any `timeout` argument passed will
634 be ignored.
635 """
636 del timeout
637 with self._state.condition:
638 if not self._is_complete():
639 raise grpc.experimental.UsageError(
640 "_SingleThreadedRendezvous only supports exception() when"
641 " the RPC is complete."
642 )
643 if self._state.code is grpc.StatusCode.OK:
644 return None
645 elif self._state.cancelled:
646 raise grpc.FutureCancelledError()
647 else:
648 return self
650 def traceback(
651 self, timeout: Optional[float] = None
652 ) -> Optional[types.TracebackType]:
653 """Access the traceback of the exception raised by the computation.
655 This method will never block. Instead, it will raise an exception
656 if calling this method would otherwise result in blocking.
658 Since this method will never block, any `timeout` argument passed will
659 be ignored.
660 """
661 del timeout
662 with self._state.condition:
663 if not self._is_complete():
664 raise grpc.experimental.UsageError(
665 "_SingleThreadedRendezvous only supports traceback() when"
666 " the RPC is complete."
667 )
668 if self._state.code is grpc.StatusCode.OK:
669 return None
670 elif self._state.cancelled:
671 raise grpc.FutureCancelledError()
672 else:
673 try:
674 raise self
675 except grpc.RpcError:
676 return sys.exc_info()[2]
678 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
679 with self._state.condition:
680 if self._state.code is None:
681 self._state.callbacks.append(functools.partial(fn, self))
682 return
684 fn(self)
686 def initial_metadata(self) -> Optional[MetadataType]:
687 """See grpc.Call.initial_metadata"""
688 with self._state.condition:
689 # NOTE(gnossen): Based on our initial call batch, we are guaranteed
690 # to receive initial metadata before any messages.
691 while self._state.initial_metadata is None:
692 self._consume_next_event()
693 return self._state.initial_metadata
695 def trailing_metadata(self) -> Optional[MetadataType]:
696 """See grpc.Call.trailing_metadata"""
697 with self._state.condition:
698 if self._state.trailing_metadata is None:
699 raise grpc.experimental.UsageError(
700 "Cannot get trailing metadata until RPC is completed."
701 )
702 return self._state.trailing_metadata
704 def code(self) -> Optional[grpc.StatusCode]:
705 """See grpc.Call.code"""
706 with self._state.condition:
707 if self._state.code is None:
708 raise grpc.experimental.UsageError(
709 "Cannot get code until RPC is completed."
710 )
711 return self._state.code
713 def details(self) -> Optional[str]:
714 """See grpc.Call.details"""
715 with self._state.condition:
716 if self._state.details is None:
717 raise grpc.experimental.UsageError(
718 "Cannot get details until RPC is completed."
719 )
720 return _common.decode(self._state.details)
722 def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
723 event = self._call.next_event()
724 with self._state.condition:
725 callbacks = _handle_event(
726 event, self._state, self._response_deserializer
727 )
728 for callback in callbacks:
729 # NOTE(gnossen): We intentionally allow exceptions to bubble up
730 # to the user when running on a single thread.
731 callback()
732 return event
734 def _next_response(self) -> Any:
735 while True:
736 self._consume_next_event()
737 with self._state.condition:
738 if self._state.response is not None:
739 response = self._state.response
740 self._state.response = None
741 return response
742 elif (
743 cygrpc.OperationType.receive_message not in self._state.due
744 ):
745 if self._state.code is grpc.StatusCode.OK:
746 raise StopIteration()
747 elif self._state.code is not None:
748 raise self
750 def _next(self) -> Any:
751 with self._state.condition:
752 if self._state.code is None:
753 # We tentatively add the operation as expected and remove
754 # it if the enqueue operation fails. This allows us to guarantee that
755 # if an event has been submitted to the core completion queue,
756 # it is in `due`. If we waited until after a successful
757 # enqueue operation then a signal could interrupt this
758 # thread between the enqueue operation and the addition of the
759 # operation to `due`. This would cause an exception on the
760 # channel spin thread when the operation completes and no
761 # corresponding operation would be present in state.due.
762 # Note that, since `condition` is held through this block, there is
763 # no data race on `due`.
764 self._state.due.add(cygrpc.OperationType.receive_message)
765 operating = self._call.operate(
766 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
767 )
768 if not operating:
769 self._state.due.remove(cygrpc.OperationType.receive_message)
770 elif self._state.code is grpc.StatusCode.OK:
771 raise StopIteration()
772 else:
773 raise self
774 return self._next_response()
776 def debug_error_string(self) -> Optional[str]:
777 with self._state.condition:
778 if self._state.debug_error_string is None:
779 raise grpc.experimental.UsageError(
780 "Cannot get debug error string until RPC is completed."
781 )
782 return _common.decode(self._state.debug_error_string)
785class _MultiThreadedRendezvous(
786 _Rendezvous, grpc.Call, grpc.Future
787): # pylint: disable=too-many-ancestors
788 """An RPC iterator that depends on a channel spin thread.
790 This iterator relies upon a per-channel thread running in the background,
791 dequeueing events from the completion queue, and notifying threads waiting
792 on the threading.Condition object in the _RPCState object.
794 This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
795 and to mediate a bidirection streaming RPC.
796 """
798 _state: _RPCState
800 def initial_metadata(self) -> Optional[MetadataType]:
801 """See grpc.Call.initial_metadata"""
802 with self._state.condition:
804 def _done():
805 return self._state.initial_metadata is not None
807 _common.wait(self._state.condition.wait, _done)
808 return self._state.initial_metadata
810 def trailing_metadata(self) -> Optional[MetadataType]:
811 """See grpc.Call.trailing_metadata"""
812 with self._state.condition:
814 def _done():
815 return self._state.trailing_metadata is not None
817 _common.wait(self._state.condition.wait, _done)
818 return self._state.trailing_metadata
820 def code(self) -> Optional[grpc.StatusCode]:
821 """See grpc.Call.code"""
822 with self._state.condition:
824 def _done():
825 return self._state.code is not None
827 _common.wait(self._state.condition.wait, _done)
828 return self._state.code
830 def details(self) -> Optional[str]:
831 """See grpc.Call.details"""
832 with self._state.condition:
834 def _done():
835 return self._state.details is not None
837 _common.wait(self._state.condition.wait, _done)
838 return _common.decode(self._state.details)
840 def debug_error_string(self) -> Optional[str]:
841 with self._state.condition:
843 def _done():
844 return self._state.debug_error_string is not None
846 _common.wait(self._state.condition.wait, _done)
847 return _common.decode(self._state.debug_error_string)
849 def cancelled(self) -> bool:
850 with self._state.condition:
851 return self._state.cancelled
853 def running(self) -> bool:
854 with self._state.condition:
855 return self._state.code is None
857 def done(self) -> bool:
858 with self._state.condition:
859 return self._state.code is not None
861 def _is_complete(self) -> bool:
862 return self._state.code is not None
864 def result(self, timeout: Optional[float] = None) -> Any:
865 """Returns the result of the computation or raises its exception.
867 See grpc.Future.result for the full API contract.
868 """
869 with self._state.condition:
870 timed_out = _common.wait(
871 self._state.condition.wait, self._is_complete, timeout=timeout
872 )
873 if timed_out:
874 raise grpc.FutureTimeoutError()
875 else:
876 if self._state.code is grpc.StatusCode.OK:
877 return self._state.response
878 elif self._state.cancelled:
879 raise grpc.FutureCancelledError()
880 else:
881 raise self
883 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
884 """Return the exception raised by the computation.
886 See grpc.Future.exception for the full API contract.
887 """
888 with self._state.condition:
889 timed_out = _common.wait(
890 self._state.condition.wait, self._is_complete, timeout=timeout
891 )
892 if timed_out:
893 raise grpc.FutureTimeoutError()
894 else:
895 if self._state.code is grpc.StatusCode.OK:
896 return None
897 elif self._state.cancelled:
898 raise grpc.FutureCancelledError()
899 else:
900 return self
902 def traceback(
903 self, timeout: Optional[float] = None
904 ) -> Optional[types.TracebackType]:
905 """Access the traceback of the exception raised by the computation.
907 See grpc.future.traceback for the full API contract.
908 """
909 with self._state.condition:
910 timed_out = _common.wait(
911 self._state.condition.wait, self._is_complete, timeout=timeout
912 )
913 if timed_out:
914 raise grpc.FutureTimeoutError()
915 else:
916 if self._state.code is grpc.StatusCode.OK:
917 return None
918 elif self._state.cancelled:
919 raise grpc.FutureCancelledError()
920 else:
921 try:
922 raise self
923 except grpc.RpcError:
924 return sys.exc_info()[2]
926 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
927 with self._state.condition:
928 if self._state.code is None:
929 self._state.callbacks.append(functools.partial(fn, self))
930 return
932 fn(self)
934 def _next(self) -> Any:
935 with self._state.condition:
936 if self._state.code is None:
937 event_handler = _event_handler(
938 self._state, self._response_deserializer
939 )
940 self._state.due.add(cygrpc.OperationType.receive_message)
941 operating = self._call.operate(
942 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
943 event_handler,
944 )
945 if not operating:
946 self._state.due.remove(cygrpc.OperationType.receive_message)
947 elif self._state.code is grpc.StatusCode.OK:
948 raise StopIteration()
949 else:
950 raise self
952 def _response_ready():
953 return self._state.response is not None or (
954 cygrpc.OperationType.receive_message not in self._state.due
955 and self._state.code is not None
956 )
958 _common.wait(self._state.condition.wait, _response_ready)
959 if self._state.response is not None:
960 response = self._state.response
961 self._state.response = None
962 return response
963 elif cygrpc.OperationType.receive_message not in self._state.due:
964 if self._state.code is grpc.StatusCode.OK:
965 raise StopIteration()
966 elif self._state.code is not None:
967 raise self
970def _start_unary_request(
971 request: Any,
972 timeout: Optional[float],
973 request_serializer: SerializingFunction,
974) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
975 deadline = _deadline(timeout)
976 serialized_request = _common.serialize(request, request_serializer)
977 if serialized_request is None:
978 state = _RPCState(
979 (),
980 (),
981 (),
982 grpc.StatusCode.INTERNAL,
983 "Exception serializing request!",
984 )
985 error = _InactiveRpcError(state)
986 return deadline, None, error
987 else:
988 return deadline, serialized_request, None
991def _end_unary_response_blocking(
992 state: _RPCState,
993 call: cygrpc.SegregatedCall,
994 with_call: bool,
995 deadline: Optional[float],
996) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
997 if state.code is grpc.StatusCode.OK:
998 if with_call:
999 rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
1000 return state.response, rendezvous
1001 else:
1002 return state.response
1003 else:
1004 raise _InactiveRpcError(state) # pytype: disable=not-instantiable
1007def _stream_unary_invocation_operations(
1008 metadata: Optional[MetadataType], initial_metadata_flags: int
1009) -> Sequence[Sequence[cygrpc.Operation]]:
1010 return (
1011 (
1012 cygrpc.SendInitialMetadataOperation(
1013 metadata, initial_metadata_flags
1014 ),
1015 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
1016 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1017 ),
1018 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1019 )
1022def _stream_unary_invocation_operations_and_tags(
1023 metadata: Optional[MetadataType], initial_metadata_flags: int
1024) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
1025 return tuple(
1026 (
1027 operations,
1028 None,
1029 )
1030 for operations in _stream_unary_invocation_operations(
1031 metadata, initial_metadata_flags
1032 )
1033 )
1036def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
1037 parent_deadline = cygrpc.get_deadline_from_context()
1038 if parent_deadline is None and user_deadline is None:
1039 return None
1040 elif parent_deadline is not None and user_deadline is None:
1041 return parent_deadline
1042 elif user_deadline is not None and parent_deadline is None:
1043 return user_deadline
1044 else:
1045 return min(parent_deadline, user_deadline)
1048class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
1049 _channel: cygrpc.Channel
1050 _managed_call: IntegratedCallFactory
1051 _method: bytes
1052 _request_serializer: Optional[SerializingFunction]
1053 _response_deserializer: Optional[DeserializingFunction]
1054 _context: Any
1056 # pylint: disable=too-many-arguments
1057 def __init__(
1058 self,
1059 channel: cygrpc.Channel,
1060 managed_call: IntegratedCallFactory,
1061 method: bytes,
1062 request_serializer: Optional[SerializingFunction],
1063 response_deserializer: Optional[DeserializingFunction],
1064 ):
1065 self._channel = channel
1066 self._managed_call = managed_call
1067 self._method = method
1068 self._request_serializer = request_serializer
1069 self._response_deserializer = response_deserializer
1070 self._context = cygrpc.build_census_context()
1072 def _prepare(
1073 self,
1074 request: Any,
1075 timeout: Optional[float],
1076 metadata: Optional[MetadataType],
1077 wait_for_ready: Optional[bool],
1078 compression: Optional[grpc.Compression],
1079 ) -> Tuple[
1080 Optional[_RPCState],
1081 Optional[Sequence[cygrpc.Operation]],
1082 Optional[float],
1083 Optional[grpc.RpcError],
1084 ]:
1085 deadline, serialized_request, rendezvous = _start_unary_request(
1086 request, timeout, self._request_serializer
1087 )
1088 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1089 wait_for_ready
1090 )
1091 augmented_metadata = _compression.augment_metadata(
1092 metadata, compression
1093 )
1094 if serialized_request is None:
1095 return None, None, None, rendezvous
1096 else:
1097 state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
1098 operations = (
1099 cygrpc.SendInitialMetadataOperation(
1100 augmented_metadata, initial_metadata_flags
1101 ),
1102 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
1103 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
1104 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
1105 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
1106 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1107 )
1108 return state, operations, deadline, None
1110 def _blocking(
1111 self,
1112 request: Any,
1113 timeout: Optional[float] = None,
1114 metadata: Optional[MetadataType] = None,
1115 credentials: Optional[grpc.CallCredentials] = None,
1116 wait_for_ready: Optional[bool] = None,
1117 compression: Optional[grpc.Compression] = None,
1118 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
1119 state, operations, deadline, rendezvous = self._prepare(
1120 request, timeout, metadata, wait_for_ready, compression
1121 )
1122 if state is None:
1123 raise rendezvous # pylint: disable-msg=raising-bad-type
1124 else:
1125 state.rpc_start_time = datetime.utcnow()
1126 state.method = _common.decode(self._method)
1127 call = self._channel.segregated_call(
1128 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1129 self._method,
1130 None,
1131 _determine_deadline(deadline),
1132 metadata,
1133 None if credentials is None else credentials._credentials,
1134 (
1135 (
1136 operations,
1137 None,
1138 ),
1139 ),
1140 self._context,
1141 )
1142 event = call.next_event()
1143 _handle_event(event, state, self._response_deserializer)
1144 return state, call
1146 def __call__(
1147 self,
1148 request: Any,
1149 timeout: Optional[float] = None,
1150 metadata: Optional[MetadataType] = None,
1151 credentials: Optional[grpc.CallCredentials] = None,
1152 wait_for_ready: Optional[bool] = None,
1153 compression: Optional[grpc.Compression] = None,
1154 ) -> Any:
1155 (
1156 state,
1157 call,
1158 ) = self._blocking(
1159 request, timeout, metadata, credentials, wait_for_ready, compression
1160 )
1161 return _end_unary_response_blocking(state, call, False, None)
1163 def with_call(
1164 self,
1165 request: Any,
1166 timeout: Optional[float] = None,
1167 metadata: Optional[MetadataType] = None,
1168 credentials: Optional[grpc.CallCredentials] = None,
1169 wait_for_ready: Optional[bool] = None,
1170 compression: Optional[grpc.Compression] = None,
1171 ) -> Tuple[Any, grpc.Call]:
1172 (
1173 state,
1174 call,
1175 ) = self._blocking(
1176 request, timeout, metadata, credentials, wait_for_ready, compression
1177 )
1178 return _end_unary_response_blocking(state, call, True, None)
1180 def future(
1181 self,
1182 request: Any,
1183 timeout: Optional[float] = None,
1184 metadata: Optional[MetadataType] = None,
1185 credentials: Optional[grpc.CallCredentials] = None,
1186 wait_for_ready: Optional[bool] = None,
1187 compression: Optional[grpc.Compression] = None,
1188 ) -> _MultiThreadedRendezvous:
1189 state, operations, deadline, rendezvous = self._prepare(
1190 request, timeout, metadata, wait_for_ready, compression
1191 )
1192 if state is None:
1193 raise rendezvous # pylint: disable-msg=raising-bad-type
1194 else:
1195 event_handler = _event_handler(state, self._response_deserializer)
1196 state.rpc_start_time = datetime.utcnow()
1197 state.method = _common.decode(self._method)
1198 call = self._managed_call(
1199 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1200 self._method,
1201 None,
1202 deadline,
1203 metadata,
1204 None if credentials is None else credentials._credentials,
1205 (operations,),
1206 event_handler,
1207 self._context,
1208 )
1209 return _MultiThreadedRendezvous(
1210 state, call, self._response_deserializer, deadline
1211 )
1214class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
1215 _channel: cygrpc.Channel
1216 _method: bytes
1217 _request_serializer: Optional[SerializingFunction]
1218 _response_deserializer: Optional[DeserializingFunction]
1219 _context: Any
1221 # pylint: disable=too-many-arguments
1222 def __init__(
1223 self,
1224 channel: cygrpc.Channel,
1225 method: bytes,
1226 request_serializer: SerializingFunction,
1227 response_deserializer: DeserializingFunction,
1228 ):
1229 self._channel = channel
1230 self._method = method
1231 self._request_serializer = request_serializer
1232 self._response_deserializer = response_deserializer
1233 self._context = cygrpc.build_census_context()
1235 def __call__( # pylint: disable=too-many-locals
1236 self,
1237 request: Any,
1238 timeout: Optional[float] = None,
1239 metadata: Optional[MetadataType] = None,
1240 credentials: Optional[grpc.CallCredentials] = None,
1241 wait_for_ready: Optional[bool] = None,
1242 compression: Optional[grpc.Compression] = None,
1243 ) -> _SingleThreadedRendezvous:
1244 deadline = _deadline(timeout)
1245 serialized_request = _common.serialize(
1246 request, self._request_serializer
1247 )
1248 if serialized_request is None:
1249 state = _RPCState(
1250 (),
1251 (),
1252 (),
1253 grpc.StatusCode.INTERNAL,
1254 "Exception serializing request!",
1255 )
1256 raise _InactiveRpcError(state)
1258 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
1259 call_credentials = (
1260 None if credentials is None else credentials._credentials
1261 )
1262 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1263 wait_for_ready
1264 )
1265 augmented_metadata = _compression.augment_metadata(
1266 metadata, compression
1267 )
1268 operations = (
1269 (
1270 cygrpc.SendInitialMetadataOperation(
1271 augmented_metadata, initial_metadata_flags
1272 ),
1273 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
1274 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
1275 ),
1276 (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
1277 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1278 )
1279 operations_and_tags = tuple((ops, None) for ops in operations)
1280 state.rpc_start_time = datetime.utcnow()
1281 state.method = _common.decode(self._method)
1282 call = self._channel.segregated_call(
1283 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1284 self._method,
1285 None,
1286 _determine_deadline(deadline),
1287 metadata,
1288 call_credentials,
1289 operations_and_tags,
1290 self._context,
1291 )
1292 return _SingleThreadedRendezvous(
1293 state, call, self._response_deserializer, deadline
1294 )
1297class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
1298 _channel: cygrpc.Channel
1299 _managed_call: IntegratedCallFactory
1300 _method: bytes
1301 _request_serializer: Optional[SerializingFunction]
1302 _response_deserializer: Optional[DeserializingFunction]
1303 _context: Any
1305 # pylint: disable=too-many-arguments
1306 def __init__(
1307 self,
1308 channel: cygrpc.Channel,
1309 managed_call: IntegratedCallFactory,
1310 method: bytes,
1311 request_serializer: SerializingFunction,
1312 response_deserializer: DeserializingFunction,
1313 ):
1314 self._channel = channel
1315 self._managed_call = managed_call
1316 self._method = method
1317 self._request_serializer = request_serializer
1318 self._response_deserializer = response_deserializer
1319 self._context = cygrpc.build_census_context()
1321 def __call__( # pylint: disable=too-many-locals
1322 self,
1323 request: Any,
1324 timeout: Optional[float] = None,
1325 metadata: Optional[MetadataType] = None,
1326 credentials: Optional[grpc.CallCredentials] = None,
1327 wait_for_ready: Optional[bool] = None,
1328 compression: Optional[grpc.Compression] = None,
1329 ) -> _MultiThreadedRendezvous:
1330 deadline, serialized_request, rendezvous = _start_unary_request(
1331 request, timeout, self._request_serializer
1332 )
1333 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1334 wait_for_ready
1335 )
1336 if serialized_request is None:
1337 raise rendezvous # pylint: disable-msg=raising-bad-type
1338 else:
1339 augmented_metadata = _compression.augment_metadata(
1340 metadata, compression
1341 )
1342 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
1343 operations = (
1344 (
1345 cygrpc.SendInitialMetadataOperation(
1346 augmented_metadata, initial_metadata_flags
1347 ),
1348 cygrpc.SendMessageOperation(
1349 serialized_request, _EMPTY_FLAGS
1350 ),
1351 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
1352 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1353 ),
1354 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1355 )
1356 state.rpc_start_time = datetime.utcnow()
1357 state.method = _common.decode(self._method)
1358 call = self._managed_call(
1359 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1360 self._method,
1361 None,
1362 _determine_deadline(deadline),
1363 metadata,
1364 None if credentials is None else credentials._credentials,
1365 operations,
1366 _event_handler(state, self._response_deserializer),
1367 self._context,
1368 )
1369 return _MultiThreadedRendezvous(
1370 state, call, self._response_deserializer, deadline
1371 )
1374class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
1375 _channel: cygrpc.Channel
1376 _managed_call: IntegratedCallFactory
1377 _method: bytes
1378 _request_serializer: Optional[SerializingFunction]
1379 _response_deserializer: Optional[DeserializingFunction]
1380 _context: Any
1382 # pylint: disable=too-many-arguments
1383 def __init__(
1384 self,
1385 channel: cygrpc.Channel,
1386 managed_call: IntegratedCallFactory,
1387 method: bytes,
1388 request_serializer: Optional[SerializingFunction],
1389 response_deserializer: Optional[DeserializingFunction],
1390 ):
1391 self._channel = channel
1392 self._managed_call = managed_call
1393 self._method = method
1394 self._request_serializer = request_serializer
1395 self._response_deserializer = response_deserializer
1396 self._context = cygrpc.build_census_context()
1398 def _blocking(
1399 self,
1400 request_iterator: Iterator,
1401 timeout: Optional[float],
1402 metadata: Optional[MetadataType],
1403 credentials: Optional[grpc.CallCredentials],
1404 wait_for_ready: Optional[bool],
1405 compression: Optional[grpc.Compression],
1406 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
1407 deadline = _deadline(timeout)
1408 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
1409 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1410 wait_for_ready
1411 )
1412 augmented_metadata = _compression.augment_metadata(
1413 metadata, compression
1414 )
1415 state.rpc_start_time = datetime.utcnow()
1416 state.method = _common.decode(self._method)
1417 call = self._channel.segregated_call(
1418 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1419 self._method,
1420 None,
1421 _determine_deadline(deadline),
1422 augmented_metadata,
1423 None if credentials is None else credentials._credentials,
1424 _stream_unary_invocation_operations_and_tags(
1425 augmented_metadata, initial_metadata_flags
1426 ),
1427 self._context,
1428 )
1429 _consume_request_iterator(
1430 request_iterator, state, call, self._request_serializer, None
1431 )
1432 while True:
1433 event = call.next_event()
1434 with state.condition:
1435 _handle_event(event, state, self._response_deserializer)
1436 state.condition.notify_all()
1437 if not state.due:
1438 break
1439 return state, call
1441 def __call__(
1442 self,
1443 request_iterator: Iterator,
1444 timeout: Optional[float] = None,
1445 metadata: Optional[MetadataType] = None,
1446 credentials: Optional[grpc.CallCredentials] = None,
1447 wait_for_ready: Optional[bool] = None,
1448 compression: Optional[grpc.Compression] = None,
1449 ) -> Any:
1450 (
1451 state,
1452 call,
1453 ) = self._blocking(
1454 request_iterator,
1455 timeout,
1456 metadata,
1457 credentials,
1458 wait_for_ready,
1459 compression,
1460 )
1461 return _end_unary_response_blocking(state, call, False, None)
1463 def with_call(
1464 self,
1465 request_iterator: Iterator,
1466 timeout: Optional[float] = None,
1467 metadata: Optional[MetadataType] = None,
1468 credentials: Optional[grpc.CallCredentials] = None,
1469 wait_for_ready: Optional[bool] = None,
1470 compression: Optional[grpc.Compression] = None,
1471 ) -> Tuple[Any, grpc.Call]:
1472 (
1473 state,
1474 call,
1475 ) = self._blocking(
1476 request_iterator,
1477 timeout,
1478 metadata,
1479 credentials,
1480 wait_for_ready,
1481 compression,
1482 )
1483 return _end_unary_response_blocking(state, call, True, None)
1485 def future(
1486 self,
1487 request_iterator: Iterator,
1488 timeout: Optional[float] = None,
1489 metadata: Optional[MetadataType] = None,
1490 credentials: Optional[grpc.CallCredentials] = None,
1491 wait_for_ready: Optional[bool] = None,
1492 compression: Optional[grpc.Compression] = None,
1493 ) -> _MultiThreadedRendezvous:
1494 deadline = _deadline(timeout)
1495 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
1496 event_handler = _event_handler(state, self._response_deserializer)
1497 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1498 wait_for_ready
1499 )
1500 augmented_metadata = _compression.augment_metadata(
1501 metadata, compression
1502 )
1503 state.rpc_start_time = datetime.utcnow()
1504 state.method = _common.decode(self._method)
1505 call = self._managed_call(
1506 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1507 self._method,
1508 None,
1509 deadline,
1510 augmented_metadata,
1511 None if credentials is None else credentials._credentials,
1512 _stream_unary_invocation_operations(
1513 metadata, initial_metadata_flags
1514 ),
1515 event_handler,
1516 self._context,
1517 )
1518 _consume_request_iterator(
1519 request_iterator,
1520 state,
1521 call,
1522 self._request_serializer,
1523 event_handler,
1524 )
1525 return _MultiThreadedRendezvous(
1526 state, call, self._response_deserializer, deadline
1527 )
1530class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
1531 _channel: cygrpc.Channel
1532 _managed_call: IntegratedCallFactory
1533 _method: bytes
1534 _request_serializer: Optional[SerializingFunction]
1535 _response_deserializer: Optional[DeserializingFunction]
1536 _context: Any
1538 # pylint: disable=too-many-arguments
1539 def __init__(
1540 self,
1541 channel: cygrpc.Channel,
1542 managed_call: IntegratedCallFactory,
1543 method: bytes,
1544 request_serializer: Optional[SerializingFunction] = None,
1545 response_deserializer: Optional[DeserializingFunction] = None,
1546 ):
1547 self._channel = channel
1548 self._managed_call = managed_call
1549 self._method = method
1550 self._request_serializer = request_serializer
1551 self._response_deserializer = response_deserializer
1552 self._context = cygrpc.build_census_context()
1554 def __call__(
1555 self,
1556 request_iterator: Iterator,
1557 timeout: Optional[float] = None,
1558 metadata: Optional[MetadataType] = None,
1559 credentials: Optional[grpc.CallCredentials] = None,
1560 wait_for_ready: Optional[bool] = None,
1561 compression: Optional[grpc.Compression] = None,
1562 ) -> _MultiThreadedRendezvous:
1563 deadline = _deadline(timeout)
1564 state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
1565 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1566 wait_for_ready
1567 )
1568 augmented_metadata = _compression.augment_metadata(
1569 metadata, compression
1570 )
1571 operations = (
1572 (
1573 cygrpc.SendInitialMetadataOperation(
1574 augmented_metadata, initial_metadata_flags
1575 ),
1576 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1577 ),
1578 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1579 )
1580 event_handler = _event_handler(state, self._response_deserializer)
1581 state.rpc_start_time = datetime.utcnow()
1582 state.method = _common.decode(self._method)
1583 call = self._managed_call(
1584 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1585 self._method,
1586 None,
1587 _determine_deadline(deadline),
1588 augmented_metadata,
1589 None if credentials is None else credentials._credentials,
1590 operations,
1591 event_handler,
1592 self._context,
1593 )
1594 _consume_request_iterator(
1595 request_iterator,
1596 state,
1597 call,
1598 self._request_serializer,
1599 event_handler,
1600 )
1601 return _MultiThreadedRendezvous(
1602 state, call, self._response_deserializer, deadline
1603 )
1606class _InitialMetadataFlags(int):
1607 """Stores immutable initial metadata flags"""
1609 def __new__(cls, value: int = _EMPTY_FLAGS):
1610 value &= cygrpc.InitialMetadataFlags.used_mask
1611 return super(_InitialMetadataFlags, cls).__new__(cls, value)
1613 def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
1614 if wait_for_ready is not None:
1615 if wait_for_ready:
1616 return self.__class__(
1617 self
1618 | cygrpc.InitialMetadataFlags.wait_for_ready
1619 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
1620 )
1621 elif not wait_for_ready:
1622 return self.__class__(
1623 self & ~cygrpc.InitialMetadataFlags.wait_for_ready
1624 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
1625 )
1626 return self
1629class _ChannelCallState(object):
1630 channel: cygrpc.Channel
1631 managed_calls: int
1632 threading: bool
1634 def __init__(self, channel: cygrpc.Channel):
1635 self.lock = threading.Lock()
1636 self.channel = channel
1637 self.managed_calls = 0
1638 self.threading = False
1640 def reset_postfork_child(self) -> None:
1641 self.managed_calls = 0
1643 def __del__(self):
1644 try:
1645 self.channel.close(
1646 cygrpc.StatusCode.cancelled, "Channel deallocated!"
1647 )
1648 except (TypeError, AttributeError):
1649 pass
1652def _run_channel_spin_thread(state: _ChannelCallState) -> None:
1653 def channel_spin():
1654 while True:
1655 cygrpc.block_if_fork_in_progress(state)
1656 event = state.channel.next_call_event()
1657 if event.completion_type == cygrpc.CompletionType.queue_timeout:
1658 continue
1659 call_completed = event.tag(event)
1660 if call_completed:
1661 with state.lock:
1662 state.managed_calls -= 1
1663 if state.managed_calls == 0:
1664 return
1666 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
1667 channel_spin_thread.setDaemon(True)
1668 channel_spin_thread.start()
1671def _channel_managed_call_management(state: _ChannelCallState):
1672 # pylint: disable=too-many-arguments
1673 def create(
1674 flags: int,
1675 method: bytes,
1676 host: Optional[str],
1677 deadline: Optional[float],
1678 metadata: Optional[MetadataType],
1679 credentials: Optional[cygrpc.CallCredentials],
1680 operations: Sequence[Sequence[cygrpc.Operation]],
1681 event_handler: UserTag,
1682 context,
1683 ) -> cygrpc.IntegratedCall:
1684 """Creates a cygrpc.IntegratedCall.
1686 Args:
1687 flags: An integer bitfield of call flags.
1688 method: The RPC method.
1689 host: A host string for the created call.
1690 deadline: A float to be the deadline of the created call or None if
1691 the call is to have an infinite deadline.
1692 metadata: The metadata for the call or None.
1693 credentials: A cygrpc.CallCredentials or None.
1694 operations: A sequence of sequences of cygrpc.Operations to be
1695 started on the call.
1696 event_handler: A behavior to call to handle the events resultant from
1697 the operations on the call.
1698 context: Context object for distributed tracing.
1699 Returns:
1700 A cygrpc.IntegratedCall with which to conduct an RPC.
1701 """
1702 operations_and_tags = tuple(
1703 (
1704 operation,
1705 event_handler,
1706 )
1707 for operation in operations
1708 )
1709 with state.lock:
1710 call = state.channel.integrated_call(
1711 flags,
1712 method,
1713 host,
1714 deadline,
1715 metadata,
1716 credentials,
1717 operations_and_tags,
1718 context,
1719 )
1720 if state.managed_calls == 0:
1721 state.managed_calls = 1
1722 _run_channel_spin_thread(state)
1723 else:
1724 state.managed_calls += 1
1725 return call
1727 return create
1730class _ChannelConnectivityState(object):
1731 lock: threading.RLock
1732 channel: grpc.Channel
1733 polling: bool
1734 connectivity: grpc.ChannelConnectivity
1735 try_to_connect: bool
1736 # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
1737 callbacks_and_connectivities: List[
1738 Sequence[
1739 Union[
1740 Callable[[grpc.ChannelConnectivity], None],
1741 Optional[grpc.ChannelConnectivity],
1742 ]
1743 ]
1744 ]
1745 delivering: bool
1747 def __init__(self, channel: grpc.Channel):
1748 self.lock = threading.RLock()
1749 self.channel = channel
1750 self.polling = False
1751 self.connectivity = None
1752 self.try_to_connect = False
1753 self.callbacks_and_connectivities = []
1754 self.delivering = False
1756 def reset_postfork_child(self) -> None:
1757 self.polling = False
1758 self.connectivity = None
1759 self.try_to_connect = False
1760 self.callbacks_and_connectivities = []
1761 self.delivering = False
1764def _deliveries(
1765 state: _ChannelConnectivityState,
1766) -> List[Callable[[grpc.ChannelConnectivity], None]]:
1767 callbacks_needing_update = []
1768 for callback_and_connectivity in state.callbacks_and_connectivities:
1769 (
1770 callback,
1771 callback_connectivity,
1772 ) = callback_and_connectivity
1773 if callback_connectivity is not state.connectivity:
1774 callbacks_needing_update.append(callback)
1775 callback_and_connectivity[1] = state.connectivity
1776 return callbacks_needing_update
1779def _deliver(
1780 state: _ChannelConnectivityState,
1781 initial_connectivity: grpc.ChannelConnectivity,
1782 initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
1783) -> None:
1784 connectivity = initial_connectivity
1785 callbacks = initial_callbacks
1786 while True:
1787 for callback in callbacks:
1788 cygrpc.block_if_fork_in_progress(state)
1789 try:
1790 callback(connectivity)
1791 except Exception: # pylint: disable=broad-except
1792 _LOGGER.exception(
1793 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
1794 )
1795 with state.lock:
1796 callbacks = _deliveries(state)
1797 if callbacks:
1798 connectivity = state.connectivity
1799 else:
1800 state.delivering = False
1801 return
1804def _spawn_delivery(
1805 state: _ChannelConnectivityState,
1806 callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
1807) -> None:
1808 delivering_thread = cygrpc.ForkManagedThread(
1809 target=_deliver,
1810 args=(
1811 state,
1812 state.connectivity,
1813 callbacks,
1814 ),
1815 )
1816 delivering_thread.setDaemon(True)
1817 delivering_thread.start()
1818 state.delivering = True
1821# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
1822def _poll_connectivity(
1823 state: _ChannelConnectivityState,
1824 channel: grpc.Channel,
1825 initial_try_to_connect: bool,
1826) -> None:
1827 try_to_connect = initial_try_to_connect
1828 connectivity = channel.check_connectivity_state(try_to_connect)
1829 with state.lock:
1830 state.connectivity = (
1831 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
1832 connectivity
1833 ]
1834 )
1835 callbacks = tuple(
1836 callback for callback, _ in state.callbacks_and_connectivities
1837 )
1838 for callback_and_connectivity in state.callbacks_and_connectivities:
1839 callback_and_connectivity[1] = state.connectivity
1840 if callbacks:
1841 _spawn_delivery(state, callbacks)
1842 while True:
1843 event = channel.watch_connectivity_state(
1844 connectivity, time.time() + 0.2
1845 )
1846 cygrpc.block_if_fork_in_progress(state)
1847 with state.lock:
1848 if (
1849 not state.callbacks_and_connectivities
1850 and not state.try_to_connect
1851 ):
1852 state.polling = False
1853 state.connectivity = None
1854 break
1855 try_to_connect = state.try_to_connect
1856 state.try_to_connect = False
1857 if event.success or try_to_connect:
1858 connectivity = channel.check_connectivity_state(try_to_connect)
1859 with state.lock:
1860 state.connectivity = (
1861 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
1862 connectivity
1863 ]
1864 )
1865 if not state.delivering:
1866 callbacks = _deliveries(state)
1867 if callbacks:
1868 _spawn_delivery(state, callbacks)
1871def _subscribe(
1872 state: _ChannelConnectivityState,
1873 callback: Callable[[grpc.ChannelConnectivity], None],
1874 try_to_connect: bool,
1875) -> None:
1876 with state.lock:
1877 if not state.callbacks_and_connectivities and not state.polling:
1878 polling_thread = cygrpc.ForkManagedThread(
1879 target=_poll_connectivity,
1880 args=(state, state.channel, bool(try_to_connect)),
1881 )
1882 polling_thread.setDaemon(True)
1883 polling_thread.start()
1884 state.polling = True
1885 state.callbacks_and_connectivities.append([callback, None])
1886 elif not state.delivering and state.connectivity is not None:
1887 _spawn_delivery(state, (callback,))
1888 state.try_to_connect |= bool(try_to_connect)
1889 state.callbacks_and_connectivities.append(
1890 [callback, state.connectivity]
1891 )
1892 else:
1893 state.try_to_connect |= bool(try_to_connect)
1894 state.callbacks_and_connectivities.append([callback, None])
1897def _unsubscribe(
1898 state: _ChannelConnectivityState,
1899 callback: Callable[[grpc.ChannelConnectivity], None],
1900) -> None:
1901 with state.lock:
1902 for index, (subscribed_callback, unused_connectivity) in enumerate(
1903 state.callbacks_and_connectivities
1904 ):
1905 if callback == subscribed_callback:
1906 state.callbacks_and_connectivities.pop(index)
1907 break
1910def _augment_options(
1911 base_options: Sequence[ChannelArgumentType],
1912 compression: Optional[grpc.Compression],
1913) -> Sequence[ChannelArgumentType]:
1914 compression_option = _compression.create_channel_option(compression)
1915 return (
1916 tuple(base_options)
1917 + compression_option
1918 + (
1919 (
1920 cygrpc.ChannelArgKey.primary_user_agent_string,
1921 _USER_AGENT,
1922 ),
1923 )
1924 )
1927def _separate_channel_options(
1928 options: Sequence[ChannelArgumentType],
1929) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
1930 """Separates core channel options from Python channel options."""
1931 core_options = []
1932 python_options = []
1933 for pair in options:
1934 if (
1935 pair[0]
1936 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
1937 ):
1938 python_options.append(pair)
1939 else:
1940 core_options.append(pair)
1941 return python_options, core_options
1944class Channel(grpc.Channel):
1945 """A cygrpc.Channel-backed implementation of grpc.Channel."""
1947 _single_threaded_unary_stream: bool
1948 _channel: cygrpc.Channel
1949 _call_state: _ChannelCallState
1950 _connectivity_state: _ChannelConnectivityState
1952 def __init__(
1953 self,
1954 target: str,
1955 options: Sequence[ChannelArgumentType],
1956 credentials: Optional[grpc.ChannelCredentials],
1957 compression: Optional[grpc.Compression],
1958 ):
1959 """Constructor.
1961 Args:
1962 target: The target to which to connect.
1963 options: Configuration options for the channel.
1964 credentials: A cygrpc.ChannelCredentials or None.
1965 compression: An optional value indicating the compression method to be
1966 used over the lifetime of the channel.
1967 """
1968 python_options, core_options = _separate_channel_options(options)
1969 self._single_threaded_unary_stream = (
1970 _DEFAULT_SINGLE_THREADED_UNARY_STREAM
1971 )
1972 self._process_python_options(python_options)
1973 self._channel = cygrpc.Channel(
1974 _common.encode(target),
1975 _augment_options(core_options, compression),
1976 credentials,
1977 )
1978 self._call_state = _ChannelCallState(self._channel)
1979 self._connectivity_state = _ChannelConnectivityState(self._channel)
1980 cygrpc.fork_register_channel(self)
1981 if cygrpc.g_gevent_activated:
1982 cygrpc.gevent_increment_channel_count()
1984 def _process_python_options(
1985 self, python_options: Sequence[ChannelArgumentType]
1986 ) -> None:
1987 """Sets channel attributes according to python-only channel options."""
1988 for pair in python_options:
1989 if (
1990 pair[0]
1991 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
1992 ):
1993 self._single_threaded_unary_stream = True
1995 def subscribe(
1996 self,
1997 callback: Callable[[grpc.ChannelConnectivity], None],
1998 try_to_connect: Optional[bool] = None,
1999 ) -> None:
2000 _subscribe(self._connectivity_state, callback, try_to_connect)
2002 def unsubscribe(
2003 self, callback: Callable[[grpc.ChannelConnectivity], None]
2004 ) -> None:
2005 _unsubscribe(self._connectivity_state, callback)
2007 def unary_unary(
2008 self,
2009 method: str,
2010 request_serializer: Optional[SerializingFunction] = None,
2011 response_deserializer: Optional[DeserializingFunction] = None,
2012 ) -> grpc.UnaryUnaryMultiCallable:
2013 return _UnaryUnaryMultiCallable(
2014 self._channel,
2015 _channel_managed_call_management(self._call_state),
2016 _common.encode(method),
2017 request_serializer,
2018 response_deserializer,
2019 )
2021 def unary_stream(
2022 self,
2023 method: str,
2024 request_serializer: Optional[SerializingFunction] = None,
2025 response_deserializer: Optional[DeserializingFunction] = None,
2026 ) -> grpc.UnaryStreamMultiCallable:
2027 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
2028 # on a single Python thread results in an appreciable speed-up. However,
2029 # due to slight differences in capability, the multi-threaded variant
2030 # remains the default.
2031 if self._single_threaded_unary_stream:
2032 return _SingleThreadedUnaryStreamMultiCallable(
2033 self._channel,
2034 _common.encode(method),
2035 request_serializer,
2036 response_deserializer,
2037 )
2038 else:
2039 return _UnaryStreamMultiCallable(
2040 self._channel,
2041 _channel_managed_call_management(self._call_state),
2042 _common.encode(method),
2043 request_serializer,
2044 response_deserializer,
2045 )
2047 def stream_unary(
2048 self,
2049 method: str,
2050 request_serializer: Optional[SerializingFunction] = None,
2051 response_deserializer: Optional[DeserializingFunction] = None,
2052 ) -> grpc.StreamUnaryMultiCallable:
2053 return _StreamUnaryMultiCallable(
2054 self._channel,
2055 _channel_managed_call_management(self._call_state),
2056 _common.encode(method),
2057 request_serializer,
2058 response_deserializer,
2059 )
2061 def stream_stream(
2062 self,
2063 method: str,
2064 request_serializer: Optional[SerializingFunction] = None,
2065 response_deserializer: Optional[DeserializingFunction] = None,
2066 ) -> grpc.StreamStreamMultiCallable:
2067 return _StreamStreamMultiCallable(
2068 self._channel,
2069 _channel_managed_call_management(self._call_state),
2070 _common.encode(method),
2071 request_serializer,
2072 response_deserializer,
2073 )
2075 def _unsubscribe_all(self) -> None:
2076 state = self._connectivity_state
2077 if state:
2078 with state.lock:
2079 del state.callbacks_and_connectivities[:]
2081 def _close(self) -> None:
2082 self._unsubscribe_all()
2083 self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
2084 cygrpc.fork_unregister_channel(self)
2085 if cygrpc.g_gevent_activated:
2086 cygrpc.gevent_decrement_channel_count()
2088 def _close_on_fork(self) -> None:
2089 self._unsubscribe_all()
2090 self._channel.close_on_fork(
2091 cygrpc.StatusCode.cancelled, "Channel closed due to fork"
2092 )
2094 def __enter__(self):
2095 return self
2097 def __exit__(self, exc_type, exc_val, exc_tb):
2098 self._close()
2099 return False
2101 def close(self) -> None:
2102 self._close()
2104 def __del__(self):
2105 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
2106 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
2107 # here (or more likely, call self._close() here). We don't do this today
2108 # because many valid use cases today allow the channel to be deleted
2109 # immediately after stubs are created. After a sufficient period of time
2110 # has passed for all users to be trusted to freeze out to their channels
2111 # for as long as they are in use and to close them after using them,
2112 # then deletion of this grpc._channel.Channel instance can be made to
2113 # effect closure of the underlying cygrpc.Channel instance.
2114 try:
2115 self._unsubscribe_all()
2116 except: # pylint: disable=bare-except
2117 # Exceptions in __del__ are ignored by Python anyway, but they can
2118 # keep spamming logs. Just silence them.
2119 pass