Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (
25 Any,
26 Callable,
27 Dict,
28 Iterator,
29 List,
30 Optional,
31 Sequence,
32 Set,
33 Tuple,
34 Union,
35)
37import grpc # pytype: disable=pyi-error
38from grpc import _common # pytype: disable=pyi-error
39from grpc import _compression # pytype: disable=pyi-error
40from grpc import _grpcio_metadata # pytype: disable=pyi-error
41from grpc import _observability # pytype: disable=pyi-error
42from grpc._cython import cygrpc
43from grpc._typing import ChannelArgumentType
44from grpc._typing import DeserializingFunction
45from grpc._typing import IntegratedCallFactory
46from grpc._typing import MetadataType
47from grpc._typing import NullaryCallbackType
48from grpc._typing import ResponseType
49from grpc._typing import SerializingFunction
50from grpc._typing import UserTag
51import grpc.experimental # pytype: disable=pyi-error
# Module-level logger shared by every helper in this file.
_LOGGER = logging.getLogger(__name__)

# User-agent fragment built from the installed grpcio version string.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)

# Flag value passed to operations that take no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# Only the presence of the variable matters; its value is never inspected.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# Operation types initially due for each of the four RPC arities. These
# tuples seed `_RPCState.due` when an RPC is started.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# Templates used by `_rpc_state_string`; positional fields are the class
# name, the status code, the details and (non-OK only) the debug string.
_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)

