Coverage Report

Created: 2026-06-07 06:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/uWebSockets/uSockets/src/eventing/epoll_kqueue.c
Line
Count
Source
1
/*
2
 * Authored by Alex Hultman, 2018-2019.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#include "libusockets.h"
19
#include "internal/internal.h"
20
#include <stdlib.h>
21
22
#if defined(LIBUS_USE_EPOLL) || defined(LIBUS_USE_KQUEUE)
23
24
/* Cannot include this one on Windows */
25
#include <unistd.h>
26
27
#ifdef LIBUS_USE_EPOLL
/* Accessors for the user pointer stored in a ready-list entry (epoll_event.data.ptr).
 * Every argument and the whole expansion are parenthesized so the macros stay
 * correct when given compound expressions or used inside larger expressions. */
#define GET_READY_POLL(loop, index) ((struct us_poll_t *) (loop)->ready_polls[(index)].data.ptr)
#define SET_READY_POLL(loop, index, poll) ((loop)->ready_polls[(index)].data.ptr = (poll))
#else
/* Accessors for the user pointer stored in a ready-list entry (kevent.udata).
 * Every argument and the whole expansion are parenthesized so the macros stay
 * correct when given compound expressions or used inside larger expressions. */
#define GET_READY_POLL(loop, index) ((struct us_poll_t *) (loop)->ready_polls[(index)].udata)
#define SET_READY_POLL(loop, index, poll) ((loop)->ready_polls[(index)].udata = (poll))
#endif
34
35
/* Loop */
36
22.0k
void us_loop_free(struct us_loop_t *loop) {
37
22.0k
    us_internal_loop_data_free(loop);
38
22.0k
    close(loop->fd);
39
22.0k
    free(loop);
40
22.0k
}
41
42
/* Poll */
43
4.92M
struct us_poll_t *us_create_poll(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
44
4.92M
    if (!fallthrough) {
45
4.85M
        loop->num_polls++;
46
4.85M
    }
47
4.92M
    return malloc(sizeof(struct us_poll_t) + ext_size);
48
4.92M
}
49
50
/* Todo: this one should be us_internal_poll_free */
51
4.92M
void us_poll_free(struct us_poll_t *p, struct us_loop_t *loop) {
52
4.92M
    loop->num_polls--;
53
4.92M
    free(p);
54
4.92M
}
55
56
38.7k
void *us_poll_ext(struct us_poll_t *p) {
57
38.7k
    return p + 1;
58
38.7k
}
59
60
/* Todo: why have us_poll_create AND us_poll_init!? libuv legacy! */
61
4.92M
void us_poll_init(struct us_poll_t *p, LIBUS_SOCKET_DESCRIPTOR fd, int poll_type) {
    /* Overwrites the whole poll state: type (including any polling bits) and descriptor */
    p->state.poll_type = poll_type;
    p->state.fd = fd;
}
65
66
69.6M
int us_poll_events(struct us_poll_t *p) {
67
69.6M
    return ((p->state.poll_type & POLL_TYPE_POLLING_IN) ? LIBUS_SOCKET_READABLE : 0) | ((p->state.poll_type & POLL_TYPE_POLLING_OUT) ? LIBUS_SOCKET_WRITABLE : 0);
68
69.6M
}
69
70
59.0M
LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) {
71
59.0M
    return p->state.fd;
72
59.0M
}
73
74
/* Returns any of listen socket, socket, shut down socket or callback */
75
63.5M
int us_internal_poll_type(struct us_poll_t *p) {
76
63.5M
    return p->state.poll_type & 3;
77
63.5M
}
78
79
/* Bug: doesn't really SET, rather read and change, so needs to be inited first! */
80
1.22M
void us_internal_poll_set_type(struct us_poll_t *p, int poll_type) {
81
1.22M
    p->state.poll_type = poll_type | (p->state.poll_type & 12);
82
1.22M
}
83
84
/* Timer */
85
10.4M
void *us_timer_ext(struct us_timer_t *timer) {
86
10.4M
    return ((struct us_internal_callback_t *) timer) + 1;
87
10.4M
}
88
89
0
struct us_loop_t *us_timer_loop(struct us_timer_t *t) {
90
0
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
91
92
0
    return internal_cb->loop;
93
0
}
94
95
/* Loop */
96
22.0k
struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) {
97
22.0k
    struct us_loop_t *loop = (struct us_loop_t *) malloc(sizeof(struct us_loop_t) + ext_size);
98
22.0k
    loop->num_polls = 0;
99
    /* These could be accessed if we close a poll before starting the loop */
100
22.0k
    loop->num_ready_polls = 0;
101
22.0k
    loop->current_ready_poll = 0;
102
103
22.0k
#ifdef LIBUS_USE_EPOLL
104
22.0k
    loop->fd = epoll_create1(EPOLL_CLOEXEC);
105
#else
106
    loop->fd = kqueue();
107
#endif
108
109
22.0k
    us_internal_loop_data_init(loop, wakeup_cb, pre_cb, post_cb);
110
22.0k
    return loop;
111
22.0k
}
112
113
22.0k
void us_loop_run(struct us_loop_t *loop) {
114
22.0k
    us_loop_integrate(loop);
115
116
    /* While we have non-fallthrough polls we shouldn't fall through */
117
12.7M
    while (loop->num_polls) {
118
        /* Emit pre callback */
119
12.7M
        us_internal_loop_pre(loop);
120
121
        /* Fetch ready polls */
122
12.7M
#ifdef LIBUS_USE_EPOLL
123
12.7M
        loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
124
#else
125
        loop->num_ready_polls = kevent(loop->fd, NULL, 0, loop->ready_polls, 1024, NULL);
126
#endif
127
128
        /* Iterate ready polls, dispatching them by type */
129
61.2M
        for (loop->current_ready_poll = 0; loop->current_ready_poll < loop->num_ready_polls; loop->current_ready_poll++) {
130
48.5M
            struct us_poll_t *poll = GET_READY_POLL(loop, loop->current_ready_poll);
131
            /* Any ready poll marked with nullptr will be ignored */
132
48.5M
            if (poll) {
133
48.5M
#ifdef LIBUS_USE_EPOLL
134
48.5M
                int events = loop->ready_polls[loop->current_ready_poll].events;
135
48.5M
                int error = loop->ready_polls[loop->current_ready_poll].events & (EPOLLERR | EPOLLHUP);
136
#else
137
                /* EVFILT_READ, EVFILT_TIME, EVFILT_USER are all mapped to LIBUS_SOCKET_READABLE */
138
                int events = LIBUS_SOCKET_READABLE;
139
                if (loop->ready_polls[loop->current_ready_poll].filter == EVFILT_WRITE) {
140
                    events = LIBUS_SOCKET_WRITABLE;
141
                }
142
                int error = loop->ready_polls[loop->current_ready_poll].flags & (EV_ERROR | EV_EOF);
143
#endif
144
                /* Always filter all polls by what they actually poll for (callback polls always poll for readable) */
145
48.5M
                events &= us_poll_events(poll);
146
48.5M
                if (events || error) {
147
48.5M
                    us_internal_dispatch_ready_poll(poll, error, events);
148
48.5M
                }
149
48.5M
            }
150
48.5M
        }
151
        /* Emit post callback */
152
12.7M
        us_internal_loop_post(loop);
153
12.7M
    }
154
22.0k
}
155
156
4.92M
void us_internal_loop_update_pending_ready_polls(struct us_loop_t *loop, struct us_poll_t *old_poll, struct us_poll_t *new_poll, int old_events, int new_events) {
157
4.92M
#ifdef LIBUS_USE_EPOLL
158
    /* Epoll only has one ready poll per poll */
159
4.92M
    int num_entries_possibly_remaining = 1;
160
#else
161
    /* Ready polls may contain same poll twice under kqueue, as one poll may hold two filters */
162
    int num_entries_possibly_remaining = 2;//((old_events & LIBUS_SOCKET_READABLE) ? 1 : 0) + ((old_events & LIBUS_SOCKET_WRITABLE) ? 1 : 0);
163
#endif
164
165
    /* Todo: for kqueue if we track things in us_change_poll it is possible to have a fast path with no seeking in cases of:
166
    * current poll being us AND we only poll for one thing */
167
168
9.85M
    for (int i = loop->current_ready_poll; i < loop->num_ready_polls && num_entries_possibly_remaining; i++) {
169
4.93M
        if (GET_READY_POLL(loop, i) == old_poll) {
170
171
            // if new events does not contain the ready events of this poll then remove (no we filter that out later on)
172
4.78M
            SET_READY_POLL(loop, i, new_poll);
173
174
4.78M
            num_entries_possibly_remaining--;
175
4.78M
        }
176
4.93M
    }
177
4.92M
}
178
179
/* Poll */
180
181
#ifdef LIBUS_USE_KQUEUE
182
/* Helper function for setting or updating EVFILT_READ and EVFILT_WRITE */
183
int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_data) {
184
    struct kevent change_list[2];
185
    int change_length = 0;
186
187
    /* Do they differ in readable? */
188
    if ((new_events & LIBUS_SOCKET_READABLE) != (old_events & LIBUS_SOCKET_READABLE)) {
189
        EV_SET(&change_list[change_length++], fd, EVFILT_READ, (new_events & LIBUS_SOCKET_READABLE) ? EV_ADD : EV_DELETE, 0, 0, user_data);
190
    }
191
192
    /* Do they differ in writable? */
193
    if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) {
194
        EV_SET(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, user_data);
195
    }
