Coverage Report

Created: 2026-04-29 06:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/uWebSockets/src/WebSocket.h
Line
Count
Source
1
/*
2
 * Authored by Alex Hultman, 2018-2021.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_WEBSOCKET_H
19
#define UWS_WEBSOCKET_H
20
21
#include "WebSocketData.h"
22
#include "WebSocketProtocol.h"
23
#include "AsyncSocket.h"
24
#include "WebSocketContextData.h"
25
26
#include <string_view>
27
28
namespace uWS {
29
30
/* Experimental */
31
enum CompressFlags : int {
32
    NO_ACTION,
33
    COMPRESS,
34
    ALREADY_COMPRESSED
35
};
36
37
template <bool SSL, bool isServer, typename USERDATA>
38
struct WebSocket : AsyncSocket<SSL> {
39
    template <bool> friend struct TemplatedApp;
40
    template <bool> friend struct HttpResponse;
41
private:
42
    typedef AsyncSocket<SSL> Super;
43
44
392k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
392k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
392k
        return this;
47
392k
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&)
Line
Count
Source
44
119k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
119k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
119k
        return this;
47
119k
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&)
Line
Count
Source
44
41.0k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
41.0k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
41.0k
        return this;
47
41.0k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&)
Line
Count
Source
44
231k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
231k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
231k
        return this;
47
231k
    }
48
public:
49
50
    /* Returns pointer to the per socket user data */
51
2.00M
    USERDATA *getUserData() {
52
2.00M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
2.00M
        return (USERDATA *) (webSocketData + 1);
55
2.00M
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::getUserData()
Line
Count
Source
51
1.37M
    USERDATA *getUserData() {
52
1.37M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
1.37M
        return (USERDATA *) (webSocketData + 1);
55
1.37M
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData()
Line
Count
Source
51
82.1k
    USERDATA *getUserData() {
52
82.1k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
82.1k
        return (USERDATA *) (webSocketData + 1);
55
82.1k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData()
Line
Count
Source
51
551k
    USERDATA *getUserData() {
52
551k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
551k
        return (USERDATA *) (webSocketData + 1);
55
551k
    }
56
57
    /* See AsyncSocket */
58
    using Super::getBufferedAmount;
59
    using Super::getRemoteAddress;
60
    using Super::getRemoteAddressAsText;
61
    using Super::getRemotePort;
62
    using Super::getNativeHandle;
63
64
    /* WebSocket close cannot be an alias to AsyncSocket::close since
65
     * we need to check first if it was shut down by remote peer */
66
193k
    us_socket_t *close() {
67
193k
        if (us_socket_is_closed(SSL, (us_socket_t *) this)) {
68
188k
            return nullptr;
69
188k
        }
70
5.12k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
71
5.12k
        if (webSocketData->isShuttingDown) {
72
4.19k
            return nullptr;
73
4.19k
        }
74
75
926
        return us_socket_close(SSL, (us_socket_t *) this, 0, nullptr);
76
5.12k
    }
77
78
    enum SendStatus : int {
79
        BACKPRESSURE,
80
        SUCCESS,
81
        DROPPED
82
    };
83
84
    /* Sending fragmented messages puts a bit of effort on the user; you must not interleave regular sends
85
     * with fragmented sends and you must sendFirstFragment, [sendFragment], then finally sendLastFragment. */
86
    SendStatus sendFirstFragment(std::string_view message, OpCode opCode = OpCode::BINARY, bool compress = false) {
87
        return send(message, opCode, compress, false);
88
    }
89
90
    SendStatus sendFragment(std::string_view message, bool compress = false) {
91
        return send(message, CONTINUATION, compress, false);
92
    }
93
94
    SendStatus sendLastFragment(std::string_view message, bool compress = false) {
95
        return send(message, CONTINUATION, compress, true);
96
    }
97
98
    /* Experimental */
99
    bool hasNegotiatedCompression() {
100
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
101
        return webSocketData->compressionStatus == WebSocketData::ENABLED;
102
    }
103
104
    /* Experimental */
105
    SendStatus sendPrepared(PreparedMessage &preparedMessage) {
106
        if (preparedMessage.compressed && hasNegotiatedCompression() && preparedMessage.compressedMessage.length() < preparedMessage.originalMessage.length()) {
107
            return send({preparedMessage.compressedMessage.data(), preparedMessage.compressedMessage.length()}, (OpCode) preparedMessage.opCode, uWS::CompressFlags::ALREADY_COMPRESSED);
108
        }
109
        return send({preparedMessage.originalMessage.data(), preparedMessage.originalMessage.length()}, (OpCode) preparedMessage.opCode);
110
    }
111
112
    /* Send or buffer a WebSocket frame, compressed or not. Returns BACKPRESSURE on increased user space backpressure,
113
     * DROPPED on dropped message (due to backpressure) or SUCCCESS if you are free to send even more now. */
114
448k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
115
448k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
116
448k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
117
448k
        );
118
119
        /* Skip sending and report success if we are over the limit of maxBackpressure */
120
448k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
121
            /* Also defer a close if we should */
122
2.83k
            if (webSocketContextData->closeOnBackpressureLimit) {
123
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
124
0
            }
125
126
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
127
2.83k
            if (webSocketContextData->droppedHandler) {
128
0
                webSocketContextData->droppedHandler(this, message, opCode);
129
0
            }
130
131
2.83k
            return DROPPED;
132
2.83k
        }
133
134
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
135
445k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
136
137
        /* Special path for long sends of non-compressed, non-SSL messages */
138
445k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
139
0
            char header[10];
140
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
141
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
142
        
143
0
            if (written != header_length + (int) message.length()) {
144
                /* Buffer up backpressure */
145
0
                if (written > header_length) {
146
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
147
0
                } else {
148
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
149
0
                    webSocketData->buffer.append(message.data(), message.length());
150
0
                }
151
                /* We cannot still be corked if we have backpressure.
152
                 * We also cannot uncork normally since it will re-write the already buffered
153
                 * up backpressure again. */
154
0
                Super::uncorkWithoutSending();
155
0
                return BACKPRESSURE;
156
0
            }
157
445k
        } else {
158
159
445k
            if (webSocketData->subscriber) {
160
                /* This will call back into us, send. */
161
368k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
162
368k
            }
163
164
            /* Transform the message to compressed domain if requested */
165
445k
            if (compress) {
166
341k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
167
168
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
169
341k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
170
                    /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */
171
4.23k
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
172
4.23k
                        LoopData *loopData = Super::getLoopData();
173
                        /* Compress using either shared or dedicated deflationStream */
174
4.23k
                        if (webSocketData->deflationStream) {
175
1.29k
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
176
2.93k
                        } else {
177
2.93k
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
178
2.93k
                        }
179
4.23k
                    }
180
337k
                } else {
181
337k
                    compress = false;
182
337k
                }
