1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import division
16
17import collections
18import functools
19import itertools
20import logging
21import threading
22import typing
23from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
24import uuid
25
26from opentelemetry import trace
27import grpc # type: ignore
28
29from google.api_core import bidi
30from google.api_core import exceptions
31from google.cloud.pubsub_v1 import types
32from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
33from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
34from google.cloud.pubsub_v1.subscriber._protocol import histogram
35from google.cloud.pubsub_v1.subscriber._protocol import leaser
36from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold
37from google.cloud.pubsub_v1.subscriber._protocol import requests
38from google.cloud.pubsub_v1.subscriber.exceptions import (
39 AcknowledgeError,
40 AcknowledgeStatus,
41)
42from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
43 SubscribeOpenTelemetry,
44)
45import google.cloud.pubsub_v1.subscriber.message
46from google.cloud.pubsub_v1.subscriber import futures
47from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
48from google.pubsub_v1 import types as gapic_types
49from grpc_status import rpc_status # type: ignore
50from google.rpc.error_details_pb2 import ErrorInfo # type: ignore
51from google.rpc import code_pb2 # type: ignore
52from google.rpc import status_pb2
53from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
54 start_modack_span,
55)
56
57if typing.TYPE_CHECKING: # pragma: NO COVER
58 from google.cloud.pubsub_v1 import subscriber
59
60
61_LOGGER = logging.getLogger(__name__)
62_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
63_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
64_RETRYABLE_STREAM_ERRORS = (
65 exceptions.DeadlineExceeded,
66 exceptions.ServiceUnavailable,
67 exceptions.InternalServerError,
68 exceptions.Unknown,
69 exceptions.GatewayTimeout,
70 exceptions.Aborted,
71)
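"""Stream errors from which the streaming pull is recovered by re-opening the stream."""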
72_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
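"""Stream errors that cause the streaming pull to terminate without retrying."""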
73_MAX_LOAD = 1.0
74"""The load threshold above which to pause the incoming message stream."""
75
76_RESUME_THRESHOLD = 0.8
77"""The load threshold below which to resume the incoming message stream."""
78
79_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60
"""The minimum ack_deadline, in seconds, used when exactly-once delivery is
enabled for a subscription. The higher minimum reduces premature ack expiration.
"""
83
84_DEFAULT_STREAM_ACK_DEADLINE: float = 60
85"""The default stream ack deadline in seconds."""
86
87_MAX_STREAM_ACK_DEADLINE: float = 600
88"""The maximum stream ack deadline in seconds."""
89
90_MIN_STREAM_ACK_DEADLINE: float = 10
91"""The minimum stream ack deadline in seconds."""
92
93_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
94 code_pb2.DEADLINE_EXCEEDED,
95 code_pb2.RESOURCE_EXHAUSTED,
96 code_pb2.ABORTED,
97 code_pb2.INTERNAL,
98 code_pb2.UNAVAILABLE,
99}
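"""gRPC status codes for which exactly-once ack/modack requests are retried."""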
100
101
102def _wrap_as_exception(maybe_exception: Any) -> BaseException:
103 """Wrap an object as a Python exception, if needed.
104
105 Args:
106 maybe_exception: The object to wrap, usually a gRPC exception class.
107
108 Returns:
109 The argument itself if an instance of ``BaseException``, otherwise
110 the argument represented as an instance of ``Exception`` (sub)class.
111 """
112 if isinstance(maybe_exception, grpc.RpcError):
113 return exceptions.from_grpc_error(maybe_exception)
114 elif isinstance(maybe_exception, BaseException):
115 return maybe_exception
116
117 return Exception(maybe_exception)
118
119
120def _wrap_callback_errors(
121 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
122 on_callback_error: Callable[[BaseException], Any],
123 message: "google.cloud.pubsub_v1.subscriber.message.Message",
124):
    """Wrap a user callback so that if an exception occurs the message is
    nacked and the error is passed to ``on_callback_error``.

    Args:
        callback: The user callback.
        on_callback_error: The callable invoked with the exception raised by
            the user callback.
        message: The Pub/Sub message.
    """
132 try:
133 if message.opentelemetry_data:
134 message.opentelemetry_data.end_subscribe_concurrency_control_span()
135 message.opentelemetry_data.start_process_span()
136 callback(message)
137 except BaseException as exc:
138 # Note: the likelihood of this failing is extremely low. This just adds
139 # a message to a queue, so if this doesn't work the world is in an
140 # unrecoverable state and this thread should just bail.
141 _LOGGER.exception(
142 "Top-level exception occurred in callback while processing a message"
143 )
144 message.nack()
145 on_callback_error(exc)
146
147
148def _get_status(
149 exc: exceptions.GoogleAPICallError,
150) -> Optional["status_pb2.Status"]:
151 if not exc.response:
152 _LOGGER.debug("No response obj in errored RPC call.")
153 return None
154 try:
155 return rpc_status.from_call(exc.response)
    # rpc_status.from_call() raises ValueError if the gRPC call's code or
    # details are inconsistent with the status code and message inside the
    # google.rpc.status.Status payload.
159 except ValueError:
160 _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True)
161 return None
162
163
164def _get_ack_errors(
165 exc: exceptions.GoogleAPICallError,
166) -> Optional[Dict[str, str]]:
167 status = _get_status(exc)
168 if not status:
169 _LOGGER.debug("Unable to get status of errored RPC.")
170 return None
171 for detail in status.details:
172 info = ErrorInfo()
173 if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)):
174 _LOGGER.debug("Unable to unpack ErrorInfo.")
175 return None
176 return info.metadata
177 return None
178
179
180def _process_requests(
181 error_status: Optional["status_pb2.Status"],
182 ack_reqs_dict: Dict[str, requests.AckRequest],
183 errors_dict: Optional[Dict[str, str]],
184):
185 """Process requests when exactly-once delivery is enabled by referring to
186 error_status and errors_dict.
187
    The errors returned by the server in `error_status` or in `errors_dict`
    are used to complete the request futures in `ack_reqs_dict` (with a success
    or an exception) or to return requests for further retries.
191 """
192 requests_completed = []
193 requests_to_retry = []
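    # For each ack ID, precedence is: a per-ID entry in errors_dict (from the
    # ErrorInfo metadata), then the RPC-level error_status (transient codes are
    # retried, other codes fail the future), and finally success when no error
    # applies.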
194 for ack_id in ack_reqs_dict:
195 # Handle special errors returned for ack/modack RPCs via the ErrorInfo
196 # sidecar metadata when exactly-once delivery is enabled.
197 if errors_dict and ack_id in errors_dict:
198 exactly_once_error = errors_dict[ack_id]
199 if exactly_once_error.startswith("TRANSIENT_"):
200 requests_to_retry.append(ack_reqs_dict[ack_id])
201 else:
202 if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
203 exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
204 else:
205 exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
206 future = ack_reqs_dict[ack_id].future
207 if future is not None:
208 future.set_exception(exc)
209 requests_completed.append(ack_reqs_dict[ack_id])
210 # Temporary GRPC errors are retried
211 elif (
212 error_status
213 and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
214 ):
215 requests_to_retry.append(ack_reqs_dict[ack_id])
216 # Other GRPC errors are NOT retried
217 elif error_status:
218 if error_status.code == code_pb2.PERMISSION_DENIED:
219 exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None)
220 elif error_status.code == code_pb2.FAILED_PRECONDITION:
221 exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
222 else:
223 exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
224 future = ack_reqs_dict[ack_id].future
225 if future is not None:
226 future.set_exception(exc)
227 requests_completed.append(ack_reqs_dict[ack_id])
228 # Since no error occurred, requests with futures are completed successfully.
229 elif ack_reqs_dict[ack_id].future:
230 future = ack_reqs_dict[ack_id].future
231 # success
232 assert future is not None
233 future.set_result(AcknowledgeStatus.SUCCESS)
234 requests_completed.append(ack_reqs_dict[ack_id])
235 # All other requests are considered completed.
236 else:
237 requests_completed.append(ack_reqs_dict[ack_id])
238
239 return requests_completed, requests_to_retry
240
241
242class StreamingPullManager(object):
243 """The streaming pull manager coordinates pulling messages from Pub/Sub,
244 leasing them, and scheduling them to be processed.
245
246 Args:
247 client:
248 The subscriber client used to create this instance.
249 subscription:
250 The name of the subscription. The canonical format for this is
251 ``projects/{project}/subscriptions/{subscription}``.
252 flow_control:
253 The flow control settings.
254 scheduler:
255 The scheduler to use to process messages. If not provided, a thread
256 pool-based scheduler will be used.
257 use_legacy_flow_control:
258 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled,
259 though client-side flow control is still enabled. If set to ``False``
260 (default), both server-side and client-side flow control are enabled.
261 await_callbacks_on_shutdown:
262 If ``True``, the shutdown thread will wait until all scheduler threads
263 terminate and only then proceed with shutting down the remaining running
264 helper threads.
265
266 If ``False`` (default), the shutdown thread will shut the scheduler down,
267 but it will not wait for the currently executing scheduler threads to
268 terminate.
269
270 This setting affects when the on close callbacks get invoked, and
271 consequently, when the StreamingPullFuture associated with the stream gets
272 resolved.
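
    Example (illustrative sketch): instances are normally created indirectly by
    ``subscriber.Client.subscribe()`` rather than constructed by hand, but the
    lifecycle looks roughly like this (``process_message`` and ``log_error``
    are hypothetical user callables)::

        manager = StreamingPullManager(client, subscription_path)
        manager.open(callback=process_message, on_callback_error=log_error)
        # ... messages are dispatched to process_message ...
        manager.close()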
273 """
274
275 def __init__(
276 self,
277 client: "subscriber.Client",
278 subscription: str,
279 flow_control: types.FlowControl = types.FlowControl(),
280 scheduler: Optional[ThreadScheduler] = None,
281 use_legacy_flow_control: bool = False,
282 await_callbacks_on_shutdown: bool = False,
283 ):
284 self._client = client
285 self._subscription = subscription
286 self._exactly_once_enabled = False
287 self._flow_control = flow_control
288 self._use_legacy_flow_control = use_legacy_flow_control
289 self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
290 self._ack_histogram = histogram.Histogram()
291 self._last_histogram_size = 0
292 self._stream_metadata = [
293 ["x-goog-request-params", "subscription=" + subscription]
294 ]
295
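        # In effect, clamp the stream ACK deadline to [10, 600] seconds, with a
        # default of 60 seconds when max_duration_per_lease_extension is unset (0).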
        # If max_duration_per_lease_extension is unset (0), fall back to the
        # default stream ACK deadline.
298 if self._flow_control.max_duration_per_lease_extension == 0:
299 self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
        # The stream ACK deadline cannot be set below the minimum.
301 elif (
302 self._flow_control.max_duration_per_lease_extension
303 < _MIN_STREAM_ACK_DEADLINE
304 ):
305 self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
        # The stream ACK deadline cannot go above the maximum.
307 elif (
308 self._flow_control.max_duration_per_lease_extension
309 > _MAX_STREAM_ACK_DEADLINE
310 ):
311 self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
312 else:
313 self._stream_ack_deadline = (
314 self._flow_control.max_duration_per_lease_extension
315 )
316
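        # The initial ACK deadline: the user-configured minimum lease extension,
        # clamped to the histogram's [MIN_ACK_DEADLINE, MAX_ACK_DEADLINE] range.
        # When the setting is unset (0), this is simply MIN_ACK_DEADLINE.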
317 self._ack_deadline = max(
318 min(
319 self._flow_control.min_duration_per_lease_extension,
320 histogram.MAX_ACK_DEADLINE,
321 ),
322 histogram.MIN_ACK_DEADLINE,
323 )
324
325 self._rpc: Optional[bidi.ResumableBidiRpc] = None
326 self._callback: Optional[functools.partial] = None
327 self._closing = threading.Lock()
328 self._closed = False
329 self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = []
330 # Guarded by self._exactly_once_enabled_lock
331 self._send_new_ack_deadline = False
332
333 # A shutdown thread is created on intentional shutdown.
334 self._regular_shutdown_thread: Optional[threading.Thread] = None
335
336 # Generate a random client id tied to this object. All streaming pull
337 # connections (initial and re-connects) will then use the same client
338 # id. Doing so lets the server establish affinity even across stream
        # disconnections.
340 self._client_id = str(uuid.uuid4())
341
342 if scheduler is None:
343 self._scheduler: Optional[ThreadScheduler] = ThreadScheduler()
344 else:
345 self._scheduler = scheduler
346
347 # A collection for the messages that have been received from the server,
348 # but not yet sent to the user callback.
349 self._messages_on_hold = messages_on_hold.MessagesOnHold()
350
351 # The total number of bytes consumed by the messages currently on hold
352 self._on_hold_bytes = 0
353
354 # A lock ensuring that pausing / resuming the consumer are both atomic
355 # operations that cannot be executed concurrently. Needed for properly
356 # syncing these operations with the current leaser load. Additionally,
357 # the lock is used to protect modifications of internal data that
358 # affects the load computation, i.e. the count and size of the messages
359 # currently on hold.
360 self._pause_resume_lock = threading.Lock()
361
362 # A lock guarding the self._exactly_once_enabled variable. We may also
363 # acquire the self._ack_deadline_lock while this lock is held, but not
364 # the reverse. So, we maintain a simple ordering of these two locks to
365 # prevent deadlocks.
366 self._exactly_once_enabled_lock = threading.Lock()
367
368 # A lock protecting the current ACK deadline used in the lease management. This
369 # value can be potentially updated both by the leaser thread and by the message
370 # consumer thread when invoking the internal _on_response() callback.
371 self._ack_deadline_lock = threading.Lock()
372
373 # The threads created in ``.open()``.
374 self._dispatcher: Optional[dispatcher.Dispatcher] = None
375 self._leaser: Optional[leaser.Leaser] = None
376 self._consumer: Optional[bidi.BackgroundConsumer] = None
377 self._heartbeater: Optional[heartbeater.Heartbeater] = None
378
379 @property
380 def is_active(self) -> bool:
381 """``True`` if this manager is actively streaming.
382
        Note that ``False`` does not indicate the manager is completely shut
        down, just that it has stopped receiving new messages.
385 """
386 return self._consumer is not None and self._consumer.is_active
387
388 @property
389 def flow_control(self) -> types.FlowControl:
390 """The active flow control settings."""
391 return self._flow_control
392
393 @property
394 def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
395 """The dispatcher helper."""
396 return self._dispatcher
397
398 @property
399 def leaser(self) -> Optional[leaser.Leaser]:
400 """The leaser helper."""
401 return self._leaser
402
403 @property
404 def ack_histogram(self) -> histogram.Histogram:
405 """The histogram tracking time-to-acknowledge."""
406 return self._ack_histogram
407
408 @property
409 def ack_deadline(self) -> float:
410 """Return the current ACK deadline based on historical data without updating it.
411
412 Returns:
413 The ack deadline.
414 """
415 return self._obtain_ack_deadline(maybe_update=False)
416
417 def _obtain_ack_deadline(self, maybe_update: bool) -> float:
418 """The actual `ack_deadline` implementation.
419
420 This method is "sticky". It will only perform the computations to check on the
421 right ACK deadline if explicitly requested AND if the histogram with past
422 time-to-ack data has gained a significant amount of new information.
423
424 Args:
425 maybe_update:
426 If ``True``, also update the current ACK deadline before returning it if
427 enough new ACK data has been gathered.
428
429 Returns:
430 The current ACK deadline in seconds to use.
431 """
432 with self._ack_deadline_lock:
433 if not maybe_update:
434 return self._ack_deadline
435
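            # Update the deadline only once the histogram has gained a
            # significant amount of new data: it must have doubled in size or
            # grown by at least 100 samples since the last update, whichever
            # threshold is smaller.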
436 target_size = min(
437 self._last_histogram_size * 2, self._last_histogram_size + 100
438 )
439 hist_size = len(self.ack_histogram)
440
441 if hist_size > target_size:
442 self._last_histogram_size = hist_size
443 self._ack_deadline = self.ack_histogram.percentile(percent=99)
444
445 if self.flow_control.max_duration_per_lease_extension > 0:
446 # The setting in flow control could be too low, adjust if needed.
447 flow_control_setting = max(
448 self.flow_control.max_duration_per_lease_extension,
449 histogram.MIN_ACK_DEADLINE,
450 )
451 self._ack_deadline = min(self._ack_deadline, flow_control_setting)
452
453 # If the user explicitly sets a min ack_deadline, respect it.
454 if self.flow_control.min_duration_per_lease_extension > 0:
455 # The setting in flow control could be too high, adjust if needed.
456 flow_control_setting = min(
457 self.flow_control.min_duration_per_lease_extension,
458 histogram.MAX_ACK_DEADLINE,
459 )
460 self._ack_deadline = max(self._ack_deadline, flow_control_setting)
461 elif self._exactly_once_enabled:
462 # Higher minimum ack_deadline for subscriptions with
463 # exactly-once delivery enabled.
464 self._ack_deadline = max(
465 self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED
466 )
467 # If we have updated the ack_deadline and it is longer than the stream_ack_deadline
468 # set the stream_ack_deadline to the new ack_deadline.
469 if self._ack_deadline > self._stream_ack_deadline:
470 self._stream_ack_deadline = self._ack_deadline
471 return self._ack_deadline
472
473 @property
474 def load(self) -> float:
475 """Return the current load.
476
477 The load is represented as a float, where 1.0 represents having
478 hit one of the flow control limits, and values between 0.0 and 1.0
479 represent how close we are to them. (0.5 means we have exactly half
480 of what the flow control setting allows, for example.)
481
482 There are (currently) two flow control settings; this property
483 computes how close the manager is to each of them, and returns
484 whichever value is higher. (It does not matter that we have lots of
485 running room on setting A if setting B is over.)
486
487 Returns:
488 The load value.
489 """
490 if self._leaser is None:
491 return 0.0
492
493 # Messages that are temporarily put on hold are not being delivered to
494 # user's callbacks, thus they should not contribute to the flow control
495 # load calculation.
496 # However, since these messages must still be lease-managed to avoid
497 # unnecessary ACK deadline expirations, their count and total size must
498 # be subtracted from the leaser's values.
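        # Worked example: with max_messages=1000 and max_bytes=100 MiB, a
        # leaser tracking 500 messages (100 of them on hold) totalling 80 MiB
        # (10 MiB on hold) yields load = max(400 / 1000, 70 MiB / 100 MiB) = 0.7.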
499 return max(
500 [
501 (self._leaser.message_count - self._messages_on_hold.size)
502 / self._flow_control.max_messages,
503 (self._leaser.bytes - self._on_hold_bytes)
504 / self._flow_control.max_bytes,
505 ]
506 )
507
508 def add_close_callback(
509 self, callback: Callable[["StreamingPullManager", Any], Any]
510 ) -> None:
        """Schedule a callable to be invoked when the manager closes.

        Args:
            callback: The callable to invoke; it is called with this manager
                and the close reason.
        """
516 self._close_callbacks.append(callback)
517
518 def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None:
519 """Send the next message in the queue for each of the passed-in
520 ordering keys, if they exist. Clean up state for keys that no longer
521 have any queued messages.
522
523 Since the load went down by one message, it's probably safe to send the
524 user another message for the same key. Since the released message may be
525 bigger than the previous one, this may increase the load above the maximum.
526 This decision is by design because it simplifies MessagesOnHold.
527
528 Args:
529 ordering_keys:
530 A sequence of ordering keys to activate. May be empty.
531 """
532 with self._pause_resume_lock:
533 if self._scheduler is None:
534 return # We are shutting down, don't try to dispatch any more messages.
535
536 self._messages_on_hold.activate_ordering_keys(
537 ordering_keys, self._schedule_message_on_hold
538 )
539
540 def maybe_pause_consumer(self) -> None:
541 """Check the current load and pause the consumer if needed."""
542 with self._pause_resume_lock:
543 if self.load >= _MAX_LOAD:
544 if self._consumer is not None and not self._consumer.is_paused:
545 _LOGGER.debug(
546 "Message backlog over load at %.2f, pausing.", self.load
547 )
548 self._consumer.pause()
549
550 def maybe_resume_consumer(self) -> None:
551 """Check the load and held messages and resume the consumer if needed.
552
553 If there are messages held internally, release those messages before
554 resuming the consumer. That will avoid leaser overload.
555 """
556 with self._pause_resume_lock:
557 # If we have been paused by flow control, check and see if we are
558 # back within our limits.
559 #
560 # In order to not thrash too much, require us to have passed below
561 # the resume threshold (80% by default) of each flow control setting
562 # before restarting.
563 if self._consumer is None or not self._consumer.is_paused:
564 return
565
566 _LOGGER.debug("Current load: %.2f", self.load)
567
568 # Before maybe resuming the background consumer, release any messages
569 # currently on hold, if the current load allows for it.
570 self._maybe_release_messages()
571
572 if self.load < _RESUME_THRESHOLD:
573 _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
574 self._consumer.resume()
575 else:
576 _LOGGER.debug("Did not resume, current load is %.2f.", self.load)
577
578 def _maybe_release_messages(self) -> None:
579 """Release (some of) the held messages if the current load allows for it.
580
581 The method tries to release as many messages as the current leaser load
582 would allow. Each released message is added to the lease management,
583 and the user callback is scheduled for it.
584
585 If there are currently no messages on hold, or if the leaser is
586 already overloaded, this method is effectively a no-op.
587
588 The method assumes the caller has acquired the ``_pause_resume_lock``.
589 """
590 released_ack_ids = []
591 while self.load < _MAX_LOAD:
592 msg = self._messages_on_hold.get()
593 if not msg:
594 break
595 if msg.opentelemetry_data:
596 msg.opentelemetry_data.end_subscribe_scheduler_span()
597 self._schedule_message_on_hold(msg)
598 released_ack_ids.append(msg.ack_id)
599
600 assert self._leaser is not None
601 self._leaser.start_lease_expiry_timer(released_ack_ids)
602
603 def _schedule_message_on_hold(
604 self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"
605 ):
606 """Schedule a message on hold to be sent to the user and change on-hold-bytes.
607
608 The method assumes the caller has acquired the ``_pause_resume_lock``.
609
610 Args:
611 msg: The message to schedule to be sent to the user.
612 """
613 assert msg, "Message must not be None."
614
        # The on-hold byte count goes down, which increases the computed load.
616 self._on_hold_bytes -= msg.size
617
618 if self._on_hold_bytes < 0:
619 _LOGGER.warning(
620 "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
621 )
622 self._on_hold_bytes = 0
623
624 _LOGGER.debug(
625 "Released held message, scheduling callback for it, "
626 "still on hold %s (bytes %s).",
627 self._messages_on_hold.size,
628 self._on_hold_bytes,
629 )
630 assert self._scheduler is not None
631 assert self._callback is not None
632 if msg.opentelemetry_data:
633 msg.opentelemetry_data.start_subscribe_concurrency_control_span()
634 self._scheduler.schedule(self._callback, msg)
635
636 def send_unary_ack(
637 self, ack_ids, ack_reqs_dict
638 ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]:
        """Send an ack request as a separate unary RPC instead of over the stream.
640
641 If a RetryError occurs, the manager shutdown is triggered, and the
642 error is re-raised.
643 """
644 assert ack_ids
645 assert len(ack_ids) == len(ack_reqs_dict)
646
647 error_status = None
648 ack_errors_dict = None
649 try:
650 self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids)
651 except exceptions.GoogleAPICallError as exc:
652 _LOGGER.debug(
653 "Exception while sending unary RPC. This is typically "
654 "non-fatal as stream requests are best-effort.",
655 exc_info=True,
656 )
657 error_status = _get_status(exc)
658 ack_errors_dict = _get_ack_errors(exc)
659 except exceptions.RetryError as exc:
660 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
661 # Makes sure to complete futures so they don't block forever.
662 for req in ack_reqs_dict.values():
663 # Futures may be present even with exactly-once delivery
664 # disabled, in transition periods after the setting is changed on
665 # the subscription.
666 if req.future:
667 if exactly_once_delivery_enabled:
668 e = AcknowledgeError(
669 AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
670 )
671 req.future.set_exception(e)
672 else:
673 req.future.set_result(AcknowledgeStatus.SUCCESS)
674
675 _LOGGER.debug(
676 "RetryError while sending ack RPC. Waiting on a transient "
677 "error resolution for too long, will now trigger shutdown.",
678 exc_info=False,
679 )
680 # The underlying channel has been suffering from a retryable error
681 # for too long, time to give up and shut the streaming pull down.
682 self._on_rpc_done(exc)
683 raise
684
685 if self._exactly_once_delivery_enabled():
686 requests_completed, requests_to_retry = _process_requests(
687 error_status, ack_reqs_dict, ack_errors_dict
688 )
689 else:
690 requests_completed = []
691 requests_to_retry = []
692 # When exactly-once delivery is NOT enabled, acks/modacks are considered
693 # best-effort. So, they always succeed even if the RPC fails.
694 for req in ack_reqs_dict.values():
695 # Futures may be present even with exactly-once delivery
696 # disabled, in transition periods after the setting is changed on
697 # the subscription.
698 if req.future:
699 req.future.set_result(AcknowledgeStatus.SUCCESS)
700 requests_completed.append(req)
701
702 return requests_completed, requests_to_retry
703
704 def send_unary_modack(
705 self,
706 modify_deadline_ack_ids,
707 modify_deadline_seconds,
708 ack_reqs_dict,
709 default_deadline=None,
710 ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
        """Send a modack request as a separate unary RPC instead of over the stream.
712
713 If a RetryError occurs, the manager shutdown is triggered, and the
714 error is re-raised.
715 """
716 assert modify_deadline_ack_ids
717 # Either we have a generator or a single deadline.
718 assert modify_deadline_seconds is None or default_deadline is None
719
720 error_status = None
721 modack_errors_dict = None
722 try:
723 if default_deadline is None:
724 # Send ack_ids with the same deadline seconds together.
725 deadline_to_ack_ids = collections.defaultdict(list)
726
727 for n, ack_id in enumerate(modify_deadline_ack_ids):
728 deadline = modify_deadline_seconds[n]
729 deadline_to_ack_ids[deadline].append(ack_id)
730
731 for deadline, ack_ids in deadline_to_ack_ids.items():
732 self._client.modify_ack_deadline(
733 subscription=self._subscription,
734 ack_ids=ack_ids,
735 ack_deadline_seconds=deadline,
736 )
737 else:
738 # We can send all requests with the default deadline.
739 self._client.modify_ack_deadline(
740 subscription=self._subscription,
741 ack_ids=modify_deadline_ack_ids,
742 ack_deadline_seconds=default_deadline,
743 )
744 except exceptions.GoogleAPICallError as exc:
745 _LOGGER.debug(
746 "Exception while sending unary RPC. This is typically "
747 "non-fatal as stream requests are best-effort.",
748 exc_info=True,
749 )
750 error_status = _get_status(exc)
751 modack_errors_dict = _get_ack_errors(exc)
752 except exceptions.RetryError as exc:
753 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
754 # Makes sure to complete futures so they don't block forever.
755 for req in ack_reqs_dict.values():
756 # Futures may be present even with exactly-once delivery
757 # disabled, in transition periods after the setting is changed on
758 # the subscription.
759 if req.future:
760 if exactly_once_delivery_enabled:
761 e = AcknowledgeError(
762 AcknowledgeStatus.OTHER,
763 "RetryError while sending modack RPC.",
764 )
765 req.future.set_exception(e)
766 else:
767 req.future.set_result(AcknowledgeStatus.SUCCESS)
768
769 _LOGGER.debug(
770 "RetryError while sending modack RPC. Waiting on a transient "
771 "error resolution for too long, will now trigger shutdown.",
772 exc_info=False,
773 )
774 # The underlying channel has been suffering from a retryable error
775 # for too long, time to give up and shut the streaming pull down.
776 self._on_rpc_done(exc)
777 raise
778
779 if self._exactly_once_delivery_enabled():
780 requests_completed, requests_to_retry = _process_requests(
781 error_status, ack_reqs_dict, modack_errors_dict
782 )
783 else:
784 requests_completed = []
785 requests_to_retry = []
786 # When exactly-once delivery is NOT enabled, acks/modacks are considered
787 # best-effort. So, they always succeed even if the RPC fails.
788 for req in ack_reqs_dict.values():
789 # Futures may be present even with exactly-once delivery
790 # disabled, in transition periods after the setting is changed on
791 # the subscription.
792 if req.future:
793 req.future.set_result(AcknowledgeStatus.SUCCESS)
794 requests_completed.append(req)
795
796 return requests_completed, requests_to_retry
797
798 def heartbeat(self) -> bool:
799 """Sends a heartbeat request over the streaming pull RPC.
800
801 The request is empty by default, but may contain the current ack_deadline
802 if the self._exactly_once_enabled flag has changed.
803
804 Returns:
            Whether a heartbeat request was actually sent.
806 """
807 if self._rpc is not None and self._rpc.is_active:
808 send_new_ack_deadline = False
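            # Read and clear the flag under the lock so that the updated stream
            # ACK deadline is sent by exactly one heartbeat request.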
809 with self._exactly_once_enabled_lock:
810 send_new_ack_deadline = self._send_new_ack_deadline
811 self._send_new_ack_deadline = False
812
813 if send_new_ack_deadline:
814 request = gapic_types.StreamingPullRequest(
815 stream_ack_deadline_seconds=self._stream_ack_deadline
816 )
817 _LOGGER.debug(
818 "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
819 )
820 else:
821 request = gapic_types.StreamingPullRequest()
822
823 self._rpc.send(request)
824 return True
825
826 return False
827
828 def open(
829 self,
830 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
831 on_callback_error: Callable[[Exception], Any],
832 ) -> None:
833 """Begin consuming messages.
834
835 Args:
836 callback:
837 A callback that will be called for each message received on the
838 stream.
839 on_callback_error:
840 A callable that will be called if an exception is raised in
841 the provided `callback`.
842 """
843 if self.is_active:
844 raise ValueError("This manager is already open.")
845
846 if self._closed:
847 raise ValueError("This manager has been closed and can not be re-used.")
848
849 self._callback = functools.partial(
850 _wrap_callback_errors, callback, on_callback_error
851 )
852
853 # Create the RPC
854 stream_ack_deadline_seconds = self._stream_ack_deadline
855
856 get_initial_request = functools.partial(
857 self._get_initial_request, stream_ack_deadline_seconds
858 )
859 self._rpc = bidi.ResumableBidiRpc(
860 start_rpc=self._client.streaming_pull,
861 initial_request=get_initial_request,
862 should_recover=self._should_recover,
863 should_terminate=self._should_terminate,
864 metadata=self._stream_metadata,
865 throttle_reopen=True,
866 )
867 self._rpc.add_done_callback(self._on_rpc_done)
868
869 _LOGGER.debug(
870 "Creating a stream, default ACK deadline set to {} seconds.".format(
871 self._stream_ack_deadline
872 )
873 )
874
875 # Create references to threads
876 assert self._scheduler is not None
877 scheduler_queue = self._scheduler.queue
878 self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
879 self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
880 self._leaser = leaser.Leaser(self)
881 self._heartbeater = heartbeater.Heartbeater(self)
882
883 # Start the thread to pass the requests.
884 self._dispatcher.start()
885
886 # Start consuming messages.
887 self._consumer.start()
888
889 # Start the lease maintainer thread.
890 self._leaser.start()
891
892 # Start the stream heartbeater thread.
893 self._heartbeater.start()
894
895 def close(self, reason: Any = None) -> None:
896 """Stop consuming messages and shutdown all helper threads.
897
898 This method is idempotent. Additional calls will have no effect.
899
        The method does not block; it delegates the shutdown operations to a
        background thread.
902
903 Args:
904 reason:
905 The reason to close this. If ``None``, this is considered
906 an "intentional" shutdown. This is passed to the callbacks
907 specified via :meth:`add_close_callback`.
908 """
909 self._regular_shutdown_thread = threading.Thread(
910 name=_REGULAR_SHUTDOWN_THREAD_NAME,
911 daemon=True,
912 target=self._shutdown,
913 kwargs={"reason": reason},
914 )
915 self._regular_shutdown_thread.start()
916
917 def _shutdown(self, reason: Any = None) -> None:
918 """Run the actual shutdown sequence (stop the stream and all helper threads).
919
920 Args:
921 reason:
922 The reason to close the stream. If ``None``, this is considered
923 an "intentional" shutdown.
924 """
925 with self._closing:
926 if self._closed:
927 return
928
929 # Stop consuming messages.
930 if self.is_active:
931 _LOGGER.debug("Stopping consumer.")
932 assert self._consumer is not None
933 self._consumer.stop()
934 self._consumer = None
935
936 # Shutdown all helper threads
937 _LOGGER.debug("Stopping scheduler.")
938 assert self._scheduler is not None
939 dropped_messages = self._scheduler.shutdown(
940 await_msg_callbacks=self._await_callbacks_on_shutdown
941 )
942 self._scheduler = None
943
944 # Leaser and dispatcher reference each other through the shared
945 # StreamingPullManager instance, i.e. "self", thus do not set their
946 # references to None until both have been shut down.
947 #
948 # NOTE: Even if the dispatcher operates on an inactive leaser using
949 # the latter's add() and remove() methods, these have no impact on
950 # the stopped leaser (the leaser is never again re-started). Ditto
951 # for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
952 # because the consumer gets shut down first.
953 _LOGGER.debug("Stopping leaser.")
954 assert self._leaser is not None
955 self._leaser.stop()
956
957 total = len(dropped_messages) + len(
958 self._messages_on_hold._messages_on_hold
959 )
960 _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
961 messages_to_nack = itertools.chain(
962 dropped_messages, self._messages_on_hold._messages_on_hold
963 )
964 for msg in messages_to_nack:
965 msg.nack()
966
967 _LOGGER.debug("Stopping dispatcher.")
968 assert self._dispatcher is not None
969 self._dispatcher.stop()
970 self._dispatcher = None
971 # dispatcher terminated, OK to dispose the leaser reference now
972 self._leaser = None
973
974 _LOGGER.debug("Stopping heartbeater.")
975 assert self._heartbeater is not None
976 self._heartbeater.stop()
977 self._heartbeater = None
978
979 self._rpc = None
980 self._closed = True
981 _LOGGER.debug("Finished stopping manager.")
982
983 for callback in self._close_callbacks:
984 callback(self, reason)
985
986 def _get_initial_request(
987 self, stream_ack_deadline_seconds: int
988 ) -> gapic_types.StreamingPullRequest:
989 """Return the initial request for the RPC.
990
991 This defines the initial request that must always be sent to Pub/Sub
992 immediately upon opening the subscription.
993
994 Args:
995 stream_ack_deadline_seconds:
996 The default message acknowledge deadline for the stream.
997
998 Returns:
999 A request suitable for being the first request on the stream (and not
1000 suitable for any other purpose).
1001 """
1002 # Put the request together.
        # The streaming ACK deadline must be set, but it has little effect here
        # since a receipt modack is sent for each message anyway. Use a generous
        # value in case the modack is sent late.
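        # A value of 0 for max_outstanding_messages / max_outstanding_bytes
        # tells the server not to apply its own flow control, which is what
        # legacy (client-side only) flow control requires.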
1005 request = gapic_types.StreamingPullRequest(
1006 stream_ack_deadline_seconds=stream_ack_deadline_seconds,
1007 modify_deadline_ack_ids=[],
1008 modify_deadline_seconds=[],
1009 subscription=self._subscription,
1010 client_id=self._client_id,
1011 max_outstanding_messages=(
1012 0 if self._use_legacy_flow_control else self._flow_control.max_messages
1013 ),
1014 max_outstanding_bytes=(
1015 0 if self._use_legacy_flow_control else self._flow_control.max_bytes
1016 ),
1017 )
1018
1019 # Return the initial request.
1020 return request
1021
1022 def _send_lease_modacks(
1023 self,
1024 ack_ids: Iterable[str],
1025 ack_deadline: float,
1026 opentelemetry_data: List[SubscribeOpenTelemetry],
1027 warn_on_invalid=True,
1028 receipt_modack: bool = False,
1029 ) -> Set[str]:
1030 exactly_once_enabled = False
1031
1032 modack_span: Optional[trace.Span] = None
1033 if self._client.open_telemetry_enabled:
1034 subscribe_span_links: List[trace.Link] = []
1035 subscribe_spans: List[trace.Span] = []
1036 subscription_split: List[str] = self._subscription.split("/")
1037 assert len(subscription_split) == 4
1038 subscription_id: str = subscription_split[3]
1039 project_id: str = subscription_split[1]
1040 for data in opentelemetry_data:
1041 subscribe_span: Optional[trace.Span] = data.subscribe_span
1042 if (
1043 subscribe_span
1044 and subscribe_span.get_span_context().trace_flags.sampled
1045 ):
1046 subscribe_span_links.append(
1047 trace.Link(subscribe_span.get_span_context())
1048 )
1049 subscribe_spans.append(subscribe_span)
1050 modack_span = start_modack_span(
1051 subscribe_span_links,
1052 subscription_id,
1053 len(opentelemetry_data),
1054 ack_deadline,
1055 project_id,
1056 "_send_lease_modacks",
1057 receipt_modack,
1058 )
1059 if (
1060 modack_span and modack_span.get_span_context().trace_flags.sampled
1061 ): # pragma: NO COVER
1062 modack_span_context: trace.SpanContext = modack_span.get_span_context()
1063 for subscribe_span in subscribe_spans:
1064 subscribe_span.add_link(
1065 context=modack_span_context,
1066 attributes={
1067 "messaging.operation.name": "modack",
1068 },
1069 )
1070
1071 with self._exactly_once_enabled_lock:
1072 exactly_once_enabled = self._exactly_once_enabled
1073 if exactly_once_enabled:
1074 eod_items: List[requests.ModAckRequest] = []
1075 if self._client.open_telemetry_enabled:
1076 for ack_id, data in zip(
1077 ack_ids, opentelemetry_data
1078 ): # pragma: NO COVER # Identical code covered in the same function below
1079 assert data is not None
1080 eod_items.append(
1081 requests.ModAckRequest(
1082 ack_id,
1083 ack_deadline,
1084 futures.Future(),
1085 data,
1086 )
1087 )
1088 else:
1089 eod_items = [
1090 requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
1091 for ack_id in ack_ids
1092 ]
1093
1094 assert self._dispatcher is not None
1095 self._dispatcher.modify_ack_deadline(eod_items, ack_deadline)
1096 if (
1097 modack_span
1098 ): # pragma: NO COVER # Identical code covered in the same function below
1099 modack_span.end()
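            # With exactly-once delivery, wait for each modack future so that
            # ack IDs the server reports as invalid (typically already expired)
            # can be excluded from delivery to the user callback.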
1100 expired_ack_ids = set()
1101 for req in eod_items:
1102 try:
1103 assert req.future is not None
1104 req.future.result()
1105 except AcknowledgeError as ack_error:
1106 if (
1107 ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID
1108 or warn_on_invalid
1109 ):
1110 _LOGGER.warning(
1111 "AcknowledgeError when lease-modacking a message.",
1112 exc_info=True,
1113 )
1114 if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
1115 expired_ack_ids.add(req.ack_id)
1116 return expired_ack_ids
1117 else:
1118 items: List[requests.ModAckRequest] = []
1119 if self._client.open_telemetry_enabled:
1120 for ack_id, data in zip(ack_ids, opentelemetry_data):
1121 assert data is not None
1122 items.append(
1123 requests.ModAckRequest(
1124 ack_id,
1125 self.ack_deadline,
1126 None,
1127 data,
1128 )
1129 )
1130 else:
1131 items = [
1132 requests.ModAckRequest(ack_id, self.ack_deadline, None)
1133 for ack_id in ack_ids
1134 ]
1135 assert self._dispatcher is not None
1136 self._dispatcher.modify_ack_deadline(items, ack_deadline)
1137 if modack_span:
1138 modack_span.end()
1139 return set()
1140
1141 def _exactly_once_delivery_enabled(self) -> bool:
1142 """Whether exactly-once delivery is enabled for the subscription."""
1143 with self._exactly_once_enabled_lock:
1144 return self._exactly_once_enabled
1145
1146 def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
1147 """Process all received Pub/Sub messages.
1148
        For each message, send a modify-ack-deadline (modack) request to the
        server. This prevents expiration of the message while it is buffered
        by gRPC or a proxy/firewall, keeping the server and client expiration
        timers close to each other and thus reducing the chance of the message
        being redelivered multiple times.
1154
1155 After the messages have all had their ack deadline updated, execute
1156 the callback for each message using the executor.
1157 """
1158 if response is None:
1159 _LOGGER.debug(
1160 "Response callback invoked with None, likely due to a "
1161 "transport shutdown."
1162 )
1163 return
1164
1165 # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
1166 # protobuf message to significantly gain on attribute access performance.
1167 received_messages = response._pb.received_messages
1168
1169 subscribe_opentelemetry: List[SubscribeOpenTelemetry] = []
1170 if self._client.open_telemetry_enabled:
1171 for received_message in received_messages:
1172 opentelemetry_data = SubscribeOpenTelemetry(received_message.message)
1173 opentelemetry_data.start_subscribe_span(
1174 self._subscription,
1175 response.subscription_properties.exactly_once_delivery_enabled,
1176 received_message.ack_id,
1177 received_message.delivery_attempt,
1178 )
1179 subscribe_opentelemetry.append(opentelemetry_data)
1180
1181 _LOGGER.debug(
1182 "Processing %s received message(s), currently on hold %s (bytes %s).",
1183 len(received_messages),
1184 self._messages_on_hold.size,
1185 self._on_hold_bytes,
1186 )
1187
1188 with self._exactly_once_enabled_lock:
1189 if (
1190 response.subscription_properties.exactly_once_delivery_enabled
1191 != self._exactly_once_enabled
1192 ):
1193 self._exactly_once_enabled = (
1194 response.subscription_properties.exactly_once_delivery_enabled
1195 )
1196 # Update ack_deadline, whose minimum depends on self._exactly_once_enabled
1197 # This method acquires the self._ack_deadline_lock lock.
1198 self._obtain_ack_deadline(maybe_update=True)
1199 self._send_new_ack_deadline = True
1200
1201 # Immediately (i.e. without waiting for the auto lease management)
1202 # modack the messages we received, as this tells the server that we've
1203 # received them.
1204 ack_id_gen = (message.ack_id for message in received_messages)
1205 expired_ack_ids = self._send_lease_modacks(
1206 ack_id_gen,
1207 self.ack_deadline,
1208 subscribe_opentelemetry,
1209 warn_on_invalid=False,
1210 receipt_modack=True,
1211 )
1212
1213 with self._pause_resume_lock:
1214 if self._scheduler is None or self._leaser is None:
1215 _LOGGER.debug(
1216 f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing."
1217 )
1218 return
1219
1220 i: int = 0
1221 for received_message in received_messages:
1222 if (
1223 not self._exactly_once_delivery_enabled()
1224 or received_message.ack_id not in expired_ack_ids
1225 ):
1226 message = google.cloud.pubsub_v1.subscriber.message.Message(
1227 received_message.message,
1228 received_message.ack_id,
1229 received_message.delivery_attempt,
1230 self._scheduler.queue,
1231 self._exactly_once_delivery_enabled,
1232 )
1233 if self._client.open_telemetry_enabled:
1234 message.opentelemetry_data = subscribe_opentelemetry[i]
1235 i = i + 1
1236 self._messages_on_hold.put(message)
1237 self._on_hold_bytes += message.size
1238 req = requests.LeaseRequest(
1239 ack_id=message.ack_id,
1240 byte_size=message.size,
1241 ordering_key=message.ordering_key,
1242 opentelemetry_data=message.opentelemetry_data,
1243 )
1244 self._leaser.add([req])
1245
1246 self._maybe_release_messages()
1247
1248 self.maybe_pause_consumer()
1249
1250 def _should_recover(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should be recovered from.
1252
1253 If the exception is one of the retryable exceptions, this will signal
1254 to the consumer thread that it should "recover" from the failure.
1255
1256 This will cause the stream to exit when it returns :data:`False`.
1257
1258 Returns:
1259 Indicates if the caller should recover or shut down.
1260 Will be :data:`True` if the ``exception`` is "acceptable", i.e.
1261 in a list of retryable / idempotent exceptions.
1262 """
1263 exception = _wrap_as_exception(exception)
1264 # If this is in the list of idempotent exceptions, then we want to
1265 # recover.
1266 if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
1267 _LOGGER.debug("Observed recoverable stream error %s", exception)
1268 return True
1269 _LOGGER.debug("Observed non-recoverable stream error %s", exception)
1270 return False
1271
1272 def _should_terminate(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should terminate the stream.
1274
1275 If the exception is one of the terminating exceptions, this will signal
1276 to the consumer thread that it should terminate.
1277
1278 This will cause the stream to exit when it returns :data:`True`.
1279
1280 Returns:
1281 Indicates if the caller should terminate or attempt recovery.
1282 Will be :data:`True` if the ``exception`` is "acceptable", i.e.
1283 in a list of terminating exceptions.
1284 """
1285 exception = _wrap_as_exception(exception)
1286 if isinstance(exception, _TERMINATING_STREAM_ERRORS):
1287 _LOGGER.debug("Observed terminating stream error %s", exception)
1288 return True
1289 _LOGGER.debug("Observed non-terminating stream error %s", exception)
1290 return False
1291
1292 def _on_rpc_done(self, future: Any) -> None:
1293 """Triggered whenever the underlying RPC terminates without recovery.
1294
1295 This is typically triggered from one of two threads: the background
1296 consumer thread (when calling ``recv()`` produces a non-recoverable
1297 error) or the grpc management thread (when cancelling the RPC).
1298
        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This avoids blocking in the background
        consumer thread, which would otherwise prevent it from being ``joined()``.
1302 """
1303 _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.")
1304 error = _wrap_as_exception(future)
1305 thread = threading.Thread(
1306 name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
1307 )
1308 thread.daemon = True
1309 thread.start()