Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (
25 Any,
26 Callable,
27 Dict,
28 Iterator,
29 List,
30 Optional,
31 Sequence,
32 Set,
33 Tuple,
34 Union,
35)
37import grpc # pytype: disable=pyi-error
38from grpc import _common # pytype: disable=pyi-error
39from grpc import _compression # pytype: disable=pyi-error
40from grpc import _grpcio_metadata # pytype: disable=pyi-error
41from grpc import _observability # pytype: disable=pyi-error
42from grpc._cython import cygrpc
43from grpc._typing import ChannelArgumentType
44from grpc._typing import DeserializingFunction
45from grpc._typing import IntegratedCallFactory
46from grpc._typing import MetadataType
47from grpc._typing import NullaryCallbackType
48from grpc._typing import ResponseType
49from grpc._typing import SerializingFunction
50from grpc._typing import UserTag
51import grpc.experimental # pytype: disable=pyi-error
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Identifier of the form "grpc-python/<installed grpcio version>".
# NOTE(review): presumably sent as the channel's user agent -- the usage is
# not visible in this part of the file; confirm against the channel setup.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)

# Flags value handed to cygrpc operation constructors when no flag is set.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever GRPC_SINGLE_THREADED_UNARY_STREAM is present in the
# environment, whatever its value (an empty string still counts).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
)

# Tuples of cygrpc operation types, one tuple per RPC arity.
# NOTE(review): presumably used to seed `_RPCState.due` when an RPC of the
# matching arity is started -- the call sites are outside this part of the
# file; confirm.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Log message text.
# NOTE(review): presumably logged when a channel connectivity subscription
# callback raises -- the usage is outside this part of the file; confirm.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# Template used by `_rpc_state_string` for an RPC that ended with status OK.
# Positional fields: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)

