Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py: 18%

448 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2017, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# https://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import division 

16 

17import collections 

18import functools 

19import itertools 

20import logging 

21import threading 

22import typing 

23from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple 

24import uuid 

25 

26import grpc # type: ignore 

27 

28from google.api_core import bidi 

29from google.api_core import exceptions 

30from google.cloud.pubsub_v1 import types 

31from google.cloud.pubsub_v1.subscriber._protocol import dispatcher 

32from google.cloud.pubsub_v1.subscriber._protocol import heartbeater 

33from google.cloud.pubsub_v1.subscriber._protocol import histogram 

34from google.cloud.pubsub_v1.subscriber._protocol import leaser 

35from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold 

36from google.cloud.pubsub_v1.subscriber._protocol import requests 

37from google.cloud.pubsub_v1.subscriber.exceptions import ( 

38 AcknowledgeError, 

39 AcknowledgeStatus, 

40) 

41import google.cloud.pubsub_v1.subscriber.message 

42from google.cloud.pubsub_v1.subscriber import futures 

43from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler 

44from google.pubsub_v1 import types as gapic_types 

45from grpc_status import rpc_status # type: ignore 

46from google.rpc.error_details_pb2 import ErrorInfo # type: ignore 

47from google.rpc import code_pb2 # type: ignore 

48from google.rpc import status_pb2 

49 

50if typing.TYPE_CHECKING: # pragma: NO COVER 

51 from google.cloud.pubsub_v1 import subscriber 

52 

53 

_LOGGER = logging.getLogger(__name__)

# Thread names; presumably given to the helper threads that perform a regular
# shutdown or react to the RPC terminating (their creation is outside this view).
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"

# Stream error classes; presumably consulted by the recover/terminate predicates
# handed to ``bidi.ResumableBidiRpc`` -- confirm in ``_should_recover`` and
# ``_should_terminate``.
_RETRYABLE_STREAM_ERRORS = (
    exceptions.DeadlineExceeded,
    exceptions.ServiceUnavailable,
    exceptions.InternalServerError,
    exceptions.Unknown,
    exceptions.GatewayTimeout,
    exceptions.Aborted,
)
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)

# Flow-control load thresholds. The incoming message stream is paused once the
# load reaches _MAX_LOAD and only resumed after it drops below _RESUME_THRESHOLD.
_MAX_LOAD = 1.0
_RESUME_THRESHOLD = 0.8

# Minimum ack deadline (seconds) applied when exactly-once delivery is enabled
# for the subscription, to reduce premature ack expiration.
_MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED = 60

# Stream-level ack deadline (seconds): the value used when no lease-extension
# limit is configured, and the bounds it is clamped to otherwise.
_DEFAULT_STREAM_ACK_DEADLINE: float = 60
_MAX_STREAM_ACK_DEADLINE: float = 600
_MIN_STREAM_ACK_DEADLINE: float = 10

# google.rpc status codes treated as temporary for ack/modack RPCs when
# exactly-once delivery is enabled; the affected requests are retried.
_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
    code_pb2.DEADLINE_EXCEEDED,
    code_pb2.RESOURCE_EXHAUSTED,
    code_pb2.ABORTED,
    code_pb2.INTERNAL,
    code_pb2.UNAVAILABLE,
}

93 

94 

def _wrap_as_exception(maybe_exception: Any) -> BaseException:
    """Coerce an arbitrary object into a Python exception instance.

    gRPC errors are translated into their ``google.api_core`` counterparts,
    objects that already are exceptions are returned unchanged, and anything
    else is wrapped in a plain ``Exception``.

    Args:
        maybe_exception: The object to convert, usually a gRPC error.

    Returns:
        An instance of ``BaseException`` (or a subclass) representing the input.
    """
    if not isinstance(maybe_exception, grpc.RpcError):
        if isinstance(maybe_exception, BaseException):
            return maybe_exception
        return Exception(maybe_exception)
    return exceptions.from_grpc_error(maybe_exception)

111 

112 

def _wrap_callback_errors(
    callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
    on_callback_error: Callable[[Exception], Any],
    message: "google.cloud.pubsub_v1.subscriber.message.Message",
):
    """Run the user callback on ``message``, nacking the message if it raises.

    Any ``Exception`` escaping ``callback`` is logged and swallowed: the
    message is nacked and the exception is passed to ``on_callback_error``.

    Args:
        callback: The user-supplied message handler.
        on_callback_error: Invoked with the exception raised by ``callback``,
            after the message has been nacked.
        message: The Pub/Sub message handed to ``callback``.
    """
    try:
        callback(message)
    except Exception as callback_exc:
        # Log with traceback while still inside the handler (logger.exception
        # reads the active exception). nack() is expected to be cheap and not to
        # fail: per the original note it only enqueues a request, so a failure
        # here would mean the process is already in an unrecoverable state.
        _LOGGER.exception(
            "Top-level exception occurred in callback while processing a message"
        )
        message.nack()
        on_callback_error(callback_exc)

136 

137 

def _get_status(
    exc: exceptions.GoogleAPICallError,
) -> Optional["status_pb2.Status"]:
    """Extract the ``google.rpc.Status`` carried by a failed RPC, if any.

    Args:
        exc: The API call error raised by the client.

    Returns:
        The status parsed from the error's response, or ``None`` when the
        error has no response or the status cannot be parsed.
    """
    response = exc.response
    if response:
        try:
            return rpc_status.from_call(response)
        except ValueError:
            # from_call raises ValueError when the call's code/details are
            # inconsistent with the code/message inside the google.rpc.Status.
            _LOGGER.debug("ValueError when parsing ErrorInfo.", exc_info=True)
            return None
    _LOGGER.debug("No response obj in errored RPC call.")
    return None

152 

153 

