/src/uWebSockets/uSockets/src/eventing/epoll_kqueue.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Authored by Alex Hultman, 2018-2019. |
3 | | * Intellectual property of third-party. |
4 | | |
5 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | | * you may not use this file except in compliance with the License. |
7 | | * You may obtain a copy of the License at |
8 | | |
9 | | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | | |
11 | | * Unless required by applicable law or agreed to in writing, software |
12 | | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | | * See the License for the specific language governing permissions and |
15 | | * limitations under the License. |
16 | | */ |
17 | | |
18 | | #include "libusockets.h" |
19 | | #include "internal/internal.h" |
20 | | #include <stdlib.h> |
21 | | |
22 | | #if defined(LIBUS_USE_EPOLL) || defined(LIBUS_USE_KQUEUE) |
23 | | |
24 | | /* Cannot include this one on Windows */ |
25 | | #include <unistd.h> |
26 | | |
27 | | #ifdef LIBUS_USE_EPOLL |
28 | 10.4M | #define GET_READY_POLL(loop, index) (struct us_poll_t *) loop->ready_polls[index].data.ptr |
29 | 1.47M | #define SET_READY_POLL(loop, index, poll) loop->ready_polls[index].data.ptr = poll |
30 | | #else |
31 | | #define GET_READY_POLL(loop, index) (struct us_poll_t *) loop->ready_polls[index].udata |
32 | | #define SET_READY_POLL(loop, index, poll) loop->ready_polls[index].udata = poll |
33 | | #endif |
34 | | |
35 | | /* Loop */ |
36 | 5.52k | void us_loop_free(struct us_loop_t *loop) { |
37 | 5.52k | us_internal_loop_data_free(loop); |
38 | 5.52k | close(loop->fd); |
39 | 5.52k | free(loop); |
40 | 5.52k | } |
41 | | |
42 | | /* Poll */ |
43 | 1.52M | struct us_poll_t *us_create_poll(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) { |
44 | 1.52M | if (!fallthrough) { |
45 | 1.51M | loop->num_polls++; |
46 | 1.51M | } |
47 | 1.52M | return malloc(sizeof(struct us_poll_t) + ext_size); |
48 | 1.52M | } |
49 | | |
50 | | /* Todo: this one should be us_internal_poll_free */ |
51 | 1.52M | void us_poll_free(struct us_poll_t *p, struct us_loop_t *loop) { |
52 | 1.52M | loop->num_polls--; |
53 | 1.52M | free(p); |
54 | 1.52M | } |
55 | | |
56 | 0 | void *us_poll_ext(struct us_poll_t *p) { |
57 | 0 | return p + 1; |
58 | 0 | } |
59 | | |
60 | | /* Todo: why have us_poll_create AND us_poll_init!? libuv legacy! */ |
61 | 1.52M | void us_poll_init(struct us_poll_t *p, LIBUS_SOCKET_DESCRIPTOR fd, int poll_type) { |
62 | 1.52M | p->state.fd = fd; |
63 | 1.52M | p->state.poll_type = poll_type; |
64 | 1.52M | } |
65 | | |
66 | 13.4M | int us_poll_events(struct us_poll_t *p) { |
67 | 13.4M | return ((p->state.poll_type & POLL_TYPE_POLLING_IN) ? LIBUS_SOCKET_READABLE : 0) | ((p->state.poll_type & POLL_TYPE_POLLING_OUT) ? LIBUS_SOCKET_WRITABLE : 0); |
68 | 13.4M | } |
69 | | |
70 | 11.5M | LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) { |
71 | 11.5M | return p->state.fd; |
72 | 11.5M | } |
73 | | |
74 | | /* Returns any of listen socket, socket, shut down socket or callback */ |
75 | 12.3M | int us_internal_poll_type(struct us_poll_t *p) { |
76 | 12.3M | return p->state.poll_type & 3; |
77 | 12.3M | } |
78 | | |
79 | | /* Bug: doesn't really SET, rather read and change, so needs to be inited first! */ |
80 | 234k | void us_internal_poll_set_type(struct us_poll_t *p, int poll_type) { |
81 | 234k | p->state.poll_type = poll_type | (p->state.poll_type & 12); |
82 | 234k | } |
83 | | |
84 | | /* Timer */ |
85 | 1.82M | void *us_timer_ext(struct us_timer_t *timer) { |
86 | 1.82M | return ((struct us_internal_callback_t *) timer) + 1; |
87 | 1.82M | } |
88 | | |
89 | 0 | struct us_loop_t *us_timer_loop(struct us_timer_t *t) { |
90 | 0 | struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t; |
91 | |
|
92 | 0 | return internal_cb->loop; |
93 | 0 | } |
94 | | |
95 | | /* Loop */ |
96 | 5.52k | struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) { |
97 | 5.52k | struct us_loop_t *loop = (struct us_loop_t *) malloc(sizeof(struct us_loop_t) + ext_size); |
98 | 5.52k | loop->num_polls = 0; |
99 | | /* These could be accessed if we close a poll before starting the loop */ |
100 | 5.52k | loop->num_ready_polls = 0; |
101 | 5.52k | loop->current_ready_poll = 0; |
102 | | |
103 | 5.52k | #ifdef LIBUS_USE_EPOLL |
104 | 5.52k | loop->fd = epoll_create1(EPOLL_CLOEXEC); |
105 | | #else |
106 | | loop->fd = kqueue(); |
107 | | #endif |
108 | | |
109 | 5.52k | us_internal_loop_data_init(loop, wakeup_cb, pre_cb, post_cb); |
110 | 5.52k | return loop; |
111 | 5.52k | } |
112 | | |
/* The loop's main dispatch routine: blocks for readiness events and hands
 * them to us_internal_dispatch_ready_poll until no counted polls remain. */
