Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 20%
822 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:30 +0000
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
24import grpc
25from grpc import _common
26from grpc import _compression
27from grpc import _grpcio_metadata
28from grpc._cython import cygrpc
29import grpc.experimental
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# User-agent string built from the grpcio version metadata.
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)

# Flags value passed to cygrpc operations when no special flags are requested.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever the variable is present in the environment, whatever its value
# (including the empty string).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
    "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None

# Operation types used to seed `_RPCState.due` for a unary-unary RPC.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# Operation types used to seed `_RPCState.due` for a unary-stream RPC.
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
# Presumably the stream-unary counterpart of the tuples above -- its use is
# not visible in this part of the file; confirm against the callers.
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# Presumably the stream-stream counterpart of the tuples above -- its use is
# not visible in this part of the file; confirm against the callers.
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')

# repr() template for an RPC that finished with StatusCode.OK; the fields are
# class name, status code and details (see _rpc_state_string).
_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                              '\tstatus = {}\n'
                              '\tdetails = "{}"\n'
                              '>')

# repr() template for an RPC that finished with any other status; adds the
# debug error string as a fourth field.
_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                                  '\tstatus = {}\n'
                                  '\tdetails = "{}"\n'
                                  '\tdebug_error_string = "{}"\n'
                                  '>')
def _deadline(timeout):
    """Convert a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: A duration in seconds measured from now, or None when the
        caller wants no deadline.

    Returns:
      None if `timeout` is None; otherwise `time.time()` plus `timeout`.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(unknown_cygrpc_code, details):
    """Format the details message for a status code with no known mapping.

    Args:
      unknown_cygrpc_code: The code value to report in the message.
      details: The details text to report in the message.

    Returns:
      A string naming both the code and the details.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
class _RPCState(object):
    """Mutable bookkeeping for a single RPC.

    All attributes are meant to be read and written while holding
    `condition`; `notify_all` is called on it whenever the RPC's state changes.
    """

    def __init__(self, due, initial_metadata, trailing_metadata, code, details):
        # Lock and wake-up primitive protecting every other attribute.
        self.condition = threading.Condition()

        # Outcome of the RPC as known so far.
        self.code = code
        self.details = details
        self.debug_error_string = None
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.response = None

        # cygrpc.OperationType values for events still expected from the
        # RPC's completion queue. Membership in `due` implies `operate()` was
        # called for that operation, but not the reverse: after a failed
        # `operate()` an entry may linger briefly without a matching operation
        # having been submitted to Core.
        self.due = set(due)

        # grpc.Future.cancel / grpc.Future.cancelled have slightly wonky
        # semantics, so "cancellation was requested before the RPC
        # terminated" is tracked on its own, apart from the RPC result.
        self.cancelled = False
        self.callbacks = []

        # Fork epoch observed when this state was created.
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace the condition variable with a fresh one."""
        self.condition = threading.Condition()
def _abort(state, code, details):
    """Mark an RPC as terminated locally unless it already has a status.

    Args:
      state: The _RPCState-like object to update; the caller is expected to
        hold whatever lock protects it.
      code: Status code to record.
      details: Details string to record.

    Does nothing when `state.code` is already set. Otherwise records the code
    and details, fills in empty initial metadata if none was received, and
    sets trailing metadata to an empty tuple.
    """
    if state.code is not None:
        return
    state.code, state.details = code, details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(event, state, response_deserializer):
    """Fold one completion-queue event into the RPC state.

    The caller is expected to hold `state.condition`.

    Args:
      event: An object exposing `batch_operations`, each of which reports its
        operation type and the data it received.
      state: The _RPCState to update.
      response_deserializer: Passed to `_common.deserialize` for every
        received message.

    Returns:
      The list of callbacks that became runnable because the RPC's status
      arrived in this event (empty if no status was received).

    Raises:
      KeyError: If an operation in the event is not present in `state.due`.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A locally assigned status (cancel/abort) takes precedence over
            # whatever the wire reports.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code we failed to map; the mapped value
                    # is always None on this branch and carries no
                    # information.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            callbacks.extend(state.callbacks)
            # None (rather than an empty list) signals that the RPC has
            # terminated and no further callbacks can be registered.
            state.callbacks = None
    return callbacks
