/src/unit/src/nxt_epoll_engine.c
Line | Count | Source |
1 | | |
2 | | /* |
3 | | * Copyright (C) Igor Sysoev |
4 | | * Copyright (C) NGINX, Inc. |
5 | | */ |
6 | | |
7 | | #include <nxt_main.h> |
8 | | |
9 | | |
10 | | /* |
11 | | * The first epoll version has been introduced in Linux 2.5.44. The |
12 | | * interface was changed several times since then and the final version |
13 | | * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has |
14 | | * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. |
15 | | * |
16 | | * EPOLLET mode did not work reliably in early implementations and in |
17 | | * Linux 2.4 backport. |
18 | | * |
19 | | * EPOLLONESHOT Linux 2.6.2, glibc 2.3. |
20 | | * EPOLLRDHUP Linux 2.6.17, glibc 2.8. |
21 | | * epoll_pwait() Linux 2.6.19, glibc 2.6. |
22 | | * signalfd() Linux 2.6.22, glibc 2.7. |
23 | | * eventfd() Linux 2.6.22, glibc 2.7. |
24 | | * timerfd_create() Linux 2.6.25, glibc 2.8. |
25 | | * epoll_create1() Linux 2.6.27, glibc 2.9. |
26 | | * signalfd4() Linux 2.6.27, glibc 2.9. |
27 | | * eventfd2() Linux 2.6.27, glibc 2.9. |
28 | | * accept4() Linux 2.6.28, glibc 2.10. |
29 | | * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. |
30 | | * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24. |
31 | | */ |
32 | | |
33 | | |
34 | | #if (NXT_HAVE_EPOLL_EDGE) |
35 | | static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, |
36 | | nxt_uint_t mchanges, nxt_uint_t mevents); |
37 | | #endif |
38 | | static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, |
39 | | nxt_uint_t mchanges, nxt_uint_t mevents); |
40 | | static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, |
41 | | nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); |
42 | | static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, |
43 | | nxt_conn_io_t *io); |
44 | | static void nxt_epoll_free(nxt_event_engine_t *engine); |
45 | | static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); |
46 | | static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); |
47 | | static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); |
48 | | static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, |
49 | | nxt_fd_event_t *ev); |
50 | | static void nxt_epoll_enable_read(nxt_event_engine_t *engine, |
51 | | nxt_fd_event_t *ev); |
52 | | static void nxt_epoll_enable_write(nxt_event_engine_t *engine, |
53 | | nxt_fd_event_t *ev); |
54 | | static void nxt_epoll_disable_read(nxt_event_engine_t *engine, |
55 | | nxt_fd_event_t *ev); |
56 | | static void nxt_epoll_disable_write(nxt_event_engine_t *engine, |
57 | | nxt_fd_event_t *ev); |
58 | | static void nxt_epoll_block_read(nxt_event_engine_t *engine, |
59 | | nxt_fd_event_t *ev); |
60 | | static void nxt_epoll_block_write(nxt_event_engine_t *engine, |
61 | | nxt_fd_event_t *ev); |
62 | | static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, |
63 | | nxt_fd_event_t *ev); |
64 | | static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, |
65 | | nxt_fd_event_t *ev); |
66 | | static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, |
67 | | nxt_fd_event_t *ev); |
68 | | static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, |
69 | | int op, uint32_t events); |
70 | | static void nxt_epoll_commit_changes(nxt_event_engine_t *engine); |
71 | | static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); |
72 | | #if (NXT_HAVE_SIGNALFD) |
73 | | static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); |
74 | | static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); |
75 | | #endif |
76 | | #if (NXT_HAVE_EVENTFD) |
77 | | static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, |
78 | | nxt_work_handler_t handler); |
79 | | static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); |
80 | | static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); |
81 | | #endif |
82 | | static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); |
83 | | |
84 | | #if (NXT_HAVE_ACCEPT4) |
85 | | static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, |
86 | | void *data); |
87 | | #endif |
88 | | |
89 | | |
90 | | #if (NXT_HAVE_EPOLL_EDGE) |
91 | | |
92 | | static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, |
93 | | void *data); |
94 | | static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, |
95 | | void *data); |
96 | | static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); |
97 | | |
98 | | |
/*
 * Connection I/O operations used by the edge-triggered engine.  Only
 * .connect and .recvbuf differ from the generic set: they are backed by
 * the nxt_epoll_edge_* implementations declared above.
 */
static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
    .connect = nxt_epoll_edge_conn_io_connect,
    .accept = nxt_conn_io_accept,

    .read = nxt_conn_io_read,
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

    /* Prefer zero-copy sendfile() when it was detected at build time. */
