Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py: 23%

160 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16from __future__ import division 

17 

18import functools 

19import itertools 

20import logging 

21import math 

22import time 

23import threading 

24import typing 

25from typing import List, Optional, Sequence, Union 

26import warnings 

27from google.api_core.retry import exponential_sleep_generator 

28 

29from google.cloud.pubsub_v1.subscriber._protocol import helper_threads 

30from google.cloud.pubsub_v1.subscriber._protocol import requests 

31from google.cloud.pubsub_v1.subscriber.exceptions import ( 

32 AcknowledgeStatus, 

33) 

34 

35if typing.TYPE_CHECKING: # pragma: NO COVER 

36 import queue 

37 from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( 

38 StreamingPullManager, 

39 ) 

40 

41 

# Union of every request type the dispatcher accepts from its queue.
RequestItem = Union[
    requests.AckRequest,
    requests.DropRequest,
    requests.LeaseRequest,
    requests.ModAckRequest,
    requests.NackRequest,
]


_LOGGER = logging.getLogger(__name__)
_CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"


_MAX_BATCH_SIZE = 100
"""The maximum number of requests to process and dispatch at a time."""

_MAX_BATCH_LATENCY = 0.01
"""The maximum amount of time in seconds to wait for additional request items
before processing the next batch of requests."""

_ACK_IDS_BATCH_SIZE = 1000
"""The maximum number of ack IDs sent in a single unary ack or modack request
issued by this module; larger batches are split into chunks of this size."""

_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1
"""Initial delay, in seconds, of the exponential backoff used to retry failed
acks and modacks when exactly-once delivery is enabled (passed as ``initial``
to ``exponential_sleep_generator``)."""

_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 10 * 60
"""Upper bound, in seconds, on any single backoff delay between retries of
failed acks and modacks when exactly-once delivery is enabled (passed as
``maximum`` to ``exponential_sleep_generator``). This caps each sleep, not the
total time spent retrying."""

73 

74 

