Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py: 32%

44 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# Copyright 2019, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import typing 

16from typing import Optional 

17 

18from google.api_core import gapic_v1 

19 

20from google.cloud.pubsub_v1.publisher._sequencer import base 

21from google.pubsub_v1 import types as gapic_types 

22 

23if typing.TYPE_CHECKING: # pragma: NO COVER 

24 from google.cloud.pubsub_v1.publisher import _batch 

25 from google.cloud.pubsub_v1.publisher import futures 

26 from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient 

27 from google.pubsub_v1.services.publisher.client import OptionalRetry 

28 

29 from google.cloud.pubsub_v1 import types 

30 

31 

class UnorderedSequencer(base.Sequencer):
    """Sequences messages into batches for a single topic, with no ordering.

    None of the public methods are thread-safe; callers must provide their
    own synchronization.
    """

    def __init__(self, client: "PublisherClient", topic: str):
        self._topic = topic
        self._client = client
        self._stopped = False
        # The batch currently accepting messages, or None when the next
        # publish() must create one.
        self._current_batch: Optional["_batch.thread.Batch"] = None

    def is_finished(self) -> bool:
        """Whether the sequencer is finished and should be cleaned up.

        Returns:
            Whether the sequencer is finished and should be cleaned up.
        """
        # TODO: Implement. Not done yet because of the possible performance
        # impact of the extra locking it would require. The consequence is
        # that UnorderedSequencers never get cleaned up, which matches the
        # previously existing behavior.
        return False

    def stop(self) -> None:
        """Stop the sequencer.

        Any publish attempted after this call will fail.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")
        self.commit()
        self._stopped = True

    def commit(self) -> None:
        """Commit the current batch, if there is one.

        Raises:
            RuntimeError:
                If called after stop() has already been called.
        """
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")
        batch = self._current_batch
        if batch:
            batch.commit()
            # The committed batch no longer needs tracking; clearing the
            # reference guarantees the next publish() opens a fresh one.
            self._current_batch = None

    def unpause(self) -> typing.NoReturn:
        """Not relevant for this class."""
        raise NotImplementedError

    def _create_batch(
        self,
        commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "_batch.thread.Batch":
        """Build a new batch from the client's batch class and stored settings.

        Args:
            commit_retry:
                The retry settings to apply when publishing the batch.
            commit_timeout:
                The timeout to apply when publishing the batch.
        """
        return self._client._batch_class(
            client=self._client,
            topic=self._topic,
            settings=self._client.batch_settings,
            batch_done_callback=None,
            commit_when_full=True,
            commit_retry=commit_retry,
            commit_timeout=commit_timeout,
        )

    def publish(
        self,
        message: gapic_types.PubsubMessage,
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
    ) -> "futures.Future":
        """Add a message to the current batch, or to a newly created one.

        Args:
            message:
                The Pub/Sub message.
            retry:
                The retry settings to apply when publishing the message.
            timeout:
                The timeout to apply when publishing the message.

        Returns:
            An object conforming to the :class:`~concurrent.futures.Future` interface.
            The future tracks the publishing status of the message.

        Raises:
            RuntimeError:
                If called after stop() has already been called.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        if self._stopped:
            raise RuntimeError("Unordered sequencer already stopped.")

        if not self._current_batch:
            self._current_batch = self._create_batch(
                commit_retry=retry, commit_timeout=timeout
            )

        while True:
            # Might throw MessageTooLargeError.
            future = self._current_batch.publish(message)
            if future is not None:
                return future
            # A ``None`` future means the batch was full and committed itself
            # (commit_when_full). We don't care about the old, already
            # committed batch — replace it with a fresh one and retry.
            self._current_batch = self._create_batch(
                commit_retry=retry, commit_timeout=timeout
            )

    # Used only for testing.
    def _set_batch(self, batch: "_batch.thread.Batch") -> None:
        self._current_batch = batch