Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_batch/base.py: 96%

28 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import absolute_import 

16 

17import abc 

18import enum 

19import typing 

20from typing import Optional, Sequence 

21 

22 

23if typing.TYPE_CHECKING: # pragma: NO COVER 

24 from google.cloud import pubsub_v1 

25 from google.cloud.pubsub_v1 import types 

26 from google.pubsub_v1 import types as gapic_types 

27 

28 

29class Batch(metaclass=abc.ABCMeta): 

30 """The base batching class for Pub/Sub publishing. 

31 

32 Although the :class:`~.pubsub_v1.publisher.batch.thread.Batch` class, based 

33 on :class:`threading.Thread`, is fine for most cases, advanced 

34 users may need to implement something based on a different concurrency 

35 model. 

36 

37 This class defines the interface for the Batch implementation; 

38 subclasses may be passed as the ``batch_class`` argument to 

39 :class:`~.pubsub_v1.client.PublisherClient`. 

40 

41 The batching behavior works like this: When the 

42 :class:`~.pubsub_v1.publisher.client.Client` is asked to publish a new 

43 message, it requires a batch. The client will see if there is an 

44 already-opened batch for the given topic; if there is, then the message 

45 is sent to that batch. If there is not, then a new batch is created 

46 and the message put there. 

47 

48 When a new batch is created, it automatically starts a timer counting 

49 down to the maximum latency before the batch should commit. 

50 Essentially, if enough time passes, the batch automatically commits 

51 regardless of how much is in it. However, if either the message count or 

52 size thresholds are encountered first, then the batch will commit early. 

53 """ 

54 

55 def __len__(self): 

56 """Return the number of messages currently in the batch.""" 

57 return len(self.messages) 

58 

59 @staticmethod 

60 @abc.abstractmethod 

61 def make_lock(): # pragma: NO COVER 

62 """Return a lock in the chosen concurrency model. 

63 

64 Returns: 

65 ContextManager: A newly created lock. 

66 """ 

67 raise NotImplementedError 

68 

69 @property 

70 @abc.abstractmethod 

71 def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER 

72 """Return the messages currently in the batch. 

73 

74 Returns: 

75 The messages currently in the batch. 

76 """ 

77 raise NotImplementedError 

78 

79 @property 

80 @abc.abstractmethod 

81 def size(self) -> int: # pragma: NO COVER 

82 """Return the total size of all of the messages currently in the batch. 

83 

84 The size includes any overhead of the actual ``PublishRequest`` that is 

85 sent to the backend. 

86 

87 Returns: 

88 int: The total size of all of the messages currently 

89 in the batch (including the request overhead), in bytes. 

90 """ 

91 raise NotImplementedError 

92 

93 @property 

94 @abc.abstractmethod 

95 def settings(self) -> "types.BatchSettings": # pragma: NO COVER 

96 """Return the batch settings. 

97 

98 Returns: 

99 The batch settings. These are considered immutable once the batch has 

100 been opened. 

101 """ 

102 raise NotImplementedError 

103 

104 @property 

105 @abc.abstractmethod 

106 def status(self) -> "BatchStatus": # pragma: NO COVER 

107 """Return the status of this batch. 

108 

109 Returns: 

110 The status of this batch. All statuses are human-readable, all-lowercase 

111 strings. The ones represented in the :class:`BaseBatch.Status` enum are 

112 special, but other statuses are permitted. 

113 """ 

114 raise NotImplementedError 

115 

116 def cancel( 

117 self, cancellation_reason: "BatchCancellationReason" 

118 ) -> None: # pragma: NO COVER 

119 """Complete pending futures with an exception. 

120 

121 This method must be called before publishing starts (ie: while the 

122 batch is still accepting messages.) 

123 

124 Args: 

125 cancellation_reason: 

126 The reason why this batch has been cancelled. 

127 """ 

128 raise NotImplementedError 

129 

130 @abc.abstractmethod 

131 def publish( 

132 self, message: "gapic_types.PubsubMessage" 

133 ) -> Optional["pubsub_v1.publisher.futures.Future"]: # pragma: NO COVER 

134 """Publish a single message. 

135 

136 Add the given message to this object; this will cause it to be 

137 published once the batch either has enough messages or a sufficient 

138 period of time has elapsed. 

139 

140 This method is called by :meth:`~.PublisherClient.publish`. 

141 

142 Args: 

143 message: The Pub/Sub message. 

144 

145 Returns: 

146 An object conforming to the :class:`concurrent.futures.Future` interface. 

147 If :data:`None` is returned, that signals that the batch cannot 

148 accept a message. 

149 """ 

150 raise NotImplementedError 

151 

152 

153class BatchStatus(str, enum.Enum): 

154 """An enum-like class representing valid statuses for a batch.""" 

155 

156 ACCEPTING_MESSAGES = "accepting messages" 

157 STARTING = "starting" 

158 IN_PROGRESS = "in progress" 

159 ERROR = "error" 

160 SUCCESS = "success" 

161 

162 

163class BatchCancellationReason(str, enum.Enum): 

164 """An enum-like class representing reasons why a batch was cancelled.""" 

165 

166 PRIOR_ORDERED_MESSAGE_FAILED = ( 

167 "Batch cancelled because prior ordered message for the same key has " 

168 "failed. This batch has been cancelled to avoid out-of-order publish." 

169 ) 

170 CLIENT_STOPPED = "Batch cancelled because the publisher client has been stopped."