Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py: 28%
129 statements
coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
# Copyright 2017, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import logging
import threading
import time
import typing
from typing import Any, Callable, List, Optional, Sequence

import google.api_core.exceptions
from google.api_core import gapic_v1
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import base
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1 import types
    from google.cloud.pubsub_v1.publisher import Client as PublisherClient
    from google.pubsub_v1.services.publisher.client import OptionalRetry

_LOGGER = logging.getLogger(__name__)
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000  # max accepted size of PublishRequest

_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()
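# The raw class is kept at module level so that ``publish()`` can construct
# messages directly on the generated protobuf type and only then wrap them in
# the proto-plus ``PubsubMessage`` (see the comment in ``publish()``), which
# avoids the slower keyword-argument construction of the wrapper class.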


class Batch(base.Batch):
    """A batch of messages.

    The batch is the internal group of messages which are either awaiting
    publication or currently in progress.

    A batch is automatically created by the PublisherClient when the first
    message to be published is received; subsequent messages are added to
    that batch until the process of actual publishing _starts_.

    Once this occurs, any new messages sent to :meth:`publish` open a new
    batch.

    If you are using this library, you most likely do not need to instantiate
    batch objects directly; they will be created for you. If you want to
    change the actual batching settings, see the ``batching`` argument on
    :class:`~.pubsub_v1.PublisherClient`.

    Any properties or methods on this class which are not defined in
    :class:`~.pubsub_v1.publisher.batch.BaseBatch` should be considered
    implementation details.

    Args:
        client:
            The publisher client used to create this batch.
        topic:
            The topic. The format for this is ``projects/{project}/topics/{topic}``.
        settings:
            The settings for batch publishing. These should be considered immutable
            once the batch has been opened.
        batch_done_callback:
            Callback called when the response for a batch publish has been received.
            Called with one boolean argument: :data:`True` if the batch was published
            successfully, :data:`False` if a permanent error occurred. Temporary
            errors are not surfaced because they are retried at a lower level.
        commit_when_full:
            Whether to commit the batch when the batch is full.
        commit_retry:
            Designation of what errors, if any, should be retried when committing
            the batch. If not provided, a default retry is used.
        commit_timeout:
            The timeout to apply when committing the batch. If not provided, a
            default timeout is used.
    """

    def __init__(
        self,
        client: "PublisherClient",
        topic: str,
        settings: "types.BatchSettings",
        batch_done_callback: Optional[Callable[[bool], Any]] = None,
        commit_when_full: bool = True,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ):
        self._client = client
        self._topic = topic
        self._settings = settings
        self._batch_done_callback = batch_done_callback
        self._commit_when_full = commit_when_full

        self._state_lock = threading.Lock()
        # These members are all communicated between threads; ensure that
        # any writes to them use the "state lock" to remain atomic.
        # The _futures list should remain unchanged after the batch status
        # changes from ACCEPTING_MESSAGES to any other status, in order to
        # avoid race conditions.
        self._futures: List[futures.Future] = []
        self._messages: List[gapic_types.PubsubMessage] = []
        self._status = base.BatchStatus.ACCEPTING_MESSAGES

        # The initial size is not zero; we need to account for the size overhead
        # of the PublishRequest message itself.
        self._base_request_size = gapic_types.PublishRequest(topic=topic)._pb.ByteSize()
        self._size = self._base_request_size
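        # For example: the topic serializes as a single length-delimited
        # protobuf field, so for a topic name of ``n`` bytes the base size is
        # roughly ``n`` plus a few bytes of tag/length overhead.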

        self._commit_retry = commit_retry
        self._commit_timeout = commit_timeout

    @staticmethod
    def make_lock() -> threading.Lock:
        """Return a threading lock.

        Returns:
            A newly created lock.
        """
        return threading.Lock()

    @property
    def client(self) -> "PublisherClient":
        """A publisher client."""
        return self._client

    @property
    def messages(self) -> Sequence[gapic_types.PubsubMessage]:
        """The messages currently in the batch."""
        return self._messages

    @property
    def settings(self) -> "types.BatchSettings":
        """Return the batch settings.

        Returns:
            The batch settings. These are considered immutable once the batch has
            been opened.
        """
        return self._settings

    @property
    def size(self) -> int:
        """Return the total size of all of the messages currently in the batch.

        The size includes any overhead of the actual ``PublishRequest`` that is
        sent to the backend.

        Returns:
            The total size of all of the messages currently in the batch (including
            the request overhead), in bytes.
        """
        return self._size

    @property
    def status(self) -> base.BatchStatus:
        """Return the status of this batch.

        Returns:
            The status of this batch. All statuses are human-readable, all-lowercase
            strings.
        """
        return self._status

    def cancel(self, cancellation_reason: base.BatchCancellationReason) -> None:
        """Complete pending futures with an exception.

        This method must be called before publishing starts (i.e., while the
        batch is still accepting messages).

        Args:
            cancellation_reason:
                The reason why this batch has been cancelled.
        """
        with self._state_lock:
            assert (
                self._status == base.BatchStatus.ACCEPTING_MESSAGES
            ), "Cancel should not be called after sending has started."

            exc = RuntimeError(cancellation_reason.value)
            for future in self._futures:
                future.set_exception(exc)
            self._status = base.BatchStatus.ERROR

    def commit(self) -> None:
        """Actually publish all of the messages on the active batch.

        .. note::

            This method is non-blocking. It opens a new thread, which calls
            :meth:`_commit`, which does block.

        This synchronously sets the batch status to "starting", and then opens
        a new thread, which handles actually sending the messages to Pub/Sub.

        If the current batch is **not** accepting messages, this method
        does nothing.
        """
        # Set the status to "starting" synchronously, to ensure that
        # this batch will necessarily not accept new messages.
        with self._state_lock:
            if self._status == base.BatchStatus.ACCEPTING_MESSAGES:
                self._status = base.BatchStatus.STARTING
            else:
                return

        self._start_commit_thread()

    def _start_commit_thread(self) -> None:
        """Start a new thread to actually handle the commit."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
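        # Because the thread is a daemon, it will not keep the interpreter
        # alive on its own; callers that need delivery guarantees should wait
        # on the returned futures (or flush via the publisher client) before
        # exiting.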
        commit_thread = threading.Thread(
            name="Thread-CommitBatchPublisher", target=self._commit, daemon=True
        )
        commit_thread.start()

    def _commit(self) -> None:
        """Actually publish all of the messages on the active batch.

        This moves the batch out from being the active batch to an in progress
        batch on the publisher, and then the batch is discarded upon
        completion.

        .. note::

            This method blocks. The :meth:`commit` method is the non-blocking
            version, which calls this one.
        """
        with self._state_lock:
            if self._status in _CAN_COMMIT:
                self._status = base.BatchStatus.IN_PROGRESS
            else:
                # If, in the intervening period between when this method was
                # called and now, the batch started to be committed, or
                # completed a commit, then no-op at this point.
                _LOGGER.debug(
                    "Batch is already in progress or has been cancelled, "
                    "exiting commit"
                )
                return

        # Once in the IN_PROGRESS state, no other thread can publish additional
        # messages or initiate a commit (those operations become a no-op), thus
        # it is safe to release the state lock here. Releasing the lock avoids
        # blocking other threads in case api.publish() below takes a long time
        # to complete.
        # https://github.com/googleapis/google-cloud-python/issues/8036

        # Sanity check: If there are no messages, no-op.
        if not self._messages:
            _LOGGER.debug("No messages to publish, exiting commit")
            self._status = base.BatchStatus.SUCCESS
            return

        # Begin the request to publish these messages.
        # Log how long the underlying request takes.
        start = time.time()

        batch_transport_succeeded = True
        try:
            # Performs retries for errors defined by the retry configuration.
            response = self._client._gapic_publish(
                topic=self._topic,
                messages=self._messages,
                retry=self._commit_retry,
                timeout=self._commit_timeout,
            )
        except google.api_core.exceptions.GoogleAPIError as exc:
            # We failed to publish, even after retries, so set the exception on
            # all futures and exit.
            self._status = base.BatchStatus.ERROR

            for future in self._futures:
                future.set_exception(exc)

            batch_transport_succeeded = False
            if self._batch_done_callback is not None:
                # Failed to publish batch.
                self._batch_done_callback(batch_transport_succeeded)

            _LOGGER.exception("Failed to publish %s messages.", len(self._futures))
            return

        end = time.time()
        _LOGGER.debug("gRPC Publish took %s seconds.", end - start)

        if len(response.message_ids) == len(self._futures):
            # Iterate over the futures on the queue and return the response
            # IDs. We trust that there is a 1:1 mapping, and raise an
            # exception if there is not.
            self._status = base.BatchStatus.SUCCESS
            for message_id, future in zip(response.message_ids, self._futures):
                future.set_result(message_id)
        else:
            # Sanity check: If the number of message IDs is not equal to
            # the number of futures I have, then something went wrong.
            self._status = base.BatchStatus.ERROR
            exception = exceptions.PublishError(
                "Some messages were not successfully published."
            )

            for future in self._futures:
                future.set_exception(exception)

            # Unknown error -> batch failed to be correctly transported.
            batch_transport_succeeded = False

            _LOGGER.error(
                "Only %s of %s messages were published.",
                len(response.message_ids),
                len(self._futures),
            )

        if self._batch_done_callback is not None:
            self._batch_done_callback(batch_transport_succeeded)

    def publish(
        self, message: gapic_types.PubsubMessage
    ) -> Optional["pubsub_v1.publisher.futures.Future"]:
        """Publish a single message.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed. If the batch is full or the commit is
        already in progress, the method does not do anything.

        This method is called by :meth:`~.PublisherClient.publish`.

        Args:
            message: The Pub/Sub message.

        Returns:
            An object conforming to the :class:`~concurrent.futures.Future` interface
            or :data:`None`. If :data:`None` is returned, that signals that the batch
            cannot accept a message.

        Raises:
            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """

        # Coerce the type, just in case.
        if not isinstance(message, gapic_types.PubsubMessage):
            # For performance reasons, the message should be constructed by directly
            # using the raw protobuf class, and only then wrapping it into the
            # higher-level PubsubMessage class.
            vanilla_pb = _raw_proto_pubbsub_message(**message)
            message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        future = None

        with self._state_lock:
            assert (
                self._status != base.BatchStatus.ERROR
            ), "Publish after stop() or publish error."

            if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
                return None
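
            # Measure the message's size as the increase it causes to a
            # serialized ``PublishRequest``; this also captures the per-message
            # field overhead, not just the payload bytes.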
            size_increase = gapic_types.PublishRequest(
                messages=[message]
            )._pb.ByteSize()

            if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
                err_msg = (
                    "The message being published would produce a publish request "
                    "that exceeds the maximum allowed size on the backend "
                    "({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES)
                )
                raise exceptions.MessageTooLargeError(err_msg)

            new_size = self._size + size_increase
            new_count = len(self._messages) + 1
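
            # Bytes overflow only when the limit is strictly exceeded; count
            # overflow triggers as soon as this message would reach
            # ``max_messages``. An overflowing message is accepted only into an
            # empty batch; otherwise ``None`` is returned and the caller is
            # expected to retry with a new batch.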
            size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
            overflow = new_size > size_limit or new_count >= self.settings.max_messages

            if not self._messages or not overflow:

                # Store the actual message in the batch's message queue.
                self._messages.append(message)
                self._size = new_size

                # Track the future on this batch (so that the result of the
                # future can be set).
                future = futures.Future()
                self._futures.append(future)

        # Try to commit, but it must be **without** the lock held, since
        # ``commit()`` will try to obtain the lock.
        if self._commit_when_full and overflow:
            self.commit()

        return future

    def _set_status(self, status: base.BatchStatus):
        self._status = status