196
197
    int ret = kevent(kqfd, change_list, change_length, NULL, 0, NULL);
198
199
    // ret should be 0 in most cases (not guaranteed when removing async)
200
201
    return ret;
202
}
203
#endif
204
205
360k
struct us_poll_t *us_poll_resize(struct us_poll_t *p, struct us_loop_t *loop, unsigned int ext_size) {
206
360k
    int events = us_poll_events(p);
207
208
360k
    struct us_poll_t *new_p = realloc(p, sizeof(struct us_poll_t) + ext_size);
209
360k
    if (p != new_p && events) {
210
0
#ifdef LIBUS_USE_EPOLL
211
        /* Hack: forcefully update poll by stripping away already set events */
212
0
        new_p->state.poll_type = us_internal_poll_type(new_p);
213
0
        us_poll_change(new_p, loop, events);
214
#else
215
        /* Forcefully update poll by resetting them with new_p as user data */
216
        kqueue_change(loop->fd, new_p->state.fd, 0, events, new_p);
217
#endif
218
219
        /* This is needed for epoll also (us_change_poll doesn't update the old poll) */
220
0
        us_internal_loop_update_pending_ready_polls(loop, p, new_p, events, events);
221
0
    }
222
223
360k
    return new_p;
224
360k
}
225
226
4.92M
void us_poll_start(struct us_poll_t *p, struct us_loop_t *loop, int events) {
227
4.92M
    p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);
