/src/uWebSockets/src/TopicTree.h
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef UWS_TOPICTREE_H |
19 | | #define UWS_TOPICTREE_H |
20 | | |
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <string_view>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
33 | | |
namespace uWS {

struct Subscriber;

/* A topic is the set of subscribers currently subscribed to it, plus its own name. */
struct Topic : std::unordered_set<Subscriber *> {

    Topic(std::string_view topic) : name(topic) {

    }

    /* Owned copy of the topic name. TopicTree keys its topic map with a
     * string_view into this string, so it must live as long as the Topic does. */
    std::string name;
};

/* Per-connection subscription state. Instances are created by TopicTree::createSubscriber
 * and destroyed by TopicTree::freeSubscriber. */
struct Subscriber {

    template <typename, typename> friend struct TopicTree;

private:
    /* We use a factory */
    Subscriber() = default;

    /* Any one subscriber can be part of at most this many publishes before it needs a drain,
     * or whatever encoding of runs or whatever we might do in the future */
    static constexpr unsigned char MAX_MESSAGE_INDICES = 32;

    /* Intrusive links of the drainableSubscribers list. Their state does not matter
     * unless we are needsDrainage(), since otherwise we are not in the list. */
    Subscriber *prev = nullptr;
    Subscriber *next = nullptr;

    /* Indices into TopicTree::outgoingMessages of the messages pending for this subscriber */
    uint16_t messageIndices[MAX_MESSAGE_INDICES] = {};

    /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */
    unsigned char numMessageIndices = 0;

public:

    /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */
    std::set<Topic *> topics;

    /* User data */
    void *user = nullptr;

    /* True while at least one published message is pending for this subscriber */
    bool needsDrainage() const {
        return numMessageIndices != 0;
    }
};

/* Publish/subscribe bookkeeping. T is the type of buffered (small) messages, B the type
 * of big messages that bypass buffering. Published messages are stored once in a shared
 * palette and referenced by index from every receiving subscriber until drained. */
template <typename T, typename B>
struct TopicTree {

    /* Bit flags handed to the drain callback: FIRST is set on the first message of a
     * subscriber's batch, LAST on the last one (both are set for a batch of one). */
    enum IteratorFlags {
        LAST = 1,
        FIRST = 2
    };

    /* Whoever is iterating this topic is locked to not modify its own list */
    Subscriber *iteratingSubscriber = nullptr;

private:

    /* The drain callback must not publish, unsubscribe or subscribe.
     * It must only cork, uncork, send, write */
    std::function<bool(Subscriber *, T &, IteratorFlags)> cb;

    /* The topics, keyed by a view into each Topic's own name */
    std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics;

    /* Head of the list of subscribers that need drainage */
    Subscriber *drainableSubscribers = nullptr;

    /* Palette of outgoing messages, up to 64k */
    std::vector<T> outgoingMessages;

    /* Terminates the process if s is the subscriber whose topics are currently being iterated */
    void checkIteratingSubscriber(Subscriber *s) {
        /* Notify user that they are doing something wrong here */
        if (iteratingSubscriber == s) {
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
            std::terminate();
        }
    }

    /* Emits cb for every message pending on s, in publish order.
     * Warning: does NOT unlink from drainableSubscribers or modify next, prev. */
    void drainImpl(Subscriber *s) {
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
         * since WebSocket::send will call drain from within the cb in that case. */
        const int pendingCount = s->numMessageIndices;
        s->numMessageIndices = 0;

        /* Then we emit cb */
        for (int i = 0; i < pendingCount; i++) {
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];

            int flags = 0;
            if (i == 0) {
                flags |= FIRST;
            }
            if (i == pendingCount - 1) {
                flags |= LAST;
            }

            /* Returning true will stop drainage short (such as when backpressure is too high) */
            if (cb(s, outgoingMessage, static_cast<IteratorFlags>(flags))) {
                break;
            }
        }
    }

    /* Removes s from the drainableSubscribers list; s must currently be linked in it */
    void unlinkDrainableSubscriber(Subscriber *s) {
        if (s->prev) {
            s->prev->next = s->next;
        }
        if (s->next) {
            s->next->prev = s->prev;
        }
        /* If we are the head, then we also need to reset the head */
        if (drainableSubscribers == s) {
            drainableSubscribers = s->next;
        }
    }

public:

    /* cb is invoked once per pending message whenever a subscriber is drained.
     * It is taken by value and moved into place to avoid copying the callable. */
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(std::move(cb)) {

    }

    /* Returns nullptr if not found */
    Topic *lookupTopic(std::string_view topic) {
        auto it = topics.find(topic);
        if (it == topics.end()) {
            return nullptr;
        }
        return it->second.get();
    }

    /* Subscribes s to topic, creating the topic if needed. Returns the topic,
     * or nullptr if s already is subscribed to it. */
    Topic *subscribe(Subscriber *s, std::string_view topic) {
        /* Notify user that they are doing something wrong here */
        checkIteratingSubscriber(s);

        /* Lookup or create new topic */
        Topic *topicPtr = lookupTopic(topic);
        if (!topicPtr) {
            auto newTopic = std::make_unique<Topic>(topic);
            topicPtr = newTopic.get();
            /* The key views the Topic's own heap-allocated name, which stays put when
             * the unique_ptr is moved into the map */
            topics.emplace(std::string_view(topicPtr->name), std::move(newTopic));
        }

        /* Insert us in topic, insert topic in us */
        if (!s->topics.insert(topicPtr).second) {
            return nullptr;
        }
        topicPtr->insert(s);

        /* Success */
        return topicPtr;
    }

