Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_channel.py: 31%
893 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-06 06:03 +0000
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (Any, Callable, Iterator, List, Optional, Sequence, Set,
25 Tuple, Union)
27import grpc # pytype: disable=pyi-error
28from grpc import _common # pytype: disable=pyi-error
29from grpc import _compression # pytype: disable=pyi-error
30from grpc import _grpcio_metadata # pytype: disable=pyi-error
31from grpc._cython import cygrpc
32from grpc._typing import ChannelArgumentType
33from grpc._typing import DeserializingFunction
34from grpc._typing import IntegratedCallFactory
35from grpc._typing import MetadataType
36from grpc._typing import NullaryCallbackType
37from grpc._typing import ResponseType
38from grpc._typing import SerializingFunction
39from grpc._typing import UserTag
40import grpc.experimental # pytype: disable=pyi-error
# Logger for this module, named after the module itself.
_LOGGER = logging.getLogger(__name__)

# User-agent string derived from the grpcio package version.
_USER_AGENT = f'grpc-python/{_grpcio_metadata.__version__}'

# Flags value passed to operations throughout this module.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# The flag is True whenever the variable is set, regardless of its value.
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    "GRPC_SINGLE_THREADED_UNARY_STREAM" in os.environ)

# Operation types that start out in `_RPCState.due` for each kind of RPC.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Log message for a failing channel subscription callback.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')

# repr() template for an RPC that terminated with StatusCode.OK.
_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                              '\tstatus = {}\n'
                              '\tdetails = "{}"\n'
                              '>')

# repr() template for an RPC that terminated with any other status.
_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                                  '\tstatus = {}\n'
                                  '\tdetails = "{}"\n'
                                  '\tdebug_error_string = "{}"\n'
                                  '>')
