/src/uWebSockets/uSockets/src/loop.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef LIBUS_USE_IO_URING |
19 | | |
20 | | #include "libusockets.h" |
21 | | #include "internal/internal.h" |
22 | | #include <stdlib.h> |
23 | | |
24 | | /* The loop has 2 fallthrough polls */ |
25 | | void us_internal_loop_data_init(struct us_loop_t *loop, void (*wakeup_cb)(struct us_loop_t *loop), |
26 | 3.30k | void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop)) { |
27 | 3.30k | loop->data.sweep_timer = us_create_timer(loop, 1, 0); |
28 | 3.30k | loop->data.recv_buf = malloc(LIBUS_RECV_BUFFER_LENGTH + LIBUS_RECV_BUFFER_PADDING * 2); |
29 | 3.30k | loop->data.ssl_data = 0; |
30 | 3.30k | loop->data.head = 0; |
31 | 3.30k | loop->data.iterator = 0; |
32 | 3.30k | loop->data.closed_head = 0; |
33 | 3.30k | loop->data.low_prio_head = 0; |
34 | 3.30k | loop->data.low_prio_budget = 0; |
35 | | |
36 | 3.30k | loop->data.pre_cb = pre_cb; |
37 | 3.30k | loop->data.post_cb = post_cb; |
38 | 3.30k | loop->data.iteration_nr = 0; |
39 | | |
40 | 3.30k | loop->data.wakeup_async = us_internal_create_async(loop, 1, 0); |
41 | 3.30k | us_internal_async_set(loop->data.wakeup_async, (void (*)(struct us_internal_async *)) wakeup_cb); |
42 | 3.30k | } |
43 | | |
44 | 3.30k | void us_internal_loop_data_free(struct us_loop_t *loop) { |
45 | | #ifndef LIBUS_NO_SSL |
46 | | us_internal_free_loop_ssl_data(loop); |
47 | | #endif |
48 | | |
49 | 3.30k | free(loop->data.recv_buf); |
50 | | |
51 | 3.30k | us_timer_close(loop->data.sweep_timer); |
52 | 3.30k | us_internal_async_close(loop->data.wakeup_async); |
53 | 3.30k | } |
54 | | |
55 | 24.6k | void us_wakeup_loop(struct us_loop_t *loop) { |
56 | 24.6k | us_internal_async_wakeup(loop->data.wakeup_async); |
57 | 24.6k | } |
58 | | |
59 | 3.30k | void us_internal_loop_link(struct us_loop_t *loop, struct us_socket_context_t *context) { |
60 | | /* Insert this context as the head of loop */ |
61 | 3.30k | context->next = loop->data.head; |
62 | 3.30k | context->prev = 0; |
63 | 3.30k | if (loop->data.head) { |
64 | 0 | loop->data.head->prev = context; |
65 | 0 | } |
66 | 3.30k | loop->data.head = context; |
67 | 3.30k | } |
68 | | |
69 | | /* Unlink is called before free */ |
70 | 3.30k | void us_internal_loop_unlink(struct us_loop_t *loop, struct us_socket_context_t *context) { |
71 | 3.30k | if (loop->data.head == context) { |
72 | 3.30k | loop->data.head = context->next; |
73 | 3.30k | if (loop->data.head) { |
74 | 0 | loop->data.head->prev = 0; |
75 | 0 | } |
76 | 3.30k | } else { |
77 | 0 | context->prev->next = context->next; |
78 | 0 | if (context->next) { |
79 | 0 | context->next->prev = context->prev; |
80 | 0 | } |
81 | 0 | } |
82 | 3.30k | } |
83 | | |
/* This function should never run recursively.
 * Walks every socket context on the loop, advances its tick counters, and
 * emits short/long timeout callbacks for sockets whose stored timeout tick
 * matches the context's current tick. The context->iterator field is the
 * handshake that lets event handlers unlink/relink sockets mid-sweep. */
