Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/message.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

114 statements  

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import datetime as dt 

18import json 

19import math 

20import time 

21import typing 

22from typing import Optional, Callable 

23 

24from google.cloud.pubsub_v1.subscriber._protocol import requests 

25from google.cloud.pubsub_v1.subscriber import futures 

26from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus 

27from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( 

28 SubscribeOpenTelemetry, 

29) 

30 

31 

32if typing.TYPE_CHECKING: # pragma: NO COVER 

33 import datetime 

34 import queue 

35 from google.cloud.pubsub_v1 import types 

36 from google.protobuf.internal import containers 

37 

38 

39_MESSAGE_REPR = """\ 

40Message {{ 

41 data: {!r} 

42 ordering_key: {!r} 

43 attributes: {} 

44}}""" 

45 

46_SUCCESS_FUTURE = futures.Future() 

47_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS) 

48 

49 

50def _indent(lines: str, prefix: str = " ") -> str: 

51 """Indent some text. 

52 

53 Note that this is present as ``textwrap.indent``, but not in Python 2. 

54 

55 Args: 

56 lines: 

57 The newline delimited string to be indented. 

58 prefix: 

59 The prefix to indent each line with. Defaults to two spaces. 

60 

61 Returns: 

62 The newly indented content. 

63 """ 

64 indented = [] 

65 for line in lines.split("\n"): 

66 indented.append(prefix + line) 

67 return "\n".join(indented) 

68 

69 

70class Message(object): 

71 """A representation of a single Pub/Sub message. 

72 

73 The common way to interact with 

74 :class:`~.pubsub_v1.subscriber.message.Message` objects is to receive 

75 them in callbacks on subscriptions; most users should never have a need 

76 to instantiate them by hand. (The exception to this is if you are 

77 implementing a custom subclass to 

78 :class:`~.pubsub_v1.subscriber._consumer.Consumer`.) 

79 

80 Attributes: 

81 message_id (str): 

82 The message ID. In general, you should not need to use this directly. 

83 data (bytes): 

84 The data in the message. Note that this will be a :class:`bytes`, 

85 not a text string. 

86 attributes (MutableMapping[str, str]): 

87 The attributes sent along with the message. See :attr:`attributes` for more 

88 information on this type. 

89 publish_time (google.protobuf.timestamp_pb2.Timestamp): 

90 The time that this message was originally published. 

91 opentelemetry_data (google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry.SubscribeOpenTelemetry) 

92 Open Telemetry data associated with this message. None if Open Telemetry is not enabled. 

93 """ 

94 

95 def __init__( 

96 self, 

97 message: "types.PubsubMessage._meta._pb", # type: ignore 

98 ack_id: str, 

99 delivery_attempt: int, 

100 request_queue: "queue.Queue", 

101 exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False, 

102 ): 

103 """Construct the Message. 

104 

105 .. note:: 

106 

107 This class should not be constructed directly; it is the 

108 responsibility of :class:`BasePolicy` subclasses to do so. 

109 

110 Args: 

111 message (types.PubsubMessage._meta._pb): 

112 The message received from Pub/Sub. For performance reasons it should be 

113 the raw protobuf message normally wrapped by 

114 :class:`~pubsub_v1.types.PubsubMessage`. A raw message can be obtained 

115 from a :class:`~pubsub_v1.types.PubsubMessage` instance through the 

116 latter's ``._pb`` attribute. 

117 ack_id (str): 

118 The ack_id received from Pub/Sub. 

119 delivery_attempt (int): 

120 The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy 

121 is set on the subscription, and zero otherwise. 

122 request_queue (queue.Queue): 

123 A queue provided by the policy that can accept requests; the policy is 

124 responsible for handling those requests. 

125 exactly_once_delivery_enabled_func (Callable[[], bool]): 

126 A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False. 

127 """ 

128 self._message = message 

129 self._ack_id = ack_id 

130 self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None 

131 self._request_queue = request_queue 

132 self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func 

133 self.message_id = message.message_id 

134 

135 # The instantiation time is the time that this message 

136 # was received. Tracking this provides us a way to be smart about 

137 # the default lease deadline. 

138 self._received_timestamp = time.time() 

139 

140 # Store the message attributes directly to speed up attribute access, i.e. 

141 # to avoid two lookups if self._message.<attribute> pattern was used in 

142 # properties. 

143 self._attributes = message.attributes 

144 self._data = message.data 

145 self._publish_time = dt.datetime.fromtimestamp( 

146 message.publish_time.seconds + message.publish_time.nanos / 1e9, 

147 tz=dt.timezone.utc, 

148 ) 