_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: Seconds from now, or None for "no deadline".

    Returns:
      `time.time() + timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details text used when the server sends an unmapped code.

    Args:
      unknown_cygrpc_code: The code value to embed in the message.
      details: The details value to embed in the message.

    Returns:
      The formatted human-readable message.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
class _RPCState(object):
    """Mutable record of one RPC's invocation-side state.

    All members are guarded by `condition`; holders of the condition call
    `notify_all` on it whenever the RPC's state changes.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Initialise the state.

        Args:
          due: Operation types initially expected from the completion queue.
          initial_metadata: Initial metadata, or None if not yet known.
          trailing_metadata: Trailing metadata, or None if not yet known.
          code: Status code, or None while the RPC has not terminated.
          details: Status details, or None while the RPC has not terminated.
        """
        # `condition` guards all members of _RPCState. `notify_all` is called
        # on `condition` when the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the
        # RPC's completion queue. If an operation is in `due`, it is
        # guaranteed that `operate()` has been called on a corresponding
        # operation. But the converse is not true. That is, in the case of
        # failed `operate()` calls, there may briefly be events in `due` that
        # do not correspond to operations submitted to Core.
        self.due = set(due)

        # Outcome of the RPC as known so far.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.response = None
        self.debug_error_string = None

        # The following four fields are used for observability.
        # Updates to those fields do not trigger self.condition.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so they have to be tracked separately from the rest
        # of the result of the RPC. This field tracks whether cancellation was
        # requested prior to termination of the RPC.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new threading.Condition."""
        self.condition = threading.Condition()
def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Mark `state` as terminated with `code`/`details` unless already done.

    A state that already carries a status code is left untouched. Otherwise
    the code and details are recorded, missing initial metadata is replaced
    by an empty tuple, and trailing metadata is set to an empty tuple.
    """
    if state.code is not None:
        # The RPC already has an outcome; the first one wins.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold a completion-queue event into `state`.

    Every batch operation carried by `event` is removed from `state.due` and
    its payload (initial metadata, message, or final status) is recorded on
    `state`. The caller is expected to hold `state.condition`.

    Args:
      event: The event whose `batch_operations` are applied.
      state: The RPC state to update.
      response_deserializer: Deserializer handed to `_common.deserialize`
        for received messages.

    Returns:
      The callbacks that were registered on `state` if this event carried the
      final status (after which `state.callbacks` is None); otherwise an
      empty list. The callbacks are returned rather than invoked so the
      caller decides where they run.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code the server actually sent; the
                    # failed lookup result is always None and carries no
                    # information.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None (rather than an empty list) signals to add_callback that
            # the RPC has terminated and no further callbacks are accepted.
            state.callbacks = None
    return callbacks
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the completion-queue tag callback for an RPC.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Deserializer forwarded to `_handle_event`.

    Returns:
      A callable taking an event. It applies the event to `state` under
      `state.condition`, notifies waiters, runs any termination callbacks
      outside the lock, and returns True only when no operations remain due
      and the state's fork epoch is not older than the current one.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks added via add_done_callback are functools.partial
                # objects, but add_callback stores plain callables that have
                # no `.func`; fall back to the callback itself so logging can
                # never raise from inside this handler.
                _LOGGER.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
# pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Start a daemon thread that sends the user's requests over `call`.

    The thread pulls requests from `request_iterator` one at a time,
    serializes each with `request_serializer`, submits it as a send-message
    operation and waits until that send is no longer due (or the RPC has a
    status code) before pulling the next one. Once the iterator is exhausted
    a send-close-from-client operation is submitted. An exception raised by
    the iterator or a failed serialization cancels the call and aborts
    `state`.
    """
    send_message = cygrpc.OperationType.send_message
    send_close = cygrpc.OperationType.send_close_from_client

    def _send_settled():
        # True once the RPC terminated or the pending send has completed.
        return state.code is not None or send_message not in state.due

    def _cancel_and_abort(code, details):
        call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
        _abort(state, code, details)

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            generator_exit_reported = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                generator_exit_reported = True
                failure_details = "Exception iterating requests!"
                _LOGGER.exception(failure_details)
                _cancel_and_abort(grpc.StatusCode.UNKNOWN, failure_details)
                return
            finally:
                # Report leaving user code exactly once per iteration.
                if not generator_exit_reported:
                    cygrpc.return_from_user_request_generator()

            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is not None or state.cancelled:
                    return
                if serialized_request is None:
                    _cancel_and_abort(
                        grpc.StatusCode.INTERNAL,
                        "Exception serializing request!",
                    )
                    return
                # Register the send as due before submitting it, and undo
                # the registration if submission fails.
                state.due.add(send_message)
                batch = (
                    cygrpc.SendMessageOperation(
                        serialized_request, _EMPTY_FLAGS
                    ),
                )
                if not call.operate(batch, event_handler):
                    state.due.remove(send_message)
                    return

                _common.wait(
                    state.condition.wait,
                    _send_settled,
                    spin_cb=functools.partial(
                        cygrpc.block_if_fork_in_progress, state
                    ),
                )
                if state.code is not None:
                    return

        # The iterator is exhausted: half-close the call if still active.
        with state.condition:
            if state.code is None:
                state.due.add(send_close)
                batch = (cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                if not call.operate(batch, event_handler):
                    state.due.remove(send_close)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render `rpc_state` as the repr text for an object named `class_name`.

    An RPC without a status code yields a bare "<... object>" string; a
    terminated RPC yields the OK or non-OK template filled with its status,
    details and (non-OK only) debug error string.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            text = "<{} object>".format(class_name)
        elif status is grpc.StatusCode.OK:
            text = _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, status, rpc_state.details
            )
        else:
            text = _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name,
                status,
                rpc_state.details,
                rpc_state.debug_error_string,
            )
        return text
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot `state` into a private, independent _RPCState."""
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)
            self._state = snapshot

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the snapshotted status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the snapshotted details, passed through _common.decode."""
        raw_details = self._state.details
        return _common.decode(raw_details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string, passed through _common.decode."""
        raw_debug = self._state.debug_error_string
        return _common.decode(raw_debug)

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raising and immediately catching ourselves is what produces a
        # traceback object to hand back.
        try:
            raise self
        except grpc.RpcError:
            _, _, trace = sys.exc_info()
            return trace

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # This object is always done, so the callback runs right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        state = self._state
        with state.condition:
            return state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            deadline = self._deadline
            if deadline is None:
                return None
            return max(deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        state = self._state
        with state.condition:
            if state.code is not None:
                # Already terminated; nothing left to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            state.cancelled = True
            _abort(state, code, details)
            state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        state = self._state
        with state.condition:
            registered = state.callbacks
            if registered is None:
                # The callback list is discarded once the RPC terminates.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that is still in flight when its handle is collected.
        state = self._state
        with state.condition:
            if state.code is not None:
                return
            state.code = grpc.StatusCode.CANCELLED
            state.details = "Cancelled upon garbage collection!"
            state.cancelled = True
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                state.details,
            )
            state.condition.notify_all()
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def cancelled(self) -> bool:
        state = self._state
        with state.condition:
            return state.cancelled

    def running(self) -> bool:
        state = self._state
        with state.condition:
            return state.code is None

    def done(self) -> bool:
        state = self._state
        with state.condition:
            return state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                error_msg = (
                    "_SingleThreadedRendezvous only supports "
                    "result() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(error_msg)
            if state.code is grpc.StatusCode.OK:
                return state.response
            if state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                error_msg = (
                    "_SingleThreadedRendezvous only supports "
                    "exception() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(error_msg)
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                msg = (
                    "_SingleThreadedRendezvous only supports "
                    "traceback() when the RPC is complete."
                )
                raise grpc.experimental.UsageError(msg)
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves to obtain a traceback object.
            try:
                raise self
            except grpc.RpcError:
                _, _, trace = sys.exc_info()
                return trace

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` on completion, or at once if already complete."""
        state = self._state
        with state.condition:
            still_running = state.code is None
            if still_running:
                state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Invoked outside the lock, as the RPC has already terminated.
            fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while state.initial_metadata is None:
                self._consume_next_event()
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        state = self._state
        with state.condition:
            metadata = state.trailing_metadata
            if metadata is None:
                error_msg = (
                    "Cannot get trailing metadata until RPC is completed."
                )
                raise grpc.experimental.UsageError(error_msg)
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        state = self._state
        with state.condition:
            status = state.code
            if status is None:
                error_msg = "Cannot get code until RPC is completed."
                raise grpc.experimental.UsageError(error_msg)
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        state = self._state
        with state.condition:
            raw_details = state.details
            if raw_details is None:
                error_msg = "Cannot get details until RPC is completed."
                raise grpc.experimental.UsageError(error_msg)
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Pull one event from the call and apply it to the RPC state."""
        event = self._call.next_event()
        state = self._state
        with state.condition:
            pending = _handle_event(event, state, self._response_deserializer)
            for callback in pending:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response arrives or the RPC terminates."""
        receive_message = cygrpc.OperationType.receive_message
        state = self._state
        while True:
            self._consume_next_event()
            with state.condition:
                response = state.response
                if response is not None:
                    # Hand the message over and clear the slot for the next.
                    state.response = None
                    return response
                if receive_message not in state.due:
                    if state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    if state.code is not None:
                        raise self

    def _next(self) -> Any:
        receive_message = cygrpc.OperationType.receive_message
        state = self._state
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee
            # that if an event has been submitted to the core completion
            # queue, it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there
            # is no data race on `due`.
            state.due.add(receive_message)
            submitted = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
            )
            if not submitted:
                state.due.remove(receive_message)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string of a completed RPC."""
        state = self._state
        with state.condition:
            raw_debug = state.debug_error_string
            if raw_debug is None:
                error_msg = (
                    "Cannot get debug error string until RPC is completed."
                )
                raise grpc.experimental.UsageError(error_msg)
            return _common.decode(raw_debug)
