/src/unit/src/nxt_poll_engine.c
Line | Count | Source (jump to first uncovered line) |
1 | | |
2 | | /* |
3 | | * Copyright (C) Igor Sysoev |
4 | | * Copyright (C) NGINX, Inc. |
5 | | */ |
6 | | |
7 | | #include <nxt_main.h> |
8 | | |
9 | | |
10 | 0 | #define NXT_POLL_ADD 0 |
11 | 0 | #define NXT_POLL_CHANGE 1 |
12 | 0 | #define NXT_POLL_DELETE 2 |
13 | | |
14 | | |
15 | | typedef struct { |
16 | | /* |
17 | | * A file descriptor is stored in hash entry to allow |
18 | | * nxt_poll_fd_hash_test() to not dereference a pointer to |
19 | | * nxt_fd_event_t which may be invalid if the file descriptor has |
20 | | * been already closed and the nxt_fd_event_t's memory has been freed. |
21 | | */ |
22 | | nxt_socket_t fd; |
23 | | |
24 | | uint32_t index; |
25 | | void *event; |
26 | | } nxt_poll_hash_entry_t; |
27 | | |
28 | | |
29 | | static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine, |
30 | | nxt_uint_t mchanges, nxt_uint_t mevents); |
31 | | static void nxt_poll_free(nxt_event_engine_t *engine); |
32 | | static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); |
33 | | static void nxt_poll_disable(nxt_event_engine_t *engine, |
34 | | nxt_fd_event_t *ev); |
35 | | static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine, |
36 | | nxt_fd_event_t *ev); |
37 | | static void nxt_poll_enable_read(nxt_event_engine_t *engine, |
38 | | nxt_fd_event_t *ev); |
39 | | static void nxt_poll_enable_write(nxt_event_engine_t *engine, |
40 | | nxt_fd_event_t *ev); |
41 | | static void nxt_poll_disable_read(nxt_event_engine_t *engine, |
42 | | nxt_fd_event_t *ev); |
43 | | static void nxt_poll_disable_write(nxt_event_engine_t *engine, |
44 | | nxt_fd_event_t *ev); |
45 | | static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev); |
46 | | static void nxt_poll_block_write(nxt_event_engine_t *engine, |
47 | | nxt_fd_event_t *ev); |
48 | | static void nxt_poll_oneshot_read(nxt_event_engine_t *engine, |
49 | | nxt_fd_event_t *ev); |
50 | | static void nxt_poll_oneshot_write(nxt_event_engine_t *engine, |
51 | | nxt_fd_event_t *ev); |
52 | | static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, |
53 | | nxt_uint_t op, nxt_uint_t events); |
54 | | static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine); |
55 | | static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine, |
56 | | nxt_fd_event_t *ev, int events); |
57 | | static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine, |
58 | | nxt_fd_t fd, int events); |
59 | | static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd); |
60 | | static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); |
61 | | static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine, |
62 | | nxt_fd_t fd); |
63 | | static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data); |
64 | | static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, |
65 | | nxt_lvlhsh_t *lh); |
66 | | |
67 | | |
68 | | const nxt_event_interface_t nxt_poll_engine = { |
69 | | "poll", |
70 | | nxt_poll_create, |
71 | | nxt_poll_free, |
72 | | nxt_poll_enable, |
73 | | nxt_poll_disable, |
74 | | nxt_poll_disable, |
75 | | nxt_poll_close, |
76 | | nxt_poll_enable_read, |
77 | | nxt_poll_enable_write, |
78 | | nxt_poll_disable_read, |
79 | | nxt_poll_disable_write, |
80 | | nxt_poll_block_read, |
81 | | nxt_poll_block_write, |
82 | | nxt_poll_oneshot_read, |
83 | | nxt_poll_oneshot_write, |
84 | | nxt_poll_enable_read, |
85 | | NULL, |
86 | | NULL, |
87 | | NULL, |
88 | | NULL, |
89 | | nxt_poll, |
90 | | |
91 | | &nxt_unix_conn_io, |
92 | | |
93 | | NXT_NO_FILE_EVENTS, |
94 | | NXT_NO_SIGNAL_EVENTS, |
95 | | }; |
96 | | |
97 | | |
98 | | static const nxt_lvlhsh_proto_t nxt_poll_fd_hash_proto nxt_aligned(64) = |
99 | | { |
100 | | NXT_LVLHSH_LARGE_MEMALIGN, |
101 | | nxt_poll_fd_hash_test, |
102 | | nxt_lvlhsh_alloc, |
103 | | nxt_lvlhsh_free, |
104 | | }; |
105 | | |
106 | | |
107 | | static nxt_int_t |
108 | | nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, |
109 | | nxt_uint_t mevents) |
110 | 0 | { |
111 | 0 | engine->u.poll.mchanges = mchanges; |
112 | |
|
113 | 0 | engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges); |
114 | |
|
115 | 0 | if (engine->u.poll.changes != NULL) { |
116 | 0 | return NXT_OK; |
117 | 0 | } |
118 | | |
119 | 0 | return NXT_ERROR; |
120 | 0 | } |
121 | | |
122 | | |
123 | | static void |
124 | | nxt_poll_free(nxt_event_engine_t *engine) |
125 | 0 | { |
126 | 0 | nxt_debug(&engine->task, "poll free"); |
127 | |
|
128 | 0 | nxt_free(engine->u.poll.set); |
129 | 0 | nxt_free(engine->u.poll.changes); |
130 | 0 | nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash); |
131 | |
|
132 | 0 | nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t)); |
133 | 0 | } |
134 | | |
135 | | |
136 | | static void |
137 | | nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
138 | 0 | { |
139 | 0 | ev->read = NXT_EVENT_ACTIVE; |
140 | 0 | ev->write = NXT_EVENT_ACTIVE; |
141 | |
|
142 | 0 | nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT); |
143 | 0 | } |
144 | | |
145 | | |
146 | | static void |
147 | | nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
148 | 0 | { |
149 | 0 | if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) { |
150 | 0 | ev->read = NXT_EVENT_INACTIVE; |
151 | 0 | ev->write = NXT_EVENT_INACTIVE; |
152 | |
|
153 | 0 | nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0); |
154 | 0 | } |
155 | 0 | } |
156 | | |
157 | | |
158 | | static nxt_bool_t |
159 | | nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
160 | 0 | { |
161 | 0 | nxt_poll_disable(engine, ev); |
162 | |
|
163 | 0 | return ev->changing; |
164 | 0 | } |
165 | | |
166 | | |
167 | | static void |
168 | | nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
169 | 0 | { |
170 | 0 | nxt_uint_t op, events; |
171 | |
|
172 | 0 | ev->read = NXT_EVENT_ACTIVE; |
173 | |
|
174 | 0 | if (ev->write == NXT_EVENT_INACTIVE) { |
175 | 0 | op = NXT_POLL_ADD; |
176 | 0 | events = POLLIN; |
177 | |
|
178 | 0 | } else { |
179 | 0 | op = NXT_POLL_CHANGE; |
180 | 0 | events = POLLIN | POLLOUT; |
181 | 0 | } |
182 | |
|
183 | 0 | nxt_poll_change(engine, ev, op, events); |
184 | 0 | } |
185 | | |
186 | | |
187 | | static void |
188 | | nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
189 | 0 | { |
190 | 0 | nxt_uint_t op, events; |
191 | |
|
192 | 0 | ev->write = NXT_EVENT_ACTIVE; |
193 | |
|
194 | 0 | if (ev->read == NXT_EVENT_INACTIVE) { |
195 | 0 | op = NXT_POLL_ADD; |
196 | 0 | events = POLLOUT; |
197 | |
|
198 | 0 | } else { |
199 | 0 | op = NXT_POLL_CHANGE; |
200 | 0 | events = POLLIN | POLLOUT; |
201 | 0 | } |
202 | |
|
203 | 0 | nxt_poll_change(engine, ev, op, events); |
204 | 0 | } |
205 | | |
206 | | |
207 | | static void |
208 | | nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
209 | 0 | { |
210 | 0 | nxt_uint_t op, events; |
211 | |
|
212 | 0 | ev->read = NXT_EVENT_INACTIVE; |
213 | |
|
214 | 0 | if (ev->write == NXT_EVENT_INACTIVE) { |
215 | 0 | op = NXT_POLL_DELETE; |
216 | 0 | events = 0; |
217 | |
|
218 | 0 | } else { |
219 | 0 | op = NXT_POLL_CHANGE; |
220 | 0 | events = POLLOUT; |
221 | 0 | } |
222 | |
|
223 | 0 | nxt_poll_change(engine, ev, op, events); |
224 | 0 | } |
225 | | |
226 | | |
227 | | static void |
228 | | nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
229 | 0 | { |
230 | 0 | nxt_uint_t op, events; |
231 | |
|
232 | 0 | ev->write = NXT_EVENT_INACTIVE; |
233 | |
|
234 | 0 | if (ev->read == NXT_EVENT_INACTIVE) { |
235 | 0 | op = NXT_POLL_DELETE; |
236 | 0 | events = 0; |
237 | |
|
238 | 0 | } else { |
239 | 0 | op = NXT_POLL_CHANGE; |
240 | 0 | events = POLLIN; |
241 | 0 | } |
242 | |
|
243 | 0 | nxt_poll_change(engine, ev, op, events); |
244 | 0 | } |
245 | | |
246 | | |
247 | | static void |
248 | | nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
249 | 0 | { |
250 | 0 | if (ev->read != NXT_EVENT_INACTIVE) { |
251 | 0 | nxt_poll_disable_read(engine, ev); |
252 | 0 | } |
253 | 0 | } |
254 | | |
255 | | |
256 | | static void |
257 | | nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
258 | 0 | { |
259 | 0 | if (ev->write != NXT_EVENT_INACTIVE) { |
260 | 0 | nxt_poll_disable_write(engine, ev); |
261 | 0 | } |
262 | 0 | } |
263 | | |
264 | | |
265 | | static void |
266 | | nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
267 | 0 | { |
268 | 0 | nxt_uint_t op; |
269 | |
|
270 | 0 | op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? |
271 | 0 | NXT_POLL_ADD : NXT_POLL_CHANGE; |
272 | |
|
273 | 0 | ev->read = NXT_EVENT_ONESHOT; |
274 | 0 | ev->write = NXT_EVENT_INACTIVE; |
275 | |
|
276 | 0 | nxt_poll_change(engine, ev, op, POLLIN); |
277 | 0 | } |
278 | | |
279 | | |
280 | | static void |
281 | | nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) |
282 | 0 | { |
283 | 0 | nxt_uint_t op; |
284 | |
|
285 | 0 | op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? |
286 | 0 | NXT_POLL_ADD : NXT_POLL_CHANGE; |
287 | |
|
288 | 0 | ev->read = NXT_EVENT_INACTIVE; |
289 | 0 | ev->write = NXT_EVENT_ONESHOT; |
290 | |
|
291 | 0 | nxt_poll_change(engine, ev, op, POLLOUT); |
292 | 0 | } |
293 | | |
294 | | |
295 | | /* |
296 | | * poll changes are batched to improve instruction and data cache |
297 | | * locality of several lvlhsh operations followed by poll() call. |
298 | | */ |
299 | | |
300 | | static void |
301 | | nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op, |
302 | | nxt_uint_t events) |
303 | 0 | { |
304 | 0 | nxt_poll_change_t *change; |
305 | |
|
306 | 0 | nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events); |
307 | |
|
308 | 0 | if (engine->u.poll.nchanges >= engine->u.poll.mchanges) { |
309 | 0 | (void) nxt_poll_commit_changes(engine); |
310 | 0 | } |
311 | |
|
312 | 0 | ev->changing = 1; |
313 | |
|
314 | 0 | change = &engine->u.poll.changes[engine->u.poll.nchanges++]; |
315 | 0 | change->op = op; |
316 | 0 | change->events = events; |
317 | 0 | change->event = ev; |
318 | 0 | } |
319 | | |
320 | | |
321 | | static nxt_int_t |
322 | | nxt_poll_commit_changes(nxt_event_engine_t *engine) |
323 | 0 | { |
324 | 0 | nxt_int_t ret, retval; |
325 | 0 | nxt_fd_event_t *ev; |
326 | 0 | nxt_poll_change_t *change, *end; |
327 | |
|
328 | 0 | nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges); |
329 | |
|
330 | 0 | retval = NXT_OK; |
331 | 0 | change = engine->u.poll.changes; |
332 | 0 | end = change + engine->u.poll.nchanges; |
333 | |
|
334 | 0 | do { |
335 | 0 | ev = change->event; |
336 | 0 | ev->changing = 0; |
337 | |
|
338 | 0 | switch (change->op) { |
339 | | |
340 | 0 | case NXT_POLL_ADD: |
341 | 0 | ret = nxt_poll_set_add(engine, ev, change->events); |
342 | |
|
343 | 0 | if (nxt_fast_path(ret == NXT_OK)) { |
344 | 0 | goto next; |
345 | 0 | } |
346 | | |
347 | 0 | break; |
348 | | |
349 | 0 | case NXT_POLL_CHANGE: |
350 | 0 | ret = nxt_poll_set_change(engine, ev->fd, change->events); |
351 | |
|
352 | 0 | if (nxt_fast_path(ret == NXT_OK)) { |
353 | 0 | goto next; |
354 | 0 | } |
355 | | |
356 | 0 | break; |
357 | | |
358 | 0 | case NXT_POLL_DELETE: |
359 | 0 | ret = nxt_poll_set_delete(engine, ev->fd); |
360 | |
|
361 | 0 | if (nxt_fast_path(ret == NXT_OK)) { |
362 | 0 | goto next; |
363 | 0 | } |
364 | | |
365 | 0 | break; |
366 | 0 | } |
367 | | |
368 | 0 | nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler, |
369 | 0 | ev->task, ev, ev->data); |
370 | |
|
371 | 0 | retval = NXT_ERROR; |
372 | |
|
373 | 0 | next: |
374 | |
|
375 | 0 | change++; |
376 | |
|
377 | 0 | } while (change < end); |
378 | | |
379 | 0 | engine->u.poll.nchanges = 0; |
380 | |
|
381 | 0 | return retval; |
382 | 0 | } |
383 | | |
384 | | |
385 | | static nxt_int_t |
386 | | nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events) |
387 | 0 | { |
388 | 0 | nxt_int_t ret; |
389 | 0 | nxt_uint_t max_nfds; |
390 | 0 | struct pollfd *pfd; |
391 | 0 | nxt_lvlhsh_query_t lhq; |
392 | 0 | nxt_poll_hash_entry_t *phe; |
393 | |
|
394 | 0 | nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events); |
395 | |
|
396 | 0 | if (engine->u.poll.nfds >= engine->u.poll.max_nfds) { |
397 | 0 | max_nfds = engine->u.poll.max_nfds + 512; /* 4K */ |
398 | |
|
399 | 0 | pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds); |
400 | 0 | if (nxt_slow_path(pfd == NULL)) { |
401 | 0 | return NXT_ERROR; |
402 | 0 | } |
403 | | |
404 | 0 | engine->u.poll.set = pfd; |
405 | 0 | engine->u.poll.max_nfds = max_nfds; |
406 | 0 | } |
407 | | |
408 | 0 | phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t)); |
409 | 0 | if (nxt_slow_path(phe == NULL)) { |
410 | 0 | return NXT_ERROR; |
411 | 0 | } |
412 | | |
413 | 0 | phe->fd = ev->fd; |
414 | 0 | phe->index = engine->u.poll.nfds; |
415 | 0 | phe->event = ev; |
416 | |
|
417 | 0 | pfd = &engine->u.poll.set[engine->u.poll.nfds++]; |
418 | 0 | pfd->fd = ev->fd; |
419 | 0 | pfd->events = events; |
420 | 0 | pfd->revents = 0; |
421 | |
|
422 | 0 | lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t)); |
423 | 0 | lhq.replace = 0; |
424 | 0 | lhq.value = phe; |
425 | 0 | lhq.proto = &nxt_poll_fd_hash_proto; |
426 | 0 | lhq.data = engine; |
427 | |
|
428 | 0 | ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq); |
429 | |
|
430 | 0 | if (nxt_fast_path(ret == NXT_OK)) { |
431 | 0 | return NXT_OK; |
432 | 0 | } |
433 | | |
434 | 0 | nxt_free(phe); |
435 | |
|
436 | 0 | return NXT_ERROR; |
437 | 0 | } |
438 | | |
439 | | |
440 | | static nxt_int_t |
441 | | nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events) |
442 | 0 | { |
443 | 0 | nxt_poll_hash_entry_t *phe; |
444 | |
|
445 | 0 | nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi", |
446 | 0 | fd, events); |
447 | |
|
448 | 0 | phe = nxt_poll_fd_hash_get(engine, fd); |
449 | |
|
450 | 0 | if (nxt_fast_path(phe != NULL)) { |
451 | 0 | engine->u.poll.set[phe->index].events = events; |
452 | 0 | return NXT_OK; |
453 | 0 | } |
454 | | |
455 | 0 | return NXT_ERROR; |
456 | 0 | } |
457 | | |
458 | | |
459 | | static nxt_int_t |
460 | | nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd) |
461 | 0 | { |
462 | 0 | nxt_int_t ret; |
463 | 0 | nxt_uint_t index, nfds; |
464 | 0 | nxt_lvlhsh_query_t lhq; |
465 | 0 | nxt_poll_hash_entry_t *phe; |
466 | |
|
467 | 0 | nxt_debug(&engine->task, "poll delete event: fd:%d", fd); |
468 | |
|
469 | 0 | lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t)); |
470 | 0 | lhq.proto = &nxt_poll_fd_hash_proto; |
471 | 0 | lhq.data = engine; |
472 | |
|
473 | 0 | ret = nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq); |
474 | |
|
475 | 0 | if (nxt_slow_path(ret != NXT_OK)) { |
476 | 0 | return NXT_ERROR; |
477 | 0 | } |
478 | | |
479 | 0 | phe = lhq.value; |
480 | |
|
481 | 0 | index = phe->index; |
482 | 0 | engine->u.poll.nfds--; |
483 | 0 | nfds = engine->u.poll.nfds; |
484 | |
|
485 | 0 | if (index != nfds) { |
486 | 0 | engine->u.poll.set[index] = engine->u.poll.set[nfds]; |
487 | |
|
488 | 0 | phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd); |
489 | |
|
490 | 0 | phe->index = index; |
491 | 0 | } |
492 | |
|
493 | 0 | nxt_free(lhq.value); |
494 | |
|
495 | 0 | return NXT_OK; |
496 | 0 | } |
497 | | |
498 | | |
499 | | static void |
500 | | nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) |
501 | 0 | { |
502 | 0 | int nevents; |
503 | 0 | nxt_fd_t fd; |
504 | 0 | nxt_err_t err; |
505 | 0 | nxt_bool_t error; |
506 | 0 | nxt_uint_t i, events, level; |
507 | 0 | struct pollfd *pfd; |
508 | 0 | nxt_fd_event_t *ev; |
509 | 0 | nxt_poll_hash_entry_t *phe; |
510 | |
|
511 | 0 | if (engine->u.poll.nchanges != 0) { |
512 | 0 | if (nxt_poll_commit_changes(engine) != NXT_OK) { |
513 | | /* Error handlers have been enqueued on failure. */ |
514 | 0 | timeout = 0; |
515 | 0 | } |
516 | 0 | } |
517 | |
|
518 | 0 | nxt_debug(&engine->task, "poll() events:%ui timeout:%M", |
519 | 0 | engine->u.poll.nfds, timeout); |
520 | |
|
521 | 0 | nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout); |
522 | |
|
523 | 0 | err = (nevents == -1) ? nxt_errno : 0; |
524 | |
|
525 | 0 | nxt_thread_time_update(engine->task.thread); |
526 | |
|
527 | 0 | nxt_debug(&engine->task, "poll(): %d", nevents); |
528 | |
|
529 | 0 | if (nevents == -1) { |
530 | 0 | level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; |
531 | 0 | nxt_log(&engine->task, level, "poll() failed %E", err); |
532 | 0 | return; |
533 | 0 | } |
534 | | |
535 | 0 | for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) { |
536 | |
|
537 | 0 | pfd = &engine->u.poll.set[i]; |
538 | 0 | events = pfd->revents; |
539 | |
|
540 | 0 | if (events == 0) { |
541 | 0 | continue; |
542 | 0 | } |
543 | | |
544 | 0 | fd = pfd->fd; |
545 | |
|
546 | 0 | phe = nxt_poll_fd_hash_get(engine, fd); |
547 | |
|
548 | 0 | if (nxt_slow_path(phe == NULL)) { |
549 | 0 | nxt_alert(&engine->task, |
550 | 0 | "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi", |
551 | 0 | fd, pfd->events, events); |
552 | | |
553 | | /* Mark the poll entry to ignore it by the kernel. */ |
554 | 0 | pfd->fd = -1; |
555 | 0 | goto next; |
556 | 0 | } |
557 | | |
558 | 0 | ev = phe->event; |
559 | |
|
560 | 0 | nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d wr:%d", |
561 | 0 | fd, events, ev->read, ev->write); |
562 | |
|
563 | 0 | if (nxt_slow_path((events & POLLNVAL) != 0)) { |
564 | 0 | nxt_alert(ev->task, "poll() error fd:%d ev:%04Xd rev:%04uXi", |
565 | 0 | fd, pfd->events, events); |
566 | | |
567 | | /* Mark the poll entry to ignore it by the kernel. */ |
568 | 0 | pfd->fd = -1; |
569 | |
|
570 | 0 | nxt_work_queue_add(&engine->fast_work_queue, |
571 | 0 | ev->error_handler, ev->task, ev, ev->data); |
572 | 0 | goto next; |
573 | 0 | } |
574 | | |
575 | | /* |
576 | | * On a socket's remote end close: |
577 | | * |
578 | | * Linux, FreeBSD, and Solaris set POLLIN; |
579 | | * MacOSX sets POLLIN and POLLHUP; |
580 | | * NetBSD sets POLLIN, and poll(2) claims this explicitly: |
581 | | * |
582 | | * If the remote end of a socket is closed, poll() |
583 | | * returns a POLLIN event, rather than a POLLHUP. |
584 | | * |
585 | | * On error: |
586 | | * |
587 | | * Linux sets POLLHUP and POLLERR only; |
588 | | * FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2) |
589 | | * claims the opposite: |
590 | | * |
591 | | * Note that POLLHUP and POLLOUT should never be |
592 | | * present in the revents bitmask at the same time. |
593 | | * |
594 | | * Solaris and NetBSD do not add POLLHUP or POLLERR; |
595 | | * MacOSX sets POLLHUP only. |
596 | | * |
597 | | * If an implementation sets POLLERR or POLLHUP only without POLLIN |
598 | | * or POLLOUT, the "error" variable enqueues only one active handler. |
599 | | */ |
600 | | |
601 | 0 | error = (((events & (POLLERR | POLLHUP)) != 0) |
602 | 0 | && ((events & (POLLIN | POLLOUT)) == 0)); |
603 | |
|
604 | 0 | if ((events & POLLIN) || (error && ev->read_handler != NULL)) { |
605 | 0 | error = 0; |
606 | 0 | ev->read_ready = 1; |
607 | |
|
608 | 0 | if (ev->read == NXT_EVENT_ONESHOT) { |
609 | 0 | ev->read = NXT_EVENT_INACTIVE; |
610 | 0 | nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0); |
611 | 0 | } |
612 | |
|
613 | 0 | nxt_work_queue_add(ev->read_work_queue, ev->read_handler, |
614 | 0 | ev->task, ev, ev->data); |
615 | 0 | } |
616 | |
|
617 | 0 | if ((events & POLLOUT) || (error && ev->write_handler != NULL)) { |
618 | 0 | ev->write_ready = 1; |
619 | |
|
620 | 0 | if (ev->write == NXT_EVENT_ONESHOT) { |
621 | 0 | ev->write = NXT_EVENT_INACTIVE; |
622 | 0 | nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0); |
623 | 0 | } |
624 | |
|
625 | 0 | nxt_work_queue_add(ev->write_work_queue, ev->write_handler, |
626 | 0 | ev->task, ev, ev->data); |
627 | 0 | } |
628 | |
|
629 | 0 | next: |
630 | |
|
631 | 0 | nevents--; |
632 | 0 | } |
633 | 0 | } |
634 | | |
635 | | |
636 | | static nxt_poll_hash_entry_t * |
637 | | nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd) |
638 | 0 | { |
639 | 0 | nxt_lvlhsh_query_t lhq; |
640 | 0 | nxt_poll_hash_entry_t *phe; |
641 | |
|
642 | 0 | lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t)); |
643 | 0 | lhq.proto = &nxt_poll_fd_hash_proto; |
644 | 0 | lhq.data = engine; |
645 | |
|
646 | 0 | if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) { |
647 | 0 | phe = lhq.value; |
648 | 0 | return phe; |
649 | 0 | } |
650 | | |
651 | 0 | nxt_alert(&engine->task, "fd %d not found in hash", fd); |
652 | |
|
653 | 0 | return NULL; |
654 | 0 | } |
655 | | |
656 | | |
657 | | static nxt_int_t |
658 | | nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data) |
659 | 0 | { |
660 | 0 | nxt_event_engine_t *engine; |
661 | 0 | nxt_poll_hash_entry_t *phe; |
662 | |
|
663 | 0 | phe = data; |
664 | | |
665 | | /* nxt_murmur_hash2() is unique for 4 bytes. */ |
666 | |
|
667 | 0 | engine = lhq->data; |
668 | |
|
669 | 0 | if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) { |
670 | 0 | return NXT_OK; |
671 | 0 | } |
672 | | |
673 | 0 | nxt_alert(&engine->task, "fd %d in hash mismatches fd %d in poll set", |
674 | 0 | phe->fd, engine->u.poll.set[phe->index].fd); |
675 | |
|
676 | 0 | return NXT_DECLINED; |
677 | 0 | } |
678 | | |
679 | | |
680 | | static void |
681 | | nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh) |
682 | 0 | { |
683 | 0 | nxt_poll_hash_entry_t *phe; |
684 | |
|
685 | 0 | for ( ;; ) { |
686 | 0 | phe = nxt_lvlhsh_retrieve(lh, &nxt_poll_fd_hash_proto, NULL); |
687 | |
|
688 | 0 | if (phe == NULL) { |
689 | 0 | return; |
690 | 0 | } |
691 | | |
692 | 0 | nxt_free(phe); |
693 | 0 | } |
694 | 0 | } |