1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import abc
18import enum
19import typing
20from typing import Optional, Sequence
21
22from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
23 PublishMessageWrapper,
24)
25
26
27if typing.TYPE_CHECKING: # pragma: NO COVER
28 from google.cloud import pubsub_v1
29 from google.cloud.pubsub_v1 import types
30 from google.pubsub_v1 import types as gapic_types
31
32
class Batch(metaclass=abc.ABCMeta):
    """Abstract interface that every Pub/Sub publish batch must implement.

    The thread-based :class:`~.pubsub_v1.publisher.batch.thread.Batch`
    (built on :class:`threading.Thread`) covers most use cases. Advanced
    users who need a different concurrency model can implement this
    interface and pass their subclass as the ``batch_class`` argument to
    :class:`~.pubsub_v1.client.PublisherClient`.

    How batching works: whenever the
    :class:`~.pubsub_v1.publisher.client.Client` is asked to publish a
    message, it needs a batch. If a batch is already open for the given
    topic, the message is handed to it; otherwise a new batch is created
    and the message is placed there.

    Creating a batch automatically starts a timer that counts down to the
    maximum latency allowed before the batch should commit. Once that time
    has passed the batch commits regardless of how much it holds. If the
    message count or size threshold is reached first, the batch commits
    early instead.
    """

    def __len__(self):
        """Return how many messages the batch currently holds."""
        wrappers = self.message_wrappers
        return len(wrappers)

    @staticmethod
    @abc.abstractmethod
    def make_lock():  # pragma: NO COVER
        """Create a lock suitable for the chosen concurrency model.

        Returns:
            ContextManager: A newly created lock.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def message_wrappers(self) -> Sequence[PublishMessageWrapper]:  # pragma: NO COVER
        """The messages currently held by the batch.

        Returns:
            The messages currently in the batch.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def size(self) -> int:  # pragma: NO COVER
        """The combined size of all messages currently in the batch.

        Any overhead of the actual ``PublishRequest`` that is sent to the
        backend is counted as well.

        Returns:
            int: The total size, in bytes, of all messages currently in the
            batch, including the request overhead.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def settings(self) -> "types.BatchSettings":  # pragma: NO COVER
        """The settings this batch operates under.

        Returns:
            The batch settings. They are considered immutable once the batch
            has been opened.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def status(self) -> "BatchStatus":  # pragma: NO COVER
        """The current status of this batch.

        Returns:
            The status of this batch. All statuses are human-readable,
            all-lowercase strings. The ones represented in the
            :class:`BatchStatus` enum are special, but other statuses are
            permitted.
        """
        raise NotImplementedError

    # NOTE: unlike the other members of this interface, ``cancel`` is not
    # decorated with ``abc.abstractmethod``; a subclass that does not override
    # it can still be instantiated and will raise ``NotImplementedError`` when
    # ``cancel`` is called.
    def cancel(
        self, cancellation_reason: "BatchCancellationReason"
    ) -> None:  # pragma: NO COVER
        """Complete pending futures with an exception.

        Call this only before publishing starts, i.e. while the batch is
        still accepting messages.

        Args:
            cancellation_reason:
                The reason why this batch has been cancelled.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def publish(
        self, message: "gapic_types.PubsubMessage"
    ) -> Optional["pubsub_v1.publisher.futures.Future"]:  # pragma: NO COVER
        """Publish a single message.

        The message is added to this batch and gets published once the batch
        either has enough messages or a sufficient period of time has
        elapsed.

        This method is called by :meth:`~.PublisherClient.publish`.

        Args:
            message: The Pub/Sub message.

        Returns:
            An object conforming to the :class:`concurrent.futures.Future`
            interface. A return value of :data:`None` signals that the batch
            cannot accept a message.
        """
        raise NotImplementedError
156
class BatchStatus(str, enum.Enum):
    """Enumeration of the valid statuses for a batch.

    Because the enum mixes in :class:`str`, every member is also a string
    and compares equal to its plain, lowercase value.
    """

    # Members are listed in their original declaration order, which is also
    # the iteration order of the enum.
    ACCEPTING_MESSAGES = "accepting messages"
    STARTING = "starting"
    IN_PROGRESS = "in progress"
    ERROR = "error"
    SUCCESS = "success"
165
166
class BatchCancellationReason(str, enum.Enum):
    """Enumeration of the reasons why a batch was cancelled.

    Each member's value is the full human-readable explanation; because the
    enum mixes in :class:`str`, a member compares equal to that text.
    """

    PRIOR_ORDERED_MESSAGE_FAILED = (
        "Batch cancelled because prior ordered message for the same key has failed. "
        "This batch has been cancelled to avoid out-of-order publish."
    )
    CLIENT_STOPPED = (
        "Batch cancelled because the publisher client has been stopped."
    )