Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/publisher/flow_controller.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

114 statements  

1# Copyright 2020, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from collections import OrderedDict 

16import logging 

17import threading 

18from typing import Dict, Optional, Type 

19import warnings 

20 

21from google.cloud.pubsub_v1 import types 

22from google.cloud.pubsub_v1.publisher import exceptions 

23 

24 

# Logger for this module; named after the module so it nests under the
# package's logger hierarchy.
_LOGGER = logging.getLogger(name=__name__)


# Type alias used to annotate the ``message`` parameters of the flow
# controller's public methods.
MessageType = Type[types.PubsubMessage]  # type: ignore

29 

30 

class _QuantityReservation:
    """A (partial) reservation of flow-control bytes and a message slot.

    Args:
        bytes_reserved: Number of bytes set aside so far.
        bytes_needed: Number of bytes required in total.
        has_slot: Whether a message slot has been set aside.
    """

    def __init__(self, bytes_reserved: int, bytes_needed: int, has_slot: bool):
        self.bytes_reserved = bytes_reserved
        self.bytes_needed = bytes_needed
        self.has_slot = has_slot

    def __repr__(self):
        field_names = ("bytes_reserved", "bytes_needed", "has_slot")
        rendered = ", ".join(
            "{}={}".format(field, getattr(self, field)) for field in field_names
        )
        return "{}({})".format(type(self).__name__, rendered)

46 

47 

