1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import datetime as dt
18import json
19import math
20import time
21import typing
22from typing import Optional, Callable
23
24from google.cloud.pubsub_v1.subscriber._protocol import requests
25from google.cloud.pubsub_v1.subscriber import futures
26from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus
27from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
28 SubscribeOpenTelemetry,
29)
30
31
32if typing.TYPE_CHECKING: # pragma: NO COVER
33 import datetime
34 import queue
35 from google.cloud.pubsub_v1 import types
36 from google.protobuf.internal import containers
37
38
39_MESSAGE_REPR = """\
40Message {{
41 data: {!r}
42 ordering_key: {!r}
43 attributes: {}
44}}"""
45
46_SUCCESS_FUTURE = futures.Future()
47_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS)
48
49
50def _indent(lines: str, prefix: str = " ") -> str:
51 """Indent some text.
52
53 Note that this is present as ``textwrap.indent``, but not in Python 2.
54
55 Args:
56 lines:
57 The newline delimited string to be indented.
58 prefix:
59 The prefix to indent each line with. Defaults to two spaces.
60
61 Returns:
62 The newly indented content.
63 """
64 indented = []
65 for line in lines.split("\n"):
66 indented.append(prefix + line)
67 return "\n".join(indented)
68
69
70class Message(object):
71 """A representation of a single Pub/Sub message.
72
73 The common way to interact with
74 :class:`~.pubsub_v1.subscriber.message.Message` objects is to receive
75 them in callbacks on subscriptions; most users should never have a need
76 to instantiate them by hand. (The exception to this is if you are
77 implementing a custom subclass to
78 :class:`~.pubsub_v1.subscriber._consumer.Consumer`.)
79
80 Attributes:
81 message_id (str):
82 The message ID. In general, you should not need to use this directly.
83 data (bytes):
84 The data in the message. Note that this will be a :class:`bytes`,
85 not a text string.
86 attributes (MutableMapping[str, str]):
87 The attributes sent along with the message. See :attr:`attributes` for more
88 information on this type.
89 publish_time (google.protobuf.timestamp_pb2.Timestamp):
90 The time that this message was originally published.
91 opentelemetry_data (google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry.SubscribeOpenTelemetry)
92 Open Telemetry data associated with this message. None if Open Telemetry is not enabled.
93 """
94
95 def __init__(
96 self,
97 message: "types.PubsubMessage._meta._pb", # type: ignore
98 ack_id: str,
99 delivery_attempt: int,
100 request_queue: "queue.Queue",
101 exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False,
102 ):
103 """Construct the Message.
104
105 .. note::
106
107 This class should not be constructed directly; it is the
108 responsibility of :class:`BasePolicy` subclasses to do so.
109
110 Args:
111 message (types.PubsubMessage._meta._pb):
112 The message received from Pub/Sub. For performance reasons it should be
113 the raw protobuf message normally wrapped by
114 :class:`~pubsub_v1.types.PubsubMessage`. A raw message can be obtained
115 from a :class:`~pubsub_v1.types.PubsubMessage` instance through the
116 latter's ``._pb`` attribute.
117 ack_id (str):
118 The ack_id received from Pub/Sub.
119 delivery_attempt (int):
120 The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy
121 is set on the subscription, and zero otherwise.
122 request_queue (queue.Queue):
123 A queue provided by the policy that can accept requests; the policy is
124 responsible for handling those requests.
125 exactly_once_delivery_enabled_func (Callable[[], bool]):
126 A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False.
127 """
128 self._message = message
129 self._ack_id = ack_id
130 self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None
131 self._request_queue = request_queue
132 self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func
133 self.message_id = message.message_id
134
135 # The instantiation time is the time that this message
136 # was received. Tracking this provides us a way to be smart about
137 # the default lease deadline.
138 self._received_timestamp = time.time()
139
140 # Store the message attributes directly to speed up attribute access, i.e.
141 # to avoid two lookups if self._message.<attribute> pattern was used in
142 # properties.
143 self._attributes = message.attributes
144 self._data = message.data
145 self._publish_time = dt.datetime.fromtimestamp(
146 message.publish_time.seconds + message.publish_time.nanos / 1e9,
147 tz=dt.timezone.utc,
148 )
149 self._ordering_key = message.ordering_key
150 self._size = message.ByteSize()
151
152 # None if Open Telemetry is disabled. Else contains OpenTelemetry data.
153 self._opentelemetry_data: Optional[SubscribeOpenTelemetry] = None
154
155 def __repr__(self):
156 # Get an abbreviated version of the data.
157 abbv_data = self._message.data
158 if len(abbv_data) > 50:
159 abbv_data = abbv_data[:50] + b"..."
160
161 pretty_attrs = json.dumps(
162 dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True
163 )
164 pretty_attrs = _indent(pretty_attrs)
165 # We don't actually want the first line indented.
166 pretty_attrs = pretty_attrs.lstrip()
167 return _MESSAGE_REPR.format(abbv_data, str(self.ordering_key), pretty_attrs)
168
169 @property
170 def opentelemetry_data(self):
171 return self._opentelemetry_data # pragma: NO COVER
172
173 @opentelemetry_data.setter
174 def opentelemetry_data(self, data):
175 self._opentelemetry_data = data # pragma: NO COVER
176
177 @property
178 def attributes(self) -> "containers.ScalarMap":
179 """Return the attributes of the underlying Pub/Sub Message.
180
181 .. warning::
182
183 A ``ScalarMap`` behaves slightly differently than a
184 ``dict``. For a Pub / Sub message this is a ``string->string`` map.
185 When trying to access a value via ``map['key']``, if the key is
186 not in the map, then the default value for the string type will
187 be returned, which is an empty string. It may be more intuitive
188 to just cast the map to a ``dict`` or to one use ``map.get``.
189
190 Returns:
191 containers.ScalarMap: The message's attributes. This is a
192 ``dict``-like object provided by ``google.protobuf``.
193 """
194 return self._attributes
195
196 @property
197 def data(self) -> bytes:
198 """Return the data for the underlying Pub/Sub Message.
199
200 Returns:
201 bytes: The message data. This is always a bytestring; if you want
202 a text string, call :meth:`bytes.decode`.
203 """
204 return self._data
205
206 @property
207 def publish_time(self) -> "datetime.datetime":
208 """Return the time that the message was originally published.
209
210 Returns:
211 datetime.datetime: The date and time that the message was
212 published.
213 """
214 return self._publish_time
215
216 @property
217 def ordering_key(self) -> str:
218 """The ordering key used to publish the message."""
219 return self._ordering_key
220
221 @property
222 def size(self) -> int:
223 """Return the size of the underlying message, in bytes."""
224 return self._size
225
226 @property
227 def ack_id(self) -> str:
228 """the ID used to ack the message."""
229 return self._ack_id
230
231 @property
232 def delivery_attempt(self) -> Optional[int]:
233 """The delivery attempt counter is 1 + (the sum of number of NACKs
234 and number of ack_deadline exceeds) for this message. It is set to None
235 if a DeadLetterPolicy is not set on the subscription.
236
237 A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline
238 exceeds event is whenever a message is not acknowledged within
239 ack_deadline. Note that ack_deadline is initially
240 Subscription.ackDeadlineSeconds, but may get extended automatically by
241 the client library.
242
243 The first delivery of a given message will have this value as 1. The value
244 is calculated at best effort and is approximate.
245
246 Returns:
247 Optional[int]: The delivery attempt counter or ``None``.
248 """
249 return self._delivery_attempt
250
251 def ack(self) -> None:
252 """Acknowledge the given message.
253
254 Acknowledging a message in Pub/Sub means that you are done
255 with it, and it will not be delivered to this subscription again.
256 You should avoid acknowledging messages until you have
257 *finished* processing them, so that in the event of a failure,
258 you receive the message again.
259
260 .. warning::
261 Acks in Pub/Sub are best effort. You should always
262 ensure that your processing code is idempotent, as you may
263 receive any given message more than once. If you need strong
264 guarantees about acks and re-deliveres, enable exactly-once
265 delivery on your subscription and use the `ack_with_response`
266 method instead. Exactly once delivery is a preview feature.
267 For more details, see:
268 https://cloud.google.com/pubsub/docs/exactly-once-delivery."
269
270 """
271 if self.opentelemetry_data:
272 self.opentelemetry_data.add_process_span_event("ack called")
273 self.opentelemetry_data.end_process_span()
274 time_to_ack = math.ceil(time.time() - self._received_timestamp)
275 self._request_queue.put(
276 requests.AckRequest(
277 ack_id=self._ack_id,
278 byte_size=self.size,
279 time_to_ack=time_to_ack,
280 ordering_key=self.ordering_key,
281 future=None,
282 opentelemetry_data=self.opentelemetry_data,
283 )
284 )
285
286 def ack_with_response(self) -> "futures.Future":
287 """Acknowledge the given message.
288
289 Acknowledging a message in Pub/Sub means that you are done
290 with it, and it will not be delivered to this subscription again.
291 You should avoid acknowledging messages until you have
292 *finished* processing them, so that in the event of a failure,
293 you receive the message again.
294
295 If exactly-once delivery is NOT enabled on the subscription, the
296 future returns immediately with an AcknowledgeStatus.SUCCESS.
297 Since acks in Cloud Pub/Sub are best effort when exactly-once
298 delivery is disabled, the message may be re-delivered. Because
299 re-deliveries are possible, you should ensure that your processing
300 code is idempotent, as you may receive any given message more than
301 once.
302
303 If exactly-once delivery is enabled on the subscription, the
304 future returned by this method tracks the state of acknowledgement
305 operation. If the future completes successfully, the message is
306 guaranteed NOT to be re-delivered. Otherwise, the future will
307 contain an exception with more details about the failure and the
308 message may be re-delivered.
309
310 Exactly once delivery is a preview feature. For more details,
311 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
312
313 Returns:
314 futures.Future: A
315 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
316 instance that conforms to Python Standard library's
317 :class:`~concurrent.futures.Future` interface (but not an
318 instance of that class). Call `result()` to get the result
319 of the operation; upon success, a
320 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
321 will be returned and upon an error, an
322 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
323 will be thrown.
324 """
325 if self.opentelemetry_data:
326 self.opentelemetry_data.add_process_span_event("ack called")
327 self.opentelemetry_data.end_process_span()
328 req_future: Optional[futures.Future]
329 if self._exactly_once_delivery_enabled_func():
330 future = futures.Future()
331 req_future = future
332 else:
333 future = _SUCCESS_FUTURE
334 req_future = None
335 time_to_ack = math.ceil(time.time() - self._received_timestamp)
336 self._request_queue.put(
337 requests.AckRequest(
338 ack_id=self._ack_id,
339 byte_size=self.size,
340 time_to_ack=time_to_ack,
341 ordering_key=self.ordering_key,
342 future=req_future,
343 opentelemetry_data=self.opentelemetry_data,
344 )
345 )
346 return future
347
348 def drop(self) -> None:
349 """Release the message from lease management.
350
351 This informs the policy to no longer hold on to the lease for this
352 message. Pub/Sub will re-deliver the message if it is not acknowledged
353 before the existing lease expires.
354
355 .. warning::
356 For most use cases, the only reason to drop a message from
357 lease management is on `ack` or `nack`; this library
358 automatically drop()s the message on `ack` or `nack`. You probably
359 do not want to call this method directly.
360 """
361 self._request_queue.put(
362 requests.DropRequest(
363 ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key
364 )
365 )
366
367 def modify_ack_deadline(self, seconds: int) -> None:
368 """Resets the deadline for acknowledgement.
369
370 New deadline will be the given value of seconds from now.
371
372 The default implementation handles automatically modacking received messages for you;
373 you should not need to manually deal with setting ack deadlines. The exception case is
374 if you are implementing your own custom subclass of
375 :class:`~.pubsub_v1.subcriber._consumer.Consumer`.
376
377 Args:
378 seconds (int):
379 The number of seconds to set the lease deadline to. This should be
380 between 0 and 600. Due to network latency, values below 10 are advised
381 against.
382 """
383 self._request_queue.put(
384 requests.ModAckRequest(
385 ack_id=self._ack_id,
386 seconds=seconds,
387 future=None,
388 opentelemetry_data=self.opentelemetry_data,
389 )
390 )
391
392 def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future":
393 """Resets the deadline for acknowledgement and returns the response
394 status via a future.
395
396 New deadline will be the given value of seconds from now.
397
398 The default implementation handles automatically modacking received messages for you;
399 you should not need to manually deal with setting ack deadlines. The exception case is
400 if you are implementing your own custom subclass of
401 :class:`~.pubsub_v1.subcriber._consumer.Consumer`.
402
403 If exactly-once delivery is NOT enabled on the subscription, the
404 future returns immediately with an AcknowledgeStatus.SUCCESS.
405 Since modify-ack-deadline operations in Cloud Pub/Sub are best effort
406 when exactly-once delivery is disabled, the message may be re-delivered
407 within the set deadline.
408
409 If exactly-once delivery is enabled on the subscription, the
410 future returned by this method tracks the state of the
411 modify-ack-deadline operation. If the future completes successfully,
412 the message is guaranteed NOT to be re-delivered within the new deadline.
413 Otherwise, the future will contain an exception with more details about
414 the failure and the message will be redelivered according to its
415 currently-set ack deadline.
416
417 Exactly once delivery is a preview feature. For more details,
418 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
419
420 Args:
421 seconds (int):
422 The number of seconds to set the lease deadline to. This should be
423 between 0 and 600. Due to network latency, values below 10 are advised
424 against.
425 Returns:
426 futures.Future: A
427 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
428 instance that conforms to Python Standard library's
429 :class:`~concurrent.futures.Future` interface (but not an
430 instance of that class). Call `result()` to get the result
431 of the operation; upon success, a
432 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
433 will be returned and upon an error, an
434 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
435 will be thrown.
436
437 """
438 req_future: Optional[futures.Future]
439 if self._exactly_once_delivery_enabled_func():
440 future = futures.Future()
441 req_future = future
442 else:
443 future = _SUCCESS_FUTURE
444 req_future = None
445
446 self._request_queue.put(
447 requests.ModAckRequest(
448 ack_id=self._ack_id,
449 seconds=seconds,
450 future=req_future,
451 opentelemetry_data=self.opentelemetry_data,
452 )
453 )
454
455 return future
456
457 def nack(self) -> None:
458 """Decline to acknowledge the given message.
459
460 This will cause the message to be re-delivered to subscribers. Re-deliveries
461 may take place immediately or after a delay, and may arrive at this subscriber
462 or another.
463 """
464 if self.opentelemetry_data:
465 self.opentelemetry_data.add_process_span_event("nack called")
466 self.opentelemetry_data.end_process_span()
467 self._request_queue.put(
468 requests.NackRequest(
469 ack_id=self._ack_id,
470 byte_size=self.size,
471 ordering_key=self.ordering_key,
472 future=None,
473 opentelemetry_data=self.opentelemetry_data,
474 )
475 )
476
477 def nack_with_response(self) -> "futures.Future":
478 """Decline to acknowledge the given message, returning the response status via
479 a future.
480
481 This will cause the message to be re-delivered to subscribers. Re-deliveries
482 may take place immediately or after a delay, and may arrive at this subscriber
483 or another.
484
485 If exactly-once delivery is NOT enabled on the subscription, the
486 future returns immediately with an AcknowledgeStatus.SUCCESS.
487
488 If exactly-once delivery is enabled on the subscription, the
489 future returned by this method tracks the state of the
490 nack operation. If the future completes successfully,
491 the future's result will be an AcknowledgeStatus.SUCCESS.
492 Otherwise, the future will contain an exception with more details about
493 the failure.
494
495 Exactly once delivery is a preview feature. For more details,
496 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
497
498 Returns:
499 futures.Future: A
500 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
501 instance that conforms to Python Standard library's
502 :class:`~concurrent.futures.Future` interface (but not an
503 instance of that class). Call `result()` to get the result
504 of the operation; upon success, a
505 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
506 will be returned and upon an error, an
507 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
508 will be thrown.
509
510 """
511 if self.opentelemetry_data:
512 self.opentelemetry_data.add_process_span_event("nack called")
513 self.opentelemetry_data.end_process_span()
514 req_future: Optional[futures.Future]
515 if self._exactly_once_delivery_enabled_func():
516 future = futures.Future()
517 req_future = future
518 else:
519 future = _SUCCESS_FUTURE
520 req_future = None
521
522 self._request_queue.put(
523 requests.NackRequest(
524 ack_id=self._ack_id,
525 byte_size=self.size,
526 ordering_key=self.ordering_key,
527 future=req_future,
528 opentelemetry_data=self.opentelemetry_data,
529 )
530 )
531
532 return future