# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division

import functools
import itertools
import logging
import math
import time
import threading
import typing
from typing import List, Optional, Sequence, Union
import warnings
from google.api_core.retry import exponential_sleep_generator

from opentelemetry import trace

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
    AcknowledgeStatus,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    start_ack_span,
    start_nack_span,
)

if typing.TYPE_CHECKING:  # pragma: NO COVER
    import queue
    from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
        StreamingPullManager,
    )


RequestItem = Union[
    requests.AckRequest,
    requests.DropRequest,
    requests.LeaseRequest,
    requests.ModAckRequest,
    requests.NackRequest,
]


_LOGGER = logging.getLogger(__name__)
_CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"


_MAX_BATCH_SIZE = 100
"""The maximum number of requests to process and dispatch at a time."""

_MAX_BATCH_LATENCY = 0.01
"""The maximum amount of time in seconds to wait for additional request items
before processing the next batch of requests."""

_ACK_IDS_BATCH_SIZE = 1000
"""The maximum number of ACK IDs to send in a single StreamingPullRequest.
"""

_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1
"""The time to wait for the first retry of failed acks and modacks when exactly-once
delivery is enabled."""

_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 10 * 60
"""The maximum amount of time in seconds to retry failed acks and modacks when
exactly-once delivery is enabled."""
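
# Illustrative sketch (comments only, not executed) of how the constants above
# are used later in this module, assuming a hypothetical batch of 2500 ack
# requests:
#
#   total_chunks = math.ceil(2500 / _ACK_IDS_BATCH_SIZE)  # -> 3 unary requests
#
# Failed acks/modacks under exactly-once delivery are retried with delays drawn
# from google.api_core.retry.exponential_sleep_generator, starting near the
# minimum duration above and growing toward the maximum:
#
#   retry_delay_gen = exponential_sleep_generator(
#       initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
#       maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
#   )
#   time.sleep(next(retry_delay_gen))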


class Dispatcher(object):
    """Dispatches ack, modack, nack, lease, and drop requests, queued up by
    message callbacks, to the streaming pull manager in batches."""

    def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
        self._manager = manager
        self._queue = queue
        self._thread: Optional[threading.Thread] = None
        self._operational_lock = threading.Lock()

    def start(self) -> None:
        """Start a thread to dispatch requests queued up by callbacks.

        Spawns a thread to run :meth:`dispatch_callback`.
        """
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Dispatcher is already running.")

            worker = helper_threads.QueueCallbackWorker(
                self._queue,
                self.dispatch_callback,
                max_items=_MAX_BATCH_SIZE,
                max_latency=_MAX_BATCH_LATENCY,
            )
            # Create and start the helper thread.
            thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread

    def stop(self) -> None:
        """Stop the dispatcher worker thread, if it is running."""
        with self._operational_lock:
            if self._thread is not None:
                # Signal the worker to stop by queueing a "poison pill"
                self._queue.put(helper_threads.STOP)
                self._thread.join()

            self._thread = None

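    # Illustrative lifecycle sketch (comments only, not executed); ``manager``
    # and ``dispatch_queue`` are hypothetical placeholders:
    #
    #   dispatch_queue = queue.Queue()
    #   dispatcher = Dispatcher(manager, dispatch_queue)
    #   dispatcher.start()  # spawns Thread-CallbackRequestDispatcher
    #   dispatch_queue.put(requests.AckRequest(...))  # queued by message callbacks
    #   dispatcher.stop()   # enqueues helper_threads.STOP and joins the thread
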
    def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
        """Map the callback request to the appropriate gRPC request.

        Args:
            items:
                Queued requests to dispatch.
        """
        lease_requests: List[requests.LeaseRequest] = []
        modack_requests: List[requests.ModAckRequest] = []
        ack_requests: List[requests.AckRequest] = []
        nack_requests: List[requests.NackRequest] = []
        drop_requests: List[requests.DropRequest] = []

        lease_ids = set()
        modack_ids = set()
        ack_ids = set()
        nack_ids = set()
        drop_ids = set()
        exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()

        for item in items:
            if isinstance(item, requests.LeaseRequest):
                if (
                    item.ack_id not in lease_ids
                ):  # LeaseRequests have no futures to handle.
                    lease_ids.add(item.ack_id)
                    lease_requests.append(item)
            elif isinstance(item, requests.ModAckRequest):
                if item.ack_id in modack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    modack_ids.add(item.ack_id)
                    modack_requests.append(item)
            elif isinstance(item, requests.AckRequest):
                if item.ack_id in ack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    ack_ids.add(item.ack_id)
                    ack_requests.append(item)
            elif isinstance(item, requests.NackRequest):
                if item.ack_id in nack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    nack_ids.add(item.ack_id)
                    nack_requests.append(item)
            elif isinstance(item, requests.DropRequest):
                if (
                    item.ack_id not in drop_ids
                ):  # DropRequests have no futures to handle.
                    drop_ids.add(item.ack_id)
                    drop_requests.append(item)
            else:
                warnings.warn(
                    f'Skipping unknown request item of type "{type(item)}"',
                    category=RuntimeWarning,
                )

        _LOGGER.debug("Handling %d batched requests", len(items))

        if lease_requests:
            self.lease(lease_requests)

        if modack_requests:
            self.modify_ack_deadline(modack_requests)

        # Note: Drop and ack *must* be after lease. It's possible to get both
        # the lease and the ack/drop request in the same batch.
        if ack_requests:
            self.ack(ack_requests)

        if nack_requests:
            self.nack(nack_requests)

        if drop_requests:
            self.drop(drop_requests)

    def _handle_duplicate_request_future(
        self,
        exactly_once_delivery_enabled: bool,
        item: Union[requests.AckRequest, requests.ModAckRequest, requests.NackRequest],
    ) -> None:
        _LOGGER.debug(
            "This is a duplicate %s with the same ack_id: %s.",
            type(item),
            item.ack_id,
        )
        if item.future:
            if exactly_once_delivery_enabled:
                item.future.set_exception(
                    ValueError(f"Duplicate ack_id for {type(item)}")
                )
            # Futures may be present even with exactly-once delivery
            # disabled, in transition periods after the setting is changed on
            # the subscription.
            else:
                # When exactly-once delivery is NOT enabled, acks/modacks are considered
                # best-effort, so the future should succeed even though this is a duplicate.
                item.future.set_result(AcknowledgeStatus.SUCCESS)

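    # Illustrative sketch of the duplicate handling above (comments only, not
    # executed); ``duplicate`` is a hypothetical request whose ack_id was
    # already seen in the batch:
    #
    #   # exactly-once delivery enabled:
    #   duplicate.future.exception()  # -> ValueError("Duplicate ack_id for ...")
    #
    #   # exactly-once delivery disabled (best-effort acks):
    #   duplicate.future.result()     # -> AcknowledgeStatus.SUCCESS
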
    def ack(self, items: Sequence[requests.AckRequest]) -> None:
        """Acknowledge the given messages.

        Args:
            items: The items to acknowledge.
        """
        # If we got timing information, add it to the histogram.
        for item in items:
            time_to_ack = item.time_to_ack
            if time_to_ack is not None:
                self._manager.ack_histogram.add(time_to_ack)

        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit.
        items_gen = iter(items)
        ack_ids_gen = (item.ack_id for item in items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))
        subscription_id: Optional[str] = None
        project_id: Optional[str] = None
        for item in items:
            if item.opentelemetry_data:
                item.opentelemetry_data.add_subscribe_span_event("ack start")
                if subscription_id is None:
                    subscription_id = item.opentelemetry_data.subscription_id
                if project_id is None:
                    project_id = item.opentelemetry_data.project_id

        for _ in range(total_chunks):
            ack_reqs_dict = {
                req.ack_id: req
                for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
            }

            subscribe_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            for ack_req in ack_reqs_dict.values():
                if ack_req.opentelemetry_data:
                    subscribe_span: Optional[
                        trace.Span
                    ] = ack_req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            ack_span: Optional[trace.Span] = None
            if subscription_id and project_id:
                ack_span = start_ack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    ack_span and ack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    ack_span_context: trace.SpanContext = ack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=ack_span_context,
                            attributes={
                                "messaging.operation.name": "ack",
                            },
                        )

            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
                ack_reqs_dict=ack_reqs_dict,
            )
            if ack_span:
                ack_span.end()

            for completed_ack in requests_completed:
                if completed_ack.opentelemetry_data:
                    completed_ack.opentelemetry_data.add_subscribe_span_event("ack end")
                    completed_ack.opentelemetry_data.set_subscribe_span_result("acked")
                    completed_ack.opentelemetry_data.end_subscribe_span()

            # Remove the completed messages from lease management.
            self.drop(requests_completed)

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryAcks",
                    functools.partial(self._retry_acks, requests_to_retry),
                )

    def _start_retry_thread(self, thread_name, thread_target):
        # note: if the thread is *not* a daemon, a memory leak exists due to a cpython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        retry_thread = threading.Thread(
            name=thread_name,
            target=thread_target,
            daemon=True,
        )
        # The thread finishes when the requests succeed or eventually fail with
        # a back-end timeout error or other permanent failure.
        retry_thread.start()

    def _retry_acks(self, requests_to_retry: List[requests.AckRequest]):
        """Retry failed acks with exponential backoff until they succeed or
        fail permanently."""
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)
            _LOGGER.debug(
                "Retrying %d ack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )
            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
            subscription_id: Optional[str] = None
            project_id: Optional[str] = None
            subscribe_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            for req in requests_to_retry:
                if req.opentelemetry_data:
                    req.opentelemetry_data.add_subscribe_span_event("ack start")
                    if subscription_id is None:
                        subscription_id = req.opentelemetry_data.subscription_id
                    if project_id is None:
                        project_id = req.opentelemetry_data.project_id
                    subscribe_span: Optional[
                        trace.Span
                    ] = req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            ack_span: Optional[trace.Span] = None
            if subscription_id and project_id:
                ack_span = start_ack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    ack_span and ack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    ack_span_context: trace.SpanContext = ack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=ack_span_context,
                            attributes={
                                "messaging.operation.name": "ack",
                            },
                        )

            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=[req.ack_id for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )

            if ack_span:
                ack_span.end()

            for completed_ack in requests_completed:
                if completed_ack.opentelemetry_data:
                    completed_ack.opentelemetry_data.add_subscribe_span_event("ack end")
                    completed_ack.opentelemetry_data.set_subscribe_span_result("acked")
                    completed_ack.opentelemetry_data.end_subscribe_span()

            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."
            # Remove the completed messages from lease management.
            self.drop(requests_completed)

    def drop(
        self,
        items: Sequence[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove the given messages from lease management.

        Args:
            items: The items to drop.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.remove(items)
        ordering_keys = (k.ordering_key for k in items if k.ordering_key)
        self._manager.activate_ordering_keys(ordering_keys)
        self._manager.maybe_resume_consumer()

    def lease(self, items: Sequence[requests.LeaseRequest]) -> None:
        """Add the given messages to lease management.

        Args:
            items: The items to lease.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.add(items)
        self._manager.maybe_pause_consumer()

    def modify_ack_deadline(
        self,
        items: Sequence[requests.ModAckRequest],
        default_deadline: Optional[float] = None,
    ) -> None:
        """Modify the ack deadline for the given messages.

        Args:
            items: The items to modify.
            default_deadline: If provided, a single ack deadline in seconds
                applied to every item in the request instead of each item's
                own ``seconds`` value.
        """
        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit.
        items_gen = iter(items)
        ack_ids_gen = (item.ack_id for item in items)
        deadline_seconds_gen = (item.seconds for item in items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

        subscription_id: Optional[str] = None
        project_id: Optional[str] = None

        for item in items:
            if item.opentelemetry_data:
                if math.isclose(item.seconds, 0):
                    item.opentelemetry_data.add_subscribe_span_event("nack start")
                    if subscription_id is None:
                        subscription_id = item.opentelemetry_data.subscription_id
                    if project_id is None:
                        project_id = item.opentelemetry_data.project_id
                else:
                    item.opentelemetry_data.add_subscribe_span_event("modack start")
        for _ in range(total_chunks):
            ack_reqs_dict = {
                req.ack_id: req
                for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
            }
            subscribe_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            for ack_req in ack_reqs_dict.values():
                if ack_req.opentelemetry_data and math.isclose(ack_req.seconds, 0):
                    subscribe_span: Optional[
                        trace.Span
                    ] = ack_req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            nack_span: Optional[trace.Span] = None
            if subscription_id and project_id:
                nack_span = start_nack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    nack_span and nack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    nack_span_context: trace.SpanContext = nack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=nack_span_context,
                            attributes={
                                "messaging.operation.name": "nack",
                            },
                        )
            requests_to_retry: List[requests.ModAckRequest]
            requests_completed: Optional[List[requests.ModAckRequest]] = None
            if default_deadline is None:
                # no further work needs to be done for `requests_to_retry`
                requests_completed, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=list(
                        itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
                    ),
                    modify_deadline_seconds=list(
                        itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
                    ),
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=None,
                )
            else:
                requests_completed, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=itertools.islice(
                        ack_ids_gen, _ACK_IDS_BATCH_SIZE
                    ),
                    modify_deadline_seconds=None,
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=default_deadline,
                )
            if nack_span:
                nack_span.end()
            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."

            for completed_modack in requests_completed:
                if completed_modack.opentelemetry_data:
                    # nack is a modack with 0 extension seconds.
                    if math.isclose(completed_modack.seconds, 0):
                        completed_modack.opentelemetry_data.set_subscribe_span_result(
                            "nacked"
                        )
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "nack end"
                        )
                        completed_modack.opentelemetry_data.end_subscribe_span()
                    else:
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "modack end"
                        )

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryModAcks",
                    functools.partial(self._retry_modacks, requests_to_retry),
                )

    def _retry_modacks(self, requests_to_retry: List[requests.ModAckRequest]):
        """Retry failed modacks with exponential backoff until they succeed or
        fail permanently."""
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)
            _LOGGER.debug(
                "Retrying %d modack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )
            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}

            subscription_id = None
            project_id = None
            subscribe_links = []
            subscribe_spans = []
            for ack_req in ack_reqs_dict.values():
                if ack_req.opentelemetry_data and math.isclose(ack_req.seconds, 0):
                    if subscription_id is None:
                        subscription_id = ack_req.opentelemetry_data.subscription_id
                    if project_id is None:
                        project_id = ack_req.opentelemetry_data.project_id
                    subscribe_span = ack_req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            nack_span = None
            if subscription_id and project_id:
                nack_span = start_nack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    nack_span and nack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    nack_span_context: trace.SpanContext = nack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=nack_span_context,
                            attributes={
                                "messaging.operation.name": "nack",
                            },
                        )
            requests_completed, requests_to_retry = self._manager.send_unary_modack(
                modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
                modify_deadline_seconds=[req.seconds for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )
            if nack_span:
                nack_span.end()
            for completed_modack in requests_completed:
                if completed_modack.opentelemetry_data:
                    # nack is a modack with 0 extension seconds.
                    if math.isclose(completed_modack.seconds, 0):
                        completed_modack.opentelemetry_data.set_subscribe_span_result(
                            "nacked"
                        )
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "nack end"
                        )
                        completed_modack.opentelemetry_data.end_subscribe_span()
                    else:
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "modack end"
                        )

    def nack(self, items: Sequence[requests.NackRequest]) -> None:
        """Explicitly deny receipt of messages.

        Args:
            items: The items to deny.
        """
        self.modify_ack_deadline(
            [
                requests.ModAckRequest(
                    ack_id=item.ack_id,
                    seconds=0,
                    future=item.future,
                    opentelemetry_data=item.opentelemetry_data,
                )
                for item in items
            ]
        )
        self.drop(
            [
                requests.DropRequest(
                    ack_id=item.ack_id,
                    byte_size=item.byte_size,
                    ordering_key=item.ordering_key,
                )
                for item in items
            ]
        )