Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (
25 Any,
26 Callable,
27 Dict,
28 Iterator,
29 List,
30 Optional,
31 Sequence,
32 Set,
33 Tuple,
34 Union,
35)
37import grpc # pytype: disable=pyi-error
38from grpc import _common # pytype: disable=pyi-error
39from grpc import _compression # pytype: disable=pyi-error
40from grpc import _grpcio_metadata # pytype: disable=pyi-error
41from grpc import _observability # pytype: disable=pyi-error
42from grpc._cython import cygrpc
43from grpc._typing import ChannelArgumentType
44from grpc._typing import DeserializingFunction
45from grpc._typing import IntegratedCallFactory
46from grpc._typing import MetadataType
47from grpc._typing import NullaryCallbackType
48from grpc._typing import ResponseType
49from grpc._typing import SerializingFunction
50from grpc._typing import UserTag
51import grpc.experimental # pytype: disable=pyi-error
_LOGGER = logging.getLogger(__name__)

# Library name followed by the packaged gRPC Python version string.
_USER_AGENT = f"grpc-python/{_grpcio_metadata.__version__}"

# Flags value used for operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever the variable is present in the environment, whatever its value.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# Operation types for each call arity, judging by the names the initial
# contents of `_RPCState.due` (the call sites are outside this section).
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() templates used by _rpc_state_string. Placeholders, in order: class
# name, status code, details and (non-OK only) debug error string.
_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: Seconds from now, or None for no deadline.

    Returns:
      `time.time() + timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details message used when a status code is not recognised.

    Args:
      unknown_cygrpc_code: The unrecognised code, formatted with `str.format`.
      details: The details that accompanied the code.

    Returns:
      A message embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
class _RPCState:
    """Mutable bookkeeping for a single RPC.

    `condition` guards the members of this object, and `notify_all` is called
    on it when the state of the RPC has changed. The observability members
    (`rpc_start_time`, `rpc_end_time`, `method`, `target`) are the exception:
    updates to them do not trigger `condition`.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Create the state for one RPC.

        Args:
          due: Operation types initially expected from the completion queue.
          initial_metadata: Starting value for the initial metadata.
          trailing_metadata: Starting value for the trailing metadata.
          code: Starting status code; None while the RPC has no status.
          details: Starting status details.
        """
        # Guards every member below (observability members excepted).
        self.condition = threading.Condition()

        # Operation types whose events are still due from the RPC's completion
        # queue. Anything in `due` has had `operate()` called for it, but the
        # converse does not hold: after a failed `operate()` call, `due` may
        # briefly contain an event with no operation submitted to Core.
        self.due = set(due)

        self.initial_metadata = initial_metadata
        self.response = None
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.debug_error_string = None

        # Observability-only members (four of them); changing these does not
        # trigger self.condition.
        self.rpc_start_time = self.rpc_end_time = None
        self.method = None
        self.target = None

        # grpc.Future.cancel and grpc.Future.cancelled have slightly wonky
        # semantics, so whether cancellation was requested before the RPC
        # terminated is tracked separately from the rest of the result.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new `threading.Condition`."""
        self.condition = threading.Condition()
def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Record a locally generated status on `state` if it has none yet.

    A state that already carries a code is left completely untouched.
    Otherwise the code and details are stored, missing initial metadata
    becomes an empty tuple, and trailing metadata is set to an empty tuple.

    Args:
      state: The RPC state to update.
      code: Status code to record.
      details: Status details to record.
    """
    if state.code is not None:
        return
    state.code, state.details = code, details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Apply every batch operation of a completion-queue event to `state`.

    Each operation's type is removed from `state.due` and its payload is
    recorded: initial metadata, a deserialized response message, or the final
    status together with trailing metadata.

    Args:
      event: The event whose `batch_operations` are processed.
      state: The RPC state to update. Nothing here acquires
        `state.condition`; the callers in this file hold it.
      response_deserializer: Passed to `_common.deserialize` for each
        received message.

    Returns:
      The callbacks that were registered on `state` when the status
      operation was processed (empty if this event carried no status). The
      caller is expected to invoke them.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A status already recorded (e.g. by _abort) takes precedence.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code that had no mapping. The mapped
                    # value is always None here and would make the message
                    # read "unknown code None".
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None marks that callbacks can no longer be registered.
            state.callbacks = None
    return callbacks
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Build the tag callable that applies completion-queue events to `state`.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Forwarded to `_handle_event`.

    Returns:
      A callable taking one event. It returns True only when no operations
      remain in `state.due` and `state.fork_epoch` is not older than the
      current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        # Callbacks run after the condition has been released.
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks stored by add_done_callback are functools.partial
                # objects, but add_callback stores arbitrary callables that
                # have no `func` attribute. Fall back to the callback itself so
                # that logging cannot raise AttributeError from this handler.
                _LOGGER.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
258# pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Send the requests from the user's iterator on `call` from a daemon thread.

    Starts a `cygrpc.ForkManagedThread` that pulls requests one at a time,
    serializes each, submits it as a send-message operation and waits until
    that operation has left `state.due` (or the RPC has a status) before
    pulling the next. Once the iterator is exhausted a send-close-from-client
    operation is submitted. This function returns as soon as the thread has
    been started.

    Args:
      request_iterator: Iterator yielding the request values to send.
      state: The RPC's state. Its `condition` is held while `due` is updated
        and while waiting for a send to complete.
      call: Call object on which `operate` and `cancel` are invoked.
      request_serializer: Passed to `_common.serialize` with each request.
      event_handler: Tag passed to every `call.operate` invocation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except branch already balanced the
            # enter_user_request_generator() call, so `finally` does not call
            # return_from_user_request_generator() a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Iterator exhausted: go on to send the half-close below.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens before the condition is acquired.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    # Add to `due` before calling operate(), and undo it if
                    # the call reports that it is not operating.
                    state.due.add(cygrpc.OperationType.send_message)
                    operations = (
                        cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS
                        ),
                    )
                    operating = call.operate(operations, event_handler)
                    if not operating:
                        state.due.remove(cygrpc.OperationType.send_message)
                        return

                    def _done():
                        # True once the RPC has a status or the send-message
                        # operation is no longer in `due`.
                        return (
                            state.code is not None
                            or cygrpc.OperationType.send_message
                            not in state.due
                        )

                    _common.wait(
                        state.condition.wait,
                        _done,
                        spin_cb=functools.partial(
                            cygrpc.block_if_fork_in_progress, state
                        ),
                    )
                    # Stop consuming if the RPC terminated during the wait.
                    if state.code is not None:
                        return
                else:
                    # The RPC already has a status or was cancelled.
                    return
        # Reached only through `break`: every request was sent, so half-close
        # the call unless the RPC already has a status.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Format a summary of `rpc_state` for use in repr() and str().

    Args:
      class_name: Name placed at the start of the summary.
      rpc_state: The state to describe; read while holding its condition.

    Returns:
      "<NAME object>" while the RPC has no status code, the OK template when
      the code is grpc.StatusCode.OK, and otherwise the non-OK template, which
      also includes the debug error string.
    """
    with rpc_state.condition:
        code = rpc_state.code
        if code is None:
            return "<{} object>".format(class_name)
        if code is not grpc.StatusCode.OK:
            return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name,
                code,
                rpc_state.details,
                rpc_state.debug_error_string,
            )
        return _OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, code, rpc_state.details
        )
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled. The constructor copies the state it is given, and the Future
    methods answer immediately without waiting.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Copy the result fields of `state` while holding its condition."""
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            # Unlike the metadata and details, these two are shallow-copied.
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)
            self._state = snapshot

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the recorded status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the status details passed through `_common.decode`."""
        raw_details = self._state.details
        return _common.decode(raw_details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string passed through `_common.decode`."""
        raw_debug = self._state.debug_error_string
        return _common.decode(raw_debug)

    def _repr(self) -> str:
        """Shared implementation of __repr__ and __str__."""
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and catch this error to obtain a traceback object for it.
        try:
            raise self
        except grpc.RpcError:
            _, _, current_traceback = sys.exc_info()
            return current_traceback

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Always done, so the callback is invoked right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Base class providing cancellation, callbacks and iteration plumbing.
    `_next` and `debug_error_string` raise NotImplementedError here.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            deadline = self._deadline
            # Never negative: an expired deadline reports zero.
            return None if deadline is None else max(deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # The RPC already has a status; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            # `callbacks` is None once the final status has been processed.
            if registered is None:
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        """Shared implementation of __repr__ and __str__."""
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that still has no status when this object is collected.
        state = self._state
        with state.condition:
            if state.code is None:
                state.code = grpc.StatusCode.CANCELLED
                state.details = "Cancelled upon garbage collection!"
                state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                    state.details,
                )
                state.condition.notify_all()
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        """Whether the RPC has a status code (no lock is taken here)."""
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Whether cancellation was requested before the RPC terminated."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Whether the RPC still has no status code."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Whether the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports "
                    "result() when the RPC is complete."
                )
            if state.code is grpc.StatusCode.OK:
                return state.response
            if state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports "
                    "exception() when the RPC is complete."
                )
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports "
                    "traceback() when the RPC is complete."
                )
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch this object to obtain a traceback for it.
            try:
                raise self
            except grpc.RpcError:
                _, _, current_traceback = sys.exc_info()
                return current_traceback

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn(self)` to run at completion, or run it now if done."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        # Already complete: invoke outside the condition.
        fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while state.initial_metadata is None:
                self._consume_next_event()
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status_code = self._state.code
            if status_code is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return status_code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Take the call's next event, apply it to the state, run callbacks."""
        event = self._call.next_event()
        with self._state.condition:
            pending_callbacks = _handle_event(
                event, self._state, self._response_deserializer
            )
            for pending_callback in pending_callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                pending_callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response arrives or the RPC terminates."""
        while True:
            self._consume_next_event()
            with self._state.condition:
                response = self._state.response
                if response is not None:
                    # Hand the message over and clear the slot for the next.
                    self._state.response = None
                    return response
                if cygrpc.OperationType.receive_message in self._state.due:
                    # The receive is still outstanding; keep consuming.
                    continue
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self

    def _next(self) -> Any:
        """Request the next message, then consume events until it is available."""
        with self._state.condition:
            if self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if self._state.code is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee that
            # if an event has been submitted to the core completion queue,
            # it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there is
            # no data race on `due`.
            self._state.due.add(cygrpc.OperationType.receive_message)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
            )
            if not operating:
                self._state.due.remove(cygrpc.OperationType.receive_message)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string; UsageError if not yet set."""
        with self._state.condition:
            raw_debug = self._state.debug_error_string
            if raw_debug is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(raw_debug)