def _get_ack_errors(
    exc: exceptions.GoogleAPICallError,
) -> Optional[Dict[str, str]]:
    """Return the per-ack-ID error map attached to a failed ack/modack RPC.

    When exactly-once delivery is enabled, ack/modack failures for individual
    ack IDs are reported in the metadata of an ``ErrorInfo`` detail attached to
    the RPC's status.

    Args:
        exc: The API call error raised by the acknowledge/modify_ack_deadline
            call.

    Returns:
        The ``ErrorInfo`` metadata (ack ID -> error string) from the first
        ``ErrorInfo`` found among the status details, or ``None`` when the
        status is unavailable or carries no ``ErrorInfo``.
    """
    status = _get_status(exc)
    if not status:
        _LOGGER.debug("Unable to get status of errored RPC.")
        return None
    for detail in status.details:
        info = ErrorInfo()
        # A status may carry several detail types. Skip the ones that are not
        # ErrorInfo instead of giving up after inspecting only the first.
        if detail.Is(ErrorInfo.DESCRIPTOR) and detail.Unpack(info):
            return info.metadata
    _LOGGER.debug("Unable to unpack ErrorInfo.")
    return None

168 

169 

def _process_requests(
    error_status: Optional["status_pb2.Status"],
    ack_reqs_dict: Dict[str, requests.AckRequest],
    errors_dict: Optional[Dict[str, str]],
):
    """Sort exactly-once ack/modack requests into completed and retryable ones.

    Each request is classified, in priority order, by:

    1. its per-ack-ID entry in ``errors_dict`` (``TRANSIENT_*`` errors are
       retried, anything else fails the request's future);
    2. the RPC-wide ``error_status`` (codes in
       ``_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS`` are retried, other
       codes fail the future);
    3. otherwise success: a present future resolves to
       ``AcknowledgeStatus.SUCCESS``.

    Args:
        error_status: The status of the failed RPC, or ``None``.
        ack_reqs_dict: Maps ack IDs to their requests.
        errors_dict: Per-ack-ID error strings from ``ErrorInfo``, or ``None``.

    Returns:
        A ``(requests_completed, requests_to_retry)`` pair of lists.
    """
    requests_completed = []
    requests_to_retry = []

    def _fail(req, error):
        # Resolve the future (if any) with ``error`` and mark the request done.
        if req.future is not None:
            req.future.set_exception(error)
        requests_completed.append(req)

    for ack_id, req in ack_reqs_dict.items():
        if errors_dict and ack_id in errors_dict:
            # Per-ack-ID error reported via the ErrorInfo sidecar metadata.
            reason = errors_dict[ack_id]
            if reason.startswith("TRANSIENT_"):
                requests_to_retry.append(req)
            elif reason == "PERMANENT_FAILURE_INVALID_ACK_ID":
                _fail(req, AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None))
            else:
                _fail(req, AcknowledgeError(AcknowledgeStatus.OTHER, reason))
            continue

        if error_status:
            code = error_status.code
            if code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS:
                # Temporary gRPC errors are retried.
                requests_to_retry.append(req)
            elif code == code_pb2.PERMISSION_DENIED:
                _fail(req, AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None))
            elif code == code_pb2.FAILED_PRECONDITION:
                _fail(req, AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None))
            else:
                _fail(req, AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)))
            continue

        # No error applies: resolve any future as a success.
        future = req.future
        if future:
            future.set_result(AcknowledgeStatus.SUCCESS)
        requests_completed.append(req)

    return requests_completed, requests_to_retry

230 

231 

232class StreamingPullManager(object): 

233 """The streaming pull manager coordinates pulling messages from Pub/Sub, 

234 leasing them, and scheduling them to be processed. 

235 

236 Args: 

237 client: 

238 The subscriber client used to create this instance. 

239 subscription: 

240 The name of the subscription. The canonical format for this is 

241 ``projects/{project}/subscriptions/{subscription}``. 

242 flow_control: 

243 The flow control settings. 

244 scheduler: 

245 The scheduler to use to process messages. If not provided, a thread 

246 pool-based scheduler will be used. 

247 use_legacy_flow_control: 

248 If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, 

249 though client-side flow control is still enabled. If set to ``False`` 

250 (default), both server-side and client-side flow control are enabled. 

251 await_callbacks_on_shutdown: 

252 If ``True``, the shutdown thread will wait until all scheduler threads 

253 terminate and only then proceed with shutting down the remaining running 

254 helper threads. 

255 

256 If ``False`` (default), the shutdown thread will shut the scheduler down, 

257 but it will not wait for the currently executing scheduler threads to 

258 terminate. 

259 

260 This setting affects when the on close callbacks get invoked, and 

261 consequently, when the StreamingPullFuture associated with the stream gets 

262 resolved. 

263 """ 

264 

265 def __init__( 

266 self, 

267 client: "subscriber.Client", 

268 subscription: str, 

269 flow_control: types.FlowControl = types.FlowControl(), 

270 scheduler: ThreadScheduler = None, 

271 use_legacy_flow_control: bool = False, 

272 await_callbacks_on_shutdown: bool = False, 

273 ): 

274 self._client = client 

275 self._subscription = subscription 

276 self._exactly_once_enabled = False 

277 self._flow_control = flow_control 

278 self._use_legacy_flow_control = use_legacy_flow_control 

279 self._await_callbacks_on_shutdown = await_callbacks_on_shutdown 

280 self._ack_histogram = histogram.Histogram() 

281 self._last_histogram_size = 0 

282 self._stream_metadata = [ 

283 ["x-goog-request-params", "subscription=" + subscription] 

284 ] 

285 

286 # If max_duration_per_lease_extension is the default 

287 # we set the stream_ack_deadline to the default of 60 

288 if self._flow_control.max_duration_per_lease_extension == 0: 

289 self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE 

290 # We will not be able to extend more than the default minimum 

291 elif ( 

292 self._flow_control.max_duration_per_lease_extension 

293 < _MIN_STREAM_ACK_DEADLINE 

294 ): 

