/src/uWebSockets/uSockets/src/loop.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2021. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #ifndef LIBUS_USE_IO_URING |
19 | | |
20 | | #include "libusockets.h" |
21 | | #include "internal/internal.h" |
22 | | #include <stdlib.h> |
23 | | |
/* The loop has 2 fallthrough polls */
void us_internal_loop_data_init(struct us_loop_t *loop, void (*wakeup_cb)(struct us_loop_t *loop),
    void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop)) {
    /* Timer that drives us_internal_timer_sweep; it is armed later, in us_loop_integrate */
    loop->data.sweep_timer = us_create_timer(loop, 1, 0);
    /* One shared receive buffer per loop, with LIBUS_RECV_BUFFER_PADDING bytes of headroom
     * on each side (reads land at recv_buf + LIBUS_RECV_BUFFER_PADDING in dispatch).
     * NOTE(review): malloc result is unchecked - on OOM the first recv into it will crash */
    loop->data.recv_buf = malloc(LIBUS_RECV_BUFFER_LENGTH + LIBUS_RECV_BUFFER_PADDING * 2);
    loop->data.ssl_data = 0;
    /* Empty list of socket contexts; iterator supports safe traversal in the timer sweep */
    loop->data.head = 0;
    loop->data.iterator = 0;
    /* No sockets pending deferred free, no delayed (low-priority) sockets yet */
    loop->data.closed_head = 0;
    loop->data.low_prio_head = 0;
    loop->data.low_prio_budget = 0;

    /* Hooks run at the start/end of every loop iteration (see us_internal_loop_pre/post) */
    loop->data.pre_cb = pre_cb;
    loop->data.post_cb = post_cb;
    loop->data.iteration_nr = 0;

    /* Async used by us_wakeup_loop to wake this loop from another thread */
    loop->data.wakeup_async = us_internal_create_async(loop, 1, 0);
    us_internal_async_set(loop->data.wakeup_async, (void (*)(struct us_internal_async *)) wakeup_cb);
}
43 | | |
44 | 4.13k | void us_internal_loop_data_free(struct us_loop_t *loop) { |
45 | | #ifndef LIBUS_NO_SSL |
46 | | us_internal_free_loop_ssl_data(loop); |
47 | | #endif |
48 | | |
49 | 4.13k | free(loop->data.recv_buf); |
50 | | |
51 | 4.13k | us_timer_close(loop->data.sweep_timer); |
52 | 4.13k | us_internal_async_close(loop->data.wakeup_async); |
53 | 4.13k | } |
54 | | |
55 | 0 | void us_wakeup_loop(struct us_loop_t *loop) { |
56 | 0 | us_internal_async_wakeup(loop->data.wakeup_async); |
57 | 0 | } |
58 | | |
59 | 8.27k | void us_internal_loop_link(struct us_loop_t *loop, struct us_socket_context_t *context) { |
60 | | /* Insert this context as the head of loop */ |
61 | 8.27k | context->next = loop->data.head; |
62 | 8.27k | context->prev = 0; |
63 | 8.27k | if (loop->data.head) { |
64 | 4.13k | loop->data.head->prev = context; |
65 | 4.13k | } |
66 | 8.27k | loop->data.head = context; |
67 | 8.27k | } |
68 | | |
69 | | /* Unlink is called before free */ |
70 | 8.27k | void us_internal_loop_unlink(struct us_loop_t *loop, struct us_socket_context_t *context) { |
71 | 8.27k | if (loop->data.head == context) { |
72 | 4.13k | loop->data.head = context->next; |
73 | 4.13k | if (loop->data.head) { |
74 | 0 | loop->data.head->prev = 0; |
75 | 0 | } |
76 | 4.13k | } else { |
77 | 4.13k | context->prev->next = context->next; |
78 | 4.13k | if (context->next) { |
79 | 0 | context->next->prev = context->prev; |
80 | 0 | } |
81 | 4.13k | } |
82 | 8.27k | } |
83 | | |
/* This functions should never run recursively */
void us_internal_timer_sweep(struct us_loop_t *loop) {
    struct us_internal_loop_data_t *loop_data = &loop->data;
    /* For all socket contexts in this loop */
    for (loop_data->iterator = loop_data->head; loop_data->iterator; loop_data->iterator = loop_data->iterator->next) {

        struct us_socket_context_t *context = loop_data->iterator;

        /* Update this context's timestamps (this could be moved to loop and done once) */
        context->global_tick++;
        /* Timeouts are stamp-equality checks: a socket fires on the sweep where its stored
         * stamp equals the current tick modulo 240. 255 serves as the "disarmed" sentinel
         * (it is stored below after a timeout fires, and on freshly accepted sockets). */
        unsigned char short_ticks = context->timestamp = context->global_tick % 240;
        /* Long timeouts advance 15x slower than short ones */
        unsigned char long_ticks = context->long_timestamp = (context->global_tick / 15) % 240;

        /* Begin at head */
        struct us_socket_t *s = context->head_sockets;
        while (s) {
            /* Seek until end or timeout found (tightest loop) */
            while (1) {
                /* We only read from 1 random cache line here */
                if (short_ticks == s->timeout || long_ticks == s->long_timeout) {
                    break;
                }

                /* Did we reach the end without a find? */
                if ((s = s->next) == 0) {
                    goto next_context;
                }
            }

            /* Here we have a timeout to emit (slow path) */
            /* Publish our position: code run from the handlers (e.g. closing or relinking
             * this socket) is expected to redirect traversal via context->iterator */
            context->iterator = s;

            if (short_ticks == s->timeout) {
                /* Disarm before calling out so it fires at most once per arming */
                s->timeout = 255;
                context->on_socket_timeout(s);
            }

            /* Only emit the long timeout if the short handler did not move us off s */
            if (context->iterator == s && long_ticks == s->long_timeout) {
                s->long_timeout = 255;
                context->on_socket_long_timeout(s);
            }

            /* Check for unlink / link (if the event handler did not modify the chain, we step 1) */
            if (s == context->iterator) {
                s = s->next;
            } else {
                /* The iterator was changed by event handler */
                s = context->iterator;
            }
        }
        /* We always store a 0 to context->iterator here since we are no longer iterating this context */
    next_context:
        context->iterator = 0;
    }
}
139 | | |
140 | | /* We do not want to block the loop with tons and tons of CPU-intensive work for SSL handshakes. |
141 | | * Spread it out during many loop iterations, prioritizing already open connections, they are far |
142 | | * easier on CPU */ |
143 | | static const int MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION = 5; |
144 | | |
145 | 1.31M | void us_internal_handle_low_priority_sockets(struct us_loop_t *loop) { |
146 | 1.31M | struct us_internal_loop_data_t *loop_data = &loop->data; |
147 | 1.31M | struct us_socket_t *s; |
148 | | |
149 | 1.31M | loop_data->low_prio_budget = MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION; |
150 | | |
151 | 1.31M | for (s = loop_data->low_prio_head; s && loop_data->low_prio_budget > 0; s = loop_data->low_prio_head, loop_data->low_prio_budget--) { |
152 | | /* Unlink this socket from the low-priority queue */ |
153 | 0 | loop_data->low_prio_head = s->next; |
154 | 0 | if (s->next) s->next->prev = 0; |
155 | 0 | s->next = 0; |
156 | |
|
157 | 0 | us_internal_socket_context_link_socket(s->context, s); |
158 | 0 | us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) | LIBUS_SOCKET_READABLE); |
159 | |
|
160 | 0 | s->low_prio_state = 2; |
161 | 0 | } |
162 | 1.31M | } |
163 | | |
164 | | /* Note: Properly takes the linked list and timeout sweep into account */ |
165 | 1.31M | void us_internal_free_closed_sockets(struct us_loop_t *loop) { |
166 | | /* Free all closed sockets (maybe it is better to reverse order?) */ |
167 | 1.31M | if (loop->data.closed_head) { |
168 | 1.25M | for (struct us_socket_t *s = loop->data.closed_head; s; ) { |
169 | 1.00M | struct us_socket_t *next = s->next; |
170 | 1.00M | us_poll_free((struct us_poll_t *) s, loop); |
171 | 1.00M | s = next; |
172 | 1.00M | } |
173 | 252k | loop->data.closed_head = 0; |
174 | 252k | } |
175 | 1.31M | } |
176 | | |
177 | 1.02M | void sweep_timer_cb(struct us_internal_callback_t *cb) { |
178 | 1.02M | us_internal_timer_sweep(cb->loop); |
179 | 1.02M | } |
180 | | |
181 | 0 | long long us_loop_iteration_number(struct us_loop_t *loop) { |
182 | 0 | return loop->data.iteration_nr; |
183 | 0 | } |
184 | | |
185 | | /* These may have somewhat different meaning depending on the underlying event library */ |
186 | 1.31M | void us_internal_loop_pre(struct us_loop_t *loop) { |
187 | 1.31M | loop->data.iteration_nr++; |
188 | 1.31M | us_internal_handle_low_priority_sockets(loop); |
189 | 1.31M | loop->data.pre_cb(loop); |
190 | 1.31M | } |
191 | | |
192 | 1.31M | void us_internal_loop_post(struct us_loop_t *loop) { |
193 | 1.31M | us_internal_free_closed_sockets(loop); |
194 | 1.31M | loop->data.post_cb(loop); |
195 | 1.31M | } |
196 | | |
/* Central event dispatcher: routes a ready poll to the right handler based on its type.
 * Takes the raw readiness (error flag + event mask) from the underlying event library. */
