Coverage Report

Created: 2025-10-10 06:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/unit/src/nxt_epoll_engine.c
Line
Count
Source
1
2
/*
3
 * Copyright (C) Igor Sysoev
4
 * Copyright (C) NGINX, Inc.
5
 */
6
7
#include <nxt_main.h>
8
9
10
/*
11
 * The first epoll version has been introduced in Linux 2.5.44.  The
12
 * interface was changed several times since then and the final version
13
 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14
 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15
 *
16
 * EPOLLET mode did not work reliably in early implementations and in
17
 * Linux 2.4 backport.
18
 *
19
 * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
20
 * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
21
 * epoll_pwait()            Linux 2.6.19, glibc 2.6.
22
 * signalfd()               Linux 2.6.22, glibc 2.7.
23
 * eventfd()                Linux 2.6.22, glibc 2.7.
24
 * timerfd_create()         Linux 2.6.25, glibc 2.8.
25
 * epoll_create1()          Linux 2.6.27, glibc 2.9.
26
 * signalfd4()              Linux 2.6.27, glibc 2.9.
27
 * eventfd2()               Linux 2.6.27, glibc 2.9.
28
 * accept4()                Linux 2.6.28, glibc 2.10.
29
 * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
30
 * EPOLLEXCLUSIVE           Linux 4.5, glibc 2.24.
31
 */
32
33
34
#if (NXT_HAVE_EPOLL_EDGE)
35
static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36
    nxt_uint_t mchanges, nxt_uint_t mevents);
37
#endif
38
static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39
    nxt_uint_t mchanges, nxt_uint_t mevents);
40
static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41
    nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
42
static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
43
    nxt_conn_io_t *io);
44
static void nxt_epoll_free(nxt_event_engine_t *engine);
45
static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46
static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47
static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48
static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49
    nxt_fd_event_t *ev);
50
static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51
    nxt_fd_event_t *ev);
52
static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
53
    nxt_fd_event_t *ev);
54
static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
55
    nxt_fd_event_t *ev);
56
static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
57
    nxt_fd_event_t *ev);
58
static void nxt_epoll_block_read(nxt_event_engine_t *engine,
59
    nxt_fd_event_t *ev);
60
static void nxt_epoll_block_write(nxt_event_engine_t *engine,
61
    nxt_fd_event_t *ev);
62
static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
63
    nxt_fd_event_t *ev);
64
static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
65
    nxt_fd_event_t *ev);
66
static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
67
    nxt_fd_event_t *ev);
68
static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
69
    int op, uint32_t events);
70
static void nxt_epoll_commit_changes(nxt_event_engine_t *engine);
71
static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
72
#if (NXT_HAVE_SIGNALFD)
73
static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
74
static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
75
#endif
76
#if (NXT_HAVE_EVENTFD)
77
static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78
    nxt_work_handler_t handler);
79
static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80
static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81
#endif
82
static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83
84
#if (NXT_HAVE_ACCEPT4)
85
static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
86
    void *data);
87
#endif
88
89
90
#if (NXT_HAVE_EPOLL_EDGE)
91
92
static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
93
    void *data);
94
static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
95
    void *data);
96
static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
97
98
99
static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
100
    .connect = nxt_epoll_edge_conn_io_connect,
101
    .accept = nxt_conn_io_accept,
102
103
    .read = nxt_conn_io_read,
104
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,
105
    .recv = nxt_conn_io_recv,
106
107
    .write = nxt_conn_io_write,
108
    .sendbuf = nxt_conn_io_sendbuf,
109
110
#if (NXT_HAVE_LINUX_SENDFILE)
111
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
112
#else
113
    .old_sendbuf = nxt_event_conn_io_sendbuf,
114
#endif
115
116
    .writev = nxt_event_conn_io_writev,
117
    .send = nxt_event_conn_io_send,