295 self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE 

296 # Will not be able to extend past the max 

297 elif ( 

298 self._flow_control.max_duration_per_lease_extension 

299 > _MAX_STREAM_ACK_DEADLINE 

300 ): 

301 self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE 

302 else: 

303 self._stream_ack_deadline = ( 

304 self._flow_control.max_duration_per_lease_extension 

305 ) 

306 

307 self._ack_deadline = max( 

308 min( 

309 self._flow_control.min_duration_per_lease_extension, 

310 histogram.MAX_ACK_DEADLINE, 

311 ), 

312 histogram.MIN_ACK_DEADLINE, 

313 ) 

314 

315 self._rpc: Optional[bidi.ResumableBidiRpc] = None 

316 self._callback: Optional[functools.partial] = None 

317 self._closing = threading.Lock() 

318 self._closed = False 

319 self._close_callbacks: List[Callable[["StreamingPullManager", Any], Any]] = [] 

320 # Guarded by self._exactly_once_enabled_lock 

321 self._send_new_ack_deadline = False 

322 

323 # A shutdown thread is created on intentional shutdown. 

324 self._regular_shutdown_thread: Optional[threading.Thread] = None 

325 

326 # Generate a random client id tied to this object. All streaming pull 

327 # connections (initial and re-connects) will then use the same client 

328 # id. Doing so lets the server establish affinity even across stream 

329 # disconncetions. 

330 self._client_id = str(uuid.uuid4()) 

331 

332 if scheduler is None: 

333 self._scheduler: Optional[ThreadScheduler] = ThreadScheduler() 

334 else: 

335 self._scheduler = scheduler 

336 

337 # A collection for the messages that have been received from the server, 

338 # but not yet sent to the user callback. 

339 self._messages_on_hold = messages_on_hold.MessagesOnHold() 

340 

341 # The total number of bytes consumed by the messages currently on hold 

342 self._on_hold_bytes = 0 

343 

344 # A lock ensuring that pausing / resuming the consumer are both atomic 

345 # operations that cannot be executed concurrently. Needed for properly 

346 # syncing these operations with the current leaser load. Additionally, 

347 # the lock is used to protect modifications of internal data that 

348 # affects the load computation, i.e. the count and size of the messages 

349 # currently on hold. 

350 self._pause_resume_lock = threading.Lock() 

351 

352 # A lock guarding the self._exactly_once_enabled variable. We may also 

353 # acquire the self._ack_deadline_lock while this lock is held, but not 

354 # the reverse. So, we maintain a simple ordering of these two locks to 

355 # prevent deadlocks. 

356 self._exactly_once_enabled_lock = threading.Lock() 

357 

358 # A lock protecting the current ACK deadline used in the lease management. This 

359 # value can be potentially updated both by the leaser thread and by the message 

360 # consumer thread when invoking the internal _on_response() callback. 

361 self._ack_deadline_lock = threading.Lock() 

362 

363 # The threads created in ``.open()``. 

364 self._dispatcher: Optional[dispatcher.Dispatcher] = None 

365 self._leaser: Optional[leaser.Leaser] = None 

366 self._consumer: Optional[bidi.BackgroundConsumer] = None 

367 self._heartbeater: Optional[heartbeater.Heartbeater] = None 

368 

369 @property 

370 def is_active(self) -> bool: 

371 """``True`` if this manager is actively streaming. 

372 

373 Note that ``False`` does not indicate this is complete shut down, 

374 just that it stopped getting new messages. 

375 """ 

376 return self._consumer is not None and self._consumer.is_active 

377 

378 @property 

379 def flow_control(self) -> types.FlowControl: 

380 """The active flow control settings.""" 

381 return self._flow_control 

382 

383 @property 

384 def dispatcher(self) -> Optional[dispatcher.Dispatcher]: 

385 """The dispatcher helper.""" 

386 return self._dispatcher 

387 

388 @property 

389 def leaser(self) -> Optional[leaser.Leaser]: 

390 """The leaser helper.""" 

391 return self._leaser 

392 

393 @property 

394 def ack_histogram(self) -> histogram.Histogram: 

395 """The histogram tracking time-to-acknowledge.""" 

396 return self._ack_histogram 

397 

398 @property 

399 def ack_deadline(self) -> float: 

400 """Return the current ACK deadline based on historical data without updating it. 

401 

402 Returns: 

403 The ack deadline. 

404 """ 

405 return self._obtain_ack_deadline(maybe_update=False) 

406 

407 def _obtain_ack_deadline(self, maybe_update: bool) -> float: 

408 """The actual `ack_deadline` implementation. 

409 

410 This method is "sticky". It will only perform the computations to check on the 

411 right ACK deadline if explicitly requested AND if the histogram with past 

412 time-to-ack data has gained a significant amount of new information. 

413 

414 Args: 

415 maybe_update: 

416 If ``True``, also update the current ACK deadline before returning it if 

417 enough new ACK data has been gathered. 

418 

419 Returns: 

420 The current ACK deadline in seconds to use. 

421 """ 

422 with self._ack_deadline_lock: 

423 if not maybe_update: 

424 return self._ack_deadline 

425 

426 target_size = min( 

427 self._last_histogram_size * 2, self._last_histogram_size + 100 

428 ) 

429 hist_size = len(self.ack_histogram) 

430 

431 if hist_size > target_size: 

432 self._last_histogram_size = hist_size 

433 self._ack_deadline = self.ack_histogram.percentile(percent=99) 

434 

435 if self.flow_control.max_duration_per_lease_extension > 0: 

436 # The setting in flow control could be too low, adjust if needed. 

437 flow_control_setting = max( 

438 self.flow_control.max_duration_per_lease_extension, 

439 histogram.MIN_ACK_DEADLINE, 

440 ) 

441 self._ack_deadline = min(self._ack_deadline, flow_control_setting) 

442 

