1# Copyright 2019, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import enum
16import collections
17import threading
18import typing
19from typing import Deque, Iterable, Sequence
20
21from google.api_core import gapic_v1
22from google.cloud.pubsub_v1.publisher import futures
23from google.cloud.pubsub_v1.publisher import exceptions
24from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
25from google.cloud.pubsub_v1.publisher._batch import base as batch_base
26from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
27 PublishMessageWrapper,
28)
29
30if typing.TYPE_CHECKING: # pragma: NO COVER
31 from google.cloud.pubsub_v1 import types
32 from google.cloud.pubsub_v1.publisher import _batch
33 from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
34 from google.pubsub_v1.services.publisher.client import OptionalRetry
35
36
37class _OrderedSequencerStatus(str, enum.Enum):
38 """An enum-like class representing valid statuses for an OrderedSequencer.
39
40 Starting state: ACCEPTING_MESSAGES
41 Valid transitions:
42 ACCEPTING_MESSAGES -> PAUSED (on permanent error)
43 ACCEPTING_MESSAGES -> STOPPED (when user calls stop() explicitly)
44 ACCEPTING_MESSAGES -> FINISHED (all batch publishes finish normally)
45
46 PAUSED -> ACCEPTING_MESSAGES (when user unpauses)
47 PAUSED -> STOPPED (when user calls stop() explicitly)
48
49 STOPPED -> FINISHED (user stops client and the one remaining batch finishes
50 publish)
51 STOPPED -> PAUSED (stop() commits one batch, which fails permanently)
52
53 FINISHED -> ACCEPTING_MESSAGES (publish happens while waiting for cleanup)
54 FINISHED -> STOPPED (when user calls stop() explicitly)
55 Illegal transitions:
56 PAUSED -> FINISHED (since all batches are cancelled on pause, there should
57 not be any that finish normally. paused sequencers
58 should not be cleaned up because their presence
59 indicates that the ordering key needs to be resumed)
60 STOPPED -> ACCEPTING_MESSAGES (no way to make a user-stopped sequencer
61 accept messages again. this is okay since
62 stop() should only be called on shutdown.)
63 FINISHED -> PAUSED (no messages remain in flight, so they can't cause a
64 permanent error and pause the sequencer)
65 """
66
67 # Accepting publishes and/or waiting for result of batch publish
68 ACCEPTING_MESSAGES = "accepting messages"
69 # Permanent error occurred. User must unpause this sequencer to resume
70 # publishing. This is done to maintain ordering.
71 PAUSED = "paused"
72 # No more publishes allowed. There may be an outstanding batch that will
73 # call the _batch_done_callback when it's done (success or error.)
74 STOPPED = "stopped"
75 # No more work to do. Waiting to be cleaned-up. A publish will transform
76 # this sequencer back into the normal accepting-messages state.
77 FINISHED = "finished"
78
79
80class OrderedSequencer(sequencer_base.Sequencer):
81 """Sequences messages into batches ordered by an ordering key for one topic.
82
83 A sequencer always has at least one batch in it, unless paused or stopped.
84 When no batches remain, the |publishes_done_callback| is called so the
85 client can perform cleanup.
86
87 Public methods are thread-safe.
88
89 Args:
90 client:
91 The publisher client used to create this sequencer.
92 topic:
93 The topic. The format for this is ``projects/{project}/topics/{topic}``.
94 ordering_key:
95 The ordering key for this sequencer.
96 """
97
98 def __init__(self, client: "PublisherClient", topic: str, ordering_key: str):
99 self._client = client
100 self._topic = topic
101 self._ordering_key = ordering_key
102 # Guards the variables below
103 self._state_lock = threading.Lock()
104 # Batches ordered from first (head/left) to last (right/tail).
105 # Invariant: always has at least one batch after the first publish,
106 # unless paused or stopped.
107 self._ordered_batches: Deque["_batch.thread.Batch"] = collections.deque()
108 # See _OrderedSequencerStatus for valid state transitions.
109 self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES
110
111 def is_finished(self) -> bool:
112 """Whether the sequencer is finished and should be cleaned up.
113
114 Returns:
115 Whether the sequencer is finished and should be cleaned up.
116 """
117 with self._state_lock:
118 return self._state == _OrderedSequencerStatus.FINISHED
119
120 def stop(self) -> None:
121 """Permanently stop this sequencer.
122
123 This differs from pausing, which may be resumed. Immediately commits
124 the first batch and cancels the rest.
125
126 Raises:
127 RuntimeError:
128 If called after stop() has already been called.
129 """
130 with self._state_lock:
131 if self._state == _OrderedSequencerStatus.STOPPED:
132 raise RuntimeError("Ordered sequencer already stopped.")
133
134 self._state = _OrderedSequencerStatus.STOPPED
135 if self._ordered_batches:
136 # Give only the first batch the chance to finish.
137 self._ordered_batches[0].commit()
138
139 # Cancel the rest of the batches and remove them from the deque
140 # of batches.
141 while len(self._ordered_batches) > 1:
142 # Pops from the tail until it leaves only the head in the
143 # deque.
144 batch = self._ordered_batches.pop()
145 batch.cancel(batch_base.BatchCancellationReason.CLIENT_STOPPED)
146
147 def commit(self) -> None:
148 """Commit the first batch, if unpaused.
149
150 If paused or no batches exist, this method does nothing.
151
152 Raises:
153 RuntimeError:
154 If called after stop() has already been called.
155 """
156 with self._state_lock:
157 if self._state == _OrderedSequencerStatus.STOPPED:
158 raise RuntimeError("Ordered sequencer already stopped.")
159
160 if self._state != _OrderedSequencerStatus.PAUSED and self._ordered_batches:
161 # It's okay to commit the same batch more than once. The
162 # operation is idempotent.
163 self._ordered_batches[0].commit()
164
165 def _batch_done_callback(self, success: bool) -> None:
166 """Deal with completion of a batch.
167
168 Called when a batch has finished publishing, with either a success
169 or a failure. (Temporary failures are retried infinitely when
170 ordering keys are enabled.)
171 """
172 ensure_cleanup_and_commit_timer_runs = False
173 with self._state_lock:
174 assert self._state != _OrderedSequencerStatus.PAUSED, (
175 "This method should not be called after pause() because "
176 "pause() should have cancelled all of the batches."
177 )
178 assert self._state != _OrderedSequencerStatus.FINISHED, (
179 "This method should not be called after all batches have been "
180 "finished."
181 )
182
183 # Message futures for the batch have been completed (either with a
184 # result or an exception) already, so remove the batch.
185 self._ordered_batches.popleft()
186
187 if success:
188 if len(self._ordered_batches) == 0:
189 # Mark this sequencer as finished.
190 # If new messages come in for this ordering key and this
191 # sequencer hasn't been cleaned up yet, it will go back
192 # into accepting-messages state. Otherwise, the client
193 # must create a new OrderedSequencer.
194 self._state = _OrderedSequencerStatus.FINISHED
195 # Ensure cleanup thread runs at some point.
196 ensure_cleanup_and_commit_timer_runs = True
197 elif len(self._ordered_batches) == 1:
198 # Wait for messages and/or commit timeout
199 # Ensure there's actually a commit timer thread that'll commit
200 # after a delay.
201 ensure_cleanup_and_commit_timer_runs = True
202 else:
203 # If there is more than one batch, we know that the next batch
204 # must be full and, therefore, ready to be committed.
205 self._ordered_batches[0].commit()
206 else:
207 # Unrecoverable error detected
208 self._pause()
209
210 if ensure_cleanup_and_commit_timer_runs:
211 self._client.ensure_cleanup_and_commit_timer_runs()
212
213 def _pause(self) -> None:
214 """Pause this sequencer: set state to paused, cancel all batches, and
215 clear the list of ordered batches.
216
217 _state_lock must be taken before calling this method.
218 """
219 assert (
220 self._state != _OrderedSequencerStatus.FINISHED
221 ), "Pause should not be called after all batches have finished."
222 self._state = _OrderedSequencerStatus.PAUSED
223 for batch in self._ordered_batches:
224 batch.cancel(
225 batch_base.BatchCancellationReason.PRIOR_ORDERED_MESSAGE_FAILED
226 )
227 self._ordered_batches.clear()
228
229 def unpause(self) -> None:
230 """Unpause this sequencer.
231
232 Raises:
233 RuntimeError:
234 If called when the ordering key has not been paused.
235 """
236 with self._state_lock:
237 if self._state != _OrderedSequencerStatus.PAUSED:
238 raise RuntimeError("Ordering key is not paused.")
239 self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES
240
241 def _create_batch(
242 self,
243 commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
244 commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
245 ) -> "_batch.thread.Batch":
246 """Create a new batch using the client's batch class and other stored
247 settings.
248
249 Args:
250 commit_retry:
251 The retry settings to apply when publishing the batch.
252 commit_timeout:
253 The timeout to apply when publishing the batch.
254 """
255 return self._client._batch_class(
256 client=self._client,
257 topic=self._topic,
258 settings=self._client.batch_settings,
259 batch_done_callback=self._batch_done_callback,
260 commit_when_full=False,
261 commit_retry=commit_retry,
262 commit_timeout=commit_timeout,
263 )
264
265 def publish(
266 self,
267 wrapper: PublishMessageWrapper,
268 retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
269 timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
270 ) -> futures.Future:
271 """Publish message for this ordering key.
272
273 Args:
274 wrapper:
275 The Pub/Sub message wrapper.
276 retry:
277 The retry settings to apply when publishing the message.
278 timeout:
279 The timeout to apply when publishing the message.
280
281 Returns:
282 A class instance that conforms to Python Standard library's
283 :class:`~concurrent.futures.Future` interface (but not an
284 instance of that class). The future might return immediately with a
285 PublishToPausedOrderingKeyException if the ordering key is paused.
286 Otherwise, the future tracks the lifetime of the message publish.
287
288 Raises:
289 RuntimeError:
290 If called after this sequencer has been stopped, either by
291 a call to stop() or after all batches have been published.
292 """
293 with self._state_lock:
294 if self._state == _OrderedSequencerStatus.PAUSED:
295 errored_future = futures.Future()
296 exception = exceptions.PublishToPausedOrderingKeyException(
297 self._ordering_key
298 )
299 errored_future.set_exception(exception)
300 return errored_future
301
302 # If waiting to be cleaned-up, convert to accepting messages to
303 # prevent this sequencer from being cleaned-up only to have another
304 # one with the same ordering key created immediately afterward.
305 if self._state == _OrderedSequencerStatus.FINISHED:
306 self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES
307
308 if self._state == _OrderedSequencerStatus.STOPPED:
309 raise RuntimeError("Cannot publish on a stopped sequencer.")
310
311 assert (
312 self._state == _OrderedSequencerStatus.ACCEPTING_MESSAGES
313 ), "Publish is only allowed in accepting-messages state."
314
315 if not self._ordered_batches:
316 new_batch = self._create_batch(
317 commit_retry=retry, commit_timeout=timeout
318 )
319 self._ordered_batches.append(new_batch)
320
321 batch = self._ordered_batches[-1]
322 future = batch.publish(wrapper)
323 while future is None:
324 batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
325 self._ordered_batches.append(batch)
326 future = batch.publish(wrapper)
327
328 return future
329
330 # Used only for testing.
331 def _set_batch(self, batch: "_batch.thread.Batch") -> None:
332 self._ordered_batches = collections.deque([batch])
333
334 # Used only for testing.
335 def _set_batches(self, batches: Iterable["_batch.thread.Batch"]) -> None:
336 self._ordered_batches = collections.deque(batches)
337
338 # Used only for testing.
339 def _get_batches(self) -> Sequence["_batch.thread.Batch"]:
340 return self._ordered_batches