# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division

import collections
import functools
import inspect
import itertools
import logging
import threading
import typing
from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
import uuid

from opentelemetry import trace
import grpc  # type: ignore

from google.api_core import bidi
from google.api_core import exceptions
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import histogram
from google.cloud.pubsub_v1.subscriber._protocol import leaser
from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
    AcknowledgeError,
    AcknowledgeStatus,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    SubscribeOpenTelemetry,
)
import google.cloud.pubsub_v1.subscriber.message
from google.cloud.pubsub_v1.subscriber import futures
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
from google.pubsub_v1 import types as gapic_types
from grpc_status import rpc_status  # type: ignore
from google.rpc.error_details_pb2 import ErrorInfo  # type: ignore
from google.rpc import code_pb2  # type: ignore
from google.rpc import status_pb2
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    start_modack_span,
)

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.pubsub_v1 import subscriber


_LOGGER = logging.getLogger(__name__)
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
_RETRYABLE_STREAM_ERRORS = (
    exceptions.Aborted,
    exceptions.DeadlineExceeded,
    exceptions.GatewayTimeout,
    exceptions.InternalServerError,
    exceptions.ResourceExhausted,
    exceptions.ServiceUnavailable,
    exceptions.Unknown,
)
_TERMINATING_STREAM_ERRORS = (
    exceptions.Cancelled,
    exceptions.InvalidArgument,
    exceptions.NotFound,
    exceptions.PermissionDenied,
    exceptions.Unauthenticated,
    exceptions.Unauthorized,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60
"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for
a subscription. We do this to reduce premature ack expiration.
"""

_DEFAULT_STREAM_ACK_DEADLINE: float = 60
"""The default stream ack deadline in seconds."""

_MAX_STREAM_ACK_DEADLINE: float = 600
"""The maximum stream ack deadline in seconds."""

_MIN_STREAM_ACK_DEADLINE: float = 10
"""The minimum stream ack deadline in seconds."""

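# gRPC status codes for which ack/modack requests are retried when
# exactly-once delivery is enabled (see _process_requests() below).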
_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
    code_pb2.DEADLINE_EXCEEDED,
    code_pb2.RESOURCE_EXHAUSTED,
    code_pb2.ABORTED,
    code_pb2.INTERNAL,
    code_pb2.UNAVAILABLE,
}

# `on_fatal_exception` was added in `google-api-core` v2.25.1, which allows us
# to inform callers of unrecoverable errors. We can only pass this arg if it's
# available in the `BackgroundConsumer` spec.
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = "on_fatal_exception" in inspect.getfullargspec(
    bidi.BackgroundConsumer
)


def _wrap_as_exception(maybe_exception: Any) -> BaseException:
    """Wrap an object as a Python exception, if needed.

    Args:
        maybe_exception: The object to wrap, usually a gRPC exception class.

    Returns:
        The argument itself if an instance of ``BaseException``, otherwise
        the argument represented as an instance of ``Exception`` (sub)class.
    """
    if isinstance(maybe_exception, grpc.RpcError):
        return exceptions.from_grpc_error(maybe_exception)
    elif isinstance(maybe_exception, BaseException):
        return maybe_exception

    return Exception(maybe_exception)


def _wrap_callback_errors(
    callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
    on_callback_error: Callable[[BaseException], Any],
    message: "google.cloud.pubsub_v1.subscriber.message.Message",
):
    """Wrap a user callback so that the message is nacked and the error
    reported if an exception occurs while processing the message.

    Args:
        callback: The user callback.
        on_callback_error: A callable invoked with the exception raised by
            the user callback.
        message: The Pub/Sub message.
    """
    try:
        if message.opentelemetry_data:
            message.opentelemetry_data.end_subscribe_concurrency_control_span()
            with message.opentelemetry_data:
                callback(message)
        else:
            callback(message)
    except BaseException as exc:
        # Note: the likelihood of this failing is extremely low. This just adds
        # a message to a queue, so if this doesn't work the world is in an
        # unrecoverable state and this thread should just bail.
        _LOGGER.exception(
            "Top-level exception occurred in callback while processing a message"
        )
        message.nack()
        on_callback_error(exc)


def _get_status(
    exc: exceptions.GoogleAPICallError,
) -> Optional["status_pb2.Status"]:
    if not exc.response:
        _LOGGER.debug("No response obj in errored RPC call.")
        return None
    try:
        return rpc_status.from_call(exc.response)
    # rpc_status.from_call() raises ValueError if the gRPC call's code or
    # details are inconsistent with the status code and message inside of
    # the google.rpc.status.Status.
    except ValueError:
        _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True)
        return None


def _get_ack_errors(
    exc: exceptions.GoogleAPICallError,
) -> Optional[Dict[str, str]]:
    status = _get_status(exc)
    if not status:
        _LOGGER.debug("Unable to get status of errored RPC.")
        return None
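    # Only the first detail entry is examined; if it cannot be unpacked as an
    # ErrorInfo, no per-ack-ID error metadata is available.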
    for detail in status.details:
        info = ErrorInfo()
        if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)):
            _LOGGER.debug("Unable to unpack ErrorInfo.")
            return None
        return info.metadata
    return None


def _process_requests(
    error_status: Optional["status_pb2.Status"],
    ack_reqs_dict: Dict[str, requests.AckRequest],
    errors_dict: Optional[Dict[str, str]],
):
    """Process requests when exactly-once delivery is enabled by referring to
    error_status and errors_dict.

    The errors returned by the server as `error_status` or in `errors_dict`
    are used to complete the request futures in `ack_reqs_dict` (with a success
    or an exception) or to return requests for further retries.
    """
    requests_completed = []
    requests_to_retry = []
    for ack_id in ack_reqs_dict:
        # Handle special errors returned for ack/modack RPCs via the ErrorInfo
        # sidecar metadata when exactly-once delivery is enabled.
        if errors_dict and ack_id in errors_dict:
            exactly_once_error = errors_dict[ack_id]
            if exactly_once_error.startswith("TRANSIENT_"):
                requests_to_retry.append(ack_reqs_dict[ack_id])
            else:
                if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
                    exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
                else:
                    exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
                future = ack_reqs_dict[ack_id].future
                if future is not None:
                    future.set_exception(exc)
                requests_completed.append(ack_reqs_dict[ack_id])
        # Temporary gRPC errors are retried.
        elif (
            error_status
            and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
        ):
            requests_to_retry.append(ack_reqs_dict[ack_id])
        # Other gRPC errors are NOT retried.
        elif error_status:
            if error_status.code == code_pb2.PERMISSION_DENIED:
                exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None)
            elif error_status.code == code_pb2.FAILED_PRECONDITION:
                exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
            else:
                exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
            future = ack_reqs_dict[ack_id].future
            if future is not None:
                future.set_exception(exc)
            requests_completed.append(ack_reqs_dict[ack_id])
        # Since no error occurred, requests with futures are completed successfully.
        elif ack_reqs_dict[ack_id].future:
            future = ack_reqs_dict[ack_id].future
            # success
            assert future is not None
            future.set_result(AcknowledgeStatus.SUCCESS)
            requests_completed.append(ack_reqs_dict[ack_id])
        # All other requests are considered completed.
        else:
            requests_completed.append(ack_reqs_dict[ack_id])

    return requests_completed, requests_to_retry


class StreamingPullManager(object):
    """The streaming pull manager coordinates pulling messages from Pub/Sub,
    leasing them, and scheduling them to be processed.

    Args:
        client:
            The subscriber client used to create this instance.
        subscription:
            The name of the subscription. The canonical format for this is
            ``projects/{project}/subscriptions/{subscription}``.
        flow_control:
            The flow control settings.
        scheduler:
            The scheduler to use to process messages. If not provided, a thread
            pool-based scheduler will be used.
        use_legacy_flow_control:
            If set to ``True``, flow control at the Cloud Pub/Sub server is disabled,
            though client-side flow control is still enabled. If set to ``False``
            (default), both server-side and client-side flow control are enabled.
        await_callbacks_on_shutdown:
            If ``True``, the shutdown thread will wait until all scheduler threads
            terminate and only then proceed with shutting down the remaining running
            helper threads.

            If ``False`` (default), the shutdown thread will shut the scheduler down,
            but it will not wait for the currently executing scheduler threads to
            terminate.

            This setting affects when the on close callbacks get invoked, and
            consequently, when the StreamingPullFuture associated with the stream gets
            resolved.
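
    Example (a sketch of how ``subscriber.Client.subscribe()`` wires this class
    up; the exact call sites live in the client and futures modules):

    .. code-block:: python

        manager = StreamingPullManager(
            client, subscription, flow_control=flow_control, scheduler=scheduler
        )
        future = futures.StreamingPullFuture(manager)
        manager.open(callback=callback, on_callback_error=future.set_exception)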
    """

    def __init__(
        self,
        client: "subscriber.Client",
        subscription: str,
        flow_control: types.FlowControl = types.FlowControl(),
        scheduler: Optional[ThreadScheduler] = None,
        use_legacy_flow_control: bool = False,
        await_callbacks_on_shutdown: bool = False,
    ):
        self._client = client
        self._subscription = subscription
        self._exactly_once_enabled = False
        self._flow_control = flow_control
        self._use_legacy_flow_control = use_legacy_flow_control
        self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
        self._ack_histogram = histogram.Histogram()
        self._last_histogram_size = 0
        self._stream_metadata = [
            ["x-goog-request-params", "subscription=" + subscription]
        ]

        # If max_duration_per_lease_extension is unset (0), use the default
        # stream ACK deadline of 60 seconds.
        if self._flow_control.max_duration_per_lease_extension == 0:
            self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
        # The stream ACK deadline cannot be shorter than the allowed minimum.
        elif (
            self._flow_control.max_duration_per_lease_extension
            < _MIN_STREAM_ACK_DEADLINE
        ):
            self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
        # The stream ACK deadline cannot exceed the allowed maximum.
        elif (
            self._flow_control.max_duration_per_lease_extension
            > _MAX_STREAM_ACK_DEADLINE
        ):
            self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
        else:
            self._stream_ack_deadline = (
                self._flow_control.max_duration_per_lease_extension
            )

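        # The initial ACK deadline is the user-configured minimum lease
        # extension, clamped into the [MIN_ACK_DEADLINE, MAX_ACK_DEADLINE]
        # range supported by the histogram.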
        self._ack_deadline = max(
            min(
                self._flow_control.min_duration_per_lease_extension,
                histogram.MAX_ACK_DEADLINE,
            ),
            histogram.MIN_ACK_DEADLINE,
        )

        self._rpc: Optional[bidi.ResumableBidiRpc] = None
        self._callback: Optional[functools.partial] = None
        self._closing = threading.Lock()
        self._closed = False
        self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = []
        # Guarded by self._exactly_once_enabled_lock
        self._send_new_ack_deadline = False

        # A shutdown thread is created on intentional shutdown.
        self._regular_shutdown_thread: Optional[threading.Thread] = None

        # Generate a random client id tied to this object. All streaming pull
        # connections (initial and re-connects) will then use the same client
        # id. Doing so lets the server establish affinity even across stream
        # disconnections.
        self._client_id = str(uuid.uuid4())

        if scheduler is None:
            self._scheduler: Optional[ThreadScheduler] = ThreadScheduler()
        else:
            self._scheduler = scheduler

        # A collection for the messages that have been received from the server,
        # but not yet sent to the user callback.
        self._messages_on_hold = messages_on_hold.MessagesOnHold()

        # The total number of bytes consumed by the messages currently on hold.
        self._on_hold_bytes = 0

        # A lock ensuring that pausing / resuming the consumer are both atomic
        # operations that cannot be executed concurrently. Needed for properly
        # syncing these operations with the current leaser load. Additionally,
        # the lock is used to protect modifications of internal data that
        # affects the load computation, i.e. the count and size of the messages
        # currently on hold.
        self._pause_resume_lock = threading.Lock()

        # A lock guarding the self._exactly_once_enabled variable. We may also
        # acquire the self._ack_deadline_lock while this lock is held, but not
        # the reverse. So, we maintain a simple ordering of these two locks to
        # prevent deadlocks.
        self._exactly_once_enabled_lock = threading.Lock()

        # A lock protecting the current ACK deadline used in the lease management. This
        # value can be potentially updated both by the leaser thread and by the message
        # consumer thread when invoking the internal _on_response() callback.
        self._ack_deadline_lock = threading.Lock()

        # The threads created in ``.open()``.
        self._dispatcher: Optional[dispatcher.Dispatcher] = None
        self._leaser: Optional[leaser.Leaser] = None
        self._consumer: Optional[bidi.BackgroundConsumer] = None
        self._heartbeater: Optional[heartbeater.Heartbeater] = None

    @property
    def is_active(self) -> bool:
        """``True`` if this manager is actively streaming.

        Note that ``False`` does not indicate the manager is completely shut
        down, just that it has stopped getting new messages.
        """
        return self._consumer is not None and self._consumer.is_active

    @property
    def flow_control(self) -> types.FlowControl:
        """The active flow control settings."""
        return self._flow_control

    @property
    def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
        """The dispatcher helper."""
        return self._dispatcher

    @property
    def leaser(self) -> Optional[leaser.Leaser]:
        """The leaser helper."""
        return self._leaser

    @property
    def ack_histogram(self) -> histogram.Histogram:
        """The histogram tracking time-to-acknowledge."""
        return self._ack_histogram

    @property
    def ack_deadline(self) -> float:
        """Return the current ACK deadline based on historical data without updating it.

        Returns:
            The ack deadline.
        """
        return self._obtain_ack_deadline(maybe_update=False)

    def _obtain_ack_deadline(self, maybe_update: bool) -> float:
        """The actual `ack_deadline` implementation.

        This method is "sticky". It will only perform the computations to check on the
        right ACK deadline if explicitly requested AND if the histogram with past
        time-to-ack data has gained a significant amount of new information.

        Args:
            maybe_update:
                If ``True``, also update the current ACK deadline before returning it if
                enough new ACK data has been gathered.

        Returns:
            The current ACK deadline in seconds to use.
        """
        with self._ack_deadline_lock:
            if not maybe_update:
                return self._ack_deadline

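            # Only recompute the deadline once the histogram has grown
            # significantly: it must double in size or gain 100 new data
            # points since the last update, whichever comes first.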
            target_size = min(
                self._last_histogram_size * 2, self._last_histogram_size + 100
            )
            hist_size = len(self.ack_histogram)

            if hist_size > target_size:
                self._last_histogram_size = hist_size
                self._ack_deadline = self.ack_histogram.percentile(percent=99)

            if self.flow_control.max_duration_per_lease_extension > 0:
                # The setting in flow control could be too low, adjust if needed.
                flow_control_setting = max(
                    self.flow_control.max_duration_per_lease_extension,
                    histogram.MIN_ACK_DEADLINE,
                )
                self._ack_deadline = min(self._ack_deadline, flow_control_setting)

            # If the user explicitly sets a min ack_deadline, respect it.
            if self.flow_control.min_duration_per_lease_extension > 0:
                # The setting in flow control could be too high, adjust if needed.
                flow_control_setting = min(
                    self.flow_control.min_duration_per_lease_extension,
                    histogram.MAX_ACK_DEADLINE,
                )
                self._ack_deadline = max(self._ack_deadline, flow_control_setting)
            elif self._exactly_once_enabled:
                # Higher minimum ack_deadline for subscriptions with
                # exactly-once delivery enabled.
                self._ack_deadline = max(
                    self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED
                )
            # If we have updated the ack_deadline and it is longer than the
            # stream_ack_deadline, set the stream_ack_deadline to the new
            # ack_deadline.
            if self._ack_deadline > self._stream_ack_deadline:
                self._stream_ack_deadline = self._ack_deadline
            return self._ack_deadline

    @property
    def load(self) -> float:
        """Return the current load.

        The load is represented as a float, where 1.0 represents having
        hit one of the flow control limits, and values between 0.0 and 1.0
        represent how close we are to them. (0.5 means we have exactly half
        of what the flow control setting allows, for example.)

        There are (currently) two flow control settings; this property
        computes how close the manager is to each of them, and returns
        whichever value is higher. (It does not matter that we have lots of
        running room on setting A if setting B is over.)

        Returns:
            The load value.
        """
        if self._leaser is None:
            return 0.0

        # Messages that are temporarily put on hold are not being delivered to
        # user's callbacks, thus they should not contribute to the flow control
        # load calculation.
        # However, since these messages must still be lease-managed to avoid
        # unnecessary ACK deadline expirations, their count and total size must
        # be subtracted from the leaser's values.
        return max(
            [
                (self._leaser.message_count - self._messages_on_hold.size)
                / self._flow_control.max_messages,
                (self._leaser.bytes - self._on_hold_bytes)
                / self._flow_control.max_bytes,
            ]
        )

    def add_close_callback(
        self, callback: Callable[["StreamingPullManager", Any], Any]
    ) -> None:
        """Schedule a callable to be invoked when the manager closes.

        Args:
            callback: The method to call.
        """
        self._close_callbacks.append(callback)

    def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None:
        """Send the next message in the queue for each of the passed-in
        ordering keys, if they exist. Clean up state for keys that no longer
        have any queued messages.

        Since the load went down by one message, it's probably safe to send the
        user another message for the same key. Since the released message may be
        bigger than the previous one, this may increase the load above the maximum.
        This decision is by design because it simplifies MessagesOnHold.

        Args:
            ordering_keys:
                A sequence of ordering keys to activate. May be empty.
        """
        with self._pause_resume_lock:
            if self._scheduler is None:
                return  # We are shutting down, don't try to dispatch any more messages.

            self._messages_on_hold.activate_ordering_keys(
                ordering_keys, self._schedule_message_on_hold
            )

    def maybe_pause_consumer(self) -> None:
        """Check the current load and pause the consumer if needed."""
        with self._pause_resume_lock:
            if self.load >= _MAX_LOAD:
                if self._consumer is not None and not self._consumer.is_paused:
                    _LOGGER.debug(
                        "Message backlog over load at %.2f, pausing.", self.load
                    )
                    self._consumer.pause()

    def maybe_resume_consumer(self) -> None:
        """Check the load and held messages and resume the consumer if needed.

        If there are messages held internally, release those messages before
        resuming the consumer. That will avoid leaser overload.
        """
        with self._pause_resume_lock:
            # If we have been paused by flow control, check and see if we are
            # back within our limits.
            #
            # In order to not thrash too much, require us to have passed below
            # the resume threshold (80% by default) of each flow control setting
            # before restarting.
            if self._consumer is None or not self._consumer.is_paused:
                return

            _LOGGER.debug("Current load: %.2f", self.load)

            # Before maybe resuming the background consumer, release any messages
            # currently on hold, if the current load allows for it.
            self._maybe_release_messages()

            if self.load < _RESUME_THRESHOLD:
                _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
                self._consumer.resume()
            else:
                _LOGGER.debug("Did not resume, current load is %.2f.", self.load)

    def _maybe_release_messages(self) -> None:
        """Release (some of) the held messages if the current load allows for it.

        The method tries to release as many messages as the current leaser load
        would allow. Each released message is added to the lease management,
        and the user callback is scheduled for it.

        If there are currently no messages on hold, or if the leaser is
        already overloaded, this method is effectively a no-op.

        The method assumes the caller has acquired the ``_pause_resume_lock``.
        """
        released_ack_ids = []
        while self.load < _MAX_LOAD:
            msg = self._messages_on_hold.get()
            if not msg:
                break
            if msg.opentelemetry_data:
                msg.opentelemetry_data.end_subscribe_scheduler_span()
            self._schedule_message_on_hold(msg)
            released_ack_ids.append(msg.ack_id)

        assert self._leaser is not None
        self._leaser.start_lease_expiry_timer(released_ack_ids)

    def _schedule_message_on_hold(
        self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"
    ):
        """Schedule a message on hold to be sent to the user and change on-hold-bytes.

        The method assumes the caller has acquired the ``_pause_resume_lock``.

        Args:
            msg: The message to schedule to be sent to the user.
        """
        assert msg, "Message must not be None."

        # On-hold bytes go down, which increases the computed load.
        self._on_hold_bytes -= msg.size

        if self._on_hold_bytes < 0:
            _LOGGER.warning(
                "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
            )
            self._on_hold_bytes = 0

        _LOGGER.debug(
            "Released held message, scheduling callback for it, "
            "still on hold %s (bytes %s).",
            self._messages_on_hold.size,
            self._on_hold_bytes,
        )
        assert self._scheduler is not None
        assert self._callback is not None
        if msg.opentelemetry_data:
            msg.opentelemetry_data.start_subscribe_concurrency_control_span()
        self._scheduler.schedule(self._callback, msg)

    def send_unary_ack(
        self, ack_ids, ack_reqs_dict
    ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]:
        """Send a request using a separate unary request instead of over the stream.

        If a RetryError occurs, the manager shutdown is triggered, and the
        error is re-raised.
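
        Returns:
            A tuple of (requests_completed, requests_to_retry) lists.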
        """
        assert ack_ids
        assert len(ack_ids) == len(ack_reqs_dict)

        error_status = None
        ack_errors_dict = None
        try:
            self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids)
        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "Exception while sending unary RPC. This is typically "
                "non-fatal as stream requests are best-effort.",
                exc_info=True,
            )
            error_status = _get_status(exc)
            ack_errors_dict = _get_ack_errors(exc)
        except exceptions.RetryError as exc:
            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
            # Makes sure to complete futures so they don't block forever.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    if exactly_once_delivery_enabled:
                        e = AcknowledgeError(
                            AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
                        )
                        req.future.set_exception(e)
                    else:
                        req.future.set_result(AcknowledgeStatus.SUCCESS)

            _LOGGER.debug(
                "RetryError while sending ack RPC. Waiting on a transient "
                "error resolution for too long, will now trigger shutdown.",
                exc_info=False,
            )
            # The underlying channel has been suffering from a retryable error
            # for too long, time to give up and shut the streaming pull down.
            self._on_rpc_done(exc)
            raise

        if self._exactly_once_delivery_enabled():
            requests_completed, requests_to_retry = _process_requests(
                error_status, ack_reqs_dict, ack_errors_dict
            )
        else:
            requests_completed = []
            requests_to_retry = []
            # When exactly-once delivery is NOT enabled, acks/modacks are considered
            # best-effort. So, they always succeed even if the RPC fails.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    req.future.set_result(AcknowledgeStatus.SUCCESS)
                requests_completed.append(req)

        return requests_completed, requests_to_retry

    def send_unary_modack(
        self,
        modify_deadline_ack_ids,
        modify_deadline_seconds,
        ack_reqs_dict,
        default_deadline=None,
    ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
        """Send a request using a separate unary request instead of over the stream.

        If a RetryError occurs, the manager shutdown is triggered, and the
        error is re-raised.
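
        Returns:
            A tuple of (requests_completed, requests_to_retry) lists.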
        """
        assert modify_deadline_ack_ids
        # Either we have a generator or a single deadline.
        assert modify_deadline_seconds is None or default_deadline is None

        error_status = None
        modack_errors_dict = None
        try:
            if default_deadline is None:
                # Send ack_ids with the same deadline seconds together.
                deadline_to_ack_ids = collections.defaultdict(list)

                for n, ack_id in enumerate(modify_deadline_ack_ids):
                    deadline = modify_deadline_seconds[n]
                    deadline_to_ack_ids[deadline].append(ack_id)

                for deadline, ack_ids in deadline_to_ack_ids.items():
                    self._client.modify_ack_deadline(
                        subscription=self._subscription,
                        ack_ids=ack_ids,
                        ack_deadline_seconds=deadline,
                    )
            else:
                # We can send all requests with the default deadline.
                self._client.modify_ack_deadline(
                    subscription=self._subscription,
                    ack_ids=modify_deadline_ack_ids,
                    ack_deadline_seconds=default_deadline,
                )
        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "Exception while sending unary RPC. This is typically "
                "non-fatal as stream requests are best-effort.",
                exc_info=True,
            )
            error_status = _get_status(exc)
            modack_errors_dict = _get_ack_errors(exc)
        except exceptions.RetryError as exc:
            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
            # Makes sure to complete futures so they don't block forever.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    if exactly_once_delivery_enabled:
                        e = AcknowledgeError(
                            AcknowledgeStatus.OTHER,
                            "RetryError while sending modack RPC.",
                        )
                        req.future.set_exception(e)
                    else:
                        req.future.set_result(AcknowledgeStatus.SUCCESS)

            _LOGGER.debug(
                "RetryError while sending modack RPC. Waiting on a transient "
                "error resolution for too long, will now trigger shutdown.",
                exc_info=False,
            )
            # The underlying channel has been suffering from a retryable error
            # for too long, time to give up and shut the streaming pull down.
            self._on_rpc_done(exc)
            raise

        if self._exactly_once_delivery_enabled():
            requests_completed, requests_to_retry = _process_requests(
                error_status, ack_reqs_dict, modack_errors_dict
            )
        else:
            requests_completed = []
            requests_to_retry = []
            # When exactly-once delivery is NOT enabled, acks/modacks are considered
            # best-effort. So, they always succeed even if the RPC fails.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    req.future.set_result(AcknowledgeStatus.SUCCESS)
                requests_completed.append(req)

        return requests_completed, requests_to_retry

    def heartbeat(self) -> bool:
        """Sends a heartbeat request over the streaming pull RPC.

        The request is empty by default, but may contain the current ack_deadline
        if the self._exactly_once_enabled flag has changed.

        Returns:
            Whether a heartbeat request was actually sent.
        """
        if self._rpc is not None and self._rpc.is_active:
            send_new_ack_deadline = False
            with self._exactly_once_enabled_lock:
                send_new_ack_deadline = self._send_new_ack_deadline
                self._send_new_ack_deadline = False

            if send_new_ack_deadline:
                request = gapic_types.StreamingPullRequest(
                    stream_ack_deadline_seconds=self._stream_ack_deadline
                )
                _LOGGER.debug(
                    "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
                )
            else:
                request = gapic_types.StreamingPullRequest()

            self._rpc.send(request)
            return True

        return False

    def open(
        self,
        callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
        on_callback_error: Callable[[Exception], Any],
    ) -> None:
        """Begin consuming messages.

        Args:
            callback:
                A callback that will be called for each message received on the
                stream.
            on_callback_error:
                A callable that will be called if an exception is raised in
                the provided `callback`.
        """
        if self.is_active:
            raise ValueError("This manager is already open.")

        if self._closed:
            raise ValueError("This manager has been closed and can not be re-used.")

        self._callback = functools.partial(
            _wrap_callback_errors, callback, on_callback_error
        )

        # Create the RPC
        stream_ack_deadline_seconds = self._stream_ack_deadline

        get_initial_request = functools.partial(
            self._get_initial_request, stream_ack_deadline_seconds
        )
        self._rpc = bidi.ResumableBidiRpc(
            start_rpc=self._client.streaming_pull,
            initial_request=get_initial_request,
            should_recover=self._should_recover,
            should_terminate=self._should_terminate,
            metadata=self._stream_metadata,
            throttle_reopen=True,
        )
        self._rpc.add_done_callback(self._on_rpc_done)

        _LOGGER.debug(
            "Creating a stream, default ACK deadline set to {} seconds.".format(
                self._stream_ack_deadline
            )
        )

        # Create references to threads
        assert self._scheduler is not None
        scheduler_queue = self._scheduler.queue
        self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)

        # `on_fatal_exception` is only available in more recent library versions.
        # For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
        if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
            self._consumer = bidi.BackgroundConsumer(
                self._rpc,
                self._on_response,
                on_fatal_exception=self._on_fatal_exception,
            )
        else:
            self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)

        self._leaser = leaser.Leaser(self)
        self._heartbeater = heartbeater.Heartbeater(self)

        # Start the thread to pass the requests.
        self._dispatcher.start()

        # Start consuming messages.
        self._consumer.start()

        # Start the lease maintainer thread.
        self._leaser.start()

        # Start the stream heartbeater thread.
        self._heartbeater.start()

    def close(self, reason: Any = None) -> None:
        """Stop consuming messages and shutdown all helper threads.

        This method is idempotent. Additional calls will have no effect.

        The method does not block, it delegates the shutdown operations to a background
        thread.

        Args:
            reason:
                The reason to close this. If ``None``, this is considered
                an "intentional" shutdown. This is passed to the callbacks
                specified via :meth:`add_close_callback`.
        """
        self._regular_shutdown_thread = threading.Thread(
            name=_REGULAR_SHUTDOWN_THREAD_NAME,
            daemon=True,
            target=self._shutdown,
            kwargs={"reason": reason},
        )
        self._regular_shutdown_thread.start()

    def _shutdown(self, reason: Any = None) -> None:
        """Run the actual shutdown sequence (stop the stream and all helper threads).

        Args:
            reason:
                The reason to close the stream. If ``None``, this is considered
                an "intentional" shutdown.
        """
        with self._closing:
            if self._closed:
                return

            # Stop consuming messages.
            if self.is_active:
                _LOGGER.debug("Stopping consumer.")
                assert self._consumer is not None
                self._consumer.stop()
            self._consumer = None

            # Shutdown all helper threads
            _LOGGER.debug("Stopping scheduler.")
            assert self._scheduler is not None
            dropped_messages = self._scheduler.shutdown(
                await_msg_callbacks=self._await_callbacks_on_shutdown
            )
            self._scheduler = None

            # Leaser and dispatcher reference each other through the shared
            # StreamingPullManager instance, i.e. "self", thus do not set their
            # references to None until both have been shut down.
            #
            # NOTE: Even if the dispatcher operates on an inactive leaser using
            # the latter's add() and remove() methods, these have no impact on
            # the stopped leaser (the leaser is never again re-started). Ditto
            # for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
            # because the consumer gets shut down first.
            _LOGGER.debug("Stopping leaser.")
            assert self._leaser is not None
            self._leaser.stop()

            total = len(dropped_messages) + len(
                self._messages_on_hold._messages_on_hold
            )
            _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
            messages_to_nack = itertools.chain(
                dropped_messages, self._messages_on_hold._messages_on_hold
            )
            for msg in messages_to_nack:
                msg.nack()

            _LOGGER.debug("Stopping dispatcher.")
            assert self._dispatcher is not None
            self._dispatcher.stop()
            self._dispatcher = None
            # dispatcher terminated, OK to dispose the leaser reference now
            self._leaser = None

            _LOGGER.debug("Stopping heartbeater.")
            assert self._heartbeater is not None
            self._heartbeater.stop()
            self._heartbeater = None

            self._rpc = None
            self._closed = True
            _LOGGER.debug("Finished stopping manager.")

            for callback in self._close_callbacks:
                callback(self, reason)

    def _get_initial_request(
        self, stream_ack_deadline_seconds: int
    ) -> gapic_types.StreamingPullRequest:
        """Return the initial request for the RPC.

        This defines the initial request that must always be sent to Pub/Sub
        immediately upon opening the subscription.

        Args:
            stream_ack_deadline_seconds:
                The default message acknowledge deadline for the stream.

        Returns:
            A request suitable for being the first request on the stream (and not
            suitable for any other purpose).
        """
        # Put the request together.
        # We need to set the stream ACK deadline, though it has little practical
        # effect since we immediately modack each message upon receipt anyway.
        # Set it to a fairly large value in case a modack is late.
        request = gapic_types.StreamingPullRequest(
            stream_ack_deadline_seconds=stream_ack_deadline_seconds,
            modify_deadline_ack_ids=[],
            modify_deadline_seconds=[],
            subscription=self._subscription,
            client_id=self._client_id,
            max_outstanding_messages=(
                0 if self._use_legacy_flow_control else self._flow_control.max_messages
            ),
            max_outstanding_bytes=(
                0 if self._use_legacy_flow_control else self._flow_control.max_bytes
            ),
        )

        # Return the initial request.
        return request

    def _send_lease_modacks(
        self,
        ack_ids: Iterable[str],
        ack_deadline: float,
        opentelemetry_data: List[SubscribeOpenTelemetry],
        warn_on_invalid=True,
        receipt_modack: bool = False,
    ) -> Set[str]:
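        """Send modack requests to extend the ACK deadlines of the given messages.

        When exactly-once delivery is enabled, block until each modack completes
        and collect the ack IDs that failed with an invalid ack ID error.

        Returns:
            The set of ack IDs whose leases could not be extended because the
            ack ID was invalid; always empty if exactly-once delivery is
            disabled for the subscription.
        """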
        exactly_once_enabled = False

        modack_span: Optional[trace.Span] = None
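        # When OpenTelemetry is enabled, create a modack span and cross-link it
        # with the sampled subscribe spans of the messages being modacked.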
        if self._client.open_telemetry_enabled:
            subscribe_span_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            subscription_split: List[str] = self._subscription.split("/")
            assert len(subscription_split) == 4
            subscription_id: str = subscription_split[3]
            project_id: str = subscription_split[1]
            for data in opentelemetry_data:
                subscribe_span: Optional[trace.Span] = data.subscribe_span
                if (
                    subscribe_span
                    and subscribe_span.get_span_context().trace_flags.sampled
                ):
                    subscribe_span_links.append(
                        trace.Link(subscribe_span.get_span_context())
                    )
                    subscribe_spans.append(subscribe_span)
            modack_span = start_modack_span(
                subscribe_span_links,
                subscription_id,
                len(opentelemetry_data),
                ack_deadline,
                project_id,
                "_send_lease_modacks",
                receipt_modack,
            )
            if (
                modack_span and modack_span.get_span_context().trace_flags.sampled
            ):  # pragma: NO COVER
                modack_span_context: trace.SpanContext = modack_span.get_span_context()
                for subscribe_span in subscribe_spans:
                    subscribe_span.add_link(
                        context=modack_span_context,
                        attributes={
                            "messaging.operation.name": "modack",
                        },
                    )

        with self._exactly_once_enabled_lock:
            exactly_once_enabled = self._exactly_once_enabled
        if exactly_once_enabled:
            eod_items: List[requests.ModAckRequest] = []
            if self._client.open_telemetry_enabled:
                for ack_id, data in zip(
                    ack_ids, opentelemetry_data
                ):  # pragma: NO COVER # Identical code covered in the same function below
                    assert data is not None
                    eod_items.append(
                        requests.ModAckRequest(
                            ack_id,
                            ack_deadline,
                            futures.Future(),
                            data,
                        )
                    )
            else:
                eod_items = [
                    requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
                    for ack_id in ack_ids
                ]

            assert self._dispatcher is not None
            self._dispatcher.modify_ack_deadline(eod_items, ack_deadline)
            if (
                modack_span
            ):  # pragma: NO COVER # Identical code covered in the same function below
                modack_span.end()
            expired_ack_ids = set()
            for req in eod_items:
                try:
                    assert req.future is not None
                    req.future.result()
                except AcknowledgeError as ack_error:
                    if (
                        ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID
                        or warn_on_invalid
                    ):
                        _LOGGER.warning(
                            "AcknowledgeError when lease-modacking a message.",
                            exc_info=True,
                        )
                    if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
                        expired_ack_ids.add(req.ack_id)
            return expired_ack_ids
        else:
            items: List[requests.ModAckRequest] = []
            if self._client.open_telemetry_enabled:
                for ack_id, data in zip(ack_ids, opentelemetry_data):
                    assert data is not None
                    items.append(
                        requests.ModAckRequest(
                            ack_id,
                            self.ack_deadline,
                            None,
                            data,
                        )
                    )
            else:
                items = [
                    requests.ModAckRequest(ack_id, self.ack_deadline, None)
                    for ack_id in ack_ids
                ]
            assert self._dispatcher is not None
            self._dispatcher.modify_ack_deadline(items, ack_deadline)
            if modack_span:
                modack_span.end()
            return set()

    def _exactly_once_delivery_enabled(self) -> bool:
        """Whether exactly-once delivery is enabled for the subscription."""
        with self._exactly_once_enabled_lock:
            return self._exactly_once_enabled

    def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
        """Process all received Pub/Sub messages.

        For each message, send a modified acknowledgment request to the
        server. This prevents expiration of the message due to buffering by
        gRPC or proxy/firewall. This makes the server and client expiration
        timer closer to each other thus preventing the message being
        redelivered multiple times.

        After the messages have all had their ack deadline updated, execute
        the callback for each message using the executor.
        """
        if response is None:
            _LOGGER.debug(
                "Response callback invoked with None, likely due to a "
                "transport shutdown."
            )
            return

        # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
        # protobuf message to significantly gain on attribute access performance.
        received_messages = response._pb.received_messages

        subscribe_opentelemetry: List[SubscribeOpenTelemetry] = []
        if self._client.open_telemetry_enabled:
            for received_message in received_messages:
                opentelemetry_data = SubscribeOpenTelemetry(received_message.message)
                opentelemetry_data.start_subscribe_span(
                    self._subscription,
                    response.subscription_properties.exactly_once_delivery_enabled,
                    received_message.ack_id,
                    received_message.delivery_attempt,
                )
                subscribe_opentelemetry.append(opentelemetry_data)

        _LOGGER.debug(
            "Processing %s received message(s), currently on hold %s (bytes %s).",
            len(received_messages),
            self._messages_on_hold.size,
            self._on_hold_bytes,
        )

        with self._exactly_once_enabled_lock:
            if (
                response.subscription_properties.exactly_once_delivery_enabled
                != self._exactly_once_enabled
            ):
                self._exactly_once_enabled = (
                    response.subscription_properties.exactly_once_delivery_enabled
                )
                # Update ack_deadline, whose minimum depends on self._exactly_once_enabled
                # This method acquires the self._ack_deadline_lock lock.
                self._obtain_ack_deadline(maybe_update=True)
                self._send_new_ack_deadline = True

        # Immediately (i.e. without waiting for the auto lease management)
        # modack the messages we received, as this tells the server that we've
        # received them.
        ack_id_gen = (message.ack_id for message in received_messages)
        expired_ack_ids = self._send_lease_modacks(
            ack_id_gen,
            self.ack_deadline,
            subscribe_opentelemetry,
            warn_on_invalid=False,
            receipt_modack=True,
        )

        with self._pause_resume_lock:
            if self._scheduler is None or self._leaser is None:
                _LOGGER.debug(
                    f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing."
                )
                return

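            # Index into subscribe_opentelemetry; advanced only for messages
            # that are actually constructed (i.e. not dropped as expired) below.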
            i: int = 0
            for received_message in received_messages:
                if (
                    not self._exactly_once_delivery_enabled()
                    or received_message.ack_id not in expired_ack_ids
                ):
                    message = google.cloud.pubsub_v1.subscriber.message.Message(
                        received_message.message,
                        received_message.ack_id,
                        received_message.delivery_attempt,
                        self._scheduler.queue,
                        self._exactly_once_delivery_enabled,
                    )
                    if self._client.open_telemetry_enabled:
                        message.opentelemetry_data = subscribe_opentelemetry[i]
                        i = i + 1
                    self._messages_on_hold.put(message)
                    self._on_hold_bytes += message.size
                    req = requests.LeaseRequest(
                        ack_id=message.ack_id,
                        byte_size=message.size,
                        ordering_key=message.ordering_key,
                        opentelemetry_data=message.opentelemetry_data,
                    )
                    self._leaser.add([req])

            self._maybe_release_messages()

        self.maybe_pause_consumer()

    def _on_fatal_exception(self, exception: BaseException) -> None:
        """
        Called whenever ``self._consumer`` receives a non-retryable exception.
        We close the manager in such non-retryable cases.
        """
        _LOGGER.info(
            "Streaming pull terminating after receiving non-recoverable error: %s",
            exception,
        )
        self.close(exception)

    def _should_recover(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should be recovered.

        If the exception is one of the retryable exceptions, this will signal
        to the consumer thread that it should "recover" from the failure.

        This will cause the stream to exit when it returns :data:`False`.

        Returns:
            Indicates if the caller should recover or shut down.
            Will be :data:`True` if the ``exception`` is "acceptable", i.e.
            in a list of retryable / idempotent exceptions.
        """
        exception = _wrap_as_exception(exception)
        # If this is in the list of idempotent exceptions, then we want to
        # recover.
        if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
            _LOGGER.debug("Observed recoverable stream error %s", exception)
            return True
        _LOGGER.debug("Observed non-recoverable stream error %s", exception)
        return False

    def _should_terminate(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should terminate the stream.

        If the exception is one of the terminating exceptions, this will signal
        to the consumer thread that it should terminate.

        This will cause the stream to exit when it returns :data:`True`.

        Returns:
            Indicates if the caller should terminate or attempt recovery.
            Will be :data:`True` if the ``exception`` is "acceptable", i.e.
            in a list of terminating exceptions.
        """
        exception = _wrap_as_exception(exception)
        is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
        # Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
        if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
            _LOGGER.debug("Observed terminating stream error %s", exception)
            return True
        _LOGGER.debug("Observed non-terminating stream error %s", exception)
        return False

    def _on_rpc_done(self, future: Any) -> None:
        """Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        """
        _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.")
        error = _wrap_as_exception(future)
        thread = threading.Thread(
            name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
        )
        thread.daemon = True
        thread.start()