# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import copy
import logging
import os
import threading
import time
import typing
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
import warnings
import sys

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import thread
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
from google.pubsub_v1 import gapic_version as package_version
from google.pubsub_v1 import types as gapic_types
from google.pubsub_v1.services.publisher import client as publisher_client
from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
    PublishMessageWrapper,
)

__version__ = package_version.__version__

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.publisher import _batch
    from google.pubsub_v1.services.publisher.client import OptionalRetry
    from google.pubsub_v1.types import pubsub as pubsub_types


_LOGGER = logging.getLogger(__name__)


_raw_proto_pubsub_message = gapic_types.PubsubMessage.pb()

SequencerType = Union[
    ordered_sequencer.OrderedSequencer, unordered_sequencer.UnorderedSequencer
]


class Client(publisher_client.PublisherClient):
    """A publisher client for Google Cloud Pub/Sub.

    This creates an object that is capable of publishing messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        batch_settings:
            The settings for batch publishing.
        publisher_options:
            The options for the publisher client. Note that enabling message ordering
            will override the publish retry timeout to be infinite.
        kwargs:
            Any additional arguments provided are sent as keyword arguments to the
            underlying
            :class:`~google.cloud.pubsub_v1.gapic.publisher_client.PublisherClient`.
            Generally you should not need to set additional keyword
            arguments. Regional endpoints can be set via ``client_options``,
            which takes a single key-value pair that defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        publisher_client = pubsub_v1.PublisherClient(
            # Optional
            batch_settings = pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,  # One second
            ),

            # Optional
            publisher_options = pubsub_v1.types.PublisherOptions(
                enable_message_ordering=False,
                flow_control=pubsub_v1.types.PublishFlowControl(
                    message_limit=2000,
                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
                ),
            ),

            # Optional
            client_options = {
                "api_endpoint": REGIONAL_ENDPOINT
            }
        )
    """

    def __init__(
        self,
        batch_settings: Union[types.BatchSettings, Sequence] = (),
        publisher_options: Union[types.PublisherOptions, Sequence] = (),
        **kwargs: Any,
    ):
        assert (
            type(batch_settings) is types.BatchSettings or len(batch_settings) == 0
        ), "batch_settings must be of type BatchSettings or an empty sequence."
        assert (
            type(publisher_options) is types.PublisherOptions
            or len(publisher_options) == 0
        ), "publisher_options must be of type PublisherOptions or an empty sequence."

        # Sanity check: Is our goal to use the emulator?
        # If so, create a grpc insecure channel with the emulator host
        # as the target.
        # TODO(https://github.com/googleapis/python-pubsub/issues/1349): Move the emulator
        # code below to test files.
        if os.environ.get("PUBSUB_EMULATOR_HOST"):
            kwargs["client_options"] = {
                "api_endpoint": os.environ.get("PUBSUB_EMULATOR_HOST")
            }
            # Configure credentials directly to transport, if provided.
            if "transport" not in kwargs:
                kwargs["credentials"] = AnonymousCredentials()

        # For a transient failure, retry publishing the message infinitely.
        self.publisher_options = types.PublisherOptions(*publisher_options)
        self._enable_message_ordering = self.publisher_options[0]

        # Add the metrics headers, and instantiate the underlying GAPIC
        # client.
        super().__init__(**kwargs)
        self._target = self._transport._host
        self._batch_class = thread.Batch
        self.batch_settings = types.BatchSettings(*batch_settings)

        # The batches on the publisher client are responsible for holding
        # messages. One batch exists for each topic.
        self._batch_lock = self._batch_class.make_lock()
        # (topic, ordering_key) => sequencers object
        self._sequencers: Dict[Tuple[str, str], SequencerType] = {}
        self._is_stopped = False
        # Thread created to commit all sequencers after a timeout.
        self._commit_thread: Optional[threading.Thread] = None

        # The object controlling the message publishing flow
        self._flow_controller = FlowController(self.publisher_options.flow_control)

        self._open_telemetry_enabled = (
            self.publisher_options.enable_open_telemetry_tracing
        )
        # OpenTelemetry features used by the library are not supported in Python versions <= 3.7.
        # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389
        if (
            self.publisher_options.enable_open_telemetry_tracing
            and sys.version_info.major == 3
            and sys.version_info.minor < 8
        ):
            warnings.warn(
                message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.",
                category=RuntimeWarning,
            )
            self._open_telemetry_enabled = False

    @classmethod
    def from_service_account_file(  # type: ignore[override]
        cls,
        filename: str,
        batch_settings: Union[types.BatchSettings, Sequence] = (),
        **kwargs: Any,
    ) -> "Client":
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename:
                The path to the service account private key JSON file.
            batch_settings:
                The settings for batch publishing.
            kwargs:
                Additional arguments to pass to the constructor.

        Returns:
            A Publisher instance that is the constructed client.
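
        Example:
            A minimal usage sketch; the key file path below is a placeholder
            for a real service account key on disk.

            .. code-block:: python

                from google.cloud import pubsub_v1

                publisher = pubsub_v1.PublisherClient.from_service_account_file(
                    "service-account.json",  # placeholder path
                    batch_settings=pubsub_v1.types.BatchSettings(max_messages=500),
                )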
        """
        credentials = service_account.Credentials.from_service_account_file(filename)
        kwargs["credentials"] = credentials
        return cls(batch_settings, **kwargs)

    from_service_account_json = from_service_account_file  # type: ignore[assignment]

    @property
    def target(self) -> str:
        """Return the target (where the API is).

        Returns:
            The location of the API.
        """
        return self._target

    @property
    def api(self):
        """The underlying gapic API client.

        .. versionchanged:: 2.10.0
            Instead of a GAPIC ``PublisherClient`` client instance, this property is a
            proxy object to it with the same interface.

        .. deprecated:: 2.10.0
            Use the GAPIC methods and properties on the client instance directly
            instead of through the :attr:`api` attribute.
        """
        msg = (
            'The "api" property only exists for backward compatibility, access its '
            'attributes directly through the client instance (e.g. "client.foo" '
            'instead of "client.api.foo").'
        )
        warnings.warn(msg, category=DeprecationWarning)
        return super()

    @property
    def open_telemetry_enabled(self) -> bool:
        """Whether OpenTelemetry tracing is enabled for this publisher client."""
        return self._open_telemetry_enabled

    def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
        """Get an existing sequencer or create a new one given the (topic,
        ordering_key) pair.
        """
        sequencer_key = (topic, ordering_key)
        sequencer = self._sequencers.get(sequencer_key)
        if sequencer is None:
            if ordering_key == "":
                sequencer = unordered_sequencer.UnorderedSequencer(self, topic)
            else:
                sequencer = ordered_sequencer.OrderedSequencer(
                    self, topic, ordering_key
                )
            self._sequencers[sequencer_key] = sequencer

        return sequencer

    def resume_publish(self, topic: str, ordering_key: str) -> None:
        """Resume publish on an ordering key that has had unrecoverable errors.

        Args:
            topic: The topic to publish messages to.
            ordering_key: A string that identifies related messages for which
                publish order should be respected.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            ValueError:
                If the topic/ordering key combination has not been seen before
                by this client.
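
        Example:
            An illustrative sketch; assumes ``client`` was created with
            ``enable_message_ordering=True`` and that ``topic`` is an existing
            topic path.

            .. code-block:: python

                future = client.publish(topic, b"data", ordering_key="key-1")
                try:
                    future.result()
                except Exception:
                    # After a publish error, messages with this ordering key are
                    # rejected until publishing is explicitly resumed.
                    client.resume_publish(topic, "key-1")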
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot resume publish on a stopped publisher.")

            if not self._enable_message_ordering:
                raise ValueError(
                    "Cannot resume publish on a topic/ordering key if ordering "
                    "is not enabled."
                )

            sequencer_key = (topic, ordering_key)
            sequencer = self._sequencers.get(sequencer_key)
            if sequencer is None:
                _LOGGER.debug(
                    "Error: The topic/ordering key combination has not "
                    "been seen before."
                )
            else:
                sequencer.unpause()

    def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
        """Call the GAPIC public API directly."""
        return super().publish(*args, **kwargs)

    def publish(  # type: ignore[override]
        self,
        topic: str,
        data: bytes,
        ordering_key: str = "",
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
        **attrs: Union[bytes, str],
    ) -> "pubsub_v1.publisher.futures.Future":
        """Publish a single message.

        .. note::
            Messages in Pub/Sub are blobs of bytes. They are *binary* data,
            not text. You must send data as a bytestring
            (``bytes`` in Python 3; ``str`` in Python 2), and this library
            will raise an exception if you send a text string.

            The reason that this is so important (and why we do not try to
            coerce for you) is because Pub/Sub is also platform independent
            and there is no way to know how to decode messages properly on
            the other side; therefore, encoding and decoding is a required
            exercise for the developer.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed.
        This method may block if LimitExceededBehavior.BLOCK is used in the
        flow control settings.

        Example:
            >>> from google.cloud import pubsub_v1
            >>> client = pubsub_v1.PublisherClient()
            >>> topic = client.topic_path('[PROJECT]', '[TOPIC]')
            >>> data = b'The rain in Wales falls mainly on the snails.'
            >>> response = client.publish(topic, data, username='guido')
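            >>>
            >>> # Optionally block until the publish call completes; ``result()``
            >>> # returns the server-assigned message ID, or raises the publish
            >>> # error if delivery ultimately failed.
            >>> message_id = response.result()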

        Args:
            topic: The topic to publish messages to.
            data: A bytestring representing the message body. This
                must be a bytestring.
            ordering_key: A string that identifies related messages for which
                publish order should be respected. Message ordering must be
                enabled for this client to use this feature.
            retry:
                Designation of what errors, if any, should be retried. If `ordering_key`
                is specified, the total retry deadline will be changed to "infinity".
                If given, it overrides any retry passed into the client through
                the ``publisher_options`` argument.
            timeout:
                The timeout for the RPC request. Can be used to override any timeout
                passed in through ``publisher_options`` when instantiating the client.

            attrs: A dictionary of attributes to be
                sent as metadata. (These may be text strings or byte strings.)

        Returns:
            A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
            instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class).

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        # Sanity check: Is the data being sent as a bytestring?
        # If it is literally anything else, complain loudly about it.
        if not isinstance(data, bytes):
            raise TypeError(
                "Data being published to Pub/Sub must be sent as a bytestring."
            )

        if not self._enable_message_ordering and ordering_key != "":
            raise ValueError(
                "Cannot publish a message with an ordering key when message "
                "ordering is not enabled."
            )

        # Coerce all attributes to text strings.
        for k, v in copy.copy(attrs).items():
            if isinstance(v, str):
                continue
            if isinstance(v, bytes):
                attrs[k] = v.decode("utf-8")
                continue
            raise TypeError(
                "All attributes being published to Pub/Sub must "
                "be sent as text strings."
            )

        # Create the Pub/Sub message object. For performance reasons, the message
        # should be constructed by directly using the raw protobuf class, and only
        # then wrapping it into the higher-level PubsubMessage class.
        vanilla_pb = _raw_proto_pubsub_message(
            data=data, ordering_key=ordering_key, attributes=attrs
        )
        message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        wrapper: PublishMessageWrapper = PublishMessageWrapper(message)
        if self._open_telemetry_enabled:
            wrapper.start_create_span(topic=topic, ordering_key=ordering_key)

        # Messages should go through flow control to prevent excessive
        # queuing on the client side (depending on the settings).
        try:
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.start_publisher_flow_control_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PublishMessageWrapper is None. Not starting publisher flow control span.",
                        category=RuntimeWarning,
                    )
            self._flow_controller.add(message)
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_flow_control_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PublishMessageWrapper is None. Not ending publisher flow control span.",
                        category=RuntimeWarning,
                    )
        except exceptions.FlowControlLimitError as exc:
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_flow_control_span(exc)
                    wrapper.end_create_span(exc)
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PublishMessageWrapper is None. Not ending publisher create and flow control spans on FlowControlLimitError.",
                        category=RuntimeWarning,
                    )

            future = futures.Future()
            future.set_exception(exc)
            return future

        def on_publish_done(future):
            self._flow_controller.release(message)

        if retry is gapic_v1.method.DEFAULT:  # if custom retry not passed in
            retry = self.publisher_options.retry

        if timeout is gapic_v1.method.DEFAULT:  # if custom timeout not passed in
            timeout = self.publisher_options.timeout

        if self._open_telemetry_enabled:
            if wrapper:
                wrapper.start_publisher_batching_span()
            else:  # pragma: NO COVER
                warnings.warn(
                    message="PublishMessageWrapper is None. Hence, not starting publisher batching span",
                    category=RuntimeWarning,
                )
        with self._batch_lock:
            try:
                if self._is_stopped:
                    raise RuntimeError("Cannot publish on a stopped publisher.")

                # Set retry timeout to "infinite" when message ordering is enabled.
                # Note that this then also impacts messages added with an empty
                # ordering key.
                if self._enable_message_ordering:
                    if retry is gapic_v1.method.DEFAULT:
                        # use the default retry for the publish GRPC method as a base
                        transport = self._transport
                        base_retry = transport._wrapped_methods[
                            transport.publish
                        ]._retry
                        retry = base_retry.with_deadline(2.0**32)
                        # timeout needs to be overridden and set to infinite in
                        # addition to the retry deadline since both determine
                        # the duration for which retries are attempted.
                        timeout = 2.0**32
                    elif retry is not None:
                        retry = retry.with_deadline(2.0**32)
                        timeout = 2.0**32

                # Delegate the publishing to the sequencer.
                sequencer = self._get_or_create_sequencer(topic, ordering_key)
                future = sequencer.publish(
                    wrapper=wrapper, retry=retry, timeout=timeout
                )
                future.add_done_callback(on_publish_done)
            except BaseException as be:
                # Exceptions can be thrown when attempting to add messages to
                # the batch. If one is thrown, record it on the publisher
                # batching and create spans, end those spans, and bubble the
                # exception up.
                if self._open_telemetry_enabled:
                    if wrapper:
                        wrapper.end_publisher_batching_span(be)
                        wrapper.end_create_span(be)
                    else:  # pragma: NO COVER
                        warnings.warn(
                            message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span",
                            category=RuntimeWarning,
                        )
                raise be

        if self._open_telemetry_enabled:
            if wrapper:
                wrapper.end_publisher_batching_span()
            else:  # pragma: NO COVER
                warnings.warn(
                    message="PublishMessageWrapper is None. Hence, not ending publisher batching span",
                    category=RuntimeWarning,
                )

        # Create a timer thread if necessary to enforce the batching
        # timeout.
        self._ensure_commit_timer_runs_no_lock()

        return future

    def ensure_cleanup_and_commit_timer_runs(self) -> None:
        """Ensure a cleanup/commit timer thread is running.

        If a cleanup/commit timer thread is already running, this does nothing.
        """
        with self._batch_lock:
            self._ensure_commit_timer_runs_no_lock()

    def _ensure_commit_timer_runs_no_lock(self) -> None:
        """Ensure a commit timer thread is running, without taking
        _batch_lock.

        _batch_lock must be held before calling this method.
        """
        if not self._commit_thread and self.batch_settings.max_latency < float("inf"):
            self._start_commit_thread()

    def _start_commit_thread(self) -> None:
        """Start a new thread to actually wait and commit the sequencers."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        self._commit_thread = threading.Thread(
            name="Thread-PubSubBatchCommitter",
            target=self._wait_and_commit_sequencers,
            daemon=True,
        )
        self._commit_thread.start()

    def _wait_and_commit_sequencers(self) -> None:
        """Wait up to the batching timeout, and commit all sequencers."""
        # Sleep for however long we should be waiting.
        time.sleep(self.batch_settings.max_latency)
        _LOGGER.debug("Commit thread is waking up")

        with self._batch_lock:
            if self._is_stopped:
                return
            self._commit_sequencers()
            self._commit_thread = None

    def _commit_sequencers(self) -> None:
        """Clean up finished sequencers and commit the rest."""
        finished_sequencer_keys = [
            key
            for key, sequencer in self._sequencers.items()
            if sequencer.is_finished()
        ]
        for sequencer_key in finished_sequencer_keys:
            del self._sequencers[sequencer_key]

        for sequencer in self._sequencers.values():
            sequencer.commit()

    def stop(self) -> None:
        """Immediately publish all outstanding messages.

        Asynchronously sends all outstanding messages and
        prevents future calls to `publish()`. Method should
        be invoked prior to deleting this `Client()` object
        in order to ensure that no pending messages are lost.

        .. note::

            This method is non-blocking. Use `Future()` objects
            returned by `publish()` to make sure all publish
            requests completed, either in success or error.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
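
        Example:
            An illustrative sketch; ``client`` and ``topic`` are assumed to
            already exist.

            .. code-block:: python

                publish_futures = [client.publish(topic, b"payload") for _ in range(10)]
                client.stop()  # flush outstanding batches; publish() can no longer be called
                for future in publish_futures:
                    future.result()  # block until each message is sent or an error is raised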
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot stop a publisher already stopped.")

            self._is_stopped = True

            for sequencer in self._sequencers.values():
                sequencer.stop()

    # Used only for testing.
    def _set_batch(
        self, topic: str, batch: "_batch.thread.Batch", ordering_key: str = ""
    ) -> None:
        sequencer = self._get_or_create_sequencer(topic, ordering_key)
        sequencer._set_batch(batch)

    # Used only for testing.
    def _set_batch_class(self, batch_class: Type) -> None:
        self._batch_class = batch_class

    # Used only for testing.
    def _set_sequencer(
        self, topic: str, sequencer: SequencerType, ordering_key: str = ""
    ) -> None:
        sequencer_key = (topic, ordering_key)
        self._sequencers[sequencer_key] = sequencer