void us_loop_run(struct us_loop_t *loop) {
    us_loop_integrate(loop);

    /* While we have non-fallthrough polls we shouldn't fall through */
    while (loop->num_polls) {
        /* Emit pre callback */
        us_internal_loop_pre(loop);

        /* Fetch ready polls; both calls block indefinitely (no timeout) and
         * return at most 1024 events per iteration. */
#ifdef LIBUS_USE_EPOLL
        loop->num_ready_polls = epoll_wait(loop->fd, loop->ready_polls, 1024, -1);
#else
        loop->num_ready_polls = kevent(loop->fd, NULL, 0, loop->ready_polls, 1024, NULL);
#endif

        /* Iterate ready polls, dispatching them by type. current_ready_poll is
         * kept on the loop so us_internal_loop_update_pending_ready_polls can
         * patch not-yet-dispatched entries mid-iteration. */
        for (loop->current_ready_poll = 0; loop->current_ready_poll < loop->num_ready_polls; loop->current_ready_poll++) {
            struct us_poll_t *poll = GET_READY_POLL(loop, loop->current_ready_poll);
            /* Any ready poll marked with nullptr will be ignored */
            if (poll) {
#ifdef LIBUS_USE_EPOLL
                int events = loop->ready_polls[loop->current_ready_poll].events;
                int error = loop->ready_polls[loop->current_ready_poll].events & (EPOLLERR | EPOLLHUP);
#else
                /* EVFILT_READ, EVFILT_TIME, EVFILT_USER are all mapped to LIBUS_SOCKET_READABLE */
                int events = LIBUS_SOCKET_READABLE;
                if (loop->ready_polls[loop->current_ready_poll].filter == EVFILT_WRITE) {
                    events = LIBUS_SOCKET_WRITABLE;
                }
                int error = loop->ready_polls[loop->current_ready_poll].flags & (EV_ERROR | EV_EOF);
#endif
                /* Always filter all polls by what they actually poll for (callback polls always poll for readable) */
                events &= us_poll_events(poll);
                if (events || error) {
                    us_internal_dispatch_ready_poll(poll, error, events);
                }
            }
        }
        /* Emit post callback */
        us_internal_loop_post(loop);
    }
}
155 | | |
/* When a poll is moved by realloc (us_poll_resize) or stopped while the ready
 * list is being iterated, patch its not-yet-dispatched entries so the loop
 * never dereferences a stale pointer. new_poll may be 0 (NULL) to disable
 * dispatch for those entries entirely. */
