1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Bi-directional streaming RPC helpers."""
16
17import collections
18import datetime
19import logging
20import queue as queue_module
21import threading
22import time
23
24from google.api_core import exceptions
25
# Module-level logger shared by every helper defined in this file.
_LOGGER = logging.getLogger(__name__)
# Name given to the thread started by ``BackgroundConsumer.start`` and
# interpolated into that thread's log messages.
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"
28
29
30class _RequestQueueGenerator(object):
31 """A helper for sending requests to a gRPC stream from a Queue.
32
33 This generator takes requests off a given queue and yields them to gRPC.
34
35 This helper is useful when you have an indeterminate, indefinite, or
36 otherwise open-ended set of requests to send through a request-streaming
37 (or bidirectional) RPC.
38
39 The reason this is necessary is because gRPC takes an iterator as the
40 request for request-streaming RPCs. gRPC consumes this iterator in another
41 thread to allow it to block while generating requests for the stream.
42 However, if the generator blocks indefinitely gRPC will not be able to
43 clean up the thread as it'll be blocked on `next(iterator)` and not be able
44 to check the channel status to stop iterating. This helper mitigates that
45 by waiting on the queue with a timeout and checking the RPC state before
46 yielding.
47
48 Finally, it allows for retrying without swapping queues because if it does
49 pull an item off the queue when the RPC is inactive, it'll immediately put
50 it back and then exit. This is necessary because yielding the item in this
51 case will cause gRPC to discard it. In practice, this means that the order
52 of messages is not guaranteed. If such a thing is necessary it would be
53 easy to use a priority queue.
54
55 Example::
56
57 requests = request_queue_generator(q)
58 call = stub.StreamingRequest(iter(requests))
59 requests.call = call
60
61 for response in call:
62 print(response)
63 q.put(...)
64
65 Note that it is possible to accomplish this behavior without "spinning"
66 (using a queue timeout). One possible way would be to use more threads to
67 multiplex the grpc end event with the queue, another possible way is to
68 use selectors and a custom event/queue object. Both of these approaches
69 are significant from an engineering perspective for small benefit - the
70 CPU consumed by spinning is pretty minuscule.
71
72 Args:
73 queue (queue_module.Queue): The request queue.
74 period (float): The number of seconds to wait for items from the queue
75 before checking if the RPC is cancelled. In practice, this
76 determines the maximum amount of time the request consumption
77 thread will live after the RPC is cancelled.
78 initial_request (Union[protobuf.Message,
79 Callable[None, protobuf.Message]]): The initial request to
80 yield. This is done independently of the request queue to allow fo
81 easily restarting streams that require some initial configuration
82 request.
83 """
84
85 def __init__(self, queue, period=1, initial_request=None):
86 self._queue = queue
87 self._period = period
88 self._initial_request = initial_request
89 self.call = None
90
91 def _is_active(self):
92 # Note: there is a possibility that this starts *before* the call
93 # property is set. So we have to check if self.call is set before
94 # seeing if it's active. We need to return True if self.call is None.
95 # See https://github.com/googleapis/python-api-core/issues/560.
96 return self.call is None or self.call.is_active()
97
98 def __iter__(self):
99 if self._initial_request is not None:
100 if callable(self._initial_request):
101 yield self._initial_request()
102 else:
103 yield self._initial_request
104
105 while True:
106 try:
107 item = self._queue.get(timeout=self._period)
108 except queue_module.Empty:
109 if not self._is_active():
110 _LOGGER.debug(
111 "Empty queue and inactive call, exiting request " "generator."
112 )
113 return
114 else:
115 # call is still active, keep waiting for queue items.
116 continue
117
118 # The consumer explicitly sent "None", indicating that the request
119 # should end.
120 if item is None:
121 _LOGGER.debug("Cleanly exiting request generator.")
122 return
123
124 if not self._is_active():
125 # We have an item, but the call is closed. We should put the
126 # item back on the queue so that the next call can consume it.
127 self._queue.put(item)
128 _LOGGER.debug(
129 "Inactive call, replacing item on queue and exiting "
130 "request generator."
131 )
132 return
133
134 yield item
135
136
137class _Throttle(object):
138 """A context manager limiting the total entries in a sliding time window.
139
140 If more than ``access_limit`` attempts are made to enter the context manager
141 instance in the last ``time window`` interval, the exceeding requests block
142 until enough time elapses.
143
144 The context manager instances are thread-safe and can be shared between
145 multiple threads. If multiple requests are blocked and waiting to enter,
146 the exact order in which they are allowed to proceed is not determined.
147
148 Example::
149
150 max_three_per_second = _Throttle(
151 access_limit=3, time_window=datetime.timedelta(seconds=1)
152 )
153
154 for i in range(5):
155 with max_three_per_second as time_waited:
156 print("{}: Waited {} seconds to enter".format(i, time_waited))
157
158 Args:
159 access_limit (int): the maximum number of entries allowed in the time window
160 time_window (datetime.timedelta): the width of the sliding time window
161 """
162
163 def __init__(self, access_limit, time_window):
164 if access_limit < 1:
165 raise ValueError("access_limit argument must be positive")
166
167 if time_window <= datetime.timedelta(0):
168 raise ValueError("time_window argument must be a positive timedelta")
169
170 self._time_window = time_window
171 self._access_limit = access_limit
172 self._past_entries = collections.deque(
173 maxlen=access_limit
174 ) # least recent first
175 self._entry_lock = threading.Lock()
176
177 def __enter__(self):
178 with self._entry_lock:
179 cutoff_time = datetime.datetime.now() - self._time_window
180
181 # drop the entries that are too old, as they are no longer relevant
182 while self._past_entries and self._past_entries[0] < cutoff_time:
183 self._past_entries.popleft()
184
185 if len(self._past_entries) < self._access_limit:
186 self._past_entries.append(datetime.datetime.now())
187 return 0.0 # no waiting was needed
188
189 to_wait = (self._past_entries[0] - cutoff_time).total_seconds()
190 time.sleep(to_wait)
191
192 self._past_entries.append(datetime.datetime.now())
193 return to_wait
194
195 def __exit__(self, *_):
196 pass
197
198 def __repr__(self):
199 return "{}(access_limit={}, time_window={})".format(
200 self.__class__.__name__, self._access_limit, repr(self._time_window)
201 )
202
203
204class BidiRpc(object):
205 """A helper for consuming a bi-directional streaming RPC.
206
207 This maps gRPC's built-in interface which uses a request iterator and a
208 response iterator into a socket-like :func:`send` and :func:`recv`. This
209 is a more useful pattern for long-running or asymmetric streams (streams
210 where there is not a direct correlation between the requests and
211 responses).
212
213 Example::
214
215 initial_request = example_pb2.StreamingRpcRequest(
216 setting='example')
217 rpc = BidiRpc(
218 stub.StreamingRpc,
219 initial_request=initial_request,
220 metadata=[('name', 'value')]
221 )
222
223 rpc.open()
224
225 while rpc.is_active():
226 print(rpc.recv())
227 rpc.send(example_pb2.StreamingRpcRequest(
228 data='example'))
229
230 This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.
231
232 Args:
233 start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
234 start the RPC.
235 initial_request (Union[protobuf.Message,
236 Callable[None, protobuf.Message]]): The initial request to
237 yield. This is useful if an initial request is needed to start the
238 stream.
239 metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
240 the request.
241 """
242
243 def __init__(self, start_rpc, initial_request=None, metadata=None):
244 self._start_rpc = start_rpc
245 self._initial_request = initial_request
246 self._rpc_metadata = metadata
247 self._request_queue = queue_module.Queue()
248 self._request_generator = None
249 self._is_active = False
250 self._callbacks = []
251 self.call = None
252
253 def add_done_callback(self, callback):
254 """Adds a callback that will be called when the RPC terminates.
255
256 This occurs when the RPC errors or is successfully terminated.
257
258 Args:
259 callback (Callable[[grpc.Future], None]): The callback to execute.
260 It will be provided with the same gRPC future as the underlying
261 stream which will also be a :class:`grpc.Call`.
262 """
263 self._callbacks.append(callback)
264
265 def _on_call_done(self, future):
266 # This occurs when the RPC errors or is successfully terminated.
267 # Note that grpc's "future" here can also be a grpc.RpcError.
268 # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
269 # that `grpc.RpcError` is also `grpc.call`.
270 for callback in self._callbacks:
271 callback(future)
272
273 def open(self):
274 """Opens the stream."""
275 if self.is_active:
276 raise ValueError("Can not open an already open stream.")
277
278 request_generator = _RequestQueueGenerator(
279 self._request_queue, initial_request=self._initial_request
280 )
281 try:
282 call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
283 except exceptions.GoogleAPICallError as exc:
284 # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
285 # available from the ``response`` property on the mapped exception.
286 self._on_call_done(exc.response)
287 raise
288
289 request_generator.call = call
290
291 # TODO: api_core should expose the future interface for wrapped
292 # callables as well.
293 if hasattr(call, "_wrapped"): # pragma: NO COVER
294 call._wrapped.add_done_callback(self._on_call_done)
295 else:
296 call.add_done_callback(self._on_call_done)
297
298 self._request_generator = request_generator
299 self.call = call
300
301 def close(self):
302 """Closes the stream."""
303 if self.call is None:
304 return
305
306 self._request_queue.put(None)
307 self.call.cancel()
308 self._request_generator = None
309 self._initial_request = None
310 self._callbacks = []
311 # Don't set self.call to None. Keep it around so that send/recv can
312 # raise the error.
313
314 def send(self, request):
315 """Queue a message to be sent on the stream.
316
317 Send is non-blocking.
318
319 If the underlying RPC has been closed, this will raise.
320
321 Args:
322 request (protobuf.Message): The request to send.
323 """
324 if self.call is None:
325 raise ValueError("Can not send() on an RPC that has never been open()ed.")
326
327 # Don't use self.is_active(), as ResumableBidiRpc will overload it
328 # to mean something semantically different.
329 if self.call.is_active():
330 self._request_queue.put(request)
331 else:
332 # calling next should cause the call to raise.
333 next(self.call)
334
335 def recv(self):
336 """Wait for a message to be returned from the stream.
337
338 Recv is blocking.
339
340 If the underlying RPC has been closed, this will raise.
341
342 Returns:
343 protobuf.Message: The received message.
344 """
345 if self.call is None:
346 raise ValueError("Can not recv() on an RPC that has never been open()ed.")
347
348 return next(self.call)
349
350 @property
351 def is_active(self):
352 """bool: True if this stream is currently open and active."""
353 return self.call is not None and self.call.is_active()
354
355 @property
356 def pending_requests(self):
357 """int: Returns an estimate of the number of queued requests."""
358 return self._request_queue.qsize()
359
360
361def _never_terminate(future_or_error):
362 """By default, no errors cause BiDi termination."""
363 return False
364
365
class ResumableBidiRpc(BidiRpc):
    """A :class:`BidiRpc` that can automatically resume the stream on errors.

    It uses the ``should_recover`` arg to determine if it should re-establish
    the stream on error.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        metadata = [('header_name', 'value')]

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            should_recover=should_recover,
            initial_request=initial_request,
            metadata=metadata
        )

        rpc.open()

        while rpc.is_active:
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        should_recover (Callable[[Exception], bool]): A function that returns
            True if the stream should be recovered. This will be called
            whenever an error is encountered on the stream.
        should_terminate (Callable[[Exception], bool]): A function that returns
            True if the stream should be terminated. This will be called
            whenever an error is encountered on the stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
        throttle_reopen (bool): If ``True``, throttling will be applied to
            stream reopen calls. Defaults to ``False``.
    """

    def __init__(
        self,
        start_rpc,
        should_recover,
        should_terminate=_never_terminate,
        initial_request=None,
        metadata=None,
        throttle_reopen=False,
    ):
        super(ResumableBidiRpc, self).__init__(start_rpc, initial_request, metadata)
        self._should_recover = should_recover
        self._should_terminate = should_terminate
        # Re-entrant: ``_recoverable`` and ``_on_call_done`` call ``_reopen``
        # while already holding this lock.
        self._operational_lock = threading.RLock()
        self._finalized = False
        self._finalize_lock = threading.Lock()

        if throttle_reopen:
            # At most 5 re-opens in any 10 second window.
            self._reopen_throttle = _Throttle(
                access_limit=5, time_window=datetime.timedelta(seconds=10)
            )
        else:
            self._reopen_throttle = None

    def _finalize(self, result):
        """Run the done-callbacks once and mark the stream as finalized.

        Only the first call has any effect; later calls return immediately.

        Args:
            result: The value handed to every done-callback (the gRPC future
                or exception that ended the stream, or ``None`` for a plain
                ``close()``).
        """
        with self._finalize_lock:
            if self._finalized:
                return

            for callback in self._callbacks:
                callback(result)

            self._finalized = True

    def _on_call_done(self, future):
        # Unlike the base class, we only execute the callbacks on a terminal
        # error, not for errors that we can recover from. Note that grpc's
        # "future" here is also a grpc.RpcError.
        with self._operational_lock:
            if self._should_terminate(future) or not self._should_recover(future):
                self._finalize(future)
            else:
                _LOGGER.debug("Re-opening stream from gRPC callback.")
                self._reopen()

    def _reopen(self):
        """Re-establish the stream unless another thread already did.

        Raises:
            Exception: Whatever ``open()`` raised. Any failure to re-open is
                treated as terminal: the stream is finalized with the
                exception before it propagates.
        """
        with self._operational_lock:
            # Another thread already managed to re-open this stream.
            if self.call is not None and self.call.is_active():
                _LOGGER.debug("Stream was already re-established.")
                return

            self.call = None
            # Request generator should exit cleanly since the RPC its bound to
            # has exited.
            self._request_generator = None

            # Note: apart from the optional re-open throttle there is no
            # backoff here. The assumption is that re-establishing the stream
            # under normal circumstances will happen in intervals greater
            # than 60s. However, it is possible in a degenerative case that
            # the server closes the stream rapidly which would lead to
            # thrashing here, but hopefully in those cases the server would
            # return a non-retryable error.

            try:
                if self._reopen_throttle is not None:
                    with self._reopen_throttle:
                        self.open()
                else:
                    self.open()
            # If re-opening or re-calling the method fails for any reason,
            # consider it a terminal error and finalize the stream.
            except Exception as exc:
                _LOGGER.debug("Failed to re-open stream due to %s", exc)
                self._finalize(exc)
                raise

            _LOGGER.info("Re-established stream")

    def _recoverable(self, method, *args, **kwargs):
        """Wraps a method to recover the stream and retry on error.

        If a retryable error occurs while making the call, then the stream will
        be re-opened and the method will be retried. This happens indefinitely
        so long as the error is a retryable one. If an error occurs while
        re-opening the stream, then this method will raise immediately and
        trigger finalization of this object.

        If ``should_terminate`` accepts the error, the stream is finalized and
        closed and ``None`` is returned instead of raising.

        Args:
            method (Callable[..., Any]): The method to call.
            args: The args to pass to the method.
            kwargs: The kwargs to pass to the method.

        Returns:
            Any: The return value of ``method``, or ``None`` when the stream
            was terminated.

        Raises:
            Exception: The error raised by ``method`` when it is neither a
                terminating nor a recoverable one.
        """
        while True:
            try:
                return method(*args, **kwargs)

            except Exception as exc:
                with self._operational_lock:
                    _LOGGER.debug("Call to retryable %r caused %s.", method, exc)

                    if self._should_terminate(exc):
                        _LOGGER.debug("Terminating %r due to %s.", method, exc)
                        # Finalize *before* closing: ``close()`` finalizes
                        # with ``None`` and drops the callbacks, which would
                        # otherwise hide the error from them.
                        self._finalize(exc)
                        self.close()
                        break

                    if not self._should_recover(exc):
                        _LOGGER.debug("Not retrying %r due to %s.", method, exc)
                        # Same ordering as above so the done-callbacks
                        # receive the terminal error.
                        self._finalize(exc)
                        self.close()
                        raise exc

                    _LOGGER.debug("Re-opening stream from retryable %r.", method)
                    self._reopen()

    def _send(self, request):
        # Grab a reference to the RPC call. Because another thread (notably
        # the gRPC error thread) can modify self.call (by invoking reopen),
        # we should ensure our reference can not change underneath us.
        # If self.call is modified (such as replaced with a new RPC call) then
        # this will use the "old" RPC, which should result in the same
        # exception passed into gRPC's error handler being raised here, which
        # will be handled by the usual error handling in retryable.
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not send() on an RPC that has never been open()ed.")

        # Ask the call itself rather than ``self.is_active``: this class
        # overloads that property to mean something semantically different.
        if call.is_active():
            self._request_queue.put(request)
        else:
            # calling next should cause the call to raise.
            next(call)

    def send(self, request):
        """Queue ``request`` for sending, re-opening the stream if needed.

        Args:
            request (protobuf.Message): The request to send.
        """
        return self._recoverable(self._send, request)

    def _recv(self):
        # Take the reference under the lock for the same reason as in
        # ``_send``: ``self.call`` may be swapped by a concurrent re-open.
        with self._operational_lock:
            call = self.call

        if call is None:
            raise ValueError("Can not recv() on an RPC that has never been open()ed.")

        return next(call)

    def recv(self):
        """Wait for the next message, re-opening the stream if needed.

        Returns:
            protobuf.Message: The received message, or ``None`` if the stream
            was terminated while waiting.
        """
        return self._recoverable(self._recv)

    def close(self):
        """Finalize the stream (running done-callbacks with ``None``) and close it."""
        self._finalize(None)
        super(ResumableBidiRpc, self).close()

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        # Use the operational lock. It's entirely possible for something
        # to check the active state *while* the RPC is being retried.
        # Also, use finalized to track the actual terminal state here.
        # This is because if the stream is re-established by the gRPC thread
        # it's technically possible to check this between when gRPC marks the
        # RPC as inactive and when gRPC executes our callback that re-opens
        # the stream.
        with self._operational_lock:
            return self.call is not None and not self._finalized
589
590
class BackgroundConsumer(object):
    """A bi-directional stream consumer that runs in a separate thread.

    This maps the consumption of a stream into a callback-based model. It also
    provides :func:`pause` and :func:`resume` to allow for flow-control.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            should_recover=should_recover)

        def on_response(response):
            print(response)

        consumer = BackgroundConsumer(rpc, on_response)
        consumer.start()

    Note that error handling *must* be done by using the provided
    ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
    whenever the RPC itself exits and will not provide any error details.

    Args:
        bidi_rpc (BidiRpc): The RPC to consume. Should not have been
            ``open()``ed yet.
        on_response (Callable[[protobuf.Message], None]): The callback to
            be called for every response on the stream.
        on_fatal_exception (Callable[[Exception], None]): The callback to
            be called on fatal errors during consumption. Default None.
    """

    def __init__(self, bidi_rpc, on_response, on_fatal_exception=None):
        self._bidi_rpc = bidi_rpc
        self._on_response = on_response
        self._paused = False
        self._on_fatal_exception = on_fatal_exception
        # Guards ``_paused`` and wakes the consumer thread on ``resume()``.
        self._wake = threading.Condition()
        self._thread = None
        # Serializes ``start()`` and ``stop()``.
        self._operational_lock = threading.Lock()

    def _on_call_done(self, future):
        # Resume the thread if it's paused, this prevents blocking forever
        # when the RPC has terminated.
        self.resume()

    def _report_fatal_exception(self, exc):
        """Pass ``exc`` to the fatal-exception callback, if one is still set."""
        # Snapshot the attribute: ``stop()`` may set it to ``None`` from
        # another thread between a ``None`` check and the call.
        on_fatal_exception = self._on_fatal_exception
        if on_fatal_exception is not None:
            on_fatal_exception(exc)

    def _thread_main(self, ready):
        """Body of the consumer thread: open the RPC and dispatch responses.

        Args:
            ready (threading.Event): Set as soon as the thread is running so
                that ``start()`` can return.
        """
        try:
            ready.set()
            self._bidi_rpc.add_done_callback(self._on_call_done)
            self._bidi_rpc.open()

            while self._bidi_rpc.is_active:
                # Do not allow the paused status to change at all during this
                # section. There is a condition where we could be resumed
                # between checking if we are paused and calling wake.wait(),
                # which means that we will miss the notification to wake up
                # (oops!) and wait for a notification that will never come.
                # Keeping the lock throughout avoids that.
                # See: https://github.com/googleapis/python-api-core/issues/211
                with self._wake:
                    while self._paused:
                        _LOGGER.debug("paused, waiting for waking.")
                        self._wake.wait()
                        _LOGGER.debug("woken.")

                _LOGGER.debug("waiting for recv.")
                response = self._bidi_rpc.recv()
                _LOGGER.debug("recved response.")

                # Snapshot the callback: ``stop()`` clears the attribute from
                # another thread, and re-reading it after the ``None`` check
                # could end up calling ``None``.
                on_response = self._on_response
                if on_response is not None:
                    on_response(response)

        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "%s caught error %s and will exit. Generally this is due to "
                "the RPC itself being cancelled and the error will be "
                "surfaced to the calling code.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
                exc_info=True,
            )
            self._report_fatal_exception(exc)

        except Exception as exc:
            _LOGGER.exception(
                "%s caught unexpected exception %s and will exit.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
            )
            self._report_fatal_exception(exc)

        _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)

    def start(self):
        """Start the background thread and begin consuming the stream."""
        with self._operational_lock:
            ready = threading.Event()
            thread = threading.Thread(
                name=_BIDIRECTIONAL_CONSUMER_NAME,
                target=self._thread_main,
                args=(ready,),
                daemon=True,
            )
            thread.start()
            # Other parts of the code rely on `thread.is_alive` which
            # isn't sufficient to know if a thread is active, just that it may
            # soon be active. This can cause races. Further protect
            # against races by using a ready event and wait on it to be set.
            ready.wait()
            self._thread = thread
            _LOGGER.debug("Started helper thread %s", thread.name)

    def stop(self):
        """Stop consuming the stream and shutdown the background thread.

        NOTE: Cannot be called within `_thread_main`, since it is not
        possible to join a thread to itself.
        """
        with self._operational_lock:
            self._bidi_rpc.close()

            if self._thread is not None:
                # Resume the thread to wake it up in case it is sleeping.
                self.resume()
                # The daemonized thread may itself block, so don't wait
                # for it longer than a second.
                self._thread.join(1.0)
                if self._thread.is_alive():  # pragma: NO COVER
                    _LOGGER.warning("Background thread did not exit.")

            # Drop the references so that no further callbacks are started.
            self._thread = None
            self._on_response = None
            self._on_fatal_exception = None

    @property
    def is_active(self):
        """bool: True if the background thread is active."""
        return self._thread is not None and self._thread.is_alive()

    def pause(self):
        """Pauses the response stream.

        This does *not* pause the request stream.
        """
        with self._wake:
            self._paused = True

    def resume(self):
        """Resumes the response stream."""
        with self._wake:
            self._paused = False
            self._wake.notify_all()

    @property
    def is_paused(self):
        """bool: True if the response stream is paused."""
        return self._paused