Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/client.py: 30%

158 statements

coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import copy
import logging
import os
import threading
import time
import typing
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
import warnings

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import thread
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
from google.pubsub_v1 import gapic_version as package_version
from google.pubsub_v1 import types as gapic_types
from google.pubsub_v1.services.publisher import client as publisher_client

__version__ = package_version.__version__

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.publisher import _batch
    from google.pubsub_v1.services.publisher.client import OptionalRetry
    from google.pubsub_v1.types import pubsub as pubsub_types


_LOGGER = logging.getLogger(__name__)


_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

SequencerType = Union[
    ordered_sequencer.OrderedSequencer, unordered_sequencer.UnorderedSequencer
]


class Client(publisher_client.PublisherClient):
    """A publisher client for Google Cloud Pub/Sub.

    This creates an object that is capable of publishing messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        batch_settings:
            The settings for batch publishing.
        publisher_options:
            The options for the publisher client. Note that enabling message ordering
            will override the publish retry timeout to be infinite.
        kwargs:
            Any additional arguments provided are sent as keyword arguments to the
            underlying
            :class:`~google.cloud.pubsub_v1.gapic.publisher_client.PublisherClient`.
            Generally you should not need to set additional keyword
            arguments. Regional endpoints can be set via ``client_options`` that
            takes a single key-value pair that defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        publisher_client = pubsub_v1.PublisherClient(
            # Optional
            batch_settings=pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,  # One second
            ),

            # Optional
            publisher_options=pubsub_v1.types.PublisherOptions(
                enable_message_ordering=False,
                flow_control=pubsub_v1.types.PublishFlowControl(
                    message_limit=2000,
                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
                ),
            ),

            # Optional
            client_options={
                "api_endpoint": REGIONAL_ENDPOINT
            }
        )
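
    If the ``PUBSUB_EMULATOR_HOST`` environment variable is set, the constructor
    points the client at that endpoint and uses anonymous credentials instead of
    the production API (see ``__init__`` below). A minimal sketch, assuming a
    locally running emulator; the address is a placeholder:

    .. code-block:: python

        import os

        from google.cloud import pubsub_v1

        os.environ["PUBSUB_EMULATOR_HOST"] = "localhost:8085"  # hypothetical address
        emulator_client = pubsub_v1.PublisherClient()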

108 """ 

109 

110 def __init__( 

111 self, 

112 batch_settings: Union[types.BatchSettings, Sequence] = (), 

113 publisher_options: Union[types.PublisherOptions, Sequence] = (), 

114 **kwargs: Any, 

115 ): 

116 assert ( 

117 type(batch_settings) is types.BatchSettings or len(batch_settings) == 0 

118 ), "batch_settings must be of type BatchSettings or an empty sequence." 

119 assert ( 

120 type(publisher_options) is types.PublisherOptions 

121 or len(publisher_options) == 0 

122 ), "publisher_options must be of type PublisherOptions or an empty sequence." 

123 

124 # Sanity check: Is our goal to use the emulator? 

125 # If so, create a grpc insecure channel with the emulator host 

126 # as the target. 

127 if os.environ.get("PUBSUB_EMULATOR_HOST"): 

128 kwargs["client_options"] = { 

129 "api_endpoint": os.environ.get("PUBSUB_EMULATOR_HOST") 

130 } 

131 kwargs["credentials"] = AnonymousCredentials() 

132 

133 # For a transient failure, retry publishing the message infinitely. 

134 self.publisher_options = types.PublisherOptions(*publisher_options) 

135 self._enable_message_ordering = self.publisher_options[0] 

136 

137 # Add the metrics headers, and instantiate the underlying GAPIC 

138 # client. 

139 super().__init__(**kwargs) 

140 self._target = self._transport._host 

141 self._batch_class = thread.Batch 

142 self.batch_settings = types.BatchSettings(*batch_settings) 

143 

144 # The batches on the publisher client are responsible for holding 

145 # messages. One batch exists for each topic. 

146 self._batch_lock = self._batch_class.make_lock() 

147 # (topic, ordering_key) => sequencers object 

148 self._sequencers: Dict[Tuple[str, str], SequencerType] = {} 

149 self._is_stopped = False 

150 # Thread created to commit all sequencers after a timeout. 

151 self._commit_thread: Optional[threading.Thread] = None 

152 

153 # The object controlling the message publishing flow 

154 self._flow_controller = FlowController(self.publisher_options.flow_control) 

155 

156 @classmethod 

157 def from_service_account_file( # type: ignore[override] 

158 cls, 

159 filename: str, 

160 batch_settings: Union[types.BatchSettings, Sequence] = (), 

161 **kwargs: Any, 

162 ) -> "Client": 

163 """Creates an instance of this client using the provided credentials 

164 file. 

165 

166 Args: 

167 filename: 

168 The path to the service account private key JSON file. 

169 batch_settings: 

170 The settings for batch publishing. 

171 kwargs: 

172 Additional arguments to pass to the constructor. 

173 

174 Returns: 

175 A Publisher instance that is the constructed client. 
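
        Example:
            A minimal sketch; the key file path below is a placeholder, not a
            file shipped with this library:

            .. code-block:: python

                from google.cloud import pubsub_v1

                publisher = pubsub_v1.PublisherClient.from_service_account_file(
                    "service-account.json",  # hypothetical key file path
                )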

176 """ 

177 credentials = service_account.Credentials.from_service_account_file(filename) 

178 kwargs["credentials"] = credentials 

179 return cls(batch_settings, **kwargs) 

180 

181 from_service_account_json = from_service_account_file # type: ignore[assignment] 

182 

183 @property 

184 def target(self) -> str: 

185 """Return the target (where the API is). 

186 

187 Returns: 

188 The location of the API. 

189 """ 

190 return self._target 

191 

192 @property 

193 def api(self): 

194 """The underlying gapic API client. 

195 

196 .. versionchanged:: 2.10.0 

197 Instead of a GAPIC ``PublisherClient`` client instance, this property is a 

198 proxy object to it with the same interface. 

199 

200 .. deprecated:: 2.10.0 

201 Use the GAPIC methods and properties on the client instance directly 

202 instead of through the :attr:`api` attribute. 

203 """ 

204 msg = ( 

205 'The "api" property only exists for backward compatibility, access its ' 

206 'attributes directly thorugh the client instance (e.g. "client.foo" ' 

207 'instead of "client.api.foo").' 

208 ) 

209 warnings.warn(msg, category=DeprecationWarning) 

210 return super() 

211 

212 def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType: 

213 """Get an existing sequencer or create a new one given the (topic, 

214 ordering_key) pair. 

215 """ 

216 sequencer_key = (topic, ordering_key) 

217 sequencer = self._sequencers.get(sequencer_key) 

218 if sequencer is None: 

219 if ordering_key == "": 

220 sequencer = unordered_sequencer.UnorderedSequencer(self, topic) 

221 else: 

222 sequencer = ordered_sequencer.OrderedSequencer( 

223 self, topic, ordering_key 

224 ) 

225 self._sequencers[sequencer_key] = sequencer 

226 

227 return sequencer 

228 

229 def resume_publish(self, topic: str, ordering_key: str) -> None: 

230 """Resume publish on an ordering key that has had unrecoverable errors. 

231 

232 Args: 

233 topic: The topic to publish messages to. 

234 ordering_key: A string that identifies related messages for which 

235 publish order should be respected. 

236 

237 Raises: 

238 RuntimeError: 

239 If called after publisher has been stopped by a `stop()` method 

240 call. 

241 ValueError: 

242 If the topic/ordering key combination has not been seen before 

243 by this client. 
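
        Example:
            A minimal sketch, assuming a client ``publisher`` created with
            ``PublisherOptions(enable_message_ordering=True)``, a topic path
            ``topic``, and a previous ordered publish for the key that failed:

            .. code-block:: python

                future = publisher.publish(topic, b"payload", ordering_key="key-1")
                try:
                    future.result()
                except Exception:
                    # Publishing for "key-1" is paused after the error; resume it.
                    publisher.resume_publish(topic, "key-1")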

244 """ 

245 with self._batch_lock: 

246 if self._is_stopped: 

247 raise RuntimeError("Cannot resume publish on a stopped publisher.") 

248 

249 if not self._enable_message_ordering: 

250 raise ValueError( 

251 "Cannot resume publish on a topic/ordering key if ordering " 

252 "is not enabled." 

253 ) 

254 

255 sequencer_key = (topic, ordering_key) 

256 sequencer = self._sequencers.get(sequencer_key) 

257 if sequencer is None: 

258 _LOGGER.debug( 

259 "Error: The topic/ordering key combination has not " 

260 "been seen before." 

261 ) 

262 else: 

263 sequencer.unpause() 

264 

265 def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse": 

266 """Call the GAPIC public API directly.""" 

267 return super().publish(*args, **kwargs) 

268 

269 def publish( # type: ignore[override] 

270 self, 

271 topic: str, 

272 data: bytes, 

273 ordering_key: str = "", 

274 retry: "OptionalRetry" = gapic_v1.method.DEFAULT, 

275 timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, 

276 **attrs: Union[bytes, str], 

277 ) -> "pubsub_v1.publisher.futures.Future": 

278 """Publish a single message. 

279 

280 .. note:: 

281 Messages in Pub/Sub are blobs of bytes. They are *binary* data, 

282 not text. You must send data as a bytestring 

283 (``bytes`` in Python 3; ``str`` in Python 2), and this library 

284 will raise an exception if you send a text string. 

285 

286 The reason that this is so important (and why we do not try to 

287 coerce for you) is because Pub/Sub is also platform independent 

288 and there is no way to know how to decode messages properly on 

289 the other side; therefore, encoding and decoding is a required 

290 exercise for the developer. 

291 

292 Add the given message to this object; this will cause it to be 

293 published once the batch either has enough messages or a sufficient 

294 period of time has elapsed. 

295 This method may block if LimitExceededBehavior.BLOCK is used in the 

296 flow control settings. 

297 

298 Example: 

299 >>> from google.cloud import pubsub_v1 

300 >>> client = pubsub_v1.PublisherClient() 

301 >>> topic = client.topic_path('[PROJECT]', '[TOPIC]') 

302 >>> data = b'The rain in Wales falls mainly on the snails.' 

303 >>> response = client.publish(topic, data, username='guido') 
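
            A sketch of waiting for the outcome: ``result()`` on the returned
            future blocks until the backend acknowledges the message and returns
            the server-assigned message ID, or re-raises the publish error.

            >>> message_id = response.result()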

        Args:
            topic: The topic to publish messages to.
            data: A bytestring representing the message body. This
                must be a bytestring.
            ordering_key: A string that identifies related messages for which
                publish order should be respected. Message ordering must be
                enabled for this client to use this feature.
            retry:
                Designation of what errors, if any, should be retried. If `ordering_key`
                is specified, the total retry deadline will be changed to "infinity".
                If given, it overrides any retry passed into the client through
                the ``publisher_options`` argument.
            timeout:
                The timeout for the RPC request. Can be used to override any timeout
                passed in through ``publisher_options`` when instantiating the client.
            attrs: A dictionary of attributes to be
                sent as metadata. (These may be text strings or byte strings.)

        Returns:
            A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
            instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class).

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        # Sanity check: Is the data being sent as a bytestring?
        # If it is literally anything else, complain loudly about it.
        if not isinstance(data, bytes):
            raise TypeError(
                "Data being published to Pub/Sub must be sent as a bytestring."
            )

        if not self._enable_message_ordering and ordering_key != "":
            raise ValueError(
                "Cannot publish a message with an ordering key when message "
                "ordering is not enabled."
            )

        # Coerce all attributes to text strings.
        for k, v in copy.copy(attrs).items():
            if isinstance(v, str):
                continue
            if isinstance(v, bytes):
                attrs[k] = v.decode("utf-8")
                continue
            raise TypeError(
                "All attributes being published to Pub/Sub must "
                "be sent as text strings."
            )

        # Create the Pub/Sub message object. For performance reasons, the message
        # should be constructed by directly using the raw protobuf class, and only
        # then wrapping it into the higher-level PubsubMessage class.
        vanilla_pb = _raw_proto_pubbsub_message(
            data=data, ordering_key=ordering_key, attributes=attrs
        )
        message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        # Messages should go through flow control to prevent excessive
        # queuing on the client side (depending on the settings).
        try:
            self._flow_controller.add(message)
        except exceptions.FlowControlLimitError as exc:
            future = futures.Future()
            future.set_exception(exc)
            return future

        def on_publish_done(future):
            self._flow_controller.release(message)

        if retry is gapic_v1.method.DEFAULT:  # if custom retry not passed in
            retry = self.publisher_options.retry

        if timeout is gapic_v1.method.DEFAULT:  # if custom timeout not passed in
            timeout = self.publisher_options.timeout

        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot publish on a stopped publisher.")

            # Set retry timeout to "infinite" when message ordering is enabled.
            # Note that this then also impacts messages added with an empty
            # ordering key.
            if self._enable_message_ordering:
                if retry is gapic_v1.method.DEFAULT:
                    # use the default retry for the publish GRPC method as a base
                    transport = self._transport
                    base_retry = transport._wrapped_methods[transport.publish]._retry
                    retry = base_retry.with_deadline(2.0**32)
                else:
                    retry = retry.with_deadline(2.0**32)

            # Delegate the publishing to the sequencer.
            sequencer = self._get_or_create_sequencer(topic, ordering_key)
            future = sequencer.publish(message, retry=retry, timeout=timeout)
            future.add_done_callback(on_publish_done)

            # Create a timer thread if necessary to enforce the batching
            # timeout.
            self._ensure_commit_timer_runs_no_lock()

            return future

    def ensure_cleanup_and_commit_timer_runs(self) -> None:
        """Ensure a cleanup/commit timer thread is running.

        If a cleanup/commit timer thread is already running, this does nothing.
        """
        with self._batch_lock:
            self._ensure_commit_timer_runs_no_lock()

    def _ensure_commit_timer_runs_no_lock(self) -> None:
        """Ensure a commit timer thread is running, without taking
        _batch_lock.

        _batch_lock must be held before calling this method.
        """
        if not self._commit_thread and self.batch_settings.max_latency < float("inf"):
            self._start_commit_thread()

    def _start_commit_thread(self) -> None:
        """Start a new thread to actually wait and commit the sequencers."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        self._commit_thread = threading.Thread(
            name="Thread-PubSubBatchCommitter",
            target=self._wait_and_commit_sequencers,
            daemon=True,
        )
        self._commit_thread.start()

    def _wait_and_commit_sequencers(self) -> None:
        """Wait up to the batching timeout, and commit all sequencers."""
        # Sleep for however long we should be waiting.
        time.sleep(self.batch_settings.max_latency)
        _LOGGER.debug("Commit thread is waking up")

        with self._batch_lock:
            if self._is_stopped:
                return
            self._commit_sequencers()
            self._commit_thread = None

    def _commit_sequencers(self) -> None:
        """Clean up finished sequencers and commit the rest."""
        finished_sequencer_keys = [
            key
            for key, sequencer in self._sequencers.items()
            if sequencer.is_finished()
        ]
        for sequencer_key in finished_sequencer_keys:
            del self._sequencers[sequencer_key]

        for sequencer in self._sequencers.values():
            sequencer.commit()

    def stop(self) -> None:
        """Immediately publish all outstanding messages.

        Asynchronously sends all outstanding messages and
        prevents future calls to `publish()`. Method should
        be invoked prior to deleting this `Client()` object
        in order to ensure that no pending messages are lost.

        .. note::

            This method is non-blocking. Use `Future()` objects
            returned by `publish()` to make sure all publish
            requests completed, either in success or error.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
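
        Example:
            A minimal sketch of a clean shutdown; ``publish_futures`` is assumed
            to be a list of futures returned by earlier ``publish()`` calls:

            .. code-block:: python

                publisher.stop()
                for future in publish_futures:
                    future.result()  # surfaces any publish errors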

488 """ 

489 with self._batch_lock: 

490 if self._is_stopped: 

491 raise RuntimeError("Cannot stop a publisher already stopped.") 

492 

493 self._is_stopped = True 

494 

495 for sequencer in self._sequencers.values(): 

496 sequencer.stop() 

497 

498 # Used only for testing. 

499 def _set_batch( 

500 self, topic: str, batch: "_batch.thread.Batch", ordering_key: str = "" 

501 ) -> None: 

502 sequencer = self._get_or_create_sequencer(topic, ordering_key) 

503 sequencer._set_batch(batch) 

504 

505 # Used only for testing. 

506 def _set_batch_class(self, batch_class: Type) -> None: 

507 self._batch_class = batch_class 

508 

509 # Used only for testing. 

510 def _set_sequencer( 

511 self, topic: str, sequencer: SequencerType, ordering_key: str = "" 

512 ) -> None: 

513 sequencer_key = (topic, ordering_key) 

514 self._sequencers[sequencer_key] = sequencer