183
341k
            }
184
185
            /* Get size, allocate size, write if needed */
186
445k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
187
445k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
188
445k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
189
190
            /* Depending on size of message we have different paths */
191
445k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
192
                /* This is a drain */
193
334k
                auto[written, failed] = Super::write(nullptr, 0);
194
334k
                if (failed) {
195
                    /* Return false for failure, skipping to reset the timeout below */
196
320k
                    return BACKPRESSURE;
197
320k
                }
198
334k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
199
                /* Uncork if we came here uncorked */
200
3.18k
                auto [written, failed] = Super::uncork();
201
3.18k
                if (failed) {
202
1.44k
                    return BACKPRESSURE;
203
1.44k
                }
204
3.18k
            }
205
206
445k
        }
207
208
        /* Every successful send resets the timeout */
209
124k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
210
8.58k
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
211
8.58k
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
212
8.58k
            webSocketData->hasTimedOut = false;
213
8.58k
        }
214
215
        /* Return success */
216
124k
        return SUCCESS;
217
445k
    }
uWS::WebSocket<true, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
114
25.5k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
115
25.5k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
116
25.5k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
117
25.5k
        );
118
119
        /* Skip sending and report success if we are over the limit of maxBackpressure */
120
25.5k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
121
            /* Also defer a close if we should */
122
983
            if (webSocketContextData->closeOnBackpressureLimit) {
123
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
124
0
            }
125
126
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
127
983
            if (webSocketContextData->droppedHandler) {
128
0
                webSocketContextData->droppedHandler(this, message, opCode);
129
0
            }
130
131
983
            return DROPPED;
132
983
        }
133
134
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
135
24.5k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
136
137
        /* Special path for long sends of non-compressed, non-SSL messages */
138
24.5k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
139
0
            char header[10];
140
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
141
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
142
        
143
0
            if (written != header_length + (int) message.length()) {
144
                /* Buffer up backpressure */
145
0
                if (written > header_length) {
146
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
147
0
                } else {
148
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
149
0
                    webSocketData->buffer.append(message.data(), message.length());
150
0
                }
151
                /* We cannot still be corked if we have backpressure.
152
                 * We also cannot uncork normally since it will re-write the already buffered
153
                 * up backpressure again. */
154
0
                Super::uncorkWithoutSending();
155
0
                return BACKPRESSURE;
156
0
            }
