Coverage Report

Created: 2025-06-13 06:09

/src/uWebSockets/src/TopicTree.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2021.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_TOPICTREE_H
19
#define UWS_TOPICTREE_H
20
21
#include <map>
22
#include <list>
23
#include <iostream>
24
#include <unordered_set>
25
#include <utility>
26
#include <memory>
27
#include <unordered_map>
28
#include <vector>
29
#include <string_view>
30
#include <functional>
31
#include <set>
32
#include <string>
33
#include <exception>
34
35
namespace uWS {
36
37
struct Subscriber;
38
39
struct Topic : std::unordered_set<Subscriber *> {
40
41
10.6M
    Topic(std::string_view topic) : name(topic) {
42
43
10.6M
    }
44
45
    std::string name;
46
};
47
48
struct Subscriber {
49
50
    template <typename, typename> friend struct TopicTree;
51
52
private:
53
    /* We use a factory */
54
106k
    Subscriber() = default;
55
56
    /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */
57
    Subscriber *prev, *next;
58
59
    /* Any one subscriber can be part of at most 32 publishes before it needs a drain,
60
     * or whatever encoding of runs or whatever we might do in the future */
61
    uint16_t messageIndices[32];
62
63
    /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */
64
    unsigned char numMessageIndices = 0;
65
66
public:
67
68
    /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */
69
    std::set<Topic *> topics;
70
71
    /* User data */
72
    void *user;
73
74
155k
    bool needsDrainage() {
75
155k
        return numMessageIndices;
76
155k
    }
77
};
78
79
template <typename T, typename B>
80
struct TopicTree {
81
82
    enum IteratorFlags {
83
        LAST = 1,
84
        FIRST = 2
85
    };
86
87
    /* Whomever is iterating this topic is locked to not modify its own list */
88
    Subscriber *iteratingSubscriber = nullptr;
89
90
private:
91
92
    /* The drain callback must not publish, unsubscribe or subscribe.
93
     * It must only cork, uncork, send, write */
94
    std::function<bool(Subscriber *, T &, IteratorFlags)> cb;
95
96
    /* The topics */
97
    std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics;
98
99
    /* List of subscribers that needs drainage */
100
    Subscriber *drainableSubscribers = nullptr;
101
102
    /* Palette of outgoing messages, up to 64k */
103
    std::vector<T> outgoingMessages;
104
105
10.6M
    void checkIteratingSubscriber(Subscriber *s) {
106
        /* Notify user that they are doing something wrong here */
107
10.6M
        if (iteratingSubscriber == s) {
108
0
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
109
0
            std::terminate();
110
0
        }
111
10.6M
    }
112
113
    /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */
114
12.8k
    void drainImpl(Subscriber *s) {
115
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
116
         * since WebSocket::send will call drain from within the cb in that case.*/
117
12.8k
        int numMessageIndices = s->numMessageIndices;
118
12.8k
        s->numMessageIndices = 0;
119
120
        /* Then we emit cb */
121
47.5k
        for (int i = 0; i < numMessageIndices; i++) {
122
35.8k
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
123
124
35.8k
            int flags = (i == numMessageIndices - 1) ? LAST : 0;
125
126
            /* Returning true will stop drainage short (such as when backpressure is too high) */
127
35.8k
            if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
128
1.14k
                break;
129
1.14k
            }
130
35.8k
        }
131
12.8k
    }
132
133
43.2k
    void unlinkDrainableSubscriber(Subscriber *s) {
134
43.2k
        if (s->prev) {
135
0
            s->prev->next = s->next;
136
0
        }
137
43.2k
        if (s->next) {
138
534
            s->next->prev = s->prev;
139
534
        }
140
        /* If we are the head, then we also need to reset the head */
141
43.2k
        if (drainableSubscribers == s) {
142
43.2k
            drainableSubscribers = s->next;
143
43.2k
        }
144
43.2k
    }
145
146
public:
147
148
5.94k
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
149
150
5.94k
    }
