Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/client.py: 30%
158 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2019, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import absolute_import
17import copy
18import logging
19import os
20import threading
21import time
22import typing
23from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
24import warnings
26from google.api_core import gapic_v1
27from google.auth.credentials import AnonymousCredentials # type: ignore
28from google.oauth2 import service_account # type: ignore
30from google.cloud.pubsub_v1 import types
31from google.cloud.pubsub_v1.publisher import exceptions
32from google.cloud.pubsub_v1.publisher import futures
33from google.cloud.pubsub_v1.publisher._batch import thread
34from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
35from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
36from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
37from google.pubsub_v1 import gapic_version as package_version
38from google.pubsub_v1 import types as gapic_types
39from google.pubsub_v1.services.publisher import client as publisher_client
# Expose the generated package's version string as this module's version.
__version__ = package_version.__version__

# These names are referenced only inside string annotations further down, so
# they are imported solely when a static type checker is analysing the file.
if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.publisher import _batch
    from google.pubsub_v1.services.publisher.client import OptionalRetry
    from google.pubsub_v1.types import pubsub as pubsub_types


# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Raw protobuf class behind ``gapic_types.PubsubMessage``. ``Client.publish``
# instantiates it directly and then wraps the result with
# ``gapic_types.PubsubMessage.wrap``.
_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