void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int events) {
    switch (us_internal_poll_type(p)) {
    case POLL_TYPE_CALLBACK: {
            struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
            /* Timers, asyncs should accept (read), while UDP sockets should obviously not */
            if (!cb->leave_poll_ready) {
                /* Let's just have this macro to silence the CodeQL alert regarding empty function when using libuv */
                #ifndef LIBUS_USE_LIBUV
                us_internal_accept_poll_event(p);
                #endif
            }
            /* Some callbacks (e.g. sweep_timer_cb) want the loop, others want the poll itself */
            cb->cb(cb->cb_expects_the_loop ? (struct us_internal_callback_t *) cb->loop : (struct us_internal_callback_t *) &cb->p);
        }
        break;
    case POLL_TYPE_SEMI_SOCKET: {
            /* Both connect and listen sockets are semi-sockets
             * but they poll for different events */
            if (us_poll_events(p) == LIBUS_SOCKET_WRITABLE) {
                /* Writable on a semi-socket means an outgoing connect has finished (or failed) */
                struct us_socket_t *s = (struct us_socket_t *) p;

                /* It is perfectly possible to come here with an error */
                if (error) {
                    /* Emit error, close without emitting on_close */
                    s->context->on_connect_error(s, 0);
                    us_socket_close_connecting(0, s);
                } else {
                    /* All sockets poll for readable */
                    us_poll_change(p, s->context->loop, LIBUS_SOCKET_READABLE);

                    /* We always use nodelay */
                    bsd_socket_nodelay(us_poll_fd(p), 1);

                    /* We are now a proper socket */
                    us_internal_poll_set_type(p, POLL_TYPE_SOCKET);

                    /* If we used a connection timeout we have to reset it here */
                    us_socket_timeout(0, s, 0);

                    s->context->on_open(s, 1, 0, 0);
                }
            } else {
                /* Otherwise this is a listen socket with incoming connections to accept */
                struct us_listen_socket_t *listen_socket = (struct us_listen_socket_t *) p;
                struct bsd_addr_t addr;

                LIBUS_SOCKET_DESCRIPTOR client_fd = bsd_accept_socket(us_poll_fd(p), &addr);
                if (client_fd == LIBUS_SOCKET_ERROR) {
                    /* Todo: start timer here */

                } else {

                    /* Todo: stop timer if any */

                    /* Drain the accept queue: keep accepting until it would block */
                    do {
                        /* Allocate poll + socket + user extension in one block */
                        struct us_poll_t *accepted_p = us_create_poll(us_socket_context(0, &listen_socket->s)->loop, 0, sizeof(struct us_socket_t) - sizeof(struct us_poll_t) + listen_socket->socket_ext_size);
                        us_poll_init(accepted_p, client_fd, POLL_TYPE_SOCKET);
                        us_poll_start(accepted_p, listen_socket->s.context->loop, LIBUS_SOCKET_READABLE);

                        struct us_socket_t *s = (struct us_socket_t *) accepted_p;

                        /* 255 disarms both timeouts (see us_internal_timer_sweep) */
                        s->context = listen_socket->s.context;
                        s->timeout = 255;
                        s->long_timeout = 255;
                        s->low_prio_state = 0;

                        /* We always use nodelay */
                        bsd_socket_nodelay(client_fd, 1);

                        us_internal_socket_context_link_socket(listen_socket->s.context, s);

                        listen_socket->s.context->on_open(s, 0, bsd_addr_get_ip(&addr), bsd_addr_get_ip_length(&addr));

                        /* Exit accept loop if listen socket was closed in on_open handler */
                        if (us_socket_is_closed(0, &listen_socket->s)) {
                            break;
                        }

                    } while ((client_fd = bsd_accept_socket(us_poll_fd(p), &addr)) != LIBUS_SOCKET_ERROR);
                }
            }
        }
        break;
    case POLL_TYPE_SOCKET_SHUT_DOWN:
    case POLL_TYPE_SOCKET: {
            /* We should only use s, no p after this point */
            struct us_socket_t *s = (struct us_socket_t *) p;

            /* Such as epollerr epollhup */
            if (error) {
                /* Todo: decide what code we give here */
                s = us_socket_close(0, s, 0, NULL);
                return;
            }

            if (events & LIBUS_SOCKET_WRITABLE) {
                /* Note: if we failed a write as a socket of one loop then adopted
                 * to another loop, this will be wrong. Absurd case though */
                /* on_writable sets this flag via its own writes; cleared before the call */
                s->context->loop->data.last_write_failed = 0;

                s = s->context->on_writable(s);

                /* Handler may have closed the socket; s then points at the deferred-free shell */
                if (us_socket_is_closed(0, s)) {
                    return;
                }

                /* If we have no failed write or if we shut down, then stop polling for more writable */
                if (!s->context->loop->data.last_write_failed || us_socket_is_shut_down(0, s)) {
                    us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_READABLE);
                }
            }

            if (events & LIBUS_SOCKET_READABLE) {
                /* Contexts may prioritize down sockets that are currently readable, e.g. when SSL handshake has to be done.
                 * SSL handshakes are CPU intensive, so we limit the number of handshakes per loop iteration, and move the rest
                 * to the low-priority queue */
                if (s->context->is_low_prio(s)) {
                    if (s->low_prio_state == 2) {
                        s->low_prio_state = 0; /* Socket has been delayed and now it's time to process incoming data for one iteration */
                    } else if (s->context->loop->data.low_prio_budget > 0) {
                        s->context->loop->data.low_prio_budget--; /* Still having budget for this iteration - do normal processing */
                    } else {
                        /* Out of budget: stop polling readable and park the socket */
                        us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
                        us_internal_socket_context_unlink_socket(s->context, s);

                        /* Link this socket to the low-priority queue - we use a LIFO queue, to prioritize newer clients that are
                         * maybe not already timeouted - sounds unfair, but works better in real-life with smaller client-timeouts
                         * under high load */
                        s->prev = 0;
                        s->next = s->context->loop->data.low_prio_head;
                        if (s->next) s->next->prev = s;
                        s->context->loop->data.low_prio_head = s;

                        s->low_prio_state = 1;

                        /* NOTE: this break exits the switch (we are inside an if, not a loop),
                         * skipping the recv below until the socket is re-activated */
                        break;
                    }
                }

                /* Read into the loop-shared buffer, past the front padding */
                int length = bsd_recv(us_poll_fd(&s->p), s->context->loop->data.recv_buf + LIBUS_RECV_BUFFER_PADDING, LIBUS_RECV_BUFFER_LENGTH, 0);
                if (length > 0) {
                    s = s->context->on_data(s, s->context->loop->data.recv_buf + LIBUS_RECV_BUFFER_PADDING, length);
                } else if (!length) {
                    /* Zero-length read means the peer sent FIN */
                    if (us_socket_is_shut_down(0, s)) {
                        /* We got FIN back after sending it */
                        /* Todo: We should give "CLEAN SHUTDOWN" as reason here */
                        s = us_socket_close(0, s, 0, NULL);
                    } else {
                        /* We got FIN, so stop polling for readable */
                        us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
                        s = s->context->on_end(s);
                    }
                } else if (length == LIBUS_SOCKET_ERROR && !bsd_would_block()) {
                    /* Todo: decide also here what kind of reason we should give */
                    s = us_socket_close(0, s, 0, NULL);
                }
            }
        }
        break;
    }
}
356 | | |
357 | | /* Integration only requires the timer to be set up */ |
358 | 4.13k | void us_loop_integrate(struct us_loop_t *loop) { |
359 | 4.13k | us_timer_set(loop->data.sweep_timer, (void (*)(struct us_timer_t *)) sweep_timer_cb, LIBUS_TIMEOUT_GRANULARITY * 1000, LIBUS_TIMEOUT_GRANULARITY * 1000); |
360 | 4.13k | } |
361 | | |
362 | 7.33M | void *us_loop_ext(struct us_loop_t *loop) { |
363 | 7.33M | return loop + 1; |
364 | 7.33M | } |
365 | | |
366 | | #endif |