Coverage Report

Created: 2024-04-25 06:10

/src/uWebSockets/src/AsyncSocket.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2020.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_ASYNCSOCKET_H
19
#define UWS_ASYNCSOCKET_H
20
21
/* This class implements async socket memory management strategies */
22
23
/* NOTE: Many unsigned/signed conversion warnings could be solved by moving from int length
24
 * to unsigned length for everything to/from uSockets - this would however remove the opportunity
25
 * to signal error with -1 (which is how the entire UNIX syscalling is built). */
26
27
#include <cstring>
28
#include <iostream>
29
30
#include "libusockets.h"
31
32
#include "LoopData.h"
33
#include "AsyncSocketData.h"
34
35
namespace uWS {
36
37
    enum SendBufferAttribute {
38
        NEEDS_NOTHING,
39
        NEEDS_DRAIN,
40
        NEEDS_UNCORK
41
    };
42
43
    template <bool, bool, typename> struct WebSocketContext;
44
45
template <bool SSL>
46
struct AsyncSocket {
47
    /* This guy is promiscuous */
48
    template <bool> friend struct HttpContext;
49
    template <bool, bool, typename> friend struct WebSocketContext;
50
    template <bool> friend struct TemplatedApp;
51
    template <bool, typename> friend struct WebSocketContextData;
52
    template <typename, typename> friend struct TopicTree;
53
    template <bool> friend struct HttpResponse;
54
55
private:
56
    /* Helper, do not use directly (todo: move to uSockets or de-crazify) */
57
    void throttle_helper(int toggle) {
58
        /* These should be exposed by uSockets */
59
        static thread_local int us_events[2] = {0, 0};
60
61
        struct us_poll_t *p = (struct us_poll_t *) this;
62
        struct us_loop_t *loop = us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this));
63
64
        if (toggle) {
65
            /* Pause */
66
            int events = us_poll_events(p);
67
            if (events) {
68
                us_events[getBufferedAmount() ? 1 : 0] = events;
69
            }
70
            us_poll_change(p, loop, 0);
71
        } else {
72
            /* Resume */
73
            int events = us_events[getBufferedAmount() ? 1 : 0];
74
            us_poll_change(p, loop, events);
75
        }
76
    }
77
78
protected:
79
    /* Returns SSL pointer or FD as pointer */
80
39.0k
    void *getNativeHandle() {
81
39.0k
        return us_socket_get_native_handle(SSL, (us_socket_t *) this);
82
39.0k
    }
83
84
    /* Get loop data for socket */
85
3.03M
    LoopData *getLoopData() {
86
3.03M
        return (LoopData *) us_loop_ext(us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this)));
87
3.03M
    }
88
89
    /* Get socket extension */
90
2.16M
    AsyncSocketData<SSL> *getAsyncSocketData() {
91
2.16M
        return (AsyncSocketData<SSL> *) us_socket_ext(SSL, (us_socket_t *) this);
92
2.16M
    }
93
94
    /* Socket timeout */
95
174k
    void timeout(unsigned int seconds) {
96
174k
        us_socket_timeout(SSL, (us_socket_t *) this, seconds);
97
174k
    }
98
99
    /* Shutdown socket without any automatic drainage */
100
2.25k
    void shutdown() {
101
2.25k
        us_socket_shutdown(SSL, (us_socket_t *) this);
102
2.25k
    }
103
104
    /* Experimental pause */
105
    us_socket_t *pause() {
106
        throttle_helper(1);
107
        return (us_socket_t *) this;
108
    }
109
110
    /* Experimental resume */
111
    us_socket_t *resume() {
112
        throttle_helper(0);
113
        return (us_socket_t *) this;
114
    }
115
116
    /* Immediately close socket */
117
34.6k
    us_socket_t *close() {
118
34.6k
        return us_socket_close(SSL, (us_socket_t *) this, 0, nullptr);
119
34.6k
    }
120
121
40.1k
    void corkUnchecked() {
122
        /* What if another socket is corked? */
123
40.1k
        getLoopData()->corkedSocket = this;
124
40.1k
    }
125
126
0
    void uncorkWithoutSending() {
127
0
        if (isCorked()) {
128
0
            getLoopData()->corkedSocket = nullptr;
129
0
        }
130
0
    }
131
132
    /* Cork this socket. Only one socket may ever be corked per-loop at any given time */
133
440k
    void cork() {
134
        /* Extra check for invalid corking of others */
135
440k
        if (getLoopData()->corkOffset && getLoopData()->corkedSocket != this) {
136
0
            std::cerr << "Error: Cork buffer must not be acquired without checking canCork!" << std::endl;
137
0
            std::terminate();
138
0
        }
139
140
        /* What if another socket is corked? */
141
440k
        getLoopData()->corkedSocket = this;
142
440k
    }
143
144
    /* Returns wheter we are corked or not */
145
135k
    bool isCorked() {
146
135k
        return getLoopData()->corkedSocket == this;
147
135k
    }
148
149
    /* Returns whether we could cork (it is free) */