118
};
119
120
121
const nxt_event_interface_t  nxt_epoll_edge_engine = {
122
    "epoll_edge",
123
    nxt_epoll_edge_create,
124
    nxt_epoll_free,
125
    nxt_epoll_enable,
126
    nxt_epoll_disable,
127
    nxt_epoll_delete,
128
    nxt_epoll_close,
129
    nxt_epoll_enable_read,
130
    nxt_epoll_enable_write,
131
    nxt_epoll_disable_read,
132
    nxt_epoll_disable_write,
133
    nxt_epoll_block_read,
134
    nxt_epoll_block_write,
135
    nxt_epoll_oneshot_read,
136
    nxt_epoll_oneshot_write,
137
    nxt_epoll_enable_accept,
138
    NULL,
139
    NULL,
140
#if (NXT_HAVE_EVENTFD)
141
    nxt_epoll_enable_post,
142
    nxt_epoll_signal,
143
#else
144
    NULL,
145
    NULL,
146
#endif
147
    nxt_epoll_poll,
148
149
    &nxt_epoll_edge_conn_io,
150
151
#if (NXT_HAVE_INOTIFY)
152
    NXT_FILE_EVENTS,
153
#else
154
    NXT_NO_FILE_EVENTS,
155
#endif
156
157
#if (NXT_HAVE_SIGNALFD)
158
    NXT_SIGNAL_EVENTS,
159
#else
160
    NXT_NO_SIGNAL_EVENTS,
161
#endif
162
};
163
164
#endif
165
166
167
const nxt_event_interface_t  nxt_epoll_level_engine = {
168
    "epoll_level",
169
    nxt_epoll_level_create,
170
    nxt_epoll_free,
171
    nxt_epoll_enable,
172
    nxt_epoll_disable,
173
    nxt_epoll_delete,
174
    nxt_epoll_close,
175
    nxt_epoll_enable_read,
176
    nxt_epoll_enable_write,
177
    nxt_epoll_disable_read,
178
    nxt_epoll_disable_write,
179
    nxt_epoll_block_read,
180
    nxt_epoll_block_write,
181
    nxt_epoll_oneshot_read,
182
    nxt_epoll_oneshot_write,
183
    nxt_epoll_enable_accept,
184
    NULL,
185
    NULL,
186
#if (NXT_HAVE_EVENTFD)
187
    nxt_epoll_enable_post,
188
    nxt_epoll_signal,
189
#else
190
    NULL,
191
    NULL,
192
#endif
193
    nxt_epoll_poll,
194
195
    &nxt_unix_conn_io,
196
197
#if (NXT_HAVE_INOTIFY)
198
    NXT_FILE_EVENTS,
199
#else
200
    NXT_NO_FILE_EVENTS,
201
#endif
202
203
#if (NXT_HAVE_SIGNALFD)
204
    NXT_SIGNAL_EVENTS,
205
#else
206
    NXT_NO_SIGNAL_EVENTS,
207
#endif
208
};
209
210
211
#if (NXT_HAVE_EPOLL_EDGE)
212
213
static nxt_int_t
214
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
215
    nxt_uint_t mevents)
216
0
{
217
0
    return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
218
0
                            EPOLLET | EPOLLRDHUP);
219
0
}
220
221
#endif
222
223
224
static nxt_int_t
225
nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
226
    nxt_uint_t mevents)
227
0
{
228
0
    return nxt_epoll_create(engine, mchanges, mevents,
229
0
                            &nxt_unix_conn_io, 0);
230
0
}
231
232
233
static nxt_int_t
234
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
235
    nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
236
0
{
237
0
    engine->u.epoll.fd = -1;
238
0
    engine->u.epoll.mode = mode;
239
0
    engine->u.epoll.mchanges = mchanges;
240
0
    engine->u.epoll.mevents = mevents;
241
0
#if (NXT_HAVE_SIGNALFD)
242
0
    engine->u.epoll.signalfd.fd = -1;
243
0
#endif
244
245
0
    engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
246
0
    if (engine->u.epoll.changes == NULL) {
247
0
        goto fail;
248
0
    }
249
250
0
    engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
251
0
    if (engine->u.epoll.events == NULL) {
252
0
        goto fail;
253
0
    }
254
255
0
    engine->u.epoll.fd = epoll_create(1);
256
0
    if (engine->u.epoll.fd == -1) {
257
0
        nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
258
0
        goto fail;
259
0
    }
260
261
0
    nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
262
263
0
    if (engine->signals != NULL) {
264
265
0
#if (NXT_HAVE_SIGNALFD)
266
267
0
        if (nxt_epoll_add_signal(engine) != NXT_OK) {
268
0
            goto fail;
269
0
        }
270
271
0
#endif
272
273
0
        nxt_epoll_test_accept4(engine, io);
274
0
    }
275
276
0
    return NXT_OK;
277
278
0
fail:
279
280
0
    nxt_epoll_free(engine);
281
282
0
    return NXT_ERROR;
283
0
}
284
285
286
static void
287
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
288
0
{
289
0
    static nxt_work_handler_t  handler;
290
291
0
    if (handler == NULL) {
292
293
0
        handler = io->accept;
294
295
0
#if (NXT_HAVE_ACCEPT4)
296
297
0
        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
298
299
0
        if (nxt_errno != NXT_ENOSYS) {
300
0
            handler = nxt_epoll_conn_io_accept4;
301
302
0
        } else {
303
0
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
304
0
                    NXT_ENOSYS);
305
0
        }
306
307
0
#endif
308
0
    }