class FlowController(object):
    """A class used to control the flow of messages passing through it.

    The controller counts the messages (and their total byte size) that have
    been added but not yet released. When adding a message would exceed
    ``settings.message_limit`` or ``settings.byte_limit``, it reacts according
    to ``settings.limit_exceeded_behavior``:

    * ``IGNORE`` - no bookkeeping is done at all;
    * ``ERROR`` - :exc:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`
      is raised and the message is not counted;
    * ``BLOCK`` - the calling thread waits, in FIFO order with other blocked
      threads, until released capacity has been reserved for it.

    All internal state is guarded by a single lock.

    Args:
        settings: Desired flow control configuration.
    """

    def __init__(self, settings: types.PublishFlowControl):
        self._settings = settings

        # Load statistics. They represent the number of messages added, but not
        # yet released (and their total size).
        self._message_count = 0
        self._total_bytes = 0

        # A FIFO queue of threads blocked on adding a message that also tracks their
        # reservations of available flow control bytes and message slots.
        # Only relevant if the configured limit exceeded behavior is BLOCK.
        self._waiting: Dict[threading.Thread, _QuantityReservation] = OrderedDict()

        # Capacity already handed out to waiting threads, but not yet turned
        # into accepted load. Counted against the limits so that newcomers
        # cannot take capacity promised to threads further ahead in the queue.
        self._reserved_bytes = 0
        self._reserved_slots = 0

        # The lock is used to protect all internal state (message and byte count,
        # waiting threads to add, etc.).
        self._operational_lock = threading.Lock()

        # The condition for blocking the flow if capacity is exceeded.
        self._has_capacity = threading.Condition(lock=self._operational_lock)

    def add(self, message: MessageType) -> None:
        """Add a message to flow control.

        Adding a message updates the internal load statistics, and an action is
        taken if these limits are exceeded (depending on the flow control settings).

        Args:
            message:
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                Raised when the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
                the message would exceed flow control limits, or when the desired action
                is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
                the message would block forever against the flow control limits.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        # The serialized size is needed several times below; compute it once.
        message_size = message._pb.ByteSize()

        with self._operational_lock:
            if not self._would_overflow(message):
                self._message_count += 1
                self._total_bytes += message_size
                return

            # Adding a message would overflow, react.
            if (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.ERROR
            ):
                # Raising an error means rejecting a message, thus we do not
                # add anything to the existing load, but we do report the would-be
                # load if we accepted the message.
                load_info = self._load_info(
                    message_count=self._message_count + 1,
                    total_bytes=self._total_bytes + message_size,
                )
                error_msg = "Flow control limits would be exceeded - {}.".format(
                    load_info
                )
                raise exceptions.FlowControlLimitError(error_msg)

            assert (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.BLOCK
            )

            # Sanity check - if a message exceeds total flow control limits all
            # by itself, it would block forever, thus raise error.
            if (
                message_size > self._settings.byte_limit
                or self._settings.message_limit < 1
            ):
                load_info = self._load_info(message_count=1, total_bytes=message_size)
                error_msg = (
                    "Total flow control limits too low for the message, "
                    "would block forever - {}.".format(load_info)
                )
                raise exceptions.FlowControlLimitError(error_msg)

            current_thread = threading.current_thread()

            try:
                while self._would_overflow(message):
                    if current_thread not in self._waiting:
                        reservation = _QuantityReservation(
                            bytes_reserved=0,
                            bytes_needed=message_size,
                            has_slot=False,
                        )
                        self._waiting[current_thread] = reservation  # Will be placed last.

                    _LOGGER.debug(
                        "Blocking until there is enough free capacity in the flow - "
                        "{}.".format(self._load_info())
                    )

                    # Releases the lock while waiting and re-acquires it before
                    # returning (also when the wait is interrupted).
                    self._has_capacity.wait()

                    _LOGGER.debug(
                        "Woke up from waiting on free capacity in the flow - "
                        "{}.".format(self._load_info())
                    )
            except BaseException:
                # The wait was interrupted (e.g. KeyboardInterrupt). Without this
                # cleanup the stale entry would stay at the head of the FIFO
                # queue, holding its reserved capacity and blocking every
                # later caller forever.
                self._abandon_reservation(current_thread)
                raise

            # Message accepted, increase the load and remove thread stats.
            reservation = self._waiting.pop(current_thread)
            self._message_count += 1
            self._total_bytes += message_size
            self._reserved_bytes -= reservation.bytes_reserved
            # Only give back a slot that was actually handed to this thread.
            if reservation.has_slot:
                self._reserved_slots -= 1

    def release(self, message: MessageType) -> None:
        """Release a message from flow control.

        Releasing decreases the load, hands the freed capacity to blocked
        ``add()`` callers in FIFO order, and wakes them if the head of the
        queue can now proceed.

        Args:
            message:
                The message leaving the flow control.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            # Releasing a message decreases the load.
            self._message_count -= 1
            self._total_bytes -= message._pb.ByteSize()

            if self._message_count < 0 or self._total_bytes < 0:
                warnings.warn(
                    "Releasing a message that was never added or already released.",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                self._message_count = max(0, self._message_count)
                self._total_bytes = max(0, self._total_bytes)

            self._distribute_available_capacity()

            # If at least one thread waiting to add() can be unblocked, wake them up.
            if self._ready_to_unblock():
                _LOGGER.debug("Notifying threads waiting to add messages to flow.")
                self._has_capacity.notify_all()

    def _abandon_reservation(self, thread: threading.Thread) -> None:
        """Drop ``thread``'s pending reservation and return its capacity.

        Used when a blocked ``add()`` call exits abnormally. The capacity held
        by the abandoned reservation is redistributed to the remaining waiting
        threads, and they are woken if the head of the queue can now proceed.
        Does nothing if ``thread`` has no pending reservation.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            thread: The thread whose reservation should be discarded.
        """
        reservation = self._waiting.pop(thread, None)
        if reservation is None:
            return

        self._reserved_bytes -= reservation.bytes_reserved
        if reservation.has_slot:
            self._reserved_slots -= 1

        self._distribute_available_capacity()
        if self._ready_to_unblock():
            self._has_capacity.notify_all()

    def _distribute_available_capacity(self) -> None:
        """Distribute available capacity among the waiting threads in FIFO order.

        Each waiting reservation without a slot receives one while free slots
        remain, and receives up to the bytes it still needs while free bytes
        remain.

        The method assumes that the caller has obtained ``_operational_lock``.
        """
        # Free capacity = limit - accepted load - capacity already promised.
        available_slots = (
            self._settings.message_limit - self._message_count - self._reserved_slots
        )
        available_bytes = (
            self._settings.byte_limit - self._total_bytes - self._reserved_bytes
        )

        for reservation in self._waiting.values():
            if available_slots <= 0 and available_bytes <= 0:
                break  # Nothing left to hand out, better luck next time.

            # Distribute any free slots.
            if available_slots > 0 and not reservation.has_slot:
                reservation.has_slot = True
                self._reserved_slots += 1
                available_slots -= 1

            # Distribute any free bytes.
            if available_bytes <= 0:
                continue

            bytes_still_needed = reservation.bytes_needed - reservation.bytes_reserved

            if bytes_still_needed < 0:  # Sanity check for any internal inconsistencies.
                msg = "Too many bytes reserved: {} / {}".format(
                    reservation.bytes_reserved, reservation.bytes_needed
                )
                warnings.warn(msg, category=RuntimeWarning)
                bytes_still_needed = 0

            can_give = min(bytes_still_needed, available_bytes)
            reservation.bytes_reserved += can_give
            self._reserved_bytes += can_give
            available_bytes -= can_give

    def _ready_to_unblock(self) -> bool:
        """Determine if any of the threads waiting to add a message can proceed.

        The method assumes that the caller has obtained ``_operational_lock``.

        Returns:
            True if the first waiting reservation has all the bytes it needs and
            a message slot, False otherwise (including when nobody is waiting).
        """
        if self._waiting:
            # It's enough to only check the head of the queue, because free
            # capacity is distributed in FIFO order.
            first_reservation = next(iter(self._waiting.values()))
            return (
                first_reservation.bytes_reserved >= first_reservation.bytes_needed
                and first_reservation.has_slot
            )

        return False

    def _would_overflow(self, message: MessageType) -> bool:
        """Determine if accepting a message would exceed flow control limits.

        Capacity already reserved for the calling thread counts in its favor.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message: The message entering the flow control.

        Returns:
            True if either the byte limit or the message limit would be exceeded.
        """
        reservation = self._waiting.get(threading.current_thread())

        if reservation is not None:
            enough_reserved = reservation.bytes_reserved >= reservation.bytes_needed
            has_slot = reservation.has_slot
        else:
            enough_reserved = False
            has_slot = False

        bytes_taken = self._total_bytes + self._reserved_bytes + message._pb.ByteSize()
        size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved

        msg_count_overflow = not has_slot and (
            (self._message_count + self._reserved_slots + 1)
            > self._settings.message_limit
        )

        return size_overflow or msg_count_overflow

    def _load_info(
        self, message_count: Optional[int] = None, total_bytes: Optional[int] = None
    ) -> str:
        """Return the current flow control load information.

        The caller can optionally adjust some of the values to fit its reporting
        needs.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_count:
                The value to override the current message count with.
            total_bytes:
                The value to override the current total bytes with.

        Returns:
            A human-readable summary of message and byte usage, including the
            amounts currently reserved for waiting threads.
        """
        if message_count is None:
            message_count = self._message_count

        if total_bytes is None:
            total_bytes = self._total_bytes

        return (
            f"messages: {message_count} / {self._settings.message_limit} "
            f"(reserved: {self._reserved_slots}), "
            f"bytes: {total_bytes} / {self._settings.byte_limit} "
            f"(reserved: {self._reserved_bytes})"
        )