class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the
    grpc.Future interface and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.initial_metadata is not None,
            )
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.trailing_metadata is not None,
            )
            return state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.code is not None
            )
            return state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.details is not None
            )
            return _common.decode(state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for and return the decoded debug error string."""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.debug_error_string is not None,
            )
            return _common.decode(state.debug_error_string)

    def cancelled(self) -> bool:
        state = self._state
        with state.condition:
            return state.cancelled

    def running(self) -> bool:
        state = self._state
        with state.condition:
            return state.code is None

    def done(self) -> bool:
        state = self._state
        with state.condition:
            return state.code is not None

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        state = self._state
        with state.condition:
            timed_out = _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return state.response
            if state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        state = self._state
        with state.condition:
            timed_out = _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        state = self._state
        with state.condition:
            timed_out = _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and catch ourselves to obtain a traceback object.
            try:
                raise self
            except grpc.RpcError:
                _, _, trace = sys.exc_info()
                return trace

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` on completion, or at once if already complete."""
        state = self._state
        with state.condition:
            still_running = state.code is None
            if still_running:
                state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Invoked outside the lock, as the RPC has already terminated.
            fn(self)

    def _next(self) -> Any:
        receive_message = cygrpc.OperationType.receive_message
        state = self._state
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self

            # The RPC is still active: request one more message. The
            # operation is recorded in `due` before submission and removed
            # again if submission fails.
            event_handler = _event_handler(state, self._response_deserializer)
            state.due.add(receive_message)
            submitted = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                event_handler,
            )
            if not submitted:
                state.due.remove(receive_message)

            def _response_ready():
                if state.response is not None:
                    return True
                return (
                    receive_message not in state.due
                    and state.code is not None
                )

            _common.wait(state.condition.wait, _response_ready)
            response = state.response
            if response is not None:
                # Hand the message over and clear the slot for the next one.
                state.response = None
                return response
            if receive_message not in state.due:
                if state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if state.code is not None:
                    raise self
            return None
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request for a unary call.

    Returns:
      A `(deadline, serialized_request, error)` triple. On success `error`
      is None; if serialization yields None, `serialized_request` is None
      and `error` is an _InactiveRpcError with INTERNAL status.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn a finished blocking unary-response RPC into its return value.

    Returns:
      `state.response`, or `(state.response, call_object)` when `with_call`
      is true.

    Raises:
      _InactiveRpcError: If the RPC's status code is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two initial operation batches of a stream-unary RPC.

    The first batch sends initial metadata and receives the single response
    message and the final status; the second receives initial metadata.
    """
    send_and_receive = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive, receive_initial_metadata)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pairs each stream-unary opening batch with a None tag.

    Args:
      metadata: Forwarded to `_stream_unary_invocation_operations`.
      initial_metadata_flags: Forwarded likewise.

    Returns:
      A tuple of `(operations, None)` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combines the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None when neither deadline exists, the only one that exists when just
      one does, and the smaller of the two when both do.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers both "neither set" (returns None) and "only user set".
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Unary-request, unary-response invoker bound to one method of a channel.

    `__call__` and `with_call` conduct the RPC on a segregated call and block
    for its event; `future` starts it on a managed call and returns a
    rendezvous immediately.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__; "_registered_call_handle"
    # used to be missing and therefore landed in the instance __dict__.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serializes the request and assembles the single operation batch.

        Returns:
          `(state, operations, deadline, None)` on success, or
          `(None, None, None, error)` when the request could not be
          serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, rendezvous
        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call and handles its next event.

        Returns:
          The `(state, call)` pair after one event has been handled.

        Raises:
          The error produced by `_prepare` when serialization failed.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            ((operations, None),),
            self._context,
            self._registered_call_handle,
        )
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns only the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns the response and a call."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous.

        Raises:
          The error produced by `_prepare` when serialization failed.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed as-is rather
        # than through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            None if credentials is None else credentials._credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-request, stream-response invoker built on a segregated call.

    Each invocation returns a `_SingleThreadedRendezvous` for the call.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__; "_registered_call_handle"
    # used to be missing and therefore landed in the instance __dict__.
    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Starts the RPC and returns the rendezvous for its call.

        Raises:
          _InactiveRpcError: With StatusCode.INTERNAL when
            `_common.serialize` yields None for the request.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            failed_state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: the sends, the status receive, and the initial
        # metadata receive. Each is paired with a None tag.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations_and_tags = tuple(
            (batch, None)
            for batch in (send_batch, status_batch, initial_metadata_batch)
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-request, stream-response invoker built on a managed call.

    Each invocation returns a `_MultiThreadedRendezvous` for the call.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__; "_registered_call_handle"
    # used to be missing and therefore landed in the instance __dict__.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous.

        Raises:
          The error returned by `_start_unary_request` when the request
          could not be serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: the sends plus the status receive, then the initial
        # metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-request, unary-response invoker bound to one channel method.

    `__call__` and `with_call` conduct the RPC on a segregated call and block
    until no operations remain due; `future` starts it on a managed call and
    returns a rendezvous.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__; "_registered_call_handle"
    # used to be missing and therefore landed in the instance __dict__.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call until nothing remains due.

        Returns:
          The `(state, call)` pair once `state.due` is empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                # Wake any thread waiting on the condition for new state.
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns only the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns the response and a call."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed as-is rather
        # than through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The operations carry the augmented metadata, as in _blocking,
            # so the per-call compression setting is part of the initial
            # metadata; the raw metadata used to be passed here.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-request, stream-response invoker built on a managed call.

    Each invocation returns a `_MultiThreadedRendezvous` for the call.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__; "_registered_call_handle"
    # used to be missing and therefore landed in the instance __dict__.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators and builds a census context."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC, begins consuming requests, returns the rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: send initial metadata plus the status receive, then
        # the initial metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        """Builds the flags, keeping only bits inside `used_mask`."""
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Returns flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: True to set the wait_for_ready bit, False to clear
            it, or None to leave the flags untouched.

        Returns:
          `self` when `wait_for_ready` is None; otherwise a new instance of
          this class with the wait_for_ready bit set or cleared and the
          wait_for_ready_explicitly_set bit set.
        """
        if wait_for_ready is None:
            return self
        flag_bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            chosen = self | flag_bits.wait_for_ready
        else:
            chosen = self & ~flag_bits.wait_for_ready
        return self.__class__(
            chosen | flag_bits.wait_for_ready_explicitly_set
        )
