# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division

import collections
import functools
import inspect
import itertools
import logging
import threading
import typing
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
)
import uuid

from opentelemetry import trace
import grpc  # type: ignore

from google.api_core import bidi
from google.api_core import exceptions
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import histogram
from google.cloud.pubsub_v1.subscriber._protocol import leaser
from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
    AcknowledgeError,
    AcknowledgeStatus,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    SubscribeOpenTelemetry,
    start_modack_span,
)
import google.cloud.pubsub_v1.subscriber.message
from google.cloud.pubsub_v1.subscriber import futures
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
from google.pubsub_v1 import types as gapic_types
from grpc_status import rpc_status  # type: ignore
from google.rpc.error_details_pb2 import ErrorInfo  # type: ignore
from google.rpc import code_pb2  # type: ignore
from google.rpc import status_pb2


if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.pubsub_v1 import subscriber


_LOGGER = logging.getLogger(__name__)
_SLOW_ACK_LOGGER = logging.getLogger("slow-ack")
_STREAMS_LOGGER = logging.getLogger("subscriber-streams")
_FLOW_CONTROL_LOGGER = logging.getLogger("subscriber-flow-control")
_CALLBACK_DELIVERY_LOGGER = logging.getLogger("callback-delivery")
_CALLBACK_EXCEPTION_LOGGER = logging.getLogger("callback-exceptions")
_EXPIRY_LOGGER = logging.getLogger("expiry")
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
_RETRYABLE_STREAM_ERRORS = (
    exceptions.Aborted,
    exceptions.DeadlineExceeded,
    exceptions.GatewayTimeout,
    exceptions.InternalServerError,
    exceptions.ResourceExhausted,
    exceptions.ServiceUnavailable,
    exceptions.Unknown,
)
_TERMINATING_STREAM_ERRORS = (
    exceptions.Cancelled,
    exceptions.InvalidArgument,
    exceptions.NotFound,
    exceptions.PermissionDenied,
    exceptions.Unauthenticated,
    exceptions.Unauthorized,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60
"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for
a subscription. We do this to reduce premature ack expiration.
"""

_DEFAULT_STREAM_ACK_DEADLINE: float = 60
"""The default stream ack deadline in seconds."""

_MAX_STREAM_ACK_DEADLINE: float = 600
"""The maximum stream ack deadline in seconds."""

_MIN_STREAM_ACK_DEADLINE: float = 10
"""The minimum stream ack deadline in seconds."""

_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
    code_pb2.DEADLINE_EXCEEDED,
    code_pb2.RESOURCE_EXHAUSTED,
    code_pb2.ABORTED,
    code_pb2.INTERNAL,
    code_pb2.UNAVAILABLE,
}

# `on_fatal_exception` was added in `google-api-core v2.25.1`, which allows us to
# inform callers of unrecoverable errors. We can only pass this arg if it's
# available in the `BackgroundConsumer` spec.
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = "on_fatal_exception" in inspect.getfullargspec(
    bidi.BackgroundConsumer
).args
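
# A minimal sketch of the same feature-detection idiom (hypothetical helper,
# illustration only, not part of this module's API): forward an optional
# keyword argument only when the installed dependency's constructor accepts it.
#
#     def _make_consumer(rpc, on_response, on_fatal=None):
#         if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
#             return bidi.BackgroundConsumer(
#                 rpc, on_response, on_fatal_exception=on_fatal
#             )
#         return bidi.BackgroundConsumer(rpc, on_response)
#
# ``open()`` below applies exactly this guard when creating the consumer.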


def _wrap_as_exception(maybe_exception: Any) -> BaseException:
    """Wrap an object as a Python exception, if needed.

    Args:
        maybe_exception: The object to wrap, usually a gRPC exception class.

    Returns:
        The argument itself if it is an instance of ``BaseException``,
        otherwise the argument represented as an instance of an ``Exception``
        subclass.
    """
    if isinstance(maybe_exception, grpc.RpcError):
        return exceptions.from_grpc_error(maybe_exception)
    elif isinstance(maybe_exception, BaseException):
        return maybe_exception

    return Exception(maybe_exception)
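
# Illustrative behavior of ``_wrap_as_exception`` (a sketch, not exhaustive):
#
#     _wrap_as_exception(ValueError("boom"))  # -> the ValueError itself
#     _wrap_as_exception("stream closed")     # -> Exception("stream closed")
#     _wrap_as_exception(<grpc.RpcError, code UNAVAILABLE>)
#                                             # -> exceptions.ServiceUnavailable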


def _wrap_callback_errors(
    callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
    on_callback_error: Callable[[BaseException], Any],
    message: "google.cloud.pubsub_v1.subscriber.message.Message",
):
    """Wrap a user callback so that the message is nacked if an exception occurs.

    Args:
        callback: The user callback.
        on_callback_error: A callable invoked with the exception if the user
            callback raises one.
        message: The Pub/Sub message.
    """
    _CALLBACK_DELIVERY_LOGGER.debug(
        "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s) received by subscriber callback",
        message.message_id,
        message.ack_id,
        message.ordering_key,
        message.exactly_once_enabled,
    )

    try:
        if message.opentelemetry_data:
            message.opentelemetry_data.end_subscribe_concurrency_control_span()
            with message.opentelemetry_data:
                callback(message)
        else:
            callback(message)
    except BaseException as exc:
        # Note: the likelihood of this failing is extremely low. This just adds
        # a message to a queue, so if this doesn't work the world is in an
        # unrecoverable state and this thread should just bail.

        _CALLBACK_EXCEPTION_LOGGER.exception(
            "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)'s callback threw an exception; nacking message.",
            message.message_id,
            message.ack_id,
            message.ordering_key,
            message.exactly_once_enabled,
        )

        message.nack()
        on_callback_error(exc)


def _get_status(
    exc: exceptions.GoogleAPICallError,
) -> Optional["status_pb2.Status"]:
    if not exc.response:
        _LOGGER.debug("No response object in errored RPC call.")
        return None
    try:
        return rpc_status.from_call(exc.response)
    # `from_call` raises ValueError if the gRPC call's code or details are
    # inconsistent with the status code and message inside the
    # google.rpc.status.Status payload.
    except ValueError:
        _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True)
        return None


def _get_ack_errors(
    exc: exceptions.GoogleAPICallError,
) -> Optional[Dict[str, str]]:
    status = _get_status(exc)
    if not status:
        _LOGGER.debug("Unable to get status of errored RPC.")
        return None
    for detail in status.details:
        info = ErrorInfo()
        if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)):
            _LOGGER.debug("Unable to unpack ErrorInfo.")
            return None
        return info.metadata
    return None


def _process_requests(
    error_status: Optional["status_pb2.Status"],
    ack_reqs_dict: Dict[str, requests.AckRequest],
    errors_dict: Optional[Dict[str, str]],
    ack_histogram: Optional[histogram.Histogram] = None,
    # TODO: Change this param to a Union of Literals when we drop Python 3.7 support.
    req_type: str = "ack",
):
    """Process requests when exactly-once delivery is enabled by referring to
    error_status and errors_dict.

    The errors returned by the server, either as `error_status` or in
    `errors_dict`, are used to complete the request futures in `ack_reqs_dict`
    (with a success or exception) or to return requests for further retries.
    """
    requests_completed = []
    requests_to_retry = []
    for ack_id, ack_request in ack_reqs_dict.items():
        # Debug logging: slow acks
        if (
            req_type == "ack"
            and ack_histogram
            and ack_request.time_to_ack > ack_histogram.percentile(percent=99)
        ):
            _SLOW_ACK_LOGGER.debug(
                "Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration",
                ack_request.message_id,
                ack_request.ack_id,
                ack_request.time_to_ack,
            )

        # Handle special errors returned for ack/modack RPCs via the ErrorInfo
        # sidecar metadata when exactly-once delivery is enabled.
        if errors_dict and ack_id in errors_dict:
            exactly_once_error = errors_dict[ack_id]
            if exactly_once_error.startswith("TRANSIENT_"):
                requests_to_retry.append(ack_request)
            else:
                if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
                    exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
                else:
                    exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
                future = ack_request.future
                if future is not None:
                    future.set_exception(exc)
                requests_completed.append(ack_request)
        # Temporary gRPC errors are retried.
        elif (
            error_status
            and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
        ):
            requests_to_retry.append(ack_request)
        # Other gRPC errors are NOT retried.
        elif error_status:
            if error_status.code == code_pb2.PERMISSION_DENIED:
                exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None)
            elif error_status.code == code_pb2.FAILED_PRECONDITION:
                exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
            else:
                exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
            future = ack_request.future
            if future is not None:
                future.set_exception(exc)
            requests_completed.append(ack_request)
        # Since no error occurred, requests with futures are completed successfully.
        elif ack_request.future:
            future = ack_request.future
            # success
            assert future is not None
            future.set_result(AcknowledgeStatus.SUCCESS)
            requests_completed.append(ack_request)
        # All other requests are considered completed.
        else:
            requests_completed.append(ack_request)

    return requests_completed, requests_to_retry
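
# Illustrative outcome of ``_process_requests`` (hypothetical ack IDs): given
#
#     errors_dict = {
#         "ack-1": "TRANSIENT_FAILURE_UNORDERED_ACK_ID",  # any "TRANSIENT_"-prefixed value
#         "ack-2": "PERMANENT_FAILURE_INVALID_ACK_ID",
#     }
#
# and a third request "ack-3" with no entry and no ``error_status``:
#
#     * "ack-1" is returned in ``requests_to_retry``,
#     * "ack-2" is completed with ``AcknowledgeError(INVALID_ACK_ID)`` set on
#       its future,
#     * "ack-3" is completed successfully; its future (if any) resolves to
#       ``AcknowledgeStatus.SUCCESS``.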


class StreamingPullManager(object):
    """The streaming pull manager coordinates pulling messages from Pub/Sub,
    leasing them, and scheduling them to be processed.

    Args:
        client:
            The subscriber client used to create this instance.
        subscription:
            The name of the subscription. The canonical format for this is
            ``projects/{project}/subscriptions/{subscription}``.
        flow_control:
            The flow control settings.
        scheduler:
            The scheduler to use to process messages. If not provided, a thread
            pool-based scheduler will be used.
        use_legacy_flow_control:
            If set to ``True``, flow control at the Cloud Pub/Sub server is disabled,
            though client-side flow control is still enabled. If set to ``False``
            (default), both server-side and client-side flow control are enabled.
        await_callbacks_on_shutdown:
            If ``True``, the shutdown thread will wait until all scheduler threads
            terminate and only then proceed with shutting down the remaining running
            helper threads.

            If ``False`` (default), the shutdown thread will shut the scheduler down,
            but it will not wait for the currently executing scheduler threads to
            terminate.

            This setting affects when the on close callbacks get invoked, and
            consequently, when the StreamingPullFuture associated with the stream gets
            resolved.
    """

    def __init__(
        self,
        client: "subscriber.Client",
        subscription: str,
        flow_control: types.FlowControl = types.FlowControl(),
        scheduler: Optional[ThreadScheduler] = None,
        use_legacy_flow_control: bool = False,
        await_callbacks_on_shutdown: bool = False,
    ):
        self._client = client
        self._subscription = subscription
        self._exactly_once_enabled = False
        self._flow_control = flow_control
        self._use_legacy_flow_control = use_legacy_flow_control
        self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
        self._ack_histogram = histogram.Histogram()
        self._last_histogram_size = 0
        self._stream_metadata = [
            ["x-goog-request-params", "subscription=" + subscription]
        ]

        # If max_duration_per_lease_extension is the default,
        # set the stream_ack_deadline to the default of 60 seconds.
        if self._flow_control.max_duration_per_lease_extension == 0:
            self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
        # We will not be able to extend by less than the minimum.
        elif (
            self._flow_control.max_duration_per_lease_extension
            < _MIN_STREAM_ACK_DEADLINE
        ):
            self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
        # We will not be able to extend past the maximum.
        elif (
            self._flow_control.max_duration_per_lease_extension
            > _MAX_STREAM_ACK_DEADLINE
        ):
            self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
        else:
            self._stream_ack_deadline = (
                self._flow_control.max_duration_per_lease_extension
            )

        self._ack_deadline = max(
            min(
                self._flow_control.min_duration_per_lease_extension,
                histogram.MAX_ACK_DEADLINE,
            ),
            histogram.MIN_ACK_DEADLINE,
        )

        self._rpc: Optional[bidi.ResumableBidiRpc] = None
        self._callback: Optional[functools.partial] = None
        self._closing = threading.Lock()
        self._closed = False
        self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = []
        # Guarded by self._exactly_once_enabled_lock
        self._send_new_ack_deadline = False

        # A shutdown thread is created on intentional shutdown.
        self._regular_shutdown_thread: Optional[threading.Thread] = None

        # Generate a random client id tied to this object. All streaming pull
        # connections (initial and re-connects) will then use the same client
        # id. Doing so lets the server establish affinity even across stream
        # disconnections.
        self._client_id = str(uuid.uuid4())

        if scheduler is None:
            self._scheduler: Optional[ThreadScheduler] = ThreadScheduler()
        else:
            self._scheduler = scheduler

        # A collection for the messages that have been received from the server,
        # but not yet sent to the user callback.
        self._messages_on_hold = messages_on_hold.MessagesOnHold()

        # The total number of bytes consumed by the messages currently on hold.
        self._on_hold_bytes = 0

        # A lock ensuring that pausing / resuming the consumer are both atomic
        # operations that cannot be executed concurrently. Needed for properly
        # syncing these operations with the current leaser load. Additionally,
        # the lock is used to protect modifications of internal data that
        # affects the load computation, i.e. the count and size of the messages
        # currently on hold.
        self._pause_resume_lock = threading.Lock()

        # A lock guarding the self._exactly_once_enabled variable. We may also
        # acquire the self._ack_deadline_lock while this lock is held, but not
        # the reverse. So, we maintain a simple ordering of these two locks to
        # prevent deadlocks.
        self._exactly_once_enabled_lock = threading.Lock()

        # A lock protecting the current ACK deadline used in the lease management. This
        # value can be potentially updated both by the leaser thread and by the message
        # consumer thread when invoking the internal _on_response() callback.
        self._ack_deadline_lock = threading.Lock()

        # The threads created in ``.open()``.
        self._dispatcher: Optional[dispatcher.Dispatcher] = None
        self._leaser: Optional[leaser.Leaser] = None
        self._consumer: Optional[bidi.BackgroundConsumer] = None
        self._heartbeater: Optional[heartbeater.Heartbeater] = None

    @property
    def is_active(self) -> bool:
        """``True`` if this manager is actively streaming.

        Note that ``False`` does not indicate that this manager is completely
        shut down, just that it has stopped getting new messages.
        """
        return self._consumer is not None and self._consumer.is_active

    @property
    def flow_control(self) -> types.FlowControl:
        """The active flow control settings."""
        return self._flow_control

    @property
    def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
        """The dispatcher helper."""
        return self._dispatcher

    @property
    def leaser(self) -> Optional[leaser.Leaser]:
        """The leaser helper."""
        return self._leaser

    @property
    def ack_histogram(self) -> histogram.Histogram:
        """The histogram tracking time-to-acknowledge."""
        return self._ack_histogram

    @property
    def ack_deadline(self) -> float:
        """Return the current ACK deadline based on historical data without updating it.

        Returns:
            The ack deadline.
        """
        return self._obtain_ack_deadline(maybe_update=False)

    def _obtain_ack_deadline(self, maybe_update: bool) -> float:
        """The actual `ack_deadline` implementation.

        This method is "sticky". It will only perform the computations to check on the
        right ACK deadline if explicitly requested AND if the histogram with past
        time-to-ack data has gained a significant amount of new information.

        Args:
            maybe_update:
                If ``True``, also update the current ACK deadline before returning it if
                enough new ACK data has been gathered.

        Returns:
            The current ACK deadline in seconds to use.
        """
        with self._ack_deadline_lock:
            if not maybe_update:
                return self._ack_deadline

            target_size = min(
                self._last_histogram_size * 2, self._last_histogram_size + 100
            )
            hist_size = len(self.ack_histogram)

            if hist_size > target_size:
                self._last_histogram_size = hist_size
                self._ack_deadline = self.ack_histogram.percentile(percent=99)

            if self.flow_control.max_duration_per_lease_extension > 0:
                # The setting in flow control could be too low, adjust if needed.
                flow_control_setting = max(
                    self.flow_control.max_duration_per_lease_extension,
                    histogram.MIN_ACK_DEADLINE,
                )
                self._ack_deadline = min(self._ack_deadline, flow_control_setting)

            # If the user explicitly sets a min ack_deadline, respect it.
            if self.flow_control.min_duration_per_lease_extension > 0:
                # The setting in flow control could be too high, adjust if needed.
                flow_control_setting = min(
                    self.flow_control.min_duration_per_lease_extension,
                    histogram.MAX_ACK_DEADLINE,
                )
                self._ack_deadline = max(self._ack_deadline, flow_control_setting)
            elif self._exactly_once_enabled:
                # Higher minimum ack_deadline for subscriptions with
                # exactly-once delivery enabled.
                self._ack_deadline = max(
                    self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED
                )
            # If we have updated the ack_deadline and it is longer than the stream_ack_deadline,
            # set the stream_ack_deadline to the new ack_deadline.
            if self._ack_deadline > self._stream_ack_deadline:
                self._stream_ack_deadline = self._ack_deadline
            return self._ack_deadline
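
    # Worked example of the clamping above (illustrative numbers): with a p99
    # time-to-ack of 120s, max_duration_per_lease_extension=60,
    # min_duration_per_lease_extension=0, and exactly-once delivery enabled:
    #
    #     ack_deadline = 120                    # p99 from the histogram
    #     ack_deadline = min(120, max(60, 10))  # user's max, floored at
    #                                           # MIN_ACK_DEADLINE -> 60
    #     ack_deadline = max(60, 60)            # exactly-once minimum -> 60
    #
    # Since 60 does not exceed the default stream ack deadline (60 seconds),
    # the stream ack deadline is left unchanged.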

    @property
    def load(self) -> float:
        """Return the current load.

        The load is represented as a float, where 1.0 represents having
        hit one of the flow control limits, and values between 0.0 and 1.0
        represent how close we are to them. (0.5 means we have exactly half
        of what the flow control setting allows, for example.)

        There are (currently) two flow control settings; this property
        computes how close the manager is to each of them, and returns
        whichever value is higher. (It does not matter that we have lots of
        running room on setting A if setting B is over.)

        Returns:
            The load value.
        """
        if self._leaser is None:
            return 0.0

        # Messages that are temporarily put on hold are not being delivered to
        # user's callbacks, thus they should not contribute to the flow control
        # load calculation.
        # However, since these messages must still be lease-managed to avoid
        # unnecessary ACK deadline expirations, their count and total size must
        # be subtracted from the leaser's values.
        return max(
            [
                (self._leaser.message_count - self._messages_on_hold.size)
                / self._flow_control.max_messages,
                (self._leaser.bytes - self._on_hold_bytes)
                / self._flow_control.max_bytes,
            ]
        )
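
    # Worked example (illustrative numbers, assuming the default flow control
    # of max_messages=1000 and max_bytes=100 MiB): if the leaser tracks 800
    # messages totalling 40 MiB, of which 300 messages (10 MiB) are on hold:
    #
    #     load = max((800 - 300) / 1000, (40 MiB - 10 MiB) / 100 MiB)
    #          = max(0.5, 0.3) = 0.5
    #
    # i.e. the message-count limit, not the byte limit, is the binding one.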

    def add_close_callback(
        self, callback: Callable[["StreamingPullManager", Any], Any]
    ) -> None:
        """Schedule a callable to be invoked when the manager closes.

        Args:
            callback: The method to call.
        """
        self._close_callbacks.append(callback)

    def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None:
        """Send the next message in the queue for each of the passed-in
        ordering keys, if they exist. Clean up state for keys that no longer
        have any queued messages.

        Since the load went down by one message, it's probably safe to send the
        user another message for the same key. Since the released message may be
        bigger than the previous one, this may increase the load above the maximum.
        This decision is by design because it simplifies MessagesOnHold.

        Args:
            ordering_keys:
                A sequence of ordering keys to activate. May be empty.
        """
        with self._pause_resume_lock:
            if self._scheduler is None:
                return  # We are shutting down, don't try to dispatch any more messages.

            self._messages_on_hold.activate_ordering_keys(
                ordering_keys, self._schedule_message_on_hold
            )

    def maybe_pause_consumer(self) -> None:
        """Check the current load and pause the consumer if needed."""
        with self._pause_resume_lock:
            if self.load >= _MAX_LOAD:
                if self._consumer is not None and not self._consumer.is_paused:
                    _FLOW_CONTROL_LOGGER.debug(
                        "Message backlog over load at %.2f (threshold %.2f), initiating client-side flow control",
                        self.load,
                        _MAX_LOAD,
                    )
                    self._consumer.pause()

    def maybe_resume_consumer(self) -> None:
        """Check the load and held messages and resume the consumer if needed.

        If there are messages held internally, release those messages before
        resuming the consumer. That will avoid leaser overload.
        """
        with self._pause_resume_lock:
            # If we have been paused by flow control, check and see if we are
            # back within our limits.
            #
            # In order to not thrash too much, require us to have passed below
            # the resume threshold (80% by default) of each flow control setting
            # before restarting.
            if self._consumer is None or not self._consumer.is_paused:
                return

            _LOGGER.debug("Current load: %.2f", self.load)

            # Before maybe resuming the background consumer, release any messages
            # currently on hold, if the current load allows for it.
            self._maybe_release_messages()

            if self.load < _RESUME_THRESHOLD:
                _FLOW_CONTROL_LOGGER.debug(
                    "Current load is %.2f (threshold %.2f), lifting client-side flow control.",
                    self.load,
                    _RESUME_THRESHOLD,
                )
                self._consumer.resume()
            else:
                _FLOW_CONTROL_LOGGER.debug(
                    "Current load is %.2f (threshold %.2f), retaining client-side flow control.",
                    self.load,
                    _RESUME_THRESHOLD,
                )

    def _maybe_release_messages(self) -> None:
        """Release (some of) the held messages if the current load allows for it.

        The method tries to release as many messages as the current leaser load
        would allow. Each released message is added to the lease management,
        and the user callback is scheduled for it.

        If there are currently no messages on hold, or if the leaser is
        already overloaded, this method is effectively a no-op.

        The method assumes the caller has acquired the ``_pause_resume_lock``.
        """
        released_ack_ids = []
        while self.load < _MAX_LOAD:
            msg = self._messages_on_hold.get()
            if not msg:
                break
            if msg.opentelemetry_data:
                msg.opentelemetry_data.end_subscribe_scheduler_span()
            self._schedule_message_on_hold(msg)
            released_ack_ids.append(msg.ack_id)

        assert self._leaser is not None
        self._leaser.start_lease_expiry_timer(released_ack_ids)

    def _schedule_message_on_hold(
        self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"
    ):
        """Schedule a message on hold to be sent to the user and change on-hold-bytes.

        The method assumes the caller has acquired the ``_pause_resume_lock``.

        Args:
            msg: The message to schedule to be sent to the user.
        """
        assert msg, "Message must not be None."

        # The count of on-hold bytes goes down, increasing the load.
        self._on_hold_bytes -= msg.size

        if self._on_hold_bytes < 0:
            _LOGGER.warning(
                "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
            )
            self._on_hold_bytes = 0

        _LOGGER.debug(
            "Released held message, scheduling callback for it, "
            "still on hold %s (bytes %s).",
            self._messages_on_hold.size,
            self._on_hold_bytes,
        )
        assert self._scheduler is not None
        assert self._callback is not None
        if msg.opentelemetry_data:
            msg.opentelemetry_data.start_subscribe_concurrency_control_span()
        self._scheduler.schedule(self._callback, msg)

    def send_unary_ack(
        self, ack_ids, ack_reqs_dict
    ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]:
        """Send a request using a separate unary request instead of over the stream.

        If a RetryError occurs, the manager shutdown is triggered, and the
        error is re-raised.
        """
        assert ack_ids
        assert len(ack_ids) == len(ack_reqs_dict)

        error_status = None
        ack_errors_dict = None
        try:
            self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids)
        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "Exception while sending unary RPC. This is typically "
                "non-fatal as stream requests are best-effort.",
                exc_info=True,
            )
            error_status = _get_status(exc)
            ack_errors_dict = _get_ack_errors(exc)
        except exceptions.RetryError as exc:
            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
            # Makes sure to complete futures so they don't block forever.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    if exactly_once_delivery_enabled:
                        e = AcknowledgeError(
                            AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
                        )
                        req.future.set_exception(e)
                    else:
                        req.future.set_result(AcknowledgeStatus.SUCCESS)

            _LOGGER.debug(
                "RetryError while sending ack RPC. Waiting on a transient "
                "error resolution for too long, will now trigger shutdown.",
                exc_info=False,
            )
            # The underlying channel has been suffering from a retryable error
            # for too long, time to give up and shut the streaming pull down.
            self._on_rpc_done(exc)
            raise

        if self._exactly_once_delivery_enabled():
            requests_completed, requests_to_retry = _process_requests(
                error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram, "ack"
            )
        else:
            requests_completed = []
            requests_to_retry = []
            # When exactly-once delivery is NOT enabled, acks/modacks are considered
            # best-effort. So, they always succeed even if the RPC fails.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    req.future.set_result(AcknowledgeStatus.SUCCESS)
                requests_completed.append(req)

        return requests_completed, requests_to_retry

    def send_unary_modack(
        self,
        modify_deadline_ack_ids,
        modify_deadline_seconds,
        ack_reqs_dict,
        default_deadline=None,
    ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
        """Send a request using a separate unary request instead of over the stream.

        If a RetryError occurs, the manager shutdown is triggered, and the
        error is re-raised.
        """
        assert modify_deadline_ack_ids
        # Either we have a generator or a single deadline.
        assert modify_deadline_seconds is None or default_deadline is None

        error_status = None
        modack_errors_dict = None
        try:
            if default_deadline is None:
                # Send ack_ids with the same deadline seconds together.
                deadline_to_ack_ids = collections.defaultdict(list)

                for n, ack_id in enumerate(modify_deadline_ack_ids):
                    deadline = modify_deadline_seconds[n]
                    deadline_to_ack_ids[deadline].append(ack_id)

                for deadline, ack_ids in deadline_to_ack_ids.items():
                    self._client.modify_ack_deadline(
                        subscription=self._subscription,
                        ack_ids=ack_ids,
                        ack_deadline_seconds=deadline,
                    )
            else:
                # We can send all requests with the default deadline.
                self._client.modify_ack_deadline(
                    subscription=self._subscription,
                    ack_ids=modify_deadline_ack_ids,
                    ack_deadline_seconds=default_deadline,
                )
        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "Exception while sending unary RPC. This is typically "
                "non-fatal as stream requests are best-effort.",
                exc_info=True,
            )
            error_status = _get_status(exc)
            modack_errors_dict = _get_ack_errors(exc)
        except exceptions.RetryError as exc:
            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
            # Makes sure to complete futures so they don't block forever.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    if exactly_once_delivery_enabled:
                        e = AcknowledgeError(
                            AcknowledgeStatus.OTHER,
                            "RetryError while sending modack RPC.",
                        )
                        req.future.set_exception(e)
                    else:
                        req.future.set_result(AcknowledgeStatus.SUCCESS)

            _LOGGER.debug(
                "RetryError while sending modack RPC. Waiting on a transient "
                "error resolution for too long, will now trigger shutdown.",
                exc_info=False,
            )
            # The underlying channel has been suffering from a retryable error
            # for too long, time to give up and shut the streaming pull down.
            self._on_rpc_done(exc)
            raise

        if self._exactly_once_delivery_enabled():
            requests_completed, requests_to_retry = _process_requests(
                error_status,
                ack_reqs_dict,
                modack_errors_dict,
                self.ack_histogram,
                "modack",
            )
        else:
            requests_completed = []
            requests_to_retry = []
            # When exactly-once delivery is NOT enabled, acks/modacks are considered
            # best-effort. So, they always succeed even if the RPC fails.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    req.future.set_result(AcknowledgeStatus.SUCCESS)
                requests_completed.append(req)

        return requests_completed, requests_to_retry

    def heartbeat(self) -> bool:
        """Send a heartbeat request over the streaming pull RPC.

        The request is empty by default, but may contain the current ack_deadline
        if the self._exactly_once_enabled flag has changed.

        Returns:
            Whether a heartbeat request was actually sent.
        """
        if self._rpc is not None and self._rpc.is_active:
            send_new_ack_deadline = False
            with self._exactly_once_enabled_lock:
                send_new_ack_deadline = self._send_new_ack_deadline
                self._send_new_ack_deadline = False

            if send_new_ack_deadline:
                request = gapic_types.StreamingPullRequest(
                    stream_ack_deadline_seconds=self._stream_ack_deadline
                )
                _LOGGER.debug(
                    "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
                )
            else:
                request = gapic_types.StreamingPullRequest()

            self._rpc.send(request)
            return True

        return False

    def open(
        self,
        callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
        on_callback_error: Callable[[Exception], Any],
    ) -> None:
        """Begin consuming messages.

        Args:
            callback:
                A callback that will be called for each message received on the
                stream.
            on_callback_error:
                A callable that will be called if an exception is raised in
                the provided `callback`.
        """
        if self.is_active:
            raise ValueError("This manager is already open.")

        if self._closed:
            raise ValueError("This manager has been closed and can not be re-used.")

        self._callback = functools.partial(
            _wrap_callback_errors, callback, on_callback_error
        )

        # Create the RPC
        stream_ack_deadline_seconds = self._stream_ack_deadline

        get_initial_request = functools.partial(
            self._get_initial_request, stream_ack_deadline_seconds
        )
        self._rpc = bidi.ResumableBidiRpc(
            start_rpc=self._client.streaming_pull,
            initial_request=get_initial_request,
            should_recover=self._should_recover,
            should_terminate=self._should_terminate,
            metadata=self._stream_metadata,
            throttle_reopen=True,
        )
        self._rpc.add_done_callback(self._on_rpc_done)

        _LOGGER.debug(
            "Creating a stream, default ACK deadline set to {} seconds.".format(
                self._stream_ack_deadline
            )
        )

        # Create references to threads
        assert self._scheduler is not None
        scheduler_queue = self._scheduler.queue
        self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)

        # `on_fatal_exception` is only available in more recent library versions.
        # For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
        if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
            self._consumer = bidi.BackgroundConsumer(
                self._rpc,
                self._on_response,
                on_fatal_exception=self._on_fatal_exception,
            )
        else:
            self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)

        self._leaser = leaser.Leaser(self)
        self._heartbeater = heartbeater.Heartbeater(self)

        # Start the thread to pass the requests.
        self._dispatcher.start()

        # Start consuming messages.
        self._consumer.start()

        # Start the lease maintainer thread.
        self._leaser.start()

        # Start the stream heartbeater thread.
        self._heartbeater.start()

    def close(self, reason: Any = None) -> None:
        """Stop consuming messages and shutdown all helper threads.

        This method is idempotent. Additional calls will have no effect.

        The method does not block, it delegates the shutdown operations to a background
        thread.

        Args:
            reason:
                The reason to close this. If ``None``, this is considered
                an "intentional" shutdown. This is passed to the callbacks
                specified via :meth:`add_close_callback`.
        """
        self._regular_shutdown_thread = threading.Thread(
            name=_REGULAR_SHUTDOWN_THREAD_NAME,
            daemon=True,
            target=self._shutdown,
            kwargs={"reason": reason},
        )
        self._regular_shutdown_thread.start()

    def _shutdown(self, reason: Any = None) -> None:
        """Run the actual shutdown sequence (stop the stream and all helper threads).

        Args:
            reason:
                The reason to close the stream. If ``None``, this is considered
                an "intentional" shutdown.
        """
        with self._closing:
            if self._closed:
                return

            # Stop consuming messages.
            if self.is_active:
                _LOGGER.debug("Stopping consumer.")
                assert self._consumer is not None
                self._consumer.stop()
            self._consumer = None

            # Shutdown all helper threads
            _LOGGER.debug("Stopping scheduler.")
            assert self._scheduler is not None
            dropped_messages = self._scheduler.shutdown(
                await_msg_callbacks=self._await_callbacks_on_shutdown
            )
            self._scheduler = None

            # Leaser and dispatcher reference each other through the shared
            # StreamingPullManager instance, i.e. "self", thus do not set their
            # references to None until both have been shut down.
            #
            # NOTE: Even if the dispatcher operates on an inactive leaser using
            # the latter's add() and remove() methods, these have no impact on
            # the stopped leaser (the leaser is never again re-started). Ditto
            # for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
            # because the consumer gets shut down first.
            _LOGGER.debug("Stopping leaser.")
            assert self._leaser is not None
            self._leaser.stop()

            total = len(dropped_messages) + len(
                self._messages_on_hold._messages_on_hold
            )
            _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
            messages_to_nack = itertools.chain(
                dropped_messages, self._messages_on_hold._messages_on_hold
            )
            for msg in messages_to_nack:
                msg.nack()

            _LOGGER.debug("Stopping dispatcher.")
            assert self._dispatcher is not None
            self._dispatcher.stop()
            self._dispatcher = None
            # dispatcher terminated, OK to dispose the leaser reference now
            self._leaser = None

            _LOGGER.debug("Stopping heartbeater.")
            assert self._heartbeater is not None
            self._heartbeater.stop()
            self._heartbeater = None

            self._rpc = None
            self._closed = True
            _LOGGER.debug("Finished stopping manager.")

            for callback in self._close_callbacks:
                callback(self, reason)

    def _get_initial_request(
        self, stream_ack_deadline_seconds: int
    ) -> gapic_types.StreamingPullRequest:
        """Return the initial request for the RPC.

        This defines the initial request that must always be sent to Pub/Sub
        immediately upon opening the subscription.

        Args:
            stream_ack_deadline_seconds:
                The default message acknowledge deadline for the stream.

        Returns:
            A request suitable for being the first request on the stream (and not
            suitable for any other purpose).
        """
        # Put the request together.
        # We need to set the streaming ack deadline, but it's largely moot since
        # we send a receipt modack for each message anyway. Set it to a fairly
        # large value in case the modack is late.
        request = gapic_types.StreamingPullRequest(
            stream_ack_deadline_seconds=stream_ack_deadline_seconds,
            modify_deadline_ack_ids=[],
            modify_deadline_seconds=[],
            subscription=self._subscription,
            client_id=self._client_id,
            max_outstanding_messages=(
                0 if self._use_legacy_flow_control else self._flow_control.max_messages
            ),
            max_outstanding_bytes=(
                0 if self._use_legacy_flow_control else self._flow_control.max_bytes
            ),
        )

        # Return the initial request.
        return request
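
    # Illustrative shape of the first request on a stream (a sketch with a
    # hypothetical subscription; field values depend on the actual settings):
    #
    #     StreamingPullRequest(
    #         subscription="projects/p/subscriptions/s",
    #         stream_ack_deadline_seconds=60,
    #         client_id="<uuid4 of this manager>",
    #         max_outstanding_messages=1000,
    #         max_outstanding_bytes=104857600,
    #     )
    #
    # Subsequent requests on the already-open stream leave ``subscription``
    # and ``client_id`` unset.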

    def _send_lease_modacks(
        self,
        ack_ids: Iterable[str],
        ack_deadline: float,
        opentelemetry_data: List[SubscribeOpenTelemetry],
        warn_on_invalid=True,
        receipt_modack: bool = False,
    ) -> Set[str]:
        exactly_once_enabled = False

        modack_span: Optional[trace.Span] = None
        if self._client.open_telemetry_enabled:
            subscribe_span_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            subscription_split: List[str] = self._subscription.split("/")
            assert len(subscription_split) == 4
            subscription_id: str = subscription_split[3]
            project_id: str = subscription_split[1]
            for data in opentelemetry_data:
                subscribe_span: Optional[trace.Span] = data.subscribe_span
                if (
                    subscribe_span
                    and subscribe_span.get_span_context().trace_flags.sampled
                ):
                    subscribe_span_links.append(
                        trace.Link(subscribe_span.get_span_context())
                    )
                    subscribe_spans.append(subscribe_span)
            modack_span = start_modack_span(
                subscribe_span_links,
                subscription_id,
                len(opentelemetry_data),
                ack_deadline,
                project_id,
                "_send_lease_modacks",
                receipt_modack,
            )
            if (
                modack_span and modack_span.get_span_context().trace_flags.sampled
            ):  # pragma: NO COVER
                modack_span_context: trace.SpanContext = modack_span.get_span_context()
                for subscribe_span in subscribe_spans:
                    subscribe_span.add_link(
                        context=modack_span_context,
                        attributes={
                            "messaging.operation.name": "modack",
                        },
                    )

        with self._exactly_once_enabled_lock:
            exactly_once_enabled = self._exactly_once_enabled
        if exactly_once_enabled:
            eod_items: List[requests.ModAckRequest] = []
            if self._client.open_telemetry_enabled:
                for ack_id, data in zip(
                    ack_ids, opentelemetry_data
                ):  # pragma: NO COVER # Identical code covered in the same function below
                    assert data is not None
                    eod_items.append(
                        requests.ModAckRequest(
                            ack_id,
                            ack_deadline,
                            futures.Future(),
                            data,
                        )
                    )
            else:
                eod_items = [
                    requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
                    for ack_id in ack_ids
                ]

            assert self._dispatcher is not None
            self._dispatcher.modify_ack_deadline(eod_items, ack_deadline)
            if (
                modack_span
            ):  # pragma: NO COVER # Identical code covered in the same function below
                modack_span.end()
            expired_ack_ids = set()
            for req in eod_items:
                try:
                    assert req.future is not None
                    req.future.result()
                except AcknowledgeError as ack_error:
                    if (
                        ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID
                        or warn_on_invalid
                    ):
                        _LOGGER.warning(
                            "AcknowledgeError when lease-modacking a message.",
                            exc_info=True,
                        )
                    if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
                        expired_ack_ids.add(req.ack_id)
            return expired_ack_ids
        else:
            items: List[requests.ModAckRequest] = []
            if self._client.open_telemetry_enabled:
                for ack_id, data in zip(ack_ids, opentelemetry_data):
                    assert data is not None
                    items.append(
                        requests.ModAckRequest(
                            ack_id,
                            self.ack_deadline,
                            None,
                            data,
                        )
                    )
            else:
                items = [
                    requests.ModAckRequest(ack_id, self.ack_deadline, None)
                    for ack_id in ack_ids
                ]
            assert self._dispatcher is not None
            self._dispatcher.modify_ack_deadline(items, ack_deadline)
            if modack_span:
                modack_span.end()
            return set()

    def _exactly_once_delivery_enabled(self) -> bool:
        """Whether exactly-once delivery is enabled for the subscription."""
        with self._exactly_once_enabled_lock:
            return self._exactly_once_enabled

    def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
        """Process all received Pub/Sub messages.

        For each message, send a modified acknowledgment request to the
        server. This prevents expiration of the message due to buffering by
        gRPC or proxy/firewall. This brings the server and client expiration
        timers closer to each other, preventing the message from being
        redelivered multiple times.

        After the messages have all had their ack deadline updated, execute
        the callback for each message using the executor.
        """
        if response is None:
            _LOGGER.debug(
                "Response callback invoked with None, likely due to a "
                "transport shutdown."
            )
            return

        # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
        # protobuf message to significantly gain on attribute access performance.
        received_messages = response._pb.received_messages

        subscribe_opentelemetry: List[SubscribeOpenTelemetry] = []
        if self._client.open_telemetry_enabled:
            for received_message in received_messages:
                opentelemetry_data = SubscribeOpenTelemetry(received_message.message)
                opentelemetry_data.start_subscribe_span(
                    self._subscription,
                    response.subscription_properties.exactly_once_delivery_enabled,
                    received_message.ack_id,
                    received_message.delivery_attempt,
                )
                subscribe_opentelemetry.append(opentelemetry_data)

        _LOGGER.debug(
            "Processing %s received message(s), currently on hold %s (bytes %s).",
            len(received_messages),
            self._messages_on_hold.size,
            self._on_hold_bytes,
        )

        with self._exactly_once_enabled_lock:
            if (
                response.subscription_properties.exactly_once_delivery_enabled
                != self._exactly_once_enabled
            ):
                self._exactly_once_enabled = (
                    response.subscription_properties.exactly_once_delivery_enabled
                )
                # Update ack_deadline, whose minimum depends on self._exactly_once_enabled.
                # This method acquires the self._ack_deadline_lock lock.
                self._obtain_ack_deadline(maybe_update=True)
                self._send_new_ack_deadline = True

        # Immediately (i.e. without waiting for the auto lease management)
        # modack the messages we received, as this tells the server that we've
        # received them.
        ack_id_gen = (message.ack_id for message in received_messages)
        expired_ack_ids = self._send_lease_modacks(
            ack_id_gen,
            self.ack_deadline,
            subscribe_opentelemetry,
            warn_on_invalid=False,
            receipt_modack=True,
        )

        if len(expired_ack_ids):
            _EXPIRY_LOGGER.debug(
                "ack ids %s were dropped as they have already expired.", expired_ack_ids
            )

        with self._pause_resume_lock:
            if self._scheduler is None or self._leaser is None:
                _LOGGER.debug(
                    f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing."
                )
                return

            i: int = 0
            for received_message in received_messages:
                if (
                    not self._exactly_once_delivery_enabled()
                    or received_message.ack_id not in expired_ack_ids
                ):
                    message = google.cloud.pubsub_v1.subscriber.message.Message(
                        received_message.message,
                        received_message.ack_id,
                        received_message.delivery_attempt,
                        self._scheduler.queue,
                        self._exactly_once_delivery_enabled,
                    )
                    if self._client.open_telemetry_enabled:
                        message.opentelemetry_data = subscribe_opentelemetry[i]
                        i = i + 1
                    self._messages_on_hold.put(message)
                    self._on_hold_bytes += message.size
                    req = requests.LeaseRequest(
                        ack_id=message.ack_id,
                        byte_size=message.size,
                        ordering_key=message.ordering_key,
                        opentelemetry_data=message.opentelemetry_data,
                    )
                    self._leaser.add([req])

            self._maybe_release_messages()

        self.maybe_pause_consumer()

    def _on_fatal_exception(self, exception: BaseException) -> None:
        """
        Called whenever ``self._consumer`` receives a non-retryable exception.
        We close the manager on such non-retryable cases.
        """
        _LOGGER.info(
            "Streaming pull terminating after receiving non-recoverable error: %s",
            exception,
        )
        self.close(exception)

    def _should_recover(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should be recovered.

        If the exception is one of the retryable exceptions, this will signal
        to the consumer thread that it should "recover" from the failure.

        This will cause the stream to exit when it returns :data:`False`.

        Returns:
            Indicates if the caller should recover or shut down.
            Will be :data:`True` if the ``exception`` is "acceptable", i.e.
            in a list of retryable / idempotent exceptions.
        """
        exception = _wrap_as_exception(exception)
        # If this is in the list of idempotent exceptions, then we want to
        # recover.
        if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
            _STREAMS_LOGGER.debug(
                "Observed recoverable stream error %s, reopening stream", exception
            )
            return True
        _STREAMS_LOGGER.debug(
            "Observed non-recoverable stream error %s, shutting down stream", exception
        )
        return False

    def _should_terminate(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should terminate the stream.

        If the exception is one of the terminating exceptions, this will signal
        to the consumer thread that it should terminate.

        This will cause the stream to exit when it returns :data:`True`.

        Returns:
            Indicates if the caller should terminate or attempt recovery.
            Will be :data:`True` if the ``exception`` is "acceptable", i.e.
            in a list of terminating exceptions.
        """
        exception = _wrap_as_exception(exception)
        is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
        # Terminate on any non-API error, or on a non-retryable API error
        # (permission denied, unauthorized, etc.).
        if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
            _STREAMS_LOGGER.debug(
                "Observed terminating stream error %s, shutting down stream", exception
            )
            return True
        _STREAMS_LOGGER.debug(
            "Observed non-terminating stream error %s, attempting to reopen", exception
        )
        return False

    def _on_rpc_done(self, future: Any) -> None:
        """Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        """
        _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.")
        error = _wrap_as_exception(future)
        thread = threading.Thread(
            name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
        )
        thread.daemon = True
        thread.start()