Coverage Report

Created: 2025-10-14 06:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/uWebSockets/src/TopicTree.h
Line
Count
Source
1
/*
2
 * Authored by Alex Hultman, 2018-2021.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_TOPICTREE_H
19
#define UWS_TOPICTREE_H
20
21
#include <map>
22
#include <list>
23
#include <iostream>
24
#include <unordered_set>
25
#include <utility>
26
#include <memory>
27
#include <unordered_map>
28
#include <vector>
29
#include <string_view>
30
#include <functional>
31
#include <set>
32
#include <string>
33
#include <exception>
34
35
namespace uWS {
36
37
struct Subscriber;
38
39
struct Topic : std::unordered_set<Subscriber *> {
40
41
10.9M
    Topic(std::string_view topic) : name(topic) {
42
43
10.9M
    }
44
45
    std::string name;
46
};
47
48
struct Subscriber {
49
50
    template <typename, typename> friend struct TopicTree;
51
52
private:
53
    /* We use a factory */
54
1.27M
    Subscriber() = default;
55
56
    /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */
57
    Subscriber *prev, *next;
58
59
    /* Any one subscriber can be part of at most 32 publishes before it needs a drain,
60
     * or whatever encoding of runs or whatever we might do in the future */
61
    uint16_t messageIndices[32];
62
63
    /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */
64
    unsigned char numMessageIndices = 0;
65
66
public:
67
68
    /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */
69
    std::set<Topic *> topics;
70
71
    /* User data */
72
    void *user;
73
74
1.51M
    bool needsDrainage() {
75
1.51M
        return numMessageIndices;
76
1.51M
    }