443 # If the user explicitly sets a min ack_deadline, respect it. 

444 if self.flow_control.min_duration_per_lease_extension > 0: 

445 # The setting in flow control could be too high, adjust if needed. 

446 flow_control_setting = min( 

447 self.flow_control.min_duration_per_lease_extension, 

448 histogram.MAX_ACK_DEADLINE, 

449 ) 

450 self._ack_deadline = max(self._ack_deadline, flow_control_setting) 

451 elif self._exactly_once_enabled: 

452 # Higher minimum ack_deadline for subscriptions with 

453 # exactly-once delivery enabled. 

454 self._ack_deadline = max( 

455 self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED 

456 ) 

457 # If we have updated the ack_deadline and it is longer than the stream_ack_deadline 

458 # set the stream_ack_deadline to the new ack_deadline. 

459 if self._ack_deadline > self._stream_ack_deadline: 

460 self._stream_ack_deadline = self._ack_deadline 

461 return self._ack_deadline 

462 

463 @property 

464 def load(self) -> float: 

465 """Return the current load. 

466 

467 The load is represented as a float, where 1.0 represents having 

468 hit one of the flow control limits, and values between 0.0 and 1.0 

469 represent how close we are to them. (0.5 means we have exactly half 

470 of what the flow control setting allows, for example.) 

471 

472 There are (currently) two flow control settings; this property 

473 computes how close the manager is to each of them, and returns 

474 whichever value is higher. (It does not matter that we have lots of 

475 running room on setting A if setting B is over.) 

476 

477 Returns: 

478 The load value. 

479 """ 

480 if self._leaser is None: 

481 return 0.0 

482 

483 # Messages that are temporarily put on hold are not being delivered to 

484 # user's callbacks, thus they should not contribute to the flow control 

485 # load calculation. 

486 # However, since these messages must still be lease-managed to avoid 

487 # unnecessary ACK deadline expirations, their count and total size must 

488 # be subtracted from the leaser's values. 

489 return max( 

490 [ 

491 (self._leaser.message_count - self._messages_on_hold.size) 

492 / self._flow_control.max_messages, 

493 (self._leaser.bytes - self._on_hold_bytes) 

494 / self._flow_control.max_bytes, 

495 ] 

496 ) 

497 

498 def add_close_callback( 

499 self, callback: Callable[["StreamingPullManager", Any], Any] 

500 ) -> None: 

501 """Schedules a callable when the manager closes. 

502 

503 Args: 

504 The method to call. 

505 """ 

506 self._close_callbacks.append(callback) 

507 

508 def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None: 

509 """Send the next message in the queue for each of the passed-in 

510 ordering keys, if they exist. Clean up state for keys that no longer 

511 have any queued messages. 

512 

513 Since the load went down by one message, it's probably safe to send the 

514 user another message for the same key. Since the released message may be 

515 bigger than the previous one, this may increase the load above the maximum. 

516 This decision is by design because it simplifies MessagesOnHold. 

517 

518 Args: 

519 ordering_keys: 

520 A sequence of ordering keys to activate. May be empty. 

521 """ 

522 with self._pause_resume_lock: 

523 if self._scheduler is None: 

524 return # We are shutting down, don't try to dispatch any more messages. 

525 

526 self._messages_on_hold.activate_ordering_keys( 

527 ordering_keys, self._schedule_message_on_hold 

528 ) 

529 

530 def maybe_pause_consumer(self) -> None: 

531 """Check the current load and pause the consumer if needed.""" 

532 with self._pause_resume_lock: 

533 if self.load >= _MAX_LOAD: 

534 if self._consumer is not None and not self._consumer.is_paused: 

535 _LOGGER.debug( 

536 "Message backlog over load at %.2f, pausing.", self.load 

537 ) 

538 self._consumer.pause() 

539 

540 def maybe_resume_consumer(self) -> None: 

541 """Check the load and held messages and resume the consumer if needed. 

542 

543 If there are messages held internally, release those messages before 

544 resuming the consumer. That will avoid leaser overload. 

545 """ 

546 with self._pause_resume_lock: 

547 # If we have been paused by flow control, check and see if we are 

548 # back within our limits. 

549 # 

550 # In order to not thrash too much, require us to have passed below 

551 # the resume threshold (80% by default) of each flow control setting 

552 # before restarting. 

553 if self._consumer is None or not self._consumer.is_paused: 

554 return 

555 

556 _LOGGER.debug("Current load: %.2f", self.load) 

557 

558 # Before maybe resuming the background consumer, release any messages 

559 # currently on hold, if the current load allows for it. 

560 self._maybe_release_messages() 

561 

562 if self.load < _RESUME_THRESHOLD: 

563 _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) 

564 self._consumer.resume() 

565 else: 

566 _LOGGER.debug("Did not resume, current load is %.2f.", self.load) 

567 

568 def _maybe_release_messages(self) -> None: 

569 """Release (some of) the held messages if the current load allows for it. 

570 

571 The method tries to release as many messages as the current leaser load 

572 would allow. Each released message is added to the lease management, 

573 and the user callback is scheduled for it. 

574 

575 If there are currently no messages on hold, or if the leaser is 

576 already overloaded, this method is effectively a no-op. 

577 

578 The method assumes the caller has acquired the ``_pause_resume_lock``. 

579 """ 

580 released_ack_ids = [] 

581 while self.load < _MAX_LOAD: 

582 msg = self._messages_on_hold.get() 

583 if not msg: 

584 break 

585 

586 self._schedule_message_on_hold(msg) 

587 released_ack_ids.append(msg.ack_id) 

588 

589 assert self._leaser is not None 

590 self._leaser.start_lease_expiry_timer(released_ack_ids) 

591 

592 def _schedule_message_on_hold( 

593 self, msg: "google.cloud.pubsub_v1.subscriber.message.Message" 

594 ): 