# Either kind of sequencer the client keeps per (topic, ordering_key) pair.
SequencerType = Union[
    ordered_sequencer.OrderedSequencer, unordered_sequencer.UnorderedSequencer
]
60class Client(publisher_client.PublisherClient):
61 """A publisher client for Google Cloud Pub/Sub.
63 This creates an object that is capable of publishing messages.
64 Generally, you can instantiate this client with no arguments, and you
65 get sensible defaults.
67 Args:
68 batch_settings:
69 The settings for batch publishing.
70 publisher_options:
71 The options for the publisher client. Note that enabling message ordering
72 will override the publish retry timeout to be infinite.
73 kwargs:
74 Any additional arguments provided are sent as keyword arguments to the
75 underlying
76 :class:`~google.pubsub_v1.services.publisher.client.PublisherClient`.
77 Generally you should not need to set additional keyword
78 arguments. Regional endpoints can be set via ``client_options`` that
79 takes a single key-value pair that defines the endpoint.
81 Example:
83 .. code-block:: python
85 from google.cloud import pubsub_v1
87 publisher_client = pubsub_v1.PublisherClient(
88 # Optional
89 batch_settings = pubsub_v1.types.BatchSettings(
90 max_bytes=1024, # One kilobyte
91 max_latency=1, # One second
92 ),
94 # Optional
95 publisher_options = pubsub_v1.types.PublisherOptions(
96 enable_message_ordering=False,
97 flow_control=pubsub_v1.types.PublishFlowControl(
98 message_limit=2000,
99 limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
100 ),
101 ),
103 # Optional
104 client_options = {
105 "api_endpoint": REGIONAL_ENDPOINT
106 }
107 )
108 """
110 def __init__(
111 self,
112 batch_settings: Union[types.BatchSettings, Sequence] = (),
113 publisher_options: Union[types.PublisherOptions, Sequence] = (),
114 **kwargs: Any,
115 ):
116 assert (
117 type(batch_settings) is types.BatchSettings or len(batch_settings) == 0
118 ), "batch_settings must be of type BatchSettings or an empty sequence."
119 assert (
120 type(publisher_options) is types.PublisherOptions
121 or len(publisher_options) == 0
122 ), "publisher_options must be of type PublisherOptions or an empty sequence."
124 # Sanity check: Is our goal to use the emulator?
125 # If so, create a grpc insecure channel with the emulator host
126 # as the target.
127 if os.environ.get("PUBSUB_EMULATOR_HOST"):
128 kwargs["client_options"] = {
129 "api_endpoint": os.environ.get("PUBSUB_EMULATOR_HOST")
130 }
131 kwargs["credentials"] = AnonymousCredentials()
133 # For a transient failure, retry publishing the message infinitely.
134 self.publisher_options = types.PublisherOptions(*publisher_options)
135 self._enable_message_ordering = self.publisher_options[0]
137 # Add the metrics headers, and instantiate the underlying GAPIC
138 # client.
139 super().__init__(**kwargs)
140 self._target = self._transport._host
141 self._batch_class = thread.Batch
142 self.batch_settings = types.BatchSettings(*batch_settings)
144 # The batches on the publisher client are responsible for holding
145 # messages. One batch exists for each topic.
146 self._batch_lock = self._batch_class.make_lock()
147 # (topic, ordering_key) => sequencers object
148 self._sequencers: Dict[Tuple[str, str], SequencerType] = {}
149 self._is_stopped = False
150 # Thread created to commit all sequencers after a timeout.
151 self._commit_thread: Optional[threading.Thread] = None
153 # The object controlling the message publishing flow
154 self._flow_controller = FlowController(self.publisher_options.flow_control)
156 @classmethod
157 def from_service_account_file( # type: ignore[override]
158 cls,
159 filename: str,
160 batch_settings: Union[types.BatchSettings, Sequence] = (),
161 **kwargs: Any,
162 ) -> "Client":
163 """Creates an instance of this client using the provided credentials
164 file.
166 Args:
167 filename:
168 The path to the service account private key JSON file.
169 batch_settings:
170 The settings for batch publishing.
171 kwargs:
172 Additional arguments to pass to the constructor.
174 Returns:
175 A Publisher instance that is the constructed client.
176 """
177 credentials = service_account.Credentials.from_service_account_file(filename)
178 kwargs["credentials"] = credentials
179 return cls(batch_settings, **kwargs)
181 from_service_account_json = from_service_account_file # type: ignore[assignment]
183 @property
184 def target(self) -> str:
185 """Return the target (where the API is).
187 Returns:
188 The location of the API.
189 """
190 return self._target
192 @property
193 def api(self):
194 """The underlying gapic API client.
196 .. versionchanged:: 2.10.0
197 Instead of a GAPIC ``PublisherClient`` client instance, this property is a
198 proxy object to it with the same interface.
200 .. deprecated:: 2.10.0
201 Use the GAPIC methods and properties on the client instance directly
202 instead of through the :attr:`api` attribute.
203 """
204 msg = (
205 'The "api" property only exists for backward compatibility, access its '
206 'attributes directly thorugh the client instance (e.g. "client.foo" '
207 'instead of "client.api.foo").'
208 )
209 warnings.warn(msg, category=DeprecationWarning)
210 return super()
212 def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
213 """Get an existing sequencer or create a new one given the (topic,
214 ordering_key) pair.
215 """
216 sequencer_key = (topic, ordering_key)
217 sequencer = self._sequencers.get(sequencer_key)
218 if sequencer is None:
219 if ordering_key == "":
220 sequencer = unordered_sequencer.UnorderedSequencer(self, topic)
221 else:
222 sequencer = ordered_sequencer.OrderedSequencer(
223 self, topic, ordering_key
224 )
225 self._sequencers[sequencer_key] = sequencer
227 return sequencer
229 def resume_publish(self, topic: str, ordering_key: str) -> None:
230 """Resume publish on an ordering key that has had unrecoverable errors.
232 Args:
233 topic: The topic to publish messages to.
234 ordering_key: A string that identifies related messages for which
235 publish order should be respected.
237 Raises:
238 RuntimeError:
239 If called after publisher has been stopped by a `stop()` method
240 call.
241 ValueError:
242 If the topic/ordering key combination has not been seen before
243 by this client.
244 """
245 with self._batch_lock:
246 if self._is_stopped:
247 raise RuntimeError("Cannot resume publish on a stopped publisher.")
249 if not self._enable_message_ordering:
250 raise ValueError(
251 "Cannot resume publish on a topic/ordering key if ordering "
252 "is not enabled."
253 )
255 sequencer_key = (topic, ordering_key)
256 sequencer = self._sequencers.get(sequencer_key)
257 if sequencer is None:
258 _LOGGER.debug(
259 "Error: The topic/ordering key combination has not "
260 "been seen before."
261 )
262 else:
263 sequencer.unpause()
265 def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
266 """Call the GAPIC public API directly."""
267 return super().publish(*args, **kwargs)
269 def publish( # type: ignore[override]
270 self,
271 topic: str,
272 data: bytes,
273 ordering_key: str = "",
274 retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
275 timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
276 **attrs: Union[bytes, str],
277 ) -> "pubsub_v1.publisher.futures.Future":
278 """Publish a single message.
280 .. note::
281 Messages in Pub/Sub are blobs of bytes. They are *binary* data,
282 not text. You must send data as a bytestring
283 (``bytes`` in Python 3; ``str`` in Python 2), and this library
284 will raise an exception if you send a text string.
286 The reason that this is so important (and why we do not try to
287 coerce for you) is because Pub/Sub is also platform independent
288 and there is no way to know how to decode messages properly on
289 the other side; therefore, encoding and decoding is a required
290 exercise for the developer.
292 Add the given message to this object; this will cause it to be
293 published once the batch either has enough messages or a sufficient
294 period of time has elapsed.
295 This method may block if LimitExceededBehavior.BLOCK is used in the
296 flow control settings.
298 Example:
299 >>> from google.cloud import pubsub_v1
300 >>> client = pubsub_v1.PublisherClient()
301 >>> topic = client.topic_path('[PROJECT]', '[TOPIC]')
302 >>> data = b'The rain in Wales falls mainly on the snails.'
303 >>> response = client.publish(topic, data, username='guido')
305 Args:
306 topic: The topic to publish messages to.
307 data: A bytestring representing the message body. This
308 must be a bytestring.
309 ordering_key: A string that identifies related messages for which
310 publish order should be respected. Message ordering must be
311 enabled for this client to use this feature.
312 retry:
313 Designation of what errors, if any, should be retried. If `ordering_key`
314 is specified, the total retry deadline will be changed to "infinity".
315 If given, it overides any retry passed into the client through
316 the ``publisher_options`` argument.
317 timeout:
318 The timeout for the RPC request. Can be used to override any timeout
319 passed in through ``publisher_options`` when instantiating the client.
321 attrs: A dictionary of attributes to be
322 sent as metadata. (These may be text strings or byte strings.)
324 Returns:
325 A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
326 instance that conforms to Python Standard library's
327 :class:`~concurrent.futures.Future` interface (but not an
328 instance of that class).
330 Raises:
331 RuntimeError:
332 If called after publisher has been stopped by a `stop()` method
333 call.
335 pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
336 the ``message`` would exceed the max size limit on the backend.
337 """
338 # Sanity check: Is the data being sent as a bytestring?
339 # If it is literally anything else, complain loudly about it.
340 if not isinstance(data, bytes):
341 raise TypeError(
342 "Data being published to Pub/Sub must be sent as a bytestring."
343 )
345 if not self._enable_message_ordering and ordering_key != "":
346 raise ValueError(
347 "Cannot publish a message with an ordering key when message "
348 "ordering is not enabled."
349 )
351 # Coerce all attributes to text strings.
352 for k, v in copy.copy(attrs).items():
353 if isinstance(v, str):
354 continue
355 if isinstance(v, bytes):
356 attrs[k] = v.decode("utf-8")
357 continue
358 raise TypeError(
359 "All attributes being published to Pub/Sub must "
360 "be sent as text strings."
361 )
363 # Create the Pub/Sub message object. For performance reasons, the message
364 # should be constructed by directly using the raw protobuf class, and only
365 # then wrapping it into the higher-level PubsubMessage class.
366 vanilla_pb = _raw_proto_pubbsub_message(
367 data=data, ordering_key=ordering_key, attributes=attrs
368 )
369 message = gapic_types.PubsubMessage.wrap(vanilla_pb)
371 # Messages should go through flow control to prevent excessive
372 # queuing on the client side (depending on the settings).
373 try:
374 self._flow_controller.add(message)
375 except exceptions.FlowControlLimitError as exc:
376 future = futures.Future()
377 future.set_exception(exc)
378 return future
380 def on_publish_done(future):
381 self._flow_controller.release(message)
383 if retry is gapic_v1.method.DEFAULT: # if custom retry not passed in
384 retry = self.publisher_options.retry
386 if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in
387 timeout = self.publisher_options.timeout
389 with self._batch_lock:
390 if self._is_stopped:
391 raise RuntimeError("Cannot publish on a stopped publisher.")
393 # Set retry timeout to "infinite" when message ordering is enabled.
394 # Note that this then also impacts messages added with an empty
395 # ordering key.
396 if self._enable_message_ordering:
397 if retry is gapic_v1.method.DEFAULT:
398 # use the default retry for the publish GRPC method as a base
399 transport = self._transport
400 base_retry = transport._wrapped_methods[transport.publish]._retry
401 retry = base_retry.with_deadline(2.0**32)
402 else:
403 retry = retry.with_deadline(2.0**32)
405 # Delegate the publishing to the sequencer.
406 sequencer = self._get_or_create_sequencer(topic, ordering_key)
407 future = sequencer.publish(message, retry=retry, timeout=timeout)
408 future.add_done_callback(on_publish_done)
410 # Create a timer thread if necessary to enforce the batching
411 # timeout.
412 self._ensure_commit_timer_runs_no_lock()
414 return future
416 def ensure_cleanup_and_commit_timer_runs(self) -> None:
417 """Ensure a cleanup/commit timer thread is running.
419 If a cleanup/commit timer thread is already running, this does nothing.
420 """
421 with self._batch_lock:
422 self._ensure_commit_timer_runs_no_lock()
424 def _ensure_commit_timer_runs_no_lock(self) -> None:
425 """Ensure a commit timer thread is running, without taking
426 _batch_lock.
428 _batch_lock must be held before calling this method.
429 """
430 if not self._commit_thread and self.batch_settings.max_latency < float("inf"):
431 self._start_commit_thread()
433 def _start_commit_thread(self) -> None:
434 """Start a new thread to actually wait and commit the sequencers."""
435 # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
436 # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
437 # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
438 self._commit_thread = threading.Thread(
439 name="Thread-PubSubBatchCommitter",
440 target=self._wait_and_commit_sequencers,
441 daemon=True,
442 )
443 self._commit_thread.start()
445 def _wait_and_commit_sequencers(self) -> None:
446 """Wait up to the batching timeout, and commit all sequencers."""
447 # Sleep for however long we should be waiting.
448 time.sleep(self.batch_settings.max_latency)
449 _LOGGER.debug("Commit thread is waking up")
451 with self._batch_lock:
452 if self._is_stopped:
453 return
454 self._commit_sequencers()
455 self._commit_thread = None
457 def _commit_sequencers(self) -> None:
458 """Clean up finished sequencers and commit the rest."""
459 finished_sequencer_keys = [
460 key
461 for key, sequencer in self._sequencers.items()
462 if sequencer.is_finished()
463 ]
464 for sequencer_key in finished_sequencer_keys:
465 del self._sequencers[sequencer_key]
467 for sequencer in self._sequencers.values():
468 sequencer.commit()
470 def stop(self) -> None:
471 """Immediately publish all outstanding messages.
473 Asynchronously sends all outstanding messages and
474 prevents future calls to `publish()`. Method should
475 be invoked prior to deleting this `Client()` object
476 in order to ensure that no pending messages are lost.
478 .. note::
480 This method is non-blocking. Use `Future()` objects
481 returned by `publish()` to make sure all publish
482 requests completed, either in success or error.
484 Raises:
485 RuntimeError:
486 If called after publisher has been stopped by a `stop()` method
487 call.
488 """
489 with self._batch_lock:
490 if self._is_stopped:
491 raise RuntimeError("Cannot stop a publisher already stopped.")
493 self._is_stopped = True
495 for sequencer in self._sequencers.values():
496 sequencer.stop()
498 # Used only for testing.
499 def _set_batch(
500 self, topic: str, batch: "_batch.thread.Batch", ordering_key: str = ""
501 ) -> None:
502 sequencer = self._get_or_create_sequencer(topic, ordering_key)
503 sequencer._set_batch(batch)
505 # Used only for testing.
506 def _set_batch_class(self, batch_class: Type) -> None:
507 self._batch_class = batch_class
509 # Used only for testing.
510 def _set_sequencer(
511 self, topic: str, sequencer: SequencerType, ordering_key: str = ""
512 ) -> None:
513 sequencer_key = (topic, ordering_key)
514 self._sequencers[sequencer_key] = sequencer