Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/message.py: 85%

95 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import datetime as dt 

18import json 

19import math 

20import time 

21import typing 

22from typing import Optional, Callable 

23 

24from google.cloud.pubsub_v1.subscriber._protocol import requests 

25from google.cloud.pubsub_v1.subscriber import futures 

26from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus 

27 

28 

29if typing.TYPE_CHECKING: # pragma: NO COVER 

30 import datetime 

31 import queue 

32 from google.cloud.pubsub_v1 import types 

33 from google.protobuf.internal import containers 

34 

35 

36_MESSAGE_REPR = """\ 

37Message {{ 

38 data: {!r} 

39 ordering_key: {!r} 

40 attributes: {} 

41}}""" 

42 

43_SUCCESS_FUTURE = futures.Future() 

44_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS) 

45 

46 

47def _indent(lines: str, prefix: str = " ") -> str: 

48 """Indent some text. 

49 

50 Note that this is present as ``textwrap.indent``, but not in Python 2. 

51 

52 Args: 

53 lines: 

54 The newline delimited string to be indented. 

55 prefix: 

56 The prefix to indent each line with. Defaults to two spaces. 

57 

58 Returns: 

59 The newly indented content. 

60 """ 

61 indented = [] 

62 for line in lines.split("\n"): 

63 indented.append(prefix + line) 

64 return "\n".join(indented) 

65 

66 

67class Message(object): 

68 """A representation of a single Pub/Sub message. 

69 

70 The common way to interact with 

71 :class:`~.pubsub_v1.subscriber.message.Message` objects is to receive 

72 them in callbacks on subscriptions; most users should never have a need 

73 to instantiate them by hand. (The exception to this is if you are 

74 implementing a custom subclass to 

75 :class:`~.pubsub_v1.subscriber._consumer.Consumer`.) 

76 

77 Attributes: 

78 message_id (str): 

79 The message ID. In general, you should not need to use this directly. 

80 data (bytes): 

81 The data in the message. Note that this will be a :class:`bytes`, 

82 not a text string. 

83 attributes (MutableMapping[str, str]): 

84 The attributes sent along with the message. See :attr:`attributes` for more 

85 information on this type. 

86 publish_time (google.protobuf.timestamp_pb2.Timestamp): 

87 The time that this message was originally published. 

88 """ 

89 

90 def __init__( 

91 self, 

92 message: "types.PubsubMessage._meta._pb", # type: ignore 

93 ack_id: str, 

94 delivery_attempt: int, 

95 request_queue: "queue.Queue", 

96 exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False, 

97 ): 

98 """Construct the Message. 

99 

100 .. note:: 

101 

102 This class should not be constructed directly; it is the 

103 responsibility of :class:`BasePolicy` subclasses to do so. 

104 

105 Args: 

106 message (types.PubsubMessage._meta._pb): 

107 The message received from Pub/Sub. For performance reasons it should be 

108 the raw protobuf message normally wrapped by 

109 :class:`~pubsub_v1.types.PubsubMessage`. A raw message can be obtained 

110 from a :class:`~pubsub_v1.types.PubsubMessage` instance through the 

111 latter's ``._pb`` attribute. 

112 ack_id (str): 

113 The ack_id received from Pub/Sub. 

114 delivery_attempt (int): 

115 The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy 

116 is set on the subscription, and zero otherwise. 

117 request_queue (queue.Queue): 

118 A queue provided by the policy that can accept requests; the policy is 

119 responsible for handling those requests. 

120 exactly_once_delivery_enabled_func (Callable[[], bool]): 

121 A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False. 

122 """ 

123 self._message = message 

124 self._ack_id = ack_id 

125 self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None 

126 self._request_queue = request_queue 

127 self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func 

128 self.message_id = message.message_id 

129 

130 # The instantiation time is the time that this message 

131 # was received. Tracking this provides us a way to be smart about 

132 # the default lease deadline. 

133 self._received_timestamp = time.time() 

134 

135 # Store the message attributes directly to speed up attribute access, i.e. 

136 # to avoid two lookups if self._message.<attribute> pattern was used in 

137 # properties. 

138 self._attributes = message.attributes 

139 self._data = message.data 

