Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/message.py: 85%
95 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import absolute_import
17import datetime as dt
18import json
19import math
20import time
21import typing
22from typing import Optional, Callable
24from google.cloud.pubsub_v1.subscriber._protocol import requests
25from google.cloud.pubsub_v1.subscriber import futures
26from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus
29if typing.TYPE_CHECKING: # pragma: NO COVER
30 import datetime
31 import queue
32 from google.cloud.pubsub_v1 import types
33 from google.protobuf.internal import containers
36_MESSAGE_REPR = """\
37Message {{
38 data: {!r}
39 ordering_key: {!r}
40 attributes: {}
41}}"""
43_SUCCESS_FUTURE = futures.Future()
44_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS)
47def _indent(lines: str, prefix: str = " ") -> str:
48 """Indent some text.
50 Note that this is present as ``textwrap.indent``, but not in Python 2.
52 Args:
53 lines:
54 The newline delimited string to be indented.
55 prefix:
56 The prefix to indent each line with. Defaults to two spaces.
58 Returns:
59 The newly indented content.
60 """
61 indented = []
62 for line in lines.split("\n"):
63 indented.append(prefix + line)
64 return "\n".join(indented)
67class Message(object):
68 """A representation of a single Pub/Sub message.
70 The common way to interact with
71 :class:`~.pubsub_v1.subscriber.message.Message` objects is to receive
72 them in callbacks on subscriptions; most users should never have a need
73 to instantiate them by hand. (The exception to this is if you are
74 implementing a custom subclass to
75 :class:`~.pubsub_v1.subscriber._consumer.Consumer`.)
77 Attributes:
78 message_id (str):
79 The message ID. In general, you should not need to use this directly.
80 data (bytes):
81 The data in the message. Note that this will be a :class:`bytes`,
82 not a text string.
83 attributes (MutableMapping[str, str]):
84 The attributes sent along with the message. See :attr:`attributes` for more
85 information on this type.
86 publish_time (google.protobuf.timestamp_pb2.Timestamp):
87 The time that this message was originally published.
88 """
90 def __init__(
91 self,
92 message: "types.PubsubMessage._meta._pb", # type: ignore
93 ack_id: str,
94 delivery_attempt: int,
95 request_queue: "queue.Queue",
96 exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False,
97 ):
98 """Construct the Message.
100 .. note::
102 This class should not be constructed directly; it is the
103 responsibility of :class:`BasePolicy` subclasses to do so.
105 Args:
106 message (types.PubsubMessage._meta._pb):
107 The message received from Pub/Sub. For performance reasons it should be
108 the raw protobuf message normally wrapped by
109 :class:`~pubsub_v1.types.PubsubMessage`. A raw message can be obtained
110 from a :class:`~pubsub_v1.types.PubsubMessage` instance through the
111 latter's ``._pb`` attribute.
112 ack_id (str):
113 The ack_id received from Pub/Sub.
114 delivery_attempt (int):
115 The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy
116 is set on the subscription, and zero otherwise.
117 request_queue (queue.Queue):
118 A queue provided by the policy that can accept requests; the policy is
119 responsible for handling those requests.
120 exactly_once_delivery_enabled_func (Callable[[], bool]):
121 A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False.
122 """
123 self._message = message
124 self._ack_id = ack_id
125 self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None
126 self._request_queue = request_queue
127 self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func
128 self.message_id = message.message_id
130 # The instantiation time is the time that this message
131 # was received. Tracking this provides us a way to be smart about
132 # the default lease deadline.
133 self._received_timestamp = time.time()
135 # Store the message attributes directly to speed up attribute access, i.e.
136 # to avoid two lookups if self._message.<attribute> pattern was used in
137 # properties.
138 self._attributes = message.attributes
139 self._data = message.data
140 self._publish_time = dt.datetime.fromtimestamp(
141 message.publish_time.seconds + message.publish_time.nanos / 1e9,
142 tz=dt.timezone.utc,
143 )
144 self._ordering_key = message.ordering_key
145 self._size = message.ByteSize()
147 def __repr__(self):
148 # Get an abbreviated version of the data.
149 abbv_data = self._message.data
150 if len(abbv_data) > 50:
151 abbv_data = abbv_data[:50] + b"..."
153 pretty_attrs = json.dumps(
154 dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True
155 )
156 pretty_attrs = _indent(pretty_attrs)
157 # We don't actually want the first line indented.
158 pretty_attrs = pretty_attrs.lstrip()
159 return _MESSAGE_REPR.format(abbv_data, str(self.ordering_key), pretty_attrs)
161 @property
162 def attributes(self) -> "containers.ScalarMap":
163 """Return the attributes of the underlying Pub/Sub Message.
165 .. warning::
167 A ``ScalarMap`` behaves slightly differently than a
168 ``dict``. For a Pub / Sub message this is a ``string->string`` map.
169 When trying to access a value via ``map['key']``, if the key is
170 not in the map, then the default value for the string type will
171 be returned, which is an empty string. It may be more intuitive
172 to just cast the map to a ``dict`` or to one use ``map.get``.
174 Returns:
175 containers.ScalarMap: The message's attributes. This is a
176 ``dict``-like object provided by ``google.protobuf``.
177 """
178 return self._attributes
180 @property
181 def data(self) -> bytes:
182 """Return the data for the underlying Pub/Sub Message.
184 Returns:
185 bytes: The message data. This is always a bytestring; if you want
186 a text string, call :meth:`bytes.decode`.
187 """
188 return self._data
190 @property
191 def publish_time(self) -> "datetime.datetime":
192 """Return the time that the message was originally published.
194 Returns:
195 datetime.datetime: The date and time that the message was
196 published.
197 """
198 return self._publish_time
200 @property
201 def ordering_key(self) -> str:
202 """The ordering key used to publish the message."""
203 return self._ordering_key
205 @property
206 def size(self) -> int:
207 """Return the size of the underlying message, in bytes."""
208 return self._size
210 @property
211 def ack_id(self) -> str:
212 """the ID used to ack the message."""
213 return self._ack_id
215 @property
216 def delivery_attempt(self) -> Optional[int]:
217 """The delivery attempt counter is 1 + (the sum of number of NACKs
218 and number of ack_deadline exceeds) for this message. It is set to None
219 if a DeadLetterPolicy is not set on the subscription.
221 A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline
222 exceeds event is whenever a message is not acknowledged within
223 ack_deadline. Note that ack_deadline is initially
224 Subscription.ackDeadlineSeconds, but may get extended automatically by
225 the client library.
227 The first delivery of a given message will have this value as 1. The value
228 is calculated at best effort and is approximate.
230 Returns:
231 Optional[int]: The delivery attempt counter or ``None``.
232 """
233 return self._delivery_attempt
235 def ack(self) -> None:
236 """Acknowledge the given message.
238 Acknowledging a message in Pub/Sub means that you are done
239 with it, and it will not be delivered to this subscription again.
240 You should avoid acknowledging messages until you have
241 *finished* processing them, so that in the event of a failure,
242 you receive the message again.
244 .. warning::
245 Acks in Pub/Sub are best effort. You should always
246 ensure that your processing code is idempotent, as you may
247 receive any given message more than once. If you need strong
248 guarantees about acks and re-deliveres, enable exactly-once
249 delivery on your subscription and use the `ack_with_response`
250 method instead. Exactly once delivery is a preview feature.
251 For more details, see:
252 https://cloud.google.com/pubsub/docs/exactly-once-delivery."
254 """
255 time_to_ack = math.ceil(time.time() - self._received_timestamp)
256 self._request_queue.put(
257 requests.AckRequest(
258 ack_id=self._ack_id,
259 byte_size=self.size,
260 time_to_ack=time_to_ack,
261 ordering_key=self.ordering_key,
262 future=None,
263 )
264 )
266 def ack_with_response(self) -> "futures.Future":
267 """Acknowledge the given message.
269 Acknowledging a message in Pub/Sub means that you are done
270 with it, and it will not be delivered to this subscription again.
271 You should avoid acknowledging messages until you have
272 *finished* processing them, so that in the event of a failure,
273 you receive the message again.
275 If exactly-once delivery is NOT enabled on the subscription, the
276 future returns immediately with an AcknowledgeStatus.SUCCESS.
277 Since acks in Cloud Pub/Sub are best effort when exactly-once
278 delivery is disabled, the message may be re-delivered. Because
279 re-deliveries are possible, you should ensure that your processing
280 code is idempotent, as you may receive any given message more than
281 once.
283 If exactly-once delivery is enabled on the subscription, the
284 future returned by this method tracks the state of acknowledgement
285 operation. If the future completes successfully, the message is
286 guaranteed NOT to be re-delivered. Otherwise, the future will
287 contain an exception with more details about the failure and the
288 message may be re-delivered.
290 Exactly once delivery is a preview feature. For more details,
291 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
293 Returns:
294 futures.Future: A
295 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
296 instance that conforms to Python Standard library's
297 :class:`~concurrent.futures.Future` interface (but not an
298 instance of that class). Call `result()` to get the result
299 of the operation; upon success, a
300 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
301 will be returned and upon an error, an
302 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
303 will be thrown.
304 """
305 req_future: Optional[futures.Future]
306 if self._exactly_once_delivery_enabled_func():
307 future = futures.Future()
308 req_future = future
309 else:
310 future = _SUCCESS_FUTURE
311 req_future = None
312 time_to_ack = math.ceil(time.time() - self._received_timestamp)
313 self._request_queue.put(
314 requests.AckRequest(
315 ack_id=self._ack_id,
316 byte_size=self.size,
317 time_to_ack=time_to_ack,
318 ordering_key=self.ordering_key,
319 future=req_future,
320 )
321 )
322 return future
324 def drop(self) -> None:
325 """Release the message from lease management.
327 This informs the policy to no longer hold on to the lease for this
328 message. Pub/Sub will re-deliver the message if it is not acknowledged
329 before the existing lease expires.
331 .. warning::
332 For most use cases, the only reason to drop a message from
333 lease management is on `ack` or `nack`; this library
334 automatically drop()s the message on `ack` or `nack`. You probably
335 do not want to call this method directly.
336 """
337 self._request_queue.put(
338 requests.DropRequest(
339 ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key
340 )
341 )
343 def modify_ack_deadline(self, seconds: int) -> None:
344 """Resets the deadline for acknowledgement.
346 New deadline will be the given value of seconds from now.
348 The default implementation handles automatically modacking received messages for you;
349 you should not need to manually deal with setting ack deadlines. The exception case is
350 if you are implementing your own custom subclass of
351 :class:`~.pubsub_v1.subcriber._consumer.Consumer`.
353 Args:
354 seconds (int):
355 The number of seconds to set the lease deadline to. This should be
356 between 0 and 600. Due to network latency, values below 10 are advised
357 against.
358 """
359 self._request_queue.put(
360 requests.ModAckRequest(ack_id=self._ack_id, seconds=seconds, future=None)
361 )
363 def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future":
364 """Resets the deadline for acknowledgement and returns the response
365 status via a future.
367 New deadline will be the given value of seconds from now.
369 The default implementation handles automatically modacking received messages for you;
370 you should not need to manually deal with setting ack deadlines. The exception case is
371 if you are implementing your own custom subclass of
372 :class:`~.pubsub_v1.subcriber._consumer.Consumer`.
374 If exactly-once delivery is NOT enabled on the subscription, the
375 future returns immediately with an AcknowledgeStatus.SUCCESS.
376 Since modify-ack-deadline operations in Cloud Pub/Sub are best effort
377 when exactly-once delivery is disabled, the message may be re-delivered
378 within the set deadline.
380 If exactly-once delivery is enabled on the subscription, the
381 future returned by this method tracks the state of the
382 modify-ack-deadline operation. If the future completes successfully,
383 the message is guaranteed NOT to be re-delivered within the new deadline.
384 Otherwise, the future will contain an exception with more details about
385 the failure and the message will be redelivered according to its
386 currently-set ack deadline.
388 Exactly once delivery is a preview feature. For more details,
389 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
391 Args:
392 seconds (int):
393 The number of seconds to set the lease deadline to. This should be
394 between 0 and 600. Due to network latency, values below 10 are advised
395 against.
396 Returns:
397 futures.Future: A
398 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
399 instance that conforms to Python Standard library's
400 :class:`~concurrent.futures.Future` interface (but not an
401 instance of that class). Call `result()` to get the result
402 of the operation; upon success, a
403 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
404 will be returned and upon an error, an
405 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
406 will be thrown.
408 """
409 req_future: Optional[futures.Future]
410 if self._exactly_once_delivery_enabled_func():
411 future = futures.Future()
412 req_future = future
413 else:
414 future = _SUCCESS_FUTURE
415 req_future = None
417 self._request_queue.put(
418 requests.ModAckRequest(
419 ack_id=self._ack_id, seconds=seconds, future=req_future
420 )
421 )
423 return future
425 def nack(self) -> None:
426 """Decline to acknowledge the given message.
428 This will cause the message to be re-delivered to subscribers. Re-deliveries
429 may take place immediately or after a delay, and may arrive at this subscriber
430 or another.
431 """
432 self._request_queue.put(
433 requests.NackRequest(
434 ack_id=self._ack_id,
435 byte_size=self.size,
436 ordering_key=self.ordering_key,
437 future=None,
438 )
439 )
441 def nack_with_response(self) -> "futures.Future":
442 """Decline to acknowledge the given message, returning the response status via
443 a future.
445 This will cause the message to be re-delivered to subscribers. Re-deliveries
446 may take place immediately or after a delay, and may arrive at this subscriber
447 or another.
449 If exactly-once delivery is NOT enabled on the subscription, the
450 future returns immediately with an AcknowledgeStatus.SUCCESS.
452 If exactly-once delivery is enabled on the subscription, the
453 future returned by this method tracks the state of the
454 nack operation. If the future completes successfully,
455 the future's result will be an AcknowledgeStatus.SUCCESS.
456 Otherwise, the future will contain an exception with more details about
457 the failure.
459 Exactly once delivery is a preview feature. For more details,
460 see https://cloud.google.com/pubsub/docs/exactly-once-delivery."
462 Returns:
463 futures.Future: A
464 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future`
465 instance that conforms to Python Standard library's
466 :class:`~concurrent.futures.Future` interface (but not an
467 instance of that class). Call `result()` to get the result
468 of the operation; upon success, a
469 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS
470 will be returned and upon an error, an
471 pubsub_v1.subscriber.exceptions.AcknowledgeError exception
472 will be thrown.
474 """
475 req_future: Optional[futures.Future]
476 if self._exactly_once_delivery_enabled_func():
477 future = futures.Future()
478 req_future = future
479 else:
480 future = _SUCCESS_FUTURE
481 req_future = None
483 self._request_queue.put(
484 requests.NackRequest(
485 ack_id=self._ack_id,
486 byte_size=self.size,
487 ordering_key=self.ordering_key,
488 future=req_future,
489 )
490 )
492 return future