Coverage Report

Created: 2025-06-22 06:18

/src/h2o/lib/common/socket/evloop/epoll.c.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2014 DeNA Co., Ltd.
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <assert.h>
23
#include <limits.h>
24
#include <stdio.h>
25
#include <sys/epoll.h>
26
#if H2O_USE_IO_URING
27
#include "h2o/io_uring.h"
28
#endif
29
30
/* Debug logging switch: with `#if 0` DEBUG_LOG() expands to nothing; changing it to `#if 1` routes the arguments to
 * h2o_error_printf(). */
#if 0
31
#define DEBUG_LOG(...) h2o_error_printf(__VA_ARGS__)
32
#else
33
#define DEBUG_LOG(...)
34
#endif
35
36
/* State of the epoll-based event loop. `super` is the first member; functions in this file cast between `h2o_evloop_t *` and
 * a pointer to this struct. `ep` holds the descriptor returned by epoll_create1() in h2o_evloop_create(). `io_uring` exists
 * only when the file is built with H2O_USE_IO_URING. */
struct st_h2o_evloop_epoll_t {
37
    h2o_evloop_t super;
38
    int ep;
39
#if H2O_USE_IO_URING
40
    h2o_io_uring_t io_uring;
41
#endif
42
};
43
44
/* Makes the loop's epoll instance watch `sock` for `events`. When H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED is not set on the
 * socket, EPOLL_CTL_ADD is used and the flag is set; otherwise the existing registration is changed with EPOLL_CTL_MOD.
 * epoll_ctl() is retried for as long as it fails with EINTR.
 * Returns non-zero on success and zero on failure (errno is whatever epoll_ctl() left). */
static int change_epoll_mode(struct st_h2o_evloop_socket_t *sock, uint32_t events)
45
9.46k
{
46
9.46k
    struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop;
47
9.46k
    struct epoll_event ev = {.events = events, .data = {.ptr = sock}};
48
9.46k
    int op, ret;
49
50
9.46k
    if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) {
51
9.23k
        op = EPOLL_CTL_ADD;
52
        /* the flag is set before epoll_ctl() is invoked, so it remains set even if the ADD below fails */
9.23k
        sock->_flags |= H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED;
53
9.23k
    } else {
54
234
        op = EPOLL_CTL_MOD;
55
234
    }
56
9.46k
    while ((ret = epoll_ctl(loop->ep, op, sock->fd, &ev)) != 0 && errno == EINTR)
57
0
        ;
58
9.46k
    return ret == 0;
59
9.46k
}
60
61
/* Removes `sock` from the loop's epoll instance with EPOLL_CTL_DEL, retrying while epoll_ctl() fails with EINTR.
 * Returns non-zero when the socket was not registered (nothing to do) or when the removal succeeded, and zero when
 * epoll_ctl() failed. H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED is not modified by this function. */
static int delete_from_epoll_on_close(struct st_h2o_evloop_socket_t *sock)
62
9.27k
{
63
9.27k
    struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop;
64
9.27k
    int ret;
65
66
9.27k
    if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0)
67
181
        return 1;
68
9.09k
    while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR)
69
0
        ;
70
9.09k
    return ret == 0;
71
9.27k
}
72
73
/* Reads pending messages from the error queue of `sock` (MSG_ERRQUEUE) and, for each zerocopy completion found, releases
 * the buffers covered by the reported counter range. If at least one message was read, the socket is flagged as disposed, and
 * no zerocopy buffers remain, the socket is linked to the state-changed list.
 * Returns non-zero if at least one message was read from the error queue (whether or not it was a zerocopy completion),
 * zero otherwise. Always returns zero when built without H2O_USE_MSG_ZEROCOPY. */