228
229
4.92M
#ifdef LIBUS_USE_EPOLL
230
4.92M
    struct epoll_event event;
231
4.92M
    event.events = events;
232
4.92M
    event.data.ptr = p;
233
4.92M
    epoll_ctl(loop->fd, EPOLL_CTL_ADD, p->state.fd, &event);
234
#else
235
    kqueue_change(loop->fd, p->state.fd, 0, events, p);
236
#endif
237
4.92M
}
238
239
4.00M
void us_poll_change(struct us_poll_t *p, struct us_loop_t *loop, int events) {
240
4.00M
    int old_events = us_poll_events(p);
241
4.00M
    if (old_events != events) {
242
243
3.08M
        p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);
244
245
3.08M
#ifdef LIBUS_USE_EPOLL
246
3.08M
        struct epoll_event event;
247
3.08M
        event.events = events;
248
3.08M
        event.data.ptr = p;
249
3.08M
        epoll_ctl(loop->fd, EPOLL_CTL_MOD, p->state.fd, &event);
250
#else
251
        kqueue_change(loop->fd, p->state.fd, old_events, events, p);
252
#endif
253
        /* Set all removed events to null-polls in pending ready poll list */
254
        //us_internal_loop_update_pending_ready_polls(loop, p, p, old_events, events);
255
3.08M
    }