595 """Schedule a message on hold to be sent to the user and change on-hold-bytes. 

596 

597 The method assumes the caller has acquired the ``_pause_resume_lock``. 

598 

599 Args: 

600 msg: The message to schedule to be sent to the user. 

601 """ 

602 assert msg, "Message must not be None." 

603 

604 # On-hold bytes goes down, increasing load. 

605 self._on_hold_bytes -= msg.size 

606 

607 if self._on_hold_bytes < 0: 

608 _LOGGER.warning( 

609 "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes 

610 ) 

611 self._on_hold_bytes = 0 

612 

613 _LOGGER.debug( 

614 "Released held message, scheduling callback for it, " 

615 "still on hold %s (bytes %s).", 

616 self._messages_on_hold.size, 

617 self._on_hold_bytes, 

618 ) 

619 assert self._scheduler is not None 

620 assert self._callback is not None 

621 self._scheduler.schedule(self._callback, msg) 

622 

623 def send_unary_ack( 

624 self, ack_ids, ack_reqs_dict 

625 ) -> Tuple[List[requests.AckRequest], List[requests.AckRequest]]: 

626 """Send a request using a separate unary request instead of over the stream. 

627 

628 If a RetryError occurs, the manager shutdown is triggered, and the 

629 error is re-raised. 

630 """ 

631 assert ack_ids 

632 assert len(ack_ids) == len(ack_reqs_dict) 

633 

634 error_status = None 

635 ack_errors_dict = None 

636 try: 

637 self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids) 

638 except exceptions.GoogleAPICallError as exc: 

639 _LOGGER.debug( 

640 "Exception while sending unary RPC. This is typically " 

641 "non-fatal as stream requests are best-effort.", 

642 exc_info=True, 

643 ) 

644 error_status = _get_status(exc) 

645 ack_errors_dict = _get_ack_errors(exc) 

646 except exceptions.RetryError as exc: 

647 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled() 

648 # Makes sure to complete futures so they don't block forever. 

649 for req in ack_reqs_dict.values(): 

650 # Futures may be present even with exactly-once delivery 

651 # disabled, in transition periods after the setting is changed on 

652 # the subscription. 

653 if req.future: 

654 if exactly_once_delivery_enabled: 

655 e = AcknowledgeError( 

656 AcknowledgeStatus.OTHER, "RetryError while sending ack RPC." 

657 ) 

658 req.future.set_exception(e) 

659 else: 

660 req.future.set_result(AcknowledgeStatus.SUCCESS) 

661 

662 _LOGGER.debug( 

663 "RetryError while sending ack RPC. Waiting on a transient " 

664 "error resolution for too long, will now trigger shutdown.", 

665 exc_info=False, 

666 ) 

667 # The underlying channel has been suffering from a retryable error 

668 # for too long, time to give up and shut the streaming pull down. 

669 self._on_rpc_done(exc) 

670 raise 

671 

672 if self._exactly_once_delivery_enabled(): 

673 requests_completed, requests_to_retry = _process_requests( 

674 error_status, ack_reqs_dict, ack_errors_dict 

675 ) 

676 else: 

677 requests_completed = [] 

678 requests_to_retry = [] 

679 # When exactly-once delivery is NOT enabled, acks/modacks are considered 

680 # best-effort. So, they always succeed even if the RPC fails. 

681 for req in ack_reqs_dict.values(): 

682 # Futures may be present even with exactly-once delivery 

683 # disabled, in transition periods after the setting is changed on 

684 # the subscription. 

685 if req.future: 

686 req.future.set_result(AcknowledgeStatus.SUCCESS) 

687 requests_completed.append(req) 

688 

689 return requests_completed, requests_to_retry 

690 