157
24.5k
        } else {
158
159
24.5k
            if (webSocketData->subscriber) {
160
                /* This will call back into us, send. */
161
24.5k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
162
24.5k
            }
163
164
            /* Transform the message to compressed domain if requested */
165
24.5k
            if (compress) {
166
0
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
167
168
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
169
0
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
170
                    /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */
171
0
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
172
0
                        LoopData *loopData = Super::getLoopData();
173
                        /* Compress using either shared or dedicated deflationStream */
174
0
                        if (webSocketData->deflationStream) {
175
0
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
176
0
                        } else {
177
0
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
178
0
                        }
179
0
                    }
180
0
                } else {
181
0
                    compress = false;
182
0
                }
183
0
            }
184
185
            /* Get size, allocate size, write if needed */
186
24.5k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
187
24.5k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
188
24.5k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
189
190
            /* Depending on size of message we have different paths */
191
24.5k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
192
                /* This is a drain */
193
20.1k
                auto[written, failed] = Super::write(nullptr, 0);
194
20.1k
                if (failed) {
195
                    /* Return false for failure, skipping to reset the timeout below */
196
19.0k
                    return BACKPRESSURE;
197
19.0k
                }
198
20.1k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
199
                /* Uncork if we came here uncorked */
200
0
                auto [written, failed] = Super::uncork();
201
0
                if (failed) {
202
0
                    return BACKPRESSURE;
203
0
                }
204
0
            }
205
206
24.5k
        }
207
208
        /* Every successful send resets the timeout */
209
5.49k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
210
5.49k
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
211
5.49k
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
212
5.49k
            webSocketData->hasTimedOut = false;
213
5.49k
        }
214
215
        /* Return success */
216
5.49k
        return SUCCESS;
217
24.5k
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
114
19.2k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
115
19.2k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
116
19.2k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
117
19.2k
        );
118
119
        /* Skip sending and report success if we are over the limit of maxBackpressure */
120
19.2k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
121
            /* Also defer a close if we should */
122
997
            if (webSocketContextData->closeOnBackpressureLimit) {
123
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
124
0
            }
125
126
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
127
997
            if (webSocketContextData->droppedHandler) {
128
0
                webSocketContextData->droppedHandler(this, message, opCode);
129
0
            }
130
131
997
            return DROPPED;
132
997
        }
133
134
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
135
18.2k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
136
137
        /* Special path for long sends of non-compressed, non-SSL messages */
138
18.2k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
139
0
            char header[10];
140
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
141
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
142
        
143
0
            if (written != header_length + (int) message.length()) {
144
                /* Buffer up backpressure */
145
0
                if (written > header_length) {
146
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
147
0
                } else {
148
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
149
0
                    webSocketData->buffer.append(message.data(), message.length());
150
0
                }
151
                /* We cannot still be corked if we have backpressure.
152
                 * We also cannot uncork normally since it will re-write the already buffered
153
                 * up backpressure again. */
154
0
                Super::uncorkWithoutSending();
155
0
                return BACKPRESSURE;
156
0
            }
157
18.2k
        } else {
158
159
18.2k
            if (webSocketData->subscriber) {
160
                /* This will call back into us, send. */
161
18.2k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
162
18.2k
            }
163
164
            /* Transform the message to compressed domain if requested */
165
18.2k
            if (compress) {
166
0
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
167
168
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
169
0
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
170
                    /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */
171
0
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
172
0
                        LoopData *loopData = Super::getLoopData();
173
                        /* Compress using either shared or dedicated deflationStream */
174
0
                        if (webSocketData->deflationStream) {
175
0
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
176
0
                        } else {
177
0
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
178
0
                        }
179
0
                    }
180
0
                } else {
181
0
                    compress = false;
182
0
                }
183
0
            }
184
185
            /* Get size, allocate size, write if needed */
186
18.2k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
187
18.2k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
188
18.2k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
189
190
            /* Depending on size of message we have different paths */
191
18.2k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
192
                /* This is a drain */
193
15.5k
                auto[written, failed] = Super::write(nullptr, 0);
194
15.5k
                if (failed) {
195
                    /* Return false for failure, skipping to reset the timeout below */
196
14.5k
                    return BACKPRESSURE;
197
14.5k
                }
198
15.5k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
199
                /* Uncork if we came here uncorked */
200
1.41k
                auto [written, failed] = Super::uncork();
201
1.41k
                if (failed) {
202
636
                    return BACKPRESSURE;
203
636
                }