140 self._publish_time = dt.datetime.fromtimestamp( 

141 message.publish_time.seconds + message.publish_time.nanos / 1e9, 

142 tz=dt.timezone.utc, 

143 ) 

144 self._ordering_key = message.ordering_key 

145 self._size = message.ByteSize() 

146 

147 def __repr__(self): 

148 # Get an abbreviated version of the data. 

149 abbv_data = self._message.data 

150 if len(abbv_data) > 50: 

151 abbv_data = abbv_data[:50] + b"..." 

152 

153 pretty_attrs = json.dumps( 

154 dict(self.attributes), indent=2, separators=(",", ": "), sort_keys=True 

155 ) 

156 pretty_attrs = _indent(pretty_attrs) 

157 # We don't actually want the first line indented. 

158 pretty_attrs = pretty_attrs.lstrip() 

159 return _MESSAGE_REPR.format(abbv_data, str(self.ordering_key), pretty_attrs) 

160 

161 @property 

162 def attributes(self) -> "containers.ScalarMap": 

163 """Return the attributes of the underlying Pub/Sub Message. 

164 

165 .. warning:: 

166 

167 A ``ScalarMap`` behaves slightly differently than a 

168 ``dict``. For a Pub / Sub message this is a ``string->string`` map. 

169 When trying to access a value via ``map['key']``, if the key is 

170 not in the map, then the default value for the string type will 

171 be returned, which is an empty string. It may be more intuitive 

172 to just cast the map to a ``dict`` or to one use ``map.get``. 

173 

174 Returns: 

175 containers.ScalarMap: The message's attributes. This is a 

176 ``dict``-like object provided by ``google.protobuf``. 

177 """ 

178 return self._attributes 

179 

180 @property 

181 def data(self) -> bytes: 

182 """Return the data for the underlying Pub/Sub Message. 

183 

184 Returns: 

185 bytes: The message data. This is always a bytestring; if you want 

186 a text string, call :meth:`bytes.decode`. 

187 """ 

188 return self._data 

189 

190 @property 

191 def publish_time(self) -> "datetime.datetime": 

192 """Return the time that the message was originally published. 

193 

194 Returns: 

195 datetime.datetime: The date and time that the message was 

196 published. 

197 """ 

198 return self._publish_time 

199 

200 @property 

201 def ordering_key(self) -> str: 

202 """The ordering key used to publish the message.""" 

203 return self._ordering_key 

204 

205 @property 

206 def size(self) -> int: 

207 """Return the size of the underlying message, in bytes.""" 

208 return self._size 

209 

210 @property 

211 def ack_id(self) -> str: 

212 """the ID used to ack the message.""" 

213 return self._ack_id 

214 

215 @property 

216 def delivery_attempt(self) -> Optional[int]: 

217 """The delivery attempt counter is 1 + (the sum of number of NACKs 

218 and number of ack_deadline exceeds) for this message. It is set to None 

219 if a DeadLetterPolicy is not set on the subscription. 

220 

221 A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline 

222 exceeds event is whenever a message is not acknowledged within 

223 ack_deadline. Note that ack_deadline is initially 

224 Subscription.ackDeadlineSeconds, but may get extended automatically by 

225 the client library. 

226 

227 The first delivery of a given message will have this value as 1. The value 

228 is calculated at best effort and is approximate. 

229 

230 Returns: 

231 Optional[int]: The delivery attempt counter or ``None``. 

232 """ 

233 return self._delivery_attempt 

234 

235 def ack(self) -> None: 

236 """Acknowledge the given message. 

237 

238 Acknowledging a message in Pub/Sub means that you are done 

239 with it, and it will not be delivered to this subscription again. 

240 You should avoid acknowledging messages until you have 

241 *finished* processing them, so that in the event of a failure, 

242 you receive the message again. 

243 

244 .. warning:: 

245 Acks in Pub/Sub are best effort. You should always 

246 ensure that your processing code is idempotent, as you may 

247 receive any given message more than once. If you need strong 

248 guarantees about acks and re-deliveres, enable exactly-once 

249 delivery on your subscription and use the `ack_with_response` 

250 method instead. Exactly once delivery is a preview feature. 

251 For more details, see: 

252 https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

253 

254 """ 

255 time_to_ack = math.ceil(time.time() - self._received_timestamp) 

