Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/publisher/client.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

198 statements  

1# Copyright 2019, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import copy 

18import logging 

19import os 

20import threading 

21import time 

22import typing 

23from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union 

24import warnings 

25import sys 

26 

27from google.api_core import gapic_v1 

28from google.auth.credentials import AnonymousCredentials # type: ignore 

29from google.oauth2 import service_account # type: ignore 

30 

31from google.cloud.pubsub_v1 import types 

32from google.cloud.pubsub_v1.publisher import exceptions 

33from google.cloud.pubsub_v1.publisher import futures 

34from google.cloud.pubsub_v1.publisher._batch import thread 

35from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer 

36from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer 

37from google.cloud.pubsub_v1.publisher.flow_controller import FlowController 

38from google.pubsub_v1 import gapic_version as package_version 

39from google.pubsub_v1 import types as gapic_types 

40from google.pubsub_v1.services.publisher import client as publisher_client 

41from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( 

42 PublishMessageWrapper, 

43) 

44 

_LOGGER = logging.getLogger(__name__)

# Expose the GAPIC package version (``google.pubsub_v1.gapic_version``) as
# this module's version.
__version__ = package_version.__version__

if typing.TYPE_CHECKING:  # pragma: NO COVER
    # These names are only needed for (string) type annotations below and are
    # never imported at runtime.
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.publisher import _batch
    from google.pubsub_v1.services.publisher.client import OptionalRetry
    from google.pubsub_v1.types import pubsub as pubsub_types


# The raw protobuf class behind ``gapic_types.PubsubMessage``. ``Client.publish``
# constructs messages with it directly for performance reasons and only then
# wraps them into the higher-level proto-plus type.
_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

# Any sequencer the client may store, one per (topic, ordering_key) pair.
SequencerType = typing.Union[
    ordered_sequencer.OrderedSequencer,
    unordered_sequencer.UnorderedSequencer,
]

62 

63 