def _event_handler(state, response_deserializer):
    """Build the per-RPC callable invoked for each completion-queue event.

    Args:
      state: The _RPCState the returned handler updates.
      response_deserializer: Forwarded to `_handle_event`.

    Returns:
      A function taking an event. It updates `state` under its condition,
      wakes all waiters, runs any callbacks released by the event, and
      returns True only when no operations remain due and the state's fork
      epoch is not older than the current fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        # Callbacks run outside the lock so that they may freely query the RPC.
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks registered through add_callback are plain
                # callables without a `.func` attribute, so fall back to the
                # callback itself instead of raising inside this handler.
                _LOGGER.error('Exception in callback %s: %s',
                              repr(getattr(callback, 'func', callback)),
                              repr(e))
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Consume a request iterator supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and submits it on `call`, waiting
    for every send to complete before fetching the next request. When the
    iterator is exhausted and the RPC has no status yet, a close-from-client
    operation is submitted.

    Args:
      request_iterator: The user's iterator of request values.
      state: The _RPCState of the RPC being fed.
      call: Object providing `operate` and `cancel` for the RPC.
      request_serializer: Passed to `_common.serialize` for every request.
      event_handler: Passed to `call.operate` with each submitted operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except branch already balanced the
            # enter_user_request_generator() call, so that the finally clause
            # does not balance it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition -- confirm
                # that is intentional.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        # Register the send as due before submitting it and
                        # roll back if the submission is refused.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # The send is finished once the RPC has a status
                            # or the send is no longer due.
                            return (state.code is not None or
                                    cygrpc.OperationType.send_message
                                    not in state.due)

                        _common.wait(state.condition.wait,
                                     _done,
                                     spin_cb=functools.partial(
                                         cygrpc.block_if_fork_in_progress,
                                         state))
                        if state.code is not None:
                            return
                else:
                    # The RPC already terminated or was cancelled; stop
                    # consuming requests.
                    return
        # Reached only via `break`, i.e. the iterator was exhausted normally.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name, rpc_state):
    """Render a human-readable description of an RPC's state.

    Args:
      class_name: Name to show as the type of the object being described.
      rpc_state: The state to describe; read while holding its condition.

    Returns:
      A short placeholder when the RPC has no status yet, otherwise a
      multi-line summary of status and details (plus the debug error string
      for non-OK statuses).
    """
    with rpc_state.condition:
        code = rpc_state.code
        if code is None:
            return '<{} object>'.format(class_name)
        if code is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, code,
                                                     rpc_state.details)
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, code, rpc_state.details,
            rpc_state.debug_error_string)
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState holding a private copy of the outcome.
    """

    def __init__(self, state):
        # Copy everything under the source state's lock so the error is a
        # consistent snapshot, detached from later changes to the live RPC.
        with state.condition:
            snapshot = _RPCState((), copy.deepcopy(state.initial_metadata),
                                 copy.deepcopy(state.trailing_metadata),
                                 state.code, copy.deepcopy(state.details))
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)
            self._state = snapshot

    def initial_metadata(self):
        """Return the initial metadata captured in the snapshot."""
        return self._state.initial_metadata

    def trailing_metadata(self):
        """Return the trailing metadata captured in the snapshot."""
        return self._state.trailing_metadata

    def code(self):
        """Return the status code captured in the snapshot."""
        return self._state.code

    def details(self):
        """Return the decoded details string captured in the snapshot."""
        raw_details = self._state.details
        return _common.decode(raw_details)

    def debug_error_string(self):
        """Return the decoded debug error string captured in the snapshot."""
        raw_debug = self._state.debug_error_string
        return _common.decode(raw_debug)

    def _repr(self):
        own_name = self.__class__.__name__
        return _rpc_state_string(own_name, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def cancel(self):
        """See grpc.Future.cancel."""
        return False

    def cancelled(self):
        """See grpc.Future.cancelled."""
        return False

    def running(self):
        """See grpc.Future.running."""
        return False

    def done(self):
        """See grpc.Future.done."""
        return True

    def result(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.exception."""
        return self

    def traceback(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError:
            _, _, trace = sys.exc_info()
            return trace

    def add_done_callback(self, fn, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.add_done_callback."""
        # This future is always done, so the callback runs right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Base class providing the pieces shared by the rendezvous flavours in this
    module: activity/cancellation queries, callback registration, the
    iterator protocol, and cancellation on garbage collection. Subclasses
    supply `_next` and `debug_error_string`.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    def __init__(self, state, call, response_deserializer, deadline):
        super(_Rendezvous, self).__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self):
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self):
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            else:
                # Clamp at zero so an expired deadline never reports a
                # negative remainder.
                return max(self._deadline - time.time(), 0)

    def cancel(self):
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is None:
                code = grpc.StatusCode.CANCELLED
                details = 'Locally cancelled by application!'
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
                self._state.cancelled = True
                _abort(self._state, code, details)
                self._state.condition.notify_all()
                return True
            else:
                # The RPC already has a status; there is nothing to cancel.
                return False

    def add_callback(self, callback):
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            # `callbacks` is set to None once the RPC's status has been
            # received (see _handle_event); registration is refused then.
            if self._state.callbacks is None:
                return False
            else:
                self._state.callbacks.append(callback)
                return True

    def __iter__(self):
        return self

    def next(self):
        # Python 2 spelling of the iterator protocol.
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        # Implemented by subclasses.
        raise NotImplementedError()

    def debug_error_string(self):
        # Implemented by subclasses.
        raise NotImplementedError()

    def _repr(self):
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def __del__(self):
        # If the object is collected while the RPC still has no status, mark
        # it cancelled, cancel the underlying call and wake any waiters.
        with self._state.condition:
            if self._state.code is None:
                self._state.code = grpc.StatusCode.CANCELLED
                self._state.details = 'Cancelled upon garbage collection!'
                self._state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
                    self._state.details)
                self._state.condition.notify_all()
