Coverage Report

Created: 2023-06-06 06:17

/src/uWebSockets/uSockets/src/eventing/epoll_kqueue.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2019.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#include "libusockets.h"
19
#include "internal/internal.h"
20
#include <stdlib.h>
21
22
#if defined(LIBUS_USE_EPOLL) || defined(LIBUS_USE_KQUEUE)
23
24
/* Cannot include this one on Windows */
25
#include <unistd.h>
26
27
/* Accessors for the user pointer stored in entry `index` of loop->ready_polls.
 * epoll keeps it in data.ptr, kqueue in udata. Every argument and the whole
 * expansion is parenthesized so that arbitrary expressions can be passed and
 * the result can be used inside larger expressions safely. */
#ifdef LIBUS_USE_EPOLL
#define GET_READY_POLL(loop, index) ((struct us_poll_t *) (loop)->ready_polls[(index)].data.ptr)
#define SET_READY_POLL(loop, index, poll) ((loop)->ready_polls[(index)].data.ptr = (poll))
#else
#define GET_READY_POLL(loop, index) ((struct us_poll_t *) (loop)->ready_polls[(index)].udata)
#define SET_READY_POLL(loop, index, poll) ((loop)->ready_polls[(index)].udata = (poll))
#endif
34
35
/* Loop */
36
5.52k
void us_loop_free(struct us_loop_t *loop) {
37
5.52k
    us_internal_loop_data_free(loop);
38
5.52k
    close(loop->fd);
39
5.52k
    free(loop);
40
5.52k
}
41
42
/* Poll */
43
1.52M
struct us_poll_t *us_create_poll(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
44
1.52M
    if (!fallthrough) {
45
1.51M
        loop->num_polls++;
46
1.51M
    }
47
1.52M
    return malloc(sizeof(struct us_poll_t) + ext_size);
48
1.52M
}
49
50
/* Todo: this one should be us_internal_poll_free */
51
1.52M
void us_poll_free(struct us_poll_t *p, struct us_loop_t *loop) {
52
1.52M
    loop->num_polls--;
53
1.52M
    free(p);
54
1.52M
}
55
56
0
void *us_poll_ext(struct us_poll_t *p) {
57
0
    return p + 1;
58
0
}
59
60
/* Todo: why have us_create_poll AND us_poll_init!? libuv legacy! */
61
1.52M
/* Stores the descriptor and the initial poll type in the poll's state.
 * Nothing is registered with the loop here; see us_poll_start for that. */