256 self._request_queue.put( 

257 requests.AckRequest( 

258 ack_id=self._ack_id, 

259 byte_size=self.size, 

260 time_to_ack=time_to_ack, 

261 ordering_key=self.ordering_key, 

262 future=None, 

263 ) 

264 ) 

265 

266 def ack_with_response(self) -> "futures.Future": 

267 """Acknowledge the given message. 

268 

269 Acknowledging a message in Pub/Sub means that you are done 

270 with it, and it will not be delivered to this subscription again. 

271 You should avoid acknowledging messages until you have 

272 *finished* processing them, so that in the event of a failure, 

273 you receive the message again. 

274 

275 If exactly-once delivery is NOT enabled on the subscription, the 

276 future returns immediately with an AcknowledgeStatus.SUCCESS. 

277 Since acks in Cloud Pub/Sub are best effort when exactly-once 

278 delivery is disabled, the message may be re-delivered. Because 

279 re-deliveries are possible, you should ensure that your processing 

280 code is idempotent, as you may receive any given message more than 

281 once. 

282 

283 If exactly-once delivery is enabled on the subscription, the 

284 future returned by this method tracks the state of acknowledgement 

285 operation. If the future completes successfully, the message is 

286 guaranteed NOT to be re-delivered. Otherwise, the future will 

287 contain an exception with more details about the failure and the 

288 message may be re-delivered. 

289 

290 Exactly once delivery is a preview feature. For more details, 

291 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

292 

293 Returns: 

294 futures.Future: A 

295 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

296 instance that conforms to Python Standard library's 

297 :class:`~concurrent.futures.Future` interface (but not an 

298 instance of that class). Call `result()` to get the result 

299 of the operation; upon success, a 

300 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

301 will be returned and upon an error, an 

302 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

303 will be thrown. 

304 """ 

305 req_future: Optional[futures.Future] 

306 if self._exactly_once_delivery_enabled_func(): 

307 future = futures.Future() 

308 req_future = future 

309 else: 

310 future = _SUCCESS_FUTURE 

311 req_future = None 

312 time_to_ack = math.ceil(time.time() - self._received_timestamp) 

313 self._request_queue.put( 

314 requests.AckRequest( 

315 ack_id=self._ack_id, 

316 byte_size=self.size, 

317 time_to_ack=time_to_ack, 

318 ordering_key=self.ordering_key, 

319 future=req_future, 

320 ) 

321 ) 

322 return future 

323 

324 def drop(self) -> None: 

325 """Release the message from lease management. 

326 

327 This informs the policy to no longer hold on to the lease for this 

328 message. Pub/Sub will re-deliver the message if it is not acknowledged 

329 before the existing lease expires. 

330 

331 .. warning:: 

332 For most use cases, the only reason to drop a message from 

333 lease management is on `ack` or `nack`; this library 

334 automatically drop()s the message on `ack` or `nack`. You probably 

335 do not want to call this method directly. 

336 """ 

337 self._request_queue.put( 

338 requests.DropRequest( 

339 ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key 

340 ) 

341 ) 

342 

343 def modify_ack_deadline(self, seconds: int) -> None: 

344 """Resets the deadline for acknowledgement. 

345 

346 New deadline will be the given value of seconds from now. 

347 

348 The default implementation handles automatically modacking received messages for you; 

349 you should not need to manually deal with setting ack deadlines. The exception case is 

350 if you are implementing your own custom subclass of 

351 :class:`~.pubsub_v1.subcriber._consumer.Consumer`. 

352 

353 Args: 

354 seconds (int): 

355 The number of seconds to set the lease deadline to. This should be 

356 between 0 and 600. Due to network latency, values below 10 are advised 

357 against. 

358 """ 

359 self._request_queue.put( 

360 requests.ModAckRequest(ack_id=self._ack_id, seconds=seconds, future=None) 

361 ) 

362 

363 def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future": 

