Coverage Report

Created: 2025-10-14 06:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/uWebSockets/src/WebSocket.h
Line
Count
Source
1
/*
2
 * Authored by Alex Hultman, 2018-2021.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_WEBSOCKET_H
19
#define UWS_WEBSOCKET_H
20
21
#include "WebSocketData.h"
22
#include "WebSocketProtocol.h"
23
#include "AsyncSocket.h"
24
#include "WebSocketContextData.h"
25
26
#include <string_view>
27
28
namespace uWS {
29
30
/* Experimental */
31
enum CompressFlags : int {
32
    NO_ACTION,
33
    COMPRESS,
34
    ALREADY_COMPRESSED
35
};
36
37
template <bool SSL, bool isServer, typename USERDATA>
38
struct WebSocket : AsyncSocket<SSL> {
39
    template <bool> friend struct TemplatedApp;
40
    template <bool> friend struct HttpResponse;
41
private:
42
    typedef AsyncSocket<SSL> Super;
43
44
286k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
286k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
286k
        return this;
47
286k
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&)
Line
Count
Source
44
107k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
107k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
107k
        return this;
47
107k
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&)
Line
Count
Source
44
35.7k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
35.7k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
35.7k
        return this;
47
35.7k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::init(bool, uWS::CompressOptions, uWS::BackPressure&&)
Line
Count
Source
44
142k
    void *init(bool perMessageDeflate, CompressOptions compressOptions, BackPressure &&backpressure) {
45
142k
        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, compressOptions, std::move(backpressure));
46
142k
        return this;
47
142k
    }
48
public:
49
50
    /* Returns pointer to the per socket user data */
51
1.75M
    USERDATA *getUserData() {
52
1.75M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
1.75M
        return (USERDATA *) (webSocketData + 1);
55
1.75M
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::getUserData()
Line
Count
Source
51
1.31M
    USERDATA *getUserData() {
52
1.31M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
1.31M
        return (USERDATA *) (webSocketData + 1);
55
1.31M
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData()
Line
Count
Source
51
71.4k
    USERDATA *getUserData() {
52
71.4k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
71.4k
        return (USERDATA *) (webSocketData + 1);
55
71.4k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::getUserData()
Line
Count
Source
51
368k
    USERDATA *getUserData() {
52
368k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
53
        /* We just have it overallocated by sizeof type */
54
368k
        return (USERDATA *) (webSocketData + 1);
55
368k
    }
56
57
    /* See AsyncSocket */
58
    using Super::getBufferedAmount;
59
    using Super::getRemoteAddress;
60
    using Super::getRemoteAddressAsText;
61
    using Super::getNativeHandle;
62
63
    /* WebSocket close cannot be an alias to AsyncSocket::close since
64
     * we need to check first if it was shut down by remote peer */
65
106k
    us_socket_t *close() {
66
106k
        if (us_socket_is_closed(SSL, (us_socket_t *) this)) {
67
104k
            return nullptr;
68
104k
        }
69
2.30k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
70
2.30k
        if (webSocketData->isShuttingDown) {
71
1.70k
            return nullptr;
72
1.70k
        }
73
74
598
        return us_socket_close(SSL, (us_socket_t *) this, 0, nullptr);
75
2.30k
    }
76
77
    enum SendStatus : int {
78
        BACKPRESSURE,
79
        SUCCESS,
80
        DROPPED
81
    };
82
83
    /* Sending fragmented messages puts a bit of effort on the user; you must not interleave regular sends
84
     * with fragmented sends and you must sendFirstFragment, [sendFragment], then finally sendLastFragment. */
85
    SendStatus sendFirstFragment(std::string_view message, OpCode opCode = OpCode::BINARY, bool compress = false) {
86
        return send(message, opCode, compress, false);
87
    }
88
89
    SendStatus sendFragment(std::string_view message, bool compress = false) {
90
        return send(message, CONTINUATION, compress, false);
91
    }
92
93
    SendStatus sendLastFragment(std::string_view message, bool compress = false) {
94
        return send(message, CONTINUATION, compress, true);
95
    }
96
97
    /* Experimental */
98
    bool hasNegotiatedCompression() {
99
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
100
        return webSocketData->compressionStatus == WebSocketData::ENABLED;
101
    }
102
103
    /* Experimental */
104
    SendStatus sendPrepared(PreparedMessage &preparedMessage) {
105
        if (preparedMessage.compressed && hasNegotiatedCompression() && preparedMessage.compressedMessage.length() < preparedMessage.originalMessage.length()) {
106
            return send({preparedMessage.compressedMessage.data(), preparedMessage.compressedMessage.length()}, (OpCode) preparedMessage.opCode, uWS::CompressFlags::ALREADY_COMPRESSED);
107
        }
108
        return send({preparedMessage.originalMessage.data(), preparedMessage.originalMessage.length()}, (OpCode) preparedMessage.opCode);
109
    }
110
111
    /* Send or buffer a WebSocket frame, compressed or not. Returns BACKPRESSURE on increased user space backpressure,
112
     * DROPPED on dropped message (due to backpressure) or SUCCESS if you are free to send even more now. */
113
216k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
114
216k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
115
216k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
116
216k
        );
117
118
        /* Skip sending and report DROPPED if we are over the limit of maxBackpressure */
119
216k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
120
            /* Also defer a close if we should */
121
1.96k
            if (webSocketContextData->closeOnBackpressureLimit) {
122
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
123
0
            }
124
125
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
126
1.96k
            if (webSocketContextData->droppedHandler) {
127
0
                webSocketContextData->droppedHandler(this, message, opCode);
128
0
            }
129
130
1.96k
            return DROPPED;
131
1.96k
        }
132
133
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
134
214k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
135
136
        /* Special path for long sends of non-compressed, non-SSL messages */
137
214k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
138
0
            char header[10];
139
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
140
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
141
        
142
0
            if (written != header_length + (int) message.length()) {
143
                /* Buffer up backpressure */
144
0
                if (written > header_length) {
145
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
146
0
                } else {
147
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
148
0
                    webSocketData->buffer.append(message.data(), message.length());
149
0
                }
150
                /* We cannot still be corked if we have backpressure.
151
                 * We also cannot uncork normally since it will re-write the already buffered
152
                 * up backpressure again. */
153
0
                Super::uncorkWithoutSending();
154
0
                return BACKPRESSURE;
155
0
            }
156
214k
        } else {
157
158
214k
            if (webSocketData->subscriber) {
159
                /* This will call back into us, send. */
160
152k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
161
152k
            }
162
163
            /* Transform the message to compressed domain if requested */
164
214k
            if (compress) {
165
137k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
166
167
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
168
137k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
169
                    /* If compress is 2 (ALREADY_COMPRESSED), skip this step (experimental) */
170
11.5k
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
171
11.5k
                        LoopData *loopData = Super::getLoopData();
172
                        /* Compress using either shared or dedicated deflationStream */
173
11.5k
                        if (webSocketData->deflationStream) {
174
7.65k
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
175
7.65k
                        } else {
176
3.90k
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
177
3.90k
                        }
178
11.5k
                    }
179
125k
                } else {
180
125k
                    compress = false;
181
125k
                }