151
152
    /* Returns nullptr if not found */
153
10.6M
    Topic *lookupTopic(std::string_view topic) {
154
10.6M
        auto it = topics.find(topic);
155
10.6M
        if (it == topics.end()) {
156
10.6M
            return nullptr;
157
10.6M
        }
158
0
        return it->second.get();
159
10.6M
    }
160
161
    /* Subscribe fails if we already are subscribed */
162
10.6M
    Topic *subscribe(Subscriber *s, std::string_view topic) {
163
        /* Notify user that they are doing something wrong here */
164
10.6M
        checkIteratingSubscriber(s);
165
166
        /* Lookup or create new topic */
167
10.6M
        Topic *topicPtr = lookupTopic(topic);
168
10.6M
        if (!topicPtr) {
169
10.6M
            Topic *newTopic = new Topic(topic);
170
10.6M
            topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
171
10.6M
            topicPtr = newTopic;
172
10.6M
        }
173
174
        /* Insert us in topic, insert topic in us */
175
10.6M
        auto [it, inserted] = s->topics.insert(topicPtr);
176
10.6M
        if (!inserted) {
177
0
            return nullptr;
178
0
        }
179
10.6M
        topicPtr->insert(s);
180
181
        /* Success */
182
10.6M
        return topicPtr;
183
10.6M
    }
184
185
    /* Returns ok, last, newCount */
186
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
187
        /* Notify user that they are doing something wrong here */
188
        checkIteratingSubscriber(s);
189
190
        /* Lookup topic */
191
        Topic *topicPtr = lookupTopic(topic);
192
        if (!topicPtr) {
193
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
194
            return {false, false, -1};
195
        }
196
197
        /* Erase from our list first */
198
        if (s->topics.erase(topicPtr) == 0) {
199
            return {false, false, -1};
200
        }
201
202
        /* Remove us from topic */
203
        topicPtr->erase(s);
204
205
        int newCount = (int) topicPtr->size();
206
207
        /* If there is no subscriber to this topic, remove it */
208
        if (!topicPtr->size()) {
209
            /* Unique_ptr deletes the topic */
210
            topics.erase(topic);
211
        }
212
213
        /* If we don't hold any topics we are to be freed altogether */
214
        return {true, s->topics.size() == 0, newCount};
215
    }
216
217
    /* Factory function for creating a Subscriber */
218
106k
    Subscriber *createSubscriber() {
219
106k
        return new Subscriber();
220
106k
    }
221
222
    /* This is used to end a Subscriber, before freeing it */
223
106k
    void freeSubscriber(Subscriber *s) {
224
225
        /* I guess we call this one even if we are not subscribers */
226
106k
        if (!s) {
227
0
            return;
228
0
        }
229
230
        /* For all topics, unsubscribe */
231
10.6M
        for (Topic *topicPtr : s->topics) {
232
            /* If we are the last subscriber, simply remove the whole topic */
233
10.6M
            if (topicPtr->size() == 1) {
234
10.6M
                topics.erase(topicPtr->name);
235
10.6M
            } else {
236
                /* Otherwise just remove us */
237
0
                topicPtr->erase(s);
238
0
            }
239
10.6M
        }
240
241
        /* We also need to unlink us */
242
106k
        if (s->needsDrainage()) {
243
39.9k
            unlinkDrainableSubscriber(s);
244
39.9k
        }
245
246
106k
        delete s;
247
106k
    }
248
249
    /* Mainly used by WebSocket::send to drain one socket before sending */
