Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

45 statements  

1# Copyright 2019, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import typing 

16from typing import Optional 

17 

18from google.api_core import gapic_v1 

19 

20from google.cloud.pubsub_v1.publisher._sequencer import base 

21from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( 

22 PublishMessageWrapper, 

23) 

24 

25if typing.TYPE_CHECKING: # pragma: NO COVER 

26 from google.cloud.pubsub_v1.publisher import _batch 

27 from google.cloud.pubsub_v1.publisher import futures 

28 from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient 

29 from google.pubsub_v1.services.publisher.client import OptionalRetry 

30 

31 from google.cloud.pubsub_v1 import types 

32 

33 

class UnorderedSequencer(base.Sequencer):
    """Batches messages for a single topic with no ordering guarantees.

    Public methods are NOT thread-safe.
    """

    def __init__(self, client: "PublisherClient", topic: str):
        self._client = client
        self._topic = topic
        # The batch currently accepting messages; None until first publish()
        # (or after a commit), at which point publish() lazily creates one.
        self._current_batch: Optional["_batch.thread.Batch"] = None
        self._stopped = False

    def is_finished(self) -> bool:
        """Whether the sequencer is finished and should be cleaned up.

        Returns:
            Whether the sequencer is finished and should be cleaned up.
        """
        # TODO: Implement. Not implemented yet because the extra locking it
        # would require may hurt performance. The consequence is that
        # UnorderedSequencers are never cleaned up, which matches the
        # previously existing behavior.
        return False

    def stop(self) -> None:
        """Stop the sequencer, committing any pending batch first.

        Subsequent publishes will fail.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")
        self.commit()
        self._stopped = True

    def commit(self) -> None:
        """Commit the current batch, if one exists.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")

        batch = self._current_batch
        if not batch:
            return
        batch.commit()

        # Dropping the reference is deliberate: the batch has just been
        # committed, and resetting to None guarantees the next publish()
        # starts a brand-new batch.
        self._current_batch = None

    def unpause(self) -> typing.NoReturn:
        """Not relevant for this class."""
        raise NotImplementedError

    def _create_batch(
        self,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "_batch.thread.Batch":
        """Construct a new batch from the client's batch class and settings.

        Args:
            commit_retry:
                The retry settings to apply when publishing the batch.
            commit_timeout:
                The timeout to apply when publishing the batch.
        """
        client = self._client
        return client._batch_class(
            client=client,
            topic=self._topic,
            settings=client.batch_settings,
            batch_done_callback=None,
            commit_when_full=True,
            commit_retry=commit_retry,
            commit_timeout=commit_timeout,
        )

    def publish(
        self,
        wrapper: PublishMessageWrapper,
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "futures.Future":
        """Add a message to the current batch, creating batches as needed.

        Args:
            wrapper:
                The Pub/Sub message wrapper.
            retry:
                The retry settings to apply when publishing the message.
            timeout:
                The timeout to apply when publishing the message.

        Returns:
            An object conforming to the :class:`~concurrent.futures.Future`
            interface. The future tracks the publishing status of the message.

        Raises:
            RuntimeError:
                If called after stop() has already been called.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")

        if not self._current_batch:
            self._current_batch = self._create_batch(
                commit_retry=retry, commit_timeout=timeout
            )

        batch = self._current_batch
        while True:
            # May raise MessageTooLargeError.
            future = batch.publish(wrapper)
            if future is not None:
                return future
            # A None future means the batch was full and committed itself
            # (commit_when_full=True), so start a fresh batch and retry.
            # Losing the reference to the old batch is fine -- it is already
            # committed.
            batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
            self._current_batch = batch

    # Used only for testing.
    def _set_batch(self, batch: "_batch.thread.Batch") -> None:
        self._current_batch = batch