150
0
    bool canCork() {
151
0
        return getLoopData()->corkedSocket == nullptr;
152
0
    }
153
154
    /* Returns a suitable buffer for temporary assemblation of send data */
155
22.7k
    std::pair<char *, SendBufferAttribute> getSendBuffer(size_t size) {
156
        /* First step is to determine if we already have backpressure or not */
157
22.7k
        LoopData *loopData = getLoopData();
158
22.7k
        BackPressure &backPressure = getAsyncSocketData()->buffer;
159
22.7k
        size_t existingBackpressure = backPressure.length();
160
22.7k
        if ((!existingBackpressure) && (isCorked() || canCork()) && (loopData->corkOffset + size < LoopData::CORK_BUFFER_SIZE)) {
161
            /* Cork automatically if we can */
162
5.40k
            if (isCorked()) {
163
5.40k
                char *sendBuffer = loopData->corkBuffer + loopData->corkOffset;
164
5.40k
                loopData->corkOffset += (unsigned int) size;
165
5.40k
                return {sendBuffer, SendBufferAttribute::NEEDS_NOTHING};
166
5.40k
            } else {
167
0
                cork();
168
0
                char *sendBuffer = loopData->corkBuffer + loopData->corkOffset;
169
0
                loopData->corkOffset += (unsigned int) size;
170
0
                return {sendBuffer, SendBufferAttribute::NEEDS_UNCORK};
171
0
            }
172
17.3k
        } else {
173
174
            /* If we are corked and there is already data in the cork buffer,
175
            mark how much is ours and reset it */
176
17.3k
            unsigned int ourCorkOffset = 0;
177
17.3k
            if (isCorked() && loopData->corkOffset) {
178
74
                ourCorkOffset = loopData->corkOffset;
179
74
                loopData->corkOffset = 0;
180
74
            }
181
182
            /* Fallback is to use the backpressure as buffer */
183
17.3k
            backPressure.resize(ourCorkOffset + existingBackpressure + size);
184
185
            /* And copy corkbuffer in front */
186
17.3k
            memcpy((char *) backPressure.data() + existingBackpressure, loopData->corkBuffer, ourCorkOffset);
187
188
17.3k
            return {(char *) backPressure.data() + ourCorkOffset + existingBackpressure, SendBufferAttribute::NEEDS_DRAIN};
189
17.3k
        }
190
22.7k
    }
191
192
    /* Returns the user space backpressure. */
193
109k
    unsigned int getBufferedAmount() {
194
        /* We return the actual amount of bytes in backbuffer, including pendingRemoval */
195
109k
        return (unsigned int) getAsyncSocketData()->buffer.totalLength();
196
109k
    }
197
198
    /* Returns the text representation of an IPv4 or IPv6 address */
199
39.0k
    std::string_view addressAsText(std::string_view binary) {
200
39.0k
        static thread_local char buf[64];
201
39.0k
        int ipLength = 0;
202
203
39.0k
        if (!binary.length()) {
204
0
            return {};
205
0
        }
206
207
39.0k
        unsigned char *b = (unsigned char *) binary.data();
208
209
39.0k
        if (binary.length() == 4) {
210
20.3k
            ipLength = snprintf(buf, 64, "%u.%u.%u.%u", b[0], b[1], b[2], b[3]);
211
20.3k
        } else {
212
18.7k
            ipLength = snprintf(buf, 64, "%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x",
213
18.7k
                b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11],
214
18.7k
                b[12], b[13], b[14], b[15]);
215
18.7k
        }
216
217
39.0k
        return {buf, (unsigned int) ipLength};
218
39.0k
    }
219
220
    /* Returns the remote IP address or empty string on failure */
221
39.0k
    std::string_view getRemoteAddress() {
222
39.0k
        static thread_local char buf[16];
223
39.0k
        int ipLength = 16;
224
39.0k
        us_socket_remote_address(SSL, (us_socket_t *) this, buf, &ipLength);
225
39.0k
        return std::string_view(buf, (unsigned int) ipLength);
226
39.0k
    }
227
228
    /* Returns the text representation of IP */
229
39.0k
    std::string_view getRemoteAddressAsText() {
230
39.0k
        return addressAsText(getRemoteAddress());
231
39.0k
    }
232
233
    /* Write in three levels of prioritization: cork-buffer, syscall, socket-buffer. Always drain if possible.
234
     * Returns pair of bytes written (anywhere) and wheter or not this call resulted in the polling for
235
     * writable (or we are in a state that implies polling for writable). */