class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the
    grpc.Future interface and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.initial_metadata is not None,
            )
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.trailing_metadata is not None,
            )
            return state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.code is not None
            )
            return state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.details is not None
            )
            return _common.decode(state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait until the debug error string is set, then return it decoded."""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.debug_error_string is not None,
            )
            return _common.decode(state.debug_error_string)

    def cancelled(self) -> bool:
        """Whether cancellation was requested before the RPC terminated."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Whether the RPC still has no status code."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Whether the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        """Wait predicate: whether the RPC has a status code (no lock taken)."""
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        state = self._state
        with state.condition:
            if _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return state.response
            if state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        state = self._state
        with state.condition:
            if _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        state = self._state
        with state.condition:
            if _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch this object to obtain a traceback for it.
            try:
                raise self
            except grpc.RpcError:
                _, _, current_traceback = sys.exc_info()
                return current_traceback

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn(self)` to run at completion, or run it now if done."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        # Already complete: invoke outside the condition.
        fn(self)

    def _next(self) -> Any:
        """Request the next message and wait for it to be delivered."""
        state = self._state
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self

            handler = _event_handler(state, self._response_deserializer)
            # Add to `due` before calling operate(), and undo it if the call
            # reports that it is not operating.
            state.due.add(cygrpc.OperationType.receive_message)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                handler,
            )
            if not operating:
                state.due.remove(cygrpc.OperationType.receive_message)

            def _response_ready():
                # A message is available, or the receive is no longer due and
                # the RPC has a status.
                if state.response is not None:
                    return True
                return (
                    cygrpc.OperationType.receive_message not in state.due
                    and state.code is not None
                )

            _common.wait(state.condition.wait, _response_ready)
            response = state.response
            if response is not None:
                # Hand the message over and clear the slot for the next one.
                state.response = None
                return response
            if cygrpc.OperationType.receive_message not in state.due:
                if state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if state.code is not None:
                    raise self
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary-request RPC.

    Args:
      request: The request value to serialize.
      timeout: Seconds from now until the deadline, or None.
      request_serializer: Passed to `_common.serialize` with the request.

    Returns:
      A `(deadline, serialized_request, error)` tuple. When serialization
      yields None the serialized request is None and `error` is an
      `_InactiveRpcError` with status INTERNAL; otherwise `error` is None.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None
    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking unary-response RPC into its outcome.

    Args:
      state: The RPC's state.
      call: The call object, used only when `with_call` is true.
      with_call: Whether to return a call object alongside the response.
      deadline: The RPC's deadline, handed to the returned call object.

    Returns:
      `state.response`, or `(state.response, rendezvous)` when `with_call`
      is true.

    Raises:
      _InactiveRpcError: If the status code is not grpc.StatusCode.OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches for invoking a stream-unary RPC.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of batches: first send-initial-metadata, receive-message and
      receive-status-on-client; second receive-initial-metadata alone.
    """
    first_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    second_batch = (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    return (first_batch, second_batch)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pairs every stream-unary opening batch with a None tag.

    Args:
      metadata: Forwarded to _stream_unary_invocation_operations.
      initial_metadata_flags: Forwarded to
        _stream_unary_invocation_operations.

    Returns:
      A tuple of (operations, None) pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combines the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None when neither deadline is set, the only one that is set when just
        one exists, and the smaller of the two otherwise.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if user_deadline is None:
        # Covers both "no deadline at all" and "only the parent has one".
        return parent_deadline
    if parent_deadline is None:
        return user_deadline
    return min(parent_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes one unary-request, unary-response method over a cygrpc.Channel.

    Offers a blocking call (__call__), a blocking call that also returns the
    call object (with_call) and a non-blocking variant (future).
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serializes the request and assembles the single operation batch.

        Returns:
          A (state, operations, deadline, error) tuple. If the request could
            not be serialized the first three elements are None and error is
            the exception to raise; otherwise error is None.
        """
        deadline, serialized_request, serialization_error = (
            _start_unary_request(request, timeout, self._request_serializer)
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, serialization_error
        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # The whole exchange is issued as one batch.
        operations = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call and handles its one event.

        Returns:
          The (state, call) pair after the event has been applied to state.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, serialization_error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        operations_and_tags = ((operations, None),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        _handle_event(call.next_event(), state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Blocks until the RPC finishes and returns only the response."""
        final_state, finished_call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(
            final_state, finished_call, False, None
        )

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Blocks until the RPC finishes; returns (response, call)."""
        final_state, finished_call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(
            final_state, finished_call, True, None
        )

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC through the managed-call factory without blocking.

        Returns:
          A _MultiThreadedRendezvous for the started call.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, serialization_error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        # NOTE(review): unlike _blocking, the raw deadline is passed here
        # rather than _determine_deadline(deadline) -- confirm this is
        # intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            call_credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream method on a segregated call.

    Unlike _UnaryStreamMultiCallable this class takes no managed-call factory
    and returns a _SingleThreadedRendezvous.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_context",
        "_method",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Starts the RPC and returns a rendezvous for reading responses.

        Raises:
          _InactiveRpcError: If the request could not be serialized.
        """
        deadline = _deadline(timeout)
        payload = _common.serialize(request, self._request_serializer)
        if payload is None:
            failed_state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches, none of them tagged: send everything, receive the
        # status, receive the initial metadata.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.SendMessageOperation(payload, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations_and_tags = (
            (send_batch, None),
            (status_batch, None),
            (initial_metadata_batch, None),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream method through the managed-call factory."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC and returns a rendezvous for reading responses.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        deadline, payload, serialization_error = _start_unary_request(
            request, timeout, self._request_serializer
        )
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        if payload is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type
        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: everything sent plus the final status, then the
        # initial metadata.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.SendMessageOperation(payload, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (send_and_status_batch, initial_metadata_batch)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes one stream-request, unary-response method over a channel.

    Offers a blocking call (__call__), a blocking call that also returns the
    call object (with_call) and a non-blocking variant (future).
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call until nothing remains due.

        Returns:
          The (state, call) pair once state.due is empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Apply events to the state, under its condition, until every due
        # operation has completed.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Blocks until the RPC finishes and returns only the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Blocks until the RPC finishes; returns (response, call)."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC through the managed-call factory without blocking.

        Returns:
          A _MultiThreadedRendezvous for the started call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the raw deadline is passed here
        # rather than _determine_deadline(deadline) -- confirm this is
        # intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The initial-metadata operation must carry the augmented
            # metadata, as in _blocking; building it from the raw metadata
            # dropped whatever `compression` contributed.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-stream method through the managed-call factory."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_context",
        "_managed_call",
        "_method",
        "_request_serializer",
        "_response_deserializer",
        "_target",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC, begins consuming requests, returns a rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        outgoing_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: send initial metadata and receive the status, then
        # receive the initial metadata.
        metadata_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(outgoing_metadata, flags),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (metadata_and_status_batch, initial_metadata_batch)
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            outgoing_metadata,
            call_credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        # Keep only the bits covered by used_mask.
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Returns flags reflecting an explicit wait_for_ready choice.

        Args:
          wait_for_ready: True to set the wait_for_ready bit, False to clear
            it, None to leave the flags untouched.

        Returns:
          self when wait_for_ready is None; otherwise a new instance that
            also has the wait_for_ready_explicitly_set bit set.
        """
        if wait_for_ready is None:
            return self
        flag_bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            updated = self | flag_bits.wait_for_ready
        else:
            updated = self & ~flag_bits.wait_for_ready
        return self.__class__(updated | flag_bits.wait_for_ready_explicitly_set)
class _ChannelCallState:
    """Holds a channel, a lock and the count of its managed calls."""

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Resets the managed-call count to zero."""
        self.managed_calls = 0

    def __del__(self):
        # Best-effort close; TypeError and AttributeError are deliberately
        # ignored (presumably they can occur while the interpreter is being
        # torn down -- TODO confirm).
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Starts a daemon thread that dispatches the channel's call events.

    The thread passes each event to its own tag and exits once the number of
    managed calls recorded in state drops to zero.

    Args:
      state: The _ChannelCallState whose channel is to be spun.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            timed_out = (
                event.completion_type == cygrpc.CompletionType.queue_timeout
            )
            if timed_out:
                continue
            # The tag's return value says whether its call has completed.
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spinner = cygrpc.ForkManagedThread(target=channel_spin)
    spinner.setDaemon(True)
    spinner.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Returns a factory of integrated calls that are tracked in state."""

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant
            from the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch is tagged with the same event handler.
        operations_and_tags = tuple(
            (batch, event_handler) for batch in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                operations_and_tags,
                context,
                _registered_call_handle,
            )
            # The first managed call is counted before the spin thread is
            # started for it.
            spin_thread_needed = state.managed_calls == 0
            state.managed_calls += 1
            if spin_thread_needed:
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState:
    """State shared by the connectivity polling and delivery functions."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out in their post-fork reset values.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Clears everything except the lock and the channel."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collects the callbacks that have not seen the current connectivity.

    Each collected subscription is marked in place as having seen it.

    Args:
      state: The connectivity state whose subscriptions are inspected.

    Returns:
      The callbacks whose recorded connectivity was not state.connectivity.
    """
    pending = []
    for subscription in state.callbacks_and_connectivities:
        subscriber, last_seen = subscription
        if last_seen is state.connectivity:
            continue
        pending.append(subscriber)
        subscription[1] = state.connectivity
    return pending
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invokes callbacks with connectivity until none are out of date.

    Exceptions raised by a callback are logged and do not stop delivery.

    Args:
      state: The connectivity state to re-check after every round.
      initial_connectivity: The connectivity passed in the first round.
      initial_callbacks: The callbacks invoked in the first round.
    """
    connectivity, callbacks = initial_connectivity, initial_callbacks
    while True:
        for notify in callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                notify(connectivity)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            callbacks = _deliveries(state)
            if not callbacks:
                # Nothing left to notify: clear the flag and finish.
                state.delivering = False
                return
            connectivity = state.connectivity
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Starts a daemon thread running _deliver and sets state.delivering.

    Args:
      state: The connectivity state handed to _deliver.
      callbacks: The callbacks to invoke with the current connectivity.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
1883# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls channel connectivity and triggers delivery to subscribers.

    Returns once there are no subscriptions left and no connection attempt
    has been requested.

    Args:
      state: The connectivity state to keep up to date.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                connectivity
            ]
        )
        subscribers = tuple(
            subscriber for subscriber, _ in state.callbacks_and_connectivities
        )
        # All current subscribers are about to be told this connectivity.
        for subscription in state.callbacks_and_connectivities:
            subscription[1] = state.connectivity
        if subscribers:
            _spawn_delivery(state, subscribers)
    while True:
        # Each watch is given a deadline 0.2 seconds from now.
        watch_deadline = time.time() + 0.2
        event = channel.watch_connectivity_state(connectivity, watch_deadline)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            nothing_to_do = (
                not state.callbacks_and_connectivities
                and not state.try_to_connect
            )
            if nothing_to_do:
                state.polling = False
                state.connectivity = None
                return
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = (
                _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                    connectivity
                ]
            )
            # A running delivery picks the new value up by itself.
            if state.delivering:
                continue
            pending = _deliveries(state)
            if pending:
                _spawn_delivery(state, pending)
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers callback for connectivity updates.

    Starts the polling thread when this is the first subscription and no
    polling is in progress.

    Args:
      state: The connectivity state that records the subscription.
      callback: The callable to invoke with connectivity values.
      try_to_connect: Whether a connection attempt is requested.
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        if not state.callbacks_and_connectivities and not state.polling:
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            last_seen = None
        else:
            if not state.delivering and state.connectivity is not None:
                # A connectivity value is already known and nobody is
                # delivering: tell the new subscriber right away.
                _spawn_delivery(state, (callback,))
                last_seen = state.connectivity
            else:
                last_seen = None
            state.try_to_connect |= wants_connect
        state.callbacks_and_connectivities.append([callback, last_seen])
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription whose callback equals callback.

    Does nothing when no subscription matches.

    Args:
      state: The connectivity state holding the subscriptions.
      callback: The callable to unsubscribe.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, (subscriber, _) in enumerate(subscriptions):
            if callback == subscriber:
                del subscriptions[position]
                return
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression option and the user agent to base_options.

    Args:
      base_options: The channel options to start from.
      compression: The value handed to _compression.create_channel_option.

    Returns:
      A tuple of base_options, then the compression option(s), then the
        primary user agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return tuple(base_options) + compression_option + (user_agent_option,)
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    Args:
      options: The (key, value) channel options to split.

    Returns:
      A (python_options, core_options) pair of lists, in that order. Only
        options keyed by
        grpc.experimental.ChannelOptions.SingleThreadedUnaryStream count as
        Python options.
    """
    python_options = []
    core_options = []
    for option in options:
        is_python_option = (
            option[0]
            == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        destination = python_options if is_python_option else core_options
        destination.append(option)
    return python_options, core_options
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)

        # Start from the module default; Python-only options may override it.
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_arguments, credentials
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)

        # Register for fork handling and, under gevent, channel counting.
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """
        Get the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC
        generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Sets channel attributes according to python-only channel options.

        The presence of a SingleThreadedUnaryStream option switches the
        channel to single-threaded unary-stream calls; the value paired with
        the key is not consulted.
        """
        for option in python_options:
            option_key = option[0]
            if (
                option_key
                != grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            ):
                continue
            self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Registers callback via _subscribe on this channel's state."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Removes callback via _unsubscribe on this channel's state."""
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Creates a unary-unary multi-callable for method.

        A registered call handle is looked up only when _registered_method is
        truthy; otherwise None is passed as the handle.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryUnaryMultiCallable(
            self._channel,
            managed_call,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Creates a unary-stream multi-callable for method.

        The single-threaded variant is returned when the channel was
        configured for it; otherwise the multi-threaded variant is returned.
        A registered call handle is looked up only when _registered_method is
        truthy.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if not self._single_threaded_unary_stream:
            managed_call = _channel_managed_call_management(self._call_state)
            return _UnaryStreamMultiCallable(
                self._channel,
                managed_call,
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )
        return _SingleThreadedUnaryStreamMultiCallable(
            self._channel,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Creates a stream-unary multi-callable for method.

        A registered call handle is looked up only when _registered_method is
        truthy; otherwise None is passed as the handle.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamUnaryMultiCallable(
            self._channel,
            managed_call,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Creates a stream-stream multi-callable for method.

        A registered call handle is looked up only when _registered_method is
        truthy; otherwise None is passed as the handle.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamStreamMultiCallable(
            self._channel,
            managed_call,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Drops every connectivity callback, if connectivity state is set."""
        connectivity_state = self._connectivity_state
        if not connectivity_state:
            return
        with connectivity_state.lock:
            del connectivity_state.callbacks_and_connectivities[:]

    def _close(self) -> None:
        """Drops subscriptions, closes the channel and deregisters it."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Drops subscriptions and closes the channel because of a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        """Returns this channel so it can be used in a with statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the channel; returns False so exceptions propagate."""
        self._close()
        return False

    def close(self) -> None:
        """Closes the channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except # noqa: E722
            # Python ignores exceptions raised from __del__ anyway, but they
            # can still spam the logs, so they are deliberately silenced.
            pass