Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_channel.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (
25 Any,
26 Callable,
27 Dict,
28 Iterator,
29 List,
30 Optional,
31 Sequence,
32 Set,
33 Tuple,
34 Union,
35)
37import grpc # pytype: disable=pyi-error
38from grpc import _common # pytype: disable=pyi-error
39from grpc import _compression # pytype: disable=pyi-error
40from grpc import _grpcio_metadata # pytype: disable=pyi-error
41from grpc import _observability # pytype: disable=pyi-error
42from grpc._cython import cygrpc
43from grpc._typing import ChannelArgumentType
44from grpc._typing import DeserializingFunction
45from grpc._typing import IntegratedCallFactory
46from grpc._typing import MetadataType
47from grpc._typing import NullaryCallbackType
48from grpc._typing import ResponseType
49from grpc._typing import SerializingFunction
50from grpc._typing import UserTag
51import grpc.experimental # pytype: disable=pyi-error
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Identifier string embedding the installed grpcio version.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)

# Flags value handed to operations that need no special flags.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever GRPC_SINGLE_THREADED_UNARY_STREAM is present in the
# environment, whatever its value (an empty string still counts).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
)

# Operation types grouped per RPC arity. NOTE(review): presumably used to
# seed `_RPCState.due` when the matching kind of call starts - confirm
# against the multi-callables that consume these tuples.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() template for an RPC that finished with StatusCode.OK; filled with
# (class name, status, details) by `_rpc_state_string`.
_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)