static int handle_zerocopy_notification(struct st_h2o_evloop_socket_t *sock)
74
0
{
75
0
#if H2O_USE_MSG_ZEROCOPY
76
0
    int made_progress = 0;
77
78
    /* Read the completion events and release buffers. `recvmmsg` with two entries is used as a cheap way of making sure that all
79
     * notifications are read from the queue (this requirement comes from the use of edge trigger once the socket is closed). */
80
0
    while (1) {
81
0
        struct mmsghdr msg[2];
82
0
        char cbuf[2][CMSG_SPACE(sizeof(struct sock_extended_err))];
83
0
        for (size_t i = 0; i < PTLS_ELEMENTSOF(msg); ++i)
84
0
            msg[i] = (struct mmsghdr){.msg_hdr = {.msg_control = cbuf[i], .msg_controllen = sizeof(cbuf[i])}};
85
0
        struct timespec timeout = {0};
86
87
0
        ssize_t ret;
88
0
        while ((ret = recvmmsg(sock->fd, msg, PTLS_ELEMENTSOF(msg), MSG_ERRQUEUE, &timeout)) == -1 && errno == EINTR)
89
0
            ;
90
        /* any error other than EINTR (including an empty queue, presumably reported as EAGAIN) ends the drain loop */
0
        if (ret == -1)
91
0
            break;
92
93
0
        for (size_t i = 0; i < ret; ++i) {
94
0
            struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg[i].msg_hdr);
95
0
            if (cmsg != NULL) {
96
0
                struct sock_extended_err *ee = (void *)CMSG_DATA(cmsg);
97
0
                if (ee->ee_errno == 0 && ee->ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
98
                    /* for each range being obtained, convert the wrapped value to 64-bit, then release the memory */
                    /* NOTE(review): `c32` is a uint32_t, so if `ee->ee_data` can equal UINT32_MAX the condition below never
                     * becomes false, and a range that wraps around (ee_info > ee_data) is skipped entirely. Confirm whether the
                     * kernel can report such ranges. */
99
0
                    for (uint32_t c32 = ee->ee_info; c32 <= ee->ee_data; ++c32) {
100
0
                        uint64_t c64 = (sock->super._zerocopy->first_counter & 0xffffffff00000000) | c32;
101
0
                        if (c64 < sock->super._zerocopy->first_counter)
102
0
                            c64 += 0x100000000;
103
0
                        void *p = zerocopy_buffers_release(sock->super._zerocopy, c64);
104
0
                        if (p != NULL) {
105
0
                            if (sock->super.ssl != NULL && p == sock->super.ssl->output.buf.base) {
106
                                /* buffer being released from zerocopy still has some pending data to be written */
107
0
                                assert(sock->super.ssl->output.zerocopy_owned);
108
0
                                sock->super.ssl->output.zerocopy_owned = 0;
109
0
                            } else {
110
0
                                h2o_mem_free_recycle(&h2o_socket_zerocopy_buffer_allocator, p);
111
0
                            }
112
0
                            --h2o_socket_num_zerocopy_buffers_inflight;
113
0
                        }
114
0
                    }
115
0
                }
116
0
            }
117
            /* progress is recorded for every message read, not only for zerocopy completions */
0
            made_progress = 1;
118
0
        }
119
120
        /* partial read means that the notification queue has become empty */
121
0
        if (ret < PTLS_ELEMENTSOF(msg))
122
0
            break;
123
0
    }
124
125
    /* if the socket has been shut down and zerocopy buffer has become empty, link the socket so that it would be destroyed */
126
0
    if (made_progress && (sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0 && zerocopy_buffers_is_empty(sock->super._zerocopy))
127
0
        link_to_statechanged(sock);
128
129
0
    return made_progress;
130
#else
131
    return 0;
132
#endif
133
0
}
134
135
/* Processes every socket on the loop's state-changed list. Disposed sockets are closed and freed once they hold no zerocopy
 * buffers; for live sockets the epoll registration is brought in line with whether the socket is currently reading and/or
 * writing. The list is empty when the function returns 0.
 * Returns 0 on success, -1 if change_epoll_mode() fails.
 * NOTE(review): on the -1 path the function returns before `_statechanged.tail_ref` is reset at the bottom; confirm callers
 * treat this as fatal. */
