Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1# Copyright 2019, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import enum 

16import collections 

17import threading 

18import typing 

19from typing import Deque, Iterable, Sequence 

20 

21from google.api_core import gapic_v1 

22from google.cloud.pubsub_v1.publisher import futures 

23from google.cloud.pubsub_v1.publisher import exceptions 

24from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base 

25from google.cloud.pubsub_v1.publisher._batch import base as batch_base 

26from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( 

27 PublishMessageWrapper, 

28) 

29 

30if typing.TYPE_CHECKING: # pragma: NO COVER 

31 from google.cloud.pubsub_v1 import types 

32 from google.cloud.pubsub_v1.publisher import _batch 

33 from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient 

34 from google.pubsub_v1.services.publisher.client import OptionalRetry 

35 

36 

class _OrderedSequencerStatus(str, enum.Enum):
    """Lifecycle states of an ``OrderedSequencer``.

    A freshly created sequencer starts in ``ACCEPTING_MESSAGES``.

    Permitted transitions:

    * ``ACCEPTING_MESSAGES`` -> ``PAUSED``: a batch failed permanently.
    * ``ACCEPTING_MESSAGES`` -> ``STOPPED``: the user explicitly called
      ``stop()``.
    * ``ACCEPTING_MESSAGES`` -> ``FINISHED``: every batch published normally.
    * ``PAUSED`` -> ``ACCEPTING_MESSAGES``: the user unpaused the sequencer.
    * ``PAUSED`` -> ``STOPPED``: the user explicitly called ``stop()``.
    * ``STOPPED`` -> ``FINISHED``: after the user stopped the client, the one
      remaining batch finished publishing.
    * ``STOPPED`` -> ``PAUSED``: the batch committed by ``stop()`` failed
      permanently.
    * ``FINISHED`` -> ``ACCEPTING_MESSAGES``: a publish arrived while the
      sequencer was waiting for cleanup.
    * ``FINISHED`` -> ``STOPPED``: the user explicitly called ``stop()``.

    Forbidden transitions:

    * ``PAUSED`` -> ``FINISHED``: pausing cancels every batch, so none can
      finish normally. A paused sequencer must also not be cleaned up, since
      its presence records that the ordering key still has to be resumed.
    * ``STOPPED`` -> ``ACCEPTING_MESSAGES``: a user-stopped sequencer can never
      accept messages again. This is acceptable because ``stop()`` is only
      meant to be called on shutdown.
    * ``FINISHED`` -> ``PAUSED``: nothing is in flight any more, so no
      permanent error can occur that would pause the sequencer.
    """

    # Taking new publishes and/or waiting for the outcome of a batch publish.
    ACCEPTING_MESSAGES = "accepting messages"

    # A permanent error happened. To keep messages ordered, publishing is
    # blocked until the user unpauses this sequencer.
    PAUSED = "paused"

    # Further publishes are rejected. One outstanding batch may still invoke
    # _batch_done_callback when it completes (successfully or with an error).
    STOPPED = "stopped"

    # Nothing left to do; waiting to be cleaned up. A new publish moves the
    # sequencer back into the normal accepting-messages state.
    FINISHED = "finished"

78 

79 