364 """Resets the deadline for acknowledgement and returns the response 

365 status via a future. 

366 

367 New deadline will be the given value of seconds from now. 

368 

369 The default implementation handles automatically modacking received messages for you; 

370 you should not need to manually deal with setting ack deadlines. The exception case is 

371 if you are implementing your own custom subclass of 

372 :class:`~.pubsub_v1.subcriber._consumer.Consumer`. 

373 

374 If exactly-once delivery is NOT enabled on the subscription, the 

375 future returns immediately with an AcknowledgeStatus.SUCCESS. 

376 Since modify-ack-deadline operations in Cloud Pub/Sub are best effort 

377 when exactly-once delivery is disabled, the message may be re-delivered 

378 within the set deadline. 

379 

380 If exactly-once delivery is enabled on the subscription, the 

381 future returned by this method tracks the state of the 

382 modify-ack-deadline operation. If the future completes successfully, 

383 the message is guaranteed NOT to be re-delivered within the new deadline. 

384 Otherwise, the future will contain an exception with more details about 

385 the failure and the message will be redelivered according to its 

386 currently-set ack deadline. 

387 

388 Exactly once delivery is a preview feature. For more details, 

389 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

390 

391 Args: 

392 seconds (int): 

393 The number of seconds to set the lease deadline to. This should be 

394 between 0 and 600. Due to network latency, values below 10 are advised 

395 against. 

396 Returns: 

397 futures.Future: A 

398 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

399 instance that conforms to Python Standard library's 

400 :class:`~concurrent.futures.Future` interface (but not an 

401 instance of that class). Call `result()` to get the result 

402 of the operation; upon success, a 

403 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

404 will be returned and upon an error, an 

405 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

406 will be thrown. 

407 

408 """ 

409 req_future: Optional[futures.Future] 

410 if self._exactly_once_delivery_enabled_func(): 

411 future = futures.Future() 

412 req_future = future 

413 else: 

414 future = _SUCCESS_FUTURE 

415 req_future = None 

416 

417 self._request_queue.put( 

418 requests.ModAckRequest( 

419 ack_id=self._ack_id, seconds=seconds, future=req_future 

420 ) 

421 ) 

422 

423 return future 

424 

425 def nack(self) -> None: 

426 """Decline to acknowledge the given message. 

427 

428 This will cause the message to be re-delivered to subscribers. Re-deliveries 

429 may take place immediately or after a delay, and may arrive at this subscriber 

430 or another. 

431 """ 

432 self._request_queue.put( 

433 requests.NackRequest( 

434 ack_id=self._ack_id, 

435 byte_size=self.size, 

436 ordering_key=self.ordering_key, 

437 future=None, 

438 ) 

439 ) 

440 

441 def nack_with_response(self) -> "futures.Future": 

442 """Decline to acknowledge the given message, returning the response status via 

443 a future. 

444 

445 This will cause the message to be re-delivered to subscribers. Re-deliveries 

446 may take place immediately or after a delay, and may arrive at this subscriber 

447 or another. 

448 

449 If exactly-once delivery is NOT enabled on the subscription, the 

450 future returns immediately with an AcknowledgeStatus.SUCCESS. 

451 

452 If exactly-once delivery is enabled on the subscription, the 

453 future returned by this method tracks the state of the 

454 nack operation. If the future completes successfully, 

455 the future's result will be an AcknowledgeStatus.SUCCESS. 

456 Otherwise, the future will contain an exception with more details about 

457 the failure. 

458 

459 Exactly once delivery is a preview feature. For more details, 

460 see https://cloud.google.com/pubsub/docs/exactly-once-delivery." 

461 

462 Returns: 

463 futures.Future: A 

464 :class:`~google.cloud.pubsub_v1.subscriber.futures.Future` 

465 instance that conforms to Python Standard library's 

466 :class:`~concurrent.futures.Future` interface (but not an 

467 instance of that class). Call `result()` to get the result 

468 of the operation; upon success, a 

469 pubsub_v1.subscriber.exceptions.AcknowledgeStatus.SUCCESS 

470 will be returned and upon an error, an 

471 pubsub_v1.subscriber.exceptions.AcknowledgeError exception 

472 will be thrown. 

473 

474 """ 

475 req_future: Optional[futures.Future] 

476 if self._exactly_once_delivery_enabled_func(): 

477 future = futures.Future() 

478 req_future = future 

479 else: 

480 future = _SUCCESS_FUTURE 

481 req_future = None 

482 

483 self._request_queue.put( 

484 requests.NackRequest( 

485 ack_id=self._ack_id, 

486 byte_size=self.size, 

487 ordering_key=self.ordering_key, 

488 future=req_future, 

489 ) 

490 ) 

491 

492 return future