Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/publisher/_batch/base.py: 93%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

30 statements  

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import abc 

18import enum 

19import typing 

20from typing import Optional, Sequence 

21 

22from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( 

23 PublishMessageWrapper, 

24) 

25 

26 

27if typing.TYPE_CHECKING: # pragma: NO COVER 

28 from google.cloud import pubsub_v1 

29 from google.cloud.pubsub_v1 import types 

30 from google.pubsub_v1 import types as gapic_types 

31 

32 

class Batch(metaclass=abc.ABCMeta):
    """The base batching class for Pub/Sub publishing.

    Although the :class:`~.pubsub_v1.publisher.batch.thread.Batch` class, based
    on :class:`threading.Thread`, is fine for most cases, advanced
    users may need to implement something based on a different concurrency
    model.

    This class defines the interface for the Batch implementation;
    subclasses may be passed as the ``batch_class`` argument to
    :class:`~.pubsub_v1.client.PublisherClient`.

    The batching behavior works like this: When the
    :class:`~.pubsub_v1.publisher.client.Client` is asked to publish a new
    message, it requires a batch. The client will see if there is an
    already-opened batch for the given topic; if there is, then the message
    is sent to that batch. If there is not, then a new batch is created
    and the message put there.

    When a new batch is created, it automatically starts a timer counting
    down to the maximum latency before the batch should commit.
    Essentially, if enough time passes, the batch automatically commits
    regardless of how much is in it. However, if either the message count or
    size thresholds are encountered first, then the batch will commit early.
    """

    def __len__(self) -> int:
        """Return the number of messages currently in the batch."""
        return len(self.message_wrappers)

    @staticmethod
    @abc.abstractmethod
    def make_lock():  # pragma: NO COVER
        """Return a lock in the chosen concurrency model.

        Returns:
            ContextManager: A newly created lock.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def message_wrappers(
        self,
    ) -> Sequence["PublishMessageWrapper"]:  # pragma: NO COVER
        """Return the messages currently in the batch.

        Returns:
            The messages currently in the batch.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def size(self) -> int:  # pragma: NO COVER
        """Return the total size of all of the messages currently in the batch.

        The size includes any overhead of the actual ``PublishRequest`` that is
        sent to the backend.

        Returns:
            int: The total size of all of the messages currently
            in the batch (including the request overhead), in bytes.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def settings(self) -> "types.BatchSettings":  # pragma: NO COVER
        """Return the batch settings.

        Returns:
            The batch settings. These are considered immutable once the batch has
            been opened.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def status(self) -> "BatchStatus":  # pragma: NO COVER
        """Return the status of this batch.

        Returns:
            The status of this batch. All statuses are human-readable, all-lowercase
            strings. The ones represented in the :class:`BatchStatus` enum are
            special, but other statuses are permitted.
        """
        raise NotImplementedError

    # NOTE: unlike the other interface members, ``cancel`` is not declared
    # abstract, so a subclass that does not override it can still be
    # instantiated and inherits this raising implementation.
    def cancel(
        self, cancellation_reason: "BatchCancellationReason"
    ) -> None:  # pragma: NO COVER
        """Complete pending futures with an exception.

        This method must be called before publishing starts (ie: while the
        batch is still accepting messages.)

        Args:
            cancellation_reason:
                The reason why this batch has been cancelled.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def publish(
        self, message: "gapic_types.PubsubMessage"
    ) -> Optional["pubsub_v1.publisher.futures.Future"]:  # pragma: NO COVER
        """Publish a single message.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed.

        This method is called by :meth:`~.PublisherClient.publish`.

        Args:
            message: The Pub/Sub message.

        Returns:
            An object conforming to the :class:`concurrent.futures.Future` interface.
            If :data:`None` is returned, that signals that the batch cannot
            accept a message.
        """
        raise NotImplementedError

155 

156 

class BatchStatus(str, enum.Enum):
    """An enum-like class representing valid statuses for a batch.

    Members also subclass :class:`str`, so each one compares equal to its
    plain-string value (e.g. ``BatchStatus.ERROR == "error"``).
    """

    ACCEPTING_MESSAGES = "accepting messages"
    STARTING = "starting"
    IN_PROGRESS = "in progress"
    ERROR = "error"
    SUCCESS = "success"

165 

166 

class BatchCancellationReason(str, enum.Enum):
    """An enum-like class representing reasons why a batch was cancelled.

    Members also subclass :class:`str`; each value is the full
    human-readable explanation of the cancellation.
    """

    PRIOR_ORDERED_MESSAGE_FAILED = (
        "Batch cancelled because prior ordered message for the same key has "
        "failed. This batch has been cancelled to avoid out-of-order publish."
    )
    CLIENT_STOPPED = "Batch cancelled because the publisher client has been stopped."