182
137k
            }
183
184
            /* Get size, allocate size, write if needed */
185
214k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
186
214k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
187
214k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
188
189
            /* Depending on size of message we have different paths */
190
214k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
191
                /* This is a drain */
192
161k
                auto[written, failed] = Super::write(nullptr, 0);
193
161k
                if (failed) {
194
                    /* Return BACKPRESSURE on failure, skipping the timeout reset below */
195
153k
                    return BACKPRESSURE;
196
153k
                }
197
161k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
198
                /* Uncork if we came here uncorked */
199
3.05k
                auto [written, failed] = Super::uncork();
200
3.05k
                if (failed) {
201
1.28k
                    return BACKPRESSURE;
202
1.28k
                }
203
3.05k
            }
204
205
214k
        }
206
207
        /* Every successful send resets the timeout */
208
59.6k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
209
8.86k
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
210
8.86k
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
211
8.86k
            webSocketData->hasTimedOut = false;
212
8.86k
        }
213
214
        /* Return success */
215
59.6k
        return SUCCESS;
216
214k
    }
uWS::WebSocket<true, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
113
28.5k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
114
28.5k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
115
28.5k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
116
28.5k
        );
117
118
        /* Skip sending and report DROPPED if we are over the limit of maxBackpressure */
119
28.5k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
120
            /* Also defer a close if we should */
121
1.04k
            if (webSocketContextData->closeOnBackpressureLimit) {
122
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
123
0
            }
124
125
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
126
1.04k
            if (webSocketContextData->droppedHandler) {
127
0
                webSocketContextData->droppedHandler(this, message, opCode);
128
0
            }
129
130
1.04k
            return DROPPED;
131
1.04k
        }
132
133
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
134
27.4k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
135
136
        /* Special path for long sends of non-compressed, non-SSL messages */
137
27.4k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
138
0
            char header[10];
139
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
140
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
141
        
142
0
            if (written != header_length + (int) message.length()) {
143
                /* Buffer up backpressure */
144
0
                if (written > header_length) {
145
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
146
0
                } else {
147
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
148
0
                    webSocketData->buffer.append(message.data(), message.length());
149
0
                }
150
                /* We cannot still be corked if we have backpressure.
151
                 * We also cannot uncork normally since it will re-write the already buffered
152
                 * up backpressure again. */
153
0
                Super::uncorkWithoutSending();
154
0
                return BACKPRESSURE;
155
0
            }
156
27.4k
        } else {
157
158
27.4k
            if (webSocketData->subscriber) {
159
                /* This will call back into us, send. */
160
27.4k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
161
27.4k
            }
162
163
            /* Transform the message to compressed domain if requested */
164
27.4k
            if (compress) {
165
0
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
166
167
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
168
0
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
169
                    /* If compress is 2 (ALREADY_COMPRESSED), skip this step (experimental) */
170
0
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
171
0
                        LoopData *loopData = Super::getLoopData();
172
                        /* Compress using either shared or dedicated deflationStream */
173
0
                        if (webSocketData->deflationStream) {
174
0
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
175
0
                        } else {
176
0
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
177
0
                        }
178
0
                    }
179
0
                } else {
180
0
                    compress = false;
181
0
                }
182
0
            }
183
184
            /* Get size, allocate size, write if needed */
185
27.4k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
186
27.4k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
187
27.4k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
188
189
            /* Depending on size of message we have different paths */
190
27.4k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
191
                /* This is a drain */
192
22.6k
                auto[written, failed] = Super::write(nullptr, 0);
193
22.6k
                if (failed) {
194
                    /* Return BACKPRESSURE on failure, skipping the timeout reset below */
195
21.8k
                    return BACKPRESSURE;
196
21.8k
                }
197
22.6k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
198
                /* Uncork if we came here uncorked */
199
0
                auto [written, failed] = Super::uncork();
200
0
                if (failed) {
201
0
                    return BACKPRESSURE;
202
0
                }
203
0
            }
204
205
27.4k
        }
206
207
        /* Every successful send resets the timeout */
208
5.61k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
209
5.61k
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
210
5.61k
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
211
5.61k
            webSocketData->hasTimedOut = false;
212
5.61k
        }
213
214
        /* Return success */
215
5.61k
        return SUCCESS;
216
27.4k
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
113
15.9k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
114
15.9k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
115
15.9k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
116
15.9k
        );
117
118
        /* Skip sending and report DROPPED if we are over the limit of maxBackpressure */
119
15.9k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
120
            /* Also defer a close if we should */
121
460
            if (webSocketContextData->closeOnBackpressureLimit) {
122
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
123
0
            }
