Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/message.py: 77%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

122 statements  

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import datetime as dt 

18import json 

19import logging 

20import math 

21import time 

22import typing 

23from typing import Optional, Callable 

24 

25from google.cloud.pubsub_v1.subscriber._protocol import requests 

26from google.cloud.pubsub_v1.subscriber import futures 

27from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus 

28from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

29 SubscribeOpenTelemetry, 

30) 

31 

32 

33if typing.TYPE_CHECKING: # pragma: NO COVER 

34 import datetime 

35 import queue 

36 from google.cloud.pubsub_v1 import types 

37 from google.protobuf.internal import containers 

38 

39 

40_MESSAGE_REPR = """\ 

41Message {{ 

42 data: {!r} 

43 ordering_key: {!r} 

44 attributes: {} 

45}}""" 

46 

47_ACK_NACK_LOGGER = logging.getLogger("ack-nack") 

48 

49_SUCCESS_FUTURE = futures.Future() 

50_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS) 

51 

52 

53def _indent(lines: str, prefix: str = " ") -> str: 

54 """Indent some text. 

55 

56 Note that this is present as ``textwrap.indent``, but not in Python 2. 

57 

58 Args: 

59 lines: 

60 The newline delimited string to be indented. 

61 prefix: 

62 The prefix to indent each line with. Defaults to two spaces. 

63 

64 Returns: 

65 The newly indented content. 

66 """ 

67 indented = [] 

68 for line in lines.split("\n"): 

69 indented.append(prefix + line) 

70 return "\n".join(indented) 

71 

72 

73class Message(object): 

74 """A representation of a single Pub/Sub message. 

75 

76 The common way to interact with 

77 :class:`~.pubsub_v1.subscriber.message.Message` objects is to receive 

78 them in callbacks on subscriptions; most users should never have a need 

79 to instantiate them by hand. (The exception to this is if you are 

80 implementing a custom subclass to 

81 :class:`~.pubsub_v1.subscriber._consumer.Consumer`.) 

82 

83 Attributes: 

84 message_id (str): 

85 The message ID. In general, you should not need to use this directly. 

86 data (bytes): 

87 The data in the message. Note that this will be a :class:`bytes`, 

88 not a text string. 

89 attributes (MutableMapping[str, str]): 

90 The attributes sent along with the message. See :attr:`attributes` for more 

91 information on this type. 

92 publish_time (google.protobuf.timestamp_pb2.Timestamp): 

93 The time that this message was originally published. 

94 opentelemetry_data (google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry.SubscribeOpenTelemetry) 

95 Open Telemetry data associated with this message. None if Open Telemetry is not enabled. 

96 """ 

97 

98 def __init__( 

99 self, 

100 message: "types.PubsubMessage._meta._pb", # type: ignore 

101 ack_id: str, 

102 delivery_attempt: int, 

103 request_queue: "queue.Queue", 

104 exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False, 

105 ): 

106 """Construct the Message. 

107 

108 .. note:: 

109 

110 This class should not be constructed directly; it is the 

111 responsibility of :class:`BasePolicy` subclasses to do so. 

112 

113 Args: 

114 message (types.PubsubMessage._meta._pb): 

115 The message received from Pub/Sub. For performance reasons it should be 

116 the raw protobuf message normally wrapped by 

117 :class:`~pubsub_v1.types.PubsubMessage`. A raw message can be obtained 

118 from a :class:`~pubsub_v1.types.PubsubMessage` instance through the 

119 latter's ``._pb`` attribute. 

120 ack_id (str): 

121 The ack_id received from Pub/Sub. 

122 delivery_attempt (int): 

123 The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy 

124 is set on the subscription, and zero otherwise. 

125 request_queue (queue.Queue): 

126 A queue provided by the policy that can accept requests; the policy is 

127 responsible for handling those requests. 

128 exactly_once_delivery_enabled_func (Callable[[], bool]): 

129 A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False. 

130 """ 

