Coverage Report

Created: 2023-09-25 07:18

/src/unit/src/nxt_poll_engine.c
Line
Count
Source (jump to first uncovered line)
1
2
/*
3
 * Copyright (C) Igor Sysoev
4
 * Copyright (C) NGINX, Inc.
5
 */
6
7
#include <nxt_main.h>
8
9
10
0
#define NXT_POLL_ADD     0
11
0
#define NXT_POLL_CHANGE  1
12
0
#define NXT_POLL_DELETE  2
13
14
15
typedef struct {
16
    /*
17
     * A file descriptor is stored in hash entry to allow
18
     * nxt_poll_fd_hash_test() to not dereference a pointer to
19
     * nxt_fd_event_t which may be invalid if the file descriptor has
20
     * been already closed and the nxt_fd_event_t's memory has been freed.
21
     */
22
    nxt_socket_t         fd;
23
24
    uint32_t             index;
25
    void                 *event;
26
} nxt_poll_hash_entry_t;
27
28
29
static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine,
30
    nxt_uint_t mchanges, nxt_uint_t mevents);
31
static void nxt_poll_free(nxt_event_engine_t *engine);
32
static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
33
static void nxt_poll_disable(nxt_event_engine_t *engine,
34
    nxt_fd_event_t *ev);
35
static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine,
36
    nxt_fd_event_t *ev);
37
static void nxt_poll_enable_read(nxt_event_engine_t *engine,
38
    nxt_fd_event_t *ev);
39
static void nxt_poll_enable_write(nxt_event_engine_t *engine,
40
    nxt_fd_event_t *ev);
41
static void nxt_poll_disable_read(nxt_event_engine_t *engine,
42
    nxt_fd_event_t *ev);
43
static void nxt_poll_disable_write(nxt_event_engine_t *engine,
44
    nxt_fd_event_t *ev);
45
static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46
static void nxt_poll_block_write(nxt_event_engine_t *engine,
47
    nxt_fd_event_t *ev);
48
static void nxt_poll_oneshot_read(nxt_event_engine_t *engine,
49
    nxt_fd_event_t *ev);
50
static void nxt_poll_oneshot_write(nxt_event_engine_t *engine,
51
    nxt_fd_event_t *ev);
52
static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
53
    nxt_uint_t op, nxt_uint_t events);
54
static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine);
55
static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine,
56
    nxt_fd_event_t *ev, int events);
57
static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine,
58
    nxt_fd_t fd, int events);
59
static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd);
60
static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
61
static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine,
62
    nxt_fd_t fd);
63
static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
64
static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine,
65
    nxt_lvlhsh_t *lh);
66
67
68
const nxt_event_interface_t  nxt_poll_engine = {
69
    "poll",
70
    nxt_poll_create,
71
    nxt_poll_free,
72
    nxt_poll_enable,
73
    nxt_poll_disable,
74
    nxt_poll_disable,
75
    nxt_poll_close,
76
    nxt_poll_enable_read,
77
    nxt_poll_enable_write,
78
    nxt_poll_disable_read,
79
    nxt_poll_disable_write,
80
    nxt_poll_block_read,
81
    nxt_poll_block_write,
82
    nxt_poll_oneshot_read,
83
    nxt_poll_oneshot_write,
84
    nxt_poll_enable_read,
85
    NULL,
86
    NULL,
87
    NULL,
88
    NULL,
89
    nxt_poll,
90
91
    &nxt_unix_conn_io,
92
93
    NXT_NO_FILE_EVENTS,
94
    NXT_NO_SIGNAL_EVENTS,
95
};
96
97
98
static const nxt_lvlhsh_proto_t  nxt_poll_fd_hash_proto  nxt_aligned(64) =
99
{
100
    NXT_LVLHSH_LARGE_MEMALIGN,
101
    nxt_poll_fd_hash_test,
102
    nxt_lvlhsh_alloc,
103
    nxt_lvlhsh_free,
104
};
105
106
107
static nxt_int_t
108
nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
109
    nxt_uint_t mevents)
110
0
{
111
0
    engine->u.poll.mchanges = mchanges;
112
113
0
    engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);
114
115
0
    if (engine->u.poll.changes != NULL) {
116
0
        return NXT_OK;
117
0
    }