204
1.41k
            }
205
206
18.2k
        }
207
208
        /* Every successful send resets the timeout */
209
3.09k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
210
3.09k
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
211
3.09k
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
212
3.09k
            webSocketData->hasTimedOut = false;
213
3.09k
        }
214
215
        /* Return success */
216
3.09k
        return SUCCESS;
217
18.2k
    }
uWS::WebSocket<false, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
114
319k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
115
319k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
116
319k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
117
319k
        );
118
119
        /* Skip sending and report success if we are over the limit of maxBackpressure */
120
319k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
121
            /* Also defer a close if we should */
122
0
            if (webSocketContextData->closeOnBackpressureLimit) {
123
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
124
0
            }
125
126
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
127
0
            if (webSocketContextData->droppedHandler) {
128
0
                webSocketContextData->droppedHandler(this, message, opCode);
129
0
            }
130
131
0
            return DROPPED;
132
0
        }
133
134
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
135
319k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
136
137
        /* Special path for long sends of non-compressed, non-SSL messages */
138
319k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
139
0
            char header[10];
140
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
141
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
142
        
143
0
            if (written != header_length + (int) message.length()) {
144
                /* Buffer up backpressure */
145
0
                if (written > header_length) {
146
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
147
0
                } else {
148
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
149
0
                    webSocketData->buffer.append(message.data(), message.length());
150
0
                }
151
                /* We cannot still be corked if we have backpressure.
152
                 * We also cannot uncork normally since it will re-write the already buffered
153
                 * up backpressure again. */
154
0
                Super::uncorkWithoutSending();
155
0
                return BACKPRESSURE;
156
0
            }
157
319k
        } else {
158
159
319k
            if (webSocketData->subscriber) {
160
                /* This will call back into us, send. */
161
319k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
162
319k
            }
163
164
            /* Transform the message to compressed domain if requested */
165
319k
            if (compress) {
166
319k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
167
168
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
169
319k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
170
                    /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */
171
229
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
172
229
                        LoopData *loopData = Super::getLoopData();
173
                        /* Compress using either shared or dedicated deflationStream */
174
229
                        if (webSocketData->deflationStream) {
175
177
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
176
177
                        } else {
177
52
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
178
52
                        }
179
229
                    }
180
319k
                } else {
181
319k
                    compress = false;
182
319k
                }
183
319k
            }
184
185
            /* Get size, allocate size, write if needed */
186
319k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
187
319k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
188
319k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
189
190
            /* Depending on size of message we have different paths */
191
319k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
192
                /* This is a drain */
193
251k
                auto[written, failed] = Super::write(nullptr, 0);
194
251k
                if (failed) {
195
                    /* Return false for failure, skipping to reset the timeout below */
196
245k
                    return BACKPRESSURE;
197
245k
                }
198
251k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
199
                /* Uncork if we came here uncorked */
200
0
                auto [written, failed] = Super::uncork();
201
0
                if (failed) {
202
0
                    return BACKPRESSURE;
203
0
                }
204
0
            }
205
206
319k
        }
207
208
        /* Every successful send resets the timeout */
209
74.1k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
210
0
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
211
0
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
212
0
            webSocketData->hasTimedOut = false;
213
0
        }
214
215
        /* Return success */
216
74.1k
        return SUCCESS;
217
319k
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
114
45.7k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
115
45.7k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
116
45.7k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
117
45.7k
        );
118
119
        /* Skip sending and report success if we are over the limit of maxBackpressure */
120
45.7k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
121
            /* Also defer a close if we should */
122
853
            if (webSocketContextData->closeOnBackpressureLimit) {
123
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
124
0
            }
125
126
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
127
853
            if (webSocketContextData->droppedHandler) {
128
0
                webSocketContextData->droppedHandler(this, message, opCode);
129
0
            }
130
131
853
            return DROPPED;
132
853
        }
133
134
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
135
44.8k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
136
137
        /* Special path for long sends of non-compressed, non-SSL messages */
138
44.8k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
139
0
            char header[10];
140
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
141
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
142
        
143
0
            if (written != header_length + (int) message.length()) {
144
                /* Buffer up backpressure */
145
0
                if (written > header_length) {
146
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
147
0
                } else {
148
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
149
0
                    webSocketData->buffer.append(message.data(), message.length());
150
0
                }
151
                /* We cannot still be corked if we have backpressure.
152
                 * We also cannot uncork normally since it will re-write the already buffered
153
                 * up backpressure again. */
154
0
                Super::uncorkWithoutSending();
155
0
                return BACKPRESSURE;
156
0
            }
