# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division

import collections
import functools
import inspect
import itertools
import logging
import threading
import typing
from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
import uuid

from opentelemetry import trace
import grpc  # type: ignore

from google.api_core import bidi
from google.api_core import exceptions
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import histogram
from google.cloud.pubsub_v1.subscriber._protocol import leaser
from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
    AcknowledgeError,
    AcknowledgeStatus,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    SubscribeOpenTelemetry,
)
import google.cloud.pubsub_v1.subscriber.message
from google.cloud.pubsub_v1.subscriber import futures
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
from google.pubsub_v1 import types as gapic_types
from grpc_status import rpc_status  # type: ignore
from google.rpc.error_details_pb2 import ErrorInfo  # type: ignore
from google.rpc import code_pb2  # type: ignore
from google.rpc import status_pb2
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    start_modack_span,
)

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.pubsub_v1 import subscriber


_LOGGER = logging.getLogger(__name__)
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
_RETRYABLE_STREAM_ERRORS = (
    exceptions.Aborted,
    exceptions.DeadlineExceeded,
    exceptions.GatewayTimeout,
    exceptions.InternalServerError,
    exceptions.ResourceExhausted,
    exceptions.ServiceUnavailable,
    exceptions.Unknown,
)
_TERMINATING_STREAM_ERRORS = (
    exceptions.Cancelled,
    exceptions.InvalidArgument,
    exceptions.NotFound,
    exceptions.PermissionDenied,
    exceptions.Unauthenticated,
    exceptions.Unauthorized,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60
"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for
a subscription. We do this to reduce premature ack expiration.
"""

_DEFAULT_STREAM_ACK_DEADLINE: float = 60
"""The default stream ack deadline in seconds."""

_MAX_STREAM_ACK_DEADLINE: float = 600
"""The maximum stream ack deadline in seconds."""

_MIN_STREAM_ACK_DEADLINE: float = 10
"""The minimum stream ack deadline in seconds."""

_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
    code_pb2.DEADLINE_EXCEEDED,
    code_pb2.RESOURCE_EXHAUSTED,
    code_pb2.ABORTED,
    code_pb2.INTERNAL,
    code_pb2.UNAVAILABLE,
}
# `on_fatal_exception` was added in `google-api-core` v2.25.1; it lets us inform
# callers of unrecoverable errors. We can only pass this arg if it is present in
# the `BackgroundConsumer` signature.
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = (
    "on_fatal_exception" in inspect.signature(bidi.BackgroundConsumer).parameters
)

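# Illustrative feature check (a sketch; assumes a newer `google-api-core` that
# defines `BackgroundConsumer(bidi_rpc, on_response, on_fatal_exception=None)`):
#
#   >>> "on_fatal_exception" in inspect.signature(bidi.BackgroundConsumer).parameters
#   True   # newer api-core: pass the fatal-error callback through
#   False  # older api-core: fall back to the two-argument constructor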

def _wrap_as_exception(maybe_exception: Any) -> BaseException:
    """Wrap an object as a Python exception, if needed.

    Args:
        maybe_exception: The object to wrap, usually a gRPC exception class.

    Returns:
        The argument itself if an instance of ``BaseException``, otherwise
        the argument represented as an instance of ``Exception`` (sub)class.
    """
    if isinstance(maybe_exception, grpc.RpcError):
        return exceptions.from_grpc_error(maybe_exception)
    elif isinstance(maybe_exception, BaseException):
        return maybe_exception

    return Exception(maybe_exception)

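# Illustrative behavior of _wrap_as_exception (a sketch; the grpc.RpcError case
# assumes a gRPC error instance, which exceptions.from_grpc_error converts to
# the matching google.api_core.exceptions.GoogleAPICallError subclass):
#
#   _wrap_as_exception(ValueError("boom"))  -> the same ValueError instance
#   _wrap_as_exception("stream closed")     -> Exception("stream closed")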

def _wrap_callback_errors(
    callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
    on_callback_error: Callable[[BaseException], Any],
    message: "google.cloud.pubsub_v1.subscriber.message.Message",
):
    """Wraps a user callback so that if an exception occurs the message is
    nacked.

    Args:
        callback: The user callback.
        on_callback_error: A callable invoked with the exception raised by
            the user callback.
        message: The Pub/Sub message.
    """
    try:
        if message.opentelemetry_data:
            message.opentelemetry_data.end_subscribe_concurrency_control_span()
            message.opentelemetry_data.start_process_span()
        callback(message)
    except BaseException as exc:
        # Note: the likelihood of this failing is extremely low. This just adds
        # a message to a queue, so if this doesn't work the world is in an
        # unrecoverable state and this thread should just bail.
        _LOGGER.exception(
            "Top-level exception occurred in callback while processing a message"
        )
        message.nack()
        on_callback_error(exc)

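# A minimal sketch of how the wrapper above is bound in open() (see below):
# functools.partial pre-binds the user callback and the error handler, leaving
# a single-argument callable that the scheduler can invoke per message.
#
#   wrapped = functools.partial(_wrap_callback_errors, callback, on_callback_error)
#   scheduler.schedule(wrapped, message)  # as done by _schedule_message_on_hold()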

def _get_status(
    exc: exceptions.GoogleAPICallError,
) -> Optional["status_pb2.Status"]:
    if not exc.response:
        _LOGGER.debug("No response obj in errored RPC call.")
        return None
    try:
        return rpc_status.from_call(exc.response)
    # rpc_status.from_call() raises ValueError "if the gRPC call's code or
    # details are inconsistent with the status code and message inside of
    # the google.rpc.status.Status".
    except ValueError:
        _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True)
        return None


def _get_ack_errors(
    exc: exceptions.GoogleAPICallError,
) -> Optional[Dict[str, str]]:
    status = _get_status(exc)
    if not status:
        _LOGGER.debug("Unable to get status of errored RPC.")
        return None
    for detail in status.details:
        info = ErrorInfo()
        if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)):
            _LOGGER.debug("Unable to unpack ErrorInfo.")
            return None
        return info.metadata
    return None

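# Shape of the returned metadata when exactly-once delivery is enabled (an
# assumed example; keys are ack IDs, values are error strings defined by the
# Pub/Sub service, where any "TRANSIENT_"-prefixed value is retried):
#
#   {
#       "ack_id_1": "TRANSIENT_FAILURE_INVALID_ACK_ID",   # retried
#       "ack_id_2": "PERMANENT_FAILURE_INVALID_ACK_ID",   # completed with error
#   }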

def _process_requests(
    error_status: Optional["status_pb2.Status"],
    ack_reqs_dict: Dict[str, requests.AckRequest],
    errors_dict: Optional[Dict[str, str]],
):
    """Process requests when exactly-once delivery is enabled by referring to
    error_status and errors_dict.

    The errors returned by the server as `error_status` or in `errors_dict`
    are used to complete the request futures in `ack_reqs_dict` (with a success
    or an exception) or to return requests for further retries.
    """
    requests_completed = []
    requests_to_retry = []
    for ack_id in ack_reqs_dict:
        # Handle special errors returned for ack/modack RPCs via the ErrorInfo
        # sidecar metadata when exactly-once delivery is enabled.
        if errors_dict and ack_id in errors_dict:
            exactly_once_error = errors_dict[ack_id]
            if exactly_once_error.startswith("TRANSIENT_"):
                requests_to_retry.append(ack_reqs_dict[ack_id])
            else:
                if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
                    exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
                else:
                    exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
                future = ack_reqs_dict[ack_id].future
                if future is not None:
                    future.set_exception(exc)
                requests_completed.append(ack_reqs_dict[ack_id])
        # Temporary gRPC errors are retried.
        elif (
            error_status
            and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
        ):
            requests_to_retry.append(ack_reqs_dict[ack_id])
        # Other gRPC errors are NOT retried.
        elif error_status:
            if error_status.code == code_pb2.PERMISSION_DENIED:
                exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None)
            elif error_status.code == code_pb2.FAILED_PRECONDITION:
                exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
            else:
                exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
            future = ack_reqs_dict[ack_id].future
            if future is not None:
                future.set_exception(exc)
            requests_completed.append(ack_reqs_dict[ack_id])
        # Since no error occurred, requests with futures are completed successfully.
        elif ack_reqs_dict[ack_id].future:
            future = ack_reqs_dict[ack_id].future
            # Success.
            assert future is not None
            future.set_result(AcknowledgeStatus.SUCCESS)
            requests_completed.append(ack_reqs_dict[ack_id])
        # All other requests are considered completed.
        else:
            requests_completed.append(ack_reqs_dict[ack_id])

    return requests_completed, requests_to_retry

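# Worked example (hypothetical values): with no gRPC-level error_status and
#
#   errors_dict = {"a1": "TRANSIENT_FAILURE_INVALID_ACK_ID",
#                  "a2": "PERMANENT_FAILURE_INVALID_ACK_ID"}
#   ack_reqs_dict = {"a1": req1, "a2": req2, "a3": req3}
#
# the split is: req1 -> requests_to_retry (the "TRANSIENT_" prefix); req2 ->
# requests_completed, its future failing with AcknowledgeStatus.INVALID_ACK_ID;
# req3 -> requests_completed, its future (if any) resolving to SUCCESS.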

class StreamingPullManager(object):
    """The streaming pull manager coordinates pulling messages from Pub/Sub,
    leasing them, and scheduling them to be processed.

    Args:
        client:
            The subscriber client used to create this instance.
        subscription:
            The name of the subscription. The canonical format for this is
            ``projects/{project}/subscriptions/{subscription}``.
        flow_control:
            The flow control settings.
        scheduler:
            The scheduler to use to process messages. If not provided, a thread
            pool-based scheduler will be used.
        use_legacy_flow_control:
            If set to ``True``, flow control at the Cloud Pub/Sub server is disabled,
            though client-side flow control is still enabled. If set to ``False``
            (default), both server-side and client-side flow control are enabled.
        await_callbacks_on_shutdown:
            If ``True``, the shutdown thread will wait until all scheduler threads
            terminate and only then proceed with shutting down the remaining running
            helper threads.

            If ``False`` (default), the shutdown thread will shut the scheduler down,
            but it will not wait for the currently executing scheduler threads to
            terminate.

            This setting affects when the on close callbacks get invoked, and
            consequently, when the StreamingPullFuture associated with the stream gets
            resolved.
    """

    def __init__(
        self,
        client: "subscriber.Client",
        subscription: str,
        flow_control: types.FlowControl = types.FlowControl(),
        scheduler: Optional[ThreadScheduler] = None,
        use_legacy_flow_control: bool = False,
        await_callbacks_on_shutdown: bool = False,
    ):
        self._client = client
        self._subscription = subscription
        self._exactly_once_enabled = False
        self._flow_control = flow_control
        self._use_legacy_flow_control = use_legacy_flow_control
        self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
        self._ack_histogram = histogram.Histogram()
        self._last_histogram_size = 0
        self._stream_metadata = [
            ["x-goog-request-params", "subscription=" + subscription]
        ]
        # If max_duration_per_lease_extension is unset (the default of 0),
        # use the default stream ACK deadline of 60 seconds.
        if self._flow_control.max_duration_per_lease_extension == 0:
            self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
        # The deadline cannot be extended by less than the minimum.
        elif (
            self._flow_control.max_duration_per_lease_extension
            < _MIN_STREAM_ACK_DEADLINE
        ):
            self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
        # The deadline cannot be extended past the maximum.
        elif (
            self._flow_control.max_duration_per_lease_extension
            > _MAX_STREAM_ACK_DEADLINE
        ):
            self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
        else:
            self._stream_ack_deadline = (
                self._flow_control.max_duration_per_lease_extension
            )
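        # Illustrative clamping of max_duration_per_lease_extension into the
        # stream ACK deadline (input values assumed for illustration only):
        #
        #     0 (unset) -> 60  (default)
        #     5         -> 10  (clamped up to _MIN_STREAM_ACK_DEADLINE)
        #     120       -> 120 (within [10, 600], used as-is)
        #     900       -> 600 (clamped down to _MAX_STREAM_ACK_DEADLINE)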

        self._ack_deadline = max(
            min(
                self._flow_control.min_duration_per_lease_extension,
                histogram.MAX_ACK_DEADLINE,
            ),
            histogram.MIN_ACK_DEADLINE,
        )

        self._rpc: Optional[bidi.ResumableBidiRpc] = None
        self._callback: Optional[functools.partial] = None
        self._closing = threading.Lock()
        self._closed = False
        self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = []
        # Guarded by self._exactly_once_enabled_lock
        self._send_new_ack_deadline = False

        # A shutdown thread is created on intentional shutdown.
        self._regular_shutdown_thread: Optional[threading.Thread] = None

        # Generate a random client id tied to this object. All streaming pull
        # connections (initial and re-connects) will then use the same client
        # id. Doing so lets the server establish affinity even across stream
        # disconnections.
        self._client_id = str(uuid.uuid4())

        if scheduler is None:
            self._scheduler: Optional[ThreadScheduler] = ThreadScheduler()
        else:
            self._scheduler = scheduler

        # A collection for the messages that have been received from the server,
        # but not yet sent to the user callback.
        self._messages_on_hold = messages_on_hold.MessagesOnHold()

        # The total number of bytes consumed by the messages currently on hold.
        self._on_hold_bytes = 0

        # A lock ensuring that pausing / resuming the consumer are both atomic
        # operations that cannot be executed concurrently. Needed for properly
        # syncing these operations with the current leaser load. Additionally,
        # the lock is used to protect modifications of internal data that
        # affects the load computation, i.e. the count and size of the messages
        # currently on hold.
        self._pause_resume_lock = threading.Lock()

        # A lock guarding the self._exactly_once_enabled variable. We may also
        # acquire the self._ack_deadline_lock while this lock is held, but not
        # the reverse. So, we maintain a simple ordering of these two locks to
        # prevent deadlocks.
        self._exactly_once_enabled_lock = threading.Lock()

        # A lock protecting the current ACK deadline used in the lease management. This
        # value can be potentially updated both by the leaser thread and by the message
        # consumer thread when invoking the internal _on_response() callback.
        self._ack_deadline_lock = threading.Lock()

        # The threads created in ``.open()``.
        self._dispatcher: Optional[dispatcher.Dispatcher] = None
        self._leaser: Optional[leaser.Leaser] = None
        self._consumer: Optional[bidi.BackgroundConsumer] = None
        self._heartbeater: Optional[heartbeater.Heartbeater] = None

    @property
    def is_active(self) -> bool:
        """``True`` if this manager is actively streaming.

        Note that ``False`` does not indicate that this manager is completely
        shut down, just that it has stopped getting new messages.
        """
        return self._consumer is not None and self._consumer.is_active

    @property
    def flow_control(self) -> types.FlowControl:
        """The active flow control settings."""
        return self._flow_control

    @property
    def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
        """The dispatcher helper."""
        return self._dispatcher

    @property
    def leaser(self) -> Optional[leaser.Leaser]:
        """The leaser helper."""
        return self._leaser

    @property
    def ack_histogram(self) -> histogram.Histogram:
        """The histogram tracking time-to-acknowledge."""
        return self._ack_histogram

    @property
    def ack_deadline(self) -> float:
        """Return the current ACK deadline based on historical data without updating it.

        Returns:
            The ack deadline.
        """
        return self._obtain_ack_deadline(maybe_update=False)

    def _obtain_ack_deadline(self, maybe_update: bool) -> float:
        """The actual `ack_deadline` implementation.

        This method is "sticky". It will only perform the computations to check on the
        right ACK deadline if explicitly requested AND if the histogram with past
        time-to-ack data has gained a significant amount of new information.

        Args:
            maybe_update:
                If ``True``, also update the current ACK deadline before returning it if
                enough new ACK data has been gathered.

        Returns:
            The current ACK deadline in seconds to use.
        """
        with self._ack_deadline_lock:
            if not maybe_update:
                return self._ack_deadline

            target_size = min(
                self._last_histogram_size * 2, self._last_histogram_size + 100
            )
            hist_size = len(self.ack_histogram)

            if hist_size > target_size:
                self._last_histogram_size = hist_size
                self._ack_deadline = self.ack_histogram.percentile(percent=99)

            if self.flow_control.max_duration_per_lease_extension > 0:
                # The setting in flow control could be too low, adjust if needed.
                flow_control_setting = max(
                    self.flow_control.max_duration_per_lease_extension,
                    histogram.MIN_ACK_DEADLINE,
                )
                self._ack_deadline = min(self._ack_deadline, flow_control_setting)

            # If the user explicitly sets a min ack_deadline, respect it.
            if self.flow_control.min_duration_per_lease_extension > 0:
                # The setting in flow control could be too high, adjust if needed.
                flow_control_setting = min(
                    self.flow_control.min_duration_per_lease_extension,
                    histogram.MAX_ACK_DEADLINE,
                )
                self._ack_deadline = max(self._ack_deadline, flow_control_setting)
            elif self._exactly_once_enabled:
                # Higher minimum ack_deadline for subscriptions with
                # exactly-once delivery enabled.
                self._ack_deadline = max(
                    self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED
                )
            # If we have updated the ack_deadline and it is longer than the
            # stream_ack_deadline, set the stream_ack_deadline to the new ack_deadline.
            if self._ack_deadline > self._stream_ack_deadline:
                self._stream_ack_deadline = self._ack_deadline
            return self._ack_deadline

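    # Worked example of the "sticky" update above (values assumed): with
    # _last_histogram_size = 100, target_size = min(200, 200) = 200, so the
    # 99th-percentile recomputation only happens once the histogram holds more
    # than 200 data points; smaller deltas return the cached deadline.
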
    @property
    def load(self) -> float:
        """Return the current load.

        The load is represented as a float, where 1.0 represents having
        hit one of the flow control limits, and values between 0.0 and 1.0
        represent how close we are to them. (0.5 means we have exactly half
        of what the flow control setting allows, for example.)

        There are (currently) two flow control settings; this property
        computes how close the manager is to each of them, and returns
        whichever value is higher. (It does not matter that we have lots of
        running room on setting A if setting B is over.)

        Returns:
            The load value.
        """
        if self._leaser is None:
            return 0.0

        # Messages that are temporarily put on hold are not being delivered to
        # the user's callbacks, thus they should not contribute to the flow
        # control load calculation.
        # However, since these messages must still be lease-managed to avoid
        # unnecessary ACK deadline expirations, their count and total size must
        # be subtracted from the leaser's values.
        return max(
            [
                (self._leaser.message_count - self._messages_on_hold.size)
                / self._flow_control.max_messages,
                (self._leaser.bytes - self._on_hold_bytes)
                / self._flow_control.max_bytes,
            ]
        )

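    # Worked example (assumed settings: max_messages=1000, max_bytes=100 MiB):
    # the leaser holds 600 messages / 40 MiB, of which 100 messages / 10 MiB
    # are on hold. Then load = max((600 - 100) / 1000, (40 - 10) / 100)
    #                        = max(0.5, 0.3) = 0.5.
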
    def add_close_callback(
        self, callback: Callable[["StreamingPullManager", Any], Any]
    ) -> None:
        """Schedule a callable to be called when the manager closes.

        Args:
            callback: The method to call.
        """
        self._close_callbacks.append(callback)

    def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None:
        """Send the next message in the queue for each of the passed-in
        ordering keys, if they exist. Clean up state for keys that no longer
        have any queued messages.

        Since the load went down by one message, it's probably safe to send the
        user another message for the same key. Since the released message may be
        bigger than the previous one, this may increase the load above the maximum.
        This decision is by design because it simplifies MessagesOnHold.

        Args:
            ordering_keys:
                A sequence of ordering keys to activate. May be empty.
        """
        with self._pause_resume_lock:
            if self._scheduler is None:
                return  # We are shutting down, don't try to dispatch any more messages.

            self._messages_on_hold.activate_ordering_keys(
                ordering_keys, self._schedule_message_on_hold
            )

    def maybe_pause_consumer(self) -> None:
        """Check the current load and pause the consumer if needed."""
        with self._pause_resume_lock:
            if self.load >= _MAX_LOAD:
                if self._consumer is not None and not self._consumer.is_paused:
                    _LOGGER.debug(
                        "Message backlog over load at %.2f, pausing.", self.load
                    )
                    self._consumer.pause()

    def maybe_resume_consumer(self) -> None:
        """Check the load and held messages and resume the consumer if needed.

        If there are messages held internally, release those messages before
        resuming the consumer. That will avoid leaser overload.
        """
        with self._pause_resume_lock:
            # If we have been paused by flow control, check and see if we are
            # back within our limits.
            #
            # In order to not thrash too much, require us to have passed below
            # the resume threshold (80% by default) of each flow control setting
            # before restarting.
            if self._consumer is None or not self._consumer.is_paused:
                return

            _LOGGER.debug("Current load: %.2f", self.load)

            # Before maybe resuming the background consumer, release any messages
            # currently on hold, if the current load allows for it.
            self._maybe_release_messages()

            if self.load < _RESUME_THRESHOLD:
                _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
                self._consumer.resume()
            else:
                _LOGGER.debug("Did not resume, current load is %.2f.", self.load)

    def _maybe_release_messages(self) -> None:
        """Release (some of) the held messages if the current load allows for it.

        The method tries to release as many messages as the current leaser load
        would allow. Each released message is added to the lease management,
        and the user callback is scheduled for it.

        If there are currently no messages on hold, or if the leaser is
        already overloaded, this method is effectively a no-op.

        The method assumes the caller has acquired the ``_pause_resume_lock``.
        """
        released_ack_ids = []
        while self.load < _MAX_LOAD:
            msg = self._messages_on_hold.get()
            if not msg:
                break
            if msg.opentelemetry_data:
                msg.opentelemetry_data.end_subscribe_scheduler_span()
            self._schedule_message_on_hold(msg)
            released_ack_ids.append(msg.ack_id)

        assert self._leaser is not None
        self._leaser.start_lease_expiry_timer(released_ack_ids)

    def _schedule_message_on_hold(
        self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"
    ):
        """Schedule a message on hold to be sent to the user and change on-hold-bytes.

        The method assumes the caller has acquired the ``_pause_resume_lock``.

        Args:
            msg: The message to schedule to be sent to the user.
        """
        assert msg, "Message must not be None."

        # The on-hold byte count goes down, which increases the computed load.
        self._on_hold_bytes -= msg.size

        if self._on_hold_bytes < 0:
            _LOGGER.warning(
                "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
            )
            self._on_hold_bytes = 0

        _LOGGER.debug(
            "Released held message, scheduling callback for it, "
            "still on hold %s (bytes %s).",
            self._messages_on_hold.size,
            self._on_hold_bytes,
        )
        assert self._scheduler is not None
        assert self._callback is not None
        if msg.opentelemetry_data:
            msg.opentelemetry_data.start_subscribe_concurrency_control_span()
        self._scheduler.schedule(self._callback, msg)

    def send_unary_ack(
        self, ack_ids, ack_reqs_dict
    ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]:
        """Send an ACK request via a separate unary RPC instead of over the stream.

        If a RetryError occurs, the manager shutdown is triggered, and the
        error is re-raised.
        """
        assert ack_ids
        assert len(ack_ids) == len(ack_reqs_dict)

        error_status = None
        ack_errors_dict = None
        try:
            self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids)
        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "Exception while sending unary RPC. This is typically "
                "non-fatal as stream requests are best-effort.",
                exc_info=True,
            )
            error_status = _get_status(exc)
            ack_errors_dict = _get_ack_errors(exc)
        except exceptions.RetryError as exc:
            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
            # Make sure to complete futures so they don't block forever.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    if exactly_once_delivery_enabled:
                        e = AcknowledgeError(
                            AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
                        )
                        req.future.set_exception(e)
                    else:
                        req.future.set_result(AcknowledgeStatus.SUCCESS)

            _LOGGER.debug(
                "RetryError while sending ack RPC. Waiting on a transient "
                "error resolution for too long, will now trigger shutdown.",
                exc_info=False,
            )
            # The underlying channel has been suffering from a retryable error
            # for too long, time to give up and shut the streaming pull down.
            self._on_rpc_done(exc)
            raise

        if self._exactly_once_delivery_enabled():
            requests_completed, requests_to_retry = _process_requests(
                error_status, ack_reqs_dict, ack_errors_dict
            )
        else:
            requests_completed = []
            requests_to_retry = []
            # When exactly-once delivery is NOT enabled, acks/modacks are considered
            # best-effort. So, they always succeed even if the RPC fails.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    req.future.set_result(AcknowledgeStatus.SUCCESS)
                requests_completed.append(req)

        return requests_completed, requests_to_retry

    def send_unary_modack(
        self,
        modify_deadline_ack_ids,
        modify_deadline_seconds,
        ack_reqs_dict,
        default_deadline=None,
    ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
        """Send a modack request via a separate unary RPC instead of over the stream.

        If a RetryError occurs, the manager shutdown is triggered, and the
        error is re-raised.
        """
        assert modify_deadline_ack_ids
        # Either we have a generator or a single deadline.
        assert modify_deadline_seconds is None or default_deadline is None

        error_status = None
        modack_errors_dict = None
        try:
            if default_deadline is None:
                # Send ack_ids with the same deadline seconds together.
                deadline_to_ack_ids = collections.defaultdict(list)

                for n, ack_id in enumerate(modify_deadline_ack_ids):
                    deadline = modify_deadline_seconds[n]
                    deadline_to_ack_ids[deadline].append(ack_id)

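                # Illustrative grouping (values assumed): ack IDs ["a", "b", "c"]
                # with deadlines [10, 10, 0] yield {10: ["a", "b"], 0: ["c"]},
                # i.e. one modifyAckDeadline RPC per distinct deadline below.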
                for deadline, ack_ids in deadline_to_ack_ids.items():
                    self._client.modify_ack_deadline(
                        subscription=self._subscription,
                        ack_ids=ack_ids,
                        ack_deadline_seconds=deadline,
                    )
            else:
                # We can send all requests with the default deadline.
                self._client.modify_ack_deadline(
                    subscription=self._subscription,
                    ack_ids=modify_deadline_ack_ids,
                    ack_deadline_seconds=default_deadline,
                )
        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "Exception while sending unary RPC. This is typically "
                "non-fatal as stream requests are best-effort.",
                exc_info=True,
            )
            error_status = _get_status(exc)
            modack_errors_dict = _get_ack_errors(exc)
        except exceptions.RetryError as exc:
            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
            # Make sure to complete futures so they don't block forever.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    if exactly_once_delivery_enabled:
                        e = AcknowledgeError(
                            AcknowledgeStatus.OTHER,
                            "RetryError while sending modack RPC.",
                        )
                        req.future.set_exception(e)
                    else:
                        req.future.set_result(AcknowledgeStatus.SUCCESS)

            _LOGGER.debug(
                "RetryError while sending modack RPC. Waiting on a transient "
                "error resolution for too long, will now trigger shutdown.",
                exc_info=False,
            )
            # The underlying channel has been suffering from a retryable error
            # for too long, time to give up and shut the streaming pull down.
            self._on_rpc_done(exc)
            raise

        if self._exactly_once_delivery_enabled():
            requests_completed, requests_to_retry = _process_requests(
                error_status, ack_reqs_dict, modack_errors_dict
            )
        else:
            requests_completed = []
            requests_to_retry = []
            # When exactly-once delivery is NOT enabled, acks/modacks are considered
            # best-effort. So, they always succeed even if the RPC fails.
            for req in ack_reqs_dict.values():
                # Futures may be present even with exactly-once delivery
                # disabled, in transition periods after the setting is changed on
                # the subscription.
                if req.future:
                    req.future.set_result(AcknowledgeStatus.SUCCESS)
                requests_completed.append(req)

        return requests_completed, requests_to_retry

    def heartbeat(self) -> bool:
        """Sends a heartbeat request over the streaming pull RPC.

        The request is empty by default, but may contain the current ack_deadline
        if the self._exactly_once_enabled flag has changed.

        Returns:
            Whether a heartbeat request has actually been sent.
        """
        if self._rpc is not None and self._rpc.is_active:
            send_new_ack_deadline = False
            with self._exactly_once_enabled_lock:
                send_new_ack_deadline = self._send_new_ack_deadline
                self._send_new_ack_deadline = False

            if send_new_ack_deadline:
                request = gapic_types.StreamingPullRequest(
                    stream_ack_deadline_seconds=self._stream_ack_deadline
                )
                _LOGGER.debug(
                    "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
                )
            else:
                request = gapic_types.StreamingPullRequest()

            self._rpc.send(request)
            return True

        return False

    def open(
        self,
        callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
        on_callback_error: Callable[[Exception], Any],
    ) -> None:
        """Begin consuming messages.

        Args:
            callback:
                A callback that will be called for each message received on the
                stream.
            on_callback_error:
                A callable that will be called if an exception is raised in
                the provided `callback`.
        """
        if self.is_active:
            raise ValueError("This manager is already open.")

        if self._closed:
            raise ValueError("This manager has been closed and can not be re-used.")

        self._callback = functools.partial(
            _wrap_callback_errors, callback, on_callback_error
        )

        # Create the RPC.
        stream_ack_deadline_seconds = self._stream_ack_deadline

        get_initial_request = functools.partial(
            self._get_initial_request, stream_ack_deadline_seconds
        )
        self._rpc = bidi.ResumableBidiRpc(
            start_rpc=self._client.streaming_pull,
            initial_request=get_initial_request,
            should_recover=self._should_recover,
            should_terminate=self._should_terminate,
            metadata=self._stream_metadata,
            throttle_reopen=True,
        )
        self._rpc.add_done_callback(self._on_rpc_done)

        _LOGGER.debug(
            "Creating a stream, default ACK deadline set to {} seconds.".format(
                self._stream_ack_deadline
            )
        )

        # Create references to threads.
        assert self._scheduler is not None
        scheduler_queue = self._scheduler.queue
        self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)

        # `on_fatal_exception` is only available in more recent library versions.
        # For backwards compatibility reasons, we only pass it when `google-api-core`
        # supports it.
        if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
            self._consumer = bidi.BackgroundConsumer(
                self._rpc,
                self._on_response,
                on_fatal_exception=self._on_fatal_exception,
            )
        else:
            self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)

        self._leaser = leaser.Leaser(self)
        self._heartbeater = heartbeater.Heartbeater(self)

        # Start the thread to pass the requests.
        self._dispatcher.start()

        # Start consuming messages.
        self._consumer.start()

        # Start the lease maintainer thread.
        self._leaser.start()

        # Start the stream heartbeater thread.
        self._heartbeater.start()

    def close(self, reason: Any = None) -> None:
        """Stop consuming messages and shut down all helper threads.

        This method is idempotent. Additional calls will have no effect.

        The method does not block; it delegates the shutdown operations to a
        background thread.

        Args:
            reason:
                The reason to close this. If ``None``, this is considered
                an "intentional" shutdown. This is passed to the callbacks
                specified via :meth:`add_close_callback`.
        """
        self._regular_shutdown_thread = threading.Thread(
            name=_REGULAR_SHUTDOWN_THREAD_NAME,
            daemon=True,
            target=self._shutdown,
            kwargs={"reason": reason},
        )
        self._regular_shutdown_thread.start()

    def _shutdown(self, reason: Any = None) -> None:
        """Run the actual shutdown sequence (stop the stream and all helper threads).

        Args:
            reason:
                The reason to close the stream. If ``None``, this is considered
                an "intentional" shutdown.
        """
        with self._closing:
            if self._closed:
                return

            # Stop consuming messages.
            if self.is_active:
                _LOGGER.debug("Stopping consumer.")
                assert self._consumer is not None
                self._consumer.stop()
            self._consumer = None

            # Shut down all helper threads.
            _LOGGER.debug("Stopping scheduler.")
            assert self._scheduler is not None
            dropped_messages = self._scheduler.shutdown(
                await_msg_callbacks=self._await_callbacks_on_shutdown
            )
            self._scheduler = None

            # Leaser and dispatcher reference each other through the shared
            # StreamingPullManager instance, i.e. "self", thus do not set their
            # references to None until both have been shut down.
            #
            # NOTE: Even if the dispatcher operates on an inactive leaser using
            # the latter's add() and remove() methods, these have no impact on
            # the stopped leaser (the leaser is never again re-started). Ditto
            # for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
            # because the consumer gets shut down first.
            _LOGGER.debug("Stopping leaser.")
            assert self._leaser is not None
            self._leaser.stop()

            total = len(dropped_messages) + len(
                self._messages_on_hold._messages_on_hold
            )
            _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
            messages_to_nack = itertools.chain(
                dropped_messages, self._messages_on_hold._messages_on_hold
            )
            for msg in messages_to_nack:
                msg.nack()

            _LOGGER.debug("Stopping dispatcher.")
            assert self._dispatcher is not None
            self._dispatcher.stop()
            self._dispatcher = None
            # The dispatcher terminated, so it is OK to dispose of the leaser
            # reference now.
            self._leaser = None

            _LOGGER.debug("Stopping heartbeater.")
            assert self._heartbeater is not None
            self._heartbeater.stop()
            self._heartbeater = None

            self._rpc = None
            self._closed = True
            _LOGGER.debug("Finished stopping manager.")

            for callback in self._close_callbacks:
                callback(self, reason)

    def _get_initial_request(
        self, stream_ack_deadline_seconds: int
    ) -> gapic_types.StreamingPullRequest:
        """Return the initial request for the RPC.

        This defines the initial request that must always be sent to Pub/Sub
        immediately upon opening the subscription.

        Args:
            stream_ack_deadline_seconds:
                The default message acknowledge deadline for the stream.

        Returns:
            A request suitable for being the first request on the stream (and not
            suitable for any other purpose).
        """
        # Put the request together.
        # The stream ACK deadline must be set, but its exact value matters little:
        # each message gets a receipt modack immediately anyway. Use a relatively
        # large value in case that modack arrives late.
        request = gapic_types.StreamingPullRequest(
            stream_ack_deadline_seconds=stream_ack_deadline_seconds,
            modify_deadline_ack_ids=[],
            modify_deadline_seconds=[],
            subscription=self._subscription,
            client_id=self._client_id,
            max_outstanding_messages=(
                0 if self._use_legacy_flow_control else self._flow_control.max_messages
            ),
            max_outstanding_bytes=(
                0 if self._use_legacy_flow_control else self._flow_control.max_bytes
            ),
        )

        # Return the initial request.
        return request

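    # Illustrative initial request under default client flow control settings
    # (values assumed; a value of 0 disables server-side flow control, which is
    # what the legacy mode requests):
    #
    #   StreamingPullRequest(
    #       subscription="projects/my-project/subscriptions/my-sub",
    #       stream_ack_deadline_seconds=60,
    #       max_outstanding_messages=1000,      # 0 if use_legacy_flow_control
    #       max_outstanding_bytes=104857600,    # 0 if use_legacy_flow_control
    #   )
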
    def _send_lease_modacks(
        self,
        ack_ids: Iterable[str],
        ack_deadline: float,
        opentelemetry_data: List[SubscribeOpenTelemetry],
        warn_on_invalid=True,
        receipt_modack: bool = False,
    ) -> Set[str]:
        exactly_once_enabled = False

        modack_span: Optional[trace.Span] = None
        if self._client.open_telemetry_enabled:
            subscribe_span_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            subscription_split: List[str] = self._subscription.split("/")
            assert len(subscription_split) == 4
            subscription_id: str = subscription_split[3]
            project_id: str = subscription_split[1]
            for data in opentelemetry_data:
                subscribe_span: Optional[trace.Span] = data.subscribe_span
                if (
                    subscribe_span
                    and subscribe_span.get_span_context().trace_flags.sampled
                ):
                    subscribe_span_links.append(
                        trace.Link(subscribe_span.get_span_context())
                    )
                    subscribe_spans.append(subscribe_span)
            modack_span = start_modack_span(
                subscribe_span_links,
                subscription_id,
                len(opentelemetry_data),
                ack_deadline,
                project_id,
                "_send_lease_modacks",
                receipt_modack,
            )
            if (
                modack_span and modack_span.get_span_context().trace_flags.sampled
            ):  # pragma: NO COVER
                modack_span_context: trace.SpanContext = modack_span.get_span_context()
                for subscribe_span in subscribe_spans:
                    subscribe_span.add_link(
                        context=modack_span_context,
                        attributes={
                            "messaging.operation.name": "modack",
                        },
                    )

        with self._exactly_once_enabled_lock:
            exactly_once_enabled = self._exactly_once_enabled
        if exactly_once_enabled:
            eod_items: List[requests.ModAckRequest] = []
            if self._client.open_telemetry_enabled:
                for ack_id, data in zip(
                    ack_ids, opentelemetry_data
                ):  # pragma: NO COVER # Identical code covered in the same function below
                    assert data is not None
                    eod_items.append(
                        requests.ModAckRequest(
                            ack_id,
                            ack_deadline,
                            futures.Future(),
                            data,
                        )
                    )
            else:
                eod_items = [
                    requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
                    for ack_id in ack_ids
                ]

            assert self._dispatcher is not None
            self._dispatcher.modify_ack_deadline(eod_items, ack_deadline)
            if (
                modack_span
            ):  # pragma: NO COVER # Identical code covered in the same function below
                modack_span.end()
            expired_ack_ids = set()
            for req in eod_items:
                try:
                    assert req.future is not None
                    req.future.result()
                except AcknowledgeError as ack_error:
                    if (
                        ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID
                        or warn_on_invalid
                    ):
                        _LOGGER.warning(
                            "AcknowledgeError when lease-modacking a message.",
                            exc_info=True,
                        )
                    if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
                        expired_ack_ids.add(req.ack_id)
            return expired_ack_ids
        else:
            items: List[requests.ModAckRequest] = []
            if self._client.open_telemetry_enabled:
                for ack_id, data in zip(ack_ids, opentelemetry_data):
                    assert data is not None
                    items.append(
                        requests.ModAckRequest(
                            ack_id,
                            self.ack_deadline,
                            None,
                            data,
                        )
                    )
            else:
                items = [
                    requests.ModAckRequest(ack_id, self.ack_deadline, None)
                    for ack_id in ack_ids
                ]
            assert self._dispatcher is not None
            self._dispatcher.modify_ack_deadline(items, ack_deadline)
            if modack_span:
                modack_span.end()
            return set()

    def _exactly_once_delivery_enabled(self) -> bool:
        """Whether exactly-once delivery is enabled for the subscription."""
        with self._exactly_once_enabled_lock:
            return self._exactly_once_enabled

    def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
        """Process all received Pub/Sub messages.

        For each message, send a modify-ack-deadline (modack) request to the
        server. This prevents expiration of the message due to buffering by
        gRPC or a proxy/firewall, and keeps the server and client expiration
        timers closer to each other, preventing the message from being
        redelivered multiple times.

        After the messages have all had their ack deadline updated, execute
        the callback for each message using the executor.
        """
        if response is None:
            _LOGGER.debug(
                "Response callback invoked with None, likely due to a "
                "transport shutdown."
            )
            return

        # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
        # protobuf message to significantly gain on attribute access performance.
        received_messages = response._pb.received_messages

        subscribe_opentelemetry: List[SubscribeOpenTelemetry] = []
        if self._client.open_telemetry_enabled:
            for received_message in received_messages:
                opentelemetry_data = SubscribeOpenTelemetry(received_message.message)
                opentelemetry_data.start_subscribe_span(
                    self._subscription,
                    response.subscription_properties.exactly_once_delivery_enabled,
                    received_message.ack_id,
                    received_message.delivery_attempt,
                )
                subscribe_opentelemetry.append(opentelemetry_data)

        _LOGGER.debug(
            "Processing %s received message(s), currently on hold %s (bytes %s).",
            len(received_messages),
            self._messages_on_hold.size,
            self._on_hold_bytes,
        )

        with self._exactly_once_enabled_lock:
            if (
                response.subscription_properties.exactly_once_delivery_enabled
                != self._exactly_once_enabled
            ):
                self._exactly_once_enabled = (
                    response.subscription_properties.exactly_once_delivery_enabled
                )
                # Update ack_deadline, whose minimum depends on
                # self._exactly_once_enabled. This method acquires the
                # self._ack_deadline_lock lock.
                self._obtain_ack_deadline(maybe_update=True)
                self._send_new_ack_deadline = True

        # Immediately (i.e. without waiting for the auto lease management)
        # modack the messages we received, as this tells the server that we've
        # received them.
        ack_id_gen = (message.ack_id for message in received_messages)
        expired_ack_ids = self._send_lease_modacks(
            ack_id_gen,
            self.ack_deadline,
            subscribe_opentelemetry,
            warn_on_invalid=False,
            receipt_modack=True,
        )

        with self._pause_resume_lock:
            if self._scheduler is None or self._leaser is None:
                _LOGGER.debug(
                    f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing."
                )
                return

            i: int = 0
            for received_message in received_messages:
                if (
                    not self._exactly_once_delivery_enabled()
                    or received_message.ack_id not in expired_ack_ids
                ):
                    message = google.cloud.pubsub_v1.subscriber.message.Message(
                        received_message.message,
                        received_message.ack_id,
                        received_message.delivery_attempt,
                        self._scheduler.queue,
                        self._exactly_once_delivery_enabled,
                    )
                    if self._client.open_telemetry_enabled:
                        message.opentelemetry_data = subscribe_opentelemetry[i]
                        i = i + 1
                    self._messages_on_hold.put(message)
                    self._on_hold_bytes += message.size
                    req = requests.LeaseRequest(
                        ack_id=message.ack_id,
                        byte_size=message.size,
                        ordering_key=message.ordering_key,
                        opentelemetry_data=message.opentelemetry_data,
                    )
                    self._leaser.add([req])

            self._maybe_release_messages()

        self.maybe_pause_consumer()

    def _on_fatal_exception(self, exception: BaseException) -> None:
        """
        Called whenever ``self._consumer`` receives a non-retryable exception.
        We close the manager in such non-retryable cases.
        """
        _LOGGER.exception(
            "Streaming pull terminating after receiving non-recoverable error: %s",
            exception,
        )
        self.close(exception)

    def _should_recover(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should be recovered from.

        If the exception is one of the retryable exceptions, this will signal
        to the consumer thread that it should "recover" from the failure.

        This will cause the stream to exit when it returns :data:`False`.

        Returns:
            Indicates if the caller should recover or shut down.
            Will be :data:`True` if the ``exception`` is "acceptable", i.e.
            in the list of retryable / idempotent exceptions.
        """
        exception = _wrap_as_exception(exception)
        # If this is in the list of idempotent exceptions, then we want to
        # recover.
        if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
            _LOGGER.debug("Observed recoverable stream error %s", exception)
            return True
        _LOGGER.debug("Observed non-recoverable stream error %s", exception)
        return False

    def _should_terminate(self, exception: BaseException) -> bool:
        """Determine if an error on the RPC stream should terminate the stream.

        If the exception is one of the terminating exceptions, this will signal
        to the consumer thread that it should terminate.

        This will cause the stream to exit when it returns :data:`True`.

        Returns:
            Indicates if the caller should terminate or attempt recovery.
            Will be :data:`True` if the ``exception`` is "acceptable", i.e.
            in the list of terminating exceptions.
        """
        exception = _wrap_as_exception(exception)
        is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
        # Terminate on any non-API error, or on a non-retryable API error
        # (permission denied, unauthenticated, etc.).
        if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
            _LOGGER.error("Observed terminating stream error %s", exception)
            return True
        _LOGGER.debug("Observed non-terminating stream error %s", exception)
        return False

    def _on_rpc_done(self, future: Any) -> None:
        """Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        """
        _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.")
        error = _wrap_as_exception(future)
        thread = threading.Thread(
            name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
        )
        thread.daemon = True
        thread.start()