124
125
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
126
460
            if (webSocketContextData->droppedHandler) {
127
0
                webSocketContextData->droppedHandler(this, message, opCode);
128
0
            }
129
130
460
            return DROPPED;
131
460
        }
132
133
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
134
15.4k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
135
136
        /* Special path for long sends of non-compressed, non-SSL messages */
137
15.4k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
138
0
            char header[10];
139
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
140
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
141
        
142
0
            if (written != header_length + (int) message.length()) {
143
                /* Buffer up backpressure */
144
0
                if (written > header_length) {
145
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
146
0
                } else {
147
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
148
0
                    webSocketData->buffer.append(message.data(), message.length());
149
0
                }
150
                /* We cannot still be corked if we have backpressure.
151
                 * We also cannot uncork normally since it will re-write the already buffered
152
                 * up backpressure again. */
153
0
                Super::uncorkWithoutSending();
154
0
                return BACKPRESSURE;
155
0
            }
156
15.4k
        } else {
157
158
15.4k
            if (webSocketData->subscriber) {
159
                /* This will call back into us, send. */
160
15.4k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
161
15.4k
            }
162
163
            /* Transform the message to compressed domain if requested */
164
15.4k
            if (compress) {
165
0
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
166
167
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
168
0
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
169
                    /* If compress is 2 (ALREADY_COMPRESSED), skip this step (experimental) */
170
0
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
171
0
                        LoopData *loopData = Super::getLoopData();
172
                        /* Compress using either shared or dedicated deflationStream */
173
0
                        if (webSocketData->deflationStream) {
174
0
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
175
0
                        } else {
176
0
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
177
0
                        }
178
0
                    }
179
0
                } else {
180
0
                    compress = false;
181
0
                }
182
0
            }
183
184
            /* Get size, allocate size, write if needed */
185
15.4k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
186
15.4k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
187
15.4k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
188
189
            /* Depending on size of message we have different paths */
190
15.4k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
191
                /* This is a drain */
192
13.2k
                auto[written, failed] = Super::write(nullptr, 0);
193
13.2k
                if (failed) {
194
                    /* Return BACKPRESSURE on failure, skipping the timeout reset below */
195
11.8k
                    return BACKPRESSURE;
196
11.8k
                }
197
13.2k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
198
                /* Uncork if we came here uncorked */
199
1.35k
                auto [written, failed] = Super::uncork();
200
1.35k
                if (failed) {
201
402
                    return BACKPRESSURE;
202
402
                }
203
1.35k
            }
204
205
15.4k
        }
206
207
        /* Every successful send resets the timeout */
208
3.25k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
209
3.25k
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
210
3.25k
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
211
3.25k
            webSocketData->hasTimedOut = false;
212
3.25k
        }
213
214
        /* Return success */
215
3.25k
        return SUCCESS;
216
15.4k
    }
uWS::WebSocket<false, true, int>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
113
106k
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
114
106k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
115
106k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
116
106k
        );
117
118
        /* Skip sending and report DROPPED if we are over the limit of maxBackpressure */
119
106k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
120
            /* Also defer a close if we should */
121
0
            if (webSocketContextData->closeOnBackpressureLimit) {
122
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
123
0
            }
124
125
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
126
0
            if (webSocketContextData->droppedHandler) {
127
0
                webSocketContextData->droppedHandler(this, message, opCode);
128
0
            }
129
130
0
            return DROPPED;
131
0
        }
132
133
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced */
134
106k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
135
136
        /* Special path for long sends of non-compressed, non-SSL messages */
137
106k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
138
0
            char header[10];
139
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
140
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length());
141
        
142
0
            if (written != header_length + (int) message.length()) {
143
                /* Buffer up backpressure */
144
0
                if (written > header_length) {
145
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
146
0
                } else {
147
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
148
0
                    webSocketData->buffer.append(message.data(), message.length());
149
0
                }
150
                /* We cannot still be corked if we have backpressure.
151
                 * We also cannot uncork normally since it will re-write the already buffered
152
                 * up backpressure again. */
153
0
                Super::uncorkWithoutSending();
154
0
                return BACKPRESSURE;
155
0
            }
156
106k
        } else {
157
158
106k
            if (webSocketData->subscriber) {
159
                /* This will call back into us, send. */
160
106k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
161
106k
            }
162
163
            /* Transform the message to compressed domain if requested */
164
106k
            if (compress) {
165
106k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
166
167
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
168
106k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
169
                    /* If compress is 2 (ALREADY_COMPRESSED), skip this step (experimental) */
170
194
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
171
194
                        LoopData *loopData = Super::getLoopData();
172
                        /* Compress using either shared or dedicated deflationStream */
173
194
                        if (webSocketData->deflationStream) {
174
153
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
175
153
                        } else {
176
41
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
177
41
                        }
178
194
                    }
179
106k
                } else {
180
106k
                    compress = false;
181
106k
                }
182
106k
            }
183
184
            /* Get size, allocate size, write if needed */
185
106k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
186
106k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
187
106k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
188
189
            /* Depending on size of message we have different paths */
190
106k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
191
                /* This is a drain */
192
78.3k
                auto[written, failed] = Super::write(nullptr, 0);
193
78.3k
                if (failed) {
194
                    /* Return BACKPRESSURE on failure, skipping the timeout reset below */
195
76.1k
                    return BACKPRESSURE;
196
76.1k
                }
197
78.3k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
198
                /* Uncork if we came here uncorked */
199
0
                auto [written, failed] = Super::uncork();
200
0
                if (failed) {
201
0
                    return BACKPRESSURE;
202
0
                }
203
0
            }
204
205
106k
        }
206
207
        /* Every successful send resets the timeout */
208
30.7k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
209
0
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
210
0
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
211
0
            webSocketData->hasTimedOut = false;
212
0
        }
213
214
        /* Return success */
215
30.7k
        return SUCCESS;
