Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/bidi.py: 22%
262 statements
coverage.py v7.3.2, created at 2023-12-09 06:27 +0000
# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Bi-directional streaming RPC helpers."""

import collections
import datetime
import logging
import queue as queue_module
import threading
import time

from google.api_core import exceptions

_LOGGER = logging.getLogger(__name__)
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"


class _RequestQueueGenerator(object):
    """A helper for sending requests to a gRPC stream from a Queue.

    This generator takes requests off a given queue and yields them to gRPC.

    This helper is useful when you have an indeterminate, indefinite, or
    otherwise open-ended set of requests to send through a request-streaming
    (or bidirectional) RPC.

    The reason this is necessary is because gRPC takes an iterator as the
    request for request-streaming RPCs. gRPC consumes this iterator in another
    thread to allow it to block while generating requests for the stream.
    However, if the generator blocks indefinitely gRPC will not be able to
    clean up the thread as it'll be blocked on `next(iterator)` and not be able
    to check the channel status to stop iterating. This helper mitigates that
    by waiting on the queue with a timeout and checking the RPC state before
    yielding.

    Finally, it allows for retrying without swapping queues because if it does
    pull an item off the queue when the RPC is inactive, it'll immediately put
    it back and then exit. This is necessary because yielding the item in this
    case will cause gRPC to discard it. In practice, this means that the order
    of messages is not guaranteed. If such a thing is necessary it would be
    easy to use a priority queue.

    Example::

        requests = request_queue_generator(q)
        call = stub.StreamingRequest(iter(requests))
        requests.call = call

        for response in call:
            print(response)
            q.put(...)

    Note that it is possible to accomplish this behavior without "spinning"
    (using a queue timeout). One possible way would be to use more threads to
    multiplex the grpc end event with the queue, another possible way is to
    use selectors and a custom event/queue object. Both of these approaches
    are significant from an engineering perspective for small benefit - the
    CPU consumed by spinning is pretty minuscule.

    Args:
        queue (queue_module.Queue): The request queue.
        period (float): The number of seconds to wait for items from the queue
            before checking if the RPC is cancelled. In practice, this
            determines the maximum amount of time the request consumption
            thread will live after the RPC is cancelled.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is done independently of the request queue to allow
            for easily restarting streams that require some initial
            configuration request.
    """

    def __init__(self, queue, period=1, initial_request=None):
        self._queue = queue
        self._period = period
        self._initial_request = initial_request
        self.call = None

    def _is_active(self):
        # Note: there is a possibility that this starts *before* the call
        # property is set. So we have to check if self.call is set before
        # seeing if it's active. We need to return True if self.call is None.
        # See https://github.com/googleapis/python-api-core/issues/560.
        return self.call is None or self.call.is_active()

    def __iter__(self):
        if self._initial_request is not None:
            if callable(self._initial_request):
                yield self._initial_request()
            else:
                yield self._initial_request

        while True:
            try:
                item = self._queue.get(timeout=self._period)
            except queue_module.Empty:
                if not self._is_active():
                    _LOGGER.debug(
                        "Empty queue and inactive call, exiting request generator."
                    )
                    return
                else:
                    # call is still active, keep waiting for queue items.
                    continue

            # The consumer explicitly sent "None", indicating that the request
            # should end.
            if item is None:
                _LOGGER.debug("Cleanly exiting request generator.")
                return

            if not self._is_active():
                # We have an item, but the call is closed. We should put the
                # item back on the queue so that the next call can consume it.
                self._queue.put(item)
                _LOGGER.debug(
                    "Inactive call, replacing item on queue and exiting "
                    "request generator."
                )
                return

            yield item
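

# Illustrative sketch: how _RequestQueueGenerator drains a queue and reacts to
# the RPC state. ``_FakeCall`` is a hypothetical stand-in for a ``grpc.Call``;
# only its ``is_active()`` method is used here.
def _example_request_queue_generator_usage():  # pragma: NO COVER
    class _FakeCall(object):
        def __init__(self):
            self.active = True

        def is_active(self):
            return self.active

    q = queue_module.Queue()
    gen = _RequestQueueGenerator(q, period=0.01)
    call = _FakeCall()
    gen.call = call

    q.put("request-1")
    q.put(None)  # the ``None`` sentinel asks the generator to exit cleanly

    # Yields "request-1", then returns when it sees the sentinel.
    assert list(gen) == ["request-1"]

    # When the call has gone inactive, an already-dequeued item is put back on
    # the queue so that the next stream can send it.
    q.put("request-2")
    call.active = False
    assert list(gen) == []
    assert q.get_nowait() == "request-2"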


class _Throttle(object):
    """A context manager limiting the total entries in a sliding time window.

    If more than ``access_limit`` attempts are made to enter the context manager
    instance in the last ``time_window`` interval, the exceeding requests block
    until enough time elapses.

    The context manager instances are thread-safe and can be shared between
    multiple threads. If multiple requests are blocked and waiting to enter,
    the exact order in which they are allowed to proceed is not determined.

    Example::

        max_three_per_second = _Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )

        for i in range(5):
            with max_three_per_second as time_waited:
                print("{}: Waited {} seconds to enter".format(i, time_waited))

    Args:
        access_limit (int): the maximum number of entries allowed in the time window
        time_window (datetime.timedelta): the width of the sliding time window
    """

    def __init__(self, access_limit, time_window):
        if access_limit < 1:
            raise ValueError("access_limit argument must be positive")

        if time_window <= datetime.timedelta(0):
            raise ValueError("time_window argument must be a positive timedelta")

        self._time_window = time_window
        self._access_limit = access_limit
        self._past_entries = collections.deque(
            maxlen=access_limit
        )  # least recent first
        self._entry_lock = threading.Lock()

    def __enter__(self):
        with self._entry_lock:
            cutoff_time = datetime.datetime.now() - self._time_window

            # drop the entries that are too old, as they are no longer relevant
            while self._past_entries and self._past_entries[0] < cutoff_time:
                self._past_entries.popleft()

            if len(self._past_entries) < self._access_limit:
                self._past_entries.append(datetime.datetime.now())
                return 0.0  # no waiting was needed

            to_wait = (self._past_entries[0] - cutoff_time).total_seconds()
            time.sleep(to_wait)

            self._past_entries.append(datetime.datetime.now())
            return to_wait

    def __exit__(self, *_):
        pass

    def __repr__(self):
        return "{}(access_limit={}, time_window={})".format(
            self.__class__.__name__, self._access_limit, repr(self._time_window)
        )
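

# Illustrative sketch: admitting five entries through a _Throttle that allows
# three entries per second. The first three enter immediately; the rest block
# until older entries age out of the window.
def _example_throttle_usage():  # pragma: NO COVER
    throttle = _Throttle(access_limit=3, time_window=datetime.timedelta(seconds=1))

    start = time.monotonic()
    waits = []
    for _ in range(5):
        with throttle as waited:
            waits.append(waited)
    elapsed = time.monotonic() - start

    # The first three entries never wait; the later ones push the loop past
    # (roughly) the one-second window.
    assert waits[:3] == [0.0, 0.0, 0.0]
    assert elapsed >= 0.9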


class BidiRpc(object):
    """A helper for consuming a bi-directional streaming RPC.

    This maps gRPC's built-in interface which uses a request iterator and a
    response iterator into a socket-like :func:`send` and :func:`recv`. This
    is a more useful pattern for long-running or asymmetric streams (streams
    where there is not a direct correlation between the requests and
    responses).

    Example::

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        rpc = BidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            metadata=[('name', 'value')]
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    """

    def __init__(self, start_rpc, initial_request=None, metadata=None):
        self._start_rpc = start_rpc
        self._initial_request = initial_request
        self._rpc_metadata = metadata
        self._request_queue = queue_module.Queue()
        self._request_generator = None
        self._is_active = False
        self._callbacks = []
        self.call = None

    def add_done_callback(self, callback):
        """Adds a callback that will be called when the RPC terminates.

        This occurs when the RPC errors or is successfully terminated.

        Args:
            callback (Callable[[grpc.Future], None]): The callback to execute.
                It will be provided with the same gRPC future as the underlying
                stream which will also be a :class:`grpc.Call`.
        """
        self._callbacks.append(callback)

    def _on_call_done(self, future):
        # This occurs when the RPC errors or is successfully terminated.
        # Note that grpc's "future" here can also be a grpc.RpcError.
        # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
        # that `grpc.RpcError` is also `grpc.Call`.
        for callback in self._callbacks:
            callback(future)

    def open(self):
        """Opens the stream."""
        if self.is_active:
            raise ValueError("Can not open an already open stream.")

        request_generator = _RequestQueueGenerator(
            self._request_queue, initial_request=self._initial_request
        )
        try:
            call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
        except exceptions.GoogleAPICallError as exc:
            # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
            # available from the ``response`` property on the mapped exception.
            self._on_call_done(exc.response)
            raise

        request_generator.call = call

        # TODO: api_core should expose the future interface for wrapped
        # callables as well.
        if hasattr(call, "_wrapped"):  # pragma: NO COVER
            call._wrapped.add_done_callback(self._on_call_done)
        else:
            call.add_done_callback(self._on_call_done)

        self._request_generator = request_generator
        self.call = call

    def close(self):
        """Closes the stream."""
        if self.call is None:
            return

        self._request_queue.put(None)
        self.call.cancel()
        self._request_generator = None
        # Don't set self.call to None. Keep it around so that send/recv can
        # raise the error.

    def send(self, request):
        """Queue a message to be sent on the stream.

        Send is non-blocking.

        If the underlying RPC has been closed, this will raise.

        Args:
            request (protobuf.Message): The request to send.
        """
        if self.call is None:
            raise ValueError("Can not send() on an RPC that has never been open()ed.")

        # Don't use self.is_active(), as ResumableBidiRpc will overload it
        # to mean something semantically different.
        if self.call.is_active():
            self._request_queue.put(request)
        else:
            # calling next should cause the call to raise.
            next(self.call)

    def recv(self):
        """Wait for a message to be returned from the stream.

        Recv is blocking.

        If the underlying RPC has been closed, this will raise.

        Returns:
            protobuf.Message: The received message.
        """
        if self.call is None:
            raise ValueError("Can not recv() on an RPC that has never been open()ed.")

        return next(self.call)

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        return self.call is not None and self.call.is_active()

    @property
    def pending_requests(self):
        """int: Returns an estimate of the number of queued requests."""
        return self._request_queue.qsize()
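

# Illustrative sketch: driving a plain BidiRpc by hand. The ``stub`` and
# ``example_pb2`` arguments are hypothetical placeholders for a real gRPC stub
# (with a stream-stream method ``StreamingRpc``) and its generated protobuf
# module; neither is provided by this module.
def _example_bidi_rpc_usage(stub, example_pb2):  # pragma: NO COVER
    rpc = BidiRpc(
        stub.StreamingRpc,
        initial_request=example_pb2.StreamingRpcRequest(setting="example"),
        metadata=[("name", "value")],
    )
    rpc.open()

    try:
        while rpc.is_active:
            # recv() blocks until the server produces the next response.
            response = rpc.recv()
            # send() is non-blocking; it only enqueues the request for the
            # request generator thread to pick up.
            rpc.send(example_pb2.StreamingRpcRequest(data=str(response)))
    finally:
        # close() enqueues the ``None`` sentinel and cancels the call.
        rpc.close()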


def _never_terminate(future_or_error):
    """By default, no errors cause BiDi termination."""
    return False


class ResumableBidiRpc(BidiRpc):
    """A :class:`BidiRpc` that can automatically resume the stream on errors.

    It uses the ``should_recover`` arg to determine if it should re-establish
    the stream on error.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        metadata = [('header_name', 'value')]

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            should_recover=should_recover,
            initial_request=initial_request,
            metadata=metadata
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        should_recover (Callable[[Exception], bool]): A function that returns
            True if the stream should be recovered. This will be called
            whenever an error is encountered on the stream.
        should_terminate (Callable[[Exception], bool]): A function that returns
            True if the stream should be terminated. This will be called
            whenever an error is encountered on the stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
        throttle_reopen (bool): If ``True``, throttling will be applied to
            stream reopen calls. Defaults to ``False``.
    """

    def __init__(
        self,
        start_rpc,
        should_recover,
        should_terminate=_never_terminate,
        initial_request=None,
        metadata=None,
        throttle_reopen=False,
    ):
        super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata)
        self._should_recover = should_recover
        self._should_terminate = should_terminate
        self._operational_lock = threading.RLock()
        self._finalized = False
        self._finalize_lock = threading.Lock()

        if throttle_reopen:
            self._reopen_throttle = _Throttle(
                access_limit=5, time_window=datetime.timedelta(seconds=10)
            )
        else:
            self._reopen_throttle = None

    def _finalize(self, result):
        with self._finalize_lock:
            if self._finalized:
                return

            for callback in self._callbacks:
                callback(result)

            self._finalized = True

    def _on_call_done(self, future):
        # Unlike the base class, we only execute the callbacks on a terminal
        # error, not for errors that we can recover from. Note that grpc's
        # "future" here is also a grpc.RpcError.
        with self._operational_lock:
            if self._should_terminate(future):
                self._finalize(future)
            elif not self._should_recover(future):
                self._finalize(future)
            else:
                _LOGGER.debug("Re-opening stream from gRPC callback.")
                self._reopen()

    def _reopen(self):
        with self._operational_lock:
            # Another thread already managed to re-open this stream.
            if self.call is not None and self.call.is_active():
                _LOGGER.debug("Stream was already re-established.")
                return

            self.call = None
            # Request generator should exit cleanly since the RPC it's bound
            # to has exited.
            self._request_generator = None

            # Note: we do not currently do any sort of backoff here. The
            # assumption is that re-establishing the stream under normal
            # circumstances will happen in intervals greater than 60s.
            # However, it is possible in a degenerate case that the server
            # closes the stream rapidly which would lead to thrashing here,
            # but hopefully in those cases the server would return a non-
            # retryable error.

            try:
                if self._reopen_throttle:
                    with self._reopen_throttle:
                        self.open()
                else:
                    self.open()
            # If re-opening or re-calling the method fails for any reason,
            # consider it a terminal error and finalize the stream.
            except Exception as exc:
                _LOGGER.debug("Failed to re-open stream due to %s", exc)
                self._finalize(exc)
                raise

            _LOGGER.info("Re-established stream")

    def _recoverable(self, method, *args, **kwargs):
        """Wraps a method to recover the stream and retry on error.

        If a retryable error occurs while making the call, then the stream will
        be re-opened and the method will be retried. This happens indefinitely
        so long as the error is a retryable one. If an error occurs while
        re-opening the stream, then this method will raise immediately and
        trigger finalization of this object.

        Args:
            method (Callable[..., Any]): The method to call.
            args: The args to pass to the method.
            kwargs: The kwargs to pass to the method.
        """
        while True:
            try:
                return method(*args, **kwargs)

            except Exception as exc:
                with self._operational_lock:
                    _LOGGER.debug("Call to retryable %r caused %s.", method, exc)

                    if self._should_terminate(exc):
                        self.close()
                        _LOGGER.debug("Terminating %r due to %s.", method, exc)
                        self._finalize(exc)
                        break

                    if not self._should_recover(exc):
                        self.close()
                        _LOGGER.debug("Not retrying %r due to %s.", method, exc)
                        self._finalize(exc)
                        raise exc

                    _LOGGER.debug("Re-opening stream from retryable %r.", method)
                    self._reopen()

    def _send(self, request):
        # Grab a reference to the RPC call. Because another thread (notably
        # the gRPC error thread) can modify self.call (by invoking reopen),
        # we should ensure our reference can not change underneath us.
        # If self.call is modified (such as replaced with a new RPC call) then
        # this will use the "old" RPC, which should result in the same
        # exception passed into gRPC's error handler being raised here, which
        # will be handled by the usual error handling in retryable.
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not send() on an RPC that has never been open()ed.")

        # Don't use self.is_active(), as ResumableBidiRpc will overload it
        # to mean something semantically different.
        if call.is_active():
            self._request_queue.put(request)
        else:
            # calling next should cause the call to raise.
            next(call)

    def send(self, request):
        return self._recoverable(self._send, request)

    def _recv(self):
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not recv() on an RPC that has never been open()ed.")

        return next(call)

    def recv(self):
        return self._recoverable(self._recv)

    def close(self):
        self._finalize(None)
        super(ResumableBidiRpc, self).close()

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        # Use the operational lock. It's entirely possible for something
        # to check the active state *while* the RPC is being retried.
        # Also, use finalized to track the actual terminal state here.
        # This is because if the stream is re-established by the gRPC thread
        # it's technically possible to check this between when gRPC marks the
        # RPC as inactive and when gRPC executes our callback that re-opens
        # the stream.
        with self._operational_lock:
            return self.call is not None and not self._finalized
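

# Illustrative sketch: recovery and termination predicates for a
# ResumableBidiRpc. ``grpc`` is the real grpc package; ``stub`` and
# ``make_initial_request`` are hypothetical placeholders supplied by the
# caller (a gRPC stub with a ``StreamingRpc`` method and a zero-argument
# request factory).
def _example_resumable_bidi_rpc_usage(stub, make_initial_request):  # pragma: NO COVER
    import grpc

    def should_recover(exc):
        # Transient unavailability is worth retrying.
        return (
            isinstance(exc, grpc.RpcError)
            and exc.code() == grpc.StatusCode.UNAVAILABLE
        )

    def should_terminate(exc):
        # A cancelled call means the caller is shutting the stream down.
        return (
            isinstance(exc, grpc.RpcError)
            and exc.code() == grpc.StatusCode.CANCELLED
        )

    rpc = ResumableBidiRpc(
        stub.StreamingRpc,
        should_recover=should_recover,
        should_terminate=should_terminate,
        # A callable is re-invoked on every (re)open, so a fresh initial
        # request is sent each time the stream is re-established.
        initial_request=make_initial_request,
        throttle_reopen=True,  # at most 5 re-opens per 10-second window
    )
    rpc.add_done_callback(lambda future: _LOGGER.info("Stream finalized: %s", future))
    rpc.open()
    return rpc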


class BackgroundConsumer(object):
    """A bi-directional stream consumer that runs in a separate thread.

    This maps the consumption of a stream into a callback-based model. It also
    provides :func:`pause` and :func:`resume` to allow for flow-control.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            should_recover=should_recover)

        def on_response(response):
            print(response)

        consumer = BackgroundConsumer(rpc, on_response)
        consumer.start()

    Note that error handling *must* be done by using the provided
    ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
    whenever the RPC itself exits and will not provide any error details.

    Args:
        bidi_rpc (BidiRpc): The RPC to consume. Should not have been
            ``open()``ed yet.
        on_response (Callable[[protobuf.Message], None]): The callback to
            be called for every response on the stream.
    """

    def __init__(self, bidi_rpc, on_response):
        self._bidi_rpc = bidi_rpc
        self._on_response = on_response
        self._paused = False
        self._wake = threading.Condition()
        self._thread = None
        self._operational_lock = threading.Lock()

    def _on_call_done(self, future):
        # Resume the thread if it's paused, this prevents blocking forever
        # when the RPC has terminated.
        self.resume()

    def _thread_main(self, ready):
        try:
            ready.set()
            self._bidi_rpc.add_done_callback(self._on_call_done)
            self._bidi_rpc.open()

            while self._bidi_rpc.is_active:
                # Do not allow the paused status to change at all during this
                # section. There is a condition where we could be resumed
                # between checking if we are paused and calling wake.wait(),
                # which means that we will miss the notification to wake up
                # (oops!) and wait for a notification that will never come.
                # Keeping the lock throughout avoids that.
                # In the future, we could use `Condition.wait_for` if we drop
                # Python 2.7.
                # See: https://github.com/googleapis/python-api-core/issues/211
                with self._wake:
                    while self._paused:
                        _LOGGER.debug("paused, waiting for waking.")
                        self._wake.wait()
                        _LOGGER.debug("woken.")

                _LOGGER.debug("waiting for recv.")
                response = self._bidi_rpc.recv()
                _LOGGER.debug("recved response.")
                self._on_response(response)

        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "%s caught error %s and will exit. Generally this is due to "
                "the RPC itself being cancelled and the error will be "
                "surfaced to the calling code.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
                exc_info=True,
            )

        except Exception as exc:
            _LOGGER.exception(
                "%s caught unexpected exception %s and will exit.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
            )

        _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)

    def start(self):
        """Start the background thread and begin consuming the stream."""
        with self._operational_lock:
            ready = threading.Event()
            thread = threading.Thread(
                name=_BIDIRECTIONAL_CONSUMER_NAME,
                target=self._thread_main,
                args=(ready,),
            )
            thread.daemon = True
            thread.start()
            # Other parts of the code rely on `thread.is_alive` which
            # isn't sufficient to know if a thread is active, just that it may
            # soon be active. This can cause races. Further protect
            # against races by using a ready event and wait on it to be set.
            ready.wait()
            self._thread = thread
            _LOGGER.debug("Started helper thread %s", thread.name)

    def stop(self):
        """Stop consuming the stream and shutdown the background thread."""
        with self._operational_lock:
            self._bidi_rpc.close()

            if self._thread is not None:
                # Resume the thread to wake it up in case it is sleeping.
                self.resume()
                # The daemonized thread may itself block, so don't wait
                # for it longer than a second.
                self._thread.join(1.0)
                if self._thread.is_alive():  # pragma: NO COVER
                    _LOGGER.warning("Background thread did not exit.")

            self._thread = None

    @property
    def is_active(self):
        """bool: True if the background thread is active."""
        return self._thread is not None and self._thread.is_alive()

    def pause(self):
        """Pauses the response stream.

        This does *not* pause the request stream.
        """
        with self._wake:
            self._paused = True

    def resume(self):
        """Resumes the response stream."""
        with self._wake:
            self._paused = False
            self._wake.notify_all()

    @property
    def is_paused(self):
        """bool: True if the response stream is paused."""
        return self._paused
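

# Illustrative sketch: flow control with BackgroundConsumer. The ``rpc``
# argument is assumed to be a not-yet-opened BidiRpc or ResumableBidiRpc
# (for example, one built like the sketch above); the consumer opens it on
# its own thread.
def _example_background_consumer_usage(rpc):  # pragma: NO COVER
    def on_response(response):
        _LOGGER.info("Received: %s", response)

    consumer = BackgroundConsumer(rpc, on_response)
    consumer.start()  # spawns the helper thread, which open()s the stream

    # Temporarily stop pulling responses; the request side is unaffected.
    consumer.pause()
    time.sleep(1.0)
    # Wake the helper thread so it goes back to recv()ing responses.
    consumer.resume()

    # Shut down: closes the RPC and joins the helper thread for up to 1s.
    consumer.stop()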