118
119
0
    return NXT_ERROR;
120
0
}
121
122
123
static void
124
nxt_poll_free(nxt_event_engine_t *engine)
125
0
{
126
0
    nxt_debug(&engine->task, "poll free");
127
128
0
    nxt_free(engine->u.poll.set);
129
0
    nxt_free(engine->u.poll.changes);
130
0
    nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash);
131
132
0
    nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t));
133
0
}
134
135
136
static void
137
nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
138
0
{
139
0
    ev->read = NXT_EVENT_ACTIVE;
140
0
    ev->write = NXT_EVENT_ACTIVE;
141
142
0
    nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT);
143
0
}
144
145
146
static void
147
nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
148
0
{
149
0
    if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) {
150
0
        ev->read = NXT_EVENT_INACTIVE;
151
0
        ev->write = NXT_EVENT_INACTIVE;
152
153
0
        nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
154
0
    }
155
0
}
156
157
158
static nxt_bool_t
159
nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
160
0
{
161
0
    nxt_poll_disable(engine, ev);
162
163
0
    return ev->changing;
164
0
}
165
166
167
static void
168
nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
169
0
{
170
0
    nxt_uint_t  op, events;
171
172
0
    ev->read = NXT_EVENT_ACTIVE;
173
174
0
    if (ev->write == NXT_EVENT_INACTIVE) {
175
0
        op = NXT_POLL_ADD;
176
0
        events = POLLIN;
177
178
0
    } else {
179
0
        op = NXT_POLL_CHANGE;
180
0
        events = POLLIN | POLLOUT;
181
0
    }
182
183
0
    nxt_poll_change(engine, ev, op, events);
184
0
}
185
186
187
static void
188
nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
189
0
{
190
0
    nxt_uint_t  op, events;
191
192
0
    ev->write = NXT_EVENT_ACTIVE;
193
194
0
    if (ev->read == NXT_EVENT_INACTIVE) {
195
0
        op = NXT_POLL_ADD;
196
0
        events = POLLOUT;
197
198
0
    } else {
199
0
        op = NXT_POLL_CHANGE;
200
0
        events = POLLIN | POLLOUT;
201
0
    }
202
203
0
    nxt_poll_change(engine, ev, op, events);
204
0
}
205
206
207
static void
208
nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
209
0
{
210
0
    nxt_uint_t  op, events;
211
212
0
    ev->read = NXT_EVENT_INACTIVE;
213
214
0
    if (ev->write == NXT_EVENT_INACTIVE) {
215
0
        op = NXT_POLL_DELETE;
216
0
        events = 0;
217
218
0
    } else {
219
0
        op = NXT_POLL_CHANGE;
220
0
        events = POLLOUT;
221
0
    }
222
223
0
    nxt_poll_change(engine, ev, op, events);
224
0
}
225
226
227
static void
228
nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
229
0
{
230
0
    nxt_uint_t  op, events;
231
232
0
    ev->write = NXT_EVENT_INACTIVE;
233
234
0
    if (ev->read == NXT_EVENT_INACTIVE) {
235
0
        op = NXT_POLL_DELETE;
236
0
        events = 0;
237
238
0
    } else {
239
0
        op = NXT_POLL_CHANGE;
240
0
        events = POLLIN;
241
0
    }
242
243
0
    nxt_poll_change(engine, ev, op, events);
244
0
}
245
246
247
static void
248
nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
249
0
{
250
0
    if (ev->read != NXT_EVENT_INACTIVE) {
251
0
        nxt_poll_disable_read(engine, ev);
252
0
    }
253
0
}
254
255
256
static void
257
nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
258
0
{
259
0
    if (ev->write != NXT_EVENT_INACTIVE) {
260
0
        nxt_poll_disable_write(engine, ev);
261
0
    }
262
0
}
263
264
265
static void
266
nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
267
0
{
268
0
    nxt_uint_t  op;
269
270
0
    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
271
0
             NXT_POLL_ADD : NXT_POLL_CHANGE;
272
273
0
    ev->read = NXT_EVENT_ONESHOT;
274
0
    ev->write = NXT_EVENT_INACTIVE;
275
276
0
    nxt_poll_change(engine, ev, op, POLLIN);
277
0
}
278
279
280
static void
281
nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
282
0
{
283
0
    nxt_uint_t  op;
284
285
0
    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
286
0
             NXT_POLL_ADD : NXT_POLL_CHANGE;
287
288
0
    ev->read = NXT_EVENT_INACTIVE;
289
0
    ev->write = NXT_EVENT_ONESHOT;
290
291
0
    nxt_poll_change(engine, ev, op, POLLOUT);
292
0
}
293
294
295
/*
296
 * poll changes are batched to improve instruction and data cache
297
 * locality of several lvlhsh operations followed by poll() call.
298
 */
