Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

167 statements  

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import logging 

18import threading 

19import time 

20import typing 

21from typing import Any, Callable, List, Optional, Sequence 

22from datetime import datetime 

23 

24from opentelemetry import trace 

25import google.api_core.exceptions 

26from google.api_core import gapic_v1 

27from google.auth import exceptions as auth_exceptions 

28 

29from google.cloud.pubsub_v1.publisher import exceptions 

30from google.cloud.pubsub_v1.publisher import futures 

31from google.cloud.pubsub_v1.publisher._batch import base 

32from google.pubsub_v1 import types as gapic_types 

33from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( 

34 PublishMessageWrapper, 

35) 

36 

37if typing.TYPE_CHECKING: # pragma: NO COVER 

38 from google.cloud import pubsub_v1 

39 from google.cloud.pubsub_v1 import types 

40 from google.cloud.pubsub_v1.publisher import Client as PublisherClient 

41 from google.pubsub_v1.services.publisher.client import OptionalRetry 

42 

43_LOGGER = logging.getLogger(__name__) 

44_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING) 

45_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest 

46 

47_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb() 

48 

49 

50class Batch(base.Batch): 

51 """A batch of messages. 

52 

53 The batch is the internal group of messages which are either awaiting 

54 publication or currently in progress. 

55 

56 A batch is automatically created by the PublisherClient when the first 

57 message to be published is received; subsequent messages are added to 

58 that batch until the process of actual publishing _starts_. 

59 

60 Once this occurs, any new messages sent to :meth:`publish` open a new 

61 batch. 

62 

63 If you are using this library, you most likely do not need to instantiate 

64 batch objects directly; they will be created for you. If you want to 

65 change the actual batching settings, see the ``batching`` argument on 

66 :class:`~.pubsub_v1.PublisherClient`. 

67 

68 Any properties or methods on this class which are not defined in 

69 :class:`~.pubsub_v1.publisher.batch.BaseBatch` should be considered 

70 implementation details. 

71 

72 Args: 

73 client: 

74 The publisher client used to create this batch. 

75 topic: 

76 The topic. The format for this is ``projects/{project}/topics/{topic}``. 

77 settings: 

78 The settings for batch publishing. These should be considered immutable 

79 once the batch has been opened. 

80 batch_done_callback: 

81 Callback called when the response for a batch publish has been received. 

82 Called with one boolean argument: successfully published or a permanent 

83 error occurred. Temporary errors are not surfaced because they are retried 

84 at a lower level. 

85 commit_when_full: 

86 Whether to commit the batch when the batch is full. 

87 commit_retry: 

88 Designation of what errors, if any, should be retried when commiting 

89 the batch. If not provided, a default retry is used. 

90 commit_timeout: 

91 The timeout to apply when commiting the batch. If not provided, a default 

92 timeout is used. 

93 """ 

94 

95 _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" 

96 _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" 

97 

98 def __init__( 

99 self, 

100 client: "PublisherClient", 

101 topic: str, 

102 settings: "types.BatchSettings", 

103 batch_done_callback: Optional[Callable[[bool], Any]] = None, 

104 commit_when_full: bool = True, 

105 commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT, 

106 commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, 

107 ): 

108 self._client = client 

109 self._topic = topic 

110 self._settings = settings 

111 self._batch_done_callback = batch_done_callback 

112 self._commit_when_full = commit_when_full 

113 

114 self._state_lock = threading.Lock() 

115 # These members are all communicated between threads; ensure that 

116 # any writes to them use the "state lock" to remain atomic. 

117 # _futures list should remain unchanged after batch 

118 # status changed from ACCEPTING_MESSAGES to any other 

119 # in order to avoid race conditions 

120 self._futures: List[futures.Future] = [] 

121 self._message_wrappers: List[PublishMessageWrapper] = [] 

122 self._status = base.BatchStatus.ACCEPTING_MESSAGES 

123 

124 # The initial size is not zero, we need to account for the size overhead 

125 # of the PublishRequest message itself. 

126 self._base_request_size = gapic_types.PublishRequest(topic=topic)._pb.ByteSize() 

127 self._size = self._base_request_size 

128 

129 self._commit_retry = commit_retry 

