1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import copy
18import logging
19import random
20import threading
21import time
22import typing
23from typing import Dict, Iterable, Optional, Union
24
25from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY
26from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
27 SubscribeOpenTelemetry,
28)
29
30try:
31 from collections.abc import KeysView
32
33 KeysView[None] # KeysView is only subscriptable in Python 3.9+
34except TypeError:
35 # Deprecated since Python 3.9, thus only use as a fallback in older Python versions
36 from typing import KeysView
37
38from google.cloud.pubsub_v1.subscriber._protocol import requests
39
40if typing.TYPE_CHECKING: # pragma: NO COVER
41 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
42 StreamingPullManager,
43 )
44
45
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Name given to the background thread that runs ``Leaser.maintain_leases``;
# it is also used in that method's exit log message.
_LEASE_WORKER_NAME = "Thread-LeaseMaintainer"
48
49
class _LeasedMessage(typing.NamedTuple):
    """Bookkeeping record the leaser keeps for one leased message (one ack ID)."""

    # ``Leaser.add()`` stores ``float("inf")`` here; the real timestamp is
    # written later by ``Leaser.start_lease_expiry_timer()``.
    sent_time: float
    """The local time when ACK ID was initially leased in seconds since the epoch."""

    # Message size in bytes, copied from the lease request's ``byte_size``.
    size: int
    # Ordering key copied from the lease request; may be ``None``.
    ordering_key: Optional[str]
    # OpenTelemetry state copied from the lease request; may be ``None``.
    # When present, its spans are closed with an "expired" result if the
    # lease is dropped by ``Leaser.maintain_leases()``.
    opentelemetry_data: Optional[SubscribeOpenTelemetry]
