Coverage Report

Created: 2023-06-06 06:17

/src/uWebSockets/src/AsyncSocket.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2020.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_ASYNCSOCKET_H
19
#define UWS_ASYNCSOCKET_H
20
21
/* This class implements async socket memory management strategies */
22
23
/* NOTE: Many unsigned/signed conversion warnings could be solved by moving from int length
24
 * to unsigned length for everything to/from uSockets - this would however remove the opportunity
25
 * to signal error with -1 (which is how the entire UNIX syscalling is built). */
26
27
#include <cstring>
28
#include <iostream>
29
30
#include "libusockets.h"
31
32
#include "LoopData.h"
33
#include "AsyncSocketData.h"
34
35
namespace uWS {
36
37
    enum SendBufferAttribute {
38
        NEEDS_NOTHING,
39
        NEEDS_DRAIN,
40
        NEEDS_UNCORK
41
    };
42
43
    template <bool, bool, typename> struct WebSocketContext;
44
45
template <bool SSL>
46
struct AsyncSocket {
47
    /* This guy is promiscuous */
48
    template <bool> friend struct HttpContext;
49
    template <bool, bool, typename> friend struct WebSocketContext;
50
    template <bool> friend struct TemplatedApp;
51
    template <bool, typename> friend struct WebSocketContextData;
52
    template <typename, typename> friend struct TopicTree;
53
    template <bool> friend struct HttpResponse;
54
55
private:
56
    /* Helper, do not use directly (todo: move to uSockets or de-crazify) */
57
    void throttle_helper(int toggle) {
58
        /* These should be exposed by uSockets */
59
        static thread_local int us_events[2] = {0, 0};
60
61
        struct us_poll_t *p = (struct us_poll_t *) this;
62
        struct us_loop_t *loop = us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this));
63
64
        if (toggle) {
65
            /* Pause */
66
            int events = us_poll_events(p);
67
            if (events) {
68
                us_events[getBufferedAmount() ? 1 : 0] = events;
69
            }
70
            us_poll_change(p, loop, 0);
71
        } else {
72
            /* Resume */
73
            int events = us_events[getBufferedAmount() ? 1 : 0];
74
            us_poll_change(p, loop, events);
75
        }
76
    }
77
78
protected:
79
    /* Returns SSL pointer or FD as pointer */
80
    void *getNativeHandle() {
81
        return us_socket_get_native_handle(SSL, (us_socket_t *) this);
82
    }
83
84
    /* Get loop data for socket */
85
764k
    LoopData *getLoopData() {
86
764k
        return (LoopData *) us_loop_ext(us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this)));
87
764k
    }
88
89
    /* Get socket extension */
90
413k
    AsyncSocketData<SSL> *getAsyncSocketData() {
91
413k
        return (AsyncSocketData<SSL> *) us_socket_ext(SSL, (us_socket_t *) this);
92
413k
    }
93
94
    /* Socket timeout */
95
47.7k
    void timeout(unsigned int seconds) {
96
47.7k
        us_socket_timeout(SSL, (us_socket_t *) this, seconds);
97
47.7k
    }
98
99
    /* Shutdown socket without any automatic drainage */
100
2.07k
    void shutdown() {
101
2.07k
        us_socket_shutdown(SSL, (us_socket_t *) this);
102
2.07k
    }
103
104
    /* Experimental pause */
105
    us_socket_t *pause() {
106
        throttle_helper(1);
107
        return (us_socket_t *) this;
108
    }
109
110
    /* Experimental resume */
111
    us_socket_t *resume() {
112
        throttle_helper(0);
113
        return (us_socket_t *) this;
114
    }
115
116
    /* Immediately close socket */
117
29.9k
    us_socket_t *close() {
118
29.9k
        return us_socket_close(SSL, (us_socket_t *) this, 0, nullptr);
119
29.9k
    }
120
121
    void corkUnchecked() {
122
        /* What if another socket is corked? */
123
        getLoopData()->corkedSocket = this;
124
    }
125
126
    void uncorkWithoutSending() {
127
        if (isCorked()) {
128
            getLoopData()->corkedSocket = nullptr;
129
        }
130
    }
131
132
    /* Cork this socket. Only one socket may ever be corked per-loop at any given time */
133
146k
    void cork() {
134
        /* Extra check for invalid corking of others */
135
146k
        if (getLoopData()->corkOffset && getLoopData()->corkedSocket != this) {
136
0
            std::cerr << "Error: Cork buffer must not be acquired without checking canCork!" << std::endl;
137
0
            std::terminate();
138
0
        }
139
140
        /* What if another socket is corked? */
141
146k
        getLoopData()->corkedSocket = this;
142
146k
    }
143
144
    /* Returns the corked socket or nullptr */
145
13.0k
    void *corkedSocket() {
146
13.0k
        return getLoopData()->corkedSocket;
147
13.0k
    }