class _ChannelCallState:
    """Bookkeeping for the managed calls of one channel.

    `lock` guards `managed_calls`, which `reset_postfork_child` zeroes.
    """

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        """Starts with no managed calls on `channel`."""
        self.channel = channel
        self.lock = threading.Lock()
        self.managed_calls = 0
        self.threading = False

    def reset_postfork_child(self) -> None:
        """Forgets all managed calls."""
        self.managed_calls = 0

    def __del__(self):
        """Closes the channel with a cancelled status, best effort."""
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            # Deliberately ignored so that finalization never raises.
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Starts a daemon thread that drains the channel's call events.

    The thread invokes each event's tag with the event. Whenever a tag
    reports a completed call it decrements `state.managed_calls` under
    `state.lock`, and it exits once that count reaches zero.

    Args:
      state: The call state whose channel is polled.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                continue
            if not event.tag(event):
                # The tag did not report a completed call; keep polling.
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spinner = cygrpc.ForkManagedThread(target=channel_spin)
    spinner.setDaemon(True)
    spinner.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Returns a factory of managed calls bound to `state`.

    Args:
      state: The call state whose channel, lock and call count are used.

    Returns:
      The `create` function defined below.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch shares the same event handler as its tag.
        operations_and_tags = tuple(
            (batch, event_handler) for batch in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                operations_and_tags,
                context,
                _registered_call_handle,
            )
            was_idle = state.managed_calls == 0
            state.managed_calls += 1
            if was_idle:
                # First managed call: start the thread that drains events.
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState:
    """Shared state for connectivity polling and callback delivery."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        """Creates the lock, stores `channel`, and sets the initial values."""
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out exactly as they are after a reset.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Returns every field except `lock` and `channel` to its default."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collects the callbacks that have not seen the current connectivity.

    Each such subscription entry is updated in place to record
    `state.connectivity` as its latest value.

    Args:
      state: The connectivity state holding the subscriptions.

    Returns:
      The callbacks whose recorded connectivity differed, in subscription
      order.
    """
    current = state.connectivity
    stale_entries = [
        entry
        for entry in state.callbacks_and_connectivities
        if entry[1] is not current
    ]
    for entry in stale_entries:
        entry[1] = current
    return [entry[0] for entry in stale_entries]
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invokes callbacks with connectivity values until none are pending.

    After each round, `_deliveries` is consulted under `state.lock`. When it
    reports nothing, `state.delivering` is cleared and the function returns.

    Args:
      state: The connectivity state shared with the polling thread.
      initial_connectivity: The value passed to the first round of callbacks.
      initial_callbacks: The callbacks invoked in the first round.
    """
    connectivity = initial_connectivity
    pending = initial_callbacks
    while True:
        for callback in pending:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing subscriber is logged and does not stop delivery.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            pending = _deliveries(state)
            if not pending:
                state.delivering = False
                return
            connectivity = state.connectivity
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Starts a daemon thread running `_deliver` and marks delivery active.

    Args:
      state: The connectivity state; its current connectivity is the first
        value delivered.
      callbacks: The callbacks for the first delivery round.
    """
    deliver_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=deliver_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls the channel's connectivity and schedules callback deliveries.

    Runs until there are no subscriptions left and no connection attempt has
    been requested; it then clears `state.polling` and `state.connectivity`
    and returns.

    Args:
      state: The connectivity state shared with subscribers.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY
    )
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = to_channel_connectivity[connectivity]
        # Every current subscriber gets the first value, so all entries are
        # marked as having seen it.
        callbacks = tuple(
            callback for callback, _ in state.callbacks_and_connectivities
        )
        for subscription in state.callbacks_and_connectivities:
            subscription[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)
    while True:
        # Watch with a deadline 0.2 seconds from now.
        watch_event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (
                state.callbacks_and_connectivities or state.try_to_connect
            ):
                state.polling = False
                state.connectivity = None
                return
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if watch_event.success or try_to_connect:
            connectivity = channel.check_connectivity_state(try_to_connect)
            with state.lock:
                state.connectivity = to_channel_connectivity[connectivity]
                # A running delivery thread picks up the change itself via
                # _deliveries, so a new one is only spawned when idle.
                if not state.delivering:
                    callbacks = _deliveries(state)
                    if callbacks:
                        _spawn_delivery(state, callbacks)
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers `callback` for connectivity updates.

    Starts the polling thread for the first subscriber when none is polling.
    Otherwise, when a connectivity value is known and no delivery is active,
    the callback is sent that value at once.

    Args:
      state: The connectivity state to subscribe on.
      callback: The callback to invoke with connectivity values.
      try_to_connect: Whether the channel should be asked to connect; read
        through `bool()`.
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        if not subscriptions and not state.polling:
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            subscriptions.append([callback, None])
            return
        if not state.delivering and state.connectivity is not None:
            _spawn_delivery(state, (callback,))
            state.try_to_connect |= wants_connect
            # Recorded as already having seen the value just delivered.
            subscriptions.append([callback, state.connectivity])
            return
        state.try_to_connect |= wants_connect
        subscriptions.append([callback, None])
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription whose callback equals `callback`.

    Does nothing when no subscription matches.

    Args:
      state: The connectivity state holding the subscriptions.
      callback: The callback to remove.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, entry in enumerate(subscriptions):
            if callback == entry[0]:
                del subscriptions[position]
                return
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression and user-agent options to `base_options`.

    Args:
      base_options: The options to start from.
      compression: Passed to `_compression.create_channel_option`.

    Returns:
      A tuple of the base options, then the compression option(s), then the
      primary user agent string option.
    """
    compression_option = _compression.create_channel_option(compression)
    with_compression = tuple(base_options) + compression_option
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return with_compression + (user_agent_option,)
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    Args:
      options: The `(key, value)` option pairs to split.

    Returns:
      A `(python_options, core_options)` pair of lists. Pairs keyed by
      ChannelOptions.SingleThreadedUnaryStream go to the first list and all
      others to the second.
    """
    python_only_key = grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
    python_options = []
    core_options = []
    for pair in options:
        bucket = python_options if pair[0] == python_only_key else core_options
        bucket.append(pair)
    return python_options, core_options
