Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py: 18%

499 statements  

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import division 

16 

17import collections 

18import functools 

19import itertools 

20import logging 

21import threading 

22import typing 

23from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple 

24import uuid 

25 

26from opentelemetry import trace 

27import grpc # type: ignore 

28 

29from google.api_core import bidi 

30from google.api_core import exceptions 

31from google.cloud.pubsub_v1 import types 

32from google.cloud.pubsub_v1.subscriber._protocol import dispatcher 

33from google.cloud.pubsub_v1.subscriber._protocol import heartbeater 

34from google.cloud.pubsub_v1.subscriber._protocol import histogram 

35from google.cloud.pubsub_v1.subscriber._protocol import leaser 

36from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold 

37from google.cloud.pubsub_v1.subscriber._protocol import requests 

38from google.cloud.pubsub_v1.subscriber.exceptions import ( 

39 AcknowledgeError, 

40 AcknowledgeStatus, 

41) 

42from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

43 SubscribeOpenTelemetry, 

44) 

45import google.cloud.pubsub_v1.subscriber.message 

46from google.cloud.pubsub_v1.subscriber import futures 

47from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler 

48from google.pubsub_v1 import types as gapic_types 

49from grpc_status import rpc_status # type: ignore 

50from google.rpc.error_details_pb2 import ErrorInfo # type: ignore 

51from google.rpc import code_pb2 # type: ignore 

52from google.rpc import status_pb2 

53from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

54 start_modack_span, 

55) 

56 

57if typing.TYPE_CHECKING: # pragma: NO COVER 

58 from google.cloud.pubsub_v1 import subscriber 

59 

60 

61_LOGGER = logging.getLogger(__name__) 

62_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" 

63_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" 

64_RETRYABLE_STREAM_ERRORS = ( 

65 exceptions.DeadlineExceeded, 

66 exceptions.ServiceUnavailable, 

67 exceptions.InternalServerError, 

68 exceptions.Unknown, 

69 exceptions.GatewayTimeout, 

70 exceptions.Aborted, 

71) 

72_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,) 

73_MAX_LOAD = 1.0 

74"""The load threshold above which to pause the incoming message stream.""" 

75 

76_RESUME_THRESHOLD = 0.8 

77"""The load threshold below which to resume the incoming message stream.""" 

78 

79_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60 

80"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for 

81a subscription. We do this to reduce premature ack expiration. 

82""" 

83 

84_DEFAULT_STREAM_ACK_DEADLINE: float = 60 

85"""The default stream ack deadline in seconds.""" 

86 

87_MAX_STREAM_ACK_DEADLINE: float = 600 

88"""The maximum stream ack deadline in seconds.""" 

89 

90_MIN_STREAM_ACK_DEADLINE: float = 10 

91"""The minimum stream ack deadline in seconds.""" 

92 

93_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = { 

94 code_pb2.DEADLINE_EXCEEDED, 

95 code_pb2.RESOURCE_EXHAUSTED, 

96 code_pb2.ABORTED, 

97 code_pb2.INTERNAL, 

98 code_pb2.UNAVAILABLE, 

99} 

100 

101 

102def _wrap_as_exception(maybe_exception: Any) -> BaseException: 

103 """Wrap an object as a Python exception, if needed. 

104 

105 Args: 

106 maybe_exception: The object to wrap, usually a gRPC exception class. 

107 

108 Returns: 

109 The argument itself if an instance of ``BaseException``, otherwise 

110 the argument represented as an instance of ``Exception`` (sub)class. 

111 """ 

112 if isinstance(maybe_exception, grpc.RpcError): 

113 return exceptions.from_grpc_error(maybe_exception) 

114 elif isinstance(maybe_exception, BaseException): 

115 return maybe_exception 

116 

117 return Exception(maybe_exception) 

118 

119 
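# Illustrative sketch (not part of the original module): the wrapping
# behavior of _wrap_as_exception, with made-up input values.
def _example_wrap_as_exception() -> None:
    wrapped = _wrap_as_exception("stream closed unexpectedly")
    assert isinstance(wrapped, Exception)  # arbitrary objects get wrapped

    original = ValueError("boom")
    assert _wrap_as_exception(original) is original  # BaseExceptions pass through

    # A grpc.RpcError instance would instead be converted via
    # exceptions.from_grpc_error() into a google.api_core exception.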

120def _wrap_callback_errors( 

121 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], 

122 on_callback_error: Callable[[BaseException], Any], 

123 message: "google.cloud.pubsub_v1.subscriber.message.Message", 

124): 

125 """Wraps a user callback so that if an exception occurs the message is 

126 nacked. 

127 

128 Args: 

129 callback: The user callback. 

130 message: The Pub/Sub message. 

131 """ 

132 try: 

133 if message.opentelemetry_data: 

134 message.opentelemetry_data.end_subscribe_concurrency_control_span() 

135 message.opentelemetry_data.start_process_span() 

136 callback(message) 

137 except BaseException as exc: 

138 # Note: the likelihood of this failing is extremely low. This just adds 

139 # a message to a queue, so if this doesn't work the world is in an 

140 # unrecoverable state and this thread should just bail. 

141 _LOGGER.exception( 

142 "Top-level exception occurred in callback while processing a message" 

143 ) 

144 message.nack() 

145 on_callback_error(exc) 

146 

147 

148def _get_status( 

149 exc: exceptions.GoogleAPICallError, 

150) -> Optional["status_pb2.Status"]: 

151 if not exc.response: 

152 _LOGGER.debug("No response obj in errored RPC call.") 

153 return None 

154 try: 

155 return rpc_status.from_call(exc.response) 

156 # Possible "If the gRPC call’s code or details are inconsistent 

157 # with the status code and message inside of the 

158 # google.rpc.status.Status" 

159 except ValueError: 

160 _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True) 

161 return None 

162 

163 

164def _get_ack_errors( 

165 exc: exceptions.GoogleAPICallError, 

166) -> Optional[Dict[str, str]]: 

167 status = _get_status(exc) 

168 if not status: 

169 _LOGGER.debug("Unable to get status of errored RPC.") 

170 return None 

171 for detail in status.details: 

172 info = ErrorInfo() 

173 if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)): 

174 _LOGGER.debug("Unable to unpack ErrorInfo.") 

175 return None 

176 return info.metadata 

177 return None 

178 

179 

180def _process_requests( 

181 error_status: Optional["status_pb2.Status"], 

182 ack_reqs_dict: Dict[str, requests.AckRequest], 

183 errors_dict: Optional[Dict[str, str]], 

184): 

185 """Process requests when exactly-once delivery is enabled by referring to 

186 error_status and errors_dict. 

187 

188 The errors returned by the server, either as `error_status` or in `errors_dict`,

189 are used to complete the request futures in `ack_reqs_dict` (with a success 

190 or exception) or to return requests for further retries. 

191 """ 

192 requests_completed = [] 

193 requests_to_retry = [] 

194 for ack_id in ack_reqs_dict: 

195 # Handle special errors returned for ack/modack RPCs via the ErrorInfo 

196 # sidecar metadata when exactly-once delivery is enabled. 

197 if errors_dict and ack_id in errors_dict: 

198 exactly_once_error = errors_dict[ack_id] 

199 if exactly_once_error.startswith("TRANSIENT_"): 

200 requests_to_retry.append(ack_reqs_dict[ack_id]) 

201 else: 

202 if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID": 

203 exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None) 

204 else: 

205 exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error) 

206 future = ack_reqs_dict[ack_id].future 

207 if future is not None: 

208 future.set_exception(exc) 

209 requests_completed.append(ack_reqs_dict[ack_id]) 

210 # Temporary GRPC errors are retried 

211 elif ( 

212 error_status 

213 and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS 

214 ): 

215 requests_to_retry.append(ack_reqs_dict[ack_id]) 

216 # Other GRPC errors are NOT retried 

217 elif error_status: 

218 if error_status.code == code_pb2.PERMISSION_DENIED: 

219 exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None) 

220 elif error_status.code == code_pb2.FAILED_PRECONDITION: 

221 exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) 

222 else: 

223 exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) 

224 future = ack_reqs_dict[ack_id].future 

225 if future is not None: 

226 future.set_exception(exc) 

227 requests_completed.append(ack_reqs_dict[ack_id]) 

228 # Since no error occurred, requests with futures are completed successfully. 

229 elif ack_reqs_dict[ack_id].future: 

230 future = ack_reqs_dict[ack_id].future 

231 # success 

232 assert future is not None 

233 future.set_result(AcknowledgeStatus.SUCCESS) 

234 requests_completed.append(ack_reqs_dict[ack_id]) 

235 # All other requests are considered completed. 

236 else: 

237 requests_completed.append(ack_reqs_dict[ack_id]) 

238 

239 return requests_completed, requests_to_retry 

240 

241 
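# Illustrative sketch (not part of the original module): how _process_requests
# partitions requests when exactly-once delivery is enabled. The AckRequest
# field names are assumed from this package's requests module, and the error
# strings are examples only (for transient errors only the "TRANSIENT_" prefix
# matters).
def _example_process_requests() -> None:
    def _ack_req(ack_id: str, with_future: bool) -> requests.AckRequest:
        return requests.AckRequest(
            ack_id=ack_id,
            byte_size=10,
            time_to_ack=1.0,
            ordering_key="",
            future=futures.Future() if with_future else None,
        )

    reqs = {
        "ack-ok": _ack_req("ack-ok", with_future=True),
        "ack-bad": _ack_req("ack-bad", with_future=True),
        "ack-retry": _ack_req("ack-retry", with_future=False),
    }
    errors = {
        "ack-bad": "PERMANENT_FAILURE_INVALID_ACK_ID",
        "ack-retry": "TRANSIENT_FAILURE_EXAMPLE",  # hypothetical transient code
    }

    completed, to_retry = _process_requests(
        error_status=None, ack_reqs_dict=reqs, errors_dict=errors
    )
    # "ack-ok" completes with AcknowledgeStatus.SUCCESS, "ack-bad" completes
    # with an AcknowledgeError(INVALID_ACK_ID) set on its future, and
    # "ack-retry" is handed back for another attempt.
    assert [r.ack_id for r in completed] == ["ack-ok", "ack-bad"]
    assert [r.ack_id for r in to_retry] == ["ack-retry"]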

242class StreamingPullManager(object): 

243 """The streaming pull manager coordinates pulling messages from Pub/Sub, 

244 leasing them, and scheduling them to be processed. 

245 

246 Args: 

247 client: 

248 The subscriber client used to create this instance. 

249 subscription: 

250 The name of the subscription. The canonical format for this is 

251 ``projects/{project}/subscriptions/{subscription}``. 

252 flow_control: 

253 The flow control settings. 

254 scheduler: 

255 The scheduler to use to process messages. If not provided, a thread 

256 pool-based scheduler will be used. 

257 use_legacy_flow_control: 

258 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, 

259 though client-side flow control is still enabled. If set to ``False`` 

260 (default), both server-side and client-side flow control are enabled. 

261 await_callbacks_on_shutdown: 

262 If ``True``, the shutdown thread will wait until all scheduler threads 

263 terminate and only then proceed with shutting down the remaining running 

264 helper threads. 

265 

266 If ``False`` (default), the shutdown thread will shut the scheduler down, 

267 but it will not wait for the currently executing scheduler threads to 

268 terminate. 

269 

270 This setting affects when the on close callbacks get invoked, and 

271 consequently, when the StreamingPullFuture associated with the stream gets 

272 resolved. 

273 """ 

274 
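# Illustrative lifecycle sketch (not part of the original class). In practice
# subscriber.Client.subscribe() constructs and wires the manager; the project,
# subscription, and callbacks below are examples only:
#
#   manager = StreamingPullManager(
#       client,
#       "projects/my-project/subscriptions/my-sub",
#       flow_control=types.FlowControl(max_messages=100),
#   )
#   manager.add_close_callback(lambda mgr, reason: print("closed:", reason))
#   manager.open(callback=lambda msg: msg.ack(), on_callback_error=print)
#   # ... messages are now dispatched to the callback on the scheduler ...
#   manager.close()  # non-blocking; helper threads shut down in the background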

275 def __init__( 

276 self, 

277 client: "subscriber.Client", 

278 subscription: str, 

279 flow_control: types.FlowControl = types.FlowControl(), 

280 scheduler: Optional[ThreadScheduler] = None, 

281 use_legacy_flow_control: bool = False, 

282 await_callbacks_on_shutdown: bool = False, 

283 ): 

284 self._client = client 

285 self._subscription = subscription 

286 self._exactly_once_enabled = False 

287 self._flow_control = flow_control 

288 self._use_legacy_flow_control = use_legacy_flow_control 

289 self._await_callbacks_on_shutdown = await_callbacks_on_shutdown 

290 self._ack_histogram = histogram.Histogram() 

291 self._last_histogram_size = 0 

292 self._stream_metadata = [ 

293 ["x-goog-request-params", "subscription=" + subscription] 

294 ] 

295 

296 # If max_duration_per_lease_extension is the default (i.e. 0),

297 # set the stream_ack_deadline to the default of 60 seconds.

298 if self._flow_control.max_duration_per_lease_extension == 0: 

299 self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE 

300 # The stream ack deadline cannot be set below the allowed minimum.

301 elif ( 

302 self._flow_control.max_duration_per_lease_extension 

303 < _MIN_STREAM_ACK_DEADLINE 

304 ): 

305 self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE 

306 # The stream ack deadline cannot be set above the allowed maximum.

307 elif ( 

308 self._flow_control.max_duration_per_lease_extension 

309 > _MAX_STREAM_ACK_DEADLINE 

310 ): 

311 self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE 

312 else: 

313 self._stream_ack_deadline = ( 

314 self._flow_control.max_duration_per_lease_extension 

315 ) 

316 

317 self._ack_deadline = max( 

318 min( 

319 self._flow_control.min_duration_per_lease_extension, 

320 histogram.MAX_ACK_DEADLINE, 

321 ), 

322 histogram.MIN_ACK_DEADLINE, 

323 ) 

324 

325 self._rpc: Optional[bidi.ResumableBidiRpc] = None 

326 self._callback: Optional[functools.partial] = None 

327 self._closing = threading.Lock() 

328 self._closed = False 

329 self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = [] 

330 # Guarded by self._exactly_once_enabled_lock 

331 self._send_new_ack_deadline = False 

332 

333 # A shutdown thread is created on intentional shutdown. 

334 self._regular_shutdown_thread: Optional[threading.Thread] = None 

335 

336 # Generate a random client id tied to this object. All streaming pull 

337 # connections (initial and re-connects) will then use the same client 

338 # id. Doing so lets the server establish affinity even across stream 

339 # disconnections.

340 self._client_id = str(uuid.uuid4()) 

341 

342 if scheduler is None: 

343 self._scheduler: Optional[ThreadScheduler] = ThreadScheduler() 

344 else: 

345 self._scheduler = scheduler 

346 

347 # A collection for the messages that have been received from the server, 

348 # but not yet sent to the user callback. 

349 self._messages_on_hold = messages_on_hold.MessagesOnHold() 

350 

351 # The total number of bytes consumed by the messages currently on hold 

352 self._on_hold_bytes = 0 

353 

354 # A lock ensuring that pausing / resuming the consumer are both atomic 

355 # operations that cannot be executed concurrently. Needed for properly 

356 # syncing these operations with the current leaser load. Additionally, 

357 # the lock is used to protect modifications of internal data that 

358 # affects the load computation, i.e. the count and size of the messages 

359 # currently on hold. 

360 self._pause_resume_lock = threading.Lock() 

361 

362 # A lock guarding the self._exactly_once_enabled variable. We may also 

363 # acquire the self._ack_deadline_lock while this lock is held, but not 

364 # the reverse. So, we maintain a simple ordering of these two locks to 

365 # prevent deadlocks. 

366 self._exactly_once_enabled_lock = threading.Lock() 

367 

368 # A lock protecting the current ACK deadline used in the lease management. This 

369 # value can be potentially updated both by the leaser thread and by the message 

370 # consumer thread when invoking the internal _on_response() callback. 

371 self._ack_deadline_lock = threading.Lock() 

372 

373 # The threads created in ``.open()``. 

374 self._dispatcher: Optional[dispatcher.Dispatcher] = None 

375 self._leaser: Optional[leaser.Leaser] = None 

376 self._consumer: Optional[bidi.BackgroundConsumer] = None 

377 self._heartbeater: Optional[heartbeater.Heartbeater] = None 

378 

379 @property 

380 def is_active(self) -> bool: 

381 """``True`` if this manager is actively streaming. 

382 

383 Note that ``False`` does not indicate the manager is completely shut

384 down, just that it has stopped receiving new messages.

385 """ 

386 return self._consumer is not None and self._consumer.is_active 

387 

388 @property 

389 def flow_control(self) -> types.FlowControl: 

390 """The active flow control settings.""" 

391 return self._flow_control 

392 

393 @property 

394 def dispatcher(self) -> Optional[dispatcher.Dispatcher]: 

395 """The dispatcher helper.""" 

396 return self._dispatcher 

397 

398 @property 

399 def leaser(self) -> Optional[leaser.Leaser]: 

400 """The leaser helper.""" 

401 return self._leaser 

402 

403 @property 

404 def ack_histogram(self) -> histogram.Histogram: 

405 """The histogram tracking time-to-acknowledge.""" 

406 return self._ack_histogram 

407 

408 @property 

409 def ack_deadline(self) -> float: 

410 """Return the current ACK deadline based on historical data without updating it. 

411 

412 Returns: 

413 The ack deadline. 

414 """ 

415 return self._obtain_ack_deadline(maybe_update=False) 

416 

417 def _obtain_ack_deadline(self, maybe_update: bool) -> float: 

418 """The actual `ack_deadline` implementation. 

419 

420 This method is "sticky". It will only perform the computations to check on the 

421 right ACK deadline if explicitly requested AND if the histogram with past 

422 time-to-ack data has gained a significant amount of new information. 

423 

424 Args: 

425 maybe_update: 

426 If ``True``, also update the current ACK deadline before returning it if 

427 enough new ACK data has been gathered. 

428 

429 Returns: 

430 The current ACK deadline in seconds to use. 

431 """ 

432 with self._ack_deadline_lock: 

433 if not maybe_update: 

434 return self._ack_deadline 

435 

436 target_size = min( 

437 self._last_histogram_size * 2, self._last_histogram_size + 100 

438 ) 

439 hist_size = len(self.ack_histogram) 

440 

441 if hist_size > target_size: 

442 self._last_histogram_size = hist_size 

443 self._ack_deadline = self.ack_histogram.percentile(percent=99) 

444 

445 if self.flow_control.max_duration_per_lease_extension > 0: 

446 # The setting in flow control could be too low, adjust if needed. 

447 flow_control_setting = max( 

448 self.flow_control.max_duration_per_lease_extension, 

449 histogram.MIN_ACK_DEADLINE, 

450 ) 

451 self._ack_deadline = min(self._ack_deadline, flow_control_setting) 

452 

453 # If the user explicitly sets a min ack_deadline, respect it. 

454 if self.flow_control.min_duration_per_lease_extension > 0: 

455 # The setting in flow control could be too high, adjust if needed. 

456 flow_control_setting = min( 

457 self.flow_control.min_duration_per_lease_extension, 

458 histogram.MAX_ACK_DEADLINE, 

459 ) 

460 self._ack_deadline = max(self._ack_deadline, flow_control_setting) 

461 elif self._exactly_once_enabled: 

462 # Higher minimum ack_deadline for subscriptions with 

463 # exactly-once delivery enabled. 

464 self._ack_deadline = max( 

465 self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED 

466 ) 

467 # If we have updated the ack_deadline and it is longer than the stream_ack_deadline 

468 # set the stream_ack_deadline to the new ack_deadline. 

469 if self._ack_deadline > self._stream_ack_deadline: 

470 self._stream_ack_deadline = self._ack_deadline 

471 return self._ack_deadline 

472 
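# Illustrative numbers for the "sticky" update above (values are assumed):
# with _last_histogram_size == 100, the deadline is only recomputed once the
# histogram holds more than
#
#   target_size = min(100 * 2, 100 + 100)   # == 200
#
# data points. It is then set to the 99th percentile of observed time-to-ack,
# clamped by max_duration_per_lease_extension / min_duration_per_lease_extension
# when those are configured and, when exactly-once delivery is enabled and no
# minimum is configured, raised to at least 60 seconds.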

473 @property 

474 def load(self) -> float: 

475 """Return the current load. 

476 

477 The load is represented as a float, where 1.0 represents having 

478 hit one of the flow control limits, and values between 0.0 and 1.0 

479 represent how close we are to them. (0.5 means we have exactly half 

480 of what the flow control setting allows, for example.) 

481 

482 There are (currently) two flow control settings; this property 

483 computes how close the manager is to each of them, and returns 

484 whichever value is higher. (It does not matter that we have lots of 

485 running room on setting A if setting B is over.) 

486 

487 Returns: 

488 The load value. 

489 """ 

490 if self._leaser is None: 

491 return 0.0 

492 

493 # Messages that are temporarily put on hold are not being delivered to 

494 # user's callbacks, thus they should not contribute to the flow control 

495 # load calculation. 

496 # However, since these messages must still be lease-managed to avoid 

497 # unnecessary ACK deadline expirations, their count and total size must 

498 # be subtracted from the leaser's values. 

499 return max( 

500 [ 

501 (self._leaser.message_count - self._messages_on_hold.size) 

502 / self._flow_control.max_messages, 

503 (self._leaser.bytes - self._on_hold_bytes) 

504 / self._flow_control.max_bytes, 

505 ] 

506 ) 

507 
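# Worked example for the load computation above (values are illustrative):
# with FlowControl(max_messages=1000, max_bytes=100 * 1024 * 1024), a leaser
# tracking 600 messages totalling 30 MiB, and 100 held messages totalling
# 10 MiB:
#
#   message_load = (600 - 100) / 1000                              # 0.5
#   byte_load = (30 - 10) * 1024 * 1024 / (100 * 1024 * 1024)      # 0.2
#   load = max(message_load, byte_load)                            # 0.5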

508 def add_close_callback( 

509 self, callback: Callable[["StreamingPullManager", Any], Any] 

510 ) -> None: 

511 """Schedules a callable when the manager closes. 

512 

513 Args: 

514 callback: The method to call.

515 """ 

516 self._close_callbacks.append(callback) 

517 

518 def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None: 

519 """Send the next message in the queue for each of the passed-in 

520 ordering keys, if they exist. Clean up state for keys that no longer 

521 have any queued messages. 

522 

523 Since the load went down by one message, it's probably safe to send the 

524 user another message for the same key. Since the released message may be 

525 bigger than the previous one, this may increase the load above the maximum. 

526 This decision is by design because it simplifies MessagesOnHold. 

527 

528 Args: 

529 ordering_keys: 

530 A sequence of ordering keys to activate. May be empty. 

531 """ 

532 with self._pause_resume_lock: 

533 if self._scheduler is None: 

534 return # We are shutting down, don't try to dispatch any more messages. 

535 

536 self._messages_on_hold.activate_ordering_keys( 

537 ordering_keys, self._schedule_message_on_hold 

538 ) 

539 

540 def maybe_pause_consumer(self) -> None: 

541 """Check the current load and pause the consumer if needed.""" 

542 with self._pause_resume_lock: 

543 if self.load >= _MAX_LOAD: 

544 if self._consumer is not None and not self._consumer.is_paused: 

545 _LOGGER.debug( 

546 "Message backlog over load at %.2f, pausing.", self.load 

547 ) 

548 self._consumer.pause() 

549 

550 def maybe_resume_consumer(self) -> None: 

551 """Check the load and held messages and resume the consumer if needed. 

552 

553 If there are messages held internally, release those messages before 

554 resuming the consumer. That will avoid leaser overload. 

555 """ 

556 with self._pause_resume_lock: 

557 # If we have been paused by flow control, check and see if we are 

558 # back within our limits. 

559 # 

560 # In order to not thrash too much, require us to have passed below 

561 # the resume threshold (80% by default) of each flow control setting 

562 # before restarting. 

563 if self._consumer is None or not self._consumer.is_paused: 

564 return 

565 

566 _LOGGER.debug("Current load: %.2f", self.load) 

567 

568 # Before maybe resuming the background consumer, release any messages 

569 # currently on hold, if the current load allows for it. 

570 self._maybe_release_messages() 

571 

572 if self.load < _RESUME_THRESHOLD: 

573 _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) 

574 self._consumer.resume() 

575 else: 

576 _LOGGER.debug("Did not resume, current load is %.2f.", self.load) 

577 
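# Illustrative note on the pause/resume hysteresis above (thresholds are the
# module constants _MAX_LOAD and _RESUME_THRESHOLD): the consumer is paused
# once the load reaches 1.0 and is only resumed after held messages have been
# released and the load drops below 0.8, e.g. a load sequence of
# 0.7 -> 1.1 (pause) -> 0.9 (stay paused) -> 0.75 (resume).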

578 def _maybe_release_messages(self) -> None: 

579 """Release (some of) the held messages if the current load allows for it. 

580 

581 The method tries to release as many messages as the current leaser load 

582 would allow. Each released message is added to the lease management, 

583 and the user callback is scheduled for it. 

584 

585 If there are currently no messages on hold, or if the leaser is 

586 already overloaded, this method is effectively a no-op. 

587 

588 The method assumes the caller has acquired the ``_pause_resume_lock``. 

589 """ 

590 released_ack_ids = [] 

591 while self.load < _MAX_LOAD: 

592 msg = self._messages_on_hold.get() 

593 if not msg: 

594 break 

595 if msg.opentelemetry_data: 

596 msg.opentelemetry_data.end_subscribe_scheduler_span() 

597 self._schedule_message_on_hold(msg) 

598 released_ack_ids.append(msg.ack_id) 

599 

600 assert self._leaser is not None 

601 self._leaser.start_lease_expiry_timer(released_ack_ids) 

602 

603 def _schedule_message_on_hold( 

604 self, msg: "google.cloud.pubsub_v1.subscriber.message.Message" 

605 ): 

606 """Schedule a message on hold to be sent to the user and change on-hold-bytes. 

607 

608 The method assumes the caller has acquired the ``_pause_resume_lock``. 

609 

610 Args: 

611 msg: The message to schedule to be sent to the user. 

612 """ 

613 assert msg, "Message must not be None." 

614 

615 # The on-hold byte count goes down, increasing the load.

616 self._on_hold_bytes -= msg.size 

617 

618 if self._on_hold_bytes < 0: 

619 _LOGGER.warning( 

620 "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes 

621 ) 

622 self._on_hold_bytes = 0 

623 

624 _LOGGER.debug( 

625 "Released held message, scheduling callback for it, " 

626 "still on hold %s (bytes %s).", 

627 self._messages_on_hold.size, 

628 self._on_hold_bytes, 

629 ) 

630 assert self._scheduler is not None 

631 assert self._callback is not None 

632 if msg.opentelemetry_data: 

633 msg.opentelemetry_data.start_subscribe_concurrency_control_span() 

634 self._scheduler.schedule(self._callback, msg) 

635 

636 def send_unary_ack( 

637 self, ack_ids, ack_reqs_dict 

638 ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]: 

639 """Send a request using a separate unary request instead of over the stream. 

640 

641 If a RetryError occurs, the manager shutdown is triggered, and the 

642 error is re-raised. 

643 """ 

644 assert ack_ids 

645 assert len(ack_ids) == len(ack_reqs_dict) 

646 

647 error_status = None 

648 ack_errors_dict = None 

649 try: 

650 self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids) 

651 except exceptions.GoogleAPICallError as exc: 

652 _LOGGER.debug( 

653 "Exception while sending unary RPC. This is typically " 

654 "non-fatal as stream requests are best-effort.", 

655 exc_info=True, 

656 ) 

657 error_status = _get_status(exc) 

658 ack_errors_dict = _get_ack_errors(exc) 

659 except exceptions.RetryError as exc: 

660 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled() 

661 # Make sure to complete the futures so they don't block forever.

662 for req in ack_reqs_dict.values(): 

663 # Futures may be present even with exactly-once delivery 

664 # disabled, in transition periods after the setting is changed on 

665 # the subscription. 

666 if req.future: 

667 if exactly_once_delivery_enabled: 

668 e = AcknowledgeError( 

669 AcknowledgeStatus.OTHER, "RetryError while sending ack RPC." 

670 ) 

671 req.future.set_exception(e) 

672 else: 

673 req.future.set_result(AcknowledgeStatus.SUCCESS) 

674 

675 _LOGGER.debug( 

676 "RetryError while sending ack RPC. Waiting on a transient " 

677 "error resolution for too long, will now trigger shutdown.", 

678 exc_info=False, 

679 ) 

680 # The underlying channel has been suffering from a retryable error 

681 # for too long, time to give up and shut the streaming pull down. 

682 self._on_rpc_done(exc) 

683 raise 

684 

685 if self._exactly_once_delivery_enabled(): 

686 requests_completed, requests_to_retry = _process_requests( 

687 error_status, ack_reqs_dict, ack_errors_dict 

688 ) 

689 else: 

690 requests_completed = [] 

691 requests_to_retry = [] 

692 # When exactly-once delivery is NOT enabled, acks/modacks are considered 

693 # best-effort. So, they always succeed even if the RPC fails. 

694 for req in ack_reqs_dict.values(): 

695 # Futures may be present even with exactly-once delivery 

696 # disabled, in transition periods after the setting is changed on 

697 # the subscription. 

698 if req.future: 

699 req.future.set_result(AcknowledgeStatus.SUCCESS) 

700 requests_completed.append(req) 

701 

702 return requests_completed, requests_to_retry 

703 

704 def send_unary_modack( 

705 self, 

706 modify_deadline_ack_ids, 

707 modify_deadline_seconds, 

708 ack_reqs_dict, 

709 default_deadline=None, 

710 ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: 

711 """Send a request using a separate unary request instead of over the stream. 

712 

713 If a RetryError occurs, the manager shutdown is triggered, and the 

714 error is re-raised. 

715 """ 

716 assert modify_deadline_ack_ids 

717 # Either we have a generator or a single deadline. 

718 assert modify_deadline_seconds is None or default_deadline is None 

719 

720 error_status = None 

721 modack_errors_dict = None 

722 try: 

723 if default_deadline is None: 

724 # Send ack_ids with the same deadline seconds together. 

725 deadline_to_ack_ids = collections.defaultdict(list) 

726 

727 for n, ack_id in enumerate(modify_deadline_ack_ids): 

728 deadline = modify_deadline_seconds[n] 

729 deadline_to_ack_ids[deadline].append(ack_id) 

730 

731 for deadline, ack_ids in deadline_to_ack_ids.items(): 

732 self._client.modify_ack_deadline( 

733 subscription=self._subscription, 

734 ack_ids=ack_ids, 

735 ack_deadline_seconds=deadline, 

736 ) 

737 else: 

738 # We can send all requests with the default deadline. 

739 self._client.modify_ack_deadline( 

740 subscription=self._subscription, 

741 ack_ids=modify_deadline_ack_ids, 

742 ack_deadline_seconds=default_deadline, 

743 ) 

744 except exceptions.GoogleAPICallError as exc: 

745 _LOGGER.debug( 

746 "Exception while sending unary RPC. This is typically " 

747 "non-fatal as stream requests are best-effort.", 

748 exc_info=True, 

749 ) 

750 error_status = _get_status(exc) 

751 modack_errors_dict = _get_ack_errors(exc) 

752 except exceptions.RetryError as exc: 

753 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled() 

754 # Make sure to complete the futures so they don't block forever.

755 for req in ack_reqs_dict.values(): 

756 # Futures may be present even with exactly-once delivery 

757 # disabled, in transition periods after the setting is changed on 

758 # the subscription. 

759 if req.future: 

760 if exactly_once_delivery_enabled: 

761 e = AcknowledgeError( 

762 AcknowledgeStatus.OTHER, 

763 "RetryError while sending modack RPC.", 

764 ) 

765 req.future.set_exception(e) 

766 else: 

767 req.future.set_result(AcknowledgeStatus.SUCCESS) 

768 

769 _LOGGER.debug( 

770 "RetryError while sending modack RPC. Waiting on a transient " 

771 "error resolution for too long, will now trigger shutdown.", 

772 exc_info=False, 

773 ) 

774 # The underlying channel has been suffering from a retryable error 

775 # for too long, time to give up and shut the streaming pull down. 

776 self._on_rpc_done(exc) 

777 raise 

778 

779 if self._exactly_once_delivery_enabled(): 

780 requests_completed, requests_to_retry = _process_requests( 

781 error_status, ack_reqs_dict, modack_errors_dict 

782 ) 

783 else: 

784 requests_completed = [] 

785 requests_to_retry = [] 

786 # When exactly-once delivery is NOT enabled, acks/modacks are considered 

787 # best-effort. So, they always succeed even if the RPC fails. 

788 for req in ack_reqs_dict.values(): 

789 # Futures may be present even with exactly-once delivery 

790 # disabled, in transition periods after the setting is changed on 

791 # the subscription. 

792 if req.future: 

793 req.future.set_result(AcknowledgeStatus.SUCCESS) 

794 requests_completed.append(req) 

795 

796 return requests_completed, requests_to_retry 

797 

798 def heartbeat(self) -> bool: 

799 """Sends a heartbeat request over the streaming pull RPC. 

800 

801 The request is empty by default, but may contain the current ack_deadline 

802 if the self._exactly_once_enabled flag has changed. 

803 

804 Returns: 

805 Whether a heartbeat request has actually been sent.

806 """ 

807 if self._rpc is not None and self._rpc.is_active: 

808 send_new_ack_deadline = False 

809 with self._exactly_once_enabled_lock: 

810 send_new_ack_deadline = self._send_new_ack_deadline 

811 self._send_new_ack_deadline = False 

812 

813 if send_new_ack_deadline: 

814 request = gapic_types.StreamingPullRequest( 

815 stream_ack_deadline_seconds=self._stream_ack_deadline 

816 ) 

817 _LOGGER.debug( 

818 "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline 

819 ) 

820 else: 

821 request = gapic_types.StreamingPullRequest() 

822 

823 self._rpc.send(request) 

824 return True 

825 

826 return False 

827 

828 def open( 

829 self, 

830 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], 

831 on_callback_error: Callable[[Exception], Any], 

832 ) -> None: 

833 """Begin consuming messages. 

834 

835 Args: 

836 callback: 

837 A callback that will be called for each message received on the 

838 stream. 

839 on_callback_error: 

840 A callable that will be called if an exception is raised in 

841 the provided `callback`. 

842 """ 

843 if self.is_active: 

844 raise ValueError("This manager is already open.") 

845 

846 if self._closed: 

847 raise ValueError("This manager has been closed and can not be re-used.") 

848 

849 self._callback = functools.partial( 

850 _wrap_callback_errors, callback, on_callback_error 

851 ) 

852 

853 # Create the RPC 

854 stream_ack_deadline_seconds = self._stream_ack_deadline 

855 

856 get_initial_request = functools.partial( 

857 self._get_initial_request, stream_ack_deadline_seconds 

858 ) 

859 self._rpc = bidi.ResumableBidiRpc( 

860 start_rpc=self._client.streaming_pull, 

861 initial_request=get_initial_request, 

862 should_recover=self._should_recover, 

863 should_terminate=self._should_terminate, 

864 metadata=self._stream_metadata, 

865 throttle_reopen=True, 

866 ) 

867 self._rpc.add_done_callback(self._on_rpc_done) 

868 

869 _LOGGER.debug( 

870 "Creating a stream, default ACK deadline set to {} seconds.".format( 

871 self._stream_ack_deadline 

872 ) 

873 ) 

874 

875 # Create references to threads 

876 assert self._scheduler is not None 

877 scheduler_queue = self._scheduler.queue 

878 self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue) 

879 self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response) 

880 self._leaser = leaser.Leaser(self) 

881 self._heartbeater = heartbeater.Heartbeater(self) 

882 

883 # Start the thread to pass the requests. 

884 self._dispatcher.start() 

885 

886 # Start consuming messages. 

887 self._consumer.start() 

888 

889 # Start the lease maintainer thread. 

890 self._leaser.start() 

891 

892 # Start the stream heartbeater thread. 

893 self._heartbeater.start() 

894 

895 def close(self, reason: Any = None) -> None: 

896 """Stop consuming messages and shutdown all helper threads. 

897 

898 This method is idempotent. Additional calls will have no effect. 

899 

900 The method does not block; it delegates the shutdown operations to a background

901 thread. 

902 

903 Args: 

904 reason: 

905 The reason to close this. If ``None``, this is considered 

906 an "intentional" shutdown. This is passed to the callbacks 

907 specified via :meth:`add_close_callback`. 

908 """ 

909 self._regular_shutdown_thread = threading.Thread( 

910 name=_REGULAR_SHUTDOWN_THREAD_NAME, 

911 daemon=True, 

912 target=self._shutdown, 

913 kwargs={"reason": reason}, 

914 ) 

915 self._regular_shutdown_thread.start() 

916 

917 def _shutdown(self, reason: Any = None) -> None: 

918 """Run the actual shutdown sequence (stop the stream and all helper threads). 

919 

920 Args: 

921 reason: 

922 The reason to close the stream. If ``None``, this is considered 

923 an "intentional" shutdown. 

924 """ 

925 with self._closing: 

926 if self._closed: 

927 return 

928 

929 # Stop consuming messages. 

930 if self.is_active: 

931 _LOGGER.debug("Stopping consumer.") 

932 assert self._consumer is not None 

933 self._consumer.stop() 

934 self._consumer = None 

935 

936 # Shutdown all helper threads 

937 _LOGGER.debug("Stopping scheduler.") 

938 assert self._scheduler is not None 

939 dropped_messages = self._scheduler.shutdown( 

940 await_msg_callbacks=self._await_callbacks_on_shutdown 

941 ) 

942 self._scheduler = None 

943 

944 # Leaser and dispatcher reference each other through the shared 

945 # StreamingPullManager instance, i.e. "self", thus do not set their 

946 # references to None until both have been shut down. 

947 # 

948 # NOTE: Even if the dispatcher operates on an inactive leaser using 

949 # the latter's add() and remove() methods, these have no impact on 

950 # the stopped leaser (the leaser is never again re-started). Ditto 

951 # for the manager's maybe_resume_consumer() / maybe_pause_consumer(), 

952 # because the consumer gets shut down first. 

953 _LOGGER.debug("Stopping leaser.") 

954 assert self._leaser is not None 

955 self._leaser.stop() 

956 

957 total = len(dropped_messages) + len( 

958 self._messages_on_hold._messages_on_hold 

959 ) 

960 _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).") 

961 messages_to_nack = itertools.chain( 

962 dropped_messages, self._messages_on_hold._messages_on_hold 

963 ) 

964 for msg in messages_to_nack: 

965 msg.nack() 

966 

967 _LOGGER.debug("Stopping dispatcher.") 

968 assert self._dispatcher is not None 

969 self._dispatcher.stop() 

970 self._dispatcher = None 

971 # dispatcher terminated, OK to dispose the leaser reference now 

972 self._leaser = None 

973 

974 _LOGGER.debug("Stopping heartbeater.") 

975 assert self._heartbeater is not None 

976 self._heartbeater.stop() 

977 self._heartbeater = None 

978 

979 self._rpc = None 

980 self._closed = True 

981 _LOGGER.debug("Finished stopping manager.") 

982 

983 for callback in self._close_callbacks: 

984 callback(self, reason) 

985 

986 def _get_initial_request( 

987 self, stream_ack_deadline_seconds: int 

988 ) -> gapic_types.StreamingPullRequest: 

989 """Return the initial request for the RPC. 

990 

991 This defines the initial request that must always be sent to Pub/Sub 

992 immediately upon opening the subscription. 

993 

994 Args: 

995 stream_ack_deadline_seconds: 

996 The default message acknowledge deadline for the stream. 

997 

998 Returns: 

999 A request suitable for being the first request on the stream (and not 

1000 suitable for any other purpose). 

1001 """ 

1002 # Put the request together. 

1003 # We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt 

1004 # anyway. Set to some big-ish value in case we modack late. 

1005 request = gapic_types.StreamingPullRequest( 

1006 stream_ack_deadline_seconds=stream_ack_deadline_seconds, 

1007 modify_deadline_ack_ids=[], 

1008 modify_deadline_seconds=[], 

1009 subscription=self._subscription, 

1010 client_id=self._client_id, 

1011 max_outstanding_messages=( 

1012 0 if self._use_legacy_flow_control else self._flow_control.max_messages 

1013 ), 

1014 max_outstanding_bytes=( 

1015 0 if self._use_legacy_flow_control else self._flow_control.max_bytes 

1016 ), 

1017 ) 

1018 

1019 # Return the initial request. 

1020 return request 

1021 
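# Illustrative sketch of the resulting initial request (the subscription name
# is made up; values assume max_messages=1000, max_bytes=104857600, and
# use_legacy_flow_control=False):
#
#   StreamingPullRequest(
#       subscription="projects/my-project/subscriptions/my-sub",
#       client_id="<random uuid4 string>",
#       stream_ack_deadline_seconds=60,
#       modify_deadline_ack_ids=[],
#       modify_deadline_seconds=[],
#       max_outstanding_messages=1000,
#       max_outstanding_bytes=104857600,
#   )
#
# With use_legacy_flow_control=True both max_outstanding_* fields are sent as
# 0, which disables server-side flow control.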

1022 def _send_lease_modacks( 

1023 self, 

1024 ack_ids: Iterable[str], 

1025 ack_deadline: float, 

1026 opentelemetry_data: List[SubscribeOpenTelemetry], 

1027 warn_on_invalid=True, 

1028 receipt_modack: bool = False, 

1029 ) -> Set[str]: 

1030 exactly_once_enabled = False 

1031 

1032 modack_span: Optional[trace.Span] = None 

1033 if self._client.open_telemetry_enabled: 

1034 subscribe_span_links: List[trace.Link] = [] 

1035 subscribe_spans: List[trace.Span] = [] 

1036 subscription_split: List[str] = self._subscription.split("/") 

1037 assert len(subscription_split) == 4 

1038 subscription_id: str = subscription_split[3] 

1039 project_id: str = subscription_split[1] 

1040 for data in opentelemetry_data: 

1041 subscribe_span: Optional[trace.Span] = data.subscribe_span 

1042 if ( 

1043 subscribe_span 

1044 and subscribe_span.get_span_context().trace_flags.sampled 

1045 ): 

1046 subscribe_span_links.append( 

1047 trace.Link(subscribe_span.get_span_context()) 

1048 ) 

1049 subscribe_spans.append(subscribe_span) 

1050 modack_span = start_modack_span( 

1051 subscribe_span_links, 

1052 subscription_id, 

1053 len(opentelemetry_data), 

1054 ack_deadline, 

1055 project_id, 

1056 "_send_lease_modacks", 

1057 receipt_modack, 

1058 ) 

1059 if ( 

1060 modack_span and modack_span.get_span_context().trace_flags.sampled 

1061 ): # pragma: NO COVER 

1062 modack_span_context: trace.SpanContext = modack_span.get_span_context() 

1063 for subscribe_span in subscribe_spans: 

1064 subscribe_span.add_link( 

1065 context=modack_span_context, 

1066 attributes={ 

1067 "messaging.operation.name": "modack", 

1068 }, 

1069 ) 

1070 

1071 with self._exactly_once_enabled_lock: 

1072 exactly_once_enabled = self._exactly_once_enabled 

1073 if exactly_once_enabled: 

1074 eod_items: List[requests.ModAckRequest] = [] 

1075 if self._client.open_telemetry_enabled: 

1076 for ack_id, data in zip( 

1077 ack_ids, opentelemetry_data 

1078 ): # pragma: NO COVER # Identical code covered in the same function below 

1079 assert data is not None 

1080 eod_items.append( 

1081 requests.ModAckRequest( 

1082 ack_id, 

1083 ack_deadline, 

1084 futures.Future(), 

1085 data, 

1086 ) 

1087 ) 

1088 else: 

1089 eod_items = [ 

1090 requests.ModAckRequest(ack_id, ack_deadline, futures.Future()) 

1091 for ack_id in ack_ids 

1092 ] 

1093 

1094 assert self._dispatcher is not None 

1095 self._dispatcher.modify_ack_deadline(eod_items, ack_deadline) 

1096 if ( 

1097 modack_span 

1098 ): # pragma: NO COVER # Identical code covered in the same function below 

1099 modack_span.end() 

1100 expired_ack_ids = set() 

1101 for req in eod_items: 

1102 try: 

1103 assert req.future is not None 

1104 req.future.result() 

1105 except AcknowledgeError as ack_error: 

1106 if ( 

1107 ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID 

1108 or warn_on_invalid 

1109 ): 

1110 _LOGGER.warning( 

1111 "AcknowledgeError when lease-modacking a message.", 

1112 exc_info=True, 

1113 ) 

1114 if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID: 

1115 expired_ack_ids.add(req.ack_id) 

1116 return expired_ack_ids 

1117 else: 

1118 items: List[requests.ModAckRequest] = [] 

1119 if self._client.open_telemetry_enabled: 

1120 for ack_id, data in zip(ack_ids, opentelemetry_data): 

1121 assert data is not None 

1122 items.append( 

1123 requests.ModAckRequest( 

1124 ack_id, 

1125 self.ack_deadline, 

1126 None, 

1127 data, 

1128 ) 

1129 ) 

1130 else: 

1131 items = [ 

1132 requests.ModAckRequest(ack_id, self.ack_deadline, None) 

1133 for ack_id in ack_ids 

1134 ] 

1135 assert self._dispatcher is not None 

1136 self._dispatcher.modify_ack_deadline(items, ack_deadline) 

1137 if modack_span: 

1138 modack_span.end() 

1139 return set() 

1140 
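# Illustrative note for _send_lease_modacks above (ack ids are made up): with
# exactly-once delivery enabled, every ModAckRequest carries a Future and the
# method blocks on those futures. If, say, the modack for "ack-b" fails with
# AcknowledgeError(INVALID_ACK_ID), the method returns {"ack-b"} so that
# _on_response can skip dispatching that already-expired message. Without
# exactly-once delivery the modacks are fire-and-forget and an empty set is
# returned.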

1141 def _exactly_once_delivery_enabled(self) -> bool: 

1142 """Whether exactly-once delivery is enabled for the subscription.""" 

1143 with self._exactly_once_enabled_lock: 

1144 return self._exactly_once_enabled 

1145 

1146 def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: 

1147 """Process all received Pub/Sub messages. 

1148 

1149 For each message, send a modify-ack-deadline (modack) request to the

1150 server. This prevents expiration of the message due to buffering by

1151 gRPC or a proxy/firewall. This keeps the server and client expiration

1152 timers closer to each other, thus preventing the message from being

1153 redelivered multiple times.

1154 

1155 After the messages have all had their ack deadline updated, execute 

1156 the callback for each message using the executor. 

1157 """ 

1158 if response is None: 

1159 _LOGGER.debug( 

1160 "Response callback invoked with None, likely due to a " 

1161 "transport shutdown." 

1162 ) 

1163 return 

1164 

1165 # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying 

1166 # protobuf message for significantly faster attribute access.

1167 received_messages = response._pb.received_messages 

1168 

1169 subscribe_opentelemetry: List[SubscribeOpenTelemetry] = [] 

1170 if self._client.open_telemetry_enabled: 

1171 for received_message in received_messages: 

1172 opentelemetry_data = SubscribeOpenTelemetry(received_message.message) 

1173 opentelemetry_data.start_subscribe_span( 

1174 self._subscription, 

1175 response.subscription_properties.exactly_once_delivery_enabled, 

1176 received_message.ack_id, 

1177 received_message.delivery_attempt, 

1178 ) 

1179 subscribe_opentelemetry.append(opentelemetry_data) 

1180 

1181 _LOGGER.debug( 

1182 "Processing %s received message(s), currently on hold %s (bytes %s).", 

1183 len(received_messages), 

1184 self._messages_on_hold.size, 

1185 self._on_hold_bytes, 

1186 ) 

1187 

1188 with self._exactly_once_enabled_lock: 

1189 if ( 

1190 response.subscription_properties.exactly_once_delivery_enabled 

1191 != self._exactly_once_enabled 

1192 ): 

1193 self._exactly_once_enabled = ( 

1194 response.subscription_properties.exactly_once_delivery_enabled 

1195 ) 

1196 # Update ack_deadline, whose minimum depends on self._exactly_once_enabled 

1197 # This method acquires the self._ack_deadline_lock lock. 

1198 self._obtain_ack_deadline(maybe_update=True) 

1199 self._send_new_ack_deadline = True 

1200 

1201 # Immediately (i.e. without waiting for the auto lease management) 

1202 # modack the messages we received, as this tells the server that we've 

1203 # received them. 

1204 ack_id_gen = (message.ack_id for message in received_messages) 

1205 expired_ack_ids = self._send_lease_modacks( 

1206 ack_id_gen, 

1207 self.ack_deadline, 

1208 subscribe_opentelemetry, 

1209 warn_on_invalid=False, 

1210 receipt_modack=True, 

1211 ) 

1212 

1213 with self._pause_resume_lock: 

1214 if self._scheduler is None or self._leaser is None: 

1215 _LOGGER.debug( 

1216 f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing." 

1217 ) 

1218 return 

1219 

1220 i: int = 0 

1221 for received_message in received_messages: 

1222 if ( 

1223 not self._exactly_once_delivery_enabled() 

1224 or received_message.ack_id not in expired_ack_ids 

1225 ): 

1226 message = google.cloud.pubsub_v1.subscriber.message.Message( 

1227 received_message.message, 

1228 received_message.ack_id, 

1229 received_message.delivery_attempt, 

1230 self._scheduler.queue, 

1231 self._exactly_once_delivery_enabled, 

1232 ) 

1233 if self._client.open_telemetry_enabled: 

1234 message.opentelemetry_data = subscribe_opentelemetry[i] 

1235 i = i + 1 

1236 self._messages_on_hold.put(message) 

1237 self._on_hold_bytes += message.size 

1238 req = requests.LeaseRequest( 

1239 ack_id=message.ack_id, 

1240 byte_size=message.size, 

1241 ordering_key=message.ordering_key, 

1242 opentelemetry_data=message.opentelemetry_data, 

1243 ) 

1244 self._leaser.add([req]) 

1245 

1246 self._maybe_release_messages() 

1247 

1248 self.maybe_pause_consumer() 

1249 

1250 def _should_recover(self, exception: BaseException) -> bool: 

1251 """Determine if an error on the RPC stream should be recovered. 

1252 

1253 If the exception is one of the retryable exceptions, this will signal 

1254 to the consumer thread that it should "recover" from the failure. 

1255 

1256 This will cause the stream to exit when it returns :data:`False`. 

1257 

1258 Returns: 

1259 Indicates if the caller should recover or shut down. 

1260 Will be :data:`True` if the ``exception`` is "acceptable", i.e. 

1261 in a list of retryable / idempotent exceptions. 

1262 """ 

1263 exception = _wrap_as_exception(exception) 

1264 # If this is in the list of idempotent exceptions, then we want to 

1265 # recover. 

1266 if isinstance(exception, _RETRYABLE_STREAM_ERRORS): 

1267 _LOGGER.debug("Observed recoverable stream error %s", exception) 

1268 return True 

1269 _LOGGER.debug("Observed non-recoverable stream error %s", exception) 

1270 return False 

1271 

1272 def _should_terminate(self, exception: BaseException) -> bool: 

1273 """Determine if an error on the RPC stream should be terminated. 

1274 

1275 If the exception is one of the terminating exceptions, this will signal 

1276 to the consumer thread that it should terminate. 

1277 

1278 This will cause the stream to exit when it returns :data:`True`. 

1279 

1280 Returns: 

1281 Indicates if the caller should terminate or attempt recovery. 

1282 Will be :data:`True` if the ``exception`` is "acceptable", i.e. 

1283 in a list of terminating exceptions. 

1284 """ 

1285 exception = _wrap_as_exception(exception) 

1286 if isinstance(exception, _TERMINATING_STREAM_ERRORS): 

1287 _LOGGER.debug("Observed terminating stream error %s", exception) 

1288 return True 

1289 _LOGGER.debug("Observed non-terminating stream error %s", exception) 

1290 return False 

1291 

1292 def _on_rpc_done(self, future: Any) -> None: 

1293 """Triggered whenever the underlying RPC terminates without recovery. 

1294 

1295 This is typically triggered from one of two threads: the background 

1296 consumer thread (when calling ``recv()`` produces a non-recoverable 

1297 error) or the grpc management thread (when cancelling the RPC). 

1298 

1299 This method is *non-blocking*. It will start another thread to deal 

1300 with shutting everything down. This avoids blocking in the background

1301 consumer, which would prevent it from being ``joined()``.

1302 """ 

1303 _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.") 

1304 error = _wrap_as_exception(future) 

1305 thread = threading.Thread( 

1306 name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error} 

1307 ) 

1308 thread.daemon = True 

1309 thread.start()