216
106k
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
113
25.3k
    /* Frames `message` with `opCode` and writes it to this socket.
     * Returns DROPPED when maxBackpressure is non-zero and already exceeded (nothing is written),
     * BACKPRESSURE when a write/uncork reports failure or the direct write is only partial, else SUCCESS.
     * `compress` is a CompressFlags hint; it is cleared below when the message is empty, opCode is
     * not below 3, or compressionStatus is not ENABLED. `fin` is passed on to protocol::formatMessage. */
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
114
25.3k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
115
25.3k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
116
25.3k
        );
117
118
        /* Skip sending and report DROPPED if we are over the limit of maxBackpressure (a limit of 0 disables this check) */
119
25.3k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
120
            /* Also defer a close if we should (done via us_socket_shutdown_read) */
121
453
            if (webSocketContextData->closeOnBackpressureLimit) {
122
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
123
0
            }
124
125
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
126
453
            if (webSocketContextData->droppedHandler) {
127
0
                webSocketContextData->droppedHandler(this, message, opCode);
128
0
            }
129
130
453
            return DROPPED;
131
453
        }
132
133
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced
         * (the drain itself is performed in the non-direct branch below) */
134
24.9k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
135
136
        /* Special path for long (>= 16 KiB) sends of non-compressed, non-SSL messages with no subscriber, nothing buffered and corkOffset 0 */
137
24.9k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
138
0
            char header[10]; /* NOTE(review): confirm formatMessage<isServer> never emits more than 10 header bytes, also when isServer is false */
139
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
140
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); /* NOTE(review): length is narrowed to int - confirm messages above INT_MAX cannot reach here */
141
        
142
0
            if (written != header_length + (int) message.length()) {
143
                /* Buffer up backpressure: keep whatever part of header + payload was not written */
144
0
                if (written > header_length) {
145
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
146
0
                } else {
147
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
148
0
                    webSocketData->buffer.append(message.data(), message.length());
149
0
                }
150
                /* We cannot still be corked if we have backpressure.
151
                 * We also cannot uncork normally since it will re-write the already buffered
152
                 * up backpressure again. */
153
0
                Super::uncorkWithoutSending();
154
0
                return BACKPRESSURE;
155
0
            }
156
24.9k
        } else {
157
158
24.9k
            if (webSocketData->subscriber) {
159
                /* This will call back into us, send. */
160
0
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
161
0
            }
162
163
            /* Transform the message to compressed domain if requested */
164
24.9k
            if (compress) {
165
10.9k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); /* Shadows the outer webSocketData; obtained from the same call */
166
167
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
168
10.9k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
169
                    /* If compress is CompressFlags::ALREADY_COMPRESSED, skip this step (experimental) */
170
2.78k
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
171
2.78k
                        LoopData *loopData = Super::getLoopData();
172
                        /* Compress using either shared or dedicated deflationStream; `message` is re-pointed at the deflate() result */
173
2.78k
                        if (webSocketData->deflationStream) {
174
0
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
175
2.78k
                        } else {
176
2.78k
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
177
2.78k
                        }
178
2.78k
                    }
179
8.15k
                } else {
180
8.15k
                    compress = false;
181
8.15k
                }
182
10.9k
            }
183
184
            /* Get size, allocate size, write if needed */
185
24.9k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
186
24.9k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
187
24.9k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
188
189
            /* Depending on size of message we have different paths */
190
24.9k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
191
                /* This is a drain */
192
18.6k
                auto[written, failed] = Super::write(nullptr, 0);
193
18.6k
                if (failed) {
194
                    /* Return BACKPRESSURE on failure, skipping the timeout reset below */
195
16.9k
                    return BACKPRESSURE;
196
16.9k
                }
197
18.6k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
198
                /* Uncork if we came here uncorked */
199
0
                auto [written, failed] = Super::uncork();
200
0
                if (failed) {
201
0
                    return BACKPRESSURE;
202
0
                }
203
0
            }
204
205
24.9k
        }
206
207
        /* Every successful send resets the timeout (only when resetIdleTimeoutOnSend is set) */
208
7.96k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
209
0
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
210
0
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
211
0
            webSocketData->hasTimedOut = false;
212
0
        }
213
214
        /* Return success */
215
7.96k
        return SUCCESS;
216
24.9k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::send(std::__1::basic_string_view<char, std::__1::char_traits<char> >, uWS::OpCode, int, bool)
Line
Count
Source
113
39.9k
    /* Frames `message` with `opCode` and writes it to this socket.
     * Returns DROPPED when maxBackpressure is non-zero and already exceeded (nothing is written),
     * BACKPRESSURE when a write/uncork reports failure or the direct write is only partial, else SUCCESS.
     * `compress` is a CompressFlags hint; it is cleared below when the message is empty, opCode is
     * not below 3, or compressionStatus is not ENABLED. `fin` is passed on to protocol::formatMessage. */
    SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {
114
39.9k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
115
39.9k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
116
39.9k
        );
117
118
        /* Skip sending and report DROPPED if we are over the limit of maxBackpressure (a limit of 0 disables this check) */
119
39.9k
        if (webSocketContextData->maxBackpressure && webSocketContextData->maxBackpressure < getBufferedAmount()) {
120
            /* Also defer a close if we should (done via us_socket_shutdown_read) */
121
0
            if (webSocketContextData->closeOnBackpressureLimit) {
122
0
                us_socket_shutdown_read(SSL, (us_socket_t *) this);
123
0
            }
124
125
            /* It is okay to call send again from within this callback since we immediately return with DROPPED afterwards */
126
0
            if (webSocketContextData->droppedHandler) {
127
0
                webSocketContextData->droppedHandler(this, message, opCode);
128
0
            }
129
130
0
            return DROPPED;
131
0
        }
132
133
        /* If we are subscribers and have messages to drain we need to drain them here to stay synced
         * (the drain itself is performed in the non-direct branch below) */
134
39.9k
        WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
135
136
        /* Special path for long (>= 16 KiB) sends of non-compressed, non-SSL messages with no subscriber, nothing buffered and corkOffset 0 */