void us_internal_timer_sweep(struct us_loop_t *loop) {
    struct us_internal_loop_data_t *loop_data = &loop->data;
    /* For all socket contexts in this loop */
    for (loop_data->iterator = loop_data->head; loop_data->iterator; loop_data->iterator = loop_data->iterator->next) {

        struct us_socket_context_t *context = loop_data->iterator;

        /* Update this context's timestamps (this could be moved to loop and done once) */
        context->global_tick++;
        /* Short ticks wrap every 240; long ticks advance once per 15 short ticks */
        unsigned char short_ticks = context->timestamp = context->global_tick % 240;
        unsigned char long_ticks = context->long_timestamp = (context->global_tick / 15) % 240;

        /* Begin at head */
        struct us_socket_t *s = context->head_sockets;
        while (s) {
            /* Seek until end or timeout found (tightest loop) */
            while (1) {
                /* We only read from 1 random cache line here */
                if (short_ticks == s->timeout || long_ticks == s->long_timeout) {
                    break;
                }

                /* Did we reach the end without a find? */
                if ((s = s->next) == 0) {
                    goto next_context;
                }
            }

            /* Here we have a timeout to emit (slow path) */
            context->iterator = s;

            if (short_ticks == s->timeout) {
                /* 255 marks the timeout as disarmed */
                s->timeout = 255;
                context->on_socket_timeout(s);
            }

            /* Only fire the long timeout if the short-timeout handler above
             * left the chain intact (iterator still points at s) */
            if (context->iterator == s && long_ticks == s->long_timeout) {
                s->long_timeout = 255;
                context->on_socket_long_timeout(s);
            }

            /* Check for unlink / link (if the event handler did not modify the chain, we step 1) */
            if (s == context->iterator) {
                s = s->next;
            } else {
                /* The iterator was changed by event handler */
                s = context->iterator;
            }
        }
        /* We always store a 0 to context->iterator here since we are no longer iterating this context */
        next_context:
        context->iterator = 0;
    }
}
139 | | |
140 | | /* We do not want to block the loop with tons and tons of CPU-intensive work for SSL handshakes. |
141 | | * Spread it out during many loop iterations, prioritizing already open connections, they are far |
142 | | * easier on CPU */ |
143 | | static const int MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION = 5; |
144 | | |
145 | 2.90M | void us_internal_handle_low_priority_sockets(struct us_loop_t *loop) { |
146 | 2.90M | struct us_internal_loop_data_t *loop_data = &loop->data; |
147 | 2.90M | struct us_socket_t *s; |
148 | | |
149 | 2.90M | loop_data->low_prio_budget = MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION; |
150 | | |
151 | 2.90M | for (s = loop_data->low_prio_head; s && loop_data->low_prio_budget > 0; s = loop_data->low_prio_head, loop_data->low_prio_budget--) { |
152 | | /* Unlink this socket from the low-priority queue */ |
153 | 0 | loop_data->low_prio_head = s->next; |
154 | 0 | if (s->next) s->next->prev = 0; |
155 | 0 | s->next = 0; |
156 | |
|
157 | 0 | us_internal_socket_context_link_socket(s->context, s); |
158 | 0 | us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) | LIBUS_SOCKET_READABLE); |
159 | |
|
160 | 0 | s->low_prio_state = 2; |
161 | 0 | } |
162 | 2.90M | } |
163 | | |
164 | | /* Note: Properly takes the linked list and timeout sweep into account */ |
165 | 2.90M | void us_internal_free_closed_sockets(struct us_loop_t *loop) { |
166 | | /* Free all closed sockets (maybe it is better to reverse order?) */ |
167 | 2.90M | if (loop->data.closed_head) { |
168 | 1.14M | for (struct us_socket_t *s = loop->data.closed_head; s; ) { |
169 | 881k | struct us_socket_t *next = s->next; |
170 | 881k | us_poll_free((struct us_poll_t *) s, loop); |
171 | 881k | s = next; |
172 | 881k | } |
173 | 258k | loop->data.closed_head = 0; |
174 | 258k | } |
175 | 2.90M | } |
176 | | |
177 | 2.51M | void sweep_timer_cb(struct us_internal_callback_t *cb) { |
178 | 2.51M | us_internal_timer_sweep(cb->loop); |
179 | 2.51M | } |
180 | | |
181 | 0 | long long us_loop_iteration_number(struct us_loop_t *loop) { |
182 | 0 | return loop->data.iteration_nr; |
183 | 0 | } |
184 | | |
185 | | /* These may have somewhat different meaning depending on the underlying event library */ |
186 | 2.90M | void us_internal_loop_pre(struct us_loop_t *loop) { |
187 | 2.90M | loop->data.iteration_nr++; |
188 | 2.90M | us_internal_handle_low_priority_sockets(loop); |
189 | 2.90M | loop->data.pre_cb(loop); |
190 | 2.90M | } |
191 | | |
192 | 2.90M | void us_internal_loop_post(struct us_loop_t *loop) { |
193 | 2.90M | us_internal_free_closed_sockets(loop); |
194 | 2.90M | loop->data.post_cb(loop); |
195 | 2.90M | } |
196 | | |
197 | | struct us_socket_t *us_adopt_accepted_socket(int ssl, struct us_socket_context_t *context, LIBUS_SOCKET_DESCRIPTOR accepted_fd, |
198 | 878k | unsigned int socket_ext_size, char *addr_ip, int addr_ip_length) { |
199 | | #ifndef LIBUS_NO_SSL |
200 | | if (ssl) { |
201 | | return (struct us_socket_t *)us_internal_ssl_adopt_accepted_socket((struct us_internal_ssl_socket_context_t *)context, accepted_fd, |
202 | | socket_ext_size, addr_ip, addr_ip_length); |
203 | | } |
204 | | #endif |
205 | 878k | struct us_poll_t *accepted_p = us_create_poll(context->loop, 0, sizeof(struct us_socket_t) - sizeof(struct us_poll_t) + socket_ext_size); |
206 | 878k | us_poll_init(accepted_p, accepted_fd, POLL_TYPE_SOCKET); |
207 | 878k | us_poll_start(accepted_p, context->loop, LIBUS_SOCKET_READABLE); |
208 | | |
209 | 878k | struct us_socket_t *s = (struct us_socket_t *) accepted_p; |
210 | | |
211 | 878k | s->context = context; |
212 | 878k | s->timeout = 255; |
213 | 878k | s->long_timeout = 255; |
214 | 878k | s->low_prio_state = 0; |
215 | | |
216 | | /* We always use nodelay */ |
217 | 878k | bsd_socket_nodelay(accepted_fd, 1); |
218 | | |
219 | 878k | us_internal_socket_context_link_socket(context, s); |
220 | | |
221 | 878k | context->on_open(s, 0, addr_ip, addr_ip_length); |
222 | 878k | return s; |
223 | 878k | } |
224 | | |
/* Dispatches one ready poll to the proper handler based on its poll type.
 * Called by the event backend once per ready event with the epoll/kqueue
 * style error flag and readable/writable event mask. Note the convention
 * that event callbacks return the (possibly replaced) socket pointer; after
 * each callback we must re-check s and whether it was closed. */