#if (NXT_HAVE_LINUX_SENDFILE)
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};
119 | | |
120 | | |
/*
 * Event interface of the edge-triggered (EPOLLET) epoll engine.
 * Entries are positional and follow the nxt_event_interface_t member
 * order; the two NULL entries are operations this engine does not
 * provide (NOTE(review): confirm member names against
 * nxt_event_interface_t).
 */
const nxt_event_interface_t  nxt_epoll_edge_engine = {
    "epoll_edge",
    nxt_epoll_edge_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
    /* Cross-thread posting needs eventfd(); absent otherwise. */
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};
163 | | |
164 | | #endif |
165 | | |
166 | | |
/*
 * Event interface of the level-triggered epoll engine; identical to the
 * edge-triggered table except for the create function and the generic
 * Unix connection I/O operations.  Entries are positional per
 * nxt_event_interface_t; the two NULL entries are operations this
 * engine does not provide (NOTE(review): confirm member names against
 * nxt_event_interface_t).
 */
const nxt_event_interface_t  nxt_epoll_level_engine = {
    "epoll_level",
    nxt_epoll_level_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
    /* Cross-thread posting needs eventfd(); absent otherwise. */
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_unix_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};
209 | | |
210 | | |
211 | | #if (NXT_HAVE_EPOLL_EDGE) |
212 | | |
213 | | static nxt_int_t |
214 | | nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, |
215 | | nxt_uint_t mevents) |
216 | 0 | { |
217 | 0 | return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, |
218 | 0 | EPOLLET | EPOLLRDHUP); |
219 | 0 | } |
220 | | |
221 | | #endif |
222 | | |
223 | | |
224 | | static nxt_int_t |
225 | | nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, |
226 | | nxt_uint_t mevents) |
227 | 0 | { |
228 | 0 | return nxt_epoll_create(engine, mchanges, mevents, |
229 | 0 | &nxt_unix_conn_io, 0); |
230 | 0 | } |
231 | | |
232 | | |
233 | | static nxt_int_t |
234 | | nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, |
235 | | nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) |
236 | 0 | { |
237 | 0 | engine->u.epoll.fd = -1; |
238 | 0 | engine->u.epoll.mode = mode; |
239 | 0 | engine->u.epoll.mchanges = mchanges; |
240 | 0 | engine->u.epoll.mevents = mevents; |
241 | 0 | #if (NXT_HAVE_SIGNALFD) |
242 | 0 | engine->u.epoll.signalfd.fd = -1; |
243 | 0 | #endif |
244 | |
|
245 | 0 | engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); |
246 | 0 | if (engine->u.epoll.changes == NULL) { |
247 | 0 | goto fail; |
248 | 0 | } |
249 | | |
250 | 0 | engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); |
251 | 0 | if (engine->u.epoll.events == NULL) { |
252 | 0 | goto fail; |
253 | 0 | } |
254 | | |
255 | 0 | engine->u.epoll.fd = epoll_create(1); |
256 | 0 | if (engine->u.epoll.fd == -1) { |
257 | 0 | nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno); |
258 | 0 | goto fail; |
259 | 0 | } |
260 | | |
261 | 0 | nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); |
262 | |
|
263 | 0 | if (engine->signals != NULL) { |
264 | |
|
265 | 0 | #if (NXT_HAVE_SIGNALFD) |
266 | |
|
267 | 0 | if (nxt_epoll_add_signal(engine) != NXT_OK) { |
268 | 0 | goto fail; |
269 | 0 | } |
270 | | |
271 | 0 | #endif |
272 | | |
273 | 0 | nxt_epoll_test_accept4(engine, io); |
274 | 0 | } |
275 | | |
276 | 0 | return NXT_OK; |
277 | | |
278 | 0 | fail: |
279 | |
|
280 | 0 | nxt_epoll_free(engine); |
281 | |
|
282 | 0 | return NXT_ERROR; |
283 | 0 | } |
284 | | |
285 | | |
/*
 * Probes once per process whether the running kernel implements
 * accept4() and installs the appropriate accept handler into io.
 * The result is cached in a function-local static, so the syscall
 * probe happens only for the first engine.  NOTE(review): the probe
 * itself is not thread-safe; presumably engines are created from a
 * single thread -- confirm.
 */
static void
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
    static nxt_work_handler_t  handler;

    if (handler == NULL) {

        /* Default to the generic accept() path. */
        handler = io->accept;

#if (NXT_HAVE_ACCEPT4)

        /* The call always fails on fd -1; only ENOSYS means no syscall. */
        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);

        if (nxt_errno != NXT_ENOSYS) {
            handler = nxt_epoll_conn_io_accept4;

        } else {
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
                    NXT_ENOSYS);
        }