class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    def _is_complete(self):
        # Callers hold self._state.condition.
        return self._state.code is not None

    def cancelled(self):
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self):
        """Return True while the RPC has no status yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self):
        """Return True once the RPC has a status."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout=None):
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                raise self

    def exception(self, timeout=None):
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                return self

    def traceback(self, timeout=None):
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                # Raise and catch ourselves to obtain a traceback object.
                try:
                    raise self
                except grpc.RpcError:
                    return sys.exc_info()[2]

    def add_done_callback(self, fn):
        """Call `fn(self)` when the RPC completes, or immediately if it has."""
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        # Already complete: invoke outside the lock.
        fn(self)

    def initial_metadata(self):
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self):
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            if self._state.trailing_metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed.")
            return self._state.trailing_metadata

    def code(self):
        """See grpc.Call.code"""
        with self._state.condition:
            if self._state.code is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed.")
            return self._state.code

    def details(self):
        """See grpc.Call.details"""
        with self._state.condition:
            if self._state.details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed.")
            return _common.decode(self._state.details)

    def _consume_next_event(self):
        """Pull one event from the call, apply it, and run released callbacks.

        Returns the event that was consumed.
        """
        event = self._call.next_event()
        with self._state.condition:
            callbacks = _handle_event(event, self._state,
                                      self._response_deserializer)
            for callback in callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self):
        """Pump events until a response is available or the RPC has ended."""
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    # Hand the response over and clear the slot for the next
                    # one.
                    response = self._state.response
                    self._state.response = None
                    return response
                elif cygrpc.OperationType.receive_message not in self._state.due:
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    elif self._state.code is not None:
                        raise self

    def _next(self):
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self):
        """Return the decoded debug error string; raises if not yet available."""
        with self._state.condition:
            if self._state.debug_error_string is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed.")
            return _common.decode(self._state.debug_error_string)