236
1.75M
    std::pair<int, bool> write(const char *src, int length, bool optionally = false, int nextLength = 0) {
237
        /* Fake success if closed, simple fix to allow uncork of closed socket to succeed */
238
1.75M
        if (us_socket_is_closed(SSL, (us_socket_t *) this)) {
239
300k
            return {length, false};
240
300k
        }
241
242
1.45M
        LoopData *loopData = getLoopData();
243
1.45M
        AsyncSocketData<SSL> *asyncSocketData = getAsyncSocketData();
244
245
        /* We are limited if we have a per-socket buffer */
246
1.45M
        if (asyncSocketData->buffer.length()) {
247
            /* Write off as much as we can */
248
104k
            int written = us_socket_write(SSL, (us_socket_t *) this, asyncSocketData->buffer.data(), (int) asyncSocketData->buffer.length(), /*nextLength != 0 | */length);
249
250
            /* On failure return, otherwise continue down the function */
251
104k
            if ((unsigned int) written < asyncSocketData->buffer.length()) {
252
253
                /* Update buffering (todo: we can do better here if we keep track of what happens to this guy later on) */
254
99.8k
                asyncSocketData->buffer.erase((unsigned int) written);
255
256
99.8k
                if (optionally) {
257
                    /* Thankfully we can exit early here */
258
2.42k
                    return {0, true};
259
97.4k
                } else {
260
                    /* This path is horrible and points towards erroneous usage */
261
97.4k
                    asyncSocketData->buffer.append(src, (unsigned int) length);
262
263
97.4k
                    return {length, true};
264
97.4k
                }
265
99.8k
            }
266
267
            /* At this point we simply have no buffer and can continue as normal */
268
5.10k
            asyncSocketData->buffer.clear();
269
5.10k
        }
270
271
1.35M
        if (length) {
272
1.30M
            if (loopData->corkedSocket == this) {
273
                /* We are corked */
274
1.26M
                if (LoopData::CORK_BUFFER_SIZE - loopData->corkOffset >= (unsigned int) length) {
275
                    /* If the entire chunk fits in cork buffer */
276
1.26M
                    memcpy(loopData->corkBuffer + loopData->corkOffset, src, (unsigned int) length);
277
1.26M
                    loopData->corkOffset += (unsigned int) length;
278
                    /* Fall through to default return */
279
1.26M
                } else {
280
                    /* Strategy differences between SSL and non-SSL regarding syscall minimizing */
281
0
                    if constexpr (false) {
282
                        /* Cork up as much as we can */
283
0
                        unsigned int stripped = LoopData::CORK_BUFFER_SIZE - loopData->corkOffset;
284
0
                        memcpy(loopData->corkBuffer + loopData->corkOffset, src, stripped);
285
0
                        loopData->corkOffset = LoopData::CORK_BUFFER_SIZE;
286
287
0
                        auto [written, failed] = uncork(src + stripped, length - (int) stripped, optionally);
288
0
                        return {written + (int) stripped, failed};
289
0
                    }
290
291
                    /* For non-SSL we take the penalty of two syscalls */
292
0
                    return uncork(src, length, optionally);
293
0
                }
294
1.26M
            } else {
295
                /* We are not corked */
296
48.0k
                int written = us_socket_write(SSL, (us_socket_t *) this, src, length, nextLength != 0);
297
298
                /* Did we fail? */
299
48.0k
                if (written < length) {
300
                    /* If the write was optional then just bail out */
301
45.5k
                    if (optionally) {
302
0
                        return {written, true};
303
0
                    }
304
305
                    /* Fall back to worst possible case (should be very rare for HTTP) */
306
                    /* At least we can reserve room for next chunk if we know it up front */
307
45.5k
                    if (nextLength) {
308
0
                        asyncSocketData->buffer.reserve(asyncSocketData->buffer.length() + (size_t) (length - written + nextLength));
309
0
                    }
310
311
                    /* Buffer this chunk */
312
45.5k
                    asyncSocketData->buffer.append(src + written, (size_t) (length - written));
313
314
                    /* Return the failure */
315
45.5k
                    return {length, true};
316
45.5k
                }
317
                /* Fall through to default return */
318
48.0k
            }
319
1.30M
        }
320
321
        /* Default fall through return */
322
1.30M
        return {length, false};
323
1.35M
    }
324
325
    /* Uncork this socket and flush or buffer any corked and/or passed data. It is essential to remember doing this. */
326
    /* It does NOT count bytes written from cork buffer (they are already accounted for in the write call responsible for its corking)! */
327
440k
    std::pair<int, bool> uncork(const char *src = nullptr, int length = 0, bool optionally = false) {
328
440k
        LoopData *loopData = getLoopData();
329
330
440k
        if (loopData->corkedSocket == this) {
331
440k
            loopData->corkedSocket = nullptr;
332
333
440k
            if (loopData->corkOffset) {
334
                /* Corked data is already accounted for via its write call */
335
52.6k
                auto [written, failed] = write(loopData->corkBuffer, (int) loopData->corkOffset, false, length);
336
52.6k
                loopData->corkOffset = 0;
337
338
52.6k
                if (failed) {
339
                    /* We do not need to care for buffering here, write does that */
340
44.4k
                    return {0, true};
341
44.4k
                }
342
52.6k
            }
343
344
            /* We should only return with new writes, not things written to cork already */
345
396k
            return write(src, length, optionally, 0);
346
440k
        } else {
347
            /* We are not even corked! */
348
0
            return {0, false};
349
0
        }
350
440k
    }
351
};
352
353
}
354
355
#endif // UWS_ASYNCSOCKET_H