#endif
    }

    io->accept = handler;
}
312 | | |
313 | | |
/*
 * Releases all engine resources: the signalfd and eventfd descriptors
 * (when present), the epoll descriptor, and the change/event arrays.
 * Safe to call on a partially constructed engine: nxt_free(NULL) is a
 * no-op and fds are only closed when not -1.  NOTE(review): assumes
 * the epoll state was initialized by nxt_epoll_create() or zeroed --
 * an eventfd.fd of 0 would be closed here; confirm callers.
 */
static void
nxt_epoll_free(nxt_event_engine_t *engine)
{
    int  fd;

    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);

#if (NXT_HAVE_SIGNALFD)

    fd = engine->u.epoll.signalfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

#if (NXT_HAVE_EVENTFD)

    fd = engine->u.epoll.eventfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

    fd = engine->u.epoll.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
    }

    nxt_free(engine->u.epoll.events);
    nxt_free(engine->u.epoll.changes);

    /* Reset the whole state so a repeated free is harmless. */
    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}
352 | | |
353 | | |
354 | | static void |
355 | | nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
356 | 0 | { |
357 | 0 | ev->read = NXT_EVENT_ACTIVE; |
358 | 0 | ev->write = NXT_EVENT_ACTIVE; |
359 | |
|
360 | 0 | nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, |
361 | 0 | EPOLLIN | EPOLLOUT | engine->u.epoll.mode); |
362 | 0 | } |
363 | | |
364 | | |
365 | | static void |
366 | | nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
367 | 0 | { |
368 | 0 | if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { |
369 | |
|
370 | 0 | ev->read = NXT_EVENT_INACTIVE; |
371 | 0 | ev->write = NXT_EVENT_INACTIVE; |
372 | |
|
373 | 0 | nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); |
374 | 0 | } |
375 | 0 | } |
376 | | |
377 | | |
378 | | static void |
379 | | nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
380 | 0 | { |
381 | 0 | if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { |
382 | |
|
383 | 0 | ev->read = NXT_EVENT_INACTIVE; |
384 | 0 | ev->write = NXT_EVENT_INACTIVE; |
385 | |
|
386 | 0 | nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); |
387 | 0 | } |
388 | 0 | } |
389 | | |
390 | | |
391 | | /* |
392 | | * Although calling close() on a file descriptor will remove any epoll |
393 | | * events that reference the descriptor, in this case the close() acquires |
394 | | * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not |
395 | | * acquire the "epmutex" since Linux 3.13 if the file descriptor presents |
396 | | * only in one epoll set. Thus removing events explicitly before closing |
397 | | * eliminates possible lock contention. |
398 | | */ |
399 | | |
400 | | static nxt_bool_t |
401 | | nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
402 | 0 | { |
403 | 0 | nxt_epoll_delete(engine, ev); |
404 | |
|
405 | 0 | return ev->changing; |
406 | 0 | } |
407 | | |
408 | | |
409 | | static void |
410 | | nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
411 | 0 | { |
412 | 0 | int op; |
413 | 0 | uint32_t events; |
414 | |
|
415 | 0 | if (ev->read != NXT_EVENT_BLOCKED) { |
416 | |
|
417 | 0 | op = EPOLL_CTL_MOD; |
418 | 0 | events = EPOLLIN | engine->u.epoll.mode; |
419 | |
|
420 | 0 | if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { |
421 | 0 | op = EPOLL_CTL_ADD; |
422 | |
|
423 | 0 | } else if (ev->write >= NXT_EVENT_BLOCKED) { |
424 | 0 | events |= EPOLLOUT; |
425 | 0 | } |
426 | |
|
427 | 0 | nxt_epoll_change(engine, ev, op, events); |
428 | 0 | } |
429 | |
|
430 | 0 | ev->read = NXT_EVENT_ACTIVE; |
431 | 0 | } |
432 | | |
433 | | |
434 | | static void |
435 | | nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
436 | 0 | { |
437 | 0 | int op; |
438 | 0 | uint32_t events; |
439 | |
|
440 | 0 | if (ev->write != NXT_EVENT_BLOCKED) { |
441 | |
|
442 | 0 | op = EPOLL_CTL_MOD; |
443 | 0 | events = EPOLLOUT | engine->u.epoll.mode; |
444 | |
|
445 | 0 | if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { |
446 | 0 | op = EPOLL_CTL_ADD; |
447 | |
|
448 | 0 | } else if (ev->read >= NXT_EVENT_BLOCKED) { |
449 | 0 | events |= EPOLLIN; |
450 | 0 | } |
451 | |
|
452 | 0 | nxt_epoll_change(engine, ev, op, events); |
453 | 0 | } |
454 | |
|
455 | 0 | ev->write = NXT_EVENT_ACTIVE; |
456 | 0 | } |
457 | | |
458 | | |
459 | | static void |
460 | | nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
461 | 0 | { |
462 | 0 | int op; |
463 | 0 | uint32_t events; |
464 | |
|
465 | 0 | ev->read = NXT_EVENT_INACTIVE; |
466 | |
|
467 | 0 | if (ev->write <= NXT_EVENT_DISABLED) { |
468 | 0 | ev->write = NXT_EVENT_INACTIVE; |
469 | 0 | op = EPOLL_CTL_DEL; |
470 | 0 | events = 0; |
471 | |
|
472 | 0 | } else { |
473 | 0 | op = EPOLL_CTL_MOD; |
474 | 0 | events = EPOLLOUT | engine->u.epoll.mode; |
475 | 0 | } |
476 | |
|
477 | 0 | nxt_epoll_change(engine, ev, op, events); |
478 | 0 | } |
479 | | |
480 | | |
481 | | static void |
482 | | nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
483 | 0 | { |
484 | 0 | int op; |
485 | 0 | uint32_t events; |
486 | |
|
487 | 0 | ev->write = NXT_EVENT_INACTIVE; |
488 | |
|
489 | 0 | if (ev->read <= NXT_EVENT_DISABLED) { |
490 | 0 | ev->read = NXT_EVENT_INACTIVE; |
491 | 0 | op = EPOLL_CTL_DEL; |
492 | 0 | events = 0; |
493 | |
|
494 | 0 | } else { |
495 | 0 | op = EPOLL_CTL_MOD; |
496 | 0 | events = EPOLLIN | engine->u.epoll.mode; |
497 | 0 | } |
498 | |
|
499 | 0 | nxt_epoll_change(engine, ev, op, events); |
500 | 0 | } |
501 | | |
502 | | |
503 | | static void |
504 | | nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
505 | 0 | { |
506 | 0 | if (ev->read != NXT_EVENT_INACTIVE) { |
507 | 0 | ev->read = NXT_EVENT_BLOCKED; |
508 | 0 | } |
509 | 0 | } |
510 | | |
511 | | |
512 | | static void |
513 | | nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
514 | 0 | { |
515 | 0 | if (ev->write != NXT_EVENT_INACTIVE) { |
516 | 0 | ev->write = NXT_EVENT_BLOCKED; |
517 | 0 | } |
518 | 0 | } |
519 | | |
520 | | |
521 | | /* |
522 | | * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT |
523 | | * event should be added or modified, epoll_ctl(2): |
524 | | * |
525 | | * EPOLLONESHOT (since Linux 2.6.2) |
526 | | * Sets the one-shot behavior for the associated file descriptor. |
527 | | * This means that after an event is pulled out with epoll_wait(2) |
528 | | * the associated file descriptor is internally disabled and no |
529 | | * other events will be reported by the epoll interface. The user |
530 | | * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file |
531 | | * descriptor with a new event mask. |
532 | | */ |
533 | | |
534 | | static void |
535 | | nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
536 | 0 | { |
537 | 0 | int op; |
538 | |
|
539 | 0 | op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? |
540 | 0 | EPOLL_CTL_ADD : EPOLL_CTL_MOD; |
541 | |
|
542 | 0 | ev->read = NXT_EVENT_ONESHOT; |
543 | 0 | ev->write = NXT_EVENT_INACTIVE; |
544 | |
|
545 | 0 | nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); |
546 | 0 | } |
547 | | |
548 | | |
549 | | static void |
550 | | nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
551 | 0 | { |
552 | 0 | int op; |
553 | |
|
554 | 0 | op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? |
555 | 0 | EPOLL_CTL_ADD : EPOLL_CTL_MOD; |
556 | |
|
557 | 0 | ev->read = NXT_EVENT_INACTIVE; |
558 | 0 | ev->write = NXT_EVENT_ONESHOT; |
559 | |
|
560 | 0 | nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); |
561 | 0 | } |
562 | | |
563 | | |
564 | | static void |
565 | | nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
566 | 0 | { |
567 | 0 | uint32_t events; |
568 | |
|
569 | 0 | ev->read = NXT_EVENT_ACTIVE; |
570 | |
|
571 | 0 | events = EPOLLIN; |
572 | |
|
573 | 0 | #ifdef EPOLLEXCLUSIVE |
574 | 0 | events |= EPOLLEXCLUSIVE; |
575 | 0 | #endif |
576 | |
|
577 | 0 | nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events); |
578 | 0 | } |
579 | | |
580 | | |
581 | | /* |
582 | | * epoll changes are batched to improve instruction and data cache |
583 | | * locality of several epoll_ctl() calls followed by epoll_wait() call. |
584 | | */ |
585 | | |
/*
 * Queues one epoll_ctl() operation into the engine's change batch.
 * The batch is flushed by nxt_epoll_commit_changes() either here when
 * it is full, or in nxt_epoll_poll() just before epoll_wait().
 */