class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirectional streaming RPC.
    """

    def initial_metadata(self):
        """See grpc.Call.initial_metadata"""
        with self._state.condition:

            def _done():
                return self._state.initial_metadata is not None

            # Block until initial metadata has been recorded in the state.
            _common.wait(self._state.condition.wait, _done)
            return self._state.initial_metadata

    def trailing_metadata(self):
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:

            def _done():
                return self._state.trailing_metadata is not None

            # Block until trailing metadata has been recorded in the state.
            _common.wait(self._state.condition.wait, _done)
            return self._state.trailing_metadata

    def code(self):
        """See grpc.Call.code"""
        with self._state.condition:

            def _done():
                return self._state.code is not None

            # Block until the RPC has a status code.
            _common.wait(self._state.condition.wait, _done)
            return self._state.code

    def details(self):
        """See grpc.Call.details"""
        with self._state.condition:

            def _done():
                return self._state.details is not None

            # Block until details have been recorded in the state.
            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.details)

    def debug_error_string(self):
        """Block until a debug error string is recorded, then return it decoded."""
        with self._state.condition:

            def _done():
                return self._state.debug_error_string is not None

            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self):
        """Return whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self):
        """Return True while the RPC has no status yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self):
        """Return True once the RPC has a status."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self):
        # Used as the wait predicate; callers hold self._state.condition.
        return self._state.code is not None

    def result(self, timeout=None):
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return self._state.response
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    raise self

    def exception(self, timeout=None):
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    return self

    def traceback(self, timeout=None):
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    # Raise and catch ourselves to obtain a traceback object.
                    try:
                        raise self
                    except grpc.RpcError:
                        return sys.exc_info()[2]

    def add_done_callback(self, fn):
        """Call `fn(self)` when the RPC completes, or immediately if it has."""
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        # Already complete: invoke outside the lock.
        fn(self)

    def _next(self):
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(self._state,
                                               self._response_deserializer)
                # Register the receive as due before submitting it and roll
                # back if the submission is refused.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler)
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                # Ready when a response is waiting, or when no receive is
                # outstanding and the RPC has a status.
                return (self._state.response is not None or
                        (cygrpc.OperationType.receive_message
                         not in self._state.due and
                         self._state.code is not None))

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                # Hand the response over and clear the slot for the next one.
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
                    raise self
def _start_unary_request(request, timeout, request_serializer):
    """Compute the deadline and serialized form of a single request.

    Args:
      request: The request value to serialize.
      timeout: Seconds from now, or None for no deadline.
      request_serializer: Passed to `_common.serialize`.

    Returns:
      A `(deadline, serialized_request, error)` triple. On success `error` is
      None; if serialization yields None, `serialized_request` is None and
      `error` is an _InactiveRpcError with StatusCode.INTERNAL.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None
    failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                             'Exception serializing request!')
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(state, call, with_call, deadline):
    """Turn the final state of a blocking unary-response RPC into a result.

    Args:
      state: The _RPCState after the RPC finished.
      call: The underlying call object.
      with_call: Whether the caller also wants a call object back.
      deadline: Deadline to attach to the returned call object, if any.

    Returns:
      The response, or a `(response, rendezvous)` pair when `with_call` is
      true.

    Raises:
      _InactiveRpcError: If the RPC did not finish with StatusCode.OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):
    """Build the two initial operation batches for a stream-unary call.

    Args:
      metadata: Metadata to send as the call's initial metadata.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of tuples: first the batch that sends initial metadata and
      receives the message and status, then the batch that receives initial
      metadata.
    """
    send_and_receive_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    return (send_and_receive_batch, receive_initial_metadata_batch)
def _stream_unary_invocation_operationses_and_tags(metadata,
                                                   initial_metadata_flags):
    """Pair each stream-unary operation batch with a None tag.

    Returns:
      A tuple of `(operations, None)` pairs, one per batch produced by
      `_stream_unary_invocation_operationses`.
    """
    batches = _stream_unary_invocation_operationses(metadata,
                                                    initial_metadata_flags)
    return tuple([(batch, None) for batch in batches])
