Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

507 statements  

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import division 

16 

17import collections 

18import functools 

19import inspect 

20import itertools 

21import logging 

22import threading 

23import typing 

24from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple 

25import uuid 

26 

27from opentelemetry import trace 

28import grpc # type: ignore 

29 

30from google.api_core import bidi 

31from google.api_core import exceptions 

32from google.cloud.pubsub_v1 import types 

33from google.cloud.pubsub_v1.subscriber._protocol import dispatcher 

34from google.cloud.pubsub_v1.subscriber._protocol import heartbeater 

35from google.cloud.pubsub_v1.subscriber._protocol import histogram 

36from google.cloud.pubsub_v1.subscriber._protocol import leaser 

37from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold 

38from google.cloud.pubsub_v1.subscriber._protocol import requests 

39from google.cloud.pubsub_v1.subscriber.exceptions import ( 

40 AcknowledgeError, 

41 AcknowledgeStatus, 

42) 

43from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

44 SubscribeOpenTelemetry, 

45) 

46import google.cloud.pubsub_v1.subscriber.message 

47from google.cloud.pubsub_v1.subscriber import futures 

48from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler 

49from google.pubsub_v1 import types as gapic_types 

50from grpc_status import rpc_status # type: ignore 

51from google.rpc.error_details_pb2 import ErrorInfo # type: ignore 

52from google.rpc import code_pb2 # type: ignore 

53from google.rpc import status_pb2 

54from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

55 start_modack_span, 

56) 

57 

58if typing.TYPE_CHECKING: # pragma: NO COVER 

59 from google.cloud.pubsub_v1 import subscriber 

60 

61 

_LOGGER = logging.getLogger(__name__)

# Thread names for the two shutdown paths.
# NOTE(review): the code that spawns these threads is outside this section.
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"