static void
nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
    uint32_t events)
{
    nxt_epoll_change_t  *change;

    nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
              engine->u.epoll.fd, ev->fd, op, events);

    /* Flush a full batch to make room for this change. */
    if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
        nxt_epoll_commit_changes(engine);
    }

    /* Lets nxt_epoll_close() know a change for this event is pending. */
    ev->changing = 1;

    /* The fd is not stored: commit reads it from ev via data.ptr. */
    change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
    change->op = op;
    change->event.events = events;
    change->event.data.ptr = ev;
}
606 | | |
607 | | |
/*
 * Applies every queued change with an individual epoll_ctl() call.
 * On failure the event's error handler is enqueued and the engine
 * error flag is raised so nxt_epoll_poll() will not block before
 * running those handlers.  Callers guarantee nchanges != 0 (do/while).
 */
static void
nxt_epoll_commit_changes(nxt_event_engine_t *engine)
{
    int                 ret;
    nxt_fd_event_t      *ev;
    nxt_epoll_change_t  *change, *end;

    nxt_debug(&engine->task, "epoll %d changes:%ui",
              engine->u.epoll.fd, engine->u.epoll.nchanges);

    change = engine->u.epoll.changes;
    end = change + engine->u.epoll.nchanges;

    do {
        ev = change->event.data.ptr;
        /* The change is being applied: clear the pending marker. */
        ev->changing = 0;

        nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
                  engine->u.epoll.fd, ev->fd, change->op,
                  change->event.events);

        ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
                      engine->u.epoll.fd, change->op, ev->fd, nxt_errno);

            nxt_work_queue_add(&engine->fast_work_queue,
                               nxt_epoll_error_handler, ev->task, ev, ev->data);

            engine->u.epoll.error = 1;
        }

        change++;

    } while (change < end);

    engine->u.epoll.nchanges = 0;
}
647 | | |
648 | | |
649 | | static void |
650 | | nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) |
651 | 0 | { |
652 | 0 | nxt_fd_event_t *ev; |
653 | |
|
654 | 0 | ev = obj; |
655 | |
|
656 | 0 | ev->read = NXT_EVENT_INACTIVE; |
657 | 0 | ev->write = NXT_EVENT_INACTIVE; |
658 | |
|
659 | 0 | ev->error_handler(ev->task, ev, data); |
660 | 0 | } |
661 | | |
662 | | |
663 | | #if (NXT_HAVE_SIGNALFD) |
664 | | |
/*
 * Blocks the engine's signal set and registers a non-blocking
 * signalfd() descriptor in the epoll set, so signals are delivered
 * synchronously through the event loop instead of async handlers.
 * Returns NXT_OK or NXT_ERROR; on error a partially created signalfd
 * descriptor is left in engine->u.epoll.signalfd.fd for
 * nxt_epoll_free() to close.
 */
