1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import logging
18import threading
19import time
20import typing
21from typing import Any, Callable, List, Optional, Sequence
22from datetime import datetime
23
24from opentelemetry import trace
25import google.api_core.exceptions
26from google.api_core import gapic_v1
27from google.auth import exceptions as auth_exceptions
28
29from google.cloud.pubsub_v1.publisher import exceptions
30from google.cloud.pubsub_v1.publisher import futures
31from google.cloud.pubsub_v1.publisher._batch import base
32from google.pubsub_v1 import types as gapic_types
33from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
34 PublishMessageWrapper,
35)
36
37if typing.TYPE_CHECKING: # pragma: NO COVER
38 from google.cloud import pubsub_v1
39 from google.cloud.pubsub_v1 import types
40 from google.cloud.pubsub_v1.publisher import Client as PublisherClient
41 from google.pubsub_v1.services.publisher.client import OptionalRetry
42
# Logger shared by everything in this module.
_LOGGER = logging.getLogger(__name__)

# A batch may only be moved to IN_PROGRESS from one of these statuses.
_CAN_COMMIT = (
    base.BatchStatus.ACCEPTING_MESSAGES,
    base.BatchStatus.STARTING,
)

# Max accepted size of a PublishRequest, in bytes.
_SERVER_PUBLISH_MAX_BYTES = 10_000_000