299
300
static void
301
nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op,
302
    nxt_uint_t events)
303
0
{
304
0
    nxt_poll_change_t  *change;
305
306
0
    nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events);
307
308
0
    if (engine->u.poll.nchanges >= engine->u.poll.mchanges) {
309
0
        (void) nxt_poll_commit_changes(engine);
310
0
    }
311
312
0
    ev->changing = 1;
313
314
0
    change = &engine->u.poll.changes[engine->u.poll.nchanges++];
315
0
    change->op = op;
316
0
    change->events = events;
317
0
    change->event = ev;
318
0
}
319
320
321
static nxt_int_t
322
nxt_poll_commit_changes(nxt_event_engine_t *engine)
323
0
{
324
0
    nxt_int_t          ret, retval;
325
0
    nxt_fd_event_t     *ev;
326
0
    nxt_poll_change_t  *change, *end;
327
328
0
    nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges);
329
330
0
    retval = NXT_OK;
331
0
    change = engine->u.poll.changes;
332
0
    end = change + engine->u.poll.nchanges;
333
334
0
    do {
335
0
        ev = change->event;
336
0
        ev->changing = 0;
337
338
0
        switch (change->op) {
339
340
0
        case NXT_POLL_ADD:
341
0
            ret = nxt_poll_set_add(engine, ev, change->events);
342
343
0
            if (nxt_fast_path(ret == NXT_OK)) {
344
0
                goto next;
345
0
            }
346
347
0
            break;
348
349
0
        case NXT_POLL_CHANGE:
350
0
            ret = nxt_poll_set_change(engine, ev->fd, change->events);
351
352
0
            if (nxt_fast_path(ret == NXT_OK)) {
353
0
                goto next;
354
0
            }
355
356
0
            break;
357
358
0
        case NXT_POLL_DELETE:
359
0
            ret = nxt_poll_set_delete(engine, ev->fd);
360
361
0
            if (nxt_fast_path(ret == NXT_OK)) {
362
0
                goto next;
363
0
            }
364
365
0
            break;
366
0
        }
367
368
0
        nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
369
0
                           ev->task, ev, ev->data);
370
371
0
        retval = NXT_ERROR;
372
373
0
    next:
374
375
0
        change++;
376
377
0
    } while (change < end);
378
379
0
    engine->u.poll.nchanges = 0;
380
381
0
    return retval;
382
0
}
383
384
385
static nxt_int_t
386
nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events)
387
0
{
388
0
    nxt_int_t              ret;
389
0
    nxt_uint_t             max_nfds;
390
0
    struct pollfd          *pfd;
391
0
    nxt_lvlhsh_query_t     lhq;
392
0
    nxt_poll_hash_entry_t  *phe;
393
394
0
    nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events);
395
396
0
    if (engine->u.poll.nfds >= engine->u.poll.max_nfds) {
397
0
        max_nfds = engine->u.poll.max_nfds + 512; /* 4K */
398
399
0
        pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds);
400
0
        if (nxt_slow_path(pfd == NULL)) {
401
0
            return NXT_ERROR;
402
0
        }
403
404
0
        engine->u.poll.set = pfd;
405
0
        engine->u.poll.max_nfds = max_nfds;
406
0
    }
407
408
0
    phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
409
0
    if (nxt_slow_path(phe == NULL)) {
410
0
        return NXT_ERROR;
411
0
    }
412
413
0
    phe->fd = ev->fd;
414
0
    phe->index = engine->u.poll.nfds;
415
0
    phe->event = ev;
416
417
0
    pfd = &engine->u.poll.set[engine->u.poll.nfds++];
418
0
    pfd->fd = ev->fd;
419
0
    pfd->events = events;
420
0
    pfd->revents = 0;
421
422
0
    lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t));