void us_internal_loop_update_pending_ready_polls(struct us_loop_t *loop, struct us_poll_t *old_poll, struct us_poll_t *new_poll, int old_events, int new_events) {
#ifdef LIBUS_USE_EPOLL
    /* Epoll only has one ready poll per poll */
    int num_entries_possibly_remaining = 1;
#else
    /* Ready polls may contain same poll twice under kqueue, as one poll may hold two filters */
    int num_entries_possibly_remaining = 2;//((old_events & LIBUS_SOCKET_READABLE) ? 1 : 0) + ((old_events & LIBUS_SOCKET_WRITABLE) ? 1 : 0);
#endif

    /* Todo: for kqueue if we track things in us_change_poll it is possible to have a fast path with no seeking in cases of:
     * current poll being us AND we only poll for one thing */

    /* Only entries from the current iteration index onward can still be
     * dispatched; earlier entries have already been handled. */
    for (int i = loop->current_ready_poll; i < loop->num_ready_polls && num_entries_possibly_remaining; i++) {
        if (GET_READY_POLL(loop, i) == old_poll) {

            // if new events does not contain the ready events of this poll then remove (no we filter that out later on)
            SET_READY_POLL(loop, i, new_poll);

            num_entries_possibly_remaining--;
        }
    }
}
178 | | |
179 | | /* Poll */ |
180 | | |
181 | | #ifdef LIBUS_USE_KQUEUE |
182 | | /* Helper function for setting or updating EVFILT_READ and EVFILT_WRITE */ |
/* Helper function for setting or updating EVFILT_READ and EVFILT_WRITE.
 * Computes the delta between old_events and new_events and submits at most
 * two kevent changes (EV_ADD / EV_DELETE per filter) in a single syscall.
 * user_data becomes the udata handed back with each triggered event. */
int kqueue_change(int kqfd, int fd, int old_events, int new_events, void *user_data) {
    struct kevent change_list[2];
    int change_length = 0;

    /* Do they differ in readable? */
    if ((new_events & LIBUS_SOCKET_READABLE) != (old_events & LIBUS_SOCKET_READABLE)) {
        EV_SET(&change_list[change_length++], fd, EVFILT_READ, (new_events & LIBUS_SOCKET_READABLE) ? EV_ADD : EV_DELETE, 0, 0, user_data);
    }

    /* Do they differ in writable? */
    if ((new_events & LIBUS_SOCKET_WRITABLE) != (old_events & LIBUS_SOCKET_WRITABLE)) {
        EV_SET(&change_list[change_length++], fd, EVFILT_WRITE, (new_events & LIBUS_SOCKET_WRITABLE) ? EV_ADD : EV_DELETE, 0, 0, user_data);
    }

    /* Changes only; no events are drained here (eventlist is NULL/0). */
    int ret = kevent(kqfd, change_list, change_length, NULL, 0, NULL);

    // ret should be 0 in most cases (not guaranteed when removing async)

    return ret;
}
203 | | #endif |
204 | | |
205 | 108k | struct us_poll_t *us_poll_resize(struct us_poll_t *p, struct us_loop_t *loop, unsigned int ext_size) { |
206 | 108k | int events = us_poll_events(p); |
207 | | |
208 | 108k | struct us_poll_t *new_p = realloc(p, sizeof(struct us_poll_t) + ext_size); |
209 | 108k | if (p != new_p && events) { |
210 | 0 | #ifdef LIBUS_USE_EPOLL |
211 | | /* Hack: forcefully update poll by stripping away already set events */ |
212 | 0 | new_p->state.poll_type = us_internal_poll_type(new_p); |
213 | 0 | us_poll_change(new_p, loop, events); |
214 | | #else |
215 | | /* Forcefully update poll by resetting them with new_p as user data */ |
216 | | kqueue_change(loop->fd, new_p->state.fd, 0, events, new_p); |
217 | | #endif |
218 | | |
219 | | /* This is needed for epoll also (us_change_poll doesn't update the old poll) */ |
220 | 0 | us_internal_loop_update_pending_ready_polls(loop, p, new_p, events, events); |
221 | 0 | } |
222 | | |
223 | 108k | return new_p; |
224 | 108k | } |
225 | | |
226 | 1.52M | void us_poll_start(struct us_poll_t *p, struct us_loop_t *loop, int events) { |
227 | 1.52M | p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0); |
228 | | |
229 | 1.52M | #ifdef LIBUS_USE_EPOLL |
230 | 1.52M | struct epoll_event event; |
231 | 1.52M | event.events = events; |
232 | 1.52M | event.data.ptr = p; |
233 | 1.52M | epoll_ctl(loop->fd, EPOLL_CTL_ADD, p->state.fd, &event); |
234 | | #else |
235 | | kqueue_change(loop->fd, p->state.fd, 0, events, p); |
236 | | #endif |
237 | 1.52M | } |
238 | | |
/* Change the set of events a started poll waits for; a no-op when the set is
 * unchanged. The return value of the kernel call is intentionally ignored. */
