Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py: 16%

278 statements  

# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division

import functools
import itertools
import logging
import math
import time
import threading
import typing
from typing import List, Optional, Sequence, Union
import warnings
from google.api_core.retry import exponential_sleep_generator

from opentelemetry import trace

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
    AcknowledgeStatus,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
    start_ack_span,
    start_nack_span,
)

if typing.TYPE_CHECKING:  # pragma: NO COVER
    import queue
    from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import (
        StreamingPullManager,
    )


RequestItem = Union[
    requests.AckRequest,
    requests.DropRequest,
    requests.LeaseRequest,
    requests.ModAckRequest,
    requests.NackRequest,
]


_LOGGER = logging.getLogger(__name__)
_CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"


_MAX_BATCH_SIZE = 100
"""The maximum number of requests to process and dispatch at a time."""

_MAX_BATCH_LATENCY = 0.01
"""The maximum amount of time in seconds to wait for additional request items
before processing the next batch of requests."""

_ACK_IDS_BATCH_SIZE = 1000
"""The maximum number of ACK IDs to send in a single StreamingPullRequest."""

_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1
"""The time to wait for the first retry of failed acks and modacks when exactly-once
delivery is enabled."""

_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 10 * 60
"""The maximum amount of time in seconds to retry failed acks and modacks when
exactly-once delivery is enabled."""
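
The two retry-duration constants above are fed to google.api_core's
exponential_sleep_generator by _retry_acks() and _retry_modacks() further down. A
minimal sketch of that wiring (illustrative only, not part of this module; the exact
jitter applied between the 1-second floor and the 10-minute cap is an api_core
implementation detail):

    from google.api_core.retry import exponential_sleep_generator

    delays = exponential_sleep_generator(initial=1, maximum=10 * 60)
    first_five = [next(delays) for _ in range(5)]
    # Sleep durations in seconds, growing roughly exponentially, never above 600.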

class Dispatcher(object):
    def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
        self._manager = manager
        self._queue = queue
        self._thread: Optional[threading.Thread] = None
        self._operational_lock = threading.Lock()

    def start(self) -> None:
        """Start a thread to dispatch requests queued up by callbacks.

        Spawns a thread to run :meth:`dispatch_callback`.
        """
        with self._operational_lock:
            if self._thread is not None:
                raise ValueError("Dispatcher is already running.")

            worker = helper_threads.QueueCallbackWorker(
                self._queue,
                self.dispatch_callback,
                max_items=_MAX_BATCH_SIZE,
                max_latency=_MAX_BATCH_LATENCY,
            )
            # Create and start the helper thread.
            thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
            thread.daemon = True
            thread.start()
            _LOGGER.debug("Started helper thread %s", thread.name)
            self._thread = thread
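
start() delegates the batching behaviour to helper_threads.QueueCallbackWorker: the
worker blocks until at least one request is queued, then drains up to _MAX_BATCH_SIZE
items (or whatever arrives within _MAX_BATCH_LATENCY seconds) and hands the batch to
dispatch_callback(). A simplified, standalone sketch of such a loop (illustrative
only; the real QueueCallbackWorker lives in helper_threads and differs in detail):

    import queue
    import time

    def batching_worker(work_queue, callback, stop_sentinel, max_items=100, max_latency=0.01):
        while True:
            items = [work_queue.get()]  # block until at least one item arrives
            deadline = time.monotonic() + max_latency
            while len(items) < max_items and time.monotonic() < deadline:
                try:
                    items.append(work_queue.get(timeout=max(0.0, deadline - time.monotonic())))
                except queue.Empty:
                    break
            if stop_sentinel in items:
                return  # the "poison pill" enqueued by stop() ends the worker
            callback(items)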

    def stop(self) -> None:
        with self._operational_lock:
            if self._thread is not None:
                # Signal the worker to stop by queueing a "poison pill"
                self._queue.put(helper_threads.STOP)
                self._thread.join()

            self._thread = None

    def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
        """Map the callback request to the appropriate gRPC request.

        Args:
            items:
                Queued requests to dispatch.
        """
        lease_requests: List[requests.LeaseRequest] = []
        modack_requests: List[requests.ModAckRequest] = []
        ack_requests: List[requests.AckRequest] = []
        nack_requests: List[requests.NackRequest] = []
        drop_requests: List[requests.DropRequest] = []

        lease_ids = set()
        modack_ids = set()
        ack_ids = set()
        nack_ids = set()
        drop_ids = set()
        exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()

        for item in items:
            if isinstance(item, requests.LeaseRequest):
                if (
                    item.ack_id not in lease_ids
                ):  # LeaseRequests have no futures to handle.
                    lease_ids.add(item.ack_id)
                    lease_requests.append(item)
            elif isinstance(item, requests.ModAckRequest):
                if item.ack_id in modack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    modack_ids.add(item.ack_id)
                    modack_requests.append(item)
            elif isinstance(item, requests.AckRequest):
                if item.ack_id in ack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    ack_ids.add(item.ack_id)
                    ack_requests.append(item)
            elif isinstance(item, requests.NackRequest):
                if item.ack_id in nack_ids:
                    self._handle_duplicate_request_future(
                        exactly_once_delivery_enabled, item
                    )
                else:
                    nack_ids.add(item.ack_id)
                    nack_requests.append(item)
            elif isinstance(item, requests.DropRequest):
                if (
                    item.ack_id not in drop_ids
                ):  # DropRequests have no futures to handle.
                    drop_ids.add(item.ack_id)
                    drop_requests.append(item)
            else:
                warnings.warn(
                    f'Skipping unknown request item of type "{type(item)}"',
                    category=RuntimeWarning,
                )

        _LOGGER.debug("Handling %d batched requests", len(items))

        if lease_requests:
            self.lease(lease_requests)

        if modack_requests:
            self.modify_ack_deadline(modack_requests)

        # Note: Drop and ack *must* be after lease. It's possible to get both
        # the lease and the ack/drop request in the same batch.
        if ack_requests:
            self.ack(ack_requests)

        if nack_requests:
            self.nack(nack_requests)

        if drop_requests:
            self.drop(drop_requests)

    def _handle_duplicate_request_future(
        self,
        exactly_once_delivery_enabled: bool,
        item: Union[requests.AckRequest, requests.ModAckRequest, requests.NackRequest],
    ) -> None:
        _LOGGER.debug(
            "This is a duplicate %s with the same ack_id: %s.",
            type(item),
            item.ack_id,
        )
        if item.future:
            if exactly_once_delivery_enabled:
                item.future.set_exception(
                    ValueError(f"Duplicate ack_id for {type(item)}")
                )
            # Futures may be present even with exactly-once delivery
            # disabled, in transition periods after the setting is changed on
            # the subscription.
            else:
                # When exactly-once delivery is NOT enabled, acks/modacks are considered
                # best-effort, so the future should succeed even though this is a duplicate.
                item.future.set_result(AcknowledgeStatus.SUCCESS)
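
The outcome _handle_duplicate_request_future() produces for a duplicate's future can
be summarised with a stand-in future (concurrent.futures.Future is used here for
illustration, while the real requests carry the subscriber's own future type):

    from concurrent.futures import Future

    from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus

    duplicate_future: Future = Future()
    exactly_once_delivery_enabled = False  # duplicates are best-effort successes
    if exactly_once_delivery_enabled:
        duplicate_future.set_exception(ValueError("Duplicate ack_id"))
    else:
        duplicate_future.set_result(AcknowledgeStatus.SUCCESS)
    assert duplicate_future.result() == AcknowledgeStatus.SUCCESS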

    def ack(self, items: Sequence[requests.AckRequest]) -> None:
        """Acknowledge the given messages.

        Args:
            items: The items to acknowledge.
        """
        # If we got timing information, add it to the histogram.
        for item in items:
            time_to_ack = item.time_to_ack
            if time_to_ack is not None:
                self._manager.ack_histogram.add(time_to_ack)

        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit.
        items_gen = iter(items)
        ack_ids_gen = (item.ack_id for item in items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))
        subscription_id: Optional[str] = None
        project_id: Optional[str] = None
        for item in items:
            if item.opentelemetry_data:
                item.opentelemetry_data.add_subscribe_span_event("ack start")
                if subscription_id is None:
                    subscription_id = item.opentelemetry_data.subscription_id
                if project_id is None:
                    project_id = item.opentelemetry_data.project_id

        for _ in range(total_chunks):
            ack_reqs_dict = {
                req.ack_id: req
                for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
            }

            subscribe_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            for ack_req in ack_reqs_dict.values():
                if ack_req.opentelemetry_data:
                    subscribe_span: Optional[
                        trace.Span
                    ] = ack_req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            ack_span: Optional[trace.Span] = None
            if subscription_id and project_id:
                ack_span = start_ack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    ack_span and ack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    ack_span_context: trace.SpanContext = ack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=ack_span_context,
                            attributes={
                                "messaging.operation.name": "ack",
                            },
                        )

            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
                ack_reqs_dict=ack_reqs_dict,
            )
            if ack_span:
                ack_span.end()

            for completed_ack in requests_completed:
                if completed_ack.opentelemetry_data:
                    completed_ack.opentelemetry_data.add_subscribe_span_event("ack end")
                    completed_ack.opentelemetry_data.set_subscribe_span_result("acked")
                    completed_ack.opentelemetry_data.end_subscribe_span()

            # Remove the completed messages from lease management.
            self.drop(requests_completed)

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryAcks",
                    functools.partial(self._retry_acks, requests_to_retry),
                )
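
ack() above (and modify_ack_deadline() further down) splits a large batch into chunks
of at most _ACK_IDS_BATCH_SIZE ack IDs by repeatedly slicing a single shared iterator.
A standalone sketch of that splitting, with plain strings standing in for real ack IDs
(illustrative only):

    import itertools
    import math

    _ACK_IDS_BATCH_SIZE = 1000
    ack_ids = [f"ack-{i}" for i in range(2500)]

    ids_gen = iter(ack_ids)
    total_chunks = int(math.ceil(len(ack_ids) / _ACK_IDS_BATCH_SIZE))
    chunks = [
        list(itertools.islice(ids_gen, _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks)
    ]
    assert [len(chunk) for chunk in chunks] == [1000, 1000, 500]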

    def _start_retry_thread(self, thread_name, thread_target):
        # note: if the thread is *not* a daemon, a memory leak exists due to a cpython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        retry_thread = threading.Thread(
            name=thread_name,
            target=thread_target,
            daemon=True,
        )
        # The thread finishes when the requests succeed or eventually fail with
        # a back-end timeout error or other permanent failure.
        retry_thread.start()

    def _retry_acks(self, requests_to_retry: List[requests.AckRequest]):
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)

            _LOGGER.debug(
                "Retrying %s ack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )

            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
            subscription_id: Optional[str] = None
            project_id: Optional[str] = None
            subscribe_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            for req in requests_to_retry:
                if req.opentelemetry_data:
                    req.opentelemetry_data.add_subscribe_span_event("ack start")
                    if subscription_id is None:
                        subscription_id = req.opentelemetry_data.subscription_id
                    if project_id is None:
                        project_id = req.opentelemetry_data.project_id
                    subscribe_span: Optional[
                        trace.Span
                    ] = req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            ack_span: Optional[trace.Span] = None
            if subscription_id and project_id:
                ack_span = start_ack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    ack_span and ack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    ack_span_context: trace.SpanContext = ack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=ack_span_context,
                            attributes={
                                "messaging.operation.name": "ack",
                            },
                        )

            requests_completed, requests_to_retry = self._manager.send_unary_ack(
                ack_ids=[req.ack_id for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )

            if ack_span:
                ack_span.end()

            for completed_ack in requests_completed:
                if completed_ack.opentelemetry_data:
                    completed_ack.opentelemetry_data.add_subscribe_span_event("ack end")
                    completed_ack.opentelemetry_data.set_subscribe_span_result("acked")
                    completed_ack.opentelemetry_data.end_subscribe_span()

            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."
            # Remove the completed messages from lease management.
            self.drop(requests_completed)

    def drop(
        self,
        items: Sequence[
            Union[requests.AckRequest, requests.DropRequest, requests.NackRequest]
        ],
    ) -> None:
        """Remove the given messages from lease management.

        Args:
            items: The items to drop.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.remove(items)
        ordering_keys = (k.ordering_key for k in items if k.ordering_key)
        self._manager.activate_ordering_keys(ordering_keys)
        self._manager.maybe_resume_consumer()

    def lease(self, items: Sequence[requests.LeaseRequest]) -> None:
        """Add the given messages to lease management.

        Args:
            items: The items to lease.
        """
        assert self._manager.leaser is not None
        self._manager.leaser.add(items)
        self._manager.maybe_pause_consumer()

    def modify_ack_deadline(
        self,
        items: Sequence[requests.ModAckRequest],
        default_deadline: Optional[float] = None,
    ) -> None:
        """Modify the ack deadline for the given messages.

        Args:
            items: The items to modify.
        """
        # We must potentially split the request into multiple smaller requests
        # to avoid the server-side max request size limit.
        items_gen = iter(items)
        ack_ids_gen = (item.ack_id for item in items)
        deadline_seconds_gen = (item.seconds for item in items)
        total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

        subscription_id: Optional[str] = None
        project_id: Optional[str] = None

        for item in items:
            if item.opentelemetry_data:
                if math.isclose(item.seconds, 0):
                    item.opentelemetry_data.add_subscribe_span_event("nack start")
                    if subscription_id is None:
                        subscription_id = item.opentelemetry_data.subscription_id
                    if project_id is None:
                        project_id = item.opentelemetry_data.project_id
                else:
                    item.opentelemetry_data.add_subscribe_span_event("modack start")
        for _ in range(total_chunks):
            ack_reqs_dict = {
                req.ack_id: req
                for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
            }
            subscribe_links: List[trace.Link] = []
            subscribe_spans: List[trace.Span] = []
            for ack_req in ack_reqs_dict.values():
                if ack_req.opentelemetry_data and math.isclose(ack_req.seconds, 0):
                    subscribe_span: Optional[
                        trace.Span
                    ] = ack_req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            nack_span: Optional[trace.Span] = None
            if subscription_id and project_id:
                nack_span = start_nack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    nack_span and nack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    nack_span_context: trace.SpanContext = nack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=nack_span_context,
                            attributes={
                                "messaging.operation.name": "nack",
                            },
                        )
            requests_to_retry: List[requests.ModAckRequest]
            requests_completed: Optional[List[requests.ModAckRequest]] = None
            if default_deadline is None:
                # no further work needs to be done for `requests_to_retry`
                requests_completed, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=list(
                        itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
                    ),
                    modify_deadline_seconds=list(
                        itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
                    ),
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=None,
                )
            else:
                requests_completed, requests_to_retry = self._manager.send_unary_modack(
                    modify_deadline_ack_ids=itertools.islice(
                        ack_ids_gen, _ACK_IDS_BATCH_SIZE
                    ),
                    modify_deadline_seconds=None,
                    ack_reqs_dict=ack_reqs_dict,
                    default_deadline=default_deadline,
                )
            if nack_span:
                nack_span.end()
            assert (
                len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
            ), "Too many requests to be retried."

            for completed_modack in requests_completed:
                if completed_modack.opentelemetry_data:
                    # nack is a modack with 0 extension seconds.
                    if math.isclose(completed_modack.seconds, 0):
                        completed_modack.opentelemetry_data.set_subscribe_span_result(
                            "nacked"
                        )
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "nack end"
                        )
                        completed_modack.opentelemetry_data.end_subscribe_span()
                    else:
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "modack end"
                        )

            # Retry on a separate thread so the dispatcher thread isn't blocked
            # by sleeps.
            if requests_to_retry:
                self._start_retry_thread(
                    "Thread-RetryModAcks",
                    functools.partial(self._retry_modacks, requests_to_retry),
                )

    def _retry_modacks(self, requests_to_retry):
        retry_delay_gen = exponential_sleep_generator(
            initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
            maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
        )
        while requests_to_retry:
            time_to_wait = next(retry_delay_gen)

            _LOGGER.debug(
                "Retrying %s modack(s) after delay of %s seconds",
                len(requests_to_retry),
                time_to_wait,
            )

            time.sleep(time_to_wait)

            ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}

            subscription_id = None
            project_id = None
            subscribe_links = []
            subscribe_spans = []
            for ack_req in ack_reqs_dict.values():
                if ack_req.opentelemetry_data and math.isclose(ack_req.seconds, 0):
                    if subscription_id is None:
                        subscription_id = ack_req.opentelemetry_data.subscription_id
                    if project_id is None:
                        project_id = ack_req.opentelemetry_data.project_id
                    subscribe_span = ack_req.opentelemetry_data.subscribe_span
                    if (
                        subscribe_span
                        and subscribe_span.get_span_context().trace_flags.sampled
                    ):
                        subscribe_links.append(
                            trace.Link(subscribe_span.get_span_context())
                        )
                        subscribe_spans.append(subscribe_span)
            nack_span = None
            if subscription_id and project_id:
                nack_span = start_nack_span(
                    subscription_id,
                    len(ack_reqs_dict),
                    project_id,
                    subscribe_links,
                )
                if (
                    nack_span and nack_span.get_span_context().trace_flags.sampled
                ):  # pragma: NO COVER
                    nack_span_context: trace.SpanContext = nack_span.get_span_context()
                    for subscribe_span in subscribe_spans:
                        subscribe_span.add_link(
                            context=nack_span_context,
                            attributes={
                                "messaging.operation.name": "nack",
                            },
                        )
            requests_completed, requests_to_retry = self._manager.send_unary_modack(
                modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
                modify_deadline_seconds=[req.seconds for req in requests_to_retry],
                ack_reqs_dict=ack_reqs_dict,
            )
            if nack_span:
                nack_span.end()
            for completed_modack in requests_completed:
                if completed_modack.opentelemetry_data:
                    # nack is a modack with 0 extension seconds.
                    if math.isclose(completed_modack.seconds, 0):
                        completed_modack.opentelemetry_data.set_subscribe_span_result(
                            "nacked"
                        )
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "nack end"
                        )
                        completed_modack.opentelemetry_data.end_subscribe_span()
                    else:
                        completed_modack.opentelemetry_data.add_subscribe_span_event(
                            "modack end"
                        )

    def nack(self, items: Sequence[requests.NackRequest]) -> None:
        """Explicitly deny receipt of messages.

        Args:
            items: The items to deny.
        """
        self.modify_ack_deadline(
            [
                requests.ModAckRequest(
                    ack_id=item.ack_id,
                    seconds=0,
                    future=item.future,
                    opentelemetry_data=item.opentelemetry_data,
                )
                for item in items
            ]
        )
        self.drop(
            [
                requests.DropRequest(
                    ack_id=item.ack_id,
                    byte_size=item.byte_size,
                    ordering_key=item.ordering_key,
                )
                for item in items
            ]
        )
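
For reference, the dispatcher's lifecycle as exercised by the streaming pull machinery
boils down to: construct it with a manager and a queue, start it, let message callbacks
enqueue RequestItem objects, then stop it. A minimal sketch (illustrative only: in real
use the StreamingPullManager created by subscriber_client.subscribe() owns the
dispatcher; None is passed below purely because start() and stop() never touch the
manager):

    import queue

    request_queue = queue.Queue()
    dispatcher = Dispatcher(manager=None, queue=request_queue)
    dispatcher.start()  # spawns Thread-CallbackRequestDispatcher
    # ... message callbacks would put AckRequest / NackRequest / etc. on request_queue ...
    dispatcher.stop()  # enqueues the helper_threads.STOP poison pill and joins the worker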