130 self._commit_timeout = commit_timeout 

131 

132 # Publish RPC Span that will be set by method `_start_publish_rpc_span` 

133 # if Open Telemetry is enabled. 

134 self._rpc_span: Optional[trace.Span] = None 

135 

136 @staticmethod 

137 def make_lock() -> threading.Lock: 

138 """Return a threading lock. 

139 

140 Returns: 

141 A newly created lock. 

142 """ 

143 return threading.Lock() 

144 

145 @property 

146 def client(self) -> "PublisherClient": 

147 """A publisher client.""" 

148 return self._client 

149 

150 @property 

151 def message_wrappers(self) -> Sequence[PublishMessageWrapper]: 

152 """The message wrappers currently in the batch.""" 

153 return self._message_wrappers 

154 

155 @property 

156 def settings(self) -> "types.BatchSettings": 

157 """Return the batch settings. 

158 

159 Returns: 

160 The batch settings. These are considered immutable once the batch has 

161 been opened. 

162 """ 

163 return self._settings 

164 

165 @property 

166 def size(self) -> int: 

167 """Return the total size of all of the messages currently in the batch. 

168 

169 The size includes any overhead of the actual ``PublishRequest`` that is 

170 sent to the backend. 

171 

172 Returns: 

173 The total size of all of the messages currently in the batch (including 

174 the request overhead), in bytes. 

175 """ 

176 return self._size 

177 

178 @property 

179 def status(self) -> base.BatchStatus: 

180 """Return the status of this batch. 

181 

182 Returns: 

183 The status of this batch. All statuses are human-readable, all-lowercase 

184 strings. 

185 """ 

186 return self._status 

187 

188 def cancel(self, cancellation_reason: base.BatchCancellationReason) -> None: 

189 """Complete pending futures with an exception. 

190 

191 This method must be called before publishing starts (ie: while the 

192 batch is still accepting messages.) 

193 

194 Args: 

195 The reason why this batch has been cancelled. 

196 """ 

197 

198 with self._state_lock: 

199 assert ( 

200 self._status == base.BatchStatus.ACCEPTING_MESSAGES 

201 ), "Cancel should not be called after sending has started." 

202 

203 exc = RuntimeError(cancellation_reason.value) 

204 for future in self._futures: 

205 future.set_exception(exc) 

206 self._status = base.BatchStatus.ERROR 

207 

208 def commit(self) -> None: 

209 """Actually publish all of the messages on the active batch. 

210 

211 .. note:: 

212 

213 This method is non-blocking. It opens a new thread, which calls 

214 :meth:`_commit`, which does block. 

215 

216 This synchronously sets the batch status to "starting", and then opens 

217 a new thread, which handles actually sending the messages to Pub/Sub. 

218 

219 If the current batch is **not** accepting messages, this method 

220 does nothing. 

221 """ 

222 

223 # Set the status to "starting" synchronously, to ensure that 

224 # this batch will necessarily not accept new messages. 

225 with self._state_lock: 

226 if self._status == base.BatchStatus.ACCEPTING_MESSAGES: 

227 self._status = base.BatchStatus.STARTING 

228 else: 

229 return 

230 

231 self._start_commit_thread() 

232 

233 def _start_commit_thread(self) -> None: 

234 """Start a new thread to actually handle the commit.""" 

235 # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue. 

236 # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303 

237 # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418 

238 commit_thread = threading.Thread( 

239 name="Thread-CommitBatchPublisher", target=self._commit, daemon=True 

240 ) 

241 commit_thread.start() 

242 

243 def _start_publish_rpc_span(self) -> None: 

244 tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) 

245 links = [] 

246 

247 for wrapper in self._message_wrappers: 

248 span = wrapper.create_span 

249 # Add links only for sampled spans. 

250 if span.get_span_context().trace_flags.sampled: 

251 links.append(trace.Link(span.get_span_context())) 

252 assert len(self._topic.split("/")) == 4 

253 topic_short_name = self._topic.split("/")[3] 