void us_poll_change(struct us_poll_t *p, struct us_loop_t *loop, int events) {
    int old_events = us_poll_events(p);
    if (old_events != events) {

        /* Preserve the low type bits, replace the polling bits. */
        p->state.poll_type = us_internal_poll_type(p) | ((events & LIBUS_SOCKET_READABLE) ? POLL_TYPE_POLLING_IN : 0) | ((events & LIBUS_SOCKET_WRITABLE) ? POLL_TYPE_POLLING_OUT : 0);

#ifdef LIBUS_USE_EPOLL
        struct epoll_event event;
        event.events = events;
        event.data.ptr = p;
        epoll_ctl(loop->fd, EPOLL_CTL_MOD, p->state.fd, &event);
#else
        kqueue_change(loop->fd, p->state.fd, old_events, events, p);
#endif
        /* Set all removed events to null-polls in pending ready poll list */
        /* NOTE(review): this call is deliberately disabled — stale ready
         * events are filtered later by us_loop_run's events mask instead. */
        //us_internal_loop_update_pending_ready_polls(loop, p, p, old_events, events);
    }
}
257 | | |
/* Stop polling entirely: deregister from the kernel and null out any of this
 * poll's remaining entries in the pending ready list so it cannot dispatch. */
void us_poll_stop(struct us_poll_t *p, struct us_loop_t *loop) {
    int old_events = us_poll_events(p);
    int new_events = 0; /* unused on the epoll path */
#ifdef LIBUS_USE_EPOLL
    /* event is never read by EPOLL_CTL_DEL, but pre-2.6.9 Linux kernels
     * require a non-NULL pointer — hence the uninitialized struct. */
    struct epoll_event event;
    epoll_ctl(loop->fd, EPOLL_CTL_DEL, p->state.fd, &event);
#else
    if (old_events) {
        kqueue_change(loop->fd, p->state.fd, old_events, new_events, NULL);
    }
#endif

    /* Disable any instance of us in the pending ready poll list */
    us_internal_loop_update_pending_ready_polls(loop, p, 0, old_events, new_events);
}
273 | | |
/* Drain the readiness of a callback poll (eventfd/timerfd) and return the
 * 8-byte counter that was read, or 0 when nothing could be read. */
