Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py: 23%
160 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import absolute_import
16from __future__ import division
18import functools
19import itertools
20import logging
21import math
22import time
23import threading
24import typing
25from typing import List, Optional, Sequence, Union
26import warnings
27from google.api_core.retry import exponential_sleep_generator
29from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
30from google.cloud.pubsub_v1.subscriber._protocol import requests
31from google.cloud.pubsub_v1.subscriber.exceptions import (
32 AcknowledgeStatus,
33)
35if typing.TYPE_CHECKING: # pragma: NO COVER
36 import queue
37 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
38 StreamingPullManager,
39 )
# Union of the request types handled by ``Dispatcher.dispatch_callback``; it is
# the element type of the batches that method receives.
RequestItem = Union[
    requests.AckRequest,
    requests.DropRequest,
    requests.LeaseRequest,
    requests.ModAckRequest,
    requests.NackRequest,
]


# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Name assigned to the helper thread created in ``Dispatcher.start``.
_CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"


_MAX_BATCH_SIZE = 100
"""The maximum number of requests to process and dispatch at a time."""

_MAX_BATCH_LATENCY = 0.01
"""The maximum amount of time in seconds to wait for additional request items
before processing the next batch of requests."""

# NOTE: within this module the value is used as the chunk size when splitting
# ack and modack batches before they are handed to the manager's
# ``send_unary_ack`` / ``send_unary_modack``.
_ACK_IDS_BATCH_SIZE = 1000
"""The maximum number of ACK IDs to send in a single StreamingPullRequest.
"""

# Passed as ``initial`` to ``exponential_sleep_generator`` by the retry loops.
_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1
"""The time to wait for the first retry of failed acks and modacks when exactly-once
delivery is enabled."""

