Coverage Report

Created: 2024-04-25 06:10

/src/uWebSockets/src/TopicTree.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2021.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_TOPICTREE_H
19
#define UWS_TOPICTREE_H
20
21
#include <map>
22
#include <list>
23
#include <iostream>
24
#include <unordered_set>
25
#include <utility>
26
#include <memory>
27
#include <unordered_map>
28
#include <vector>
29
#include <string_view>
30
#include <functional>
31
#include <set>
32
#include <string>
33
34
namespace uWS {
35
36
struct Subscriber;
37
38
struct Topic : std::unordered_set<Subscriber *> {
39
40
10.7M
    Topic(std::string_view topic) : name(topic) {
41
42
10.7M
    }
43
44
    std::string name;
45
};
46
47
struct Subscriber {
48
49
    template <typename, typename> friend struct TopicTree;
50
51
private:
52
    /* We use a factory */
53
1.50M
    Subscriber() = default;
54
55
    /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */
56
    Subscriber *prev, *next;
57
58
    /* Any one subscriber can be part of at most 32 publishes before it needs a drain,
59
     * or whatever encoding of runs or whatever we might do in the future */
60
    uint16_t messageIndices[32];
61
62
    /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */
63
    unsigned char numMessageIndices = 0;
64
65
public:
66
67
    /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */
68
    std::set<Topic *> topics;
69
70
    /* User data */
71
    void *user;
72
73
1.66M
    bool needsDrainage() {
74
1.66M
        return numMessageIndices;
75
1.66M
    }
76
};
77
78
template <typename T, typename B>
79
struct TopicTree {
80
81
    enum IteratorFlags {
82
        LAST = 1,
83
        FIRST = 2
84
    };
85
86
    /* Whomever is iterating this topic is locked to not modify its own list */
87
    Subscriber *iteratingSubscriber = nullptr;
88
89
private:
90
91
    /* The drain callback must not publish, unsubscribe or subscribe.
92
     * It must only cork, uncork, send, write */
93
    std::function<bool(Subscriber *, T &, IteratorFlags)> cb;
94
95
    /* The topics */
96
    std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics;
97
98
    /* List of subscribers that needs drainage */
99
    Subscriber *drainableSubscribers = nullptr;
100
101
    /* Palette of outgoing messages, up to 64k */
102
    std::vector<T> outgoingMessages;
103
104
12.1M
    void checkIteratingSubscriber(Subscriber *s) {
105
        /* Notify user that they are doing something wrong here */
106
12.1M
        if (iteratingSubscriber == s) {
107
0
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
108
0
            std::terminate();
109
0
        }
110
12.1M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::checkIteratingSubscriber(uWS::Subscriber*)
Line
Count
Source
104
10.6M
    void checkIteratingSubscriber(Subscriber *s) {
105
        /* Notify user that they are doing something wrong here */
106
10.6M
        if (iteratingSubscriber == s) {
107
0
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
108
0
            std::terminate();
109
0
        }
110
10.6M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::checkIteratingSubscriber(uWS::Subscriber*)
Line
Count
Source
104
1.44M
    void checkIteratingSubscriber(Subscriber *s) {
105
        /* Notify user that they are doing something wrong here */
106
1.44M
        if (iteratingSubscriber == s) {
107
0
            std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl;
108
0
            std::terminate();
109
0
        }
110
1.44M
    }
111
112
    /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */
113
149k
    void drainImpl(Subscriber *s) {
114
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
115
         * since WebSocket::send will call drain from within the cb in that case.*/
116
149k
        int numMessageIndices = s->numMessageIndices;
117
149k
        s->numMessageIndices = 0;
118
119
        /* Then we emit cb */
120
2.07M
        for (int i = 0; i < numMessageIndices; i++) {
121
1.92M
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
122
123
1.92M
            int flags = (i == numMessageIndices - 1) ? LAST : 0;
124
125
            /* Returning true will stop drainage short (such as when backpressure is too high) */
126
1.92M
            if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
127
1.08k
                break;
128
1.08k
            }
129
1.92M
        }
130
149k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drainImpl(uWS::Subscriber*)
Line
Count
Source
113
14.7k
    void drainImpl(Subscriber *s) {
114
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
115
         * since WebSocket::send will call drain from within the cb in that case.*/
116
14.7k
        int numMessageIndices = s->numMessageIndices;
117
14.7k
        s->numMessageIndices = 0;
118
119
        /* Then we emit cb */
120
98.7k
        for (int i = 0; i < numMessageIndices; i++) {
121
84.5k
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
122
123
84.5k
            int flags = (i == numMessageIndices - 1) ? LAST : 0;
124
125
            /* Returning true will stop drainage short (such as when backpressure is too high) */
126
84.5k
            if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
127
495
                break;
128
495
            }
129
84.5k
        }
130
14.7k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drainImpl(uWS::Subscriber*)
Line
Count
Source
113
134k
    void drainImpl(Subscriber *s) {
114
        /* Before we call cb we need to make sure this subscriber will not report needsDrainage()
115
         * since WebSocket::send will call drain from within the cb in that case.*/
116
134k
        int numMessageIndices = s->numMessageIndices;
117
134k
        s->numMessageIndices = 0;
118
119
        /* Then we emit cb */
120
1.97M
        for (int i = 0; i < numMessageIndices; i++) {
121
1.84M
            T &outgoingMessage = outgoingMessages[s->messageIndices[i]];
122
123
1.84M
            int flags = (i == numMessageIndices - 1) ? LAST : 0;
124
125
            /* Returning true will stop drainage short (such as when backpressure is too high) */
126
1.84M
            if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) {
127
593
                break;
128
593
            }
129
1.84M
        }
130
134k
    }
131
132
108k
    void unlinkDrainableSubscriber(Subscriber *s) {
133
108k
        if (s->prev) {
134
57.0k
            s->prev->next = s->next;
135
57.0k
        }
136
108k
        if (s->next) {
137
44.8k
            s->next->prev = s->prev;
138
44.8k
        }
139
        /* If we are the head, then we also need to reset the head */
140
108k
        if (drainableSubscribers == s) {
141
51.7k
            drainableSubscribers = s->next;
142
51.7k
        }
143
108k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unlinkDrainableSubscriber(uWS::Subscriber*)
Line
Count
Source
132
50.7k
    void unlinkDrainableSubscriber(Subscriber *s) {
133
50.7k
        if (s->prev) {
134
2.96k
            s->prev->next = s->next;
135
2.96k
        }
136
50.7k
        if (s->next) {
137
3.31k
            s->next->prev = s->prev;
138
3.31k
        }
139
        /* If we are the head, then we also need to reset the head */
140
50.7k
        if (drainableSubscribers == s) {
141
47.8k
            drainableSubscribers = s->next;
142
47.8k
        }
143
50.7k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unlinkDrainableSubscriber(uWS::Subscriber*)
Line
Count
Source
132
57.9k
    void unlinkDrainableSubscriber(Subscriber *s) {
133
57.9k
        if (s->prev) {
134
54.1k
            s->prev->next = s->next;
135
54.1k
        }
136
57.9k
        if (s->next) {
137
41.5k
            s->next->prev = s->prev;
138
41.5k
        }
139
        /* If we are the head, then we also need to reset the head */
140
57.9k
        if (drainableSubscribers == s) {
141
3.88k
            drainableSubscribers = s->next;
142
3.88k
        }
143
57.9k
    }
144
145
public:
146
147
21.0k
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
148
149
21.0k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::TopicTree(std::__1::function<bool (uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)>)
Line
Count
Source
147
17.6k
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
148
149
17.6k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::TopicTree(std::__1::function<bool (uWS::Subscriber*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::IteratorFlags)>)
Line
Count
Source
147
3.40k
    TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) {
148
149
3.40k
    }
150
151
    /* Returns nullptr if not found */
152
15.8M
    Topic *lookupTopic(std::string_view topic) {
153
15.8M
        auto it = topics.find(topic);
154
15.8M
        if (it == topics.end()) {
155
11.1M
            return nullptr;
156
11.1M
        }
157
4.74M
        return it->second.get();
158
15.8M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
152
10.6M
    Topic *lookupTopic(std::string_view topic) {
153
10.6M
        auto it = topics.find(topic);
154
10.6M
        if (it == topics.end()) {
155
10.5M
            return nullptr;
156
10.5M
        }
157
103k
        return it->second.get();
158
10.6M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
152
5.19M
    Topic *lookupTopic(std::string_view topic) {
153
5.19M
        auto it = topics.find(topic);
154
5.19M
        if (it == topics.end()) {
155
554k
            return nullptr;
156
554k
        }
157
4.63M
        return it->second.get();
158
5.19M
    }
159
160
    /* Subscribe fails if we already are subscribed */
161
12.0M
    Topic *subscribe(Subscriber *s, std::string_view topic) {
162
        /* Notify user that they are doing something wrong here */
163
12.0M
        checkIteratingSubscriber(s);
164
165
        /* Lookup or create new topic */
166
12.0M
        Topic *topicPtr = lookupTopic(topic);
167
12.0M
        if (!topicPtr) {
168
10.7M
            Topic *newTopic = new Topic(topic);
169
10.7M
            topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
170
10.7M
            topicPtr = newTopic;
171
10.7M
        }
172
173
        /* Insert us in topic, insert topic in us */
174
12.0M
        auto [it, inserted] = s->topics.insert(topicPtr);
175
12.0M
        if (!inserted) {
176
80.7k
            return nullptr;
177
80.7k
        }
178
11.9M
        topicPtr->insert(s);
179
180
        /* Success */
181
11.9M
        return topicPtr;
182
12.0M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
161
10.6M
    Topic *subscribe(Subscriber *s, std::string_view topic) {
162
        /* Notify user that they are doing something wrong here */
163
10.6M
        checkIteratingSubscriber(s);
164
165
        /* Lookup or create new topic */
166
10.6M
        Topic *topicPtr = lookupTopic(topic);
167
10.6M
        if (!topicPtr) {
168
10.5M
            Topic *newTopic = new Topic(topic);
169
10.5M
            topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
170
10.5M
            topicPtr = newTopic;
171
10.5M
        }
172
173
        /* Insert us in topic, insert topic in us */
174
10.6M
        auto [it, inserted] = s->topics.insert(topicPtr);
175
10.6M
        if (!inserted) {
176
0
            return nullptr;
177
0
        }
178
10.6M
        topicPtr->insert(s);
179
180
        /* Success */
181
10.6M
        return topicPtr;
182
10.6M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
161
1.40M
    Topic *subscribe(Subscriber *s, std::string_view topic) {
162
        /* Notify user that they are doing something wrong here */
163
1.40M
        checkIteratingSubscriber(s);
164
165
        /* Lookup or create new topic */
166
1.40M
        Topic *topicPtr = lookupTopic(topic);
167
1.40M
        if (!topicPtr) {
168
157k
            Topic *newTopic = new Topic(topic);
169
157k
            topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)});
170
157k
            topicPtr = newTopic;
171
157k
        }
172
173
        /* Insert us in topic, insert topic in us */
174
1.40M
        auto [it, inserted] = s->topics.insert(topicPtr);
175
1.40M
        if (!inserted) {
176
80.7k
            return nullptr;
177
80.7k
        }
178
1.32M
        topicPtr->insert(s);
179
180
        /* Success */
181
1.32M
        return topicPtr;
182
1.40M
    }
183
184
    /* Returns ok, last, newCount */
185
43.5k
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
186
        /* Notify user that they are doing something wrong here */
187
43.5k
        checkIteratingSubscriber(s);
188
189
        /* Lookup topic */
190
43.5k
        Topic *topicPtr = lookupTopic(topic);
191
43.5k
        if (!topicPtr) {
192
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
193
1.57k
            return {false, false, -1};
194
1.57k
        }
195
196
        /* Erase from our list first */
197
41.9k
        if (s->topics.erase(topicPtr) == 0) {
198
848
            return {false, false, -1};
199
848
        }
200
201
        /* Remove us from topic */
202
41.0k
        topicPtr->erase(s);
203
204
41.0k
        int newCount = topicPtr->size();
205
206
        /* If there is no subscriber to this topic, remove it */
207
41.0k
        if (!topicPtr->size()) {
208
            /* Unique_ptr deletes the topic */
209
25.5k
            topics.erase(topic);
210
25.5k
        }
211
212
        /* If we don't hold any topics we are to be freed altogether */
213
41.0k
        return {true, s->topics.size() == 0, newCount};
214
41.9k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
185
1.01k
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
186
        /* Notify user that they are doing something wrong here */
187
1.01k
        checkIteratingSubscriber(s);
188
189
        /* Lookup topic */
190
1.01k
        Topic *topicPtr = lookupTopic(topic);
191
1.01k
        if (!topicPtr) {
192
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
193
1.01k
            return {false, false, -1};
194
1.01k
        }
195
196
        /* Erase from our list first */
197
0
        if (s->topics.erase(topicPtr) == 0) {
198
0
            return {false, false, -1};
199
0
        }
200
201
        /* Remove us from topic */
202
0
        topicPtr->erase(s);
203
204
0
        int newCount = topicPtr->size();
205
206
        /* If there is no subscriber to this topic, remove it */
207
0
        if (!topicPtr->size()) {
208
            /* Unique_ptr deletes the topic */
209
0
            topics.erase(topic);
210
0
        }
211
212
        /* If we don't hold any topics we are to be freed altogether */
213
0
        return {true, s->topics.size() == 0, newCount};
214
0
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
185
42.4k
    std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) {
186
        /* Notify user that they are doing something wrong here */
187
42.4k
        checkIteratingSubscriber(s);
188
189
        /* Lookup topic */
190
42.4k
        Topic *topicPtr = lookupTopic(topic);
191
42.4k
        if (!topicPtr) {
192
            /* If the topic doesn't exist we are assumed to still be subscribers of something */
193
562
            return {false, false, -1};
194
562
        }
195
196
        /* Erase from our list first */
197
41.9k
        if (s->topics.erase(topicPtr) == 0) {
198
848
            return {false, false, -1};
199
848
        }
200
201
        /* Remove us from topic */
202
41.0k
        topicPtr->erase(s);
203
204
41.0k
        int newCount = topicPtr->size();
205
206
        /* If there is no subscriber to this topic, remove it */
207
41.0k
        if (!topicPtr->size()) {
208
            /* Unique_ptr deletes the topic */
209
25.5k
            topics.erase(topic);
210
25.5k
        }
211
212
        /* If we don't hold any topics we are to be freed altogether */
213
41.0k
        return {true, s->topics.size() == 0, newCount};
214
41.9k
    }
215
216
    /* Factory function for creating a Subscriber */
217
1.50M
    Subscriber *createSubscriber() {
218
1.50M
        return new Subscriber();
219
1.50M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::createSubscriber()
Line
Count
Source
217
235k
    Subscriber *createSubscriber() {
218
235k
        return new Subscriber();
219
235k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::createSubscriber()
Line
Count
Source
217
1.26M
    Subscriber *createSubscriber() {
218
1.26M
        return new Subscriber();
219
1.26M
    }
220
221
    /* This is used to end a Subscriber, before freeing it */
222
1.57M
    void freeSubscriber(Subscriber *s) {
223
224
        /* I guess we call this one even if we are not subscribers */
225
1.57M
        if (!s) {
226
73.9k
            return;
227
73.9k
        }
228
229
        /* For all topics, unsubscribe */
230
11.9M
        for (Topic *topicPtr : s->topics) {
231
            /* If we are the last subscriber, simply remove the whole topic */
232
11.9M
            if (topicPtr->size() == 1) {
233
10.7M
                topics.erase(topicPtr->name);
234
10.7M
            } else {
235
                /* Otherwise just remove us */
236
1.25M
                topicPtr->erase(s);
237
1.25M
            }
238
11.9M
        }
239
240
        /* We also need to unlink us */
241
1.50M
        if (s->needsDrainage()) {
242
46.8k
            unlinkDrainableSubscriber(s);
243
46.8k
        }
244
245
1.50M
        delete s;
246
1.50M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::freeSubscriber(uWS::Subscriber*)
Line
Count
Source
222
309k
    void freeSubscriber(Subscriber *s) {
223
224
        /* I guess we call this one even if we are not subscribers */
225
309k
        if (!s) {
226
73.9k
            return;
227
73.9k
        }
228
229
        /* For all topics, unsubscribe */
230
10.6M
        for (Topic *topicPtr : s->topics) {
231
            /* If we are the last subscriber, simply remove the whole topic */
232
10.6M
            if (topicPtr->size() == 1) {
233
10.5M
                topics.erase(topicPtr->name);
234
10.5M
            } else {
235
                /* Otherwise just remove us */
236
103k
                topicPtr->erase(s);
237
103k
            }
238
10.6M
        }
239
240
        /* We also need to unlink us */
241
235k
        if (s->needsDrainage()) {
242
44.8k
            unlinkDrainableSubscriber(s);
243
44.8k
        }
244
245
235k
        delete s;
246
235k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::freeSubscriber(uWS::Subscriber*)
Line
Count
Source
222
1.26M
    void freeSubscriber(Subscriber *s) {
223
224
        /* I guess we call this one even if we are not subscribers */
225
1.26M
        if (!s) {
226
0
            return;
227
0
        }
228
229
        /* For all topics, unsubscribe */
230
1.28M
        for (Topic *topicPtr : s->topics) {
231
            /* If we are the last subscriber, simply remove the whole topic */
232
1.28M
            if (topicPtr->size() == 1) {
233
131k
                topics.erase(topicPtr->name);
234
1.14M
            } else {
235
                /* Otherwise just remove us */
236
1.14M
                topicPtr->erase(s);
237
1.14M
            }
238
1.28M
        }
239
240
        /* We also need to unlink us */
241
1.26M
        if (s->needsDrainage()) {
242
1.98k
            unlinkDrainableSubscriber(s);
243
1.98k
        }
244
245
1.26M
        delete s;
246
1.26M
    }
247
248
    /* Mainly used by WebSocket::send to drain one socket before sending */
249
159k
    void drain(Subscriber *s) {
250
        /* The list is undefined and cannot be touched unless needsDrainage(). */
251
159k
        if (s->needsDrainage()) {
252
            /* This function differs from drainImpl by properly unlinking
253
            * the subscriber from drainableSubscribers. drainImpl does not. */
254
61.9k
            unlinkDrainableSubscriber(s);
255
256
            /* This one always resets needsDrainage before it calls any cb's.
257
             * Otherwise we would stackoverflow when sending after publish but before drain. */
258
61.9k
            drainImpl(s);
259
            
260
            /* If we drained last subscriber, also clear outgoingMessages */
261
61.9k
            if (!drainableSubscribers) {
262
4.20k
                outgoingMessages.clear();
263
4.20k
            }
264
61.9k
        }
265
159k
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain(uWS::Subscriber*)
Line
Count
Source
249
103k
    void drain(Subscriber *s) {
250
        /* The list is undefined and cannot be touched unless needsDrainage(). */
251
103k
        if (s->needsDrainage()) {
252
            /* This function differs from drainImpl by properly unlinking
253
            * the subscriber from drainableSubscribers. drainImpl does not. */
254
5.91k
            unlinkDrainableSubscriber(s);
255
256
            /* This one always resets needsDrainage before it calls any cb's.
257
             * Otherwise we would stackoverflow when sending after publish but before drain. */
258
5.91k
            drainImpl(s);
259
            
260
            /* If we drained last subscriber, also clear outgoingMessages */
261
5.91k
            if (!drainableSubscribers) {
262
3.69k
                outgoingMessages.clear();
263
3.69k
            }
264
5.91k
        }
265
103k
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain(uWS::Subscriber*)
Line
Count
Source
249
56.3k
    void drain(Subscriber *s) {
250
        /* The list is undefined and cannot be touched unless needsDrainage(). */
251
56.3k
        if (s->needsDrainage()) {
252
            /* This function differs from drainImpl by properly unlinking
253
            * the subscriber from drainableSubscribers. drainImpl does not. */
254
56.0k
            unlinkDrainableSubscriber(s);
255
256
            /* This one always resets needsDrainage before it calls any cb's.
257
             * Otherwise we would stackoverflow when sending after publish but before drain. */
258
56.0k
            drainImpl(s);
259
            
260
            /* If we drained last subscriber, also clear outgoingMessages */
261
56.0k
            if (!drainableSubscribers) {
262
513
                outgoingMessages.clear();
263
513
            }
264
56.0k
        }
265
56.3k
    }
266
267
    /* Called everytime we call send, to drain published messages so to sync outgoing messages */
268
10.3M
    void drain() {
269
10.3M
        if (drainableSubscribers) {
270
            /* Drain one socket a time */
271
104k
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
272
                /* Instead of unlinking every single subscriber, we just leave the list undefined
273
                 * and reset drainableSubscribers ptr below. */
274
87.4k
                drainImpl(s);
275
87.4k
            }
276
            /* Drain always clears drainableSubscribers and outgoingMessages */
277
16.9k
            drainableSubscribers = nullptr;
278
16.9k
            outgoingMessages.clear();
279
16.9k
        }
280
10.3M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain()
Line
Count
Source
268
10.1M
    void drain() {
269
10.1M
        if (drainableSubscribers) {
270
            /* Drain one socket a time */
271
15.8k
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
272
                /* Instead of unlinking every single subscriber, we just leave the list undefined
273
                 * and reset drainableSubscribers ptr below. */
274
8.79k
                drainImpl(s);
275
8.79k
            }
276
            /* Drain always clears drainableSubscribers and outgoingMessages */
277
7.03k
            drainableSubscribers = nullptr;
278
7.03k
            outgoingMessages.clear();
279
7.03k
        }
280
10.1M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain()
Line
Count
Source
268
230k
    void drain() {
269
230k
        if (drainableSubscribers) {
270
            /* Drain one socket a time */
271
88.6k
            for (Subscriber *s = drainableSubscribers; s; s = s->next) {
272
                /* Instead of unlinking every single subscriber, we just leave the list undefined
273
                 * and reset drainableSubscribers ptr below. */
274
78.6k
                drainImpl(s);
275
78.6k
            }
276
            /* Drain always clears drainableSubscribers and outgoingMessages */
277
9.96k
            drainableSubscribers = nullptr;
278
9.96k
            outgoingMessages.clear();
279
9.96k
        }
280
230k
    }
281
282
    /* Big messages bypass all buffering and land directly in backpressure */
283
    template <typename F>
284
0
    bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) {
285
        /* Do we even have this topic? */
286
0
        auto it = topics.find(topic);
287
0
        if (it == topics.end()) {
288
0
            return false;
289
0
        }
290
291
        /* For all subscribers in topic */
292
0
        for (Subscriber *s : *it->second) {
293
294
            /* If we are sender then ignore us */
295
0
            if (sender != s) {
296
0
                cb(s, bigMessage);
297
0
            }
298
0
        }
299
300
0
        return true;
301
0
    }
Unexecuted instantiation: EpollEchoServer.cpp:bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1})
Unexecuted instantiation: bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1})
302
303
    /* Linear in number of affected subscribers */
304
1.91M
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
305
        /* Do we even have this topic? */
306
1.91M
        auto it = topics.find(topic);
307
1.91M
        if (it == topics.end()) {
308
24.7k
            return false;
309
24.7k
        }
310
311
        /* If we have more than 65k messages we need to drain every socket. */
312
1.89M
        if (outgoingMessages.size() == UINT16_MAX) {
313
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
314
             * to their own backpressure */
315
7
            drain();
316
7
        }
317
318
        /* If nobody references this message, don't buffer it */
319
1.89M
        bool referencedMessage = false;
320
321
        /* For all subscribers in topic */
322
2.99M
        for (Subscriber *s : *it->second) {
323
324
            /* If we are sender then ignore us */
325
2.99M
            if (sender != s) {
326
327
                /* At least one subscriber wants this message */
328
2.97M
                referencedMessage = true;
329
330
                /* If we already have too many outgoing messages on this subscriber, drain it now */
331
2.97M
                if (s->numMessageIndices == 32) {
332
                    /* This one does not need to check needsDrainage here but still does. */
333
56.2k
                    drain(s);
334
56.2k
                }
335
336
                /* Finally we can continue */
337
2.97M
                s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
338
                /* First message adds subscriber to list of drainable subscribers */
339
2.97M
                if (s->numMessageIndices == 1) {
340
                    /* Insert us in the head of drainable subscribers */
341
196k
                    s->next = drainableSubscribers;
342
196k
                    s->prev = nullptr;
343
196k
                    if (s->next) {
344
132k
                        s->next->prev = s;
345
132k
                    }
346
196k
                    drainableSubscribers = s;
347
196k
                }
348
2.97M
            }
349
2.99M
        }
350
351
        /* Push this message and return with success */
352
1.89M
        if (referencedMessage) {
353
1.89M
            outgoingMessages.emplace_back(message);
354
1.89M
        }
355
356
        /* Success if someone wants it */
357
1.89M
        return referencedMessage;
358
1.91M
    }
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeMessage&&)
Line
Count
Source
304
1.10M
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
305
        /* Do we even have this topic? */
306
1.10M
        auto it = topics.find(topic);
307
1.10M
        if (it == topics.end()) {
308
12.0k
            return false;
309
12.0k
        }
310
311
        /* If we have more than 65k messages we need to drain every socket. */
312
1.09M
        if (outgoingMessages.size() == UINT16_MAX) {
313
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
314
             * to their own backpressure */
315
4
            drain();
316
4
        }
317
318
        /* If nobody references this message, don't buffer it */
319
1.09M
        bool referencedMessage = false;
320
321
        /* For all subscribers in topic */
322
1.14M
        for (Subscriber *s : *it->second) {
323
324
            /* If we are sender then ignore us */
325
1.14M
            if (sender != s) {
326
327
                /* At least one subscriber wants this message */
328
1.12M
                referencedMessage = true;
329
330
                /* If we already have too many outgoing messages on this subscriber, drain it now */
331
1.12M
                if (s->numMessageIndices == 32) {
332
                    /* This one does not need to check needsDrainage here but still does. */
333
1.19k
                    drain(s);
334
1.19k
                }
335
336
                /* Finally we can continue */
337
1.12M
                s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
338
                /* First message adds subscriber to list of drainable subscribers */
339
1.12M
                if (s->numMessageIndices == 1) {
340
                    /* Insert us in the head of drainable subscribers */
341
59.5k
                    s->next = drainableSubscribers;
342
59.5k
                    s->prev = nullptr;
343
59.5k
                    if (s->next) {
344
6.54k
                        s->next->prev = s;
345
6.54k
                    }
346
59.5k
                    drainableSubscribers = s;
347
59.5k
                }
348
1.12M
            }
349
1.14M
        }
350
351
        /* Push this message and return with success */
352
1.09M
        if (referencedMessage) {
353
1.09M
            outgoingMessages.emplace_back(message);
354
1.09M
        }
355
356
        /* Success if someone wants it */
357
1.09M
        return referencedMessage;
358
1.10M
    }
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&&)
Line
Count
Source
304
813k
    bool publish(Subscriber *sender, std::string_view topic, T &&message) {
305
        /* Do we even have this topic? */
306
813k
        auto it = topics.find(topic);
307
813k
        if (it == topics.end()) {
308
12.6k
            return false;
309
12.6k
        }
310
311
        /* If we have more than 65k messages we need to drain every socket. */
312
800k
        if (outgoingMessages.size() == UINT16_MAX) {
313
            /* If there is a socket that is currently corked, this will be ugly as all sockets will drain
314
             * to their own backpressure */
315
3
            drain();
316
3
        }
317
318
        /* If nobody references this message, don't buffer it */
319
800k
        bool referencedMessage = false;
320
321
        /* For all subscribers in topic */
322
1.85M
        for (Subscriber *s : *it->second) {
323
324
            /* If we are sender then ignore us */
325
1.85M
            if (sender != s) {
326
327
                /* At least one subscriber wants this message */
328
1.85M
                referencedMessage = true;
329
330
                /* If we already have too many outgoing messages on this subscriber, drain it now */
331
1.85M
                if (s->numMessageIndices == 32) {
332
                    /* This one does not need to check needsDrainage here but still does. */
333
55.0k
                    drain(s);
334
55.0k
                }
335
336
                /* Finally we can continue */
337
1.85M
                s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size();
338
                /* First message adds subscriber to list of drainable subscribers */
339
1.85M
                if (s->numMessageIndices == 1) {
340
                    /* Insert us in the head of drainable subscribers */
341
136k
                    s->next = drainableSubscribers;
342
136k
                    s->prev = nullptr;
343
136k
                    if (s->next) {
344
125k
                        s->next->prev = s;
345
125k
                    }
346
136k
                    drainableSubscribers = s;
347
136k
                }
348
1.85M
            }
349
1.85M
        }
350
351
        /* Push this message and return with success */
352
800k
        if (referencedMessage) {
353
800k
            outgoingMessages.emplace_back(message);
354
800k
        }
355
356
        /* Success if someone wants it */
357
800k
        return referencedMessage;
358
813k
    }
359
};
360
361
}
362
363
#endif