/src/uWebSockets/src/TopicTree.h
Line | Count | Source |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef UWS_TOPICTREE_H |
19 | | #define UWS_TOPICTREE_H |
20 | | |
21 | | #include <map> |
22 | | #include <list> |
23 | | #include <iostream> |
24 | | #include <unordered_set> |
25 | | #include <utility> |
26 | | #include <memory> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | #include <string_view> |
30 | | #include <functional> |
31 | | #include <set> |
32 | | #include <string> |
33 | | #include <exception> |
34 | | |
35 | | namespace uWS { |
36 | | |
37 | | struct Subscriber; |
38 | | |
39 | | struct Topic : std::unordered_set<Subscriber *> { |
40 | | |
41 | 10.9M | Topic(std::string_view topic) : name(topic) { |
42 | | |
43 | 10.9M | } |
44 | | |
45 | | std::string name; |
46 | | }; |
47 | | |
48 | | struct Subscriber { |
49 | | |
50 | | template <typename, typename> friend struct TopicTree; |
51 | | |
52 | | private: |
53 | | /* We use a factory */ |
54 | 1.27M | Subscriber() = default; |
55 | | |
56 | | /* State of prev, next does not matter unless we are needsDrainage() since we are not in the list */ |
57 | | Subscriber *prev, *next; |
58 | | |
59 | | /* Any one subscriber can be part of at most 32 publishes before it needs a drain, |
60 | | * or whatever encoding of runs or whatever we might do in the future */ |
61 | | uint16_t messageIndices[32]; |
62 | | |
63 | | /* This one matters the most, if it is 0 we are not in the list of drainableSubscribers */ |
64 | | unsigned char numMessageIndices = 0; |
65 | | |
66 | | public: |
67 | | |
68 | | /* We have a list of topics we subscribe to (read by WebSocket::iterateTopics) */ |
69 | | std::set<Topic *> topics; |
70 | | |
71 | | /* User data */ |
72 | | void *user; |
73 | | |
74 | 1.51M | bool needsDrainage() { |
75 | 1.51M | return numMessageIndices; |
76 | 1.51M | } |
77 | | }; |
78 | | |
79 | | template <typename T, typename B> |
80 | | struct TopicTree { |
81 | | |
82 | | enum IteratorFlags { |
83 | | LAST = 1, |
84 | | FIRST = 2 |
85 | | }; |
86 | | |
87 | | /* Whomever is iterating this topic is locked to not modify its own list */ |
88 | | Subscriber *iteratingSubscriber = nullptr; |
89 | | |
90 | | private: |
91 | | |
92 | | /* The drain callback must not publish, unsubscribe or subscribe. |
93 | | * It must only cork, uncork, send, write */ |
94 | | std::function<bool(Subscriber *, T &, IteratorFlags)> cb; |
95 | | |
96 | | /* The topics */ |
97 | | std::unordered_map<std::string_view, std::unique_ptr<Topic>> topics; |
98 | | |
99 | | /* List of subscribers that needs drainage */ |
100 | | Subscriber *drainableSubscribers = nullptr; |
101 | | |
102 | | /* Palette of outgoing messages, up to 64k */ |
103 | | std::vector<T> outgoingMessages; |
104 | | |
105 | 12.1M | void checkIteratingSubscriber(Subscriber *s) { |
106 | | /* Notify user that they are doing something wrong here */ |
107 | 12.1M | if (iteratingSubscriber == s) { |
108 | 0 | std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; |
109 | 0 | std::terminate(); |
110 | 0 | } |
111 | 12.1M | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::checkIteratingSubscriber(uWS::Subscriber*) Line | Count | Source | 105 | 1.26M | void checkIteratingSubscriber(Subscriber *s) { | 106 | | /* Notify user that they are doing something wrong here */ | 107 | 1.26M | if (iteratingSubscriber == s) { | 108 | 0 | std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; | 109 | 0 | std::terminate(); | 110 | 0 | } | 111 | 1.26M | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::checkIteratingSubscriber(uWS::Subscriber*) Line | Count | Source | 105 | 10.8M | void checkIteratingSubscriber(Subscriber *s) { | 106 | | /* Notify user that they are doing something wrong here */ | 107 | 10.8M | if (iteratingSubscriber == s) { | 108 | 0 | std::cerr << "Error: WebSocket must not subscribe or unsubscribe to topics while iterating its topics!" << std::endl; | 109 | 0 | std::terminate(); | 110 | 0 | } | 111 | 10.8M | } |
|
112 | | |
113 | | /* Warning: does NOT unlink from drainableSubscribers or modify next, prev. */ |
114 | 222k | void drainImpl(Subscriber *s) { |
115 | | /* Before we call cb we need to make sure this subscriber will not report needsDrainage() |
116 | | * since WebSocket::send will call drain from within the cb in that case.*/ |
117 | 222k | int numMessageIndices = s->numMessageIndices; |
118 | 222k | s->numMessageIndices = 0; |
119 | | |
120 | | /* Then we emit cb */ |
121 | 3.35M | for (int i = 0; i < numMessageIndices; i++) { |
122 | 3.13M | T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; |
123 | | |
124 | 3.13M | int flags = (i == numMessageIndices - 1) ? LAST : 0; |
125 | | |
126 | | /* Returning true will stop drainage short (such as when backpressure is too high) */ |
127 | 3.13M | if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { |
128 | 2.44k | break; |
129 | 2.44k | } |
130 | 3.13M | } |
131 | 222k | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drainImpl(uWS::Subscriber*) Line | Count | Source | 114 | 200k | void drainImpl(Subscriber *s) { | 115 | | /* Before we call cb we need to make sure this subscriber will not report needsDrainage() | 116 | | * since WebSocket::send will call drain from within the cb in that case.*/ | 117 | 200k | int numMessageIndices = s->numMessageIndices; | 118 | 200k | s->numMessageIndices = 0; | 119 | | | 120 | | /* Then we emit cb */ | 121 | 3.20M | for (int i = 0; i < numMessageIndices; i++) { | 122 | 3.00M | T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; | 123 | | | 124 | 3.00M | int flags = (i == numMessageIndices - 1) ? LAST : 0; | 125 | | | 126 | | /* Returning true will stop drainage short (such as when backpressure is too high) */ | 127 | 3.00M | if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { | 128 | 1.39k | break; | 129 | 1.39k | } | 130 | 3.00M | } | 131 | 200k | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drainImpl(uWS::Subscriber*) Line | Count | Source | 114 | 21.2k | void drainImpl(Subscriber *s) { | 115 | | /* Before we call cb we need to make sure this subscriber will not report needsDrainage() | 116 | | * since WebSocket::send will call drain from within the cb in that case.*/ | 117 | 21.2k | int numMessageIndices = s->numMessageIndices; | 118 | 21.2k | s->numMessageIndices = 0; | 119 | | | 120 | | /* Then we emit cb */ | 121 | 155k | for (int i = 0; i < numMessageIndices; i++) { | 122 | 135k | T &outgoingMessage = outgoingMessages[s->messageIndices[i]]; | 123 | | | 124 | 135k | int flags = (i == numMessageIndices - 1) ? LAST : 0; | 125 | | | 126 | | /* Returning true will stop drainage short (such as when backpressure is too high) */ | 127 | 135k | if (cb(s, outgoingMessage, (IteratorFlags)(flags | (i == 0 ? FIRST : 0)))) { | 128 | 1.04k | break; | 129 | 1.04k | } | 130 | 135k | } | 131 | 21.2k | } |
|
132 | | |
133 | 163k | void unlinkDrainableSubscriber(Subscriber *s) { |
134 | 163k | if (s->prev) { |
135 | 105k | s->prev->next = s->next; |
136 | 105k | } |
137 | 163k | if (s->next) { |
138 | 47.3k | s->next->prev = s->prev; |
139 | 47.3k | } |
140 | | /* If we are the head, then we also need to reset the head */ |
141 | 163k | if (drainableSubscribers == s) { |
142 | 58.0k | drainableSubscribers = s->next; |
143 | 58.0k | } |
144 | 163k | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unlinkDrainableSubscriber(uWS::Subscriber*) Line | Count | Source | 133 | 90.4k | void unlinkDrainableSubscriber(Subscriber *s) { | 134 | 90.4k | if (s->prev) { | 135 | 82.6k | s->prev->next = s->next; | 136 | 82.6k | } | 137 | 90.4k | if (s->next) { | 138 | 32.8k | s->next->prev = s->prev; | 139 | 32.8k | } | 140 | | /* If we are the head, then we also need to reset the head */ | 141 | 90.4k | if (drainableSubscribers == s) { | 142 | 7.85k | drainableSubscribers = s->next; | 143 | 7.85k | } | 144 | 90.4k | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unlinkDrainableSubscriber(uWS::Subscriber*) Line | Count | Source | 133 | 72.6k | void unlinkDrainableSubscriber(Subscriber *s) { | 134 | 72.6k | if (s->prev) { | 135 | 22.4k | s->prev->next = s->next; | 136 | 22.4k | } | 137 | 72.6k | if (s->next) { | 138 | 14.5k | s->next->prev = s->prev; | 139 | 14.5k | } | 140 | | /* If we are the head, then we also need to reset the head */ | 141 | 72.6k | if (drainableSubscribers == s) { | 142 | 50.2k | drainableSubscribers = s->next; | 143 | 50.2k | } | 144 | 72.6k | } |
|
145 | | |
146 | | public: |
147 | | |
148 | 22.3k | TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { |
149 | | |
150 | 22.3k | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::TopicTree(std::__1::function<bool (uWS::Subscriber*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::IteratorFlags)>) Line | Count | Source | 148 | 2.99k | TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { | 149 | | | 150 | 2.99k | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::TopicTree(std::__1::function<bool (uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)>) Line | Count | Source | 148 | 19.3k | TopicTree(std::function<bool(Subscriber *, T &, IteratorFlags)> cb) : cb(cb) { | 149 | | | 150 | 19.3k | } |
|
151 | | |
152 | | /* Returns nullptr if not found */ |
153 | 15.9M | Topic *lookupTopic(std::string_view topic) { |
154 | 15.9M | auto it = topics.find(topic); |
155 | 15.9M | if (it == topics.end()) { |
156 | 11.3M | return nullptr; |
157 | 11.3M | } |
158 | 4.62M | return it->second.get(); |
159 | 15.9M | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 153 | 5.09M | Topic *lookupTopic(std::string_view topic) { | 154 | 5.09M | auto it = topics.find(topic); | 155 | 5.09M | if (it == topics.end()) { | 156 | 556k | return nullptr; | 157 | 556k | } | 158 | 4.53M | return it->second.get(); | 159 | 5.09M | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::lookupTopic(std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 153 | 10.8M | Topic *lookupTopic(std::string_view topic) { | 154 | 10.8M | auto it = topics.find(topic); | 155 | 10.8M | if (it == topics.end()) { | 156 | 10.7M | return nullptr; | 157 | 10.7M | } | 158 | 82.6k | return it->second.get(); | 159 | 10.8M | } |
|
160 | | |
161 | | /* Subscribe fails if we already are subscribed */ |
162 | 12.0M | Topic *subscribe(Subscriber *s, std::string_view topic) { |
163 | | /* Notify user that they are doing something wrong here */ |
164 | 12.0M | checkIteratingSubscriber(s); |
165 | | |
166 | | /* Lookup or create new topic */ |
167 | 12.0M | Topic *topicPtr = lookupTopic(topic); |
168 | 12.0M | if (!topicPtr) { |
169 | 10.9M | Topic *newTopic = new Topic(topic); |
170 | 10.9M | topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); |
171 | 10.9M | topicPtr = newTopic; |
172 | 10.9M | } |
173 | | |
174 | | /* Insert us in topic, insert topic in us */ |
175 | 12.0M | auto [it, inserted] = s->topics.insert(topicPtr); |
176 | 12.0M | if (!inserted) { |
177 | 54.0k | return nullptr; |
178 | 54.0k | } |
179 | 12.0M | topicPtr->insert(s); |
180 | | |
181 | | /* Success */ |
182 | 12.0M | return topicPtr; |
183 | 12.0M | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 162 | 1.19M | Topic *subscribe(Subscriber *s, std::string_view topic) { | 163 | | /* Notify user that they are doing something wrong here */ | 164 | 1.19M | checkIteratingSubscriber(s); | 165 | | | 166 | | /* Lookup or create new topic */ | 167 | 1.19M | Topic *topicPtr = lookupTopic(topic); | 168 | 1.19M | if (!topicPtr) { | 169 | 168k | Topic *newTopic = new Topic(topic); | 170 | 168k | topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); | 171 | 168k | topicPtr = newTopic; | 172 | 168k | } | 173 | | | 174 | | /* Insert us in topic, insert topic in us */ | 175 | 1.19M | auto [it, inserted] = s->topics.insert(topicPtr); | 176 | 1.19M | if (!inserted) { | 177 | 54.0k | return nullptr; | 178 | 54.0k | } | 179 | 1.14M | topicPtr->insert(s); | 180 | | | 181 | | /* Success */ | 182 | 1.14M | return topicPtr; | 183 | 1.19M | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::subscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 162 | 10.8M | Topic *subscribe(Subscriber *s, std::string_view topic) { | 163 | | /* Notify user that they are doing something wrong here */ | 164 | 10.8M | checkIteratingSubscriber(s); | 165 | | | 166 | | /* Lookup or create new topic */ | 167 | 10.8M | Topic *topicPtr = lookupTopic(topic); | 168 | 10.8M | if (!topicPtr) { | 169 | 10.7M | Topic *newTopic = new Topic(topic); | 170 | 10.7M | topics.insert({std::string_view(newTopic->name.data(), newTopic->name.length()), std::unique_ptr<Topic>(newTopic)}); | 171 | 10.7M | topicPtr = newTopic; | 172 | 10.7M | } | 173 | | | 174 | | /* Insert us in topic, insert topic in us */ | 175 | 10.8M | auto [it, inserted] = s->topics.insert(topicPtr); | 176 | 10.8M | if (!inserted) { | 177 | 0 | return nullptr; | 178 | 0 | } | 179 | 10.8M | topicPtr->insert(s); | 180 | | | 181 | | /* Success */ | 182 | 10.8M | return topicPtr; | 183 | 10.8M | } |
|
184 | | |
185 | | /* Returns ok, last, newCount */ |
186 | 67.2k | std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) { |
187 | | /* Notify user that they are doing something wrong here */ |
188 | 67.2k | checkIteratingSubscriber(s); |
189 | | |
190 | | /* Lookup topic */ |
191 | 67.2k | Topic *topicPtr = lookupTopic(topic); |
192 | 67.2k | if (!topicPtr) { |
193 | | /* If the topic doesn't exist we are assumed to still be subscribers of something */ |
194 | 2.26k | return {false, false, -1}; |
195 | 2.26k | } |
196 | | |
197 | | /* Erase from our list first */ |
198 | 64.9k | if (s->topics.erase(topicPtr) == 0) { |
199 | 3.06k | return {false, false, -1}; |
200 | 3.06k | } |
201 | | |
202 | | /* Remove us from topic */ |
203 | 61.8k | topicPtr->erase(s); |
204 | | |
205 | 61.8k | int newCount = (int) topicPtr->size(); |
206 | | |
207 | | /* If there is no subscriber to this topic, remove it */ |
208 | 61.8k | if (!topicPtr->size()) { |
209 | | /* Unique_ptr deletes the topic */ |
210 | 34.6k | topics.erase(topic); |
211 | 34.6k | } |
212 | | |
213 | | /* If we don't hold any topics we are to be freed altogether */ |
214 | 61.8k | return {true, s->topics.size() == 0, newCount}; |
215 | 64.9k | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 186 | 66.6k | std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) { | 187 | | /* Notify user that they are doing something wrong here */ | 188 | 66.6k | checkIteratingSubscriber(s); | 189 | | | 190 | | /* Lookup topic */ | 191 | 66.6k | Topic *topicPtr = lookupTopic(topic); | 192 | 66.6k | if (!topicPtr) { | 193 | | /* If the topic doesn't exist we are assumed to still be subscribers of something */ | 194 | 1.70k | return {false, false, -1}; | 195 | 1.70k | } | 196 | | | 197 | | /* Erase from our list first */ | 198 | 64.9k | if (s->topics.erase(topicPtr) == 0) { | 199 | 3.06k | return {false, false, -1}; | 200 | 3.06k | } | 201 | | | 202 | | /* Remove us from topic */ | 203 | 61.8k | topicPtr->erase(s); | 204 | | | 205 | 61.8k | int newCount = (int) topicPtr->size(); | 206 | | | 207 | | /* If there is no subscriber to this topic, remove it */ | 208 | 61.8k | if (!topicPtr->size()) { | 209 | | /* Unique_ptr deletes the topic */ | 210 | 34.6k | topics.erase(topic); | 211 | 34.6k | } | 212 | | | 213 | | /* If we don't hold any topics we are to be freed altogether */ | 214 | 61.8k | return {true, s->topics.size() == 0, newCount}; | 215 | 64.9k | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::unsubscribe(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 186 | 562 | std::tuple<bool, bool, int> unsubscribe(Subscriber *s, std::string_view topic) { | 187 | | /* Notify user that they are doing something wrong here */ | 188 | 562 | checkIteratingSubscriber(s); | 189 | | | 190 | | /* Lookup topic */ | 191 | 562 | Topic *topicPtr = lookupTopic(topic); | 192 | 562 | if (!topicPtr) { | 193 | | /* If the topic doesn't exist we are assumed to still be subscribers of something */ | 194 | 562 | return {false, false, -1}; | 195 | 562 | } | 196 | | | 197 | | /* Erase from our list first */ | 198 | 0 | if (s->topics.erase(topicPtr) == 0) { | 199 | 0 | return {false, false, -1}; | 200 | 0 | } | 201 | | | 202 | | /* Remove us from topic */ | 203 | 0 | topicPtr->erase(s); | 204 | |
| 205 | 0 | int newCount = (int) topicPtr->size(); | 206 | | | 207 | | /* If there is no subscriber to this topic, remove it */ | 208 | 0 | if (!topicPtr->size()) { | 209 | | /* Unique_ptr deletes the topic */ | 210 | 0 | topics.erase(topic); | 211 | 0 | } | 212 | | | 213 | | /* If we don't hold any topics we are to be freed altogether */ | 214 | 0 | return {true, s->topics.size() == 0, newCount}; | 215 | 0 | } |
|
216 | | |
217 | | /* Factory function for creating a Subscriber */ |
218 | 1.27M | Subscriber *createSubscriber() { |
219 | 1.27M | return new Subscriber(); |
220 | 1.27M | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::createSubscriber() Line | Count | Source | 218 | 1.06M | Subscriber *createSubscriber() { | 219 | 1.06M | return new Subscriber(); | 220 | 1.06M | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::createSubscriber() Line | Count | Source | 218 | 213k | Subscriber *createSubscriber() { | 219 | 213k | return new Subscriber(); | 220 | 213k | } |
|
221 | | |
222 | | /* This is used to end a Subscriber, before freeing it */ |
223 | 1.35M | void freeSubscriber(Subscriber *s) { |
224 | | |
225 | | /* I guess we call this one even if we are not subscribers */ |
226 | 1.35M | if (!s) { |
227 | 72.8k | return; |
228 | 72.8k | } |
229 | | |
230 | | /* For all topics, unsubscribe */ |
231 | 11.9M | for (Topic *topicPtr : s->topics) { |
232 | | /* If we are the last subscriber, simply remove the whole topic */ |
233 | 11.9M | if (topicPtr->size() == 1) { |
234 | 10.9M | topics.erase(topicPtr->name); |
235 | 10.9M | } else { |
236 | | /* Otherwise just remove us */ |
237 | 1.02M | topicPtr->erase(s); |
238 | 1.02M | } |
239 | 11.9M | } |
240 | | |
241 | | /* We also need to unlink us */ |
242 | 1.27M | if (s->needsDrainage()) { |
243 | 71.3k | unlinkDrainableSubscriber(s); |
244 | 71.3k | } |
245 | | |
246 | 1.27M | delete s; |
247 | 1.27M | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::freeSubscriber(uWS::Subscriber*) Line | Count | Source | 223 | 1.06M | void freeSubscriber(Subscriber *s) { | 224 | | | 225 | | /* I guess we call this one even if we are not subscribers */ | 226 | 1.06M | if (!s) { | 227 | 0 | return; | 228 | 0 | } | 229 | | | 230 | | /* For all topics, unsubscribe */ | 231 | 1.07M | for (Topic *topicPtr : s->topics) { | 232 | | /* If we are the last subscriber, simply remove the whole topic */ | 233 | 1.07M | if (topicPtr->size() == 1) { | 234 | 133k | topics.erase(topicPtr->name); | 235 | 945k | } else { | 236 | | /* Otherwise just remove us */ | 237 | 945k | topicPtr->erase(s); | 238 | 945k | } | 239 | 1.07M | } | 240 | | | 241 | | /* We also need to unlink us */ | 242 | 1.06M | if (s->needsDrainage()) { | 243 | 6.18k | unlinkDrainableSubscriber(s); | 244 | 6.18k | } | 245 | | | 246 | 1.06M | delete s; | 247 | 1.06M | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::freeSubscriber(uWS::Subscriber*) Line | Count | Source | 223 | 286k | void freeSubscriber(Subscriber *s) { | 224 | | | 225 | | /* I guess we call this one even if we are not subscribers */ | 226 | 286k | if (!s) { | 227 | 72.8k | return; | 228 | 72.8k | } | 229 | | | 230 | | /* For all topics, unsubscribe */ | 231 | 10.8M | for (Topic *topicPtr : s->topics) { | 232 | | /* If we are the last subscriber, simply remove the whole topic */ | 233 | 10.8M | if (topicPtr->size() == 1) { | 234 | 10.7M | topics.erase(topicPtr->name); | 235 | 10.7M | } else { | 236 | | /* Otherwise just remove us */ | 237 | 82.6k | topicPtr->erase(s); | 238 | 82.6k | } | 239 | 10.8M | } | 240 | | | 241 | | /* We also need to unlink us */ | 242 | 213k | if (s->needsDrainage()) { | 243 | 65.1k | unlinkDrainableSubscriber(s); | 244 | 65.1k | } | 245 | | | 246 | 213k | delete s; | 247 | 213k | } |
|
248 | | |
249 | | /* Mainly used by WebSocket::send to drain one socket before sending */ |
250 | 239k | void drain(Subscriber *s) { |
251 | | /* The list is undefined and cannot be touched unless needsDrainage(). */ |
252 | 239k | if (s->needsDrainage()) { |
253 | | /* This function differs from drainImpl by properly unlinking |
254 | | * the subscriber from drainableSubscribers. drainImpl does not. */ |
255 | 91.7k | unlinkDrainableSubscriber(s); |
256 | | |
257 | | /* This one always resets needsDrainage before it calls any cb's. |
258 | | * Otherwise we would stackoverflow when sending after publish but before drain. */ |
259 | 91.7k | drainImpl(s); |
260 | | |
261 | | /* If we drained last subscriber, also clear outgoingMessages */ |
262 | 91.7k | if (!drainableSubscribers) { |
263 | 4.72k | outgoingMessages.clear(); |
264 | 4.72k | } |
265 | 91.7k | } |
266 | 239k | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain(uWS::Subscriber*) Line | Count | Source | 250 | 84.6k | void drain(Subscriber *s) { | 251 | | /* The list is undefined and cannot be touched unless needsDrainage(). */ | 252 | 84.6k | if (s->needsDrainage()) { | 253 | | /* This function differs from drainImpl by properly unlinking | 254 | | * the subscriber from drainableSubscribers. drainImpl does not. */ | 255 | 84.2k | unlinkDrainableSubscriber(s); | 256 | | | 257 | | /* This one always resets needsDrainage before it calls any cb's. | 258 | | * Otherwise we would stackoverflow when sending after publish but before drain. */ | 259 | 84.2k | drainImpl(s); | 260 | | | 261 | | /* If we drained last subscriber, also clear outgoingMessages */ | 262 | 84.2k | if (!drainableSubscribers) { | 263 | 680 | outgoingMessages.clear(); | 264 | 680 | } | 265 | 84.2k | } | 266 | 84.6k | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain(uWS::Subscriber*) Line | Count | Source | 250 | 155k | void drain(Subscriber *s) { | 251 | | /* The list is undefined and cannot be touched unless needsDrainage(). */ | 252 | 155k | if (s->needsDrainage()) { | 253 | | /* This function differs from drainImpl by properly unlinking | 254 | | * the subscriber from drainableSubscribers. drainImpl does not. */ | 255 | 7.50k | unlinkDrainableSubscriber(s); | 256 | | | 257 | | /* This one always resets needsDrainage before it calls any cb's. | 258 | | * Otherwise we would stackoverflow when sending after publish but before drain. */ | 259 | 7.50k | drainImpl(s); | 260 | | | 261 | | /* If we drained last subscriber, also clear outgoingMessages */ | 262 | 7.50k | if (!drainableSubscribers) { | 263 | 4.04k | outgoingMessages.clear(); | 264 | 4.04k | } | 265 | 7.50k | } | 266 | 155k | } |
|
267 | | |
268 | | /* Called everytime we call send, to drain published messages so to sync outgoing messages */ |
269 | 17.0M | void drain() { |
270 | 17.0M | if (drainableSubscribers) { |
271 | | /* Drain one socket a time */ |
272 | 147k | for (Subscriber *s = drainableSubscribers; s; s = s->next) { |
273 | | /* Instead of unlinking every single subscriber, we just leave the list undefined |
274 | | * and reset drainableSubscribers ptr below. */ |
275 | 130k | drainImpl(s); |
276 | 130k | } |
277 | | /* Drain always clears drainableSubscribers and outgoingMessages */ |
278 | 16.9k | drainableSubscribers = nullptr; |
279 | 16.9k | outgoingMessages.clear(); |
280 | 16.9k | } |
281 | 17.0M | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::drain() Line | Count | Source | 269 | 202k | void drain() { | 270 | 202k | if (drainableSubscribers) { | 271 | | /* Drain one socket a time */ | 272 | 122k | for (Subscriber *s = drainableSubscribers; s; s = s->next) { | 273 | | /* Instead of unlinking every single subscriber, we just leave the list undefined | 274 | | * and reset drainableSubscribers ptr below. */ | 275 | 116k | drainImpl(s); | 276 | 116k | } | 277 | | /* Drain always clears drainableSubscribers and outgoingMessages */ | 278 | 6.23k | drainableSubscribers = nullptr; | 279 | 6.23k | outgoingMessages.clear(); | 280 | 6.23k | } | 281 | 202k | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain() Line | Count | Source | 269 | 16.8M | void drain() { | 270 | 16.8M | if (drainableSubscribers) { | 271 | | /* Drain one socket a time */ | 272 | 24.3k | for (Subscriber *s = drainableSubscribers; s; s = s->next) { | 273 | | /* Instead of unlinking every single subscriber, we just leave the list undefined | 274 | | * and reset drainableSubscribers ptr below. */ | 275 | 13.7k | drainImpl(s); | 276 | 13.7k | } | 277 | | /* Drain always clears drainableSubscribers and outgoingMessages */ | 278 | 10.6k | drainableSubscribers = nullptr; | 279 | 10.6k | outgoingMessages.clear(); | 280 | 10.6k | } | 281 | 16.8M | } |
|
282 | | |
283 | | /* Big messages bypass all buffering and land directly in backpressure */ |
284 | | template <typename F> |
285 | 0 | bool publishBig(Subscriber *sender, std::string_view topic, B &&bigMessage, F cb) { |
286 | | /* Do we even have this topic? */ |
287 | 0 | auto it = topics.find(topic); |
288 | 0 | if (it == topics.end()) { |
289 | 0 | return false; |
290 | 0 | } |
291 | | |
292 | | /* For all subscribers in topic */ |
293 | 0 | for (Subscriber *s : *it->second) { |
294 | | |
295 | | /* If we are sender then ignore us */ |
296 | 0 | if (sender != s) { |
297 | 0 | cb(s, bigMessage); |
298 | 0 | } |
299 | 0 | } |
300 | |
|
301 | 0 | return true; |
302 | 0 | } Unexecuted instantiation: bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::TemplatedApp<true>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}) Unexecuted instantiation: EpollEchoServer.cpp:bool uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publishBig<uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}>(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeBigMessage&&, uWS::WebSocket<false, true, test()::PerSocketData>::publish(std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, bool)::{lambda(uWS::Subscriber*, uWS::TopicTreeBigMessage&)#1}) |
303 | | |
304 | | /* Linear in number of affected subscribers */ |
305 | 2.32M | bool publish(Subscriber *sender, std::string_view topic, T &&message) { |
306 | | /* Do we even have this topic? */ |
307 | 2.32M | auto it = topics.find(topic); |
308 | 2.32M | if (it == topics.end()) { |
309 | 22.9k | return false; |
310 | 22.9k | } |
311 | | |
312 | | /* If we have more than 65k messages we need to drain every socket. */ |
313 | 2.30M | if (outgoingMessages.size() == UINT16_MAX) { |
314 | | /* If there is a socket that is currently corked, this will be ugly as all sockets will drain |
315 | | * to their own backpressure */ |
316 | 7 | drain(); |
317 | 7 | } |
318 | | |
319 | | /* If nobody references this message, don't buffer it */ |
320 | 2.30M | bool referencedMessage = false; |
321 | | |
322 | | /* For all subscribers in topic */ |
323 | 4.20M | for (Subscriber *s : *it->second) { |
324 | | |
325 | | /* If we are sender then ignore us */ |
326 | 4.20M | if (sender != s) { |
327 | | |
328 | | /* At least one subscriber wants this message */ |
329 | 4.17M | referencedMessage = true; |
330 | | |
331 | | /* If we already have too many outgoing messages on this subscriber, drain it now */ |
332 | 4.17M | if (s->numMessageIndices == 32) { |
333 | | /* This one does not need to check needsDrainage here but still does. */ |
334 | 85.1k | drain(s); |
335 | 85.1k | } |
336 | | |
337 | | /* Finally we can continue */ |
338 | 4.17M | s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); |
339 | | /* First message adds subscriber to list of drainable subscribers */ |
340 | 4.17M | if (s->numMessageIndices == 1) { |
341 | | /* Insert us in the head of drainable subscribers */ |
342 | 293k | s->next = drainableSubscribers; |
343 | 293k | s->prev = nullptr; |
344 | 293k | if (s->next) { |
345 | 226k | s->next->prev = s; |
346 | 226k | } |
347 | 293k | drainableSubscribers = s; |
348 | 293k | } |
349 | 4.17M | } |
350 | 4.20M | } |
351 | | |
352 | | /* Push this message and return with success */ |
353 | 2.30M | if (referencedMessage) { |
354 | 2.30M | outgoingMessages.emplace_back(message); |
355 | 2.30M | } |
356 | | |
357 | | /* Success if someone wants it */ |
358 | 2.30M | return referencedMessage; |
359 | 2.32M | } uWS::TopicTree<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string_view<char, std::__1::char_traits<char> > >::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&&) Line | Count | Source | 305 | 1.28M | bool publish(Subscriber *sender, std::string_view topic, T &&message) { | 306 | | /* Do we even have this topic? */ | 307 | 1.28M | auto it = topics.find(topic); | 308 | 1.28M | if (it == topics.end()) { | 309 | 13.4k | return false; | 310 | 13.4k | } | 311 | | | 312 | | /* If we have more than 65k messages we need to drain every socket. */ | 313 | 1.27M | if (outgoingMessages.size() == UINT16_MAX) { | 314 | | /* If there is a socket that is currently corked, this will be ugly as all sockets will drain | 315 | | * to their own backpressure */ | 316 | 3 | drain(); | 317 | 3 | } | 318 | | | 319 | | /* If nobody references this message, don't buffer it */ | 320 | 1.27M | bool referencedMessage = false; | 321 | | | 322 | | /* For all subscribers in topic */ | 323 | 3.01M | for (Subscriber *s : *it->second) { | 324 | | | 325 | | /* If we are sender then ignore us */ | 326 | 3.01M | if (sender != s) { | 327 | | | 328 | | /* At least one subscriber wants this message */ | 329 | 3.01M | referencedMessage = true; | 330 | | | 331 | | /* If we already have too many outgoing messages on this subscriber, drain it now */ | 332 | 3.01M | if (s->numMessageIndices == 32) { | 333 | | /* This one does not need to check needsDrainage here but still does. */ | 334 | 82.4k | drain(s); | 335 | 82.4k | } | 336 | | | 337 | | /* Finally we can continue */ | 338 | 3.01M | s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); | 339 | | /* First message adds subscriber to list of drainable subscribers */ | 340 | 3.01M | if (s->numMessageIndices == 1) { | 341 | | /* Insert us in the head of drainable subscribers */ | 342 | 207k | s->next = drainableSubscribers; | 343 | 207k | s->prev = nullptr; | 344 | 207k | if (s->next) { | 345 | 198k | s->next->prev = s; | 346 | 198k | } | 347 | 207k | drainableSubscribers = s; | 348 | 207k | } | 349 | 3.01M | } | 350 | 3.01M | } | 351 | | | 352 | | /* Push this message and return with success */ | 353 | 1.27M | if (referencedMessage) { | 354 | 1.27M | outgoingMessages.emplace_back(message); | 355 | 1.27M | } | 356 | | | 357 | | /* Success if someone wants it */ | 358 | 1.27M | return referencedMessage; | 359 | 1.28M | } |
uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::publish(uWS::Subscriber*, std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::TopicTreeMessage&&) Line | Count | Source | 305 | 1.04M | bool publish(Subscriber *sender, std::string_view topic, T &&message) { | 306 | | /* Do we even have this topic? */ | 307 | 1.04M | auto it = topics.find(topic); | 308 | 1.04M | if (it == topics.end()) { | 309 | 9.43k | return false; | 310 | 9.43k | } | 311 | | | 312 | | /* If we have more than 65k messages we need to drain every socket. */ | 313 | 1.03M | if (outgoingMessages.size() == UINT16_MAX) { | 314 | | /* If there is a socket that is currently corked, this will be ugly as all sockets will drain | 315 | | * to their own backpressure */ | 316 | 4 | drain(); | 317 | 4 | } | 318 | | | 319 | | /* If nobody references this message, don't buffer it */ | 320 | 1.03M | bool referencedMessage = false; | 321 | | | 322 | | /* For all subscribers in topic */ | 323 | 1.19M | for (Subscriber *s : *it->second) { | 324 | | | 325 | | /* If we are sender then ignore us */ | 326 | 1.19M | if (sender != s) { | 327 | | | 328 | | /* At least one subscriber wants this message */ | 329 | 1.15M | referencedMessage = true; | 330 | | | 331 | | /* If we already have too many outgoing messages on this subscriber, drain it now */ | 332 | 1.15M | if (s->numMessageIndices == 32) { | 333 | | /* This one does not need to check needsDrainage here but still does. */ | 334 | 2.70k | drain(s); | 335 | 2.70k | } | 336 | | | 337 | | /* Finally we can continue */ | 338 | 1.15M | s->messageIndices[s->numMessageIndices++] = (uint16_t)outgoingMessages.size(); | 339 | | /* First message adds subscriber to list of drainable subscribers */ | 340 | 1.15M | if (s->numMessageIndices == 1) { | 341 | | /* Insert us in the head of drainable subscribers */ | 342 | 86.3k | s->next = drainableSubscribers; | 343 | 86.3k | s->prev = nullptr; | 344 | 86.3k | if (s->next) { | 345 | 27.1k | s->next->prev = s; | 346 | 27.1k | } | 347 | 86.3k | drainableSubscribers = s; | 348 | 86.3k | } | 349 | 1.15M | } | 350 | 1.19M | } | 351 | | | 352 | | /* Push this message and return with success */ | 353 | 1.03M | if (referencedMessage) { | 354 | 1.02M | outgoingMessages.emplace_back(message); | 355 | 1.02M | } | 356 | | | 357 | | /* Success if someone wants it */ | 358 | 1.03M | return referencedMessage; | 359 | 1.04M | } |
|
360 | | }; |
361 | | |
362 | | } |
363 | | |
364 | | #endif |