137
39.9k
        if (message.length() >= 16 * 1024 && !compress && !SSL && !webSocketData->subscriber && getBufferedAmount() == 0 && Super::getLoopData()->corkOffset == 0) {
138
0
            char header[10]; /* NOTE(review): confirm formatMessage<isServer> never emits more than 10 header bytes, also when isServer is false */
139
0
            int header_length = (int) protocol::formatMessage<isServer>(header, "", 0, opCode, message.length(), compress, fin);
140
0
            int written = us_socket_write2(0, (struct us_socket_t *)this, header, header_length, message.data(), (int) message.length()); /* NOTE(review): length is narrowed to int - confirm messages above INT_MAX cannot reach here */
141
        
142
0
            if (written != header_length + (int) message.length()) {
143
                /* Buffer up backpressure: keep whatever part of header + payload was not written */
144
0
                if (written > header_length) {
145
0
                    webSocketData->buffer.append(message.data() + written - header_length, message.length() - (size_t) (written - header_length));
146
0
                } else {
147
0
                    webSocketData->buffer.append(header + written, (size_t) header_length - (size_t) written);
148
0
                    webSocketData->buffer.append(message.data(), message.length());
149
0
                }
150
                /* We cannot still be corked if we have backpressure.
151
                 * We also cannot uncork normally since it will re-write the already buffered
152
                 * up backpressure again. */
153
0
                Super::uncorkWithoutSending();
154
0
                return BACKPRESSURE;
155
0
            }
156
39.9k
        } else {
157
158
39.9k
            if (webSocketData->subscriber) {
159
                /* This will call back into us, send. */
160
2.68k
                webSocketContextData->topicTree->drain(webSocketData->subscriber);
161
2.68k
            }
162
163
            /* Transform the message to compressed domain if requested */
164
39.9k
            if (compress) {
165
19.2k
                WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); /* Shadows the outer webSocketData; obtained from the same call */
166
167
                /* Check and correct the compress hint. It is never valid to compress 0 bytes */
168
19.2k
                if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) {
169
                    /* If compress is CompressFlags::ALREADY_COMPRESSED, skip this step (experimental) */
170
8.58k
                    if (compress != CompressFlags::ALREADY_COMPRESSED) {
171
8.58k
                        LoopData *loopData = Super::getLoopData();
172
                        /* Compress using either shared or dedicated deflationStream; `message` is re-pointed at the deflate() result */
173
8.58k
                        if (webSocketData->deflationStream) {
174
7.50k
                            message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false);
175
7.50k
                        } else {
176
1.08k
                            message = loopData->deflationStream->deflate(loopData->zlibContext, message, true);
177
1.08k
                        }
178
8.58k
                    }
179
10.7k
                } else {
180
10.7k
                    compress = false;
181
10.7k
                }
182
19.2k
            }
183
184
            /* Get size, allocate size, write if needed */
185
39.9k
            size_t messageFrameSize = protocol::messageFrameSize(message.length());
186
39.9k
            auto [sendBuffer, sendBufferAttribute] = Super::getSendBuffer(messageFrameSize);
187
39.9k
            protocol::formatMessage<isServer>(sendBuffer, message.data(), message.length(), opCode, message.length(), compress, fin);
188
189
            /* Depending on size of message we have different paths */
190
39.9k
            if (sendBufferAttribute == SendBufferAttribute::NEEDS_DRAIN) {
191
                /* This is a drain */
192
28.9k
                auto[written, failed] = Super::write(nullptr, 0);
193
28.9k
                if (failed) {
194
                    /* Return BACKPRESSURE on failure, skipping the timeout reset below */
195
26.9k
                    return BACKPRESSURE;
196
26.9k
                }
197
28.9k
            } else if (sendBufferAttribute == SendBufferAttribute::NEEDS_UNCORK) {
198
                /* Uncork if we came here uncorked */
199
1.69k
                auto [written, failed] = Super::uncork();
200
1.69k
                if (failed) {
201
883
                    return BACKPRESSURE;
202
883
                }
203
1.69k
            }
204
205
39.9k
        }
206
207
        /* Every successful send resets the timeout (only when resetIdleTimeoutOnSend is set) */
208
12.1k
        if (webSocketContextData->resetIdleTimeoutOnSend) {
209
0
            Super::timeout(webSocketContextData->idleTimeoutComponents.first);
210
0
            WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData();
211
0
            webSocketData->hasTimedOut = false;
212
0
        }
213
214
        /* Return success */
215
12.1k
        return SUCCESS;
216
39.9k
    }
217
218
    /* Send websocket close frame, emit close event, send FIN if successful.
219
     * Will not append a close reason if code is 0 or 1005. */
220
26.7k
    /* Sends a CLOSE frame built from `code` and at most MAX_CLOSE_PAYLOAD bytes of `message`, then frees the
     * subscriber, invokes closeHandler (with the untruncated `message`) and destroys the USERDATA.
     * Returns immediately if isShuttingDown is already set. */
    void end(int code = 0, std::string_view message = {}) {
221
        /* Check if we already called this one */
222
26.7k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
223
26.7k
        if (webSocketData->isShuttingDown) {
224
0
            return;
225
0
        }
226
227
        /* We postpone any FIN sending to either drainage or uncorking */
228
26.7k
        webSocketData->isShuttingDown = true;
229
230
        /* Format and send the close frame */
231
26.7k
        static const int MAX_CLOSE_PAYLOAD = 123;
232
26.7k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
233
26.7k
        char closePayload[MAX_CLOSE_PAYLOAD + 2]; /* presumably +2 for the 16-bit close code - confirm against formatClosePayload */
234
26.7k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
235
26.7k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); /* NOTE(review): SendStatus is converted to bool, so every non-zero status counts as ok - confirm this is intended for DROPPED */
236
237
        /* FIN if we are ok and not corked */
