Coverage Report

Created: 2024-04-25 06:10

/src/uWebSockets/src/TopicTree.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2021.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_TOPICTREE_H
19
#define UWS_TOPICTREE_H
20
21
#include <cstdint>
#include <exception>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <string_view>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
33
34
namespace uWS {

struct Subscriber;

/* A Topic is the set of Subscribers currently subscribed to it, tagged with the topic's name.
 * TopicTree keys its topic map with a std::string_view that points into `name`, so `name`
 * must not be modified for as long as the Topic is registered in a TopicTree. */
struct Topic : std::unordered_set<Subscriber *> {

    /* Copies the given view into the owned name; the set of subscribers starts out empty */
    Topic(std::string_view topic) : name(topic) {

    }

    std::string name;
};

/* One subscriber (created and destroyed only through TopicTree) together with the
 * bookkeeping TopicTree needs to buffer published messages for it. */
struct Subscriber {

    template <typename, typename> friend struct TopicTree;

private:
    /* We use a factory (TopicTree::createSubscriber) */
    Subscriber() = default;

    /* Any one subscriber can be part of at most this many publishes before it needs a drain */
    static constexpr unsigned int MAX_MESSAGE_INDICES = 32;

    /* Links of the intrusive, doubly linked list of drainable subscribers.
     * Their state does not matter unless we are needsDrainage(), since we are otherwise not in the list */
    Subscriber *prev = nullptr;
    Subscriber *next = nullptr;

    /* Indices into TopicTree::outgoingMessages of the messages still to be delivered to us;
     * only the first numMessageIndices entries are meaningful */
    uint16_t messageIndices[MAX_MESSAGE_INDICES];

    /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */
    unsigned char numMessageIndices = 0;

public:

    /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */
    std::set<Topic *> topics;

    /* User data; never read or written by TopicTree itself */
    void *user = nullptr;

    /* True when at least one published message is buffered for this subscriber */
    bool needsDrainage() const {
        return numMessageIndices != 0;
    }
};

/* Publish/subscribe registry.
 *   T is the type of regular messages; they are buffered in a shared palette and handed to the
 *     drain callback later, when the affected subscribers are drained.
 *   B is the type of big messages; they bypass the palette and are handed to a per-call
 *     callback immediately (see publishBig).
 * TopicTree owns its Topics. Subscribers are created by createSubscriber() and must be
 * released with freeSubscriber(). */
template <typename T, typename B>
struct TopicTree {

    /* Bit flags passed to the drain callback, marking the first and/or last message of one
     * subscriber's drain (both are set when only one message is delivered) */
    enum IteratorFlags {
        LAST = 1,
        FIRST = 2
    };

    /* Whomever is iterating this topic is locked to not modify its own list.
     * This class only reads the pointer (see checkIteratingSubscriber); it is set from outside. */
    Subscriber *iteratingSubscriber = nullptr;

private:

    /* The drain callback must not publish, unsubscribe or subscribe.
     * It must only cork, uncork, send, write.
     * Returning true stops the ongoing drain of that subscriber. */
    std::function<bool(Subscriber *, T &, IteratorFlags)> cb;

    /* The topics, keyed by a view into each Topic's own name */
    std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics;

    /* Head of the list of subscribers that needs drainage */
    Subscriber *drainableSubscribers = nullptr;

    /* Palette of outgoing messages, up to 64k (subscribers refer to entries by uint16_t index) */
    std::vector<T> outgoingMessages;

    /* Terminates the program if s is the subscriber whose topics are currently being iterated */
    void checkIteratingSubscriber(Subscriber *s) {
        /* Notify user that they are doing something wrong here */
        if (iteratingSubscriber == s) {
            /* std::endl on purpose: the message must be flushed before we terminate */
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
            std::terminate();
        }
    }

