Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (
25 Any,
26 Callable,
27 Dict,
28 Iterator,
29 List,
30 Optional,
31 Sequence,
32 Set,
33 Tuple,
34 Union,
35)
37import grpc # pytype: disable=pyi-error
38from grpc import _common # pytype: disable=pyi-error
39from grpc import _compression # pytype: disable=pyi-error
40from grpc import _grpcio_metadata # pytype: disable=pyi-error
41from grpc import _observability # pytype: disable=pyi-error
42from grpc._cython import cygrpc
43from grpc._typing import ChannelArgumentType
44from grpc._typing import DeserializingFunction
45from grpc._typing import IntegratedCallFactory
46from grpc._typing import MetadataType
47from grpc._typing import NullaryCallbackType
48from grpc._typing import ResponseType
49from grpc._typing import SerializingFunction
50from grpc._typing import UserTag
51import grpc.experimental # pytype: disable=pyi-error
_LOGGER = logging.getLogger(__name__)

# User-agent string built from the installed grpcio version.
_USER_AGENT = f"grpc-python/{_grpcio_metadata.__version__}"

# Flag value passed to operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever the variable is present in the environment, whatever its value.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ
)

# Operation types initially recorded as due, one tuple per RPC arity.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() templates used by _rpc_state_string for terminated RPCs.
_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    ">"
)

_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Convert a relative timeout into an absolute deadline.

    Args:
      timeout: Number of seconds from now, or None for no deadline.

    Returns:
      None when `timeout` is None; otherwise the current `time.time()` value
      plus `timeout`.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Format the details message used when a status code is unrecognized.

    Args:
      unknown_cygrpc_code: The code that could not be recognized.
      details: The details string that accompanied the code.

    Returns:
      A message embedding both values.
    """
    return (
        f"Server sent unknown code {unknown_cygrpc_code} "
        f'and details "{details}"'
    )
class _RPCState:
    """Mutable record of one RPC's progress and outcome.

    Every member is guarded by `condition`, except the observability fields
    (`rpc_start_time`, `rpc_end_time`, `method`, `target`), whose updates do
    not trigger `condition`.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Initialize the state.

        Args:
          due: Operation types whose completion events are outstanding.
          initial_metadata: Initial metadata, or None if not yet known.
          trailing_metadata: Trailing metadata, or None if not yet known.
          code: Final status code, or None while the RPC has no status.
          details: Status details, or None while the RPC has no status.
        """
        # Guards all members of _RPCState; `notify_all` is called on it
        # whenever the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the
        # RPC's completion queue. Membership in `due` guarantees `operate()`
        # was called for a corresponding operation, but not the converse:
        # after a failed `operate()` call there may briefly be entries in
        # `due` that do not correspond to operations submitted to Core.
        self.due = set(due)

        # Values supplied by the caller.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details

        # Values that only become available as the RPC proceeds.
        self.response = None
        self.debug_error_string = None

        # Observability-only fields. Updates to them do not trigger
        # self.condition.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so cancellation is tracked separately from the rest
        # of the RPC's result: this records whether cancellation was requested
        # prior to termination of the RPC.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand new threading.Condition."""
        self.condition = threading.Condition()