void us_poll_init(struct us_poll_t *p, LIBUS_SOCKET_DESCRIPTOR fd, int poll_type) {
    p->state.poll_type = poll_type;
    p->state.fd = fd;
}
65
66
13.4M
int us_poll_events(struct us_poll_t *p) {
67
13.4M
    return ((p->state.poll_type & POLL_TYPE_POLLING_IN) ? LIBUS_SOCKET_READABLE : 0) | ((p->state.poll_type & POLL_TYPE_POLLING_OUT) ? LIBUS_SOCKET_WRITABLE : 0);
68
13.4M
}
69
70
11.5M
LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) {
71
11.5M
    return p->state.fd;
72
11.5M
}
73
74
/* Returns any of listen socket, socket, shut down socket or callback */
75
12.3M
int us_internal_poll_type(struct us_poll_t *p) {
76
12.3M
    return p->state.poll_type & 3;
77
12.3M
}
78
79
/* Bug: doesn't really SET, rather read and change, so needs to be inited first! */
80
234k
void us_internal_poll_set_type(struct us_poll_t *p, int poll_type) {
81
234k
    p->state.poll_type = poll_type | (p->state.poll_type & 12);
82
234k
}
83
84
/* Timer */
85
1.82M
void *us_timer_ext(struct us_timer_t *timer) {
86
1.82M
    return ((struct us_internal_callback_t *) timer) + 1;
87
1.82M
}
88
89
0
struct us_loop_t *us_timer_loop(struct us_timer_t *t) {
90
0
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
91
92
0
    return internal_cb->loop;
93
0
}
94
95
/* Loop */
96
5.52k
struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) {
97
5.52k
    struct us_loop_t *loop = (struct us_loop_t *) malloc(sizeof(struct us_loop_t) + ext_size);
98
5.52k
    loop->num_polls = 0;
99
    /* These could be accessed if we close a poll before starting the loop */
100
5.52k
    loop->num_ready_polls = 0;
101
5.52k
    loop->current_ready_poll = 0;
102
103
5.52k
#ifdef LIBUS_USE_EPOLL
104
5.52k
    loop->fd = epoll_create1(EPOLL_CLOEXEC);
105
#else
106
    loop->fd = kqueue();
107
#endif
108
109
5.52k
    us_internal_loop_data_init(loop, wakeup_cb, pre_cb, post_cb);
110
5.52k
    return loop;
111
5.52k
}
112
113
5.52k
void us_loop_run(struct us_loop_t *loop) {
114
5.52k
    us_loop_integrate(loop);
115
116
    /* While we have non-fallthrough polls we shouldn't fall through */
117
2.41M
    while (loop->num_polls) {
118
        /* Emit pre callback */
119
2.40M
        us_internal_loop_pre(loop);
120
121
        /* Fetch ready polls */
122
2.40M
#ifdef LIBUS_USE_EPOLL
123
2.40M
        loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
124
#else
125
        loop->num_ready_polls = kevent(loop->fd, NULL, 0, loop->ready_polls, 1024, NULL);
126
#endif
127
128
        /* Iterate ready polls, dispatching them by type */
129
11.3M
        for (loop->current_ready_poll = 0; loop->current_ready_poll < loop->num_ready_polls; loop->current_ready_poll++) {
130
8.91M
            struct us_poll_t *poll = GET_READY_POLL(loop, loop->current_ready_poll);
131
            /* Any ready poll marked with nullptr will be ignored */
132
8.91M
            if (poll) {
133
8.91M
#ifdef LIBUS_USE_EPOLL
134
8.91M
                int events = loop->ready_polls[loop->current_ready_poll].events;
135
8.91M
                int error = loop->ready_polls[loop->current_ready_poll].events & (EPOLLERR | EPOLLHUP);
136
#else
137
                /* EVFILT_READ, EVFILT_TIME, EVFILT_USER are all mapped to LIBUS_SOCKET_READABLE */
138
                int events = LIBUS_SOCKET_READABLE;
139
                if (loop->ready_polls[loop->current_ready_poll].filter == EVFILT_WRITE) {
140
                    events = LIBUS_SOCKET_WRITABLE;
141
                }
142
                int error = loop->ready_polls[loop->current_ready_poll].flags & (EV_ERROR | EV_EOF);
143
#endif
144
                /* Always filter all polls by what they actually poll for (callback polls always poll for readable) */
145
8.91M
                events &= us_poll_events(poll);
146
8.91M
                if (events || error) {
147
8.91M
                    us_internal_dispatch_ready_poll(poll, error, events);
148
8.91M
                }
149
8.91M
            }
150
8.91M
        }
151
        /* Emit post callback */
152
2.40M
        us_internal_loop_post(loop);
153
2.40M
    }
154
5.52k
}
155
156
1.52M
void us_internal_loop_update_pending_ready_polls(struct us_loop_t *loop, struct us_poll_t *old_poll, struct us_poll_t *new_poll, int old_events, int new_events) {
157
1.52M
#ifdef LIBUS_USE_EPOLL
158
    /* Epoll only has one ready poll per poll */
159
1.52M
    int num_entries_possibly_remaining = 1;
160
#else
161
    /* Ready polls may contain same poll twice under kqueue, as one poll may hold two filters */
162
    int num_entries_possibly_remaining = 2;//((old_events & LIBUS_SOCKET_READABLE) ? 1 : 0) + ((old_events & LIBUS_SOCKET_WRITABLE) ? 1 : 0);
163
#endif
164
165
    /* Todo: for kqueue if we track things in us_change_poll it is possible to have a fast path with no seeking in cases of:
166
    * current poll being us AND we only poll for one thing */
167
168
3.10M
    for (int i = loop->current_ready_poll; i < loop->num_ready_polls && num_entries_possibly_remaining; i++) {
169
1.57M
        if (GET_READY_POLL(loop, i) == old_poll) {
170
171
            // if new events does not contain the ready events of this poll then remove (no we filter that out later on)
172
1.47M
            SET_READY_POLL(loop, i, new_poll);
173
174
1.47M
            num_entries_possibly_remaining--;
175
1.47M
        }
176
1.57M
    }
177
1.52M
}
178
179
/* Poll */
180
181
#ifdef LIBUS_USE_KQUEUE
182
/* Helper function for setting or updating EVFILT_READ and EVFILT_WRITE */
183
int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_data) {
184
    struct kevent change_list[2];
185
    int change_length = 0;
186
187
    /* Do they differ in readable? */
188
    if ((new_events & LIBUS_SOCKET_READABLE) != (old_events & LIBUS_SOCKET_READABLE)) {
189
        EV_SET(&change_list[change_length++], fd, EVFILT_READ, (new_events & LIBUS_SOCKET_READABLE) ? EV_ADD : EV_DELETE, 0, 0, user_data);
190
    }
191
192
    /* Do they differ in writable? */
193
    if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) {
194
        EV_SET(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, user_data);
195
    }
