1# Copyright 2020, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from collections import OrderedDict
16import logging
17import threading
18from typing import Dict, Optional, Type
19import warnings
20
21from google.cloud.pubsub_v1 import types
22from google.cloud.pubsub_v1.publisher import exceptions
23
24
# Logger shared by everything in this module.
_LOGGER = logging.getLogger(name=__name__)


# Alias used in annotations for the messages that pass through flow control.
MessageType = Type[types.PubsubMessage]  # type: ignore
29
30
31class _QuantityReservation:
32 """A (partial) reservation of quantifiable resources."""
33
34 def __init__(self, bytes_reserved: int, bytes_needed: int, has_slot: bool):
35 self.bytes_reserved = bytes_reserved
36 self.bytes_needed = bytes_needed
37 self.has_slot = has_slot
38
39 def __repr__(self):
40 return (
41 f"{type(self).__name__}("
42 f"bytes_reserved={self.bytes_reserved}, "
43 f"bytes_needed={self.bytes_needed}, "
44 f"has_slot={self.has_slot})"
45 )
46
47
class FlowController(object):
    """A class used to control the flow of messages passing through it.

    Args:
        settings: Desired flow control configuration.
    """

    def __init__(self, settings: types.PublishFlowControl):
        self._settings = settings

        # Load statistics. They represent the number of messages added, but not
        # yet released (and their total size).
        self._message_count = 0
        self._total_bytes = 0

        # A FIFO queue of threads blocked on adding a message that also tracks their
        # reservations of available flow control bytes and message slots.
        # Only relevant if the configured limit exceeded behavior is BLOCK.
        self._waiting: Dict[threading.Thread, _QuantityReservation] = OrderedDict()

        # Sums of the bytes / message slots currently held by the reservations in
        # ``_waiting``. This capacity is not available to any other caller.
        self._reserved_bytes = 0
        self._reserved_slots = 0

        # The lock is used to protect all internal state (message and byte count,
        # waiting threads to add, etc.).
        self._operational_lock = threading.Lock()

        # The condition for blocking the flow if capacity is exceeded.
        self._has_capacity = threading.Condition(lock=self._operational_lock)

    def add(self, message: MessageType) -> None:
        """Add a message to flow control.

        Adding a message updates the internal load statistics, and an action is
        taken if these limits are exceeded (depending on the flow control settings).

        If the behavior is BLOCK and the wait is interrupted by an exception, the
        caller's partial reservation is discarded and its capacity handed on to
        the remaining waiters before the exception propagates.

        Args:
            message:
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                Raised when the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
                the message would exceed flow control limits, or when the desired action
                is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
                the message would block forever against the flow control limits.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        # The size is needed several times below; compute it only once.
        message_size = message._pb.ByteSize()

        with self._operational_lock:
            if not self._would_overflow(message):
                self._message_count += 1
                self._total_bytes += message_size
                return

            # Adding a message would overflow, react.
            if (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.ERROR
            ):
                # Raising an error means rejecting a message, thus we do not
                # add anything to the existing load, but we do report the would-be
                # load if we accepted the message.
                load_info = self._load_info(
                    message_count=self._message_count + 1,
                    total_bytes=self._total_bytes + message_size,
                )
                error_msg = "Flow control limits would be exceeded - {}.".format(
                    load_info
                )
                raise exceptions.FlowControlLimitError(error_msg)

            assert (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.BLOCK
            )

            # Sanity check - if a message exceeds total flow control limits all
            # by itself, it would block forever, thus raise error.
            if (
                message_size > self._settings.byte_limit
                or self._settings.message_limit < 1
            ):
                load_info = self._load_info(message_count=1, total_bytes=message_size)
                error_msg = (
                    "Total flow control limits too low for the message, "
                    "would block forever - {}.".format(load_info)
                )
                raise exceptions.FlowControlLimitError(error_msg)

            current_thread = threading.current_thread()

            try:
                while self._would_overflow(message):
                    if current_thread not in self._waiting:
                        reservation = _QuantityReservation(
                            bytes_reserved=0,
                            bytes_needed=message_size,
                            has_slot=False,
                        )
                        self._waiting[current_thread] = reservation  # Will be placed last.

                    _LOGGER.debug(
                        "Blocking until there is enough free capacity in the flow - "
                        "{}.".format(self._load_info())
                    )

                    self._has_capacity.wait()

                    _LOGGER.debug(
                        "Woke up from waiting on free capacity in the flow - "
                        "{}.".format(self._load_info())
                    )
            except BaseException:
                # Condition.wait() re-acquires the lock before raising, so state can
                # be fixed up safely here. Without this cleanup an abandoned
                # reservation would keep its capacity forever and, if it sat at the
                # head of the queue, _ready_to_unblock() would never let any other
                # waiter through.
                self._drop_reservation(current_thread)
                self._distribute_available_capacity()
                if self._ready_to_unblock():
                    self._has_capacity.notify_all()
                raise

            # Message accepted, increase the load and remove thread stats.
            self._message_count += 1
            self._total_bytes += message_size
            self._drop_reservation(current_thread)

    def release(self, message: MessageType) -> None:
        """Release a message from flow control.

        Args:
            message:
                The message leaving the flow control.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            # Releasing a message decreases the load.
            self._message_count -= 1
            self._total_bytes -= message._pb.ByteSize()

            if self._message_count < 0 or self._total_bytes < 0:
                warnings.warn(
                    "Releasing a message that was never added or already released.",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                self._message_count = max(0, self._message_count)
                self._total_bytes = max(0, self._total_bytes)

            self._distribute_available_capacity()

            # If at least one thread waiting to add() can be unblocked, wake them up.
            if self._ready_to_unblock():
                _LOGGER.debug("Notifying threads waiting to add messages to flow.")
                self._has_capacity.notify_all()

    def _drop_reservation(self, thread: threading.Thread) -> None:
        """Remove ``thread`` from the waiting queue and return its reserved capacity.

        Does nothing if ``thread`` holds no reservation.

        The method assumes that the caller has obtained ``_operational_lock``.
        """
        reservation = self._waiting.pop(thread, None)
        if reservation is None:
            return

        self._reserved_bytes -= reservation.bytes_reserved
        # Only reservations that were actually granted a slot are counted in
        # ``_reserved_slots`` (see _distribute_available_capacity).
        if reservation.has_slot:
            self._reserved_slots -= 1

    def _distribute_available_capacity(self) -> None:
        """Distribute available capacity among the waiting threads in FIFO order.

        The method assumes that the caller has obtained ``_operational_lock``.
        """
        available_slots = (
            self._settings.message_limit - self._message_count - self._reserved_slots
        )
        available_bytes = (
            self._settings.byte_limit - self._total_bytes - self._reserved_bytes
        )

        for reservation in self._waiting.values():
            if available_slots <= 0 and available_bytes <= 0:
                break  # Santa is now empty-handed, better luck next time.

            # Distribute any free slots.
            if available_slots > 0 and not reservation.has_slot:
                reservation.has_slot = True
                self._reserved_slots += 1
                available_slots -= 1

            # Distribute any free bytes.
            if available_bytes <= 0:
                continue

            bytes_still_needed = reservation.bytes_needed - reservation.bytes_reserved

            if bytes_still_needed < 0:  # Sanity check for any internal inconsistencies.
                msg = "Too many bytes reserved: {} / {}".format(
                    reservation.bytes_reserved, reservation.bytes_needed
                )
                warnings.warn(msg, category=RuntimeWarning)
                bytes_still_needed = 0

            can_give = min(bytes_still_needed, available_bytes)
            reservation.bytes_reserved += can_give
            self._reserved_bytes += can_give
            available_bytes -= can_give

    def _ready_to_unblock(self) -> bool:
        """Determine if any of the threads waiting to add a message can proceed.

        The method assumes that the caller has obtained ``_operational_lock``.
        """
        if not self._waiting:
            return False

        # It's enough to only check the head of the queue, because free capacity
        # is distributed in FIFO order.
        first_reservation = next(iter(self._waiting.values()))
        return (
            first_reservation.bytes_reserved >= first_reservation.bytes_needed
            and first_reservation.has_slot
        )

    def _would_overflow(self, message: MessageType) -> bool:
        """Determine if accepting a message would exceed flow control limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message: The message entering the flow control.

        Returns:
            True if the current thread may not add ``message`` yet, taking into
            account any capacity it has already reserved while waiting.
        """
        reservation = self._waiting.get(threading.current_thread())

        if reservation:
            enough_reserved = reservation.bytes_reserved >= reservation.bytes_needed
            has_slot = reservation.has_slot
        else:
            enough_reserved = False
            has_slot = False

        message_size = message._pb.ByteSize()
        bytes_taken = self._total_bytes + self._reserved_bytes + message_size
        size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved

        msg_count_overflow = not has_slot and (
            (self._message_count + self._reserved_slots + 1)
            > self._settings.message_limit
        )

        return size_overflow or msg_count_overflow

    def _load_info(
        self, message_count: Optional[int] = None, total_bytes: Optional[int] = None
    ) -> str:
        """Return the current flow control load information.

        The caller can optionally adjust some of the values to fit its reporting
        needs.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_count:
                The value to override the current message count with.
            total_bytes:
                The value to override the current total bytes with.
        """
        if message_count is None:
            message_count = self._message_count

        if total_bytes is None:
            total_bytes = self._total_bytes

        return (
            f"messages: {message_count} / {self._settings.message_limit} "
            f"(reserved: {self._reserved_slots}), "
            f"bytes: {total_bytes} / {self._settings.byte_limit} "
            f"(reserved: {self._reserved_bytes})"
        )