def _determine_deadline(user_deadline):
    """Combine the caller's deadline with the one from the current context.

    Args:
      user_deadline: The deadline requested for this call, or None.

    Returns:
      None if neither deadline exists, the one that exists if only one does,
      and the smaller of the two otherwise.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-unary method, either blocking or as a future."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _prepare(self, request, timeout, metadata, wait_for_ready, compression):
        """Serialize the request and assemble the state and operation batch.

        Returns:
          `(state, operations, deadline, None)` on success, or
          `(None, None, None, error)` when the request cannot be serialized.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        if serialized_request is None:
            return None, None, None, rendezvous

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # The whole RPC is submitted as one batch.
        operations = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,
                  compression):
        """Run the RPC to completion on the calling thread.

        Returns:
          The final `(state, call)` pair.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        operations_and_tags = ((
            operations,
            None,
        ),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method, None, _determine_deadline(deadline), metadata,
            None if credentials is None else credentials._credentials,
            operations_and_tags, self._context)
        # The single batch completes as one event carrying every result.
        completion_event = call.next_event()
        _handle_event(completion_event, state, self._response_deserializer)
        return state, call

    def __call__(self,
                 request,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        """Invoke the RPC and return its response, raising on failure."""
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self,
                  request,
                  timeout=None,
                  metadata=None,
                  credentials=None,
                  wait_for_ready=None,
                  compression=None):
        """Invoke the RPC and return `(response, call)`, raising on failure."""
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self,
               request,
               timeout=None,
               metadata=None,
               credentials=None,
               wait_for_ready=None,
               compression=None):
        """Start the RPC without blocking and return a future-like object."""
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        on_event = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method, None, deadline, metadata,
            None if credentials is None else credentials._credentials,
            (operations,), on_event, self._context)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream method whose responses are read on one thread."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
            self,
            request,
            timeout=None,
            metadata=None,
            credentials=None,
            wait_for_ready=None,
            compression=None):
        """Start the RPC and return an iterator over its responses.

        Raises:
          _InactiveRpcError: If the request cannot be serialized.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(request,
                                               self._request_serializer)
        if serialized_request is None:
            failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                                     'Exception serializing request!')
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        if credentials is None:
            call_credentials = None
        else:
            call_credentials = credentials._credentials
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)

        # Three separate batches: everything that is sent, the final status,
        # and the initial metadata.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operations = (send_batch, status_batch, initial_metadata_batch)
        operations_and_tags = tuple([(ops, None) for ops in operations])

        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), metadata, call_credentials,
            operations_and_tags, self._context)
        return _SingleThreadedRendezvous(state, call,
                                         self._response_deserializer, deadline)
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that runs the RPC on a managed call.

    Calling an instance starts the operations through the ``managed_call``
    factory given at construction and returns a
    ``_MultiThreadedRendezvous``.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
            self,
            request,
            timeout=None,
            metadata=None,
            credentials=None,
            wait_for_ready=None,
            compression=None):
        """Start the RPC and return a ``_MultiThreadedRendezvous``.

        When ``_start_unary_request`` yields no serialized request, the
        rendezvous it returned is raised instead.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)

        # First batch: everything the client sends plus the status receive.
        send_and_status = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        # Second batch: the initial metadata receive on its own.
        receive_initial_metadata = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (send_and_status, receive_initial_metadata)

        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            metadata,
            None if credentials is None else credentials._credentials,
            operationses,
            _event_handler(state, self._response_deserializer),
            self._context,
        )
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer,
                                        deadline)
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-unary multi-callable backed by a cygrpc channel.

    ``__call__`` and ``with_call`` run the RPC on a segregated call and
    block until no operations remain due; ``future`` starts it through the
    ``managed_call`` factory and returns a ``_MultiThreadedRendezvous``.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _blocking(self, request_iterator, timeout, metadata, credentials,
                  wait_for_ready, compression):
        """Run the RPC to completion on a segregated call.

        Returns:
          A ``(state, call)`` pair once ``state.due`` is empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operationses_and_tags(
                augmented_metadata, initial_metadata_flags), self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, None)
        # Pump events from the call until nothing is due any more; waiters
        # on state.condition are woken after every handled event.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        """Invoke the RPC synchronously via ``_blocking``.

        Passes ``False`` as the third argument of
        ``_end_unary_response_blocking`` (``with_call`` passes ``True``).
        """
        state, call, = self._blocking(request_iterator, timeout, metadata,
                                      credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self,
                  request_iterator,
                  timeout=None,
                  metadata=None,
                  credentials=None,
                  wait_for_ready=None,
                  compression=None):
        """Invoke the RPC synchronously via ``_blocking``.

        Passes ``True`` as the third argument of
        ``_end_unary_response_blocking`` (``__call__`` passes ``False``).
        """
        state, call, = self._blocking(request_iterator, timeout, metadata,
                                      credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self,
               request_iterator,
               timeout=None,
               metadata=None,
               credentials=None,
               wait_for_ready=None,
               compression=None):
        """Start the RPC without blocking.

        Returns:
          A ``_MultiThreadedRendezvous`` wrapping the managed call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        # The operations must be built from the augmented metadata, exactly
        # as _blocking does.  Building them from the raw ``metadata`` meant
        # the per-call ``compression`` argument never reached the
        # operations started for a future-style invocation.
        # NOTE(review): unlike _blocking, the deadline is handed over
        # without going through _determine_deadline -- confirm whether that
        # difference is intentional.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, deadline, augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operationses(augmented_metadata,
                                                  initial_metadata_flags),
            event_handler, self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-stream multi-callable that runs the RPC on a managed call."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        """Start the RPC and return a ``_MultiThreadedRendezvous``.

        The initial operations are started through ``self._managed_call``;
        the request iterator is then handed to
        ``_consume_request_iterator`` together with the same event handler.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)

        # First batch: send initial metadata and receive the final status.
        send_and_status = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        # Second batch: the initial metadata receive on its own.
        receive_initial_metadata = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (send_and_status, receive_initial_metadata)

        event_handler = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operationses,
            event_handler,
            self._context,
        )
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
class _InitialMetadataFlags(int):
    """Immutable integer holding initial metadata flags."""

    def __new__(cls, value=_EMPTY_FLAGS):
        # Only the bits inside used_mask are kept.
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super(_InitialMetadataFlags, cls).__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready):
        """Return flags reflecting an explicit wait-for-ready choice.

        ``None`` means "not specified" and returns ``self`` unchanged.  Any
        other value sets the ``wait_for_ready_explicitly_set`` bit and sets
        or clears the ``wait_for_ready`` bit according to its truthiness.
        """
        if wait_for_ready is None:
            return self

        bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            combined = (self | bits.wait_for_ready |
                        bits.wait_for_ready_explicitly_set)
        else:
            combined = ((self & ~bits.wait_for_ready) |
                        bits.wait_for_ready_explicitly_set)
        return self.__class__(combined)