    /* Returns ok, last, newCount:
     *  ok       - s was subscribed to topic and has now been removed from it
     *  last     - s holds no more topics after this call
     *  newCount - remaining number of subscribers of topic, or -1 when not ok */
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
        /* Notify user that they are doing something wrong here */
        checkIteratingSubscriber(s);

        /* Lookup topic */
        auto it = topics.find(topic);
        if (it == topics.end()) {
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
            return {false, false, -1};
        }
        Topic *topicPtr = it->second.get();

        /* Erase from our list first */
        if (s->topics.erase(topicPtr) == 0) {
            return {false, false, -1};
        }

        /* Remove us from topic */
        topicPtr->erase(s);

        const int newCount = static_cast<int>(topicPtr->size());

        /* If there is no subscriber to this topic, remove it */
        if (newCount == 0) {
            /* Unique_ptr deletes the topic; topicPtr is dangling from here on */
            topics.erase(it);
        }

        /* If we don't hold any topics we are to be freed altogether */
        return {true, s->topics.empty(), newCount};
    }

    /* Factory function for creating a Subscriber */
    Subscriber *createSubscriber() {
        return new Subscriber();
    }

    /* This is used to end a Subscriber: it is removed from all its topics and from the
     * drain list, then deleted. Passing nullptr is a no-op. */
    void freeSubscriber(Subscriber *s) {

        /* I guess we call this one even if we are not subscribers */
        if (!s) {
            return;
        }

        /* For all topics, unsubscribe */
        for (Topic *topicPtr : s->topics) {
            /* If we are the last subscriber, simply remove the whole topic */
            if (topicPtr->size() == 1) {
                /* Erase by iterator: erasing by key would hand the map a view into
                 * the very name string that this erase destroys */
                auto it = topics.find(topicPtr->name);
                if (it != topics.end()) {
                    topics.erase(it);
                }
            } else {
                /* Otherwise just remove us */
                topicPtr->erase(s);
            }
        }

        /* We also need to unlink us */
        if (s->needsDrainage()) {
            unlinkDrainableSubscriber(s);
        }

        delete s;
    }

    /* Mainly used by WebSocket::send to drain one socket before sending */
    void drain(Subscriber *s) {
        /* The list is undefined and cannot be touched unless needsDrainage(). */
        if (s->needsDrainage()) {
            /* This function differs from drainImpl by properly unlinking
             * the subscriber from drainableSubscribers. drainImpl does not. */
            unlinkDrainableSubscriber(s);

            /* This one always resets needsDrainage before it calls any cb's.
             * Otherwise we would stackoverflow when sending after publish but before drain. */
            drainImpl(s);

            /* If we drained last subscriber, also clear outgoingMessages */
            if (!drainableSubscribers) {
                outgoingMessages.clear();
            }
        }
    }

    /* Called every time we call send, to drain published messages so to sync outgoing messages */
    void drain() {
        if (drainableSubscribers) {
            /* Drain one socket a time */
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
                /* Instead of unlinking every single subscriber, we just leave the list undefined
                 * and reset drainableSubscribers ptr below. */
                drainImpl(s);
            }
            /* Drain always clears drainableSubscribers and outgoingMessages */
            drainableSubscribers = nullptr;
            outgoingMessages.clear();
        }
    }

    /* Big messages bypass all buffering and land directly in backpressure.
     * Calls cb(subscriber, bigMessage) for every subscriber of topic except sender.
     * Returns false if the topic does not exist. */
    template <typename F>
    bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) {
        /* Do we even have this topic? */
        auto it = topics.find(topic);
        if (it == topics.end()) {
            return false;
        }

        /* For all subscribers in topic */
        for (Subscriber *s : *it->second) {

            /* If we are sender then ignore us */
            if (sender != s) {
                cb(s, bigMessage);
            }
        }

        return true;
    }

    /* Buffers message for every subscriber of topic except sender. Linear in number of
     * affected subscribers. Returns true if at least one subscriber references the
     * message; only then is message consumed (moved into the palette). */
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
        /* Do we even have this topic? */
        auto it = topics.find(topic);
        if (it == topics.end()) {
            return false;
        }

        /* If we have more than 65k messages we need to drain every socket. */
        if (outgoingMessages.size() == UINT16_MAX) {
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
             * to their own backpressure */
            drain();
        }

        /* If nobody references this message, don't buffer it */
        bool referencedMessage = false;

        /* For all subscribers in topic */
        for (Subscriber *s : *it->second) {

            /* If we are sender then ignore us */
            if (sender == s) {
                continue;
            }

            /* At least one subscriber wants this message */
            referencedMessage = true;

            /* If we already have too many outgoing messages on this subscriber, drain it now */
            if (s->numMessageIndices == Subscriber::MAX_MESSAGE_INDICES) {
                /* This one does not need to check needsDrainage here but still does. */
                drain(s);
            }

            /* Finally we can continue; the message itself is pushed at this index below */
            s->messageIndices[s->numMessageIndices++] = static_cast<uint16_t>(outgoingMessages.size());

            /* First message adds subscriber to list of drainable subscribers */
            if (s->numMessageIndices == 1) {
                /* Insert us in the head of drainable subscribers */
                s->next = drainableSubscribers;
                s->prev = nullptr;
                if (s->next) {
                    s->next->prev = s;
                }
                drainableSubscribers = s;
            }
        }

        /* Push this message and return with success */
        if (referencedMessage) {
            /* message is a named rvalue reference and thus an lvalue; move it
             * explicitly so the payload is not copied into the palette */
            outgoingMessages.emplace_back(std::move(message));
        }

        /* Success if someone wants it */
        return referencedMessage;
    }
};

}
362 | | |
363 | | #endif |