149 self._ordering_key = message.ordering_key 

150 self._size = message.ByteSize() 

151 

152 # None if Open Telemetry is disabled. Else contains OpenTelemetry data. 

153 self._opentelemetry_data: Optional[SubscribeOpenTelemetry] = None 

154 

155 def __repr__(self): 

156 # Get an abbreviated version of the data. 

157 abbv_data = self._message.data 

158 if len(abbv_data) > 50: 

159 abbv_data = abbv_data[:50] + b"..." 

160 

161 pretty_attrs = json.dumps( 

162 dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True 

163 ) 

164 pretty_attrs = _indent(pretty_attrs) 

165 # We don't actually want the first line indented. 

166 pretty_attrs = pretty_attrs.lstrip() 

167 return _MESSAGE_REPR.format(abbv_data, str(self.ordering_key), pretty_attrs) 

168 

169 @property 

170 def opentelemetry_data(self): 

171 return self._opentelemetry_data # pragma: NO COVER 

172 

173 @opentelemetry_data.setter 

174 def opentelemetry_data(self, data): 

175 self._opentelemetry_data = data # pragma: NO COVER 

176 

177 @property 

178 def attributes(self) -> "containers.ScalarMap": 

179 """Return the attributes of the underlying Pub/Sub Message. 

180 

181 .. warning:: 

182 

183 A ``ScalarMap`` behaves slightly differently than a 

184 ``dict``. For a Pub / Sub message this is a ``string->string`` map. 

185 When trying to access a value via ``map['key']``, if the key is 

186 not in the map, then the default value for the string type will 

187 be returned, which is an empty string. It may be more intuitive 

188 to just cast the map to a ``dict`` or to one use ``map.get``. 

189 

190 Returns: 

191 containers.ScalarMap: The message's attributes. This is a 

192 ``dict``-like object provided by ``google.protobuf``. 

193 """ 

194 return self._attributes 

195 

196 @property 

197 def data(self) -> bytes: 

198 """Return the data for the underlying Pub/Sub Message. 

199 

200 Returns: 

201 bytes: The message data. This is always a bytestring; if you want 

202 a text string, call :meth:`bytes.decode`. 

203 """ 

204 return self._data 

205 

206 @property 

207 def publish_time(self) -> "datetime.datetime": 

208 """Return the time that the message was originally published. 

209 

210 Returns: 

211 datetime.datetime: The date and time that the message was 

212 published. 

213 """ 

214 return self._publish_time 

215 

216 @property 

217 def ordering_key(self) -> str: 

218 """The ordering key used to publish the message.""" 

219 return self._ordering_key 

220 

221 @property 

222 def size(self) -> int: 

223 """Return the size of the underlying message, in bytes.""" 

224 return self._size 

225 

226 @property 

227 def ack_id(self) -> str: 

228 """the ID used to ack the message.""" 

229 return self._ack_id 

230 

231 @property 

232 def delivery_attempt(self) -> Optional[int]: 

233 """The delivery attempt counter is 1 + (the sum of number of NACKs 

234 and number of ack_deadline exceeds) for this message. It is set to None 

235 if a DeadLetterPolicy is not set on the subscription. 

236 

237 A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline 

238 exceeds event is whenever a message is not acknowledged within 

239 ack_deadline. Note that ack_deadline is initially 

240 Subscription.ackDeadlineSeconds, but may get extended automatically by 

241 the client library. 

242 

243 The first delivery of a given message will have this value as 1. The value 

244 is calculated at best effort and is approximate. 

245 

246 Returns: 

247 Optional[int]: The delivery attempt counter or ``None``. 

248 """ 

249 return self._delivery_attempt 

250 

251 def ack(self) -> None: 

252 """Acknowledge the given message. 

253 

254 Acknowledging a message in Pub/Sub means that you are done 

255 with it, and it will not be delivered to this subscription again. 

256 You should avoid acknowledging messages until you have 

257 *finished* processing them, so that in the event of a failure, 

258 you receive the message again. 

259 

260 .. warning:: 

261 Acks in Pub/Sub are best effort. You should always 

262 ensure that your processing code is idempotent, as you may 

263 receive any given message more than once. If you need strong 

264 guarantees about acks and re-deliveres, enable exactly-once 

265 delivery on your subscription and use the `ack_with_response` 

266 method instead. Exactly once delivery is a preview feature. 

267 For more details, see: 

268 https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

269 

270 """ 

271 if self.opentelemetry_data: 

272 self.opentelemetry_data.add_process_span_event("ack called") 

273 self.opentelemetry_data.end_process_span() 

274 time_to_ack = math.ceil(time.time() - self._received_timestamp) 