691 def send_unary_modack( 

692 self, 

693 modify_deadline_ack_ids, 

694 modify_deadline_seconds, 

695 ack_reqs_dict, 

696 default_deadline=None, 

697 ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: 

698 """Send a request using a separate unary request instead of over the stream. 

699 

700 If a RetryError occurs, the manager shutdown is triggered, and the 

701 error is re-raised. 

702 """ 

703 assert modify_deadline_ack_ids 

704 # Either we have a generator or a single deadline. 

705 assert modify_deadline_seconds is None or default_deadline is None 

706 

707 error_status = None 

708 modack_errors_dict = None 

709 try: 

710 if default_deadline is None: 

711 # Send ack_ids with the same deadline seconds together. 

712 deadline_to_ack_ids = collections.defaultdict(list) 

713 

714 for n, ack_id in enumerate(modify_deadline_ack_ids): 

715 deadline = modify_deadline_seconds[n] 

716 deadline_to_ack_ids[deadline].append(ack_id) 

717 

718 for deadline, ack_ids in deadline_to_ack_ids.items(): 

719 self._client.modify_ack_deadline( 

720 subscription=self._subscription, 

721 ack_ids=ack_ids, 

722 ack_deadline_seconds=deadline, 

723 ) 

724 else: 

725 # We can send all requests with the default deadline. 

726 self._client.modify_ack_deadline( 

727 subscription=self._subscription, 

728 ack_ids=modify_deadline_ack_ids, 

729 ack_deadline_seconds=default_deadline, 

730 ) 

731 except exceptions.GoogleAPICallError as exc: 

732 _LOGGER.debug( 

733 "Exception while sending unary RPC. This is typically " 

734 "non-fatal as stream requests are best-effort.", 

735 exc_info=True, 

736 ) 

737 error_status = _get_status(exc) 

738 modack_errors_dict = _get_ack_errors(exc) 

739 except exceptions.RetryError as exc: 

740 exactly_once_delivery_enabled = self._exactly_once_delivery_enabled() 

741 # Makes sure to complete futures so they don't block forever. 

742 for req in ack_reqs_dict.values(): 

743 # Futures may be present even with exactly-once delivery 

744 # disabled, in transition periods after the setting is changed on 

745 # the subscription. 

746 if req.future: 

747 if exactly_once_delivery_enabled: 

748 e = AcknowledgeError( 

749 AcknowledgeStatus.OTHER, 

750 "RetryError while sending modack RPC.", 

751 ) 

752 req.future.set_exception(e) 

753 else: 

754 req.future.set_result(AcknowledgeStatus.SUCCESS) 

755 

756 _LOGGER.debug( 

757 "RetryError while sending modack RPC. Waiting on a transient " 

758 "error resolution for too long, will now trigger shutdown.", 

759 exc_info=False, 

760 ) 

761 # The underlying channel has been suffering from a retryable error 

762 # for too long, time to give up and shut the streaming pull down. 

763 self._on_rpc_done(exc) 

764 raise 

765 

766 if self._exactly_once_delivery_enabled(): 

767 requests_completed, requests_to_retry = _process_requests( 

768 error_status, ack_reqs_dict, modack_errors_dict 

769 ) 

770 else: 

771 requests_completed = [] 

772 requests_to_retry = [] 

773 # When exactly-once delivery is NOT enabled, acks/modacks are considered 

774 # best-effort. So, they always succeed even if the RPC fails. 

775 for req in ack_reqs_dict.values(): 

776 # Futures may be present even with exactly-once delivery 

777 # disabled, in transition periods after the setting is changed on 

778 # the subscription. 

779 if req.future: 

780 req.future.set_result(AcknowledgeStatus.SUCCESS) 

781 requests_completed.append(req) 

782 

783 return requests_completed, requests_to_retry 

784 

785 def heartbeat(self) -> bool: 

786 """Sends a heartbeat request over the streaming pull RPC. 

787 

788 The request is empty by default, but may contain the current ack_deadline 

789 if the self._exactly_once_enabled flag has changed. 

790 

791 Returns: 

792 If a heartbeat request has actually been sent. 

793 """ 

794 if self._rpc is not None and self._rpc.is_active: 

795 send_new_ack_deadline = False 

796 with self._exactly_once_enabled_lock: 

797 send_new_ack_deadline = self._send_new_ack_deadline 

798 self._send_new_ack_deadline = False 

799 

800 if send_new_ack_deadline: 

801 request = gapic_types.StreamingPullRequest( 

802 stream_ack_deadline_seconds=self._stream_ack_deadline 

803 ) 

804 _LOGGER.debug( 

805 "Sending new ack_deadline of %d seconds.", self._stream_ack_deadline 

806 ) 

807 else: 

808 request = gapic_types.StreamingPullRequest() 

809 

810 self._rpc.send(request) 

811 return True 

812 

813 return False 

814 

def open(
    self,
    callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any],
    on_callback_error: Callable[[Exception], Any],
) -> None:
    """Open the streaming pull and start every helper that consumes messages.

    Args:
        callback:
            Called for every message received on the stream.
        on_callback_error:
            Called with the exception whenever ``callback`` raises.

    Raises:
        ValueError: If the manager is already open, or if it was closed
            before (a closed manager cannot be reopened).
    """
    if self.is_active:
        raise ValueError("This manager is already open.")
    if self._closed:
        raise ValueError("This manager has been closed and can not be re-used.")

    # Exceptions raised by the user callback are routed to on_callback_error.
    self._callback = functools.partial(
        _wrap_callback_errors, callback, on_callback_error
    )

    # The initial request is built lazily by the RPC, but it always uses the
    # stream ack deadline captured at this moment.
    initial_request_factory = functools.partial(
        self._get_initial_request, self._stream_ack_deadline
    )
    self._rpc = bidi.ResumableBidiRpc(
        start_rpc=self._client.streaming_pull,
        initial_request=initial_request_factory,
        should_recover=self._should_recover,
        should_terminate=self._should_terminate,
        metadata=self._stream_metadata,
        throttle_reopen=True,
    )
    self._rpc.add_done_callback(self._on_rpc_done)

    _LOGGER.debug(
        "Creating a stream, default ACK deadline set to {} seconds.".format(
            self._stream_ack_deadline
        )
    )

    # Construct all helpers first; none of them is running yet.
    assert self._scheduler is not None
    self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue)
    self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
    self._leaser = leaser.Leaser(self)
    self._heartbeater = heartbeater.Heartbeater(self)

    # Then start them in order: request dispatcher, message consumer,
    # lease maintainer, stream heartbeater.
    self._dispatcher.start()
    self._consumer.start()
    self._leaser.start()
    self._heartbeater.start()

881 

def close(self, reason: Any = None) -> None:
    """Request that the stream and all helper threads be stopped.

    Calling this more than once is harmless: ``_shutdown`` returns early once
    the manager has been closed. The call does not block; the shutdown runs
    on a separate daemon thread.

    Args:
        reason:
            Why the manager is being closed. ``None`` means an "intentional"
            shutdown. The value is forwarded to the callbacks registered via
            :meth:`add_close_callback`.
    """
    self._regular_shutdown_thread = threading.Thread(
        target=self._shutdown,
        kwargs={"reason": reason},
        name=_REGULAR_SHUTDOWN_THREAD_NAME,
        daemon=True,
    )
    self._regular_shutdown_thread.start()

903 