unsigned int us_internal_accept_poll_event(struct us_poll_t *p) {
#ifdef LIBUS_USE_EPOLL
    int fd = us_poll_fd(p);
    /* Fix: buf was previously left uninitialized, so a failed or short read
     * returned an indeterminate value (undefined behavior). */
    uint64_t buf = 0;
    if (read(fd, &buf, 8) != 8) {
        buf = 0;
    }
    return (unsigned int) buf;
#else
    /* Kqueue has no underlying FD for timers or user events */
    return 0;
#endif
}
286 | | |
287 | | /* Timer */ |
288 | | #ifdef LIBUS_USE_EPOLL |
289 | 11.0k | struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) { |
290 | 11.0k | struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) + ext_size); |
291 | 11.0k | int timerfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC); |
292 | 11.0k | if (timerfd == -1) { |
293 | 0 | return NULL; |
294 | 0 | } |
295 | 11.0k | us_poll_init(p, timerfd, POLL_TYPE_CALLBACK); |
296 | | |
297 | 11.0k | struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p; |
298 | 11.0k | cb->loop = loop; |
299 | 11.0k | cb->cb_expects_the_loop = 0; |
300 | 11.0k | cb->leave_poll_ready = 0; |
301 | | |
302 | 11.0k | return (struct us_timer_t *) cb; |
303 | 11.0k | } |
304 | | #else |
305 | | struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) { |
306 | | struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + ext_size); |
307 | | |
308 | | cb->loop = loop; |
309 | | cb->cb_expects_the_loop = 0; |
310 | | cb->leave_poll_ready = 0; |
311 | | |
312 | | /* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */ |
313 | | cb->p.state.poll_type = POLL_TYPE_POLLING_IN; |
314 | | us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK); |
315 | | |
316 | | if (!fallthrough) { |
317 | | loop->num_polls++; |
318 | | } |
319 | | |
320 | | return (struct us_timer_t *) cb; |
321 | | } |
322 | | #endif |
323 | | |
324 | | #ifdef LIBUS_USE_EPOLL |
325 | 11.0k | void us_timer_close(struct us_timer_t *timer) { |
326 | 11.0k | struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer; |
327 | | |
328 | 11.0k | us_poll_stop(&cb->p, cb->loop); |
329 | 11.0k | close(us_poll_fd(&cb->p)); |
330 | | |
331 | | /* (regular) sockets are the only polls which are not freed immediately */ |
332 | 11.0k | us_poll_free((struct us_poll_t *) timer, cb->loop); |
333 | 11.0k | } |
334 | | |
335 | 11.0k | void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) { |
336 | 11.0k | struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t; |
337 | | |
338 | 11.0k | internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb; |
339 | | |
340 | 11.0k | struct itimerspec timer_spec = { |
341 | 11.0k | {repeat_ms / 1000, (long) (repeat_ms % 1000) * (long) 1000000}, |
342 | 11.0k | {ms / 1000, (long) (ms % 1000) * (long) 1000000} |
343 | 11.0k | }; |
344 | | |
345 | 11.0k | timerfd_settime(us_poll_fd((struct us_poll_t *) t), 0, &timer_spec, NULL); |
346 | 11.0k | us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE); |
347 | 11.0k | } |
348 | | #else |
/* Close a kqueue-backed timer: delete the EVFILT_TIMER event, free the poll. */
void us_timer_close(struct us_timer_t *timer) {
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;

    /* The timer is identified by the callback pointer itself (ident), not an fd. */
    struct kevent event;
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_TIMER, EV_DELETE, 0, 0, internal_cb);
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);

    /* (regular) sockets are the only polls which are not freed immediately */
    us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
}
359 | | |
/* Arm a kqueue timer via EVFILT_TIMER. The period is ms; repeat_ms only
 * selects one-shot (0) vs repeating (non-zero) behavior. */
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) t;

    internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb;

    /* Bug: repeat_ms must be the same as ms, or 0 — EVFILT_TIMER only takes
     * one period (the `data` field), so a differing repeat_ms is ignored. */
    struct kevent event;
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_TIMER, EV_ADD | (repeat_ms ? 0 : EV_ONESHOT), 0, ms, internal_cb);
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
}
370 | | #endif |
371 | | |
372 | | /* Async (internal helper for loop's wakeup feature) */ |
373 | | #ifdef LIBUS_USE_EPOLL |
374 | 5.52k | struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) { |
375 | 5.52k | struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) + ext_size); |
376 | 5.52k | us_poll_init(p, eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC), POLL_TYPE_CALLBACK); |
377 | | |
378 | 5.52k | struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p; |
379 | 5.52k | cb->loop = loop; |
380 | 5.52k | cb->cb_expects_the_loop = 1; |
381 | 5.52k | cb->leave_poll_ready = 0; |
382 | | |
383 | 5.52k | return (struct us_internal_async *) cb; |
384 | 5.52k | } |
385 | | |
386 | | // identical code as for timer, make it shared for "callback types" |
387 | 5.52k | void us_internal_async_close(struct us_internal_async *a) { |
388 | 5.52k | struct us_internal_callback_t *cb = (struct us_internal_callback_t *) a; |
389 | | |
390 | 5.52k | us_poll_stop(&cb->p, cb->loop); |
391 | 5.52k | close(us_poll_fd(&cb->p)); |
392 | | |
393 | | /* (regular) sockets are the only polls which are not freed immediately */ |
394 | 5.52k | us_poll_free((struct us_poll_t *) a, cb->loop); |
395 | 5.52k | } |
396 | | |
397 | 5.52k | void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) { |
398 | 5.52k | struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a; |
399 | | |
400 | 5.52k | internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb; |
401 | | |
402 | 5.52k | us_poll_start((struct us_poll_t *) a, internal_cb->loop, LIBUS_SOCKET_READABLE); |
403 | 5.52k | } |
404 | | |
/* Wake the loop by bumping the eventfd counter; the write result is
 * intentionally ignored as the wakeup is best-effort. */