# Passed as ``maximum`` to ``exponential_sleep_generator`` by the retry loops
# (10 minutes expressed in seconds).
_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 10 * 60
"""The maximum amount of time in seconds to retry failed acks and modacks when
exactly-once delivery is enabled."""
class Dispatcher(object):
    """Route requests queued by message callbacks to the streaming pull manager.

    A helper thread drains the queue in small batches and hands each batch to
    :meth:`dispatch_callback`, which de-duplicates the requests by ack ID and
    forwards them to the matching handler (:meth:`lease`,
    :meth:`modify_ack_deadline`, :meth:`ack`, :meth:`nack`, :meth:`drop`).

    Args:
        manager:
            The streaming pull manager that sends the requests and owns the
            leaser, the ack histogram and the consumer flow control.
        queue:
            The queue from which request items are read.
    """

    def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
        self._manager = manager
        self._queue = queue
        # The helper thread; ``None`` whenever the dispatcher is not running.
        self._thread: Optional[threading.Thread] = None
        # Serializes start() and stop() so the thread handle stays consistent.
        self._operational_lock = threading.Lock()

    def start(self) -> None:
        """Start a thread to dispatch requests queued up by callbacks.

        Spawns a thread to run :meth:`dispatch_callback`.

        Raises:
            ValueError: If the dispatcher is already running.
        """
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Dispatcher is already running.")

            worker = helper_threads.QueueCallbackWorker(
                self._queue,
                self.dispatch_callback,
                max_items=_MAX_BATCH_SIZE,
                max_latency=_MAX_BATCH_LATENCY,
            )
            # Create and start the helper thread.
            thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread

    def stop(self) -> None:
        """Stop the helper thread, if one is running, and wait for it to exit.

        Calling this method while the dispatcher is not running is a no-op.
        """
        with self._operational_lock:
            if self._thread is not None:
                # Signal the worker to stop by queueing a "poison pill"
                self._queue.put(helper_threads.STOP)
                self._thread.join()

            self._thread = None

    def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
        """Map the callback request to the appropriate gRPC request.

        Requests are grouped by type and de-duplicated by ack ID within each
        type. Duplicate lease and drop requests are silently skipped; duplicate
        ack, modack and nack requests have their futures resolved through
        :meth:`_handle_duplicate_request_future`. Items of an unknown type are
        skipped with a :class:`RuntimeWarning`.

        Args:
            items:
                Queued requests to dispatch.
        """
        lease_requests: List[requests.LeaseRequest] = []
        modack_requests: List[requests.ModAckRequest] = []
        ack_requests: List[requests.AckRequest] = []
        nack_requests: List[requests.NackRequest] = []
        drop_requests: List[requests.DropRequest] = []

        # Ack IDs already seen in this batch, tracked separately per type.
        lease_ids = set()
        modack_ids = set()
        ack_ids = set()
        nack_ids = set()
        drop_ids = set()
        exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()

        for item in items:
            if isinstance(item, requests.LeaseRequest):
                # LeaseRequests have no futures to handle.
                if item.ack_id not in lease_ids:
                    lease_ids.add(item.ack_id)
                    lease_requests.append(item)
            elif isinstance(item, requests.ModAckRequest):
                if item.ack_id in modack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    modack_ids.add(item.ack_id)
                    modack_requests.append(item)
            elif isinstance(item, requests.AckRequest):
                if item.ack_id in ack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    ack_ids.add(item.ack_id)
                    ack_requests.append(item)
            elif isinstance(item, requests.NackRequest):
                if item.ack_id in nack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    nack_ids.add(item.ack_id)
                    nack_requests.append(item)
            elif isinstance(item, requests.DropRequest):
                # DropRequests have no futures to handle.
                if item.ack_id not in drop_ids:
                    drop_ids.add(item.ack_id)
                    drop_requests.append(item)
            else:
                warnings.warn(
                    f'Skipping unknown request item of type "{type(item)}"',
                    category=RuntimeWarning,
                )

        _LOGGER.debug("Handling %d batched requests", len(items))

        if lease_requests:
            self.lease(lease_requests)

        if modack_requests:
            self.modify_ack_deadline(modack_requests)

        # Note: Drop and ack *must* be after lease. It's possible to get both
        # the lease and the ack/drop request in the same batch.
        if ack_requests:
            self.ack(ack_requests)

        if nack_requests:
            self.nack(nack_requests)

        if drop_requests:
            self.drop(drop_requests)

    def _handle_duplicate_request_future(
        self,
        exactly_once_delivery_enabled: bool,
        item: Union[requests.AckRequest, requests.ModAckRequest, requests.NackRequest],
    ) -> None:
        """Resolve the future of a request whose ack ID was already seen.

        Args:
            exactly_once_delivery_enabled:
                Whether exactly-once delivery is enabled. If so, the future
                fails with a ``ValueError``; otherwise it succeeds.
            item:
                The duplicate request. Nothing is resolved if it carries no
                future.
        """
        _LOGGER.debug(
            "This is a duplicate %s with the same ack_id: %s.",
            type(item),
            item.ack_id,
        )
        # Futures may be present even with exactly-once delivery disabled, in
        # transition periods after the setting is changed on the subscription.
        if item.future:
            if exactly_once_delivery_enabled:
                item.future.set_exception(
                    ValueError(f"Duplicate ack_id for {type(item)}")
                )
            else:
                # When exactly-once delivery is NOT enabled, acks/modacks are
                # considered best-effort, so the future should succeed even
                # though this is a duplicate.
                item.future.set_result(AcknowledgeStatus.SUCCESS)

    def ack(self, items: Sequence[requests.AckRequest]) -> None:
        """Acknowledge the given messages.

        Completed requests are removed from lease management. Requests the
        manager reports as retryable are retried on a separate daemon thread.

        Args:
            items: The items to acknowledge.
        """
        # If we got timing information, add it to the histogram.
        for item in items:
            time_to_ack = item.time_to_ack
            if time_to_ack is not None:
                self._manager.ack_histogram.add(time_to_ack)

        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit.
        items_iter = iter(items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

        for _ in range(total_chunks):
            # Take one chunk and derive both the ID list and the lookup dict
            # from it, so the two can never drift apart.
            chunk = list(itertools.islice(items_iter, _ACK_IDS_BATCH_SIZE))
            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=[req.ack_id for req in chunk],
                ack_reqs_dict={req.ack_id: req for req in chunk},
            )

            # Remove the completed messages from lease management.
            self.drop(requests_completed)

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryAcks",
                    functools.partial(self._retry_acks, requests_to_retry),
                )

    def _start_retry_thread(
        self, thread_name: str, thread_target: typing.Callable[[], None]
    ) -> None:
        """Run ``thread_target`` on a new daemon thread named ``thread_name``.

        Args:
            thread_name: The name to give the new thread.
            thread_target: The zero-argument callable the thread executes.
        """
        # note: if the thread is *not* a daemon, a memory leak exists due to a cpython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        retry_thread = threading.Thread(
            name=thread_name,
            target=thread_target,
            daemon=True,
        )
        # The thread finishes when the requests succeed or eventually fail with
        # a back-end timeout error or other permanent failure.
        retry_thread.start()

    def _retry_acks(self, requests_to_retry) -> None:
        """Re-send ack requests, sleeping between attempts, until none remain.

        The delay before each attempt comes from ``exponential_sleep_generator``.
        Requests completed by an attempt are removed from lease management.

        Args:
            requests_to_retry: The ack requests that still need to be sent.
        """
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)
            # Lazy %-formatting: the count is actually interpolated, and only
            # when debug logging is enabled.
            _LOGGER.debug(
                "Retrying %d ack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )
            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=[req.ack_id for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )
            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."
            # Remove the completed messages from lease management.
            self.drop(requests_completed)

    def drop(
        self,
        items: Sequence[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove the given messages from lease management.

        Also activates the ordering keys carried by the dropped items and gives
        the manager a chance to resume the consumer.

        Args:
            items: The items to drop.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.remove(items)
        # Only items that actually carry an ordering key are forwarded.
        ordering_keys = (k.ordering_key for k in items if k.ordering_key)
        self._manager.activate_ordering_keys(ordering_keys)
        self._manager.maybe_resume_consumer()

    def lease(self, items: Sequence[requests.LeaseRequest]) -> None:
        """Add the given messages to lease management.

        Afterwards the manager is given a chance to pause the consumer.

        Args:
            items: The items to lease.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.add(items)
        self._manager.maybe_pause_consumer()

    def modify_ack_deadline(
        self,
        items: Sequence[requests.ModAckRequest],
        default_deadline: Optional[float] = None,
    ) -> None:
        """Modify the ack deadline for the given messages.

        Requests the manager reports as retryable are retried on a separate
        daemon thread.

        Args:
            items: The items to modify.
            default_deadline:
                If given, this single deadline is sent for every item in place
                of the per-item ``seconds`` values.
        """
        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit.
        items_iter = iter(items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

        for _ in range(total_chunks):
            # Take one chunk and derive the IDs, the deadlines and the lookup
            # dict from it, so the three can never drift apart.
            chunk = list(itertools.islice(items_iter, _ACK_IDS_BATCH_SIZE))
            chunk_ack_ids = [req.ack_id for req in chunk]
            ack_reqs_dict = {req.ack_id: req for req in chunk}

            requests_to_retry: List[requests.ModAckRequest]
            # Only the retry list is kept; the completed requests returned as
            # the first element are not used here.
            if default_deadline is None:
                _, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=chunk_ack_ids,
                    modify_deadline_seconds=[req.seconds for req in chunk],
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=None,
                )
            else:
                # The IDs are passed as a concrete list, exactly as in the
                # branch above, rather than as a single-pass iterator.
                _, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=chunk_ack_ids,
                    modify_deadline_seconds=None,
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=default_deadline,
                )
            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryModAcks",
                    functools.partial(self._retry_modacks, requests_to_retry),
                )

    def _retry_modacks(self, requests_to_retry) -> None:
        """Re-send modack requests, sleeping between attempts, until none remain.

        The delay before each attempt comes from ``exponential_sleep_generator``.
        Each retry sends the per-request ``seconds`` value.

        Args:
            requests_to_retry: The modack requests that still need to be sent.
        """
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)
            # Lazy %-formatting: the count is actually interpolated, and only
            # when debug logging is enabled.
            _LOGGER.debug(
                "Retrying %d modack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )
            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
            # Completed modacks need no further handling; keep the retry list.
            _, requests_to_retry = self._manager.send_unary_modack(
                modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
                modify_deadline_seconds=[req.seconds for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )

    def nack(self, items: Sequence[requests.NackRequest]) -> None:
        """Explicitly deny receipt of messages.

        Each item is sent as a modack with a deadline of zero seconds (carrying
        over the item's future) and is then dropped from lease management.

        Args:
            items: The items to deny.
        """
        self.modify_ack_deadline(
            [
                requests.ModAckRequest(
                    ack_id=item.ack_id, seconds=0, future=item.future
                )
                for item in items
            ]
        )
        self.drop(
            [
                requests.DropRequest(
                    ack_id=item.ack_id,
                    byte_size=item.byte_size,
                    ordering_key=item.ordering_key,
                )
                for item in items
            ]
        )