275 self._request_queue.put( 

276 requests.AckRequest( 

277 ack_id=self._ack_id, 

278 byte_size=self.size, 

279 time_to_ack=time_to_ack, 

280 ordering_key=self.ordering_key, 

281 future=None, 

282 opentelemetry_data=self.opentelemetry_data, 

283 ) 

284 ) 

285 

286 def ack_with_response(self) -> "futures.Future": 

287 """Acknowledge the given message. 

288 

289 Acknowledging a message in Pub/Sub means that you are done 

290 with it, and it will not be delivered to this subscription again. 

291 You should avoid acknowledging messages until you have 

292 *finished* processing them, so that in the event of a failure, 

293 you receive the message again. 

294 

295 If exactly-once delivery is NOT enabled on the subscription, the 

296 future returns immediately with an AcknowledgeStatus.SUCCESS. 

297 Since acks in Cloud Pub/Sub are best effort when exactly-once 

298 delivery is disabled, the message may be re-delivered. Because 

299 re-deliveries are possible, you should ensure that your processing 

300 code is idempotent, as you may receive any given message more than 

301 once. 

302 

303 If exactly-once delivery is enabled on the subscription, the 

304 future returned by this method tracks the state of acknowledgement 

305 operation. If the future completes successfully, the message is 

306 guaranteed NOT to be re-delivered. Otherwise, the future will 

307 contain an exception with more details about the failure and the 

308 message may be re-delivered. 

309 

310 Exactly once delivery is a preview feature. For more details, 

311 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

312 

313 Returns: 

314 futures.Future: A 

315 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

316 instance that conforms to Python Standard library's 

317 :class:`~concurrent.futures.Future` interface (but not an 

318 instance of that class). Call `result()` to get the result 

319 of the operation; upon success, a 

320 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

321 will be returned and upon an error, an 

322 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

323 will be thrown. 

324 """ 

325 if self.opentelemetry_data: 

326 self.opentelemetry_data.add_process_span_event("ack called") 

327 self.opentelemetry_data.end_process_span() 

328 req_future: Optional[futures.Future] 

329 if self._exactly_once_delivery_enabled_func(): 

330 future = futures.Future() 

331 req_future = future 

332 else: 

333 future = _SUCCESS_FUTURE 

334 req_future = None 

335 time_to_ack = math.ceil(time.time() - self._received_timestamp) 

336 self._request_queue.put( 

337 requests.AckRequest( 

338 ack_id=self._ack_id, 

339 byte_size=self.size, 

340 time_to_ack=time_to_ack, 

341 ordering_key=self.ordering_key, 

342 future=req_future, 

343 opentelemetry_data=self.opentelemetry_data, 

344 ) 

345 ) 

346 return future 

347 

348 def drop(self) -> None: 

349 """Release the message from lease management. 

350 

351 This informs the policy to no longer hold on to the lease for this 

352 message. Pub/Sub will re-deliver the message if it is not acknowledged 

353 before the existing lease expires. 

354 

355 .. warning:: 

356 For most use cases, the only reason to drop a message from 

357 lease management is on `ack` or `nack`; this library 

358 automatically drop()s the message on `ack` or `nack`. You probably 

359 do not want to call this method directly. 

360 """ 

361 self._request_queue.put( 

362 requests.DropRequest( 

363 ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key 

364 ) 

365 ) 

366 

367 def modify_ack_deadline(self, seconds: int) -> None: 

368 """Resets the deadline for acknowledgement. 

369 

370 New deadline will be the given value of seconds from now. 

371 

372 The default implementation handles automatically modacking received messages for you; 

373 you should not need to manually deal with setting ack deadlines. The exception case is 

374 if you are implementing your own custom subclass of 

375 :class:`~.pubsub_v1.subcriber._consumer.Consumer`. 

376 

377 Args: 

378 seconds (int): 

379 The number of seconds to set the lease deadline to. This should be 

380 between 0 and 600. Due to network latency, values below 10 are advised 

381 against. 

382 """ 

383 self._request_queue.put( 

384 requests.ModAckRequest( 

385 ack_id=self._ack_id, 

386 seconds=seconds, 

387 future=None, 

388 opentelemetry_data=self.opentelemetry_data, 

389 ) 

390 ) 

391 

392 def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future": 

