Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * File descriptors management functions. |
3 | | * |
4 | | * Copyright 2000-2014 Willy Tarreau <w@1wt.eu> |
5 | | * |
6 | | * This program is free software; you can redistribute it and/or |
7 | | * modify it under the terms of the GNU General Public License |
8 | | * as published by the Free Software Foundation; either version |
9 | | * 2 of the License, or (at your option) any later version. |
10 | | * |
11 | | * There is no direct link between the FD and the updates list. There is only a |
12 | | * bit in the fdtab[] to indicate that a file descriptor is already present in |
13 | | * the updates list. Once an fd is present in the updates list, it will have to |
14 | | * be considered even if its changes are reverted in the middle or if the fd is |
15 | | * replaced. |
16 | | * |
17 | | * The event state for an FD, as found in fdtab[].state, is maintained for each |
18 | | * direction. The state field is built this way, with R bits in the low nibble |
19 | | * and W bits in the high nibble for ease of access and debugging : |
20 | | * |
21 | | * 7 6 5 4 3 2 1 0 |
22 | | * [ 0 | 0 | RW | AW | 0 | 0 | RR | AR ] |
23 | | * |
24 | | * A* = active *R = read |
25 | | * R* = ready *W = write |
26 | | * |
27 | | * An FD is marked "active" when there is a desire to use it. |
28 | | * An FD is marked "ready" when it has not faced a new EAGAIN since last wake-up |
29 | | * (it is a cache of the last EAGAIN regardless of polling changes). Each poller |
30 | | * has its own "polled" state for the same fd, as stored in the polled_mask. |
31 | | * |
32 | | * We have 4 possible states for each direction based on these 2 flags : |
33 | | * |
34 | | * +---+---+----------+---------------------------------------------+ |
35 | | * | R | A | State | Description | |
36 | | * +---+---+----------+---------------------------------------------+ |
37 | | * | 0 | 0 | DISABLED | No activity desired, not ready. | |
38 | | * | 0 | 1 | ACTIVE | Activity desired. | |
39 | | * | 1 | 0 | STOPPED | End of activity. | |
40 | | * | 1 | 1 | READY | Activity desired and reported. | |
41 | | * +---+---+----------+---------------------------------------------+ |
42 | | * |
43 | | * The transitions are pretty simple : |
44 | | * - fd_want_*() : set flag A |
45 | | * - fd_stop_*() : clear flag A |
46 | | * - fd_cant_*() : clear flag R (when facing EAGAIN) |
47 | | * - fd_may_*() : set flag R (upon return from poll()) |
48 | | * |
49 | | * Each poller then computes its own polled state : |
50 | | * if (A) { if (!R) P := 1 } else { P := 0 } |
51 | | * |
52 | | * The state transitions look like the diagram below. |
53 | | * |
54 | | * may +----------+ |
55 | | * ,----| DISABLED | (READY=0, ACTIVE=0) |
56 | | * | +----------+ |
57 | | * | want | ^ |
58 | | * | | | |
59 | | * | v | stop |
60 | | * | +----------+ |
61 | | * | | ACTIVE | (READY=0, ACTIVE=1) |
62 | | * | +----------+ |
63 | | * | | ^ |
64 | | * | may | | |
65 | | * | v | EAGAIN (can't) |
66 | | * | +--------+ |
67 | | * | | READY | (READY=1, ACTIVE=1) |
68 | | * | +--------+ |
69 | | * | stop | ^ |
70 | | * | | | |
71 | | * | v | want |
72 | | * | +---------+ |
73 | | * `--->| STOPPED | (READY=1, ACTIVE=0) |
74 | | * +---------+ |
75 | | */ |
76 | | |
77 | | #include <stdio.h> |
78 | | #include <string.h> |
79 | | #include <unistd.h> |
80 | | #include <fcntl.h> |
81 | | #include <sys/types.h> |
82 | | #include <sys/resource.h> |
83 | | #include <sys/uio.h> |
84 | | |
85 | | #if defined(USE_POLL) |
86 | | #include <poll.h> |
87 | | #include <errno.h> |
88 | | #endif |
89 | | |
90 | | #include <haproxy/api.h> |
91 | | #include <haproxy/activity.h> |
92 | | #include <haproxy/cfgparse.h> |
93 | | #include <haproxy/fd.h> |
94 | | #include <haproxy/global.h> |
95 | | #include <haproxy/log.h> |
96 | | #include <haproxy/port_range.h> |
97 | | #include <haproxy/ticks.h> |
98 | | #include <haproxy/tools.h> |
99 | | |
100 | | |
101 | | struct fdtab *fdtab __read_mostly = NULL; /* array of all the file descriptors, indexed by fd */ |
102 | | struct polled_mask *polled_mask __read_mostly = NULL; /* array of the polled_mask of each fd, indexed by fd */ |
103 | | struct fdinfo *fdinfo __read_mostly = NULL; /* less-often used infos for file descriptors, indexed by fd */ |
104 | | int totalconn; /* total # of terminated sessions */ |
105 | | int actconn; /* # of active sessions */ |
106 | | |
107 | | struct poller pollers[MAX_POLLERS] __read_mostly; /* table of pollers (filled outside of this part of the file) */ |
108 | | struct poller cur_poller __read_mostly; /* poller in use; its optional ->clo() hook is called before an FD is closed */ |
109 | | int nbpollers = 0; /* presumably the number of entries used in pollers[] -- TODO confirm */ |
110 | | |
111 | | volatile struct fdlist update_list[MAX_TGROUPS]; // Global update lists, one per thread group (indexed by tgid - 1) |
112 | | |
113 | | THREAD_LOCAL int *fd_updt = NULL; // FD updates list (per thread) |
114 | | THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list (entries used in fd_updt[]) |
115 | | THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread |
116 | | int poller_wr_pipe[MAX_THREADS] __read_mostly; // Pipe to wake the threads |
117 | | |
118 | | volatile int ha_used_fds = 0; // Number of FD we're currently using (decremented by _fd_delete_orphan()) |
119 | | static struct fdtab *fdtab_addr; /* address of the allocated area containing fdtab */ |
120 | | |
121 | | /* adds fd <fd> to fd list <list> if it was not yet in it. The entry is first |
 | | * locked by setting its update.next to -2 (spinning while another thread |
 | | * holds that lock), then it is linked after the current list->last using a |
 | | * CAS, and finally released by setting update.next to -1, which marks the |
 | | * end of the list. An update.next greater than -2 on entry means the fd is |
 | | * already in a list and nothing is done. |
 | | */ |
122 | | void fd_add_to_fd_list(volatile struct fdlist *list, int fd) |
123 | 0 | { |
124 | 0 | int next; |
125 | 0 | int new; |
126 | 0 | int old; |
127 | 0 | int last; |
128 | |
|
129 | 0 | redo_next: |
130 | 0 | next = HA_ATOMIC_LOAD(&fdtab[fd].update.next); |
131 | | /* Check that we're not already in the cache (next > -2), and if not, lock us. A value of -2 means the entry is currently locked, so we retry. */ |
132 | 0 | if (next > -2) |
133 | 0 | goto done; |
134 | 0 | if (next == -2) |
135 | 0 | goto redo_next; |
136 | 0 | if (!_HA_ATOMIC_CAS(&fdtab[fd].update.next, &next, -2)) |
137 | 0 | goto redo_next; |
138 | 0 | __ha_barrier_atomic_store(); |
139 | |
|
140 | 0 | new = fd; |
141 | 0 | redo_last: |
142 | | /* First, insert in the linked list. <old> is the value the CAS below expects to replace: -1, i.e. an empty list or the next of the last element. */ |
143 | 0 | last = list->last; |
144 | 0 | old = -1; |
145 | |
|
146 | 0 | fdtab[fd].update.prev = -2; |
147 | | /* Make sure the "prev" store is visible before we update the last entry */ |
148 | 0 | __ha_barrier_store(); |
149 | |
|
150 | 0 | if (unlikely(last == -1)) { |
151 | | /* list is empty, try to add ourselves alone so that list->last=fd */ |
152 | 0 | if (unlikely(!_HA_ATOMIC_CAS(&list->last, &old, new))) |
153 | 0 | goto redo_last; |
154 | | |
155 | | /* list->first was necessarily -1, we're guaranteed to be alone here */ |
156 | 0 | list->first = fd; |
157 | 0 | } else { |
158 | | /* adding ourselves past the last element |
159 | | * The CAS will only succeed if its next is -1, |
160 | | * which means it's in the cache, and the last element. |
161 | | */ |
162 | 0 | if (unlikely(!_HA_ATOMIC_CAS(&fdtab[last].update.next, &old, new))) |
163 | 0 | goto redo_last; |
164 | | |
165 | | /* Then, update the last entry */ |
166 | 0 | list->last = fd; |
167 | 0 | } |
168 | 0 | __ha_barrier_store(); |
169 | | /* since we're alone at the end of the list and still locked(-2), |
170 | | * we know no one tried to add past us. Mark the end of list. |
171 | | */ |
172 | 0 | fdtab[fd].update.prev = last; |
173 | 0 | fdtab[fd].update.next = -1; |
174 | 0 | __ha_barrier_store(); |
175 | 0 | done: |
176 | 0 | return; |
177 | 0 | } |
178 | | |
179 | | /* removes fd <fd> from fd list <list>. Nothing is done if the fd is not in a |
 | | * list (update.next <= -3). Otherwise the fd's own prev/next are locked by |
 | | * setting them to -2 (both at once with a single CAS when HA_HAVE_CAS_DW or |
 | | * HA_CAS_IS_8B is defined, otherwise one field at a time), then the links |
 | | * of both neighbours pointing to it are locked as well. If a neighbour is |
 | | * found already locked, everything taken so far is restored and the whole |
 | | * operation restarts. Once unlinked, update.next is set to -(next + 4), |
 | | * which is always <= -3 and thus reads as "not in a list". |
 | | */ |
180 | | void fd_rm_from_fd_list(volatile struct fdlist *list, int fd) |
181 | 0 | { |
182 | 0 | #if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B) |
183 | 0 | volatile union { |
184 | 0 | struct fdlist_entry ent; |
185 | 0 | uint64_t u64; |
186 | 0 | uint32_t u32[2]; |
187 | 0 | } cur_list, next_list; |
188 | 0 | #endif |
189 | 0 | int old; |
190 | 0 | int new = -2; |
191 | 0 | int prev; |
192 | 0 | int next; |
193 | 0 | int last; |
194 | 0 | lock_self: |
195 | 0 | #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW)) |
196 | 0 | next_list.ent.next = next_list.ent.prev = -2; |
197 | 0 | cur_list.ent = *(volatile typeof(fdtab->update)*)&fdtab[fd].update; |
198 | | /* First, attempt to lock our own entries (prev and next together) */ |
199 | 0 | do { |
200 | | /* The FD is not in the FD cache, give up */ |
201 | 0 | if (unlikely(cur_list.ent.next <= -3)) |
202 | 0 | return; |
203 | 0 | if (unlikely(cur_list.ent.prev == -2 || cur_list.ent.next == -2)) |
204 | 0 | goto lock_self; |
205 | 0 | } while ( |
206 | 0 | #ifdef HA_CAS_IS_8B |
207 | 0 | unlikely(!_HA_ATOMIC_CAS(((uint64_t *)&fdtab[fd].update), (uint64_t *)&cur_list.u64, next_list.u64)) |
208 | | #else |
209 | | unlikely(!_HA_ATOMIC_DWCAS(((long *)&fdtab[fd].update), (uint32_t *)&cur_list.u32, (const uint32_t *)&next_list.u32)) |
210 | | #endif |
211 | 0 | ); |
212 | 0 | next = cur_list.ent.next; |
213 | 0 | prev = cur_list.ent.prev; |
214 | |
|
215 | | #else |
216 | | lock_self_next: |
217 | | next = HA_ATOMIC_LOAD(&fdtab[fd].update.next); |
218 | | if (next == -2) |
219 | | goto lock_self_next; |
220 | | if (next <= -3) |
221 | | goto done; |
222 | | if (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].update.next, &next, -2))) |
223 | | goto lock_self_next; |
224 | | lock_self_prev: |
225 | | prev = HA_ATOMIC_LOAD(&fdtab[fd].update.prev); |
226 | | if (prev == -2) |
227 | | goto lock_self_prev; |
228 | | if (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].update.prev, &prev, -2))) |
229 | | goto lock_self_prev; |
230 | | #endif |
231 | 0 | __ha_barrier_atomic_store(); |
232 | | |
233 | | /* Now, lock the entries of our neighbours: the next of our |
 | | * predecessor and the prev of our successor, both of which are |
 | | * expected to currently hold <fd>. |
 | | */ |
234 | 0 | if (likely(prev != -1)) { |
235 | 0 | redo_prev: |
236 | 0 | old = fd; |
237 | |
|
238 | 0 | if (unlikely(!_HA_ATOMIC_CAS(&fdtab[prev].update.next, &old, new))) { |
239 | 0 | if (unlikely(old == -2)) { |
240 | | /* Neighbour already locked, give up and |
241 | | * retry again once it's done. Our own entries |
 | | * are restored first so that we're unlocked. |
242 | | */ |
243 | 0 | fdtab[fd].update.prev = prev; |
244 | 0 | __ha_barrier_store(); |
245 | 0 | fdtab[fd].update.next = next; |
246 | 0 | __ha_barrier_store(); |
247 | 0 | goto lock_self; |
248 | 0 | } |
249 | 0 | goto redo_prev; |
250 | 0 | } |
251 | 0 | } |
252 | 0 | if (likely(next != -1)) { |
253 | 0 | redo_next: |
254 | 0 | old = fd; |
255 | 0 | if (unlikely(!_HA_ATOMIC_CAS(&fdtab[next].update.prev, &old, new))) { |
256 | 0 | if (unlikely(old == -2)) { |
257 | | /* Neighbour already locked, give up and |
258 | | * retry again once it's done. The predecessor |
 | | * locked above is released too, then our own |
 | | * entries are restored. |
259 | | */ |
260 | 0 | if (prev != -1) { |
261 | 0 | fdtab[prev].update.next = fd; |
262 | 0 | __ha_barrier_store(); |
263 | 0 | } |
264 | 0 | fdtab[fd].update.prev = prev; |
265 | 0 | __ha_barrier_store(); |
266 | 0 | fdtab[fd].update.next = next; |
267 | 0 | __ha_barrier_store(); |
268 | 0 | goto lock_self; |
269 | 0 | } |
270 | 0 | goto redo_next; |
271 | 0 | } |
272 | 0 | } |
273 | 0 | if (list->first == fd) |
274 | 0 | list->first = next; |
275 | 0 | __ha_barrier_store(); |
276 | 0 | last = list->last; |
277 | 0 | while (unlikely(last == fd && (!_HA_ATOMIC_CAS(&list->last, &last, prev)))) |
278 | 0 | __ha_compiler_barrier(); |
279 | | /* Make sure we let other threads know we're no longer in cache, |
280 | | * before releasing our neighbours. |
281 | | */ |
282 | 0 | __ha_barrier_store(); |
283 | 0 | if (likely(prev != -1)) |
284 | 0 | fdtab[prev].update.next = next; |
285 | 0 | __ha_barrier_store(); |
286 | 0 | if (likely(next != -1)) |
287 | 0 | fdtab[next].update.prev = prev; |
288 | 0 | __ha_barrier_store(); |
289 | | /* Ok, now we're out of the fd cache. Since next >= -1 here, the value stored below is always <= -3. */ |
290 | 0 | fdtab[fd].update.next = -(next + 4); |
291 | 0 | __ha_barrier_store(); |
292 | 0 | done: |
293 | 0 | return; |
294 | 0 | } |
295 | | |
296 | | /* deletes the FD once nobody uses it anymore, as detected by the caller by its |
297 | | * thread_mask being zero and its running mask turning to zero. There is no |
298 | | * protection against concurrent accesses, it's up to the caller to make sure |
299 | | * only the last thread will call it. If called under isolation, it is safe to |
300 | | * call this from another group than the FD's. This is only for internal use, |
301 | | * please use fd_delete() instead. The FD is removed from its group's update |
 | | * list, its fdtab/fdinfo/polled_mask fields are reset, its local port is |
 | | * released, and it is finally closed unless it was marked FD_DISOWN. |
302 | | */ |
303 | | void _fd_delete_orphan(int fd) |
304 | 0 | { |
305 | 0 | int tgrp = fd_tgid(fd); |
306 | 0 | uint fd_disown; |
307 | |
|
308 | 0 | fd_disown = fdtab[fd].state & FD_DISOWN; /* sampled now since state is reset below; when set, close() is skipped */ |
309 | 0 | if (fdtab[fd].state & FD_LINGER_RISK) { |
310 | | /* this is generally set when connecting to servers */ |
311 | 0 | DISGUISE(setsockopt(fd, SOL_SOCKET, SO_LINGER, |
312 | 0 | (struct linger *) &nolinger, sizeof(struct linger))); |
313 | 0 | } |
314 | | |
315 | | /* It's expected that a close() will result in the FD disappearing from |
316 | | * pollers, but some pollers may have some internal bookkeeping to be |
317 | | * done prior to the call (e.g. remove references from internal tables). |
318 | | */ |
319 | 0 | if (cur_poller.clo) |
320 | 0 | cur_poller.clo(fd); |
321 | | |
322 | | /* now we're about to reset some of this FD's fields. We don't want |
323 | | * anyone to grab it anymore and we need to make sure those which could |
324 | | * possibly have stumbled upon it right now are leaving before we |
325 | | * proceed. This is done in two steps. First we reset the tgid so that |
326 | | * fd_take_tgid() and fd_grab_tgid() fail, then we wait for existing |
327 | | * ref counts to drop. Past this point we're alone dealing with the |
328 | | * FD's thread/running/update/polled masks. |
329 | | */ |
330 | 0 | fd_reset_tgid(fd); |
331 | |
|
332 | 0 | while (_HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid) != 0) // refc==0 ? |
333 | 0 | __ha_cpu_relax(); |
334 | | |
335 | | /* we don't want this FD anymore in the global list (<tgrp> was read before the tgid was reset above) */ |
336 | 0 | fd_rm_from_fd_list(&update_list[tgrp - 1], fd); |
337 | | |
338 | | /* no more updates on this FD are relevant anymore. If it is the last |
 | | * entry of this thread's local update list, that entry is dropped too. |
 | | */ |
339 | 0 | HA_ATOMIC_STORE(&fdtab[fd].update_mask, 0); |
340 | 0 | if (fd_nbupdt > 0 && fd_updt[fd_nbupdt - 1] == fd) |
341 | 0 | fd_nbupdt--; |
342 | |
|
343 | 0 | port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port); |
344 | 0 | polled_mask[fd].poll_recv = polled_mask[fd].poll_send = 0; |
345 | |
|
346 | 0 | fdtab[fd].state = 0; |
347 | |
|
348 | | #ifdef DEBUG_FD |
349 | | fdtab[fd].event_count = 0; |
350 | | #endif |
351 | 0 | fdinfo[fd].port_range = NULL; |
352 | 0 | fdtab[fd].owner = NULL; |
353 | | |
354 | | /* perform the close() call last as it's what unlocks the instant reuse |
355 | | * of this FD by any other thread. |
356 | | */ |
357 | 0 | if (!fd_disown) |
358 | 0 | close(fd); |
359 | 0 | _HA_ATOMIC_DEC(&ha_used_fds); |
360 | 0 | } |
361 | | |
362 | | /* Deletes an FD from the fdsets. The file descriptor is also closed, possibly |
363 | | * asynchronously. It is safe to call it from another thread from the same |
364 | | * group as the FD's or from a thread from a different group. However if called |
365 | | * from a thread from another group, there is an extra cost involved because |
366 | | * the operation is performed under thread isolation, so doing so must be |
367 | | * reserved for ultra-rare cases (e.g. stopping a listener). <fd> must be |
 | | * within [0, global.maxsock), which is enforced by a BUG_ON(). |
368 | | */ |
369 | | void fd_delete(int fd) |
370 | 0 | { |
371 | | /* This must never happen and would definitely indicate a bug, in |
372 | | * addition to overwriting some unexpected memory areas. |
373 | | */ |
374 | 0 | BUG_ON(fd < 0 || fd >= global.maxsock); |
375 | | |
376 | | /* NOTE: The master when going into reexec mode re-closes all FDs after |
377 | | * they were already dispatched. But we know we didn't start the polling |
378 | | * threads so we can still close them. The masks will probably not match |
379 | | * however so we force the value and erase the refcount if any. |
380 | | */ |
381 | 0 | if (unlikely(global.mode & MODE_STARTING)) |
382 | 0 | fdtab[fd].refc_tgid = ti->tgid; |
383 | | |
384 | | /* the tgid cannot change before a complete close so we should never |
385 | | * face the situation where we try to close an fd that was reassigned. |
386 | | * However there is one corner case where this happens, it's when an |
387 | | * attempt to pause a listener fails (e.g. abns), leaving the listener |
388 | | * in fault state and it is forcefully stopped. This needs to be done |
389 | | * under isolation, and it's quite rare (i.e. once per such FD per |
390 | | * process). Since we'll be isolated we can clear the thread mask and |
391 | | * close the FD ourselves. Isolation is not requested when we're |
 | | * already isolated or when the process is stopping. |
392 | | */ |
393 | 0 | if (unlikely(fd_tgid(fd) != ti->tgid)) { |
394 | 0 | int must_isolate = !thread_isolated() && !(global.mode & MODE_STOPPING); |
395 | |
|
396 | 0 | if (must_isolate) |
397 | 0 | thread_isolate(); |
398 | |
|
399 | 0 | HA_ATOMIC_STORE(&fdtab[fd].thread_mask, 0); |
400 | 0 | HA_ATOMIC_STORE(&fdtab[fd].running_mask, 0); |
401 | 0 | _fd_delete_orphan(fd); |
402 | |
|
403 | 0 | if (must_isolate) |
404 | 0 | thread_release(); |
405 | 0 | return; |
406 | 0 | } |
407 | | |
408 | | /* we must postpone removal of an FD that may currently be in use |
409 | | * by another thread. This can happen in the following two situations: |
410 | | * - after a takeover, the owning thread closes the connection but |
411 | | * the previous one just woke up from the poller and entered |
412 | | * the FD handler iocb. That thread holds an entry in running_mask |
413 | | * and requires removal protection. |
414 | | * - multiple threads are accepting connections on a listener, and |
415 | | * one of them (or even a separate one) decides to unbind the |
416 | | * listener under the listener's lock while other ones still hold |
417 | | * the running bit. |
418 | | * In both situations the FD is marked as unused (thread_mask = 0) and |
419 | | * will not take new bits in its running_mask so we have the guarantee |
420 | | * that the last thread eliminating running_mask is the one allowed to |
421 | | * safely delete the FD. Most of the time it will be the current thread. |
422 | | * We still need to set and check the one-shot flag FD_MUST_CLOSE |
423 | | * to take care of the rare cases where a thread wakes up on late I/O |
424 | | * before the thread_mask is zero, and sets its bit in the running_mask |
425 | | * just after the current thread finishes clearing its own bit, hence |
426 | | * the two threads see themselves as last ones (which they really are). |
 | | * Only the thread which manages to atomically clear FD_MUST_CLOSE |
 | | * performs the actual deletion. |
427 | | */ |
428 | | |
429 | 0 | HA_ATOMIC_OR(&fdtab[fd].running_mask, ti->ltid_bit); |
430 | 0 | HA_ATOMIC_OR(&fdtab[fd].state, FD_MUST_CLOSE); |
431 | 0 | HA_ATOMIC_STORE(&fdtab[fd].thread_mask, 0); |
432 | 0 | if (fd_clr_running(fd) == ti->ltid_bit) { |
433 | 0 | if (HA_ATOMIC_BTR(&fdtab[fd].state, FD_MUST_CLOSE_BIT)) { |
434 | 0 | _fd_delete_orphan(fd); |
435 | 0 | } |
436 | 0 | } |
437 | 0 | } |
438 | | |
/* Switches the new file descriptor <fd> to non-blocking mode. Since O_NONBLOCK
 * alone is passed to F_SETFL, all other O_* status flags are cleared, which is
 * why this is meant to be used on new FDs only. Returns -1 on failure. The
 * result is disguised at the end because some callers need to be able to
 * ignore it regardless of the libc attributes.
 */
int fd_set_nonblock(int fd)
{
	int rc;

	rc = fcntl(fd, F_SETFL, O_NONBLOCK);
	return DISGUISE(rc);
}
450 | | |
/* Sets the close-on-exec flag on <fd> while preserving its other descriptor
 * flags. Returns -1 on failure, either because the current flags could not be
 * read or because the updated ones could not be set; errno is left as set by
 * the failing fcntl() call. The result is disguised at the end because some
 * callers need to be able to ignore it regardless of the libc attributes.
 */
int fd_set_cloexec(int fd)
{
	int flags, ret;

	/* F_GETFD returns -1 on error: in this case we must not use it as a
	 * flag set (it would turn all bits on), we just report the failure.
	 */
	ret = flags = fcntl(fd, F_GETFD);
	if (flags != -1)
		ret = fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
	return DISGUISE(ret);
}
464 | | |
465 | | /* |
466 | | * Take over a FD belonging to another thread. |
467 | | * <expected_owner> is the expected owner of the fd. |
468 | | * Returns 0 on success, and -1 on failure (the owner does not match, the |
 | | * FD's tgid could not be grabbed for the current group, or the FD has bits |
 | | * set in its running_mask). On success the FD's thread_mask is replaced |
 | | * with the calling thread's bit and receipt is stopped on it. |
469 | | */ |
470 | | int fd_takeover(int fd, void *expected_owner) |
471 | 0 | { |
472 | 0 | unsigned long old; |
473 | | |
474 | | /* protect ourself against a delete then an insert for the same fd, |
475 | | * if it happens, then the owner will no longer be the expected |
476 | | * connection. |
477 | | */ |
478 | 0 | if (fdtab[fd].owner != expected_owner) |
479 | 0 | return -1; |
480 | | |
481 | | /* we must be alone to work on this idle FD. If not, it means that its |
482 | | * poller is currently waking up and is about to use it, likely to |
483 | | * close it on shut/error, but maybe also to process any unexpectedly |
484 | | * pending data. It's also possible that the FD was closed and |
485 | | * reassigned to another thread group, so let's be careful. |
486 | | */ |
487 | 0 | if (unlikely(!fd_grab_tgid(fd, ti->tgid))) |
488 | 0 | return -1; |
489 | | |
490 | 0 | old = 0; /* running_mask must be empty for the CAS below to install our bit */ |
491 | 0 | if (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &old, ti->ltid_bit)) { |
492 | 0 | fd_drop_tgid(fd); |
493 | 0 | return -1; |
494 | 0 | } |
495 | | |
496 | | /* success, from now on it's ours */ |
497 | 0 | HA_ATOMIC_STORE(&fdtab[fd].thread_mask, ti->ltid_bit); |
498 | | |
499 | | /* Make sure the FD doesn't have the active bit. It is possible that |
500 | | * the fd is polled by the thread that used to own it, the new thread |
501 | | * is supposed to call subscribe() later, to activate polling. |
502 | | */ |
503 | 0 | fd_stop_recv(fd); |
504 | | |
505 | | /* we're done with it */ |
506 | 0 | HA_ATOMIC_AND(&fdtab[fd].running_mask, ~ti->ltid_bit); |
507 | | |
508 | | /* no more changes planned */ |
509 | 0 | fd_drop_tgid(fd); |
510 | 0 | return 0; |
511 | 0 | } |
512 | | |
/* Schedules a polling update for <fd>. Nothing is done if the FD has no
 * thread group anymore (closed). If the FD belongs to another thread group
 * than the caller's, the update is broadcast to all enabled threads of that
 * group through the group's update list and one of the FD's threads is woken
 * up. Otherwise, if the calling thread is the only one concerned, the FD is
 * appended to the thread-local fd_updt[] list (unless already marked for this
 * thread in update_mask); else update_mask is set to the FD's thread_mask, the
 * FD is added to the group's shared update list, and another thread is woken
 * up when the FD is active and the caller is not among its threads.
 */
513 | | void updt_fd_polling(const int fd) |
514 | 0 | { |
515 | 0 | uint tgrp = fd_take_tgid(fd); |
516 | | |
517 | | /* closed ? may happen */ |
518 | 0 | if (!tgrp) |
519 | 0 | return; |
520 | | |
521 | 0 | if (unlikely(tgrp != tgid && tgrp <= MAX_TGROUPS)) { |
522 | | /* Hmmm delivered an update for another group... That may |
523 | | * happen on suspend/resume of a listener for example when |
524 | | * the FD was not even marked for running. Let's broadcast |
525 | | * the update. |
526 | | */ |
527 | 0 | unsigned long update_mask = fdtab[fd].update_mask; |
528 | 0 | int thr; |
529 | |
|
530 | 0 | while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, |
531 | 0 | _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp - 1].threads_enabled))) |
532 | 0 | __ha_cpu_relax(); |
533 | |
|
534 | 0 | fd_add_to_fd_list(&update_list[tgrp - 1], fd); |
535 | |
|
536 | 0 | thr = one_among_mask(fdtab[fd].thread_mask & ha_tgroup_info[tgrp - 1].threads_enabled, |
537 | 0 | statistical_prng_range(ha_tgroup_info[tgrp - 1].count)); |
538 | 0 | thr += ha_tgroup_info[tgrp - 1].base; /* presumably turns the group-local thread number into a global one -- TODO confirm */ |
539 | 0 | wake_thread(thr); |
540 | |
|
541 | 0 | fd_drop_tgid(fd); |
542 | 0 | return; |
543 | 0 | } |
544 | | |
545 | 0 | fd_drop_tgid(fd); |
546 | |
|
547 | 0 | if (tg->threads_enabled == 1UL || (fdtab[fd].thread_mask & tg->threads_enabled) == ti->ltid_bit) { |
548 | 0 | if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid)) |
549 | 0 | return; |
550 | | |
551 | 0 | fd_updt[fd_nbupdt++] = fd; |
552 | 0 | } else { |
553 | 0 | unsigned long update_mask = fdtab[fd].update_mask; |
554 | 0 | do { |
555 | 0 | if (update_mask == fdtab[fd].thread_mask) // FIXME: this works only on thread-groups 1 |
556 | 0 | return; |
557 | 0 | } while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, fdtab[fd].thread_mask)); |
558 | | |
559 | 0 | fd_add_to_fd_list(&update_list[tgid - 1], fd); |
560 | |
|
561 | 0 | if (fd_active(fd) && !(fdtab[fd].thread_mask & ti->ltid_bit)) { |
562 | | /* we need to wake up another thread to handle it immediately, any will fit, |
563 | | * so let's pick a random one so that it doesn't always end up on the same. |
564 | | */ |
565 | 0 | int thr = one_among_mask(fdtab[fd].thread_mask & tg->threads_enabled, |
566 | 0 | statistical_prng_range(tg->count)); |
567 | 0 | thr += tg->base; |
568 | 0 | wake_thread(thr); |
569 | 0 | } |
570 | 0 | } |
571 | 0 | } |
572 | | |
573 | | /* Update events seen for FD <fd> and its state if needed. This should be |
574 | | * called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_* |
575 | | * doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are |
576 | | * allowed to be reported regardless of R/W readiness. Returns one of |
577 | | * FD_UPDT_*: FD_UPDT_DONE when the events were applied (and the I/O |
 | | * callback called if the FD is active), FD_UPDT_CLOSED when the FD no |
 | | * longer belongs to the caller's thread group or was closed here, and |
 | | * FD_UPDT_MIGRATED when the FD is not for the calling thread, in which |
 | | * case an update is scheduled on it for this thread. |
578 | | */ |
579 | | int fd_update_events(int fd, uint evts) |
580 | 0 | { |
581 | 0 | unsigned long locked; |
582 | 0 | uint old, new; |
583 | 0 | uint new_flags, must_stop; |
584 | 0 | ulong rmask, tmask; |
585 | |
|
586 | 0 | _HA_ATOMIC_AND(&th_ctx->flags, ~TH_FL_STUCK); // this thread is still running |
587 | |
|
588 | 0 | if (unlikely(!fd_grab_tgid(fd, ti->tgid))) { |
589 | | /* the FD changed to another tgid, we can't safely |
590 | | * check it anymore. The bits in the masks are not |
591 | | * ours anymore and we're not allowed to touch them. |
592 | | * Ours have already been cleared and the FD was |
593 | | * closed in between so we can safely leave now. |
594 | | */ |
595 | 0 | activity[tid].poll_drop_fd++; |
596 | 0 | return FD_UPDT_CLOSED; |
597 | 0 | } |
598 | | |
599 | | /* Do not take running_mask if not strictly needed (will trigger a |
600 | | * cosmetic BUG_ON() in fd_insert() anyway if done). |
601 | | */ |
602 | 0 | tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask); |
603 | 0 | if (!(tmask & ti->ltid_bit)) |
604 | 0 | goto do_update; |
605 | | |
606 | 0 | HA_ATOMIC_OR(&fdtab[fd].running_mask, ti->ltid_bit); |
607 | | |
608 | | /* From this point, our bit may possibly be in thread_mask, but it may |
609 | | * still vanish, either because a takeover completed just before taking |
610 | | * the bit above with the new owner deleting the FD, or because a |
611 | | * takeover started just before taking the bit. In order to make sure a |
612 | | * started takeover is complete, we need to verify that all bits of |
613 | | * running_mask are present in thread_mask, since takeover first takes |
614 | | * running then atomically replaces thread_mask. Once it's stable, if |
615 | | * our bit remains there, no further takeover may happen because we |
616 | | * hold running, but if our bit is not there it means we've lost the |
617 | | * takeover race and have to decline touching the FD. Regarding the |
618 | | * risk of deletion, our bit in running_mask prevents fd_delete() from |
619 | | * finalizing the close, and the caller will leave the FD with a zero |
620 | | * thread_mask and the FD_MUST_CLOSE flag set. It will then be our |
621 | | * responsibility to close it. |
622 | | */ |
623 | 0 | do { |
624 | 0 | rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask); |
625 | 0 | tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask); |
626 | 0 | rmask &= ~ti->ltid_bit; |
627 | 0 | } while (rmask & ~tmask); |
628 | | |
629 | | /* Now tmask is stable. Do nothing if the FD was taken over under us */ |
630 | |
|
631 | 0 | if (!(tmask & ti->ltid_bit)) { |
632 | | /* a takeover has started */ |
633 | 0 | activity[tid].poll_skip_fd++; |
634 | |
|
635 | 0 | if (fd_clr_running(fd) == ti->ltid_bit) |
636 | 0 | goto closed_or_migrated; |
637 | | |
638 | 0 | goto do_update; |
639 | 0 | } |
640 | | |
641 | | /* with running we're safe now, we can drop the reference */ |
642 | 0 | fd_drop_tgid(fd); |
643 | |
|
644 | 0 | locked = (tmask != ti->ltid_bit); |
645 | | |
646 | | /* OK now we are guaranteed that our thread_mask was present and |
647 | | * that we're allowed to update the FD. |
648 | | */ |
649 | |
|
650 | 0 | new_flags = |
651 | 0 | ((evts & FD_EV_READY_R) ? FD_POLL_IN : 0) | |
652 | 0 | ((evts & FD_EV_READY_W) ? FD_POLL_OUT : 0) | |
653 | 0 | ((evts & FD_EV_SHUT_R) ? FD_POLL_HUP : 0) | |
654 | 0 | ((evts & FD_EV_ERR_RW) ? FD_POLL_ERR : 0); |
655 | | |
656 | | /* SHUTW reported while FD was active for writes is an error */ |
657 | 0 | if ((fdtab[fd].state & FD_EV_ACTIVE_W) && (evts & FD_EV_SHUT_W)) |
658 | 0 | new_flags |= FD_POLL_ERR; |
659 | | |
660 | | /* compute the inactive events reported late that must be stopped */ |
661 | 0 | must_stop = 0; |
662 | 0 | if (unlikely(!fd_active(fd))) { |
663 | | /* both sides stopped */ |
664 | 0 | must_stop = FD_POLL_IN | FD_POLL_OUT; |
665 | 0 | } |
666 | 0 | else if (unlikely(!fd_recv_active(fd) && (evts & (FD_EV_READY_R | FD_EV_SHUT_R | FD_EV_ERR_RW)))) { |
667 | | /* only send remains */ |
668 | 0 | must_stop = FD_POLL_IN; |
669 | 0 | } |
670 | 0 | else if (unlikely(!fd_send_active(fd) && (evts & (FD_EV_READY_W | FD_EV_SHUT_W | FD_EV_ERR_RW)))) { |
671 | | /* only recv remains */ |
672 | 0 | must_stop = FD_POLL_OUT; |
673 | 0 | } |
674 | |
|
675 | 0 | if (new_flags & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) |
676 | 0 | new_flags |= FD_EV_READY_R; |
677 | |
|
678 | 0 | if (new_flags & (FD_POLL_OUT | FD_POLL_ERR)) |
679 | 0 | new_flags |= FD_EV_READY_W; |
680 | |
|
681 | 0 | old = fdtab[fd].state; |
682 | 0 | new = (old & ~FD_POLL_UPDT_MASK) | new_flags; |
683 | |
|
684 | 0 | if (unlikely(locked)) { |
685 | | /* Locked FDs (those whose thread_mask is not limited to the current thread) are atomically updated */ |
686 | 0 | while (unlikely(new != old && !_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new))) |
687 | 0 | new = (old & ~FD_POLL_UPDT_MASK) | new_flags; |
688 | 0 | } else { |
689 | 0 | if (new != old) |
690 | 0 | fdtab[fd].state = new; |
691 | 0 | } |
692 | |
|
693 | 0 | if (fdtab[fd].iocb && fd_active(fd)) { |
694 | 0 | fdtab[fd].iocb(fd); |
695 | 0 | } |
696 | | |
697 | | /* |
698 | | * We entered iocb with running set and with the valid tgid. |
699 | | * Since then, this is what could have happened: |
700 | | * - another thread tried to close the FD (e.g. timeout task from |
701 | | * another one that owns it). We still have running set, but not |
702 | | * tmask. We must call fd_clr_running() then _fd_delete_orphan() |
703 | | * if we were the last one. |
704 | | * |
705 | | * - the iocb tried to close the FD => bit no more present in running, |
706 | | * nothing to do. If it managed to close it, the poller's ->clo() |
707 | | * has already been called. |
708 | | * |
709 | | * - after we closed, the FD was reassigned to another thread in |
710 | | * another group => running not present, tgid differs, nothing to |
711 | | * do because if it got reassigned it indicates it was already |
712 | | * closed. |
713 | | * |
714 | | * There's no risk of takeover of the valid FD here during this period. |
715 | | * Also if we still have running, immediately after we release it, the |
716 | | * events above might instantly happen due to another thread taking |
717 | | * over. |
718 | | * |
719 | | * As such, the only cases where the FD is still relevant are: |
720 | | * - tgid still set and running still set (most common) |
721 | | * - tgid still valid but running cleared due to fd_delete(): we may |
722 | | * still need to stop polling otherwise we may keep it enabled |
723 | | * while waiting for other threads to close it. |
724 | | * And given that we may need to program a tentative update in case we |
725 | | * don't immediately close, it's easier to grab the tgid during the |
726 | | * whole check. |
727 | | */ |
728 | |
|
729 | 0 | if (!fd_grab_tgid(fd, tgid)) |
730 | 0 | return FD_UPDT_CLOSED; |
731 | | |
732 | 0 | tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask); |
733 | | |
734 | | /* another thread might have attempted to close this FD in the mean |
735 | | * time (e.g. timeout task) striking on a previous thread and closing. |
736 | | * This is detected by us being the last owners of a running_mask bit, |
737 | | * and the thread_mask being zero. At the moment we release the running |
738 | | * bit, a takeover may also happen, so in practice we check that we |
739 | | * were the last one to drop our running bit and that our bit is no |
740 | | * longer present in thread_mask. There is no risk the FD gets reassigned |
741 | | * to a different group since it's not released until the real close() |
742 | | * in _fd_delete_orphan(). |
743 | | */ |
744 | 0 | if (fd_clr_running(fd) == ti->ltid_bit && !(tmask & ti->ltid_bit)) |
745 | 0 | goto closed_or_migrated; |
746 | | |
747 | | /* we had to stop this FD and it still must be stopped after the I/O |
748 | | * cb's changes, so let's program an update for this. |
749 | | */ |
750 | 0 | if (must_stop && !(fdtab[fd].update_mask & ti->ltid_bit)) { |
751 | 0 | if (((must_stop & FD_POLL_IN) && !fd_recv_active(fd)) || |
752 | 0 | ((must_stop & FD_POLL_OUT) && !fd_send_active(fd))) |
753 | 0 | if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid)) |
754 | 0 | fd_updt[fd_nbupdt++] = fd; |
755 | 0 | } |
756 | |
|
757 | 0 | fd_drop_tgid(fd); |
758 | 0 | return FD_UPDT_DONE; |
759 | | |
760 | 0 | closed_or_migrated: |
761 | | /* We only come here once we've last dropped running and the FD is |
762 | | * not for us as per !(tmask & ltid_bit). It may imply we're |
763 | | * responsible for closing it. Otherwise it's just a migration. |
764 | | */ |
765 | 0 | if (HA_ATOMIC_BTR(&fdtab[fd].state, FD_MUST_CLOSE_BIT)) { |
766 | 0 | fd_drop_tgid(fd); |
767 | 0 | _fd_delete_orphan(fd); |
768 | 0 | return FD_UPDT_CLOSED; |
769 | 0 | } |
770 | | |
771 | | /* So we were alone, no close bit, at best the FD was migrated, at |
772 | | * worst it's in the process of being closed by another thread. We must |
773 | | * be ultra-careful as it can be re-inserted by yet another thread as |
774 | | * the result of socket() or accept(). Let's just tell the poller the |
775 | | * FD was lost. If it was closed it was already removed and this will |
776 | | * only cost an update for nothing. |
777 | | */ |
778 | | |
779 | 0 | do_update: |
780 | | /* The FD is not closed but we don't want the poller to wake up for |
781 | | * it anymore. |
782 | | */ |
783 | 0 | if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid)) |
784 | 0 | fd_updt[fd_nbupdt++] = fd; |
785 | |
|
786 | 0 | fd_drop_tgid(fd); |
787 | 0 | return FD_UPDT_MIGRATED; |
788 | 0 | } |
789 | | |
790 | | /* This is used by pollers at boot time to re-register desired events for |
791 | | * all FDs after new pollers have been created. It doesn't do much, it checks |
792 | | * that their thread group matches the one in argument, and that the thread |
793 | | * mask matches at least one of the bits in the mask, and if so, marks the FD |
794 | | * as updated. |
795 | | */ |
796 | | void fd_reregister_all(int tgrp, ulong mask) |
797 | 0 | { |
798 | 0 | int fd; |
799 | |
|
800 | 0 | for (fd = 0; fd < global.maxsock; fd++) { |
801 | 0 | if (!fdtab[fd].owner) |
802 | 0 | continue; |
803 | | |
804 | | /* make sure we don't register other tgroups' FDs. We just |
805 | | * avoid needlessly taking the lock if not needed. |
806 | | */ |
807 | 0 | if (!(_HA_ATOMIC_LOAD(&fdtab[fd].thread_mask) & mask) || |
808 | 0 | !fd_grab_tgid(fd, tgrp)) |
809 | 0 | continue; // was not for us anyway |
810 | | |
811 | 0 | if (_HA_ATOMIC_LOAD(&fdtab[fd].thread_mask) & mask) |
812 | 0 | updt_fd_polling(fd); |
813 | 0 | fd_drop_tgid(fd); |
814 | 0 | } |
815 | 0 | } |
816 | | |
817 | | /* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg> |
818 | | * optionally followed by a newline if <nl> is non-null, to file descriptor |
819 | | * <fd>. The message is sent atomically using writev(). It may be truncated to |
820 | | * <maxlen> bytes if <maxlen> is non-null. There is no distinction between the |
821 | | * two lists, it's just a convenience to help the caller prepend some prefixes |
822 | | * when necessary. It takes the fd's lock to make sure no other thread will |
823 | | * write to the same fd in parallel. Returns the number of bytes sent, or <=0 |
824 | | * on failure. A limit to 31 total non-empty segments is enforced. The caller |
825 | | * is responsible for taking care of making the fd non-blocking. |
826 | | */ |
827 | | ssize_t fd_write_frag_line(int fd, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg, int nl) |
828 | 0 | { |
829 | 0 | struct iovec iovec[32]; |
830 | 0 | size_t sent = 0; |
831 | 0 | int vec = 0; |
832 | 0 | int attempts = 0; |
833 | |
|
834 | 0 | if (!maxlen) |
835 | 0 | maxlen = ~0; |
836 | | |
837 | | /* keep one char for a possible trailing '\n' in any case */ |
838 | 0 | maxlen--; |
839 | | |
840 | | /* make an iovec from the concatenation of all parts of the original |
841 | | * message. Skip empty fields and truncate the whole message to maxlen, |
842 | | * leaving one spare iovec for the '\n'. |
843 | | */ |
844 | 0 | while (vec < (sizeof(iovec) / sizeof(iovec[0]) - 1)) { |
845 | 0 | if (!npfx) { |
846 | 0 | pfx = msg; |
847 | 0 | npfx = nmsg; |
848 | 0 | nmsg = 0; |
849 | 0 | if (!npfx) |
850 | 0 | break; |
851 | 0 | } |
852 | | |
853 | 0 | iovec[vec].iov_base = pfx->ptr; |
854 | 0 | iovec[vec].iov_len = MIN(maxlen, pfx->len); |
855 | 0 | maxlen -= iovec[vec].iov_len; |
856 | 0 | if (iovec[vec].iov_len) |
857 | 0 | vec++; |
858 | 0 | pfx++; npfx--; |
859 | 0 | }; |
860 | |
|
861 | 0 | if (nl) { |
862 | 0 | iovec[vec].iov_base = "\n"; |
863 | 0 | iovec[vec].iov_len = 1; |
864 | 0 | vec++; |
865 | 0 | } |
866 | | |
867 | | /* make sure we never interleave writes and we never block. This means |
868 | | * we prefer to fail on collision than to block. But we don't want to |
869 | | * lose too many logs so we just perform a few lock attempts then give |
870 | | * up. |
871 | | */ |
872 | |
|
873 | 0 | while (HA_ATOMIC_BTS(&fdtab[fd].state, FD_EXCL_SYSCALL_BIT)) { |
874 | 0 | if (++attempts >= 200) { |
875 | | /* so that the caller knows the message couldn't be delivered */ |
876 | 0 | sent = -1; |
877 | 0 | errno = EAGAIN; |
878 | 0 | goto leave; |
879 | 0 | } |
880 | 0 | ha_thread_relax(); |
881 | 0 | } |
882 | | |
883 | 0 | if (unlikely(!(fdtab[fd].state & FD_INITIALIZED))) { |
884 | 0 | HA_ATOMIC_OR(&fdtab[fd].state, FD_INITIALIZED); |
885 | 0 | if (!isatty(fd)) |
886 | 0 | fd_set_nonblock(fd); |
887 | 0 | } |
888 | 0 | sent = writev(fd, iovec, vec); |
889 | 0 | HA_ATOMIC_BTR(&fdtab[fd].state, FD_EXCL_SYSCALL_BIT); |
890 | |
|
891 | 0 | leave: |
892 | | /* sent > 0 if the message was delivered */ |
893 | 0 | return sent; |
894 | 0 | } |
895 | | |
896 | | #if defined(USE_CLOSEFROM) |
/* The system provides a native closefrom(): simply rely on it to close all
 * file descriptors numbered <start> and above.
 */
void my_closefrom(int start)
{
	closefrom(start);
}
901 | | |
902 | | #elif defined(USE_POLL) |
/* This is a portable implementation of closefrom(). It closes all open file
 * descriptors starting at <start> and above, up to the RLIMIT_NOFILE soft
 * limit (or 1024 if this limit cannot be determined). It relies on the fact
 * that poll() will return POLLNVAL for each invalid (hence closed) file
 * descriptor passed in argument in order to skip them. It acts with batches of
 * FDs and will typically perform one poll() call per 1024 FDs so the overhead
 * is low in case all FDs have to be closed.
 */
void my_closefrom(int start)
{
	struct pollfd poll_events[1024];
	struct rlimit limit;
	int nbfds, fd, ret, idx, cnt;
	int step, next;

	if (getrlimit(RLIMIT_NOFILE, &limit) == 0)
		step = nbfds = limit.rlim_cur;
	else
		step = nbfds = 0;

	if (nbfds <= 0) {
		/* set safe limit */
		nbfds = 1024;
		step = 256;
	}

	/* a batch must fit into poll_events[] */
	if (step > sizeof(poll_events) / sizeof(poll_events[0]))
		step = sizeof(poll_events) / sizeof(poll_events[0]);

	while (start < nbfds) {
		/* batches end on multiples of <step> */
		next = (start / step + 1) * step;

		cnt = 0;
		for (fd = start; fd < next && fd < nbfds; fd++) {
			poll_events[cnt].fd = fd;
			poll_events[cnt].events = 0;
			/* pre-cleared so that the entry is treated as open
			 * should poll() fail without filling it.
			 */
			poll_events[cnt].revents = 0;
			cnt++;
		}

		do {
			ret = poll(poll_events, cnt, 0);
			if (ret >= 0)
				break;
		} while (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR || errno == ENOMEM);

		/* The whole batch must be visited regardless of poll()'s
		 * return value: zero only means that no entry has any flag
		 * set, i.e. that all these FDs are open and idle, so all of
		 * them have to be closed. On error, revents are still zero
		 * and all FDs of the batch are blindly closed as well.
		 */
		for (idx = 0; idx < cnt; idx++) {
			if (poll_events[idx].revents & POLLNVAL)
				continue; /* already closed */

			close(poll_events[idx].fd);
		}
		start = next;
	}
}
958 | | |
959 | | #else // defined(USE_POLL) |
960 | | |
/* Fallback implementation of closefrom() for systems offering no better
 * alternative: it blindly calls close() on each file descriptor from <start>
 * up to (but not including) the RLIMIT_NOFILE soft limit, or up to 1024 when
 * this limit cannot be retrieved or is not positive.
 */
void my_closefrom(int start)
{
	struct rlimit limit;
	int last = 0;
	int fd;

	if (getrlimit(RLIMIT_NOFILE, &limit) == 0)
		last = limit.rlim_cur;

	if (last <= 0)
		last = 1024; /* safe limit */

	for (fd = start; fd < last; fd++)
		close(fd);
}
981 | | #endif // defined(USE_POLL) |
982 | | |
/* Raises the RLIMIT_NOFILE setting to <new_limit>. The limits currently in
 * effect are first retrieved using getrlimit(), and setrlimit() is only called
 * if at least one of the new limits (soft or hard) is strictly higher than the
 * current one; otherwise nothing is changed and 0 is returned. The limits
 * retrieved before the change are copied into <old_limit> if this pointer is
 * not NULL, even if setrlimit() fails (they are all zero if getrlimit() itself
 * failed). The two pointers may point to the same variable since the copy
 * happens after the new value was applied. The return value is the one of the
 * last syscall performed, getrlimit() or setrlimit(), and errno is left
 * untouched after it.
 */
int raise_rlim_nofile(struct rlimit *old_limit, struct rlimit *new_limit)
{
	struct rlimit cur = { 0 };
	int rc;

	rc = getrlimit(RLIMIT_NOFILE, &cur);
	if (rc == 0) {
		int must_raise = new_limit->rlim_cur > cur.rlim_cur ||
		                 new_limit->rlim_max > cur.rlim_max;

		if (must_raise)
			rc = setrlimit(RLIMIT_NOFILE, new_limit);
	}

	if (old_limit)
		*old_limit = cur;

	return rc;
}
1009 | | |
1010 | | /* Computes the bounded poll() timeout based on the next expiration timer <next> |
1011 | | * by bounding it to MAX_DELAY_MS. <next> may equal TICK_ETERNITY. The pollers |
1012 | | * just needs to call this function right before polling to get their timeout |
1013 | | * value. Timeouts that are already expired (possibly due to a pending event) |
1014 | | * are accounted for in activity.poll_exp. |
1015 | | */ |
1016 | | int compute_poll_timeout(int next) |
1017 | 0 | { |
1018 | 0 | int wait_time; |
1019 | |
|
1020 | 0 | if (!tick_isset(next)) |
1021 | 0 | wait_time = MAX_DELAY_MS; |
1022 | 0 | else if (tick_is_expired(next, now_ms)) { |
1023 | 0 | activity[tid].poll_exp++; |
1024 | 0 | wait_time = 0; |
1025 | 0 | } |
1026 | 0 | else { |
1027 | 0 | wait_time = TICKS_TO_MS(tick_remain(now_ms, next)) + 1; |
1028 | 0 | if (wait_time > MAX_DELAY_MS) |
1029 | 0 | wait_time = MAX_DELAY_MS; |
1030 | 0 | } |
1031 | 0 | return wait_time; |
1032 | 0 | } |
1033 | | |
1034 | | /* Handle the return of the poller, which consists in calculating the idle |
1035 | | * time, saving a few clocks, marking the thread harmful again etc. All that |
1036 | | * is some boring stuff that all pollers have to do anyway. |
1037 | | */ |
1038 | | void fd_leaving_poll(int wait_time, int status) |
1039 | 0 | { |
1040 | 0 | clock_leaving_poll(wait_time, status); |
1041 | |
|
1042 | 0 | thread_harmless_end(); |
1043 | 0 | thread_idle_end(); |
1044 | |
|
1045 | 0 | _HA_ATOMIC_AND(&th_ctx->flags, ~TH_FL_SLEEPING); |
1046 | 0 | } |
1047 | | |
1048 | | /* disable the specified poller */ |
1049 | | void disable_poller(const char *poller_name) |
1050 | 0 | { |
1051 | 0 | int p; |
1052 | |
|
1053 | 0 | for (p = 0; p < nbpollers; p++) |
1054 | 0 | if (strcmp(pollers[p].name, poller_name) == 0) |
1055 | 0 | pollers[p].pref = 0; |
1056 | 0 | } |
1057 | | |
/* I/O handler of the poller's pipe: drains everything that can be read from
 * <fd> by chunks of 1024 bytes until read() returns no more data (or fails),
 * then reports that <fd> cannot receive anymore via fd_cant_recv().
 */
void poller_pipe_io_handler(int fd)
{
	char scratch[1024];
	ssize_t got;

	/* flush the pipe, the data itself is of no interest */
	do {
		got = read(fd, scratch, sizeof(scratch));
	} while (got > 0);

	fd_cant_recv(fd);
}
1065 | | |
1066 | | /* allocate the per-thread fd_updt thus needs to be called early after |
1067 | | * thread creation. |
1068 | | */ |
1069 | | static int alloc_pollers_per_thread() |
1070 | 0 | { |
1071 | 0 | fd_updt = calloc(global.maxsock, sizeof(*fd_updt)); |
1072 | 0 | return fd_updt != NULL; |
1073 | 0 | } |
1074 | | |
1075 | | /* Initialize the pollers per thread.*/ |
1076 | | static int init_pollers_per_thread() |
1077 | 0 | { |
1078 | 0 | int mypipe[2]; |
1079 | |
|
1080 | 0 | if (pipe(mypipe) < 0) |
1081 | 0 | return 0; |
1082 | | |
1083 | 0 | poller_rd_pipe = mypipe[0]; |
1084 | 0 | poller_wr_pipe[tid] = mypipe[1]; |
1085 | 0 | fd_set_nonblock(poller_rd_pipe); |
1086 | 0 | fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, tgid, ti->ltid_bit); |
1087 | 0 | fd_insert(poller_wr_pipe[tid], poller_pipe_io_handler, poller_pipe_io_handler, tgid, ti->ltid_bit); |
1088 | 0 | fd_want_recv(poller_rd_pipe); |
1089 | 0 | fd_stop_both(poller_wr_pipe[tid]); |
1090 | 0 | return 1; |
1091 | 0 | } |
1092 | | |
1093 | | /* Deinitialize the pollers per thread */ |
1094 | | static void deinit_pollers_per_thread() |
1095 | 0 | { |
1096 | | /* rd and wr are init at the same place, but only rd is init to -1, so |
1097 | | we rely to rd to close. */ |
1098 | 0 | if (poller_rd_pipe > -1) { |
1099 | 0 | fd_delete(poller_rd_pipe); |
1100 | 0 | poller_rd_pipe = -1; |
1101 | 0 | fd_delete(poller_wr_pipe[tid]); |
1102 | 0 | poller_wr_pipe[tid] = -1; |
1103 | 0 | } |
1104 | 0 | } |
1105 | | |
1106 | | /* Release the pollers per thread, to be called late */ |
1107 | | static void free_pollers_per_thread() |
1108 | 0 | { |
1109 | 0 | fd_nbupdt = 0; |
1110 | 0 | ha_free(&fd_updt); |
1111 | 0 | } |
1112 | | |
1113 | | /* |
1114 | | * Initialize the pollers till the best one is found. |
1115 | | * If none works, returns 0, otherwise 1. |
1116 | | */ |
1117 | | int init_pollers() |
1118 | 0 | { |
1119 | 0 | int p; |
1120 | 0 | struct poller *bp; |
1121 | |
|
1122 | 0 | if ((fdtab_addr = calloc(global.maxsock, sizeof(*fdtab) + 64)) == NULL) { |
1123 | 0 | ha_alert("Not enough memory to allocate %d entries for fdtab!\n", global.maxsock); |
1124 | 0 | goto fail_tab; |
1125 | 0 | } |
1126 | | |
1127 | | /* always provide an aligned fdtab */ |
1128 | 0 | fdtab = (struct fdtab*)((((size_t)fdtab_addr) + 63) & -(size_t)64); |
1129 | |
|
1130 | 0 | if ((polled_mask = calloc(global.maxsock, sizeof(*polled_mask))) == NULL) { |
1131 | 0 | ha_alert("Not enough memory to allocate %d entries for polled_mask!\n", global.maxsock); |
1132 | 0 | goto fail_polledmask; |
1133 | 0 | } |
1134 | | |
1135 | 0 | if ((fdinfo = calloc(global.maxsock, sizeof(*fdinfo))) == NULL) { |
1136 | 0 | ha_alert("Not enough memory to allocate %d entries for fdinfo!\n", global.maxsock); |
1137 | 0 | goto fail_info; |
1138 | 0 | } |
1139 | | |
1140 | 0 | for (p = 0; p < MAX_TGROUPS; p++) |
1141 | 0 | update_list[p].first = update_list[p].last = -1; |
1142 | |
|
1143 | 0 | for (p = 0; p < global.maxsock; p++) { |
1144 | | /* Mark the fd as out of the fd cache */ |
1145 | 0 | fdtab[p].update.next = -3; |
1146 | 0 | } |
1147 | |
|
1148 | 0 | do { |
1149 | 0 | bp = NULL; |
1150 | 0 | for (p = 0; p < nbpollers; p++) |
1151 | 0 | if (!bp || (pollers[p].pref > bp->pref)) |
1152 | 0 | bp = &pollers[p]; |
1153 | |
|
1154 | 0 | if (!bp || bp->pref == 0) |
1155 | 0 | break; |
1156 | | |
1157 | 0 | if (bp->init(bp)) { |
1158 | 0 | memcpy(&cur_poller, bp, sizeof(*bp)); |
1159 | 0 | return 1; |
1160 | 0 | } |
1161 | 0 | } while (!bp || bp->pref == 0); |
1162 | | |
1163 | 0 | free(fdinfo); |
1164 | 0 | fail_info: |
1165 | 0 | free(polled_mask); |
1166 | 0 | fail_polledmask: |
1167 | 0 | free(fdtab_addr); |
1168 | 0 | fail_tab: |
1169 | 0 | return 0; |
1170 | 0 | } |
1171 | | |
1172 | | /* |
1173 | | * Deinitialize the pollers. |
1174 | | */ |
1175 | 0 | void deinit_pollers() { |
1176 | |
|
1177 | 0 | struct poller *bp; |
1178 | 0 | int p; |
1179 | |
|
1180 | 0 | for (p = 0; p < nbpollers; p++) { |
1181 | 0 | bp = &pollers[p]; |
1182 | |
|
1183 | 0 | if (bp && bp->pref) |
1184 | 0 | bp->term(bp); |
1185 | 0 | } |
1186 | |
|
1187 | 0 | ha_free(&fdinfo); |
1188 | 0 | ha_free(&fdtab_addr); |
1189 | 0 | ha_free(&polled_mask); |
1190 | 0 | } |
1191 | | |
1192 | | /* |
1193 | | * Lists the known pollers on <out>. |
1194 | | * Should be performed only before initialization. |
1195 | | */ |
1196 | | int list_pollers(FILE *out) |
1197 | 0 | { |
1198 | 0 | int p; |
1199 | 0 | int last, next; |
1200 | 0 | int usable; |
1201 | 0 | struct poller *bp; |
1202 | |
|
1203 | 0 | fprintf(out, "Available polling systems :\n"); |
1204 | |
|
1205 | 0 | usable = 0; |
1206 | 0 | bp = NULL; |
1207 | 0 | last = next = -1; |
1208 | 0 | while (1) { |
1209 | 0 | for (p = 0; p < nbpollers; p++) { |
1210 | 0 | if ((next < 0 || pollers[p].pref > next) |
1211 | 0 | && (last < 0 || pollers[p].pref < last)) { |
1212 | 0 | next = pollers[p].pref; |
1213 | 0 | if (!bp || (pollers[p].pref > bp->pref)) |
1214 | 0 | bp = &pollers[p]; |
1215 | 0 | } |
1216 | 0 | } |
1217 | |
|
1218 | 0 | if (next == -1) |
1219 | 0 | break; |
1220 | | |
1221 | 0 | for (p = 0; p < nbpollers; p++) { |
1222 | 0 | if (pollers[p].pref == next) { |
1223 | 0 | fprintf(out, " %10s : ", pollers[p].name); |
1224 | 0 | if (pollers[p].pref == 0) |
1225 | 0 | fprintf(out, "disabled, "); |
1226 | 0 | else |
1227 | 0 | fprintf(out, "pref=%3d, ", pollers[p].pref); |
1228 | 0 | if (pollers[p].test(&pollers[p])) { |
1229 | 0 | fprintf(out, " test result OK"); |
1230 | 0 | if (next > 0) |
1231 | 0 | usable++; |
1232 | 0 | } else { |
1233 | 0 | fprintf(out, " test result FAILED"); |
1234 | 0 | if (bp == &pollers[p]) |
1235 | 0 | bp = NULL; |
1236 | 0 | } |
1237 | 0 | fprintf(out, "\n"); |
1238 | 0 | } |
1239 | 0 | } |
1240 | 0 | last = next; |
1241 | 0 | next = -1; |
1242 | 0 | }; |
1243 | 0 | fprintf(out, "Total: %d (%d usable), will use %s.\n", nbpollers, usable, bp ? bp->name : "none"); |
1244 | 0 | return 0; |
1245 | 0 | } |
1246 | | |
1247 | | /* |
1248 | | * Some pollers may lose their connection after a fork(). It may be necessary |
1249 | | * to create initialize part of them again. Returns 0 in case of failure, |
1250 | | * otherwise 1. The fork() function may be NULL if unused. In case of error, |
1251 | | * the the current poller is destroyed and the caller is responsible for trying |
1252 | | * another one by calling init_pollers() again. |
1253 | | */ |
1254 | | int fork_poller() |
1255 | 0 | { |
1256 | 0 | int fd; |
1257 | 0 | for (fd = 0; fd < global.maxsock; fd++) { |
1258 | 0 | if (fdtab[fd].owner) { |
1259 | 0 | HA_ATOMIC_OR(&fdtab[fd].state, FD_CLONED); |
1260 | 0 | } |
1261 | 0 | } |
1262 | |
|
1263 | 0 | if (cur_poller.fork) { |
1264 | 0 | if (cur_poller.fork(&cur_poller)) |
1265 | 0 | return 1; |
1266 | 0 | cur_poller.term(&cur_poller); |
1267 | 0 | return 0; |
1268 | 0 | } |
1269 | 0 | return 1; |
1270 | 0 | } |
1271 | | |
1272 | | /* config parser for global "tune.fd.edge-triggered", accepts "on" or "off" */ |
1273 | | static int cfg_parse_tune_fd_edge_triggered(char **args, int section_type, struct proxy *curpx, |
1274 | | const struct proxy *defpx, const char *file, int line, |
1275 | | char **err) |
1276 | 0 | { |
1277 | 0 | if (too_many_args(1, args, err, NULL)) |
1278 | 0 | return -1; |
1279 | | |
1280 | 0 | if (strcmp(args[1], "on") == 0) |
1281 | 0 | global.tune.options |= GTUNE_FD_ET; |
1282 | 0 | else if (strcmp(args[1], "off") == 0) |
1283 | 0 | global.tune.options &= ~GTUNE_FD_ET; |
1284 | 0 | else { |
1285 | 0 | memprintf(err, "'%s' expects either 'on' or 'off' but got '%s'.", args[0], args[1]); |
1286 | 0 | return -1; |
1287 | 0 | } |
1288 | 0 | return 0; |
1289 | 0 | } |
1290 | | |
1291 | | /* config keyword parsers */ |
1292 | | static struct cfg_kw_list cfg_kws = {ILH, { |
1293 | | { CFG_GLOBAL, "tune.fd.edge-triggered", cfg_parse_tune_fd_edge_triggered, KWF_EXPERIMENTAL }, |
1294 | | { 0, NULL, NULL } |
1295 | | }}; |
1296 | | |
1297 | | INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws); |
1298 | | |
1299 | | REGISTER_PER_THREAD_ALLOC(alloc_pollers_per_thread); |
1300 | | REGISTER_PER_THREAD_INIT(init_pollers_per_thread); |
1301 | | REGISTER_PER_THREAD_DEINIT(deinit_pollers_per_thread); |
1302 | | REGISTER_PER_THREAD_FREE(free_pollers_per_thread); |
1303 | | |
1304 | | /* |
1305 | | * Local variables: |
1306 | | * c-indent-level: 8 |
1307 | | * c-basic-offset: 8 |
1308 | | * End: |
1309 | | */ |