def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute deadline.

    Args:
      timeout: A duration in seconds, or None for "no deadline".

    Returns:
      `time.time() + timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(unknown_cygrpc_code: Optional[grpc.StatusCode],
                          details: Optional[str]) -> str:
    """Build the details text used when the status code is not recognised.

    Args:
      unknown_cygrpc_code: The code value to mention in the message.
      details: The details value to mention in the message.

    Returns:
      A string embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
class _RPCState(object):
    """Mutable record of everything known about one RPC.

    Every attribute is guarded by `condition`; `notify_all` is called on it
    whenever the state of the RPC changes.
    """
    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    callbacks: List[NullaryCallbackType]
    fork_epoch: Optional[int]

    def __init__(self, due: Sequence[cygrpc.OperationType],
                 initial_metadata: Optional[MetadataType],
                 trailing_metadata: Optional[MetadataType],
                 code: Optional[grpc.StatusCode], details: Optional[str]):
        """Create the state for an RPC.

        Args:
          due: Operation types expected from the completion queue.
          initial_metadata: Initial metadata, if already known.
          trailing_metadata: Trailing metadata, if already known.
          code: Status code, if the RPC has already terminated.
          details: Status details, if the RPC has already terminated.
        """
        # `condition` guards all members of _RPCState. `notify_all` is called
        # on `condition` when the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the
        # RPC's completion queue. If an operation is in `due`, it is
        # guaranteed that `operate()` has been called on a corresponding
        # operation. But the converse is not true. That is, in the case of
        # failed `operate()` calls, there may briefly be events in `due` that
        # do not correspond to operations submitted to Core.
        self.due = set(due)

        # Outcome fields; the ones not supplied by the caller start as None.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.response = None
        self.debug_error_string = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so they have to be tracked separately from the rest
        # of the result of the RPC. This field tracks whether cancellation
        # was requested prior to termination of the RPC.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new threading.Condition."""
        self.condition = threading.Condition()
def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Mark an RPC as locally terminated, unless it already has a code.

    Args:
      state: The RPC state to update in place.
      code: The status code to record.
      details: The status details to record.
    """
    if state.code is not None:
        # A status is already recorded; leave the state untouched.
        return
    state.code = code
    state.details = details
    # Initial metadata that did arrive is kept; otherwise it becomes empty.
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(
    event: cygrpc.BaseEvent, state: _RPCState,
    response_deserializer: Optional[DeserializingFunction]
) -> List[NullaryCallbackType]:
    """Fold one completion-queue event into the RPC state.

    Each batch operation carried by the event is removed from `state.due`
    and its payload, if any, is stored on `state`.

    Args:
      event: The event whose `batch_operations` are processed.
      state: The RPC state to update in place.
      response_deserializer: Passed to `_common.deserialize` for received
        messages.

    Returns:
      The callbacks that were registered on `state` when the status arrived
      (empty if this event carried no status). Invoking them is left to the
      caller.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    # A None result is treated as a deserialization failure.
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw code so that an unmapped value can be reported
                # verbatim; the mapped value is None in exactly that case.
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            callbacks.extend(state.callbacks)
            # None marks "status received": `add_callback` checks for it.
            state.callbacks = None
    return callbacks
def _event_handler(
        state: _RPCState,
        response_deserializer: Optional[DeserializingFunction]) -> UserTag:
    """Build the per-RPC callable that processes completion-queue events.

    Args:
      state: The RPC state the returned handler updates.
      response_deserializer: Forwarded to `_handle_event`.

    Returns:
      A function taking an event. It updates `state`, notifies waiters on
      `state.condition`, runs any callbacks returned by `_handle_event`, and
      returns True when nothing remains in `state.due` and `state.fork_epoch`
      is not older than `cygrpc.get_fork_epoch()`.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        # Callbacks run after the condition has been released.
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Only functools.partial objects have `.func`; callbacks
                # registered through `add_callback` are plain callables, so
                # fall back to the callback itself rather than raising
                # AttributeError from inside this handler.
                _LOGGER.error('Exception in callback %s: %s',
                              repr(getattr(callback, 'func', callback)),
                              repr(e))
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
221# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
222#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator: Iterator, state: _RPCState,
                              call: Union[cygrpc.IntegratedCall,
                                          cygrpc.SegregatedCall],
                              request_serializer: SerializingFunction,
                              event_handler: Optional[UserTag]) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and submits it on `call` as a
    send-message operation, then submits send-close-from-client once the
    iterator is exhausted. This function returns as soon as the thread has
    been started.

    Args:
      request_iterator: The user's iterator of request values.
      state: The RPC state, updated under `state.condition`.
      call: The call on which operations are submitted or cancelled.
      request_serializer: Passed to `_common.serialize` for each request.
      event_handler: Tag passed to `call.operate` with every operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        """Thread body: forward requests until exhaustion or failure."""
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Ensures return_from_user_request_generator() is called exactly
            # once per enter_user_request_generator(), by either the except
            # branch or the finally branch below.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # A None result is treated as a serialization failure.
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        # Record the operation as due before submitting it and
                        # take it back out if the submission is refused.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # True once the RPC has a status or the
                            # send-message operation is no longer due.
                            return (state.code is not None or
                                    cygrpc.OperationType.send_message
                                    not in state.due)

                        _common.wait(state.condition.wait,
                                     _done,
                                     spin_cb=functools.partial(
                                         cygrpc.block_if_fork_in_progress,
                                         state))
                        if state.code is not None:
                            return
                else:
                    # The RPC already has a status or was cancelled.
                    return
        # Reached only through the `break` above, i.e. iterator exhausted.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render an RPC state as the repr text of the object that owns it.

    Args:
      class_name: Name of the owning class, placed at the start of the text.
      rpc_state: The state to describe; read under its condition.

    Returns:
      A short placeholder while the RPC has no status code, otherwise a
      multi-line summary. The summary includes the debug error string only
      for non-OK statuses.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return '<{} object>'.format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, status,
                                                     rpc_state.details)
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, status, rpc_state.details,
            rpc_state.debug_error_string)
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState.
    """
    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Copy the outcome fields of `state` into a private _RPCState."""
        with state.condition:
            snapshot = _RPCState((), copy.deepcopy(state.initial_metadata),
                                 copy.deepcopy(state.trailing_metadata),
                                 state.code, copy.deepcopy(state.details))
            self._state = snapshot
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Return the recorded status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Return the recorded details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        """Return the debug error string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(self, timeout: Optional[float] = None) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:  # pylint: disable=unused-argument
        """See grpc.Future.exception."""
        return self

    def traceback(
        self,
        timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raise and immediately catch this error to obtain a traceback.
        try:
            raise self
        except grpc.RpcError:
            _, _, current_traceback = sys.exc_info()
            return current_traceback

    def add_done_callback(
            self,
            fn: Callable[[grpc.Future], None],
            timeout: Optional[float] = None) -> None:  # pylint: disable=unused-argument
        """See grpc.Future.add_done_callback."""
        # This object is always done, so the callback runs right away.
        fn(self)
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """
    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(self, state: _RPCState, call: Union[cygrpc.SegregatedCall,
                                                     cygrpc.IntegratedCall],
                 response_deserializer: Optional[DeserializingFunction],
                 deadline: Optional[float]):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative remainder.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # The RPC already has a status; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = 'Locally cancelled by application!'
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        """Cancel the call if it has no status yet when this object dies."""
        with self._state.condition:
            if self._state.code is not None:
                return
            self._state.code = grpc.StatusCode.CANCELLED
            self._state.details = 'Cancelled upon garbage collection!'
            self._state.cancelled = True
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
                self._state.details)
            self._state.condition.notify_all()
class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """
    _state: _RPCState

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def cancelled(self) -> bool:
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
            self,
            timeout: Optional[float] = None) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                _, _, current_traceback = sys.exc_info()
                return current_traceback

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn`, or call it right away if the RPC has a status."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Invoked after the condition has been released.
            fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            trailing = self._state.trailing_metadata
            if trailing is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed.")
            return trailing

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            status = self._state.code
            if status is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed.")
            return status

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            raw_details = self._state.details
            if raw_details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed.")
            return _common.decode(raw_details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Fetch one event from the call and apply it to the state."""
        event = self._call.next_event()
        with self._state.condition:
            for callback in _handle_event(event, self._state,
                                          self._response_deserializer):
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Process events until a response or a final status is available."""
        receive_message = cygrpc.OperationType.receive_message
        while True:
            self._consume_next_event()
            with self._state.condition:
                pending = self._state.response
                if pending is not None:
                    # Hand the response over and clear the slot.
                    self._state.response = None
                    return pending
                if receive_message in self._state.due:
                    continue
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self

    def _next(self) -> Any:
        with self._state.condition:
            status = self._state.code
            if status is grpc.StatusCode.OK:
                raise StopIteration()
            if status is not None:
                raise self
            # We tentatively add the operation as expected and remove
            # it if the enqueue operation fails. This allows us to guarantee
            # that if an event has been submitted to the core completion
            # queue, it is in `due`. If we waited until after a successful
            # enqueue operation then a signal could interrupt this
            # thread between the enqueue operation and the addition of the
            # operation to `due`. This would cause an exception on the
            # channel spin thread when the operation completes and no
            # corresponding operation would be present in state.due.
            # Note that, since `condition` is held through this block, there
            # is no data race on `due`.
            self._state.due.add(cygrpc.OperationType.receive_message)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
            if not operating:
                self._state.due.remove(cygrpc.OperationType.receive_message)
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        with self._state.condition:
            raw_debug = self._state.debug_error_string
            if raw_debug is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed.")
            return _common.decode(raw_debug)
class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the
    grpc.Future interface and to mediate a bidirectional streaming RPC.
    """
    _state: _RPCState

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.initial_metadata is not None)
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.trailing_metadata is not None)
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.code is not None)
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.details is not None)
            return _common.decode(self._state.details)

    def debug_error_string(self) -> Optional[str]:
        with self._state.condition:
            _common.wait(self._state.condition.wait,
                         lambda: self._state.debug_error_string is not None)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self) -> bool:
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self) -> bool:
        return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            return self

    def traceback(
            self,
            timeout: Optional[float] = None) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            if self._state.code is grpc.StatusCode.OK:
                return None
            if self._state.cancelled:
                raise grpc.FutureCancelledError()
            # Raise and immediately catch this error to obtain a traceback.
            try:
                raise self
            except grpc.RpcError:
                _, _, current_traceback = sys.exc_info()
                return current_traceback

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Register `fn`, or call it right away if the RPC has a status."""
        with self._state.condition:
            still_running = self._state.code is None
            if still_running:
                self._state.callbacks.append(functools.partial(fn, self))
        if not still_running:
            # Invoked after the condition has been released.
            fn(self)

    def _next(self) -> Any:
        receive_message = cygrpc.OperationType.receive_message
        with self._state.condition:
            status = self._state.code
            if status is grpc.StatusCode.OK:
                raise StopIteration()
            if status is not None:
                raise self

            # Record the receive as due before submitting it and take it
            # back out if the submission is refused.
            event_handler = _event_handler(self._state,
                                           self._response_deserializer)
            self._state.due.add(receive_message)
            operating = self._call.operate(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                event_handler)
            if not operating:
                self._state.due.remove(receive_message)

            def _response_ready():
                # A response is waiting, or the receive is no longer due and
                # the RPC has a status.
                return (self._state.response is not None or
                        (receive_message not in self._state.due and
                         self._state.code is not None))

            _common.wait(self._state.condition.wait, _response_ready)
            pending = self._state.response
            if pending is not None:
                # Hand the response over and clear the slot.
                self._state.response = None
                return pending
            if receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                if self._state.code is not None:
                    raise self
def _start_unary_request(
    request: Any, timeout: Optional[float],
    request_serializer: SerializingFunction
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the single request of an RPC.

    Args:
      request: The request value to serialize.
      timeout: A duration in seconds, or None.
      request_serializer: Passed to `_common.serialize`.

    Returns:
      `(deadline, serialized_request, None)` on success, or
      `(deadline, None, error)` with an INTERNAL `_InactiveRpcError` when
      serialization produced None.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None
    failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                             'Exception serializing request!')
    return deadline, None, _InactiveRpcError(failed_state)
def _end_unary_response_blocking(
    state: _RPCState, call: cygrpc.SegregatedCall, with_call: bool,
    deadline: Optional[float]
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn the final state of a blocking RPC into its return value.

    Args:
      state: The RPC state to inspect.
      call: The call handed to the rendezvous when `with_call` is true.
      with_call: Whether to return a call object alongside the response.
      deadline: The deadline handed to the rendezvous.

    Returns:
      The response, or a `(response, rendezvous)` pair when `with_call`.

    Raises:
      _InactiveRpcError: If the status code is not OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
def _stream_unary_invocation_operations(
        metadata: Optional[MetadataType],
        initial_metadata_flags: int) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches that open a stream-unary RPC.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A pair of tuples: the first sends initial metadata and receives the
      message and status; the second receives initial metadata.
    """
    send_and_receive_result = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    receive_initial_metadata = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    return (send_and_receive_result, receive_initial_metadata)
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary operation batch with a None tag.

    Args:
      metadata: Forwarded to `_stream_unary_invocation_operations`.
      initial_metadata_flags: Forwarded likewise.

    Returns:
      A tuple of `(operations, None)` pairs, one per batch.
    """
    batches = _stream_unary_invocation_operations(metadata,
                                                  initial_metadata_flags)
    return tuple((operations, None) for operations in batches)
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the one found in the context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      The earlier of the two deadlines when both exist, whichever one exists
      when only one does, and None when neither does.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if parent_deadline is None:
        # Covers "neither set" as well, since user_deadline is then None.
        return user_deadline
    if user_deadline is None:
        return parent_deadline
    return min(parent_deadline, user_deadline)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Multi-callable for one unary-request, unary-response method.

    Blocking invocations (`__call__`, `with_call`) run on a segregated call of
    the channel; `future` goes through the managed-call factory instead.
    """
    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(self, channel: cygrpc.Channel,
                 managed_call: IntegratedCallFactory, method: bytes,
                 request_serializer: Optional[SerializingFunction],
                 response_deserializer: Optional[DeserializingFunction]):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _prepare(
        self, request: Any, timeout: Optional[float],
        metadata: Optional[MetadataType], wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression]
    ) -> Tuple[Optional[_RPCState], Optional[Sequence[cygrpc.Operation]],
               Optional[float], Optional[grpc.RpcError]]:
        """Serialize the request and assemble the single operation batch.

        Returns:
          `(state, operations, deadline, None)` on success, or
          `(None, None, None, error)` when `_start_unary_request` produced no
          serialized request; `error` is whatever it returned in third place.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        if serialized_request is None:
            return None, None, None, rendezvous

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # Everything a unary-unary RPC needs fits in one batch.
        operations = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and process its single event.

        Raises:
          The error object returned by `_prepare` when the request could not
          be prepared.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        effective_deadline = _determine_deadline(deadline)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        operations_and_tags = ((operations, None),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, metadata, call_credentials,
            operations_and_tags, self._context)
        # One batch was started, so one event is pulled and handled.
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(self,
                 request: Any,
                 timeout: Optional[float] = None,
                 metadata: Optional[MetadataType] = None,
                 credentials: Optional[grpc.CallCredentials] = None,
                 wait_for_ready: Optional[bool] = None,
                 compression: Optional[grpc.Compression] = None) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call."""
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed-call factory.

        Returns:
          A _MultiThreadedRendezvous wrapping the started call.

        Raises:
          The error object returned by `_prepare` when the request could not
          be prepared.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, deadline, metadata, call_credentials, (operations,),
            event_handler, self._context)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that runs on a segregated call.

    Invocations return a _SingleThreadedRendezvous rather than going through
    the managed-call factory.
    """
    _channel: cygrpc.Channel
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(self, channel: cygrpc.Channel, method: bytes,
                 request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction):
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return its rendezvous.

        Raises:
          _InactiveRpcError: With StatusCode.INTERNAL when serializing the
            request yields None.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(request,
                                               self._request_serializer)
        if serialized_request is None:
            failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                                     'Exception serializing request!')
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        # Three batches: send everything, wait for status, wait for the
        # initial metadata.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operations_and_tags = tuple(
            (batch, None)
            for batch in (send_batch, status_batch, initial_metadata_batch))
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), metadata, call_credentials,
            operations_and_tags, self._context)
        return _SingleThreadedRendezvous(state, call,
                                         self._response_deserializer, deadline)
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable backed by the managed-call factory."""
    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(self, channel: cygrpc.Channel,
                 managed_call: IntegratedCallFactory, method: bytes,
                 request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
            self,
            request: Any,
            timeout: Optional[float] = None,
            metadata: Optional[MetadataType] = None,
            credentials: Optional[grpc.CallCredentials] = None,
            wait_for_ready: Optional[bool] = None,
            compression: Optional[
                grpc.Compression] = None) -> _MultiThreadedRendezvous:
        """Start the RPC and return a rendezvous for its responses.

        Raises:
          The error object returned by `_start_unary_request` when it
          produced no serialized request.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: sends plus final status, then the initial metadata.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operations = (send_and_status_batch, initial_metadata_batch)

        effective_deadline = _determine_deadline(deadline)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        event_handler = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, metadata, call_credentials, operations,
            event_handler, self._context)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Multi-callable for one stream-request, unary-response method.

    Blocking invocations (`__call__`, `with_call`) run on a segregated call of
    the channel; `future` goes through the managed-call factory instead.
    """
    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(self, channel: cygrpc.Channel,
                 managed_call: IntegratedCallFactory, method: bytes,
                 request_serializer: Optional[SerializingFunction],
                 response_deserializer: Optional[DeserializingFunction]):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _blocking(
        self, request_iterator: Iterator, timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool], compression: Optional[grpc.Compression]
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until no operation is due.

        Returns:
          The final `(state, call)` pair.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags), self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, None)
        # Pull events on the calling thread until nothing is outstanding;
        # each event is handled (and waiters notified) under the state's
        # condition.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(self,
                 request_iterator: Iterator,
                 timeout: Optional[float] = None,
                 metadata: Optional[MetadataType] = None,
                 credentials: Optional[grpc.CallCredentials] = None,
                 wait_for_ready: Optional[bool] = None,
                 compression: Optional[grpc.Compression] = None) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(request_iterator, timeout, metadata,
                                     credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a call."""
        state, call = self._blocking(request_iterator, timeout, metadata,
                                     credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed-call factory.

        Returns:
          A _MultiThreadedRendezvous wrapping the started call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        # The send-initial-metadata operation must carry the augmented
        # metadata, exactly as `_blocking` does. Building it from the raw
        # `metadata` would leave out whatever `augment_metadata` added for
        # the requested `compression`.
        operations = _stream_unary_invocation_operations(
            augmented_metadata, initial_metadata_flags)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, deadline, augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations, event_handler, self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-stream multi-callable backed by the managed-call factory."""
    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any

    # pylint: disable=too-many-arguments
    def __init__(self,
                 channel: cygrpc.Channel,
                 managed_call: IntegratedCallFactory,
                 method: bytes,
                 request_serializer: Optional[SerializingFunction] = None,
                 response_deserializer: Optional[DeserializingFunction] = None):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None
    ) -> _MultiThreadedRendezvous:
        """Start the RPC, begin consuming requests, return its rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        # Two batches: initial metadata out plus final status, then the
        # initial metadata coming back.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operations = (send_and_status_batch, initial_metadata_batch)

        event_handler = _event_handler(state, self._response_deserializer)
        effective_deadline = _determine_deadline(deadline)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, augmented_metadata, call_credentials,
            operations, event_handler, self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        # Only the bits inside the used mask are kept.
        masked_value = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked_value)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Return flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: None to leave the flags untouched; otherwise its
            truthiness sets or clears the wait-for-ready bit.

        Returns:
          `self` when `wait_for_ready` is None, else a new instance with the
          wait-for-ready bit set or cleared and the explicitly-set bit on.
        """
        if wait_for_ready is None:
            return self
        flag_bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            adjusted = self | flag_bits.wait_for_ready
        else:
            adjusted = self & ~flag_bits.wait_for_ready
        return self.__class__(adjusted |
                              flag_bits.wait_for_ready_explicitly_set)
class _ChannelCallState(object):
    """Book-keeping for the managed calls of one cygrpc channel."""
    channel: cygrpc.Channel
    managed_calls: int  # number of managed calls currently counted
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        self.lock = threading.Lock()
        self.channel = channel
        self.managed_calls = 0
        self.threading = False

    def reset_postfork_child(self) -> None:
        """Zero the managed-call counter."""
        self.managed_calls = 0

    def __del__(self):
        # Best effort: TypeError and AttributeError raised while closing are
        # deliberately ignored.
        try:
            self.channel.close(cygrpc.StatusCode.cancelled,
                               'Channel deallocated!')
        except (TypeError, AttributeError):
            pass
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Start a daemon thread that services the channel's managed-call events.

    The thread repeatedly takes the next call event from `state.channel` and
    hands it to the event's tag. Each time a tag reports that its call has
    completed, `state.managed_calls` is decremented under `state.lock`; the
    thread exits once that count reaches zero.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                # Nothing arrived within the poll window; try again.
                continue
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()
def _channel_managed_call_management(state: _ChannelCallState):
    """Return a factory that creates managed calls tracked in `state`."""

    # pylint: disable=too-many-arguments
    def create(flags: int, method: bytes, host: Optional[str],
               deadline: Optional[float], metadata: Optional[MetadataType],
               credentials: Optional[cygrpc.CallCredentials],
               operations: Sequence[Sequence[cygrpc.Operation]],
               event_handler: UserTag, context) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch is tagged with the same event handler.
        operations_and_tags = tuple(
            (batch, event_handler) for batch in operations)
        with state.lock:
            call = state.channel.integrated_call(flags, method, host, deadline,
                                                 metadata, credentials,
                                                 operations_and_tags, context)
            # The first managed call also starts the spin thread; the count
            # is raised before the thread is started.
            starts_spin_thread = state.managed_calls == 0
            state.managed_calls += 1
            if starts_spin_thread:
                _run_channel_spin_thread(state)
            return call

    return create
class _ChannelConnectivityState(object):
    """Mutable state shared by connectivity subscription, polling, delivery."""
    lock: threading.RLock
    channel: grpc.Channel
    polling: bool  # set while a polling thread is considered active
    connectivity: grpc.ChannelConnectivity  # last recorded value, or None
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    # Each entry is a two-item list: [callback, connectivity last recorded
    # for that callback (or None)].
    callbacks_and_connectivities: List[Sequence[Union[Callable[
        [grpc.ChannelConnectivity], None], Optional[grpc.ChannelConnectivity]]]]
    delivering: bool  # set while a delivery thread is considered active

    def __init__(self, channel: grpc.Channel):
        self.lock = threading.RLock()
        self.channel = channel
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False

    def reset_postfork_child(self) -> None:
        """Restore every field except `lock` and `channel` to its initial value."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False
