/src/uWebSockets/src/AsyncSocket.h
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2020. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef UWS_ASYNCSOCKET_H |
19 | | #define UWS_ASYNCSOCKET_H |
20 | | |
21 | | /* This class implements async socket memory management strategies */ |
22 | | |
23 | | /* NOTE: Many unsigned/signed conversion warnings could be solved by moving from int length |
24 | | * to unsigned length for everything to/from uSockets - this would however remove the opportunity |
25 | | * to signal error with -1 (which is how the entire UNIX syscalling is built). */ |
26 | | |
27 | | #include <cstring> |
28 | | #include <iostream> |
29 | | |
30 | | #include "libusockets.h" |
31 | | |
32 | | #include "LoopData.h" |
33 | | #include "AsyncSocketData.h" |
34 | | |
35 | | namespace uWS { |
36 | | |
37 | | enum SendBufferAttribute { /* Second member of the pair returned by getSendBuffer(): which kind of buffer was handed out */ |
38 | | NEEDS_NOTHING, /* Space was taken from the cork buffer while this socket was already corked */ |
39 | | NEEDS_DRAIN, /* Space was taken from the socket's backpressure buffer */ |
40 | | NEEDS_UNCORK /* getSendBuffer() corked the socket itself before taking cork buffer space */ |
41 | | }; |
42 | | |
43 | | template <bool, bool, typename> struct WebSocketContext; |
44 | | |
45 | | template <bool SSL> |
46 | | struct AsyncSocket { |
47 | | /* This guy is promiscuous */ |
48 | | template <bool> friend struct HttpContext; |
49 | | template <bool, bool, typename> friend struct WebSocketContext; |
50 | | template <bool> friend struct TemplatedApp; |
51 | | template <bool, typename> friend struct WebSocketContextData; |
52 | | template <typename, typename> friend struct TopicTree; |
53 | | template <bool> friend struct HttpResponse; |
54 | | |
55 | | private: |
56 | | /* Helper, do not use directly (todo: move to uSockets or de-crazify) */ |
57 | | void throttle_helper(int toggle) { /* Non-zero toggle pauses polling for this socket, zero resumes it */ |
58 | | /* These should be exposed by uSockets. Holds the poll events saved on pause: slot 1 is used while the socket has buffered data, slot 0 otherwise. NOTE(review): being static thread_local, the slots are shared by all AsyncSocket<SSL> instances on the thread rather than kept per socket - confirm this is intended */ |
59 | | static thread_local int us_events[2] = {0, 0}; |
60 | | |
61 | | struct us_poll_t *p = (struct us_poll_t *) this; |
62 | | struct us_loop_t *loop = us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this)); |
63 | | |
64 | | if (toggle) { |
65 | | /* Pause: remember the current event mask if it is non-zero, then change the poll to events 0 */ |
66 | | int events = us_poll_events(p); |
67 | | if (events) { |
68 | | us_events[getBufferedAmount() ? 1 : 0] = events; |
69 | | } |
70 | | us_poll_change(p, loop, 0); |
71 | | } else { |
72 | | /* Resume: change the poll back to the mask remembered for the current buffered state (slot chosen at resume time) */ |
73 | | int events = us_events[getBufferedAmount() ? 1 : 0]; |
74 | | us_poll_change(p, loop, events); |
75 | | } |
76 | | } |
77 | | |
78 | | protected: |
79 | | /* Returns SSL pointer or FD as pointer */ |
80 | | void *getNativeHandle() { |
81 | | return us_socket_get_native_handle(SSL, (us_socket_t *) this); /* this object is reinterpreted as the underlying us_socket_t */ |
82 | | } |
83 | | |
84 | | /* Get loop data for socket */ |
85 | 764k | LoopData *getLoopData() { /* Resolves socket -> socket context -> loop and returns us_loop_ext() of that loop cast to LoopData */ |
86 | 764k | return (LoopData *) us_loop_ext(us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this))); |
87 | 764k | } |
88 | | |
89 | | /* Get socket extension */ |
90 | 413k | AsyncSocketData<SSL> *getAsyncSocketData() { /* Returns us_socket_ext() of this socket cast to AsyncSocketData<SSL> */ |
91 | 413k | return (AsyncSocketData<SSL> *) us_socket_ext(SSL, (us_socket_t *) this); |
92 | 413k | } |
93 | | |
94 | | /* Socket timeout */ |
95 | 47.7k | void timeout(unsigned int seconds) { /* Forwards seconds unchanged to us_socket_timeout() for this socket */ |
96 | 47.7k | us_socket_timeout(SSL, (us_socket_t *) this, seconds); |
97 | 47.7k | } |
98 | | |
99 | | /* Shutdown socket without any automatic drainage */ |
100 | 2.07k | void shutdown() { /* Only calls us_socket_shutdown(); this wrapper neither flushes the cork buffer nor the backpressure buffer */ |
101 | 2.07k | us_socket_shutdown(SSL, (us_socket_t *) this); |
102 | 2.07k | } |
103 | | |
104 | | /* Experimental pause */ |
105 | | us_socket_t *pause() { /* Pauses polling through throttle_helper(1) */ |
106 | | throttle_helper(1); |
107 | | return (us_socket_t *) this; /* Same object, viewed as us_socket_t */ |
108 | | } |
109 | | |
110 | | /* Experimental resume */ |
111 | | us_socket_t *resume() { /* Resumes polling through throttle_helper(0) */ |
112 | | throttle_helper(0); |
113 | | return (us_socket_t *) this; /* Same object, viewed as us_socket_t */ |
114 | | } |
115 | | |
116 | | /* Immediately close socket */ |
117 | 29.9k | us_socket_t *close() { /* Returns whatever us_socket_close() returns; 0 and nullptr are passed as its trailing arguments */ |
118 | 29.9k | return us_socket_close(SSL, (us_socket_t *) this, 0, nullptr); |
119 | 29.9k | } |
120 | | |
121 | | void corkUnchecked() { |
122 | | /* What if another socket is corked? It is overwritten: the per-loop cork slot is claimed without any of the checks done by cork() */ |
123 | | getLoopData()->corkedSocket = this; |
124 | | } |
125 | | |
126 | | void uncorkWithoutSending() { /* Gives up the cork slot if this socket holds it; corkOffset and the cork buffer contents are left as they are */ |
127 | | if (isCorked()) { |
128 | | getLoopData()->corkedSocket = nullptr; |
129 | | } |
130 | | } |
131 | | |
132 | | /* Cork this socket. Only one socket may ever be corked per-loop at any given time */ |
133 | 146k | void cork() { |
134 | | /* Extra check for invalid corking of others: pending cork data (corkOffset != 0) while the cork slot is not ours is fatal and terminates the process */ |
135 | 146k | if (getLoopData()->corkOffset && getLoopData()->corkedSocket != this) { |
136 | 0 | std::cerr << "Error: Cork buffer must not be acquired without checking canCork!" << std::endl; |
137 | 0 | std::terminate(); |
138 | 0 | } |
139 | | |
140 | | /* What if another socket is corked? Past the check above that is only possible with an empty cork buffer (corkOffset == 0); the slot is then simply taken over */ |
141 | 146k | getLoopData()->corkedSocket = this; |
142 | 146k | } |
143 | | |
144 | | /* Returns the corked socket or nullptr */ |
145 | 13.0k | void *corkedSocket() { /* Reads LoopData::corkedSocket of the loop this socket belongs to */ |
146 | 13.0k | return getLoopData()->corkedSocket; |
147 | 13.0k | } |
148 | | |
149 | | /* Returns whether we are corked or not */ |
150 | 26.1k | bool isCorked() { /* True only when the loop's cork slot points at this very socket */ |
151 | 26.1k | return getLoopData()->corkedSocket == this; |
152 | 26.1k | } |
153 | | |
154 | | /* Returns whether we could cork (it is free) */ |
155 | 13.0k | bool canCork() { /* True when no socket holds the loop's cork slot; false when any socket holds it, including this one */ |
156 | 13.0k | return getLoopData()->corkedSocket == nullptr; |
157 | 13.0k | } |
158 | | |
159 | | /* Returns a suitable buffer for temporary assembly of send data */ |
160 | | std::pair<char *, SendBufferAttribute> getSendBuffer(size_t size) { /* size: number of bytes to reserve; returns the start of the reserved region and which kind of buffer it lies in */ |
161 | | /* First step is to determine if we already have backpressure or not. The cork buffer is only used without backpressure, when the cork slot is ours or free, and when corkOffset + size stays strictly below CORK_BUFFER_SIZE */ |
162 | | LoopData *loopData = getLoopData(); |
163 | | BackPressure &backPressure = getAsyncSocketData()->buffer; |
164 | | size_t existingBackpressure = backPressure.length(); |
165 | | if ((!existingBackpressure) && (isCorked() || canCork()) && (loopData->corkOffset + size < LoopData::CORK_BUFFER_SIZE)) { |
166 | | /* Cork automatically if we can; in both branches the region is reserved by advancing corkOffset by size */ |
167 | | if (isCorked()) { |
168 | | char *sendBuffer = loopData->corkBuffer + loopData->corkOffset; |
169 | | loopData->corkOffset += (unsigned int) size; |
170 | | return {sendBuffer, SendBufferAttribute::NEEDS_NOTHING}; |
171 | | } else { |
172 | | cork(); |
173 | | char *sendBuffer = loopData->corkBuffer + loopData->corkOffset; |
174 | | loopData->corkOffset += (unsigned int) size; |
175 | | return {sendBuffer, SendBufferAttribute::NEEDS_UNCORK}; |
176 | | } |
177 | | } else { |
178 | | |
179 | | /* If we are corked and there is already data in the cork buffer, |
180 | | mark how much is ours and reset it */ |
181 | | unsigned int ourCorkOffset = 0; |
182 | | if (isCorked() && loopData->corkOffset) { |
183 | | ourCorkOffset = loopData->corkOffset; |
184 | | loopData->corkOffset = 0; |
185 | | } |
186 | | |
187 | | /* Fallback is to use the backpressure as buffer, resized to hold the existing backpressure, our corked bytes and size more bytes */ |
188 | | backPressure.resize(ourCorkOffset + existingBackpressure + size); |
189 | | |
190 | | /* And copy our corked bytes right behind the existing backpressure, in front of the region handed to the caller */ |
191 | | memcpy((char *) backPressure.data() + existingBackpressure, loopData->corkBuffer, ourCorkOffset); |
192 | | |
193 | | return {(char *) backPressure.data() + ourCorkOffset + existingBackpressure, SendBufferAttribute::NEEDS_DRAIN}; |
194 | | } |
195 | | } |
196 | | |
197 | | /* Returns the user space backpressure. */ |
198 | 6.18k | unsigned int getBufferedAmount() { |
199 | | /* We return the actual amount of bytes in backbuffer, including pendingRemoval; this is buffer.totalLength() narrowed to unsigned int */ |
200 | 6.18k | return (unsigned int) getAsyncSocketData()->buffer.totalLength(); |
201 | 6.18k | } |
202 | | |
203 | | /* Returns the text representation of an IPv4 or IPv6 address */ |
204 | | std::string_view addressAsText(std::string_view binary) { /* binary: raw address bytes (4 bytes are formatted as IPv4, anything else as 16 bytes of IPv6) */ |
205 | | static thread_local char buf[64]; /* The returned view points into this buffer, so the next call on the same thread overwrites the text */ |
206 | | int ipLength = 0; |
207 | | |
208 | | if (!binary.length()) { /* Empty input yields an empty view */ |
209 | | return {}; |
210 | | } |
211 | | |
212 | | unsigned char *b = (unsigned char *) binary.data(); |
213 | | |
214 | | if (binary.length() == 4) { |
215 | | ipLength = sprintf(buf, "%u.%u.%u.%u", b[0], b[1], b[2], b[3]); /* Dotted decimal */ |
216 | | } else { /* NOTE(review): every length other than 4 lands here and reads b[0] through b[15] - confirm callers only ever pass 4 or 16 bytes */ |
217 | | ipLength = sprintf(buf, "%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x", /* Eight groups of four lowercase hex digits, no zero compression */ |
218 | | b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], |
219 | | b[12], b[13], b[14], b[15]); |
220 | | } |
221 | | |
222 | | return {buf, (unsigned int) ipLength}; /* Length is the character count reported by sprintf */ |
223 | | } |
224 | | |
225 | | /* Returns the remote IP address or empty string on failure */ |
226 | | std::string_view getRemoteAddress() { /* Returns the raw (binary) address bytes, not text */ |
227 | | static thread_local char buf[16]; /* The returned view points into this buffer, so the next call on the same thread overwrites it */ |
228 | | int ipLength = 16; /* Passed by pointer below; presumably in: capacity of buf, out: number of bytes written - confirm against uSockets */ |
229 | | us_socket_remote_address(SSL, (us_socket_t *) this, buf, &ipLength); |
230 | | return std::string_view(buf, (unsigned int) ipLength); |
231 | | } |
232 | | |
233 | | /* Returns the text representation of IP */ |
234 | | std::string_view getRemoteAddressAsText() { /* Composition of getRemoteAddress() and addressAsText(); the result lives in addressAsText()'s thread_local buffer */ |
235 | | return addressAsText(getRemoteAddress()); |
236 | | } |
237 | | |
238 | | /* Write in three levels of prioritization: cork-buffer, syscall, socket-buffer. Always drain if possible. |
239 | | * Returns pair of bytes written (anywhere) and whether or not this call resulted in the polling for |
240 | | * writable (or we are in a state that implies polling for writable). */ |
241 | 354k | std::pair<int, bool> write(const char *src, int length, bool optionally = false, int nextLength = 0) { /* optionally: on a short write nothing of src gets buffered; nextLength: size of a chunk expected to follow, 0 if none */ |
242 | | /* Fake success if closed, simple fix to allow uncork of closed socket to succeed (the full length is reported although nothing is sent) */ |
243 | 354k | if (us_socket_is_closed(SSL, (us_socket_t *) this)) { |
244 | 92.9k | return {length, false}; |
245 | 92.9k | } |
246 | | |
247 | 261k | LoopData *loopData = getLoopData(); |
248 | 261k | AsyncSocketData<SSL> *asyncSocketData = getAsyncSocketData(); |
249 | | |
250 | | /* We are limited if we have a per-socket buffer */ |
251 | 261k | if (asyncSocketData->buffer.length()) { |
252 | | /* Write off as much as we can of the previously buffered data. NOTE(review): the last argument here is length itself, while the uncorked path further down passes nextLength != 0 - presumably a "more data follows" flag, confirm against uSockets */ |
253 | 40.2k | int written = us_socket_write(SSL, (us_socket_t *) this, asyncSocketData->buffer.data(), (int) asyncSocketData->buffer.length(), /*nextLength != 0 | */length); |
254 | | |
255 | | /* On failure return, otherwise continue down the function. NOTE(review): a negative written would convert to a large unsigned value and skip this branch - confirm us_socket_write never returns a negative count */ |
256 | 40.2k | if ((unsigned int) written < asyncSocketData->buffer.length()) { |
257 | | |
258 | | /* Update buffering (todo: we can do better here if we keep track of what happens to this guy later on) */ |
259 | 36.0k | asyncSocketData->buffer.erase((unsigned int) written); |
260 | | |
261 | 36.0k | if (optionally) { |
262 | | /* Thankfully we can exit early here: nothing of src has been written or buffered */ |
263 | 7.98k | return {0, true}; |
264 | 28.1k | } else { |
265 | | /* This path is horrible and points towards erroneous usage: all of src is appended behind the remaining backlog and reported as written */ |
266 | 28.1k | asyncSocketData->buffer.append(src, (unsigned int) length); |
267 | | |
268 | 28.1k | return {length, true}; |
269 | 28.1k | } |
270 | 36.0k | } |
271 | | |
272 | | /* At this point we simply have no buffer and can continue as normal */ |
273 | 4.12k | asyncSocketData->buffer.clear(); |
274 | 4.12k | } |
275 | | |
276 | 225k | if (length) { |
277 | 193k | if (loopData->corkedSocket == this) { |
278 | | /* We are corked */ |
279 | 181k | if (LoopData::CORK_BUFFER_SIZE - loopData->corkOffset >= (unsigned int) length) { |
280 | | /* If the entire chunk fits in cork buffer, nothing is sent yet; the bytes are only copied */ |
281 | 181k | memcpy(loopData->corkBuffer + loopData->corkOffset, src, (unsigned int) length); |
282 | 181k | loopData->corkOffset += (unsigned int) length; |
283 | | /* Fall through to default return */ |
284 | 181k | } else { |
285 | | /* Strategy differences between SSL and non-SSL regarding syscall minimizing */ |
286 | 0 | if constexpr (SSL) { |
287 | | /* Cork up as much as we can, then uncork with the remainder of src */ |
288 | 0 | unsigned int stripped = LoopData::CORK_BUFFER_SIZE - loopData->corkOffset; |
289 | 0 | memcpy(loopData->corkBuffer + loopData->corkOffset, src, stripped); |
290 | 0 | loopData->corkOffset = LoopData::CORK_BUFFER_SIZE; |
291 | 

292 | 0 | auto [written, failed] = uncork(src + stripped, length - (int) stripped, optionally); |
293 | 0 | return {written + (int) stripped, failed}; |
294 | 0 | } |
295 | | |
296 | | /* For non-SSL we take the penalty of two syscalls */ |
297 | 0 | return uncork(src, length, optionally); |
298 | 0 | } |
299 | 181k | } else { |
300 | | /* We are not corked */ |
301 | 12.6k | int written = us_socket_write(SSL, (us_socket_t *) this, src, length, nextLength != 0); |
302 | | |
303 | | /* Did we fail? */ |
304 | 12.6k | if (written < length) { |
305 | | /* If the write was optional then just bail out: report the partial count, buffer nothing */ |
306 | 11.0k | if (optionally) { |
307 | 0 | return {written, true}; |
308 | 0 | } |
309 | | |
310 | | /* Fall back to worst possible case (should be very rare for HTTP) */ |
311 | | /* At least we can reserve room for next chunk if we know it up front */ |
312 | 11.0k | if (nextLength) { |
313 | 0 | asyncSocketData->buffer.reserve(asyncSocketData->buffer.length() + (size_t) (length - written + nextLength)); |
314 | 0 | } |
315 | | |
316 | | /* Buffer this chunk (only the tail that was not sent) */ |
317 | 11.0k | asyncSocketData->buffer.append(src + written, (size_t) (length - written)); |
318 | | |
319 | | /* Return the failure; length covers the bytes sent plus the bytes just buffered */ |
320 | 11.0k | return {length, true}; |
321 | 11.0k | } |
322 | | /* Fall through to default return */ |
323 | 12.6k | } |
324 | 193k | } |
325 | | |
326 | | /* Default fall through return */ |
327 | 214k | return {length, false}; |
328 | 225k | } |
329 | | |
330 | | /* Uncork this socket and flush or buffer any corked and/or passed data. It is essential to remember doing this. */ |
331 | | /* It does NOT count bytes written from cork buffer (they are already accounted for in the write call responsible for its corking)! */ |
332 | 146k | std::pair<int, bool> uncork(const char *src = nullptr, int length = 0, bool optionally = false) { /* src/length: optional extra data to write after the cork buffer has been flushed */ |
333 | 146k | LoopData *loopData = getLoopData(); |
334 | | |
335 | 146k | if (loopData->corkedSocket == this) { |
336 | 146k | loopData->corkedSocket = nullptr; /* Give up the cork slot first so the write() calls below take the uncorked path */ |
337 | | |
338 | 146k | if (loopData->corkOffset) { |
339 | | /* Corked data is already accounted for via its write call; it is flushed non-optionally, with length passed as the nextLength hint */ |
340 | 12.6k | auto [written, failed] = write(loopData->corkBuffer, (int) loopData->corkOffset, false, length); |
341 | 12.6k | loopData->corkOffset = 0; |
342 | | |
343 | 12.6k | if (failed) { |
344 | | /* We do not need to care for buffering here, write does that. NOTE(review): that holds for the corked data only; src is neither written nor buffered on this return and 0 bytes are reported - confirm callers handle it */ |
345 | 11.0k | return {0, true}; |
346 | 11.0k | } |
347 | 12.6k | } |
348 | | |
349 | | /* We should only return with new writes, not things written to cork already */ |
350 | 134k | return write(src, length, optionally, 0); |
351 | 146k | } else { |
352 | | /* We are not even corked! Nothing is flushed and src is neither written nor buffered */ |
353 | 0 | return {0, false}; |
354 | 0 | } |
355 | 146k | } |
356 | | }; |
357 | | |
358 | | } |
359 | | |
360 | | #endif // UWS_ASYNCSOCKET_H |