309
310
0
    io->accept = handler;
311
0
}
312
313
314
static void
315
nxt_epoll_free(nxt_event_engine_t *engine)
316
0
{
317
0
    int  fd;
318
319
0
    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
320
321
0
#if (NXT_HAVE_SIGNALFD)
322
323
0
    fd = engine->u.epoll.signalfd.fd;
324
325
0
    if (fd != -1 && close(fd) != 0) {
326
0
        nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
327
0
    }
328
329
0
#endif
330
331
0
#if (NXT_HAVE_EVENTFD)
332
333
0
    fd = engine->u.epoll.eventfd.fd;
334
335
0
    if (fd != -1 && close(fd) != 0) {
336
0
        nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
337
0
    }
338
339
0
#endif
340
341
0
    fd = engine->u.epoll.fd;
342
343
0
    if (fd != -1 && close(fd) != 0) {
344
0
        nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
345
0
    }
346
347
0
    nxt_free(engine->u.epoll.events);
348
0
    nxt_free(engine->u.epoll.changes);
349
350
0
    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
351
0
}
352
353
354
static void
355
nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
356
0
{
357
0
    ev->read = NXT_EVENT_ACTIVE;
358
0
    ev->write = NXT_EVENT_ACTIVE;
359
360
0
    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
361
0
                     EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
362
0
}
363
364
365
static void
366
nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
367
0
{
368
0
    if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
369
370
0
        ev->read = NXT_EVENT_INACTIVE;
371
0
        ev->write = NXT_EVENT_INACTIVE;
372
373
0
        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
374
0
    }
375
0
}
376
377
378
static void
379
nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
380
0
{
381
0
    if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
382
383
0
        ev->read = NXT_EVENT_INACTIVE;
384
0
        ev->write = NXT_EVENT_INACTIVE;
385
386
0
        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
387
0
    }
388
0
}
389
390
391
/*
392
 * Although calling close() on a file descriptor will remove any epoll
393
 * events that reference the descriptor, in this case the close() acquires
394
 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
395
 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
396
 * only in one epoll set.  Thus removing events explicitly before closing
397
 * eliminates possible lock contention.
398
 */
399
400
static nxt_bool_t
401
nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
402
0
{
403
0
    nxt_epoll_delete(engine, ev);
404
405
0
    return ev->changing;
406
0
}
407
408
409
static void
410
nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
411
0
{
412
0
    int       op;
413
0
    uint32_t  events;
414
415
0
    if (ev->read != NXT_EVENT_BLOCKED) {
416
417
0
        op = EPOLL_CTL_MOD;
418
0
        events = EPOLLIN | engine->u.epoll.mode;
419
420
0
        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
421
0
            op = EPOLL_CTL_ADD;
422
423
0
        } else if (ev->write >= NXT_EVENT_BLOCKED) {
424
0
            events |= EPOLLOUT;
425
0
        }
426
427
0
        nxt_epoll_change(engine, ev, op, events);
428
0
    }
429
430
0
    ev->read = NXT_EVENT_ACTIVE;
431
0
}
432
433
434
static void
435
nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
436
0
{
437
0
    int       op;
438
0
    uint32_t  events;
439
440
0
    if (ev->write != NXT_EVENT_BLOCKED) {
441
442
0
        op = EPOLL_CTL_MOD;
443
0
        events = EPOLLOUT | engine->u.epoll.mode;
444
445
0
        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
446
0
            op = EPOLL_CTL_ADD;
447
448
0
        } else if (ev->read >= NXT_EVENT_BLOCKED) {
449
0
            events |= EPOLLIN;
450
0
        }
451
452
0
        nxt_epoll_change(engine, ev, op, events);
453
0
    }