def _deliveries(
    state: _ChannelConnectivityState
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collect callbacks whose recorded connectivity is out of date.

    Every returned callback's entry is updated in place to
    `state.connectivity`.
    """
    current_connectivity = state.connectivity
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        callback, recorded_connectivity = entry
        if recorded_connectivity is current_connectivity:
            continue
        stale_callbacks.append(callback)
        entry[1] = current_connectivity
    return stale_callbacks
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]]
) -> None:
    """Invoke callbacks with connectivity values until none are out of date.

    Exceptions raised by a callback are logged and do not stop delivery.
    Once `_deliveries` reports nothing left to notify, `state.delivering` is
    cleared under `state.lock` and the function returns.
    """
    pending_callbacks = initial_callbacks
    connectivity = initial_connectivity
    while True:
        for callback in pending_callbacks:
            cygrpc.block_if_fork_in_progress(state)
            try:
                callback(connectivity)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
        with state.lock:
            pending_callbacks = _deliveries(state)
            if not pending_callbacks:
                state.delivering = False
                return
            connectivity = state.connectivity
def _spawn_delivery(
        state: _ChannelConnectivityState,
        callbacks: Sequence[Callable[[grpc.ChannelConnectivity],
                                     None]]) -> None:
    """Start a daemon thread running `_deliver` and mark delivery active.

    The thread is given the value of `state.connectivity` at spawn time.
    """
    delivery_args = (state, state.connectivity, callbacks)
    delivering_thread = cygrpc.ForkManagedThread(target=_deliver,
                                                 args=delivery_args)
    delivering_thread.setDaemon(True)
    delivering_thread.start()
    state.delivering = True
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(state: _ChannelConnectivityState, channel: grpc.Channel,
                       initial_try_to_connect: bool) -> None:
    """Poll the channel's connectivity and keep `state` up to date.

    Records the initial connectivity and notifies every callback registered
    so far, then watches the channel in 0.2 second slices. The loop ends,
    clearing `state.polling` and `state.connectivity`, once there are no
    subscriptions left and no connection attempt has been requested.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = to_channel_connectivity[connectivity]
        callbacks = tuple(
            callback for callback, unused_but_known_to_be_none_connectivity in
            state.callbacks_and_connectivities)
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)
    while True:
        event = channel.watch_connectivity_state(connectivity,
                                                 time.time() + 0.2)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (state.callbacks_and_connectivities or
                    state.try_to_connect):
                state.polling = False
                state.connectivity = None
                break
            # Consume the pending connection request, if any.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = to_channel_connectivity[connectivity]
            # A delivery that is already running picks up the new value
            # through `_deliveries` itself.
            if state.delivering:
                continue
            callbacks = _deliveries(state)
            if callbacks:
                _spawn_delivery(state, callbacks)
