/src/uWebSockets/src/WebSocket.h
Line | Count | Source |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef UWS_WEBSOCKET_H |
19 | | #define UWS_WEBSOCKET_H |
20 | | |
21 | | #include "WebSocketData.h" |
22 | | #include "WebSocketProtocol.h" |
23 | | #include "AsyncSocket.h" |
24 | | #include "WebSocketContextData.h" |
25 | | |
26 | | #include <string_view> |
27 | | |
28 | | namespace uWS { |
29 | | |
30 | | /* Experimental */ |
31 | | enum CompressFlags : int { |
32 | | NO_ACTION, |
33 | | COMPRESS, |
34 | | ALREADY_COMPRESSED |
35 | | }; |
36 | | |
37 | | template <bool SSL, bool isServer, typename USERDATA> |
38 | | struct WebSocket : AsyncSocket<SSL> { |
39 | | template <bool> friend struct TemplatedApp; |
40 | | template <bool> friend struct HttpResponse; |
41 | | private: |
42 | | typedef AsyncSocket<SSL> Super; |
43 | | |
44 | 392k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { |
45 | 392k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); |
46 | 392k | return this; |
47 | 392k | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&) Line | Count | Source | 44 | 119k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { | 45 | 119k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); | 46 | 119k | return this; | 47 | 119k | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&) Line | Count | Source | 44 | 41.0k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { | 45 | 41.0k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); | 46 | 41.0k | return this; | 47 | 41.0k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&) Line | Count | Source | 44 | 231k | void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) { | 45 | 231k | new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure)); | 46 | 231k | return this; | 47 | 231k | } |
|
48 | | public: |
49 | | |
50 | | /* Returns pointer to the per socket user data */ |
51 | 2.00M | USERDATA *getUserData() { |
52 | 2.00M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
53 | | /* We just have it overallocated by sizeof type */ |
54 | 2.00M | return (USERDATA *) (webSocketData + 1); |
55 | 2.00M | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::getUserData() Line | Count | Source | 51 | 1.37M | USERDATA *getUserData() { | 52 | 1.37M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 53 | | /* We just have it overallocated by sizeof type */ | 54 | 1.37M | return (USERDATA *) (webSocketData + 1); | 55 | 1.37M | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData() Line | Count | Source | 51 | 82.1k | USERDATA *getUserData() { | 52 | 82.1k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 53 | | /* We just have it overallocated by sizeof type */ | 54 | 82.1k | return (USERDATA *) (webSocketData + 1); | 55 | 82.1k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData() Line | Count | Source | 51 | 551k | USERDATA *getUserData() { | 52 | 551k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 53 | | /* We just have it overallocated by sizeof type */ | 54 | 551k | return (USERDATA *) (webSocketData + 1); | 55 | 551k | } |
|
56 | | |
57 | | /* See AsyncSocket */ |
58 | | using Super::getBufferedAmount; |
59 | | using Super::getRemoteAddress; |
60 | | using Super::getRemoteAddressAsText; |
61 | | using Super::getRemotePort; |
62 | | using Super::getNativeHandle; |
63 | | |
64 | | /* WebSocket close cannot be an alias to AsyncSocket::close since |
65 | | * we need to check first if it was shut down by remote peer */ |
66 | 193k | us_socket_t *close() { |
67 | 193k | if (us_socket_is_closed(SSL, (us_socket_t *) this)) { |
68 | 188k | return nullptr; |
69 | 188k | } |
70 | 5.12k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
71 | 5.12k | if (webSocketData->isShuttingDown) { |
72 | 4.19k | return nullptr; |
73 | 4.19k | } |
74 | | |
75 | 926 | return us_socket_close(SSL, (us_socket_t *) this, 0, nullptr); |
76 | 5.12k | } |
77 | | |
78 | | enum SendStatus : int { |
79 | | BACKPRESSURE, |
80 | | SUCCESS, |
81 | | DROPPED |
82 | | }; |
83 | | |
84 | | /* Sending fragmented messages puts a bit of effort on the user; you must not interleave regular sends |
85 | | * with fragmented sends and you must sendFirstFragment, [sendFragment], then finally sendLastFragment. */ |
86 | | SendStatus sendFirstFragment(std::string_view message, OpCode opCode = OpCode::BINARY, bool compress = false) { |
87 | | return send(message, opCode, compress, false); |
88 | | } |
89 | | |
90 | | SendStatus sendFragment(std::string_view message, bool compress = false) { |
91 | | return send(message, CONTINUATION, compress, false); |
92 | | } |
93 | | |
94 | | SendStatus sendLastFragment(std::string_view message, bool compress = false) { |
95 | | return send(message, CONTINUATION, compress, true); |
96 | | } |
97 | | |
98 | | /* Experimental */ |
99 | | bool hasNegotiatedCompression() { |
100 | | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
101 | | return webSocketData->compressionStatus == WebSocketData::ENABLED; |
102 | | } |
103 | | |
104 | | /* Experimental */ |
105 | | SendStatus sendPrepared(PreparedMessage &preparedMessage) { |
106 | | if (preparedMessage.compressed && hasNegotiatedCompression() && preparedMessage.compressedMessage.length() < preparedMessage.originalMessage.length()) { |
107 | | return send({preparedMessage.compressedMessage.data(), preparedMessage.compressedMessage.length()}, (OpCode) preparedMessage.opCode, uWS::CompressFlags::ALREADY_COMPRESSED); |
108 | | } |
109 | | return send({preparedMessage.originalMessage.data(), preparedMessage.originalMessage.length()}, (OpCode) preparedMessage.opCode); |
110 | | } |
111 | | |
112 | | /* Send or buffer a WebSocket frame, compressed or not. Returns BACKPRESSURE on increased user space backpressure, |
113 | | * DROPPED on dropped message (due to backpressure) or SUCCCESS if you are free to send even more now. */ |
114 | 448k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { |
115 | 448k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
116 | 448k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
117 | 448k | ); |
118 | | |
119 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ |
120 | 448k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { |
121 | | /* Also defer a close if we should */ |
122 | 2.83k | if (webSocketContextData->closeOnBackpressureLimit) { |
123 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); |
124 | 0 | } |
125 | | |
126 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ |
127 | 2.83k | if (webSocketContextData->droppedHandler) { |
128 | 0 | webSocketContextData->droppedHandler(this, message, opCode); |
129 | 0 | } |
130 | | |
131 | 2.83k | return DROPPED; |
132 | 2.83k | } |
133 | | |
134 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ |
135 | 445k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
136 | | |
137 | | /* Special path for long sends of non-compressed, non-SSL messages */ |
138 | 445k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { |
139 | 0 | char header[10]; |
140 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); |
141 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); |
142 | | |
143 | 0 | if (written != header_length + (int) message.length()) { |
144 | | /* Buffer up backpressure */ |
145 | 0 | if (written > header_length) { |
146 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); |
147 | 0 | } else { |
148 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); |
149 | 0 | webSocketData->buffer.append(message.data(), message.length()); |
150 | 0 | } |
151 | | /* We cannot still be corked if we have backpressure. |
152 | | * We also cannot uncork normally since it will re-write the already buffered |
153 | | * up backpressure again. */ |
154 | 0 | Super::uncorkWithoutSending(); |
155 | 0 | return BACKPRESSURE; |
156 | 0 | } |
157 | 445k | } else { |
158 | | |
159 | 445k | if (webSocketData->subscriber) { |
160 | | /* This will call back into us, send. */ |
161 | 368k | webSocketContextData->topicTree->drain(webSocketData->subscriber); |
162 | 368k | } |
163 | | |
164 | | /* Transform the message to compressed domain if requested */ |
165 | 445k | if (compress) { |
166 | 341k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
167 | | |
168 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ |
169 | 341k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { |
170 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ |
171 | 4.23k | if (compress != CompressFlags::ALREADY_COMPRESSED) { |
172 | 4.23k | LoopData *loopData = Super::getLoopData(); |
173 | | /* Compress using either shared or dedicated deflationStream */ |
174 | 4.23k | if (webSocketData->deflationStream) { |
175 | 1.29k | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); |
176 | 2.93k | } else { |
177 | 2.93k | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); |
178 | 2.93k | } |
179 | 4.23k | } |
180 | 337k | } else { |
181 | 337k | compress = false; |
182 | 337k | } |
183 | 341k | } |
184 | | |
185 | | /* Get size, allocate size, write if needed */ |
186 | 445k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); |
187 | 445k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); |
188 | 445k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); |
189 | | |
190 | | /* Depending on size of message we have different paths */ |
191 | 445k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { |
192 | | /* This is a drain */ |
193 | 334k | auto[written, failed] = Super::write(nullptr, 0); |
194 | 334k | if (failed) { |
195 | | /* Return false for failure, skipping to reset the timeout below */ |
196 | 320k | return BACKPRESSURE; |
197 | 320k | } |
198 | 334k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { |
199 | | /* Uncork if we came here uncorked */ |
200 | 3.18k | auto [written, failed] = Super::uncork(); |
201 | 3.18k | if (failed) { |
202 | 1.44k | return BACKPRESSURE; |
203 | 1.44k | } |
204 | 3.18k | } |
205 | | |
206 | 445k | } |
207 | | |
208 | | /* Every successful send resets the timeout */ |
209 | 124k | if (webSocketContextData->resetIdleTimeoutOnSend) { |
210 | 8.58k | Super::timeout(webSocketContextData->idleTimeoutComponents.first); |
211 | 8.58k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); |
212 | 8.58k | webSocketData->hasTimedOut = false; |
213 | 8.58k | } |
214 | | |
215 | | /* Return success */ |
216 | 124k | return SUCCESS; |
217 | 445k | } uWS::WebSocket<true, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 114 | 25.5k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 115 | 25.5k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 116 | 25.5k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 117 | 25.5k | ); | 118 | | | 119 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 120 | 25.5k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 121 | | /* Also defer a close if we should */ | 122 | 983 | if (webSocketContextData->closeOnBackpressureLimit) { | 123 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 124 | 0 | } | 125 | | | 126 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 127 | 983 | if (webSocketContextData->droppedHandler) { | 128 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 129 | 0 | } | 130 | | | 131 | 983 | return DROPPED; | 132 | 983 | } | 133 | | | 134 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 135 | 24.5k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 136 | | | 137 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 138 | 24.5k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 139 | 0 | char header[10]; | 140 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 141 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 142 | | | 143 | 0 | if (written != header_length + (int) message.length()) { | 144 | | /* Buffer up backpressure */ | 145 | 0 | if (written > header_length) { | 146 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 147 | 0 | } else { | 148 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 149 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 150 | 0 | } | 151 | | /* We cannot still be corked if we have backpressure. | 152 | | * We also cannot uncork normally since it will re-write the already buffered | 153 | | * up backpressure again. */ | 154 | 0 | Super::uncorkWithoutSending(); | 155 | 0 | return BACKPRESSURE; | 156 | 0 | } | 157 | 24.5k | } else { | 158 | | | 159 | 24.5k | if (webSocketData->subscriber) { | 160 | | /* This will call back into us, send. */ | 161 | 24.5k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 162 | 24.5k | } | 163 | | | 164 | | /* Transform the message to compressed domain if requested */ | 165 | 24.5k | if (compress) { | 166 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 167 | | | 168 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ | 169 | 0 | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 170 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 171 | 0 | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 172 | 0 | LoopData *loopData = Super::getLoopData(); | 173 | | /* Compress using either shared or dedicated deflationStream */ | 174 | 0 | if (webSocketData->deflationStream) { | 175 | 0 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 176 | 0 | } else { | 177 | 0 | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 178 | 0 | } | 179 | 0 | } | 180 | 0 | } else { | 181 | 0 | compress = false; | 182 | 0 | } | 183 | 0 | } | 184 | | | 185 | | /* Get size, allocate size, write if needed */ | 186 | 24.5k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 187 | 24.5k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 188 | 24.5k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 189 | | | 190 | | /* Depending on size of message we have different paths */ | 191 | 24.5k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 192 | | /* This is a drain */ | 193 | 20.1k | auto[written, failed] = Super::write(nullptr, 0); | 194 | 20.1k | if (failed) { | 195 | | /* Return false for failure, skipping to reset the timeout below */ | 196 | 19.0k | return BACKPRESSURE; | 197 | 19.0k | } | 198 | 20.1k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 199 | | /* Uncork if we came here uncorked */ | 200 | 0 | auto [written, failed] = Super::uncork(); | 201 | 0 | if (failed) { | 202 | 0 | return BACKPRESSURE; | 203 | 0 | } | 204 | 0 | } | 205 | | | 206 | 24.5k | } | 207 | | | 208 | | /* Every successful send resets the timeout */ | 209 | 5.49k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 210 | 5.49k | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 211 | 5.49k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 212 | 5.49k | webSocketData->hasTimedOut = false; | 213 | 5.49k | } | 214 | | | 215 | | /* Return success */ | 216 | 5.49k | return SUCCESS; | 217 | 24.5k | } |
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 114 | 19.2k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 115 | 19.2k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 116 | 19.2k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 117 | 19.2k | ); | 118 | | | 119 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 120 | 19.2k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 121 | | /* Also defer a close if we should */ | 122 | 997 | if (webSocketContextData->closeOnBackpressureLimit) { | 123 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 124 | 0 | } | 125 | | | 126 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 127 | 997 | if (webSocketContextData->droppedHandler) { | 128 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 129 | 0 | } | 130 | | | 131 | 997 | return DROPPED; | 132 | 997 | } | 133 | | | 134 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 135 | 18.2k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 136 | | | 137 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 138 | 18.2k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 139 | 0 | char header[10]; | 140 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 141 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 142 | | | 143 | 0 | if (written != header_length + (int) message.length()) { | 144 | | /* Buffer up backpressure */ | 145 | 0 | if (written > header_length) { | 146 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 147 | 0 | } else { | 148 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 149 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 150 | 0 | } | 151 | | /* We cannot still be corked if we have backpressure. | 152 | | * We also cannot uncork normally since it will re-write the already buffered | 153 | | * up backpressure again. */ | 154 | 0 | Super::uncorkWithoutSending(); | 155 | 0 | return BACKPRESSURE; | 156 | 0 | } | 157 | 18.2k | } else { | 158 | | | 159 | 18.2k | if (webSocketData->subscriber) { | 160 | | /* This will call back into us, send. */ | 161 | 18.2k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 162 | 18.2k | } | 163 | | | 164 | | /* Transform the message to compressed domain if requested */ | 165 | 18.2k | if (compress) { | 166 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 167 | | | 168 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ | 169 | 0 | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 170 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 171 | 0 | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 172 | 0 | LoopData *loopData = Super::getLoopData(); | 173 | | /* Compress using either shared or dedicated deflationStream */ | 174 | 0 | if (webSocketData->deflationStream) { | 175 | 0 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 176 | 0 | } else { | 177 | 0 | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 178 | 0 | } | 179 | 0 | } | 180 | 0 | } else { | 181 | 0 | compress = false; | 182 | 0 | } | 183 | 0 | } | 184 | | | 185 | | /* Get size, allocate size, write if needed */ | 186 | 18.2k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 187 | 18.2k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 188 | 18.2k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 189 | | | 190 | | /* Depending on size of message we have different paths */ | 191 | 18.2k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 192 | | /* This is a drain */ | 193 | 15.5k | auto[written, failed] = Super::write(nullptr, 0); | 194 | 15.5k | if (failed) { | 195 | | /* Return false for failure, skipping to reset the timeout below */ | 196 | 14.5k | return BACKPRESSURE; | 197 | 14.5k | } | 198 | 15.5k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 199 | | /* Uncork if we came here uncorked */ | 200 | 1.41k | auto [written, failed] = Super::uncork(); | 201 | 1.41k | if (failed) { | 202 | 636 | return BACKPRESSURE; | 203 | 636 | } | 204 | 1.41k | } | 205 | | | 206 | 18.2k | } | 207 | | | 208 | | /* Every successful send resets the timeout */ | 209 | 3.09k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 210 | 3.09k | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 211 | 3.09k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 212 | 3.09k | webSocketData->hasTimedOut = false; | 213 | 3.09k | } | 214 | | | 215 | | /* Return success */ | 216 | 3.09k | return SUCCESS; | 217 | 18.2k | } |
uWS::WebSocket<false, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 114 | 319k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 115 | 319k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 116 | 319k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 117 | 319k | ); | 118 | | | 119 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 120 | 319k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 121 | | /* Also defer a close if we should */ | 122 | 0 | if (webSocketContextData->closeOnBackpressureLimit) { | 123 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 124 | 0 | } | 125 | | | 126 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 127 | 0 | if (webSocketContextData->droppedHandler) { | 128 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 129 | 0 | } | 130 | |
| 131 | 0 | return DROPPED; | 132 | 0 | } | 133 | | | 134 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 135 | 319k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 136 | | | 137 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 138 | 319k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 139 | 0 | char header[10]; | 140 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 141 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 142 | | | 143 | 0 | if (written != header_length + (int) message.length()) { | 144 | | /* Buffer up backpressure */ | 145 | 0 | if (written > header_length) { | 146 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 147 | 0 | } else { | 148 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 149 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 150 | 0 | } | 151 | | /* We cannot still be corked if we have backpressure. | 152 | | * We also cannot uncork normally since it will re-write the already buffered | 153 | | * up backpressure again. */ | 154 | 0 | Super::uncorkWithoutSending(); | 155 | 0 | return BACKPRESSURE; | 156 | 0 | } | 157 | 319k | } else { | 158 | | | 159 | 319k | if (webSocketData->subscriber) { | 160 | | /* This will call back into us, send. */ | 161 | 319k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 162 | 319k | } | 163 | | | 164 | | /* Transform the message to compressed domain if requested */ | 165 | 319k | if (compress) { | 166 | 319k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 167 | | | 168 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ | 169 | 319k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 170 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 171 | 229 | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 172 | 229 | LoopData *loopData = Super::getLoopData(); | 173 | | /* Compress using either shared or dedicated deflationStream */ | 174 | 229 | if (webSocketData->deflationStream) { | 175 | 177 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 176 | 177 | } else { | 177 | 52 | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 178 | 52 | } | 179 | 229 | } | 180 | 319k | } else { | 181 | 319k | compress = false; | 182 | 319k | } | 183 | 319k | } | 184 | | | 185 | | /* Get size, allocate size, write if needed */ | 186 | 319k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 187 | 319k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 188 | 319k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 189 | | | 190 | | /* Depending on size of message we have different paths */ | 191 | 319k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 192 | | /* This is a drain */ | 193 | 251k | auto[written, failed] = Super::write(nullptr, 0); | 194 | 251k | if (failed) { | 195 | | /* Return false for failure, skipping to reset the timeout below */ | 196 | 245k | return BACKPRESSURE; | 197 | 245k | } | 198 | 251k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 199 | | /* Uncork if we came here uncorked */ | 200 | 0 | auto [written, failed] = Super::uncork(); | 201 | 0 | if (failed) { | 202 | 0 | return BACKPRESSURE; | 203 | 0 | } | 204 | 0 | } | 205 | | | 206 | 319k | } | 207 | | | 208 | | /* Every successful send resets the timeout */ | 209 | 74.1k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 210 | 0 | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 211 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 212 | 0 | webSocketData->hasTimedOut = false; | 213 | 0 | } | 214 | | | 215 | | /* Return success */ | 216 | 74.1k | return SUCCESS; | 217 | 319k | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 114 | 45.7k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 115 | 45.7k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 116 | 45.7k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 117 | 45.7k | ); | 118 | | | 119 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 120 | 45.7k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 121 | | /* Also defer a close if we should */ | 122 | 853 | if (webSocketContextData->closeOnBackpressureLimit) { | 123 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 124 | 0 | } | 125 | | | 126 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 127 | 853 | if (webSocketContextData->droppedHandler) { | 128 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 129 | 0 | } | 130 | | | 131 | 853 | return DROPPED; | 132 | 853 | } | 133 | | | 134 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 135 | 44.8k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 136 | | | 137 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 138 | 44.8k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 139 | 0 | char header[10]; | 140 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 141 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 142 | | | 143 | 0 | if (written != header_length + (int) message.length()) { | 144 | | /* Buffer up backpressure */ | 145 | 0 | if (written > header_length) { | 146 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 147 | 0 | } else { | 148 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 149 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 150 | 0 | } | 151 | | /* We cannot still be corked if we have backpressure. | 152 | | * We also cannot uncork normally since it will re-write the already buffered | 153 | | * up backpressure again. */ | 154 | 0 | Super::uncorkWithoutSending(); | 155 | 0 | return BACKPRESSURE; | 156 | 0 | } | 157 | 44.8k | } else { | 158 | | | 159 | 44.8k | if (webSocketData->subscriber) { | 160 | | /* This will call back into us, send. */ | 161 | 0 | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 162 | 0 | } | 163 | | | 164 | | /* Transform the message to compressed domain if requested */ | 165 | 44.8k | if (compress) { | 166 | 9.64k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 167 | | | 168 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ | 169 | 9.64k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 170 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 171 | 2.07k | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 172 | 2.07k | LoopData *loopData = Super::getLoopData(); | 173 | | /* Compress using either shared or dedicated deflationStream */ | 174 | 2.07k | if (webSocketData->deflationStream) { | 175 | 0 | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 176 | 2.07k | } else { | 177 | 2.07k | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 178 | 2.07k | } | 179 | 2.07k | } | 180 | 7.57k | } else { | 181 | 7.57k | compress = false; | 182 | 7.57k | } | 183 | 9.64k | } | 184 | | | 185 | | /* Get size, allocate size, write if needed */ | 186 | 44.8k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 187 | 44.8k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 188 | 44.8k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 189 | | | 190 | | /* Depending on size of message we have different paths */ | 191 | 44.8k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 192 | | /* This is a drain */ | 193 | 22.0k | auto[written, failed] = Super::write(nullptr, 0); | 194 | 22.0k | if (failed) { | 195 | | /* Return false for failure, skipping to reset the timeout below */ | 196 | 19.8k | return BACKPRESSURE; | 197 | 19.8k | } | 198 | 22.8k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 199 | | /* Uncork if we came here uncorked */ | 200 | 0 | auto [written, failed] = Super::uncork(); | 201 | 0 | if (failed) { | 202 | 0 | return BACKPRESSURE; | 203 | 0 | } | 204 | 0 | } | 205 | | | 206 | 44.8k | } | 207 | | | 208 | | /* Every successful send resets the timeout */ | 209 | 25.0k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 210 | 0 | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 211 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 212 | 0 | webSocketData->hasTimedOut = false; | 213 | 0 | } | 214 | | | 215 | | /* Return success */ | 216 | 25.0k | return SUCCESS; | 217 | 44.8k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool) Line | Count | Source | 114 | 38.2k | SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { | 115 | 38.2k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 116 | 38.2k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 117 | 38.2k | ); | 118 | | | 119 | | /* Skip sending and report success if we are over the limit of maxBackpressure */ | 120 | 38.2k | if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) { | 121 | | /* Also defer a close if we should */ | 122 | 0 | if (webSocketContextData->closeOnBackpressureLimit) { | 123 | 0 | us_socket_shutdown_read(SSL, (us_socket_t *) this); | 124 | 0 | } | 125 | | | 126 | | /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */ | 127 | 0 | if (webSocketContextData->droppedHandler) { | 128 | 0 | webSocketContextData->droppedHandler(this, message, opCode); | 129 | 0 | } | 130 | |
| 131 | 0 | return DROPPED; | 132 | 0 | } | 133 | | | 134 | | /* If we are subscribers and have messages to drain we need to drain them here to stay synced */ | 135 | 38.2k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 136 | | | 137 | | /* Special path for long sends of non-compressed, non-SSL messages */ | 138 | 38.2k | if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) { | 139 | 0 | char header[10]; | 140 | 0 | int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin); | 141 | 0 | int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); | 142 | | | 143 | 0 | if (written != header_length + (int) message.length()) { | 144 | | /* Buffer up backpressure */ | 145 | 0 | if (written > header_length) { | 146 | 0 | webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length)); | 147 | 0 | } else { | 148 | 0 | webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written); | 149 | 0 | webSocketData->buffer.append(message.data(), message.length()); | 150 | 0 | } | 151 | | /* We cannot still be corked if we have backpressure. | 152 | | * We also cannot uncork normally since it will re-write the already buffered | 153 | | * up backpressure again. */ | 154 | 0 | Super::uncorkWithoutSending(); | 155 | 0 | return BACKPRESSURE; | 156 | 0 | } | 157 | 38.2k | } else { | 158 | | | 159 | 38.2k | if (webSocketData->subscriber) { | 160 | | /* This will call back into us, send. */ | 161 | 5.51k | webSocketContextData->topicTree->drain(webSocketData->subscriber); | 162 | 5.51k | } | 163 | | | 164 | | /* Transform the message to compressed domain if requested */ | 165 | 38.2k | if (compress) { | 166 | 12.4k | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 167 | | | 168 | | /* Check and correct the compress hint. It is never valid to compress 0 bytes */ | 169 | 12.4k | if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { | 170 | | /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ | 171 | 1.93k | if (compress != CompressFlags::ALREADY_COMPRESSED) { | 172 | 1.93k | LoopData *loopData = Super::getLoopData(); | 173 | | /* Compress using either shared or dedicated deflationStream */ | 174 | 1.93k | if (webSocketData->deflationStream) { | 175 | 1.11k | message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); | 176 | 1.11k | } else { | 177 | 814 | message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); | 178 | 814 | } | 179 | 1.93k | } | 180 | 10.5k | } else { | 181 | 10.5k | compress = false; | 182 | 10.5k | } | 183 | 12.4k | } | 184 | | | 185 | | /* Get size, allocate size, write if needed */ | 186 | 38.2k | size_t messageFrameSize = protocol::messageFrameSize(message.length()); | 187 | 38.2k | auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize); | 188 | 38.2k | protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin); | 189 | | | 190 | | /* Depending on size of message we have different paths */ | 191 | 38.2k | if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) { | 192 | | /* This is a drain */ | 193 | 25.4k | auto[written, failed] = Super::write(nullptr, 0); | 194 | 25.4k | if (failed) { | 195 | | /* Return false for failure, skipping to reset the timeout below */ | 196 | 21.1k | return BACKPRESSURE; | 197 | 21.1k | } | 198 | 25.4k | } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) { | 199 | | /* Uncork if we came here uncorked */ | 200 | 1.77k | auto [written, failed] = Super::uncork(); | 201 | 1.77k | if (failed) { | 202 | 809 | return BACKPRESSURE; | 203 | 809 | } | 204 | 1.77k | } | 205 | | | 206 | 38.2k | } | 207 | | | 208 | | /* Every successful send resets the timeout */ | 209 | 16.3k | if (webSocketContextData->resetIdleTimeoutOnSend) { | 210 | 0 | Super::timeout(webSocketContextData->idleTimeoutComponents.first); | 211 | 0 | WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); | 212 | 0 | webSocketData->hasTimedOut = false; | 213 | 0 | } | 214 | | | 215 | | /* Return success */ | 216 | 16.3k | return SUCCESS; | 217 | 38.2k | } |
|
218 | | |
219 | | /* Send websocket close frame, emit close event, send FIN if successful. |
220 | | * Will not append a close reason if code is 0 or 1005. */ |
221 | 34.6k | void end(int code = 0, std::string_view message = {}) { |
222 | | /* Check if we already called this one */ |
223 | 34.6k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
224 | 34.6k | if (webSocketData->isShuttingDown) { |
225 | 0 | return; |
226 | 0 | } |
227 | | |
228 | | /* We postpone any FIN sending to either drainage or uncorking */ |
229 | 34.6k | webSocketData->isShuttingDown = true; |
230 | | |
231 | | /* Format and send the close frame */ |
232 | 34.6k | static const int MAX_CLOSE_PAYLOAD = 123; |
233 | 34.6k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); |
234 | 34.6k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; |
235 | 34.6k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); |
236 | 34.6k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); |
237 | | |
238 | | /* FIN if we are ok and not corked */ |
239 | 34.6k | if (!this->isCorked()) { |
240 | 4.20k | if (ok) { |
241 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. |
242 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */ |
243 | 1.38k | this->shutdown(); |
244 | 1.38k | } |
245 | 4.20k | } |
246 | | |
247 | 34.6k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
248 | 34.6k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
249 | 34.6k | ); |
250 | | |
251 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ |
252 | 34.6k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); |
253 | | |
254 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them */ |
255 | 34.6k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { |
256 | 0 | for (Topic *t : webSocketData->subscriber->topics) { |
257 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); |
258 | 0 | } |
259 | 0 | } |
260 | | |
261 | | /* Make sure to unsubscribe from any pub/sub node at exit */ |
262 | 34.6k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); |
263 | 34.6k | webSocketData->subscriber = nullptr; |
264 | | |
265 | | /* Emit close event */ |
266 | 34.6k | if (webSocketContextData->closeHandler) { |
267 | 34.3k | webSocketContextData->closeHandler(this, code, message); |
268 | 34.3k | } |
269 | 34.6k | ((USERDATA *) this->getUserData())->~USERDATA(); |
270 | 34.6k | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 221 | 13.3k | void end(int code = 0, std::string_view message = {}) { | 222 | | /* Check if we already called this one */ | 223 | 13.3k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 224 | 13.3k | if (webSocketData->isShuttingDown) { | 225 | 0 | return; | 226 | 0 | } | 227 | | | 228 | | /* We postpone any FIN sending to either drainage or uncorking */ | 229 | 13.3k | webSocketData->isShuttingDown = true; | 230 | | | 231 | | /* Format and send the close frame */ | 232 | 13.3k | static const int MAX_CLOSE_PAYLOAD = 123; | 233 | 13.3k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); | 234 | 13.3k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; | 235 | 13.3k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); | 236 | 13.3k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); | 237 | | | 238 | | /* FIN if we are ok and not corked */ | 239 | 13.3k | if (!this->isCorked()) { | 240 | 310 | if (ok) { | 241 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. | 242 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */ | 243 | 216 | this->shutdown(); | 244 | 216 | } | 245 | 310 | } | 246 | | | 247 | 13.3k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 248 | 13.3k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 249 | 13.3k | ); | 250 | | | 251 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ | 252 | 13.3k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); | 253 | | | 254 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them */ | 255 | 13.3k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { | 256 | 0 | for (Topic *t : webSocketData->subscriber->topics) { | 257 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); | 258 | 0 | } | 259 | 0 | } | 260 | | | 261 | | /* Make sure to unsubscribe from any pub/sub node at exit */ | 262 | 13.3k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); | 263 | 13.3k | webSocketData->subscriber = nullptr; | 264 | | | 265 | | /* Emit close event */ | 266 | 13.3k | if (webSocketContextData->closeHandler) { | 267 | 13.3k | webSocketContextData->closeHandler(this, code, message); | 268 | 13.3k | } | 269 | 13.3k | ((USERDATA *) this->getUserData())->~USERDATA(); | 270 | 13.3k | } |
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 221 | 10.1k | void end(int code = 0, std::string_view message = {}) { | 222 | | /* Check if we already called this one */ | 223 | 10.1k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 224 | 10.1k | if (webSocketData->isShuttingDown) { | 225 | 0 | return; | 226 | 0 | } | 227 | | | 228 | | /* We postpone any FIN sending to either drainage or uncorking */ | 229 | 10.1k | webSocketData->isShuttingDown = true; | 230 | | | 231 | | /* Format and send the close frame */ | 232 | 10.1k | static const int MAX_CLOSE_PAYLOAD = 123; | 233 | 10.1k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); | 234 | 10.1k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; | 235 | 10.1k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); | 236 | 10.1k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); | 237 | | | 238 | | /* FIN if we are ok and not corked */ | 239 | 10.1k | if (!this->isCorked()) { | 240 | 0 | if (ok) { | 241 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. | 242 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */ | 243 | 0 | this->shutdown(); | 244 | 0 | } | 245 | 0 | } | 246 | | | 247 | 10.1k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 248 | 10.1k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 249 | 10.1k | ); | 250 | | | 251 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ | 252 | 10.1k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); | 253 | | | 254 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them */ | 255 | 10.1k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { | 256 | 0 | for (Topic *t : webSocketData->subscriber->topics) { | 257 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); | 258 | 0 | } | 259 | 0 | } | 260 | | | 261 | | /* Make sure to unsubscribe from any pub/sub node at exit */ | 262 | 10.1k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); | 263 | 10.1k | webSocketData->subscriber = nullptr; | 264 | | | 265 | | /* Emit close event */ | 266 | 10.1k | if (webSocketContextData->closeHandler) { | 267 | 9.95k | webSocketContextData->closeHandler(this, code, message); | 268 | 9.95k | } | 269 | 10.1k | ((USERDATA *) this->getUserData())->~USERDATA(); | 270 | 10.1k | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >) Line | Count | Source | 221 | 11.1k | void end(int code = 0, std::string_view message = {}) { | 222 | | /* Check if we already called this one */ | 223 | 11.1k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 224 | 11.1k | if (webSocketData->isShuttingDown) { | 225 | 0 | return; | 226 | 0 | } | 227 | | | 228 | | /* We postpone any FIN sending to either drainage or uncorking */ | 229 | 11.1k | webSocketData->isShuttingDown = true; | 230 | | | 231 | | /* Format and send the close frame */ | 232 | 11.1k | static const int MAX_CLOSE_PAYLOAD = 123; | 233 | 11.1k | size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length()); | 234 | 11.1k | char closePayload[MAX_CLOSE_PAYLOAD + 2]; | 235 | 11.1k | size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length); | 236 | 11.1k | bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); | 237 | | | 238 | | /* FIN if we are ok and not corked */ | 239 | 11.1k | if (!this->isCorked()) { | 240 | 3.89k | if (ok) { | 241 | | /* If we are not corked, and we just sent off everything, we need to FIN right here. | 242 | | * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */ | 243 | 1.16k | this->shutdown(); | 244 | 1.16k | } | 245 | 3.89k | } | 246 | | | 247 | 11.1k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 248 | 11.1k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 249 | 11.1k | ); | 250 | | | 251 | | /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */ | 252 | 11.1k | Super::timeout(webSocketContextData->idleTimeoutComponents.second); | 253 | | | 254 | | /* At this point we iterate all currently held subscriptions and emit an event for all of them */ | 255 | 11.1k | if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) { | 256 | 0 | for (Topic *t : webSocketData->subscriber->topics) { | 257 | 0 | webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size()); | 258 | 0 | } | 259 | 0 | } | 260 | | | 261 | | /* Make sure to unsubscribe from any pub/sub node at exit */ | 262 | 11.1k | webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber); | 263 | 11.1k | webSocketData->subscriber = nullptr; | 264 | | | 265 | | /* Emit close event */ | 266 | 11.1k | if (webSocketContextData->closeHandler) { | 267 | 11.1k | webSocketContextData->closeHandler(this, code, message); | 268 | 11.1k | } | 269 | 11.1k | ((USERDATA *) this->getUserData())->~USERDATA(); | 270 | 11.1k | } |
|
271 | | |
272 | | /* Corks the response if possible. Leaves already corked socket be. */ |
273 | | void cork(MoveOnlyFunction<void()> &&handler) { |
274 | | if (!Super::isCorked() && Super::canCork()) { |
275 | | Super::cork(); |
276 | | handler(); |
277 | | |
278 | | /* There is no timeout when failing to uncork for WebSockets, |
279 | | * as that is handled by idleTimeout */ |
280 | | auto [written, failed] = Super::uncork(); |
281 | | (void)written; |
282 | | (void)failed; |
283 | | } else { |
284 | | /* We are already corked, or can't cork so let's just call the handler */ |
285 | | handler(); |
286 | | } |
287 | | } |
288 | | |
289 | | /* Subscribe to a topic according to MQTT rules and syntax. Returns success */ |
290 | 12.1M | bool subscribe(std::string_view topic, bool = false) { |
291 | 12.1M | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
292 | 12.1M | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
293 | 12.1M | ); |
294 | | |
295 | | /* Make us a subscriber if we aren't yet */ |
296 | 12.1M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
297 | 12.1M | if (!webSocketData->subscriber) { |
298 | 312k | webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber(); |
299 | 312k | webSocketData->subscriber->user = this; |
300 | 312k | } |
301 | | |
302 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ |
303 | 12.1M | Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic); |
304 | 12.1M | if (topicOrNull && webSocketContextData->subscriptionHandler) { |
305 | | /* Emit this socket, the topic, new count, old count */ |
306 | 0 | webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1); |
307 | 0 | } |
308 | | |
309 | | /* Subscribe always succeeds */ |
310 | 12.1M | return true; |
311 | 12.1M | } EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool) Line | Count | Source | 290 | 11.9M | bool subscribe(std::string_view topic, bool = false) { | 291 | 11.9M | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 292 | 11.9M | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 293 | 11.9M | ); | 294 | | | 295 | | /* Make us a subscriber if we aren't yet */ | 296 | 11.9M | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 297 | 11.9M | if (!webSocketData->subscriber) { | 298 | 119k | webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber(); | 299 | 119k | webSocketData->subscriber->user = this; | 300 | 119k | } | 301 | | | 302 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ | 303 | 11.9M | Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic); | 304 | 11.9M | if (topicOrNull && webSocketContextData->subscriptionHandler) { | 305 | | /* Emit this socket, the topic, new count, old count */ | 306 | 0 | webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1); | 307 | 0 | } | 308 | | | 309 | | /* Subscribe always succeeds */ | 310 | 11.9M | return true; | 311 | 11.9M | } |
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool) Line | Count | Source | 290 | 192k | bool subscribe(std::string_view topic, bool = false) { | 291 | 192k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, | 292 | 192k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) | 293 | 192k | ); | 294 | | | 295 | | /* Make us a subscriber if we aren't yet */ | 296 | 192k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); | 297 | 192k | if (!webSocketData->subscriber) { | 298 | 192k | webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber(); | 299 | 192k | webSocketData->subscriber->user = this; | 300 | 192k | } | 301 | | | 302 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ | 303 | 192k | Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic); | 304 | 192k | if (topicOrNull && webSocketContextData->subscriptionHandler) { | 305 | | /* Emit this socket, the topic, new count, old count */ | 306 | 0 | webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1); | 307 | 0 | } | 308 | | | 309 | | /* Subscribe always succeeds */ | 310 | 192k | return true; | 311 | 192k | } |
|
312 | | |
313 | | /* Unsubscribe from a topic, returns true if we were subscribed. */ |
314 | 569 | bool unsubscribe(std::string_view topic, bool = false) { |
315 | 569 | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
316 | 569 | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
317 | 569 | ); |
318 | | |
319 | 569 | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
320 | | |
321 | 569 | if (!webSocketData->subscriber) { return false; } |
322 | | |
323 | | /* Cannot return numSubscribers as this is only for this particular websocket context */ |
324 | 569 | auto [ok, last, newCount] = webSocketContextData->topicTree->unsubscribe(webSocketData->subscriber, topic); |
325 | | /* Emit subscription event if last */ |
326 | 569 | if (ok && webSocketContextData->subscriptionHandler) { |
327 | 0 | webSocketContextData->subscriptionHandler(this, topic, newCount, newCount + 1); |
328 | 0 | } |
329 | | |
330 | | /* Leave us as subscribers even if we subscribe to nothing (last unsubscribed topic might miss its message otherwise) */ |
331 | | |
332 | 569 | return ok; |
333 | 569 | } |
334 | | |
335 | | /* Returns whether this socket is subscribed to the specified topic */ |
336 | | bool isSubscribed(std::string_view topic) { |
337 | | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
338 | | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
339 | | ); |
340 | | |
341 | | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
342 | | if (!webSocketData->subscriber) { |
343 | | return false; |
344 | | } |
345 | | |
346 | | Topic *topicPtr = webSocketContextData->topicTree->lookupTopic(topic); |
347 | | if (!topicPtr) { |
348 | | return false; |
349 | | } |
350 | | |
351 | | return topicPtr->count(webSocketData->subscriber); |
352 | | } |
353 | | |
354 | | /* Iterates all topics of this WebSocket. Every topic is represented by its full name. |
355 | | * Can be called in close handler. It is possible to modify the subscription list while |
356 | | * inside the callback ONLY IF not modifying the topic passed to the callback. |
357 | | * Topic names are valid only for the duration of the callback. */ |
358 | | void iterateTopics(MoveOnlyFunction<void(std::string_view)> cb) { |
359 | | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
360 | | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
361 | | ); |
362 | | |
363 | | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
364 | | if (webSocketData->subscriber) { |
365 | | /* Lock this subscriber for unsubscription / subscription */ |
366 | | webSocketContextData->topicTree->iteratingSubscriber = webSocketData->subscriber; |
367 | | |
368 | | for (Topic *topicPtr : webSocketData->subscriber->topics) { |
369 | | cb({topicPtr->name.data(), topicPtr->name.length()}); |
370 | | } |
371 | | |
372 | | /* Unlock subscriber */ |
373 | | webSocketContextData->topicTree->iteratingSubscriber = nullptr; |
374 | | } |
375 | | } |
376 | | |
377 | | /* Publish a message to a topic according to MQTT rules and syntax. Returns success. |
378 | | * We, the WebSocket, must be subscribed to the topic itself and if so - no message will be sent to ourselves. |
379 | | * Use App::publish for an unconditional publish that simply publishes to whomever might be subscribed. */ |
380 | 160k | bool publish(std::string_view topic, std::string_view message, OpCode opCode = OpCode::TEXT, bool compress = false) { |
381 | 160k | WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, |
382 | 160k | (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) |
383 | 160k | ); |
384 | | |
385 | | /* We cannot be a subscriber of this topic if we are not a subscriber of anything */ |
386 | 160k | WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this); |
387 | 160k | if (!webSocketData->subscriber) { |
388 | | /* Failure, but still do return the number of subscribers */ |
389 | 0 | return false; |
390 | 0 | } |
391 | | |
392 | | /* Publish as sender, does not receive its own messages even if subscribed to relevant topics */ |
393 | 160k | if (message.length() >= LoopData::CORK_BUFFER_SIZE) { |
394 | 0 | return webSocketContextData->topicTree->publishBig(webSocketData->subscriber, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) { |
395 | 0 | auto *ws = (WebSocket<SSL, true, int> *) s->user; |
396 | |
|
397 | 0 | ws->send(message.message, (OpCode)message.opCode, message.compress); |
398 | 0 | }); |
399 | 160k | } else { |
400 | 160k | return webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress}); |
401 | 160k | } |
402 | 160k | } |
403 | | }; |
404 | | |
405 | | } |
406 | | |
407 | | #endif // UWS_WEBSOCKET_H |