454
455
0
    ev->write = NXT_EVENT_ACTIVE;
456
0
}
457
458
459
static void
460
nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
461
0
{
462
0
    int       op;
463
0
    uint32_t  events;
464
465
0
    ev->read = NXT_EVENT_INACTIVE;
466
467
0
    if (ev->write <= NXT_EVENT_DISABLED) {
468
0
        ev->write = NXT_EVENT_INACTIVE;
469
0
        op = EPOLL_CTL_DEL;
470
0
        events = 0;
471
472
0
    } else {
473
0
        op = EPOLL_CTL_MOD;
474
0
        events = EPOLLOUT | engine->u.epoll.mode;
475
0
    }
476
477
0
    nxt_epoll_change(engine, ev, op, events);
478
0
}
479
480
481
static void
482
nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
483
0
{
484
0
    int       op;
485
0
    uint32_t  events;
486
487
0
    ev->write = NXT_EVENT_INACTIVE;
488
489
0
    if (ev->read <= NXT_EVENT_DISABLED) {
490
0
        ev->read = NXT_EVENT_INACTIVE;
491
0
        op = EPOLL_CTL_DEL;
492
0
        events = 0;
493
494
0
    } else {
495
0
        op = EPOLL_CTL_MOD;
496
0
        events = EPOLLIN | engine->u.epoll.mode;
497
0
    }
498
499
0
    nxt_epoll_change(engine, ev, op, events);
500
0
}
501
502
503
static void
504
nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
505
0
{
506
0
    if (ev->read != NXT_EVENT_INACTIVE) {
507
0
        ev->read = NXT_EVENT_BLOCKED;
508
0
    }
509
0
}
510
511
512
static void
513
nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
514
0
{
515
0
    if (ev->write != NXT_EVENT_INACTIVE) {
516
0
        ev->write = NXT_EVENT_BLOCKED;
517
0
    }
518
0
}
519
520
521
/*
522
 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
523
 * event should be added or modified, epoll_ctl(2):
524
 *
525
 * EPOLLONESHOT (since Linux 2.6.2)
526
 *     Sets the one-shot behavior for the associated file descriptor.
527
 *     This means that after an event is pulled out with epoll_wait(2)
528
 *     the associated file descriptor is internally disabled and no
529
 *     other events will be reported by the epoll interface.  The user
530
 *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
531
 *     descriptor with a new event mask.
532
 */
533
534
static void
535
nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
536
0
{
537
0
    int  op;
538
539
0
    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
540
0
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;
541
542
0
    ev->read = NXT_EVENT_ONESHOT;
543
0
    ev->write = NXT_EVENT_INACTIVE;
544
545
0
    nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
546
0
}
547
548
549
static void
550
nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
551
0
{
552
0
    int  op;
553
554
0
    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
555
0
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;
556
557
0
    ev->read = NXT_EVENT_INACTIVE;
558
0
    ev->write = NXT_EVENT_ONESHOT;
559
560
0
    nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
561
0
}
562
563
564
static void
565
nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
566
0
{
567
0
    uint32_t  events;
568
569
0
    ev->read = NXT_EVENT_ACTIVE;
570
571
0
    events = EPOLLIN;
572
573
0
#ifdef EPOLLEXCLUSIVE
574
0
    events |= EPOLLEXCLUSIVE;
575
0
#endif
576
577
0
    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
578
0
}
579
580
581
/*
582
 * epoll changes are batched to improve instruction and data cache
583
 * locality of several epoll_ctl() calls followed by epoll_wait() call.
584
 */
585
586
static void
587
nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
588
    uint32_t events)
589
0
{
590
0
    nxt_epoll_change_t  *change;
591
592
0
    nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
593
0
              engine->u.epoll.fd, ev->fd, op, events);
594
595
0
    if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
596
0
        nxt_epoll_commit_changes(engine);
597
0
    }
598
599
0
    ev->changing = 1;
600
601
0
    change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
602
0
    change->op = op;
603
0
    change->event.events = events;
604
0
    change->event.data.ptr = ev;