423
0
    lhq.replace = 0;
424
0
    lhq.value = phe;
425
0
    lhq.proto = &nxt_poll_fd_hash_proto;
426
0
    lhq.data = engine;
427
428
0
    ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq);
429
430
0
    if (nxt_fast_path(ret == NXT_OK)) {
431
0
        return NXT_OK;
432
0
    }
433
434
0
    nxt_free(phe);
435
436
0
    return NXT_ERROR;
437
0
}
438
439
440
static nxt_int_t
441
nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events)
442
0
{
443
0
    nxt_poll_hash_entry_t  *phe;
444
445
0
    nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi",
446
0
              fd, events);
447
448
0
    phe = nxt_poll_fd_hash_get(engine, fd);
449
450
0
    if (nxt_fast_path(phe != NULL)) {
451
0
        engine->u.poll.set[phe->index].events = events;
452
0
        return NXT_OK;
453
0
    }
454
455
0
    return NXT_ERROR;
456
0
}
457
458
459
static nxt_int_t
460
nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd)
461
0
{
462
0
    nxt_int_t              ret;
463
0
    nxt_uint_t             index, nfds;
464
0
    nxt_lvlhsh_query_t     lhq;
465
0
    nxt_poll_hash_entry_t  *phe;
466
467
0
    nxt_debug(&engine->task, "poll delete event: fd:%d", fd);
468
469
0
    lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
470
0
    lhq.proto = &nxt_poll_fd_hash_proto;
471
0
    lhq.data = engine;
472
473
0
    ret = nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq);
474
475
0
    if (nxt_slow_path(ret != NXT_OK)) {
476
0
        return NXT_ERROR;
477
0
    }
478
479
0
    phe = lhq.value;
480
481
0
    index = phe->index;
482
0
    engine->u.poll.nfds--;
483
0
    nfds = engine->u.poll.nfds;
484
485
0
    if (index != nfds) {
486
0
        engine->u.poll.set[index] = engine->u.poll.set[nfds];
487
488
0
        phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd);
489
490
0
        phe->index = index;
491
0
    }
492
493
0
    nxt_free(lhq.value);
494
495
0
    return NXT_OK;
496
0
}
497
498
499
static void
500
nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
501
0
{
502
0
    int                    nevents;
503
0
    nxt_fd_t               fd;
504
0
    nxt_err_t              err;
505
0
    nxt_bool_t             error;
506
0
    nxt_uint_t             i, events, level;
507
0
    struct pollfd          *pfd;
508
0
    nxt_fd_event_t         *ev;
509
0
    nxt_poll_hash_entry_t  *phe;
510
511
0
    if (engine->u.poll.nchanges != 0) {
512
0
        if (nxt_poll_commit_changes(engine) != NXT_OK) {
513
            /* Error handlers have been enqueued on failure. */
514
0
            timeout = 0;
515
0
        }
516
0
    }
517
518
0
    nxt_debug(&engine->task, "poll() events:%ui timeout:%M",
519
0
              engine->u.poll.nfds, timeout);
520
521
0
    nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout);
522
523
0
    err = (nevents == -1) ? nxt_errno : 0;
524
525
0
    nxt_thread_time_update(engine->task.thread);
526
527
0
    nxt_debug(&engine->task, "poll(): %d", nevents);
528
529
0
    if (nevents == -1) {
530
0
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
531
0
        nxt_log(&engine->task, level, "poll() failed %E", err);
532
0
        return;
533
0
    }
534
535
0
    for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) {
536
537
0
        pfd = &engine->u.poll.set[i];
538
0
        events = pfd->revents;
539
540
0
        if (events == 0) {
541
0
            continue;
542
0
        }
543
544
0
        fd = pfd->fd;
545
546
0
        phe = nxt_poll_fd_hash_get(engine, fd);
547
548
0
        if (nxt_slow_path(phe == NULL)) {
549
0
            nxt_alert(&engine->task,
550
0
                      "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi",
551
0
                      fd, pfd->events, events);
552
553
            /* Mark the poll entry to ignore it by the kernel. */
554
0
            pfd->fd = -1;
555
0
            goto next;
556
0
        }
557
558
0
        ev = phe->event;
559
560
0
        nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d wr:%d",
561
0
                  fd, events, ev->read, ev->write);