250
48.8k
    void drain(Subscriber *s) {
251
        /* The list is undefined and cannot be touched unless needsDrainage(). */
252
48.8k
        if (s->needsDrainage()) {
253
            /* This function differs from drainImpl by properly unlinking
254
            * the subscriber from drainableSubscribers. drainImpl does not. */
255
3.23k
            unlinkDrainableSubscriber(s);
256
257
            /* This one always resets needsDrainage before it calls any cb's.
258
             * Otherwise we would stackoverflow when sending after publish but before drain. */
259
3.23k
            drainImpl(s);
260
            
261
            /* If we drained last subscriber, also clear outgoingMessages */
262
3.23k
            if (!drainableSubscribers) {
263
2.96k
                outgoingMessages.clear();
264
2.96k
            }
265
3.23k
        }
266
48.8k
    }
267
268
    /* Called everytime we call send, to drain published messages so to sync outgoing messages */
269
2.63M
    void drain() {
270
2.63M
        if (drainableSubscribers) {
271
            /* Drain one socket a time */
272
18.9k
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
273
                /* Instead of unlinking every single subscriber, we just leave the list undefined
274
                 * and reset drainableSubscribers ptr below. */
275
9.58k
                drainImpl(s);
276
9.58k
            }
277
            /* Drain always clears drainableSubscribers and outgoingMessages */
278
9.33k
            drainableSubscribers = nullptr;
279
9.33k
            outgoingMessages.clear();
280
9.33k
        }
281
2.63M
    }
282
283
    /* Big messages bypass all buffering and land directly in backpressure */
284
    template <typename F>
285
0
    bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) {
286
        /* Do we even have this topic? */
287
0
        auto it = topics.find(topic);
288
0
        if (it == topics.end()) {
289
0
            return false;
290
0
        }
291
292
        /* For all subscribers in topic */
293
0
        for (Subscriber *s : *it->second) {
294
295
            /* If we are sender then ignore us */
296
0
            if (sender != s) {
297
0
                cb(s, bigMessage);
298
0
            }
299
0
        }
300
301
0
        return true;
302
0
    }
303
304
    /* Linear in number of affected subscribers */
305
1.04M
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
306
        /* Do we even have this topic? */
307
1.04M
        auto it = topics.find(topic);
308
1.04M
        if (it == topics.end()) {
309
0
            return false;
310
0
        }
311
312
        /* If we have more than 65k messages we need to drain every socket. */
313
1.04M
        if (outgoingMessages.size() == UINT16_MAX) {
314
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
315
             * to their own backpressure */
316
4
            drain();
317
4
        }
318
319
        /* If nobody references this message, don't buffer it */
320
1.04M
        bool referencedMessage = false;
321
322
        /* For all subscribers in topic */
323
1.04M
        for (Subscriber *s : *it->second) {
324
325
            /* If we are sender then ignore us */
326
1.04M
            if (sender != s) {
327
328
                /* At least one subscriber wants this message */
329
1.04M
                referencedMessage = true;
330
331
                /* If we already have too many outgoing messages on this subscriber, drain it now */
332
1.04M
                if (s->numMessageIndices == 32) {
333
                    /* This one does not need to check needsDrainage here but still does. */
334
396
                    drain(s);
335
396
                }
336
337
                /* Finally we can continue */
338
1.04M
                s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
339
                /* First message adds subscriber to list of drainable subscribers */
340
1.04M
                if (s->numMessageIndices == 1) {
341
                    /* Insert us in the head of drainable subscribers */
342
52.7k
                    s->next = drainableSubscribers;
343
52.7k
                    s->prev = nullptr;
344
52.7k
                    if (s->next) {
345
784
                        s->next->prev = s;
346
784
                    }
347
52.7k
                    drainableSubscribers = s;
348
52.7k
                }
349
1.04M
            }
350
1.04M
        }
351
352
        /* Push this message and return with success */
353
1.04M
        if (referencedMessage) {
354
1.04M
            outgoingMessages.emplace_back(message);
355
1.04M
        }
356
357
        /* Success if someone wants it */
358
1.04M
        return referencedMessage;
359
1.04M
    }
360
};
361
362
}
363
364
#endif