605
0
}
606
607
608
static void
609
nxt_epoll_commit_changes(nxt_event_engine_t *engine)
610
0
{
611
0
    int                 ret;
612
0
    nxt_fd_event_t      *ev;
613
0
    nxt_epoll_change_t  *change, *end;
614
615
0
    nxt_debug(&engine->task, "epoll %d changes:%ui",
616
0
              engine->u.epoll.fd, engine->u.epoll.nchanges);
617
618
0
    change = engine->u.epoll.changes;
619
0
    end = change + engine->u.epoll.nchanges;
620
621
0
    do {
622
0
        ev = change->event.data.ptr;
623
0
        ev->changing = 0;
624
625
0
        nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
626
0
                  engine->u.epoll.fd, ev->fd, change->op,
627
0
                  change->event.events);
628
629
0
        ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
630
631
0
        if (nxt_slow_path(ret != 0)) {
632
0
            nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
633
0
                      engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
634
635
0
            nxt_work_queue_add(&engine->fast_work_queue,
636
0
                               nxt_epoll_error_handler, ev->task, ev, ev->data);
637
638
0
            engine->u.epoll.error = 1;
639
0
        }
640
641
0
        change++;
642
643
0
    } while (change < end);
644
645
0
    engine->u.epoll.nchanges = 0;
646
0
}
647
648
649
static void
650
nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
651
0
{
652
0
    nxt_fd_event_t  *ev;
653
654
0
    ev = obj;
655
656
0
    ev->read = NXT_EVENT_INACTIVE;
657
0
    ev->write = NXT_EVENT_INACTIVE;
658
659
0
    ev->error_handler(ev->task, ev, data);
660
0
}
661
662
663
#if (NXT_HAVE_SIGNALFD)
664
665
static nxt_int_t
666
nxt_epoll_add_signal(nxt_event_engine_t *engine)
667
0
{
668
0
    int                 fd;
669
0
    struct epoll_event  ee;
670
671
0
    if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
672
0
        nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
673
0
        return NXT_ERROR;
674
0
    }
675
676
    /*
677
     * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
678
     * and 2.8 signalfd() wrappers call the original signalfd() syscall
679
     * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
680
     * tries to call signalfd4() syscall and if it fails then calls the
681
     * original signalfd() syscall.  For this reason the non-blocking mode
682
     * is set separately.
683
     */
684
685
0
    fd = signalfd(-1, &engine->signals->sigmask, 0);
686
687
0
    if (fd == -1) {
688
0
        nxt_alert(&engine->task, "signalfd(%d) failed %E",
689
0
                  engine->u.epoll.signalfd.fd, nxt_errno);
690
0
        return NXT_ERROR;
691
0
    }
692
693
0
    engine->u.epoll.signalfd.fd = fd;
694
695
0
    if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
696
0
        return NXT_ERROR;
697
0
    }
698
699
0
    nxt_debug(&engine->task, "signalfd(): %d", fd);
700
701
0
    engine->u.epoll.signalfd.data = engine->signals->handler;
702
0
    engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
703
0
    engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
704
0
    engine->u.epoll.signalfd.log = engine->task.log;
705
0
    engine->u.epoll.signalfd.task = &engine->task;
706
707
0
    ee.events = EPOLLIN;
708
0
    ee.data.ptr = &engine->u.epoll.signalfd;
709
710
0
    if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
711
0
        nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
712
0
                  engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
713
714
0
        return NXT_ERROR;
715
0
    }
716
717
0
    return NXT_OK;
718
0
}
719
720
721
static void
722
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
723
0
{
724
0
    int                      n;
725
0
    nxt_fd_event_t           *ev;
726
0
    nxt_work_handler_t       handler;
727
0
    struct signalfd_siginfo  sfd;
728
729
0
    ev = obj;
730
0
    handler = data;
731
732
0
    nxt_debug(task, "signalfd handler");
733
734
0
    n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));
735
736
0
    nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);
737
738
0
    if (n != sizeof(struct signalfd_siginfo)) {
739
0
        nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
740
0
        return;
741
0
    }
742
743
0
    nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);
744
745
0
    handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