static nxt_int_t
nxt_epoll_add_signal(nxt_event_engine_t *engine)
{
    int                 fd;
    struct epoll_event  ee;

    if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
        nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
        return NXT_ERROR;
    }

    /*
     * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
     * and 2.8 signalfd() wrappers call the original signalfd() syscall
     * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
     * tries to call signalfd4() syscall and if it fails then calls the
     * original signalfd() syscall.  For this reason the non-blocking mode
     * is set separately.
     */

    fd = signalfd(-1, &engine->signals->sigmask, 0);

    if (fd == -1) {
        nxt_alert(&engine->task, "signalfd(%d) failed %E",
                  engine->u.epoll.signalfd.fd, nxt_errno);
        return NXT_ERROR;
    }

    engine->u.epoll.signalfd.fd = fd;

    if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "signalfd(): %d", fd);

    /* The signal handler is passed to the fd event via its data field. */
    engine->u.epoll.signalfd.data = engine->signals->handler;
    engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
    engine->u.epoll.signalfd.log = engine->task.log;
    engine->u.epoll.signalfd.task = &engine->task;

    ee.events = EPOLLIN;
    ee.data.ptr = &engine->u.epoll.signalfd;

    if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
        nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
                  engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);

        return NXT_ERROR;
    }

    return NXT_OK;
}
719 | | |
720 | | |
/*
 * Read handler of the signalfd descriptor: reads one signalfd_siginfo
 * record and dispatches the signal number to the engine's signal
 * handler (received through the event's data pointer).
 */
