/src/uWebSockets/src/TopicTree.h
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef UWS_TOPICTREE_H |
19 | | #define UWS_TOPICTREE_H |
20 | | |
21 | | #include <map> |
22 | | #include <list> |
23 | | #include <iostream> |
24 | | #include <unordered_set> |
25 | | #include <utility> |
26 | | #include <memory> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | #include <string_view> |
30 | | #include <functional> |
31 | | #include <set> |
32 | | #include <string> |
33 | | |
34 | | namespace uWS { |
35 | | |
36 | | struct Subscriber; |
37 | | |
38 | | struct Topic : std::unordered_set<Subscriber *> { |
39 | | |
40 | 10.7M | Topic(std::string_view topic) : name(topic) { |
41 | | |
42 | 10.7M | } |
43 | | |
44 | | std::string name; |
45 | | }; |
46 | | |
47 | | struct Subscriber { |
48 | | |
49 | | template <typename, typename> friend struct TopicTree; |
50 | | |
51 | | private: |
52 | | /* We use a factory */ |
53 | 1.50M | Subscriber() = default; |
54 | | |
55 | | /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */ |
56 | | Subscriber *prev, *next; |
57 | | |
58 | | /* Any one subscriber can be part of at most 32 publishes before it needs a drain, |
59 | | * or whatever encoding of runs or whatever we might do in the future */ |
60 | | uint16_t messageIndices[32]; |
61 | | |
62 | | /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */ |
63 | | unsigned char numMessageIndices = 0; |
64 | | |
65 | | public: |
66 | | |
67 | | /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */ |
68 | | std::set<Topic *> topics; |
69 | | |
70 | | /* User data */ |
71 | | void *user; |
72 | | |
73 | 1.66M | bool needsDrainage() { |
74 | 1.66M | return numMessageIndices; |
75 | 1.66M | } |
76 | | }; |
77 | | |
78 | | template <typename T, typename B> |
79 | | struct TopicTree { |
80 | | |
81 | | enum IteratorFlags { |
82 | | LAST = 1, |
83 | | FIRST = 2 |
84 | | }; |
85 | | |
86 | | /* Whoever is iterating this topic is locked to not modify its own list */ |
87 | | Subscriber *iteratingSubscriber = nullptr; |
88 | | |
89 | | private: |
90 | | |
91 | | /* The drain callback must not publish, unsubscribe or subscribe. |
92 | | * It must only cork, uncork, send, write */ |
93 | | std::function<bool(Subscriber *, T &, IteratorFlags)> cb; |
94 | | |
95 | | /* The topics */ |
96 | | std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics; |
97 | | |
98 | | /* List of subscribers that need drainage */ |
99 | | Subscriber *drainableSubscribers = nullptr; |
100 | | |
101 | | /* Palette of outgoing messages, up to 64k */ |
102 | | std::vector<T> outgoingMessages; |
103 | | |
104 | 12.1M | void checkIteratingSubscriber(Subscriber *s) { |
105 | | /* Notify user that they are doing something wrong here */ |
106 | 12.1M | if (iteratingSubscriber == s) { |
107 | 0 | std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; |
108 | 0 | std::terminate(); |
109 | 0 | } |
110 | 12.1M | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::checkIteratingSubscriber(uWS::Subscriber*) Line | Count | Source | 104 | 10.6M | void checkIteratingSubscriber(Subscriber *s) { | 105 | | /* Notify user that they are doing something wrong here */ | 106 | 10.6M | if (iteratingSubscriber == s) { | 107 | 0 | std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; | 108 | 0 | std::terminate(); | 109 | 0 | } | 110 | 10.6M | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::checkIteratingSubscriber(uWS::Subscriber*) Line | Count | Source | 104 | 1.44M | void checkIteratingSubscriber(Subscriber *s) { | 105 | | /* Notify user that they are doing something wrong here */ | 106 | 1.44M | if (iteratingSubscriber == s) { | 107 | 0 | std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; | 108 | 0 | std::terminate(); | 109 | 0 | } | 110 | 1.44M | } |
|
111 | | |
112 | | /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */ |
113 | 149k | void drainImpl(Subscriber *s) { |
114 | | /* Before we call cb we need to make sure this subscriber will not report needsDrainage() |
115 | | * since WebSocket::send will call drain from within the cb in that case.*/ |
116 | 149k | int numMessageIndices = s->numMessageIndices; |
117 | 149k | s->numMessageIndices = 0; |
118 | | |
119 | | /* Then we emit cb */ |
120 | 2.07M | for (int i = 0; i < numMessageIndices; i++) { |
121 | 1.92M | T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; |
122 | | |
123 | 1.92M | int flags = (i == numMessageIndices - 1) ? LAST : 0; |
124 | | |
125 | | /* Returning true will stop drainage short (such as when backpressure is too high) */ |
126 | 1.92M | if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { |
127 | 1.08k | break; |
128 | 1.08k | } |
129 | 1.92M | } |
130 | 149k | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drainImpl(uWS::Subscriber*) Line | Count | Source | 113 | 14.7k | void drainImpl(Subscriber *s) { | 114 | | /* Before we call cb we need to make sure this subscriber will not report needsDrainage() | 115 | | * since WebSocket::send will call drain from within the cb in that case.*/ | 116 | 14.7k | int numMessageIndices = s->numMessageIndices; | 117 | 14.7k | s->numMessageIndices = 0; | 118 | | | 119 | | /* Then we emit cb */ | 120 | 98.7k | for (int i = 0; i < numMessageIndices; i++) { | 121 | 84.5k | T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; | 122 | | | 123 | 84.5k | int flags = (i == numMessageIndices - 1) ? LAST : 0; | 124 | | | 125 | | /* Returning true will stop drainage short (such as when backpressure is too high) */ | 126 | 84.5k | if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { | 127 | 495 | break; | 128 | 495 | } | 129 | 84.5k | } | 130 | 14.7k | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drainImpl(uWS::Subscriber*) Line | Count | Source | 113 | 134k | void drainImpl(Subscriber *s) { | 114 | | /* Before we call cb we need to make sure this subscriber will not report needsDrainage() | 115 | | * since WebSocket::send will call drain from within the cb in that case.*/ | 116 | 134k | int numMessageIndices = s->numMessageIndices; | 117 | 134k | s->numMessageIndices = 0; | 118 | | | 119 | | /* Then we emit cb */ | 120 | 1.97M | for (int i = 0; i < numMessageIndices; i++) { | 121 | 1.84M | T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; | 122 | | | 123 | 1.84M | int flags = (i == numMessageIndices - 1) ? LAST : 0; | 124 | | | 125 | | /* Returning true will stop drainage short (such as when backpressure is too high) */ | 126 | 1.84M | if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { | 127 | 593 | break; | 128 | 593 | } | 129 | 1.84M | } | 130 | 134k | } |
|
131 | | |
132 | 108k | void unlinkDrainableSubscriber(Subscriber *s) { |
133 | 108k | if (s->prev) { |
134 | 57.0k | s->prev->next = s->next; |
135 | 57.0k | } |
136 | 108k | if (s->next) { |
137 | 44.8k | s->next->prev = s->prev; |
138 | 44.8k | } |
139 | | /* If we are the head, then we also need to reset the head */ |
140 | 108k | if (drainableSubscribers == s) { |
141 | 51.7k | drainableSubscribers = s->next; |
142 | 51.7k | } |
143 | 108k | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unlinkDrainableSubscriber(uWS::Subscriber*) Line | Count | Source | 132 | 50.7k | void unlinkDrainableSubscriber(Subscriber *s) { | 133 | 50.7k | if (s->prev) { | 134 | 2.96k | s->prev->next = s->next; | 135 | 2.96k | } | 136 | 50.7k | if (s->next) { | 137 | 3.31k | s->next->prev = s->prev; | 138 | 3.31k | } | 139 | | /* If we are the head, then we also need to reset the head */ | 140 | 50.7k | if (drainableSubscribers == s) { | 141 | 47.8k | drainableSubscribers = s->next; | 142 | 47.8k | } | 143 | 50.7k | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unlinkDrainableSubscriber(uWS::Subscriber*) Line | Count | Source | 132 | 57.9k | void unlinkDrainableSubscriber(Subscriber *s) { | 133 | 57.9k | if (s->prev) { | 134 | 54.1k | s->prev->next = s->next; | 135 | 54.1k | } | 136 | 57.9k | if (s->next) { | 137 | 41.5k | s->next->prev = s->prev; | 138 | 41.5k | } | 139 | | /* If we are the head, then we also need to reset the head */ | 140 | 57.9k | if (drainableSubscribers == s) { | 141 | 3.88k | drainableSubscribers = s->next; | 142 | 3.88k | } | 143 | 57.9k | } |
|
144 | | |
145 | | public: |
146 | | |
147 | 21.0k | TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { |
148 | | |
149 | 21.0k | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::TopicTree(std::__1::function<bool (uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)>) Line | Count | Source | 147 | 17.6k | TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { | 148 | | | 149 | 17.6k | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::TopicTree(std::__1::function<bool (uWS::Subscriber*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::IteratorFlags)>) Line | Count | Source | 147 | 3.40k | TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { | 148 | | | 149 | 3.40k | } |
|
150 | | |
151 | | /* Returns nullptr if not found */ |
152 | 15.8M | Topic *lookupTopic(std::string_view topic) { |
153 | 15.8M | auto it = topics.find(topic); |
154 | 15.8M | if (it == topics.end()) { |
155 | 11.1M | return nullptr; |
156 | 11.1M | } |
157 | 4.74M | return it->second.get(); |
158 | 15.8M | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 152 | 10.6M | Topic *lookupTopic(std::string_view topic) { | 153 | 10.6M | auto it = topics.find(topic); | 154 | 10.6M | if (it == topics.end()) { | 155 | 10.5M | return nullptr; | 156 | 10.5M | } | 157 | 103k | return it->second.get(); | 158 | 10.6M | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 152 | 5.19M | Topic *lookupTopic(std::string_view topic) { | 153 | 5.19M | auto it = topics.find(topic); | 154 | 5.19M | if (it == topics.end()) { | 155 | 554k | return nullptr; | 156 | 554k | } | 157 | 4.63M | return it->second.get(); | 158 | 5.19M | } |
|
159 | | |
160 | | /* Subscribe fails if we already are subscribed */ |
161 | 12.0M | Topic *subscribe(Subscriber *s, std::string_view topic) { |
162 | | /* Notify user that they are doing something wrong here */ |
163 | 12.0M | checkIteratingSubscriber(s); |
164 | | |
165 | | /* Lookup or create new topic */ |
166 | 12.0M | Topic *topicPtr = lookupTopic(topic); |
167 | 12.0M | if (!topicPtr) { |
168 | 10.7M | Topic *newTopic = new Topic(topic); |
169 | 10.7M | topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); |
170 | 10.7M | topicPtr = newTopic; |
171 | 10.7M | } |
172 | | |
173 | | /* Insert us in topic, insert topic in us */ |
174 | 12.0M | auto [it, inserted] = s->topics.insert(topicPtr); |
175 | 12.0M | if (!inserted) { |
176 | 80.7k | return nullptr; |
177 | 80.7k | } |
178 | 11.9M | topicPtr->insert(s); |
179 | | |
180 | | /* Success */ |
181 | 11.9M | return topicPtr; |
182 | 12.0M | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 161 | 10.6M | Topic *subscribe(Subscriber *s, std::string_view topic) { | 162 | | /* Notify user that they are doing something wrong here */ | 163 | 10.6M | checkIteratingSubscriber(s); | 164 | | | 165 | | /* Lookup or create new topic */ | 166 | 10.6M | Topic *topicPtr = lookupTopic(topic); | 167 | 10.6M | if (!topicPtr) { | 168 | 10.5M | Topic *newTopic = new Topic(topic); | 169 | 10.5M | topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); | 170 | 10.5M | topicPtr = newTopic; | 171 | 10.5M | } | 172 | | | 173 | | /* Insert us in topic, insert topic in us */ | 174 | 10.6M | auto [it, inserted] = s->topics.insert(topicPtr); | 175 | 10.6M | if (!inserted) { | 176 | 0 | return nullptr; | 177 | 0 | } | 178 | 10.6M | topicPtr->insert(s); | 179 | | | 180 | | /* Success */ | 181 | 10.6M | return topicPtr; | 182 | 10.6M | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 161 | 1.40M | Topic *subscribe(Subscriber *s, std::string_view topic) { | 162 | | /* Notify user that they are doing something wrong here */ | 163 | 1.40M | checkIteratingSubscriber(s); | 164 | | | 165 | | /* Lookup or create new topic */ | 166 | 1.40M | Topic *topicPtr = lookupTopic(topic); | 167 | 1.40M | if (!topicPtr) { | 168 | 157k | Topic *newTopic = new Topic(topic); | 169 | 157k | topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); | 170 | 157k | topicPtr = newTopic; | 171 | 157k | } | 172 | | | 173 | | /* Insert us in topic, insert topic in us */ | 174 | 1.40M | auto [it, inserted] = s->topics.insert(topicPtr); | 175 | 1.40M | if (!inserted) { | 176 | 80.7k | return nullptr; | 177 | 80.7k | } | 178 | 1.32M | topicPtr->insert(s); | 179 | | | 180 | | /* Success */ | 181 | 1.32M | return topicPtr; | 182 | 1.40M | } |
|
183 | | |
184 | | /* Returns ok, last, newCount */ |
185 | 43.5k | std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) { |
186 | | /* Notify user that they are doing something wrong here */ |
187 | 43.5k | checkIteratingSubscriber(s); |
188 | | |
189 | | /* Lookup topic */ |
190 | 43.5k | Topic *topicPtr = lookupTopic(topic); |
191 | 43.5k | if (!topicPtr) { |
192 | | /* If the topic doesn't exist we are assumed to still be subscribers of something */ |
193 | 1.57k | return {false, false, -1}; |
194 | 1.57k | } |
195 | | |
196 | | /* Erase from our list first */ |
197 | 41.9k | if (s->topics.erase(topicPtr) == 0) { |
198 | 848 | return {false, false, -1}; |
199 | 848 | } |
200 | | |
201 | | /* Remove us from topic */ |
202 | 41.0k | topicPtr->erase(s); |
203 | | |
204 | 41.0k | int newCount = topicPtr->size(); |
205 | | |
206 | | /* If there is no subscriber to this topic, remove it */ |
207 | 41.0k | if (!topicPtr->size()) { |
208 | | /* Unique_ptr deletes the topic */ |
209 | 25.5k | topics.erase(topic); |
210 | 25.5k | } |
211 | | |
212 | | /* If we don't hold any topics we are to be freed altogether */ |
213 | 41.0k | return {true, s->topics.size() == 0, newCount}; |
214 | 41.9k | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 185 | 1.01k | std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) { | 186 | | /* Notify user that they are doing something wrong here */ | 187 | 1.01k | checkIteratingSubscriber(s); | 188 | | | 189 | | /* Lookup topic */ | 190 | 1.01k | Topic *topicPtr = lookupTopic(topic); | 191 | 1.01k | if (!topicPtr) { | 192 | | /* If the topic doesn't exist we are assumed to still be subscribers of something */ | 193 | 1.01k | return {false, false, -1}; | 194 | 1.01k | } | 195 | | | 196 | | /* Erase from our list first */ | 197 | 0 | if (s->topics.erase(topicPtr) == 0) { | 198 | 0 | return {false, false, -1}; | 199 | 0 | } | 200 | | | 201 | | /* Remove us from topic */ | 202 | 0 | topicPtr->erase(s); | 203 | |
| 204 | 0 | int newCount = topicPtr->size(); | 205 | | | 206 | | /* If there is no subscriber to this topic, remove it */ | 207 | 0 | if (!topicPtr->size()) { | 208 | | /* Unique_ptr deletes the topic */ | 209 | 0 | topics.erase(topic); | 210 | 0 | } | 211 | | | 212 | | /* If we don't hold any topics we are to be freed altogether */ | 213 | 0 | return {true, s->topics.size() == 0, newCount}; | 214 | 0 | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 185 | 42.4k | std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) { | 186 | | /* Notify user that they are doing something wrong here */ | 187 | 42.4k | checkIteratingSubscriber(s); | 188 | | | 189 | | /* Lookup topic */ | 190 | 42.4k | Topic *topicPtr = lookupTopic(topic); | 191 | 42.4k | if (!topicPtr) { | 192 | | /* If the topic doesn't exist we are assumed to still be subscribers of something */ | 193 | 562 | return {false, false, -1}; | 194 | 562 | } | 195 | | | 196 | | /* Erase from our list first */ | 197 | 41.9k | if (s->topics.erase(topicPtr) == 0) { | 198 | 848 | return {false, false, -1}; | 199 | 848 | } | 200 | | | 201 | | /* Remove us from topic */ | 202 | 41.0k | topicPtr->erase(s); | 203 | | | 204 | 41.0k | int newCount = topicPtr->size(); | 205 | | | 206 | | /* If there is no subscriber to this topic, remove it */ | 207 | 41.0k | if (!topicPtr->size()) { | 208 | | /* Unique_ptr deletes the topic */ | 209 | 25.5k | topics.erase(topic); | 210 | 25.5k | } | 211 | | | 212 | | /* If we don't hold any topics we are to be freed altogether */ | 213 | 41.0k | return {true, s->topics.size() == 0, newCount}; | 214 | 41.9k | } |
|
215 | | |
216 | | /* Factory function for creating a Subscriber */ |
217 | 1.50M | Subscriber *createSubscriber() { |
218 | 1.50M | return new Subscriber(); |
219 | 1.50M | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::createSubscriber() Line | Count | Source | 217 | 235k | Subscriber *createSubscriber() { | 218 | 235k | return new Subscriber(); | 219 | 235k | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::createSubscriber() Line | Count | Source | 217 | 1.26M | Subscriber *createSubscriber() { | 218 | 1.26M | return new Subscriber(); | 219 | 1.26M | } |
|
220 | | |
221 | | /* This is used to end a Subscriber, before freeing it */ |
222 | 1.57M | void freeSubscriber(Subscriber *s) { |
223 | | |
224 | | /* I guess we call this one even if we are not subscribers */ |
225 | 1.57M | if (!s) { |
226 | 73.9k | return; |
227 | 73.9k | } |
228 | | |
229 | | /* For all topics, unsubscribe */ |
230 | 11.9M | for (Topic *topicPtr : s->topics) { |
231 | | /* If we are the last subscriber, simply remove the whole topic */ |
232 | 11.9M | if (topicPtr->size() == 1) { |
233 | 10.7M | topics.erase(topicPtr->name); |
234 | 10.7M | } else { |
235 | | /* Otherwise just remove us */ |
236 | 1.25M | topicPtr->erase(s); |
237 | 1.25M | } |
238 | 11.9M | } |
239 | | |
240 | | /* We also need to unlink us */ |
241 | 1.50M | if (s->needsDrainage()) { |
242 | 46.8k | unlinkDrainableSubscriber(s); |
243 | 46.8k | } |
244 | | |
245 | 1.50M | delete s; |
246 | 1.50M | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::freeSubscriber(uWS::Subscriber*) Line | Count | Source | 222 | 309k | void freeSubscriber(Subscriber *s) { | 223 | | | 224 | | /* I guess we call this one even if we are not subscribers */ | 225 | 309k | if (!s) { | 226 | 73.9k | return; | 227 | 73.9k | } | 228 | | | 229 | | /* For all topics, unsubscribe */ | 230 | 10.6M | for (Topic *topicPtr : s->topics) { | 231 | | /* If we are the last subscriber, simply remove the whole topic */ | 232 | 10.6M | if (topicPtr->size() == 1) { | 233 | 10.5M | topics.erase(topicPtr->name); | 234 | 10.5M | } else { | 235 | | /* Otherwise just remove us */ | 236 | 103k | topicPtr->erase(s); | 237 | 103k | } | 238 | 10.6M | } | 239 | | | 240 | | /* We also need to unlink us */ | 241 | 235k | if (s->needsDrainage()) { | 242 | 44.8k | unlinkDrainableSubscriber(s); | 243 | 44.8k | } | 244 | | | 245 | 235k | delete s; | 246 | 235k | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::freeSubscriber(uWS::Subscriber*) Line | Count | Source | 222 | 1.26M | void freeSubscriber(Subscriber *s) { | 223 | | | 224 | | /* I guess we call this one even if we are not subscribers */ | 225 | 1.26M | if (!s) { | 226 | 0 | return; | 227 | 0 | } | 228 | | | 229 | | /* For all topics, unsubscribe */ | 230 | 1.28M | for (Topic *topicPtr : s->topics) { | 231 | | /* If we are the last subscriber, simply remove the whole topic */ | 232 | 1.28M | if (topicPtr->size() == 1) { | 233 | 131k | topics.erase(topicPtr->name); | 234 | 1.14M | } else { | 235 | | /* Otherwise just remove us */ | 236 | 1.14M | topicPtr->erase(s); | 237 | 1.14M | } | 238 | 1.28M | } | 239 | | | 240 | | /* We also need to unlink us */ | 241 | 1.26M | if (s->needsDrainage()) { | 242 | 1.98k | unlinkDrainableSubscriber(s); | 243 | 1.98k | } | 244 | | | 245 | 1.26M | delete s; | 246 | 1.26M | } |
|
247 | | |
248 | | /* Mainly used by WebSocket::send to drain one socket before sending */ |
249 | 159k | void drain(Subscriber *s) { |
250 | | /* The list is undefined and cannot be touched unless needsDrainage(). */ |
251 | 159k | if (s->needsDrainage()) { |
252 | | /* This function differs from drainImpl by properly unlinking |
253 | | * the subscriber from drainableSubscribers. drainImpl does not. */ |
254 | 61.9k | unlinkDrainableSubscriber(s); |
255 | | |
256 | | /* This one always resets needsDrainage before it calls any cb's. |
257 | | * Otherwise we would stackoverflow when sending after publish but before drain. */ |
258 | 61.9k | drainImpl(s); |
259 | | |
260 | | /* If we drained last subscriber, also clear outgoingMessages */ |
261 | 61.9k | if (!drainableSubscribers) { |
262 | 4.20k | outgoingMessages.clear(); |
263 | 4.20k | } |
264 | 61.9k | } |
265 | 159k | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain(uWS::Subscriber*) Line | Count | Source | 249 | 103k | void drain(Subscriber *s) { | 250 | | /* The list is undefined and cannot be touched unless needsDrainage(). */ | 251 | 103k | if (s->needsDrainage()) { | 252 | | /* This function differs from drainImpl by properly unlinking | 253 | | * the subscriber from drainableSubscribers. drainImpl does not. */ | 254 | 5.91k | unlinkDrainableSubscriber(s); | 255 | | | 256 | | /* This one always resets needsDrainage before it calls any cb's. | 257 | | * Otherwise we would stackoverflow when sending after publish but before drain. */ | 258 | 5.91k | drainImpl(s); | 259 | | | 260 | | /* If we drained last subscriber, also clear outgoingMessages */ | 261 | 5.91k | if (!drainableSubscribers) { | 262 | 3.69k | outgoingMessages.clear(); | 263 | 3.69k | } | 264 | 5.91k | } | 265 | 103k | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain(uWS::Subscriber*) Line | Count | Source | 249 | 56.3k | void drain(Subscriber *s) { | 250 | | /* The list is undefined and cannot be touched unless needsDrainage(). */ | 251 | 56.3k | if (s->needsDrainage()) { | 252 | | /* This function differs from drainImpl by properly unlinking | 253 | | * the subscriber from drainableSubscribers. drainImpl does not. */ | 254 | 56.0k | unlinkDrainableSubscriber(s); | 255 | | | 256 | | /* This one always resets needsDrainage before it calls any cb's. | 257 | | * Otherwise we would stackoverflow when sending after publish but before drain. */ | 258 | 56.0k | drainImpl(s); | 259 | | | 260 | | /* If we drained last subscriber, also clear outgoingMessages */ | 261 | 56.0k | if (!drainableSubscribers) { | 262 | 513 | outgoingMessages.clear(); | 263 | 513 | } | 264 | 56.0k | } | 265 | 56.3k | } |
|
266 | | |
267 | | /* Called every time we call send, to drain published messages so as to sync outgoing messages */ |
268 | 10.3M | void drain() { |
269 | 10.3M | if (drainableSubscribers) { |
270 | | /* Drain one socket a time */ |
271 | 104k | for (Subscriber *s = drainableSubscribers; s; s = s->next) { |
272 | | /* Instead of unlinking every single subscriber, we just leave the list undefined |
273 | | * and reset drainableSubscribers ptr below. */ |
274 | 87.4k | drainImpl(s); |
275 | 87.4k | } |
276 | | /* Drain always clears drainableSubscribers and outgoingMessages */ |
277 | 16.9k | drainableSubscribers = nullptr; |
278 | 16.9k | outgoingMessages.clear(); |
279 | 16.9k | } |
280 | 10.3M | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain() Line | Count | Source | 268 | 10.1M | void drain() { | 269 | 10.1M | if (drainableSubscribers) { | 270 | | /* Drain one socket a time */ | 271 | 15.8k | for (Subscriber *s = drainableSubscribers; s; s = s->next) { | 272 | | /* Instead of unlinking every single subscriber, we just leave the list undefined | 273 | | * and reset drainableSubscribers ptr below. */ | 274 | 8.79k | drainImpl(s); | 275 | 8.79k | } | 276 | | /* Drain always clears drainableSubscribers and outgoingMessages */ | 277 | 7.03k | drainableSubscribers = nullptr; | 278 | 7.03k | outgoingMessages.clear(); | 279 | 7.03k | } | 280 | 10.1M | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain() Line | Count | Source | 268 | 230k | void drain() { | 269 | 230k | if (drainableSubscribers) { | 270 | | /* Drain one socket a time */ | 271 | 88.6k | for (Subscriber *s = drainableSubscribers; s; s = s->next) { | 272 | | /* Instead of unlinking every single subscriber, we just leave the list undefined | 273 | | * and reset drainableSubscribers ptr below. */ | 274 | 78.6k | drainImpl(s); | 275 | 78.6k | } | 276 | | /* Drain always clears drainableSubscribers and outgoingMessages */ | 277 | 9.96k | drainableSubscribers = nullptr; | 278 | 9.96k | outgoingMessages.clear(); | 279 | 9.96k | } | 280 | 230k | } |
|
281 | | |
282 | | /* Big messages bypass all buffering and land directly in backpressure */ |
283 | | template <typename F> |
284 | 0 | bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) { |
285 | | /* Do we even have this topic? */ |
286 | 0 | auto it = topics.find(topic); |
287 | 0 | if (it == topics.end()) { |
288 | 0 | return false; |
289 | 0 | } |
290 | | |
291 | | /* For all subscribers in topic */ |
292 | 0 | for (Subscriber *s : *it->second) { |
293 | | |
294 | | /* If we are sender then ignore us */ |
295 | 0 | if (sender != s) { |
296 | 0 | cb(s, bigMessage); |
297 | 0 | } |
298 | 0 | } |
299 | |
|
300 | 0 | return true; |
301 | 0 | } Unexecuted instantiation: EpollEchoServer.cpp:bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}) Unexecuted instantiation: bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}) |
302 | | |
303 | | /* Linear in number of affected subscribers */ |
304 | 1.91M | bool publish(Subscriber *sender, std::string_view topic, T &&message) { |
305 | | /* Do we even have this topic? */ |
306 | 1.91M | auto it = topics.find(topic); |
307 | 1.91M | if (it == topics.end()) { |
308 | 24.7k | return false; |
309 | 24.7k | } |
310 | | |
311 | | /* If we have more than 65k messages we need to drain every socket. */ |
312 | 1.89M | if (outgoingMessages.size() == UINT16_MAX) { |
313 | | /* If there is a socket that is currently corked, this will be ugly as all sockets will drain |
314 | | * to their own backpressure */ |
315 | 7 | drain(); |
316 | 7 | } |
317 | | |
318 | | /* If nobody references this message, don't buffer it */ |
319 | 1.89M | bool referencedMessage = false; |
320 | | |
321 | | /* For all subscribers in topic */ |
322 | 2.99M | for (Subscriber *s : *it->second) { |
323 | | |
324 | | /* If we are sender then ignore us */ |
325 | 2.99M | if (sender != s) { |
326 | | |
327 | | /* At least one subscriber wants this message */ |
328 | 2.97M | referencedMessage = true; |
329 | | |
330 | | /* If we already have too many outgoing messages on this subscriber, drain it now */ |
331 | 2.97M | if (s->numMessageIndices == 32) { |
332 | | /* This one does not need to check needsDrainage here but still does. */ |
333 | 56.2k | drain(s); |
334 | 56.2k | } |
335 | | |
336 | | /* Finally we can continue */ |
337 | 2.97M | s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); |
338 | | /* First message adds subscriber to list of drainable subscribers */ |
339 | 2.97M | if (s->numMessageIndices == 1) { |
340 | | /* Insert us in the head of drainable subscribers */ |
341 | 196k | s->next = drainableSubscribers; |
342 | 196k | s->prev = nullptr; |
343 | 196k | if (s->next) { |
344 | 132k | s->next->prev = s; |
345 | 132k | } |
346 | 196k | drainableSubscribers = s; |
347 | 196k | } |
348 | 2.97M | } |
349 | 2.99M | } |
350 | | |
351 | | /* Push this message and return with success */ |
352 | 1.89M | if (referencedMessage) { |
353 | 1.89M | outgoingMessages.emplace_back(message); |
354 | 1.89M | } |
355 | | |
356 | | /* Success if someone wants it */ |
357 | 1.89M | return referencedMessage; |
358 | 1.91M | } uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeMessage&&) Line | Count | Source | 304 | 1.10M | bool publish(Subscriber *sender, std::string_view topic, T &&message) { | 305 | | /* Do we even have this topic? */ | 306 | 1.10M | auto it = topics.find(topic); | 307 | 1.10M | if (it == topics.end()) { | 308 | 12.0k | return false; | 309 | 12.0k | } | 310 | | | 311 | | /* If we have more than 65k messages we need to drain every socket. */ | 312 | 1.09M | if (outgoingMessages.size() == UINT16_MAX) { | 313 | | /* If there is a socket that is currently corked, this will be ugly as all sockets will drain | 314 | | * to their own backpressure */ | 315 | 4 | drain(); | 316 | 4 | } | 317 | | | 318 | | /* If nobody references this message, don't buffer it */ | 319 | 1.09M | bool referencedMessage = false; | 320 | | | 321 | | /* For all subscribers in topic */ | 322 | 1.14M | for (Subscriber *s : *it->second) { | 323 | | | 324 | | /* If we are sender then ignore us */ | 325 | 1.14M | if (sender != s) { | 326 | | | 327 | | /* At least one subscriber wants this message */ | 328 | 1.12M | referencedMessage = true; | 329 | | | 330 | | /* If we already have too many outgoing messages on this subscriber, drain it now */ | 331 | 1.12M | if (s->numMessageIndices == 32) { | 332 | | /* This one does not need to check needsDrainage here but still does. 
*/ | 333 | 1.19k | drain(s); | 334 | 1.19k | } | 335 | | | 336 | | /* Finally we can continue */ | 337 | 1.12M | s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); | 338 | | /* First message adds subscriber to list of drainable subscribers */ | 339 | 1.12M | if (s->numMessageIndices == 1) { | 340 | | /* Insert us in the head of drainable subscribers */ | 341 | 59.5k | s->next = drainableSubscribers; | 342 | 59.5k | s->prev = nullptr; | 343 | 59.5k | if (s->next) { | 344 | 6.54k | s->next->prev = s; | 345 | 6.54k | } | 346 | 59.5k | drainableSubscribers = s; | 347 | 59.5k | } | 348 | 1.12M | } | 349 | 1.14M | } | 350 | | | 351 | | /* Push this message and return with success */ | 352 | 1.09M | if (referencedMessage) { | 353 | 1.09M | outgoingMessages.emplace_back(message); | 354 | 1.09M | } | 355 | | | 356 | | /* Success if someone wants it */ | 357 | 1.09M | return referencedMessage; | 358 | 1.10M | } |
uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&&) Line | Count | Source | 304 | 813k | bool publish(Subscriber *sender, std::string_view topic, T &&message) { | 305 | | /* Do we even have this topic? */ | 306 | 813k | auto it = topics.find(topic); | 307 | 813k | if (it == topics.end()) { | 308 | 12.6k | return false; | 309 | 12.6k | } | 310 | | | 311 | | /* If we have more than 65k messages we need to drain every socket. */ | 312 | 800k | if (outgoingMessages.size() == UINT16_MAX) { | 313 | | /* If there is a socket that is currently corked, this will be ugly as all sockets will drain | 314 | | * to their own backpressure */ | 315 | 3 | drain(); | 316 | 3 | } | 317 | | | 318 | | /* If nobody references this message, don't buffer it */ | 319 | 800k | bool referencedMessage = false; | 320 | | | 321 | | /* For all subscribers in topic */ | 322 | 1.85M | for (Subscriber *s : *it->second) { | 323 | | | 324 | | /* If we are sender then ignore us */ | 325 | 1.85M | if (sender != s) { | 326 | | | 327 | | /* At least one subscriber wants this message */ | 328 | 1.85M | referencedMessage = true; | 329 | | | 330 | | /* If we already have too many outgoing messages on this subscriber, drain it now */ | 331 | 1.85M | if (s->numMessageIndices == 32) { | 332 | | /* This one does not need to check needsDrainage here but still does. 
*/ | 333 | 55.0k | drain(s); | 334 | 55.0k | } | 335 | | | 336 | | /* Finally we can continue */ | 337 | 1.85M | s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); | 338 | | /* First message adds subscriber to list of drainable subscribers */ | 339 | 1.85M | if (s->numMessageIndices == 1) { | 340 | | /* Insert us in the head of drainable subscribers */ | 341 | 136k | s->next = drainableSubscribers; | 342 | 136k | s->prev = nullptr; | 343 | 136k | if (s->next) { | 344 | 125k | s->next->prev = s; | 345 | 125k | } | 346 | 136k | drainableSubscribers = s; | 347 | 136k | } | 348 | 1.85M | } | 349 | 1.85M | } | 350 | | | 351 | | /* Push this message and return with success */ | 352 | 800k | if (referencedMessage) { | 353 | 800k | outgoingMessages.emplace_back(message); | 354 | 800k | } | 355 | | | 356 | | /* Success if someone wants it */ | 357 | 800k | return referencedMessage; | 358 | 813k | } |
|
359 | | }; |
360 | | |
361 | | } |
362 | | |
363 | | #endif |