746
0
}
747
748
#endif
749
750
751
#if (NXT_HAVE_EVENTFD)
752
753
static nxt_int_t
754
nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
755
0
{
756
0
    int                 ret;
757
0
    struct epoll_event  ee;
758
759
0
    engine->u.epoll.post_handler = handler;
760
761
    /*
762
     * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
763
     * and 2.8 eventfd() wrappers call the original eventfd() syscall
764
     * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
765
     * tries to call eventfd2() syscall and if it fails then calls the
766
     * original eventfd() syscall.  For this reason the non-blocking mode
767
     * is set separately.
768
     */
769
770
0
    engine->u.epoll.eventfd.fd = eventfd(0, 0);
771
772
0
    if (engine->u.epoll.eventfd.fd == -1) {
773
0
        nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
774
0
        return NXT_ERROR;
775
0
    }
776
777
0
    ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
778
0
    if (nxt_slow_path(ret != NXT_OK)) {
779
0
        return NXT_ERROR;
780
0
    }
781
782
0
    nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
783
784
0
    engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
785
0
    engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
786
0
    engine->u.epoll.eventfd.data = engine;
787
0
    engine->u.epoll.eventfd.log = engine->task.log;
788
0
    engine->u.epoll.eventfd.task = &engine->task;
789
790
0
    ee.events = EPOLLIN | EPOLLET;
791
0
    ee.data.ptr = &engine->u.epoll.eventfd;
792
793
0
    ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
794
0
                    engine->u.epoll.eventfd.fd, &ee);
795
796
0
    if (nxt_fast_path(ret == 0)) {
797
0
        return NXT_OK;
798
0
    }
799
800
0
    nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
801
0
              engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
802
0
              nxt_errno);
803
804
0
    return NXT_ERROR;
805
0
}
806
807
808
static void
809
nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
810
0
{
811
0
    int                 n;
812
0
    uint64_t            events;
813
0
    nxt_event_engine_t  *engine;
814
815
0
    engine = data;
816
817
0
    nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
818
819
    /*
820
     * The maximum value after write() to a eventfd() descriptor will
821
     * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
822
     * can be read once per many notifications, for example, once per
823
 * 2^32-2 notifications.  Since the eventfd() file descriptor is
824
     * always registered in EPOLLET mode, epoll returns event about
825
     * only the latest write() to the descriptor.
826
     */
827
828
0
    if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
829
0
        engine->u.epoll.neventfd = 0;
830
831
0
        n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
832
833
0
        nxt_debug(task, "read(%d): %d events:%uL",
834
0
                  engine->u.epoll.eventfd.fd, n, events);
835
836
0
        if (n != sizeof(uint64_t)) {
837
0
            nxt_alert(task, "read eventfd(%d) failed %E",
838
0
                      engine->u.epoll.eventfd.fd, nxt_errno);
839
0
        }
840
0
    }
841
842
0
    engine->u.epoll.post_handler(task, NULL, NULL);
843
0
}
844
845
846
static void
847
nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
848
0
{
849
0
    size_t    ret;
850
0
    uint64_t  event;
851
852
    /*
853
 * eventfd() is present along with signalfd(), so the function
854
     * is used only to post events and the signo argument is ignored.
855
     */
856
857
0
    event = 1;
858
859
0
    ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
860
861
0
    if (nxt_slow_path(ret != sizeof(uint64_t))) {
862
0
        nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
863
0
                  engine->u.epoll.eventfd.fd, nxt_errno);
864
0
    }
865
0
}
866
867
#endif
868
869
870
static void
871
nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
872
0
{
873
0
    int                 nevents;
874
0
    uint32_t            events;
875
0
    nxt_int_t           i;
876
0
    nxt_err_t           err;
877
0
    nxt_bool_t          error;
878
0
    nxt_uint_t          level;
879
0
    nxt_fd_event_t      *ev;
880
0
    struct epoll_event  *event;
881
882
0
    if (engine->u.epoll.nchanges != 0) {
883
0
        nxt_epoll_commit_changes(engine);
884
0
    }
885
886
0
    if (engine->u.epoll.error) {
887
0
        engine->u.epoll.error = 0;
888
        /* Error handlers have been enqueued on failure. */
889
0
        timeout = 0;
890
0
    }
891
892
0
    nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
893
0
              engine->u.epoll.fd, timeout);
894
895
0
    nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
896
0
                         engine->u.epoll.mevents, timeout);