static void
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                      n;
    nxt_fd_event_t           *ev;
    nxt_work_handler_t       handler;
    struct signalfd_siginfo  sfd;

    ev = obj;
    handler = data;

    nxt_debug(task, "signalfd handler");

    /* signalfd reads return whole siginfo records or fail. */
    n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));

    nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);

    if (n != sizeof(struct signalfd_siginfo)) {
        nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
        return;
    }

    nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);

    /* The signal number is passed through the opaque obj argument. */
    handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
}
747 | | |
748 | | #endif |
749 | | |
750 | | |
751 | | #if (NXT_HAVE_EVENTFD) |
752 | | |
/*
 * Creates a non-blocking eventfd() descriptor registered in EPOLLET
 * mode and remembers the handler to run when another thread posts to
 * this engine via nxt_epoll_signal().  Returns NXT_OK or NXT_ERROR;
 * on error a partially created descriptor is left for nxt_epoll_free()
 * to close.
 */
static nxt_int_t
nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    int                 ret;
    struct epoll_event  ee;

    engine->u.epoll.post_handler = handler;

    /*
     * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
     * and 2.8 eventfd() wrappers call the original eventfd() syscall
     * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
     * tries to call eventfd2() syscall and if it fails then calls the
     * original eventfd() syscall.  For this reason the non-blocking mode
     * is set separately.
     */

    engine->u.epoll.eventfd.fd = eventfd(0, 0);

    if (engine->u.epoll.eventfd.fd == -1) {
        nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
        return NXT_ERROR;
    }

    ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);

    engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
    engine->u.epoll.eventfd.data = engine;
    engine->u.epoll.eventfd.log = engine->task.log;
    engine->u.epoll.eventfd.task = &engine->task;

    /* Edge-triggered: see the counter-draining logic in the handler. */
    ee.events = EPOLLIN | EPOLLET;
    ee.data.ptr = &engine->u.epoll.eventfd;

    ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
                    engine->u.epoll.eventfd.fd, &ee);

    if (nxt_fast_path(ret == 0)) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
              engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
              nxt_errno);

    return NXT_ERROR;
}
806 | | |
807 | | |
/*
 * Read handler of the eventfd descriptor: runs the engine's post
 * handler once per wakeup.  The eventfd counter is drained only
 * rarely (see below), which is safe in EPOLLET mode.
 */
static void
nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                 n;
    uint64_t            events;
    nxt_event_engine_t  *engine;

    engine = data;

    nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);

    /*
     * The maximum value after write() to a eventfd() descriptor will
     * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
     * can be read once per many notifications, for example, once per
     * 2^32-2 notifications.  Since the eventfd() file descriptor is
     * always registered in EPOLLET mode, epoll returns event about
     * only the latest write() to the descriptor.
     */

    if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
        engine->u.epoll.neventfd = 0;

        n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));

        nxt_debug(task, "read(%d): %d events:%uL",
                  engine->u.epoll.eventfd.fd, n, events);

        if (n != sizeof(uint64_t)) {
            nxt_alert(task, "read eventfd(%d) failed %E",
                      engine->u.epoll.eventfd.fd, nxt_errno);
        }
    }

    engine->u.epoll.post_handler(task, NULL, NULL);
}
844 | | |
845 | | |
846 | | static void |
847 | | nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) |
848 | 0 | { |
849 | 0 | size_t ret; |
850 | 0 | uint64_t event; |
851 | | |
852 | | /* |
853 | | * eventfd() presents along with signalfd(), so the function |
854 | | * is used only to post events and the signo argument is ignored. |
855 | | */ |
856 | |
|
857 | 0 | event = 1; |
858 | |
|
859 | 0 | ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); |
860 | |
|
861 | 0 | if (nxt_slow_path(ret != sizeof(uint64_t))) { |
862 | 0 | nxt_alert(&engine->task, "write(%d) to eventfd failed %E", |
863 | 0 | engine->u.epoll.eventfd.fd, nxt_errno); |
864 | 0 | } |
865 | 0 | } |
866 | | |
867 | | #endif |
868 | | |
869 | | |
/*
 * The engine's main poll loop step: flushes pending epoll_ctl()
 * changes, waits for events, then enqueues read/write/error work for
 * every returned descriptor according to its per-direction state
 * (active, blocked, oneshot).  In level-triggered mode (mode == 0)
 * blocked events are disabled in the kernel to stop busy reporting.
 */
