# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum
import collections
import threading
import typing
from typing import Deque, Iterable, Sequence

from google.api_core import gapic_v1
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.pubsub_v1 import types
    from google.cloud.pubsub_v1.publisher import _batch
    from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
    from google.pubsub_v1.services.publisher.client import OptionalRetry


class _OrderedSequencerStatus(str, enum.Enum):
    """An enum-like class representing valid statuses for an OrderedSequencer.

    Starting state: ACCEPTING_MESSAGES
    Valid transitions:
      ACCEPTING_MESSAGES -> PAUSED (on permanent error)
      ACCEPTING_MESSAGES -> STOPPED (when user calls stop() explicitly)
      ACCEPTING_MESSAGES -> FINISHED (all batch publishes finish normally)

      PAUSED -> ACCEPTING_MESSAGES (when user unpauses)
      PAUSED -> STOPPED (when user calls stop() explicitly)

      STOPPED -> FINISHED (user stops the client and the one remaining batch
                           finishes publishing)
      STOPPED -> PAUSED (stop() commits one batch, which fails permanently)

      FINISHED -> ACCEPTING_MESSAGES (publish happens while waiting for cleanup)
      FINISHED -> STOPPED (when user calls stop() explicitly)
    Illegal transitions:
      PAUSED -> FINISHED (since all batches are cancelled on pause, none should
                          finish normally. Paused sequencers should not be
                          cleaned up because their presence indicates that the
                          ordering key needs to be resumed.)
      STOPPED -> ACCEPTING_MESSAGES (there is no way to make a user-stopped
                                     sequencer accept messages again. This is
                                     okay since stop() should only be called on
                                     shutdown.)
      FINISHED -> PAUSED (no messages remain in flight, so they can't cause a
                          permanent error and pause the sequencer)
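
    Note: a brief illustration of why this class subclasses ``str`` as well
    as ``enum.Enum`` (members compare equal to their string values)::

        _OrderedSequencerStatus.PAUSED == "paused"  # True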
63 """
64
65 # Accepting publishes and/or waiting for result of batch publish
66 ACCEPTING_MESSAGES = "accepting messages"
67 # Permanent error occurred. User must unpause this sequencer to resume
68 # publishing. This is done to maintain ordering.
69 PAUSED = "paused"
70 # No more publishes allowed. There may be an outstanding batch that will
71 # call the _batch_done_callback when it's done (success or error.)
72 STOPPED = "stopped"
73 # No more work to do. Waiting to be cleaned-up. A publish will transform
74 # this sequencer back into the normal accepting-messages state.
75 FINISHED = "finished"
76
77
class OrderedSequencer(sequencer_base.Sequencer):
    """Sequences messages into batches ordered by an ordering key for one topic.

    A sequencer always has at least one batch in it, unless paused or stopped.
    When no batches remain, the |publishes_done_callback| is called so the
    client can perform cleanup.

    Public methods are thread-safe.

    Args:
        client:
            The publisher client used to create this sequencer.
        topic:
            The topic. The format for this is
            ``projects/{project}/topics/{topic}``.
        ordering_key:
            The ordering key for this sequencer.
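
    Example (an illustrative sketch only; ``client``, ``topic``, and
    ``message`` are placeholders, and in normal use the publisher client
    creates and manages sequencers internally when message ordering is
    enabled, so user code does not construct this class directly)::

        sequencer = OrderedSequencer(client, topic, "my-ordering-key")
        future = sequencer.publish(message)
        future.result()  # Blocks until the publish completes.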
94 """
95
96 def __init__(self, client: "PublisherClient", topic: str, ordering_key: str):
97 self._client = client
98 self._topic = topic
99 self._ordering_key = ordering_key
100 # Guards the variables below
101 self._state_lock = threading.Lock()
102 # Batches ordered from first (head/left) to last (right/tail).
103 # Invariant: always has at least one batch after the first publish,
104 # unless paused or stopped.
105 self._ordered_batches: Deque["_batch.thread.Batch"] = collections.deque()
106 # See _OrderedSequencerStatus for valid state transitions.
107 self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

    def is_finished(self) -> bool:
        """Whether the sequencer is finished and should be cleaned up.

        Returns:
            Whether the sequencer is finished and should be cleaned up.
        """
        with self._state_lock:
            return self._state == _OrderedSequencerStatus.FINISHED

    def stop(self) -> None:
        """Permanently stop this sequencer.

        This differs from pausing, which may be resumed. Immediately commits
        the first batch and cancels the rest.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Ordered sequencer already stopped.")

            self._state = _OrderedSequencerStatus.STOPPED
            if self._ordered_batches:
                # Give only the first batch the chance to finish.
                self._ordered_batches[0].commit()

                # Cancel the rest of the batches and remove them from the
                # deque of batches.
                while len(self._ordered_batches) > 1:
                    # Pop from the tail until only the head remains in the
                    # deque.
                    batch = self._ordered_batches.pop()
                    batch.cancel(batch_base.BatchCancellationReason.CLIENT_STOPPED)

    def commit(self) -> None:
        """Commit the first batch, if unpaused.

        If the sequencer is paused or no batches exist, this method does
        nothing.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        with self._state_lock:
            if self._state == _OrderedSequencerStatus.STOPPED:
                raise RuntimeError("Ordered sequencer already stopped.")

            if self._state != _OrderedSequencerStatus.PAUSED and self._ordered_batches:
                # It is okay to commit the same batch more than once. The
                # operation is idempotent.
                self._ordered_batches[0].commit()

    def _batch_done_callback(self, success: bool) -> None:
        """Deal with completion of a batch.

        Called when a batch has finished publishing, with either a success
        or a failure. (Temporary failures are retried infinitely when
        ordering keys are enabled.)
        """
        ensure_cleanup_and_commit_timer_runs = False
        with self._state_lock:
            assert self._state != _OrderedSequencerStatus.PAUSED, (
                "This method should not be called after pause() because "
                "pause() should have cancelled all of the batches."
            )
            assert self._state != _OrderedSequencerStatus.FINISHED, (
                "This method should not be called after all batches have "
                "finished."
            )

            # Message futures for the batch have been completed (either with a
            # result or an exception) already, so remove the batch.
            self._ordered_batches.popleft()

            if success:
                if len(self._ordered_batches) == 0:
                    # Mark this sequencer as finished.
                    # If new messages come in for this ordering key and this
                    # sequencer hasn't been cleaned up yet, it will go back
                    # into the accepting-messages state. Otherwise, the client
                    # must create a new OrderedSequencer.
                    self._state = _OrderedSequencerStatus.FINISHED
                    # Ensure the cleanup thread runs at some point.
                    ensure_cleanup_and_commit_timer_runs = True
                elif len(self._ordered_batches) == 1:
                    # Wait for more messages and/or the commit timeout.
                    # Ensure there is actually a commit timer thread that will
                    # commit after a delay.
                    ensure_cleanup_and_commit_timer_runs = True
                else:
                    # If there is more than one batch, we know that the next
                    # batch must be full and, therefore, ready to be committed.
                    self._ordered_batches[0].commit()
            else:
                # Unrecoverable error detected.
                self._pause()

        if ensure_cleanup_and_commit_timer_runs:
            self._client.ensure_cleanup_and_commit_timer_runs()

    def _pause(self) -> None:
        """Pause this sequencer: set state to paused, cancel all batches, and
        clear the list of ordered batches.

        _state_lock must be taken before calling this method.
        """
        assert (
            self._state != _OrderedSequencerStatus.FINISHED
        ), "Pause should not be called after all batches have finished."
        self._state = _OrderedSequencerStatus.PAUSED
        for batch in self._ordered_batches:
            batch.cancel(
                batch_base.BatchCancellationReason.PRIOR_ORDERED_MESSAGE_FAILED
            )
        self._ordered_batches.clear()

    def unpause(self) -> None:
        """Unpause this sequencer.

        Raises:
            RuntimeError:
                If called when the ordering key has not been paused.
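
        Note: end users normally resume a paused ordering key through the
        publisher client's ``resume_publish()`` method, which in turn calls
        ``unpause()`` on the sequencer for that key.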
233 """
234 with self._state_lock:
235 if self._state != _OrderedSequencerStatus.PAUSED:
236 raise RuntimeError("Ordering key is not paused.")
237 self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

    def _create_batch(
        self,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "_batch.thread.Batch":
        """Create a new batch using the client's batch class and other stored
        settings.

        Args:
            commit_retry:
                The retry settings to apply when publishing the batch.
            commit_timeout:
                The timeout to apply when publishing the batch.
        """
        return self._client._batch_class(
            client=self._client,
            topic=self._topic,
            settings=self._client.batch_settings,
            batch_done_callback=self._batch_done_callback,
            commit_when_full=False,
            commit_retry=commit_retry,
            commit_timeout=commit_timeout,
        )

    def publish(
        self,
        message: gapic_types.PubsubMessage,
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> futures.Future:
        """Publish a message for this ordering key.

        Args:
            message:
                The Pub/Sub message.
            retry:
                The retry settings to apply when publishing the message.
            timeout:
                The timeout to apply when publishing the message.

        Returns:
            A class instance that conforms to the Python standard library's
            :class:`~concurrent.futures.Future` interface (but is not an
            instance of that class). If the ordering key is paused, the future
            fails immediately with a PublishToPausedOrderingKeyException.
            Otherwise, the future tracks the lifetime of the message publish.

        Raises:
            RuntimeError:
                If called after this sequencer has been stopped, either by
                a call to stop() or after all batches have been published.
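
        Example (an illustrative sketch; ``sequencer`` is a placeholder for
        an existing instance for some ordering key)::

            message = gapic_types.PubsubMessage(
                data=b"payload", ordering_key="my-ordering-key"
            )
            future = sequencer.publish(message)
            try:
                future.result()
            except exceptions.PublishToPausedOrderingKeyException:
                # A prior message for this key failed permanently; the key
                # must be resumed before publishing can continue.
                ...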
290 """
291 with self._state_lock:
292 if self._state == _OrderedSequencerStatus.PAUSED:
293 errored_future = futures.Future()
294 exception = exceptions.PublishToPausedOrderingKeyException(
295 self._ordering_key
296 )
297 errored_future.set_exception(exception)
298 return errored_future
299
300 # If waiting to be cleaned-up, convert to accepting messages to
301 # prevent this sequencer from being cleaned-up only to have another
302 # one with the same ordering key created immediately afterward.
303 if self._state == _OrderedSequencerStatus.FINISHED:
304 self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES
305
306 if self._state == _OrderedSequencerStatus.STOPPED:
307 raise RuntimeError("Cannot publish on a stopped sequencer.")
308
309 assert (
310 self._state == _OrderedSequencerStatus.ACCEPTING_MESSAGES
311 ), "Publish is only allowed in accepting-messages state."
312
313 if not self._ordered_batches:
314 new_batch = self._create_batch(
315 commit_retry=retry, commit_timeout=timeout
316 )
317 self._ordered_batches.append(new_batch)
318
319 batch = self._ordered_batches[-1]
320 future = batch.publish(message)
321 while future is None:
322 batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
323 self._ordered_batches.append(batch)
324 future = batch.publish(message)
325
326 return future

    # Used only for testing.
    def _set_batch(self, batch: "_batch.thread.Batch") -> None:
        self._ordered_batches = collections.deque([batch])

    # Used only for testing.
    def _set_batches(self, batches: Iterable["_batch.thread.Batch"]) -> None:
        self._ordered_batches = collections.deque(batches)

    # Used only for testing.
    def _get_batches(self) -> Sequence["_batch.thread.Batch"]:
        return self._ordered_batches