238
26.7k
        if (!this->isCorked()) {
239
3.33k
            if (ok) {
240
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
241
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
242
1.15k
                this->shutdown();
243
1.15k
            }
244
3.33k
        }
245
246
26.7k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
247
26.7k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
248
26.7k
        );
249
250
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
251
26.7k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
252
253
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
254
26.7k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
255
0
            for (Topic *t : webSocketData->subscriber->topics) {
256
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
257
0
            }
258
0
        }
259
260
        /* Make sure to unsubscribe from any pub/sub node at exit.
         * freeSubscriber is called without a null check - presumably it accepts a null subscriber; confirm. */
261
26.7k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
262
26.7k
        webSocketData->subscriber = nullptr;
263
264
        /* Emit close event */
265
26.7k
        if (webSocketContextData->closeHandler) {
266
26.5k
            webSocketContextData->closeHandler(this, code, message);
267
26.5k
        }
268
26.7k
        ((USERDATA *) this->getUserData())->~USERDATA(); /* Explicitly run the USERDATA destructor on the user data */
269
26.7k
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
220
11.5k
    /* Sends a CLOSE frame built from `code` and at most MAX_CLOSE_PAYLOAD bytes of `message`, then frees the
     * subscriber, invokes closeHandler (with the untruncated `message`) and destroys the USERDATA.
     * Returns immediately if isShuttingDown is already set. */
    void end(int code = 0, std::string_view message = {}) {
221
        /* Check if we already called this one */
222
11.5k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
223
11.5k
        if (webSocketData->isShuttingDown) {
224
0
            return;
225
0
        }
226
227
        /* We postpone any FIN sending to either drainage or uncorking */
228
11.5k
        webSocketData->isShuttingDown = true;
229
230
        /* Format and send the close frame */
231
11.5k
        static const int MAX_CLOSE_PAYLOAD = 123;
232
11.5k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
233
11.5k
        char closePayload[MAX_CLOSE_PAYLOAD + 2]; /* presumably +2 for the 16-bit close code - confirm against formatClosePayload */
234
11.5k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
235
11.5k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); /* NOTE(review): SendStatus is converted to bool, so every non-zero status counts as ok - confirm this is intended for DROPPED */
236
237
        /* FIN if we are ok and not corked */
238
11.5k
        if (!this->isCorked()) {
239
396
            if (ok) {
240
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
241
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
242
196
                this->shutdown();
243
196
            }
244
396
        }
245
246
11.5k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
247
11.5k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
248
11.5k
        );
249
250
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
251
11.5k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
252
253
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
254
11.5k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
255
0
            for (Topic *t : webSocketData->subscriber->topics) {
256
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
257
0
            }
258
0
        }
259
260
        /* Make sure to unsubscribe from any pub/sub node at exit.
         * freeSubscriber is called without a null check - presumably it accepts a null subscriber; confirm. */
261
11.5k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
262
11.5k
        webSocketData->subscriber = nullptr;
263
264
        /* Emit close event */
265
11.5k
        if (webSocketContextData->closeHandler) {
266
11.5k
            webSocketContextData->closeHandler(this, code, message);
267
11.5k
        }
268
11.5k
        ((USERDATA *) this->getUserData())->~USERDATA(); /* Explicitly run the USERDATA destructor on the user data */
269
11.5k
    }
EpollHelloWorld.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
220
6.37k
    /* Sends a CLOSE frame built from `code` and at most MAX_CLOSE_PAYLOAD bytes of `message`, then frees the
     * subscriber, invokes closeHandler (with the untruncated `message`) and destroys the USERDATA.
     * Returns immediately if isShuttingDown is already set. */
    void end(int code = 0, std::string_view message = {}) {
221
        /* Check if we already called this one */
222
6.37k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
223
6.37k
        if (webSocketData->isShuttingDown) {
224
0
            return;
225
0
        }
226
227
        /* We postpone any FIN sending to either drainage or uncorking */
228
6.37k
        webSocketData->isShuttingDown = true;
229
230
        /* Format and send the close frame */
231
6.37k
        static const int MAX_CLOSE_PAYLOAD = 123;
232
6.37k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
233
6.37k
        char closePayload[MAX_CLOSE_PAYLOAD + 2]; /* presumably +2 for the 16-bit close code - confirm against formatClosePayload */
234
6.37k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
235
6.37k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); /* NOTE(review): SendStatus is converted to bool, so every non-zero status counts as ok - confirm this is intended for DROPPED */
236
237
        /* FIN if we are ok and not corked */
238
6.37k
        if (!this->isCorked()) {
239
0
            if (ok) {
240
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
241
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
242
0
                this->shutdown();
243
0
            }
244
0
        }
245
246
6.37k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
247
6.37k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
248
6.37k
        );
249
250
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
251
6.37k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
252
253
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
254
6.37k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
255
0
            for (Topic *t : webSocketData->subscriber->topics) {
256
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
257
0
            }
258
0
        }
259
260
        /* Make sure to unsubscribe from any pub/sub node at exit.
         * freeSubscriber is called without a null check - presumably it accepts a null subscriber; confirm. */
261
6.37k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
262
6.37k
        webSocketData->subscriber = nullptr;
263
264
        /* Emit close event */
265
6.37k
        if (webSocketContextData->closeHandler) {
266
6.17k
            webSocketContextData->closeHandler(this, code, message);
267
6.17k
        }
268
6.37k
        ((USERDATA *) this->getUserData())->~USERDATA(); /* Explicitly run the USERDATA destructor on the user data */