static int update_status(struct st_h2o_evloop_epoll_t *loop)
136
23.5k
{
137
42.7k
    while (loop->super._statechanged.head != NULL) {
138
        /* detach the top */
139
19.1k
        struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head;
140
19.1k
        loop->super._statechanged.head = sock->_next_statechanged;
141
        /* point the link at the socket itself; presumably the marker for "not on the list" (verify against
         * link_to_statechanged) */
19.1k
        sock->_next_statechanged = sock;
142
        /* update the state */
143
19.1k
        if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) {
144
            /* a disposed socket that still has zerocopy buffers is only detached from the list here, not freed */
9.27k
            if (sock->super._zerocopy == NULL || zerocopy_buffers_is_empty(sock->super._zerocopy)) {
145
                /* Call close (2) and destroy, now that all zero copy buffers have been reclaimed. */
146
9.27k
                if (sock->super._zerocopy != NULL) {
147
0
                    zerocopy_buffers_dispose(sock->super._zerocopy);
148
0
                    free(sock->super._zerocopy);
149
0
                }
150
9.27k
                if (sock->fd != -1) {
151
0
                    if (!delete_from_epoll_on_close(sock))
152
0
                        h2o_error_printf("update_status: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd);
153
0
                    close(sock->fd);
154
0
                }
155
9.27k
                free(sock);
156
9.27k
            }
157
9.88k
        } else {
158
            /* compute the desired event mask and track whether it differs from what the POLLED_FOR_* flags record */
9.88k
            uint32_t events = 0;
159
9.88k
            int changed = 0;
160
9.88k
            if (h2o_socket_is_reading(&sock->super)) {
161
9.75k
                events |= EPOLLIN;
162
9.75k
                if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) {
163
9.23k
                    sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ;
164
9.23k
                    changed = 1;
165
9.23k
                }
166
9.75k
            } else {
167
134
                if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) {
168
134
                    sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ;
169
134
                    changed = 1;
170
134
                }
171
134
            }
172
9.88k
            if (h2o_socket_is_writing(&sock->super)) {
173
64
                events |= EPOLLOUT;
174
64
                if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) {
175
56
                    sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE;
176
56
                    changed = 1;
177
56
                }
178
9.82k
            } else {
179
9.82k
                if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) {
180
44
                    sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE;
181
44
                    changed = 1;
182
44
                }
183
9.82k
            }
184
            /* epoll_ctl() is issued only when the read/write interest actually changed */
9.88k
            if (changed) {
185
9.46k
                if (!change_epoll_mode(sock, events))
186
0
                    return -1;
187
9.46k
            }
188
9.88k
        }
189
19.1k
    }
190
    /* the list is empty now; make the tail reference point back at the head slot */
23.5k
    loop->super._statechanged.tail_ref = &loop->super._statechanged.head;
191
192
23.5k
    return 0;
193
23.5k
}
194
195
/* Runs one iteration of the epoll-based loop: applies pending state changes via update_status(), calls epoll_wait() with the
 * timeout produced by adjust_max_wait(), refreshes the loop clock via update_now(), then dispatches the returned events (at
 * most 256 per call). Read-type events mark the socket read-ready and link it to the pending list; write-type events invoke
 * write_pending().
 * Returns 0 on success, -1 if update_status() or epoll_wait() fails.
 * NOTE(review): an epoll_wait() failure with EINTR is not distinguished from other errors here; presumably the caller
 * inspects errno. */