class _ChannelCallState(object):
    """Lock-guarded bookkeeping for the managed calls of one channel."""

    def __init__(self, channel):
        self.lock = threading.Lock()
        self.channel = channel
        # Number of managed calls currently outstanding on the channel.
        self.reset_postfork_child()
        self.threading = False

    def reset_postfork_child(self):
        """Forget all outstanding managed calls."""
        self.managed_calls = 0

    def __del__(self):
        # Best effort only: attribute and type errors raised while closing
        # the channel are deliberately swallowed.
        try:
            self.channel.close(cygrpc.StatusCode.cancelled,
                               'Channel deallocated!')
        except (AttributeError, TypeError):
            pass
def _run_channel_spin_thread(state):
    """Start a daemon thread that drains call events from ``state.channel``.

    The thread invokes each event's tag with the event itself.  Whenever a
    tag reports that its call completed, ``state.managed_calls`` is
    decremented under ``state.lock``; the thread exits once the count
    reaches zero.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            timed_out = (
                event.completion_type == cygrpc.CompletionType.queue_timeout)
            if not timed_out and event.tag(event):
                with state.lock:
                    remaining = state.managed_calls - 1
                    state.managed_calls = remaining
                    if remaining == 0:
                        return

    spinner = cygrpc.ForkManagedThread(target=channel_spin)
    spinner.setDaemon(True)
    spinner.start()
def _channel_managed_call_management(state):
    """Return a factory of integrated calls that keeps ``state`` up to date.

    Each call created through the returned function is counted in
    ``state.managed_calls``; creating the first outstanding call also
    starts the channel spin thread.
    """

    # pylint: disable=too-many-arguments
    def create(flags, method, host, deadline, metadata, credentials,
               operationses, event_handler, context):
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operationses: An iterable of iterables of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch of operations is tagged with the same event handler.
        tagged_batches = tuple(
            (batch, event_handler) for batch in operationses)
        with state.lock:
            call = state.channel.integrated_call(flags, method, host, deadline,
                                                 metadata, credentials,
                                                 tagged_batches, context)
            spin_needed = state.managed_calls == 0
            state.managed_calls += 1
            if spin_needed:
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState(object):
    """Lock-guarded bookkeeping for connectivity subscriptions of a channel."""

    def __init__(self, channel):
        self.lock = threading.RLock()
        self.channel = channel
        # All remaining fields start out in their post-fork defaults.
        self.reset_postfork_child()

    def reset_postfork_child(self):
        """Put every field except ``lock`` and ``channel`` back to its default."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        # Entries are two-element lists: [callback, connectivity].
        self.callbacks_and_connectivities = []
        self.delivering = False