196
197
    int ret = kevent(kqfd, change_list, change_length, NULL, 0, NULL);
198
199
    // ret should be 0 in most cases (not guaranteed when removing async)
200
201
    return ret;
202
}
203
#endif
204
205
108k
struct us_poll_t *us_poll_resize(struct us_poll_t *p, struct us_loop_t *loop, unsigned int ext_size) {
206
108k
    int events = us_poll_events(p);
207
208
108k
    struct us_poll_t *new_p = realloc(p, sizeof(struct us_poll_t) + ext_size);
209
108k
    if (p != new_p && events) {
210
0
#ifdef LIBUS_USE_EPOLL
211
        /* Hack: forcefully update poll by stripping away already set events */
212
0
        new_p->state.poll_type = us_internal_poll_type(new_p);
213
0
        us_poll_change(new_p, loop, events);
214
#else
215
        /* Forcefully update poll by resetting them with new_p as user data */
216
        kqueue_change(loop->fd, new_p->state.fd, 0, events, new_p);
217
#endif
218
219
        /* This is needed for epoll also (us_change_poll doesn't update the old poll) */
220
0
        us_internal_loop_update_pending_ready_polls(loop, p, new_p, events, events);
221
0
    }
222
223
108k
    return new_p;
224
108k
}
225
226
1.52M
void us_poll_start(struct us_poll_t *p, struct us_loop_t *loop, int events) {
227
1.52M
    p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);
228
229
1.52M
#ifdef LIBUS_USE_EPOLL
230
1.52M
    struct epoll_event event;
231
1.52M
    event.events = events;
232
1.52M
    event.data.ptr = p;
233
1.52M
    epoll_ctl(loop->fd, EPOLL_CTL_ADD, p->state.fd, &event);
234
#else
235
    kqueue_change(loop->fd, p->state.fd, 0, events, p);
236
#endif
237
1.52M
}
238
239
794k
void us_poll_change(struct us_poll_t *p, struct us_loop_t *loop, int events) {
240
794k
    int old_events = us_poll_events(p);
241
794k
    if (old_events != events) {
242
243
659k
        p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);
244
245
659k
#ifdef LIBUS_USE_EPOLL
246
659k
        struct epoll_event event;
247
659k
        event.events = events;
248
659k
        event.data.ptr = p;
249
659k
        epoll_ctl(loop->fd, EPOLL_CTL_MOD, p->state.fd, &event);
250
#else
251
        kqueue_change(loop->fd, p->state.fd, old_events, events, p);
252
#endif
253
        /* Set all removed events to null-polls in pending ready poll list */
254
        //us_internal_loop_update_pending_ready_polls(loop, p, p, old_events, events);
255
659k
    }
