# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py:
# 29% of 100 statements (coverage.py v7.2.2, created at 2023-03-26 06:25 +0000)

# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum
import collections
import threading
import typing
from typing import Deque, Iterable, Sequence

from google.api_core import gapic_v1
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.pubsub_v1 import types
    from google.cloud.pubsub_v1.publisher import _batch
    from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
    from google.pubsub_v1.services.publisher.client import OptionalRetry


class _OrderedSequencerStatus(str, enum.Enum):
    """An enum-like class representing valid statuses for an OrderedSequencer.

    Starting state: ACCEPTING_MESSAGES
    Valid transitions:
        ACCEPTING_MESSAGES -> PAUSED (on permanent error)
        ACCEPTING_MESSAGES -> STOPPED (when user calls stop() explicitly)
        ACCEPTING_MESSAGES -> FINISHED (all batch publishes finish normally)

        PAUSED -> ACCEPTING_MESSAGES (when user unpauses)
        PAUSED -> STOPPED (when user calls stop() explicitly)

        STOPPED -> FINISHED (user stops the client and the one remaining batch
                             finishes publishing)
        STOPPED -> PAUSED (stop() commits one batch, which fails permanently)

        FINISHED -> ACCEPTING_MESSAGES (a publish happens while waiting for cleanup)
        FINISHED -> STOPPED (when user calls stop() explicitly)

    Illegal transitions:
        PAUSED -> FINISHED (since all batches are cancelled on pause, there should
                            not be any that finish normally; paused sequencers
                            should not be cleaned up because their presence
                            indicates that the ordering key needs to be resumed)
        STOPPED -> ACCEPTING_MESSAGES (there is no way to make a user-stopped
                                       sequencer accept messages again; this is
                                       okay since stop() should only be called
                                       on shutdown)
        FINISHED -> PAUSED (no messages remain in flight, so they can't cause a
                            permanent error and pause the sequencer)
    """

    # Accepting publishes and/or waiting for the result of a batch publish.
    ACCEPTING_MESSAGES = "accepting messages"
    # A permanent error occurred. The user must unpause this sequencer to
    # resume publishing. This is done to maintain ordering.
    PAUSED = "paused"
    # No more publishes allowed. There may be an outstanding batch that will
    # call _batch_done_callback when it's done (success or error).
    STOPPED = "stopped"
    # No more work to do. Waiting to be cleaned up. A publish will transform
    # this sequencer back into the normal accepting-messages state.
    FINISHED = "finished"
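
# A minimal sketch, not part of the library: the transition rules from the
# docstring above, encoded as a lookup table. The name
# _SKETCH_VALID_TRANSITIONS is hypothetical and is not used by this module.
_SKETCH_VALID_TRANSITIONS = {
    _OrderedSequencerStatus.ACCEPTING_MESSAGES: {
        _OrderedSequencerStatus.PAUSED,
        _OrderedSequencerStatus.STOPPED,
        _OrderedSequencerStatus.FINISHED,
    },
    _OrderedSequencerStatus.PAUSED: {
        _OrderedSequencerStatus.ACCEPTING_MESSAGES,
        _OrderedSequencerStatus.STOPPED,
    },
    _OrderedSequencerStatus.STOPPED: {
        _OrderedSequencerStatus.FINISHED,
        _OrderedSequencerStatus.PAUSED,
    },
    _OrderedSequencerStatus.FINISHED: {
        _OrderedSequencerStatus.ACCEPTING_MESSAGES,
        _OrderedSequencerStatus.STOPPED,
    },
}
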

class OrderedSequencer(sequencer_base.Sequencer):
    """Sequences messages into batches ordered by an ordering key for one topic.

    A sequencer always has at least one batch in it, unless paused or stopped.
    When no batches remain, the |publishes_done_callback| is called so the
    client can perform cleanup.

    Public methods are thread-safe.

    Args:
        client:
            The publisher client used to create this sequencer.
        topic:
            The topic. The format for this is ``projects/{project}/topics/{topic}``.
        ordering_key:
            The ordering key for this sequencer.
    """

    def __init__(self, client: "PublisherClient", topic: str, ordering_key: str):
        self._client = client
        self._topic = topic
        self._ordering_key = ordering_key
        # Guards the variables below.
        self._state_lock = threading.Lock()
        # Batches ordered from first (head/left) to last (right/tail).
        # Invariant: always has at least one batch after the first publish,
        # unless paused or stopped.
        self._ordered_batches: Deque["_batch.thread.Batch"] = collections.deque()
        # See _OrderedSequencerStatus for valid state transitions.
        self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

    def is_finished(self) -> bool:
        """Whether the sequencer is finished and should be cleaned up.

        Returns:
            Whether the sequencer is finished and should be cleaned up.
        """
        with self._state_lock:
            return self._state == _OrderedSequencerStatus.FINISHED

    def stop(self) -> None:
        """Permanently stop this sequencer.

        This differs from pausing, which may be resumed. Immediately commits
        the first batch and cancels the rest.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Ordered sequencer already stopped.")

            self._state = _OrderedSequencerStatus.STOPPED
            if self._ordered_batches:
                # Give only the first batch the chance to finish.
                self._ordered_batches[0].commit()

                # Cancel the rest of the batches and remove them from the
                # deque of batches.
                while len(self._ordered_batches) > 1:
                    # Pop from the tail until only the head remains in the
                    # deque.
                    batch = self._ordered_batches.pop()
                    batch.cancel(batch_base.BatchCancellationReason.CLIENT_STOPPED)

    def commit(self) -> None:
        """Commit the first batch, if unpaused.

        If paused or no batches exist, this method does nothing.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Ordered sequencer already stopped.")

            if self._state != _OrderedSequencerStatus.PAUSED and self._ordered_batches:
                # It's okay to commit the same batch more than once. The
                # operation is idempotent.
                self._ordered_batches[0].commit()

    def _batch_done_callback(self, success: bool) -> None:
        """Deal with completion of a batch.

        Called when a batch has finished publishing, with either a success
        or a failure. (Temporary failures are retried infinitely when
        ordering keys are enabled.)
        """
        ensure_cleanup_and_commit_timer_runs = False
        with self._state_lock:
            assert self._state != _OrderedSequencerStatus.PAUSED, (
                "This method should not be called after pause() because "
                "pause() should have cancelled all of the batches."
            )
            assert self._state != _OrderedSequencerStatus.FINISHED, (
                "This method should not be called after all batches have been "
                "finished."
            )

            # Message futures for the batch have been completed (either with a
            # result or an exception) already, so remove the batch.
            self._ordered_batches.popleft()

            if success:
                if len(self._ordered_batches) == 0:
                    # Mark this sequencer as finished.
                    # If new messages come in for this ordering key and this
                    # sequencer hasn't been cleaned up yet, it will go back
                    # into accepting-messages state. Otherwise, the client
                    # must create a new OrderedSequencer.
                    self._state = _OrderedSequencerStatus.FINISHED
                    # Ensure the cleanup thread runs at some point.
                    ensure_cleanup_and_commit_timer_runs = True
                elif len(self._ordered_batches) == 1:
                    # Wait for messages and/or the commit timeout.
                    # Ensure there's actually a commit timer thread that'll
                    # commit after a delay.
                    ensure_cleanup_and_commit_timer_runs = True
                else:
                    # If there is more than one batch, we know that the next
                    # batch must be full and, therefore, ready to be committed.
                    self._ordered_batches[0].commit()
            else:
                # Unrecoverable error detected.
                self._pause()

        if ensure_cleanup_and_commit_timer_runs:
            self._client.ensure_cleanup_and_commit_timer_runs()

    def _pause(self) -> None:
        """Pause this sequencer: set state to paused, cancel all batches, and
        clear the list of ordered batches.

        _state_lock must be taken before calling this method.
        """
        assert (
            self._state != _OrderedSequencerStatus.FINISHED
        ), "Pause should not be called after all batches have finished."
        self._state = _OrderedSequencerStatus.PAUSED
        for batch in self._ordered_batches:
            batch.cancel(
                batch_base.BatchCancellationReason.PRIOR_ORDERED_MESSAGE_FAILED
            )
        self._ordered_batches.clear()

    def unpause(self) -> None:
        """Unpause this sequencer.

        Raises:
            RuntimeError:
                If called when the ordering key has not been paused.
        """
        with self._state_lock:
            if self._state != _OrderedSequencerStatus.PAUSED:
                raise RuntimeError("Ordering key is not paused.")
            self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

    def _create_batch(
        self,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "_batch.thread.Batch":
        """Create a new batch using the client's batch class and other stored
        settings.

        Args:
            commit_retry:
                The retry settings to apply when publishing the batch.
            commit_timeout:
                The timeout to apply when publishing the batch.
        """
        return self._client._batch_class(
            client=self._client,
            topic=self._topic,
            settings=self._client.batch_settings,
            batch_done_callback=self._batch_done_callback,
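            # commit_when_full is disabled here: with an ordering key, only
            # the head batch may be in flight at a time, so the sequencer
            # itself commits each batch in order (see _batch_done_callback).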
            commit_when_full=False,
            commit_retry=commit_retry,
            commit_timeout=commit_timeout,
        )

    def publish(
        self,
        message: gapic_types.PubsubMessage,
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> futures.Future:
        """Publish message for this ordering key.

        Args:
            message:
                The Pub/Sub message.
            retry:
                The retry settings to apply when publishing the message.
            timeout:
                The timeout to apply when publishing the message.

        Returns:
            A class instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class). If the ordering key is paused, the
            returned future is already completed with a
            PublishToPausedOrderingKeyException. Otherwise, the future tracks
            the lifetime of the message publish.

        Raises:
            RuntimeError:
                If called after this sequencer has been stopped, either by
                a call to stop() or after all batches have been published.
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.PAUSED:
                errored_future = futures.Future()
                exception = exceptions.PublishToPausedOrderingKeyException(
                    self._ordering_key
                )
                errored_future.set_exception(exception)
                return errored_future

            # If waiting to be cleaned up, convert to accepting messages to
            # prevent this sequencer from being cleaned up only to have
            # another one with the same ordering key created immediately
            # afterward.
            if self._state == _OrderedSequencerStatus.FINISHED:
                self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Cannot publish on a stopped sequencer.")

            assert (
                self._state == _OrderedSequencerStatus.ACCEPTING_MESSAGES
            ), "Publish is only allowed in accepting-messages state."

            if not self._ordered_batches:
                new_batch = self._create_batch(
                    commit_retry=retry, commit_timeout=timeout
                )
                self._ordered_batches.append(new_batch)

            batch = self._ordered_batches[-1]
            future = batch.publish(message)

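            # batch.publish() returns None when the batch cannot accept the
            # message (e.g. it is already full), so keep appending fresh
            # batches until one accepts it.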
            while future is None:
                batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
                self._ordered_batches.append(batch)
                future = batch.publish(message)

            return future

    # Used only for testing.
    def _set_batch(self, batch: "_batch.thread.Batch") -> None:
        self._ordered_batches = collections.deque([batch])

    # Used only for testing.
    def _set_batches(self, batches: Iterable["_batch.thread.Batch"]) -> None:
        self._ordered_batches = collections.deque(batches)

    # Used only for testing.
    def _get_batches(self) -> Sequence["_batch.thread.Batch"]:
        return self._ordered_batches
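
# Usage sketch: the sequencer is an internal detail of the publisher client;
# application code reaches it by enabling message ordering and publishing
# with an ordering key. This is a minimal sketch assuming a real topic and
# credentials exist; "my-project", "my-topic", and "user-123" are
# hypothetical, so the example is left commented out.
#
#     from google.cloud import pubsub_v1
#
#     publisher = pubsub_v1.PublisherClient(
#         publisher_options=pubsub_v1.types.PublisherOptions(
#             enable_message_ordering=True
#         )
#     )
#     topic = publisher.topic_path("my-project", "my-topic")
#
#     # Messages sharing an ordering key are routed to one OrderedSequencer
#     # and published in order.
#     publisher.publish(topic, b"first", ordering_key="user-123").result()
#     publisher.publish(topic, b"second", ordering_key="user-123").result()
#
#     # After a permanent failure, the key's sequencer is PAUSED; publishing
#     # resumes only after resume_publish(), which unpauses it.
#     publisher.resume_publish(topic, ordering_key="user-123")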