void us_internal_async_wakeup(struct us_internal_async *a) {
    uint64_t counter_increment = 1;
    ssize_t rc = write(us_poll_fd((struct us_poll_t *) a), &counter_increment, 8);
    (void) rc;
}
410 | | #else |
411 | | struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) { |
412 | | struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + ext_size); |
413 | | |
414 | | cb->loop = loop; |
415 | | cb->cb_expects_the_loop = 1; |
416 | | cb->leave_poll_ready = 0; |
417 | | |
418 | | /* Bug: us_internal_poll_set_type does not SET the type, it only CHANGES it */ |
419 | | cb->p.state.poll_type = POLL_TYPE_POLLING_IN; |
420 | | us_internal_poll_set_type((struct us_poll_t *) cb, POLL_TYPE_CALLBACK); |
421 | | |
422 | | if (!fallthrough) { |
423 | | loop->num_polls++; |
424 | | } |
425 | | |
426 | | return (struct us_internal_async *) cb; |
427 | | } |
428 | | |
429 | | // identical code as for timer, make it shared for "callback types" |
// identical code as for timer, make it shared for "callback types"
void us_internal_async_close(struct us_internal_async *a) {
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;

    /* Note: This will fail most of the time as there probably is no pending trigger */
    struct kevent event;
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_USER, EV_DELETE, 0, 0, internal_cb);
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);

    /* (regular) sockets are the only polls which are not freed immediately */
    us_poll_free((struct us_poll_t *) a, internal_cb->loop);
}
441 | | |
442 | | void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_internal_async *)) { |
443 | | struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a; |
444 | | |
445 | | internal_cb->cb = (void (*)(struct us_internal_callback_t *)) cb; |
446 | | } |
447 | | |
void us_internal_async_wakeup(struct us_internal_async *a) {
    struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;

    /* In kqueue you really only need to add a triggered oneshot event;
     * NOTE_TRIGGER fires it immediately and EV_ONESHOT removes it after
     * delivery, so no prior registration is required. */
    struct kevent event;
    EV_SET(&event, (uintptr_t) internal_cb, EVFILT_USER, EV_ADD | EV_ONESHOT, NOTE_TRIGGER, 0, internal_cb);
    kevent(internal_cb->loop->fd, &event, 1, NULL, 0, NULL);
}
456 | | #endif |
457 | | |
458 | | #endif |