def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Record a terminal status on `state` unless one is already set.

    Args:
      state: The RPC state to update. This function does not acquire
        `state.condition` itself.
      code: The status code to record.
      details: The status details to record.
    """
    if state.code is not None:
        # A status has already been recorded; leave it untouched.
        return
    state.code, state.details = code, details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Fold the completed operations of `event` into `state`.

    Each batch operation is removed from `state.due` and its payload (initial
    metadata, message, or final status) is copied onto `state`.

    Args:
      event: The completion event whose `batch_operations` are processed.
      state: The RPC state to update. This function does not acquire
        `state.condition` itself.
      response_deserializer: Passed to `_common.deserialize` for received
        messages.

    Returns:
      The callbacks that were registered on `state` at the moment the final
      status arrived (empty if no status operation was in this event). Once
      returned, `state.callbacks` is set to None.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw code so it can be reported if it has no
                # grpc.StatusCode counterpart; the mapped value is None in
                # that case and would make the message useless.
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            # None marks that no further callbacks can be registered.
            state.callbacks = None
    return callbacks
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Build the event callback that applies completion events to `state`.

    Args:
      state: The RPC state updated by every delivered event.
      response_deserializer: Passed through to `_handle_event`.

    Returns:
      A callable taking an event. It updates `state` under `state.condition`,
      notifies waiters, runs any callbacks released by the event, and returns
      True only when nothing remains in `state.due` and `state.fork_epoch` is
      not older than the current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        # Callbacks run after the condition has been released.
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks stored by add_done_callback are functools.partial
                # objects, but add_callback stores the caller's callable
                # as-is, which may have no `func` attribute. Fall back to the
                # callable itself so that logging can never raise here.
                logging.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
# pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Send the user's request iterator over `call` from a daemon thread.

    Args:
      request_iterator: Iterator yielding the requests to send.
      state: The RPC state shared with the rest of the call machinery.
      call: The call on which send operations are started and which is
        cancelled when iterating or serializing fails.
      request_serializer: Passed to `_common.serialize` for each request.
      event_handler: Tag passed along with every started operation.
    """

    def _cancel_and_abort(code, details):
        # Cancel the call in Core, then record the same status locally.
        call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
        _abort(state, code, details)

    def _send_settled():
        # True once the RPC has a status or the pending send has completed.
        return (
            state.code is not None
            or cygrpc.OperationType.send_message not in state.due
        )

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Pull requests until the iterator is exhausted or an error
        # condition is encountered.
        while True:
            left_user_generator = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                left_user_generator = True
                failure_details = "Exception iterating requests!"
                _LOGGER.exception(failure_details)
                _cancel_and_abort(grpc.StatusCode.UNKNOWN, failure_details)
                return
            finally:
                if not left_user_generator:
                    cygrpc.return_from_user_request_generator()

            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is not None or state.cancelled:
                    # The RPC already ended or was cancelled; stop sending.
                    return
                if serialized_request is None:
                    _cancel_and_abort(
                        grpc.StatusCode.INTERNAL,
                        "Exception serializing request!",
                    )
                    return

                # Record the send as due before starting it; undo on failure.
                state.due.add(cygrpc.OperationType.send_message)
                send_operations = (
                    cygrpc.SendMessageOperation(
                        serialized_request, _EMPTY_FLAGS
                    ),
                )
                if not call.operate(send_operations, event_handler):
                    state.due.remove(cygrpc.OperationType.send_message)
                    return

                # Wait for this message's send to finish before pulling the
                # next request.
                _common.wait(
                    state.condition.wait,
                    _send_settled,
                    spin_cb=functools.partial(
                        cygrpc.block_if_fork_in_progress, state
                    ),
                )
                if state.code is not None:
                    return

        # The iterator is exhausted: half-close if the RPC is still live.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                close_operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                if not call.operate(close_operations, event_handler):
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render the repr-style description of an RPC.

    Args:
      class_name: Name of the class the string is produced for.
      rpc_state: The state to describe; read under its condition.

    Returns:
      "<NAME object>" while the RPC has no status code, the OK template for
      an OK status, and the non-OK template (which adds the debug error
      string) for any other status.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return f"<{class_name} object>"
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, status, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            status,
            rpc_state.details,
            rpc_state.debug_error_string,
        )
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshot `state` into a private _RPCState with nothing due.

        Metadata and details are deep-copied, the response and debug error
        string are shallow-copied, and the code is taken as-is.
        """
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            self._state = snapshot
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the snapshotted trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the snapshotted status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the snapshotted details passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch this error to obtain a traceback.
        try:
            raise self
        except grpc.RpcError:
            _, _, trace = sys.exc_info()
            return trace

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Always done, so the callback is invoked right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative amount of remaining time.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # The RPC already has a status; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            pending = self._state.callbacks
            if pending is None:
                # Callbacks were already handed off; registration is closed.
                return False
            pending.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that is still without a status when this object is
        # garbage collected.
        with self._state.condition:
            if self._state.code is None:
                code = grpc.StatusCode.CANCELLED
                details = "Cancelled upon garbage collection!"
                self._state.code = code
                self._state.details = details
                self._state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                    details,
                )
                self._state.condition.notify_all()
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def cancelled(self) -> bool:
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() "
                    "when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() "
                    "when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() "
                    "when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                _, _, trace = sys.exc_info()
                return trace

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` at completion, or right away if already complete."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Already complete: invoke outside the condition.
            fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status = self._state.code
            if status is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch the next event of the call and apply it to the state."""
        event = self._call.next_event()
        with self._state.condition:
            released = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in released:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Consume events until a response or a terminal status is available."""
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    # Hand the response over and clear the slot for the next.
                    response, self._state.response = self._state.response, None
                    return response
                if cygrpc.OperationType.receive_message not in self._state.due:
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    if self._state.code is not None:
                        raise self

    def _next(self) -> Any:
        with self._state.condition:
            status = self._state.code
            if status is grpc.StatusCode.OK:
                raise StopIteration()
            if status is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee
            # that if an event has been submitted to the core completion
            # queue, it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there
            # is no data race on `due`.
            self._state.due.add(cygrpc.OperationType.receive_message)
            receive_operations = (
                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            )
            if not self._call.operate(receive_operations, None):
                self._state.due.remove(cygrpc.OperationType.receive_message)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        with self._state.condition:
            raw_debug = self._state.debug_error_string
            if raw_debug is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(raw_debug)
class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the
    grpc.Future interface and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.initial_metadata is not None,
            )
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.trailing_metadata is not None,
            )
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.code is not None,
            )
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.details is not None,
            )
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        with self._state.condition:
            _common.wait(
                self._state.condition.wait,
                lambda: self._state.debug_error_string is not None,
            )
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            if _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            if _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            if _common.wait(
                self._state.condition.wait, self._is_complete, timeout=timeout
            ):
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                _, _, trace = sys.exc_info()
                return trace

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` at completion, or right away if already complete."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Already complete: invoke outside the condition.
            fn(self)

    def _next(self) -> Any:
        with self._state.condition:
            status = self._state.code
            if status is grpc.StatusCode.OK:
                raise StopIteration()
            if status is not None:
                raise self

            # Still live: record the receive as due before starting it, and
            # undo the bookkeeping if the operation could not be started.
            event_handler = _event_handler(
                self._state, self._response_deserializer
            )
            self._state.due.add(cygrpc.OperationType.receive_message)
            receive_operations = (
                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            )
            if not self._call.operate(receive_operations, event_handler):
                self._state.due.remove(cygrpc.OperationType.receive_message)

            def _response_ready():
                if self._state.response is not None:
                    return True
                return (
                    cygrpc.OperationType.receive_message
                    not in self._state.due
                    and self._state.code is not None
                )

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                # Hand the response over and clear the slot for the next one.
                response, self._state.response = self._state.response, None
                return response
            if cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary-request RPC.

    Args:
      request: The request object to serialize.
      timeout: Timeout in seconds, or None for no deadline.
      request_serializer: Passed to `_common.serialize`.

    Returns:
      A `(deadline, serialized_request, error)` tuple. Exactly one of the
      last two entries is None: when serialization yields None, `error` is an
      _InactiveRpcError with status INTERNAL.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking unary-response RPC into its result.

    Args:
      state: The state of the RPC.
      call: The call, used only to build the returned call object.
      with_call: Whether the caller also wants the call object.
      deadline: The RPC deadline, used only to build the returned call object.

    Returns:
      `state.response`, or a `(response, _MultiThreadedRendezvous)` pair when
      `with_call` is true.

    Raises:
      _InactiveRpcError: If the RPC's status code is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches that start a stream-unary RPC.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of batches: the first sends initial metadata and receives the
      message and status; the second receives initial metadata.
    """
    send_and_receive_result = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive_result, receive_initial_metadata)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pairs every stream-unary invocation batch with a None tag.

    Args:
      metadata: Forwarded to _stream_unary_invocation_operations.
      initial_metadata_flags: Forwarded to _stream_unary_invocation_operations.

    Returns:
      A tuple of (operations, None) pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combines the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested for this RPC, or None.

    Returns:
      None when neither deadline is set, the only one that is set when just
      one exists, and the smaller of the two when both exist.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers both "neither set" (returns None) and "only user set".
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Unary-request, unary-response multi-callable backed by a cygrpc.Channel.

    __call__ and with_call conduct the RPC on a segregated call and consume
    its single event on the calling thread; future() starts the RPC through
    the managed-call factory and returns a _MultiThreadedRendezvous.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        # Assigned in __init__, so it is declared like every other attribute
        # instead of relying on an inherited instance __dict__.
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the method.

        Args:
          channel: The cygrpc.Channel on which blocking calls are made.
          managed_call: The factory used by future() to create the call.
          method: The RPC method name, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for responses, or None.
          _registered_call_handle: The registered call handle forwarded to
            every call, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serializes the request and assembles the single operation batch.

        Returns:
          A (state, operations, deadline, error) tuple. When the request
          cannot be serialized the first three items are None and error is
          the RpcError produced by _start_unary_request; otherwise error is
          None.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, rendezvous

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call and handles its one event.

        Returns:
          The (state, call) pair after the event has been applied to state.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            (
                (
                    operations,
                    None,
                ),
            ),
            self._context,
            self._registered_call_handle,
        )
        # All six operations were submitted as one batch, so one event is
        # all that is waited for here.
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns the response value."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously and returns (response, call)."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC through the managed-call factory.

        Returns:
          A _MultiThreadedRendezvous wrapping the state and the created call.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            None if credentials is None else credentials._credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-request, stream-response multi-callable using a segregated call.

    The RPC is started with channel.segregated_call and handed to a
    _SingleThreadedRendezvous; no managed-call factory is involved.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        # Assigned in __init__, so it is declared like every other attribute
        # instead of relying on an inherited instance __dict__.
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the method.

        Args:
          channel: The cygrpc.Channel on which calls are made.
          method: The RPC method name, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for the request.
          response_deserializer: Deserializer for the responses.
          _registered_call_handle: The registered call handle forwarded to
            every call, or None.
        """
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Starts the RPC and returns a rendezvous for its responses.

        Raises:
          _InactiveRpcError: With StatusCode.INTERNAL when the request cannot
            be serialized.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three separate batches: the sends, the status receive and the
        # initial-metadata receive each complete with their own event.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        operations_and_tags = tuple((ops, None) for ops in operations)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-request, stream-response multi-callable using managed calls.

    The RPC is created through the managed-call factory and its responses are
    exposed through a _MultiThreadedRendezvous.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        # Assigned in __init__, so it is declared like every other attribute
        # instead of relying on an inherited instance __dict__.
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the method.

        Args:
          channel: The cygrpc.Channel associated with this callable.
          managed_call: The factory used to create the call.
          method: The RPC method name, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for the request.
          response_deserializer: Deserializer for the responses.
          _registered_call_handle: The registered call handle forwarded to
            every call, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC and returns a rendezvous for its responses.

        Raises:
          grpc.RpcError: If the request could not be serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-request, unary-response multi-callable backed by a cygrpc.Channel.

    __call__ and with_call conduct the RPC on a segregated call and drain its
    events on the calling thread; future() starts the RPC through the
    managed-call factory and returns a _MultiThreadedRendezvous.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        # Assigned in __init__, so it is declared like every other attribute
        # instead of relying on an inherited instance __dict__.
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the method.

        Args:
          channel: The cygrpc.Channel on which blocking calls are made.
          managed_call: The factory used by future() to create the call.
          method: The RPC method name, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for the response, or None.
          _registered_call_handle: The registered call handle forwarded to
            every call, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call until no operation is due.

        Returns:
          The (state, call) pair once state.due has become empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                # Wake anything waiting on the condition for state changes.
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns the response value."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously and returns (response, call)."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC through the managed-call factory.

        Returns:
          A _MultiThreadedRendezvous wrapping the state and the created call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The send-initial-metadata operation must carry the augmented
            # metadata, as in _blocking; the raw metadata would omit whatever
            # _compression.augment_metadata added for `compression`.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-request, stream-response multi-callable using managed calls.

    The RPC is created through the managed-call factory, the request iterator
    is handed to _consume_request_iterator, and the responses are exposed
    through a _MultiThreadedRendezvous.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        # Assigned in __init__, so it is declared like every other attribute
        # instead of relying on an inherited instance __dict__.
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the method.

        Args:
          channel: The cygrpc.Channel associated with this callable.
          managed_call: The factory used to create the call.
          method: The RPC method name, as bytes.
          target: The channel target, as bytes.
          request_serializer: Serializer for requests, or None.
          response_deserializer: Deserializer for responses, or None.
          _registered_call_handle: The registered call handle forwarded to
            every call, or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC and returns a rendezvous for its responses."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        # Keep only the bits covered by the used-flags mask.
        masked_value = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked_value)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Returns flags reflecting an explicit wait_for_ready choice.

        Args:
          wait_for_ready: True or False to set the choice explicitly, or None
            to leave the flags untouched.

        Returns:
          This same object when wait_for_ready is None; otherwise a new
          instance with the explicitly-set bit on and the wait_for_ready bit
          switched on or off accordingly.
        """
        if wait_for_ready is None:
            return self

        flag_bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            return self.__class__(
                self
                | flag_bits.wait_for_ready
                | flag_bits.wait_for_ready_explicitly_set
            )
        return self.__class__(
            (self & ~flag_bits.wait_for_ready)
            | flag_bits.wait_for_ready_explicitly_set
        )