    /* Hands every buffered message of s to cb, in publish order.
     * Warning: does NOT unlink from drainableSubscribers or modify next, prev. */
    void drainImpl(Subscriber *s) {
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
         * since WebSocket::send will call drain from within the cb in that case. */
        const int pending = s->numMessageIndices;
        s->numMessageIndices = 0;

        /* Then we emit cb */
        for (int i = 0; i < pending; i++) {
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];

            int flags = 0;
            if (i == 0) {
                flags |= FIRST;
            }
            if (i == pending - 1) {
                flags |= LAST;
            }

            /* Returning true will stop drainage short (such as when backpressure is too high);
             * the remaining buffered messages of s are dropped, since the count is already reset */
            if (cb(s, outgoingMessage, static_cast<IteratorFlags>(flags))) {
                break;
            }
        }
    }

    /* Removes s from the drainableSubscribers list; s must currently be linked in it */
    void unlinkDrainableSubscriber(Subscriber *s) {
        if (s->prev) {
            s->prev->next = s->next;
        }
        if (s->next) {
            s->next->prev = s->prev;
        }
        /* If we are the head, then we also need to reset the head */
        if (drainableSubscribers == s) {
            drainableSubscribers = s->next;
        }
    }

public:

    /* cb is the drain callback invoked once per (subscriber, buffered message) when draining */
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(std::move(cb)) {

    }

    /* Returns nullptr if not found */
    Topic *lookupTopic(std::string_view topic) {
        auto it = topics.find(topic);
        if (it == topics.end()) {
            return nullptr;
        }
        return it->second.get();
    }

    /* Subscribes s to topic, creating the topic if it does not exist yet.
     * Returns the topic, or nullptr if s already was subscribed to it. */
    Topic *subscribe(Subscriber *s, std::string_view topic) {
        /* Notify user that they are doing something wrong here */
        checkIteratingSubscriber(s);

        /* Lookup or create new topic */
        Topic *topicPtr = lookupTopic(topic);
        if (!topicPtr) {
            auto newTopic = std::make_unique<Topic>(topic);
            topicPtr = newTopic.get();
            /* The key must view the Topic's own name, not the caller's buffer,
             * so that it stays valid for as long as the topic exists */
            topics.emplace(std::string_view(topicPtr->name), std::move(newTopic));
        }

        /* Insert us in topic, insert topic in us */
        if (!s->topics.insert(topicPtr).second) {
            return nullptr;
        }
        topicPtr->insert(s);

        /* Success */
        return topicPtr;
    }

    /* Unsubscribes s from topic. Returns {ok, last, newCount}:
     *   ok       - false if the topic does not exist or s was not subscribed to it
     *              (last is then false and newCount is -1)
     *   last     - true if s holds no topics at all after this call
     *   newCount - number of subscribers remaining in the topic */
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
        /* Notify user that they are doing something wrong here */
        checkIteratingSubscriber(s);

        /* Lookup topic */
        Topic *topicPtr = lookupTopic(topic);
        if (!topicPtr) {
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
            return {false, false, -1};
        }

        /* Erase from our list first */
        if (s->topics.erase(topicPtr) == 0) {
            return {false, false, -1};
        }

        /* Remove us from topic */
        topicPtr->erase(s);

        const int newCount = static_cast<int>(topicPtr->size());

        /* If there is no subscriber to this topic, remove it */
        if (newCount == 0) {
            /* Unique_ptr deletes the topic */
            topics.erase(topic);
        }