2017class Channel(grpc.Channel):
2018 """A cygrpc.Channel-backed implementation of grpc.Channel."""
2020 _single_threaded_unary_stream: bool
2021 _channel: cygrpc.Channel
2022 _call_state: _ChannelCallState
2023 _connectivity_state: _ChannelConnectivityState
2024 _target: str
2025 _registered_call_handles: Dict[str, int]
def __init__(
    self,
    target: str,
    options: Sequence[ChannelArgumentType],
    credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
):
    """Constructor.

    Args:
      target: The target to which to connect.
      options: Configuration options for the channel.
      credentials: A cygrpc.ChannelCredentials or None.
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel.
    """
    python_options, core_options = _separate_channel_options(options)
    # Start from the default; the Python-only options may override it.
    self._single_threaded_unary_stream = (
        _DEFAULT_SINGLE_THREADED_UNARY_STREAM
    )
    self._process_python_options(python_options)
    encoded_target = _common.encode(target)
    channel_arguments = _augment_options(core_options, compression)
    self._channel = cygrpc.Channel(
        encoded_target, channel_arguments, credentials
    )
    self._target = target
    self._call_state = _ChannelCallState(self._channel)
    self._connectivity_state = _ChannelConnectivityState(self._channel)
    cygrpc.fork_register_channel(self)
    if cygrpc.g_gevent_activated:
        cygrpc.gevent_increment_channel_count()