static void
nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    uint32_t            events;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_bool_t          error;
    nxt_uint_t          level;
    nxt_fd_event_t      *ev;
    struct epoll_event  *event;

    if (engine->u.epoll.nchanges != 0) {
        nxt_epoll_commit_changes(engine);
    }

    if (engine->u.epoll.error) {
        engine->u.epoll.error = 0;
        /* Error handlers have been enqueued on failure. */
        timeout = 0;
    }

    nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
              engine->u.epoll.fd, timeout);

    nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
                         engine->u.epoll.mevents, timeout);

    /* Save errno before the debug/time calls below can clobber it. */
    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);

    if (nevents == -1) {
        /* EINTR is routine (signals); anything else is alert-worthy. */
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
                engine->u.epoll.fd, err);

        return;
    }

    for (i = 0; i < nevents; i++) {

        event = &engine->u.epoll.events[i];
        events = event->events;
        ev = event->data.ptr;

        nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
                  ev->fd, events, ev, ev->read, ev->write);

        /*
         * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN
         * or EPOLLOUT, so the "error" variable enqueues only error handler.
         */
        error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
        ev->epoll_error = error;

        /* Suppress the error work if neither direction is armed. */
        if (error
            && ev->read <= NXT_EVENT_BLOCKED
            && ev->write <= NXT_EVENT_BLOCKED)
        {
            error = 0;
        }

#if (NXT_HAVE_EPOLL_EDGE)

        /* EPOLLRDHUP: the peer closed its writing end. */
        ev->epoll_eof = ((events & EPOLLRDHUP) != 0);