393 """Resets the deadline for acknowledgement and returns the response 

394 status via a future. 

395 

396 New deadline will be the given value of seconds from now. 

397 

398 The default implementation handles automatically modacking received messages for you; 

399 you should not need to manually deal with setting ack deadlines. The exception case is 

400 if you are implementing your own custom subclass of 

401 :class:`~.pubsub_v1.subcriber._consumer.Consumer`. 

402 

403 If exactly-once delivery is NOT enabled on the subscription, the 

404 future returns immediately with an AcknowledgeStatus.SUCCESS. 

405 Since modify-ack-deadline operations in Cloud Pub/Sub are best effort 

406 when exactly-once delivery is disabled, the message may be re-delivered 

407 within the set deadline. 

408 

409 If exactly-once delivery is enabled on the subscription, the 

410 future returned by this method tracks the state of the 

411 modify-ack-deadline operation. If the future completes successfully, 

412 the message is guaranteed NOT to be re-delivered within the new deadline. 

413 Otherwise, the future will contain an exception with more details about 

414 the failure and the message will be redelivered according to its 

415 currently-set ack deadline. 

416 

417 Exactly once delivery is a preview feature. For more details, 

418 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

419 

420 Args: 

421 seconds (int): 

422 The number of seconds to set the lease deadline to. This should be 

423 between 0 and 600. Due to network latency, values below 10 are advised 

424 against. 

425 Returns: 

426 futures.Future: A 

427 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

428 instance that conforms to Python Standard library's 

429 :class:`~concurrent.futures.Future` interface (but not an 

430 instance of that class). Call `result()` to get the result 

431 of the operation; upon success, a 

432 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

433 will be returned and upon an error, an 

434 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

435 will be thrown. 

436 

437 """ 

438 req_future: Optional[futures.Future] 

439 if self._exactly_once_delivery_enabled_func(): 

440 future = futures.Future() 

441 req_future = future 

442 else: 

443 future = _SUCCESS_FUTURE 

444 req_future = None 

445 

446 self._request_queue.put( 

447 requests.ModAckRequest( 

448 ack_id=self._ack_id, 

449 seconds=seconds, 

450 future=req_future, 

451 opentelemetry_data=self.opentelemetry_data, 

452 ) 

453 ) 

454 

455 return future 

456 

457 def nack(self) -> None: 

458 """Decline to acknowledge the given message. 

459 

460 This will cause the message to be re-delivered to subscribers. Re-deliveries 

461 may take place immediately or after a delay, and may arrive at this subscriber 

462 or another. 

463 """ 

464 if self.opentelemetry_data: 

465 self.opentelemetry_data.add_process_span_event("nack called") 

466 self.opentelemetry_data.end_process_span() 

467 self._request_queue.put( 

468 requests.NackRequest( 

469 ack_id=self._ack_id, 

470 byte_size=self.size, 

471 ordering_key=self.ordering_key, 

472 future=None, 

473 opentelemetry_data=self.opentelemetry_data, 

474 ) 

475 ) 

476 

477 def nack_with_response(self) -> "futures.Future": 

478 """Decline to acknowledge the given message, returning the response status via 

479 a future. 

480 

481 This will cause the message to be re-delivered to subscribers. Re-deliveries 

482 may take place immediately or after a delay, and may arrive at this subscriber 

483 or another. 

484 

485 If exactly-once delivery is NOT enabled on the subscription, the 

486 future returns immediately with an AcknowledgeStatus.SUCCESS. 

487 

488 If exactly-once delivery is enabled on the subscription, the 

489 future returned by this method tracks the state of the 

490 nack operation. If the future completes successfully, 

491 the future's result will be an AcknowledgeStatus.SUCCESS. 

492 Otherwise, the future will contain an exception with more details about 

493 the failure. 

494 

495 Exactly once delivery is a preview feature. For more details, 

496 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

497 

498 Returns: 

499 futures.Future: A 

500 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

501 instance that conforms to Python Standard library's 

502 :class:`~concurrent.futures.Future` interface (but not an 

503 instance of that class). Call `result()` to get the result 

504 of the operation; upon success, a 

505 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

506 will be returned and upon an error, an 

507 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

508 will be thrown. 

509 

510 """ 

511 if self.opentelemetry_data: 

512 self.opentelemetry_data.add_process_span_event("nack called") 

513 self.opentelemetry_data.end_process_span() 

514 req_future: Optional[futures.Future] 

515 if self._exactly_once_delivery_enabled_func(): 

516 future = futures.Future() 

517 req_future = future 

518 else: 

519 future = _SUCCESS_FUTURE 

520 req_future = None 

521 

522 self._request_queue.put( 

523 requests.NackRequest( 

524 ack_id=self._ack_id, 

525 byte_size=self.size, 

526 ordering_key=self.ordering_key, 

527 future=req_future, 

528 opentelemetry_data=self.opentelemetry_data, 

529 ) 

530 ) 

531 

532 return future