def _deliveries(state):
    """Collect the callbacks that have not yet seen the current connectivity.

    Every entry of ``state.callbacks_and_connectivities`` whose recorded
    connectivity is not the current ``state.connectivity`` object is
    updated in place to the current value, and its callback is included in
    the returned list.
    """
    current = state.connectivity
    outdated = []
    for entry in state.callbacks_and_connectivities:
        if entry[1] is not current:
            outdated.append(entry[0])
            entry[1] = current
    return outdated
def _deliver(state, initial_connectivity, initial_callbacks):
    """Call subscription callbacks until none of them is out of date.

    Starts with the given callbacks and connectivity.  After each round,
    ``_deliveries`` is consulted under ``state.lock`` for callbacks that
    still need the latest connectivity.  When there are none,
    ``state.delivering`` is cleared and the function returns.  Exceptions
    raised by a callback are logged and otherwise ignored.
    """
    pending = initial_callbacks
    value = initial_connectivity
    while True:
        for notify in pending:
            cygrpc.block_if_fork_in_progress(state)
            try:
                notify(value)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
        with state.lock:
            pending = _deliveries(state)
            if not pending:
                state.delivering = False
                return
            value = state.connectivity
def _spawn_delivery(state, callbacks):
    """Start a daemon thread running ``_deliver`` and mark delivery active.

    The thread receives the value of ``state.connectivity`` read at spawn
    time together with ``callbacks``; ``state.delivering`` is set once the
    thread has been started.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(state, channel, initial_try_to_connect):
    """Poll the channel's connectivity and dispatch changes to subscribers.

    This is the target of the polling thread started by ``_subscribe``.  It
    records the initial connectivity, notifies the callbacks registered so
    far, and then loops: each iteration watches the channel (the second
    argument, ``time.time() + 0.2``, is presumably a deadline -- confirm
    against cygrpc) and, when the watch succeeded or a connection attempt
    was requested, re-reads the connectivity and spawns a delivery if none
    is running.  The loop ends, clearing ``state.polling`` and
    ``state.connectivity``, once there are neither subscribers nor a
    pending connection request.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
    connect_requested = initial_try_to_connect
    raw_connectivity = channel.check_connectivity_state(connect_requested)
    with state.lock:
        state.connectivity = to_channel_connectivity[raw_connectivity]
        subscribers = tuple(
            entry[0] for entry in state.callbacks_and_connectivities)
        # Everyone registered so far is about to be told the current value.
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if subscribers:
            _spawn_delivery(state, subscribers)

    while True:
        watch_event = channel.watch_connectivity_state(raw_connectivity,
                                                       time.time() + 0.2)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (state.callbacks_and_connectivities or
                    state.try_to_connect):
                state.polling = False
                state.connectivity = None
                break
            # Consume the pending connection request, if any.
            connect_requested = state.try_to_connect
            state.try_to_connect = False
        if not (watch_event.success or connect_requested):
            continue
        raw_connectivity = channel.check_connectivity_state(connect_requested)
        with state.lock:
            state.connectivity = to_channel_connectivity[raw_connectivity]
            if not state.delivering:
                outdated = _deliveries(state)
                if outdated:
                    _spawn_delivery(state, outdated)