254 with tracer.start_as_current_span( 

255 name=f"{topic_short_name} publish", 

256 attributes={ 

257 "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, 

258 "messaging.destination.name": topic_short_name, 

259 "gcp.project_id": self._topic.split("/")[1], 

260 "messaging.batch.message_count": len(self._message_wrappers), 

261 "messaging.operation": "publish", 

262 "code.function": "_commit", 

263 }, 

264 links=links, 

265 kind=trace.SpanKind.CLIENT, 

266 end_on_exit=False, 

267 ) as rpc_span: 

268 ctx = rpc_span.get_span_context() 

269 for wrapper in self._message_wrappers: 

270 span = wrapper.create_span 

271 if span.get_span_context().trace_flags.sampled: 

272 span.add_link(ctx) 

273 self._rpc_span = rpc_span 

274 

275 def _commit(self) -> None: 

276 """Actually publish all of the messages on the active batch. 

277 

278 This moves the batch out from being the active batch to an in progress 

279 batch on the publisher, and then the batch is discarded upon 

280 completion. 

281 

282 .. note:: 

283 

284 This method blocks. The :meth:`commit` method is the non-blocking 

285 version, which calls this one. 

286 """ 

287 with self._state_lock: 

288 if self._status in _CAN_COMMIT: 

289 self._status = base.BatchStatus.IN_PROGRESS 

290 else: 

291 # If, in the intervening period between when this method was 

292 # called and now, the batch started to be committed, or 

293 # completed a commit, then no-op at this point. 

294 _LOGGER.debug( 

295 "Batch is already in progress or has been cancelled, " 

296 "exiting commit" 

297 ) 

298 return 

299 

300 # Once in the IN_PROGRESS state, no other thread can publish additional 

301 # messages or initiate a commit (those operations become a no-op), thus 

302 # it is safe to release the state lock here. Releasing the lock avoids 

303 # blocking other threads in case api.publish() below takes a long time 

304 # to complete. 

305 # https://github.com/googleapis/google-cloud-python/issues/8036 

306 

307 # Sanity check: If there are no messages, no-op. 

308 if not self._message_wrappers: 

309 _LOGGER.debug("No messages to publish, exiting commit") 

310 self._status = base.BatchStatus.SUCCESS 

311 return 

312 

313 # Begin the request to publish these messages. 

314 # Log how long the underlying request takes. 

315 start = time.time() 

316 

317 batch_transport_succeeded = True 

318 try: 

319 if self._client.open_telemetry_enabled: 

320 self._start_publish_rpc_span() 

321 

322 # Performs retries for errors defined by the retry configuration. 

323 response = self._client._gapic_publish( 

324 topic=self._topic, 

325 messages=[wrapper.message for wrapper in self._message_wrappers], 

326 retry=self._commit_retry, 

327 timeout=self._commit_timeout, 

328 ) 

329 

330 if self._client.open_telemetry_enabled: 

331 assert self._rpc_span is not None 

332 self._rpc_span.end() 

333 end_time = str(datetime.now()) 

334 for message_id, wrapper in zip( 

335 response.message_ids, self._message_wrappers 

336 ): 

337 span = wrapper.create_span 

338 span.add_event( 

339 name="publish end", 

340 attributes={ 

341 "timestamp": end_time, 

342 }, 

343 ) 

344 span.set_attribute(key="messaging.message.id", value=message_id) 

345 wrapper.end_create_span() 

346 except ( 

347 google.api_core.exceptions.GoogleAPIError, 

348 auth_exceptions.TransportError, 

349 ) as exc: 

350 # We failed to publish, even after retries, so set the exception on 

351 # all futures and exit. 

352 self._status = base.BatchStatus.ERROR 

353 

354 if self._client.open_telemetry_enabled: 

355 if self._rpc_span: 

356 self._rpc_span.record_exception( 

357 exception=exc, 

358 ) 

359 self._rpc_span.set_status( 

360 trace.Status(status_code=trace.StatusCode.ERROR) 

361 ) 

362 self._rpc_span.end() 

363 

364 for wrapper in self._message_wrappers: 

365 wrapper.end_create_span(exc=exc) 

366 

367 batch_transport_succeeded = False 

368 if self._batch_done_callback is not None: 

369 # Failed to publish batch. 

370 self._batch_done_callback(batch_transport_succeeded) 

371 

372 for future in self._futures: 

373 future.set_exception(exc) 

374 