77
};
78
79
template <typename T, typename B>
80
struct TopicTree {
81
82
    enum IteratorFlags {
83
        LAST = 1,
84
        FIRST = 2
85
    };
86
87
    /* Whomever is iterating this topic is locked to not modify its own list */
88
    Subscriber *iteratingSubscriber = nullptr;
89
90
private:
91
92
    /* The drain callback must not publish, unsubscribe or subscribe.
93
     * It must only cork, uncork, send, write */
94
    std::function<bool(Subscriber *, T &, IteratorFlags)> cb;
95
96
    /* The topics */
97
    std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics;
98
99
    /* List of subscribers that needs drainage */
100
    Subscriber *drainableSubscribers = nullptr;
101
102
    /* Palette of outgoing messages, up to 64k */
103
    std::vector<T> outgoingMessages;
104
105
12.1M
    void checkIteratingSubscriber(Subscriber *s) {
106
        /* Notify user that they are doing something wrong here */
107
12.1M
        if (iteratingSubscriber == s) {
108
0
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
109
0
            std::terminate();
110
0
        }
111
12.1M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::checkIteratingSubscriber(uWS::Subscriber*)
Line
Count
Source
105
1.26M
    void checkIteratingSubscriber(Subscriber *s) {
106
        /* Notify user that they are doing something wrong here */
107
1.26M
        if (iteratingSubscriber == s) {
108
0
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
109
0
            std::terminate();
110
0
        }
111
1.26M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::checkIteratingSubscriber(uWS::Subscriber*)
Line
Count
Source
105
10.8M
    void checkIteratingSubscriber(Subscriber *s) {
106
        /* Notify user that they are doing something wrong here */
107
10.8M
        if (iteratingSubscriber == s) {
108
0
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
109
0
            std::terminate();
110
0
        }
111
10.8M
    }
112
113
    /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */
114
222k
    void drainImpl(Subscriber *s) {
115
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
116
         * since WebSocket::send will call drain from within the cb in that case.*/
117
222k
        int numMessageIndices = s->numMessageIndices;
118
222k
        s->numMessageIndices = 0;
119
120
        /* Then we emit cb */
121
3.35M
        for (int i = 0; i < numMessageIndices; i++) {
122
3.13M
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
123
124
3.13M
            int flags = (i == numMessageIndices - 1) ? LAST : 0;
125
126
            /* Returning true will stop drainage short (such as when backpressure is too high) */
127
3.13M
            if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
128
2.44k
                break;
129
2.44k
            }
130
3.13M
        }
131
222k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drainImpl(uWS::Subscriber*)
Line
Count
Source
114
200k
    void drainImpl(Subscriber *s) {
115
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
116
         * since WebSocket::send will call drain from within the cb in that case.*/
117
200k
        int numMessageIndices = s->numMessageIndices;
118
200k
        s->numMessageIndices = 0;
119
120
        /* Then we emit cb */
121
3.20M
        for (int i = 0; i < numMessageIndices; i++) {
122
3.00M
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
123
124
3.00M
            int flags = (i == numMessageIndices - 1) ? LAST : 0;
125
126
            /* Returning true will stop drainage short (such as when backpressure is too high) */
127
3.00M
            if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
128
1.39k
                break;
129
1.39k
            }
130
3.00M
        }
131
200k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drainImpl(uWS::Subscriber*)
Line
Count
Source
114
21.2k
    void drainImpl(Subscriber *s) {
115
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
116
         * since WebSocket::send will call drain from within the cb in that case.*/
117
21.2k
        int numMessageIndices = s->numMessageIndices;
118
21.2k
        s->numMessageIndices = 0;
119
120
        /* Then we emit cb */
121
155k
        for (int i = 0; i < numMessageIndices; i++) {
122
135k
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
123
124
135k
            int flags = (i == numMessageIndices - 1) ? LAST : 0;
125
126
            /* Returning true will stop drainage short (such as when backpressure is too high) */
127
135k
            if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
128
1.04k
                break;
129
1.04k
            }
130
135k
        }
131
21.2k
    }
132
133
163k
    void unlinkDrainableSubscriber(Subscriber *s) {
134
163k
        if (s->prev) {
135
105k
            s->prev->next = s->next;
136
105k
        }
137
163k
        if (s->next) {
138
47.3k
            s->next->prev = s->prev;
139
47.3k
        }
140
        /* If we are the head, then we also need to reset the head */
141
163k
        if (drainableSubscribers == s) {
142
58.0k
            drainableSubscribers = s->next;
143
58.0k
        }
144
163k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unlinkDrainableSubscriber(uWS::Subscriber*)
Line
Count
Source
133
90.4k
    void unlinkDrainableSubscriber(Subscriber *s) {
134
90.4k
        if (s->prev) {
135
82.6k
            s->prev->next = s->next;
136
82.6k
        }
137
90.4k
        if (s->next) {
138
32.8k
            s->next->prev = s->prev;
139
32.8k
        }
140
        /* If we are the head, then we also need to reset the head */
141
90.4k
        if (drainableSubscribers == s) {
142
7.85k
            drainableSubscribers = s->next;
143
7.85k
        }
144
90.4k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unlinkDrainableSubscriber(uWS::Subscriber*)
Line
Count
Source
133
72.6k
    void unlinkDrainableSubscriber(Subscriber *s) {
134
72.6k
        if (s->prev) {
135
22.4k
            s->prev->next = s->next;
136
22.4k
        }
137
72.6k
        if (s->next) {
138
14.5k
            s->next->prev = s->prev;
139
14.5k
        }
140
        /* If we are the head, then we also need to reset the head */
141
72.6k
        if (drainableSubscribers == s) {
142
50.2k
            drainableSubscribers = s->next;
143
50.2k
        }
144
72.6k
    }
145
146
public:
147
148
22.3k
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
149
150
22.3k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::TopicTree(std::__1::function<bool (uWS::Subscriber*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::IteratorFlags)>)
Line
Count
Source
148
2.99k
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
149
150
2.99k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::TopicTree(std::__1::function<bool (uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)>)
Line
Count
Source
148
19.3k
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
149
150
19.3k
    }
151
152
    /* Returns nullptr if not found */
153
15.9M
    Topic *lookupTopic(std::string_view topic) {
154
15.9M
        auto it = topics.find(topic);
155
15.9M
        if (it == topics.end()) {
156
11.3M
            return nullptr;
157
11.3M
        }
158
4.62M
        return it->second.get();
159
15.9M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
153
5.09M
    Topic *lookupTopic(std::string_view topic) {
154
5.09M
        auto it = topics.find(topic);
155
5.09M
        if (it == topics.end()) {
156
556k
            return nullptr;
157
556k
        }
158
4.53M
        return it->second.get();
159
5.09M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
153
10.8M
    Topic *lookupTopic(std::string_view topic) {
154
10.8M
        auto it = topics.find(topic);
155
10.8M
        if (it == topics.end()) {
156
10.7M
            return nullptr;
157
10.7M
        }
158
82.6k
        return it->second.get();
159
10.8M
    }
160
161
    /* Subscribe fails if we already are subscribed */
162
12.0M
    Topic *subscribe(Subscriber *s, std::string_view topic) {
163
        /* Notify user that they are doing something wrong here */
164
12.0M
        checkIteratingSubscriber(s);
165
166
        /* Lookup or create new topic */
167
12.0M
        Topic *topicPtr = lookupTopic(topic);
168
12.0M
        if (!topicPtr) {
169
10.9M
            Topic *newTopic = new Topic(topic);
170
10.9M
            topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
171
10.9M
            topicPtr = newTopic;
172
10.9M
        }
173
174
        /* Insert us in topic, insert topic in us */
175
12.0M
        auto [it, inserted] = s->topics.insert(topicPtr);
176
12.0M
        if (!inserted) {
177
54.0k
            return nullptr;
178
54.0k
        }
179
12.0M
        topicPtr->insert(s);
180
181
        /* Success */
182
12.0M
        return topicPtr;
183
12.0M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
162
1.19M
    Topic *subscribe(Subscriber *s, std::string_view topic) {
163
        /* Notify user that they are doing something wrong here */
164
1.19M
        checkIteratingSubscriber(s);
165
166
        /* Lookup or create new topic */
167
1.19M
        Topic *topicPtr = lookupTopic(topic);
168
1.19M
        if (!topicPtr) {
169
168k
            Topic *newTopic = new Topic(topic);
170
168k
            topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
171
168k
            topicPtr = newTopic;
172
168k
        }
173
174
        /* Insert us in topic, insert topic in us */
175
1.19M
        auto [it, inserted] = s->topics.insert(topicPtr);
176
1.19M
        if (!inserted) {
177
54.0k
            return nullptr;
178
54.0k
        }
179
1.14M
        topicPtr->insert(s);
180
181
        /* Success */
182
1.14M
        return topicPtr;
183
1.19M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
162
10.8M
    Topic *subscribe(Subscriber *s, std::string_view topic) {
163
        /* Notify user that they are doing something wrong here */
164
10.8M
        checkIteratingSubscriber(s);
165
166
        /* Lookup or create new topic */
167
10.8M
        Topic *topicPtr = lookupTopic(topic);
168
10.8M
        if (!topicPtr) {
169
10.7M
            Topic *newTopic = new Topic(topic);
170
10.7M
            topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
171
10.7M
            topicPtr = newTopic;
172
10.7M
        }
173
174
        /* Insert us in topic, insert topic in us */
175
10.8M
        auto [it, inserted] = s->topics.insert(topicPtr);
176
10.8M
        if (!inserted) {
177
0
            return nullptr;
178
0
        }
179
10.8M
        topicPtr->insert(s);
180
181
        /* Success */
182
10.8M
        return topicPtr;
183
10.8M
    }
184
185
    /* Returns ok, last, newCount */
186
67.2k
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
187
        /* Notify user that they are doing something wrong here */
188
67.2k
        checkIteratingSubscriber(s);
189
190
        /* Lookup topic */
191
67.2k
        Topic *topicPtr = lookupTopic(topic);
192
67.2k
        if (!topicPtr) {
193
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
194
2.26k
            return {false, false, -1};
195
2.26k
        }
196
197
        /* Erase from our list first */
198
64.9k
        if (s->topics.erase(topicPtr) == 0) {
199
3.06k
            return {false, false, -1};
200
3.06k
        }
201
202
        /* Remove us from topic */
203
61.8k
        topicPtr->erase(s);
204
205
61.8k
        int newCount = (int) topicPtr->size();
206
207
        /* If there is no subscriber to this topic, remove it */
208
61.8k
        if (!topicPtr->size()) {
209
            /* Unique_ptr deletes the topic */
210
34.6k
            topics.erase(topic);
211
34.6k
        }
212
213
        /* If we don't hold any topics we are to be freed altogether */
214
61.8k
        return {true, s->topics.size() == 0, newCount};
215
64.9k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
186
66.6k
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
187
        /* Notify user that they are doing something wrong here */
188
66.6k
        checkIteratingSubscriber(s);
189
190
        /* Lookup topic */
191
66.6k
        Topic *topicPtr = lookupTopic(topic);
192
66.6k
        if (!topicPtr) {
193
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
194
1.70k
            return {false, false, -1};
195
1.70k
        }
196
197
        /* Erase from our list first */
198
64.9k
        if (s->topics.erase(topicPtr) == 0) {
199
3.06k
            return {false, false, -1};
200
3.06k
        }
201
202
        /* Remove us from topic */
203
61.8k
        topicPtr->erase(s);
204
205
61.8k
        int newCount = (int) topicPtr->size();
206
207
        /* If there is no subscriber to this topic, remove it */
208
61.8k
        if (!topicPtr->size()) {
209
            /* Unique_ptr deletes the topic */
210
34.6k
            topics.erase(topic);
211
34.6k
        }
212
213
        /* If we don't hold any topics we are to be freed altogether */
214
61.8k
        return {true, s->topics.size() == 0, newCount};
215
64.9k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
186
562
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
187
        /* Notify user that they are doing something wrong here */
188
562
        checkIteratingSubscriber(s);
189
190
        /* Lookup topic */
191
562
        Topic *topicPtr = lookupTopic(topic);
192
562
        if (!topicPtr) {
193
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
194
562
            return {false, false, -1};
195
562
        }
196
197
        /* Erase from our list first */
198
0
        if (s->topics.erase(topicPtr) == 0) {
199
0
            return {false, false, -1};
200
0
        }
201
202
        /* Remove us from topic */
203
0
        topicPtr->erase(s);
204
205
0
        int newCount = (int) topicPtr->size();
206
207
        /* If there is no subscriber to this topic, remove it */
208
0
        if (!topicPtr->size()) {
209
            /* Unique_ptr deletes the topic */
210
0
            topics.erase(topic);
211
0
        }
212
213
        /* If we don't hold any topics we are to be freed altogether */
214
0
        return {true, s->topics.size() == 0, newCount};
215
0
    }
216
217
    /* Factory function for creating a Subscriber */
218
1.27M
    Subscriber *createSubscriber() {
219
1.27M
        return new Subscriber();
220
1.27M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::createSubscriber()
Line
Count
Source
218
1.06M
    Subscriber *createSubscriber() {
219
1.06M
        return new Subscriber();
220
1.06M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::createSubscriber()
Line
Count
Source
218
213k
    Subscriber *createSubscriber() {
219
213k
        return new Subscriber();
220
213k
    }
221
222
    /* This is used to end a Subscriber, before freeing it */
223
1.35M
    void freeSubscriber(Subscriber *s) {
224
225
        /* I guess we call this one even if we are not subscribers */
226
1.35M
        if (!s) {
227
72.8k
            return;
228
72.8k
        }
229
230
        /* For all topics, unsubscribe */
231
11.9M
        for (Topic *topicPtr : s->topics) {
232
            /* If we are the last subscriber, simply remove the whole topic */
233
11.9M
            if (topicPtr->size() == 1) {
234
10.9M
                topics.erase(topicPtr->name);
235
10.9M
            } else {
236
                /* Otherwise just remove us */
237
1.02M
                topicPtr->erase(s);
238
1.02M
            }
239
11.9M
        }
240
241
        /* We also need to unlink us */
242
1.27M
        if (s->needsDrainage()) {
243
71.3k
            unlinkDrainableSubscriber(s);
244
71.3k
        }
245
246
1.27M
        delete s;
247
1.27M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::freeSubscriber(uWS::Subscriber*)
Line
Count
Source
223
1.06M
    void freeSubscriber(Subscriber *s) {
224
225
        /* I guess we call this one even if we are not subscribers */
226
1.06M
        if (!s) {
227
0
            return;
228
0
        }
229
230
        /* For all topics, unsubscribe */
231
1.07M
        for (Topic *topicPtr : s->topics) {
232
            /* If we are the last subscriber, simply remove the whole topic */
233
1.07M
            if (topicPtr->size() == 1) {
234
133k
                topics.erase(topicPtr->name);
235
945k
            } else {
236
                /* Otherwise just remove us */
237
945k
                topicPtr->erase(s);
238
945k
            }
239
1.07M
        }
240
241
        /* We also need to unlink us */
242
1.06M
        if (s->needsDrainage()) {
243
6.18k
            unlinkDrainableSubscriber(s);
244
6.18k
        }
245
246
1.06M
        delete s;
247
1.06M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::freeSubscriber(uWS::Subscriber*)
Line
Count
Source
223
286k
    void freeSubscriber(Subscriber *s) {
224
225
        /* I guess we call this one even if we are not subscribers */
226
286k
        if (!s) {
227
72.8k
            return;
228
72.8k
        }
229
230
        /* For all topics, unsubscribe */
231
10.8M
        for (Topic *topicPtr : s->topics) {
232
            /* If we are the last subscriber, simply remove the whole topic */
233
10.8M
            if (topicPtr->size() == 1) {
234
10.7M
                topics.erase(topicPtr->name);
235
10.7M
            } else {
236
                /* Otherwise just remove us */
237
82.6k
                topicPtr->erase(s);
238
82.6k
            }
239
10.8M
        }
240
241
        /* We also need to unlink us */
242
213k
        if (s->needsDrainage()) {
243
65.1k
            unlinkDrainableSubscriber(s);
244
65.1k
        }
245
246
213k
        delete s;
247
213k
    }
248
249
    /* Mainly used by WebSocket::send to drain one socket before sending */
250
239k
    void drain(Subscriber *s) {
251
        /* The list is undefined and cannot be touched unless needsDrainage(). */
252
239k
        if (s->needsDrainage()) {
253
            /* This function differs from drainImpl by properly unlinking
254
            * the subscriber from drainableSubscribers. drainImpl does not. */
255
91.7k
            unlinkDrainableSubscriber(s);
256
257
            /* This one always resets needsDrainage before it calls any cb's.
258
             * Otherwise we would stackoverflow when sending after publish but before drain. */
259
91.7k
            drainImpl(s);
260
            
261
            /* If we drained last subscriber, also clear outgoingMessages */
262
91.7k
            if (!drainableSubscribers) {
263
4.72k
                outgoingMessages.clear();
264
4.72k
            }
265
91.7k
        }
266
239k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain(uWS::Subscriber*)
Line
Count
Source
250
84.6k
    void drain(Subscriber *s) {
251
        /* The list is undefined and cannot be touched unless needsDrainage(). */
252
84.6k
        if (s->needsDrainage()) {
253
            /* This function differs from drainImpl by properly unlinking
254
            * the subscriber from drainableSubscribers. drainImpl does not. */
255
84.2k
            unlinkDrainableSubscriber(s);
256
257
            /* This one always resets needsDrainage before it calls any cb's.
258
             * Otherwise we would stackoverflow when sending after publish but before drain. */
259
84.2k
            drainImpl(s);
260
            
261
            /* If we drained last subscriber, also clear outgoingMessages */
262
84.2k
            if (!drainableSubscribers) {
263
680
                outgoingMessages.clear();
264
680
            }
265
84.2k
        }
266
84.6k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain(uWS::Subscriber*)
Line
Count
Source
250
155k
    void drain(Subscriber *s) {
251
        /* The list is undefined and cannot be touched unless needsDrainage(). */
252
155k
        if (s->needsDrainage()) {
253
            /* This function differs from drainImpl by properly unlinking
254
            * the subscriber from drainableSubscribers. drainImpl does not. */
255
7.50k
            unlinkDrainableSubscriber(s);
256
257
            /* This one always resets needsDrainage before it calls any cb's.
258
             * Otherwise we would stackoverflow when sending after publish but before drain. */
259
7.50k
            drainImpl(s);
260
            
261
            /* If we drained last subscriber, also clear outgoingMessages */
262
7.50k
            if (!drainableSubscribers) {
263
4.04k
                outgoingMessages.clear();
264
4.04k
            }
265
7.50k
        }
266
155k
    }
267
268
    /* Called everytime we call send, to drain published messages so to sync outgoing messages */
269
17.0M
    void drain() {
270
17.0M
        if (drainableSubscribers) {
271
            /* Drain one socket a time */
272
147k
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
273
                /* Instead of unlinking every single subscriber, we just leave the list undefined
274
                 * and reset drainableSubscribers ptr below. */
275
130k
                drainImpl(s);
276
130k
            }
277
            /* Drain always clears drainableSubscribers and outgoingMessages */
278
16.9k
            drainableSubscribers = nullptr;
279
16.9k
            outgoingMessages.clear();
280
16.9k
        }
281
17.0M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain()
Line
Count
Source
269
202k
    void drain() {
270
202k
        if (drainableSubscribers) {
271
            /* Drain one socket a time */
272
122k
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
273
                /* Instead of unlinking every single subscriber, we just leave the list undefined
274
                 * and reset drainableSubscribers ptr below. */
275
116k
                drainImpl(s);
276
116k
            }
277
            /* Drain always clears drainableSubscribers and outgoingMessages */
278
6.23k
            drainableSubscribers = nullptr;
279
6.23k
            outgoingMessages.clear();
280
6.23k
        }
281
202k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain()
Line
Count
Source
269
16.8M
    void drain() {
270
16.8M
        if (drainableSubscribers) {
271
            /* Drain one socket a time */
272
24.3k
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
273
                /* Instead of unlinking every single subscriber, we just leave the list undefined
274
                 * and reset drainableSubscribers ptr below. */
275
13.7k
                drainImpl(s);
276
13.7k
            }
277
            /* Drain always clears drainableSubscribers and outgoingMessages */
278
10.6k
            drainableSubscribers = nullptr;
279
10.6k
            outgoingMessages.clear();
280
10.6k
        }
281
16.8M
    }
282
283
    /* Big messages bypass all buffering and land directly in backpressure */
284
    template <typename F>
285
0
    bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) {
286
        /* Do we even have this topic? */
287
0
        auto it = topics.find(topic);
288
0
        if (it == topics.end()) {
289
0
            return false;
290
0
        }
291
292
        /* For all subscribers in topic */
293
0
        for (Subscriber *s : *it->second) {
294
295
            /* If we are sender then ignore us */
296
0
            if (sender != s) {
297
0
                cb(s, bigMessage);
298
0
            }
299
0
        }
300
301
0
        return true;
302
0
    }
Unexecuted instantiation: bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1})
Unexecuted instantiation: EpollEchoServer.cpp:bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1})
303
304
    /*
     * Buffers `message` for every subscriber of `topic` except `sender`.
     * Linear in number of affected subscribers.
     *
     * sender  - subscriber skipped while iterating the topic (compared by pointer).
     * topic   - key looked up in `topics`.
     * message - appended to `outgoingMessages` only if at least one subscriber references it.
     *
     * Returns false when the topic is unknown or when no subscriber other than
     * `sender` is in it; returns true otherwise.
     */
305
2.32M
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
306
        /* Unknown topic: there is nobody to publish to */
307
2.32M
        auto it = topics.find(topic);
308
2.32M
        if (it == topics.end()) {
309
22.9k
            return false;
310
22.9k
        }
311
312
        /* Message indices are recorded as uint16_t (see the cast below), so at UINT16_MAX buffered messages we need to drain every socket first. */
313
2.30M
        if (outgoingMessages.size() == UINT16_MAX) {
314
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
315
             * to their own backpressure */
316
7
            drain();
317
7
        }
318
319
        /* Stays false if no subscriber other than sender is found; the message is then not buffered */
320
2.30M
        bool referencedMessage = false;
321
322
        /* For all subscribers in topic */
323
4.20M
        for (Subscriber *s : *it->second) {
324
325
            /* The sender does not receive its own message */
326
4.20M
            if (sender != s) {
327
328
                /* At least one subscriber wants this message */
329
4.17M
                referencedMessage = true;
330
331
                /* A subscriber holds at most 32 pending message indices; if it is full, drain it now */
332
4.17M
                if (s->numMessageIndices == 32) {
333
                    /* This one does not need to check needsDrainage here but still does. */
334
85.1k
                    drain(s);
335
85.1k
                }
336
337
                /* Record the slot this message will occupy in outgoingMessages (it is appended after the loop) */
338
4.17M
                s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
339
                /* First message adds subscriber to list of drainable subscribers */
340
4.17M
                if (s->numMessageIndices == 1) {
341
                    /* Insert at the head of the doubly linked drainableSubscribers list */
342
293k
                    s->next = drainableSubscribers;
343
293k
                    s->prev = nullptr;
344
293k
                    if (s->next) {
345
226k
                        s->next->prev = s;
346
226k
                    }
347
293k
                    drainableSubscribers = s;
348
293k
                }
349
4.17M
            }
350
4.20M
        }
351
352
        /* Buffer the message only if someone references it. NOTE(review): message is a named rvalue reference, so this copy-constructs; std::move(message) would avoid the copy - confirm no caller reuses it. */
353
2.30M
        if (referencedMessage) {
354
2.30M
            outgoingMessages.emplace_back(message);
355
2.30M
        }
356
357
        /* Success if someone wants it */
358
2.30M
        return referencedMessage;
359
2.32M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&&)
Line
Count
Source
305
1.28M
    /*
     * Buffers `message` for every subscriber of `topic` except `sender`.
     *
     * sender  - subscriber skipped while iterating the topic (compared by pointer).
     * topic   - key looked up in `topics`.
     * message - appended to `outgoingMessages` only if at least one subscriber references it.
     *
     * Returns false when the topic is unknown or when no subscriber other than
     * `sender` is in it; returns true otherwise.
     */
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
306
        /* Unknown topic: there is nobody to publish to */
307
1.28M
        auto it = topics.find(topic);
308
1.28M
        if (it == topics.end()) {
309
13.4k
            return false;
310
13.4k
        }
311
312
        /* Message indices are recorded as uint16_t (see the cast below), so at UINT16_MAX buffered messages we need to drain every socket first. */
313
1.27M
        if (outgoingMessages.size() == UINT16_MAX) {
314
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
315
             * to their own backpressure */
316
3
            drain();
317
3
        }
318
319
        /* Stays false if no subscriber other than sender is found; the message is then not buffered */
320
1.27M
        bool referencedMessage = false;
321
322
        /* For all subscribers in topic */
323
3.01M
        for (Subscriber *s : *it->second) {
324
325
            /* The sender does not receive its own message */
326
3.01M
            if (sender != s) {
327
328
                /* At least one subscriber wants this message */
329
3.01M
                referencedMessage = true;
330
331
                /* A subscriber holds at most 32 pending message indices; if it is full, drain it now */
332
3.01M
                if (s->numMessageIndices == 32) {
333
                    /* This one does not need to check needsDrainage here but still does. */
334
82.4k
                    drain(s);
335
82.4k
                }
336
337
                /* Record the slot this message will occupy in outgoingMessages (it is appended after the loop) */
338
3.01M
                s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
339
                /* First message adds subscriber to list of drainable subscribers */
340
3.01M
                if (s->numMessageIndices == 1) {
341
                    /* Insert at the head of the doubly linked drainableSubscribers list */
342
207k
                    s->next = drainableSubscribers;
343
207k
                    s->prev = nullptr;
344
207k
                    if (s->next) {
345
198k
                        s->next->prev = s;
346
198k
                    }
347
207k
                    drainableSubscribers = s;
348
207k
                }
349
3.01M
            }
350
3.01M
        }
351
352
        /* Buffer the message only if someone references it. NOTE(review): message is a named rvalue reference, so this copy-constructs; std::move(message) would avoid the copy - confirm no caller reuses it. */
353
1.27M
        if (referencedMessage) {
354
1.27M
            outgoingMessages.emplace_back(message);
355
1.27M
        }
356
357
        /* Success if someone wants it */
358
1.27M
        return referencedMessage;
359
1.28M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeMessage&&)
Line
Count
Source
305
1.04M
    /*
     * Buffers `message` for every subscriber of `topic` except `sender`.
     *
     * sender  - subscriber skipped while iterating the topic (compared by pointer).
     * topic   - key looked up in `topics`.
     * message - appended to `outgoingMessages` only if at least one subscriber references it.
     *
     * Returns false when the topic is unknown or when no subscriber other than
     * `sender` is in it; returns true otherwise.
     */
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
306
        /* Unknown topic: there is nobody to publish to */
307
1.04M
        auto it = topics.find(topic);
308
1.04M
        if (it == topics.end()) {
309
9.43k
            return false;
310
9.43k
        }
311
312
        /* Message indices are recorded as uint16_t (see the cast below), so at UINT16_MAX buffered messages we need to drain every socket first. */
313
1.03M
        if (outgoingMessages.size() == UINT16_MAX) {
314
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
315
             * to their own backpressure */
316
4
            drain();
317
4
        }
318
319
        /* Stays false if no subscriber other than sender is found; the message is then not buffered */
320
1.03M
        bool referencedMessage = false;
321
322
        /* For all subscribers in topic */
323
1.19M
        for (Subscriber *s : *it->second) {
324
325
            /* The sender does not receive its own message */
326
1.19M
            if (sender != s) {
327
328
                /* At least one subscriber wants this message */
329
1.15M
                referencedMessage = true;
330
331
                /* A subscriber holds at most 32 pending message indices; if it is full, drain it now */
332
1.15M
                if (s->numMessageIndices == 32) {
333
                    /* This one does not need to check needsDrainage here but still does. */
334
2.70k
                    drain(s);
335
2.70k
                }
336
337
                /* Record the slot this message will occupy in outgoingMessages (it is appended after the loop) */
338
1.15M
                s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
339
                /* First message adds subscriber to list of drainable subscribers */
340
1.15M
                if (s->numMessageIndices == 1) {
341
                    /* Insert at the head of the doubly linked drainableSubscribers list */
342
86.3k
                    s->next = drainableSubscribers;
343
86.3k
                    s->prev = nullptr;
344
86.3k
                    if (s->next) {
345
27.1k
                        s->next->prev = s;
346
27.1k
                    }
347
86.3k
                    drainableSubscribers = s;
348
86.3k
                }
349
1.15M
            }
350
1.19M
        }
351
352
        /* Buffer the message only if someone references it. NOTE(review): message is a named rvalue reference, so this copy-constructs; std::move(message) would avoid the copy - confirm no caller reuses it. */
353
1.03M
        if (referencedMessage) {
354
1.02M
            outgoingMessages.emplace_back(message);
355
1.02M
        }
356
357
        /* Success if someone wants it */
358
1.03M
        return referencedMessage;
359
1.04M
    }
360
};
361
362
}
363
364
#endif