Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_batch/base.py: 96%
28 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from __future__ import absolute_import
17import abc
18import enum
19import typing
20from typing import Optional, Sequence
23if typing.TYPE_CHECKING: # pragma: NO COVER
24 from google.cloud import pubsub_v1
25 from google.cloud.pubsub_v1 import types
26 from google.pubsub_v1 import types as gapic_types
29class Batch(metaclass=abc.ABCMeta):
30 """The base batching class for Pub/Sub publishing.
32 Although the :class:`~.pubsub_v1.publisher.batch.thread.Batch` class, based
33 on :class:`threading.Thread`, is fine for most cases, advanced
34 users may need to implement something based on a different concurrency
35 model.
37 This class defines the interface for the Batch implementation;
38 subclasses may be passed as the ``batch_class`` argument to
39 :class:`~.pubsub_v1.client.PublisherClient`.
41 The batching behavior works like this: When the
42 :class:`~.pubsub_v1.publisher.client.Client` is asked to publish a new
43 message, it requires a batch. The client will see if there is an
44 already-opened batch for the given topic; if there is, then the message
45 is sent to that batch. If there is not, then a new batch is created
46 and the message put there.
48 When a new batch is created, it automatically starts a timer counting
49 down to the maximum latency before the batch should commit.
50 Essentially, if enough time passes, the batch automatically commits
51 regardless of how much is in it. However, if either the message count or
52 size thresholds are encountered first, then the batch will commit early.
53 """
55 def __len__(self):
56 """Return the number of messages currently in the batch."""
57 return len(self.messages)
59 @staticmethod
60 @abc.abstractmethod
61 def make_lock(): # pragma: NO COVER
62 """Return a lock in the chosen concurrency model.
64 Returns:
65 ContextManager: A newly created lock.
66 """
67 raise NotImplementedError
69 @property
70 @abc.abstractmethod
71 def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER
72 """Return the messages currently in the batch.
74 Returns:
75 The messages currently in the batch.
76 """
77 raise NotImplementedError
79 @property
80 @abc.abstractmethod
81 def size(self) -> int: # pragma: NO COVER
82 """Return the total size of all of the messages currently in the batch.
84 The size includes any overhead of the actual ``PublishRequest`` that is
85 sent to the backend.
87 Returns:
88 int: The total size of all of the messages currently
89 in the batch (including the request overhead), in bytes.
90 """
91 raise NotImplementedError
93 @property
94 @abc.abstractmethod
95 def settings(self) -> "types.BatchSettings": # pragma: NO COVER
96 """Return the batch settings.
98 Returns:
99 The batch settings. These are considered immutable once the batch has
100 been opened.
101 """
102 raise NotImplementedError
104 @property
105 @abc.abstractmethod
106 def status(self) -> "BatchStatus": # pragma: NO COVER
107 """Return the status of this batch.
109 Returns:
110 The status of this batch. All statuses are human-readable, all-lowercase
111 strings. The ones represented in the :class:`BaseBatch.Status` enum are
112 special, but other statuses are permitted.
113 """
114 raise NotImplementedError
116 def cancel(
117 self, cancellation_reason: "BatchCancellationReason"
118 ) -> None: # pragma: NO COVER
119 """Complete pending futures with an exception.
121 This method must be called before publishing starts (ie: while the
122 batch is still accepting messages.)
124 Args:
125 cancellation_reason:
126 The reason why this batch has been cancelled.
127 """
128 raise NotImplementedError
130 @abc.abstractmethod
131 def publish(
132 self, message: "gapic_types.PubsubMessage"
133 ) -> Optional["pubsub_v1.publisher.futures.Future"]: # pragma: NO COVER
134 """Publish a single message.
136 Add the given message to this object; this will cause it to be
137 published once the batch either has enough messages or a sufficient
138 period of time has elapsed.
140 This method is called by :meth:`~.PublisherClient.publish`.
142 Args:
143 message: The Pub/Sub message.
145 Returns:
146 An object conforming to the :class:`concurrent.futures.Future` interface.
147 If :data:`None` is returned, that signals that the batch cannot
148 accept a message.
149 """
150 raise NotImplementedError
153class BatchStatus(str, enum.Enum):
154 """An enum-like class representing valid statuses for a batch."""
156 ACCEPTING_MESSAGES = "accepting messages"
157 STARTING = "starting"
158 IN_PROGRESS = "in progress"
159 ERROR = "error"
160 SUCCESS = "success"
163class BatchCancellationReason(str, enum.Enum):
164 """An enum-like class representing reasons why a batch was cancelled."""
166 PRIOR_ORDERED_MESSAGE_FAILED = (
167 "Batch cancelled because prior ordered message for the same key has "
168 "failed. This batch has been cancelled to avoid out-of-order publish."
169 )
170 CLIENT_STOPPED = "Batch cancelled because the publisher client has been stopped."