Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py: 33%

105 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import copy 

18import logging 

19import random 

20import threading 

21import time 

22import typing 

23from typing import Dict, Iterable, Optional, Union 

24 

25from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY 

26 

27try: 

28 from collections.abc import KeysView 

29 

30 KeysView[None] # KeysView is only subscriptable in Python 3.9+ 

31except TypeError: 

32 # Deprecated since Python 3.9, thus only use as a fallback in older Python versions 

33 from typing import KeysView 

34 

35from google.cloud.pubsub_v1.subscriber._protocol import requests 

36 

37if typing.TYPE_CHECKING: # pragma: NO COVER 

38 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( 

39 StreamingPullManager, 

40 ) 

41 

42 

# Logger for this module; all lease-management diagnostics go through it.
_LOGGER: logging.Logger = logging.getLogger(__name__)
# Name assigned to the background thread that maintains leases.
_LEASE_WORKER_NAME: str = "Thread-LeaseMaintainer"

45 

46 

class _LeasedMessage(typing.NamedTuple):
    """Bookkeeping record kept for every ack ID under lease management."""

    # The local time when the ACK ID was initially leased, in seconds since
    # the epoch.
    sent_time: float
    # Size of the leased message in bytes.
    size: int
    # Ordering key of the leased message, or None if it has none.
    ordering_key: typing.Optional[str]

53 

54 