256
4.00M
}
257
258
4.92M
void us_poll_stop(struct us_poll_t *p, struct us_loop_t *loop) {
259
4.92M
    int old_events = us_poll_events(p);
260
4.92M
    int new_events = 0;
261
4.92M
#ifdef LIBUS_USE_EPOLL
262
4.92M
    struct epoll_event event;
263
4.92M
    epoll_ctl(loop->fd, EPOLL_CTL_DEL, p->state.fd, &event);
264
#else
265
    if (old_events) {
266
        kqueue_change(loop->fd, p->state.fd, old_events, new_events, NULL);
267
    }
268
#endif
269
270
    /* Disable any instance of us in the pending ready poll list */
271
4.92M
    us_internal_loop_update_pending_ready_polls(loop, p, 0, old_events, new_events);
272
4.92M
}
273
274
31.6M
/* Consumes the pending 8-byte counter value from a callback poll's descriptor.
 *
 * p - callback poll whose descriptor should be read (epoll only)
 *
 * Returns the value read, truncated to unsigned int. Returns 0 when the read fails
 * or comes back short, instead of handing back an uninitialized value.
 * Under kqueue there is no descriptor to read, so 0 is always returned. */
unsigned int us_internal_accept_poll_event(struct us_poll_t *p) {
#ifdef LIBUS_USE_EPOLL
    int fd = us_poll_fd(p);

    /* Zero-initialized so that no indeterminate value can ever escape */
    uint64_t buf = 0;
    ssize_t read_length = read(fd, &buf, sizeof(buf));
    if (read_length != (ssize_t) sizeof(buf)) {
        return 0;
    }

    return (unsigned int) buf;
#else
    /* Kqueue has no underlying FD for timers or user events */
    (void) p;
    return 0;
#endif
}
286
287
/* Timer */
288
#ifdef LIBUS_USE_EPOLL
289
44.1k
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
290
44.1k
    struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) - sizeof(struct us_poll_t) + ext_size);
291
44.1k
    int timerfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC);
292
44.1k
    if (timerfd == -1) {
293
0
      return NULL;
294
0
    }
295
44.1k
    us_poll_init(p, timerfd, POLL_TYPE_CALLBACK);
296
297
44.1k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
298
44.1k
    cb->loop = loop;
299
44.1k
    cb->cb_expects_the_loop = 0;
300
44.1k
    cb->leave_poll_ready = 0;
301
302
44.1k
    return (struct us_timer_t *) cb;
303
44.1k
}
304
#else
305
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
306
    struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + ext_size);
307
308
    cb->loop = loop;
309
    cb->cb_expects_the_loop = 0;
310
    cb->leave_poll_ready = 0;
311
312
    /* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */
313
    cb->p.state.poll_type = POLL_TYPE_POLLING_IN;
314
    us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK);
315
316
    if (!fallthrough) {
317
        loop->num_polls++;
318
    }
319
320
    return (struct us_timer_t *) cb;
321
}
322
#endif
323
324
#ifdef LIBUS_USE_EPOLL
325
44.1k
void us_timer_close(struct us_timer_t *timer) {
326
44.1k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer;
327
328
44.1k
    us_poll_stop(&cb->p, cb->loop);
329
44.1k
    close(us_poll_fd(&cb->p));
330
331
    /* (regular) sockets are the only polls which are not freed immediately */
332
44.1k
    us_poll_free((struct us_poll_t *) timer, cb->loop);
333
44.1k
}
334
335
44.1k
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
336
44.1k
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
337
338
44.1k
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
339
340
44.1k
    struct itimerspec timer_spec = {
341
44.1k
        {repeat_ms / 1000, (long) (repeat_ms % 1000) * (long) 1000000},
342
44.1k
        {ms / 1000, (long) (ms % 1000) * (long) 1000000}
343
44.1k
    };