256
794k
}
257
258
1.52M
void us_poll_stop(struct us_poll_t *p, struct us_loop_t *loop) {
259
1.52M
    int old_events = us_poll_events(p);
260
1.52M
    int new_events = 0;
261
1.52M
#ifdef LIBUS_USE_EPOLL
262
1.52M
    struct epoll_event event;
263
1.52M
    epoll_ctl(loop->fd, EPOLL_CTL_DEL, p->state.fd, &event);
264
#else
265
    if (old_events) {
266
        kqueue_change(loop->fd, p->state.fd, old_events, new_events, NULL);
267
    }
268
#endif
269
270
    /* Disable any instance of us in the pending ready poll list */
271
1.52M
    us_internal_loop_update_pending_ready_polls(loop, p, 0, old_events, new_events);
272
1.52M
}
273
274
5.49M
/* Consumes a pending event of a callback poll.
 *
 * Under epoll an 8-byte counter is read from the poll's descriptor and
 * returned, truncated to unsigned int. If the read fails or is short,
 * 0 is returned instead of an indeterminate value. Under kqueue there is
 * nothing to read and 0 is always returned. */
unsigned int us_internal_accept_poll_event(struct us_poll_t *p) {
#ifdef LIBUS_USE_EPOLL
    int fd = us_poll_fd(p);
    uint64_t buf = 0;

    ssize_t read_length = read(fd, &buf, sizeof(buf));
    if (read_length != (ssize_t) sizeof(buf)) {
        /* Nothing (complete) was read, so there is no count to report */
        return 0;
    }

    return (unsigned int) buf;
#else
    /* Kqueue has no underlying FD for timers or user events */
    (void) p;
    return 0;
#endif
}
286
287
/* Timer */
288
#ifdef LIBUS_USE_EPOLL
289
11.0k
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
290
11.0k
    struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) + ext_size);
291
11.0k
    int timerfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
292
11.0k
    if (timerfd == -1) {
293
0
      return NULL;
294
0
    }
295
11.0k
    us_poll_init(p, timerfd, POLL_TYPE_CALLBACK);
296
297
11.0k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
298
11.0k
    cb->loop = loop;
299
11.0k
    cb->cb_expects_the_loop = 0;
300
11.0k
    cb->leave_poll_ready = 0;
301
302
11.0k
    return (struct us_timer_t *) cb;
303
11.0k
}
304
#else
305
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
306
    struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + ext_size);
307
308
    cb->loop = loop;
309
    cb->cb_expects_the_loop = 0;
310
    cb->leave_poll_ready = 0;
311
312
    /* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */
313
    cb->p.state.poll_type = POLL_TYPE_POLLING_IN;
314
    us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK);
315
316
    if (!fallthrough) {
317
        loop->num_polls++;
318
    }
319
320
    return (struct us_timer_t *) cb;
321
}
322
#endif
323
324
#ifdef LIBUS_USE_EPOLL
325
11.0k
void us_timer_close(struct us_timer_t *timer) {
326
11.0k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer;
327
328
11.0k
    us_poll_stop(&cb->p, cb->loop);
329
11.0k
    close(us_poll_fd(&cb->p));
330
331
    /* (regular) sockets are the only polls which are not freed immediately */
332
11.0k
    us_poll_free((struct us_poll_t *) timer, cb->loop);
333
11.0k
}
334
335
11.0k
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
336
11.0k
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
337
338
11.0k
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
339
340
11.0k
    struct itimerspec timer_spec = {
341
11.0k
        {repeat_ms / 1000, (long) (repeat_ms % 1000) * (long) 1000000},
342
11.0k
        {ms / 1000, (long) (ms % 1000) * (long) 1000000}
343
11.0k
    };
