1# Copyright 2019, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import typing
16from typing import Optional
17
18from google.api_core import gapic_v1
19
20from google.cloud.pubsub_v1.publisher._sequencer import base
21from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
22 PublishMessageWrapper,
23)
24
25if typing.TYPE_CHECKING: # pragma: NO COVER
26 from google.cloud.pubsub_v1.publisher import _batch
27 from google.cloud.pubsub_v1.publisher import futures
28 from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
29 from google.pubsub_v1.services.publisher.client import OptionalRetry
30
31 from google.cloud.pubsub_v1 import types
32
33
class UnorderedSequencer(base.Sequencer):
    """Groups messages for a single topic into batches, with no ordering.

    Public methods are NOT thread-safe.
    """

    def __init__(self, client: "PublisherClient", topic: str):
        self._stopped = False
        # No batch exists until the first publish() call creates one.
        self._current_batch: Optional["_batch.thread.Batch"] = None
        self._topic = topic
        self._client = client

    def _raise_if_stopped(self) -> None:
        """Raise ``RuntimeError`` if stop() has already been called."""
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")

    def is_finished(self) -> bool:
        """Report whether this sequencer is done and may be cleaned up.

        Returns:
            Always ``False`` in the current implementation.
        """
        # TODO: Implement. Left out for now because the extra locking it
        # would need may hurt performance. As a consequence,
        # UnorderedSequencers are never cleaned up, which matches the
        # previously existing behavior.
        return False

    def stop(self) -> None:
        """Commit whatever is pending and mark the sequencer as stopped.

        Publishes attempted afterwards will fail.

        Raises:
            RuntimeError:
                If stop() has already been called.
        """
        self._raise_if_stopped()
        self.commit()
        self._stopped = True

    def commit(self) -> None:
        """Commit the current batch, if there is one.

        Raises:
            RuntimeError:
                If stop() has already been called.
        """
        self._raise_if_stopped()

        pending = self._current_batch
        if not pending:
            return

        pending.commit()
        # The committed batch is deliberately forgotten; clearing the slot
        # guarantees that the next publish() starts a fresh batch.
        self._current_batch = None

    def unpause(self) -> typing.NoReturn:
        """Unsupported for this class; always raises NotImplementedError."""
        raise NotImplementedError

    def _create_batch(
        self,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "_batch.thread.Batch":
        """Build a new batch from the client's batch class and settings.

        Args:
            commit_retry:
                The retry settings to apply when publishing the batch.
            commit_timeout:
                The timeout to apply when publishing the batch.

        Returns:
            The newly constructed batch. It is not stored on the sequencer;
            that is left to the caller.
        """
        publisher = self._client
        batch_factory = publisher._batch_class
        return batch_factory(
            client=publisher,
            topic=self._topic,
            settings=publisher.batch_settings,
            batch_done_callback=None,
            commit_when_full=True,
            commit_retry=commit_retry,
            commit_timeout=commit_timeout,
        )

    def publish(
        self,
        wrapper: PublishMessageWrapper,
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "futures.Future":
        """Add a message to the current batch, or to a new one if needed.

        Args:
            wrapper:
                The Pub/Sub message wrapper.
            retry:
                The retry settings to apply when publishing the message.
            timeout:
                The timeout to apply when publishing the message.

        Returns:
            An object conforming to the :class:`~concurrent.futures.Future`
            interface. The future tracks the publishing status of the message.

        Raises:
            RuntimeError:
                If called after stop() has already been called.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the message in ``wrapper`` would exceed the max size limit on
                the backend.
        """
        self._raise_if_stopped()

        if not self._current_batch:
            self._current_batch = self._create_batch(
                commit_retry=retry, commit_timeout=timeout
            )

        target = self._current_batch
        while True:
            # Might throw MessageTooLargeError
            outcome = target.publish(wrapper)
            if outcome is not None:
                return outcome

            # A None result means the batch was full, which triggered
            # commit_when_full. That batch is already committed, so it is
            # safe to drop it and retry against a brand-new one.
            target = self._create_batch(commit_retry=retry, commit_timeout=timeout)
            self._current_batch = target

    # Used only for testing.
    def _set_batch(self, batch: "_batch.thread.Batch") -> None:
        """Replace the current batch with ``batch``."""
        self._current_batch = batch