def _subscribe(state: _ChannelConnectivityState,
               callback: Callable[[grpc.ChannelConnectivity],
                                  None], try_to_connect: bool) -> None:
    """Register `callback` for connectivity updates.

    Starts the polling thread when there are no subscriptions and none is
    marked as running. Otherwise, when a connectivity value is known and no
    delivery is in progress, the callback is notified straight away through
    a delivery thread.
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        if not (state.callbacks_and_connectivities or state.polling):
            polling_thread = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, wants_connect))
            polling_thread.setDaemon(True)
            polling_thread.start()
            state.polling = True
            recorded_connectivity = None
        else:
            if not state.delivering and state.connectivity is not None:
                _spawn_delivery(state, (callback,))
                recorded_connectivity = state.connectivity
            else:
                recorded_connectivity = None
            state.try_to_connect |= wants_connect
        state.callbacks_and_connectivities.append(
            [callback, recorded_connectivity])
def _unsubscribe(state: _ChannelConnectivityState,
                 callback: Callable[[grpc.ChannelConnectivity], None]) -> None:
    """Remove the first subscription equal to `callback`, if there is one."""
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, entry in enumerate(subscriptions):
            subscribed_callback, unused_connectivity = entry
            if callback == subscribed_callback:
                del subscriptions[position]
                break
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression]
) -> Sequence[ChannelArgumentType]:
    """Append the compression option and the user-agent option.

    Returns:
      A tuple of `base_options`, followed by whatever
      `_compression.create_channel_option(compression)` yields, followed by
      the primary user-agent string option.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return tuple(base_options) + compression_option + (user_agent_option,)