375 return 

376 

377 end = time.time() 

378 _LOGGER.debug("gRPC Publish took %s seconds.", end - start) 

379 

380 if len(response.message_ids) == len(self._futures): 

381 # Iterate over the futures on the queue and return the response 

382 # IDs. We are trusting that there is a 1:1 mapping, and raise 

383 # an exception if not. 

384 self._status = base.BatchStatus.SUCCESS 

385 for message_id, future in zip(response.message_ids, self._futures): 

386 future.set_result(message_id) 

387 else: 

388 # Sanity check: If the number of message IDs is not equal to 

389 # the number of futures I have, then something went wrong. 

390 self._status = base.BatchStatus.ERROR 

391 exception = exceptions.PublishError( 

392 "Some messages were not successfully published." 

393 ) 

394 

395 for future in self._futures: 

396 future.set_exception(exception) 

397 

398 # Unknown error -> batch failed to be correctly transported/ 

399 batch_transport_succeeded = False 

400 

401 _LOGGER.error( 

402 "Only %s of %s messages were published.", 

403 len(response.message_ids), 

404 len(self._futures), 

405 ) 

406 

407 if self._batch_done_callback is not None: 

408 self._batch_done_callback(batch_transport_succeeded) 

409 

410 def publish( 

411 self, 

412 wrapper: PublishMessageWrapper, 

413 ) -> Optional["pubsub_v1.publisher.futures.Future"]: 

414 """Publish a single message. 

415 

416 Add the given message to this object; this will cause it to be 

417 published once the batch either has enough messages or a sufficient 

418 period of time has elapsed. If the batch is full or the commit is 

419 already in progress, the method does not do anything. 

420 

421 This method is called by :meth:`~.PublisherClient.publish`. 

422 

423 Args: 

424 wrapper: The Pub/Sub message wrapper. 

425 

426 Returns: 

427 An object conforming to the :class:`~concurrent.futures.Future` interface 

428 or :data:`None`. If :data:`None` is returned, that signals that the batch 

429 cannot accept a message. 

430 

431 Raises: 

432 pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing 

433 the ``message`` would exceed the max size limit on the backend. 

434 """ 

435 

436 # Coerce the type, just in case. 

437 if not isinstance( 

438 wrapper.message, gapic_types.PubsubMessage 

439 ): # pragma: NO COVER 

440 # For performance reasons, the message should be constructed by directly 

441 # using the raw protobuf class, and only then wrapping it into the 

442 # higher-level PubsubMessage class. 

443 vanilla_pb = _raw_proto_pubbsub_message(**wrapper.message) 

444 wrapper.message = gapic_types.PubsubMessage.wrap(vanilla_pb) 

445 

446 future = None 

447 

448 with self._state_lock: 

449 assert ( 

450 self._status != base.BatchStatus.ERROR 

451 ), "Publish after stop() or publish error." 

452 

453 if self.status != base.BatchStatus.ACCEPTING_MESSAGES: 

454 return None 

455 

456 size_increase = gapic_types.PublishRequest( 

457 messages=[wrapper.message] 

458 )._pb.ByteSize() 

459 

460 if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES: 

461 err_msg = ( 

462 "The message being published would produce too large a publish " 

463 "request that would exceed the maximum allowed size on the " 

464 "backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES) 

465 ) 

466 raise exceptions.MessageTooLargeError(err_msg) 

467 

468 new_size = self._size + size_increase 

469 new_count = len(self._message_wrappers) + 1 

470 

471 size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) 

472 overflow = new_size > size_limit or new_count >= self.settings.max_messages 

473 

474 if not self._message_wrappers or not overflow: 

475 # Store the actual message in the batch's message queue. 

476 self._message_wrappers.append(wrapper) 

477 self._size = new_size 

478 

479 # Track the future on this batch (so that the result of the 

480 # future can be set). 

481 future = futures.Future() 

482 self._futures.append(future) 

483 

484 # Try to commit, but it must be **without** the lock held, since 

485 # ``commit()`` will try to obtain the lock. 

486 if self._commit_when_full and overflow: 

487 self.commit() 

488 

489 return future 

490 

491 def _set_status(self, status: base.BatchStatus): 

492 self._status = status