#endif

        if ((events & EPOLLIN) != 0) {
            ev->read_ready = 1;

            if (ev->read != NXT_EVENT_BLOCKED) {

                if (ev->read == NXT_EVENT_ONESHOT) {
                    ev->read = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                                   ev->task, ev, ev->data);

                /* The read handler will observe any error itself. */
                error = 0;

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_read(engine, ev);
            }
        }

        if ((events & EPOLLOUT) != 0) {
            ev->write_ready = 1;

            if (ev->write != NXT_EVENT_BLOCKED) {

                if (ev->write == NXT_EVENT_ONESHOT) {
                    ev->write = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
                                   ev->task, ev, ev->data);

                /* The write handler will observe any error itself. */
                error = 0;

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_write(engine, ev);
            }
        }

        if (!error) {
            continue;
        }

        /* Pure EPOLLERR/EPOLLHUP: let pending I/O discover the error. */
        ev->read_ready = 1;
        ev->write_ready = 1;

        if (ev->read == NXT_EVENT_BLOCKED && ev->write == NXT_EVENT_BLOCKED) {

            if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable(engine, ev);
            }

            continue;
        }

        nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler,
                           ev->task, ev, ev->data);
    }
}
1003 | | |
1004 | | |
1005 | | #if (NXT_HAVE_ACCEPT4) |
1006 | | |
1007 | | static void |
1008 | | nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) |
1009 | 0 | { |
1010 | 0 | socklen_t socklen; |
1011 | 0 | nxt_conn_t *c; |
1012 | 0 | nxt_socket_t s; |
1013 | 0 | struct sockaddr *sa; |
1014 | 0 | nxt_listen_event_t *lev; |
1015 | |
|
1016 | 0 | lev = obj; |
1017 | 0 | c = lev->next; |
1018 | |
|
1019 | 0 | lev->ready--; |
1020 | 0 | lev->socket.read_ready = (lev->ready != 0); |
1021 | |
|
1022 | 0 | sa = &c->remote->u.sockaddr; |
1023 | 0 | socklen = c->remote->socklen; |
1024 | | /* |
1025 | | * The returned socklen is ignored here, |
1026 | | * see comment in nxt_conn_io_accept(). |
1027 | | */ |
1028 | 0 | s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK); |
1029 | |
|
1030 | 0 | if (s != -1) { |
1031 | 0 | c->socket.fd = s; |
1032 | |
|
1033 | 0 | nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); |
1034 | |
|
1035 | 0 | nxt_conn_accept(task, lev, c); |
1036 | 0 | return; |
1037 | 0 | } |
1038 | | |
1039 | 0 | nxt_conn_accept_error(task, lev, "accept4", nxt_errno); |
1040 | 0 | } |
1041 | | |
1042 | | #endif |
1043 | | |
1044 | | |
1045 | | #if (NXT_HAVE_EPOLL_EDGE) |
1046 | | |
1047 | | /* |
1048 | | * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() |
1049 | | * syscall to test pending connect() error. Although this special |
1050 | | * interface can work in both edge-triggered and level-triggered |
1051 | | * modes it is enabled only for the former mode because this mode is |
1052 | | * available in all modern Linux distributions. For the latter mode |
1053 | | * it is required to create additional nxt_epoll_level_event_conn_io |
1054 | | * with single non-generic connect() interface. |
1055 | | */ |
1056 | | |
1057 | | static void |
1058 | | nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) |
1059 | 0 | { |
1060 | 0 | nxt_conn_t *c; |
1061 | 0 | nxt_event_engine_t *engine; |
1062 | 0 | nxt_work_handler_t handler; |
1063 | 0 | const nxt_event_conn_state_t *state; |
1064 | |
|
1065 | 0 | c = obj; |
1066 | |
|
1067 | 0 | state = c->write_state; |
1068 | |
|
1069 | 0 | switch (nxt_socket_connect(task, c->socket.fd, c->remote)) { |
1070 | | |
1071 | 0 | case NXT_OK: |
1072 | 0 | c->socket.write_ready = 1; |
1073 | 0 | handler = state->ready_handler; |
1074 | 0 | break; |
1075 | | |
1076 | 0 | case NXT_AGAIN: |
1077 | 0 | c->socket.write_handler = nxt_epoll_edge_conn_connected; |
1078 | 0 | c->socket.error_handler = nxt_conn_connect_error; |
1079 | |
|
1080 | 0 | engine = task->thread->engine; |
1081 | 0 | nxt_conn_timer(engine, c, state, &c->write_timer); |
1082 | |
|
1083 | 0 | nxt_epoll_enable(engine, &c->socket); |
1084 | 0 | c->socket.read = NXT_EVENT_BLOCKED; |
1085 | 0 | return; |
1086 | | |
1087 | | #if 0 |
1088 | | case NXT_AGAIN: |
1089 | | nxt_conn_timer(engine, c, state, &c->write_timer); |
1090 | | |
1091 | | /* Fall through. */ |
1092 | | |
1093 | | case NXT_OK: |
1094 | | /* |
1095 | | * Mark both read and write directions as ready and try to perform |
1096 | | * I/O operations before receiving readiness notifications. |
1097 | | * On unconnected socket Linux send() and recv() return EAGAIN |
1098 | | * instead of ENOTCONN. |
1099 | | */ |
1100 | | c->socket.read_ready = 1; |
1101 | | c->socket.write_ready = 1; |
1102 | | /* |
1103 | | * Enabling both read and write notifications on a getting |
1104 | | * connected socket eliminates one epoll_ctl() syscall. |
1105 | | */ |
1106 | | c->socket.write_handler = nxt_epoll_edge_event_conn_connected; |
1107 | | c->socket.error_handler = state->error_handler; |
1108 | | |
1109 | | nxt_epoll_enable(engine, &c->socket); |
1110 | | c->socket.read = NXT_EVENT_BLOCKED; |
1111 | | |
1112 | | handler = state->ready_handler; |
1113 | | break; |
1114 | | #endif |
1115 | | |
1116 | 0 | case NXT_ERROR: |
1117 | 0 | handler = state->error_handler; |
1118 | 0 | break; |
1119 | | |
1120 | 0 | default: /* NXT_DECLINED: connection refused. */ |
1121 | 0 | handler = state->close_handler; |
1122 | 0 | break; |
1123 | 0 | } |
1124 | | |
1125 | 0 | nxt_work_queue_add(c->write_work_queue, handler, task, c, data); |
1126 | 0 | } |
1127 | | |
1128 | | |
1129 | | static void |
1130 | | nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) |
1131 | 0 | { |
1132 | 0 | nxt_conn_t *c; |
1133 | |
|
1134 | 0 | c = obj; |
1135 | |
|
1136 | 0 | nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); |
1137 | |
|
1138 | 0 | if (!c->socket.epoll_error) { |
1139 | 0 | c->socket.write = NXT_EVENT_BLOCKED; |
1140 | |
|
1141 | 0 | if (c->write_state->timer_autoreset) { |
1142 | 0 | nxt_timer_disable(task->thread->engine, &c->write_timer); |
1143 | 0 | } |
1144 | |
|
1145 | 0 | nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, |
1146 | 0 | task, c, data); |
1147 | 0 | return; |
1148 | 0 | } |
1149 | | |
1150 | 0 | nxt_conn_connect_test(task, c, data); |
1151 | 0 | } |
1152 | | |
1153 | | |
1154 | | /* |
1155 | | * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around |
1156 | | * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF |
1157 | | * in edge-triggered mode. |
1158 | | */ |
1159 | | |
1160 | | static ssize_t |
1161 | | nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) |
1162 | 0 | { |
1163 | 0 | ssize_t n; |
1164 | |
|
1165 | 0 | n = nxt_conn_io_recvbuf(c, b); |
1166 | |
|
1167 | 0 | if (n > 0 && c->socket.epoll_eof) { |
1168 | 0 | c->socket.read_ready = 1; |
1169 | 0 | } |
1170 | |
|
1171 | 0 | return n; |
1172 | 0 | } |
1173 | | |
1174 | | #endif |