# repr() template for an RPC that finished with any other status; also
# carries the debug error string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: Seconds from now, or None when the RPC has no deadline.

    Returns:
      `time.time() + timeout`, or None if `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details message used when a status code is not recognised.

    Args:
      unknown_cygrpc_code: The code that could not be mapped.
      details: The details that accompanied that code.

    Returns:
      A human-readable string embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
class _RPCState:
    """Mutable bookkeeping for a single client-side RPC.

    Every member is guarded by `condition`; code that changes the RPC's state
    calls `notify_all` on it. The observability fields (`rpc_start_time`,
    `rpc_end_time`, `method`, `target`) are the exception: updating them does
    not trigger the condition.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ):
        """Create the state for an RPC.

        Args:
          due: Operation types already submitted and awaiting completion.
          initial_metadata: Initial metadata, if already known.
          trailing_metadata: Trailing metadata, if already known.
          code: Final status code, or None while the RPC is in flight.
          details: Final status details, or None while the RPC is in flight.
        """
        # Lock + wait queue protecting everything below.
        self.condition = threading.Condition()

        # Operation types whose completion events are still expected from the
        # RPC's completion queue. Anything in `due` has had `operate()` called
        # for it, but not the other way round: after a failed `operate()`
        # there may briefly be entries here with no operation behind them.
        self.due = set(due)

        # Outcome of the RPC, filled in as events arrive.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.response = None
        self.code = code
        self.details = details
        self.debug_error_string = None

        # Observability-only fields; changing them does not notify
        # `self.condition`.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # grpc.Future.cancel / grpc.Future.cancelled have slightly odd
        # semantics, so "was cancellation requested before the RPC
        # terminated" is recorded separately from the RPC's result.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a fresh threading.Condition."""
        self.condition = threading.Condition()
def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Mark `state` as terminated locally, unless it already has a code.

    Args:
      state: The RPC state to update in place.
      code: Status code to record.
      details: Status details to record.
    """
    if state.code is not None:
        # Already terminated; the first outcome wins.
        return
    state.code = code
    state.details = details
    # Leave initial metadata alone if it has already arrived.
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Apply every operation completed in `event` to `state`.

    The caller is expected to hold `state.condition`; both call sites in this
    module do.

    Args:
      event: An event whose `batch_operations` have completed.
      state: The RPC state to update in place.
      response_deserializer: Passed to `_common.deserialize` for each
        received message.

    Returns:
      The callbacks registered on `state` at the moment the final status
      arrived (an empty list if this event carried no status). The caller is
      responsible for invoking them.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        # This operation is no longer outstanding.
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    # A None result is treated as a deserialization failure.
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A locally recorded outcome (abort/cancel) takes precedence over
            # the status reported here.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code that failed to map. Passing the
                    # lookup result would always print "None".
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            # Hand the callbacks to the caller and close registration:
            # `callbacks is None` is how add_callback detects termination.
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Build the per-RPC event callback.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Deserializer forwarded to `_handle_event`.

    Returns:
      A function taking an event. It folds the event into `state`, wakes
      waiters, runs any completion callbacks, and returns a truthy value only
      when no operations remain due and `state.fork_epoch` is not older than
      the current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        # Run user callbacks outside the lock.
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks registered through add_callback() are stored
                # as-is and need not be functools.partial objects, so fall
                # back to the callback itself when it has no `func`.
                # Otherwise this handler would raise AttributeError.
                logging.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
# pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and submits it on `call`, waiting
    for each send to complete before fetching the next request. When the
    iterator is exhausted a close-from-client operation is submitted.

    Args:
      request_iterator: The user's iterator of request messages.
      state: The RPC state shared with the rest of the call machinery.
      call: The call on which send operations are submitted.
      request_serializer: Passed to `_common.serialize` for every request.
      event_handler: Tag handed to `call.operate` with each operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except branch already called
            # return_from_user_request_generator(), so the finally clause
            # does not call it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Iterator exhausted: fall through to the half-close below.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                # Only send while the RPC is still live.
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # A None result is treated as a serialization failure.
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Record the send as due before submitting it, and
                        # undo that if the submission is rejected.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Stop waiting once the RPC has terminated or the
                            # send is no longer outstanding.
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        if state.code is not None:
                            return
                else:
                    return
        # All requests were sent: half-close, unless the RPC already ended.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render the repr/str text for an RPC-backed object.

    Args:
      class_name: Name of the class the text is generated for.
      rpc_state: State of the RPC being described; read under its condition.

    Returns:
      A short placeholder while the RPC has no code yet, otherwise a
      multi-line summary of status and details (plus the debug error string
      for non-OK outcomes).
    """
    with rpc_state.condition:
        code = rpc_state.code
        if code is None:
            return "<{} object>".format(class_name)
        if code is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, code, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            code,
            rpc_state.details,
            rpc_state.debug_error_string,
        )
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The constructor copies the supplied state, so the error keeps its own
    snapshot of the outcome. The RPC represented by the state object must
    not be in-progress or cancelled.

    Attributes:
      _state: An instance of _RPCState holding the copied outcome.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        # Copy while holding the source state's condition.
        with state.condition:
            self._state = snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the status code of the RPC."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the status details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch this error to obtain a traceback.
        try:
            raise self
        except grpc.RpcError as error:
            return error.__traceback__

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Already done by construction, so the callback runs right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Base class providing the grpc.RpcContext behaviour and the iterator
    plumbing; subclasses supply `_next` and `debug_error_string`.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative remaining time.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        state = self._state
        with state.condition:
            if state.code is not None:
                # Already terminated; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            state.cancelled = True
            _abort(state, code, details)
            state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            # `callbacks` is None once the final status has been handled.
            if registered is None:
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that is still in flight when this object is collected.
        state = self._state
        with state.condition:
            if state.code is not None:
                return
            state.code = grpc.StatusCode.CANCELLED
            state.details = "Cancelled upon garbage collection!"
            state.cancelled = True
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                state.details,
            )
            state.condition.notify_all()
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Return whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while the RPC has no status code yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the"
                    " RPC is complete."
                )
            if state.code is grpc.StatusCode.OK:
                return state.response
            if state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when"
                    " the RPC is complete."
                )
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        state = self._state
        with state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when"
                    " the RPC is complete."
                )
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError as error:
                return error.__traceback__

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` on completion, or immediately if already done."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Invoked outside the lock.
            fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while state.initial_metadata is None:
                self._consume_next_event()
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            metadata = self._state.trailing_metadata
            if metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status = self._state.code
            if status is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch the call's next event, apply it, and run any callbacks."""
        event = self._call.next_event()
        with self._state.condition:
            pending = _handle_event(
                event, self._state, self._response_deserializer
            )
            for completion_callback in pending:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                completion_callback()
        return event

    def _next_response(self) -> Any:
        """Process events until a response is available or the RPC ends."""
        state = self._state
        while True:
            self._consume_next_event()
            with state.condition:
                if state.response is not None:
                    # Hand the message over and clear the slot for the next.
                    response, state.response = state.response, None
                    return response
                if cygrpc.OperationType.receive_message not in state.due:
                    if state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    if state.code is not None:
                        raise self

    def _next(self) -> Any:
        state = self._state
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee that
            # if an event has been submitted to the core completion queue,
            # it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there is
            # no data race on `due`.
            state.due.add(cygrpc.OperationType.receive_message)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
            )
            if not operating:
                state.due.remove(cygrpc.OperationType.receive_message)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Return the decoded debug error string once the RPC has one."""
        with self._state.condition:
            raw_debug = self._state.debug_error_string
            if raw_debug is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(raw_debug)
class _MultiThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the
    grpc.Future interface and to mediate a bidirectional streaming RPC.
    """

    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.initial_metadata is not None,
            )
            return state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.trailing_metadata is not None,
            )
            return state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.code is not None
            )
            return state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait, lambda: state.details is not None
            )
            return _common.decode(state.details)

    def debug_error_string(self) -> Optional[str]:
        """Wait for, then return, the decoded debug error string."""
        state = self._state
        with state.condition:
            _common.wait(
                state.condition.wait,
                lambda: state.debug_error_string is not None,
            )
            return _common.decode(state.debug_error_string)

    def cancelled(self) -> bool:
        """Return whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Return True while the RPC has no status code yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        state = self._state
        with state.condition:
            timed_out = _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return state.response
            if state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        state = self._state
        with state.condition:
            timed_out = _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        state = self._state
        with state.condition:
            timed_out = _common.wait(
                state.condition.wait, self._is_complete, timeout=timeout
            )
            if timed_out:
                raise grpc.FutureTimeoutError()
            if state.code is grpc.StatusCode.OK:
                return None
            if state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError as error:
                return error.__traceback__

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Run `fn(self)` on completion, or immediately if already done."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Invoked outside the lock.
            fn(self)

    def _next(self) -> Any:
        state = self._state
        receive_message = cygrpc.OperationType.receive_message
        with state.condition:
            if state.code is grpc.StatusCode.OK:
                raise StopIteration()
            if state.code is not None:
                raise self

            # Still in flight: request one more message. The operation is
            # recorded in `due` before it is submitted and removed again if
            # the submission is rejected.
            event_handler = _event_handler(state, self._response_deserializer)
            state.due.add(receive_message)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                event_handler,
            )
            if not operating:
                state.due.remove(receive_message)

            # Wake up when a message has arrived, or when the receive is no
            # longer outstanding and the RPC has terminated.
            _common.wait(
                state.condition.wait,
                lambda: state.response is not None
                or (receive_message not in state.due and state.code is not None),
            )
            if state.response is not None:
                # Hand the message over and clear the slot for the next one.
                response, state.response = state.response, None
                return response
            if receive_message not in state.due:
                if state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if state.code is not None:
                    raise self
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize a single request message.

    Args:
      request: The request message.
      timeout: Seconds until the deadline, or None for no deadline.
      request_serializer: Passed to `_common.serialize`.

    Returns:
      A `(deadline, serialized_request, error)` tuple. Exactly one of the
      last two items is None: when serialization yields None, an
      `_InactiveRpcError` with StatusCode.INTERNAL is returned in its place.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn a finished unary-response RPC into a return value or an error.

    Args:
      state: State of the completed RPC.
      call: The underlying call, wrapped when `with_call` is true.
      with_call: Whether to return the call object alongside the response.
      deadline: The RPC's deadline, forwarded to the wrapping rendezvous.

    Returns:
      `state.response`, or `(state.response, rendezvous)` when `with_call`.

    Raises:
      _InactiveRpcError: If the RPC finished with any code other than OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches for a stream-unary invocation.

    Args:
      metadata: Initial metadata to send.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of batches: the first sends initial metadata and receives the
      message and status, the second receives initial metadata.
    """
    send_and_receive_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive_batch, initial_metadata_batch)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pairs every stream-unary operation batch with a None tag.

    Args:
      metadata: Forwarded to _stream_unary_invocation_operations.
      initial_metadata_flags: Forwarded to
        _stream_unary_invocation_operations.

    Returns:
      A tuple of (operations, None) pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple((batch, None) for batch in batches)
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combines the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested for this RPC, or None.

    Returns:
      The smaller of user_deadline and cygrpc.get_deadline_from_context(),
      ignoring whichever is None; None when both are None.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    # parent_deadline is listed first so that ties resolve to it, exactly as
    # min(parent_deadline, user_deadline) would.
    known_deadlines = [
        candidate
        for candidate in (parent_deadline, user_deadline)
        if candidate is not None
    ]
    return min(known_deadlines) if known_deadlines else None
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-unary RPC on a cygrpc.Channel.

    __call__ and with_call run the RPC on a segregated call and process one
    event from it on the calling thread; future() hands the operations to
    the managed-call factory and returns a rendezvous immediately.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so none of them silently falls back to the
    # instance __dict__ inherited from the base class.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the RPC.

        Args:
          channel: The cygrpc.Channel used for blocking invocations.
          managed_call: The factory used by future() to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: The request serializer, or None.
          response_deserializer: The response deserializer, or None.
          _registered_call_handle: The registered call handle of the method,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Builds the state and the single operation batch of the RPC.

        Returns:
          A (state, operations, deadline, error) tuple. When the request
          could not be serialized the first three items are None and error
          is the exception to raise; otherwise error is None.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, rendezvous

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call and handles its first event.

        Returns:
          The (state, call) pair of the RPC.

        Raises:
          grpc.RpcError: If _prepare reports an error for the request.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            (
                (
                    operations,
                    None,
                ),
            ),
            self._context,
            self._registered_call_handle,
        )
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns its response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns (response, grpc.Call)."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous.

        Raises:
          grpc.RpcError: If _prepare reports an error for the request.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed as-is rather
        # than through _determine_deadline - confirm whether that is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            None if credentials is None else credentials._credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream RPC on a segregated call.

    The invocation returns a _SingleThreadedRendezvous built around the
    segregated call.
    """

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so none of them silently falls back to the
    # instance __dict__ inherited from the base class.
    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the RPC.

        Args:
          channel: The cygrpc.Channel the segregated call is created on.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: The request serializer.
          response_deserializer: The response deserializer.
          _registered_call_handle: The registered call handle of the method,
            or None.
        """
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Starts the RPC and returns the rendezvous used to read responses.

        Raises:
          _InactiveRpcError: With grpc.StatusCode.INTERNAL, if
            _common.serialize returns None for the request.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three separate batches: the sends, the status receive and the
        # initial-metadata receive.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        operations_and_tags = tuple((ops, None) for ops in operations)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream RPC through the managed-call factory."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so none of them silently falls back to the
    # instance __dict__ inherited from the base class.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the RPC.

        Args:
          channel: The cygrpc.Channel this multi-callable belongs to.
          managed_call: The factory used to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: The request serializer.
          response_deserializer: The response deserializer.
          _registered_call_handle: The registered call handle of the method,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC and returns the rendezvous used to read responses.

        Raises:
          grpc.RpcError: If _start_unary_request reports an error for the
            request.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: the sends together with the status receive, and the
        # initial-metadata receive on its own.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.SendMessageOperation(
                    serialized_request, _EMPTY_FLAGS
                ),
                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operations,
            _event_handler(state, self._response_deserializer),
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-unary RPC on a cygrpc.Channel.

    __call__ and with_call run the RPC on a segregated call and process its
    events on the calling thread; future() uses the managed-call factory.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so none of them silently falls back to the
    # instance __dict__ inherited from the base class.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the RPC.

        Args:
          channel: The cygrpc.Channel used for blocking invocations.
          managed_call: The factory used by future() to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: The request serializer, or None.
          response_deserializer: The response deserializer, or None.
          _registered_call_handle: The registered call handle of the method,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Runs the RPC on a segregated call until no operation is due.

        Returns:
          The (state, call) pair of the RPC.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Keep handling events, waking any waiter on state.condition after
        # each one, until state.due is empty.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invokes the RPC synchronously and returns its response."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invokes the RPC synchronously; returns (response, grpc.Call)."""
        state, call = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC on a managed call and returns its rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed as-is rather
        # than through _determine_deadline - confirm whether that is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The initial-metadata operation must carry the augmented
            # metadata, as in _blocking; the raw metadata would drop whatever
            # _compression.augment_metadata added for `compression`.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-stream RPC through the managed-call factory."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed, including
    # _registered_call_handle, so none of them silently falls back to the
    # instance __dict__ inherited from the base class.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to invoke the RPC.

        Args:
          channel: The cygrpc.Channel this multi-callable belongs to.
          managed_call: The factory used to create the call.
          method: The RPC method, as bytes.
          target: The channel target, as bytes.
          request_serializer: The request serializer, or None.
          response_deserializer: The response deserializer, or None.
          _registered_call_handle: The registered call handle of the method,
            or None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts the RPC and returns the rendezvous used to read responses."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: send-initial-metadata with the status receive, and the
        # initial-metadata receive on its own.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        # Bits outside cygrpc.InitialMetadataFlags.used_mask are discarded.
        masked_value = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked_value)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Returns flags reflecting an explicit wait_for_ready choice.

        Args:
          wait_for_ready: None to leave the flags untouched; otherwise its
            truthiness sets or clears the wait_for_ready bit.

        Returns:
          self when wait_for_ready is None; otherwise a new instance with
          the wait_for_ready bit updated and wait_for_ready_explicitly_set
          turned on.
        """
        if wait_for_ready is None:
            return self

        wait_bit = cygrpc.InitialMetadataFlags.wait_for_ready
        explicit_bit = cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
        if wait_for_ready:
            updated = self | wait_bit | explicit_bit
        else:
            updated = (self & ~wait_bit) | explicit_bit
        return self.__class__(updated)
class _ChannelCallState(object):
    """Bookkeeping for the managed calls of one cygrpc.Channel."""

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        """Starts with no managed calls on the given channel."""
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self) -> None:
        """Forgets every managed call; the channel and lock are kept."""
        self.managed_calls = 0

    def __del__(self):
        # Best-effort close of the channel; TypeError and AttributeError are
        # deliberately swallowed (presumably for teardown of a partially
        # initialized object or interpreter shutdown - TODO confirm).
        try:
            channel = self.channel
            channel.close(cygrpc.StatusCode.cancelled, "Channel deallocated!")
        except (TypeError, AttributeError):
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Starts a daemon thread that dispatches the channel's call events.

    Each event that is not a queue timeout is passed to its own tag. When the
    tag returns a true value, state.managed_calls is decremented under
    state.lock, and the thread exits once the count reaches zero.

    Args:
      state: The _ChannelCallState whose channel is polled.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            timed_out = (
                event.completion_type == cygrpc.CompletionType.queue_timeout
            )
            # The tag is only invoked for events that are not timeouts.
            if timed_out or not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Returns a factory of integrated calls tracked in `state`.

    Args:
      state: The _ChannelCallState whose channel creates the calls and whose
        managed_calls counter tracks them.

    Returns:
      The `create` function defined below.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant
            from the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch is tagged with the same event handler.
        operations_and_tags = tuple(
            (batch, event_handler) for batch in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                operations_and_tags,
                context,
                _registered_call_handle,
            )
            # The spin thread is started only when this call is the first
            # managed one; otherwise the call is just counted.
            is_first_managed_call = state.managed_calls == 0
            state.managed_calls += 1
            if is_first_managed_call:
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState(object):
    """Shared state of a channel's connectivity polling and callbacks."""

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        """Creates the lock, stores the channel and clears all other fields."""
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out exactly as a post-fork reset leaves
        # them.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Clears polling, connectivity and subscription state.

        The lock and the channel are left untouched.
        """
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collects the callbacks that have not seen the current connectivity.

    Every collected callback's entry is updated in place to record
    state.connectivity.

    Args:
      state: The connectivity state holding the subscriptions.

    Returns:
      The callbacks whose recorded connectivity differed from
      state.connectivity, in subscription order.
    """
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        callback, recorded_connectivity = entry
        if recorded_connectivity is state.connectivity:
            continue
        stale_callbacks.append(callback)
        entry[1] = state.connectivity
    return stale_callbacks
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invokes callbacks with connectivity values until none are outstanding.

    After each round, the callbacks that still need the current connectivity
    are fetched under state.lock; when there are none, state.delivering is
    cleared and the function returns.

    Args:
      state: The connectivity state shared with the polling thread.
      initial_connectivity: The value passed to the first round of callbacks.
      initial_callbacks: The callbacks invoked in the first round.
    """
    connectivity = initial_connectivity
    pending_callbacks = initial_callbacks
    while True:
        for callback in pending_callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                # A failing callback is logged and does not stop the rest.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            pending_callbacks = _deliveries(state)
            if not pending_callbacks:
                state.delivering = False
                return
            connectivity = state.connectivity
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Starts a daemon thread running _deliver and marks state as delivering.

    Args:
      state: The connectivity state; its current connectivity is the value
        handed to the first round of callbacks.
      callbacks: The callbacks to invoke first.
    """
    delivery_args = (state, state.connectivity, callbacks)
    delivery_thread = cygrpc.ForkManagedThread(
        target=_deliver, args=delivery_args
    )
    delivery_thread.setDaemon(True)
    delivery_thread.start()
    state.delivering = True
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls the channel's connectivity and schedules callback deliveries.

    The loop ends, clearing state.polling and state.connectivity, once there
    are no subscriptions left and no connection attempt has been requested.

    Args:
      state: The connectivity state shared with subscribers.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                connectivity
            ]
        )
        # Every current subscriber receives the first value, and its entry
        # is marked as having seen it.
        initial_callbacks = []
        for entry in state.callbacks_and_connectivities:
            initial_callbacks.append(entry[0])
            entry[1] = state.connectivity
        if initial_callbacks:
            _spawn_delivery(state, tuple(initial_callbacks))

    while True:
        # Each watch is given a deadline 0.2 seconds from now.
        event = channel.watch_connectivity_state(
            connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (
                state.callbacks_and_connectivities or state.try_to_connect
            ):
                state.polling = False
                state.connectivity = None
                return
            # Consume the pending connection request, if any.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = (
                _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                    connectivity
                ]
            )
            # While a delivery is in flight, _deliver picks up the new value
            # itself via _deliveries.
            if not state.delivering:
                callbacks = _deliveries(state)
                if callbacks:
                    _spawn_delivery(state, callbacks)
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers a connectivity callback, starting the poller if needed.

    Args:
      state: The connectivity state holding the subscriptions.
      callback: The callable to invoke with connectivity values.
      try_to_connect: Whether the channel should attempt to connect; only
        its truthiness is used.
    """
    wants_connection = bool(try_to_connect)
    with state.lock:
        # The connectivity recorded as already delivered to this callback.
        recorded_connectivity = None
        if not (state.callbacks_and_connectivities or state.polling):
            # First subscriber while no poller is running: start one.
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connection),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
        else:
            if not state.delivering and state.connectivity is not None:
                # A value is known and nothing is being delivered, so hand
                # it to the new callback right away.
                _spawn_delivery(state, (callback,))
                recorded_connectivity = state.connectivity
            state.try_to_connect |= wants_connection
        state.callbacks_and_connectivities.append(
            [callback, recorded_connectivity]
        )
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription whose callback equals `callback`.

    Does nothing when no subscription matches.

    Args:
      state: The connectivity state holding the subscriptions.
      callback: The callable to unsubscribe.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, subscription in enumerate(subscriptions):
            if callback == subscription[0]:
                del subscriptions[position]
                return
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression and user-agent options to the base options.

    Args:
      base_options: The channel options to extend.
      compression: Passed to _compression.create_channel_option.

    Returns:
      A tuple of base_options, followed by the compression option(s),
      followed by the primary user agent option.
    """
    compression_option = _compression.create_channel_option(compression)
    augmented_options = tuple(base_options) + compression_option
    user_agent_option = (
        (
            cygrpc.ChannelArgKey.primary_user_agent_string,
            _USER_AGENT,
        ),
    )
    return augmented_options + user_agent_option
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    Args:
      options: The (key, value) channel options to split.

    Returns:
      A (python_options, core_options) pair of lists; an option is a Python
      option when its key equals
      grpc.experimental.ChannelOptions.SingleThreadedUnaryStream.
    """
    python_only_key = grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
    python_options = []
    core_options = []
    for option in options:
        if option[0] == python_only_key:
            destination = python_options
        else:
            destination = core_options
        destination.append(option)
    return python_options, core_options
class Channel(grpc.Channel):
    """A grpc.Channel implementation that wraps a cygrpc.Channel.

    The instance can be used as a context manager; leaving the ``with``
    block closes the channel.
    """

    # When true, unary_stream() returns the single-threaded multi-callable.
    _single_threaded_unary_stream: bool
    # The wrapped cygrpc channel that every multi-callable is built on.
    _channel: cygrpc.Channel
    # Call state handed to _channel_managed_call_management().
    _call_state: _ChannelCallState
    # Connectivity state used by subscribe(), unsubscribe() and closing.
    _connectivity_state: _ChannelConnectivityState
    # The target exactly as it was passed to the constructor.
    _target: str
    # Annotation only: nothing in this class body assigns this attribute.
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to
            be used over the lifetime of the channel.
        """
        python_only_options, core_only_options = _separate_channel_options(
            options
        )
        # Start from the module default; a Python-only option may turn the
        # single-threaded unary-stream mode on.
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_only_options)

        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_only_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_arguments, credentials
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)

        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """Look up the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC
        generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Apply Python-only channel options to this channel's attributes.

        The presence of a ``SingleThreadedUnaryStream`` option switches the
        single-threaded unary-stream mode on; the option's value is not
        inspected.

        Args:
          python_options: The Python-only channel options to apply.
        """
        for option in python_options:
            option_key = option[0]
            if (
                option_key
                == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
            ):
                self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Forward a connectivity subscription to ``_subscribe``.

        Args:
          callback: The callback to register.
          try_to_connect: Forwarded unchanged to ``_subscribe``.
        """
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Forward a connectivity unsubscription to ``_unsubscribe``.

        Args:
          callback: The callback to unregister.
        """
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable bound to this channel.

        Args:
          method: The name of the RPC method.
          request_serializer: Passed through to the multi-callable.
          response_deserializer: Passed through to the multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and passed along; otherwise None is.

        Returns:
          A ``_UnaryUnaryMultiCallable``.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        encoded_target = _common.encode(self._target)
        return _UnaryUnaryMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            encoded_target,
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable bound to this channel.

        Args:
          method: The name of the RPC method.
          request_serializer: Passed through to the multi-callable.
          response_deserializer: Passed through to the multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and passed along; otherwise None is.

        Returns:
          A ``_SingleThreadedUnaryStreamMultiCallable`` when the
          single-threaded mode is enabled on this channel, otherwise a
          ``_UnaryStreamMultiCallable``.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if not self._single_threaded_unary_stream:
            return _UnaryStreamMultiCallable(
                self._channel,
                _channel_managed_call_management(self._call_state),
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )
        return _SingleThreadedUnaryStreamMultiCallable(
            self._channel,
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable bound to this channel.

        Args:
          method: The name of the RPC method.
          request_serializer: Passed through to the multi-callable.
          response_deserializer: Passed through to the multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and passed along; otherwise None is.

        Returns:
          A ``_StreamUnaryMultiCallable``.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        encoded_target = _common.encode(self._target)
        return _StreamUnaryMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            encoded_target,
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable bound to this channel.

        Args:
          method: The name of the RPC method.
          request_serializer: Passed through to the multi-callable.
          response_deserializer: Passed through to the multi-callable.
          _registered_method: When truthy, the registered call handle for
            ``method`` is looked up and passed along; otherwise None is.

        Returns:
          A ``_StreamStreamMultiCallable``.
        """
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        managed_call = _channel_managed_call_management(self._call_state)
        encoded_method = _common.encode(method)
        encoded_target = _common.encode(self._target)
        return _StreamStreamMultiCallable(
            self._channel,
            managed_call,
            encoded_method,
            encoded_target,
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Empty the list of connectivity subscriptions, under its lock."""
        connectivity_state = self._connectivity_state
        if not connectivity_state:
            return
        with connectivity_state.lock:
            del connectivity_state.callbacks_and_connectivities[:]

    def _close(self) -> None:
        """Close the channel and undo the constructor's registrations.

        Drops every connectivity subscription, closes the wrapped channel
        with a cancelled status, unregisters this channel from fork
        handling and, when gevent is activated, decrements the gevent
        channel count.
        """
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Drop all subscriptions and close the wrapped channel for a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        """Return this channel as the context-manager value."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the channel; returning False lets any exception propagate."""
        self._close()
        return False

    def close(self) -> None:
        """Close the channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this
        # today because many valid use cases today allow the channel to be
        # deleted immediately after stubs are created. After a sufficient
        # period of time has passed for all users to be trusted to hold on to
        # their channels for as long as they are in use and to close them
        # after using them, then deletion of this grpc._channel.Channel
        # instance can be made to effect closure of the underlying
        # cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass