1# Copyright 2019, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import typing
16from typing import Optional
17
18from google.api_core import gapic_v1
19
20from google.cloud.pubsub_v1.publisher._sequencer import base
21from google.pubsub_v1 import types as gapic_types
22
23if typing.TYPE_CHECKING: # pragma: NO COVER
24 from google.cloud.pubsub_v1.publisher import _batch
25 from google.cloud.pubsub_v1.publisher import futures
26 from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
27 from google.pubsub_v1.services.publisher.client import OptionalRetry
28
29 from google.cloud.pubsub_v1 import types
30
31
class UnorderedSequencer(base.Sequencer):
    """Groups messages for a single topic into batches, with no ordering.

    Public methods are NOT thread-safe.
    """

    def __init__(self, client: "PublisherClient", topic: str):
        # Flipped to True by stop(); once set, commit() and publish() raise.
        self._stopped = False
        # The batch that publish() currently appends to, or None when the
        # next publish() has to open a fresh one.
        self._current_batch: Optional["_batch.thread.Batch"] = None
        self._topic = topic
        self._client = client

    def _raise_if_stopped(self) -> None:
        """Raise ``RuntimeError`` if stop() has already been called."""
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")

    def is_finished(self) -> bool:
        """Report whether this sequencer is done and may be cleaned up.

        Returns:
            Always ``False`` for now (see the TODO in the body).
        """
        # TODO: Implement. Not implementing yet because of possible performance
        # impact due to extra locking required. This does mean that
        # UnorderedSequencers don't get cleaned up, but this is the same as
        # previously existing behavior.
        return False

    def stop(self) -> None:
        """Commit whatever is pending and mark the sequencer as stopped.

        Subsequent publishes will fail.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        self._raise_if_stopped()
        # Commit first: commit() itself refuses to run once the stopped flag
        # is set, so the flag may only be raised afterwards.
        self.commit()
        self._stopped = True

    def commit(self) -> None:
        """Commit the current batch, if there is one.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        self._raise_if_stopped()

        pending = self._current_batch
        if not pending:
            return

        pending.commit()
        # The committed batch is no longer tracked, which is fine because it
        # has just been committed. Resetting to None guarantees that the next
        # publish() creates a new batch.
        self._current_batch = None

    def unpause(self) -> typing.NoReturn:
        """Not relevant for this class."""
        raise NotImplementedError

    def _create_batch(
        self,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "_batch.thread.Batch":
        """Build a new batch from the client's batch class and settings.

        Args:
            commit_retry:
                The retry settings to apply when publishing the batch.
            commit_timeout:
                The timeout to apply when publishing the batch.

        Returns:
            The object produced by the client's batch class for this
            sequencer's topic.
        """
        client = self._client
        batch_factory = client._batch_class
        return batch_factory(
            client=client,
            topic=self._topic,
            settings=client.batch_settings,
            batch_done_callback=None,
            commit_when_full=True,
            commit_retry=commit_retry,
            commit_timeout=commit_timeout,
        )

    def publish(
        self,
        message: gapic_types.PubsubMessage,
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "futures.Future":
        """Add the message to the current batch, opening a new one if needed.

        Args:
            message:
                The Pub/Sub message.
            retry:
                The retry settings to apply when publishing the message.
                Only forwarded when this call has to create a new batch.
            timeout:
                The timeout to apply when publishing the message.
                Only forwarded when this call has to create a new batch.

        Returns:
            An object conforming to the :class:`~concurrent.futures.Future`
            interface. The future tracks the publishing status of the message.

        Raises:
            RuntimeError:
                If called after stop() has already been called.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        self._raise_if_stopped()

        if not self._current_batch:
            self._current_batch = self._create_batch(
                commit_retry=retry, commit_timeout=timeout
            )

        target = self._current_batch
        while True:
            # Might throw MessageTooLargeError
            result = target.publish(message)
            if result is not None:
                return result

            # A None result means the batch is full, which triggers
            # commit_when_full. The full batch is no longer tracked, which is
            # fine since it is already committed; retry on a fresh batch.
            target = self._create_batch(commit_retry=retry, commit_timeout=timeout)
            self._current_batch = target

    # Used only for testing.
    def _set_batch(self, batch: "_batch.thread.Batch") -> None:
        self._current_batch = batch