void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int events) {
    switch (us_internal_poll_type(p)) {
    case POLL_TYPE_CALLBACK: {
            struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
            /* Timers, asyncs should accept (read), while UDP sockets should obviously not */
            if (!cb->leave_poll_ready) {
                /* Let's just have this macro to silence the CodeQL alert regarding empty function when using libuv */
#ifndef LIBUS_USE_LIBUV
                us_internal_accept_poll_event(p);
#endif
            }
            /* Some callbacks (e.g. the wakeup async) expect the loop itself as argument */
            cb->cb(cb->cb_expects_the_loop ? (struct us_internal_callback_t *) cb->loop : (struct us_internal_callback_t *) &cb->p);
        }
        break;
    case POLL_TYPE_SEMI_SOCKET: {
            /* Both connect and listen sockets are semi-sockets
             * but they poll for different events */
            if (us_poll_events(p) == LIBUS_SOCKET_WRITABLE) {
                /* Writable semi-socket: an outgoing connect completed (or failed) */
                struct us_socket_t *s = (struct us_socket_t *) p;

                /* It is perfectly possible to come here with an error */
                if (error) {
                    /* Emit error, close without emitting on_close */
                    s->context->on_connect_error(s, 0);
                    us_socket_close_connecting(0, s);
                } else {
                    /* All sockets poll for readable */
                    us_poll_change(p, s->context->loop, LIBUS_SOCKET_READABLE);

                    /* We always use nodelay */
                    bsd_socket_nodelay(us_poll_fd(p), 1);

                    /* We are now a proper socket */
                    us_internal_poll_set_type(p, POLL_TYPE_SOCKET);

                    /* If we used a connection timeout we have to reset it here */
                    us_socket_timeout(0, s, 0);

                    /* 1 = this side initiated the connection; no peer address passed */
                    s->context->on_open(s, 1, 0, 0);
                }
            } else {
                /* Readable semi-socket: a listen socket with pending accepts */
                struct us_listen_socket_t *listen_socket = (struct us_listen_socket_t *) p;
                struct bsd_addr_t addr;

                LIBUS_SOCKET_DESCRIPTOR client_fd = bsd_accept_socket(us_poll_fd(p), &addr);
                if (client_fd == LIBUS_SOCKET_ERROR) {
                    /* Todo: start timer here */

                } else {

                    /* Todo: stop timer if any */

                    /* Drain the accept queue in one go */
                    do {
                        struct us_socket_context_t *context = us_socket_context(0, &listen_socket->s);
                        /* See if we want to export the FD or keep it here (this event can be unset) */
                        if (context->on_pre_open == 0 || context->on_pre_open(context, client_fd) == client_fd) {

                            /* Adopt the newly accepted socket */
                            us_adopt_accepted_socket(0, context,
                                client_fd, listen_socket->socket_ext_size, bsd_addr_get_ip(&addr), bsd_addr_get_ip_length(&addr));

                            /* Exit accept loop if listen socket was closed in on_open handler */
                            if (us_socket_is_closed(0, &listen_socket->s)) {
                                break;
                            }

                        }

                    } while ((client_fd = bsd_accept_socket(us_poll_fd(p), &addr)) != LIBUS_SOCKET_ERROR);
                }
            }
        }
        break;
    case POLL_TYPE_SOCKET_SHUT_DOWN:
    case POLL_TYPE_SOCKET: {
            /* We should only use s, no p after this point */
            struct us_socket_t *s = (struct us_socket_t *) p;

            /* Such as epollerr epollhup */
            if (error) {
                /* Todo: decide what code we give here */
                s = us_socket_close(0, s, 0, NULL);
                return;
            }

            if (events & LIBUS_SOCKET_WRITABLE) {
                /* Note: if we failed a write as a socket of one loop then adopted
                 * to another loop, this will be wrong. Absurd case though */
                s->context->loop->data.last_write_failed = 0;

                s = s->context->on_writable(s);

                /* on_writable may have closed (and replaced) the socket */
                if (us_socket_is_closed(0, s)) {
                    return;
                }

                /* If we have no failed write or if we shut down, then stop polling for more writable */
                if (!s->context->loop->data.last_write_failed || us_socket_is_shut_down(0, s)) {
                    us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_READABLE);
                }
            }

            if (events & LIBUS_SOCKET_READABLE) {
                /* Contexts may prioritize down sockets that are currently readable, e.g. when SSL handshake has to be done.
                 * SSL handshakes are CPU intensive, so we limit the number of handshakes per loop iteration, and move the rest
                 * to the low-priority queue */
                if (s->context->is_low_prio(s)) {
                    if (s->low_prio_state == 2) {
                        s->low_prio_state = 0; /* Socket has been delayed and now it's time to process incoming data for one iteration */
                    } else if (s->context->loop->data.low_prio_budget > 0) {
                        s->context->loop->data.low_prio_budget--; /* Still having budget for this iteration - do normal processing */
                    } else {
                        /* Out of budget: stop polling for readable and park the socket */
                        us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
                        us_internal_socket_context_unlink_socket(s->context, s);

                        /* Link this socket to the low-priority queue - we use a LIFO queue, to prioritize newer clients that are
                         * maybe not already timeouted - sounds unfair, but works better in real-life with smaller client-timeouts
                         * under high load */
                        s->prev = 0;
                        s->next = s->context->loop->data.low_prio_head;
                        if (s->next) s->next->prev = s;
                        s->context->loop->data.low_prio_head = s;

                        s->low_prio_state = 1;

                        /* This break exits the whole switch, skipping the read below */
                        break;
                    }
                }

                int length;
    read_more:
                /* Read into the loop-shared buffer, leaving front padding intact */
                length = bsd_recv(us_poll_fd(&s->p), s->context->loop->data.recv_buf + LIBUS_RECV_BUFFER_PADDING, LIBUS_RECV_BUFFER_LENGTH, 0);
                if (length > 0) {
                    s = s->context->on_data(s, s->context->loop->data.recv_buf + LIBUS_RECV_BUFFER_PADDING, length);

                    /* If we filled the entire recv buffer, we need to immediately read again since otherwise a
                     * pending hangup event in the same even loop iteration can close the socket before we get
                     * the chance to read again next iteration */
                    if (length == LIBUS_RECV_BUFFER_LENGTH && s && !us_socket_is_closed(0, s)) {
                        goto read_more;
                    }

                } else if (!length) {
                    /* recv() returning 0 means the peer sent FIN */
                    if (us_socket_is_shut_down(0, s)) {
                        /* We got FIN back after sending it */
                        /* Todo: We should give "CLEAN SHUTDOWN" as reason here */
                        s = us_socket_close(0, s, 0, NULL);
                    } else {
                        /* We got FIN, so stop polling for readable */
                        us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
                        s = s->context->on_end(s);
                    }
                } else if (length == LIBUS_SOCKET_ERROR && !bsd_would_block()) {
                    /* Todo: decide also here what kind of reason we should give */
                    s = us_socket_close(0, s, 0, NULL);
                }
            }
        }
        break;
    }
}
386 | | |
387 | | /* Integration only requires the timer to be set up */ |
388 | 3.30k | void us_loop_integrate(struct us_loop_t *loop) { |
389 | 3.30k | us_timer_set(loop->data.sweep_timer, (void (*)(struct us_timer_t *)) sweep_timer_cb, LIBUS_TIMEOUT_GRANULARITY * 1000, LIBUS_TIMEOUT_GRANULARITY * 1000); |
390 | 3.30k | } |
391 | | |
392 | 10.3M | void *us_loop_ext(struct us_loop_t *loop) { |
393 | 10.3M | return loop + 1; |
394 | 10.3M | } |
395 | | |
396 | | #endif |