131 self._message = message 

132 self._ack_id = ack_id 

133 self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None 

134 self._request_queue = request_queue 

135 self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func 

136 self.message_id = message.message_id 

137 

138 # The instantiation time is the time that this message 

139 # was received. Tracking this provides us a way to be smart about 

140 # the default lease deadline. 

141 self._received_timestamp = time.time() 

142 

143 # Store the message attributes directly to speed up attribute access, i.e. 

144 # to avoid two lookups if self._message.<attribute> pattern was used in 

145 # properties. 

146 self._attributes = message.attributes 

147 self._data = message.data 

148 self._publish_time = dt.datetime.fromtimestamp( 

149 message.publish_time.seconds + message.publish_time.nanos / 1e9, 

150 tz=dt.timezone.utc, 

151 ) 

152 self._ordering_key = message.ordering_key 

153 self._size = message.ByteSize() 

154 

155 # None if Open Telemetry is disabled. Else contains OpenTelemetry data. 

156 self._opentelemetry_data: Optional[SubscribeOpenTelemetry] = None 

157 

158 def __repr__(self): 

159 # Get an abbreviated version of the data. 

160 abbv_data = self._message.data 

161 if len(abbv_data) > 50: 

162 abbv_data = abbv_data[:50] + b"..." 

163 

164 pretty_attrs = json.dumps( 

165 dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True 

166 ) 

167 pretty_attrs = _indent(pretty_attrs) 

168 # We don't actually want the first line indented. 

169 pretty_attrs = pretty_attrs.lstrip() 

170 return _MESSAGE_REPR.format(abbv_data, str(self.ordering_key), pretty_attrs) 

171 

172 @property 

173 def opentelemetry_data(self): 

174 return self._opentelemetry_data # pragma: NO COVER 

175 

176 @opentelemetry_data.setter 

177 def opentelemetry_data(self, data): 

178 self._opentelemetry_data = data # pragma: NO COVER 

179 

180 @property 

181 def attributes(self) -> "containers.ScalarMap": 

182 """Return the attributes of the underlying Pub/Sub Message. 

183 

184 .. warning:: 

185 

186 A ``ScalarMap`` behaves slightly differently than a 

187 ``dict``. For a Pub / Sub message this is a ``string->string`` map. 

188 When trying to access a value via ``map['key']``, if the key is 

189 not in the map, then the default value for the string type will 

190 be returned, which is an empty string. It may be more intuitive 

191 to just cast the map to a ``dict`` or to one use ``map.get``. 

192 

193 Returns: 

194 containers.ScalarMap: The message's attributes. This is a 

195 ``dict``-like object provided by ``google.protobuf``. 

196 """ 

197 return self._attributes 

198 

199 @property 

200 def data(self) -> bytes: 

201 """Return the data for the underlying Pub/Sub Message. 

202 

203 Returns: 

204 bytes: The message data. This is always a bytestring; if you want 

205 a text string, call :meth:`bytes.decode`. 

206 """ 

207 return self._data 

208 

209 @property 

210 def publish_time(self) -> "datetime.datetime": 

211 """Return the time that the message was originally published. 

212 

213 Returns: 

214 datetime.datetime: The date and time that the message was 

215 published. 

216 """ 

217 return self._publish_time 

218 

219 @property 

220 def ordering_key(self) -> str: 

221 """The ordering key used to publish the message.""" 

222 return self._ordering_key 

223 

224 @property 

225 def size(self) -> int: 

226 """Return the size of the underlying message, in bytes.""" 

227 return self._size 

228 

229 @property 

230 def ack_id(self) -> str: 

231 """the ID used to ack the message.""" 

232 return self._ack_id 

233 

234 @property 

235 def delivery_attempt(self) -> Optional[int]: 