897
898
0
    err = (nevents == -1) ? nxt_errno : 0;
899
900
0
    nxt_thread_time_update(engine->task.thread);
901
902
0
    nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
903
904
0
    if (nevents == -1) {
905
0
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
906
907
0
        nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
908
0
                engine->u.epoll.fd, err);
909
910
0
        return;
911
0
    }
912
913
0
    for (i = 0; i < nevents; i++) {
914
915
0
        event = &engine->u.epoll.events[i];
916
0
        events = event->events;
917
0
        ev = event->data.ptr;
918
919
0
        nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
920
0
                  ev->fd, events, ev, ev->read, ev->write);
921
922
        /*
923
         * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN
924
         * or EPOLLOUT, so the "error" variable enqueues only error handler.
925
         */
926
0
        error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
927
0
        ev->epoll_error = error;
928
929
0
        if (error
930
0
            && ev->read <= NXT_EVENT_BLOCKED
931
0
            && ev->write <= NXT_EVENT_BLOCKED)
932
0
        {
933
0
            error = 0;
934
0
        }
935
936
0
#if (NXT_HAVE_EPOLL_EDGE)
937
938
0
        ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
939
940
0
#endif
941
942
0
        if ((events & EPOLLIN) != 0) {
943
0
            ev->read_ready = 1;
944
945
0
            if (ev->read != NXT_EVENT_BLOCKED) {
946
947
0
                if (ev->read == NXT_EVENT_ONESHOT) {
948
0
                    ev->read = NXT_EVENT_DISABLED;
949
0
                }
950
951
0
                nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
952
0
                                   ev->task, ev, ev->data);
953
954
0
                error = 0;
955
956
0
            } else if (engine->u.epoll.mode == 0) {
957
                /* Level-triggered mode. */
958
0
                nxt_epoll_disable_read(engine, ev);
959
0
            }
960
0
        }
961
962
0
        if ((events & EPOLLOUT) != 0) {
963
0
            ev->write_ready = 1;
964
965
0
            if (ev->write != NXT_EVENT_BLOCKED) {
966
967
0
                if (ev->write == NXT_EVENT_ONESHOT) {
968
0
                    ev->write = NXT_EVENT_DISABLED;
969
0
                }
970
971
0
                nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
972
0
                                   ev->task, ev, ev->data);
973
974
0
                error = 0;
975
976
0
            } else if (engine->u.epoll.mode == 0) {
977
                /* Level-triggered mode. */
978
0
                nxt_epoll_disable_write(engine, ev);
979
0
            }
980
0
        }
981
982
0
        if (!error) {
983
0
            continue;
984
0
        }
985
986
0
        ev->read_ready = 1;
987
0
        ev->write_ready = 1;
988
989
0
        if (ev->read == NXT_EVENT_BLOCKED && ev->write == NXT_EVENT_BLOCKED) {
990
991
0
            if (engine->u.epoll.mode == 0) {
992
                /* Level-triggered mode. */
993
0
                nxt_epoll_disable(engine, ev);
994
0
            }
995
996
0
            continue;
997
0
        }
998
999
0
        nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler,
1000
0
                           ev->task, ev, ev->data);
1001
0
    }
1002
0
}
1003
1004
1005
#if (NXT_HAVE_ACCEPT4)
1006
1007
static void
1008
nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
1009
0
{
1010
0
    socklen_t           socklen;
1011
0
    nxt_conn_t          *c;
1012
0
    nxt_socket_t        s;
1013
0
    struct sockaddr     *sa;
1014
0
    nxt_listen_event_t  *lev;
1015
1016
0
    lev = obj;
1017
0
    c = lev->next;
1018
1019
0
    lev->ready--;
1020
0
    lev->socket.read_ready = (lev->ready != 0);
1021
1022
0
    sa = &c->remote->u.sockaddr;
1023
0
    socklen = c->remote->socklen;
1024
    /*
1025
     * The returned socklen is ignored here,
1026
     * see comment in nxt_conn_io_accept().
1027
     */
1028
0
    s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);
1029
1030
0
    if (s != -1) {
1031
0
        c->socket.fd = s;
1032
1033
0
        nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);
1034
1035
0
        nxt_conn_accept(task, lev, c);
1036
0
        return;
1037
0
    }
1038
1039
0
    nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