def _separate_channel_options(
    options: Sequence[ChannelArgumentType]
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    Returns:
      A `(python_options, core_options)` pair of lists. An option is
      Python-only when its key is the SingleThreadedUnaryStream option.
    """
    python_options = []
    core_options = []
    for pair in options:
        is_python_option = (
            pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        destination = python_options if is_python_option else core_options
        destination.append(pair)
    return python_options, core_options
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""
    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState

    def __init__(self, target: str, options: Sequence[ChannelArgumentType],
                 credentials: Optional[grpc.ChannelCredentials],
                 compression: Optional[grpc.Compression]):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        self._process_python_options(python_options)
        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(encoded_target, channel_arguments,
                                       credentials)
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _process_python_options(
            self, python_options: Sequence[ChannelArgumentType]) -> None:
        """Sets channel attributes according to python-only channel options."""
        single_threaded_key = (
            grpc.experimental.ChannelOptions.SingleThreadedUnaryStream)
        for pair in python_options:
            # Presence of the key enables the mode; the option's value is
            # not inspected.
            if pair[0] == single_threaded_key:
                self._single_threaded_unary_stream = True

    def subscribe(self,
                  callback: Callable[[grpc.ChannelConnectivity], None],
                  try_to_connect: Optional[bool] = None) -> None:
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
            self, callback: Callable[[grpc.ChannelConnectivity], None]) -> None:
        _unsubscribe(self._connectivity_state, callback)

    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> grpc.UnaryUnaryMultiCallable:
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryUnaryMultiCallable(self._channel, managed_call,
                                        _common.encode(method),
                                        request_serializer,
                                        response_deserializer)

    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> grpc.UnaryStreamMultiCallable:
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel, _common.encode(method), request_serializer,
                response_deserializer)
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryStreamMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> grpc.StreamUnaryMultiCallable:
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamUnaryMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None
    ) -> grpc.StreamStreamMultiCallable:
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamStreamMultiCallable(self._channel, managed_call,
                                          _common.encode(method),
                                          request_serializer,
                                          response_deserializer)

    def _unsubscribe_all(self) -> None:
        """Drop every connectivity subscription, in place, under the lock."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            state.callbacks_and_connectivities.clear()

    def _close(self) -> None:
        """Unsubscribe everything, close the channel, undo registrations."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Unsubscribe everything and close the channel because of a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
                                    'Channel closed due to fork')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        # Returning False lets any exception from the `with` body propagate.
        return False

    def close(self) -> None:
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass