1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import datetime as dt
18import json
19import logging
20import math
21import time
22import typing
23from typing import Optional, Callable
24
25from google.cloud.pubsub_v1.subscriber._protocol import requests
26from google.cloud.pubsub_v1.subscriber import futures
27from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus
28from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
29 SubscribeOpenTelemetry,
30)
31
32
33if typing.TYPE_CHECKING: # pragma: NO COVER
34 import datetime
35 import queue
36 from google.cloud.pubsub_v1 import types
37 from google.protobuf.internal import containers
38
39
40_MESSAGE_REPR = """\
41Message {{
42 data: {!r}
43 ordering_key: {!r}
44 attributes: {}
45}}"""
46
47_ACK_NACK_LOGGER = logging.getLogger("ack-nack")
48
49_SUCCESS_FUTURE = futures.Future()
50_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS)
51
52
53def _indent(lines: str, prefix: str = " ") -> str:
54 """Indent some text.
55
56 Note that this is present as ``textwrap.indent``, but not in Python 2.
57
58 Args:
59 lines:
60 The newline delimited string to be indented.
61 prefix:
62 The prefix to indent each line with. Defaults to two spaces.
63
64 Returns:
65 The newly indented content.
66 """
67 indented = []
68 for line in lines.split("\n"):
69 indented.append(prefix + line)
70 return "\n".join(indented)
71
72
73class Message(object):
74 """A representation of a single Pub/Sub message.
75
76 The common way to interact with
77 :class:`~.pubsub_v1.subscriber.message.Message` objects is to receive
78 them in callbacks on subscriptions; most users should never have a need
79 to instantiate them by hand. (The exception to this is if you are
80 implementing a custom subclass to
81 :class:`~.pubsub_v1.subscriber._consumer.Consumer`.)
82
83 Attributes:
84 message_id (str):
85 The message ID. In general, you should not need to use this directly.
86 data (bytes):
87 The data in the message. Note that this will be a :class:`bytes`,
88 not a text string.
89 attributes (MutableMapping[str, str]):
90 The attributes sent along with the message. See :attr:`attributes` for more
91 information on this type.
92 publish_time (google.protobuf.timestamp_pb2.Timestamp):
93 The time that this message was originally published.
94 opentelemetry_data (google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry.SubscribeOpenTelemetry)
95 Open Telemetry data associated with this message. None if Open Telemetry is not enabled.
96 """
97
98 def __init__(
99 self,
100 message: "types.PubsubMessage._meta._pb", # type: ignore
101 ack_id: str,
102 delivery_attempt: int,
103 request_queue: "queue.Queue",
104 exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False,
105 ):
106 """Construct the Message.
107
108 .. note::
109
110 This class should not be constructed directly; it is the
111 responsibility of :class:`BasePolicy` subclasses to do so.
112
113 Args:
114 message (types.PubsubMessage._meta._pb):
115 The message received from Pub/Sub. For performance reasons it should be
116 the raw protobuf message normally wrapped by
117 :class:`~pubsub_v1.types.PubsubMessage`. A raw message can be obtained
118 from a :class:`~pubsub_v1.types.PubsubMessage` instance through the
119 latter's ``._pb`` attribute.
120 ack_id (str):
121 The ack_id received from Pub/Sub.
122 delivery_attempt (int):
123 The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy
124 is set on the subscription, and zero otherwise.
125 request_queue (queue.Queue):
126 A queue provided by the policy that can accept requests; the policy is
127 responsible for handling those requests.
128 exactly_once_delivery_enabled_func (Callable[[], bool]):
129 A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False.
130 """
131 self._message = message
132 self._ack_id = ack_id
133 self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None
134 self._request_queue = request_queue
135 self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func
136 self.message_id = message.message_id
137
138 # The instantiation time is the time that this message
139 # was received. Tracking this provides us a way to be smart about
140 # the default lease deadline.
141 self._received_timestamp = time.time()
142
143 # Store the message attributes directly to speed up attribute access, i.e.
144 # to avoid two lookups if self._message.<attribute> pattern was used in
145 # properties.
146 self._attributes = message.attributes
147 self._data = message.data
148 self._publish_time = dt.datetime.fromtimestamp(
149 message.publish_time.seconds + message.publish_time.nanos / 1e9,
150 tz=dt.timezone.utc,
151 )
152 self._ordering_key = message.ordering_key
153 self._size = message.ByteSize()
154
155 # None if Open Telemetry is disabled. Else contains OpenTelemetry data.
156 self._opentelemetry_data: Optional[SubscribeOpenTelemetry] = None
157
158 def __repr__(self):
159 # Get an abbreviated version of the data.
160 abbv_data = self._message.data
161 if len(abbv_data) > 50:
162 abbv_data = abbv_data[:50] + b"..."
163
164 pretty_attrs = json.dumps(
165 dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True
166 )
167 pretty_attrs = _indent(pretty_attrs)
168 # We don't actually want the first line indented.
169 pretty_attrs = pretty_attrs.lstrip()
170 return _MESSAGE_REPR.format(abbv_data, str(self.ordering_key), pretty_attrs)
171
172 @property
173 def opentelemetry_data(self):
174 return self._opentelemetry_data # pragma: NO COVER
175
176 @opentelemetry_data.setter
177 def opentelemetry_data(self, data):
178 self._opentelemetry_data = data # pragma: NO COVER
179
180 @property
181 def attributes(self) -> "containers.ScalarMap":
182 """Return the attributes of the underlying Pub/Sub Message.
183
184 .. warning::
185
186 A ``ScalarMap`` behaves slightly differently than a
187 ``dict``. For a Pub / Sub message this is a ``string->string`` map.
188 When trying to access a value via ``map['key']``, if the key is
189 not in the map, then the default value for the string type will
190 be returned, which is an empty string. It may be more intuitive
191 to just cast the map to a ``dict`` or to one use ``map.get``.
192
193 Returns:
194 containers.ScalarMap: The message's attributes. This is a
195 ``dict``-like object provided by ``google.protobuf``.
196 """
197 return self._attributes
198
199 @property
200 def data(self) -> bytes:
201 """Return the data for the underlying Pub/Sub Message.
202
203 Returns:
204 bytes: The message data. This is always a bytestring; if you want
205 a text string, call :meth:`bytes.decode`.
206 """
207 return self._data
208
209 @property
210 def publish_time(self) -> "datetime.datetime":
211 """Return the time that the message was originally published.
212
213 Returns:
214 datetime.datetime: The date and time that the message was
215 published.
216 """
217 return self._publish_time
218
219 @property
220 def ordering_key(self) -> str:
221 """The ordering key used to publish the message."""
222 return self._ordering_key
223
224 @property
225 def size(self) -> int:
226 """Return the size of the underlying message, in bytes."""
227 return self._size
228
229 @property
230 def ack_id(self) -> str:
231 """the ID used to ack the message."""
232 return self._ack_id
233
234 @property
235 def delivery_attempt(self) -> Optional[int]:
236 """The delivery attempt counter is 1 + (the sum of number of NACKs
237 and number of ack_deadline exceeds) for this message. It is set to None
238 if a DeadLetterPolicy is not set on the subscription.
239
240 A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline
241 exceeds event is whenever a message is not acknowledged within
242 ack_deadline. Note that ack_deadline is initially
243 Subscription.ackDeadlineSeconds, but may get extended automatically by
244 the client library.
245
246 The first delivery of a given message will have this value as 1. The value
247 is calculated at best effort and is approximate.
248
249 Returns:
250 Optional[int]: The delivery attempt counter or ``None``.
251 """
252 return self._delivery_attempt
253
254 def ack(self) -> None:
255 """Acknowledge the given message.
256
257 Acknowledging a message in Pub/Sub means that you are done
258 with it, and it will not be delivered to this subscription again.
259 You should avoid acknowledging messages until you have
260 *finished* processing them, so that in the event of a failure,
261 you receive the message again.
262
263 .. warning::
264 Acks in Pub/Sub are best effort. You should always
265 ensure that your processing code is idempotent, as you may
266 receive any given message more than once. If you need strong
267 guarantees about acks and re-deliveres, enable exactly-once
268 delivery on your subscription and use the `ack_with_response`
269 method instead. Exactly once delivery is a preview feature.
270 For more details, see:
271 https://cloud.google.com/pubsub/docs/exactly-once-delivery."
272
273 """
274 if self.opentelemetry_data:
275 self.opentelemetry_data.add_process_span_event("ack called")
276 self.opentelemetry_data.end_process_span()
277 time_to_ack = math.ceil(time.time() - self._received_timestamp)
278 self._request_queue.put(
279 requests.AckRequest(
280 message_id=self.message_id,
281 ack_id=self._ack_id,
282 byte_size=self.size,
283 time_to_ack=time_to_ack,
284 ordering_key=self.ordering_key,
285 future=None,
286 opentelemetry_data=self.opentelemetry_data,
287 )
288 )
289 _ACK_NACK_LOGGER.debug(
290 "Called ack for message (id=%s, ack_id=%s, ordering_key=%s)",
291 self.message_id,
292 self.ack_id,
293 self.ordering_key,
294 )
295
296 def ack_with_response(self) -> "futures.Future":
297 """Acknowledge the given message.
298
299 Acknowledging a message in Pub/Sub means that you are done
300 with it, and it will not be delivered to this subscription again.
301 You should avoid acknowledging messages until you have
302 *finished* processing them, so that in the event of a failure,
303 you receive the message again.
304
305 If exactly-once delivery is NOT enabled on the subscription, the
306 future returns immediately with an AcknowledgeStatus.SUCCESS.
307 Since acks in Cloud Pub/Sub are best effort when exactly-once
308 delivery is disabled, the message may be re-delivered. Because
309 re-deliveries are possible, you should ensure that your processing
310 code is idempotent, as you may receive any given message more than
311 once.
312
313 If exactly-once delivery is enabled on the subscription, the
314 future returned by this method tracks the state of acknowledgement
315 operation. If the future completes successfully, the message is
316 guaranteed NOT to be re-delivered. Otherwise, the future will
317 contain an exception with more details about the failure and the
318 message may be re-delivered.
319
320 Exactly once delivery is a preview feature. For more details,
321 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
322
323 Returns:
324 futures.Future: A
325 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
326 instance that conforms to Python Standard library's
327 :class:`~concurrent.futures.Future` interface (but not an
328 instance of that class). Call `result()` to get the result
329 of the operation; upon success, a
330 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
331 will be returned and upon an error, an
332 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
333 will be thrown.
334 """
335 _ACK_NACK_LOGGER.debug(
336 "Called ack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=True)",
337 self.message_id,
338 self.ack_id,
339 self.ordering_key,
340 )
341 if self.opentelemetry_data:
342 self.opentelemetry_data.add_process_span_event("ack called")
343 self.opentelemetry_data.end_process_span()
344 req_future: Optional[futures.Future]
345 if self._exactly_once_delivery_enabled_func():
346 future = futures.Future()
347 req_future = future
348 else:
349 future = _SUCCESS_FUTURE
350 req_future = None
351 time_to_ack = math.ceil(time.time() - self._received_timestamp)
352 self._request_queue.put(
353 requests.AckRequest(
354 message_id=self.message_id,
355 ack_id=self._ack_id,
356 byte_size=self.size,
357 time_to_ack=time_to_ack,
358 ordering_key=self.ordering_key,
359 future=req_future,
360 opentelemetry_data=self.opentelemetry_data,
361 )
362 )
363 return future
364
365 def drop(self) -> None:
366 """Release the message from lease management.
367
368 This informs the policy to no longer hold on to the lease for this
369 message. Pub/Sub will re-deliver the message if it is not acknowledged
370 before the existing lease expires.
371
372 .. warning::
373 For most use cases, the only reason to drop a message from
374 lease management is on `ack` or `nack`; this library
375 automatically drop()s the message on `ack` or `nack`. You probably
376 do not want to call this method directly.
377 """
378 self._request_queue.put(
379 requests.DropRequest(
380 ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key
381 )
382 )
383
384 def modify_ack_deadline(self, seconds: int) -> None:
385 """Resets the deadline for acknowledgement.
386
387 New deadline will be the given value of seconds from now.
388
389 The default implementation handles automatically modacking received messages for you;
390 you should not need to manually deal with setting ack deadlines. The exception case is
391 if you are implementing your own custom subclass of
392 :class:`~.pubsub_v1.subcriber._consumer.Consumer`.
393
394 Args:
395 seconds (int):
396 The number of seconds to set the lease deadline to. This should be
397 between 0 and 600. Due to network latency, values below 10 are advised
398 against.
399 """
400 self._request_queue.put(
401 requests.ModAckRequest(
402 message_id=self.message_id,
403 ack_id=self._ack_id,
404 seconds=seconds,
405 future=None,
406 opentelemetry_data=self.opentelemetry_data,
407 )
408 )
409
410 def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future":
411 """Resets the deadline for acknowledgement and returns the response
412 status via a future.
413
414 New deadline will be the given value of seconds from now.
415
416 The default implementation handles automatically modacking received messages for you;
417 you should not need to manually deal with setting ack deadlines. The exception case is
418 if you are implementing your own custom subclass of
419 :class:`~.pubsub_v1.subcriber._consumer.Consumer`.
420
421 If exactly-once delivery is NOT enabled on the subscription, the
422 future returns immediately with an AcknowledgeStatus.SUCCESS.
423 Since modify-ack-deadline operations in Cloud Pub/Sub are best effort
424 when exactly-once delivery is disabled, the message may be re-delivered
425 within the set deadline.
426
427 If exactly-once delivery is enabled on the subscription, the
428 future returned by this method tracks the state of the
429 modify-ack-deadline operation. If the future completes successfully,
430 the message is guaranteed NOT to be re-delivered within the new deadline.
431 Otherwise, the future will contain an exception with more details about
432 the failure and the message will be redelivered according to its
433 currently-set ack deadline.
434
435 Exactly once delivery is a preview feature. For more details,
436 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
437
438 Args:
439 seconds (int):
440 The number of seconds to set the lease deadline to. This should be
441 between 0 and 600. Due to network latency, values below 10 are advised
442 against.
443 Returns:
444 futures.Future: A
445 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
446 instance that conforms to Python Standard library's
447 :class:`~concurrent.futures.Future` interface (but not an
448 instance of that class). Call `result()` to get the result
449 of the operation; upon success, a
450 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
451 will be returned and upon an error, an
452 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
453 will be thrown.
454
455 """
456 req_future: Optional[futures.Future]
457 if self._exactly_once_delivery_enabled_func():
458 future = futures.Future()
459 req_future = future
460 else:
461 future = _SUCCESS_FUTURE
462 req_future = None
463
464 self._request_queue.put(
465 requests.ModAckRequest(
466 message_id=self.message_id,
467 ack_id=self._ack_id,
468 seconds=seconds,
469 future=req_future,
470 opentelemetry_data=self.opentelemetry_data,
471 )
472 )
473
474 return future
475
476 def nack(self) -> None:
477 """Decline to acknowledge the given message.
478
479 This will cause the message to be re-delivered to subscribers. Re-deliveries
480 may take place immediately or after a delay, and may arrive at this subscriber
481 or another.
482 """
483 _ACK_NACK_LOGGER.debug(
484 "Called nack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)",
485 self.message_id,
486 self.ack_id,
487 self.ordering_key,
488 self._exactly_once_delivery_enabled_func(),
489 )
490 if self.opentelemetry_data:
491 self.opentelemetry_data.add_process_span_event("nack called")
492 self.opentelemetry_data.end_process_span()
493 self._request_queue.put(
494 requests.NackRequest(
495 ack_id=self._ack_id,
496 byte_size=self.size,
497 ordering_key=self.ordering_key,
498 future=None,
499 opentelemetry_data=self.opentelemetry_data,
500 )
501 )
502
503 def nack_with_response(self) -> "futures.Future":
504 """Decline to acknowledge the given message, returning the response status via
505 a future.
506
507 This will cause the message to be re-delivered to subscribers. Re-deliveries
508 may take place immediately or after a delay, and may arrive at this subscriber
509 or another.
510
511 If exactly-once delivery is NOT enabled on the subscription, the
512 future returns immediately with an AcknowledgeStatus.SUCCESS.
513
514 If exactly-once delivery is enabled on the subscription, the
515 future returned by this method tracks the state of the
516 nack operation. If the future completes successfully,
517 the future's result will be an AcknowledgeStatus.SUCCESS.
518 Otherwise, the future will contain an exception with more details about
519 the failure.
520
521 Exactly once delivery is a preview feature. For more details,
522 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
523
524 Returns:
525 futures.Future: A
526 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
527 instance that conforms to Python Standard library's
528 :class:`~concurrent.futures.Future` interface (but not an
529 instance of that class). Call `result()` to get the result
530 of the operation; upon success, a
531 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
532 will be returned and upon an error, an
533 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
534 will be thrown.
535
536 """
537 if self.opentelemetry_data:
538 self.opentelemetry_data.add_process_span_event("nack called")
539 self.opentelemetry_data.end_process_span()
540 req_future: Optional[futures.Future]
541 if self._exactly_once_delivery_enabled_func():
542 future = futures.Future()
543 req_future = future
544 else:
545 future = _SUCCESS_FUTURE
546 req_future = None
547
548 self._request_queue.put(
549 requests.NackRequest(
550 ack_id=self._ack_id,
551 byte_size=self.size,
552 ordering_key=self.ordering_key,
553 future=req_future,
554 opentelemetry_data=self.opentelemetry_data,
555 )
556 )
557
558 return future
559
560 @property
561 def exactly_once_enabled(self):
562 return self._exactly_once_delivery_enabled_func()