344
345
11.0k
    timerfd_settime(us_poll_fd((struct us_poll_t *) t), 0, &timer_spec, NULL);
346
11.0k
    us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE);
347
11.0k
}
348
#else
349
void us_timer_close(struct us_timer_t *timer) {
350
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;
351
352
    struct kevent event;
353
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_TIMER, EV_DELETE, 0, 0, internal_cb);
354
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
355
356
    /* (regular) sockets are the only polls which are not freed immediately */
357
    us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
358
}
359
360
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
361
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
362
363
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
364
365
    /* Bug: repeat_ms must be the same as ms, or 0 */
366
    struct kevent event;
367
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_TIMER, EV_ADD | (repeat_ms ? 0 : EV_ONESHOT), 0, ms, internal_cb);
368
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
369
}
370
#endif
371
372
/* Async (internal helper for loop's wakeup feature) */
373
#ifdef LIBUS_USE_EPOLL
374
5.52k
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
375
5.52k
    struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) + ext_size);
376
5.52k
    us_poll_init(p, eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC), POLL_TYPE_CALLBACK);
377
378
5.52k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
379
5.52k
    cb->loop = loop;
380
5.52k
    cb->cb_expects_the_loop = 1;
381
5.52k
    cb->leave_poll_ready = 0;
382
383
5.52k
    return (struct us_internal_async *) cb;
384
5.52k
}
385
386
// identical code as for timer, make it shared for "callback types"
387
5.52k
void us_internal_async_close(struct us_internal_async *a) {
388
5.52k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) a;
389
390
5.52k
    us_poll_stop(&cb->p, cb->loop);
391
5.52k
    close(us_poll_fd(&cb->p));
392
393
    /* (regular) sockets are the only polls which are not freed immediately */
394
5.52k
    us_poll_free((struct us_poll_t *) a, cb->loop);
395
5.52k
}
396
397
5.52k
void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) {
398
5.52k
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
399
400
5.52k
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
401
402
5.52k
    us_poll_start((struct us_poll_t *) a, internal_cb->loop, LIBUS_SOCKET_READABLE);
403
5.52k
}
404
405
4.18k
/* Signals the async by writing the 8-byte value 1 to its descriptor.
 * The result of the write is deliberately ignored. */
void us_internal_async_wakeup(struct us_internal_async *a) {
    uint64_t increment = 1;
    int async_fd = us_poll_fd((struct us_poll_t *) a);

    int written = write(async_fd, &increment, sizeof(increment));
    (void) written;
}
410
#else
411
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
412
    struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + ext_size);
413
414
    cb->loop = loop;
415
    cb->cb_expects_the_loop = 1;
416
    cb->leave_poll_ready = 0;
417
418
    /* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */
419
    cb->p.state.poll_type = POLL_TYPE_POLLING_IN;
420
    us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK);
421
422
    if (!fallthrough) {
423
        loop->num_polls++;
424
    }
425
426
    return (struct us_internal_async *) cb;
427
}
428
429
// identical code as for timer, make it shared for "callback types"
430
void us_internal_async_close(struct us_internal_async *a) {
431
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
432
433
    /* Note: This will fail most of the time as there probably is no pending trigger */
434
    struct kevent event;
435
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_USER, EV_DELETE, 0, 0, internal_cb);
436
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
437
438
    /* (regular) sockets are the only polls which are not freed immediately */
439
    us_poll_free((struct us_poll_t *) a, internal_cb->loop);
440
}
441
442
void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) {
443
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
444
445
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
446
}
447
448
void us_internal_async_wakeup(struct us_internal_async *a) {
449
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
450
451
    /* In kqueue you really only need to add a triggered oneshot event */
452
    struct kevent event;
453
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_USER, EV_ADD | EV_ONESHOT, NOTE_TRIGGER, 0, internal_cb);
454
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
455
}
456
#endif
457
458
#endif