# Template used by `_rpc_state_string` for an RPC that ended with a non-OK
# status. Positional fields: class name, status code, details, debug error
# string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: Number of seconds from now, or None for "no deadline".

    Returns:
      `time.time()` plus `timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details text used when the status code is not recognised.

    Args:
      unknown_cygrpc_code: The code value to mention in the message.
      details: The details value to mention in the message.

    Returns:
      A message embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
class _RPCState(object):
    """Mutable bookkeeping shared by everything that touches one RPC.

    `condition` guards all members of the instance; `notify_all` is called
    on it when the state of the RPC has changed.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Initialise the state of an RPC.

        Args:
          due: Operation types initially expected from the completion queue.
          initial_metadata: Initial metadata, or None if not yet known.
          trailing_metadata: Trailing metadata, or None if not yet known.
          code: Status code, or None while the RPC has no status.
          details: Status details, or None while the RPC has no status.
        """
        # Lock + notification channel for every other attribute.
        self.condition = threading.Condition()

        # Operation types whose completion events are still outstanding.
        # Anything listed here has had `operate()` called for it, but the
        # converse does not hold: after a failed `operate()` call an entry
        # may briefly linger without a matching operation submitted to Core.
        self.due = set(due)

        # Outcome of the RPC as far as it is known.
        self.code = code
        self.details = details
        self.debug_error_string = None

        # Data exchanged with the peer.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.response = None

        # Observability-only fields (four of them). Updating these does not
        # trigger `self.condition`.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # grpc.Future.cancel / grpc.Future.cancelled have slightly unusual
        # semantics, so whether cancellation was requested before the RPC
        # terminated is tracked separately from the rest of the result.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand new threading.Condition."""
        self.condition = threading.Condition()
def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Record a locally decided terminal status, unless one already exists.

    When `state.code` is already set nothing is touched. Otherwise the code
    and details are stored, missing initial metadata becomes an empty tuple
    and trailing metadata is overwritten with an empty tuple.

    Args:
      state: The RPC state to update in place.
      code: Status code to record.
      details: Status details to record.
    """
    if state.code is not None:
        # First terminal status wins.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into the RPC state.

    Every batch operation carried by the event is removed from `state.due`
    and, for receive operations, its payload is copied into `state`.

    Callers in this file invoke this function while holding
    `state.condition`.

    Args:
      event: The event whose `batch_operations` are applied.
      state: The RPC state to update in place.
      response_deserializer: Deserializer handed to `_common.deserialize`
        for received messages.

    Returns:
      The callbacks that were registered on `state` at the moment the
      status arrived (an empty list if the event carried no status). Once
      returned, `state.callbacks` is None.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    # A None result is treated as a deserialization failure.
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw code around: when it has no mapping, it (and
                # not the failed lookup result, which is always None) is
                # what has to appear in the details message.
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            # Hand the callbacks to the caller and stop accepting new ones.
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the tag callable that applies completion events to `state`.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Deserializer forwarded to `_handle_event`.

    Returns:
      A one-argument callable taking an event. It updates `state` under
      `state.condition`, notifies all waiters, runs the callbacks returned
      by `_handle_event` outside the lock, and returns a truthy value only
      when no operation is due any more and `state.fork_epoch` is not older
      than the current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks registered through add_done_callback are
                # functools.partial objects, but add_callback stores plain
                # callables that have no `func` attribute; fall back to the
                # callback itself so that logging can never raise here.
                _LOGGER.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
258# pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and submits it on `call` as a
    send-message operation, waiting for each send to leave `state.due`
    before pulling the next request. When the iterator is exhausted a
    send-close-from-client operation is submitted. This function itself
    returns as soon as the thread has been started.

    Args:
      request_iterator: The user's iterator of request values.
      state: The RPC state shared with the rest of the RPC machinery.
      call: The call on which operations are submitted and cancelled.
      request_serializer: Serializer handed to `_common.serialize`.
      event_handler: Tag passed along with every submitted operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except-branch already reported the return
            # from user code, so the finally-branch does not report it twice.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Normal exhaustion: go on to half-close the call below.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens before the lock is taken.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # A None result is treated as a serialization failure.
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Mark the send as due before submitting it and undo
                        # the mark if the submission is refused.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Finished waiting once the RPC has a status or
                            # the send is no longer outstanding.
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        if state.code is not None:
                            return
                else:
                    # The RPC already has a status or was cancelled: stop
                    # consuming requests.
                    return
        # Reached only via `break`, i.e. the iterator was exhausted.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render the textual representation of an RPC-backed object.

    Args:
      class_name: Name of the class being rendered.
      rpc_state: State of the RPC; read while holding its condition.

    Returns:
      A short "<... object>" form while the RPC has no status, otherwise a
      multi-line form with the status and details, plus the debug error
      string when the status is not OK.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return "<{} object>".format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, status, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            status,
            rpc_state.details,
            rpc_state.debug_error_string,
        )
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled. The instance works on its own copy of the relevant state, so
    it behaves as an already finished, failed future.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot `state` while holding its condition."""
        with state.condition:
            initial_metadata = copy.deepcopy(state.initial_metadata)
            trailing_metadata = copy.deepcopy(state.trailing_metadata)
            code = state.code
            details = copy.deepcopy(state.details)
            self._state = _RPCState(
                (), initial_metadata, trailing_metadata, code, details
            )
            self._state.response = copy.copy(state.response)
            self._state.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the status code taken from the original state."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the status details passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        """Shared implementation of `__repr__` and `__str__`."""
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch this error to obtain a traceback.
        try:
            raise self
        except grpc.RpcError:
            _, _, current_traceback = sys.exc_info()
            return current_traceback

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Always done, so the callback runs right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Base class holding the state shared by the concrete rendezvous types.
    Subclasses provide `_next` and `debug_error_string`.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super(_Rendezvous, self).__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative amount of time.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # The RPC already has a status: nothing left to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # The callback list has been retired; reject the request.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        """Produce the next response; implemented by subclasses."""
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; implemented by subclasses."""
        raise NotImplementedError()

    def _repr(self) -> str:
        """Shared implementation of `__repr__` and `__str__`."""
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        """Cancel the RPC if it still has no status when collected."""
        state = self._state
        with state.condition:
            if state.code is not None:
                return
            state.code = grpc.StatusCode.CANCELLED
            state.details = "Cancelled upon garbage collection!"
            state.cancelled = True
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                state.details,
            )
            state.condition.notify_all()
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        """Tell whether the RPC has a status code."""
        return self._state.code is not None

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """See grpc.Future.running."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """See grpc.Future.done."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports "
                    "result() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports "
                    "exception() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports "
                    "traceback() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                _, _, current_traceback = sys.exc_info()
                return current_traceback

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """See grpc.Future.add_done_callback."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Already finished: run the callback now, outside the lock.
            fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status = self._state.code
            if status is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch one event from the call and apply it to the state."""
        event = self._call.next_event()
        with self._state.condition:
            ready_callbacks = _handle_event(
                event, self._state, self._response_deserializer
            )
            for ready_callback in ready_callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                ready_callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response or a final status is available."""
        receive_message = cygrpc.OperationType.receive_message
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    # Hand the message over and clear the slot.
                    message = self._state.response
                    self._state.response = None
                    return message
                if receive_message not in self._state.due:
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    if self._state.code is not None:
                        raise self

    def _next(self) -> Any:
        """Request the next message and return it once it has arrived."""
        with self._state.condition:
            if self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if self._state.code is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee
            # that if an event has been submitted to the core completion
            # queue, it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there
            # is no data race on `due`.
            self._state.due.add(cygrpc.OperationType.receive_message)
            enqueued = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
            )
            if not enqueued:
                self._state.due.remove(cygrpc.OperationType.receive_message)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string of the finished RPC."""
        with self._state.condition:
            raw_debug_string = self._state.debug_error_string
            if raw_debug_string is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(raw_debug_string)
class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the
    grpc.Future interface and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.initial_metadata is not None,
            )
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.trailing_metadata is not None,
            )
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.code is not None,
            )
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.details is not None,
            )
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for, then return, the decoded debug error string."""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.debug_error_string is not None,
            )
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """See grpc.Future.running."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """See grpc.Future.done."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        """Tell whether the RPC has a status code."""
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                _, _, current_traceback = sys.exc_info()
                return current_traceback

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """See grpc.Future.add_done_callback."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Already finished: run the callback now, outside the lock.
            fn(self)

    def _next(self) -> Any:
        """Request the next message and block until it or a status arrives."""
        state = self._state
        receive_message = cygrpc.OperationType.receive_message
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self

            # No status yet: ask for one more message. The operation is
            # marked as due before it is submitted and unmarked again if
            # the submission is refused.
            handler = _event_handler(state, self._response_deserializer)
            state.due.add(receive_message)
            enqueued = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                handler,
            )
            if not enqueued:
                state.due.remove(receive_message)

            def _response_ready():
                if state.response is not None:
                    return True
                return (
                    receive_message not in state.due
                    and state.code is not None
                )

            _common.wait(state.condition.wait, _response_ready)
            if state.response is not None:
                # Hand the message over and clear the slot.
                message = state.response
                state.response = None
                return message
            if receive_message not in state.due:
                if state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if state.code is not None:
                    raise self
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary call.

    Args:
      request: The request value to serialize.
      timeout: Relative timeout in seconds, or None.
      request_serializer: Serializer handed to `_common.serialize`.

    Returns:
      A `(deadline, serialized_request, error)` tuple. Exactly one of the
      last two items is None: on serialization failure the request is None
      and the error is an `_InactiveRpcError` with status INTERNAL.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    # A None result is treated as a serialization failure.
    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking call into its return value.

    Args:
      state: The state of the RPC.
      call: The call, wrapped into the returned rendezvous if requested.
      with_call: Whether the caller also wants the call object.
      deadline: Deadline handed to the rendezvous.

    Returns:
      `state.response`, or a `(state.response, rendezvous)` pair when
      `with_call` is true.

    Raises:
      _InactiveRpcError: If the status code is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches for a stream-unary invocation.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of batches: the first sends the initial metadata and receives
      the message and the status, the second receives the initial metadata.
    """
    send_and_receive_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive_batch, receive_initial_metadata_batch)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pairs each stream-unary operation batch with a None tag.

    Args:
      metadata: Forwarded to `_stream_unary_invocation_operations`.
      initial_metadata_flags: Forwarded to
        `_stream_unary_invocation_operations`.

    Returns:
      A tuple of `(operations, None)` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combines the caller's deadline with the deadline found in the context.

    Args:
      user_deadline: The deadline requested for this call, or None.

    Returns:
      None when neither deadline is set, the only one that is set when
      exactly one is, and the smaller of the two when both are.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers "neither set" too, since user_deadline is then None as well.
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response RPC on a cygrpc.Channel.

    The blocking entry points (`__call__`, `with_call`) use a segregated call
    whose single event is pulled by the calling thread; `future` uses a call
    produced by the `managed_call` factory.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including the registered
    # call handle, so that all instance state lives in slots.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the call configuration and builds a census context.

        Args:
          channel: The channel on which segregated calls are created.
          managed_call: Factory used by `future` to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for responses, or None.
          _registered_call_handle: Handle forwarded to every call created by
            this object, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serializes the request and builds the single operation batch.

        Returns:
          `(state, operations, deadline, None)` on success, or
          `(None, None, None, error)` when the request failed to serialize.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            # `rendezvous` is the serialization error in this case.
            return None, None, None, rendezvous
        else:
            state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
            operations = (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            )
            return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call and handles its one event.

        Returns:
          The `(state, call)` pair after the event has been handled.

        Raises:
          grpc.RpcError: The error from `_prepare` when the request could not
            be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        else:
            state.rpc_start_time = time.perf_counter()
            state.method = _common.decode(self._method)
            state.target = _common.decode(self._target)
            call = self._channel.segregated_call(
                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
                self._method,
                None,
                _determine_deadline(deadline),
                metadata,
                None if credentials is None else credentials._credentials,
                (
                    (
                        operations,
                        None,
                    ),
                ),
                self._context,
                self._registered_call_handle,
            )
            # All six operations were started as one batch, so exactly one
            # event is pulled here.
            event = call.next_event()
            _handle_event(event, state, self._response_deserializer)
            return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns only the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns the response and the call."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous.

        Raises:
          grpc.RpcError: The error from `_prepare` when the request could not
            be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        else:
            event_handler = _event_handler(state, self._response_deserializer)
            state.rpc_start_time = time.perf_counter()
            state.method = _common.decode(self._method)
            state.target = _common.decode(self._target)
            # NOTE(review): unlike `_blocking`, the deadline is not passed
            # through `_determine_deadline` here -- confirm this is intended.
            call = self._managed_call(
                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
                self._method,
                None,
                deadline,
                metadata,
                None if credentials is None else credentials._credentials,
                (operations,),
                event_handler,
                self._context,
                self._registered_call_handle,
            )
            return _MultiThreadedRendezvous(
                state, call, self._response_deserializer, deadline
            )
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream RPC on a segregated call.

    The call is created directly on the channel (no managed-call factory is
    involved) and handed to a `_SingleThreadedRendezvous`.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including the registered
    # call handle, so that all instance state lives in slots.
    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the call configuration and builds a census context.

        Args:
          channel: The channel on which the segregated call is created.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for the request.
          response_deserializer: Deserializer for the responses.
          _registered_call_handle: Handle forwarded to the created call, or
            None.
        """
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Starts the RPC and returns the rendezvous used to consume it.

        Raises:
          _InactiveRpcError: With StatusCode.INTERNAL when the request could
            not be serialized.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three separate batches: the sends, the status receive, and the
        # initial-metadata receive each complete with their own event.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        operations_and_tags = tuple((ops, None) for ops in operations)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream RPC on a call made by the managed-call factory.

    The created call is handed to a `_MultiThreadedRendezvous`.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including the registered
    # call handle, so that all instance state lives in slots.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the call configuration and builds a census context.

        Args:
          channel: The underlying channel.
          managed_call: Factory used to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for the request.
          response_deserializer: Deserializer for the responses.
          _registered_call_handle: Handle forwarded to the created call, or
            None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC and returns the rendezvous used to consume it.

        Raises:
          grpc.RpcError: The error from `_start_unary_request` when the
            request could not be serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            # `rendezvous` is the serialization error in this case.
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        else:
            augmented_metadata = _compression.augment_metadata(
                metadata, compression
            )
            state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
            # Two batches: sends plus the status receive, then the
            # initial-metadata receive on its own.
            operations = (
                (
                    cygrpc.SendInitialMetadataOperation(
                        augmented_metadata, initial_metadata_flags
                    ),
                    cygrpc.SendMessageOperation(
                        serialized_request, _EMPTY_FLAGS
                    ),
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
                ),
                (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
            )
            state.rpc_start_time = time.perf_counter()
            state.method = _common.decode(self._method)
            state.target = _common.decode(self._target)
            call = self._managed_call(
                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
                self._method,
                None,
                _determine_deadline(deadline),
                metadata,
                None if credentials is None else credentials._credentials,
                operations,
                _event_handler(state, self._response_deserializer),
                self._context,
                self._registered_call_handle,
            )
            return _MultiThreadedRendezvous(
                state, call, self._response_deserializer, deadline
            )
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response RPC on a cygrpc.Channel.

    The blocking entry points (`__call__`, `with_call`) use a segregated call
    whose events are pulled by the calling thread; `future` uses a call
    produced by the `managed_call` factory.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including the registered
    # call handle, so that all instance state lives in slots.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the call configuration and builds a census context.

        Args:
          channel: The channel on which segregated calls are created.
          managed_call: Factory used by `future` to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for the response, or None.
          _registered_call_handle: Handle forwarded to every call created by
            this object, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call until no operation is due.

        Returns:
          The `(state, call)` pair once `state.due` is empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Pull events on this thread; each one is handled under the state's
        # condition and waiters are notified before checking for completion.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns only the response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns the response and the call."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike `_blocking`, the deadline is not passed through
        # `_determine_deadline` here -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # Send the augmented metadata, exactly as `_blocking` does;
            # sending the raw `metadata` would drop whatever
            # `_compression.augment_metadata` added for `compression`.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-stream RPC on a call made by the managed-call factory.

    The created call is handed to a `_MultiThreadedRendezvous`.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Lists every attribute assigned in __init__, including the registered
    # call handle, so that all instance state lives in slots.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the call configuration and builds a census context.

        Args:
          channel: The underlying channel.
          managed_call: Factory used to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for responses, or None.
          _registered_call_handle: Handle forwarded to the created call, or
            None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC, begins consuming requests, returns the rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: initial-metadata send plus the status receive, then the
        # initial-metadata receive on its own.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _InitialMetadataFlags(int):
    """An int subclass holding initial metadata flags.

    The value is masked with `cygrpc.InitialMetadataFlags.used_mask` on
    construction; modifiers return a new instance instead of mutating.
    """

    def __new__(cls, value: int = _EMPTY_FLAGS):
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Returns flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: True to set the wait-for-ready bit, False to clear
            it, or None to leave the flags untouched.

        Returns:
          `self` when `wait_for_ready` is None; otherwise a new instance with
          the wait-for-ready bit set or cleared and the explicitly-set bit on.
        """
        if wait_for_ready is None:
            return self
        flags = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            return self.__class__(
                self | flags.wait_for_ready | flags.wait_for_ready_explicitly_set
            )
        # Explicitly disabled: clear the bit but still record the choice.
        return self.__class__(
            (self & ~flags.wait_for_ready) | flags.wait_for_ready_explicitly_set
        )
class _ChannelCallState:
    """Bookkeeping for the managed calls of one cygrpc.Channel.

    `lock` guards `managed_calls`, the number of managed calls that have not
    completed yet.
    """

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        self.lock = threading.Lock()
        self.channel = channel
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Zeroes the managed-call counter."""
        self.managed_calls = 0

    def __del__(self):
        # Best effort: the close is attempted and these two error types are
        # deliberately ignored (presumably they arise during teardown).
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (AttributeError, TypeError):
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Starts a daemon thread that dispatches the channel's call events.

    The thread runs until the managed-call counter in `state` drops to zero.

    Args:
      state: The call state whose channel is polled and whose counter is
        decremented as calls complete.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                continue
            # The tag is the event handler; a truthy result means its call
            # has completed.
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Builds the managed-call factory bound to `state`.

    Args:
      state: The call state holding the channel, its lock and the count of
        outstanding managed calls.

    Returns:
      A `create` function that starts an integrated call on the channel.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Starts a cygrpc.IntegratedCall and accounts for it in `state`.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: The call's deadline as a float, or None for no deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of batches (sequences of cygrpc.Operations)
            to start on the call.
          event_handler: The behavior used as the tag of every batch, invoked
            to handle the resulting events.
          context: Context object for distributed tracing.
          _registered_call_handle: An int call handle for the method, or None
            if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        tagged_batches = tuple((batch, event_handler) for batch in operations)
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                tagged_batches,
                context,
                _registered_call_handle,
            )
            state.managed_calls += 1
            # The first outstanding call needs a thread to pull its events.
            if state.managed_calls == 1:
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState:
    """Shared state of a channel's connectivity polling and callback delivery.

    `lock` guards the other attributes. Each entry of
    `callbacks_and_connectivities` is a two-item list: a subscriber callback
    and the connectivity recorded for it (None when nothing is recorded yet).
    """

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        self.lock = threading.RLock()
        self.channel = channel
        # A fresh instance starts with the same values a reset produces.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Resets everything except `lock` and `channel` to initial values."""
        self.delivering = False
        self.callbacks_and_connectivities = []
        self.try_to_connect = False
        self.connectivity = None
        self.polling = False
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collects the callbacks whose recorded connectivity is out of date.

    Every collected entry is updated in place to `state.connectivity`.

    Args:
      state: The connectivity state to inspect and update.

    Returns:
      The callbacks, in subscription order, that were out of date.
    """
    current = state.connectivity
    stale_entries = [
        entry
        for entry in state.callbacks_and_connectivities
        if entry[1] is not current
    ]
    for entry in stale_entries:
        entry[1] = current
    return [entry[0] for entry in stale_entries]
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invokes connectivity callbacks until none of them is out of date.

    After each round, `_deliveries` is consulted under the lock; when it
    reports nothing, `state.delivering` is cleared and the function returns.
    Exceptions raised by callbacks are logged and do not stop delivery.

    Args:
      state: The shared connectivity state.
      initial_connectivity: The connectivity passed to the first round.
      initial_callbacks: The callbacks invoked in the first round.
    """
    pending = initial_callbacks
    connectivity = initial_connectivity
    while True:
        for callback in pending:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            pending = _deliveries(state)
            if not pending:
                state.delivering = False
                return
            # The connectivity changed while callbacks ran; go another round.
            connectivity = state.connectivity
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Starts a daemon thread running `_deliver` and marks delivery active.

    Args:
      state: The shared connectivity state; its current connectivity is what
        the first delivery round reports.
      callbacks: The callbacks invoked in the first delivery round.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls the channel's connectivity and triggers callback deliveries.

    Runs until there are no subscribers left and no connection attempt has
    been requested, at which point `state.polling` is cleared.

    Args:
      state: The shared connectivity state.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Whether the first check should try to connect.
    """
    to_public = _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = to_public[connectivity]
        callbacks = tuple(
            entry[0] for entry in state.callbacks_and_connectivities
        )
        # Every current subscriber is about to be told this connectivity.
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)
    while True:
        # Each watch is given a deadline 0.2 seconds from now.
        event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (
                state.callbacks_and_connectivities or state.try_to_connect
            ):
                state.polling = False
                state.connectivity = None
                break
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = to_public[connectivity]
            if state.delivering:
                # The running delivery picks the change up by itself.
                continue
            callbacks = _deliveries(state)
            if callbacks:
                _spawn_delivery(state, callbacks)
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers a connectivity callback, starting polling when needed.

    Args:
      state: The shared connectivity state.
      callback: The callback to register.
      try_to_connect: Whether the channel should attempt to connect; any
        value is coerced with `bool`.
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        subscribers = state.callbacks_and_connectivities
        if not subscribers and not state.polling:
            # First subscriber and no poller running: start one.
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            subscribers.append([callback, None])
        elif not state.delivering and state.connectivity is not None:
            # A connectivity is already known and nobody is delivering:
            # report it to the new callback right away.
            _spawn_delivery(state, (callback,))
            state.try_to_connect |= wants_connect
            subscribers.append([callback, state.connectivity])
        else:
            state.try_to_connect |= wants_connect
            subscribers.append([callback, None])
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first registration equal to `callback`, if there is one.

    Args:
      state: The shared connectivity state.
      callback: The callback to remove.
    """
    with state.lock:
        subscribers = state.callbacks_and_connectivities
        for position, entry in enumerate(subscribers):
            if callback == entry[0]:
                del subscribers[position]
                break
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression and user-agent options to `base_options`.

    Args:
      base_options: The channel options to start from.
      compression: The value handed to `_compression.create_channel_option`.

    Returns:
      A tuple of the base options, then the compression option(s), then the
      primary user-agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return tuple(base_options) + compression_option + (user_agent_option,)
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Splits channel options into Python-only options and core options.

    Args:
      options: The `(key, value)` channel options to split.

    Returns:
      A `(python_options, core_options)` pair of lists. An option is a
      Python option when its key equals
      `grpc.experimental.ChannelOptions.SingleThreadedUnaryStream`.
    """
    python_only_key = grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
    python_options = []
    core_options = []
    for option in options:
        if option[0] == python_only_key:
            python_options.append(option)
        else:
            core_options.append(option)
    return python_options, core_options
class Channel(grpc.Channel):
    """A grpc.Channel implementation that delegates to a cygrpc.Channel."""

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Build the channel and register it for fork (and gevent) tracking.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel. Python-only options
            are consumed here; the remaining ones are handed to the
            underlying cygrpc.Channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to
            be used over the lifetime of the channel.
        """
        python_only_options, core_only_options = _separate_channel_options(
            options
        )
        # Start from the module default; a Python-only option may override it.
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_only_options)

        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_only_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_arguments, credentials
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)

        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """
        Look up the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC
        generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Apply Python-only channel options to this channel's attributes.

        Single-threaded unary-stream handling is switched on whenever an
        option keyed by ``SingleThreadedUnaryStream`` is present; only the
        option key is inspected.

        Args:
          python_options: The Python-only channel option pairs.
        """
        for option in python_options:
            option_key = option[0]
            if option_key == (
                grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            ):
                self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Subscribe ``callback`` to this channel's connectivity state.

        Args:
          callback: The callable to register.
          try_to_connect: Optional flag forwarded unchanged to ``_subscribe``.
        """
        connectivity_state = self._connectivity_state
        _subscribe(connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Remove ``callback`` from this channel's connectivity subscriptions.

        Args:
          callback: The callable previously passed to ``subscribe``.
        """
        connectivity_state = self._connectivity_state
        _unsubscribe(connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable for ``method``.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and forwarded; otherwise ``None`` is.

        Returns:
          A ``_UnaryUnaryMultiCallable`` built on this channel.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        encoded_target = _common.encode(self._target)
        return _UnaryUnaryMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            encoded_target,
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable for ``method``.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and forwarded; otherwise ``None`` is.

        Returns:
          A ``_SingleThreadedUnaryStreamMultiCallable`` when the channel was
          configured for single-threaded unary-stream handling, otherwise a
          ``_UnaryStreamMultiCallable``.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            encoded_method = _common.encode(method)
            encoded_target = _common.encode(self._target)
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel,
                encoded_method,
                encoded_target,
                request_serializer,
                response_deserializer,
                call_handle,
            )

        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        encoded_target = _common.encode(self._target)
        return _UnaryStreamMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            encoded_target,
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable for ``method``.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and forwarded; otherwise ``None`` is.

        Returns:
          A ``_StreamUnaryMultiCallable`` built on this channel.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        encoded_target = _common.encode(self._target)
        return _StreamUnaryMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            encoded_target,
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable for ``method``.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and forwarded; otherwise ``None`` is.

        Returns:
          A ``_StreamStreamMultiCallable`` built on this channel.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        encoded_target = _common.encode(self._target)
        return _StreamStreamMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            encoded_target,
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Drop every connectivity subscription while holding the state lock."""
        connectivity_state = self._connectivity_state
        if not connectivity_state:
            return
        with connectivity_state.lock:
            connectivity_state.callbacks_and_connectivities.clear()

    def _close(self) -> None:
        """Unsubscribe all callbacks, close the channel and unregister it."""
        self._unsubscribe_all()
        cancelled = cygrpc.StatusCode.cancelled
        self._channel.close(cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Unsubscribe all callbacks and close the channel because of a fork."""
        self._unsubscribe_all()
        cancelled = cygrpc.StatusCode.cancelled
        self._channel.close_on_fork(cancelled, "Channel closed due to fork")

    def __enter__(self):
        """Return this channel for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the channel on leaving the ``with`` block.

        Returns ``False`` so that an in-flight exception is not suppressed.
        """
        self._close()
        return False

    def close(self) -> None:
        """Close this channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). This is not done
        # today because many valid use cases allow the channel to be deleted
        # immediately after stubs are created. Once enough time has passed
        # that all users can be trusted to hold on to their channels for as
        # long as they are in use and to close them afterwards, deleting this
        # grpc._channel.Channel instance can be made to close the underlying
        # cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except # noqa: E722
            # Exceptions raised from __del__ are ignored by Python anyway, but
            # they can keep spamming logs, so they are silenced here.
            pass