def _subscribe(state, callback, try_to_connect):
    """Register ``callback`` for connectivity updates.

    Under ``state.lock`` one of three things happens:

    * With no subscribers and no polling thread, a daemon polling thread
      running ``_poll_connectivity`` is started and the callback is
      recorded with no known connectivity.
    * With a known connectivity and no delivery in progress, a delivery to
      just this callback is spawned and the callback is recorded with the
      current connectivity.
    * Otherwise the callback is recorded with no known connectivity.

    In the last two cases ``try_to_connect`` is OR-ed into
    ``state.try_to_connect``; in the first it is passed to the polling
    thread instead.
    """
    with state.lock:
        if not (state.callbacks_and_connectivities or state.polling):
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, bool(try_to_connect)))
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            state.callbacks_and_connectivities.append([callback, None])
            return

        can_deliver_now = (not state.delivering and
                           state.connectivity is not None)
        if can_deliver_now:
            _spawn_delivery(state, (callback,))
        state.try_to_connect |= bool(try_to_connect)
        last_seen = state.connectivity if can_deliver_now else None
        state.callbacks_and_connectivities.append([callback, last_seen])
def _unsubscribe(state, callback):
    """Remove the first subscription whose callback equals ``callback``.

    Does nothing when no entry matches.  The list is modified in place
    while holding ``state.lock``.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        position = next(
            (index for index, entry in enumerate(subscriptions)
             if callback == entry[0]), None)
        if position is not None:
            del subscriptions[position]
def _augment_options(base_options, compression):
    """Return ``base_options`` extended with compression and user-agent args.

    The result is a tuple: the base options, followed by whatever
    ``_compression.create_channel_option`` yields for ``compression``,
    followed by the primary user-agent string option.
    """
    compression_option = _compression.create_channel_option(compression)
    augmented = tuple(base_options) + compression_option
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return augmented + (user_agent_option,)
def _separate_channel_options(options):
    """Separates core channel options from Python channel options.

    Returns:
      A ``(python_options, core_options)`` pair of lists.  An option goes
      into the first list when its key is the ``SingleThreadedUnaryStream``
      experimental option and into the second otherwise.
    """
    python_options, core_options = [], []
    for option in options:
        is_python_option = (
            option[0] ==
            grpc.experimental.ChannelOptions.SingleThreadedUnaryStream)
        destination = python_options if is_python_option else core_options
        destination.append(option)
    return python_options, core_options
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    def __init__(self, target, options, credentials, compression):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        # The environment-derived default may be overridden by an option.
        self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_args = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(encoded_target, channel_args,
                                       credentials)
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _process_python_options(self, python_options):
        """Sets channel attributes according to python-only channel options."""
        if any(option[0] ==
               grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
               for option in python_options):
            self._single_threaded_unary_stream = True

    def subscribe(self, callback, try_to_connect=None):
        """Register ``callback`` for connectivity updates of this channel."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(self, callback):
        """Remove a callback previously registered with ``subscribe``."""
        _unsubscribe(self._connectivity_state, callback)

    def unary_unary(self,
                    method,
                    request_serializer=None,
                    response_deserializer=None):
        """Create a unary-unary multi-callable for ``method``."""
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryUnaryMultiCallable(self._channel, managed_call,
                                        _common.encode(method),
                                        request_serializer,
                                        response_deserializer)

    def unary_stream(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Create a unary-stream multi-callable for ``method``."""
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel, _common.encode(method), request_serializer,
                response_deserializer)
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryStreamMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_unary(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Create a stream-unary multi-callable for ``method``."""
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamUnaryMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_stream(self,
                      method,
                      request_serializer=None,
                      response_deserializer=None):
        """Create a stream-stream multi-callable for ``method``."""
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamStreamMultiCallable(self._channel, managed_call,
                                          _common.encode(method),
                                          request_serializer,
                                          response_deserializer)

    def _unsubscribe_all(self):
        """Drop every connectivity subscription, in place and under the lock."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            state.callbacks_and_connectivities[:] = []

    def _close(self):
        """Drop subscriptions, close the cygrpc channel and unregister it."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self):
        """Drop subscriptions and close the cygrpc channel because of a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
                                    'Channel closed due to fork')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close on leaving the ``with`` block; never suppress exceptions.
        self._close()
        return False

    def close(self):
        """Close the channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this
        # today because many valid use cases today allow the channel to be
        # deleted immediately after stubs are created. Once enough time has
        # passed that all users can be trusted to hold on to their channels
        # for as long as they are in use and to close them afterwards,
        # deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass