/src/h2o/lib/common/socket/evloop/epoll.c.h
Line | Count | Source |
1 | | /* |
2 | | * Copyright (c) 2014 DeNA Co., Ltd. |
3 | | * |
4 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | | * of this software and associated documentation files (the "Software"), to |
6 | | * deal in the Software without restriction, including without limitation the |
7 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
8 | | * sell copies of the Software, and to permit persons to whom the Software is |
9 | | * furnished to do so, subject to the following conditions: |
10 | | * |
11 | | * The above copyright notice and this permission notice shall be included in |
12 | | * all copies or substantial portions of the Software. |
13 | | * |
14 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
19 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
20 | | * IN THE SOFTWARE. |
21 | | */ |
22 | | #include <assert.h> |
23 | | #include <limits.h> |
24 | | #include <stdio.h> |
25 | | #include <sys/epoll.h> |
26 | | #if H2O_USE_IO_URING |
27 | | #include "h2o/io_uring.h" |
28 | | #endif |
29 | | |
30 | | #if 0 |
31 | | #define DEBUG_LOG(...) h2o_error_printf(__VA_ARGS__) |
32 | | #else |
33 | | #define DEBUG_LOG(...) |
34 | | #endif |
35 | | |
36 | | struct st_h2o_evloop_epoll_t { |
37 | | h2o_evloop_t super; |
38 | | int ep; |
39 | | #if H2O_USE_IO_URING |
40 | | h2o_io_uring_t io_uring; |
41 | | #endif |
42 | | }; |
43 | | |
44 | | static int change_epoll_mode(struct st_h2o_evloop_socket_t *sock, uint32_t events) |
45 | 9.46k | { |
46 | 9.46k | struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; |
47 | 9.46k | struct epoll_event ev = {.events = events, .data = {.ptr = sock}}; |
48 | 9.46k | int op, ret; |
49 | | |
50 | 9.46k | if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) { |
51 | 9.23k | op = EPOLL_CTL_ADD; |
52 | 9.23k | sock->_flags |= H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; |
53 | 9.23k | } else { |
54 | 234 | op = EPOLL_CTL_MOD; |
55 | 234 | } |
56 | 9.46k | while ((ret = epoll_ctl(loop->ep, op, sock->fd, &ev)) != 0 && errno == EINTR) |
57 | 0 | ; |
58 | 9.46k | return ret == 0; |
59 | 9.46k | } |
60 | | |
61 | | static int delete_from_epoll_on_close(struct st_h2o_evloop_socket_t *sock) |
62 | 9.27k | { |
63 | 9.27k | struct st_h2o_evloop_epoll_t *loop = (void *)sock->loop; |
64 | 9.27k | int ret; |
65 | | |
66 | 9.27k | if ((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) == 0) |
67 | 181 | return 1; |
68 | 9.09k | while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) |
69 | 0 | ; |
70 | 9.09k | return ret == 0; |
71 | 9.27k | } |
72 | | |
73 | | static int handle_zerocopy_notification(struct st_h2o_evloop_socket_t *sock) |
74 | 0 | { |
75 | 0 | #if H2O_USE_MSG_ZEROCOPY |
76 | 0 | int made_progress = 0; |
77 | | |
78 | | /* Read the completion events and release buffers. `recvmmsg` with two entries is used as a cheap way of making sure that all |
79 | | * notifications are read from the queue (this requirement comes from the use of edge trigger once the socket is closed). */ |
80 | 0 | while (1) { |
81 | 0 | struct mmsghdr msg[2]; |
82 | 0 | char cbuf[2][CMSG_SPACE(sizeof(struct sock_extended_err))]; |
83 | 0 | for (size_t i = 0; i < PTLS_ELEMENTSOF(msg); ++i) |
84 | 0 | msg[i] = (struct mmsghdr){.msg_hdr = {.msg_control = cbuf[i], .msg_controllen = sizeof(cbuf[i])}}; |
85 | 0 | struct timespec timeout = {0}; |
86 | |
87 | 0 | ssize_t ret; |
88 | 0 | while ((ret = recvmmsg(sock->fd, msg, PTLS_ELEMENTSOF(msg), MSG_ERRQUEUE, &timeout)) == -1 && errno == EINTR) |
89 | 0 | ; |
90 | 0 | if (ret == -1) |
91 | 0 | break; |
92 | | |
93 | 0 | for (size_t i = 0; i < ret; ++i) { |
94 | 0 | struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg[i].msg_hdr); |
95 | 0 | if (cmsg != NULL) { |
96 | 0 | struct sock_extended_err *ee = (void *)CMSG_DATA(cmsg); |
97 | 0 | if (ee->ee_errno == 0 && ee->ee_origin == SO_EE_ORIGIN_ZEROCOPY) { |
98 | | /* for each range being obtained, convert the wrapped value to 64-bit, then release the memory */ |
99 | 0 | for (uint32_t c32 = ee->ee_info; c32 <= ee->ee_data; ++c32) { |
100 | 0 | uint64_t c64 = (sock->super._zerocopy->first_counter & 0xffffffff00000000) | c32; |
101 | 0 | if (c64 < sock->super._zerocopy->first_counter) |
102 | 0 | c64 += 0x100000000; |
103 | 0 | void *p = zerocopy_buffers_release(sock->super._zerocopy, c64); |
104 | 0 | if (p != NULL) { |
105 | 0 | if (sock->super.ssl != NULL && p == sock->super.ssl->output.buf.base) { |
106 | | /* buffer being released from zerocopy still has some pending data to be written */ |
107 | 0 | assert(sock->super.ssl->output.zerocopy_owned); |
108 | 0 | sock->super.ssl->output.zerocopy_owned = 0; |
109 | 0 | } else { |
110 | 0 | h2o_mem_free_recycle(&h2o_socket_zerocopy_buffer_allocator, p); |
111 | 0 | } |
112 | 0 | --h2o_socket_num_zerocopy_buffers_inflight; |
113 | 0 | } |
114 | 0 | } |
115 | 0 | } |
116 | 0 | } |
117 | 0 | made_progress = 1; |
118 | 0 | } |
119 | | |
120 | | /* partial read means that the notification queue has become empty */ |
121 | 0 | if (ret < PTLS_ELEMENTSOF(msg)) |
122 | 0 | break; |
123 | 0 | } |
124 | | |
125 | | /* if the socket has been shut down and zerocopy buffer has become empty, link the socket so that it would be destroyed */ |
126 | 0 | if (made_progress && (sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0 && zerocopy_buffers_is_empty(sock->super._zerocopy)) |
127 | 0 | link_to_statechanged(sock); |
128 | |
129 | 0 | return made_progress; |
130 | | #else |
131 | | return 0; |
132 | | #endif |
133 | 0 | } |
134 | | |
135 | | static int update_status(struct st_h2o_evloop_epoll_t *loop) |
136 | 23.5k | { |
137 | 42.7k | while (loop->super._statechanged.head != NULL) { |
138 | | /* detach the top */ |
139 | 19.1k | struct st_h2o_evloop_socket_t *sock = loop->super._statechanged.head; |
140 | 19.1k | loop->super._statechanged.head = sock->_next_statechanged; |
141 | 19.1k | sock->_next_statechanged = sock; |
142 | | /* update the state */ |
143 | 19.1k | if ((sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0) { |
144 | 9.27k | if (sock->super._zerocopy == NULL || zerocopy_buffers_is_empty(sock->super._zerocopy)) { |
145 | | /* Call close (2) and destroy, now that all zero copy buffers have been reclaimed. */ |
146 | 9.27k | if (sock->super._zerocopy != NULL) { |
147 | 0 | zerocopy_buffers_dispose(sock->super._zerocopy); |
148 | 0 | free(sock->super._zerocopy); |
149 | 0 | } |
150 | 9.27k | if (sock->fd != -1) { |
151 | 0 | if (!delete_from_epoll_on_close(sock)) |
152 | 0 | h2o_error_printf("update_status: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); |
153 | 0 | close(sock->fd); |
154 | 0 | } |
155 | 9.27k | free(sock); |
156 | 9.27k | } |
157 | 9.88k | } else { |
158 | 9.88k | uint32_t events = 0; |
159 | 9.88k | int changed = 0; |
160 | 9.88k | if (h2o_socket_is_reading(&sock->super)) { |
161 | 9.75k | events |= EPOLLIN; |
162 | 9.75k | if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) == 0) { |
163 | 9.23k | sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; |
164 | 9.23k | changed = 1; |
165 | 9.23k | } |
166 | 9.75k | } else { |
167 | 134 | if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { |
168 | 134 | sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_READ; |
169 | 134 | changed = 1; |
170 | 134 | } |
171 | 134 | } |
172 | 9.88k | if (h2o_socket_is_writing(&sock->super)) { |
173 | 64 | events |= EPOLLOUT; |
174 | 64 | if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) == 0) { |
175 | 56 | sock->_flags |= H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; |
176 | 56 | changed = 1; |
177 | 56 | } |
178 | 9.82k | } else { |
179 | 9.82k | if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { |
180 | 44 | sock->_flags &= ~H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE; |
181 | 44 | changed = 1; |
182 | 44 | } |
183 | 9.82k | } |
184 | 9.88k | if (changed) { |
185 | 9.46k | if (!change_epoll_mode(sock, events)) |
186 | 0 | return -1; |
187 | 9.46k | } |
188 | 9.88k | } |
189 | 19.1k | } |
190 | 23.5k | loop->super._statechanged.tail_ref = &loop->super._statechanged.head; |
191 | | |
192 | 23.5k | return 0; |
193 | 23.5k | } |
194 | | |
195 | | int evloop_do_proceed(h2o_evloop_t *_loop, int32_t max_wait) |
196 | 23.5k | { |
197 | 23.5k | struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop; |
198 | 23.5k | struct epoll_event events[256]; |
199 | 23.5k | int nevents, i; |
200 | | |
201 | | /* collect (and update) status */ |
202 | 23.5k | if (update_status(loop) != 0) |
203 | 0 | return -1; |
204 | | |
205 | | /* poll */ |
206 | 23.5k | max_wait = adjust_max_wait(&loop->super, max_wait); |
207 | 23.5k | nevents = epoll_wait(loop->ep, events, sizeof(events) / sizeof(events[0]), max_wait); |
208 | 23.5k | update_now(&loop->super); |
209 | 23.5k | if (nevents == -1) |
210 | 1 | return -1; |
211 | | |
212 | 23.5k | if (nevents != 0) { |
213 | 15.5k | h2o_sliding_counter_start(&loop->super.exec_time_nanosec_counter, loop->super._now_nanosec); |
214 | 15.5k | } |
215 | | |
216 | | /* update readable flags, perform writes */ |
217 | 39.3k | for (i = 0; i != nevents; ++i) { |
218 | 15.8k | struct st_h2o_evloop_socket_t *sock = events[i].data.ptr; |
219 | 15.8k | int notified = 0; |
220 | | /* When receiving HUP (indicating reset) while the socket is polled neither for read nor write, unregister the socket from |
221 | | * epoll, otherwise epoll_wait() would continue raising the HUP event. This problem cannot be avoided by using edge trigger. |
222 | | * The application will eventually try to read or write to the socket and at that point close the socket, detecting that it |
223 | | * has become unusable. */ |
224 | 15.8k | if ((events[i].events & EPOLLHUP) != 0 && |
225 | 15.8k | (sock->_flags & (H2O_SOCKET_FLAG_IS_POLLED_FOR_READ | H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE)) == 0 && |
226 | 15.8k | !(sock->super._zerocopy != NULL && (sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) != 0)) { |
227 | 134 | assert((sock->_flags & H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED) != 0); |
228 | 134 | int ret; |
229 | 134 | while ((ret = epoll_ctl(loop->ep, EPOLL_CTL_DEL, sock->fd, NULL)) != 0 && errno == EINTR) |
230 | 0 | ; |
231 | 134 | if (ret != 0) |
232 | 0 | h2o_error_printf("failed to unregister socket (fd:%d) that raised HUP; errno=%d\n", sock->fd, errno); |
233 | 134 | sock->_flags &= ~H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED; |
234 | 134 | notified = 1; |
235 | 134 | } |
236 | | /* If the error event was a zerocopy notification, hide the error notification from the application. Doing so is fine because |
237 | | * level-triggered interface is used while the socket is open. If there is another type of pending error event, it would be |
238 | | * notified once we run out of zerocopy notifications. */ |
239 | 15.8k | if ((events[i].events & EPOLLERR) != 0 && sock->super._zerocopy != NULL && handle_zerocopy_notification(sock)) { |
240 | 0 | events[i].events &= ~EPOLLERR; |
241 | 0 | notified = 1; |
242 | 0 | } |
243 | | /* Handle read and write events. */ |
244 | 15.8k | if ((events[i].events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) { |
245 | 15.7k | if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_READ) != 0) { |
246 | 15.6k | sock->_flags |= H2O_SOCKET_FLAG_IS_READ_READY; |
247 | 15.6k | link_to_pending(sock); |
248 | 15.6k | notified = 1; |
249 | 15.6k | } |
250 | 15.7k | } |
251 | 15.8k | if ((events[i].events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0) { |
252 | 1.54k | if ((sock->_flags & H2O_SOCKET_FLAG_IS_POLLED_FOR_WRITE) != 0) { |
253 | 64 | write_pending(sock); |
254 | 64 | notified = 1; |
255 | 64 | } |
256 | 1.54k | } |
257 | | /* Report events that could not be notified, as that would help us debug issues. This mechanism is disabled once the socket is |
258 | | * closed, as there will be misfires due to the nature of edge triggers (a race between us draining events and new ones being queued up). |
259 | | */ |
260 | 15.8k | if (!notified && (sock->_flags & H2O_SOCKET_FLAG_IS_DISPOSED) == 0) { |
261 | 0 | static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; |
262 | 0 | static time_t last_reported = 0; |
263 | 0 | time_t now = time(NULL); |
264 | 0 | pthread_mutex_lock(&lock); |
265 | 0 | if (last_reported + 60 < now) { |
266 | 0 | last_reported = now; |
267 | 0 | h2o_error_printf("ignoring epoll event (fd:%d,event:0x%x,flags:0x%x)\n", sock->fd, (int)events[i].events, |
268 | 0 | sock->_flags); |
269 | 0 | } |
270 | 0 | pthread_mutex_unlock(&lock); |
271 | 0 | } |
272 | 15.8k | } |
273 | | |
274 | 23.5k | return 0; |
275 | 23.5k | } |
276 | | |
277 | | static void evloop_do_on_socket_create(struct st_h2o_evloop_socket_t *sock) |
278 | 9.27k | { |
279 | 9.27k | } |
280 | | |
281 | | static int evloop_do_on_socket_close(struct st_h2o_evloop_socket_t *sock) |
282 | 9.27k | { |
283 | 9.27k | int ret; |
284 | | |
285 | | /* Nothing to do if fd has been detached already. */ |
286 | 9.27k | if (sock->fd == -1) |
287 | 0 | return 0; |
288 | | |
289 | | /* If zero copy is in action, disconnect using shutdown(). Then, poll the socket until all zero copy buffers are reclaimed, at |
290 | | * which point we dispose of the socket. Edge trigger is used, as with level trigger EPOLLHUP would be notified continuously. */ |
291 | 9.27k | if (sock->super._zerocopy != NULL && !zerocopy_buffers_is_empty(sock->super._zerocopy)) { |
292 | 0 | while ((ret = shutdown(sock->fd, SHUT_RDWR)) == -1 && errno == EINTR) |
293 | 0 | ; |
294 | 0 | if (ret != 0 && errno != ENOTCONN) |
295 | 0 | h2o_error_printf("socket_close: shutdown(SHUT_RDWR) failed; errno=%d, fd=%d\n", errno, sock->fd); |
296 | 0 | if (!change_epoll_mode(sock, EPOLLET)) |
297 | 0 | h2o_fatal("socket_close: epoll_ctl(MOD) failed; errno=%d, fd=%d\n", errno, sock->fd); |
298 | | /* drain error notifications after registering the edge trigger, otherwise there's a chance of a stall */ |
299 | 0 | handle_zerocopy_notification(sock); |
300 | 0 | return 1; |
301 | 0 | } |
302 | | |
303 | | /* Unregister from epoll. */ |
304 | 9.27k | if (!delete_from_epoll_on_close(sock)) |
305 | 0 | h2o_error_printf("socket_close: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); |
306 | | |
307 | 9.27k | return 0; |
308 | 9.27k | } |
309 | | |
310 | | static void evloop_do_on_socket_export(struct st_h2o_evloop_socket_t *sock) |
311 | 0 | { |
312 | 0 | if (!delete_from_epoll_on_close(sock)) |
313 | 0 | h2o_error_printf("socket_export: epoll(DEL) returned error %d (fd=%d)\n", errno, sock->fd); |
314 | 0 | } |
315 | | |
316 | | static void evloop_do_dispose(h2o_evloop_t *_loop) |
317 | 0 | { |
318 | 0 | struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop; |
319 | 0 | close(loop->ep); |
320 | 0 | } |
321 | | |
322 | | h2o_evloop_t *h2o_evloop_create(void) |
323 | 1 | { |
324 | 1 | struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)create_evloop(sizeof(*loop)); |
325 | | |
326 | 1 | if ((loop->ep = epoll_create1(EPOLL_CLOEXEC)) == -1) { |
327 | 0 | char buf[128]; |
328 | 0 | h2o_fatal("h2o_evloop_create: epoll_create1 failed:%d:%s\n", errno, h2o_strerror_r(errno, buf, sizeof(buf))); |
329 | 0 | } |
330 | | |
331 | | #if H2O_USE_IO_URING |
332 | | h2o_io_uring_init(&loop->super); |
333 | | #endif |
334 | | |
335 | 1 | return &loop->super; |
336 | 1 | } |
337 | | |
338 | | #if H2O_USE_IO_URING |
339 | | struct st_h2o_io_uring_t *h2o_evloop__io_uring(h2o_evloop_t *_loop) |
340 | | { |
341 | | struct st_h2o_evloop_epoll_t *loop = (struct st_h2o_evloop_epoll_t *)_loop; |
342 | | return &loop->io_uring; |
343 | | } |
344 | | #endif |
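As a supplementary illustration (not part of the h2o source above or of its coverage data), the following is a minimal standalone sketch of the add-or-modify registration pattern that change_epoll_mode() implements: issue EPOLL_CTL_ADD on the first registration, EPOLL_CTL_MOD afterwards, and retry epoll_ctl() on EINTR. The helper name set_events, the use of a pipe as the monitored descriptor, and the overall program structure are hypothetical choices made for this example only.

/* minimal sketch (assumed names; not h2o code): add-or-modify epoll registration with EINTR retry */
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

static int set_events(int ep, int fd, uint32_t events, int *registered)
{
    struct epoll_event ev = {.events = events, .data = {.fd = fd}};
    /* ADD on first registration, MOD thereafter, mirroring the branch on
     * H2O_SOCKET_FLAG__EPOLL_IS_REGISTERED in change_epoll_mode() */
    int op = *registered ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, ret;
    /* retry epoll_ctl() if interrupted by a signal */
    while ((ret = epoll_ctl(ep, op, fd, &ev)) != 0 && errno == EINTR)
        ;
    if (ret == 0)
        *registered = 1;
    return ret == 0;
}

int main(void)
{
    int pipefd[2], registered = 0;
    int ep = epoll_create1(EPOLL_CLOEXEC);
    if (ep == -1 || pipe(pipefd) != 0) {
        perror("setup");
        return 1;
    }
    /* first call registers the read end (ADD); second call only changes the event mask (MOD) */
    if (!set_events(ep, pipefd[0], EPOLLIN, &registered) ||
        !set_events(ep, pipefd[0], EPOLLIN | EPOLLOUT, &registered))
        perror("epoll_ctl");
    close(pipefd[0]);
    close(pipefd[1]);
    close(ep);
    return 0;
}

Built as an ordinary Linux C program, the first set_events() call adds the descriptor to the epoll instance and the second merely updates its event mask, which is the same distinction the coverage counts above show between the EPOLL_CTL_ADD and EPOLL_CTL_MOD branches.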