57
58
class Leaser(object):
    """Track leased messages and keep their ack deadlines extended.

    The leaser stores one ``_LeasedMessage`` per ack ID together with a running
    byte total. A background thread (see ``start`` / ``stop``) runs
    ``maintain_leases``, which periodically sends lease-extending modacks
    through the manager and drops messages that were leased for longer than
    ``flow_control.max_lease_duration``.

    Args:
        manager: The streaming pull manager that owns this leaser. It supplies
            the ack deadline, the flow control settings, the dispatcher and
            the modack sender used by ``maintain_leases``.
    """

    def __init__(self, manager: "StreamingPullManager"):
        self._thread: Optional[threading.Thread] = None
        self._manager = manager

        # A lock used for start/stop operations, protecting the _thread
        # attribute.
        self._operational_lock = threading.Lock()

        # A lock ensuring that add/remove operations are atomic and cannot be
        # intertwined. Protects the _leased_messages and _bytes attributes.
        self._add_remove_lock = threading.Lock()

        # Dict of ack_id -> _LeasedMessage
        self._leased_messages: Dict[str, _LeasedMessage] = {}

        # The total number of bytes consumed by leased messages.
        self._bytes = 0

        # Set by stop() to make the maintain_leases() loop exit.
        self._stop_event = threading.Event()

    @property
    def message_count(self) -> int:
        """The number of leased messages."""
        return len(self._leased_messages)

    @property
    def ack_ids(self) -> KeysView[str]:
        """The ack IDs of all leased messages."""
        return self._leased_messages.keys()

    @property
    def bytes(self) -> int:
        """The total size, in bytes, of all leased messages."""
        return self._bytes

    def add(self, items: Iterable[requests.LeaseRequest]) -> None:
        """Add messages to be managed by the leaser.

        Ack IDs that are already managed are left untouched (and logged at
        debug level); new ones are recorded and their size is added to the
        byte total.

        Args:
            items: Lease requests describing the messages to start managing.
        """
        with self._add_remove_lock:
            for item in items:
                if item.ack_id in self._leased_messages:
                    _LOGGER.debug("Message %s is already lease managed", item.ack_id)
                    continue

                # sent_time starts at +inf so the message can never look
                # "leased too long" before start_lease_expiry_timer() stamps
                # it with a real time.
                self._leased_messages[item.ack_id] = _LeasedMessage(
                    sent_time=float("inf"),
                    size=item.byte_size,
                    ordering_key=item.ordering_key,
                    opentelemetry_data=item.opentelemetry_data,
                )
                self._bytes += item.byte_size

    def start_lease_expiry_timer(self, ack_ids: Iterable[str]) -> None:
        """Start the lease expiry timer for ``ack_ids``.

        Args:
            ack_ids: Ack IDs for which to start lease expiry timers. IDs that
                are not (or no longer) managed are silently ignored.
        """
        with self._add_remove_lock:
            for ack_id in ack_ids:
                lease_info = self._leased_messages.get(ack_id)
                # Lease info might not exist for this ack_id because it has
                # already been removed by remove().
                if lease_info is not None:
                    self._leased_messages[ack_id] = lease_info._replace(
                        sent_time=time.time()
                    )

    def remove(
        self,
        items: Iterable[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove messages from lease management.

        For every managed ack ID the byte total is decremented by the
        request's ``byte_size``; unknown ack IDs are only logged. A byte total
        that ends up negative is reset to zero.

        Args:
            items: Requests identifying the messages to stop managing.
        """
        with self._add_remove_lock:
            for item in items:
                if self._leased_messages.pop(item.ack_id, None) is not None:
                    self._bytes -= item.byte_size
                else:
                    _LOGGER.debug("Item %s was not managed.", item.ack_id)

            if self._bytes < 0:
                _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
                self._bytes = 0

    @staticmethod
    def _end_spans_as_expired(leased_message: _LeasedMessage) -> None:
        """Close the OpenTelemetry spans of ``leased_message`` as "expired".

        Does nothing when the message carries no OpenTelemetry data.
        """
        otel_data = leased_message.opentelemetry_data
        if not otel_data:
            return
        otel_data.add_process_span_event("expired")
        otel_data.end_process_span()
        otel_data.set_subscribe_span_result("expired")
        otel_data.end_subscribe_span()

    def maintain_leases(self) -> None:
        """Maintain all of the leases being managed.

        This method modifies the ack deadline for all of the managed
        ack IDs, then waits for most of that time (but with jitter), and
        repeats until the stop event is set.
        """
        while not self._stop_event.is_set():
            # Determine the appropriate duration for the lease. This is
            # based off of how long previous messages have taken to ack, with
            # a sensible default and within the ranges allowed by Pub/Sub.
            # Also update the deadline currently used if enough new ACK data
            # has been gathered since the last deadline update.
            deadline = self._manager._obtain_ack_deadline(maybe_update=True)
            _LOGGER.debug("The current deadline value is %d seconds.", deadline)

            # Work on a snapshot of the leased messages: another thread may
            # modify the real dictionary while we iterate.
            leased_messages = copy.copy(self._leased_messages)

            # Drop any leases that are beyond the max lease time. This ensures
            # that in the event of a badly behaving actor, we can drop messages
            # and allow the Pub/Sub server to resend them.
            cutoff = time.time() - self._manager.flow_control.max_lease_duration
            to_drop = [
                requests.DropRequest(ack_id, item.size, item.ordering_key)
                for ack_id, item in leased_messages.items()
                if item.sent_time < cutoff
            ]

            if to_drop:
                _LOGGER.warning(
                    "Dropping %s items because they were leased too long.", len(to_drop)
                )
                assert self._manager.dispatcher is not None
                for drop_request in to_drop:
                    self._end_spans_as_expired(leased_messages[drop_request.ack_id])
                self._manager.dispatcher.drop(to_drop)

                # Forget the dropped items in our snapshot so their leases are
                # not renewed below. Removing them from the real mapping is
                # left to the dispatcher's drop handling (presumably through
                # self.remove()).
                for drop_request in to_drop:
                    leased_messages.pop(drop_request.ack_id)

            # Create a modack request.
            # We do not actually call `modify_ack_deadline` over and over
            # because it is more efficient to make a single request.
            ack_ids = leased_messages.keys()
            expired_ack_ids = set()
            if ack_ids:
                _LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids))

                # NOTE: This may not work as expected if ``consumer.active``
                #       has changed since we checked it. An implementation
                #       without any sort of race condition would require a
                #       way for ``send_request`` to fail when the consumer
                #       is inactive.
                assert self._manager.dispatcher is not None
                ack_id_gen = (ack_id for ack_id in ack_ids)
                opentelemetry_data = [
                    message.opentelemetry_data
                    for message in leased_messages.values()
                    if message.opentelemetry_data
                ]
                expired_ack_ids = self._manager._send_lease_modacks(
                    ack_id_gen,
                    deadline,
                    opentelemetry_data,
                )

            start_time = time.time()
            # If exactly once delivery is enabled, we should drop all expired
            # ack_ids from lease management.
            if self._manager._exactly_once_delivery_enabled() and expired_ack_ids:
                assert self._manager.dispatcher is not None
                expired_requests = []
                for ack_id in expired_ack_ids:
                    expired_message = leased_messages.get(ack_id)
                    if expired_message is None:
                        # Not part of this iteration's snapshot; nothing to drop.
                        continue
                    self._end_spans_as_expired(expired_message)
                    expired_requests.append(
                        requests.DropRequest(
                            ack_id,
                            expired_message.size,
                            expired_message.ordering_key,
                        )
                    )
                self._manager.dispatcher.drop(expired_requests)

            # Now wait an appropriate period of time and do this again.
            #
            # We determine the appropriate period of time based on a random
            # period between:
            # minimum: MAX_BATCH_LATENCY (to prevent duplicate modacks being
            #          created in one batch)
            # maximum: 90% of the deadline
            # This maximum time attempts to prevent ack expiration before new
            # lease modacks arrive at the server. This use of jitter
            # (http://bit.ly/2s2ekL7) helps decrease contention in cases where
            # there are many clients.
            # If we spent any time iterating over expired acks, we subtract
            # this from the deadline.
            elapsed = time.time() - start_time
            # random.uniform() accepts an upper bound below the lower one and
            # would then sample *below* the minimum (possibly a negative
            # timeout), so never let the upper bound drop under the minimum.
            longest_snooze = max(_MAX_BATCH_LATENCY, deadline * 0.9 - elapsed)
            snooze = random.uniform(_MAX_BATCH_LATENCY, longest_snooze)
            _LOGGER.debug("Snoozing lease management for %f seconds.", snooze)
            self._stop_event.wait(timeout=snooze)

        _LOGGER.debug("%s exiting.", _LEASE_WORKER_NAME)

    def start(self) -> None:
        """Start the background thread that runs ``maintain_leases``.

        Raises:
            ValueError: If the leaser thread is already running.
        """
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Leaser is already running.")

            # Create and start the helper thread.
            self._stop_event.clear()
            thread = threading.Thread(
                name=_LEASE_WORKER_NAME, target=self.maintain_leases
            )
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread

    def stop(self) -> None:
        """Signal the background thread to exit and wait for it to finish.

        Calling this when the leaser is not running is a no-op apart from
        setting the stop event.
        """
        with self._operational_lock:
            self._stop_event.set()

            if self._thread is not None:
                # maintain_leases() leaves its loop once the stop event is
                # set, so this returns after its current iteration finishes.
                self._thread.join()

            self._thread = None