int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait)
196
23.5k
{
197
23.5k
    struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop;
198
23.5k
    struct epoll_event events[256];
199
23.5k
    int nevents, i;
200
201
    /* collect (and update) status */
202
23.5k
    if (update_status(loop) != 0)
203
0
        return -1;
204
205
    /* poll */
206
23.5k
    max_wait = adjust_max_wait(&loop->super, max_wait);
207
23.5k
    nevents = epoll_wait(loop->ep, events, sizeof(events) / sizeof(events[0]), max_wait);
208
23.5k
    update_now(&loop->super);
209
23.5k
    if (nevents == -1)
210
1
        return -1;
211
212
    /* start the execution-time counter only when there are events to process */
23.5k
    if (nevents != 0) {
213
15.5k
        h2o_sliding_counter_start(&loop->super.exec_time_nanosec_counter, loop->super._now_nanosec);
214
15.5k
    }
215
216
    /* update readable flags, perform writes */
217
39.3k
    for (i = 0; i != nevents; ++i) {
218
15.8k
        struct st_h2o_evloop_socket_t *sock = events[i].data.ptr;
219
        /* set once the event has been acted upon in any way; used by the diagnostic at the bottom of the loop body */
15.8k
        int notified = 0;
220
        /* When receiving HUP (indicating reset) while the socket is polled neither for read nor write, unregister the socket from
221
         * epoll, otherwise epoll_wait() would continue raising the HUP event. This problem cannot be avoided by using edge trigger.
222
         * The application will eventually try to read or write to the socket and at that point close the socket, detecting that it
223
         * has become unusable. */
224
15.8k
        if ((events[i].events & EPOLLHUP) != 0 &&
225
15.8k
            (sock->_flags & (H2O_SOCKET_FLAG_IS_POLLED_FOR_READ | H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE)) == 0 &&
226
15.8k
            !(sock->super._zerocopy != NULL && (sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0)) {
227
134
            assert((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) != 0);
228
134
            int ret;
229
134
            while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR)
230
0
                ;
231
134
            if (ret != 0)
232
0
                h2o_error_printf("failed to unregister socket (fd:%d) that raised HUP; errno=%d\n", sock->fd, errno);
233
            /* the flag is cleared regardless of whether the DEL succeeded */
134
            sock->_flags &= ~H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED;
234
134
            notified = 1;
235
134
        }
236
        /* If the error event was a zerocopy notification, hide the error notification from the application. Doing so is fine because
237
         * level-triggered interface is used while the socket is open. If there is another type of pending error event, it would be
238
         * notified once we run out of zerocopy notifications. */
239
15.8k
        if ((events[i].events & EPOLLERR) != 0 && sock->super._zerocopy != NULL && handle_zerocopy_notification(sock)) {
240
0
            events[i].events &= ~EPOLLERR;
241
0
            notified = 1;
242
0
        }
243
        /* Handle read and write events. */
244
15.8k
        if ((events[i].events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) {
245
15.7k
            if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) {
246
15.6k
                sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY;
247
15.6k
                link_to_pending(sock);
248
15.6k
                notified = 1;
249
15.6k
            }
250
15.7k
        }
251
15.8k
        if ((events[i].events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0) {
252
1.54k
            if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) {
253
64
                write_pending(sock);
254
64
                notified = 1;
255
64
            }
256
1.54k
        }
257
        /* Report events that could not be notified, as that would help us debug issues. This mechanism is disabled once the socket is
258
         * closed, as there will be misfires due to the nature of edge triggers (race between us draining the queue and events being
259
         * queued up). The report is emitted at most once per 60 seconds; the static timestamp is guarded by a mutex.
         */
260
15.8k
        if (!notified && (sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) == 0) {
261
0
            static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
262
0
            static time_t last_reported = 0;
263
0
            time_t now = time(NULL);
264
0
            pthread_mutex_lock(&lock);
265
0
            if (last_reported + 60 < now) {
266
0
                last_reported = now;
267
0
                h2o_error_printf("ignoring epoll event (fd:%d,event:0x%x,flags:0x%x)\n", sock->fd, (int)events[i].events,
268
0
                                 sock->_flags);
269
0
            }
270
0
            pthread_mutex_unlock(&lock);
271
0
        }
272
15.8k
    }
273
274
23.5k
    return 0;
275
23.5k
}
276
277
/* Socket-creation hook of the epoll backend. Intentionally empty: nothing is done for `sock` here (registration with epoll
 * happens in change_epoll_mode()). */
static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock)
278
9.27k
{
279
9.27k
}
280
281
/* Socket-close hook of the epoll backend.
 * Returns 1 when the socket still has zerocopy buffers outstanding: the connection is shut down, the socket is switched to
 * an edge-triggered registration, and pending notifications are drained. Returns 0 otherwise, i.e. when the fd is already
 * detached (-1) or after the socket has been removed from epoll.
 * NOTE(review): the return value presumably tells the caller whether closing the fd must be deferred; confirm at the call
 * site. */