344
345
44.1k
    timerfd_settime(us_poll_fd((struct us_poll_t *) t), 0, &timer_spec, NULL);
346
44.1k
    us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE);
347
44.1k
}
348
#else
349
void us_timer_close(struct us_timer_t *timer) {
350
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;
351
352
    struct kevent event;
353
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_TIMER, EV_DELETE, 0, 0, internal_cb);
354
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
355
356
    /* (regular) sockets are the only polls which are not freed immediately */
357
    us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
358
}
359
360
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
361
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;
362
363
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
364
365
    /* Bug: repeat_ms must be the same as ms, or 0 */
366
    struct kevent event;
367
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_TIMER, EV_ADD | (repeat_ms ? 0 : EV_ONESHOT), 0, ms, internal_cb);
368
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
369
}
370
#endif
371
372
/* Async (internal helper for loop's wakeup feature) */
373
#ifdef LIBUS_USE_EPOLL
374
22.0k
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
375
22.0k
    struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) - sizeof(struct us_poll_t) + ext_size);
376
22.0k
    us_poll_init(p, eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC), POLL_TYPE_CALLBACK);
377
378
22.0k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
379
22.0k
    cb->loop = loop;
380
22.0k
    cb->cb_expects_the_loop = 1;
381
22.0k
    cb->leave_poll_ready = 0;
382
383
22.0k
    return (struct us_internal_async *) cb;
384
22.0k
}
385
386
// identical code as for timer, make it shared for "callback types"
387
22.0k
void us_internal_async_close(struct us_internal_async *a) {
388
22.0k
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) a;
389
390
22.0k
    us_poll_stop(&cb->p, cb->loop);
391
22.0k
    close(us_poll_fd(&cb->p));
392
393
    /* (regular) sockets are the only polls which are not freed immediately */
394
22.0k
    us_poll_free((struct us_poll_t *) a, cb->loop);
395
22.0k
}
396
397
22.0k
void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) {
398
22.0k
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
399
400
22.0k
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
401
402
22.0k
    us_poll_start((struct us_poll_t *) a, internal_cb->loop, LIBUS_SOCKET_READABLE);
403
22.0k
}
404
405
47.1k
void us_internal_async_wakeup(struct us_internal_async *a) {
    /* Writing an 8-byte 1 to the async's descriptor makes it readable; the result
     * of the write is intentionally ignored */
    uint64_t increment = 1;
    int fd = us_poll_fd((struct us_poll_t *) a);
    int written = write(fd, &increment, sizeof(increment));
    (void) written;
}
410
#else
411
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
412
    struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + ext_size);
413
414
    cb->loop = loop;
415
    cb->cb_expects_the_loop = 1;
416
    cb->leave_poll_ready = 0;
417
418
    /* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */
419
    cb->p.state.poll_type = POLL_TYPE_POLLING_IN;
420
    us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK);
421
422
    if (!fallthrough) {
423
        loop->num_polls++;
424
    }
425
426
    return (struct us_internal_async *) cb;
427
}
428
429
// identical code as for timer, make it shared for "callback types"
430
void us_internal_async_close(struct us_internal_async *a) {
431
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
432
433
    /* Note: This will fail most of the time as there probably is no pending trigger */
434
    struct kevent event;
435
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_USER, EV_DELETE, 0, 0, internal_cb);
436
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
437
438
    /* (regular) sockets are the only polls which are not freed immediately */
439
    us_poll_free((struct us_poll_t *) a, internal_cb->loop);
440
}
441
442
void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) {
443
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
444
445
    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;
446
}
447
448
void us_internal_async_wakeup(struct us_internal_async *a) {
449
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
450
451
    /* In kqueue you really only need to add a triggered oneshot event */
452
    struct kevent event;
453
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_USER, EV_ADD | EV_ONESHOT, NOTE_TRIGGER, 0, internal_cb);
454
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
455
}
456
#endif
457
458
#endif