2060 def _get_registered_call_handle(self, method: str) -> int:
2061 """
2062 Get the registered call handle for a method.
2064 This is a semi-private method. It is intended for use only by gRPC generated code.
2066 This method is not thread-safe.
2068 Args:
2069 method: Required, the method name for the RPC.
2071 Returns:
2072 The registered call handle pointer in the form of a Python Long.
2073 """
2074 return self._channel.get_registered_call_handle(_common.encode(method))
2076 def _process_python_options(
2077 self, python_options: Sequence[ChannelArgumentType]
2078 ) -> None:
2079 """Sets channel attributes according to python-only channel options."""
2080 for pair in python_options:
2081 if (
2082 pair[0]
2083 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
2084 ):
2085 self._single_threaded_unary_stream = True
2087 def subscribe(
2088 self,
2089 callback: Callable[[grpc.ChannelConnectivity], None],
2090 try_to_connect: Optional[bool] = None,
2091 ) -> None:
2092 _subscribe(self._connectivity_state, callback, try_to_connect)
2094 def unsubscribe(
2095 self, callback: Callable[[grpc.ChannelConnectivity], None]
2096 ) -> None:
2097 _unsubscribe(self._connectivity_state, callback)
2099 # pylint: disable=arguments-differ
2100 def unary_unary(
2101 self,
2102 method: str,
2103 request_serializer: Optional[SerializingFunction] = None,
2104 response_deserializer: Optional[DeserializingFunction] = None,
2105 _registered_method: Optional[bool] = False,
2106 ) -> grpc.UnaryUnaryMultiCallable:
2107 _registered_call_handle = None
2108 if _registered_method:
2109 _registered_call_handle = self._get_registered_call_handle(method)
2110 return _UnaryUnaryMultiCallable(
2111 self._channel,
2112 _channel_managed_call_management(self._call_state),
2113 _common.encode(method),
2114 _common.encode(self._target),
2115 request_serializer,
2116 response_deserializer,
2117 _registered_call_handle,
2118 )
2120 # pylint: disable=arguments-differ
2121 def unary_stream(
2122 self,
2123 method: str,
2124 request_serializer: Optional[SerializingFunction] = None,
2125 response_deserializer: Optional[DeserializingFunction] = None,
2126 _registered_method: Optional[bool] = False,
2127 ) -> grpc.UnaryStreamMultiCallable:
2128 _registered_call_handle = None
2129 if _registered_method:
2130 _registered_call_handle = self._get_registered_call_handle(method)
2131 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
2132 # on a single Python thread results in an appreciable speed-up. However,
2133 # due to slight differences in capability, the multi-threaded variant
2134 # remains the default.
2135 if self._single_threaded_unary_stream:
2136 return _SingleThreadedUnaryStreamMultiCallable(
2137 self._channel,
2138 _common.encode(method),
2139 _common.encode(self._target),
2140 request_serializer,
2141 response_deserializer,
2142 _registered_call_handle,
2143 )
2144 return _UnaryStreamMultiCallable(
2145 self._channel,
2146 _channel_managed_call_management(self._call_state),
2147 _common.encode(method),
2148 _common.encode(self._target),
2149 request_serializer,
2150 response_deserializer,
2151 _registered_call_handle,
2152 )
2154 # pylint: disable=arguments-differ
2155 def stream_unary(
2156 self,
2157 method: str,
2158 request_serializer: Optional[SerializingFunction] = None,
2159 response_deserializer: Optional[DeserializingFunction] = None,
2160 _registered_method: Optional[bool] = False,
2161 ) -> grpc.StreamUnaryMultiCallable:
2162 _registered_call_handle = None
2163 if _registered_method:
2164 _registered_call_handle = self._get_registered_call_handle(method)
2165 return _StreamUnaryMultiCallable(
2166 self._channel,
2167 _channel_managed_call_management(self._call_state),
2168 _common.encode(method),
2169 _common.encode(self._target),
2170 request_serializer,
2171 response_deserializer,
2172 _registered_call_handle,
2173 )
2175 # pylint: disable=arguments-differ
2176 def stream_stream(
2177 self,
2178 method: str,
2179 request_serializer: Optional[SerializingFunction] = None,
2180 response_deserializer: Optional[DeserializingFunction] = None,
2181 _registered_method: Optional[bool] = False,
2182 ) -> grpc.StreamStreamMultiCallable:
2183 _registered_call_handle = None
2184 if _registered_method:
2185 _registered_call_handle = self._get_registered_call_handle(method)
2186 return _StreamStreamMultiCallable(
2187 self._channel,
2188 _channel_managed_call_management(self._call_state),
2189 _common.encode(method),
2190 _common.encode(self._target),
2191 request_serializer,
2192 response_deserializer,
2193 _registered_call_handle,
2194 )
2196 def _unsubscribe_all(self) -> None:
2197 state = self._connectivity_state
2198 if state:
2199 with state.lock:
2200 del state.callbacks_and_connectivities[:]
2202 def _close(self) -> None:
2203 self._unsubscribe_all()
2204 self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
2205 cygrpc.fork_unregister_channel(self)
2206 if cygrpc.g_gevent_activated:
2207 cygrpc.gevent_decrement_channel_count()
2209 def _close_on_fork(self) -> None:
2210 self._unsubscribe_all()
2211 self._channel.close_on_fork(
2212 cygrpc.StatusCode.cancelled, "Channel closed due to fork"
2213 )
2215 def __enter__(self):
2216 return self
2218 def __exit__(self, exc_type, exc_val, exc_tb):
2219 self._close()
2220 return False
2222 def close(self) -> None:
2223 self._close()
2225 def __del__(self):
2226 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
2227 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
2228 # here (or more likely, call self._close() here). We don't do this today
2229 # because many valid use cases today allow the channel to be deleted
2230 # immediately after stubs are created. After a sufficient period of time
2231 # has passed for all users to be trusted to freeze out to their channels
2232 # for as long as they are in use and to close them after using them,
2233 # then deletion of this grpc._channel.Channel instance can be made to
2234 # effect closure of the underlying cygrpc.Channel instance.
2235 try:
2236 self._unsubscribe_all()
2237 except: # pylint: disable=bare-except # noqa: E722
2238 # Exceptions in __del__ are ignored by Python anyway, but they can
2239 # keep spamming logs. Just silence them.
2240 pass