# Stream error classes grouped, judging by the names, into those that allow the
# stream to be re-established and those that end it.
# NOTE(review): the consumers of these tuples are outside this section.
_RETRYABLE_STREAM_ERRORS = (
    exceptions.Aborted,
    exceptions.DeadlineExceeded,
    exceptions.GatewayTimeout,
    exceptions.InternalServerError,
    exceptions.ResourceExhausted,
    exceptions.ServiceUnavailable,
    exceptions.Unknown,
)
_TERMINATING_STREAM_ERRORS = (
    exceptions.Cancelled,
    exceptions.InvalidArgument,
    exceptions.NotFound,
    exceptions.PermissionDenied,
    exceptions.Unauthenticated,
    exceptions.Unauthorized,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60
"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for
a subscription. We do this to reduce premature ack expiration.
"""

_DEFAULT_STREAM_ACK_DEADLINE: float = 60
"""The default stream ack deadline in seconds."""

_MAX_STREAM_ACK_DEADLINE: float = 600
"""The maximum stream ack deadline in seconds."""

_MIN_STREAM_ACK_DEADLINE: float = 10
"""The minimum stream ack deadline in seconds."""

# Status codes for which a failed ack/modack request is handed back for a retry
# when exactly-once delivery is enabled (see ``_process_requests``).
_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
    code_pb2.DEADLINE_EXCEEDED,
    code_pb2.RESOURCE_EXHAUSTED,
    code_pb2.ABORTED,
    code_pb2.INTERNAL,
    code_pb2.UNAVAILABLE,
}

# `on_fatal_exception` was added in `google-api-core v2.25.1`, which allows us to
# inform callers on unrecoverable errors. We can only pass this arg if it's
# available in the `BackgroundConsumer` spec.
#
# The parameter *names* must be inspected explicitly: a membership test against
# the ``FullArgSpec`` named tuple returned by ``inspect.getfullargspec()``
# compares the string with whole fields (lists, dicts, ``None``) and is therefore
# always ``False``.
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = (
    "on_fatal_exception" in inspect.signature(bidi.BackgroundConsumer).parameters
)

116 

117 

def _wrap_as_exception(maybe_exception: Any) -> BaseException:
    """Coerce an arbitrary object into an exception instance.

    Args:
        maybe_exception: The object to convert, usually a gRPC exception class.

    Returns:
        The result of ``exceptions.from_grpc_error()`` when given a
        ``grpc.RpcError``, the argument itself when it is any other
        ``BaseException`` instance, and otherwise a plain ``Exception``
        constructed from the argument.
    """
    # The gRPC check comes first so that such errors are always converted
    # instead of being passed through unchanged.
    if isinstance(maybe_exception, grpc.RpcError):
        return exceptions.from_grpc_error(maybe_exception)

    if not isinstance(maybe_exception, BaseException):
        return Exception(maybe_exception)

    return maybe_exception

134 

135 

136def _wrap_callback_errors( 

137 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], 

138 on_callback_error: Callable[[BaseException], Any], 

139 message: "google.cloud.pubsub_v1.subscriber.message.Message", 

140): 

141 """Wraps a user callback so that if an exception occurs the message is 

142 nacked. 

143 

144 Args: 

145 callback: The user callback. 

146 message: The Pub/Sub message. 

147 """ 

148 try: 

149 if message.opentelemetry_data: 

150 message.opentelemetry_data.end_subscribe_concurrency_control_span() 

151 message.opentelemetry_data.start_process_span() 

152 callback(message) 

153 except BaseException as exc: 

154 # Note: the likelihood of this failing is extremely low. This just adds 

155 # a message to a queue, so if this doesn't work the world is in an 

156 # unrecoverable state and this thread should just bail. 

157 _LOGGER.exception( 

158 "Top-level exception occurred in callback while processing a message" 

159 ) 

160 message.nack() 

161 on_callback_error(exc) 

162 

163 

def _get_status(
    exc: exceptions.GoogleAPICallError,
) -> Optional["status_pb2.Status"]:
    """Return the status parsed from the response of a failed RPC.

    Args:
        exc: The API call error whose ``response`` is inspected.

    Returns:
        The value of ``rpc_status.from_call(exc.response)``, or ``None`` when
        the error has no response or parsing raises ``ValueError``.
    """
    response = exc.response
    if response:
        try:
            return rpc_status.from_call(response)
        # Possible "If the gRPC call’s code or details are inconsistent
        # with the status code and message inside of the
        # google.rpc.status.Status"
        except ValueError:
            _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True)
            return None

    _LOGGER.debug("No response obj in errored RPC call.")
    return None

178 

179 

def _get_ack_errors(
    exc: exceptions.GoogleAPICallError,
) -> Optional[Dict[str, str]]:
    """Extract the ``ErrorInfo`` metadata attached to a failed RPC.

    Args:
        exc: The API call error raised by the ack/modack RPC.

    Returns:
        The ``metadata`` mapping of the first status detail that unpacks as an
        ``ErrorInfo``, or ``None`` when the status cannot be obtained or no
        detail is an ``ErrorInfo``.
    """
    status = _get_status(exc)
    if not status:
        _LOGGER.debug("Unable to get status of errored RPC.")
        return None

    for detail in status.details:
        info = ErrorInfo()
        if detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info):
            return info.metadata
        # A status may carry several details. Keep scanning rather than giving
        # up on the first one that is not an ErrorInfo.
        _LOGGER.debug("Unable to unpack ErrorInfo.")

    return None

194 

195 

196def _process_requests( 

197 error_status: Optional["status_pb2.Status"], 

198 ack_reqs_dict: Dict[str, requests.AckRequest], 

199 errors_dict: Optional[Dict[str, str]], 

200): 

201 """Process requests when exactly-once delivery is enabled by referring to 

202 error_status and errors_dict. 

203 

204 The errors returned by the server in as `error_status` or in `errors_dict` 

205 are used to complete the request futures in `ack_reqs_dict` (with a success 

206 or exception) or to return requests for further retries. 

207 """ 

208 requests_completed = [] 

209 requests_to_retry = [] 

210 for ack_id in ack_reqs_dict: 

211 # Handle special errors returned for ack/modack RPCs via the ErrorInfo 

212 # sidecar metadata when exactly-once delivery is enabled. 

213 if errors_dict and ack_id in errors_dict: 

214 exactly_once_error = errors_dict[ack_id] 

215 if exactly_once_error.startswith("TRANSIENT_"): 

216 requests_to_retry.append(ack_reqs_dict[ack_id]) 

217 else: 

218 if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID": 

219 exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None) 

220 else: 

221 exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error) 

222 future = ack_reqs_dict[ack_id].future 

223 if future is not None: 

224 future.set_exception(exc) 

225 requests_completed.append(ack_reqs_dict[ack_id]) 

226 # Temporary GRPC errors are retried 

227 elif ( 

228 error_status 

229 and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS 

230 ): 

231 requests_to_retry.append(ack_reqs_dict[ack_id]) 

232 # Other GRPC errors are NOT retried 

233 elif error_status: 

234 if error_status.code == code_pb2.PERMISSION_DENIED: 

235 exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None) 

236 elif error_status.code == code_pb2.FAILED_PRECONDITION: 

237 exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) 

238 else: 

239 exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) 

240 future = ack_reqs_dict[ack_id].future 

241 if future is not None: 

242 future.set_exception(exc) 

243 requests_completed.append(ack_reqs_dict[ack_id]) 

244 # Since no error occurred, requests with futures are completed successfully. 

245 elif ack_reqs_dict[ack_id].future: 

246 future = ack_reqs_dict[ack_id].future 

247 # success 

248 assert future is not None 

249 future.set_result(AcknowledgeStatus.SUCCESS) 

250 requests_completed.append(ack_reqs_dict[ack_id]) 

251 # All other requests are considered completed. 

252 else: 

253 requests_completed.append(ack_reqs_dict[ack_id]) 

254 

255 return requests_completed, requests_to_retry 

256 

257 

258class StreamingPullManager(object): 

259 """The streaming pull manager coordinates pulling messages from Pub/Sub, 

260 leasing them, and scheduling them to be processed. 

261 

262 Args: 

263 client: 

264 The subscriber client used to create this instance. 

265 subscription: 

266 The name of the subscription. The canonical format for this is 

267 ``projects/{project}/subscriptions/{subscription}``. 

268 flow_control: 

269 The flow control settings. 

270 scheduler: 

271 The scheduler to use to process messages. If not provided, a thread 

272 pool-based scheduler will be used. 

273 use_legacy_flow_control: 

274 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, 

275 though client-side flow control is still enabled. If set to ``False`` 

276 (default), both server-side and client-side flow control are enabled. 

277 await_callbacks_on_shutdown: 

278 If ``True``, the shutdown thread will wait until all scheduler threads 

279 terminate and only then proceed with shutting down the remaining running 

280 helper threads. 

281 

282 If ``False`` (default), the shutdown thread will shut the scheduler down, 

283 but it will not wait for the currently executing scheduler threads to 

284 terminate. 

285 

286 This setting affects when the on close callbacks get invoked, and 

287 consequently, when the StreamingPullFuture associated with the stream gets 

288 resolved. 

289 """ 

290 

def __init__(
    self,
    client: "subscriber.Client",
    subscription: str,
    flow_control: types.FlowControl = types.FlowControl(),
    scheduler: Optional[ThreadScheduler] = None,
    use_legacy_flow_control: bool = False,
    await_callbacks_on_shutdown: bool = False,
):
    """Initialise the manager's state.

    Args:
        client: The subscriber client used to create this instance.
        subscription: The name of the subscription.
        flow_control: The flow control settings.
        scheduler: The scheduler to use. A ``ThreadScheduler`` is created
            when none is given.
        use_legacy_flow_control: Stored as ``_use_legacy_flow_control``.
        await_callbacks_on_shutdown: Stored as
            ``_await_callbacks_on_shutdown``.
    """
    self._client = client
    self._subscription = subscription
    self._exactly_once_enabled = False
    self._flow_control = flow_control
    self._use_legacy_flow_control = use_legacy_flow_control
    self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
    self._ack_histogram = histogram.Histogram()
    self._last_histogram_size = 0
    self._stream_metadata = [
        ["x-goog-request-params", "subscription=" + subscription]
    ]

    # A max_duration_per_lease_extension of 0 is the default and selects the
    # default stream ack deadline of 60. Any other value is clamped into the
    # [_MIN_STREAM_ACK_DEADLINE, _MAX_STREAM_ACK_DEADLINE] range, because the
    # deadline cannot be extended below the minimum or past the maximum.
    max_lease_extension = self._flow_control.max_duration_per_lease_extension
    if max_lease_extension == 0:
        self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
    else:
        self._stream_ack_deadline = min(
            max(max_lease_extension, _MIN_STREAM_ACK_DEADLINE),
            _MAX_STREAM_ACK_DEADLINE,
        )

    # The initial ack deadline is the configured minimum lease extension,
    # limited to the range the histogram module allows.
    capped_min_extension = min(
        self._flow_control.min_duration_per_lease_extension,
        histogram.MAX_ACK_DEADLINE,
    )
    self._ack_deadline = max(capped_min_extension, histogram.MIN_ACK_DEADLINE)

    self._rpc: Optional[bidi.ResumableBidiRpc] = None
    self._callback: Optional[functools.partial] = None
    self._closing = threading.Lock()
    self._closed = False
    self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = []
    # Guarded by self._exactly_once_enabled_lock
    self._send_new_ack_deadline = False

    # A shutdown thread is created on intentional shutdown.
    self._regular_shutdown_thread: Optional[threading.Thread] = None

    # Generate a random client id tied to this object. All streaming pull
    # connections (initial and re-connects) will then use the same client
    # id. Doing so lets the server establish affinity even across stream
    # disconnections.
    self._client_id = str(uuid.uuid4())

    self._scheduler: Optional[ThreadScheduler] = (
        ThreadScheduler() if scheduler is None else scheduler
    )

    # Messages that have been received from the server, but not yet sent to
    # the user callback.
    self._messages_on_hold = messages_on_hold.MessagesOnHold()

    # The total number of bytes consumed by the messages currently on hold.
    self._on_hold_bytes = 0

    # A lock ensuring that pausing / resuming the consumer are both atomic
    # operations that cannot be executed concurrently. Needed for properly
    # syncing these operations with the current leaser load. Additionally,
    # the lock is used to protect modifications of internal data that
    # affects the load computation, i.e. the count and size of the messages
    # currently on hold.
    self._pause_resume_lock = threading.Lock()

    # A lock guarding the self._exactly_once_enabled variable. We may also
    # acquire the self._ack_deadline_lock while this lock is held, but not
    # the reverse. So, we maintain a simple ordering of these two locks to
    # prevent deadlocks.
    self._exactly_once_enabled_lock = threading.Lock()

    # A lock protecting the current ACK deadline used in the lease
    # management. This value can be potentially updated both by the leaser
    # thread and by the message consumer thread when invoking the internal
    # _on_response() callback.
    self._ack_deadline_lock = threading.Lock()

    # The threads created in ``.open()``.
    self._dispatcher: Optional[dispatcher.Dispatcher] = None
    self._leaser: Optional[leaser.Leaser] = None
    self._consumer: Optional[bidi.BackgroundConsumer] = None
    self._heartbeater: Optional[heartbeater.Heartbeater] = None

394 

@property
def is_active(self) -> bool:
    """Whether this manager is actively streaming.

    ``False`` does not mean the manager is completely shut down, only that
    it has stopped getting new messages.
    """
    if self._consumer is None:
        return False
    return self._consumer.is_active

403 

@property
def flow_control(self) -> types.FlowControl:
    """The flow control settings this manager was created with."""
    return self._flow_control

408 

@property
def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
    """The dispatcher helper, or ``None`` when there is none."""
    return self._dispatcher

413 

@property
def leaser(self) -> Optional[leaser.Leaser]:
    """The leaser helper, or ``None`` when there is none."""
    return self._leaser

418 

@property
def ack_histogram(self) -> histogram.Histogram:
    """The histogram that tracks time-to-acknowledge."""
    return self._ack_histogram

423 

@property
def ack_deadline(self) -> float:
    """The current ACK deadline, read without recomputing it.

    Returns:
        The ack deadline, as reported by ``_obtain_ack_deadline()`` called
        with ``maybe_update=False``.
    """
    return self._obtain_ack_deadline(maybe_update=False)

432 

433 def _obtain_ack_deadline(self, maybe_update: bool) -> float: 

434 """The actual `ack_deadline` implementation. 

435 

436 This method is "sticky". It will only perform the computations to check on the 

437 right ACK deadline if explicitly requested AND if the histogram with past 

438 time-to-ack data has gained a significant amount of new information. 

439 

440 Args: 

441 maybe_update: 

442 If ``True``, also update the current ACK deadline before returning it if 

443 enough new ACK data has been gathered. 

444 

445 Returns: 

446 The current ACK deadline in seconds to use. 

447 """ 

448 with self._ack_deadline_lock: 

449 if not maybe_update: 

450 return self._ack_deadline 

451 

452 target_size = min( 

453 self._last_histogram_size * 2, self._last_histogram_size + 100 

454 ) 

455 hist_size = len(self.ack_histogram) 

456 

457 if hist_size > target_size: 

458 self._last_histogram_size = hist_size 

459 self._ack_deadline = self.ack_histogram.percentile(percent=99) 

460 

461 if self.flow_control.max_duration_per_lease_extension > 0: 

462 # The setting in flow control could be too low, adjust if needed. 

463 flow_control_setting = max( 

464 self.flow_control.max_duration_per_lease_extension, 

465 histogram.MIN_ACK_DEADLINE, 

466 ) 

467 self._ack_deadline = min(self._ack_deadline, flow_control_setting) 

468 

469 # If the user explicitly sets a min ack_deadline, respect it. 

470 if self.flow_control.min_duration_per_lease_extension > 0: 

471 # The setting in flow control could be too high, adjust if needed. 

472 flow_control_setting = min( 

473 self.flow_control.min_duration_per_lease_extension, 

474 histogram.MAX_ACK_DEADLINE, 

475 ) 

476 self._ack_deadline = max(self._ack_deadline, flow_control_setting) 

477 elif self._exactly_once_enabled: 

478 # Higher minimum ack_deadline for subscriptions with 

479 # exactly-once delivery enabled. 

480 self._ack_deadline = max( 

481 self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED 

482 ) 

483 # If we have updated the ack_deadline and it is longer than the stream_ack_deadline 

484 # set the stream_ack_deadline to the new ack_deadline. 

485 if self._ack_deadline > self._stream_ack_deadline: 

486 self._stream_ack_deadline = self._ack_deadline 

487 return self._ack_deadline 

488 

@property
def load(self) -> float:
    """The current load relative to the flow control limits.

    A value of 1.0 means one of the flow control limits has been hit, and
    values between 0.0 and 1.0 show how close the manager is to them (0.5
    means exactly half of what the setting allows).

    Two settings are considered, the message count and the total bytes.
    The higher of the two ratios is returned, since plenty of room on one
    setting does not help if the other is exhausted.

    Returns:
        The load value, or 0.0 when there is no leaser.
    """
    if self._leaser is None:
        return 0.0

    # Messages that are temporarily on hold are not being delivered to the
    # user's callbacks, so they should not count towards the load. They are
    # still lease-managed to avoid unnecessary ACK deadline expirations,
    # hence their count and size are subtracted from the leaser's values.
    active_messages = self._leaser.message_count - self._messages_on_hold.size
    message_load = active_messages / self._flow_control.max_messages

    active_bytes = self._leaser.bytes - self._on_hold_bytes
    bytes_load = active_bytes / self._flow_control.max_bytes

    return max(message_load, bytes_load)

523 

def add_close_callback(
    self, callback: Callable[["StreamingPullManager", Any], Any]
) -> None:
    """Register a callable to be invoked when the manager closes.

    Args:
        callback: The callable to register. It is appended to the list of
            close callbacks.
    """
    self._close_callbacks.append(callback)

533 

def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None:
    """Send the next queued message for each of the given ordering keys.

    Keys that no longer have any queued messages get their state cleaned up.

    Since the load went down by one message, it is probably safe to send the
    user another message for the same key. The released message may be
    bigger than the previous one, so this may push the load above the
    maximum. That is by design, because it simplifies MessagesOnHold.

    Args:
        ordering_keys:
            A sequence of ordering keys to activate. May be empty.
    """
    with self._pause_resume_lock:
        # Without a scheduler we are shutting down, so no more messages
        # are dispatched.
        if self._scheduler is not None:
            self._messages_on_hold.activate_ordering_keys(
                ordering_keys, self._schedule_message_on_hold
            )

555 

def maybe_pause_consumer(self) -> None:
    """Pause the consumer when the current load has reached the maximum.

    Nothing happens if the load is below ``_MAX_LOAD``, if there is no
    consumer, or if the consumer is already paused.
    """
    with self._pause_resume_lock:
        if not (self.load >= _MAX_LOAD):
            return

        if self._consumer is None or self._consumer.is_paused:
            return

        _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load)
        self._consumer.pause()

565 

def maybe_resume_consumer(self) -> None:
    """Resume a paused consumer if the load has dropped far enough.

    Messages held internally are released before the consumer is resumed,
    which avoids overloading the leaser.
    """
    with self._pause_resume_lock:
        # Only a consumer that has been paused by flow control is of
        # interest here.
        if self._consumer is None:
            return
        if not self._consumer.is_paused:
            return

        _LOGGER.debug("Current load: %.2f", self.load)

        # Before maybe resuming the background consumer, release any messages
        # currently on hold, if the current load allows for it.
        self._maybe_release_messages()

        # In order to not thrash too much, the load has to have passed below
        # the resume threshold (80% by default) of each flow control setting
        # before restarting.
        below_threshold = self.load < _RESUME_THRESHOLD
        if not below_threshold:
            _LOGGER.debug("Did not resume, current load is %.2f.", self.load)
        else:
            _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
            self._consumer.resume()

593 

def _maybe_release_messages(self) -> None:
    """Release held messages for as long as the current load allows it.

    Each released message has its user callback scheduled, and the lease
    expiry timer is started for all of the released ack IDs at the end.

    If no messages are on hold, or the load is already at the maximum, no
    message is released.

    The method assumes the caller has acquired the ``_pause_resume_lock``.
    """
    released = []

    while self.load < _MAX_LOAD:
        held = self._messages_on_hold.get()
        if not held:
            break

        otel_data = held.opentelemetry_data
        if otel_data:
            otel_data.end_subscribe_scheduler_span()

        self._schedule_message_on_hold(held)
        released.append(held.ack_id)

    assert self._leaser is not None
    self._leaser.start_lease_expiry_timer(released)

618 

def _schedule_message_on_hold(
    self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"
):
    """Hand a held message to the scheduler and update the on-hold bytes.

    The method assumes the caller has acquired the ``_pause_resume_lock``.

    Args:
        msg: The message to schedule to be sent to the user.
    """
    assert msg, "Message must not be None."

    # Fewer bytes on hold means a higher load.
    self._on_hold_bytes -= msg.size

    if self._on_hold_bytes < 0:
        # The counter should never go negative. Report it and reset to zero.
        _LOGGER.warning(
            "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
        )
        self._on_hold_bytes = 0

    _LOGGER.debug(
        "Released held message, scheduling callback for it, "
        "still on hold %s (bytes %s).",
        self._messages_on_hold.size,
        self._on_hold_bytes,
    )

    assert self._scheduler is not None
    assert self._callback is not None

    otel_data = msg.opentelemetry_data
    if otel_data:
        otel_data.start_subscribe_concurrency_control_span()

    self._scheduler.schedule(self._callback, msg)

651 

def send_unary_ack(
    self, ack_ids, ack_reqs_dict
) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]:
    """Acknowledge messages with a separate unary RPC instead of the stream.

    If a RetryError occurs, all pending futures are completed,
    ``_on_rpc_done()`` is called to trigger the manager shutdown, and the
    error is re-raised.

    Args:
        ack_ids: The ack IDs to acknowledge. Must not be empty.
        ack_reqs_dict: The corresponding requests, keyed by ack ID.

    Returns:
        A ``(requests_completed, requests_to_retry)`` pair of lists.
    """
    assert ack_ids
    assert len(ack_ids) == len(ack_reqs_dict)

    error_status = None
    ack_errors_dict = None
    try:
        self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids)
    except exceptions.GoogleAPICallError as exc:
        _LOGGER.debug(
            "Exception while sending unary RPC. This is typically "
            "non-fatal as stream requests are best-effort.",
            exc_info=True,
        )
        error_status = _get_status(exc)
        ack_errors_dict = _get_ack_errors(exc)
    except exceptions.RetryError as exc:
        exactly_once = self._exactly_once_delivery_enabled()
        # Complete the futures so that nobody blocks on them forever. Futures
        # may be present even with exactly-once delivery disabled, in
        # transition periods after the setting is changed on the
        # subscription.
        for pending in ack_reqs_dict.values():
            if not pending.future:
                continue
            if exactly_once:
                failure = AcknowledgeError(
                    AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
                )
                pending.future.set_exception(failure)
            else:
                pending.future.set_result(AcknowledgeStatus.SUCCESS)

        _LOGGER.debug(
            "RetryError while sending ack RPC. Waiting on a transient "
            "error resolution for too long, will now trigger shutdown.",
            exc_info=False,
        )
        # The underlying channel has been suffering from a retryable error
        # for too long, time to give up and shut the streaming pull down.
        self._on_rpc_done(exc)
        raise

    if self._exactly_once_delivery_enabled():
        completed, to_retry = _process_requests(
            error_status, ack_reqs_dict, ack_errors_dict
        )
        return completed, to_retry

    # When exactly-once delivery is NOT enabled, acks/modacks are considered
    # best-effort. So, they always succeed even if the RPC fails.
    completed = []
    for req in ack_reqs_dict.values():
        # Futures may be present even with exactly-once delivery disabled,
        # in transition periods after the setting is changed on the
        # subscription.
        if req.future:
            req.future.set_result(AcknowledgeStatus.SUCCESS)
        completed.append(req)

    return completed, []

719 

def send_unary_modack(
    self,
    modify_deadline_ack_ids,
    modify_deadline_seconds,
    ack_reqs_dict,
    default_deadline=None,
) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
    """Modify ack deadlines with separate unary RPCs instead of the stream.

    If a RetryError occurs, all pending futures are completed,
    ``_on_rpc_done()`` is called to trigger the manager shutdown, and the
    error is re-raised.

    Args:
        modify_deadline_ack_ids: The ack IDs to modify. Must not be empty.
        modify_deadline_seconds: Per-ack-ID deadlines, indexed in parallel
            with ``modify_deadline_ack_ids``. Must be ``None`` when
            ``default_deadline`` is given.
        ack_reqs_dict: The corresponding requests, keyed by ack ID.
        default_deadline: A single deadline applied to every ack ID.

    Returns:
        A ``(requests_completed, requests_to_retry)`` pair of lists.
    """
    assert modify_deadline_ack_ids
    # Either we have a generator or a single deadline.
    assert modify_deadline_seconds is None or default_deadline is None

    error_status = None
    modack_errors_dict = None
    try:
        if default_deadline is not None:
            # We can send all requests with the default deadline.
            self._client.modify_ack_deadline(
                subscription=self._subscription,
                ack_ids=modify_deadline_ack_ids,
                ack_deadline_seconds=default_deadline,
            )
        else:
            # Send ack_ids with the same deadline seconds together.
            ids_by_deadline = collections.defaultdict(list)
            for position, ack_id in enumerate(modify_deadline_ack_ids):
                ids_by_deadline[modify_deadline_seconds[position]].append(ack_id)

            for deadline, ack_ids in ids_by_deadline.items():
                self._client.modify_ack_deadline(
                    subscription=self._subscription,
                    ack_ids=ack_ids,
                    ack_deadline_seconds=deadline,
                )
    except exceptions.GoogleAPICallError as exc:
        _LOGGER.debug(
            "Exception while sending unary RPC. This is typically "
            "non-fatal as stream requests are best-effort.",
            exc_info=True,
        )
        error_status = _get_status(exc)
        modack_errors_dict = _get_ack_errors(exc)
    except exceptions.RetryError as exc:
        exactly_once = self._exactly_once_delivery_enabled()
        # Complete the futures so that nobody blocks on them forever. Futures
        # may be present even with exactly-once delivery disabled, in
        # transition periods after the setting is changed on the
        # subscription.
        for pending in ack_reqs_dict.values():
            if not pending.future:
                continue
            if exactly_once:
                failure = AcknowledgeError(
                    AcknowledgeStatus.OTHER,
                    "RetryError while sending modack RPC.",
                )
                pending.future.set_exception(failure)
            else:
                pending.future.set_result(AcknowledgeStatus.SUCCESS)

        _LOGGER.debug(
            "RetryError while sending modack RPC. Waiting on a transient "
            "error resolution for too long, will now trigger shutdown.",
            exc_info=False,
        )
        # The underlying channel has been suffering from a retryable error
        # for too long, time to give up and shut the streaming pull down.
        self._on_rpc_done(exc)
        raise

    if self._exactly_once_delivery_enabled():
        completed, to_retry = _process_requests(
            error_status, ack_reqs_dict, modack_errors_dict
        )
        return completed, to_retry

    # When exactly-once delivery is NOT enabled, acks/modacks are considered
    # best-effort. So, they always succeed even if the RPC fails.
    completed = []
    for req in ack_reqs_dict.values():
        # Futures may be present even with exactly-once delivery disabled,
        # in transition periods after the setting is changed on the
        # subscription.
        if req.future:
            req.future.set_result(AcknowledgeStatus.SUCCESS)
        completed.append(req)

    return completed, []

813 

def heartbeat(self) -> bool:
    """Send a heartbeat request over the streaming pull RPC.

    The heartbeat is normally an empty request. When the
    ``_send_new_ack_deadline`` flag is set, the request carries the current
    stream ack deadline instead, and the flag is cleared.

    Returns:
        ``True`` if a request was sent, ``False`` if there is no RPC or the
        RPC is not active.
    """
    # Nothing to send on a missing or inactive stream.
    if self._rpc is None or not self._rpc.is_active:
        return False

    # Read and reset the pending flag atomically with respect to the lock.
    with self._exactly_once_enabled_lock:
        deadline_pending, self._send_new_ack_deadline = (
            self._send_new_ack_deadline,
            False,
        )

    if not deadline_pending:
        request = gapic_types.StreamingPullRequest()
    else:
        request = gapic_types.StreamingPullRequest(
            stream_ack_deadline_seconds=self._stream_ack_deadline
        )
        _LOGGER.debug(
            "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
        )

    self._rpc.send(request)
    return True

843 

def open(
    self,
    callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
    on_callback_error: Callable[[Exception], Any],
) -> None:
    """Start consuming messages from the stream.

    Builds the resumable bidirectional RPC and the dispatcher, consumer,
    leaser and heartbeater helpers, then starts them in that order.

    Args:
        callback:
            Invoked for each message received on the stream.
        on_callback_error:
            Invoked if the provided ``callback`` raises an exception.

    Raises:
        ValueError: If the manager is already open, or has been closed.
    """
    if self.is_active:
        raise ValueError("This manager is already open.")
    if self._closed:
        raise ValueError("This manager has been closed and can not be re-used.")

    self._callback = functools.partial(
        _wrap_callback_errors, callback, on_callback_error
    )

    # Build the RPC; the initial request is produced lazily with the ack
    # deadline captured at this point.
    initial_ack_deadline = self._stream_ack_deadline
    self._rpc = bidi.ResumableBidiRpc(
        start_rpc=self._client.streaming_pull,
        initial_request=functools.partial(
            self._get_initial_request, initial_ack_deadline
        ),
        should_recover=self._should_recover,
        should_terminate=self._should_terminate,
        metadata=self._stream_metadata,
        throttle_reopen=True,
    )
    self._rpc.add_done_callback(self._on_rpc_done)

    _LOGGER.debug(
        "Creating a stream, default ACK deadline set to {} seconds.".format(
            self._stream_ack_deadline
        )
    )

    # Wire up the helpers (none of them is started yet).
    assert self._scheduler is not None
    self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue)

    # `on_fatal_exception` is only passed when the module-level flag says
    # the installed `google-api-core` supports it.
    consumer_options = {}
    if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
        consumer_options["on_fatal_exception"] = self._on_fatal_exception
    self._consumer = bidi.BackgroundConsumer(
        self._rpc, self._on_response, **consumer_options
    )

    self._leaser = leaser.Leaser(self)
    self._heartbeater = heartbeater.Heartbeater(self)

    # Start order: request dispatcher, message consumer, lease maintainer,
    # stream heartbeater.
    for helper in (
        self._dispatcher,
        self._consumer,
        self._leaser,
        self._heartbeater,
    ):
        helper.start()

921 

def close(self, reason: Any = None) -> None:
    """Stop consuming messages and shut down the helper threads.

    The call does not block: it starts a daemon thread that runs
    :meth:`_shutdown`. That method returns early once the manager is marked
    closed, so repeated calls have no further effect.

    Args:
        reason:
            Why the manager is being closed. ``None`` is treated as an
            "intentional" shutdown. The value is forwarded to the callbacks
            registered via :meth:`add_close_callback`.
    """
    shutdown_thread = threading.Thread(
        target=self._shutdown,
        kwargs={"reason": reason},
        name=_REGULAR_SHUTDOWN_THREAD_NAME,
        daemon=True,
    )
    # Keep a reference on the instance before the thread is started.
    self._regular_shutdown_thread = shutdown_thread
    shutdown_thread.start()

943 

def _shutdown(self, reason: Any = None) -> None:
    """Run the shutdown sequence: stop the stream and every helper thread.

    The sequence is: consumer, scheduler, leaser, NACK of undispatched
    messages, dispatcher, heartbeater. Afterwards the manager is marked
    closed and the registered close callbacks are invoked.

    Args:
        reason:
            Why the stream is being closed. ``None`` is treated as an
            "intentional" shutdown.
    """
    with self._closing:
        if self._closed:
            return

        # The consumer goes first so that no new messages arrive.
        if self.is_active:
            _LOGGER.debug("Stopping consumer.")
            assert self._consumer is not None
            self._consumer.stop()
        self._consumer = None

        # Scheduler shutdown hands back the messages it dropped.
        _LOGGER.debug("Stopping scheduler.")
        assert self._scheduler is not None
        dropped_messages = self._scheduler.shutdown(
            await_msg_callbacks=self._await_callbacks_on_shutdown
        )
        self._scheduler = None

        # The leaser and the dispatcher reach each other through this
        # manager instance, so neither reference is cleared until both have
        # been stopped.
        #
        # NOTE: The dispatcher may still call add() / remove() on the
        # stopped leaser; that has no impact because the leaser is never
        # restarted. The same holds for maybe_resume_consumer() /
        # maybe_pause_consumer(), since the consumer is shut down first.
        _LOGGER.debug("Stopping leaser.")
        assert self._leaser is not None
        self._leaser.stop()

        held_messages = self._messages_on_hold._messages_on_hold
        total = len(dropped_messages) + len(held_messages)
        _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
        for undelivered in itertools.chain(dropped_messages, held_messages):
            undelivered.nack()

        _LOGGER.debug("Stopping dispatcher.")
        assert self._dispatcher is not None
        self._dispatcher.stop()
        self._dispatcher = None
        # With the dispatcher gone, the leaser reference can be dropped too.
        self._leaser = None

        _LOGGER.debug("Stopping heartbeater.")
        assert self._heartbeater is not None
        self._heartbeater.stop()
        self._heartbeater = None

        self._rpc = None
        self._closed = True
        _LOGGER.debug("Finished stopping manager.")

    for on_close in self._close_callbacks:
        on_close(self, reason)

1012 

def _get_initial_request(
    self, stream_ack_deadline_seconds: int
) -> gapic_types.StreamingPullRequest:
    """Build the first request to send on a newly opened stream.

    Args:
        stream_ack_deadline_seconds:
            The default message acknowledge deadline for the stream.

    Returns:
        A request meant to be the first one on the stream, and not intended
        for any other purpose.
    """
    # With legacy flow control the outstanding limits are sent as 0;
    # otherwise they come from the flow control settings.
    if self._use_legacy_flow_control:
        outstanding_messages = 0
        outstanding_bytes = 0
    else:
        outstanding_messages = self._flow_control.max_messages
        outstanding_bytes = self._flow_control.max_bytes

    # The stream ack deadline has to be set, but it matters little because
    # a modack is sent on receipt anyway. It is set to a big-ish value in
    # case that modack is late.
    return gapic_types.StreamingPullRequest(
        stream_ack_deadline_seconds=stream_ack_deadline_seconds,
        modify_deadline_ack_ids=[],
        modify_deadline_seconds=[],
        subscription=self._subscription,
        client_id=self._client_id,
        max_outstanding_messages=outstanding_messages,
        max_outstanding_bytes=outstanding_bytes,
    )

1048 

def _send_lease_modacks(
    self,
    ack_ids: Iterable[str],
    ack_deadline: float,
    opentelemetry_data: List[SubscribeOpenTelemetry],
    warn_on_invalid=True,
    receipt_modack: bool = False,
) -> Set[str]:
    """Send modack requests for ``ack_ids`` through the dispatcher.

    When OpenTelemetry is enabled on the client, a modack span is started
    and linked both ways with every sampled subscribe span found in
    ``opentelemetry_data``.

    With exactly-once delivery enabled, each request carries a future; the
    method calls ``result()`` on every future and collects the ack IDs
    whose error code is ``AcknowledgeStatus.INVALID_ACK_ID``. Otherwise the
    requests carry no future and nothing is collected.

    Args:
        ack_ids:
            The ack IDs to modack.
        ack_deadline:
            The deadline handed to the dispatcher (and to the modack span).
        opentelemetry_data:
            Per-message telemetry data, paired with ``ack_ids`` by position
            when OpenTelemetry is enabled.
        warn_on_invalid:
            Whether an ``INVALID_ACK_ID`` error is logged as a warning.
            Other acknowledge errors are always logged.
        receipt_modack:
            Forwarded to the modack span.

    Returns:
        The ack IDs reported as invalid (exactly-once delivery only), or an
        empty set.
    """
    modack_span: Optional[trace.Span] = None
    if self._client.open_telemetry_enabled:
        path_parts: List[str] = self._subscription.split("/")
        assert len(path_parts) == 4
        project_id: str = path_parts[1]
        subscription_id: str = path_parts[3]

        # Only sampled subscribe spans take part in the linking.
        span_links: List[trace.Link] = []
        sampled_spans: List[trace.Span] = []
        for data in opentelemetry_data:
            candidate: Optional[trace.Span] = data.subscribe_span
            if not candidate or not candidate.get_span_context().trace_flags.sampled:
                continue
            span_links.append(trace.Link(candidate.get_span_context()))
            sampled_spans.append(candidate)

        modack_span = start_modack_span(
            span_links,
            subscription_id,
            len(opentelemetry_data),
            ack_deadline,
            project_id,
            "_send_lease_modacks",
            receipt_modack,
        )
        if (
            modack_span and modack_span.get_span_context().trace_flags.sampled
        ):  # pragma: NO COVER
            # Link back from each subscribe span to the modack span.
            modack_context: trace.SpanContext = modack_span.get_span_context()
            for sampled_span in sampled_spans:
                sampled_span.add_link(
                    context=modack_context,
                    attributes={
                        "messaging.operation.name": "modack",
                    },
                )

    with self._exactly_once_enabled_lock:
        exactly_once_enabled = self._exactly_once_enabled

    if not exactly_once_enabled:
        # Fire-and-forget path: the requests carry no future.
        # NOTE(review): the requests are built with self.ack_deadline while
        # the dispatcher is given the ack_deadline argument -- confirm this
        # asymmetry is intentional.
        items: List[requests.ModAckRequest] = []
        if self._client.open_telemetry_enabled:
            for ack_id, data in zip(ack_ids, opentelemetry_data):
                assert data is not None
                items.append(
                    requests.ModAckRequest(ack_id, self.ack_deadline, None, data)
                )
        else:
            items = [
                requests.ModAckRequest(ack_id, self.ack_deadline, None)
                for ack_id in ack_ids
            ]
        assert self._dispatcher is not None
        self._dispatcher.modify_ack_deadline(items, ack_deadline)
        if modack_span:
            modack_span.end()
        return set()

    # Exactly-once path: every request gets a future to inspect afterwards.
    eod_items: List[requests.ModAckRequest] = []
    if self._client.open_telemetry_enabled:
        for ack_id, data in zip(
            ack_ids, opentelemetry_data
        ):  # pragma: NO COVER # Identical code covered in the same function above
            assert data is not None
            eod_items.append(
                requests.ModAckRequest(ack_id, ack_deadline, futures.Future(), data)
            )
    else:
        eod_items = [
            requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
            for ack_id in ack_ids
        ]

    assert self._dispatcher is not None
    self._dispatcher.modify_ack_deadline(eod_items, ack_deadline)
    if (
        modack_span
    ):  # pragma: NO COVER # Identical code covered in the same function above
        modack_span.end()

    expired_ack_ids = set()
    for req in eod_items:
        try:
            assert req.future is not None
            req.future.result()
        except AcknowledgeError as ack_error:
            is_invalid_id = ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID
            # Invalid-ID errors are only logged when the caller asks for it.
            if warn_on_invalid or not is_invalid_id:
                _LOGGER.warning(
                    "AcknowledgeError when lease-modacking a message.",
                    exc_info=True,
                )
            if is_invalid_id:
                expired_ack_ids.add(req.ack_id)
    return expired_ack_ids

1167 

1168 def _exactly_once_delivery_enabled(self) -> bool: 

1169 """Whether exactly-once delivery is enabled for the subscription.""" 

1170 with self._exactly_once_enabled_lock: 

1171 return self._exactly_once_enabled 

1172 

def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
    """Process all Pub/Sub messages received in one streaming pull response.

    Every received message is modacked immediately. This prevents
    expiration of the message due to buffering by gRPC or a proxy/firewall,
    and keeps the server and client expiration timers close to each other,
    which avoids repeated redelivery.

    After the modacks, each message (except those whose ack ID was reported
    as expired while exactly-once delivery is enabled) is wrapped in a
    ``Message``, put on hold, and registered with the leaser. Held messages
    are then released as capacity allows, and the consumer is paused if
    needed.

    Args:
        response:
            The streaming pull response, or ``None`` (in which case nothing
            is processed).
    """
    if response is None:
        _LOGGER.debug(
            "Response callback invoked with None, likely due to a "
            "transport shutdown."
        )
        return

    # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
    # protobuf message to significantly gain on attribute access performance.
    received_messages = response._pb.received_messages

    # One telemetry entry per received message, in response order.
    subscribe_opentelemetry: List[SubscribeOpenTelemetry] = []
    if self._client.open_telemetry_enabled:
        for received_message in received_messages:
            opentelemetry_data = SubscribeOpenTelemetry(received_message.message)
            opentelemetry_data.start_subscribe_span(
                self._subscription,
                response.subscription_properties.exactly_once_delivery_enabled,
                received_message.ack_id,
                received_message.delivery_attempt,
            )
            subscribe_opentelemetry.append(opentelemetry_data)

    _LOGGER.debug(
        "Processing %s received message(s), currently on hold %s (bytes %s).",
        len(received_messages),
        self._messages_on_hold.size,
        self._on_hold_bytes,
    )

    with self._exactly_once_enabled_lock:
        if (
            response.subscription_properties.exactly_once_delivery_enabled
            != self._exactly_once_enabled
        ):
            self._exactly_once_enabled = (
                response.subscription_properties.exactly_once_delivery_enabled
            )
            # Update ack_deadline, whose minimum depends on self._exactly_once_enabled
            # This method acquires the self._ack_deadline_lock lock.
            self._obtain_ack_deadline(maybe_update=True)
            self._send_new_ack_deadline = True

    # Immediately (i.e. without waiting for the auto lease management)
    # modack the messages we received, as this tells the server that we've
    # received them.
    ack_id_gen = (message.ack_id for message in received_messages)
    expired_ack_ids = self._send_lease_modacks(
        ack_id_gen,
        self.ack_deadline,
        subscribe_opentelemetry,
        warn_on_invalid=False,
        receipt_modack=True,
    )

    with self._pause_resume_lock:
        if self._scheduler is None or self._leaser is None:
            _LOGGER.debug(
                f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing."
            )
            return

        for index, received_message in enumerate(received_messages):
            # Under exactly-once delivery, messages whose receipt modack
            # reported an invalid ack ID are not delivered.
            if (
                self._exactly_once_delivery_enabled()
                and received_message.ack_id in expired_ack_ids
            ):
                continue

            message = google.cloud.pubsub_v1.subscriber.message.Message(
                received_message.message,
                received_message.ack_id,
                received_message.delivery_attempt,
                self._scheduler.queue,
                self._exactly_once_delivery_enabled,
            )
            if self._client.open_telemetry_enabled:
                # Index by the message's position in the response, not by a
                # counter of delivered messages: the telemetry list has one
                # entry per received message, so skipping an expired
                # message must not shift the pairing for later ones.
                message.opentelemetry_data = subscribe_opentelemetry[index]
            self._messages_on_hold.put(message)
            self._on_hold_bytes += message.size
            req = requests.LeaseRequest(
                ack_id=message.ack_id,
                byte_size=message.size,
                ordering_key=message.ordering_key,
                opentelemetry_data=message.opentelemetry_data,
            )
            self._leaser.add([req])

        self._maybe_release_messages()

    self.maybe_pause_consumer()

1276 

def _on_fatal_exception(self, exception: BaseException) -> None:
    """Log a non-recoverable consumer error and close the manager.

    Passed to the background consumer as its ``on_fatal_exception``
    callback.

    Args:
        exception:
            The error reported by the consumer; it is also used as the
            close reason.
    """
    _LOGGER.exception(
        "Streaming pull terminating after receiving non-recoverable error: %s",
        exception,
    )
    self.close(reason=exception)

1287 

def _should_recover(self, exception: BaseException) -> bool:
    """Decide whether an error on the RPC stream should be recovered from.

    Passed to the resumable RPC as its ``should_recover`` predicate.

    Args:
        exception:
            The error observed on the stream.

    Returns:
        ``True`` if the wrapped error is an instance of one of the
        retryable stream errors, ``False`` otherwise.
    """
    wrapped = _wrap_as_exception(exception)
    recoverable = isinstance(wrapped, _RETRYABLE_STREAM_ERRORS)
    if recoverable:
        _LOGGER.debug("Observed recoverable stream error %s", wrapped)
    else:
        _LOGGER.debug("Observed non-recoverable stream error %s", wrapped)
    return recoverable

1309 

def _should_terminate(self, exception: BaseException) -> bool:
    """Decide whether an error on the RPC stream should terminate it.

    Passed to the resumable RPC as its ``should_terminate`` predicate.

    Args:
        exception:
            The error observed on the stream.

    Returns:
        ``True`` if the wrapped error is not a ``GoogleAPICallError`` at
        all, or is one of the terminating stream errors (permission denied,
        unauthorized, etc.); ``False`` otherwise.
    """
    wrapped = _wrap_as_exception(exception)
    terminating = not isinstance(
        wrapped, exceptions.GoogleAPICallError
    ) or isinstance(wrapped, _TERMINATING_STREAM_ERRORS)
    if terminating:
        _LOGGER.error("Observed terminating stream error %s", wrapped)
    else:
        _LOGGER.debug("Observed non-terminating stream error %s", wrapped)
    return terminating

1331 

def _on_rpc_done(self, future: Any) -> None:
    """Handle termination of the underlying RPC without recovery.

    Typically invoked from one of two threads: the background consumer
    thread (when ``recv()`` produces a non-recoverable error) or the grpc
    management thread (when the RPC is cancelled).

    The method is *non-blocking*: shutdown runs on a separate daemon
    thread, so the background consumer is not blocked and can still be
    ``joined()``.

    Args:
        future:
            The value reported by the RPC; it is wrapped as an exception
            and used as the shutdown reason.
    """
    _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.")
    shutdown_reason = _wrap_as_exception(future)
    threading.Thread(
        name=_RPC_ERROR_THREAD_NAME,
        target=self._shutdown,
        kwargs={"reason": shutdown_reason},
        daemon=True,
    ).start()