Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py: 28%

129 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import logging 

18import threading 

19import time 

20import typing 

21from typing import Any, Callable, List, Optional, Sequence 

22 

23import google.api_core.exceptions 

24from google.api_core import gapic_v1 

25from google.cloud.pubsub_v1.publisher import exceptions 

26from google.cloud.pubsub_v1.publisher import futures 

27from google.cloud.pubsub_v1.publisher._batch import base 

28from google.pubsub_v1 import types as gapic_types 

29 

30if typing.TYPE_CHECKING: # pragma: NO COVER 

31 from google.cloud import pubsub_v1 

32 from google.cloud.pubsub_v1 import types 

33 from google.cloud.pubsub_v1.publisher import Client as PublisherClient 

34 from google.pubsub_v1.services.publisher.client import OptionalRetry 

35 

# Logger for this module.
_LOGGER = logging.getLogger(__name__)

# Batch statuses from which ``Batch._commit`` is still allowed to proceed.
_CAN_COMMIT = (
    base.BatchStatus.ACCEPTING_MESSAGES,
    base.BatchStatus.STARTING,
)

# Max accepted size of a PublishRequest, in bytes.
_SERVER_PUBLISH_MAX_BYTES = 10_000_000

# Raw protobuf class underlying PubsubMessage. Building messages with it
# directly and wrapping afterwards avoids the higher-level constructor.
_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

41 

42 