        /* If we don't hold any topics we are to be freed altogether */
        return {true, s->topics.empty(), newCount};
    }

    /* Factory function for creating a Subscriber; release it with freeSubscriber */
    Subscriber *createSubscriber() {
        return new Subscriber();
    }

    /* This is used to end a Subscriber: unsubscribes it from all of its topics, unlinks it
     * from the drainable list (its buffered messages are not delivered) and deletes it.
     * Passing nullptr is allowed and does nothing. */
    void freeSubscriber(Subscriber *s) {

        /* I guess we call this one even if we are not subscribers */
        if (!s) {
            return;
        }

        /* For all topics, unsubscribe */
        for (Topic *topicPtr : s->topics) {
            /* If we are the last subscriber, simply remove the whole topic */
            if (topicPtr->size() == 1) {
                /* Erase by iterator: the lookup key views the name of the very Topic that
                 * the erase destroys, so it must not be needed once destruction starts */
                auto it = topics.find(topicPtr->name);
                if (it != topics.end()) {
                    topics.erase(it);
                }
            } else {
                /* Otherwise just remove us */
                topicPtr->erase(s);
            }
        }

        /* We also need to unlink us */
        if (s->needsDrainage()) {
            unlinkDrainableSubscriber(s);
        }

        delete s;
    }

    /* Mainly used by WebSocket::send to drain one socket before sending */
    void drain(Subscriber *s) {
        /* The list is undefined and cannot be touched unless needsDrainage(). */
        if (!s->needsDrainage()) {
            return;
        }

        /* This function differs from drainImpl by properly unlinking
         * the subscriber from drainableSubscribers. drainImpl does not. */
        unlinkDrainableSubscriber(s);

        /* This one always resets needsDrainage before it calls any cb's.
         * Otherwise we would stackoverflow when sending after publish but before drain. */
        drainImpl(s);

        /* If we drained last subscriber, also clear outgoingMessages */
        if (!drainableSubscribers) {
            outgoingMessages.clear();
        }
    }

    /* Drains every subscriber that has buffered messages, then resets the palette */
    void drain() {
        if (!drainableSubscribers) {
            return;
        }

        /* Drain one socket a time */
        for (Subscriber *s = drainableSubscribers; s; s = s->next) {
            /* Instead of unlinking every single subscriber, we just leave the list undefined
             * and reset drainableSubscribers ptr below. */
            drainImpl(s);
        }

        /* Drain always clears drainableSubscribers and outgoingMessages */
        drainableSubscribers = nullptr;
        outgoingMessages.clear();
    }

    /* Big messages bypass all buffering and land directly in backpressure.
     * Calls cb(s, bigMessage) right away for every subscriber of topic except sender.
     * Returns false only if the topic does not exist. */
    template <typename F>
    bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) {
        /* Do we even have this topic? */
        auto it = topics.find(topic);
        if (it == topics.end()) {
            return false;
        }

        /* For all subscribers in topic */
        for (Subscriber *s : *it->second) {

            /* If we are sender then ignore us */
            if (sender != s) {
                cb(s, bigMessage);
            }
        }

        return true;
    }

    /* Buffers message for every subscriber of topic except sender (sender may be nullptr).
     * Linear in number of affected subscribers.
     * The message is moved into the palette if at least one subscriber wants it, otherwise
     * it is left untouched. Returns true if someone wanted it; false if the topic does not
     * exist or has no subscriber other than sender. */
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
        /* Do we even have this topic? */
        auto it = topics.find(topic);
        if (it == topics.end()) {
            return false;
        }

        /* If the palette is full (indices are uint16_t) we need to drain every socket. */
        if (outgoingMessages.size() >= UINT16_MAX) {
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
             * to their own backpressure */
            drain();

            /* drain() does nothing when no subscriber is pending, which happens when the last
             * pending subscribers were freed rather than drained. Nobody references the palette
             * in that case, so reset it here; otherwise the indices below would wrap around. */
            outgoingMessages.clear();
        }

        /* If nobody references this message, don't buffer it */
        bool referencedMessage = false;

        /* For all subscribers in topic */
        for (Subscriber *s : *it->second) {

            /* If we are sender then ignore us */
            if (sender == s) {
                continue;
            }

            /* At least one subscriber wants this message */
            referencedMessage = true;

            /* If we already have too many outgoing messages on this subscriber, drain it now */
            if (s->numMessageIndices == Subscriber::MAX_MESSAGE_INDICES) {
                /* This one does not need to check needsDrainage here but still does. */
                drain(s);
            }

            /* Finally we can continue; size() is read per subscriber because the drain above
             * may have cleared the palette */
            s->messageIndices[s->numMessageIndices++] = static_cast<uint16_t>(outgoingMessages.size());

            /* First message adds subscriber to list of drainable subscribers */
            if (s->numMessageIndices == 1) {
                /* Insert us in the head of drainable subscribers */
                s->next = drainableSubscribers;
                s->prev = nullptr;
                if (s->next) {
                    s->next->prev = s;
                }
                drainableSubscribers = s;
            }
        }

        /* Push this message (moved, not copied) and return with success */
        if (referencedMessage) {
            outgoingMessages.emplace_back(std::move(message));
        }

        /* Success if someone wants it */
        return referencedMessage;
    }
};

}
362
363
#endif