1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import division
16
17import collections
18import functools
19import itertools
20import logging
21import threading
22import typing
23from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
24import uuid
25
26import grpc # type: ignore
27
28from google.api_core import bidi
29from google.api_core import exceptions
30from google.cloud.pubsub_v1 import types
31from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
32from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
33from google.cloud.pubsub_v1.subscriber._protocol import histogram
34from google.cloud.pubsub_v1.subscriber._protocol import leaser
35from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold
36from google.cloud.pubsub_v1.subscriber._protocol import requests
37from google.cloud.pubsub_v1.subscriber.exceptions import (
38 AcknowledgeError,
39 AcknowledgeStatus,
40)
41import google.cloud.pubsub_v1.subscriber.message
42from google.cloud.pubsub_v1.subscriber import futures
43from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
44from google.pubsub_v1 import types as gapic_types
45from grpc_status import rpc_status # type: ignore
46from google.rpc.error_details_pb2 import ErrorInfo # type: ignore
47from google.rpc import code_pb2 # type: ignore
48from google.rpc import status_pb2
49
50if typing.TYPE_CHECKING: # pragma: NO COVER
51 from google.cloud.pubsub_v1 import subscriber
52
53
54_LOGGER = logging.getLogger(__name__)
55_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
56_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
57_RETRYABLE_STREAM_ERRORS = (
58 exceptions.DeadlineExceeded,
59 exceptions.ServiceUnavailable,
60 exceptions.InternalServerError,
61 exceptions.Unknown,
62 exceptions.GatewayTimeout,
63 exceptions.Aborted,
64)
65_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
66_MAX_LOAD = 1.0
67"""The load threshold above which to pause the incoming message stream."""
68
69_RESUME_THRESHOLD = 0.8
70"""The load threshold below which to resume the incoming message stream."""
71
72_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60
73"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for
74a subscription. We do this to reduce premature ack expiration.
75"""
76
77_DEFAULT_STREAM_ACK_DEADLINE: float = 60
78"""The default stream ack deadline in seconds."""
79
80_MAX_STREAM_ACK_DEADLINE: float = 600
81"""The maximum stream ack deadline in seconds."""
82
83_MIN_STREAM_ACK_DEADLINE: float = 10
84"""The minimum stream ack deadline in seconds."""
85
86_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
87 code_pb2.DEADLINE_EXCEEDED,
88 code_pb2.RESOURCE_EXHAUSTED,
89 code_pb2.ABORTED,
90 code_pb2.INTERNAL,
91 code_pb2.UNAVAILABLE,
92}
93
94
95def _wrap_as_exception(maybe_exception: Any) -> BaseException:
96 """Wrap an object as a Python exception, if needed.
97
98 Args:
99 maybe_exception: The object to wrap, usually a gRPC exception class.
100
101 Returns:
102 The argument itself if an instance of ``BaseException``, otherwise
103 the argument represented as an instance of ``Exception`` (sub)class.
104 """
105 if isinstance(maybe_exception, grpc.RpcError):
106 return exceptions.from_grpc_error(maybe_exception)
107 elif isinstance(maybe_exception, BaseException):
108 return maybe_exception
109
110 return Exception(maybe_exception)
111
112
113def _wrap_callback_errors(
114 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
115 on_callback_error: Callable[[Exception], Any],
116 message: "google.cloud.pubsub_v1.subscriber.message.Message",
117):
118 """Wraps a user callback so that if an exception occurs the message is
119 nacked.
120
121 Args:
122 callback: The user callback.
123 message: The Pub/Sub message.
124 """
125 try:
126 callback(message)
127 except Exception as exc:
128 # Note: the likelihood of this failing is extremely low. This just adds
129 # a message to a queue, so if this doesn't work the world is in an
130 # unrecoverable state and this thread should just bail.
131 _LOGGER.exception(
132 "Top-level exception occurred in callback while processing a message"
133 )
134 message.nack()
135 on_callback_error(exc)
136
137
138def _get_status(
139 exc: exceptions.GoogleAPICallError,
140) -> Optional["status_pb2.Status"]:
141 if not exc.response:
142 _LOGGER.debug("No response obj in errored RPC call.")
143 return None
144 try:
145 return rpc_status.from_call(exc.response)
146 # Possible "If the gRPC call’s code or details are inconsistent
147 # with the status code and message inside of the
148 # google.rpc.status.Status"
149 except ValueError:
150 _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True)
151 return None
152
153
154def _get_ack_errors(
155 exc: exceptions.GoogleAPICallError,
156) -> Optional[Dict[str, str]]:
157 status = _get_status(exc)
158 if not status:
159 _LOGGER.debug("Unable to get status of errored RPC.")
160 return None
161 for detail in status.details:
162 info = ErrorInfo()
163 if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)):
164 _LOGGER.debug("Unable to unpack ErrorInfo.")
165 return None
166 return info.metadata
167 return None
168
169
170def _process_requests(
171 error_status: Optional["status_pb2.Status"],
172 ack_reqs_dict: Dict[str, requests.AckRequest],
173 errors_dict: Optional[Dict[str, str]],
174):
175 """Process requests when exactly-once delivery is enabled by referring to
176 error_status and errors_dict.
177
178 The errors returned by the server in as `error_status` or in `errors_dict`
179 are used to complete the request futures in `ack_reqs_dict` (with a success
180 or exception) or to return requests for further retries.
181 """
182 requests_completed = []
183 requests_to_retry = []
184 for ack_id in ack_reqs_dict:
185 # Handle special errors returned for ack/modack RPCs via the ErrorInfo
186 # sidecar metadata when exactly-once delivery is enabled.
187 if errors_dict and ack_id in errors_dict:
188 exactly_once_error = errors_dict[ack_id]
189 if exactly_once_error.startswith("TRANSIENT_"):
190 requests_to_retry.append(ack_reqs_dict[ack_id])
191 else:
192 if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
193 exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
194 else:
195 exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
196 future = ack_reqs_dict[ack_id].future
197 if future is not None:
198 future.set_exception(exc)
199 requests_completed.append(ack_reqs_dict[ack_id])
200 # Temporary GRPC errors are retried
201 elif (
202 error_status
203 and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
204 ):
205 requests_to_retry.append(ack_reqs_dict[ack_id])
206 # Other GRPC errors are NOT retried
207 elif error_status:
208 if error_status.code == code_pb2.PERMISSION_DENIED:
209 exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None)
210 elif error_status.code == code_pb2.FAILED_PRECONDITION:
211 exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
212 else:
213 exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
214 future = ack_reqs_dict[ack_id].future
215 if future is not None:
216 future.set_exception(exc)
217 requests_completed.append(ack_reqs_dict[ack_id])
218 # Since no error occurred, requests with futures are completed successfully.
219 elif ack_reqs_dict[ack_id].future:
220 future = ack_reqs_dict[ack_id].future
221 # success
222 assert future is not None
223 future.set_result(AcknowledgeStatus.SUCCESS)
224 requests_completed.append(ack_reqs_dict[ack_id])
225 # All other requests are considered completed.
226 else:
227 requests_completed.append(ack_reqs_dict[ack_id])
228
229 return requests_completed, requests_to_retry
230
231
232class StreamingPullManager(object):
233 """The streaming pull manager coordinates pulling messages from Pub/Sub,
234 leasing them, and scheduling them to be processed.
235
236 Args:
237 client:
238 The subscriber client used to create this instance.
239 subscription:
240 The name of the subscription. The canonical format for this is
241 ``projects/{project}/subscriptions/{subscription}``.
242 flow_control:
243 The flow control settings.
244 scheduler:
245 The scheduler to use to process messages. If not provided, a thread
246 pool-based scheduler will be used.
247 use_legacy_flow_control:
248 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled,
249 though client-side flow control is still enabled. If set to ``False``
250 (default), both server-side and client-side flow control are enabled.
251 await_callbacks_on_shutdown:
252 If ``True``, the shutdown thread will wait until all scheduler threads
253 terminate and only then proceed with shutting down the remaining running
254 helper threads.
255
256 If ``False`` (default), the shutdown thread will shut the scheduler down,
257 but it will not wait for the currently executing scheduler threads to
258 terminate.
259
260 This setting affects when the on close callbacks get invoked, and
261 consequently, when the StreamingPullFuture associated with the stream gets
262 resolved.
263 """
264
265 def __init__(
266 self,
267 client: "subscriber.Client",
268 subscription: str,
269 flow_control: types.FlowControl = types.FlowControl(),
270 scheduler: ThreadScheduler = None,
271 use_legacy_flow_control: bool = False,
272 await_callbacks_on_shutdown: bool = False,
273 ):
274 self._client = client
275 self._subscription = subscription
276 self._exactly_once_enabled = False
277 self._flow_control = flow_control
278 self._use_legacy_flow_control = use_legacy_flow_control
279 self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
280 self._ack_histogram = histogram.Histogram()
281 self._last_histogram_size = 0
282 self._stream_metadata = [
283 ["x-goog-request-params", "subscription=" + subscription]
284 ]
285
286 # If max_duration_per_lease_extension is the default
287 # we set the stream_ack_deadline to the default of 60
288 if self._flow_control.max_duration_per_lease_extension == 0:
289 self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
290 # We will not be able to extend more than the default minimum
291 elif (
292 self._flow_control.max_duration_per_lease_extension
293 < _MIN_STREAM_ACK_DEADLINE
294 ):
295 self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
296 # Will not be able to extend past the max
297 elif (
298 self._flow_control.max_duration_per_lease_extension
299 > _MAX_STREAM_ACK_DEADLINE
300 ):
301 self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
302 else:
303 self._stream_ack_deadline = (
304 self._flow_control.max_duration_per_lease_extension
305 )
306
307 self._ack_deadline = max(
308 min(
309 self._flow_control.min_duration_per_lease_extension,
310 histogram.MAX_ACK_DEADLINE,
311 ),
312 histogram.MIN_ACK_DEADLINE,
313 )
314
315 self._rpc: Optional[bidi.ResumableBidiRpc] = None
316 self._callback: Optional[functools.partial] = None
317 self._closing = threading.Lock()
318 self._closed = False
319 self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = []
320 # Guarded by self._exactly_once_enabled_lock
321 self._send_new_ack_deadline = False
322
323 # A shutdown thread is created on intentional shutdown.
324 self._regular_shutdown_thread: Optional[threading.Thread] = None
325
326 # Generate a random client id tied to this object. All streaming pull
327 # connections (initial and re-connects) will then use the same client
328 # id. Doing so lets the server establish affinity even across stream
329 # disconncetions.
330 self._client_id = str(uuid.uuid4())
331
332 if scheduler is None:
333 self._scheduler: Optional[ThreadScheduler] = ThreadScheduler()
334 else:
335 self._scheduler = scheduler
336
337 # A collection for the messages that have been received from the server,
338 # but not yet sent to the user callback.
339 self._messages_on_hold = messages_on_hold.MessagesOnHold()
340
341 # The total number of bytes consumed by the messages currently on hold
342 self._on_hold_bytes = 0
343
344 # A lock ensuring that pausing / resuming the consumer are both atomic
345 # operations that cannot be executed concurrently. Needed for properly
346 # syncing these operations with the current leaser load. Additionally,
347 # the lock is used to protect modifications of internal data that
348 # affects the load computation, i.e. the count and size of the messages
349 # currently on hold.
350 self._pause_resume_lock = threading.Lock()
351
352 # A lock guarding the self._exactly_once_enabled variable. We may also
353 # acquire the self._ack_deadline_lock while this lock is held, but not
354 # the reverse. So, we maintain a simple ordering of these two locks to
355 # prevent deadlocks.
356 self._exactly_once_enabled_lock = threading.Lock()
357
358 # A lock protecting the current ACK deadline used in the lease management. This
359 # value can be potentially updated both by the leaser thread and by the message
360 # consumer thread when invoking the internal _on_response() callback.
361 self._ack_deadline_lock = threading.Lock()
362
363 # The threads created in ``.open()``.
364 self._dispatcher: Optional[dispatcher.Dispatcher] = None
365 self._leaser: Optional[leaser.Leaser] = None
366 self._consumer: Optional[bidi.BackgroundConsumer] = None
367 self._heartbeater: Optional[heartbeater.Heartbeater] = None
368
369 @property
370 def is_active(self) -> bool:
371 """``True`` if this manager is actively streaming.
372
373 Note that ``False`` does not indicate this is complete shut down,
374 just that it stopped getting new messages.
375 """
376 return self._consumer is not None and self._consumer.is_active
377
378 @property
379 def flow_control(self) -> types.FlowControl:
380 """The active flow control settings."""
381 return self._flow_control
382
383 @property
384 def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
385 """The dispatcher helper."""
386 return self._dispatcher
387
388 @property
389 def leaser(self) -> Optional[leaser.Leaser]:
390 """The leaser helper."""
391 return self._leaser
392
393 @property
394 def ack_histogram(self) -> histogram.Histogram:
395 """The histogram tracking time-to-acknowledge."""
396 return self._ack_histogram
397
398 @property
399 def ack_deadline(self) -> float:
400 """Return the current ACK deadline based on historical data without updating it.
401
402 Returns:
403 The ack deadline.
404 """
405 return self._obtain_ack_deadline(maybe_update=False)
406
407 def _obtain_ack_deadline(self, maybe_update: bool) -> float:
408 """The actual `ack_deadline` implementation.
409
410 This method is "sticky". It will only perform the computations to check on the
411 right ACK deadline if explicitly requested AND if the histogram with past
412 time-to-ack data has gained a significant amount of new information.
413
414 Args:
415 maybe_update:
416 If ``True``, also update the current ACK deadline before returning it if
417 enough new ACK data has been gathered.
418
419 Returns:
420 The current ACK deadline in seconds to use.
421 """
422 with self._ack_deadline_lock:
423 if not maybe_update:
424 return self._ack_deadline
425
426 target_size = min(
427 self._last_histogram_size * 2, self._last_histogram_size + 100
428 )
429 hist_size = len(self.ack_histogram)
430
431 if hist_size > target_size:
432 self._last_histogram_size = hist_size
433 self._ack_deadline = self.ack_histogram.percentile(percent=99)
434
435 if self.flow_control.max_duration_per_lease_extension > 0:
436 # The setting in flow control could be too low, adjust if needed.
437 flow_control_setting = max(
438 self.flow_control.max_duration_per_lease_extension,
439 histogram.MIN_ACK_DEADLINE,
440 )
441 self._ack_deadline = min(self._ack_deadline, flow_control_setting)
442
443 # If the user explicitly sets a min ack_deadline, respect it.
444 if self.flow_control.min_duration_per_lease_extension > 0:
445 # The setting in flow control could be too high, adjust if needed.
446 flow_control_setting = min(
447 self.flow_control.min_duration_per_lease_extension,
448 histogram.MAX_ACK_DEADLINE,
449 )
450 self._ack_deadline = max(self._ack_deadline, flow_control_setting)
451 elif self._exactly_once_enabled:
452 # Higher minimum ack_deadline for subscriptions with
453 # exactly-once delivery enabled.
454 self._ack_deadline = max(
455 self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED
456 )
457 # If we have updated the ack_deadline and it is longer than the stream_ack_deadline
458 # set the stream_ack_deadline to the new ack_deadline.
459 if self._ack_deadline > self._stream_ack_deadline:
460 self._stream_ack_deadline = self._ack_deadline
461 return self._ack_deadline
462
463 @property
464 def load(self) -> float:
465 """Return the current load.
466
467 The load is represented as a float, where 1.0 represents having
468 hit one of the flow control limits, and values between 0.0 and 1.0
469 represent how close we are to them. (0.5 means we have exactly half
470 of what the flow control setting allows, for example.)
471
472 There are (currently) two flow control settings; this property
473 computes how close the manager is to each of them, and returns
474 whichever value is higher. (It does not matter that we have lots of
475 running room on setting A if setting B is over.)
476
477 Returns:
478 The load value.
479 """
480 if self._leaser is None:
481 return 0.0
482
483 # Messages that are temporarily put on hold are not being delivered to
484 # user's callbacks, thus they should not contribute to the flow control
485 # load calculation.
486 # However, since these messages must still be lease-managed to avoid
487 # unnecessary ACK deadline expirations, their count and total size must
488 # be subtracted from the leaser's values.
489 return max(
490 [
491 (self._leaser.message_count - self._messages_on_hold.size)
492 / self._flow_control.max_messages,
493 (self._leaser.bytes - self._on_hold_bytes)
494 / self._flow_control.max_bytes,
495 ]
496 )
497
498 def add_close_callback(
499 self, callback: Callable[["StreamingPullManager", Any], Any]
500 ) -> None:
501 """Schedules a callable when the manager closes.
502
503 Args:
504 The method to call.
505 """
506 self._close_callbacks.append(callback)
507
508 def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None:
509 """Send the next message in the queue for each of the passed-in
510 ordering keys, if they exist. Clean up state for keys that no longer
511 have any queued messages.
512
513 Since the load went down by one message, it's probably safe to send the
514 user another message for the same key. Since the released message may be
515 bigger than the previous one, this may increase the load above the maximum.
516 This decision is by design because it simplifies MessagesOnHold.
517
518 Args:
519 ordering_keys:
520 A sequence of ordering keys to activate. May be empty.
521 """
522 with self._pause_resume_lock:
523 if self._scheduler is None:
524 return # We are shutting down, don't try to dispatch any more messages.
525
526 self._messages_on_hold.activate_ordering_keys(
527 ordering_keys, self._schedule_message_on_hold
528 )
529
530 def maybe_pause_consumer(self) -> None:
531 """Check the current load and pause the consumer if needed."""
532 with self._pause_resume_lock:
533 if self.load >= _MAX_LOAD:
534 if self._consumer is not None and not self._consumer.is_paused:
535 _LOGGER.debug(
536 "Message backlog over load at %.2f, pausing.", self.load
537 )
538 self._consumer.pause()
539
540 def maybe_resume_consumer(self) -> None:
541 """Check the load and held messages and resume the consumer if needed.
542
543 If there are messages held internally, release those messages before
544 resuming the consumer. That will avoid leaser overload.
545 """
546 with self._pause_resume_lock:
547 # If we have been paused by flow control, check and see if we are
548 # back within our limits.
549 #
550 # In order to not thrash too much, require us to have passed below
551 # the resume threshold (80% by default) of each flow control setting
552 # before restarting.
553 if self._consumer is None or not self._consumer.is_paused:
554 return
555
556 _LOGGER.debug("Current load: %.2f", self.load)
557
558 # Before maybe resuming the background consumer, release any messages
559 # currently on hold, if the current load allows for it.
560 self._maybe_release_messages()
561
562 if self.load < _RESUME_THRESHOLD:
563 _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
564 self._consumer.resume()
565 else:
566 _LOGGER.debug("Did not resume, current load is %.2f.", self.load)
567
568 def _maybe_release_messages(self) -> None:
569 """Release (some of) the held messages if the current load allows for it.
570
571 The method tries to release as many messages as the current leaser load
572 would allow. Each released message is added to the lease management,
573 and the user callback is scheduled for it.
574
575 If there are currently no messages on hold, or if the leaser is
576 already overloaded, this method is effectively a no-op.
577
578 The method assumes the caller has acquired the ``_pause_resume_lock``.
579 """
580 released_ack_ids = []
581 while self.load < _MAX_LOAD:
582 msg = self._messages_on_hold.get()
583 if not msg:
584 break
585
586 self._schedule_message_on_hold(msg)
587 released_ack_ids.append(msg.ack_id)
588
589 assert self._leaser is not None
590 self._leaser.start_lease_expiry_timer(released_ack_ids)
591
592 def _schedule_message_on_hold(
593 self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"
594 ):
595 """Schedule a message on hold to be sent to the user and change on-hold-bytes.
596
597 The method assumes the caller has acquired the ``_pause_resume_lock``.
598
599 Args:
600 msg: The message to schedule to be sent to the user.
601 """
602 assert msg, "Message must not be None."
603
604 # On-hold bytes goes down, increasing load.
605 self._on_hold_bytes -= msg.size
606
607 if self._on_hold_bytes < 0:
608 _LOGGER.warning(
609 "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
610 )
611 self._on_hold_bytes = 0
612
613 _LOGGER.debug(
614 "Released held message, scheduling callback for it, "
615 "still on hold %s (bytes %s).",
616 self._messages_on_hold.size,
617 self._on_hold_bytes,
618 )
619 assert self._scheduler is not None
620 assert self._callback is not None
621 self._scheduler.schedule(self._callback, msg)
622
623 def send_unary_ack(
624 self, ack_ids, ack_reqs_dict
625 ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]:
626 """Send a request using a separate unary request instead of over the stream.
627
628 If a RetryError occurs, the manager shutdown is triggered, and the
629 error is re-raised.
630 """
631 assert ack_ids
632 assert len(ack_ids) == len(ack_reqs_dict)
633
634 error_status = None
635 ack_errors_dict = None
636 try:
637 self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids)
638 except exceptions.GoogleAPICallError as exc:
639 _LOGGER.debug(
640 "Exception while sending unary RPC. This is typically "
641 "non-fatal as stream requests are best-effort.",
642 exc_info=True,
643 )
644 error_status = _get_status(exc)
645 ack_errors_dict = _get_ack_errors(exc)
646 except exceptions.RetryError as exc:
647 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
648 # Makes sure to complete futures so they don't block forever.
649 for req in ack_reqs_dict.values():
650 # Futures may be present even with exactly-once delivery
651 # disabled, in transition periods after the setting is changed on
652 # the subscription.
653 if req.future:
654 if exactly_once_delivery_enabled:
655 e = AcknowledgeError(
656 AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
657 )
658 req.future.set_exception(e)
659 else:
660 req.future.set_result(AcknowledgeStatus.SUCCESS)
661
662 _LOGGER.debug(
663 "RetryError while sending ack RPC. Waiting on a transient "
664 "error resolution for too long, will now trigger shutdown.",
665 exc_info=False,
666 )
667 # The underlying channel has been suffering from a retryable error
668 # for too long, time to give up and shut the streaming pull down.
669 self._on_rpc_done(exc)
670 raise
671
672 if self._exactly_once_delivery_enabled():
673 requests_completed, requests_to_retry = _process_requests(
674 error_status, ack_reqs_dict, ack_errors_dict
675 )
676 else:
677 requests_completed = []
678 requests_to_retry = []
679 # When exactly-once delivery is NOT enabled, acks/modacks are considered
680 # best-effort. So, they always succeed even if the RPC fails.
681 for req in ack_reqs_dict.values():
682 # Futures may be present even with exactly-once delivery
683 # disabled, in transition periods after the setting is changed on
684 # the subscription.
685 if req.future:
686 req.future.set_result(AcknowledgeStatus.SUCCESS)
687 requests_completed.append(req)
688
689 return requests_completed, requests_to_retry
690
691 def send_unary_modack(
692 self,
693 modify_deadline_ack_ids,
694 modify_deadline_seconds,
695 ack_reqs_dict,
696 default_deadline=None,
697 ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
698 """Send a request using a separate unary request instead of over the stream.
699
700 If a RetryError occurs, the manager shutdown is triggered, and the
701 error is re-raised.
702 """
703 assert modify_deadline_ack_ids
704 # Either we have a generator or a single deadline.
705 assert modify_deadline_seconds is None or default_deadline is None
706
707 error_status = None
708 modack_errors_dict = None
709 try:
710 if default_deadline is None:
711 # Send ack_ids with the same deadline seconds together.
712 deadline_to_ack_ids = collections.defaultdict(list)
713
714 for n, ack_id in enumerate(modify_deadline_ack_ids):
715 deadline = modify_deadline_seconds[n]
716 deadline_to_ack_ids[deadline].append(ack_id)
717
718 for deadline, ack_ids in deadline_to_ack_ids.items():
719 self._client.modify_ack_deadline(
720 subscription=self._subscription,
721 ack_ids=ack_ids,
722 ack_deadline_seconds=deadline,
723 )
724 else:
725 # We can send all requests with the default deadline.
726 self._client.modify_ack_deadline(
727 subscription=self._subscription,
728 ack_ids=modify_deadline_ack_ids,
729 ack_deadline_seconds=default_deadline,
730 )
731 except exceptions.GoogleAPICallError as exc:
732 _LOGGER.debug(
733 "Exception while sending unary RPC. This is typically "
734 "non-fatal as stream requests are best-effort.",
735 exc_info=True,
736 )
737 error_status = _get_status(exc)
738 modack_errors_dict = _get_ack_errors(exc)
739 except exceptions.RetryError as exc:
740 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
741 # Makes sure to complete futures so they don't block forever.
742 for req in ack_reqs_dict.values():
743 # Futures may be present even with exactly-once delivery
744 # disabled, in transition periods after the setting is changed on
745 # the subscription.
746 if req.future:
747 if exactly_once_delivery_enabled:
748 e = AcknowledgeError(
749 AcknowledgeStatus.OTHER,
750 "RetryError while sending modack RPC.",
751 )
752 req.future.set_exception(e)
753 else:
754 req.future.set_result(AcknowledgeStatus.SUCCESS)
755
756 _LOGGER.debug(
757 "RetryError while sending modack RPC. Waiting on a transient "
758 "error resolution for too long, will now trigger shutdown.",
759 exc_info=False,
760 )
761 # The underlying channel has been suffering from a retryable error
762 # for too long, time to give up and shut the streaming pull down.
763 self._on_rpc_done(exc)
764 raise
765
766 if self._exactly_once_delivery_enabled():
767 requests_completed, requests_to_retry = _process_requests(
768 error_status, ack_reqs_dict, modack_errors_dict
769 )
770 else:
771 requests_completed = []
772 requests_to_retry = []
773 # When exactly-once delivery is NOT enabled, acks/modacks are considered
774 # best-effort. So, they always succeed even if the RPC fails.
775 for req in ack_reqs_dict.values():
776 # Futures may be present even with exactly-once delivery
777 # disabled, in transition periods after the setting is changed on
778 # the subscription.
779 if req.future:
780 req.future.set_result(AcknowledgeStatus.SUCCESS)
781 requests_completed.append(req)
782
783 return requests_completed, requests_to_retry
784
785 def heartbeat(self) -> bool:
786 """Sends a heartbeat request over the streaming pull RPC.
787
788 The request is empty by default, but may contain the current ack_deadline
789 if the self._exactly_once_enabled flag has changed.
790
791 Returns:
792 If a heartbeat request has actually been sent.
793 """
794 if self._rpc is not None and self._rpc.is_active:
795 send_new_ack_deadline = False
796 with self._exactly_once_enabled_lock:
797 send_new_ack_deadline = self._send_new_ack_deadline
798 self._send_new_ack_deadline = False
799
800 if send_new_ack_deadline:
801 request = gapic_types.StreamingPullRequest(
802 stream_ack_deadline_seconds=self._stream_ack_deadline
803 )
804 _LOGGER.debug(
805 "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
806 )
807 else:
808 request = gapic_types.StreamingPullRequest()
809
810 self._rpc.send(request)
811 return True
812
813 return False
814
815 def open(
816 self,
817 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
818 on_callback_error: Callable[[Exception], Any],
819 ) -> None:
820 """Begin consuming messages.
821
822 Args:
823 callback:
824 A callback that will be called for each message received on the
825 stream.
826 on_callback_error:
827 A callable that will be called if an exception is raised in
828 the provided `callback`.
829 """
830 if self.is_active:
831 raise ValueError("This manager is already open.")
832
833 if self._closed:
834 raise ValueError("This manager has been closed and can not be re-used.")
835
836 self._callback = functools.partial(
837 _wrap_callback_errors, callback, on_callback_error
838 )
839
840 # Create the RPC
841 stream_ack_deadline_seconds = self._stream_ack_deadline
842
843 get_initial_request = functools.partial(
844 self._get_initial_request, stream_ack_deadline_seconds
845 )
846 self._rpc = bidi.ResumableBidiRpc(
847 start_rpc=self._client.streaming_pull,
848 initial_request=get_initial_request,
849 should_recover=self._should_recover,
850 should_terminate=self._should_terminate,
851 metadata=self._stream_metadata,
852 throttle_reopen=True,
853 )
854 self._rpc.add_done_callback(self._on_rpc_done)
855
856 _LOGGER.debug(
857 "Creating a stream, default ACK deadline set to {} seconds.".format(
858 self._stream_ack_deadline
859 )
860 )
861
862 # Create references to threads
863 assert self._scheduler is not None
864 scheduler_queue = self._scheduler.queue
865 self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
866 self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
867 self._leaser = leaser.Leaser(self)
868 self._heartbeater = heartbeater.Heartbeater(self)
869
870 # Start the thread to pass the requests.
871 self._dispatcher.start()
872
873 # Start consuming messages.
874 self._consumer.start()
875
876 # Start the lease maintainer thread.
877 self._leaser.start()
878
879 # Start the stream heartbeater thread.
880 self._heartbeater.start()
881
882 def close(self, reason: Any = None) -> None:
883 """Stop consuming messages and shutdown all helper threads.
884
885 This method is idempotent. Additional calls will have no effect.
886
887 The method does not block, it delegates the shutdown operations to a background
888 thread.
889
890 Args:
891 reason:
892 The reason to close this. If ``None``, this is considered
893 an "intentional" shutdown. This is passed to the callbacks
894 specified via :meth:`add_close_callback`.
895 """
896 self._regular_shutdown_thread = threading.Thread(
897 name=_REGULAR_SHUTDOWN_THREAD_NAME,
898 daemon=True,
899 target=self._shutdown,
900 kwargs={"reason": reason},
901 )
902 self._regular_shutdown_thread.start()
903
904 def _shutdown(self, reason: Any = None) -> None:
905 """Run the actual shutdown sequence (stop the stream and all helper threads).
906
907 Args:
908 reason:
909 The reason to close the stream. If ``None``, this is considered
910 an "intentional" shutdown.
911 """
912 with self._closing:
913 if self._closed:
914 return
915
916 # Stop consuming messages.
917 if self.is_active:
918 _LOGGER.debug("Stopping consumer.")
919 assert self._consumer is not None
920 self._consumer.stop()
921 self._consumer = None
922
923 # Shutdown all helper threads
924 _LOGGER.debug("Stopping scheduler.")
925 assert self._scheduler is not None
926 dropped_messages = self._scheduler.shutdown(
927 await_msg_callbacks=self._await_callbacks_on_shutdown
928 )
929 self._scheduler = None
930
931 # Leaser and dispatcher reference each other through the shared
932 # StreamingPullManager instance, i.e. "self", thus do not set their
933 # references to None until both have been shut down.
934 #
935 # NOTE: Even if the dispatcher operates on an inactive leaser using
936 # the latter's add() and remove() methods, these have no impact on
937 # the stopped leaser (the leaser is never again re-started). Ditto
938 # for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
939 # because the consumer gets shut down first.
940 _LOGGER.debug("Stopping leaser.")
941 assert self._leaser is not None
942 self._leaser.stop()
943
944 total = len(dropped_messages) + len(
945 self._messages_on_hold._messages_on_hold
946 )
947 _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
948 messages_to_nack = itertools.chain(
949 dropped_messages, self._messages_on_hold._messages_on_hold
950 )
951 for msg in messages_to_nack:
952 msg.nack()
953
954 _LOGGER.debug("Stopping dispatcher.")
955 assert self._dispatcher is not None
956 self._dispatcher.stop()
957 self._dispatcher = None
958 # dispatcher terminated, OK to dispose the leaser reference now
959 self._leaser = None
960
961 _LOGGER.debug("Stopping heartbeater.")
962 assert self._heartbeater is not None
963 self._heartbeater.stop()
964 self._heartbeater = None
965
966 self._rpc = None
967 self._closed = True
968 _LOGGER.debug("Finished stopping manager.")
969
970 for callback in self._close_callbacks:
971 callback(self, reason)
972
973 def _get_initial_request(
974 self, stream_ack_deadline_seconds: int
975 ) -> gapic_types.StreamingPullRequest:
976 """Return the initial request for the RPC.
977
978 This defines the initial request that must always be sent to Pub/Sub
979 immediately upon opening the subscription.
980
981 Args:
982 stream_ack_deadline_seconds:
983 The default message acknowledge deadline for the stream.
984
985 Returns:
986 A request suitable for being the first request on the stream (and not
987 suitable for any other purpose).
988 """
989 # Put the request together.
990 # We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt
991 # anyway. Set to some big-ish value in case we modack late.
992 request = gapic_types.StreamingPullRequest(
993 stream_ack_deadline_seconds=stream_ack_deadline_seconds,
994 modify_deadline_ack_ids=[],
995 modify_deadline_seconds=[],
996 subscription=self._subscription,
997 client_id=self._client_id,
998 max_outstanding_messages=(
999 0 if self._use_legacy_flow_control else self._flow_control.max_messages
1000 ),
1001 max_outstanding_bytes=(
1002 0 if self._use_legacy_flow_control else self._flow_control.max_bytes
1003 ),
1004 )
1005
1006 # Return the initial request.
1007 return request
1008
1009 def _send_lease_modacks(
1010 self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True
1011 ) -> Set[str]:
1012 exactly_once_enabled = False
1013 with self._exactly_once_enabled_lock:
1014 exactly_once_enabled = self._exactly_once_enabled
1015 if exactly_once_enabled:
1016 items = [
1017 requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
1018 for ack_id in ack_ids
1019 ]
1020
1021 assert self._dispatcher is not None
1022 self._dispatcher.modify_ack_deadline(items, ack_deadline)
1023
1024 expired_ack_ids = set()
1025 for req in items:
1026 try:
1027 assert req.future is not None
1028 req.future.result()
1029 except AcknowledgeError as ack_error:
1030 if (
1031 ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID
1032 or warn_on_invalid
1033 ):
1034 _LOGGER.warning(
1035 "AcknowledgeError when lease-modacking a message.",
1036 exc_info=True,
1037 )
1038 if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
1039 expired_ack_ids.add(req.ack_id)
1040 return expired_ack_ids
1041 else:
1042 items = [
1043 requests.ModAckRequest(ack_id, self.ack_deadline, None)
1044 for ack_id in ack_ids
1045 ]
1046 assert self._dispatcher is not None
1047 self._dispatcher.modify_ack_deadline(items, ack_deadline)
1048 return set()
1049
1050 def _exactly_once_delivery_enabled(self) -> bool:
1051 """Whether exactly-once delivery is enabled for the subscription."""
1052 with self._exactly_once_enabled_lock:
1053 return self._exactly_once_enabled
1054
1055 def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
1056 """Process all received Pub/Sub messages.
1057
1058 For each message, send a modified acknowledgment request to the
1059 server. This prevents expiration of the message due to buffering by
1060 gRPC or proxy/firewall. This makes the server and client expiration
1061 timer closer to each other thus preventing the message being
1062 redelivered multiple times.
1063
1064 After the messages have all had their ack deadline updated, execute
1065 the callback for each message using the executor.
1066 """
1067 if response is None:
1068 _LOGGER.debug(
1069 "Response callback invoked with None, likely due to a "
1070 "transport shutdown."
1071 )
1072 return
1073
1074 # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
1075 # protobuf message to significantly gain on attribute access performance.
1076 received_messages = response._pb.received_messages
1077
1078 _LOGGER.debug(
1079 "Processing %s received message(s), currently on hold %s (bytes %s).",
1080 len(received_messages),
1081 self._messages_on_hold.size,
1082 self._on_hold_bytes,
1083 )
1084
1085 with self._exactly_once_enabled_lock:
1086 if (
1087 response.subscription_properties.exactly_once_delivery_enabled
1088 != self._exactly_once_enabled
1089 ):
1090 self._exactly_once_enabled = (
1091 response.subscription_properties.exactly_once_delivery_enabled
1092 )
1093 # Update ack_deadline, whose minimum depends on self._exactly_once_enabled
1094 # This method acquires the self._ack_deadline_lock lock.
1095 self._obtain_ack_deadline(maybe_update=True)
1096 self._send_new_ack_deadline = True
1097
1098 # Immediately (i.e. without waiting for the auto lease management)
1099 # modack the messages we received, as this tells the server that we've
1100 # received them.
1101 ack_id_gen = (message.ack_id for message in received_messages)
1102 expired_ack_ids = self._send_lease_modacks(
1103 ack_id_gen, self.ack_deadline, warn_on_invalid=False
1104 )
1105
1106 with self._pause_resume_lock:
1107 assert self._scheduler is not None
1108 assert self._leaser is not None
1109
1110 for received_message in received_messages:
1111 if (
1112 not self._exactly_once_delivery_enabled()
1113 or received_message.ack_id not in expired_ack_ids
1114 ):
1115 message = google.cloud.pubsub_v1.subscriber.message.Message(
1116 received_message.message,
1117 received_message.ack_id,
1118 received_message.delivery_attempt,
1119 self._scheduler.queue,
1120 self._exactly_once_delivery_enabled,
1121 )
1122 self._messages_on_hold.put(message)
1123 self._on_hold_bytes += message.size
1124 req = requests.LeaseRequest(
1125 ack_id=message.ack_id,
1126 byte_size=message.size,
1127 ordering_key=message.ordering_key,
1128 )
1129 self._leaser.add([req])
1130
1131 self._maybe_release_messages()
1132
1133 self.maybe_pause_consumer()
1134
1135 def _should_recover(self, exception: BaseException) -> bool:
1136 """Determine if an error on the RPC stream should be recovered.
1137
1138 If the exception is one of the retryable exceptions, this will signal
1139 to the consumer thread that it should "recover" from the failure.
1140
1141 This will cause the stream to exit when it returns :data:`False`.
1142
1143 Returns:
1144 Indicates if the caller should recover or shut down.
1145 Will be :data:`True` if the ``exception`` is "acceptable", i.e.
1146 in a list of retryable / idempotent exceptions.
1147 """
1148 exception = _wrap_as_exception(exception)
1149 # If this is in the list of idempotent exceptions, then we want to
1150 # recover.
1151 if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
1152 _LOGGER.debug("Observed recoverable stream error %s", exception)
1153 return True
1154 _LOGGER.debug("Observed non-recoverable stream error %s", exception)
1155 return False
1156
1157 def _should_terminate(self, exception: BaseException) -> bool:
1158 """Determine if an error on the RPC stream should be terminated.
1159
1160 If the exception is one of the terminating exceptions, this will signal
1161 to the consumer thread that it should terminate.
1162
1163 This will cause the stream to exit when it returns :data:`True`.
1164
1165 Returns:
1166 Indicates if the caller should terminate or attempt recovery.
1167 Will be :data:`True` if the ``exception`` is "acceptable", i.e.
1168 in a list of terminating exceptions.
1169 """
1170 exception = _wrap_as_exception(exception)
1171 if isinstance(exception, _TERMINATING_STREAM_ERRORS):
1172 _LOGGER.debug("Observed terminating stream error %s", exception)
1173 return True
1174 _LOGGER.debug("Observed non-terminating stream error %s", exception)
1175 return False
1176
1177 def _on_rpc_done(self, future: Any) -> None:
1178 """Triggered whenever the underlying RPC terminates without recovery.
1179
1180 This is typically triggered from one of two threads: the background
1181 consumer thread (when calling ``recv()`` produces a non-recoverable
1182 error) or the grpc management thread (when cancelling the RPC).
1183
1184 This method is *non-blocking*. It will start another thread to deal
1185 with shutting everything down. This is to prevent blocking in the
1186 background consumer and preventing it from being ``joined()``.
1187 """
1188 _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.")
1189 error = _wrap_as_exception(future)
1190 thread = threading.Thread(
1191 name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
1192 )
1193 thread.daemon = True
1194 thread.start()