static int evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock)
282
9.27k
{
283
9.27k
    int ret;
284
285
    /* Nothing to do if fd has been detached already. */
286
9.27k
    if (sock->fd == -1)
287
0
        return 0;
288
289
    /* If zero copy is in action, disconnect using shutdown(). Then, poll the socket until all zero copy buffers are reclaimed, at
290
     * which point we dispose of the socket. Edge trigger is used, as in level trigger EPOLLHUP will be notified continuously. */
291
9.27k
    if (sock->super._zerocopy != NULL && !zerocopy_buffers_is_empty(sock->super._zerocopy)) {
292
0
        while ((ret = shutdown(sock->fd, SHUT_RDWR)) == -1 && errno == EINTR)
293
0
            ;
294
        /* ENOTCONN is not reported as an error */
0
        if (ret != 0 && errno != ENOTCONN)
295
0
            h2o_error_printf("socket_close: shutdown(SHUT_RDWR) failed; errno=%d, fd=%d\n", errno, sock->fd);
296
        /* register with no read/write interest, edge-triggered only; failure here is fatal */
0
        if (!change_epoll_mode(sock, EPOLLET))
297
0
            h2o_fatal("socket_close: epoll_ctl(MOD) failed; errno=%d, fd=%d\n", errno, sock->fd);
298
        /* drain error notifications after registering the edge trigger, otherwise there's chance of stall */
299
0
        handle_zerocopy_notification(sock);
300
0
        return 1;
301
0
    }
302
303
    /* Unregister from epoll. */
304
9.27k
    if (!delete_from_epoll_on_close(sock))
305
0
        h2o_error_printf("socket_close: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd);
306
307
9.27k
    return 0;
308
9.27k
}
309
310
/* Socket-export hook of the epoll backend: removes `sock` from the epoll instance and logs an error if the removal fails.
 * The failure is reported but otherwise ignored. */
static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock)
311
0
{
312
0
    if (!delete_from_epoll_on_close(sock))
313
0
        h2o_error_printf("socket_export: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd);
314
0
}
315
316
/* Backend-specific teardown of the loop: closes the epoll file descriptor. The return value of close() is not checked, and
 * nothing else is released here. */
static void evloop_do_dispose(h2o_evloop_t *_loop)
317
0
{
318
0
    struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop;
319
0
    close(loop->ep);
320
0
}
321
322
/* Creates an epoll-backed event loop. The loop object is obtained from create_evloop() sized for the epoll-specific struct,
 * and an epoll instance is created with EPOLL_CLOEXEC; failure of epoll_create1() is reported through h2o_fatal(). When
 * built with H2O_USE_IO_URING, h2o_io_uring_init() is also called on the loop.
 * Returns a pointer to the embedded generic loop (`super`). */
h2o_evloop_t *h2o_evloop_create(void)
323
1
{
324
1
    struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)create_evloop(sizeof(*loop));
325
326
1
    if ((loop->ep = epoll_create1(EPOLL_CLOEXEC)) == -1) {
327
0
        char buf[128];
328
0
        h2o_fatal("h2o_evloop_create: epoll_create1 failed:%d:%s\n", errno, h2o_strerror_r(errno, buf, sizeof(buf)));
329
0
    }
330
331
#if H2O_USE_IO_URING
332
    h2o_io_uring_init(&loop->super);
333
#endif
334
335
1
    return &loop->super;
336
1
}
337
338
#if H2O_USE_IO_URING
339
/* Returns a pointer to the io_uring state embedded in the epoll loop struct that `_loop` belongs to. Only compiled when
 * H2O_USE_IO_URING is enabled. */
struct st_h2o_io_uring_t *h2o_evloop__io_uring(h2o_evloop_t *_loop)
340
{
341
    struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop;
342
    return &loop->io_uring;
343
}
344
#endif