Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py: 19% (519 statements)

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import division 

16 

17import collections 

18import functools 

19import inspect 

20import itertools 

21import logging 

22import threading 

23import typing 

24from typing import ( 

25 Any, 

26 Dict, 

27 Callable, 

28 Iterable, 

29 List, 

30 Optional, 

31 Set, 

32 Tuple, 

33) 

34import uuid 

35 

36from opentelemetry import trace 

37import grpc # type: ignore 

38 

39from google.api_core import bidi 

40from google.api_core import exceptions 

41from google.cloud.pubsub_v1 import types 

42from google.cloud.pubsub_v1.subscriber._protocol import dispatcher 

43from google.cloud.pubsub_v1.subscriber._protocol import heartbeater 

44from google.cloud.pubsub_v1.subscriber._protocol import histogram 

45from google.cloud.pubsub_v1.subscriber._protocol import leaser 

46from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold 

47from google.cloud.pubsub_v1.subscriber._protocol import requests 

48from google.cloud.pubsub_v1.subscriber.exceptions import ( 

49 AcknowledgeError, 

50 AcknowledgeStatus, 

51) 

52from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

53 SubscribeOpenTelemetry, 

54) 

55import google.cloud.pubsub_v1.subscriber.message 

56from google.cloud.pubsub_v1.subscriber import futures 

57from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler 

58from google.pubsub_v1 import types as gapic_types 

59from grpc_status import rpc_status # type: ignore 

60from google.rpc.error_details_pb2 import ErrorInfo # type: ignore 

61from google.rpc import code_pb2 # type: ignore 

62from google.rpc import status_pb2 

63from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

64 start_modack_span, 

65) 

66 

67if typing.TYPE_CHECKING: # pragma: NO COVER 

68 from google.cloud.pubsub_v1 import subscriber 

69 

70 

71_LOGGER = logging.getLogger(__name__) 

72_SLOW_ACK_LOGGER = logging.getLogger("slow-ack") 

73_STREAMS_LOGGER = logging.getLogger("subscriber-streams") 

74_FLOW_CONTROL_LOGGER = logging.getLogger("subscriber-flow-control") 

75_CALLBACK_DELIVERY_LOGGER = logging.getLogger("callback-delivery") 

76_CALLBACK_EXCEPTION_LOGGER = logging.getLogger("callback-exceptions") 

77_EXPIRY_LOGGER = logging.getLogger("expiry") 

78_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" 

79_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" 

80_RETRYABLE_STREAM_ERRORS = ( 

81 exceptions.Aborted, 

82 exceptions.DeadlineExceeded, 

83 exceptions.GatewayTimeout, 

84 exceptions.InternalServerError, 

85 exceptions.ResourceExhausted, 

86 exceptions.ServiceUnavailable, 

87 exceptions.Unknown, 

88) 

89_TERMINATING_STREAM_ERRORS = ( 

90 exceptions.Cancelled, 

91 exceptions.InvalidArgument, 

92 exceptions.NotFound, 

93 exceptions.PermissionDenied, 

94 exceptions.Unauthenticated, 

95 exceptions.Unauthorized, 

96) 

97_MAX_LOAD = 1.0 

98"""The load threshold above which to pause the incoming message stream.""" 

99 

100_RESUME_THRESHOLD = 0.8 

101"""The load threshold below which to resume the incoming message stream.""" 

102 

103_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60 

104"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for 

105a subscription. We do this to reduce premature ack expiration. 

106""" 

107 

108_DEFAULT_STREAM_ACK_DEADLINE: float = 60 

109"""The default stream ack deadline in seconds.""" 

110 

111_MAX_STREAM_ACK_DEADLINE: float = 600 

112"""The maximum stream ack deadline in seconds.""" 

113 

114_MIN_STREAM_ACK_DEADLINE: float = 10 

115"""The minimum stream ack deadline in seconds.""" 

116 

117_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = { 

118 code_pb2.DEADLINE_EXCEEDED, 

119 code_pb2.RESOURCE_EXHAUSTED, 

120 code_pb2.ABORTED, 

121 code_pb2.INTERNAL, 

122 code_pb2.UNAVAILABLE, 

123} 

124 

125# `on_fatal_exception` was added in `google-api-core v2.25.1`, which allows us to inform 

126# callers on unrecoverable errors. We can only pass this arg if it's available in the 

127# `BackgroundConsumer` spec. 

128_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = "on_fatal_exception" in inspect.getfullargspec( 

129 bidi.BackgroundConsumer 

130) 

131 

132 

133def _wrap_as_exception(maybe_exception: Any) -> BaseException: 

134 """Wrap an object as a Python exception, if needed. 

135 

136 Args: 

137 maybe_exception: The object to wrap, usually a gRPC exception class. 

138 

139 Returns: 

140 The argument itself if an instance of ``BaseException``, otherwise 

141 the argument represented as an instance of ``Exception`` (sub)class. 

142 """ 

143 if isinstance(maybe_exception, grpc.RpcError): 

144 return exceptions.from_grpc_error(maybe_exception) 

145 elif isinstance(maybe_exception, BaseException): 

146 return maybe_exception 

147 

148 return Exception(maybe_exception) 

149 

150 

151def _wrap_callback_errors( 

152 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], 

153 on_callback_error: Callable[[BaseException], Any], 

154 message: "google.cloud.pubsub_v1.subscriber.message.Message", 

155): 

156 """Wraps a user callback so that if an exception occurs the message is 

157 nacked. 

158 

159 Args: 

160 callback: The user callback. 

161 message: The Pub/Sub message. 

162 """ 

163 _CALLBACK_DELIVERY_LOGGER.debug( 

164 "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s) received by subscriber callback", 

165 message.message_id, 

166 message.ack_id, 

167 message.ordering_key, 

168 message.exactly_once_enabled, 

169 ) 

170 

171 try: 

172 if message.opentelemetry_data: 

173 message.opentelemetry_data.end_subscribe_concurrency_control_span() 

174 with message.opentelemetry_data: 

175 callback(message) 

176 else: 

177 callback(message) 

178 except BaseException as exc: 

179 # Note: the likelihood of this failing is extremely low. This just adds 

180 # a message to a queue, so if this doesn't work the world is in an 

181 # unrecoverable state and this thread should just bail. 

182 

183 _CALLBACK_EXCEPTION_LOGGER.exception( 

184 "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)'s callback threw exception, nacking message.", 

185 message.message_id, 

186 message.ack_id, 

187 message.ordering_key, 

188 message.exactly_once_enabled, 

189 ) 

190 

191 message.nack() 

192 on_callback_error(exc) 

193 

194 

195def _get_status( 

196 exc: exceptions.GoogleAPICallError, 

197) -> Optional["status_pb2.Status"]: 

198 if not exc.response: 

199 _LOGGER.debug("No response obj in errored RPC call.") 

200 return None 

201 try: 

202 return rpc_status.from_call(exc.response) 

203 # rpc_status.from_call() raises ValueError "If the gRPC call's code or details 

204 # are inconsistent with the status code and message inside of the 

205 # google.rpc.status.Status". 

206 except ValueError: 

207 _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True) 

208 return None 

209 

210 

211def _get_ack_errors( 

212 exc: exceptions.GoogleAPICallError, 

213) -> Optional[Dict[str, str]]: 

214 status = _get_status(exc) 

215 if not status: 

216 _LOGGER.debug("Unable to get status of errored RPC.") 

217 return None 

218 for detail in status.details: 

219 info = ErrorInfo() 

220 if not (detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info)): 

221 _LOGGER.debug("Unable to unpack ErrorInfo.") 

222 return None 

223 return info.metadata 

224 return None 

225 

226 

227def _process_requests( 

228 error_status: Optional["status_pb2.Status"], 

229 ack_reqs_dict: Dict[str, requests.AckRequest], 

230 errors_dict: Optional[Dict[str, str]], 

231 ack_histogram: Optional[histogram.Histogram] = None, 

232 # TODO - Change this param to a Union of Literals when we drop Python 3.7 support 

233 req_type: str = "ack", 

234): 

235 """Process requests when exactly-once delivery is enabled by referring to 

236 error_status and errors_dict. 

237 

238 The errors returned by the server in `error_status` or in `errors_dict` 

239 are used to complete the request futures in `ack_reqs_dict` (with a success 

240 or exception) or to return requests for further retries. 

241 """ 

242 requests_completed = [] 

243 requests_to_retry = [] 

244 for ack_id, ack_request in ack_reqs_dict.items(): 

245 # Debug logging: slow acks 

246 if ( 

247 req_type == "ack" 

248 and ack_histogram 

249 and ack_request.time_to_ack > ack_histogram.percentile(percent=99) 

250 ): 

251 _SLOW_ACK_LOGGER.debug( 

252 "Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration", 

253 ack_request.message_id, 

254 ack_request.ack_id, ack_request.time_to_ack, 

255 ) 

256 

257 # Handle special errors returned for ack/modack RPCs via the ErrorInfo 

258 # sidecar metadata when exactly-once delivery is enabled. 

259 if errors_dict and ack_id in errors_dict: 

260 exactly_once_error = errors_dict[ack_id] 

261 if exactly_once_error.startswith("TRANSIENT_"): 

262 requests_to_retry.append(ack_request) 

263 else: 

264 if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID": 

265 exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None) 

266 else: 

267 exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error) 

268 future = ack_request.future 

269 if future is not None: 

270 future.set_exception(exc) 

271 requests_completed.append(ack_request) 

272 # Temporary GRPC errors are retried 

273 elif ( 

274 error_status 

275 and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS 

276 ): 

277 requests_to_retry.append(ack_request) 

278 # Other GRPC errors are NOT retried 

279 elif error_status: 

280 if error_status.code == code_pb2.PERMISSION_DENIED: 

281 exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None) 

282 elif error_status.code == code_pb2.FAILED_PRECONDITION: 

283 exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) 

284 else: 

285 exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) 

286 future = ack_request.future 

287 if future is not None: 

288 future.set_exception(exc) 

289 requests_completed.append(ack_request) 

290 # Since no error occurred, requests with futures are completed successfully. 

291 elif ack_request.future: 

292 future = ack_request.future 

293 # success 

294 assert future is not None 

295 future.set_result(AcknowledgeStatus.SUCCESS) 

296 requests_completed.append(ack_request) 

297 # All other requests are considered completed. 

298 else: 

299 requests_completed.append(ack_request) 

300 

301 return requests_completed, requests_to_retry 

302 

303 

304class StreamingPullManager(object): 

305 """The streaming pull manager coordinates pulling messages from Pub/Sub, 

306 leasing them, and scheduling them to be processed. 

307 

308 Args: 

309 client: 

310 The subscriber client used to create this instance. 

311 subscription: 

312 The name of the subscription. The canonical format for this is 

313 ``projects/{project}/subscriptions/{subscription}``. 

314 flow_control: 

315 The flow control settings. 

316 scheduler: 

317 The scheduler to use to process messages. If not provided, a thread 

318 pool-based scheduler will be used. 

319 use_legacy_flow_control: 

320 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, 

321 though client-side flow control is still enabled. If set to ``False`` 

322 (default), both server-side and client-side flow control are enabled. 

323 await_callbacks_on_shutdown: 

324 If ``True``, the shutdown thread will wait until all scheduler threads 

325 terminate and only then proceed with shutting down the remaining running 

326 helper threads. 

327 

328 If ``False`` (default), the shutdown thread will shut the scheduler down, 

329 but it will not wait for the currently executing scheduler threads to 

330 terminate. 

331 

332 This setting affects when the on close callbacks get invoked, and 

333 consequently, when the StreamingPullFuture associated with the stream gets 

334 resolved. 

335 """ 

336 
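# Illustrative usage sketch (not part of the library source). Callers normally do not
# construct StreamingPullManager directly: SubscriberClient.subscribe() creates one,
# calls open(), and returns a StreamingPullFuture wrapping it. Roughly, assuming a
# subscription "projects/my-project/subscriptions/my-sub" exists:
#
#   from google.cloud import pubsub_v1
#
#   def callback(message):
#       print(message.data)
#       message.ack()
#
#   client = pubsub_v1.SubscriberClient()
#   future = client.subscribe("projects/my-project/subscriptions/my-sub", callback)
#   try:
#       future.result(timeout=30)   # block until the stream shuts down or times out
#   except TimeoutError:
#       future.cancel()             # stop the streaming pull
#       future.result()             # wait for the shutdown to complete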

337 def __init__( 

338 self, 

339 client: "subscriber.Client", 

340 subscription: str, 

341 flow_control: types.FlowControl = types.FlowControl(), 

342 scheduler: Optional[ThreadScheduler] = None, 

343 use_legacy_flow_control: bool = False, 

344 await_callbacks_on_shutdown: bool = False, 

345 ): 

346 self._client = client 

347 self._subscription = subscription 

348 self._exactly_once_enabled = False 

349 self._flow_control = flow_control 

350 self._use_legacy_flow_control = use_legacy_flow_control 

351 self._await_callbacks_on_shutdown = await_callbacks_on_shutdown 

352 self._ack_histogram = histogram.Histogram() 

353 self._last_histogram_size = 0 

354 self._stream_metadata = [ 

355 ["x-goog-request-params", "subscription=" + subscription] 

356 ] 

357 

358 # If max_duration_per_lease_extension is the default 

359 # we set the stream_ack_deadline to the default of 60 

360 if self._flow_control.max_duration_per_lease_extension == 0: 

361 self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE 

362 # The stream ack deadline cannot be set below the supported minimum 

363 elif ( 

364 self._flow_control.max_duration_per_lease_extension 

365 < _MIN_STREAM_ACK_DEADLINE 

366 ): 

367 self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE 

368 # The stream ack deadline cannot be set above the supported maximum 

369 elif ( 

370 self._flow_control.max_duration_per_lease_extension 

371 > _MAX_STREAM_ACK_DEADLINE 

372 ): 

373 self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE 

374 else: 

375 self._stream_ack_deadline = ( 

376 self._flow_control.max_duration_per_lease_extension 

377 ) 

378 
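# Illustrative values for the clamping above (illustrative sketch, not library code),
# given _MIN_STREAM_ACK_DEADLINE = 10, _DEFAULT_STREAM_ACK_DEADLINE = 60 and
# _MAX_STREAM_ACK_DEADLINE = 600:
#   max_duration_per_lease_extension == 0    -> stream ack deadline 60  (default)
#   max_duration_per_lease_extension == 5    -> stream ack deadline 10  (raised to the minimum)
#   max_duration_per_lease_extension == 900  -> stream ack deadline 600 (capped at the maximum)
#   max_duration_per_lease_extension == 120  -> stream ack deadline 120 (used as-is)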

379 self._ack_deadline = max( 

380 min( 

381 self._flow_control.min_duration_per_lease_extension, 

382 histogram.MAX_ACK_DEADLINE, 

383 ), 

384 histogram.MIN_ACK_DEADLINE, 

385 ) 

386 

387 self._rpc: Optional[bidi.ResumableBidiRpc] = None 

388 self._callback: Optional[functools.partial] = None 

389 self._closing = threading.Lock() 

390 self._closed = False 

391 self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = [] 

392 # Guarded by self._exactly_once_enabled_lock 

393 self._send_new_ack_deadline = False 

394 

395 # A shutdown thread is created on intentional shutdown. 

396 self._regular_shutdown_thread: Optional[threading.Thread] = None 

397 

398 # Generate a random client id tied to this object. All streaming pull 

399 # connections (initial and re-connects) will then use the same client 

400 # id. Doing so lets the server establish affinity even across stream 

401 # disconnections. 

402 self._client_id = str(uuid.uuid4()) 

403 

404 if scheduler is None: 

405 self._scheduler: Optional[ThreadScheduler] = ThreadScheduler() 

406 else: 

407 self._scheduler = scheduler 

408 

409 # A collection for the messages that have been received from the server, 

410 # but not yet sent to the user callback. 

411 self._messages_on_hold = messages_on_hold.MessagesOnHold() 

412 

413 # The total number of bytes consumed by the messages currently on hold 

414 self._on_hold_bytes = 0 

415 

416 # A lock ensuring that pausing / resuming the consumer are both atomic 

417 # operations that cannot be executed concurrently. Needed for properly 

418 # syncing these operations with the current leaser load. Additionally, 

419 # the lock is used to protect modifications of internal data that 

420 # affects the load computation, i.e. the count and size of the messages 

421 # currently on hold. 

422 self._pause_resume_lock = threading.Lock() 

423 

424 # A lock guarding the self._exactly_once_enabled variable. We may also 

425 # acquire the self._ack_deadline_lock while this lock is held, but not 

426 # the reverse. So, we maintain a simple ordering of these two locks to 

427 # prevent deadlocks. 

428 self._exactly_once_enabled_lock = threading.Lock() 

429 

430 # A lock protecting the current ACK deadline used in the lease management. This 

431 # value can be potentially updated both by the leaser thread and by the message 

432 # consumer thread when invoking the internal _on_response() callback. 

433 self._ack_deadline_lock = threading.Lock() 

434 

435 # The threads created in ``.open()``. 

436 self._dispatcher: Optional[dispatcher.Dispatcher] = None 

437 self._leaser: Optional[leaser.Leaser] = None 

438 self._consumer: Optional[bidi.BackgroundConsumer] = None 

439 self._heartbeater: Optional[heartbeater.Heartbeater] = None 

440 

441 @property 

442 def is_active(self) -> bool: 

443 """``True`` if this manager is actively streaming. 

444 

445 Note that ``False`` does not indicate this manager is completely shut down, 

446 just that it stopped getting new messages. 

447 """ 

448 return self._consumer is not None and self._consumer.is_active 

449 

450 @property 

451 def flow_control(self) -> types.FlowControl: 

452 """The active flow control settings.""" 

453 return self._flow_control 

454 

455 @property 

456 def dispatcher(self) -> Optional[dispatcher.Dispatcher]: 

457 """The dispatcher helper.""" 

458 return self._dispatcher 

459 

460 @property 

461 def leaser(self) -> Optional[leaser.Leaser]: 

462 """The leaser helper.""" 

463 return self._leaser 

464 

465 @property 

466 def ack_histogram(self) -> histogram.Histogram: 

467 """The histogram tracking time-to-acknowledge.""" 

468 return self._ack_histogram 

469 

470 @property 

471 def ack_deadline(self) -> float: 

472 """Return the current ACK deadline based on historical data without updating it. 

473 

474 Returns: 

475 The ack deadline. 

476 """ 

477 return self._obtain_ack_deadline(maybe_update=False) 

478 

479 def _obtain_ack_deadline(self, maybe_update: bool) -> float: 

480 """The actual `ack_deadline` implementation. 

481 

482 This method is "sticky". It will only perform the computations to check on the 

483 right ACK deadline if explicitly requested AND if the histogram with past 

484 time-to-ack data has gained a significant amount of new information. 

485 

486 Args: 

487 maybe_update: 

488 If ``True``, also update the current ACK deadline before returning it if 

489 enough new ACK data has been gathered. 

490 

491 Returns: 

492 The current ACK deadline in seconds to use. 

493 """ 

494 with self._ack_deadline_lock: 

495 if not maybe_update: 

496 return self._ack_deadline 

497 

498 target_size = min( 

499 self._last_histogram_size * 2, self._last_histogram_size + 100 

500 ) 

501 hist_size = len(self.ack_histogram) 

502 

503 if hist_size > target_size: 

504 self._last_histogram_size = hist_size 

505 self._ack_deadline = self.ack_histogram.percentile(percent=99) 

506 

507 if self.flow_control.max_duration_per_lease_extension > 0: 

508 # The setting in flow control could be too low, adjust if needed. 

509 flow_control_setting = max( 

510 self.flow_control.max_duration_per_lease_extension, 

511 histogram.MIN_ACK_DEADLINE, 

512 ) 

513 self._ack_deadline = min(self._ack_deadline, flow_control_setting) 

514 

515 # If the user explicitly sets a min ack_deadline, respect it. 

516 if self.flow_control.min_duration_per_lease_extension > 0: 

517 # The setting in flow control could be too high, adjust if needed. 

518 flow_control_setting = min( 

519 self.flow_control.min_duration_per_lease_extension, 

520 histogram.MAX_ACK_DEADLINE, 

521 ) 

522 self._ack_deadline = max(self._ack_deadline, flow_control_setting) 

523 elif self._exactly_once_enabled: 

524 # Higher minimum ack_deadline for subscriptions with 

525 # exactly-once delivery enabled. 

526 self._ack_deadline = max( 

527 self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED 

528 ) 

529 # If we have updated the ack_deadline and it is longer than the stream_ack_deadline 

530 # set the stream_ack_deadline to the new ack_deadline. 

531 if self._ack_deadline > self._stream_ack_deadline: 

532 self._stream_ack_deadline = self._ack_deadline 

533 return self._ack_deadline 

534 
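# A worked example of the deadline logic above (illustrative sketch, not library code),
# assuming the histogram's p99 time-to-ack has settled at 37 seconds:
#   no overrides, exactly-once delivery disabled  -> ack deadline 37
#   max_duration_per_lease_extension == 30        -> ack deadline min(37, 30) = 30
#   exactly-once enabled, no min override         -> ack deadline max(37, 60) = 60
# If the resulting ack deadline exceeds the current stream ack deadline, the stream
# ack deadline is raised to match it, per the final lines of _obtain_ack_deadline().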

535 @property 

536 def load(self) -> float: 

537 """Return the current load. 

538 

539 The load is represented as a float, where 1.0 represents having 

540 hit one of the flow control limits, and values between 0.0 and 1.0 

541 represent how close we are to them. (0.5 means we have exactly half 

542 of what the flow control setting allows, for example.) 

543 

544 There are (currently) two flow control settings; this property 

545 computes how close the manager is to each of them, and returns 

546 whichever value is higher. (It does not matter that we have lots of 

547 running room on setting A if setting B is over.) 

548 

549 Returns: 

550 The load value. 

551 """ 

552 if self._leaser is None: 

553 return 0.0 

554 

555 # Messages that are temporarily put on hold are not being delivered to 

556 # user's callbacks, thus they should not contribute to the flow control 

557 # load calculation. 

558 # However, since these messages must still be lease-managed to avoid 

559 # unnecessary ACK deadline expirations, their count and total size must 

560 # be subtracted from the leaser's values. 

561 return max( 

562 [ 

563 (self._leaser.message_count - self._messages_on_hold.size) 

564 / self._flow_control.max_messages, 

565 (self._leaser.bytes - self._on_hold_bytes) 

566 / self._flow_control.max_bytes, 

567 ] 

568 ) 

569 
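# Illustrative load computation (illustrative sketch, not library code), assuming the
# default FlowControl(max_messages=1000, max_bytes=100 * 1024 * 1024) and a leaser
# holding 400 messages / 30 MiB, of which 50 messages / 5 MiB are on hold:
#   message load = (400 - 50) / 1000          = 0.35
#   byte load    = (30 MiB - 5 MiB) / 100 MiB = 0.25
#   load         = max(0.35, 0.25)            = 0.35
# The consumer is paused once load reaches _MAX_LOAD (1.0) and resumed once it drops
# below _RESUME_THRESHOLD (0.8); see maybe_pause_consumer() / maybe_resume_consumer().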

570 def add_close_callback( 

571 self, callback: Callable[["StreamingPullManager", Any], Any] 

572 ) -> None: 

573 """Schedules a callable when the manager closes. 

574 

575 Args: 

576 callback: The method to call when the manager closes. 

577 """ 

578 self._close_callbacks.append(callback) 

579 

580 def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None: 

581 """Send the next message in the queue for each of the passed-in 

582 ordering keys, if they exist. Clean up state for keys that no longer 

583 have any queued messages. 

584 

585 Since the load went down by one message, it's probably safe to send the 

586 user another message for the same key. Since the released message may be 

587 bigger than the previous one, this may increase the load above the maximum. 

588 This decision is by design because it simplifies MessagesOnHold. 

589 

590 Args: 

591 ordering_keys: 

592 A sequence of ordering keys to activate. May be empty. 

593 """ 

594 with self._pause_resume_lock: 

595 if self._scheduler is None: 

596 return # We are shutting down, don't try to dispatch any more messages. 

597 

598 self._messages_on_hold.activate_ordering_keys( 

599 ordering_keys, self._schedule_message_on_hold 

600 ) 

601 

602 def maybe_pause_consumer(self) -> None: 

603 """Check the current load and pause the consumer if needed.""" 

604 with self._pause_resume_lock: 

605 if self.load >= _MAX_LOAD: 

606 if self._consumer is not None and not self._consumer.is_paused: 

607 _FLOW_CONTROL_LOGGER.debug( 

608 "Message backlog over load at %.2f (threshold %.2f), initiating client-side flow control", 

609 self.load, 

610 _MAX_LOAD, 

611 ) 

612 self._consumer.pause() 

613 

614 def maybe_resume_consumer(self) -> None: 

615 """Check the load and held messages and resume the consumer if needed. 

616 

617 If there are messages held internally, release those messages before 

618 resuming the consumer. That will avoid leaser overload. 

619 """ 

620 with self._pause_resume_lock: 

621 # If we have been paused by flow control, check and see if we are 

622 # back within our limits. 

623 # 

624 # In order to not thrash too much, require us to have passed below 

625 # the resume threshold (80% by default) of each flow control setting 

626 # before restarting. 

627 if self._consumer is None or not self._consumer.is_paused: 

628 return 

629 

630 _LOGGER.debug("Current load: %.2f", self.load) 

631 

632 # Before maybe resuming the background consumer, release any messages 

633 # currently on hold, if the current load allows for it. 

634 self._maybe_release_messages() 

635 

636 if self.load < _RESUME_THRESHOLD: 

637 _FLOW_CONTROL_LOGGER.debug( 

638 "Current load is %.2f (threshold %.2f), suspending client-side flow control.", 

639 self.load, 

640 _RESUME_THRESHOLD, 

641 ) 

642 self._consumer.resume() 

643 else: 

644 _FLOW_CONTROL_LOGGER.debug( 

645 "Current load is %.2f (threshold %.2f), retaining client-side flow control.", 

646 self.load, 

647 _RESUME_THRESHOLD, 

648 ) 

649 

650 def _maybe_release_messages(self) -> None: 

651 """Release (some of) the held messages if the current load allows for it. 

652 

653 The method tries to release as many messages as the current leaser load 

654 would allow. Each released message is added to the lease management, 

655 and the user callback is scheduled for it. 

656 

657 If there are currently no messages on hold, or if the leaser is 

658 already overloaded, this method is effectively a no-op. 

659 

660 The method assumes the caller has acquired the ``_pause_resume_lock``. 

661 """ 

662 released_ack_ids = [] 

663 while self.load < _MAX_LOAD: 

664 msg = self._messages_on_hold.get() 

665 if not msg: 

666 break 

667 if msg.opentelemetry_data: 

668 msg.opentelemetry_data.end_subscribe_scheduler_span() 

669 self._schedule_message_on_hold(msg) 

670 released_ack_ids.append(msg.ack_id) 

671 

672 assert self._leaser is not None 

673 self._leaser.start_lease_expiry_timer(released_ack_ids) 

674 

675 def _schedule_message_on_hold( 

676 self, msg: "google.cloud.pubsub_v1.subscriber.message.Message" 

677 ): 

678 """Schedule a message on hold to be sent to the user and change on-hold-bytes. 

679 

680 The method assumes the caller has acquired the ``_pause_resume_lock``. 

681 

682 Args: 

683 msg: The message to schedule to be sent to the user. 

684 """ 

685 assert msg, "Message must not be None." 

686 

687 # On-hold bytes goes down, increasing load. 

688 self._on_hold_bytes -= msg.size 

689 

690 if self._on_hold_bytes < 0: 

691 _LOGGER.warning( 

692 "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes 

693 ) 

694 self._on_hold_bytes = 0 

695 

696 _LOGGER.debug( 

697 "Released held message, scheduling callback for it, " 

698 "still on hold %s (bytes %s).", 

699 self._messages_on_hold.size, 

700 self._on_hold_bytes, 

701 ) 

702 assert self._scheduler is not None 

703 assert self._callback is not None 

704 if msg.opentelemetry_data: 

705 msg.opentelemetry_data.start_subscribe_concurrency_control_span() 

706 self._scheduler.schedule(self._callback, msg) 

707 

708 def send_unary_ack( 

709 self, ack_ids, ack_reqs_dict 

710 ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]: 

711 """Send a request using a separate unary request instead of over the stream. 

712 

713 If a RetryError occurs, the manager shutdown is triggered, and the 

714 error is re-raised. 

715 """ 

716 assert ack_ids 

717 assert len(ack_ids) == len(ack_reqs_dict) 

718 

719 error_status = None 

720 ack_errors_dict = None 

721 try: 

722 self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids) 

723 except exceptions.GoogleAPICallError as exc: 

724 _LOGGER.debug( 

725 "Exception while sending unary RPC. This is typically " 

726 "non-fatal as stream requests are best-effort.", 

727 exc_info=True, 

728 ) 

729 error_status = _get_status(exc) 

730 ack_errors_dict = _get_ack_errors(exc) 

731 except exceptions.RetryError as exc: 

732 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled() 

733 # Makes sure to complete futures so they don't block forever. 

734 for req in ack_reqs_dict.values(): 

735 # Futures may be present even with exactly-once delivery 

736 # disabled, in transition periods after the setting is changed on 

737 # the subscription. 

738 if req.future: 

739 if exactly_once_delivery_enabled: 

740 e = AcknowledgeError( 

741 AcknowledgeStatus.OTHER, "RetryError while sending ack RPC." 

742 ) 

743 req.future.set_exception(e) 

744 else: 

745 req.future.set_result(AcknowledgeStatus.SUCCESS) 

746 

747 _LOGGER.debug( 

748 "RetryError while sending ack RPC. Waiting on a transient " 

749 "error resolution for too long, will now trigger shutdown.", 

750 exc_info=False, 

751 ) 

752 # The underlying channel has been suffering from a retryable error 

753 # for too long, time to give up and shut the streaming pull down. 

754 self._on_rpc_done(exc) 

755 raise 

756 

757 if self._exactly_once_delivery_enabled(): 

758 requests_completed, requests_to_retry = _process_requests( 

759 error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram, "ack" 

760 ) 

761 else: 

762 requests_completed = [] 

763 requests_to_retry = [] 

764 # When exactly-once delivery is NOT enabled, acks/modacks are considered 

765 # best-effort. So, they always succeed even if the RPC fails. 

766 for req in ack_reqs_dict.values(): 

767 # Futures may be present even with exactly-once delivery 

768 # disabled, in transition periods after the setting is changed on 

769 # the subscription. 

770 if req.future: 

771 req.future.set_result(AcknowledgeStatus.SUCCESS) 

772 requests_completed.append(req) 

773 

774 return requests_completed, requests_to_retry 

775 

776 def send_unary_modack( 

777 self, 

778 modify_deadline_ack_ids, 

779 modify_deadline_seconds, 

780 ack_reqs_dict, 

781 default_deadline=None, 

782 ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: 

783 """Send a request using a separate unary request instead of over the stream. 

784 

785 If a RetryError occurs, the manager shutdown is triggered, and the 

786 error is re-raised. 

787 """ 

788 assert modify_deadline_ack_ids 

789 # Either we have per-message deadlines or a single default deadline, not both. 

790 assert modify_deadline_seconds is None or default_deadline is None 

791 

792 error_status = None 

793 modack_errors_dict = None 

794 try: 

795 if default_deadline is None: 

796 # Send ack_ids with the same deadline seconds together. 

797 deadline_to_ack_ids = collections.defaultdict(list) 

798 

799 for n, ack_id in enumerate(modify_deadline_ack_ids): 

800 deadline = modify_deadline_seconds[n] 

801 deadline_to_ack_ids[deadline].append(ack_id) 

802 

803 for deadline, ack_ids in deadline_to_ack_ids.items(): 

804 self._client.modify_ack_deadline( 

805 subscription=self._subscription, 

806 ack_ids=ack_ids, 

807 ack_deadline_seconds=deadline, 

808 ) 

809 else: 

810 # We can send all requests with the default deadline. 

811 self._client.modify_ack_deadline( 

812 subscription=self._subscription, 

813 ack_ids=modify_deadline_ack_ids, 

814 ack_deadline_seconds=default_deadline, 

815 ) 

816 except exceptions.GoogleAPICallError as exc: 

817 _LOGGER.debug( 

818 "Exception while sending unary RPC. This is typically " 

819 "non-fatal as stream requests are best-effort.", 

820 exc_info=True, 

821 ) 

822 error_status = _get_status(exc) 

823 modack_errors_dict = _get_ack_errors(exc) 

824 except exceptions.RetryError as exc: 

825 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled() 

826 # Makes sure to complete futures so they don't block forever. 

827 for req in ack_reqs_dict.values(): 

828 # Futures may be present even with exactly-once delivery 

829 # disabled, in transition periods after the setting is changed on 

830 # the subscription. 

831 if req.future: 

832 if exactly_once_delivery_enabled: 

833 e = AcknowledgeError( 

834 AcknowledgeStatus.OTHER, 

835 "RetryError while sending modack RPC.", 

836 ) 

837 req.future.set_exception(e) 

838 else: 

839 req.future.set_result(AcknowledgeStatus.SUCCESS) 

840 

841 _LOGGER.debug( 

842 "RetryError while sending modack RPC. Waiting on a transient " 

843 "error resolution for too long, will now trigger shutdown.", 

844 exc_info=False, 

845 ) 

846 # The underlying channel has been suffering from a retryable error 

847 # for too long, time to give up and shut the streaming pull down. 

848 self._on_rpc_done(exc) 

849 raise 

850 

851 if self._exactly_once_delivery_enabled(): 

852 requests_completed, requests_to_retry = _process_requests( 

853 error_status, 

854 ack_reqs_dict, 

855 modack_errors_dict, 

856 self.ack_histogram, 

857 "modack", 

858 ) 

859 else: 

860 requests_completed = [] 

861 requests_to_retry = [] 

862 # When exactly-once delivery is NOT enabled, acks/modacks are considered 

863 # best-effort. So, they always succeed even if the RPC fails. 

864 for req in ack_reqs_dict.values(): 

865 # Futures may be present even with exactly-once delivery 

866 # disabled, in transition periods after the setting is changed on 

867 # the subscription. 

868 if req.future: 

869 req.future.set_result(AcknowledgeStatus.SUCCESS) 

870 requests_completed.append(req) 

871 

872 return requests_completed, requests_to_retry 

873 

874 def heartbeat(self) -> bool: 

875 """Sends a heartbeat request over the streaming pull RPC. 

876 

877 The request is empty by default, but may contain the current ack_deadline 

878 if the self._exactly_once_enabled flag has changed. 

879 

880 Returns: 

881 If a heartbeat request has actually been sent. 

882 """ 

883 if self._rpc is not None and self._rpc.is_active: 

884 send_new_ack_deadline = False 

885 with self._exactly_once_enabled_lock: 

886 send_new_ack_deadline = self._send_new_ack_deadline 

887 self._send_new_ack_deadline = False 

888 

889 if send_new_ack_deadline: 

890 request = gapic_types.StreamingPullRequest( 

891 stream_ack_deadline_seconds=self._stream_ack_deadline 

892 ) 

893 _LOGGER.debug( 

894 "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline 

895 ) 

896 else: 

897 request = gapic_types.StreamingPullRequest() 

898 

899 self._rpc.send(request) 

900 return True 

901 

902 return False 

903 

904 def open( 

905 self, 

906 callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], 

907 on_callback_error: Callable[[Exception], Any], 

908 ) -> None: 

909 """Begin consuming messages. 

910 

911 Args: 

912 callback: 

913 A callback that will be called for each message received on the 

914 stream. 

915 on_callback_error: 

916 A callable that will be called if an exception is raised in 

917 the provided `callback`. 

918 """ 

919 if self.is_active: 

920 raise ValueError("This manager is already open.") 

921 

922 if self._closed: 

923 raise ValueError("This manager has been closed and can not be re-used.") 

924 

925 self._callback = functools.partial( 

926 _wrap_callback_errors, callback, on_callback_error 

927 ) 

928 

929 # Create the RPC 

930 stream_ack_deadline_seconds = self._stream_ack_deadline 

931 

932 get_initial_request = functools.partial( 

933 self._get_initial_request, stream_ack_deadline_seconds 

934 ) 

935 self._rpc = bidi.ResumableBidiRpc( 

936 start_rpc=self._client.streaming_pull, 

937 initial_request=get_initial_request, 

938 should_recover=self._should_recover, 

939 should_terminate=self._should_terminate, 

940 metadata=self._stream_metadata, 

941 throttle_reopen=True, 

942 ) 

943 self._rpc.add_done_callback(self._on_rpc_done) 

944 

945 _LOGGER.debug( 

946 "Creating a stream, default ACK deadline set to {} seconds.".format( 

947 self._stream_ack_deadline 

948 ) 

949 ) 

950 

951 # Create references to threads 

952 assert self._scheduler is not None 

953 scheduler_queue = self._scheduler.queue 

954 self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue) 

955 

956 # `on_fatal_exception` is only available in more recent library versions. 

957 # For backwards compatibility reasons, we only pass it when `google-api-core` supports it. 

958 if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK: 

959 self._consumer = bidi.BackgroundConsumer( 

960 self._rpc, 

961 self._on_response, 

962 on_fatal_exception=self._on_fatal_exception, 

963 ) 

964 else: 

965 self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response) 

966 

967 self._leaser = leaser.Leaser(self) 

968 self._heartbeater = heartbeater.Heartbeater(self) 

969 

970 # Start the thread to pass the requests. 

971 self._dispatcher.start() 

972 

973 # Start consuming messages. 

974 self._consumer.start() 

975 

976 # Start the lease maintainer thread. 

977 self._leaser.start() 

978 

979 # Start the stream heartbeater thread. 

980 self._heartbeater.start() 

981 

982 def close(self, reason: Any = None) -> None: 

983 """Stop consuming messages and shutdown all helper threads. 

984 

985 This method is idempotent. Additional calls will have no effect. 

986 

987 The method does not block; it delegates the shutdown operations to a background 

988 thread. 

989 

990 Args: 

991 reason: 

992 The reason to close this. If ``None``, this is considered 

993 an "intentional" shutdown. This is passed to the callbacks 

994 specified via :meth:`add_close_callback`. 

995 """ 

996 self._regular_shutdown_thread = threading.Thread( 

997 name=_REGULAR_SHUTDOWN_THREAD_NAME, 

998 daemon=True, 

999 target=self._shutdown, 

1000 kwargs={"reason": reason}, 

1001 ) 

1002 self._regular_shutdown_thread.start() 

1003 

1004 def _shutdown(self, reason: Any = None) -> None: 

1005 """Run the actual shutdown sequence (stop the stream and all helper threads). 

1006 

1007 Args: 

1008 reason: 

1009 The reason to close the stream. If ``None``, this is considered 

1010 an "intentional" shutdown. 

1011 """ 

1012 with self._closing: 

1013 if self._closed: 

1014 return 

1015 

1016 # Stop consuming messages. 

1017 if self.is_active: 

1018 _LOGGER.debug("Stopping consumer.") 

1019 assert self._consumer is not None 

1020 self._consumer.stop() 

1021 self._consumer = None 

1022 

1023 # Shutdown all helper threads 

1024 _LOGGER.debug("Stopping scheduler.") 

1025 assert self._scheduler is not None 

1026 dropped_messages = self._scheduler.shutdown( 

1027 await_msg_callbacks=self._await_callbacks_on_shutdown 

1028 ) 

1029 self._scheduler = None 

1030 

1031 # Leaser and dispatcher reference each other through the shared 

1032 # StreamingPullManager instance, i.e. "self", thus do not set their 

1033 # references to None until both have been shut down. 

1034 # 

1035 # NOTE: Even if the dispatcher operates on an inactive leaser using 

1036 # the latter's add() and remove() methods, these have no impact on 

1037 # the stopped leaser (the leaser is never again re-started). Ditto 

1038 # for the manager's maybe_resume_consumer() / maybe_pause_consumer(), 

1039 # because the consumer gets shut down first. 

1040 _LOGGER.debug("Stopping leaser.") 

1041 assert self._leaser is not None 

1042 self._leaser.stop() 

1043 

1044 total = len(dropped_messages) + len( 

1045 self._messages_on_hold._messages_on_hold 

1046 ) 

1047 _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).") 

1048 messages_to_nack = itertools.chain( 

1049 dropped_messages, self._messages_on_hold._messages_on_hold 

1050 ) 

1051 for msg in messages_to_nack: 

1052 msg.nack() 

1053 

1054 _LOGGER.debug("Stopping dispatcher.") 

1055 assert self._dispatcher is not None 

1056 self._dispatcher.stop() 

1057 self._dispatcher = None 

1058 # dispatcher terminated, OK to dispose the leaser reference now 

1059 self._leaser = None 

1060 

1061 _LOGGER.debug("Stopping heartbeater.") 

1062 assert self._heartbeater is not None 

1063 self._heartbeater.stop() 

1064 self._heartbeater = None 

1065 

1066 self._rpc = None 

1067 self._closed = True 

1068 _LOGGER.debug("Finished stopping manager.") 

1069 

1070 for callback in self._close_callbacks: 

1071 callback(self, reason) 

1072 

1073 def _get_initial_request( 

1074 self, stream_ack_deadline_seconds: int 

1075 ) -> gapic_types.StreamingPullRequest: 

1076 """Return the initial request for the RPC. 

1077 

1078 This defines the initial request that must always be sent to Pub/Sub 

1079 immediately upon opening the subscription. 

1080 

1081 Args: 

1082 stream_ack_deadline_seconds: 

1083 The default message acknowledge deadline for the stream. 

1084 

1085 Returns: 

1086 A request suitable for being the first request on the stream (and not 

1087 suitable for any other purpose). 

1088 """ 

1089 # Put the request together. 

1090 # We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt 

1091 # anyway. Set to some big-ish value in case we modack late. 

1092 request = gapic_types.StreamingPullRequest( 

1093 stream_ack_deadline_seconds=stream_ack_deadline_seconds, 

1094 modify_deadline_ack_ids=[], 

1095 modify_deadline_seconds=[], 

1096 subscription=self._subscription, 

1097 client_id=self._client_id, 

1098 max_outstanding_messages=( 

1099 0 if self._use_legacy_flow_control else self._flow_control.max_messages 

1100 ), 

1101 max_outstanding_bytes=( 

1102 0 if self._use_legacy_flow_control else self._flow_control.max_bytes 

1103 ), 

1104 ) 

1105 

1106 # Return the initial request. 

1107 return request 

1108 

1109 def _send_lease_modacks( 

1110 self, 

1111 ack_ids: Iterable[str], 

1112 ack_deadline: float, 

1113 opentelemetry_data: List[SubscribeOpenTelemetry], 

1114 warn_on_invalid=True, 

1115 receipt_modack: bool = False, 

1116 ) -> Set[str]: 

1117 exactly_once_enabled = False 

1118 

1119 modack_span: Optional[trace.Span] = None 

1120 if self._client.open_telemetry_enabled: 

1121 subscribe_span_links: List[trace.Link] = [] 

1122 subscribe_spans: List[trace.Span] = [] 

1123 subscription_split: List[str] = self._subscription.split("/") 

1124 assert len(subscription_split) == 4 

1125 subscription_id: str = subscription_split[3] 

1126 project_id: str = subscription_split[1] 

1127 for data in opentelemetry_data: 

1128 subscribe_span: Optional[trace.Span] = data.subscribe_span 

1129 if ( 

1130 subscribe_span 

1131 and subscribe_span.get_span_context().trace_flags.sampled 

1132 ): 

1133 subscribe_span_links.append( 

1134 trace.Link(subscribe_span.get_span_context()) 

1135 ) 

1136 subscribe_spans.append(subscribe_span) 

1137 modack_span = start_modack_span( 

1138 subscribe_span_links, 

1139 subscription_id, 

1140 len(opentelemetry_data), 

1141 ack_deadline, 

1142 project_id, 

1143 "_send_lease_modacks", 

1144 receipt_modack, 

1145 ) 

1146 if ( 

1147 modack_span and modack_span.get_span_context().trace_flags.sampled 

1148 ): # pragma: NO COVER 

1149 modack_span_context: trace.SpanContext = modack_span.get_span_context() 

1150 for subscribe_span in subscribe_spans: 

1151 subscribe_span.add_link( 

1152 context=modack_span_context, 

1153 attributes={ 

1154 "messaging.operation.name": "modack", 

1155 }, 

1156 ) 

1157 

1158 with self._exactly_once_enabled_lock: 

1159 exactly_once_enabled = self._exactly_once_enabled 

1160 if exactly_once_enabled: 

1161 eod_items: List[requests.ModAckRequest] = [] 

1162 if self._client.open_telemetry_enabled: 

1163 for ack_id, data in zip( 

1164 ack_ids, opentelemetry_data 

1165 ): # pragma: NO COVER # Identical code covered in the same function below 

1166 assert data is not None 

1167 eod_items.append( 

1168 requests.ModAckRequest( 

1169 ack_id, 

1170 ack_deadline, 

1171 futures.Future(), 

1172 data, 

1173 ) 

1174 ) 

1175 else: 

1176 eod_items = [ 

1177 requests.ModAckRequest(ack_id, ack_deadline, futures.Future()) 

1178 for ack_id in ack_ids 

1179 ] 

1180 

1181 assert self._dispatcher is not None 

1182 self._dispatcher.modify_ack_deadline(eod_items, ack_deadline) 

1183 if ( 

1184 modack_span 

1185 ): # pragma: NO COVER # Identical code covered in the same function below 

1186 modack_span.end() 

1187 expired_ack_ids = set() 

1188 for req in eod_items: 

1189 try: 

1190 assert req.future is not None 

1191 req.future.result() 

1192 except AcknowledgeError as ack_error: 

1193 if ( 

1194 ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID 

1195 or warn_on_invalid 

1196 ): 

1197 _LOGGER.warning( 

1198 "AcknowledgeError when lease-modacking a message.", 

1199 exc_info=True, 

1200 ) 

1201 if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID: 

1202 expired_ack_ids.add(req.ack_id) 

1203 return expired_ack_ids 

1204 else: 

1205 items: List[requests.ModAckRequest] = [] 

1206 if self._client.open_telemetry_enabled: 

1207 for ack_id, data in zip(ack_ids, opentelemetry_data): 

1208 assert data is not None 

1209 items.append( 

1210 requests.ModAckRequest( 

1211 ack_id, 

1212 self.ack_deadline, 

1213 None, 

1214 data, 

1215 ) 

1216 ) 

1217 else: 

1218 items = [ 

1219 requests.ModAckRequest(ack_id, self.ack_deadline, None) 

1220 for ack_id in ack_ids 

1221 ] 

1222 assert self._dispatcher is not None 

1223 self._dispatcher.modify_ack_deadline(items, ack_deadline) 

1224 if modack_span: 

1225 modack_span.end() 

1226 return set() 

1227 

1228 def _exactly_once_delivery_enabled(self) -> bool: 

1229 """Whether exactly-once delivery is enabled for the subscription.""" 

1230 with self._exactly_once_enabled_lock: 

1231 return self._exactly_once_enabled 

1232 

1233 def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: 

1234 """Process all received Pub/Sub messages. 

1235 

1236 For each message, send a modified acknowledgment request to the 

1237 server. This prevents expiration of the message due to buffering by 

1238 gRPC or a proxy/firewall. This keeps the server and client expiration 

1239 timers closer to each other, thus preventing the message from being 

1240 redelivered multiple times. 

1241 

1242 After the messages have all had their ack deadline updated, execute 

1243 the callback for each message using the executor. 

1244 """ 

1245 if response is None: 

1246 _LOGGER.debug( 

1247 "Response callback invoked with None, likely due to a " 

1248 "transport shutdown." 

1249 ) 

1250 return 

1251 

1252 # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying 

1253 # protobuf message to significantly gain on attribute access performance. 

1254 received_messages = response._pb.received_messages 

1255 

1256 subscribe_opentelemetry: List[SubscribeOpenTelemetry] = [] 

1257 if self._client.open_telemetry_enabled: 

1258 for received_message in received_messages: 

1259 opentelemetry_data = SubscribeOpenTelemetry(received_message.message) 

1260 opentelemetry_data.start_subscribe_span( 

1261 self._subscription, 

1262 response.subscription_properties.exactly_once_delivery_enabled, 

1263 received_message.ack_id, 

1264 received_message.delivery_attempt, 

1265 ) 

1266 subscribe_opentelemetry.append(opentelemetry_data) 

1267 

1268 _LOGGER.debug( 

1269 "Processing %s received message(s), currently on hold %s (bytes %s).", 

1270 len(received_messages), 

1271 self._messages_on_hold.size, 

1272 self._on_hold_bytes, 

1273 ) 

1274 

1275 with self._exactly_once_enabled_lock: 

1276 if ( 

1277 response.subscription_properties.exactly_once_delivery_enabled 

1278 != self._exactly_once_enabled 

1279 ): 

1280 self._exactly_once_enabled = ( 

1281 response.subscription_properties.exactly_once_delivery_enabled 

1282 ) 

1283 # Update ack_deadline, whose minimum depends on self._exactly_once_enabled 

1284 # This method acquires the self._ack_deadline_lock lock. 

1285 self._obtain_ack_deadline(maybe_update=True) 

1286 self._send_new_ack_deadline = True 

1287 

1288 # Immediately (i.e. without waiting for the auto lease management) 

1289 # modack the messages we received, as this tells the server that we've 

1290 # received them. 

1291 ack_id_gen = (message.ack_id for message in received_messages) 

1292 expired_ack_ids = self._send_lease_modacks( 

1293 ack_id_gen, 

1294 self.ack_deadline, 

1295 subscribe_opentelemetry, 

1296 warn_on_invalid=False, 

1297 receipt_modack=True, 

1298 ) 

1299 

1300 if len(expired_ack_ids): 

1301 _EXPIRY_LOGGER.debug( 

1302 "ack ids %s were dropped as they have already expired.", expired_ack_ids 

1303 ) 

1304 

1305 with self._pause_resume_lock: 

1306 if self._scheduler is None or self._leaser is None: 

1307 _LOGGER.debug( 

1308 f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing." 

1309 ) 

1310 return 

1311 

1312 i: int = 0 

1313 for received_message in received_messages: 

1314 if ( 

1315 not self._exactly_once_delivery_enabled() 

1316 or received_message.ack_id not in expired_ack_ids 

1317 ): 

1318 message = google.cloud.pubsub_v1.subscriber.message.Message( 

1319 received_message.message, 

1320 received_message.ack_id, 

1321 received_message.delivery_attempt, 

1322 self._scheduler.queue, 

1323 self._exactly_once_delivery_enabled, 

1324 ) 

1325 if self._client.open_telemetry_enabled: 

1326 message.opentelemetry_data = subscribe_opentelemetry[i] 

1327 i = i + 1 

1328 self._messages_on_hold.put(message) 

1329 self._on_hold_bytes += message.size 

1330 req = requests.LeaseRequest( 

1331 ack_id=message.ack_id, 

1332 byte_size=message.size, 

1333 ordering_key=message.ordering_key, 

1334 opentelemetry_data=message.opentelemetry_data, 

1335 ) 

1336 self._leaser.add([req]) 

1337 

1338 self._maybe_release_messages() 

1339 

1340 self.maybe_pause_consumer() 

1341 

1342 def _on_fatal_exception(self, exception: BaseException) -> None: 

1343 """ 

1344 Called whenever `self.consumer` receives a non-retryable exception. 

1345 We close the manager on such non-retryable cases. 

1346 """ 

1347 _LOGGER.info( 

1348 "Streaming pull terminating after receiving non-recoverable error: %s", 

1349 exception, 

1350 ) 

1351 self.close(exception) 

1352 

1353 def _should_recover(self, exception: BaseException) -> bool: 

1354 """Determine if an error on the RPC stream should be recovered. 

1355 

1356 If the exception is one of the retryable exceptions, this will signal 

1357 to the consumer thread that it should "recover" from the failure. 

1358 

1359 This will cause the stream to exit when it returns :data:`False`. 

1360 

1361 Returns: 

1362 Indicates if the caller should recover or shut down. 

1363 Will be :data:`True` if the ``exception`` is "acceptable", i.e. 

1364 in a list of retryable / idempotent exceptions. 

1365 """ 

1366 exception = _wrap_as_exception(exception) 

1367 # If this is in the list of idempotent exceptions, then we want to 

1368 # recover. 

1369 if isinstance(exception, _RETRYABLE_STREAM_ERRORS): 

1370 _STREAMS_LOGGER.debug( 

1371 "Observed recoverable stream error %s, reopening stream", exception 

1372 ) 

1373 return True 

1374 _STREAMS_LOGGER.debug( 

1375 "Observed non-recoverable stream error %s, shutting down stream", exception 

1376 ) 

1377 return False 

1378 

1379 def _should_terminate(self, exception: BaseException) -> bool: 

1380 """Determine if an error on the RPC stream should be terminated. 

1381 

1382 If the exception is one of the terminating exceptions, this will signal 

1383 to the consumer thread that it should terminate. 

1384 

1385 This will cause the stream to exit when it returns :data:`True`. 

1386 

1387 Returns: 

1388 Indicates if the caller should terminate or attempt recovery. 

1389 Will be :data:`True` if the ``exception`` is "acceptable", i.e. 

1390 in a list of terminating exceptions. 

1391 """ 

1392 exception = _wrap_as_exception(exception) 

1393 is_api_error = isinstance(exception, exceptions.GoogleAPICallError) 

1394 # Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.) 

1395 if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS): 

1396 _STREAMS_LOGGER.debug( 

1397 "Observed terminating stream error %s, shutting down stream", exception 

1398 ) 

1399 return True 

1400 _STREAMS_LOGGER.debug( 

1401 "Observed non-terminating stream error %s, attempting to reopen", exception 

1402 ) 

1403 return False 

1404 

1405 def _on_rpc_done(self, future: Any) -> None: 

1406 """Triggered whenever the underlying RPC terminates without recovery. 

1407 

1408 This is typically triggered from one of two threads: the background 

1409 consumer thread (when calling ``recv()`` produces a non-recoverable 

1410 error) or the grpc management thread (when cancelling the RPC). 

1411 

1412 This method is *non-blocking*. It will start another thread to deal 

1413 with shutting everything down. This is to prevent blocking in the 

1414 background consumer and preventing it from being ``joined()``. 

1415 """ 

1416 _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.") 

1417 error = _wrap_as_exception(future) 

1418 thread = threading.Thread( 

1419 name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error} 

1420 ) 

1421 thread.daemon = True 

1422 thread.start()