148
149
    /* Returns wheter we are corked or not */
150
26.1k
    bool isCorked() {
151
26.1k
        return getLoopData()->corkedSocket == this;
152
26.1k
    }
153
154
    /* Returns whether we could cork (it is free) */
155
13.0k
    bool canCork() {
156
13.0k
        return getLoopData()->corkedSocket == nullptr;
157
13.0k
    }
158
159
    /* Returns a suitable buffer for temporary assemblation of send data */
160
    std::pair<char *, SendBufferAttribute> getSendBuffer(size_t size) {
161
        /* First step is to determine if we already have backpressure or not */
162
        LoopData *loopData = getLoopData();
163
        BackPressure &backPressure = getAsyncSocketData()->buffer;
164
        size_t existingBackpressure = backPressure.length();
165
        if ((!existingBackpressure) && (isCorked() || canCork()) && (loopData->corkOffset + size < LoopData::CORK_BUFFER_SIZE)) {
166
            /* Cork automatically if we can */
167
            if (isCorked()) {
168
                char *sendBuffer = loopData->corkBuffer + loopData->corkOffset;
169
                loopData->corkOffset += (unsigned int) size;
170
                return {sendBuffer, SendBufferAttribute::NEEDS_NOTHING};
171
            } else {
172
                cork();
173
                char *sendBuffer = loopData->corkBuffer + loopData->corkOffset;
174
                loopData->corkOffset += (unsigned int) size;
175
                return {sendBuffer, SendBufferAttribute::NEEDS_UNCORK};
176
            }
177
        } else {
178
179
            /* If we are corked and there is already data in the cork buffer,
180
            mark how much is ours and reset it */
181
            unsigned int ourCorkOffset = 0;
182
            if (isCorked() && loopData->corkOffset) {
183
                ourCorkOffset = loopData->corkOffset;
184
                loopData->corkOffset = 0;
185
            }
186
187
            /* Fallback is to use the backpressure as buffer */
188
            backPressure.resize(ourCorkOffset + existingBackpressure + size);
189
190
            /* And copy corkbuffer in front */
191
            memcpy((char *) backPressure.data() + existingBackpressure, loopData->corkBuffer, ourCorkOffset);
192
193
            return {(char *) backPressure.data() + ourCorkOffset + existingBackpressure, SendBufferAttribute::NEEDS_DRAIN};
194
        }
195
    }
196
197
    /* Returns the user space backpressure. */
198
6.18k
    unsigned int getBufferedAmount() {
199
        /* We return the actual amount of bytes in backbuffer, including pendingRemoval */
200
6.18k
        return (unsigned int) getAsyncSocketData()->buffer.totalLength();
201
6.18k
    }
202
203
    /* Returns the text representation of an IPv4 or IPv6 address */
204
    std::string_view addressAsText(std::string_view binary) {
205
        static thread_local char buf[64];
206
        int ipLength = 0;
207
208
        if (!binary.length()) {
209
            return {};
210
        }
211
212
        unsigned char *b = (unsigned char *) binary.data();
213
214
        if (binary.length() == 4) {
215
            ipLength = sprintf(buf, "%u.%u.%u.%u", b[0], b[1], b[2], b[3]);
216
        } else {
217
            ipLength = sprintf(buf, "%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x",
218
                b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11],
219
                b[12], b[13], b[14], b[15]);
220
        }
221
222
        return {buf, (unsigned int) ipLength};
223
    }
224
225
    /* Returns the remote IP address or empty string on failure */
226
    std::string_view getRemoteAddress() {
227
        static thread_local char buf[16];
228
        int ipLength = 16;
229
        us_socket_remote_address(SSL, (us_socket_t *) this, buf, &ipLength);
230
        return std::string_view(buf, (unsigned int) ipLength);
231
    }
232
233
    /* Returns the text representation of IP */
234
    std::string_view getRemoteAddressAsText() {
235
        return addressAsText(getRemoteAddress());
236
    }
237
238
    /* Write in three levels of prioritization: cork-buffer, syscall, socket-buffer. Always drain if possible.
239
     * Returns pair of bytes written (anywhere) and wheter or not this call resulted in the polling for
240
     * writable (or we are in a state that implies polling for writable). */