class Client(publisher_client.PublisherClient):
    """A publisher client for Google Cloud Pub/Sub.

    This creates an object that is capable of publishing messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        batch_settings:
            The settings for batch publishing.
        publisher_options:
            The options for the publisher client. Note that enabling message ordering
            will override the publish retry timeout to be infinite.
        kwargs:
            Any additional arguments provided are sent as keyword arguments to the
            underlying
            :class:`~google.pubsub_v1.services.publisher.client.PublisherClient`.
            Generally you should not need to set additional keyword
            arguments. Regional endpoints can be set via ``client_options`` that
            takes a single key-value pair that defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        publisher_client = pubsub_v1.PublisherClient(
            # Optional
            batch_settings = pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,   # One second
            ),

            # Optional
            publisher_options = pubsub_v1.types.PublisherOptions(
                enable_message_ordering=False,
                flow_control=pubsub_v1.types.PublishFlowControl(
                    message_limit=2000,
                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
                ),
            ),

            # Optional
            client_options = {
                "api_endpoint": REGIONAL_ENDPOINT
            }
        )
    """

    def __init__(
        self,
        batch_settings: Union[types.BatchSettings, Sequence] = (),
        publisher_options: Union[types.PublisherOptions, Sequence] = (),
        **kwargs: Any,
    ):
        # NOTE(review): these checks use ``assert`` and are therefore skipped
        # under ``python -O``; they are kept as asserts so the exception type
        # callers may observe (AssertionError) does not change.
        assert (
            type(batch_settings) is types.BatchSettings or len(batch_settings) == 0
        ), "batch_settings must be of type BatchSettings or an empty sequence."
        assert (
            type(publisher_options) is types.PublisherOptions
            or len(publisher_options) == 0
        ), "publisher_options must be of type PublisherOptions or an empty sequence."

        # When the emulator is requested via PUBSUB_EMULATOR_HOST, point the
        # underlying client at the emulator host and, unless a transport was
        # supplied explicitly, use anonymous credentials.
        # TODO(https://github.com/googleapis/python-pubsub/issues/1349): Move the emulator
        # code below to test files.
        emulator_host = os.environ.get("PUBSUB_EMULATOR_HOST")
        if emulator_host:
            kwargs["client_options"] = {"api_endpoint": emulator_host}
            # Configure credentials directly to transport, if provided.
            if "transport" not in kwargs:
                kwargs["credentials"] = AnonymousCredentials()

        # Normalize the (possibly empty) options sequence into PublisherOptions
        # and read the ordering flag by name rather than by tuple position.
        self.publisher_options = types.PublisherOptions(*publisher_options)
        self._enable_message_ordering = self.publisher_options.enable_message_ordering

        # Add the metrics headers, and instantiate the underlying GAPIC
        # client.
        super().__init__(**kwargs)
        self._target = self._transport._host
        self._batch_class = thread.Batch
        self.batch_settings = types.BatchSettings(*batch_settings)

        # The batches on the publisher client are responsible for holding
        # messages. One batch exists for each topic.
        self._batch_lock = self._batch_class.make_lock()
        # (topic, ordering_key) => sequencers object
        self._sequencers: Dict[Tuple[str, str], SequencerType] = {}
        self._is_stopped = False
        # Thread created to commit all sequencers after a timeout.
        self._commit_thread: Optional[threading.Thread] = None

        # The object controlling the message publishing flow
        self._flow_controller = FlowController(self.publisher_options.flow_control)

        self._open_telemetry_enabled = (
            self.publisher_options.enable_open_telemetry_tracing
        )
        # OpenTelemetry features used by the library are not supported in Python versions <= 3.7.
        # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389
        if self._open_telemetry_enabled and sys.version_info < (3, 8):
            warnings.warn(
                message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.",
                category=RuntimeWarning,
            )
            self._open_telemetry_enabled = False

    @classmethod
    def from_service_account_file(  # type: ignore[override]
        cls,
        filename: str,
        batch_settings: Union[types.BatchSettings, Sequence] = (),
        **kwargs: Any,
    ) -> "Client":
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename:
                The path to the service account private key JSON file.
            batch_settings:
                The settings for batch publishing.
            kwargs:
                Additional arguments to pass to the constructor (for example
                ``publisher_options``).

        Returns:
            A Publisher instance that is the constructed client.
        """
        credentials = service_account.Credentials.from_service_account_file(filename)
        kwargs["credentials"] = credentials
        return cls(batch_settings, **kwargs)

    from_service_account_json = from_service_account_file  # type: ignore[assignment]

    @property
    def target(self) -> str:
        """Return the target (where the API is).

        Returns:
            The location of the API (the transport host captured at construction).
        """
        return self._target

    @property
    def api(self):
        """The underlying gapic API client.

        .. versionchanged:: 2.10.0
            Instead of a GAPIC ``PublisherClient`` client instance, this property is a
            proxy object to it with the same interface.

        .. deprecated:: 2.10.0
            Use the GAPIC methods and properties on the client instance directly
            instead of through the :attr:`api` attribute.
        """
        msg = (
            'The "api" property only exists for backward compatibility, access its '
            'attributes directly through the client instance (e.g. "client.foo" '
            'instead of "client.api.foo").'
        )
        # stacklevel=2 attributes the warning to the caller's line.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return super()

    @property
    def open_telemetry_enabled(self) -> bool:
        """Whether OpenTelemetry tracing is active for this client.

        This is the ``enable_open_telemetry_tracing`` publisher option, forced
        to ``False`` on Python versions older than 3.8.
        """
        return self._open_telemetry_enabled

    def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
        """Get an existing sequencer or create a new one given the (topic,
        ordering_key) pair.

        An empty ``ordering_key`` maps to an unordered sequencer; any other
        key maps to an ordered sequencer. The caller is expected to hold
        ``_batch_lock``.
        """
        sequencer_key = (topic, ordering_key)
        sequencer = self._sequencers.get(sequencer_key)
        if sequencer is None:
            if ordering_key == "":
                sequencer = unordered_sequencer.UnorderedSequencer(self, topic)
            else:
                sequencer = ordered_sequencer.OrderedSequencer(
                    self, topic, ordering_key
                )
            self._sequencers[sequencer_key] = sequencer

        return sequencer

    def resume_publish(self, topic: str, ordering_key: str) -> None:
        """Resume publish on an ordering key that has had unrecoverable errors.

        If the topic/ordering key combination has not been seen before by this
        client, nothing is resumed; a debug message is logged and the method
        returns normally.

        Args:
            topic: The topic to publish messages to.
            ordering_key: A string that identifies related messages for which
                publish order should be respected.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            ValueError:
                If message ordering is not enabled on this client.
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot resume publish on a stopped publisher.")

            if not self._enable_message_ordering:
                raise ValueError(
                    "Cannot resume publish on a topic/ordering key if ordering "
                    "is not enabled."
                )

            sequencer_key = (topic, ordering_key)
            sequencer = self._sequencers.get(sequencer_key)
            if sequencer is None:
                _LOGGER.debug(
                    "Error: The topic/ordering key combination has not "
                    "been seen before."
                )
            else:
                sequencer.unpause()

    def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
        """Call the GAPIC public API directly."""
        return super().publish(*args, **kwargs)

    def publish(  # type: ignore[override]
        self,
        topic: str,
        data: bytes,
        ordering_key: str = "",
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
        **attrs: Union[bytes, str],
    ) -> "pubsub_v1.publisher.futures.Future":
        """Publish a single message.

        .. note::
            Messages in Pub/Sub are blobs of bytes. They are *binary* data,
            not text. You must send data as a bytestring (``bytes``), and this
            library will raise an exception if you send a text string.

            The reason that this is so important (and why we do not try to
            coerce for you) is because Pub/Sub is also platform independent
            and there is no way to know how to decode messages properly on
            the other side; therefore, encoding and decoding is a required
            exercise for the developer.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed.
        This method may block if LimitExceededBehavior.BLOCK is used in the
        flow control settings.

        Example:
            >>> from google.cloud import pubsub_v1
            >>> client = pubsub_v1.PublisherClient()
            >>> topic = client.topic_path('[PROJECT]', '[TOPIC]')
            >>> data = b'The rain in Wales falls mainly on the snails.'
            >>> response = client.publish(topic, data, username='guido')

        Args:
            topic: The topic to publish messages to.
            data: A bytestring representing the message body. This
                must be a bytestring.
            ordering_key: A string that identifies related messages for which
                publish order should be respected. Message ordering must be
                enabled for this client to use this feature.
            retry:
                Designation of what errors, if any, should be retried. If `ordering_key`
                is specified, the total retry deadline will be changed to "infinity".
                If given, it overrides any retry passed into the client through
                the ``publisher_options`` argument.
            timeout:
                The timeout for the RPC request. Can be used to override any timeout
                passed in through ``publisher_options`` when instantiating the client.

            attrs: A dictionary of attributes to be
                sent as metadata. (These may be text strings or byte strings.)

        Returns:
            A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
            instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class). If flow control rejects the message, the
            returned future already holds the ``FlowControlLimitError``.

        Raises:
            TypeError:
                If ``data`` is not ``bytes`` or an attribute value is neither
                ``str`` nor ``bytes``.
            ValueError:
                If ``ordering_key`` is non-empty but message ordering is not
                enabled on this client.
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        # Sanity check: Is the data being sent as a bytestring?
        # If it is literally anything else, complain loudly about it.
        if not isinstance(data, bytes):
            raise TypeError(
                "Data being published to Pub/Sub must be sent as a bytestring."
            )

        if not self._enable_message_ordering and ordering_key != "":
            raise ValueError(
                "Cannot publish a message with an ordering key when message "
                "ordering is not enabled."
            )

        # Coerce all attributes to text strings. Iterate over a shallow copy
        # because byte values are replaced in ``attrs`` inside the loop.
        for k, v in copy.copy(attrs).items():
            if isinstance(v, str):
                continue
            if isinstance(v, bytes):
                attrs[k] = v.decode("utf-8")
                continue
            raise TypeError(
                "All attributes being published to Pub/Sub must "
                "be sent as text strings."
            )

        # Create the Pub/Sub message object. For performance reasons, the message
        # should be constructed by directly using the raw protobuf class, and only
        # then wrapping it into the higher-level PubsubMessage class.
        vanilla_pb = _raw_proto_pubbsub_message(
            data=data, ordering_key=ordering_key, attributes=attrs
        )
        message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        wrapper: PublishMessageWrapper = PublishMessageWrapper(message)
        if self._open_telemetry_enabled:
            wrapper.start_create_span(topic=topic, ordering_key=ordering_key)

        # Messages should go through flow control to prevent excessive
        # queuing on the client side (depending on the settings).
        try:
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.start_publisher_flow_control_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PubSubMessageWrapper is None. Not starting publisher flow control span.",
                        category=RuntimeWarning,
                    )
            self._flow_controller.add(message)
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_flow_control_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PubSubMessageWrapper is None. Not ending publisher flow control span.",
                        category=RuntimeWarning,
                    )
        except exceptions.FlowControlLimitError as exc:
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_flow_control_span(exc)
                    wrapper.end_create_span(exc)
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PubSubMessageWrapper is None. Not ending publisher create and flow control spans on FlowControlLimitError.",
                        category=RuntimeWarning,
                    )

            future = futures.Future()
            future.set_exception(exc)
            return future

        def on_publish_done(future):
            # Give the flow-control capacity back once the publish settles.
            self._flow_controller.release(message)

        if retry is gapic_v1.method.DEFAULT:  # if custom retry not passed in
            retry = self.publisher_options.retry

        if timeout is gapic_v1.method.DEFAULT:  # if custom timeout not passed in
            timeout = self.publisher_options.timeout

        if self._open_telemetry_enabled:
            if wrapper:
                wrapper.start_publisher_batching_span()
            else:  # pragma: NO COVER
                warnings.warn(
                    message="PublishMessageWrapper is None. Hence, not starting publisher batching span",
                    category=RuntimeWarning,
                )
        with self._batch_lock:
            try:
                if self._is_stopped:
                    raise RuntimeError("Cannot publish on a stopped publisher.")

                # Set retry timeout to "infinite" when message ordering is enabled.
                # Note that this then also impacts messages added with an empty
                # ordering key.
                if self._enable_message_ordering:
                    if retry is gapic_v1.method.DEFAULT:
                        # use the default retry for the publish GRPC method as a base
                        transport = self._transport
                        base_retry = transport._wrapped_methods[
                            transport.publish
                        ]._retry
                        retry = base_retry.with_deadline(2.0**32)
                        # timeout needs to be overridden and set to infinite in
                        # addition to the retry deadline since both determine
                        # the duration for which retries are attempted.
                        timeout = 2.0**32
                    elif retry is not None:
                        retry = retry.with_deadline(2.0**32)
                        timeout = 2.0**32

                # Delegate the publishing to the sequencer.
                sequencer = self._get_or_create_sequencer(topic, ordering_key)
                future = sequencer.publish(
                    wrapper=wrapper, retry=retry, timeout=timeout
                )
                future.add_done_callback(on_publish_done)
            except BaseException as be:
                # The message was already admitted by the flow controller, but
                # it never reached a future with ``on_publish_done`` attached
                # (the attach is the last statement of the ``try``). Release it
                # here, otherwise its capacity would be held forever and, with
                # LimitExceededBehavior.BLOCK, later publishes could hang.
                self._flow_controller.release(message)

                # Exceptions can be thrown when attempting to add messages to
                # the batch. If they're thrown, record them in publisher
                # batching and create span, end the spans and bubble the
                # exception up.
                if self._open_telemetry_enabled:
                    if wrapper:
                        wrapper.end_publisher_batching_span(be)
                        wrapper.end_create_span(be)
                    else:  # pragma: NO COVER
                        warnings.warn(
                            message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span",
                            category=RuntimeWarning,
                        )
                raise

            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_batching_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PublishMessageWrapper is None. Hence, not ending publisher batching span",
                        category=RuntimeWarning,
                    )

            # Create a timer thread if necessary to enforce the batching
            # timeout. This must run while _batch_lock is held.
            self._ensure_commit_timer_runs_no_lock()

        return future

    def ensure_cleanup_and_commit_timer_runs(self) -> None:
        """Ensure a cleanup/commit timer thread is running.

        If a cleanup/commit timer thread is already running, this does nothing.
        """
        with self._batch_lock:
            self._ensure_commit_timer_runs_no_lock()

    def _ensure_commit_timer_runs_no_lock(self) -> None:
        """Ensure a commit timer thread is running, without taking
        _batch_lock.

        _batch_lock must be held before calling this method. No thread is
        started when ``batch_settings.max_latency`` is infinite.
        """
        if not self._commit_thread and self.batch_settings.max_latency < float("inf"):
            self._start_commit_thread()

    def _start_commit_thread(self) -> None:
        """Start a new thread to actually wait and commit the sequencers."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        self._commit_thread = threading.Thread(
            name="Thread-PubSubBatchCommitter",
            target=self._wait_and_commit_sequencers,
            daemon=True,
        )
        self._commit_thread.start()

    def _wait_and_commit_sequencers(self) -> None:
        """Wait up to the batching timeout, and commit all sequencers.

        Does nothing after waking up if the publisher has been stopped.
        """
        # Sleep for however long we should be waiting.
        time.sleep(self.batch_settings.max_latency)
        _LOGGER.debug("Commit thread is waking up")

        with self._batch_lock:
            if self._is_stopped:
                return
            self._commit_sequencers()
            # Allow the next publish to start a fresh timer thread.
            self._commit_thread = None

    def _commit_sequencers(self) -> None:
        """Clean up finished sequencers and commit the rest.

        The caller is expected to hold ``_batch_lock``.
        """
        finished_sequencer_keys = [
            key
            for key, sequencer in self._sequencers.items()
            if sequencer.is_finished()
        ]
        for sequencer_key in finished_sequencer_keys:
            del self._sequencers[sequencer_key]

        for sequencer in self._sequencers.values():
            sequencer.commit()

    def stop(self) -> None:
        """Immediately publish all outstanding messages.

        Asynchronously sends all outstanding messages and
        prevents future calls to `publish()`. Method should
        be invoked prior to deleting this `Client()` object
        in order to ensure that no pending messages are lost.

        .. note::

            This method is non-blocking. Use `Future()` objects
            returned by `publish()` to make sure all publish
            requests completed, either in success or error.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot stop a publisher already stopped.")

            self._is_stopped = True

            for sequencer in self._sequencers.values():
                sequencer.stop()

    # Used only for testing.
    def _set_batch(
        self, topic: str, batch: "_batch.thread.Batch", ordering_key: str = ""
    ) -> None:
        sequencer = self._get_or_create_sequencer(topic, ordering_key)
        sequencer._set_batch(batch)

    # Used only for testing.
    def _set_batch_class(self, batch_class: Type) -> None:
        self._batch_class = batch_class

    # Used only for testing.
    def _set_sequencer(
        self, topic: str, sequencer: SequencerType, ordering_key: str = ""
    ) -> None:
        sequencer_key = (topic, ordering_key)
        self._sequencers[sequencer_key] = sequencer