class _ChannelCallState(object):
    """Bookkeeping for the managed calls of one cygrpc.Channel.

    Holds the channel, a lock, the number of managed calls in flight and a
    `threading` flag that starts out False.
    """

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        self.lock = threading.Lock()
        self.channel = channel
        self.managed_calls = 0
        self.threading = False

    def reset_postfork_child(self) -> None:
        """Zeroes the managed-call count."""
        self.managed_calls = 0

    def __del__(self):
        # Best effort only: TypeError and AttributeError raised while closing
        # are deliberately ignored.
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Starts a daemon thread that dispatches the channel's call events.

    The thread hands each non-timeout event to the event's own tag and exits
    once the managed-call count drops to zero.

    Args:
      state: The call state whose channel is polled and whose managed_calls
        counter is decremented under state.lock.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            timed_out = (
                event.completion_type == cygrpc.CompletionType.queue_timeout
            )
            if timed_out:
                continue
            # The tag is a callable; a truthy result marks a completed call.
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Returns a factory that creates managed calls tracked in `state`."""

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch shares the same handler as its tag.
        tagged_batches = tuple((batch, event_handler) for batch in operations)
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                tagged_batches,
                context,
                _registered_call_handle,
            )
            state.managed_calls += 1
            if state.managed_calls == 1:
                # First managed call in flight: start the spin thread.
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState(object):
    """Mutable state shared by the connectivity subscription functions."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out in the same state a post-fork reset
        # puts them in.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Returns every field except lock and channel to its initial value."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collects the callbacks not yet recorded at the current connectivity.

    Each entry whose recorded connectivity is not state.connectivity has its
    callback added to the result and its record updated to state.connectivity.

    Args:
      state: The connectivity state whose entries are scanned and updated.

    Returns:
      The callbacks of the entries that were updated, in subscription order.
    """
    pending = []
    for entry in state.callbacks_and_connectivities:
        callback, recorded_connectivity = entry
        if recorded_connectivity is state.connectivity:
            continue
        pending.append(callback)
        entry[1] = state.connectivity
    return pending
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invokes connectivity callbacks until none are out of date.

    After each round the outdated callbacks are recomputed under state.lock;
    once there are none, state.delivering is cleared and the function returns.
    Exceptions raised by a callback are logged and do not stop delivery.

    Args:
      state: The shared connectivity state.
      initial_connectivity: The connectivity passed to the first round.
      initial_callbacks: The callbacks invoked in the first round.
    """
    current_connectivity = initial_connectivity
    round_callbacks = initial_callbacks
    while True:
        for callback in round_callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(current_connectivity)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            round_callbacks = _deliveries(state)
            if not round_callbacks:
                state.delivering = False
                return
            current_connectivity = state.connectivity
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Starts a daemon thread running _deliver and marks state as delivering.

    Args:
      state: The shared connectivity state; its current connectivity is the
        value handed to the first delivery round.
      callbacks: The callbacks for the first delivery round.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls the channel's connectivity and schedules callback deliveries.

    Publishes the initial connectivity to every subscriber, then keeps
    watching the channel. The loop ends, clearing state.polling and
    state.connectivity, once there are no subscribers left and no connection
    attempt has been requested.

    Args:
      state: The shared connectivity state, updated under state.lock.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY
    )
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = to_channel_connectivity[connectivity]
        callbacks = tuple(
            entry_callback
            for entry_callback, _ in state.callbacks_and_connectivities
        )
        # Everyone is about to be told the current value, so record it.
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)
    while True:
        # Each watch is given a deadline 0.2 seconds from now.
        event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            has_work = bool(
                state.callbacks_and_connectivities or state.try_to_connect
            )
            if not has_work:
                state.polling = False
                state.connectivity = None
                break
            # Consume the pending connect request.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = to_channel_connectivity[connectivity]
            if state.delivering:
                continue
            callbacks = _deliveries(state)
            if callbacks:
                _spawn_delivery(state, callbacks)
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers a connectivity callback, starting polling when needed.

    Args:
      state: The shared connectivity state, updated under state.lock.
      callback: The callback to register.
      try_to_connect: Whether a connection attempt is requested; coerced with
        bool() before use.
    """
    connect_requested = bool(try_to_connect)
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        if not subscriptions and not state.polling:
            # First subscriber and no poller running: start one.
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, connect_requested),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            subscriptions.append([callback, None])
        elif not state.delivering and state.connectivity is not None:
            # A value is known and nobody is delivering: tell this callback
            # right away and record what it has been told.
            _spawn_delivery(state, (callback,))
            state.try_to_connect |= connect_requested
            subscriptions.append([callback, state.connectivity])
        else:
            state.try_to_connect |= connect_requested
            subscriptions.append([callback, None])
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription whose callback equals `callback`.

    Does nothing when no subscription matches.

    Args:
      state: The shared connectivity state, updated under state.lock.
      callback: The callback to remove.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, (subscribed_callback, _) in enumerate(subscriptions):
            if callback == subscribed_callback:
                del subscriptions[position]
                break
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression option and the user agent to channel options.

    Args:
      base_options: The options to start from.
      compression: The value handed to _compression.create_channel_option.

    Returns:
      A tuple holding the base options, then the compression option(s), then
      the primary user-agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return tuple(base_options) + compression_option + (user_agent_option,)
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    Args:
      options: The (key, value) option pairs to split.

    Returns:
      A (python_options, core_options) pair of lists. An option is a Python
      option when its key equals
      grpc.experimental.ChannelOptions.SingleThreadedUnaryStream.
    """
    python_options = []
    core_options = []
    for option in options:
        is_python_option = (
            option[0]
            == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        destination = python_options if is_python_option else core_options
        destination.append(option)
    return python_options, core_options
class Channel(grpc.Channel):
    """A grpc.Channel implementation that delegates to a cygrpc.Channel.

    Besides the wrapped cygrpc.Channel, each instance keeps a call-state
    object and a connectivity-state object built around that channel. The
    multi-callables created by this class are all given the wrapped channel.
    """

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Creates the underlying cygrpc.Channel, registers this object for fork
        handling and, when gevent is activated, bumps the gevent channel count.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel. Python-only options
            are consumed here; the rest are forwarded to the core channel.
          credentials: Passed through unchanged to cygrpc.Channel; may be None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        # Start from the module default; a Python-only option may override it.
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)
        core_channel = cygrpc.Channel(
            _common.encode(target),
            _augment_options(core_options, compression),
            credentials,
        )
        self._channel = core_channel
        self._target = target
        self._call_state = _ChannelCallState(core_channel)
        self._connectivity_state = _ChannelConnectivityState(core_channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """
        Get the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        core_channel = self._channel
        return core_channel.get_registered_call_handle(_common.encode(method))

    def _registered_call_handle_for(
        self, method: str, registered: Optional[bool]
    ) -> Optional[int]:
        """Returns the registered call handle for `method`, or None.

        Args:
          method: The method name for the RPC.
          registered: Whether a registered call handle was requested.

        Returns:
          The handle from `_get_registered_call_handle` when `registered` is
          truthy; otherwise None.
        """
        if not registered:
            return None
        return self._get_registered_call_handle(method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Sets channel attributes according to python-only channel options.

        The presence of the SingleThreadedUnaryStream option turns the
        single-threaded unary-stream mode on; the option's value is not
        consulted.

        Args:
          python_options: The Python-only options, each indexable so that
            element 0 is the option name.
        """
        for option in python_options:
            option_name = option[0]
            if not (
                option_name
                == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            ):
                continue
            self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Registers `callback` with this channel's connectivity state.

        Args:
          callback: Callable taking a grpc.ChannelConnectivity value.
          try_to_connect: Forwarded unchanged to `_subscribe`.
        """
        connectivity_state = self._connectivity_state
        _subscribe(connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Removes `callback` from this channel's connectivity state.

        Args:
          callback: A callback previously passed to `subscribe`.
        """
        connectivity_state = self._connectivity_state
        _unsubscribe(connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Creates a unary-unary multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _UnaryUnaryMultiCallable bound to this channel.
        """
        call_handle = self._registered_call_handle_for(
            method, _registered_method
        )
        return _UnaryUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Creates a unary-stream multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _SingleThreadedUnaryStreamMultiCallable when the single-threaded
          mode is enabled on this channel, otherwise a
          _UnaryStreamMultiCallable.
        """
        call_handle = self._registered_call_handle_for(
            method, _registered_method
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel,
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )
        return _UnaryStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Creates a stream-unary multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _StreamUnaryMultiCallable bound to this channel.
        """
        call_handle = self._registered_call_handle_for(
            method, _registered_method
        )
        return _StreamUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Creates a stream-stream multi-callable for `method`.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer forwarded to the
            multi-callable.
          response_deserializer: Optional deserializer forwarded to the
            multi-callable.
          _registered_method: When truthy, the registered call handle for
            `method` is looked up and handed to the multi-callable.

        Returns:
          A _StreamStreamMultiCallable bound to this channel.
        """
        call_handle = self._registered_call_handle_for(
            method, _registered_method
        )
        return _StreamStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Empties the connectivity subscription list while holding its lock."""
        connectivity_state = self._connectivity_state
        if not connectivity_state:
            return
        with connectivity_state.lock:
            connectivity_state.callbacks_and_connectivities.clear()

    def _close(self) -> None:
        """Drops all subscriptions, closes the core channel and unregisters.

        The core channel is closed with a cancelled status; afterwards this
        object is removed from fork handling and, when gevent is activated,
        the gevent channel count is decremented.
        """
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Drops all subscriptions and closes the core channel for a fork.

        Unlike `_close`, this neither unregisters the channel from fork
        handling nor touches the gevent channel count.
        """
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        """Returns this channel so it can be used as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the channel; returns False so exceptions propagate."""
        self._close()
        return False

    def close(self) -> None:
        """Closes this channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Python ignores exceptions raised from __del__ anyway, but they
            # can keep spamming logs, so they are deliberately silenced here.
            pass