Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/bidi.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for synchronous bidirectional streaming RPCs."""
17import collections
18import datetime
19import logging
20import queue as queue_module
21import threading
22import time
24from google.api_core import exceptions
25from google.api_core.bidi_base import BidiRpcBase
# Logger shared by every helper defined in this module.
_LOGGER = logging.getLogger(__name__)
# Thread name given to BackgroundConsumer's worker; also used in its log lines.
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"
class _RequestQueueGenerator(object):
    """Feed requests from a queue into a request-streaming gRPC call.

    Each request taken off the given queue is yielded to gRPC. This is handy
    when the set of requests to send through a request-streaming (or
    bidirectional) RPC is open-ended or not known up front.

    Example::

        requests = _RequestQueueGenerator(q)
        call = stub.StreamingRequest(iter(requests))
        requests.call = call

        for response in call:
            print(response)
            q.put(...)

    Args:
        queue (queue_module.Queue): The request queue.
        period (float): How many seconds to block on the queue before
            re-checking whether the RPC is still active. In practice this
            bounds how long the request-consuming thread lingers after the
            RPC is cancelled.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): A request (or a factory
            for one) yielded before anything from the queue. Keeping it
            separate from the queue makes it easy to restart streams that
            need an initial configuration request.
    """

    def __init__(self, queue, period=1, initial_request=None):
        self._queue = queue
        self._period = period
        self._initial_request = initial_request
        self.call = None

    def _is_active(self):
        # Iteration can begin before ``call`` has been assigned, so a missing
        # call must count as active.
        # See https://github.com/googleapis/python-api-core/issues/560.
        if self.call is None:
            return True
        return self.call.is_active()

    def __iter__(self):
        # gRPC drains this iterator on its own thread. Blocking forever on the
        # queue would leave that thread stuck in ``next(iterator)`` with no
        # chance to notice the channel closing. Polling with a timeout lets us
        # re-check the RPC state between waits.
        #
        # If an item is pulled while the RPC is inactive it is returned to the
        # queue (gRPC would otherwise drop it), so a retry can reuse the same
        # queue. The cost is that message order is not guaranteed; a priority
        # queue could restore ordering if that ever matters.
        #
        # Avoiding the timed poll would need extra threads or selectors to
        # multiplex the gRPC end event with the queue. The CPU spent polling
        # is negligible, so the simpler design wins.
        first = self._initial_request
        if first is not None:
            yield first() if callable(first) else first

        while True:
            try:
                item = self._queue.get(timeout=self._period)
            except queue_module.Empty:
                if self._is_active():
                    # Nothing queued yet but the call is alive; poll again.
                    continue
                _LOGGER.debug(
                    "Empty queue and inactive call, exiting request generator."
                )
                return

            if item is None:
                # A None sentinel from the producer means "stop sending".
                _LOGGER.debug("Cleanly exiting request generator.")
                return

            if not self._is_active():
                # Hand the item back so the next call can pick it up.
                self._queue.put(item)
                _LOGGER.debug(
                    "Inactive call, replacing item on queue and exiting "
                    "request generator."
                )
                return

            yield item
class _Throttle(object):
    """A context manager limiting the total entries in a sliding time window.

    If more than ``access_limit`` attempts are made to enter the context manager
    instance in the last ``time_window`` interval, the exceeding requests block
    until enough time elapses.

    The context manager instances are thread-safe and can be shared between
    multiple threads. If multiple requests are blocked and waiting to enter,
    the exact order in which they are allowed to proceed is not determined.

    Entry times are measured with :func:`time.monotonic`, so wall-clock
    adjustments cannot shrink or stretch the window. Examples are NTP steps and
    DST transitions of the local time that ``datetime.now()`` would report.

    Example::

        max_three_per_second = _Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )

        for i in range(5):
            with max_three_per_second as time_waited:
                print("{}: Waited {} seconds to enter".format(i, time_waited))

    Args:
        access_limit (int): the maximum number of entries allowed in the time window
        time_window (datetime.timedelta): the width of the sliding time window

    Raises:
        ValueError: if ``access_limit`` is less than 1 or ``time_window`` is
            not a positive timedelta.
    """

    def __init__(self, access_limit, time_window):
        if access_limit < 1:
            raise ValueError("access_limit argument must be positive")

        if time_window <= datetime.timedelta(0):
            raise ValueError("time_window argument must be a positive timedelta")

        self._time_window = time_window
        # Window width in seconds, the unit returned by time.monotonic().
        self._window_seconds = time_window.total_seconds()
        self._access_limit = access_limit
        # Monotonic timestamps of recent entries, least recent first. With
        # maxlen set, append() evicts the oldest entry once the deque is full.
        self._past_entries = collections.deque(maxlen=access_limit)
        self._entry_lock = threading.Lock()

    def __enter__(self):
        """Enter, blocking if the window already holds ``access_limit`` entries.

        Returns:
            float: the number of seconds spent waiting (``0.0`` if none).
        """
        with self._entry_lock:
            cutoff_time = time.monotonic() - self._window_seconds

            # Drop entries that have slid out of the window.
            while self._past_entries and self._past_entries[0] < cutoff_time:
                self._past_entries.popleft()

            if len(self._past_entries) < self._access_limit:
                self._past_entries.append(time.monotonic())
                return 0.0  # no waiting was needed

            # The oldest entry leaves the window after this many seconds.
            # Clamp at zero because time.sleep() rejects negative durations.
            # The lock is held while sleeping so that concurrent entrants
            # queue up behind this one.
            to_wait = max(0.0, self._past_entries[0] - cutoff_time)
            time.sleep(to_wait)

            self._past_entries.append(time.monotonic())
            return to_wait

    def __exit__(self, *_):
        pass

    def __repr__(self):
        return "{}(access_limit={}, time_window={})".format(
            self.__class__.__name__, self._access_limit, repr(self._time_window)
        )
class BidiRpc(BidiRpcBase):
    """A helper for consuming a bi-directional streaming RPC.

    Wraps gRPC's request-iterator / response-iterator interface in a
    socket-like :func:`send` / :func:`recv` API. That shape suits
    long-running or asymmetric streams, where requests and responses do not
    pair up one-to-one.

    Example::

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')
        rpc = BidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            metadata=[('name', 'value')]
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

        rpc.close()

    Errors on the stream are *not* retried; see :class:`ResumableBidiRpc`.

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): A request (or a factory
            for one) sent first, for streams that need one to start.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    """

    def _create_queue(self):
        """Build the queue that buffers outgoing requests."""
        return queue_module.Queue()

    def open(self):
        """Open the stream.

        Raises:
            ValueError: If the stream is already open.
            google.api_core.exceptions.GoogleAPICallError: If starting the RPC
                fails. ``_on_call_done`` is invoked with the original error
                before the exception is re-raised.
        """
        if self.is_active:
            raise ValueError("Cannot open an already open stream.")

        generator = _RequestQueueGenerator(
            self._request_queue, initial_request=self._initial_request
        )
        try:
            new_call = self._start_rpc(iter(generator), metadata=self._rpc_metadata)
        except exceptions.GoogleAPICallError as err:
            # The mapped exception carries the original ``grpc.RpcError``
            # (usually also a ``grpc.Call``) on its ``response`` attribute.
            self._on_call_done(err.response)
            raise

        # Let the generator watch this call so it can stop once it ends.
        generator.call = new_call

        # TODO: api_core should expose the future interface for wrapped
        # callables as well.
        if hasattr(new_call, "_wrapped"):  # pragma: NO COVER
            done_source = new_call._wrapped
        else:
            done_source = new_call
        done_source.add_done_callback(self._on_call_done)

        self._request_generator = generator
        self.call = new_call

    def close(self):
        """Close the stream; a no-op if it was never opened."""
        if self.call is None:
            return

        # The None sentinel makes the request generator return cleanly.
        self._request_queue.put(None)
        self.call.cancel()
        self._request_generator = None
        self._initial_request = None
        self._callbacks = []
        # ``self.call`` is deliberately kept so that later send()/recv() calls
        # raise the call's own error rather than a "never opened" error.

    def send(self, request):
        """Queue a request for sending on the stream, without blocking.

        Args:
            request (protobuf.Message): The request to send.

        Raises:
            ValueError: If the stream was never opened.
            Exception: Whatever the finished call raises when advanced, if
                the underlying RPC is no longer active.
        """
        if self.call is None:
            raise ValueError("Cannot send on an RPC stream that has never been opened.")

        # Check the call itself rather than ``self.is_active``:
        # ResumableBidiRpc overrides that property with different semantics.
        if not self.call.is_active():
            # Advancing a finished call makes it raise its error.
            next(self.call)
        else:
            self._request_queue.put(request)

    def recv(self):
        """Block until the next response arrives and return it.

        Returns:
            protobuf.Message: The received message.

        Raises:
            ValueError: If the stream was never opened.
        """
        if self.call is None:
            raise ValueError("Cannot recv on an RPC stream that has never been opened.")
        return next(self.call)

    @property
    def is_active(self):
        """True if this stream is currently open and active."""
        if self.call is None:
            return False
        return self.call.is_active()
def _never_terminate(future_or_error):
    """Default ``should_terminate`` predicate: no error ends the BiDi stream.

    Args:
        future_or_error: The failed call or exception; ignored.

    Returns:
        bool: Always ``False``.
    """
    del future_or_error  # intentionally unused
    return False
class ResumableBidiRpc(BidiRpc):
    """A :class:`BidiRpc` that can automatically resume the stream on errors.

    It uses the ``should_recover`` arg to determine if it should re-establish
    the stream on error.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        metadata = [('header_name', 'value')]

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            should_recover=should_recover,
            initial_request=initial_request,
            metadata=metadata
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        should_recover (Callable[[Exception], bool]): A function that returns
            True if the stream should be recovered. This will be called
            whenever an error is encountered on the stream.
        should_terminate (Callable[[Exception], bool]): A function that returns
            True if the stream should be terminated. This will be called
            whenever an error is encountered on the stream, before
            ``should_recover``. Defaults to never terminating.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
        throttle_reopen (bool): If ``True``, throttling will be applied to
            stream reopen calls (at most 5 reopens in any 10-second window).
            Defaults to ``False``.
    """

    def __init__(
        self,
        start_rpc,
        should_recover,
        should_terminate=_never_terminate,
        initial_request=None,
        metadata=None,
        throttle_reopen=False,
    ):
        super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata)
        self._should_recover = should_recover
        self._should_terminate = should_terminate
        # Reentrant because _reopen() acquires it again while already held by
        # _on_call_done() or _recoverable().
        self._operational_lock = threading.RLock()
        # Set once the done callbacks have run; guards against running them twice.
        self._finalized = False
        self._finalize_lock = threading.Lock()

        if throttle_reopen:
            self._reopen_throttle = _Throttle(
                access_limit=5, time_window=datetime.timedelta(seconds=10)
            )
        else:
            self._reopen_throttle = None

    def _finalize(self, result):
        """Run the registered done callbacks with ``result``, at most once."""
        # NOTE(review): _finalize_lock is a non-reentrant threading.Lock held
        # while the callbacks run. A callback that calls close() (which calls
        # _finalize again) on the same thread would block forever — confirm
        # callbacks never do this.
        with self._finalize_lock:
            if self._finalized:
                return

            for callback in self._callbacks:
                callback(result)

            self._finalized = True

    def _on_call_done(self, future):
        """Handle the end of the underlying call.

        Finalizes on a terminal or non-recoverable error, otherwise reopens.
        """
        # Unlike the base class, we only execute the callbacks on a terminal
        # error, not for errors that we can recover from. Note that grpc's
        # "future" here is also a grpc.RpcError.
        with self._operational_lock:
            if self._should_terminate(future):
                self._finalize(future)
            elif not self._should_recover(future):
                self._finalize(future)
            else:
                _LOGGER.debug("Re-opening stream from gRPC callback.")
                self._reopen()

    def _reopen(self):
        """Replace the current call with a freshly opened one.

        Does nothing if the current call is still active. If opening fails,
        the stream is finalized with the error and the error is re-raised.
        """
        with self._operational_lock:
            # Another thread already managed to re-open this stream.
            if self.call is not None and self.call.is_active():
                _LOGGER.debug("Stream was already re-established.")
                return

            # Cleared so the is_active check inside open() does not refuse
            # to reopen.
            self.call = None
            # Request generator should exit cleanly since the RPC it is bound
            # to has exited.
            self._request_generator = None

            # Note: we do not currently do any sort of backoff here. The
            # assumption is that re-establishing the stream under normal
            # circumstances will happen in intervals greater than 60s.
            # However, it is possible in a degenerative case that the server
            # closes the stream rapidly which would lead to thrashing here,
            # but hopefully in those cases the server would return a non-
            # retryable error. The optional _reopen_throttle bounds how often
            # this can happen.

            try:
                if self._reopen_throttle:
                    with self._reopen_throttle:
                        self.open()
                else:
                    self.open()
            # If re-opening or re-calling the method fails for any reason,
            # consider it a terminal error and finalize the stream.
            except Exception as exc:
                _LOGGER.debug("Failed to re-open stream due to %s", exc)
                self._finalize(exc)
                raise

            _LOGGER.info("Re-established stream")

    def _recoverable(self, method, *args, **kwargs):
        """Wraps a method to recover the stream and retry on error.

        If a retryable error occurs while making the call, then the stream will
        be re-opened and the method will be retried. This happens indefinitely
        so long as the error is a retryable one. If an error occurs while
        re-opening the stream, then this method will raise immediately and
        trigger finalization of this object.

        If ``should_terminate`` accepts the error, the stream is closed and
        finalized and this method returns ``None`` without raising.

        Args:
            method (Callable[..., Any]): The method to call.
            args: The args to pass to the method.
            kwargs: The kwargs to pass to the method.

        Returns:
            Whatever ``method`` returns, or ``None`` after a terminating error.

        Raises:
            Exception: The error from ``method`` when ``should_recover``
                rejects it, or the error from re-opening the stream.
        """
        while True:
            try:
                return method(*args, **kwargs)

            except Exception as exc:
                with self._operational_lock:
                    _LOGGER.debug("Call to retryable %r caused %s.", method, exc)

                    if self._should_terminate(exc):
                        self.close()
                        _LOGGER.debug("Terminating %r due to %s.", method, exc)
                        self._finalize(exc)
                        break

                    if not self._should_recover(exc):
                        self.close()
                        _LOGGER.debug("Not retrying %r due to %s.", method, exc)
                        self._finalize(exc)
                        raise exc

                    _LOGGER.debug("Re-opening stream from retryable %r.", method)
                    self._reopen()

    def _send(self, request):
        """Queue ``request`` on the current call (single attempt, no retry)."""
        # Grab a reference to the RPC call. Because another thread (notably
        # the gRPC error thread) can modify self.call (by invoking reopen),
        # we should ensure our reference can not change underneath us.
        # If self.call is modified (such as replaced with a new RPC call) then
        # this will use the "old" RPC, which should result in the same
        # exception passed into gRPC's error handler being raised here, which
        # will be handled by the usual error handling in retryable.
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Cannot send on an RPC that has never been opened.")

        # Don't use self.is_active(), as ResumableBidiRpc will overload it
        # to mean something semantically different.
        if call.is_active():
            self._request_queue.put(request)
            pass  # NOTE(review): redundant statement, safe to remove.
        else:
            # calling next should cause the call to raise.
            next(call)

    def send(self, request):
        """Queue a request, reopening the stream on recoverable errors."""
        return self._recoverable(self._send, request)

    def _recv(self):
        """Return the next response from the current call (single attempt)."""
        # Snapshot self.call under the lock; see _send for why.
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Cannot recv on an RPC that has never been opened.")

        return next(call)

    def recv(self):
        """Block for the next response, reopening on recoverable errors."""
        return self._recoverable(self._recv)

    def close(self):
        """Run the done callbacks with ``None`` (if not already run), then close."""
        self._finalize(None)
        super(ResumableBidiRpc, self).close()

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        # Use the operational lock. It's entirely possible for something
        # to check the active state *while* the RPC is being retried.
        # Also, use finalized to track the actual terminal state here.
        # This is because if the stream is re-established by the gRPC thread
        # it's technically possible to check this between when gRPC marks the
        # RPC as inactive and when gRPC executes our callback that re-opens
        # the stream.
        with self._operational_lock:
            return self.call is not None and not self._finalized
class BackgroundConsumer(object):
    """A bi-directional stream consumer that runs in a separate thread.

    This maps the consumption of a stream into a callback-based model. It also
    provides :func:`pause` and :func:`resume` to allow for flow-control.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            should_recover=should_recover)

        def on_response(response):
            print(response)

        consumer = BackgroundConsumer(rpc, on_response)
        consumer.start()

    Note that error handling *must* be done by using the provided
    ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
    whenever the RPC itself exits and will not provide any error details.

    Args:
        bidi_rpc (BidiRpc): The RPC to consume. Should not have been
            ``open()``ed yet.
        on_response (Callable[[protobuf.Message], None]): The callback to
            be called for every response on the stream.
        on_fatal_exception (Callable[[Exception], None]): The callback to
            be called on fatal errors during consumption. Default None.
    """

    def __init__(self, bidi_rpc, on_response, on_fatal_exception=None):
        self._bidi_rpc = bidi_rpc
        self._on_response = on_response
        self._paused = False
        self._on_fatal_exception = on_fatal_exception
        # Guards _paused and wakes the consumer thread when resumed.
        self._wake = threading.Condition()
        self._thread = None
        # Serializes start() and stop().
        self._operational_lock = threading.Lock()

    def _on_call_done(self, future):
        # Resume the thread if it's paused, this prevents blocking forever
        # when the RPC has terminated.
        self.resume()

    def _invoke_fatal_callback(self, exc):
        """Pass ``exc`` to the fatal-exception callback, if one is set."""
        # stop() may clear the callback concurrently; read it exactly once so
        # the None check and the call see the same object.
        on_fatal_exception = self._on_fatal_exception
        if on_fatal_exception is not None:
            on_fatal_exception(exc)

    def _thread_main(self, ready):
        """Body of the consumer thread: open the RPC and dispatch responses.

        Args:
            ready (threading.Event): Set as soon as the thread starts running.
        """
        try:
            ready.set()
            self._bidi_rpc.add_done_callback(self._on_call_done)
            self._bidi_rpc.open()

            while self._bidi_rpc.is_active:
                # Do not allow the paused status to change at all during this
                # section. There is a condition where we could be resumed
                # between checking if we are paused and calling wake.wait(),
                # which means that we will miss the notification to wake up
                # (oops!) and wait for a notification that will never come.
                # Keeping the lock throughout avoids that. The explicit loop is
                # used (rather than Condition.wait_for) so each wake-up is logged.
                # See: https://github.com/googleapis/python-api-core/issues/211
                with self._wake:
                    while self._paused:
                        _LOGGER.debug("paused, waiting for waking.")
                        self._wake.wait()
                        _LOGGER.debug("woken.")

                _LOGGER.debug("waiting for recv.")
                response = self._bidi_rpc.recv()
                _LOGGER.debug("recved response.")
                # stop() sets _on_response to None, possibly while this thread
                # is still running (its join times out after 1s). Read the
                # attribute once so a concurrent stop() cannot turn the call
                # into a spurious TypeError.
                on_response = self._on_response
                if on_response is not None:
                    on_response(response)

        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "%s caught error %s and will exit. Generally this is due to "
                "the RPC itself being cancelled and the error will be "
                "surfaced to the calling code.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
                exc_info=True,
            )
            self._invoke_fatal_callback(exc)

        except Exception as exc:
            _LOGGER.exception(
                "%s caught unexpected exception %s and will exit.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
            )
            self._invoke_fatal_callback(exc)

        _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)

    def start(self):
        """Start the background thread and begin consuming the stream."""
        with self._operational_lock:
            ready = threading.Event()
            thread = threading.Thread(
                name=_BIDIRECTIONAL_CONSUMER_NAME,
                target=self._thread_main,
                args=(ready,),
                daemon=True,
            )
            thread.start()
            # Other parts of the code rely on `thread.is_alive` which
            # isn't sufficient to know if a thread is active, just that it may
            # soon be active. This can cause races. Further protect
            # against races by using a ready event and wait on it to be set.
            ready.wait()
            self._thread = thread
            _LOGGER.debug("Started helper thread %s", thread.name)

    def stop(self):
        """Stop consuming the stream and shutdown the background thread.

        NOTE: Cannot be called within `_thread_main`, since it is not
        possible to join a thread to itself.
        """
        with self._operational_lock:
            self._bidi_rpc.close()

            if self._thread is not None:
                # Resume the thread to wake it up in case it is sleeping.
                self.resume()
                # The daemonized thread may itself block, so don't wait
                # for it longer than a second.
                self._thread.join(1.0)
                if self._thread.is_alive():  # pragma: NO COVER
                    _LOGGER.warning("Background thread did not exit.")

            self._thread = None
            self._on_response = None
            self._on_fatal_exception = None

    @property
    def is_active(self):
        """bool: True if the background thread is active."""
        return self._thread is not None and self._thread.is_alive()

    def pause(self):
        """Pauses the response stream.

        This does *not* pause the request stream.
        """
        with self._wake:
            self._paused = True

    def resume(self):
        """Resumes the response stream."""
        with self._wake:
            self._paused = False
            self._wake.notify_all()

    @property
    def is_paused(self):
        """bool: True if the response stream is paused."""
        return self._paused