236 """The delivery attempt counter is 1 + (the sum of number of NACKs 

237 and number of ack_deadline exceeds) for this message. It is set to None 

238 if a DeadLetterPolicy is not set on the subscription. 

239 

240 A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline 

241 exceeds event is whenever a message is not acknowledged within 

242 ack_deadline. Note that ack_deadline is initially 

243 Subscription.ackDeadlineSeconds, but may get extended automatically by 

244 the client library. 

245 

246 The first delivery of a given message will have this value as 1. The value 

247 is calculated at best effort and is approximate. 

248 

249 Returns: 

250 Optional[int]: The delivery attempt counter or ``None``. 

251 """ 

252 return self._delivery_attempt 

253 

254 def ack(self) -> None: 

255 """Acknowledge the given message. 

256 

257 Acknowledging a message in Pub/Sub means that you are done 

258 with it, and it will not be delivered to this subscription again. 

259 You should avoid acknowledging messages until you have 

260 *finished* processing them, so that in the event of a failure, 

261 you receive the message again. 

262 

263 .. warning:: 

264 Acks in Pub/Sub are best effort. You should always 

265 ensure that your processing code is idempotent, as you may 

266 receive any given message more than once. If you need strong 

267 guarantees about acks and re-deliveres, enable exactly-once 

268 delivery on your subscription and use the `ack_with_response` 

269 method instead. Exactly once delivery is a preview feature. 

270 For more details, see: 

271 https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

272 

273 """ 

274 if self.opentelemetry_data: 

275 self.opentelemetry_data.add_process_span_event("ack called") 

276 self.opentelemetry_data.end_process_span() 

277 time_to_ack = math.ceil(time.time() - self._received_timestamp) 

278 self._request_queue.put( 

279 requests.AckRequest( 

280 message_id=self.message_id, 

281 ack_id=self._ack_id, 

282 byte_size=self.size, 

283 time_to_ack=time_to_ack, 

284 ordering_key=self.ordering_key, 

285 future=None, 

286 opentelemetry_data=self.opentelemetry_data, 

287 ) 

288 ) 

289 _ACK_NACK_LOGGER.debug( 

290 "Called ack for message (id=%s, ack_id=%s, ordering_key=%s)", 

291 self.message_id, 

292 self.ack_id, 

293 self.ordering_key, 

294 ) 

295 

296 def ack_with_response(self) -> "futures.Future": 

297 """Acknowledge the given message. 

298 

299 Acknowledging a message in Pub/Sub means that you are done 

300 with it, and it will not be delivered to this subscription again. 

301 You should avoid acknowledging messages until you have 

302 *finished* processing them, so that in the event of a failure, 

303 you receive the message again. 

304 

305 If exactly-once delivery is NOT enabled on the subscription, the 

306 future returns immediately with an AcknowledgeStatus.SUCCESS. 

307 Since acks in Cloud Pub/Sub are best effort when exactly-once 

308 delivery is disabled, the message may be re-delivered. Because 

309 re-deliveries are possible, you should ensure that your processing 

310 code is idempotent, as you may receive any given message more than 

311 once. 

312 

313 If exactly-once delivery is enabled on the subscription, the 

314 future returned by this method tracks the state of acknowledgement 

315 operation. If the future completes successfully, the message is 

316 guaranteed NOT to be re-delivered. Otherwise, the future will 

317 contain an exception with more details about the failure and the 

318 message may be re-delivered. 

319 

320 Exactly once delivery is a preview feature. For more details, 

321 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

322 

323 Returns: 

324 futures.Future: A 

325 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

326 instance that conforms to Python Standard library's 

327 :class:`~concurrent.futures.Future` interface (but not an 

328 instance of that class). Call `result()` to get the result 

329 of the operation; upon success, a 

330 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

331 will be returned and upon an error, an 

332 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

333 will be thrown. 

334 """ 

335 _ACK_NACK_LOGGER.debug( 

336 "Called ack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=True)", 

337 self.message_id, 

338 self.ack_id, 

339 self.ordering_key, 

340 ) 