class Dispatcher(object):
    """Dispatch batched request items from a queue to a streaming pull manager.

    Callbacks put lease, modack, ack, nack and drop request items on
    ``queue``. A helper thread started by :meth:`start` hands them to
    :meth:`dispatch_callback` in batches. That method de-duplicates them by
    ``ack_id`` and forwards each kind to the matching handler below.

    Args:
        manager: The streaming pull manager whose ``leaser`` and
            ``send_unary_ack`` / ``send_unary_modack`` helpers are used.
        queue: The queue that request items (and the stop sentinel) are put on.
    """

    def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
        self._manager = manager
        self._queue = queue
        # The running dispatcher thread, or ``None`` when not running.
        self._thread: Optional[threading.Thread] = None
        # Guards start()/stop() so the thread is started and joined at most once.
        self._operational_lock = threading.Lock()

    def start(self) -> None:
        """Start a thread to dispatch requests queued up by callbacks.

        Spawns a daemon thread running a ``helper_threads.QueueCallbackWorker``
        that calls :meth:`dispatch_callback`. The worker is configured with
        ``max_items=_MAX_BATCH_SIZE`` and ``max_latency=_MAX_BATCH_LATENCY``.

        Raises:
            ValueError: If the dispatcher thread is already running.
        """
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Dispatcher is already running.")

            worker = helper_threads.QueueCallbackWorker(
                self._queue,
                self.dispatch_callback,
                max_items=_MAX_BATCH_SIZE,
                max_latency=_MAX_BATCH_LATENCY,
            )
            # Create and start the helper thread.
            thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread

    def stop(self) -> None:
        """Stop the dispatcher thread, if running, and wait for it to exit.

        Queues the ``helper_threads.STOP`` sentinel and joins the thread.
        Calling this when the dispatcher is not running is a no-op.
        """
        with self._operational_lock:
            if self._thread is not None:
                # Signal the worker to stop by queueing a "poison pill".
                self._queue.put(helper_threads.STOP)
                self._thread.join()

            self._thread = None

    def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
        """Map the callback request to the appropriate gRPC request.

        Items are grouped by type and de-duplicated by ``ack_id`` within each
        type. Duplicate ack, modack and nack requests have their futures
        resolved by :meth:`_handle_duplicate_request_future`. Duplicate lease
        and drop requests carry no futures and are simply discarded. Items of
        an unknown type emit a ``RuntimeWarning`` and are skipped.

        Args:
            items:
                Queued requests to dispatch.
        """
        lease_requests: List[requests.LeaseRequest] = []
        modack_requests: List[requests.ModAckRequest] = []
        ack_requests: List[requests.AckRequest] = []
        nack_requests: List[requests.NackRequest] = []
        drop_requests: List[requests.DropRequest] = []

        lease_ids = set()
        modack_ids = set()
        ack_ids = set()
        nack_ids = set()
        drop_ids = set()
        exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()

        for item in items:
            if isinstance(item, requests.LeaseRequest):
                # LeaseRequests have no futures to handle.
                if item.ack_id not in lease_ids:
                    lease_ids.add(item.ack_id)
                    lease_requests.append(item)
            elif isinstance(item, requests.ModAckRequest):
                if item.ack_id in modack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    modack_ids.add(item.ack_id)
                    modack_requests.append(item)
            elif isinstance(item, requests.AckRequest):
                if item.ack_id in ack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    ack_ids.add(item.ack_id)
                    ack_requests.append(item)
            elif isinstance(item, requests.NackRequest):
                if item.ack_id in nack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    nack_ids.add(item.ack_id)
                    nack_requests.append(item)
            elif isinstance(item, requests.DropRequest):
                # DropRequests have no futures to handle.
                if item.ack_id not in drop_ids:
                    drop_ids.add(item.ack_id)
                    drop_requests.append(item)
            else:
                warnings.warn(
                    f'Skipping unknown request item of type "{type(item)}"',
                    category=RuntimeWarning,
                )

        _LOGGER.debug("Handling %d batched requests", len(items))

        if lease_requests:
            self.lease(lease_requests)

        if modack_requests:
            self.modify_ack_deadline(modack_requests)

        # Note: Drop and ack *must* be after lease. It's possible to get both
        # the lease and the ack/drop request in the same batch.
        if ack_requests:
            self.ack(ack_requests)

        if nack_requests:
            self.nack(nack_requests)

        if drop_requests:
            self.drop(drop_requests)

    def _handle_duplicate_request_future(
        self,
        exactly_once_delivery_enabled: bool,
        item: Union[requests.AckRequest, requests.ModAckRequest, requests.NackRequest],
    ) -> None:
        """Resolve the future of a request whose ``ack_id`` was already seen.

        Args:
            exactly_once_delivery_enabled: Whether exactly-once delivery is
                currently enabled on the subscription.
            item: The duplicate request. If it has no future, only a debug
                message is logged.
        """
        _LOGGER.debug(
            "This is a duplicate %s with the same ack_id: %s.",
            type(item),
            item.ack_id,
        )
        # Futures may be present even with exactly-once delivery disabled, in
        # transition periods after the setting is changed on the subscription.
        if item.future:
            if exactly_once_delivery_enabled:
                item.future.set_exception(
                    ValueError(f"Duplicate ack_id for {type(item)}")
                )
            else:
                # When exactly-once delivery is NOT enabled, acks/modacks are
                # considered best-effort, so the future should succeed even
                # though this is a duplicate.
                item.future.set_result(AcknowledgeStatus.SUCCESS)

    def ack(self, items: Sequence[requests.AckRequest]) -> None:
        """Acknowledge the given messages.

        Records ``time_to_ack`` in the manager's ack histogram when present.
        Sends the acks in chunks of at most ``_ACK_IDS_BATCH_SIZE`` and drops
        completed requests from lease management. Any requests the manager
        reports as retryable are retried on a background thread.

        Args:
            items: The items to acknowledge.
        """
        # If we got timing information, add it to the histogram.
        for item in items:
            time_to_ack = item.time_to_ack
            if time_to_ack is not None:
                self._manager.ack_histogram.add(time_to_ack)

        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit. Both generators
        # below walk ``items`` in lockstep, one chunk per iteration.
        items_gen = iter(items)
        ack_ids_gen = (item.ack_id for item in items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

        for _ in range(total_chunks):
            ack_reqs_dict = {
                req.ack_id: req
                for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
            }
            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
                ack_reqs_dict=ack_reqs_dict,
            )

            # Remove the completed messages from lease management.
            self.drop(requests_completed)

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryAcks",
                    functools.partial(self._retry_acks, requests_to_retry),
                )

    def _start_retry_thread(
        self, thread_name: str, thread_target: typing.Callable[[], None]
    ) -> None:
        """Run ``thread_target`` on a new daemon thread named ``thread_name``."""
        # note: if the thread is *not* a daemon, a memory leak exists due to a cpython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        retry_thread = threading.Thread(
            name=thread_name,
            target=thread_target,
            daemon=True,
        )
        # The thread finishes when the requests succeed or eventually fail with
        # a back-end timeout error or other permanent failure.
        retry_thread.start()

    def _retry_acks(self, requests_to_retry: List[requests.AckRequest]) -> None:
        """Retry failed acks with exponential backoff until none remain.

        Each round sleeps for the next backoff delay, then resends the
        remaining requests via ``send_unary_ack``. Requests that complete are
        dropped from lease management. The loop ends only when the manager
        reports nothing left to retry.

        Args:
            requests_to_retry: The ack requests that failed transiently.
        """
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)
            _LOGGER.debug(
                "Retrying %d ack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )
            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=[req.ack_id for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )
            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."
            # Remove the completed messages from lease management.
            self.drop(requests_completed)

    def drop(
        self,
        items: Sequence[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove the given messages from lease management.

        Also re-activates the ordering keys of the dropped items and lets the
        manager resume the consumer if appropriate.

        Args:
            items: The items to drop.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.remove(items)
        ordering_keys = (k.ordering_key for k in items if k.ordering_key)
        self._manager.activate_ordering_keys(ordering_keys)
        self._manager.maybe_resume_consumer()

    def lease(self, items: Sequence[requests.LeaseRequest]) -> None:
        """Add the given messages to lease management.

        Afterwards the manager may pause the consumer.

        Args:
            items: The items to lease.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.add(items)
        self._manager.maybe_pause_consumer()

    def modify_ack_deadline(
        self,
        items: Sequence[requests.ModAckRequest],
        default_deadline: Optional[float] = None,
    ) -> None:
        """Modify the ack deadline for the given messages.

        Requests are sent in chunks of at most ``_ACK_IDS_BATCH_SIZE``.
        Requests the manager reports as retryable are retried on a background
        thread.

        Args:
            items: The items to modify.
            default_deadline: If ``None``, each item's own ``seconds`` value
                is sent. Otherwise this deadline is sent for every item in
                ``items`` and the per-item ``seconds`` values are not sent.
        """
        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit. The generators
        # below walk ``items`` in lockstep, one chunk per iteration.
        items_gen = iter(items)
        ack_ids_gen = (item.ack_id for item in items)
        deadline_seconds_gen = (item.seconds for item in items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

        for _ in range(total_chunks):
            ack_reqs_dict = {
                req.ack_id: req
                for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
            }
            requests_to_retry: List[requests.ModAckRequest]
            if default_deadline is None:
                # no further work needs to be done for `requests_to_retry`
                _, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=list(
                        itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
                    ),
                    modify_deadline_seconds=list(
                        itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
                    ),
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=None,
                )
            else:
                # Materialize the chunk so the callee always receives a
                # concrete list, matching the branch above.
                _, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=list(
                        itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
                    ),
                    modify_deadline_seconds=None,
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=default_deadline,
                )
            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryModAcks",
                    functools.partial(self._retry_modacks, requests_to_retry),
                )

    def _retry_modacks(self, requests_to_retry: List[requests.ModAckRequest]) -> None:
        """Retry failed modacks with exponential backoff until none remain.

        Each round sleeps for the next backoff delay, then resends the
        remaining requests (with each request's own ``seconds``) via
        ``send_unary_modack``. The loop ends only when the manager reports
        nothing left to retry.

        Args:
            requests_to_retry: The modack requests that failed transiently.
        """
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)
            _LOGGER.debug(
                "Retrying %d modack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )
            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
            # Completed modacks need no follow-up (unlike acks, they are not
            # dropped from lease management), so only the retries are kept.
            _, requests_to_retry = self._manager.send_unary_modack(
                modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
                modify_deadline_seconds=[req.seconds for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )

    def nack(self, items: Sequence[requests.NackRequest]) -> None:
        """Explicitly deny receipt of messages.

        Implemented as a modack with a deadline of 0 seconds, followed by
        dropping the messages from lease management.

        Args:
            items: The items to deny.
        """
        self.modify_ack_deadline(
            [
                requests.ModAckRequest(
                    ack_id=item.ack_id, seconds=0, future=item.future
                )
                for item in items
            ]
        )
        self.drop(
            [
                requests.DropRequest(
                    ack_id=item.ack_id,
                    byte_size=item.byte_size,
                    ordering_key=item.ordering_key,
                )
                for item in items
            ]
        )