def _shutdown(self, reason: Any = None) -> None:
    """Run the actual shutdown sequence (stop the stream and all helper threads).

    The whole sequence runs inside ``with self._closing`` (presumably a lock),
    so concurrent callers are serialized. Once ``self._closed`` has been set,
    later calls return immediately.

    The order of the steps matters:

    1. The consumer is stopped first, so no new messages arrive.
    2. The scheduler is shut down, returning the messages it never dispatched.
    3. The leaser is stopped.
    4. Dropped and on-hold messages are NACK-ed.
    5. The dispatcher is stopped, then the heartbeater.
    6. The registered close callbacks are invoked with ``reason``.

    NOTE(review): step 4 happens before step 5, presumably so the dispatcher
    can still forward the nacks. Messages are created with the scheduler's
    queue, which the dispatcher consumes; confirm against ``Message.nack``.

    Args:
        reason:
            The reason to close the stream. If ``None``, this is considered
            an "intentional" shutdown. It is passed to every close callback
            as ``callback(self, reason)``.
    """
    with self._closing:
        if self._closed:
            return

        # Stop consuming messages.
        if self.is_active:
            _LOGGER.debug("Stopping consumer.")
            assert self._consumer is not None
            self._consumer.stop()
        self._consumer = None

        # Shutdown all helper threads
        _LOGGER.debug("Stopping scheduler.")
        assert self._scheduler is not None
        # Messages the scheduler never handed to a callback are returned here
        # so they can be nacked below.
        dropped_messages = self._scheduler.shutdown(
            await_msg_callbacks=self._await_callbacks_on_shutdown
        )
        self._scheduler = None

        # Leaser and dispatcher reference each other through the shared
        # StreamingPullManager instance, i.e. "self", thus do not set their
        # references to None until both have been shut down.
        #
        # NOTE: Even if the dispatcher operates on an inactive leaser using
        # the latter's add() and remove() methods, these have no impact on
        # the stopped leaser (the leaser is never again re-started). Ditto
        # for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
        # because the consumer gets shut down first.
        _LOGGER.debug("Stopping leaser.")
        assert self._leaser is not None
        self._leaser.stop()

        # NACK everything that was received but never dispatched: both the
        # scheduler's dropped messages and those still on hold.
        total = len(dropped_messages) + len(
            self._messages_on_hold._messages_on_hold
        )
        _LOGGER.debug(f"NACK-ing all not-yet-dispatched messages (total: {total}).")
        messages_to_nack = itertools.chain(
            dropped_messages, self._messages_on_hold._messages_on_hold
        )
        for msg in messages_to_nack:
            msg.nack()

        _LOGGER.debug("Stopping dispatcher.")
        assert self._dispatcher is not None
        self._dispatcher.stop()
        self._dispatcher = None
        # dispatcher terminated, OK to dispose the leaser reference now
        self._leaser = None

        _LOGGER.debug("Stopping heartbeater.")
        assert self._heartbeater is not None
        self._heartbeater.stop()
        self._heartbeater = None

        self._rpc = None
        self._closed = True
        _LOGGER.debug("Finished stopping manager.")

        # Close callbacks run while self._closing is still held.
        for callback in self._close_callbacks:
            callback(self, reason)

972 

def _get_initial_request(
    self, stream_ack_deadline_seconds: int
) -> gapic_types.StreamingPullRequest:
    """Build the request that must be sent first when the stream is opened.

    The returned request identifies the subscription and the client. It is
    only valid as the opening request of a stream and must not be reused
    for anything else.

    Args:
        stream_ack_deadline_seconds:
            The default message acknowledge deadline for the stream.

    Returns:
        The initial ``StreamingPullRequest`` for the stream.
    """
    # Server-side flow control limits are disabled (sent as 0) when the
    # legacy, client-only flow control is in use.
    if self._use_legacy_flow_control:
        max_messages = 0
        max_bytes = 0
    else:
        max_messages = self._flow_control.max_messages
        max_bytes = self._flow_control.max_bytes

    # The stream ack deadline has to be set, but it matters little because
    # received messages are modacked right away anyway. Use a big-ish value
    # in case the modack is late.
    return gapic_types.StreamingPullRequest(
        subscription=self._subscription,
        client_id=self._client_id,
        stream_ack_deadline_seconds=stream_ack_deadline_seconds,
        modify_deadline_ack_ids=[],
        modify_deadline_seconds=[],
        max_outstanding_messages=max_messages,
        max_outstanding_bytes=max_bytes,
    )

1008 

def _send_lease_modacks(
    self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True
) -> Set[str]:
    """Modify the ack deadline of ``ack_ids`` to ``ack_deadline`` seconds.

    Without exactly-once delivery, modacks are best-effort: the requests are
    handed to the dispatcher and nothing is awaited. With exactly-once
    delivery, each request carries a future, and this method waits on every
    future to find out which ack IDs the server rejected.

    Args:
        ack_ids:
            The ack IDs to modack. The iterable is consumed exactly once.
        ack_deadline:
            The new ack deadline in seconds. It is used for every request in
            both modes.
        warn_on_invalid:
            Whether to log a warning for modacks that failed with
            ``AcknowledgeStatus.INVALID_ACK_ID``. Other ``AcknowledgeError``
            failures are always logged.

    Returns:
        With exactly-once delivery enabled, the ack IDs whose modack failed
        with ``INVALID_ACK_ID``. Otherwise, always an empty set.
    """
    if not self._exactly_once_delivery_enabled():
        # Best-effort mode: no futures, nothing to wait for. Use the
        # requested deadline; the previous version re-read the
        # ``self.ack_deadline`` property once per ack ID here.
        items = [
            requests.ModAckRequest(ack_id, ack_deadline, None) for ack_id in ack_ids
        ]
        assert self._dispatcher is not None
        self._dispatcher.modify_ack_deadline(items, ack_deadline)
        return set()

    items = [
        requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
        for ack_id in ack_ids
    ]
    assert self._dispatcher is not None
    self._dispatcher.modify_ack_deadline(items, ack_deadline)

    # Wait on each request's future and collect the IDs reported as invalid,
    # so the caller can drop those messages.
    expired_ack_ids = set()
    for req in items:
        try:
            assert req.future is not None
            req.future.result()
        except AcknowledgeError as ack_error:
            is_invalid = ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID
            if warn_on_invalid or not is_invalid:
                _LOGGER.warning(
                    "AcknowledgeError when lease-modacking a message.",
                    exc_info=True,
                )
            if is_invalid:
                expired_ack_ids.add(req.ack_id)
    return expired_ack_ids