157
44.8k
        } else {
158
159
44.8k
            if (webSocketData->subscriber) {
160
                /* This will call back into us, send. */
161
0
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
162
0
            }
163
164
            /* Transform the message to compressed domain if requested */
165
44.8k
            if (compress) {
166
9.64k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
167
168
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
169
9.64k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
170
                    /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */
171
2.07k
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
172
2.07k
                        LoopData *loopData = Super::getLoopData();
173
                        /* Compress using either shared or dedicated deflationStream */
174
2.07k
                        if (webSocketData->deflationStream) {
175
0
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
176
2.07k
                        } else {
177
2.07k
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
178
2.07k
                        }
179
2.07k
                    }
180
7.57k
                } else {
181
7.57k
                    compress = false;
182
7.57k
                }
183
9.64k
            }
184
185
            /* Get size, allocate size, write if needed */
186
44.8k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
187
44.8k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
188
44.8k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
189
190
            /* Depending on size of message we have different paths */
191
44.8k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
192
                /* This is a drain */
193
22.0k
                auto[written, failed] = Super::write(nullptr, 0);
194
22.0k
                if (failed) {
195
                    /* Return false for failure, skipping to reset the timeout below */
196
19.8k
                    return BACKPRESSURE;
197
19.8k
                }
198
22.8k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
199
                /* Uncork if we came here uncorked */
200
0
                auto [written, failed] = Super::uncork();
201
0
                if (failed) {
202
0
                    return BACKPRESSURE;
203
0
                }
204
0
            }
205
206
44.8k
        }
207
208
        /* Every successful send resets the timeout */
209
25.0k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
210
0
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
211
0
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
212
0
            webSocketData->hasTimedOut = false;
213
0
        }
214
215
        /* Return success */
216
25.0k
        return SUCCESS;
217
44.8k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
114
38.2k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
115
38.2k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
116
38.2k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
117
38.2k
        );
118
119
        /* Skip sending and report success if we are over the limit of maxBackpressure */
120
38.2k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
121
            /* Also defer a close if we should */
122
0
            if (webSocketContextData->closeOnBackpressureLimit) {
123
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
124
0
            }
125
126
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
127
0
            if (webSocketContextData->droppedHandler) {
128
0
                webSocketContextData->droppedHandler(this, message, opCode);
129
0
            }
130
131
0
            return DROPPED;
132
0
        }
133
134
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
135
38.2k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
136
137
        /* Special path for long sends of non-compressed, non-SSL messages */
138
38.2k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
139
0
            char header[10];
140
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
141
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
142
        
143
0
            if (written != header_length + (int) message.length()) {
144
                /* Buffer up backpressure */
145
0
                if (written > header_length) {
146
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
147
0
                } else {
148
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
149
0
                    webSocketData->buffer.append(message.data(), message.length());
150
0
                }
151
                /* We cannot still be corked if we have backpressure.
152
                 * We also cannot uncork normally since it will re-write the already buffered
153
                 * up backpressure again. */
154
0
                Super::uncorkWithoutSending();
155
0
                return BACKPRESSURE;
156
0
            }
157
38.2k
        } else {
158
159
38.2k
            if (webSocketData->subscriber) {
160
                /* This will call back into us, send. */
161
5.51k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
162
5.51k
            }
163
164
            /* Transform the message to compressed domain if requested */
165
38.2k
            if (compress) {
166
12.4k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
167
168
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
169
12.4k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
170
                    /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */
171
1.93k
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
172
1.93k
                        LoopData *loopData = Super::getLoopData();
173
                        /* Compress using either shared or dedicated deflationStream */
174
1.93k
                        if (webSocketData->deflationStream) {
175
1.11k
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
176
1.11k
                        } else {
177
814
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
178
814
                        }
179
1.93k
                    }
180
10.5k
                } else {
181
10.5k
                    compress = false;
182
10.5k
                }
183
12.4k
            }
184
185
            /* Get size, allocate size, write if needed */
186
38.2k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
187
38.2k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
188
38.2k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
189
190
            /* Depending on size of message we have different paths */
191
38.2k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
192
                /* This is a drain */
193
25.4k
                auto[written, failed] = Super::write(nullptr, 0);
194
25.4k
                if (failed) {
195
                    /* Return false for failure, skipping to reset the timeout below */
196
21.1k
                    return BACKPRESSURE;
197
21.1k
                }
198
25.4k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
199
                /* Uncork if we came here uncorked */
200
1.77k
                auto [written, failed] = Super::uncork();
201
1.77k
                if (failed) {
202
809
                    return BACKPRESSURE;
203
809
                }