1040
0
}
1041
1042
#endif
1043
1044
1045
#if (NXT_HAVE_EPOLL_EDGE)
1046
1047
/*
 * nxt_epoll_edge_conn_io_connect() eliminates the getsockopt()
 * syscall used to test a pending connect() error.  Although this
 * special interface can work in both edge-triggered and
 * level-triggered modes, it is enabled only for the former mode
 * because this mode is available in all modern Linux distributions.
 * For the latter mode it would be necessary to create an additional
 * nxt_epoll_level_event_conn_io with a single non-generic connect()
 * interface.
 */
1056
1057
static void
1058
nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1059
0
{
1060
0
    nxt_conn_t                    *c;
1061
0
    nxt_event_engine_t            *engine;
1062
0
    nxt_work_handler_t            handler;
1063
0
    const nxt_event_conn_state_t  *state;
1064
1065
0
    c = obj;
1066
1067
0
    state = c->write_state;
1068
1069
0
    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {
1070
1071
0
    case NXT_OK:
1072
0
        c->socket.write_ready = 1;
1073
0
        handler = state->ready_handler;
1074
0
        break;
1075
1076
0
    case NXT_AGAIN:
1077
0
        c->socket.write_handler = nxt_epoll_edge_conn_connected;
1078
0
        c->socket.error_handler = nxt_conn_connect_error;
1079
1080
0
        engine = task->thread->engine;
1081
0
        nxt_conn_timer(engine, c, state, &c->write_timer);
1082
1083
0
        nxt_epoll_enable(engine, &c->socket);
1084
0
        c->socket.read = NXT_EVENT_BLOCKED;
1085
0
        return;
1086
1087
#if 0
1088
    case NXT_AGAIN:
1089
        nxt_conn_timer(engine, c, state, &c->write_timer);
1090
1091
        /* Fall through. */
1092
1093
    case NXT_OK:
1094
        /*
1095
         * Mark both read and write directions as ready and try to perform
1096
         * I/O operations before receiving readiness notifications.
1097
         * On unconnected socket Linux send() and recv() return EAGAIN
1098
         * instead of ENOTCONN.
1099
         */
1100
        c->socket.read_ready = 1;
1101
        c->socket.write_ready = 1;
1102
        /*
1103
         * Enabling both read and write notifications on a getting
1104
         * connected socket eliminates one epoll_ctl() syscall.
1105
         */
1106
        c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1107
        c->socket.error_handler = state->error_handler;
1108
1109
        nxt_epoll_enable(engine, &c->socket);
1110
        c->socket.read = NXT_EVENT_BLOCKED;
1111
1112
        handler = state->ready_handler;
1113
        break;
1114
#endif
1115
1116
0
    case NXT_ERROR:
1117
0
        handler = state->error_handler;
1118
0
        break;
1119
1120
0
    default:  /* NXT_DECLINED: connection refused. */
1121
0
        handler = state->close_handler;
1122
0
        break;
1123
0
    }
1124
1125
0
    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1126
0
}
1127
1128
1129
static void
1130
nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
1131
0
{
1132
0
    nxt_conn_t  *c;
1133
1134
0
    c = obj;
1135
1136
0
    nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1137
1138
0
    if (!c->socket.epoll_error) {
1139
0
        c->socket.write = NXT_EVENT_BLOCKED;
1140
1141
0
        if (c->write_state->timer_autoreset) {
1142
0
            nxt_timer_disable(task->thread->engine, &c->write_timer);
1143
0
        }
1144
1145
0
        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1146
0
                           task, c, data);
1147
0
        return;
1148
0
    }
1149
1150
0
    nxt_conn_connect_test(task, c, data);
1151
0
}
1152
1153
1154
/*
 * nxt_epoll_edge_conn_io_recvbuf() is just a wrapper around the
 * standard nxt_conn_io_recvbuf() that forces reading a pending EOF
 * in edge-triggered mode.
 */
1159
1160
static ssize_t
1161
nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1162
0
{
1163
0
    ssize_t  n;
1164
1165
0
    n = nxt_conn_io_recvbuf(c, b);
1166
1167
0
    if (n > 0 && c->socket.epoll_eof) {
1168
0
        c->socket.read_ready = 1;
1169
0
    }
1170
1171
0
    return n;
1172
0
}
1173
1174
#endif