1049 

def _exactly_once_delivery_enabled(self) -> bool:
    """Report whether exactly-once delivery is currently on for the subscription.

    The flag is read while holding ``_exactly_once_enabled_lock``, the same
    lock used when ``_on_response`` updates it.
    """
    flag_lock = self._exactly_once_enabled_lock
    with flag_lock:
        enabled = self._exactly_once_enabled
    return enabled

1054 

def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
    """Process all received Pub/Sub messages.

    Each received message is modacked immediately, which tells the server
    that the client has it. This prevents the message from expiring while
    it is buffered by gRPC or a proxy/firewall. It also keeps the server and
    client expiration timers close together, so the message is not
    redelivered multiple times.

    Under exactly-once delivery, messages whose modack failed with
    ``INVALID_ACK_ID`` are dropped. Every other message is put on hold and
    registered with the leaser, and then the on-hold messages are released
    for dispatch. Finally, the consumer may be paused.

    Args:
        response: The streaming pull response. ``None`` is tolerated and
            ignored.
    """
    if response is None:
        _LOGGER.debug(
            "Response callback invoked with None, likely due to a "
            "transport shutdown."
        )
        return

    # IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
    # protobuf message to significantly gain on attribute access performance.
    received_messages = response._pb.received_messages

    _LOGGER.debug(
        "Processing %s received message(s), currently on hold %s (bytes %s).",
        len(received_messages),
        self._messages_on_hold.size,
        self._on_hold_bytes,
    )

    with self._exactly_once_enabled_lock:
        if (
            response.subscription_properties.exactly_once_delivery_enabled
            != self._exactly_once_enabled
        ):
            self._exactly_once_enabled = (
                response.subscription_properties.exactly_once_delivery_enabled
            )
            # Update ack_deadline, whose minimum depends on self._exactly_once_enabled
            # This method acquires the self._ack_deadline_lock lock.
            self._obtain_ack_deadline(maybe_update=True)
            # Ask the next heartbeat to send the new stream ack deadline.
            self._send_new_ack_deadline = True

    # Immediately (i.e. without waiting for the auto lease management)
    # modack the messages we received, as this tells the server that we've
    # received them.
    ack_id_gen = (message.ack_id for message in received_messages)
    expired_ack_ids = self._send_lease_modacks(
        ack_id_gen, self.ack_deadline, warn_on_invalid=False
    )

    with self._pause_resume_lock:
        assert self._scheduler is not None
        assert self._leaser is not None

        # Read the flag once. The previous version took
        # _exactly_once_enabled_lock again for every single message.
        exactly_once_enabled = self._exactly_once_delivery_enabled()

        for received_message in received_messages:
            if exactly_once_enabled and received_message.ack_id in expired_ack_ids:
                # The server rejected this message's modack with
                # INVALID_ACK_ID, so it is not dispatched or leased.
                continue

            message = google.cloud.pubsub_v1.subscriber.message.Message(
                received_message.message,
                received_message.ack_id,
                received_message.delivery_attempt,
                self._scheduler.queue,
                self._exactly_once_delivery_enabled,
            )
            self._messages_on_hold.put(message)
            self._on_hold_bytes += message.size
            req = requests.LeaseRequest(
                ack_id=message.ack_id,
                byte_size=message.size,
                ordering_key=message.ordering_key,
            )
            self._leaser.add([req])

        self._maybe_release_messages()

    self.maybe_pause_consumer()

1134 

def _should_recover(self, exception: BaseException) -> bool:
    """Decide whether the stream should be re-established after an error.

    The RPC's consumer thread uses this answer: ``True`` means "recover from
    the failure", and ``False`` makes the stream exit.

    Args:
        exception: The error observed on the RPC stream.

    Returns:
        ``True`` if the wrapped exception is one of the retryable stream
        errors (``_RETRYABLE_STREAM_ERRORS``), otherwise ``False``.
    """
    wrapped = _wrap_as_exception(exception)
    is_retryable = isinstance(wrapped, _RETRYABLE_STREAM_ERRORS)
    if not is_retryable:
        _LOGGER.debug("Observed non-recoverable stream error %s", wrapped)
        return False
    _LOGGER.debug("Observed recoverable stream error %s", wrapped)
    return True

1156 

def _should_terminate(self, exception: BaseException) -> bool:
    """Decide whether the stream should be terminated after an error.

    The RPC's consumer thread uses this answer: ``True`` makes the stream
    exit, and ``False`` lets recovery be attempted.

    Args:
        exception: The error observed on the RPC stream.

    Returns:
        ``True`` if the wrapped exception is one of the terminating stream
        errors (``_TERMINATING_STREAM_ERRORS``), otherwise ``False``.
    """
    wrapped = _wrap_as_exception(exception)
    terminate = isinstance(wrapped, _TERMINATING_STREAM_ERRORS)
    if terminate:
        _LOGGER.debug("Observed terminating stream error %s", wrapped)
    else:
        _LOGGER.debug("Observed non-terminating stream error %s", wrapped)
    return terminate

1176 

def _on_rpc_done(self, future: Any) -> None:
    """React to the underlying RPC terminating without recovery.

    It is typically called either from the background consumer thread
    (``recv()`` hit a non-recoverable error) or from the gRPC management
    thread (the RPC was cancelled).

    It does not block. The actual shutdown is handed to a new daemon thread,
    so the background consumer is not stuck here and can still be
    ``joined()``.

    Args:
        future: The terminated RPC. It is converted with
            ``_wrap_as_exception`` and used as the shutdown reason.
    """
    _LOGGER.debug("RPC termination has signaled streaming pull manager shutdown.")
    shutdown_reason = _wrap_as_exception(future)
    threading.Thread(
        target=self._shutdown,
        kwargs={"reason": shutdown_reason},
        name=_RPC_ERROR_THREAD_NAME,
        daemon=True,
    ).start()