204
1.77k
            }
205
206
38.2k
        }
207
208
        /* Every successful send resets the timeout */
209
16.3k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
210
0
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
211
0
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
212
0
            webSocketData->hasTimedOut = false;
213
0
        }
214
215
        /* Return success */
216
16.3k
        return SUCCESS;
217
38.2k
    }
218
219
    /* Send websocket close frame, emit close event, send FIN if successful.
220
     * Will not append a close reason if code is 0 or 1005. */
221
34.6k
    void end(int code = 0, std::string_view message = {}) {
222
        /* Check if we already called this one */
223
34.6k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
224
34.6k
        if (webSocketData->isShuttingDown) {
225
0
            return;
226
0
        }
227
228
        /* We postpone any FIN sending to either drainage or uncorking */
229
34.6k
        webSocketData->isShuttingDown = true;
230
231
        /* Format and send the close frame */
232
34.6k
        static const int MAX_CLOSE_PAYLOAD = 123;
233
34.6k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
234
34.6k
        char closePayload[MAX_CLOSE_PAYLOAD + 2];
235
34.6k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
236
34.6k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE);
237
238
        /* FIN if we are ok and not corked */
239
34.6k
        if (!this->isCorked()) {
240
4.20k
            if (ok) {
241
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
242
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
243
1.38k
                this->shutdown();
244
1.38k
            }
245
4.20k
        }
246
247
34.6k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
248
34.6k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
249
34.6k
        );
250
251
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
252
34.6k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
253
254
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
255
34.6k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
256
0
            for (Topic *t : webSocketData->subscriber->topics) {
257
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
258
0
            }
259
0
        }
260
261
        /* Make sure to unsubscribe from any pub/sub node at exit */
262
34.6k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
263
34.6k
        webSocketData->subscriber = nullptr;
264
265
        /* Emit close event */
266
34.6k
        if (webSocketContextData->closeHandler) {
267
34.3k
            webSocketContextData->closeHandler(this, code, message);
268
34.3k
        }
269
34.6k
        ((USERDATA *) this->getUserData())->~USERDATA();
270
34.6k
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
221
13.3k
    void end(int code = 0, std::string_view message = {}) {
222
        /* Check if we already called this one */
223
13.3k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
224
13.3k
        if (webSocketData->isShuttingDown) {
225
0
            return;
226
0
        }
227
228
        /* We postpone any FIN sending to either drainage or uncorking */
229
13.3k
        webSocketData->isShuttingDown = true;
230
231
        /* Format and send the close frame */
232
13.3k
        static const int MAX_CLOSE_PAYLOAD = 123;
233
13.3k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
234
13.3k
        char closePayload[MAX_CLOSE_PAYLOAD + 2];
235
13.3k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
236
13.3k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE);
237
238
        /* FIN if we are ok and not corked */
239
13.3k
        if (!this->isCorked()) {
240
310
            if (ok) {
241
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
242
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
243
216
                this->shutdown();
244
216
            }
245
310
        }
246
247
13.3k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
248
13.3k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
249
13.3k
        );
250
251
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
252
13.3k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
253
254
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
255
13.3k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
256
0
            for (Topic *t : webSocketData->subscriber->topics) {
257
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
258
0
            }
259
0
        }
260
261
        /* Make sure to unsubscribe from any pub/sub node at exit */
262
13.3k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
263
13.3k
        webSocketData->subscriber = nullptr;
264
265
        /* Emit close event */
266
13.3k
        if (webSocketContextData->closeHandler) {
267
13.3k
            webSocketContextData->closeHandler(this, code, message);
268
13.3k
        }
269
13.3k
        ((USERDATA *) this->getUserData())->~USERDATA();
270
13.3k
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
221
10.1k
    void end(int code = 0, std::string_view message = {}) {
222
        /* Check if we already called this one */
223
10.1k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
224
10.1k
        if (webSocketData->isShuttingDown) {
225
0
            return;
226
0
        }
227
228
        /* We postpone any FIN sending to either drainage or uncorking */
229
10.1k
        webSocketData->isShuttingDown = true;
230
231
        /* Format and send the close frame */
232
10.1k
        static const int MAX_CLOSE_PAYLOAD = 123;
233
10.1k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
234
10.1k
        char closePayload[MAX_CLOSE_PAYLOAD + 2];
235
10.1k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
236
10.1k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE);
237
238
        /* FIN if we are ok and not corked */
239
10.1k
        if (!this->isCorked()) {
240
0
            if (ok) {
241
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
242
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
243
0
                this->shutdown();
244
0
            }
245
0
        }