269
6.37k
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::end(int, std::__1::basic_string_view<char, std::__1::char_traits<char> >)
Line
Count
Source
220
8.88k
    /* Sends a CLOSE frame built from `code` and at most MAX_CLOSE_PAYLOAD bytes of `message`, then frees the
     * subscriber, invokes closeHandler (with the untruncated `message`) and destroys the USERDATA.
     * Returns immediately if isShuttingDown is already set. */
    void end(int code = 0, std::string_view message = {}) {
221
        /* Check if we already called this one */
222
8.88k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
223
8.88k
        if (webSocketData->isShuttingDown) {
224
0
            return;
225
0
        }
226
227
        /* We postpone any FIN sending to either drainage or uncorking */
228
8.88k
        webSocketData->isShuttingDown = true;
229
230
        /* Format and send the close frame */
231
8.88k
        static const int MAX_CLOSE_PAYLOAD = 123;
232
8.88k
        size_t length = std::min<size_t>(MAX_CLOSE_PAYLOAD, message.length());
233
8.88k
        char closePayload[MAX_CLOSE_PAYLOAD + 2]; /* presumably +2 for the 16-bit close code - confirm against formatClosePayload */
234
8.88k
        size_t closePayloadLength = protocol::formatClosePayload(closePayload, (uint16_t) code, message.data(), length);
235
8.88k
        bool ok = send(std::string_view(closePayload, closePayloadLength), OpCode::CLOSE); /* NOTE(review): SendStatus is converted to bool, so every non-zero status counts as ok - confirm this is intended for DROPPED */
236
237
        /* FIN if we are ok and not corked */
238
8.88k
        if (!this->isCorked()) {
239
2.94k
            if (ok) {
240
                /* If we are not corked, and we just sent off everything, we need to FIN right here.
241
                 * In all other cases, we need to fin either if uncork was successful, or when drainage is complete. */
242
962
                this->shutdown();
243
962
            }
244
2.94k
        }
245
246
8.88k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
247
8.88k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
248
8.88k
        );
249
250
        /* Set shorter timeout (use ping-timeout) to avoid long hanging sockets after end() on broken connections */
251
8.88k
        Super::timeout(webSocketContextData->idleTimeoutComponents.second);
252
253
        /* At this point we iterate all currently held subscriptions and emit an event for all of them */
254
8.88k
        if (webSocketData->subscriber && webSocketContextData->subscriptionHandler) {
255
0
            for (Topic *t : webSocketData->subscriber->topics) {
256
0
                webSocketContextData->subscriptionHandler(this, t->name, (int) t->size() - 1, (int) t->size());
257
0
            }
258
0
        }
259
260
        /* Make sure to unsubscribe from any pub/sub node at exit.
         * freeSubscriber is called without a null check - presumably it accepts a null subscriber; confirm. */
261
8.88k
        webSocketContextData->topicTree->freeSubscriber(webSocketData->subscriber);
262
8.88k
        webSocketData->subscriber = nullptr;
263
264
        /* Emit close event */
265
8.88k
        if (webSocketContextData->closeHandler) {
266
8.88k
            webSocketContextData->closeHandler(this, code, message);
267
8.88k
        }
268
8.88k
        ((USERDATA *) this->getUserData())->~USERDATA(); /* Explicitly run the USERDATA destructor on the user data */
269
8.88k
    }
270
271
    /* Corks the response if possible. Leaves already corked socket be. */
272
    /* Runs `handler` between Super::cork() and Super::uncork() when the socket is not yet corked and
     * Super::canCork() allows it; otherwise runs `handler` directly without corking. */
    void cork(MoveOnlyFunction<void()> &&handler) {
273
        if (!Super::isCorked() && Super::canCork()) {
274
            Super::cork();
275
            handler();
276
277
            /* There is no timeout when failing to uncork for WebSockets,
278
             * as that is handled by idleTimeout */
279
            auto [written, failed] = Super::uncork();
280
            (void)written; /* Both uncork results are deliberately unused */
281
            (void)failed;
282
        } else {
283
            /* We are already corked, or can't cork so let's just call the handler */
284
            handler();
285
        }
286
    }
287
288
    /* Subscribe to a topic according to MQTT rules and syntax. Returns success */
289
10.8M
    /* Subscribes this socket to `topic`, creating its subscriber on first use. Always returns true.
     * The second (unnamed) bool parameter is not used. */
    bool subscribe(std::string_view topic, bool = false) {
290
10.8M
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
291
10.8M
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
292
10.8M
        );
293
294
        /* Make us a subscriber if we aren't yet */
295
10.8M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
296
10.8M
        if (!webSocketData->subscriber) {
297
213k
            webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber();
298
213k
            webSocketData->subscriber->user = this;
299
213k
        }
300
301
        /* Cannot return numSubscribers as this is only for this particular websocket context */
302
10.8M
        Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
303
10.8M
        if (topicOrNull && webSocketContextData->subscriptionHandler) {
304
            /* Emit this socket, the topic, new count, old count.
             * Only reached when topicTree->subscribe returned a topic - presumably null means nothing changed; confirm. */
305
0
            webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1);
306
0
        }
307
308
        /* Subscribe always succeeds */
309
10.8M
        return true;
310
10.8M
    }
EpollEchoServerPubSub.cpp:uWS::WebSocket<true, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool)
Line
Count
Source
289
10.7M
    /* Subscribes this socket to `topic`, creating its subscriber on first use. Always returns true.
     * The second (unnamed) bool parameter is not used. */
    bool subscribe(std::string_view topic, bool = false) {
290
10.7M
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
291
10.7M
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
292
10.7M
        );
293
294
        /* Make us a subscriber if we aren't yet */
295
10.7M
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
296
10.7M
        if (!webSocketData->subscriber) {
297
107k
            webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber();
298
107k
            webSocketData->subscriber->user = this;
299
107k
        }
300
301
        /* Cannot return numSubscribers as this is only for this particular websocket context */
302
10.7M
        Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
303
10.7M
        if (topicOrNull && webSocketContextData->subscriptionHandler) {
304
            /* Emit this socket, the topic, new count, old count.
             * Only reached when topicTree->subscribe returned a topic - presumably null means nothing changed; confirm. */
305
0
            webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1);
306
0
        }
307
308
        /* Subscribe always succeeds */
309
10.7M
        return true;
310
10.7M
    }