562
563
0
        if (nxt_slow_path((events & POLLNVAL) != 0)) {
564
0
            nxt_alert(ev->task, "poll() error fd:%d ev:%04Xd rev:%04uXi",
565
0
                      fd, pfd->events, events);
566
567
            /* Mark the poll entry to ignore it by the kernel. */
568
0
            pfd->fd = -1;
569
570
0
            nxt_work_queue_add(&engine->fast_work_queue,
571
0
                               ev->error_handler, ev->task, ev, ev->data);
572
0
            goto next;
573
0
        }
574
575
        /*
576
         * On a socket's remote end close:
577
         *
578
         *   Linux, FreeBSD, and Solaris set POLLIN;
579
         *   MacOSX sets POLLIN and POLLHUP;
580
         *   NetBSD sets POLLIN, and poll(2) claims this explicitly:
581
         *
582
         *     If the remote end of a socket is closed, poll()
583
         *     returns a POLLIN event, rather than a POLLHUP.
584
         *
585
         * On error:
586
         *
587
         *   Linux sets POLLHUP and POLLERR only;
588
         *   FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2)
589
         *   claims the opposite:
590
         *
591
         *     Note that POLLHUP and POLLOUT should never be
592
         *     present in the revents bitmask at the same time.
593
         *
594
         *   Solaris and NetBSD do not add POLLHUP or POLLERR;
595
         *   MacOSX sets POLLHUP only.
596
         *
597
         * If an implementation sets POLLERR or POLLHUP only without POLLIN
598
         * or POLLOUT, the "error" variable enqueues only one active handler.
599
         */
600
601
0
        error = (((events & (POLLERR | POLLHUP)) != 0)
602
0
                 && ((events & (POLLIN | POLLOUT)) == 0));
603
604
0
        if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
605
0
            error = 0;
606
0
            ev->read_ready = 1;
607
608
0
            if (ev->read == NXT_EVENT_ONESHOT) {
609
0
                ev->read = NXT_EVENT_INACTIVE;
610
0
                nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
611
0
            }
612
613
0
            nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
614
0
                               ev->task, ev, ev->data);
615
0
        }
616
617
0
        if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
618
0
            ev->write_ready = 1;
619
620
0
            if (ev->write == NXT_EVENT_ONESHOT) {
621
0
                ev->write = NXT_EVENT_INACTIVE;
622
0
                nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
623
0
            }
624
625
0
            nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
626
0
                               ev->task, ev, ev->data);
627
0
        }
628
629
0
    next:
630
631
0
        nevents--;
632
0
    }
633
0
}
634
635
636
static nxt_poll_hash_entry_t *
637
nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd)
638
0
{
639
0
    nxt_lvlhsh_query_t     lhq;
640
0
    nxt_poll_hash_entry_t  *phe;
641
642
0
    lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
643
0
    lhq.proto = &nxt_poll_fd_hash_proto;
644
0
    lhq.data = engine;
645
646
0
    if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) {
647
0
        phe = lhq.value;
648
0
        return phe;
649
0
    }
650
651
0
    nxt_alert(&engine->task, "fd %d not found in hash", fd);
652
653
0
    return NULL;
654
0
}
655
656
657
static nxt_int_t
658
nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
659
0
{
660
0
    nxt_event_engine_t     *engine;
661
0
    nxt_poll_hash_entry_t  *phe;
662
663
0
    phe = data;
664
665
    /* nxt_murmur_hash2() is unique for 4 bytes. */
666
667
0
    engine = lhq->data;
668
669
0
    if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) {
670
0
        return NXT_OK;
671
0
    }
672
673
0
    nxt_alert(&engine->task, "fd %d in hash mismatches fd %d in poll set",
674
0
              phe->fd, engine->u.poll.set[phe->index].fd);
675
676
0
    return NXT_DECLINED;
677
0
}
678
679
680
static void
681
nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh)
682
0
{
683
0
    nxt_poll_hash_entry_t  *phe;
684
685
0
    for ( ;; ) {
686
0
        phe = nxt_lvlhsh_retrieve(lh, &nxt_poll_fd_hash_proto, NULL);
687
688
0
        if (phe == NULL) {
689
0
            return;
690
0
        }
691
692
0
        nxt_free(phe);
693
0
    }
694
0
}