# Raw protobuf class behind ``PubsubMessage``; used to build a message from a
# mapping before wrapping it into the higher-level ``PubsubMessage`` type.
# NOTE(review): the name contains a typo ("pubbsub"); it is kept unchanged
# because it may be referenced from outside this file.
_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()
48
49
class Batch(base.Batch):
    """A batch of messages.

    The batch is the internal group of messages which are either awaiting
    publication or currently in progress.

    A batch is automatically created by the PublisherClient when the first
    message to be published is received; subsequent messages are added to
    that batch until the process of actual publishing _starts_.

    Once this occurs, any new messages sent to :meth:`publish` open a new
    batch.

    If you are using this library, you most likely do not need to instantiate
    batch objects directly; they will be created for you. If you want to
    change the actual batching settings, see the ``batching`` argument on
    :class:`~.pubsub_v1.PublisherClient`.

    Any properties or methods on this class which are not defined in
    :class:`~.pubsub_v1.publisher.batch.BaseBatch` should be considered
    implementation details.

    Args:
        client:
            The publisher client used to create this batch.
        topic:
            The topic. The format for this is ``projects/{project}/topics/{topic}``.
        settings:
            The settings for batch publishing. These should be considered immutable
            once the batch has been opened.
        batch_done_callback:
            Callback called when the response for a batch publish has been received.
            Called with one boolean argument: successfully published or a permanent
            error occurred. Temporary errors are not surfaced because they are retried
            at a lower level.
        commit_when_full:
            Whether to commit the batch when the batch is full.
        commit_retry:
            Designation of what errors, if any, should be retried when committing
            the batch. If not provided, a default retry is used.
        commit_timeout:
            The timeout to apply when committing the batch. If not provided, a default
            timeout is used.
    """

    _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
    _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"

    def __init__(
        self,
        client: "PublisherClient",
        topic: str,
        settings: "types.BatchSettings",
        batch_done_callback: Optional[Callable[[bool], Any]] = None,
        commit_when_full: bool = True,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ):
        self._client = client
        self._topic = topic
        self._settings = settings
        self._batch_done_callback = batch_done_callback
        self._commit_when_full = commit_when_full

        self._state_lock = threading.Lock()
        # These members are all communicated between threads; ensure that
        # any writes to them use the "state lock" to remain atomic.
        # _futures list should remain unchanged after batch
        # status changed from ACCEPTING_MESSAGES to any other
        # in order to avoid race conditions
        self._futures: List[futures.Future] = []
        self._message_wrappers: List[PublishMessageWrapper] = []
        self._status = base.BatchStatus.ACCEPTING_MESSAGES

        # The initial size is not zero, we need to account for the size overhead
        # of the PublishRequest message itself.
        self._base_request_size = gapic_types.PublishRequest(topic=topic)._pb.ByteSize()
        self._size = self._base_request_size

        self._commit_retry = commit_retry
        self._commit_timeout = commit_timeout

        # Publish RPC Span that will be set by method `_start_publish_rpc_span`
        # if Open Telemetry is enabled.
        self._rpc_span: Optional[trace.Span] = None

    @staticmethod
    def make_lock() -> threading.Lock:
        """Return a threading lock.

        Returns:
            A newly created lock.
        """
        return threading.Lock()

    @property
    def client(self) -> "PublisherClient":
        """A publisher client."""
        return self._client

    @property
    def message_wrappers(self) -> Sequence[PublishMessageWrapper]:
        """The message wrappers currently in the batch."""
        return self._message_wrappers

    @property
    def settings(self) -> "types.BatchSettings":
        """Return the batch settings.

        Returns:
            The batch settings. These are considered immutable once the batch has
            been opened.
        """
        return self._settings

    @property
    def size(self) -> int:
        """Return the total size of all of the messages currently in the batch.

        The size includes any overhead of the actual ``PublishRequest`` that is
        sent to the backend.

        Returns:
            The total size of all of the messages currently in the batch (including
            the request overhead), in bytes.
        """
        return self._size

    @property
    def status(self) -> base.BatchStatus:
        """Return the status of this batch.

        Returns:
            The status of this batch. All statuses are human-readable, all-lowercase
            strings.
        """
        return self._status

    def cancel(self, cancellation_reason: base.BatchCancellationReason) -> None:
        """Complete pending futures with an exception.

        This method must be called before publishing starts (ie: while the
        batch is still accepting messages.)

        Every pending future is failed with a ``RuntimeError`` whose message
        is ``cancellation_reason.value``, and the batch status becomes ERROR.

        Args:
            cancellation_reason:
                The reason why this batch has been cancelled.
        """

        with self._state_lock:
            assert (
                self._status == base.BatchStatus.ACCEPTING_MESSAGES
            ), "Cancel should not be called after sending has started."

            exc = RuntimeError(cancellation_reason.value)
            for future in self._futures:
                future.set_exception(exc)
            self._status = base.BatchStatus.ERROR

    def commit(self) -> None:
        """Actually publish all of the messages on the active batch.

        .. note::

            This method is non-blocking. It opens a new thread, which calls
            :meth:`_commit`, which does block.

        This synchronously sets the batch status to "starting", and then opens
        a new thread, which handles actually sending the messages to Pub/Sub.

        If the current batch is **not** accepting messages, this method
        does nothing.
        """

        # Set the status to "starting" synchronously, to ensure that
        # this batch will necessarily not accept new messages.
        with self._state_lock:
            if self._status == base.BatchStatus.ACCEPTING_MESSAGES:
                self._status = base.BatchStatus.STARTING
            else:
                return

        self._start_commit_thread()

    def _start_commit_thread(self) -> None:
        """Start a new thread to actually handle the commit."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        commit_thread = threading.Thread(
            name="Thread-CommitBatchPublisher", target=self._commit, daemon=True
        )
        commit_thread.start()

    def _start_publish_rpc_span(self) -> None:
        """Start the Open Telemetry span covering the publish RPC.

        The new span is linked to the create span of every sampled message in
        the batch, and each of those create spans is linked back to it. The
        span is stored on ``self._rpc_span`` and is left open
        (``end_on_exit=False``); :meth:`_commit` is responsible for ending it.
        """
        tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
        links = []

        for wrapper in self._message_wrappers:
            span_context = wrapper.create_span.get_span_context()
            # Add links only for sampled spans.
            if span_context.trace_flags.sampled:
                links.append(trace.Link(span_context))

        # Split the topic path once; the expected format is
        # ``projects/{project}/topics/{topic}``.
        topic_parts = self._topic.split("/")
        assert len(topic_parts) == 4
        project_id = topic_parts[1]
        topic_short_name = topic_parts[3]

        with tracer.start_as_current_span(
            name=f"{topic_short_name} publish",
            attributes={
                "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
                "messaging.destination.name": topic_short_name,
                "gcp.project_id": project_id,
                "messaging.batch.message_count": len(self._message_wrappers),
                "messaging.operation": "publish",
                "code.function": "_commit",
            },
            links=links,
            kind=trace.SpanKind.CLIENT,
            end_on_exit=False,
        ) as rpc_span:
            ctx = rpc_span.get_span_context()
            # Link each sampled create span back to the publish RPC span.
            for wrapper in self._message_wrappers:
                create_span = wrapper.create_span
                if create_span.get_span_context().trace_flags.sampled:
                    create_span.add_link(ctx)
            self._rpc_span = rpc_span

    def _commit(self) -> None:
        """Actually publish all of the messages on the active batch.

        This moves the batch out from being the active batch to an in progress
        batch on the publisher, and then the batch is discarded upon
        completion.

        On success each future receives its message ID. On a
        ``GoogleAPIError`` / ``TransportError``, or when the number of
        returned message IDs does not match the number of futures, every
        future receives an exception instead. ``batch_done_callback`` (if
        set) is called with ``True`` or ``False`` accordingly.

        .. note::

            This method blocks. The :meth:`commit` method is the non-blocking
            version, which calls this one.
        """
        with self._state_lock:
            if self._status in _CAN_COMMIT:
                self._status = base.BatchStatus.IN_PROGRESS
            else:
                # If, in the intervening period between when this method was
                # called and now, the batch started to be committed, or
                # completed a commit, then no-op at this point.
                _LOGGER.debug(
                    "Batch is already in progress or has been cancelled, "
                    "exiting commit"
                )
                return

        # Once in the IN_PROGRESS state, no other thread can publish additional
        # messages or initiate a commit (those operations become a no-op), thus
        # it is safe to release the state lock here. Releasing the lock avoids
        # blocking other threads in case api.publish() below takes a long time
        # to complete.
        # https://github.com/googleapis/google-cloud-python/issues/8036

        # Sanity check: If there are no messages, no-op.
        if not self._message_wrappers:
            _LOGGER.debug("No messages to publish, exiting commit")
            self._status = base.BatchStatus.SUCCESS
            return

        # Begin the request to publish these messages.
        # Log how long the underlying request takes. A monotonic clock is used
        # so that wall-clock adjustments cannot skew (or negate) the duration.
        start = time.monotonic()

        batch_transport_succeeded = True
        # NOTE(review): only the exception types listed below are handled; any
        # other exception raised inside this ``try`` propagates out of the
        # method without resolving the futures.
        try:
            if self._client.open_telemetry_enabled:
                self._start_publish_rpc_span()

            # Performs retries for errors defined by the retry configuration.
            response = self._client._gapic_publish(
                topic=self._topic,
                messages=[wrapper.message for wrapper in self._message_wrappers],
                retry=self._commit_retry,
                timeout=self._commit_timeout,
            )

            if self._client.open_telemetry_enabled:
                assert self._rpc_span is not None
                self._rpc_span.end()
                end_time = str(datetime.now())
                # Record the publish end and the server-assigned ID on each
                # message's create span, then close that span.
                for message_id, wrapper in zip(
                    response.message_ids, self._message_wrappers
                ):
                    span = wrapper.create_span
                    span.add_event(
                        name="publish end",
                        attributes={
                            "timestamp": end_time,
                        },
                    )
                    span.set_attribute(key="messaging.message.id", value=message_id)
                    wrapper.end_create_span()
        except (
            google.api_core.exceptions.GoogleAPIError,
            auth_exceptions.TransportError,
        ) as exc:
            # We failed to publish, even after retries, so set the exception on
            # all futures and exit.
            self._status = base.BatchStatus.ERROR

            if self._client.open_telemetry_enabled:
                # The RPC span may be unset if starting it was what failed.
                if self._rpc_span:
                    self._rpc_span.record_exception(
                        exception=exc,
                    )
                    self._rpc_span.set_status(
                        trace.Status(status_code=trace.StatusCode.ERROR)
                    )
                    self._rpc_span.end()

                for wrapper in self._message_wrappers:
                    wrapper.end_create_span(exc=exc)

            batch_transport_succeeded = False
            if self._batch_done_callback is not None:
                # Failed to publish batch.
                self._batch_done_callback(batch_transport_succeeded)

            for future in self._futures:
                future.set_exception(exc)

            return

        end = time.monotonic()
        _LOGGER.debug("gRPC Publish took %s seconds.", end - start)

        if len(response.message_ids) == len(self._futures):
            # Iterate over the futures on the queue and return the response
            # IDs. We are trusting that there is a 1:1 mapping, and raise
            # an exception if not.
            self._status = base.BatchStatus.SUCCESS
            for message_id, future in zip(response.message_ids, self._futures):
                future.set_result(message_id)
        else:
            # Sanity check: If the number of message IDs is not equal to
            # the number of futures I have, then something went wrong.
            self._status = base.BatchStatus.ERROR
            exception = exceptions.PublishError(
                "Some messages were not successfully published."
            )

            for future in self._futures:
                future.set_exception(exception)

            # Unknown error -> batch failed to be correctly transported/
            batch_transport_succeeded = False

            _LOGGER.error(
                "Only %s of %s messages were published.",
                len(response.message_ids),
                len(self._futures),
            )

        if self._batch_done_callback is not None:
            self._batch_done_callback(batch_transport_succeeded)

    def publish(
        self,
        wrapper: PublishMessageWrapper,
    ) -> Optional["pubsub_v1.publisher.futures.Future"]:
        """Publish a single message.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed. If the batch is full or the commit is
        already in progress, the method does not do anything.

        This method is called by :meth:`~.PublisherClient.publish`.

        Args:
            wrapper: The Pub/Sub message wrapper.

        Returns:
            An object conforming to the :class:`~concurrent.futures.Future` interface
            or :data:`None`. If :data:`None` is returned, that signals that the batch
            cannot accept a message.

        Raises:
            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """

        # Coerce the type, just in case.
        if not isinstance(
            wrapper.message, gapic_types.PubsubMessage
        ):  # pragma: NO COVER
            # For performance reasons, the message should be constructed by directly
            # using the raw protobuf class, and only then wrapping it into the
            # higher-level PubsubMessage class.
            vanilla_pb = _raw_proto_pubbsub_message(**wrapper.message)
            wrapper.message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        future = None

        with self._state_lock:
            assert (
                self._status != base.BatchStatus.ERROR
            ), "Publish after stop() or publish error."

            if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
                return None

            size_increase = gapic_types.PublishRequest(
                messages=[wrapper.message]
            )._pb.ByteSize()

            # A message that cannot fit even into an otherwise empty request
            # can never be published; reject it outright.
            if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
                err_msg = (
                    "The message being published would produce too large a publish "
                    "request that would exceed the maximum allowed size on the "
                    "backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES)
                )
                raise exceptions.MessageTooLargeError(err_msg)

            new_size = self._size + size_increase
            new_count = len(self._message_wrappers) + 1

            size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
            overflow = new_size > size_limit or new_count >= self.settings.max_messages

            # The first message is always accepted, even if it alone overflows
            # the configured limits; later messages only while there is room.
            if not self._message_wrappers or not overflow:
                # Store the actual message in the batch's message queue.
                self._message_wrappers.append(wrapper)
                self._size = new_size

                # Track the future on this batch (so that the result of the
                # future can be set).
                future = futures.Future()
                self._futures.append(future)

        # Try to commit, but it must be **without** the lock held, since
        # ``commit()`` will try to obtain the lock.
        if self._commit_when_full and overflow:
            self.commit()

        return future

    def _set_status(self, status: base.BatchStatus) -> None:
        """Overwrite the status of this batch.

        The assignment is made directly, without acquiring the state lock.

        Args:
            status: The new status of the batch.
        """
        self._status = status