EpollEchoServer.cpp:uWS::WebSocket<false, true, test()::PerSocketData>::subscribe(std::__1::basic_string_view<char, std::__1::char_traits<char> >, bool)
Line
Count
Source
289
105k
    /* Subscribes this socket to `topic`, creating its subscriber on first use. Always returns true.
     * The second (unnamed) bool parameter is not used. */
    bool subscribe(std::string_view topic, bool = false) {
290
105k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
291
105k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
292
105k
        );
293
294
        /* Make us a subscriber if we aren't yet */
295
105k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
296
105k
        if (!webSocketData->subscriber) {
297
105k
            webSocketData->subscriber = webSocketContextData->topicTree->createSubscriber();
298
105k
            webSocketData->subscriber->user = this;
299
105k
        }
300
301
        /* Cannot return numSubscribers as this is only for this particular websocket context */
302
105k
        Topic *topicOrNull = webSocketContextData->topicTree->subscribe(webSocketData->subscriber, topic);
303
105k
        if (topicOrNull && webSocketContextData->subscriptionHandler) {
304
            /* Emit this socket, the topic, new count, old count.
             * Only reached when topicTree->subscribe returned a topic - presumably null means nothing changed; confirm. */
305
0
            webSocketContextData->subscriptionHandler(this, topic, (int) topicOrNull->size(), (int) topicOrNull->size() - 1);
306
0
        }
307
308
        /* Subscribe always succeeds */
309
105k
        return true;
310
105k
    }
311
312
    /* Unsubscribe from a topic, returns true if we were subscribed. */
313
562
    /* Unsubscribes this socket from `topic`. Returns false when the socket has no subscriber, otherwise the
     * `ok` result of topicTree->unsubscribe. The second (unnamed) bool parameter is not used. */
    bool unsubscribe(std::string_view topic, bool = false) {
314
562
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
315
562
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
316
562
        );
317
318
562
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
319
        
320
562
        if (!webSocketData->subscriber) { return false; }
321
322
        /* Cannot return numSubscribers as this is only for this particular websocket context */
323
562
        auto [ok, last, newCount] = webSocketContextData->topicTree->unsubscribe(webSocketData->subscriber, topic);
324
        /* Emit subscription event whenever the unsubscribe succeeded; `last` is not consulted here.
         * Reported as new count newCount, old count newCount + 1. */
325
562
        if (ok && webSocketContextData->subscriptionHandler) {
326
0
            webSocketContextData->subscriptionHandler(this, topic, newCount, newCount + 1);
327
0
        }
328
329
        /* Leave us as subscribers even if we subscribe to nothing (last unsubscribed topic might miss its message otherwise) */
330
331
562
        return ok;
332
562
    }
333
334
    /* Returns whether this socket is subscribed to the specified topic */
335
    bool isSubscribed(std::string_view topic) {
336
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
337
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
338
        );
339
340
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
341
        if (!webSocketData->subscriber) {
342
            return false;
343
        }
344
345
        Topic *topicPtr = webSocketContextData->topicTree->lookupTopic(topic);
346
        if (!topicPtr) {
347
            return false;
348
        }
349
350
        return topicPtr->count(webSocketData->subscriber);
351
    }
352
353
    /* Iterates all topics of this WebSocket. Every topic is represented by its full name.
354
     * Can be called in close handler. It is possible to modify the subscription list while
355
     * inside the callback ONLY IF not modifying the topic passed to the callback.
356
     * Topic names are valid only for the duration of the callback. */
357
    void iterateTopics(MoveOnlyFunction<void(std::string_view)> cb) {
        /* Context-wide data: holds the topic tree whose iteration marker we set below */
        us_socket_context_t *socketContext = (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this);
        auto *contextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, socketContext);

        /* Per-socket data: holds our subscriber handle, if any */
        auto *socketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
        if (!socketData->subscriber) {
            /* No subscriber, hence no topics to report; cb is never invoked */
            return;
        }

        /* Lock this subscriber for unsubscription / subscription while we walk its topics */
        contextData->topicTree->iteratingSubscriber = socketData->subscriber;

        /* Hand each topic name to the callback as a view over the topic's own name storage */
        for (Topic *current : socketData->subscriber->topics) {
            cb(std::string_view(current->name.data(), current->name.length()));
        }

        /* Unlock subscriber */
        contextData->topicTree->iteratingSubscriber = nullptr;
    }
375
376
    /* Publish a message to a topic according to MQTT rules and syntax. Returns success.
377
     * We, the WebSocket, must be subscribed to the topic itself and if so - no message will be sent to ourselves.
378
     * Use App::publish for an unconditional publish that simply publishes to whomever might be subscribed. */
379
45.5k
    bool publish(std::string_view topic, std::string_view message, OpCode opCode = OpCode::TEXT, bool compress = false) {
380
45.5k
        WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL,
381
45.5k
            (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
382
45.5k
        );
383
384
        /* We cannot be a subscriber of this topic if we are not a subscriber of anything */
385
45.5k
        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
386
45.5k
        if (!webSocketData->subscriber) {
387
            /* Failure, but still do return the number of subscribers */
388
0
            return false;
389
0
        }
390
391
        /* Publish as sender, does not receive its own messages even if subscribed to relevant topics */
392
45.5k
        if (message.length() >= LoopData::CORK_BUFFER_SIZE) {
393
0
            return webSocketContextData->topicTree->publishBig(webSocketData->subscriber, topic, {message, opCode, compress}, [](Subscriber *s, TopicTreeBigMessage &message) {
394
0
                auto *ws = (WebSocket<SSL, true, int> *) s->user;
395
396
0
                ws->send(message.message, (OpCode)message.opCode, message.compress);
397
0
            });
398
45.5k
        } else {
399
45.5k
            return webSocketContextData->topicTree->publish(webSocketData->subscriber, topic, {std::string(message), opCode, compress});
400
45.5k
        }
401
45.5k
    }
402
};
403
404
}
405
406
#endif // UWS_WEBSOCKET_H