341 if self.opentelemetry_data: 

342 self.opentelemetry_data.add_process_span_event("ack called") 

343 self.opentelemetry_data.end_process_span() 

344 req_future: Optional[futures.Future] 

345 if self._exactly_once_delivery_enabled_func(): 

346 future = futures.Future() 

347 req_future = future 

348 else: 

349 future = _SUCCESS_FUTURE 

350 req_future = None 

351 time_to_ack = math.ceil(time.time() - self._received_timestamp) 

352 self._request_queue.put( 

353 requests.AckRequest( 

354 message_id=self.message_id, 

355 ack_id=self._ack_id, 

356 byte_size=self.size, 

357 time_to_ack=time_to_ack, 

358 ordering_key=self.ordering_key, 

359 future=req_future, 

360 opentelemetry_data=self.opentelemetry_data, 

361 ) 

362 ) 

363 return future 

364 

365 def drop(self) -> None: 

366 """Release the message from lease management. 

367 

368 This informs the policy to no longer hold on to the lease for this 

369 message. Pub/Sub will re-deliver the message if it is not acknowledged 

370 before the existing lease expires. 

371 

372 .. warning:: 

373 For most use cases, the only reason to drop a message from 

374 lease management is on `ack` or `nack`; this library 

375 automatically drop()s the message on `ack` or `nack`. You probably 

376 do not want to call this method directly. 

377 """ 

378 self._request_queue.put( 

379 requests.DropRequest( 

380 ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key 

381 ) 

382 ) 

383 

384 def modify_ack_deadline(self, seconds: int) -> None: 

385 """Resets the deadline for acknowledgement. 

386 

387 New deadline will be the given value of seconds from now. 

388 

389 The default implementation handles automatically modacking received messages for you; 

390 you should not need to manually deal with setting ack deadlines. The exception case is 

391 if you are implementing your own custom subclass of 

392 :class:`~.pubsub_v1.subcriber._consumer.Consumer`. 

393 

394 Args: 

395 seconds (int): 

396 The number of seconds to set the lease deadline to. This should be 

397 between 0 and 600. Due to network latency, values below 10 are advised 

398 against. 

399 """ 

400 self._request_queue.put( 

401 requests.ModAckRequest( 

402 message_id=self.message_id, 

403 ack_id=self._ack_id, 

404 seconds=seconds, 

405 future=None, 

406 opentelemetry_data=self.opentelemetry_data, 

407 ) 

408 ) 

409 

410 def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future": 

411 """Resets the deadline for acknowledgement and returns the response 

412 status via a future. 

413 

414 New deadline will be the given value of seconds from now. 

415 

416 The default implementation handles automatically modacking received messages for you; 

417 you should not need to manually deal with setting ack deadlines. The exception case is 

418 if you are implementing your own custom subclass of 

419 :class:`~.pubsub_v1.subcriber._consumer.Consumer`. 

420 

421 If exactly-once delivery is NOT enabled on the subscription, the 

422 future returns immediately with an AcknowledgeStatus.SUCCESS. 

423 Since modify-ack-deadline operations in Cloud Pub/Sub are best effort 

424 when exactly-once delivery is disabled, the message may be re-delivered 

425 within the set deadline. 

426 

427 If exactly-once delivery is enabled on the subscription, the 

428 future returned by this method tracks the state of the 

429 modify-ack-deadline operation. If the future completes successfully, 

430 the message is guaranteed NOT to be re-delivered within the new deadline. 

431 Otherwise, the future will contain an exception with more details about 

432 the failure and the message will be redelivered according to its 

433 currently-set ack deadline. 

434 

435 Exactly once delivery is a preview feature. For more details, 

436 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

437 

438 Args: 

439 seconds (int): 

440 The number of seconds to set the lease deadline to. This should be 

441 between 0 and 600. Due to network latency, values below 10 are advised 

442 against. 

443 Returns: 

444 futures.Future: A 

445 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

446 instance that conforms to Python Standard library's 

447 :class:`~concurrent.futures.Future` interface (but not an 

448 instance of that class). Call `result()` to get the result 

449 of the operation; upon success, a 

450 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

451 will be returned and upon an error, an 

452 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

453 will be thrown. 

454 

455 """ 