241
354k
    std::pair<int, bool> write(const char *src, int length, bool optionally = false, int nextLength = 0) {
242
        /* Fake success if closed, simple fix to allow uncork of closed socket to succeed */
243
354k
        if (us_socket_is_closed(SSL, (us_socket_t *) this)) {
244
92.9k
            return {length, false};
245
92.9k
        }
246
247
261k
        LoopData *loopData = getLoopData();
248
261k
        AsyncSocketData<SSL> *asyncSocketData = getAsyncSocketData();
249
250
        /* We are limited if we have a per-socket buffer */
251
261k
        if (asyncSocketData->buffer.length()) {
252
            /* Write off as much as we can */
253
40.2k
            int written = us_socket_write(SSL, (us_socket_t *) this, asyncSocketData->buffer.data(), (int) asyncSocketData->buffer.length(), /*nextLength != 0 | */length);
254
255
            /* On failure return, otherwise continue down the function */
256
40.2k
            if ((unsigned int) written < asyncSocketData->buffer.length()) {
257
258
                /* Update buffering (todo: we can do better here if we keep track of what happens to this guy later on) */
259
36.0k
                asyncSocketData->buffer.erase((unsigned int) written);
260
261
36.0k
                if (optionally) {
262
                    /* Thankfully we can exit early here */
263
7.98k
                    return {0, true};
264
28.1k
                } else {
265
                    /* This path is horrible and points towards erroneous usage */
266
28.1k
                    asyncSocketData->buffer.append(src, (unsigned int) length);
267
268
28.1k
                    return {length, true};
269
28.1k
                }
270
36.0k
            }
271
272
            /* At this point we simply have no buffer and can continue as normal */
273
4.12k
            asyncSocketData->buffer.clear();
274
4.12k
        }
275
276
225k
        if (length) {
277
193k
            if (loopData->corkedSocket == this) {
278
                /* We are corked */
279
181k
                if (LoopData::CORK_BUFFER_SIZE - loopData->corkOffset >= (unsigned int) length) {
280
                    /* If the entire chunk fits in cork buffer */
281
181k
                    memcpy(loopData->corkBuffer + loopData->corkOffset, src, (unsigned int) length);
282
181k
                    loopData->corkOffset += (unsigned int) length;
283
                    /* Fall through to default return */
284
181k
                } else {
285
                    /* Strategy differences between SSL and non-SSL regarding syscall minimizing */
286
0
                    if constexpr (SSL) {
287
                        /* Cork up as much as we can */
288
0
                        unsigned int stripped = LoopData::CORK_BUFFER_SIZE - loopData->corkOffset;
289
0
                        memcpy(loopData->corkBuffer + loopData->corkOffset, src, stripped);
290
0
                        loopData->corkOffset = LoopData::CORK_BUFFER_SIZE;
291
292
0
                        auto [written, failed] = uncork(src + stripped, length - (int) stripped, optionally);
293
0
                        return {written + (int) stripped, failed};
294
0
                    }
295
296
                    /* For non-SSL we take the penalty of two syscalls */
297
0
                    return uncork(src, length, optionally);
298
0
                }
299
181k
            } else {
300
                /* We are not corked */
301
12.6k
                int written = us_socket_write(SSL, (us_socket_t *) this, src, length, nextLength != 0);
302
303
                /* Did we fail? */
304
12.6k
                if (written < length) {
305
                    /* If the write was optional then just bail out */
306
11.0k
                    if (optionally) {
307
0
                        return {written, true};
308
0
                    }
309
310
                    /* Fall back to worst possible case (should be very rare for HTTP) */
311
                    /* At least we can reserve room for next chunk if we know it up front */
312
11.0k
                    if (nextLength) {
313
0
                        asyncSocketData->buffer.reserve(asyncSocketData->buffer.length() + (size_t) (length - written + nextLength));
314
0
                    }
315
316
                    /* Buffer this chunk */
317
11.0k
                    asyncSocketData->buffer.append(src + written, (size_t) (length - written));
318
319
                    /* Return the failure */
320
11.0k
                    return {length, true};
321
11.0k
                }
322
                /* Fall through to default return */
323
12.6k
            }
324
193k
        }
325
326
        /* Default fall through return */
327
214k
        return {length, false};
328
225k
    }
329
330
    /* Uncork this socket and flush or buffer any corked and/or passed data. It is essential to remember doing this. */
331
    /* It does NOT count bytes written from cork buffer (they are already accounted for in the write call responsible for its corking)! */
332
146k
    std::pair<int, bool> uncork(const char *src = nullptr, int length = 0, bool optionally = false) {
333
146k
        LoopData *loopData = getLoopData();
334
335
146k
        if (loopData->corkedSocket == this) {
336
146k
            loopData->corkedSocket = nullptr;
337
338
146k
            if (loopData->corkOffset) {
339
                /* Corked data is already accounted for via its write call */
340
12.6k
                auto [written, failed] = write(loopData->corkBuffer, (int) loopData->corkOffset, false, length);
341
12.6k
                loopData->corkOffset = 0;
342
343
12.6k
                if (failed) {
344
                    /* We do not need to care for buffering here, write does that */
345
11.0k
                    return {0, true};
346
11.0k
                }
347
12.6k
            }
348
349
            /* We should only return with new writes, not things written to cork already */
350
134k
            return write(src, length, optionally, 0);
351
146k
        } else {
352
            /* We are not even corked! */
353
0
            return {0, false};
354
0
        }
355
146k
    }
356
};
357
358
}
359
360
#endif // UWS_ASYNCSOCKET_H