/src/uWebSockets/src/TopicTree.h
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef UWS_TOPICTREE_H |
19 | | #define UWS_TOPICTREE_H |
20 | | |
21 | | #include <map> |
22 | | #include <list> |
23 | | #include <iostream> |
24 | | #include <unordered_set> |
25 | | #include <utility> |
26 | | #include <memory> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | #include <string_view> |
30 | | #include <functional> |
31 | | #include <set> |
32 | | #include <string> |
33 | | #include <exception> |
34 | | |
35 | | namespace uWS { |
36 | | |
37 | | struct Subscriber; |
38 | | |
39 | | struct Topic : std::unordered_set<Subscriber *> { |
40 | | |
41 | 10.6M | Topic(std::string_view topic) : name(topic) { |
42 | | |
43 | 10.6M | } |
44 | | |
45 | | std::string name; |
46 | | }; |
47 | | |
48 | | struct Subscriber { |
49 | | |
50 | | template <typename, typename> friend struct TopicTree; |
51 | | |
52 | | private: |
53 | | /* We use a factory */ |
54 | 106k | Subscriber() = default; |
55 | | |
56 | | /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */ |
57 | | Subscriber *prev, *next; |
58 | | |
59 | | /* Any one subscriber can be part of at most 32 publishes before it needs a drain, |
60 | | * or whatever encoding of runs or whatever we might do in the future */ |
61 | | uint16_t messageIndices[32]; |
62 | | |
63 | | /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */ |
64 | | unsigned char numMessageIndices = 0; |
65 | | |
66 | | public: |
67 | | |
68 | | /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */ |
69 | | std::set<Topic *> topics; |
70 | | |
71 | | /* User data */ |
72 | | void *user; |
73 | | |
74 | 155k | bool needsDrainage() { |
75 | 155k | return numMessageIndices; |
76 | 155k | } |
77 | | }; |
78 | | |
79 | | template <typename T, typename B> |
80 | | struct TopicTree { |
81 | | |
82 | | enum IteratorFlags { |
83 | | LAST = 1, |
84 | | FIRST = 2 |
85 | | }; |
86 | | |
87 | | /* Whomever is iterating this topic is locked to not modify its own list */ |
88 | | Subscriber *iteratingSubscriber = nullptr; |
89 | | |
90 | | private: |
91 | | |
92 | | /* The drain callback must not publish, unsubscribe or subscribe. |
93 | | * It must only cork, uncork, send, write */ |
94 | | std::function<bool(Subscriber *, T &, IteratorFlags)> cb; |
95 | | |
96 | | /* The topics */ |
97 | | std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics; |
98 | | |
99 | | /* List of subscribers that needs drainage */ |
100 | | Subscriber *drainableSubscribers = nullptr; |
101 | | |
102 | | /* Palette of outgoing messages, up to 64k */ |
103 | | std::vector<T> outgoingMessages; |
104 | | |
105 | 10.6M | void checkIteratingSubscriber(Subscriber *s) { |
106 | | /* Notify user that they are doing something wrong here */ |
107 | 10.6M | if (iteratingSubscriber == s) { |
108 | 0 | std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; |
109 | 0 | std::terminate(); |
110 | 0 | } |
111 | 10.6M | } |
112 | | |
113 | | /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */ |
114 | 12.8k | void drainImpl(Subscriber *s) { |
115 | | /* Before we call cb we need to make sure this subscriber will not report needsDrainage() |
116 | | * since WebSocket::send will call drain from within the cb in that case.*/ |
117 | 12.8k | int numMessageIndices = s->numMessageIndices; |
118 | 12.8k | s->numMessageIndices = 0; |
119 | | |
120 | | /* Then we emit cb */ |
121 | 47.5k | for (int i = 0; i < numMessageIndices; i++) { |
122 | 35.8k | T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; |
123 | | |
124 | 35.8k | int flags = (i == numMessageIndices - 1) ? LAST : 0; |
125 | | |
126 | | /* Returning true will stop drainage short (such as when backpressure is too high) */ |
127 | 35.8k | if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { |
128 | 1.14k | break; |
129 | 1.14k | } |
130 | 35.8k | } |
131 | 12.8k | } |
132 | | |
133 | 43.2k | void unlinkDrainableSubscriber(Subscriber *s) { |
134 | 43.2k | if (s->prev) { |
135 | 0 | s->prev->next = s->next; |
136 | 0 | } |
137 | 43.2k | if (s->next) { |
138 | 534 | s->next->prev = s->prev; |
139 | 534 | } |
140 | | /* If we are the head, then we also need to reset the head */ |
141 | 43.2k | if (drainableSubscribers == s) { |
142 | 43.2k | drainableSubscribers = s->next; |
143 | 43.2k | } |
144 | 43.2k | } |
145 | | |
146 | | public: |
147 | | |
148 | 5.94k | TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { |
149 | | |
150 | 5.94k | } |
151 | | |
152 | | /* Returns nullptr if not found */ |
153 | 10.6M | Topic *lookupTopic(std::string_view topic) { |
154 | 10.6M | auto it = topics.find(topic); |
155 | 10.6M | if (it == topics.end()) { |
156 | 10.6M | return nullptr; |
157 | 10.6M | } |
158 | 0 | return it->second.get(); |
159 | 10.6M | } |
160 | | |
161 | | /* Subscribe fails if we already are subscribed */ |
162 | 10.6M | Topic *subscribe(Subscriber *s, std::string_view topic) { |
163 | | /* Notify user that they are doing something wrong here */ |
164 | 10.6M | checkIteratingSubscriber(s); |
165 | | |
166 | | /* Lookup or create new topic */ |
167 | 10.6M | Topic *topicPtr = lookupTopic(topic); |
168 | 10.6M | if (!topicPtr) { |
169 | 10.6M | Topic *newTopic = new Topic(topic); |
170 | 10.6M | topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); |
171 | 10.6M | topicPtr = newTopic; |
172 | 10.6M | } |
173 | | |
174 | | /* Insert us in topic, insert topic in us */ |
175 | 10.6M | auto [it, inserted] = s->topics.insert(topicPtr); |
176 | 10.6M | if (!inserted) { |
177 | 0 | return nullptr; |
178 | 0 | } |
179 | 10.6M | topicPtr->insert(s); |
180 | | |
181 | | /* Success */ |
182 | 10.6M | return topicPtr; |
183 | 10.6M | } |
184 | | |
185 | | /* Returns ok, last, newCount */ |
186 | | std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) { |
187 | | /* Notify user that they are doing something wrong here */ |
188 | | checkIteratingSubscriber(s); |
189 | | |
190 | | /* Lookup topic */ |
191 | | Topic *topicPtr = lookupTopic(topic); |
192 | | if (!topicPtr) { |
193 | | /* If the topic doesn't exist we are assumed to still be subscribers of something */ |
194 | | return {false, false, -1}; |
195 | | } |
196 | | |
197 | | /* Erase from our list first */ |
198 | | if (s->topics.erase(topicPtr) == 0) { |
199 | | return {false, false, -1}; |
200 | | } |
201 | | |
202 | | /* Remove us from topic */ |
203 | | topicPtr->erase(s); |
204 | | |
205 | | int newCount = (int) topicPtr->size(); |
206 | | |
207 | | /* If there is no subscriber to this topic, remove it */ |
208 | | if (!topicPtr->size()) { |
209 | | /* Unique_ptr deletes the topic */ |
210 | | topics.erase(topic); |
211 | | } |
212 | | |
213 | | /* If we don't hold any topics we are to be freed altogether */ |
214 | | return {true, s->topics.size() == 0, newCount}; |
215 | | } |
216 | | |
217 | | /* Factory function for creating a Subscriber */ |
218 | 106k | Subscriber *createSubscriber() { |
219 | 106k | return new Subscriber(); |
220 | 106k | } |
221 | | |
222 | | /* This is used to end a Subscriber, before freeing it */ |
223 | 106k | void freeSubscriber(Subscriber *s) { |
224 | | |
225 | | /* I guess we call this one even if we are not subscribers */ |
226 | 106k | if (!s) { |
227 | 0 | return; |
228 | 0 | } |
229 | | |
230 | | /* For all topics, unsubscribe */ |
231 | 10.6M | for (Topic *topicPtr : s->topics) { |
232 | | /* If we are the last subscriber, simply remove the whole topic */ |
233 | 10.6M | if (topicPtr->size() == 1) { |
234 | 10.6M | topics.erase(topicPtr->name); |
235 | 10.6M | } else { |
236 | | /* Otherwise just remove us */ |
237 | 0 | topicPtr->erase(s); |
238 | 0 | } |
239 | 10.6M | } |
240 | | |
241 | | /* We also need to unlink us */ |
242 | 106k | if (s->needsDrainage()) { |
243 | 39.9k | unlinkDrainableSubscriber(s); |
244 | 39.9k | } |
245 | | |
246 | 106k | delete s; |
247 | 106k | } |
248 | | |
249 | | /* Mainly used by WebSocket::send to drain one socket before sending */ |
250 | 48.8k | void drain(Subscriber *s) { |
251 | | /* The list is undefined and cannot be touched unless needsDrainage(). */ |
252 | 48.8k | if (s->needsDrainage()) { |
253 | | /* This function differs from drainImpl by properly unlinking |
254 | | * the subscriber from drainableSubscribers. drainImpl does not. */ |
255 | 3.23k | unlinkDrainableSubscriber(s); |
256 | | |
257 | | /* This one always resets needsDrainage before it calls any cb's. |
258 | | * Otherwise we would stackoverflow when sending after publish but before drain. */ |
259 | 3.23k | drainImpl(s); |
260 | | |
261 | | /* If we drained last subscriber, also clear outgoingMessages */ |
262 | 3.23k | if (!drainableSubscribers) { |
263 | 2.96k | outgoingMessages.clear(); |
264 | 2.96k | } |
265 | 3.23k | } |
266 | 48.8k | } |
267 | | |
268 | | /* Called everytime we call send, to drain published messages so to sync outgoing messages */ |
269 | 2.63M | void drain() { |
270 | 2.63M | if (drainableSubscribers) { |
271 | | /* Drain one socket a time */ |
272 | 18.9k | for (Subscriber *s = drainableSubscribers; s; s = s->next) { |
273 | | /* Instead of unlinking every single subscriber, we just leave the list undefined |
274 | | * and reset drainableSubscribers ptr below. */ |
275 | 9.58k | drainImpl(s); |
276 | 9.58k | } |
277 | | /* Drain always clears drainableSubscribers and outgoingMessages */ |
278 | 9.33k | drainableSubscribers = nullptr; |
279 | 9.33k | outgoingMessages.clear(); |
280 | 9.33k | } |
281 | 2.63M | } |
282 | | |
283 | | /* Big messages bypass all buffering and land directly in backpressure */ |
284 | | template <typename F> |
285 | 0 | bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) { |
286 | | /* Do we even have this topic? */ |
287 | 0 | auto it = topics.find(topic); |
288 | 0 | if (it == topics.end()) { |
289 | 0 | return false; |
290 | 0 | } |
291 | | |
292 | | /* For all subscribers in topic */ |
293 | 0 | for (Subscriber *s : *it->second) { |
294 | | |
295 | | /* If we are sender then ignore us */ |
296 | 0 | if (sender != s) { |
297 | 0 | cb(s, bigMessage); |
298 | 0 | } |
299 | 0 | } |
300 | |
|
301 | 0 | return true; |
302 | 0 | } |
303 | | |
304 | | /* Linear in number of affected subscribers */ |
305 | 1.04M | bool publish(Subscriber *sender, std::string_view topic, T &&message) { |
306 | | /* Do we even have this topic? */ |
307 | 1.04M | auto it = topics.find(topic); |
308 | 1.04M | if (it == topics.end()) { |
309 | 0 | return false; |
310 | 0 | } |
311 | | |
312 | | /* If we have more than 65k messages we need to drain every socket. */ |
313 | 1.04M | if (outgoingMessages.size() == UINT16_MAX) { |
314 | | /* If there is a socket that is currently corked, this will be ugly as all sockets will drain |
315 | | * to their own backpressure */ |
316 | 4 | drain(); |
317 | 4 | } |
318 | | |
319 | | /* If nobody references this message, don't buffer it */ |
320 | 1.04M | bool referencedMessage = false; |
321 | | |
322 | | /* For all subscribers in topic */ |
323 | 1.04M | for (Subscriber *s : *it->second) { |
324 | | |
325 | | /* If we are sender then ignore us */ |
326 | 1.04M | if (sender != s) { |
327 | | |
328 | | /* At least one subscriber wants this message */ |
329 | 1.04M | referencedMessage = true; |
330 | | |
331 | | /* If we already have too many outgoing messages on this subscriber, drain it now */ |
332 | 1.04M | if (s->numMessageIndices == 32) { |
333 | | /* This one does not need to check needsDrainage here but still does. */ |
334 | 396 | drain(s); |
335 | 396 | } |
336 | | |
337 | | /* Finally we can continue */ |
338 | 1.04M | s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); |
339 | | /* First message adds subscriber to list of drainable subscribers */ |
340 | 1.04M | if (s->numMessageIndices == 1) { |
341 | | /* Insert us in the head of drainable subscribers */ |
342 | 52.7k | s->next = drainableSubscribers; |
343 | 52.7k | s->prev = nullptr; |
344 | 52.7k | if (s->next) { |
345 | 784 | s->next->prev = s; |
346 | 784 | } |
347 | 52.7k | drainableSubscribers = s; |
348 | 52.7k | } |
349 | 1.04M | } |
350 | 1.04M | } |
351 | | |
352 | | /* Push this message and return with success */ |
353 | 1.04M | if (referencedMessage) { |
354 | 1.04M | outgoingMessages.emplace_back(message); |
355 | 1.04M | } |
356 | | |
357 | | /* Success if someone wants it */ |
358 | 1.04M | return referencedMessage; |
359 | 1.04M | } |
360 | | }; |
361 | | |
362 | | } |
363 | | |
364 | | #endif |