246
247
10.1k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
248
10.1k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
249
10.1k
        );
250
251
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
252
10.1k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
253
254
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
255
10.1k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
256
0
            for (Topic *t : webSocketData->subscriber->topics) {
257
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
258
0
            }
259
0
        }
260
261
        /* Make sure to unsubscribe from any pub/sub node at exit */
262
10.1k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
263
10.1k
        webSocketData->subscriber = nullptr;
264
265
        /* Emit close event */
266
10.1k
        if (webSocketContextData->closeHandler) {
267
9.95k
            webSocketContextData->closeHandler(this, code, message);
268
9.95k
        }
269
10.1k
        ((USERDATA *) this->getUserData())->~USERDATA();
270
10.1k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
221
11.1k
    void end(int code = 0, std::string_view message = {}) {
222
        /* Check if we already called this one */
223
11.1k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
224
11.1k
        if (webSocketData->isShuttingDown) {
225
0
            return;
226
0
        }
227
228
        /* We postpone any FIN sending to either drainage or uncorking */
229
11.1k
        webSocketData->isShuttingDown = true;
230
231
        /* Format and send the close frame */
232
11.1k
        static const int MAX_CLOSE_PAYLOAD = 123;
233
11.1k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
234
11.1k
        char closePayload[MAX_CLOSE_PAYLOAD + 2];
235
11.1k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
236
11.1k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE);
237
238
        /* FIN if we are ok and not corked */
239
11.1k
        if (!this->isCorked()) {
240
3.89k
            if (ok) {
241
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
242
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
243
1.16k
                this->shutdown();
244
1.16k
            }
245
3.89k
        }
246
247
11.1k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
248
11.1k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
249
11.1k
        );
250
251
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
252
11.1k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
253
254
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
255
11.1k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
256
0
            for (Topic *t : webSocketData->subscriber->topics) {
257
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
258
0
            }
259
0
        }
260
261
        /* Make sure to unsubscribe from any pub/sub node at exit */
262
11.1k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
263
11.1k
        webSocketData->subscriber = nullptr;
264
265
        /* Emit close event */
266
11.1k
        if (webSocketContextData->closeHandler) {
267
11.1k
            webSocketContextData->closeHandler(this, code, message);
268
11.1k
        }
269
11.1k
        ((USERDATA *) this->getUserData())->~USERDATA();
270
11.1k
    }
271
272
    /* Corks the response if possible. Leaves already corked socket be. */
273
    void cork(MoveOnlyFunction<void()> &&handler) {
274
        if (!Super::isCorked() && Super::canCork()) {
275
            Super::cork();
276
            handler();
277
278
            /* There is no timeout when failing to uncork for WebSockets,
279
             * as that is handled by idleTimeout */
280
            auto [written, failed] = Super::uncork();
281
            (void)written;
282
            (void)failed;
283
        } else {
284
            /* We are already corked, or can't cork so let's just call the handler */
285
            handler();
286
        }
287
    }
288
289
    /* Subscribe to a topic according to MQTT rules and syntax. Returns success */
290
12.1M
    bool subscribe(std::string_view topic, bool = false) {
291
12.1M
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
292
12.1M
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
293
12.1M
        );
294
295
        /* Make us a subscriber if we aren't yet */
296
12.1M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
297
12.1M
        if (!webSocketData->subscriber) {
298
312k
            webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber();
299
312k
            webSocketData->subscriber->user = this;
300
312k
        }
301
302
        /* Cannot return numSubscribers as this is only for this particular websocket context */
303
12.1M
        Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
304
12.1M
        if (topicOrNull && webSocketContextData->subscriptionHandler) {
305
            /* Emit this socket, the topic, new count, old count */
306
0
            webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1);
307
0
        }
308
309
        /* Subscribe always succeeds */
310
12.1M
        return true;
311
12.1M
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool)
Line
Count
Source
290
11.9M
    bool subscribe(std::string_view topic, bool = false) {
291
11.9M
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
292
11.9M
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
293
11.9M
        );
294
295
        /* Make us a subscriber if we aren't yet */
296
11.9M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
297
11.9M
        if (!webSocketData->subscriber) {
298
119k
            webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber();
299
119k
            webSocketData->subscriber->user = this;
300
119k
        }
301
302
        /* Cannot return numSubscribers as this is only for this particular websocket context */
303
11.9M
        Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
304
11.9M
        if (topicOrNull && webSocketContextData->subscriptionHandler) {
305
            /* Emit this socket, the topic, new count, old count */
306
0
            webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1);
307
0
        }