class Leaser(object):
    """Keep leases on received messages alive until they are settled.

    The leaser tracks every ack ID handed to :meth:`add`. A background thread,
    started with :meth:`start`, repeatedly asks the manager to extend the ack
    deadline of all tracked ack IDs. Messages held longer than the flow
    control ``max_lease_duration`` are dropped so that the Pub/Sub server can
    resend them.
    """

    def __init__(self, manager: "StreamingPullManager"):
        self._thread: Optional[threading.Thread] = None
        self._manager = manager

        # A lock used for start/stop operations, protecting the _thread attribute.
        self._operational_lock = threading.Lock()

        # A lock ensuring that add/remove operations are atomic and cannot be
        # intertwined. Protects the _leased_messages and _bytes attributes.
        self._add_remove_lock = threading.Lock()

        # Dict of ack_id -> _LeasedMessage
        self._leased_messages: Dict[str, _LeasedMessage] = {}

        # The total number of bytes consumed by leased messages.
        self._bytes = 0

        # Set by stop() to make the maintain_leases() loop exit.
        self._stop_event = threading.Event()

    @property
    def message_count(self) -> int:
        """The number of leased messages."""
        return len(self._leased_messages)

    @property
    def ack_ids(self) -> KeysView[str]:
        """The ack IDs of all leased messages (a live view, not a copy)."""
        return self._leased_messages.keys()

    @property
    def bytes(self) -> int:
        """The total size, in bytes, of all leased messages."""
        return self._bytes

    def add(self, items: Iterable[requests.LeaseRequest]) -> None:
        """Add messages to be managed by the leaser.

        Each new ack ID is recorded with ``sent_time`` set to +infinity. The
        expiry check in :meth:`maintain_leases` therefore never drops it until
        :meth:`start_lease_expiry_timer` is called for that ack ID. Ack IDs
        that are already managed are skipped and logged at debug level.

        Args:
            items: Lease requests describing the messages to manage.
        """
        with self._add_remove_lock:
            for item in items:
                if item.ack_id in self._leased_messages:
                    _LOGGER.debug("Message %s is already lease managed", item.ack_id)
                    continue
                # Track the ack ID and account for its size.
                self._leased_messages[item.ack_id] = _LeasedMessage(
                    sent_time=float("inf"),
                    size=item.byte_size,
                    ordering_key=item.ordering_key,
                )
                self._bytes += item.byte_size

    def start_lease_expiry_timer(self, ack_ids: Iterable[str]) -> None:
        """Start the lease expiry timer for ``ack_ids``.

        Args:
            ack_ids: Sequence of ack IDs for which to start lease expiry timers.
                Ack IDs that are not (or no longer) managed are ignored.
        """
        with self._add_remove_lock:
            for ack_id in ack_ids:
                lease_info = self._leased_messages.get(ack_id)
                # Lease info might not exist for this ack_id because it has
                # already been removed by remove().
                if lease_info is not None:
                    self._leased_messages[ack_id] = lease_info._replace(
                        sent_time=time.time()
                    )

    def remove(
        self,
        items: Iterable[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove messages from lease management.

        Args:
            items: Requests whose ``ack_id`` should no longer be leased. Ack IDs
                that are not managed are logged at debug level and ignored.
        """
        with self._add_remove_lock:
            for item in items:
                leased = self._leased_messages.pop(item.ack_id, None)
                if leased is None:
                    _LOGGER.debug("Item %s was not managed.", item.ack_id)
                    continue
                # Subtract exactly the size that add() accounted for. The
                # request's own byte_size is a caller-supplied value and could
                # differ, which would make the counter drift.
                self._bytes -= leased.size

            if self._bytes < 0:
                _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
                self._bytes = 0

    def maintain_leases(self) -> None:
        """Maintain all of the leases being managed.

        Until :meth:`stop` is called, this method repeats the following steps:

        1. Drop messages leased for longer than the maximum lease duration.
        2. Send a single modack request for the remaining ack IDs.
        3. Under exactly-once delivery, drop any ack IDs the manager reports
           as expired.
        4. Sleep a jittered period that is shorter than the ack deadline.
        """
        while not self._stop_event.is_set():
            # Determine the appropriate duration for the lease. This is based
            # off of how long previous messages have taken to ack, with a
            # sensible default and within the ranges allowed by Pub/Sub. Also
            # update the deadline currently used if enough new ACK data has
            # been gathered since the last deadline update.
            deadline = self._manager._obtain_ack_deadline(maybe_update=True)
            _LOGGER.debug("The current deadline value is %d seconds.", deadline)

            # Snapshot the leased messages under the lock that guards them, so
            # add()/remove() on other threads cannot mutate the dict while it
            # is being copied. The lock is released before dispatcher.drop()
            # below, because dropping ends up calling self.remove(), which
            # acquires this same non-reentrant lock.
            with self._add_remove_lock:
                leased_messages = copy.copy(self._leased_messages)

            # Drop any leases that are beyond the max lease time. This ensures
            # that in the event of a badly behaving actor, we can drop messages
            # and allow the Pub/Sub server to resend them.
            cutoff = time.time() - self._manager.flow_control.max_lease_duration
            to_drop = [
                requests.DropRequest(ack_id, item.size, item.ordering_key)
                for ack_id, item in leased_messages.items()
                if item.sent_time < cutoff
            ]

            if to_drop:
                _LOGGER.warning(
                    "Dropping %s items because they were leased too long.", len(to_drop)
                )
                assert self._manager.dispatcher is not None
                self._manager.dispatcher.drop(to_drop)

                # Remove dropped items from our snapshot as well. The real dict
                # has already been updated via self.remove().
                for request in to_drop:
                    leased_messages.pop(request.ack_id)

            # Create a single modack request for all remaining ack IDs. This is
            # more efficient than calling `modify_ack_deadline` per message.
            ack_ids = leased_messages.keys()
            expired_ack_ids = set()
            if ack_ids:
                _LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids))

                # NOTE: This may not work as expected if ``consumer.active``
                # has changed since we checked it. An implementation
                # without any sort of race condition would require a
                # way for ``send_request`` to fail when the consumer
                # is inactive.
                assert self._manager.dispatcher is not None
                ack_id_gen = (ack_id for ack_id in ack_ids)
                expired_ack_ids = self._manager._send_lease_modacks(
                    ack_id_gen, deadline
                )

            start_time = time.time()
            # If exactly once delivery is enabled, drop all expired ack_ids
            # from lease management.
            if self._manager._exactly_once_delivery_enabled() and expired_ack_ids:
                assert self._manager.dispatcher is not None
                self._manager.dispatcher.drop(
                    [
                        requests.DropRequest(
                            ack_id,
                            leased_messages[ack_id].size,
                            leased_messages[ack_id].ordering_key,
                        )
                        for ack_id in expired_ack_ids
                        if ack_id in leased_messages
                    ]
                )

            # Now wait an appropriate period of time and do this again.
            #
            # The wait is a random period between:
            #   minimum: _MAX_BATCH_LATENCY (to prevent duplicate modacks
            #            being created in one batch)
            #   maximum: 90% of the deadline, minus the time just spent
            #            dropping expired acks
            # The maximum tries to get new modacks to the server before the
            # current leases expire. Jitter (http://bit.ly/2s2ekL7) reduces
            # contention when there are many clients. The maximum is clamped
            # to the minimum. Otherwise random.uniform() could return a value
            # below the documented minimum, or even a negative one.
            elapsed = time.time() - start_time
            max_snooze = max(_MAX_BATCH_LATENCY, deadline * 0.9 - elapsed)
            snooze = random.uniform(_MAX_BATCH_LATENCY, max_snooze)
            _LOGGER.debug("Snoozing lease management for %f seconds.", snooze)
            self._stop_event.wait(timeout=snooze)

        _LOGGER.debug("%s exiting.", _LEASE_WORKER_NAME)

    def start(self) -> None:
        """Start the background lease-maintenance thread.

        Raises:
            ValueError: If the leaser thread is already running.
        """
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Leaser is already running.")

            # Create and start the helper thread.
            self._stop_event.clear()
            thread = threading.Thread(
                name=_LEASE_WORKER_NAME, target=self.maintain_leases
            )
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread

    def stop(self) -> None:
        """Signal the lease-maintenance thread to exit and wait for it to finish."""
        with self._operational_lock:
            self._stop_event.set()

            if self._thread is not None:
                # The thread should automatically exit when the consumer is
                # inactive.
                self._thread.join()

            self._thread = None