Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py: 29%

125 statements  

# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import copy
import logging
import random
import threading
import time
import typing
from typing import Dict, Iterable, Optional, Union

from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    SubscribeOpenTelemetry,
)

try:
    from collections.abc import KeysView

    KeysView[None]  # KeysView is only subscriptable in Python 3.9+
except TypeError:
    # Deprecated since Python 3.9, thus only use as a fallback in older Python versions
    from typing import KeysView

from google.cloud.pubsub_v1.subscriber._protocol import requests

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
        StreamingPullManager,
    )


_LOGGER = logging.getLogger(__name__)
_LEASE_WORKER_NAME = "Thread-LeaseMaintainer"


class _LeasedMessage(typing.NamedTuple):
    sent_time: float
    """The local time when ACK ID was initially leased in seconds since the epoch."""

    size: int
    ordering_key: Optional[str]
    opentelemetry_data: Optional[SubscribeOpenTelemetry]


class Leaser(object):
    def __init__(self, manager: "StreamingPullManager"):
        self._thread: Optional[threading.Thread] = None
        self._manager = manager

        # a lock used for start/stop operations, protecting the _thread attribute
        self._operational_lock = threading.Lock()

        # A lock ensuring that add/remove operations are atomic and cannot be
        # intertwined. Protects the _leased_messages and _bytes attributes.
        self._add_remove_lock = threading.Lock()

        # Dict of ack_id -> _LeasedMessage
        self._leased_messages: Dict[str, _LeasedMessage] = {}

        self._bytes = 0
        """The total number of bytes consumed by leased messages."""

        self._stop_event = threading.Event()

    @property
    def message_count(self) -> int:
        """The number of leased messages."""
        return len(self._leased_messages)

    @property
    def ack_ids(self) -> KeysView[str]:
        """The ack IDs of all leased messages."""
        return self._leased_messages.keys()

    @property
    def bytes(self) -> int:
        """The total size, in bytes, of all leased messages."""
        return self._bytes

    def add(self, items: Iterable[requests.LeaseRequest]) -> None:
        """Add messages to be managed by the leaser."""
        with self._add_remove_lock:
            for item in items:
                # Add the ack ID to the set of managed ack IDs, and increment
                # the size counter.
                if item.ack_id not in self._leased_messages:
                    self._leased_messages[item.ack_id] = _LeasedMessage(
                        sent_time=float("inf"),
                        size=item.byte_size,
                        ordering_key=item.ordering_key,
                        opentelemetry_data=item.opentelemetry_data,
                    )
                    self._bytes += item.byte_size
                else:
                    _LOGGER.debug("Message %s is already lease managed", item.ack_id)

    def start_lease_expiry_timer(self, ack_ids: Iterable[str]) -> None:
        """Start the lease expiry timer for the given ack IDs.

        Args:
            ack_ids: Sequence of ack IDs for which to start lease expiry timers.
        """
        with self._add_remove_lock:
            for ack_id in ack_ids:
                lease_info = self._leased_messages.get(ack_id)
                # Lease info might not exist for this ack_id because it has already
                # been removed by remove().
                if lease_info:
                    self._leased_messages[ack_id] = lease_info._replace(
                        sent_time=time.time()
                    )

    def remove(
        self,
        items: Iterable[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove messages from lease management."""
        with self._add_remove_lock:
            # Remove the ack ID from lease management, and decrement the
            # byte counter.
            for item in items:
                if self._leased_messages.pop(item.ack_id, None) is not None:
                    self._bytes -= item.byte_size
                else:
                    _LOGGER.debug("Item %s was not managed.", item.ack_id)

            if self._bytes < 0:
                _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
                self._bytes = 0

    def maintain_leases(self) -> None:
        """Maintain all of the leases being managed.

        This method modifies the ack deadline for all of the managed
        ack IDs, then waits for most of that time (but with jitter), and
        repeats.
        """
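        # Each iteration of the loop below:
        #   1. obtains the current ack deadline (based on observed ack latencies),
        #   2. drops leases held longer than flow_control.max_lease_duration,
        #   3. sends a single batched modack for the remaining leased ack IDs,
        #   4. with exactly-once delivery, drops ack IDs reported as expired, and
        #   5. sleeps for a jittered fraction of the deadline before repeating.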

        while not self._stop_event.is_set():
            # Determine the appropriate duration for the lease. This is
            # based on how long previous messages have taken to ack, with
            # a sensible default and within the ranges allowed by Pub/Sub.
            # Also update the deadline currently used if enough new ACK data has been
            # gathered since the last deadline update.
            deadline = self._manager._obtain_ack_deadline(maybe_update=True)
            _LOGGER.debug("The current deadline value is %d seconds.", deadline)

            # Make a copy of the leased messages. This is needed because it's
            # possible for another thread to modify the dictionary while
            # we're iterating over it.
            leased_messages = copy.copy(self._leased_messages)

            # Drop any leases that are beyond the max lease time. This ensures
            # that in the event of a badly behaving actor, we can drop messages
            # and allow the Pub/Sub server to resend them.
            cutoff = time.time() - self._manager.flow_control.max_lease_duration
            to_drop = [
                requests.DropRequest(ack_id, item.size, item.ordering_key)
                for ack_id, item in leased_messages.items()
                if item.sent_time < cutoff
            ]

            if to_drop:
                _LOGGER.warning(
                    "Dropping %s items because they were leased too long.", len(to_drop)
                )
                assert self._manager.dispatcher is not None
                for drop_msg in to_drop:
                    leased_message = leased_messages.get(drop_msg.ack_id)
                    if leased_message and leased_message.opentelemetry_data:
                        leased_message.opentelemetry_data.add_process_span_event(
                            "expired"
                        )
                        leased_message.opentelemetry_data.end_process_span()
                        leased_message.opentelemetry_data.set_subscribe_span_result(
                            "expired"
                        )
                        leased_message.opentelemetry_data.end_subscribe_span()
                self._manager.dispatcher.drop(to_drop)

            # Remove dropped items from our copy of the leased messages (they
            # have already been removed from the real one by
            # self._manager.dispatcher.drop(), which calls self.remove()).
            for item in to_drop:
                leased_messages.pop(item.ack_id)

            # Create a modack request.
            # We do not actually call `modify_ack_deadline` over and over
            # because it is more efficient to make a single request.
            ack_ids = leased_messages.keys()
            expired_ack_ids = set()
            if ack_ids:
                _LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids))

                # NOTE: This may not work as expected if ``consumer.active``
                #       has changed since we checked it. An implementation
                #       without any sort of race condition would require a
                #       way for ``send_request`` to fail when the consumer
                #       is inactive.
                assert self._manager.dispatcher is not None
                ack_id_gen = (ack_id for ack_id in ack_ids)
                opentelemetry_data = [
                    message.opentelemetry_data
                    for message in list(leased_messages.values())
                    if message.opentelemetry_data
                ]
                expired_ack_ids = self._manager._send_lease_modacks(
                    ack_id_gen,
                    deadline,
                    opentelemetry_data,
                )

            start_time = time.time()
            # If exactly once delivery is enabled, we should drop all expired ack_ids from lease management.
            if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids):
                assert self._manager.dispatcher is not None
                for ack_id in expired_ack_ids:
                    msg = leased_messages.get(ack_id)
                    if msg and msg.opentelemetry_data:
                        msg.opentelemetry_data.add_process_span_event("expired")
                        msg.opentelemetry_data.end_process_span()
                        msg.opentelemetry_data.set_subscribe_span_result("expired")
                        msg.opentelemetry_data.end_subscribe_span()
                self._manager.dispatcher.drop(
                    [
                        requests.DropRequest(
                            ack_id,
                            leased_messages.get(ack_id).size,  # type: ignore
                            leased_messages.get(ack_id).ordering_key,  # type: ignore
                        )
                        for ack_id in expired_ack_ids
                        if ack_id in leased_messages
                    ]
                )
            # Now wait an appropriate period of time and do this again.
            #
            # We determine the appropriate period of time based on a random
            # period between:
            #   minimum: _MAX_BATCH_LATENCY (to prevent duplicate modacks being created in one batch)
            #   maximum: 90% of the deadline
            # This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server.
            # This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases
            # where there are many clients.
            # If we spent any time iterating over expired acks, we should subtract this from the deadline.
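            # Illustrative arithmetic (assumed numbers, not taken from this module):
            # with a 60-second deadline, roughly 0.01 s for _MAX_BATCH_LATENCY, and
            # 2 s spent handling expired acks above, the snooze below is drawn
            # uniformly from about [0.01, 60 * 0.9 - 2] = [0.01, 52.0] seconds.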

            snooze = random.uniform(
                _MAX_BATCH_LATENCY, (deadline * 0.9 - (time.time() - start_time))
            )
            _LOGGER.debug("Snoozing lease management for %f seconds.", snooze)
            self._stop_event.wait(timeout=snooze)

        _LOGGER.debug("%s exiting.", _LEASE_WORKER_NAME)

    def start(self) -> None:
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Leaser is already running.")

            # Create and start the helper thread.
            self._stop_event.clear()
            thread = threading.Thread(
                name=_LEASE_WORKER_NAME, target=self.maintain_leases
            )
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread

    def stop(self) -> None:
        with self._operational_lock:
            self._stop_event.set()

            if self._thread is not None:
                # The thread should automatically exit when the consumer is
                # inactive.
                self._thread.join()

            self._thread = None
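

# Illustrative usage sketch: the names ``manager`` and ``lease_request`` below are
# assumed stand-ins for a StreamingPullManager and a requests.LeaseRequest built
# from a received message; in practice the manager creates and drives the leaser
# internally rather than exposing it to user code.
#
#     leaser = Leaser(manager)
#     leaser.start()                      # spawns Thread-LeaseMaintainer
#     leaser.add([lease_request])         # begin lease management for the ack ID
#     leaser.start_lease_expiry_timer([lease_request.ack_id])
#     ...                                 # message is processed elsewhere
#     leaser.stop()                       # sets the stop event and joins the thread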