Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py: 33%
105 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import absolute_import
17import copy
18import logging
19import random
20import threading
21import time
22import typing
23from typing import Dict, Iterable, Optional, Union
25from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY
27try:
28 from collections.abc import KeysView
30 KeysView[None] # KeysView is only subscriptable in Python 3.9+
31except TypeError:
32 # Deprecated since Python 3.9, thus only use as a fallback in older Python versions
33 from typing import KeysView
35from google.cloud.pubsub_v1.subscriber._protocol import requests
37if typing.TYPE_CHECKING: # pragma: NO COVER
38 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
39 StreamingPullManager,
40 )
# Module-wide logger used for lease-management diagnostics.
_LOGGER = logging.getLogger(name=__name__)

# Name assigned to the background thread that runs ``Leaser.maintain_leases``.
_LEASE_WORKER_NAME = "Thread-LeaseMaintainer"
class _LeasedMessage(typing.NamedTuple):
    """Per-ack-ID bookkeeping record held by the leaser."""

    # Local time, in seconds since the epoch, at which the ACK ID was
    # initially leased.
    sent_time: float
    # Size of the leased message in bytes.
    size: int
    # Ordering key of the leased message, or None.
    ordering_key: Optional[str]
class Leaser(object):
    """Keeps the leases of received messages alive until they are removed.

    A helper thread (started by :meth:`start`) repeatedly sends lease modacks
    for every managed ack ID through the owning manager, and drops messages
    that have been leased for longer than the manager's flow-control
    ``max_lease_duration``.
    """

    def __init__(self, manager: "StreamingPullManager"):
        """Create a leaser bound to ``manager``.

        Args:
            manager: The streaming pull manager that owns this leaser. It is
                used to obtain ack deadlines and flow-control settings, to
                reach the dispatcher, and to send lease modacks.
        """
        self._thread: Optional[threading.Thread] = None
        self._manager = manager

        # A lock used for start/stop operations, protecting the _thread attribute.
        self._operational_lock = threading.Lock()

        # A lock ensuring that add/remove operations are atomic and cannot be
        # intertwined. Protects the _leased_messages and _bytes attributes.
        self._add_remove_lock = threading.Lock()

        # Dict of ack_id -> _LeasedMessage.
        self._leased_messages: Dict[str, _LeasedMessage] = {}

        # The total number of bytes consumed by leased messages.
        self._bytes = 0

        # Set by stop(); ends the maintain_leases() loop and wakes its wait().
        self._stop_event = threading.Event()

    @property
    def message_count(self) -> int:
        """The number of leased messages."""
        return len(self._leased_messages)

    @property
    def ack_ids(self) -> KeysView[str]:
        """The ack IDs of all leased messages (a live view of the keys)."""
        return self._leased_messages.keys()

    @property
    def bytes(self) -> int:
        """The total size, in bytes, of all leased messages."""
        return self._bytes

    def add(self, items: Iterable[requests.LeaseRequest]) -> None:
        """Add messages to be managed by the leaser.

        Each new ack ID is recorded with an infinite ``sent_time`` until
        :meth:`start_lease_expiry_timer` is called for it. Its byte size is
        added to the running total. Ack IDs that are already managed are
        logged and otherwise ignored.

        Args:
            items: Lease requests describing the messages to manage.
        """
        with self._add_remove_lock:
            for item in items:
                # Add the ack ID to the set of managed ack IDs, and increment
                # the size counter.
                if item.ack_id not in self._leased_messages:
                    self._leased_messages[item.ack_id] = _LeasedMessage(
                        sent_time=float("inf"),
                        size=item.byte_size,
                        ordering_key=item.ordering_key,
                    )
                    self._bytes += item.byte_size
                else:
                    _LOGGER.debug("Message %s is already lease managed", item.ack_id)

    def start_lease_expiry_timer(self, ack_ids: Iterable[str]) -> None:
        """Start the lease expiry timer for ``ack_ids``.

        Sets the ``sent_time`` of each still-managed ack ID to the current
        time. Ack IDs that are no longer managed are skipped.

        Args:
            ack_ids: Ack IDs for which to start lease expiry timers.
        """
        with self._add_remove_lock:
            for ack_id in ack_ids:
                lease_info = self._leased_messages.get(ack_id)
                # Lease info might not exist for this ack_id because it has
                # already been removed by remove().
                if lease_info:
                    self._leased_messages[ack_id] = lease_info._replace(
                        sent_time=time.time()
                    )

    def remove(
        self,
        items: Iterable[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove messages from lease management.

        The byte counter is decremented by each request's ``byte_size`` for
        ack IDs that were managed. Unknown ack IDs are logged and ignored. If
        the counter would go negative it is logged and clamped to zero.

        Args:
            items: Ack, drop or nack requests whose ack IDs should be released.
        """
        with self._add_remove_lock:
            # Remove the ack ID from lease management, and decrement the
            # byte counter.
            for item in items:
                if self._leased_messages.pop(item.ack_id, None) is not None:
                    self._bytes -= item.byte_size
                else:
                    _LOGGER.debug("Item %s was not managed.", item.ack_id)

            if self._bytes < 0:
                _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
                self._bytes = 0

    def maintain_leases(self) -> None:
        """Maintain all of the leases being managed.

        This method modifies the ack deadline for all of the managed
        ack IDs, then waits for most of that time (but with jitter), and
        repeats until :meth:`stop` sets the stop event.
        """
        while not self._stop_event.is_set():
            # Determine the appropriate duration for the lease. This is
            # based off of how long previous messages have taken to ack, with
            # a sensible default and within the ranges allowed by Pub/Sub.
            # Also update the deadline currently used if enough new ACK data
            # has been gathered since the last deadline update.
            deadline = self._manager._obtain_ack_deadline(maybe_update=True)
            _LOGGER.debug("The current deadline value is %d seconds.", deadline)

            # Take a snapshot of the leased messages so we can iterate it
            # while other threads add/remove entries. The copy is taken under
            # the add/remove lock, which guards _leased_messages. The lock is
            # released right away because dispatcher.drop() below leads to
            # remove(), which acquires the same (non-reentrant) lock.
            with self._add_remove_lock:
                leased_messages = copy.copy(self._leased_messages)

            # Drop any leases that are beyond the max lease time. This ensures
            # that in the event of a badly behaving actor, we can drop messages
            # and allow the Pub/Sub server to resend them.
            cutoff = time.time() - self._manager.flow_control.max_lease_duration
            to_drop = [
                requests.DropRequest(ack_id, item.size, item.ordering_key)
                for ack_id, item in leased_messages.items()
                if item.sent_time < cutoff
            ]

            if to_drop:
                _LOGGER.warning(
                    "Dropping %s items because they were leased too long.", len(to_drop)
                )
                assert self._manager.dispatcher is not None
                self._manager.dispatcher.drop(to_drop)

            # Remove dropped items from our snapshot as well. The real dict
            # has already been updated through the dispatcher's drop path.
            for item in to_drop:
                leased_messages.pop(item.ack_id)

            # Create a modack request.
            # We do not actually call `modify_ack_deadline` over and over
            # because it is more efficient to make a single request.
            ack_ids = leased_messages.keys()
            expired_ack_ids = set()
            if ack_ids:
                _LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids))

                # NOTE: This may not work as expected if ``consumer.active``
                # has changed since we checked it. An implementation
                # without any sort of race condition would require a
                # way for ``send_request`` to fail when the consumer
                # is inactive.
                assert self._manager.dispatcher is not None
                ack_id_gen = (ack_id for ack_id in ack_ids)
                expired_ack_ids = self._manager._send_lease_modacks(
                    ack_id_gen, deadline
                )

            start_time = time.time()
            # If exactly once delivery is enabled, we should drop all expired
            # ack_ids from lease management.
            if self._manager._exactly_once_delivery_enabled() and expired_ack_ids:
                assert self._manager.dispatcher is not None
                self._manager.dispatcher.drop(
                    [
                        requests.DropRequest(
                            ack_id,
                            leased_messages[ack_id].size,
                            leased_messages[ack_id].ordering_key,
                        )
                        for ack_id in expired_ack_ids
                        if ack_id in leased_messages
                    ]
                )

            # Now wait an appropriate period of time and do this again.
            #
            # We determine the appropriate period of time based on a random
            # period between:
            # minimum: MAX_BATCH_LATENCY (to prevent duplicate modacks being
            #          created in one batch)
            # maximum: 90% of the deadline
            # This maximum time attempts to prevent ack expiration before new
            # lease modacks arrive at the server.
            # This use of jitter (http://bit.ly/2s2ekL7) helps decrease
            # contention in cases where there are many clients.
            # Time spent dropping expired acks above is subtracted from the
            # upper bound.
            elapsed = time.time() - start_time
            snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9 - elapsed)
            _LOGGER.debug("Snoozing lease management for %f seconds.", snooze)
            self._stop_event.wait(timeout=snooze)

        _LOGGER.debug("%s exiting.", _LEASE_WORKER_NAME)

    def start(self) -> None:
        """Start the daemon helper thread that runs :meth:`maintain_leases`.

        Raises:
            ValueError: If the helper thread is already running.
        """
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Leaser is already running.")

            # Create and start the helper thread.
            self._stop_event.clear()
            thread = threading.Thread(
                name=_LEASE_WORKER_NAME, target=self.maintain_leases
            )
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread

    def stop(self) -> None:
        """Signal the maintenance loop to exit and wait for the helper thread."""
        with self._operational_lock:
            self._stop_event.set()

            if self._thread is not None:
                # Setting the stop event above ends the maintain_leases()
                # loop and interrupts its wait(), so this join returns once
                # the current iteration finishes.
                self._thread.join()

            self._thread = None