/src/uWebSockets/src/WebSocket.h
Line | Count | Source |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef UWS_WEBSOCKET_H |
19 | | #define UWS_WEBSOCKET_H |
20 | | |
21 | | #include "WebSocketData.h" |
22 | | #include "WebSocketProtocol.h" |
23 | | #include "AsyncSocket.h" |
24 | | #include "WebSocketContextData.h" |
25 | | |
26 | | #include <string_view> |
27 | | |
28 | | namespace uWS { |
29 | | |
30 | | /* Experimental. Compression hint passed as the `compress` argument of WebSocket::send(). NO_ACTION is 0, so passing a plain `false` selects it. */ |
31 | | enum CompressFlags : int { |
32 | | NO_ACTION, /* 0: no compression requested, payload is framed as given */ |
33 | | COMPRESS, /* 1: send() deflates the payload when compression is enabled for the socket */ |
34 | | ALREADY_COMPRESSED /* 2: caller already deflated the payload, send() skips its own deflate step */ |
35 | | }; |
36 | | |
37 | | template <bool SSL, bool isServer, typename USERDATA> |
38 | | struct WebSocket : AsyncSocket<SSL> { |
39 | | template <bool> friend struct TemplatedApp; |
40 | | template <bool> friend struct HttpResponse; |
41 | | private: |
42 | | typedef AsyncSocket<SSL> Super; |
43 | | /* Placement-constructs the WebSocketData in the memory returned by us_socket_ext for this socket, taking over the passed backpressure, and returns this as void pointer */ |
44 | 286k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { |
45 | 286k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); |
46 | 286k | return this; |
47 | 286k | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&) Line | Count | Source | 44 | 107k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { | 45 | 107k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); | 46 | 107k | return this; | 47 | 107k | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&) Line | Count | Source | 44 | 35.7k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { | 45 | 35.7k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); | 46 | 35.7k | return this; | 47 | 35.7k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&) Line | Count | Source | 44 | 142k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { | 45 | 142k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); | 46 | 142k | return this; | 47 | 142k | } |
|
48 | | public: |
49 | | |
50 | | /* Returns a pointer to the per socket user data, which lives directly after the WebSocketData in the memory returned by us_socket_ext */ |
51 | 1.75M | USERDATA *getUserData() { |
52 | 1.75M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
53 | | /* That memory is overallocated to also hold the user data type, so the user data starts one WebSocketData past its beginning */ |
54 | 1.75M | return (USERDATA *) (webSocketData + 1); |
55 | 1.75M | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::getUserData() Line | Count | Source | 51 | 1.31M | USERDATA *getUserData() { | 52 | 1.31M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 53 | | /* We just have it overallocated by sizeof type */ | 54 | 1.31M | return (USERDATA *) (webSocketData + 1); | 55 | 1.31M | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData() Line | Count | Source | 51 | 71.4k | USERDATA *getUserData() { | 52 | 71.4k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 53 | | /* We just have it overallocated by sizeof type */ | 54 | 71.4k | return (USERDATA *) (webSocketData + 1); | 55 | 71.4k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData() Line | Count | Source | 51 | 368k | USERDATA *getUserData() { | 52 | 368k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 53 | | /* We just have it overallocated by sizeof type */ | 54 | 368k | return (USERDATA *) (webSocketData + 1); | 55 | 368k | } |
|
56 | | |
57 | | /* See AsyncSocket */ |
58 | | using Super::getBufferedAmount; |
59 | | using Super::getRemoteAddress; |
60 | | using Super::getRemoteAddressAsText; |
61 | | using Super::getNativeHandle; |
62 | | |
63 | | /* Closes the socket via us_socket_close and returns its result. WebSocket close cannot be an alias to AsyncSocket::close since |
64 | | * we first need to check the socket state: if it is already closed, or is marked isShuttingDown, nothing is done and nullptr is returned */ |
65 | 106k | us_socket_t *close() { |
66 | 106k | if (us_socket_is_closed(SSL, (us_socket_t *) this)) { |
67 | 104k | return nullptr; |
68 | 104k | } |
69 | 2.30k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
70 | 2.30k | if (webSocketData->isShuttingDown) { |
71 | 1.70k | return nullptr; |
72 | 1.70k | } |
73 | | |
74 | 598 | return us_socket_close(SSL, (us_socket_t *) this, 0, nullptr); |
75 | 2.30k | } |
76 | | /* Result of send() and of the helpers that forward to it */ |
77 | | enum SendStatus : int { |
78 | | BACKPRESSURE, /* 0: the frame was accepted but user space backpressure increased */ |
79 | | SUCCESS, /* 1: the frame was sent and you are free to send even more now */ |
80 | | DROPPED /* 2: the frame was skipped because the buffered amount exceeded maxBackpressure */ |
81 | | }; |
82 | | |
83 | | /* Sending fragmented messages puts a bit of effort on the user; you must not interleave regular sends |
84 | | * with fragmented sends and you must call sendFirstFragment, then any number of sendFragment, then finally sendLastFragment. sendFirstFragment forwards to send() with the given opCode and fin = false. */ |
85 | | SendStatus sendFirstFragment(std::string_view message, OpCode opCode = OpCode::BINARY, bool compress = false) { |
86 | | return send(message, opCode, compress, false); |
87 | | } |
88 | | /* Sends a middle fragment: forwards to send() with opCode CONTINUATION and fin = false */ |
89 | | SendStatus sendFragment(std::string_view message, bool compress = false) { |
90 | | return send(message, CONTINUATION, compress, false); |
91 | | } |
92 | | /* Sends the final fragment: forwards to send() with opCode CONTINUATION and fin = true */ |
93 | | SendStatus sendLastFragment(std::string_view message, bool compress = false) { |
94 | | return send(message, CONTINUATION, compress, true); |
95 | | } |
96 | | |
97 | | /* Experimental. Returns true when the compressionStatus of this socket is WebSocketData::ENABLED */ |
98 | | bool hasNegotiatedCompression() { |
99 | | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
100 | | return webSocketData->compressionStatus == WebSocketData::ENABLED; |
101 | | } |
102 | | |
103 | | /* Experimental. Sends a PreparedMessage. Its compressed form is sent, flagged ALREADY_COMPRESSED, only when the message is marked compressed, this socket has negotiated compression and the compressed form is strictly shorter than the original. Otherwise the original message is sent with no compression requested. */ |
104 | | SendStatus sendPrepared(PreparedMessage &preparedMessage) { |
105 | | if (preparedMessage.compressed && hasNegotiatedCompression() && preparedMessage.compressedMessage.length() < preparedMessage.originalMessage.length()) { |
106 | | return send({preparedMessage.compressedMessage.data(), preparedMessage.compressedMessage.length()}, (OpCode) preparedMessage.opCode, uWS::CompressFlags::ALREADY_COMPRESSED); |
107 | | } |
108 | | return send({preparedMessage.originalMessage.data(), preparedMessage.originalMessage.length()}, (OpCode) preparedMessage.opCode); |
109 | | } |
110 | | |
111 | | /* Send or buffer a WebSocket frame, compressed or not. Returns BACKPRESSURE on increased user space backpressure, |
112 | | * DROPPED on dropped message (due to backpressure) or SUCCESS if you are free to send even more now. */ |
113 | 216k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { |
114 | 216k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
115 | 216k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
116 | 216k | ); |
117 | | |
118 | | /* Skip sending and report DROPPED if we are over the limit of maxBackpressure */ |
119 | 216k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { |
120 | | /* Also defer a close if we should */ |
121 | 1.96k | if (webSocketContextData->closeOnBackpressureLimit) { |
122 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); |
123 | 0 | } |
124 | | |
125 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ |
126 | 1.96k | if (webSocketContextData->droppedHandler) { |
127 | 0 | webSocketContextData->droppedHandler(this, message, opCode); |
128 | 0 | } |
129 | | |
130 | 1.96k | return DROPPED; |
131 | 1.96k | } |
132 | | |
133 | | /* Get our socket data. If we are subscribers and have messages to drain, they are drained in the regular (non-special) path below to stay synced */ |
134 | 214k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
135 | | |
136 | | /* Special path for long sends of non-compressed, non-SSL messages */ |
137 | 214k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { |
138 | 0 | char header[10]; |
139 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); |
140 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); |
141 | | |
142 | 0 | if (written != header_length + (int) message.length()) { |
143 | | /* Buffer up backpressure */ |
144 | 0 | if (written > header_length) { |
145 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); |
146 | 0 | } else { |
147 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); |
148 | 0 | webSocketData->buffer.append(message.data(), message.length()); |
149 | 0 | } |
150 | | /* We cannot still be corked if we have backpressure. |
151 | | * We also cannot uncork normally since it will re-write the already buffered |
152 | | * up backpressure again. */ |
153 | 0 | Super::uncorkWithoutSending(); |
154 | 0 | return BACKPRESSURE; |
155 | 0 | } |
156 | 214k | } else { |
157 | | |
158 | 214k | if (webSocketData->subscriber) { |
159 | | /* This will call back into us, send. */ |
160 | 152k | webSocketContextData->topicTree->drain(webSocketData->subscriber); |
161 | 152k | } |
162 | | |
163 | | /* Transform the message to compressed domain if requested */ |
164 | 214k | if (compress) { |
165 | 137k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
166 | | |
167 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ |
168 | 137k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { |
169 | | /* If compress is 2 (CompressFlags::ALREADY_COMPRESSED), skip this step (experimental) */ |
170 | 11.5k | if (compress != CompressFlags::ALREADY_COMPRESSED) { |
171 | 11.5k | LoopData *loopData = Super::getLoopData(); |
172 | | /* Compress using either shared or dedicated deflationStream */ |
173 | 11.5k | if (webSocketData->deflationStream) { |
174 | 7.65k | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); |
175 | 7.65k | } else { |
176 | 3.90k | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); |
177 | 3.90k | } |
178 | 11.5k | } |
179 | 125k | } else { |
180 | 125k | compress = false; |
181 | 125k | } |
182 | 137k | } |
183 | | |
184 | | /* Get size, allocate size, write if needed */ |
185 | 214k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); |
186 | 214k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); |
187 | 214k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); |
188 | | |
189 | | /* Depending on size of message we have different paths */ |
190 | 214k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { |
191 | | /* This is a drain */ |
192 | 161k | auto[written, failed] = Super::write(nullptr, 0); |
193 | 161k | if (failed) { |
194 | | /* Return BACKPRESSURE on failure, skipping the timeout reset below */ |
195 | 153k | return BACKPRESSURE; |
196 | 153k | } |
197 | 161k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { |
198 | | /* Uncork if we came here uncorked */ |
199 | 3.05k | auto [written, failed] = Super::uncork(); |
200 | 3.05k | if (failed) { |
201 | 1.28k | return BACKPRESSURE; |
202 | 1.28k | } |
203 | 3.05k | } |
204 | | |
205 | 214k | } |
206 | | |
207 | | /* Every successful send resets the timeout */ |
208 | 59.6k | if (webSocketContextData->resetIdleTimeoutOnSend) { |
209 | 8.86k | Super::timeout(webSocketContextData->idleTimeoutComponents.first); |
210 | 8.86k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
211 | 8.86k | webSocketData->hasTimedOut = false; |
212 | 8.86k | } |
213 | | |
214 | | /* Return success */ |
215 | 59.6k | return SUCCESS; |
216 | 214k | } uWS::WebSocket<true, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 113 | 28.5k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 114 | 28.5k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 115 | 28.5k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 116 | 28.5k | ); | 117 | | | 118 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 119 | 28.5k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 120 | | /* Also defer a close if we should */ | 121 | 1.04k | if (webSocketContextData->closeOnBackpressureLimit) { | 122 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 123 | 0 | } | 124 | | | 125 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 126 | 1.04k | if (webSocketContextData->droppedHandler) { | 127 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 128 | 0 | } | 129 | | | 130 | 1.04k | return DROPPED; | 131 | 1.04k | } | 132 | | | 133 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 134 | 27.4k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 135 | | | 136 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 137 | 27.4k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 138 | 0 | char header[10]; | 139 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 140 | 0 | int written = us_socket_write2(0, (struct 
us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 141 | | | 142 | 0 | if (written != header_length + (int) message.length()) { | 143 | | /* Buffer up backpressure */ | 144 | 0 | if (written > header_length) { | 145 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 146 | 0 | } else { | 147 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 148 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 149 | 0 | } | 150 | | /* We cannot still be corked if we have backpressure. | 151 | | * We also cannot uncork normally since it will re-write the already buffered | 152 | | * up backpressure again. */ | 153 | 0 | Super::uncorkWithoutSending(); | 154 | 0 | return BACKPRESSURE; | 155 | 0 | } | 156 | 27.4k | } else { | 157 | | | 158 | 27.4k | if (webSocketData->subscriber) { | 159 | | /* This will call back into us, send. */ | 160 | 27.4k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 161 | 27.4k | } | 162 | | | 163 | | /* Transform the message to compressed domain if requested */ | 164 | 27.4k | if (compress) { | 165 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 166 | | | 167 | | /* Check and correct the compress hint. 
It is never valid to compress 0 bytes */ | 168 | 0 | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 169 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 170 | 0 | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 171 | 0 | LoopData *loopData = Super::getLoopData(); | 172 | | /* Compress using either shared or dedicated deflationStream */ | 173 | 0 | if (webSocketData->deflationStream) { | 174 | 0 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 175 | 0 | } else { | 176 | 0 | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 177 | 0 | } | 178 | 0 | } | 179 | 0 | } else { | 180 | 0 | compress = false; | 181 | 0 | } | 182 | 0 | } | 183 | | | 184 | | /* Get size, allocate size, write if needed */ | 185 | 27.4k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 186 | 27.4k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 187 | 27.4k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 188 | | | 189 | | /* Depending on size of message we have different paths */ | 190 | 27.4k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 191 | | /* This is a drain */ | 192 | 22.6k | auto[written, failed] = Super::write(nullptr, 0); | 193 | 22.6k | if (failed) { | 194 | | /* Return false for failure, skipping to reset the timeout below */ | 195 | 21.8k | return BACKPRESSURE; | 196 | 21.8k | } | 197 | 22.6k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 198 | | /* Uncork if we came here uncorked */ | 199 | 0 | auto [written, failed] = Super::uncork(); | 200 | 0 | if (failed) { | 201 | 0 | return BACKPRESSURE; | 202 | 0 | } | 203 | 0 | } | 204 | | | 205 | 27.4k | } | 206 | | | 207 | | /* Every successful send resets the timeout */ | 208 | 
5.61k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 209 | 5.61k | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 210 | 5.61k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 211 | 5.61k | webSocketData->hasTimedOut = false; | 212 | 5.61k | } | 213 | | | 214 | | /* Return success */ | 215 | 5.61k | return SUCCESS; | 216 | 27.4k | } |
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 113 | 15.9k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 114 | 15.9k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 115 | 15.9k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 116 | 15.9k | ); | 117 | | | 118 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 119 | 15.9k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 120 | | /* Also defer a close if we should */ | 121 | 460 | if (webSocketContextData->closeOnBackpressureLimit) { | 122 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 123 | 0 | } | 124 | | | 125 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 126 | 460 | if (webSocketContextData->droppedHandler) { | 127 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 128 | 0 | } | 129 | | | 130 | 460 | return DROPPED; | 131 | 460 | } | 132 | | | 133 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 134 | 15.4k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 135 | | | 136 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 137 | 15.4k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 138 | 0 | char header[10]; | 139 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 140 | 0 | int written = 
us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 141 | | | 142 | 0 | if (written != header_length + (int) message.length()) { | 143 | | /* Buffer up backpressure */ | 144 | 0 | if (written > header_length) { | 145 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 146 | 0 | } else { | 147 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 148 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 149 | 0 | } | 150 | | /* We cannot still be corked if we have backpressure. | 151 | | * We also cannot uncork normally since it will re-write the already buffered | 152 | | * up backpressure again. */ | 153 | 0 | Super::uncorkWithoutSending(); | 154 | 0 | return BACKPRESSURE; | 155 | 0 | } | 156 | 15.4k | } else { | 157 | | | 158 | 15.4k | if (webSocketData->subscriber) { | 159 | | /* This will call back into us, send. */ | 160 | 15.4k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 161 | 15.4k | } | 162 | | | 163 | | /* Transform the message to compressed domain if requested */ | 164 | 15.4k | if (compress) { | 165 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 166 | | | 167 | | /* Check and correct the compress hint. 
It is never valid to compress 0 bytes */ | 168 | 0 | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 169 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 170 | 0 | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 171 | 0 | LoopData *loopData = Super::getLoopData(); | 172 | | /* Compress using either shared or dedicated deflationStream */ | 173 | 0 | if (webSocketData->deflationStream) { | 174 | 0 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 175 | 0 | } else { | 176 | 0 | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 177 | 0 | } | 178 | 0 | } | 179 | 0 | } else { | 180 | 0 | compress = false; | 181 | 0 | } | 182 | 0 | } | 183 | | | 184 | | /* Get size, allocate size, write if needed */ | 185 | 15.4k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 186 | 15.4k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 187 | 15.4k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 188 | | | 189 | | /* Depending on size of message we have different paths */ | 190 | 15.4k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 191 | | /* This is a drain */ | 192 | 13.2k | auto[written, failed] = Super::write(nullptr, 0); | 193 | 13.2k | if (failed) { | 194 | | /* Return false for failure, skipping to reset the timeout below */ | 195 | 11.8k | return BACKPRESSURE; | 196 | 11.8k | } | 197 | 13.2k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 198 | | /* Uncork if we came here uncorked */ | 199 | 1.35k | auto [written, failed] = Super::uncork(); | 200 | 1.35k | if (failed) { | 201 | 402 | return BACKPRESSURE; | 202 | 402 | } | 203 | 1.35k | } | 204 | | | 205 | 15.4k | } | 206 | | | 207 | | /* Every successful send resets the timeout 
*/ | 208 | 3.25k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 209 | 3.25k | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 210 | 3.25k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 211 | 3.25k | webSocketData->hasTimedOut = false; | 212 | 3.25k | } | 213 | | | 214 | | /* Return success */ | 215 | 3.25k | return SUCCESS; | 216 | 15.4k | } |
uWS::WebSocket<false, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 113 | 106k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 114 | 106k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 115 | 106k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 116 | 106k | ); | 117 | | | 118 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 119 | 106k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 120 | | /* Also defer a close if we should */ | 121 | 0 | if (webSocketContextData->closeOnBackpressureLimit) { | 122 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 123 | 0 | } | 124 | | | 125 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 126 | 0 | if (webSocketContextData->droppedHandler) { | 127 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 128 | 0 | } | 129 | |
| 130 | 0 | return DROPPED; | 131 | 0 | } | 132 | | | 133 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 134 | 106k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 135 | | | 136 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 137 | 106k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 138 | 0 | char header[10]; | 139 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 140 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 141 | | | 142 | 0 | if (written != header_length + (int) message.length()) { | 143 | | /* Buffer up backpressure */ | 144 | 0 | if (written > header_length) { | 145 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 146 | 0 | } else { | 147 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 148 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 149 | 0 | } | 150 | | /* We cannot still be corked if we have backpressure. | 151 | | * We also cannot uncork normally since it will re-write the already buffered | 152 | | * up backpressure again. */ | 153 | 0 | Super::uncorkWithoutSending(); | 154 | 0 | return BACKPRESSURE; | 155 | 0 | } | 156 | 106k | } else { | 157 | | | 158 | 106k | if (webSocketData->subscriber) { | 159 | | /* This will call back into us, send. 
*/ | 160 | 106k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 161 | 106k | } | 162 | | | 163 | | /* Transform the message to compressed domain if requested */ | 164 | 106k | if (compress) { | 165 | 106k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 166 | | | 167 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ | 168 | 106k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 169 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 170 | 194 | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 171 | 194 | LoopData *loopData = Super::getLoopData(); | 172 | | /* Compress using either shared or dedicated deflationStream */ | 173 | 194 | if (webSocketData->deflationStream) { | 174 | 153 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 175 | 153 | } else { | 176 | 41 | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 177 | 41 | } | 178 | 194 | } | 179 | 106k | } else { | 180 | 106k | compress = false; | 181 | 106k | } | 182 | 106k | } | 183 | | | 184 | | /* Get size, allocate size, write if needed */ | 185 | 106k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 186 | 106k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 187 | 106k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 188 | | | 189 | | /* Depending on size of message we have different paths */ | 190 | 106k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 191 | | /* This is a drain */ | 192 | 78.3k | auto[written, failed] = Super::write(nullptr, 0); | 193 | 78.3k | if (failed) { | 194 | | /* Return false for failure, skipping to reset the timeout below */ | 195 | 76.1k | return BACKPRESSURE; | 196 | 
76.1k | } | 197 | 78.3k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 198 | | /* Uncork if we came here uncorked */ | 199 | 0 | auto [written, failed] = Super::uncork(); | 200 | 0 | if (failed) { | 201 | 0 | return BACKPRESSURE; | 202 | 0 | } | 203 | 0 | } | 204 | | | 205 | 106k | } | 206 | | | 207 | | /* Every successful send resets the timeout */ | 208 | 30.7k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 209 | 0 | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 210 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 211 | 0 | webSocketData->hasTimedOut = false; | 212 | 0 | } | 213 | | | 214 | | /* Return success */ | 215 | 30.7k | return SUCCESS; | 216 | 106k | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 113 | 25.3k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 114 | 25.3k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 115 | 25.3k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 116 | 25.3k | ); | 117 | | | 118 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 119 | 25.3k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 120 | | /* Also defer a close if we should */ | 121 | 453 | if (webSocketContextData->closeOnBackpressureLimit) { | 122 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 123 | 0 | } | 124 | | | 125 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 126 | 453 | if (webSocketContextData->droppedHandler) { | 127 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 128 | 0 | } | 129 | | | 130 | 453 | return DROPPED; | 131 | 453 | } | 132 | | | 133 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 134 | 24.9k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 135 | | | 136 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 137 | 24.9k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 138 | 0 | char header[10]; | 139 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 140 | 0 | int written = us_socket_write2(0, 
(struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 141 | | | 142 | 0 | if (written != header_length + (int) message.length()) { | 143 | | /* Buffer up backpressure */ | 144 | 0 | if (written > header_length) { | 145 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 146 | 0 | } else { | 147 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 148 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 149 | 0 | } | 150 | | /* We cannot still be corked if we have backpressure. | 151 | | * We also cannot uncork normally since it will re-write the already buffered | 152 | | * up backpressure again. */ | 153 | 0 | Super::uncorkWithoutSending(); | 154 | 0 | return BACKPRESSURE; | 155 | 0 | } | 156 | 24.9k | } else { | 157 | | | 158 | 24.9k | if (webSocketData->subscriber) { | 159 | | /* This will call back into us, send. */ | 160 | 0 | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 161 | 0 | } | 162 | | | 163 | | /* Transform the message to compressed domain if requested */ | 164 | 24.9k | if (compress) { | 165 | 10.9k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 166 | | | 167 | | /* Check and correct the compress hint. 
It is never valid to compress 0 bytes */ | 168 | 10.9k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 169 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 170 | 2.78k | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 171 | 2.78k | LoopData *loopData = Super::getLoopData(); | 172 | | /* Compress using either shared or dedicated deflationStream */ | 173 | 2.78k | if (webSocketData->deflationStream) { | 174 | 0 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 175 | 2.78k | } else { | 176 | 2.78k | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 177 | 2.78k | } | 178 | 2.78k | } | 179 | 8.15k | } else { | 180 | 8.15k | compress = false; | 181 | 8.15k | } | 182 | 10.9k | } | 183 | | | 184 | | /* Get size, allocate size, write if needed */ | 185 | 24.9k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 186 | 24.9k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 187 | 24.9k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 188 | | | 189 | | /* Depending on size of message we have different paths */ | 190 | 24.9k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 191 | | /* This is a drain */ | 192 | 18.6k | auto[written, failed] = Super::write(nullptr, 0); | 193 | 18.6k | if (failed) { | 194 | | /* Return false for failure, skipping to reset the timeout below */ | 195 | 16.9k | return BACKPRESSURE; | 196 | 16.9k | } | 197 | 18.6k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 198 | | /* Uncork if we came here uncorked */ | 199 | 0 | auto [written, failed] = Super::uncork(); | 200 | 0 | if (failed) { | 201 | 0 | return BACKPRESSURE; | 202 | 0 | } | 203 | 0 | } | 204 | | | 205 | 24.9k | } | 206 | | | 207 | | /* Every 
successful send resets the timeout */ | 208 | 7.96k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 209 | 0 | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 210 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 211 | 0 | webSocketData->hasTimedOut = false; | 212 | 0 | } | 213 | | | 214 | | /* Return success */ | 215 | 7.96k | return SUCCESS; | 216 | 24.9k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 113 | 39.9k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 114 | 39.9k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 115 | 39.9k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 116 | 39.9k | ); | 117 | | | 118 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 119 | 39.9k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 120 | | /* Also defer a close if we should */ | 121 | 0 | if (webSocketContextData->closeOnBackpressureLimit) { | 122 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 123 | 0 | } | 124 | | | 125 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 126 | 0 | if (webSocketContextData->droppedHandler) { | 127 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 128 | 0 | } | 129 | |
| 130 | 0 | return DROPPED; | 131 | 0 | } | 132 | | | 133 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 134 | 39.9k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 135 | | | 136 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 137 | 39.9k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 138 | 0 | char header[10]; | 139 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 140 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 141 | | | 142 | 0 | if (written != header_length + (int) message.length()) { | 143 | | /* Buffer up backpressure */ | 144 | 0 | if (written > header_length) { | 145 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 146 | 0 | } else { | 147 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 148 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 149 | 0 | } | 150 | | /* We cannot still be corked if we have backpressure. | 151 | | * We also cannot uncork normally since it will re-write the already buffered | 152 | | * up backpressure again. */ | 153 | 0 | Super::uncorkWithoutSending(); | 154 | 0 | return BACKPRESSURE; | 155 | 0 | } | 156 | 39.9k | } else { | 157 | | | 158 | 39.9k | if (webSocketData->subscriber) { | 159 | | /* This will call back into us, send. 
*/ | 160 | 2.68k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 161 | 2.68k | } | 162 | | | 163 | | /* Transform the message to compressed domain if requested */ | 164 | 39.9k | if (compress) { | 165 | 19.2k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 166 | | | 167 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ | 168 | 19.2k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 169 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 170 | 8.58k | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 171 | 8.58k | LoopData *loopData = Super::getLoopData(); | 172 | | /* Compress using either shared or dedicated deflationStream */ | 173 | 8.58k | if (webSocketData->deflationStream) { | 174 | 7.50k | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 175 | 7.50k | } else { | 176 | 1.08k | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 177 | 1.08k | } | 178 | 8.58k | } | 179 | 10.7k | } else { | 180 | 10.7k | compress = false; | 181 | 10.7k | } | 182 | 19.2k | } | 183 | | | 184 | | /* Get size, allocate size, write if needed */ | 185 | 39.9k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 186 | 39.9k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 187 | 39.9k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 188 | | | 189 | | /* Depending on size of message we have different paths */ | 190 | 39.9k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 191 | | /* This is a drain */ | 192 | 28.9k | auto[written, failed] = Super::write(nullptr, 0); | 193 | 28.9k | if (failed) { | 194 | | /* Return false for failure, skipping to reset the timeout below */ | 195 | 26.9k 
| return BACKPRESSURE; | 196 | 26.9k | } | 197 | 28.9k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 198 | | /* Uncork if we came here uncorked */ | 199 | 1.69k | auto [written, failed] = Super::uncork(); | 200 | 1.69k | if (failed) { | 201 | 883 | return BACKPRESSURE; | 202 | 883 | } | 203 | 1.69k | } | 204 | | | 205 | 39.9k | } | 206 | | | 207 | | /* Every successful send resets the timeout */ | 208 | 12.1k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 209 | 0 | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 210 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 211 | 0 | webSocketData->hasTimedOut = false; | 212 | 0 | } | 213 | | | 214 | | /* Return success */ | 215 | 12.1k | return SUCCESS; | 216 | 39.9k | } |
|
217 | | |
218 | | /* Send websocket close frame, emit close event, send FIN if successful. A repeated call is a no-op because isShuttingDown is already set by the first call. |
219 | | * Will not append a close reason if code is 0 or 1005. The reason written to the frame is cut to MAX_CLOSE_PAYLOAD bytes, while closeHandler still receives the full message. Also frees the pub/sub subscriber and runs the USERDATA destructor. */ |
220 | 26.7k | void end(int code = 0, std::string_view message = {}) { |
221 | | /* Check if we already called this one; if so there is nothing left to do */ |
222 | 26.7k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
223 | 26.7k | if (webSocketData->isShuttingDown) { |
224 | 0 | return; |
225 | 0 | } |
226 | | |
227 | | /* We postpone any FIN sending to either drainage or uncorking */ |
228 | 26.7k | webSocketData->isShuttingDown = true; |
229 | | |
230 | | /* Format and send the close frame. The buffer is MAX_CLOSE_PAYLOAD + 2 bytes; presumably the 2 extra bytes hold the close code (confirm in formatClosePayload) */ |
231 | 26.7k | static const int MAX_CLOSE_PAYLOAD = 123; |
232 | 26.7k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); |
233 | 26.7k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; |
234 | 26.7k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); |
235 | 26.7k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); /* NOTE(review): send() returns a SendStatus that is narrowed to bool here, so every non-zero status counts as ok (presumably including DROPPED); confirm against the enum definition */ |
236 | | |
237 | | /* FIN if we are ok and not corked */ |
238 | 26.7k | if (!this->isCorked()) { |
239 | 3.33k | if (ok) { |
240 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. |
241 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */ |
242 | 1.15k | this->shutdown(); |
243 | 1.15k | } |
244 | 3.33k | } |
245 | | |
246 | 26.7k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
247 | 26.7k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
248 | 26.7k | ); |
249 | | |
250 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ |
251 | 26.7k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); |
252 | | |
253 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them (reported as size - 1 and size, in that order) */ |
254 | 26.7k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { |
255 | 0 | for (Topic *t : webSocketData->subscriber->topics) { |
256 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); |
257 | 0 | } |
258 | 0 | } |
259 | | |
260 | | /* Make sure to unsubscribe from any pub/sub node at exit. NOTE(review): this is called even when subscriber is nullptr; presumably freeSubscriber tolerates that, confirm */ |
261 | 26.7k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); |
262 | 26.7k | webSocketData->subscriber = nullptr; |
263 | | |
264 | | /* Emit close event, passing the caller's original code and untruncated message */ |
265 | 26.7k | if (webSocketContextData->closeHandler) { |
266 | 26.5k | webSocketContextData->closeHandler(this, code, message); |
267 | 26.5k | } |
268 | 26.7k | ((USERDATA *) this->getUserData())->~USERDATA(); /* explicit destructor call on the user data object, done after the close handler has run */ |
269 | 26.7k | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 220 | 11.5k | void end(int code = 0, std::string_view message = {}) { | 221 | | /* Check if we already called this one */ | 222 | 11.5k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 223 | 11.5k | if (webSocketData->isShuttingDown) { | 224 | 0 | return; | 225 | 0 | } | 226 | | | 227 | | /* We postpone any FIN sending to either drainage or uncorking */ | 228 | 11.5k | webSocketData->isShuttingDown = true; | 229 | | | 230 | | /* Format and send the close frame */ | 231 | 11.5k | static const int MAX_CLOSE_PAYLOAD = 123; | 232 | 11.5k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); | 233 | 11.5k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; | 234 | 11.5k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); | 235 | 11.5k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); | 236 | | | 237 | | /* FIN if we are ok and not corked */ | 238 | 11.5k | if (!this->isCorked()) { | 239 | 396 | if (ok) { | 240 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. | 241 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. 
*/ | 242 | 196 | this->shutdown(); | 243 | 196 | } | 244 | 396 | } | 245 | | | 246 | 11.5k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 247 | 11.5k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 248 | 11.5k | ); | 249 | | | 250 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ | 251 | 11.5k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); | 252 | | | 253 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them */ | 254 | 11.5k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { | 255 | 0 | for (Topic *t : webSocketData->subscriber->topics) { | 256 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); | 257 | 0 | } | 258 | 0 | } | 259 | | | 260 | | /* Make sure to unsubscribe from any pub/sub node at exit */ | 261 | 11.5k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); | 262 | 11.5k | webSocketData->subscriber = nullptr; | 263 | | | 264 | | /* Emit close event */ | 265 | 11.5k | if (webSocketContextData->closeHandler) { | 266 | 11.5k | webSocketContextData->closeHandler(this, code, message); | 267 | 11.5k | } | 268 | 11.5k | ((USERDATA *) this->getUserData())->~USERDATA(); | 269 | 11.5k | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 220 | 6.37k | void end(int code = 0, std::string_view message = {}) { | 221 | | /* Check if we already called this one */ | 222 | 6.37k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 223 | 6.37k | if (webSocketData->isShuttingDown) { | 224 | 0 | return; | 225 | 0 | } | 226 | | | 227 | | /* We postpone any FIN sending to either drainage or uncorking */ | 228 | 6.37k | webSocketData->isShuttingDown = true; | 229 | | | 230 | | /* Format and send the close frame */ | 231 | 6.37k | static const int MAX_CLOSE_PAYLOAD = 123; | 232 | 6.37k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); | 233 | 6.37k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; | 234 | 6.37k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); | 235 | 6.37k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); | 236 | | | 237 | | /* FIN if we are ok and not corked */ | 238 | 6.37k | if (!this->isCorked()) { | 239 | 0 | if (ok) { | 240 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. | 241 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. 
*/ | 242 | 0 | this->shutdown(); | 243 | 0 | } | 244 | 0 | } | 245 | | | 246 | 6.37k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 247 | 6.37k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 248 | 6.37k | ); | 249 | | | 250 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ | 251 | 6.37k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); | 252 | | | 253 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them */ | 254 | 6.37k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { | 255 | 0 | for (Topic *t : webSocketData->subscriber->topics) { | 256 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); | 257 | 0 | } | 258 | 0 | } | 259 | | | 260 | | /* Make sure to unsubscribe from any pub/sub node at exit */ | 261 | 6.37k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); | 262 | 6.37k | webSocketData->subscriber = nullptr; | 263 | | | 264 | | /* Emit close event */ | 265 | 6.37k | if (webSocketContextData->closeHandler) { | 266 | 6.17k | webSocketContextData->closeHandler(this, code, message); | 267 | 6.17k | } | 268 | 6.37k | ((USERDATA *) this->getUserData())->~USERDATA(); | 269 | 6.37k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 220 | 8.88k | void end(int code = 0, std::string_view message = {}) { | 221 | | /* Check if we already called this one */ | 222 | 8.88k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 223 | 8.88k | if (webSocketData->isShuttingDown) { | 224 | 0 | return; | 225 | 0 | } | 226 | | | 227 | | /* We postpone any FIN sending to either drainage or uncorking */ | 228 | 8.88k | webSocketData->isShuttingDown = true; | 229 | | | 230 | | /* Format and send the close frame */ | 231 | 8.88k | static const int MAX_CLOSE_PAYLOAD = 123; | 232 | 8.88k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); | 233 | 8.88k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; | 234 | 8.88k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); | 235 | 8.88k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); | 236 | | | 237 | | /* FIN if we are ok and not corked */ | 238 | 8.88k | if (!this->isCorked()) { | 239 | 2.94k | if (ok) { | 240 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. | 241 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. 
*/ | 242 | 962 | this->shutdown(); | 243 | 962 | } | 244 | 2.94k | } | 245 | | | 246 | 8.88k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 247 | 8.88k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 248 | 8.88k | ); | 249 | | | 250 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ | 251 | 8.88k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); | 252 | | | 253 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them */ | 254 | 8.88k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { | 255 | 0 | for (Topic *t : webSocketData->subscriber->topics) { | 256 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); | 257 | 0 | } | 258 | 0 | } | 259 | | | 260 | | /* Make sure to unsubscribe from any pub/sub node at exit */ | 261 | 8.88k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); | 262 | 8.88k | webSocketData->subscriber = nullptr; | 263 | | | 264 | | /* Emit close event */ | 265 | 8.88k | if (webSocketContextData->closeHandler) { | 266 | 8.88k | webSocketContextData->closeHandler(this, code, message); | 267 | 8.88k | } | 268 | 8.88k | ((USERDATA *) this->getUserData())->~USERDATA(); | 269 | 8.88k | } |
|
270 | | |
271 | | /* Corks the response if possible. Leaves already corked socket be. Either way the handler is invoked exactly once: between Super::cork() and Super::uncork() when we cork here, or on its own otherwise. */ |
272 | | void cork(MoveOnlyFunction<void()> &&handler) { |
273 | | if (!Super::isCorked() && Super::canCork()) { |
274 | | Super::cork(); |
275 | | handler(); |
276 | | |
277 | | /* There is no timeout when failing to uncork for WebSockets, |
278 | | * as that is handled by idleTimeout. Both results of uncork() are therefore deliberately discarded below. */ |
279 | | auto [written, failed] = Super::uncork(); |
280 | | (void)written; |
281 | | (void)failed; |
282 | | } else { |
283 | | /* We are already corked, or can't cork so let's just call the handler; no uncork is performed on this path */ |
284 | | handler(); |
285 | | } |
286 | | } |
287 | | |
288 | | /* Subscribe to a topic according to MQTT rules and syntax. Returns success; as written this is always true. The second, unnamed bool parameter is ignored. */ |
289 | 10.8M | bool subscribe(std::string_view topic, bool = false) { |
290 | 10.8M | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
291 | 10.8M | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
292 | 10.8M | ); |
293 | | |
294 | | /* Make us a subscriber if we aren't yet; the new subscriber's user pointer is set to this socket */ |
295 | 10.8M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
296 | 10.8M | if (!webSocketData->subscriber) { |
297 | 213k | webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber(); |
298 | 213k | webSocketData->subscriber->user = this; |
299 | 213k | } |
300 | | |
301 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ |
302 | 10.8M | Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic); |
303 | 10.8M | if (topicOrNull && webSocketContextData->subscriptionHandler) { |
304 | | /* Emit this socket, the topic, new count, old count. Skipped when subscribe() returned null or no handler is set. */ |
305 | 0 | webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1); |
306 | 0 | } |
307 | | |
308 | | /* Subscribe always succeeds: true is returned even when topicOrNull is null */ |
309 | 10.8M | return true; |
310 | 10.8M | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool) Line | Count | Source | 289 | 10.7M | bool subscribe(std::string_view topic, bool = false) { | 290 | 10.7M | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 291 | 10.7M | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 292 | 10.7M | ); | 293 | | | 294 | | /* Make us a subscriber if we aren't yet */ | 295 | 10.7M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 296 | 10.7M | if (!webSocketData->subscriber) { | 297 | 107k | webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber(); | 298 | 107k | webSocketData->subscriber->user = this; | 299 | 107k | } | 300 | | | 301 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ | 302 | 10.7M | Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic); | 303 | 10.7M | if (topicOrNull && webSocketContextData->subscriptionHandler) { | 304 | | /* Emit this socket, the topic, new count, old count */ | 305 | 0 | webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1); | 306 | 0 | } | 307 | | | 308 | | /* Subscribe always succeeds */ | 309 | 10.7M | return true; | 310 | 10.7M | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool) Line | Count | Source | 289 | 105k | bool subscribe(std::string_view topic, bool = false) { | 290 | 105k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 291 | 105k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 292 | 105k | ); | 293 | | | 294 | | /* Make us a subscriber if we aren't yet */ | 295 | 105k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 296 | 105k | if (!webSocketData->subscriber) { | 297 | 105k | webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber(); | 298 | 105k | webSocketData->subscriber->user = this; | 299 | 105k | } | 300 | | | 301 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ | 302 | 105k | Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic); | 303 | 105k | if (topicOrNull && webSocketContextData->subscriptionHandler) { | 304 | | /* Emit this socket, the topic, new count, old count */ | 305 | 0 | webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1); | 306 | 0 | } | 307 | | | 308 | | /* Subscribe always succeeds */ | 309 | 105k | return true; | 310 | 105k | } |
|
311 | | |
312 | | /* Unsubscribe from a topic, returns true if we were subscribed. Returns false right away if this socket never became a subscriber. The second, unnamed bool parameter is ignored. */ |
313 | 562 | bool unsubscribe(std::string_view topic, bool = false) { |
314 | 562 | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
315 | 562 | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
316 | 562 | ); |
317 | | |
318 | 562 | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
319 | | |
320 | 562 | if (!webSocketData->subscriber) { return false; } |
321 | | |
322 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ |
323 | 562 | auto [ok, last, newCount] = webSocketContextData->topicTree->unsubscribe(webSocketData->subscriber, topic); |
324 | | /* Emit subscription event on every successful unsubscribe when a handler is set, as new count then old count (newCount + 1); the `last` value is not consulted in this function */ |
325 | 562 | if (ok && webSocketContextData->subscriptionHandler) { |
326 | 0 | webSocketContextData->subscriptionHandler(this, topic, newCount, newCount + 1); |
327 | 0 | } |
328 | | |
329 | | /* Leave us as subscribers even if we subscribe to nothing (last unsubscribed topic might miss its message otherwise) */ |
330 | | |
331 | 562 | return ok; |
332 | 562 | } |
333 | | |
334 | | /* Returns whether this socket is subscribed to the specified topic. False when the socket has no subscriber or when lookupTopic() finds no such topic. */ |
335 | | bool isSubscribed(std::string_view topic) { |
336 | | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
337 | | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
338 | | ); |
339 | | |
340 | | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
341 | | if (!webSocketData->subscriber) { |
342 | | return false; |
343 | | } |
344 | | |
345 | | Topic *topicPtr = webSocketContextData->topicTree->lookupTopic(topic); |
346 | | if (!topicPtr) { |
347 | | return false; |
348 | | } |
349 | | |
350 | | return topicPtr->count(webSocketData->subscriber); /* the count() result is implicitly converted to the bool return type */ |
351 | | } |
352 | | |
353 | | /* Iterates all topics of this WebSocket. Every topic is represented by its full name. |
354 | | * Can be called in close handler. It is possible to modify the subscription list while |
355 | | * inside the callback ONLY IF not modifying the topic passed to the callback. |
356 | | * Topic names are valid only for the duration of the callback. Does nothing if this socket has no subscriber. */ |
357 | | void iterateTopics(MoveOnlyFunction<void(std::string_view)> cb) { |
358 | | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
359 | | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
360 | | ); |
361 | | |
362 | | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
363 | | if (webSocketData->subscriber) { |
364 | | /* Lock this subscriber for unsubscription / subscription by recording it in topicTree->iteratingSubscriber */ |
365 | | webSocketContextData->topicTree->iteratingSubscriber = webSocketData->subscriber; |
366 | | |
367 | | for (Topic *topicPtr : webSocketData->subscriber->topics) { |
368 | | cb({topicPtr->name.data(), topicPtr->name.length()}); /* NOTE(review): if cb throws, the unlock below is skipped; confirm callbacks are expected not to throw */ |
369 | | } |
370 | | |
371 | | /* Unlock subscriber */ |
372 | | webSocketContextData->topicTree->iteratingSubscriber = nullptr; |
373 | | } |
374 | | } |
375 | | |
376 | | /* Publish a message to a topic according to MQTT rules and syntax. Returns success. |
377 | | * We, the WebSocket, must be subscribed to the topic itself and if so - no message will be sent to ourselves. |
378 | | * Use App::publish for an unconditional publish that simply publishes to whomever might be subscribed. Returns false without publishing when this socket has no subscriber. */ |
379 | 45.5k | bool publish(std::string_view topic, std::string_view message, OpCode opCode = OpCode::TEXT, bool compress = false) { |
380 | 45.5k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
381 | 45.5k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
382 | 45.5k | ); |
383 | | |
384 | | /* We cannot be a subscriber of this topic if we are not a subscriber of anything */ |
385 | 45.5k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
386 | 45.5k | if (!webSocketData->subscriber) { |
387 | | /* Failure: nothing is published and false is returned */ |
388 | 0 | return false; |
389 | 0 | } |
390 | | |
391 | | /* Publish as sender, does not receive its own messages even if subscribed to relevant topics. Messages of at least LoopData::CORK_BUFFER_SIZE bytes go through publishBig with a per-subscriber send callback; smaller ones are copied into a std::string and handed to publish. */ |
392 | 45.5k | if (message.length() >= LoopData::CORK_BUFFER_SIZE) { |
393 | 0 | return webSocketContextData->topicTree->publishBig(webSocketData->subscriber, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) { |
394 | 0 | auto *ws = (WebSocket<SSL, true, int> *) s->user; /* NOTE(review): the cast hard-codes isServer = true and USERDATA = int instead of this class's own template arguments; presumably send() does not depend on them in a way that matters here, confirm */ |
395 |

396 | 0 | ws->send(message.message, (OpCode)message.opCode, message.compress); |
397 | 0 | }); |
398 | 45.5k | } else { |
399 | 45.5k | return webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress}); |
400 | 45.5k | } |
401 | 45.5k | } |
402 | | }; |
403 | | |
404 | | } |
405 | | |
406 | | #endif // UWS_WEBSOCKET_H |