308
309
        /* Subscribe always succeeds */
310
11.9M
        return true;
311
11.9M
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool)
Line
Count
Source
290
192k
    bool subscribe(std::string_view topic, bool = false) {
291
192k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
292
192k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
293
192k
        );
294
295
        /* Make us a subscriber if we aren't yet */
296
192k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
297
192k
        if (!webSocketData->subscriber) {
298
192k
            webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber();
299
192k
            webSocketData->subscriber->user = this;
300
192k
        }
301
302
        /* Cannot return numSubscribers as this is only for this particular websocket context */
303
192k
        Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
304
192k
        if (topicOrNull && webSocketContextData->subscriptionHandler) {
305
            /* Emit this socket, the topic, new count, old count */
306
0
            webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1);
307
0
        }
308
309
        /* Subscribe always succeeds */
310
192k
        return true;
311
192k
    }
312
313
    /* Unsubscribe from a topic, returns true if we were subscribed. */
314
569
    bool unsubscribe(std::string_view topic, bool = false) {
315
569
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
316
569
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
317
569
        );
318
319
569
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
320
        
321
569
        if (!webSocketData->subscriber) { return false; }
322
323
        /* Cannot return numSubscribers as this is only for this particular websocket context */
324
569
        auto [ok, last, newCount] = webSocketContextData->topicTree->unsubscribe(webSocketData->subscriber, topic);
325
        /* Emit subscription event if last */
326
569
        if (ok && webSocketContextData->subscriptionHandler) {
327
0
            webSocketContextData->subscriptionHandler(this, topic, newCount, newCount + 1);
328
0
        }
329
330
        /* Leave us as subscribers even if we subscribe to nothing (last unsubscribed topic might miss its message otherwise) */
331
332
569
        return ok;
333
569
    }
334
335
    /* Returns whether this socket is subscribed to the specified topic */
336
    bool isSubscribed(std::string_view topic) {
337
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
338
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
339
        );
340
341
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
342
        if (!webSocketData->subscriber) {
343
            return false;
344
        }
345
346
        Topic *topicPtr = webSocketContextData->topicTree->lookupTopic(topic);
347
        if (!topicPtr) {
348
            return false;
349
        }
350
351
        return topicPtr->count(webSocketData->subscriber);
352
    }
353
354
    /* Iterates all topics of this WebSocket. Every topic is represented by its full name.
355
     * Can be called in close handler. It is possible to modify the subscription list while
356
     * inside the callback ONLY IF not modifying the topic passed to the callback.
357
     * Topic names are valid only for the duration of the callback. */
358
    void iterateTopics(MoveOnlyFunction<void(std::string_view)> cb) {
359
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
360
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
361
        );
362
363
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
364
        if (webSocketData->subscriber) {
365
            /* Lock this subscriber for unsubscription / subscription */
366
            webSocketContextData->topicTree->iteratingSubscriber = webSocketData->subscriber;
367
368
            for (Topic *topicPtr : webSocketData->subscriber->topics) {
369
                cb({topicPtr->name.data(), topicPtr->name.length()});
370
            }
371
372
            /* Unlock subscriber */
373
            webSocketContextData->topicTree->iteratingSubscriber = nullptr;
374
        }
375
    }
376
377
    /* Publish a message to a topic according to MQTT rules and syntax. Returns success.
378
     * We, the WebSocket, must be subscribed to the topic itself and if so - no message will be sent to ourselves.
379
     * Use App::publish for an unconditional publish that simply publishes to whomever might be subscribed. */
380
160k
    bool publish(std::string_view topic, std::string_view message, OpCode opCode = OpCode::TEXT, bool compress = false) {
381
160k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
382
160k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
383
160k
        );
384
385
        /* We cannot be a subscriber of this topic if we are not a subscriber of anything */
386
160k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
387
160k
        if (!webSocketData->subscriber) {
388
            /* Failure, but still do return the number of subscribers */
389
0
            return false;
390
0
        }
391
392
        /* Publish as sender, does not receive its own messages even if subscribed to relevant topics */
393
160k
        if (message.length() >= LoopData::CORK_BUFFER_SIZE) {
394
0
            return webSocketContextData->topicTree->publishBig(webSocketData->subscriber, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) {
395
0
                auto *ws = (WebSocket<SSL, true, int> *) s->user;
396
397
0
                ws->send(message.message, (OpCode)message.opCode, message.compress);
398
0
            });
399
160k
        } else {
400
160k
            return webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress});
401
160k
        }
402
160k
    }
403
};
404
405
}
406
407
#endif // UWS_WEBSOCKET_H