class OrderedSequencer(sequencer_base.Sequencer):
    """Sequences messages into batches ordered by an ordering key for one topic.

    Once the first message has been published, a sequencer holds at least one
    batch unless it is paused, stopped, or finished. When the last batch
    finishes publishing successfully, the sequencer enters the ``FINISHED``
    state and calls the client's ``ensure_cleanup_and_commit_timer_runs()`` so
    that the client can eventually clean the sequencer up.

    Public methods are thread-safe.

    Args:
        client:
            The publisher client used to create this sequencer.
        topic:
            The topic. The format for this is ``projects/{project}/topics/{topic}``.
        ordering_key:
            The ordering key for this sequencer.
    """

    def __init__(self, client: "PublisherClient", topic: str, ordering_key: str):
        self._client = client
        self._topic = topic
        self._ordering_key = ordering_key
        # Guards the variables below
        self._state_lock = threading.Lock()
        # Batches ordered from first (head/left) to last (right/tail).
        # Invariant: always has at least one batch after the first publish,
        # unless paused, stopped, or finished (FINISHED means the deque is
        # empty).
        self._ordered_batches: Deque["_batch.thread.Batch"] = collections.deque()
        # See _OrderedSequencerStatus for valid state transitions.
        self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

    def is_finished(self) -> bool:
        """Whether the sequencer is finished and should be cleaned up.

        Returns:
            Whether the sequencer is finished and should be cleaned up.
        """
        with self._state_lock:
            return self._state == _OrderedSequencerStatus.FINISHED

    def stop(self) -> None:
        """Permanently stop this sequencer.

        This differs from pausing, which may be resumed. Immediately commits
        the first batch (if any) and cancels the rest with reason
        ``CLIENT_STOPPED``.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Ordered sequencer already stopped.")

            self._state = _OrderedSequencerStatus.STOPPED
            if self._ordered_batches:
                # Give only the first batch the chance to finish.
                self._ordered_batches[0].commit()

                # Cancel the rest of the batches and remove them from the deque
                # of batches.
                while len(self._ordered_batches) > 1:
                    # Pops from the tail until it leaves only the head in the
                    # deque.
                    batch = self._ordered_batches.pop()
                    batch.cancel(batch_base.BatchCancellationReason.CLIENT_STOPPED)

    def commit(self) -> None:
        """Commit the first batch, if unpaused.

        If paused or no batches exist, this method does nothing.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Ordered sequencer already stopped.")

            if self._state != _OrderedSequencerStatus.PAUSED and self._ordered_batches:
                # It's okay to commit the same batch more than once. The
                # operation is idempotent.
                self._ordered_batches[0].commit()

    def _batch_done_callback(self, success: bool) -> None:
        """Deal with completion of a batch.

        Called when a batch has finished publishing, with either a success
        or a failure. (Temporary failures are retried infinitely when
        ordering keys are enabled.)

        On success, the head batch is removed and:

        * if no batches remain, the sequencer becomes ``FINISHED``;
        * if one batch remains, it is left to fill up or be committed later;
        * otherwise the next batch is committed right away.

        In the first two cases the client's commit/cleanup timer is ensured.
        On failure the sequencer is paused and all remaining batches are
        cancelled.

        Args:
            success:
                Whether the head batch was published successfully.
        """
        ensure_cleanup_and_commit_timer_runs = False
        with self._state_lock:
            assert self._state != _OrderedSequencerStatus.PAUSED, (
                "This method should not be called after pause() because "
                "pause() should have cancelled all of the batches."
            )
            assert self._state != _OrderedSequencerStatus.FINISHED, (
                "This method should not be called after all batches have been "
                "finished."
            )

            # Message futures for the batch have been completed (either with a
            # result or an exception) already, so remove the batch.
            self._ordered_batches.popleft()

            if success:
                if len(self._ordered_batches) == 0:
                    # Mark this sequencer as finished.
                    # If new messages come in for this ordering key and this
                    # sequencer hasn't been cleaned up yet, it will go back
                    # into accepting-messages state. Otherwise, the client
                    # must create a new OrderedSequencer.
                    self._state = _OrderedSequencerStatus.FINISHED
                    # Ensure cleanup thread runs at some point.
                    ensure_cleanup_and_commit_timer_runs = True
                elif len(self._ordered_batches) == 1:
                    # Wait for messages and/or commit timeout
                    # Ensure there's actually a commit timer thread that'll commit
                    # after a delay.
                    ensure_cleanup_and_commit_timer_runs = True
                else:
                    # If there is more than one batch, we know that the next batch
                    # must be full and, therefore, ready to be committed.
                    self._ordered_batches[0].commit()
            else:
                # Unrecoverable error detected
                self._pause()

        # Called only after _state_lock has been released (presumably so the
        # client is never re-entered while this sequencer's lock is held).
        if ensure_cleanup_and_commit_timer_runs:
            self._client.ensure_cleanup_and_commit_timer_runs()

    def _pause(self) -> None:
        """Pause this sequencer: set state to paused, cancel all batches, and
        clear the list of ordered batches.

        _state_lock must be taken before calling this method.
        """
        assert (
            self._state != _OrderedSequencerStatus.FINISHED
        ), "Pause should not be called after all batches have finished."
        self._state = _OrderedSequencerStatus.PAUSED
        for batch in self._ordered_batches:
            batch.cancel(
                batch_base.BatchCancellationReason.PRIOR_ORDERED_MESSAGE_FAILED
            )
        self._ordered_batches.clear()

    def unpause(self) -> None:
        """Unpause this sequencer.

        Raises:
            RuntimeError:
                If called when the ordering key has not been paused.
        """
        with self._state_lock:
            if self._state != _OrderedSequencerStatus.PAUSED:
                raise RuntimeError("Ordering key is not paused.")
            self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

    def _create_batch(
        self,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "_batch.thread.Batch":
        """Create a new batch using the client's batch class and other stored
        settings.

        The batch is created with ``commit_when_full=False``, so committing is
        left to this sequencer, and it reports completion through
        ``_batch_done_callback``.

        Args:
            commit_retry:
                The retry settings to apply when publishing the batch.
            commit_timeout:
                The timeout to apply when publishing the batch.

        Returns:
            The newly created (empty) batch. It is not added to the deque.
        """
        return self._client._batch_class(
            client=self._client,
            topic=self._topic,
            settings=self._client.batch_settings,
            batch_done_callback=self._batch_done_callback,
            commit_when_full=False,
            commit_retry=commit_retry,
            commit_timeout=commit_timeout,
        )

    def publish(
        self,
        wrapper: PublishMessageWrapper,
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> futures.Future:
        """Publish message for this ordering key.

        If the sequencer is ``FINISHED`` (waiting for cleanup), it is switched
        back to accepting messages rather than rejecting the publish.

        Args:
            wrapper:
                The Pub/Sub message wrapper.
            retry:
                The retry settings to apply when publishing the message.
            timeout:
                The timeout to apply when publishing the message.

        Returns:
            A class instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class). The future might return immediately with a
            PublishToPausedOrderingKeyException if the ordering key is paused.
            Otherwise, the future tracks the lifetime of the message publish.

        Raises:
            RuntimeError:
                If called after this sequencer has been stopped by a call to
                stop(). (A sequencer whose batches have all been published is
                not stopped; it resumes accepting messages.)
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.PAUSED:
                errored_future = futures.Future()
                exception = exceptions.PublishToPausedOrderingKeyException(
                    self._ordering_key
                )
                errored_future.set_exception(exception)
                return errored_future

            # If waiting to be cleaned-up, convert to accepting messages to
            # prevent this sequencer from being cleaned-up only to have another
            # one with the same ordering key created immediately afterward.
            if self._state == _OrderedSequencerStatus.FINISHED:
                self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Cannot publish on a stopped sequencer.")

            assert (
                self._state == _OrderedSequencerStatus.ACCEPTING_MESSAGES
            ), "Publish is only allowed in accepting-messages state."

            if not self._ordered_batches:
                new_batch = self._create_batch(
                    commit_retry=retry, commit_timeout=timeout
                )
                self._ordered_batches.append(new_batch)

            # Try the tail batch first. A None future means that batch would
            # not take the message, so keep appending fresh batches until one
            # does.
            batch = self._ordered_batches[-1]
            future = batch.publish(wrapper)
            while future is None:
                batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
                self._ordered_batches.append(batch)
                future = batch.publish(wrapper)

            return future

    # Used only for testing.
    def _set_batch(self, batch: "_batch.thread.Batch") -> None:
        self._ordered_batches = collections.deque([batch])

    # Used only for testing.
    def _set_batches(self, batches: Iterable["_batch.thread.Batch"]) -> None:
        self._ordered_batches = collections.deque(batches)

    # Used only for testing.
    def _get_batches(self) -> Sequence["_batch.thread.Batch"]:
        return self._ordered_batches