class Batch(base.Batch):
    """A batch of messages.

    The batch is the internal group of messages which are either awaiting
    publication or currently in progress.

    A batch is automatically created by the PublisherClient when the first
    message to be published is received; subsequent messages are added to
    that batch until the process of actual publishing _starts_.

    Once this occurs, any new messages sent to :meth:`publish` open a new
    batch.

    If you are using this library, you most likely do not need to instantiate
    batch objects directly; they will be created for you. If you want to
    change the actual batching settings, see the ``batching`` argument on
    :class:`~.pubsub_v1.PublisherClient`.

    Any properties or methods on this class which are not defined in
    :class:`~.pubsub_v1.publisher.batch.BaseBatch` should be considered
    implementation details.

    Args:
        client:
            The publisher client used to create this batch.
        topic:
            The topic. The format for this is ``projects/{project}/topics/{topic}``.
        settings:
            The settings for batch publishing. These should be considered immutable
            once the batch has been opened.
        batch_done_callback:
            Callback called when the response for a batch publish has been received.
            Called with one boolean argument: successfully published or a permanent
            error occurred. Temporary errors are not surfaced because they are retried
            at a lower level.
        commit_when_full:
            Whether to commit the batch when the batch is full.
        commit_retry:
            Designation of what errors, if any, should be retried when committing
            the batch. If not provided, a default retry is used.
        commit_timeout:
            The timeout to apply when committing the batch. If not provided, a default
            timeout is used.
    """

    def __init__(
        self,
        client: "PublisherClient",
        topic: str,
        settings: "types.BatchSettings",
        batch_done_callback: Optional[Callable[[bool], Any]] = None,
        commit_when_full: bool = True,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ):
        self._client = client
        self._topic = topic
        self._settings = settings
        self._batch_done_callback = batch_done_callback
        self._commit_when_full = commit_when_full

        self._state_lock = threading.Lock()
        # These members are all communicated between threads; ensure that
        # any writes to them use the "state lock" to remain atomic.
        # _futures list should remain unchanged after batch
        # status changed from ACCEPTING_MESSAGES to any other
        # in order to avoid race conditions
        self._futures: List[futures.Future] = []
        self._messages: List[gapic_types.PubsubMessage] = []
        self._status = base.BatchStatus.ACCEPTING_MESSAGES

        # The initial size is not zero, we need to account for the size overhead
        # of the PublishRequest message itself.
        self._base_request_size = gapic_types.PublishRequest(topic=topic)._pb.ByteSize()
        self._size = self._base_request_size

        self._commit_retry = commit_retry
        self._commit_timeout = commit_timeout

    @staticmethod
    def make_lock() -> threading.Lock:
        """Return a threading lock.

        Returns:
            A newly created lock.
        """
        return threading.Lock()

    @property
    def client(self) -> "PublisherClient":
        """A publisher client."""
        return self._client

    @property
    def messages(self) -> Sequence[gapic_types.PubsubMessage]:
        """The messages currently in the batch."""
        return self._messages

    @property
    def settings(self) -> "types.BatchSettings":
        """Return the batch settings.

        Returns:
            The batch settings. These are considered immutable once the batch has
            been opened.
        """
        return self._settings

    @property
    def size(self) -> int:
        """Return the total size of all of the messages currently in the batch.

        The size includes any overhead of the actual ``PublishRequest`` that is
        sent to the backend.

        Returns:
            The total size of all of the messages currently in the batch (including
            the request overhead), in bytes.
        """
        return self._size

    @property
    def status(self) -> base.BatchStatus:
        """Return the status of this batch.

        Returns:
            The status of this batch. All statuses are human-readable, all-lowercase
            strings.
        """
        return self._status

    def cancel(self, cancellation_reason: base.BatchCancellationReason) -> None:
        """Complete pending futures with an exception.

        This method must be called before publishing starts (ie: while the
        batch is still accepting messages.)

        Args:
            cancellation_reason:
                The reason why this batch has been cancelled. Its ``value`` is
                used as the message of the ``RuntimeError`` set on each future.
        """

        with self._state_lock:
            assert (
                self._status == base.BatchStatus.ACCEPTING_MESSAGES
            ), "Cancel should not be called after sending has started."

            exc = RuntimeError(cancellation_reason.value)
            for future in self._futures:
                future.set_exception(exc)
            self._status = base.BatchStatus.ERROR

    def commit(self) -> None:
        """Actually publish all of the messages on the active batch.

        .. note::

            This method is non-blocking. It opens a new thread, which calls
            :meth:`_commit`, which does block.

        This synchronously sets the batch status to "starting", and then opens
        a new thread, which handles actually sending the messages to Pub/Sub.

        If the current batch is **not** accepting messages, this method
        does nothing.
        """

        # Set the status to "starting" synchronously, to ensure that
        # this batch will necessarily not accept new messages.
        with self._state_lock:
            if self._status == base.BatchStatus.ACCEPTING_MESSAGES:
                self._status = base.BatchStatus.STARTING
            else:
                return

        self._start_commit_thread()

    def _start_commit_thread(self) -> None:
        """Start a new thread to actually handle the commit."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        commit_thread = threading.Thread(
            name="Thread-CommitBatchPublisher", target=self._commit, daemon=True
        )
        commit_thread.start()

    def _commit(self) -> None:
        """Actually publish all of the messages on the active batch.

        This moves the batch out from being the active batch to an in progress
        batch on the publisher, and then the batch is discarded upon
        completion.

        On completion every tracked future is resolved, either with its
        message ID or with the exception describing the failure, and the
        ``batch_done_callback`` (if any) is invoked with a boolean telling
        whether the batch was transported successfully.

        .. note::

            This method blocks. The :meth:`commit` method is the non-blocking
            version, which calls this one.
        """
        with self._state_lock:
            if self._status in _CAN_COMMIT:
                self._status = base.BatchStatus.IN_PROGRESS
            else:
                # If, in the intervening period between when this method was
                # called and now, the batch started to be committed, or
                # completed a commit, then no-op at this point.
                _LOGGER.debug(
                    "Batch is already in progress or has been cancelled, "
                    "exiting commit"
                )
                return

        # Once in the IN_PROGRESS state, no other thread can publish additional
        # messages or initiate a commit (those operations become a no-op), thus
        # it is safe to release the state lock here. Releasing the lock avoids
        # blocking other threads in case api.publish() below takes a long time
        # to complete.
        # https://github.com/googleapis/google-cloud-python/issues/8036

        # Sanity check: If there are no messages, no-op.
        if not self._messages:
            _LOGGER.debug("No messages to publish, exiting commit")
            self._status = base.BatchStatus.SUCCESS
            return

        # Begin the request to publish these messages.
        # Log how long the underlying request takes. A monotonic clock is used
        # so that wall-clock adjustments cannot distort the measured duration.
        start = time.monotonic()

        batch_transport_succeeded = True
        try:
            # Performs retries for errors defined by the retry configuration.
            response = self._client._gapic_publish(
                topic=self._topic,
                messages=self._messages,
                retry=self._commit_retry,
                timeout=self._commit_timeout,
            )
        except google.api_core.exceptions.GoogleAPIError as exc:
            # We failed to publish, even after retries, so set the exception on
            # all futures and exit.
            self._status = base.BatchStatus.ERROR

            for future in self._futures:
                future.set_exception(exc)

            batch_transport_succeeded = False
            if self._batch_done_callback is not None:
                # Failed to publish batch.
                self._batch_done_callback(batch_transport_succeeded)

            _LOGGER.exception("Failed to publish %s messages.", len(self._futures))
            return

        elapsed = time.monotonic() - start
        _LOGGER.debug("gRPC Publish took %s seconds.", elapsed)

        if len(response.message_ids) == len(self._futures):
            # Iterate over the futures on the queue and return the response
            # IDs. We are trusting that there is a 1:1 mapping, and raise
            # an exception if not.
            self._status = base.BatchStatus.SUCCESS
            for message_id, future in zip(response.message_ids, self._futures):
                future.set_result(message_id)
        else:
            # Sanity check: If the number of message IDs is not equal to
            # the number of futures I have, then something went wrong.
            self._status = base.BatchStatus.ERROR
            exception = exceptions.PublishError(
                "Some messages were not successfully published."
            )

            for future in self._futures:
                future.set_exception(exception)

            # Unknown error -> batch failed to be correctly transported.
            batch_transport_succeeded = False

            _LOGGER.error(
                "Only %s of %s messages were published.",
                len(response.message_ids),
                len(self._futures),
            )

        if self._batch_done_callback is not None:
            self._batch_done_callback(batch_transport_succeeded)

    def publish(
        self, message: gapic_types.PubsubMessage
    ) -> Optional["pubsub_v1.publisher.futures.Future"]:
        """Publish a single message.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed. If the batch is full or the commit is
        already in progress, the method does not do anything.

        This method is called by :meth:`~.PublisherClient.publish`.

        Args:
            message: The Pub/Sub message.

        Returns:
            An object conforming to the :class:`~concurrent.futures.Future` interface
            or :data:`None`. If :data:`None` is returned, that signals that the batch
            cannot accept a message.

        Raises:
            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """

        # Coerce the type, just in case.
        if not isinstance(message, gapic_types.PubsubMessage):
            # For performance reasons, the message should be constructed by directly
            # using the raw protobuf class, and only then wrapping it into the
            # higher-level PubsubMessage class.
            vanilla_pb = _raw_proto_pubbsub_message(**message)
            message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        future = None

        with self._state_lock:
            assert (
                self._status != base.BatchStatus.ERROR
            ), "Publish after stop() or publish error."

            if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
                return None

            size_increase = gapic_types.PublishRequest(
                messages=[message]
            )._pb.ByteSize()

            # A message that cannot fit even into an otherwise empty request
            # can never be published, so reject it outright.
            if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
                err_msg = (
                    "The message being published would produce too large a publish "
                    "request that would exceed the maximum allowed size on the "
                    f"backend ({_SERVER_PUBLISH_MAX_BYTES} bytes)."
                )
                raise exceptions.MessageTooLargeError(err_msg)

            new_size = self._size + size_increase
            new_count = len(self._messages) + 1

            size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
            overflow = new_size > size_limit or new_count >= self.settings.max_messages

            # The first message is always accepted, even if it alone overflows
            # the configured batch limits; later messages only when they fit.
            if not self._messages or not overflow:

                # Store the actual message in the batch's message queue.
                self._messages.append(message)
                self._size = new_size

                # Track the future on this batch (so that the result of the
                # future can be set).
                future = futures.Future()
                self._futures.append(future)

        # Try to commit, but it must be **without** the lock held, since
        # ``commit()`` will try to obtain the lock.
        if self._commit_when_full and overflow:
            self.commit()

        return future

    def _set_status(self, status: base.BatchStatus) -> None:
        """Overwrite the batch status.

        The assignment is performed directly, without acquiring the state lock.

        Args:
            status: The new status of the batch.
        """
        self._status = status