Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py: 27%
45 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# Copyright 2020, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import collections
16import typing
17from typing import Any, Callable, Iterable, Optional
19if typing.TYPE_CHECKING: # pragma: NO COVER
20 from google.cloud.pubsub_v1 import subscriber
class MessagesOnHold(object):
    """Tracks messages on hold by ordering key. Not thread-safe.

    Messages are held in a single FIFO queue until they are handed out via
    :meth:`get`. A message with a non-empty ``ordering_key`` is only handed
    out when no other message with the same key is in flight; otherwise it
    is parked in a per-key queue until :meth:`activate_ordering_keys` is
    called for that key.
    """

    def __init__(self):
        # Number of messages currently on hold, counting both the main queue
        # and every per-key pending queue.
        self._size = 0

        # A FIFO queue for the messages that have been received from the
        # server, but not yet sent to the user callback.
        # Both ordered and unordered messages may be in this queue. Ordered
        # message state is tracked in _pending_ordered_messages once ordered
        # messages are taken off this queue.
        # The tail of the queue is to the right side of the deque; the head
        # is to the left side.
        self._messages_on_hold = collections.deque()

        # Dict of ordering_key -> queue of ordered messages that have not
        # been delivered to the user.
        # All ordering keys in this collection have a message in flight. Once
        # that one is acked or nacked, the next message in the queue for that
        # ordering key will be sent.
        # If the queue is empty, it means there's a message for that key in
        # flight, but there are no pending messages.
        self._pending_ordered_messages = {}

    @property
    def size(self) -> int:
        """Return the number of messages on hold across ordered and unordered messages.

        Note that this object may still store information about ordered
        messages in flight even if size is zero.

        Returns:
            The size value.
        """
        return self._size

    def get(self) -> Optional["subscriber.message.Message"]:
        """Gets a message from the on-hold queue. A message with an ordering
        key won't be returned if there's another message with the same key in
        flight.

        Messages that cannot be returned yet are moved from the main queue to
        the pending queue of their ordering key; they still count towards
        :attr:`size`.

        Returns:
            A message that hasn't been sent to the user yet or ``None`` if
            there are no messages available.
        """
        while self._messages_on_hold:
            msg = self._messages_on_hold.popleft()
            key = msg.ordering_key

            if not key:
                # Unordered messages can be returned without any
                # restrictions.
                self._size -= 1
                return msg

            pending_queue = self._pending_ordered_messages.get(key)
            if pending_queue is None:
                # Create an empty queue to indicate that a message with this
                # ordering key is in flight.
                self._pending_ordered_messages[key] = collections.deque()
                self._size -= 1
                return msg

            # Another message is in flight, so add this one to the end of
            # the queue for its ordering key and keep looking.
            pending_queue.append(msg)

        return None

    def put(self, message: "subscriber.message.Message") -> None:
        """Put a message on hold.

        Args:
            message: The message to put on hold.
        """
        self._messages_on_hold.append(message)
        self._size += 1

    def activate_ordering_keys(
        self,
        ordering_keys: Iterable[str],
        schedule_message_callback: Callable[["subscriber.message.Message"], Any],
    ) -> None:
        """Send the next message in the queue for each of the passed-in
        ordering keys, if they exist. Clean up state for keys that no longer
        have any queued messages.

        A key without a pending queue (i.e. with no message in flight) is
        logged and skipped so that one unexpected key does not prevent the
        remaining keys from being activated.

        See comment at streaming_pull_manager.activate_ordering_keys() for
        more detail about the impact of this method on load.

        Args:
            ordering_keys:
                The ordering keys to activate. May be empty.
            schedule_message_callback:
                The callback to call to schedule a message to be sent to the
                user.
        """
        # Imported locally so this class does not depend on a module-level
        # logger being defined.
        import logging

        for key in ordering_keys:
            if self._pending_ordered_messages.get(key) is None:
                # A message queue should exist for every ordered message in
                # flight. An explicit check is used instead of ``assert`` so
                # the behaviour does not change under ``python -O``.
                logging.getLogger(__name__).warning(
                    "No message queue exists for message ordering key: %s.", key
                )
                continue

            next_msg = self._get_next_for_ordering_key(key)
            if next_msg:
                # Schedule the next message because the previous was dropped.
                # Note that this may overload the user's `max_bytes` limit,
                # but not their `max_messages` limit.
                schedule_message_callback(next_msg)
            else:
                # No more messages for this ordering key, so do clean-up.
                self._clean_up_ordering_key(key)

    def _get_next_for_ordering_key(
        self, ordering_key: str
    ) -> Optional["subscriber.message.Message"]:
        """Get next message for ordering key.

        The client should call _clean_up_ordering_key() if this method
        returns None.

        Args:
            ordering_key: Ordering key for which to get the next message.

        Returns:
            The next message for this ordering key or None if there aren't
            any.
        """
        queue_for_key = self._pending_ordered_messages.get(ordering_key)
        # Falsy covers both "no queue for this key" and "queue is empty".
        if queue_for_key:
            self._size -= 1
            return queue_for_key.popleft()
        return None

    def _clean_up_ordering_key(self, ordering_key: str) -> None:
        """Clean up state for an ordering key with no pending messages.

        If the key is unknown, or its queue still holds messages, a warning
        is logged and the state is left untouched; removing a non-empty
        queue would silently drop the messages in it.

        Args:
            ordering_key: The ordering key to clean up.
        """
        # Imported locally so this class does not depend on a module-level
        # logger being defined.
        import logging

        message_queue = self._pending_ordered_messages.get(ordering_key)
        if message_queue is None:
            logging.getLogger(__name__).warning(
                "Tried to clean up ordering key that does not exist: %s",
                ordering_key,
            )
            return

        if message_queue:
            logging.getLogger(__name__).warning(
                "Tried to clean up ordering key: %s with %d messages remaining.",
                ordering_key,
                len(message_queue),
            )
            return

        del self._pending_ordered_messages[ordering_key]