456 req_future: Optional[futures.Future] 

457 if self._exactly_once_delivery_enabled_func(): 

458 future = futures.Future() 

459 req_future = future 

460 else: 

461 future = _SUCCESS_FUTURE 

462 req_future = None 

463 

464 self._request_queue.put( 

465 requests.ModAckRequest( 

466 message_id=self.message_id, 

467 ack_id=self._ack_id, 

468 seconds=seconds, 

469 future=req_future, 

470 opentelemetry_data=self.opentelemetry_data, 

471 ) 

472 ) 

473 

474 return future 

475 

476 def nack(self) -> None: 

477 """Decline to acknowledge the given message. 

478 

479 This will cause the message to be re-delivered to subscribers. Re-deliveries 

480 may take place immediately or after a delay, and may arrive at this subscriber 

481 or another. 

482 """ 

483 _ACK_NACK_LOGGER.debug( 

484 "Called nack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)", 

485 self.message_id, 

486 self.ack_id, 

487 self.ordering_key, 

488 self._exactly_once_delivery_enabled_func(), 

489 ) 

490 if self.opentelemetry_data: 

491 self.opentelemetry_data.add_process_span_event("nack called") 

492 self.opentelemetry_data.end_process_span() 

493 self._request_queue.put( 

494 requests.NackRequest( 

495 ack_id=self._ack_id, 

496 byte_size=self.size, 

497 ordering_key=self.ordering_key, 

498 future=None, 

499 opentelemetry_data=self.opentelemetry_data, 

500 ) 

501 ) 

502 

503 def nack_with_response(self) -> "futures.Future": 

504 """Decline to acknowledge the given message, returning the response status via 

505 a future. 

506 

507 This will cause the message to be re-delivered to subscribers. Re-deliveries 

508 may take place immediately or after a delay, and may arrive at this subscriber 

509 or another. 

510 

511 If exactly-once delivery is NOT enabled on the subscription, the 

512 future returns immediately with an AcknowledgeStatus.SUCCESS. 

513 

514 If exactly-once delivery is enabled on the subscription, the 

515 future returned by this method tracks the state of the 

516 nack operation. If the future completes successfully, 

517 the future's result will be an AcknowledgeStatus.SUCCESS. 

518 Otherwise, the future will contain an exception with more details about 

519 the failure. 

520 

521 Exactly once delivery is a preview feature. For more details, 

522 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

523 

524 Returns: 

525 futures.Future: A 

526 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

527 instance that conforms to Python Standard library's 

528 :class:`~concurrent.futures.Future` interface (but not an 

529 instance of that class). Call `result()` to get the result 

530 of the operation; upon success, a 

531 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

532 will be returned and upon an error, an 

533 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

534 will be thrown. 

535 

536 """ 

537 if self.opentelemetry_data: 

538 self.opentelemetry_data.add_process_span_event("nack called") 

539 self.opentelemetry_data.end_process_span() 

540 req_future: Optional[futures.Future] 

541 if self._exactly_once_delivery_enabled_func(): 

542 future = futures.Future() 

543 req_future = future 

544 else: 

545 future = _SUCCESS_FUTURE 

546 req_future = None 

547 

548 self._request_queue.put( 

549 requests.NackRequest( 

550 ack_id=self._ack_id, 

551 byte_size=self.size, 

552 ordering_key=self.ordering_key, 

553 future=req_future, 

554 opentelemetry_data=self.opentelemetry_data, 

555 ) 

556 ) 

557 

558 return future 

559 

560 @property 

561 def exactly_once_enabled(self): 

562 return self._exactly_once_delivery_enabled_func()