Line | Count | Source |
1 | | /* |
2 | | * Ring buffer management |
3 | | * |
4 | | * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu |
5 | | * |
6 | | * This library is free software; you can redistribute it and/or |
7 | | * modify it under the terms of the GNU Lesser General Public |
8 | | * License as published by the Free Software Foundation, version 2.1 |
9 | | * exclusively. |
10 | | * |
11 | | * This library is distributed in the hope that it will be useful, |
12 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 | | * Lesser General Public License for more details. |
15 | | * |
16 | | * You should have received a copy of the GNU Lesser General Public |
17 | | * License along with this library; if not, write to the Free Software |
18 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
19 | | */ |
20 | | |
21 | | #include <stdlib.h> |
22 | | #include <haproxy/api.h> |
23 | | #include <haproxy/applet.h> |
24 | | #include <haproxy/buf.h> |
25 | | #include <haproxy/cfgparse.h> |
26 | | #include <haproxy/cli.h> |
27 | | #include <haproxy/ring.h> |
28 | | #include <haproxy/sc_strm.h> |
29 | | #include <haproxy/stconn.h> |
30 | | #include <haproxy/thread.h> |
31 | | #include <haproxy/vecpair.h> |
32 | | |
33 | | /* context used to dump the contents of a ring via "show events" or "show errors" */ |
34 | | struct show_ring_ctx { |
35 | | struct ring *ring; /* ring to be dumped */ |
36 | | size_t ofs; /* storage offset to restart from; ~0=oldest */ |
37 | | uint flags; /* set of RING_WF_* */ |
38 | | }; |
39 | | |
40 | | /* Initialize a pre-allocated ring with the buffer area of size <size>. |
41 | | * Makes the storage point to the indicated area and adjusts the declared |
42 | | * ring size according to the position of the area in the storage. If <reset> |
43 | | * is non-zero, the storage area is reset, otherwise it's left intact (except |
44 | | * for the area origin pointer which is updated so that the area can come from |
45 | | * an mmap()). |
46 | | */ |
47 | | void ring_init(struct ring *ring, void *area, size_t size, int reset) |
48 | 0 | { |
49 | 0 | MT_LIST_INIT(&ring->waiters); |
50 | 0 | ring->readers_count = 0; |
51 | 0 | ring->flags = 0; |
52 | 0 | ring->storage = area; |
53 | 0 | ring->pending = 0; |
54 | 0 | ring->waking = 0; |
55 | 0 | memset(&ring->queue, 0, sizeof(ring->queue)); |
56 | |
57 | 0 | if (reset) { |
58 | 0 | ring->storage->size = size - sizeof(*ring->storage); |
59 | 0 | ring->storage->rsvd = sizeof(*ring->storage); |
60 | 0 | ring->storage->head = 0; |
61 | 0 | ring->storage->tail = 0; |
62 | | |
63 | | /* write the initial RC byte */ |
64 | 0 | *ring->storage->area = 0; |
65 | 0 | ring->storage->tail = 1; |
66 | 0 | } |
67 | 0 | } |
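
A minimal usage sketch, not part of the listed source: attaching a pre-allocated ring to a caller-provided area such as an mmap()'d file, with <reset> left at zero so that messages already stored in the area are preserved. The file descriptor, mapping size and error handling are assumptions made for illustration; struct ring comes from haproxy/ring-t.h.

#include <sys/mman.h>
#include <haproxy/ring.h>

static struct ring mapped_ring;

/* map an existing dump file and expose it as a ring without resetting it */
static int attach_existing_area(int fd, size_t map_size)
{
        void *area = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

        if (area == MAP_FAILED)
                return 0;

        /* reset=0: keep the header and the messages already present in the area */
        ring_init(&mapped_ring, area, map_size, 0);
        return 1;
}
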
68 | | |
69 | | /* Creates a ring and its storage area at address <area> for <size> bytes. |
70 | | * If <area> is null, an area of the requested size is allocated. The ring |
71 | | * storage struct is part of the area so the usable area is slightly reduced. |
72 | | * However the storage is immediately adjacent to the struct so that the ring |
73 | | * remains consistent on-disk. ring_free() will ignore such ring storages and |
74 | | * will only release the ring part, so the caller is responsible for releasing |
75 | | * them. If <reset> is non-zero, the storage area is reset, otherwise it's left |
76 | | * intact. |
77 | | */ |
78 | | struct ring *ring_make_from_area(void *area, size_t size, int reset) |
79 | 0 | { |
80 | 0 | struct ring *ring = NULL; |
81 | 0 | uint flags = 0; |
82 | |
83 | 0 | if (size < sizeof(*ring->storage) + 2) |
84 | 0 | return NULL; |
85 | | |
86 | 0 | ring = ha_aligned_alloc_typed(1, typeof(*ring)); |
87 | 0 | if (!ring) |
88 | 0 | goto fail; |
89 | | |
90 | 0 | if (!area) |
91 | 0 | area = ha_aligned_alloc(__alignof__(*ring->storage), size); |
92 | 0 | else |
93 | 0 | flags |= RING_FL_MAPPED; |
94 | |
95 | 0 | if (!area) |
96 | 0 | goto fail; |
97 | | |
98 | 0 | ring_init(ring, area, size, reset); |
99 | 0 | ring->flags |= flags; |
100 | 0 | return ring; |
101 | 0 | fail: |
102 | 0 | ha_aligned_free(ring); |
103 | 0 | return NULL; |
104 | 0 | } |
105 | | |
106 | | /* Creates and returns a ring buffer of size <size> bytes. Returns NULL on |
107 | | * allocation failure. The size is the area size, not the usable size. |
108 | | */ |
109 | | struct ring *ring_new(size_t size) |
110 | 0 | { |
111 | 0 | return ring_make_from_area(NULL, size, 1); |
112 | 0 | } |
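
A short illustrative sketch, not from this file: creating a ring with ring_new(), pushing one message with ring_write() and releasing it with ring_free(). ist() comes from haproxy/ist.h; the 16384-byte size and the message text are arbitrary examples.

#include <haproxy/ist.h>
#include <haproxy/ring.h>

static void ring_demo(void)
{
        struct ring *r = ring_new(16384);
        struct ist msg = ist("hello ring");

        if (!r)
                return;

        /* one message part, no prefix parts, capped to the ring's maximum payload */
        ring_write(r, ring_max_payload(r), NULL, 0, &msg, 1);
        ring_free(r);
}
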
113 | | |
114 | | /* Resizes existing ring <ring> to <size> without losing its contents. The new |
115 | | * size must be large enough for the current contents plus the storage header, |
116 | | * or no change will be performed. The pointer to the ring is returned on success, |
117 | | * or NULL on allocation failure. This will lock the ring for writes. The size |
118 | | * is the allocated area size, and includes the ring_storage header. |
119 | | */ |
120 | | struct ring *ring_resize(struct ring *ring, size_t size) |
121 | 0 | { |
122 | 0 | struct ring_storage *old, *new; |
123 | |
124 | 0 | if (size <= ring_data(ring) + sizeof(*ring->storage)) |
125 | 0 | return ring; |
126 | | |
127 | 0 | old = ring->storage; |
128 | 0 | new = ha_aligned_alloc(__alignof__(*ring->storage), size); |
129 | 0 | if (!new) |
130 | 0 | return NULL; |
131 | | |
132 | 0 | thread_isolate(); |
133 | | |
134 | | /* recheck the ring's size, it may have changed during the malloc */ |
135 | 0 | if (size > ring_data(ring) + sizeof(*ring->storage)) { |
136 | | /* copy old contents */ |
137 | 0 | struct ist v1, v2; |
138 | 0 | size_t len; |
139 | |
140 | 0 | vp_ring_to_data(&v1, &v2, old->area, old->size, old->head, old->tail); |
141 | 0 | len = vp_size(v1, v2); |
142 | 0 | vp_peek_ofs(v1, v2, 0, new->area, len); |
143 | 0 | new->size = size - sizeof(*ring->storage); |
144 | 0 | new->rsvd = sizeof(*ring->storage); |
145 | 0 | new->head = 0; |
146 | 0 | new->tail = len; |
147 | 0 | new = HA_ATOMIC_XCHG(&ring->storage, new); |
148 | 0 | } |
149 | |
150 | 0 | thread_release(); |
151 | | |
152 | | /* free the unused one */ |
153 | 0 | ha_aligned_free(new); |
154 | 0 | return ring; |
155 | 0 | } |
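
A hedged sketch of a typical caller: growing an already-populated ring once its final size is known (for instance when a configuration directive requests a larger buffer), keeping whatever was logged so far. The function name and the ha_alert() message are illustrative assumptions.

/* grow <r> to <new_size> bytes (area size, header included), keeping its contents */
static int grow_ring(struct ring *r, size_t new_size)
{
        if (!ring_resize(r, new_size)) {
                ha_alert("out of memory while resizing ring\n"); /* ha_alert() from haproxy/errors.h */
                return 0;
        }
        return 1;
}
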
156 | | |
157 | | /* destroys and frees ring <ring> */ |
158 | | void ring_free(struct ring *ring) |
159 | 0 | { |
160 | 0 | if (!ring) |
161 | 0 | return; |
162 | | |
163 | | /* only free the storage if it was allocated here, not mapped from a caller-provided area */ |
164 | 0 | if (!(ring->flags & RING_FL_MAPPED)) |
165 | 0 | ha_aligned_free(ring->storage); |
166 | 0 | ha_aligned_free(ring); |
167 | 0 | } |
168 | | |
169 | | /* Tries to send <npfx> parts from <pfx> followed by <nmsg> parts from <msg> |
170 | | * to ring <ring>. The message is sent atomically. It may be truncated to |
171 | | * <maxlen> bytes if <maxlen> is non-zero. There is no distinction between the |
172 | | * two lists, it's just a convenience to help the caller prepend some prefixes |
173 | | * when necessary. It takes the ring's write lock to make sure no other thread |
174 | | * will touch the buffer during the update. Returns the number of bytes sent, |
175 | | * or <=0 on failure. |
176 | | */ |
177 | | ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg) |
178 | 0 | { |
179 | 0 | struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr); |
180 | 0 | struct ring_wait_cell cell, *next_cell, *curr_cell; |
181 | 0 | size_t *tail_ptr = &ring->storage->tail; |
182 | 0 | size_t head_ofs, tail_ofs, new_tail_ofs; |
183 | 0 | size_t ring_size; |
184 | 0 | char *ring_area; |
185 | 0 | struct ist v1, v2; |
186 | 0 | size_t msglen = 0; |
187 | 0 | size_t lenlen; |
188 | 0 | size_t needed; |
189 | 0 | uint64_t dellen; |
190 | 0 | int dellenlen; |
191 | 0 | uint8_t *lock_ptr; |
192 | 0 | uint8_t readers; |
193 | 0 | ssize_t sent = 0; |
194 | 0 | int i; |
195 | | |
196 | | /* we have to find some room to add our message (the buffer is |
197 | | * never empty and at least contains the previous counter) and |
198 | | * to update both the buffer contents and heads at the same |
199 | | * time (it's doable using atomic ops but not worth the |
200 | | * trouble, let's just lock). For this we first need to know |
201 | | * the total message's length. We cannot measure it while |
202 | | * copying due to the varint encoding of the length. |
203 | | */ |
204 | 0 | for (i = 0; i < npfx; i++) |
205 | 0 | msglen += pfx[i].len; |
206 | 0 | for (i = 0; i < nmsg; i++) |
207 | 0 | msglen += msg[i].len; |
208 | |
209 | 0 | if (msglen > maxlen) |
210 | 0 | msglen = maxlen; |
211 | |
212 | 0 | lenlen = varint_bytes(msglen); |
213 | | |
214 | | /* We need: |
215 | | * - lenlen bytes for the size encoding |
216 | | * - msglen for the message |
217 | | * - one byte for the new marker |
218 | | * |
219 | | * Note that we'll also reserve one extra byte to make sure we never |
220 | | * leave a full buffer (the vec-to-ring conversion cannot be done if |
221 | | * both areas are of size 0). |
222 | | */ |
223 | 0 | needed = lenlen + msglen + 1; |
224 | | |
225 | | /* these ones do not change under us (only resize affects them and it |
226 | | * must be done under thread isolation). |
227 | | */ |
228 | 0 | ring_area = ring->storage->area; |
229 | 0 | ring_size = ring->storage->size; |
230 | |
231 | 0 | if (needed + 1 > ring_size) |
232 | 0 | goto leave; |
233 | | |
234 | 0 | cell.to_send_self = needed; |
235 | 0 | cell.needed_tot = 0; // the cell is only considered ready once this is non-zero |
236 | 0 | cell.maxlen = msglen; |
237 | 0 | cell.pfx = pfx; |
238 | 0 | cell.npfx = npfx; |
239 | 0 | cell.msg = msg; |
240 | 0 | cell.nmsg = nmsg; |
241 | | |
242 | | /* insert our cell into the queue before the previous one. We may have |
243 | | * to wait a bit if the queue's leader is attempting an election to win |
244 | | * the tail, hence the busy value (should be rare enough). |
245 | | */ |
246 | 0 | next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell); |
247 | | |
248 | | /* let's add the cumulated size of pending messages to ours */ |
249 | 0 | cell.next = next_cell; |
250 | 0 | if (next_cell) { |
251 | 0 | size_t next_needed; |
252 | |
253 | 0 | while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0) |
254 | 0 | __ha_cpu_relax_for_read(); |
255 | 0 | needed += next_needed; |
256 | 0 | } |
257 | | |
258 | | /* now <needed> will represent the size to store *all* messages. The |
259 | | * atomic store may unlock a subsequent thread waiting for this one. |
260 | | */ |
261 | 0 | HA_ATOMIC_STORE(&cell.needed_tot, needed); |
262 | | |
263 | | /* OK now we're the queue leader, it's our job to try to get ownership |
264 | | * of the tail, if we succeeded above, we don't even enter the loop. If |
265 | | * we failed, we set ourselves at the top of the queue, waiting for the |
266 | | * tail to be unlocked again. We stop doing that if another thread |
267 | | * comes in and becomes the leader in turn. |
268 | | */ |
269 | | |
270 | | /* Wait for another thread to take the lead or for the tail to |
271 | | * be available again. It's critical to be read-only in this |
272 | | * loop so as not to lose time synchronizing cache lines. Also, |
273 | | * we must detect a new leader ASAP so that the fewest possible |
274 | | * threads check the tail. |
275 | | */ |
276 | |
277 | 0 | tail_ofs = 0; |
278 | 0 | while (1) { |
279 | 0 | #if defined(__x86_64__) |
280 | | /* read using a CAS on x86, as it will keep the cache line |
281 | | * in exclusive state for a few more cycles that will allow |
282 | | * us to release the queue without waiting after the loop. |
283 | | */ |
284 | 0 | curr_cell = &cell; |
285 | 0 | HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, curr_cell); |
286 | | #else |
287 | | curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr); |
288 | | #endif |
289 | | /* give up if another thread took the leadership of the queue */ |
290 | 0 | if (curr_cell != &cell) |
291 | 0 | goto wait_for_flush; |
292 | | |
293 | | /* OK the queue is locked, let's attempt to get the tail lock. |
294 | | * we'll atomically OR the lock on the pointer and check if |
295 | | * someone else had it already, otherwise we own it. |
296 | | */ |
297 | | |
298 | | #if defined(__ARM_FEATURE_ATOMICS) |
299 | | /* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */ |
300 | | tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK); |
301 | | if (!(tail_ofs & RING_TAIL_LOCK)) |
302 | | break; |
303 | | #else |
304 | 0 | if (HA_ATOMIC_CAS(tail_ptr, &tail_ofs, tail_ofs | RING_TAIL_LOCK)) |
305 | 0 | break; |
306 | 0 | tail_ofs &= ~RING_TAIL_LOCK; |
307 | 0 | #endif |
308 | 0 | __ha_cpu_relax(); |
309 | 0 | } |
310 | | |
311 | | /* Here we own the tail. We can go on if we're still the leader, |
312 | | * which we'll confirm by trying to reset the queue. If we're |
313 | | * still the leader, we're done. |
314 | | */ |
315 | 0 | if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) { |
316 | | /* oops, no, let's give it back to another thread and wait. |
317 | | * This does not happen often enough to warrant more complex |
318 | | * approaches (tried already). |
319 | | */ |
320 | 0 | HA_ATOMIC_STORE(tail_ptr, tail_ofs); |
321 | 0 | goto wait_for_flush; |
322 | 0 | } |
323 | | |
324 | 0 | head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); |
325 | | |
326 | | /* this is the byte before tail, it contains the users count */ |
327 | 0 | lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1); |
328 | | |
329 | | /* Take the lock on the area. We're guaranteed to be the only writer |
330 | | * here. |
331 | | */ |
332 | 0 | readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE); |
333 | |
334 | 0 | vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); |
335 | |
336 | 0 | while (vp_size(v1, v2) + needed + 1 + 1 > ring_size) { |
337 | | /* we need to delete the oldest message (from the end), |
338 | | * and we have to stop if there's a reader stuck there. |
339 | | * Unless there's corruption in the buffer it's guaranteed |
340 | | * that we have enough data to find 1 counter byte, a |
341 | | * varint-encoded length (1 byte min) and the message |
342 | | * payload (0 bytes min). |
343 | | */ |
344 | 0 | if (*_vp_head(v1, v2)) |
345 | 0 | break; |
346 | 0 | dellenlen = vp_peek_varint_ofs(v1, v2, 1, &dellen); |
347 | 0 | if (!dellenlen) |
348 | 0 | break; |
349 | 0 | BUG_ON_HOT(vp_size(v1, v2) < 1 + dellenlen + dellen); |
350 | 0 | vp_skip(&v1, &v2, 1 + dellenlen + dellen); |
351 | 0 | } |
352 | | |
353 | | /* now let's update the buffer with the new tail if our message will fit */ |
354 | 0 | new_tail_ofs = tail_ofs; |
355 | 0 | if (vp_size(v1, v2) + needed + 1 + 1 <= ring_size) { |
356 | 0 | vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); |
357 | | |
358 | | /* update the new space in the buffer */ |
359 | 0 | HA_ATOMIC_STORE(&ring->storage->head, head_ofs); |
360 | | |
361 | | /* calculate next tail pointer */ |
362 | 0 | new_tail_ofs += needed; |
363 | 0 | if (new_tail_ofs >= ring_size) |
364 | 0 | new_tail_ofs -= ring_size; |
365 | | |
366 | | /* reset next read counter before releasing writers */ |
367 | 0 | HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0); |
368 | 0 | } |
369 | 0 | else { |
370 | | /* release readers right now, before writing the tail, so as |
371 | | * not to expose the readers count byte to another writer. |
372 | | */ |
373 | 0 | HA_ATOMIC_STORE(lock_ptr, readers); |
374 | 0 | } |
375 | | |
376 | | /* and release other writers */ |
377 | 0 | HA_ATOMIC_STORE(tail_ptr, new_tail_ofs); |
378 | |
379 | 0 | vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs); |
380 | |
381 | 0 | if (likely(tail_ofs != new_tail_ofs)) { |
382 | | /* the list stops on a NULL */ |
383 | 0 | for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) { |
384 | 0 | maxlen = curr_cell->maxlen; |
385 | 0 | pfx = curr_cell->pfx; |
386 | 0 | npfx = curr_cell->npfx; |
387 | 0 | msg = curr_cell->msg; |
388 | 0 | nmsg = curr_cell->nmsg; |
389 | | |
390 | | /* let's write the message size */ |
391 | 0 | vp_put_varint(&v1, &v2, maxlen); |
392 | | |
393 | | /* then write the messages */ |
394 | 0 | msglen = 0; |
395 | 0 | for (i = 0; i < npfx; i++) { |
396 | 0 | size_t len = pfx[i].len; |
397 | |
398 | 0 | if (len + msglen > maxlen) |
399 | 0 | len = maxlen - msglen; |
400 | 0 | if (len) |
401 | 0 | vp_putblk(&v1, &v2, pfx[i].ptr, len); |
402 | 0 | msglen += len; |
403 | 0 | } |
404 | |
405 | 0 | for (i = 0; i < nmsg; i++) { |
406 | 0 | size_t len = msg[i].len; |
407 | |
408 | 0 | if (len + msglen > maxlen) |
409 | 0 | len = maxlen - msglen; |
410 | 0 | if (len) |
411 | 0 | vp_putblk(&v1, &v2, msg[i].ptr, len); |
412 | 0 | msglen += len; |
413 | 0 | } |
414 | | |
415 | | /* for all but the last message we need to write the |
416 | | * readers count byte. |
417 | | */ |
418 | 0 | if (curr_cell->next) |
419 | 0 | vp_putchr(&v1, &v2, 0); |
420 | 0 | } |
421 | | |
422 | | /* now release */ |
423 | 0 | for (curr_cell = &cell; curr_cell; curr_cell = next_cell) { |
424 | 0 | next_cell = HA_ATOMIC_LOAD(&curr_cell->next); |
425 | 0 | _HA_ATOMIC_STORE(&curr_cell->next, curr_cell); |
426 | 0 | } |
427 | | |
428 | | /* unlock the message area */ |
429 | 0 | HA_ATOMIC_STORE(lock_ptr, readers); |
430 | 0 | } else { |
431 | | /* messages were dropped, notify about this and release them */ |
432 | 0 | for (curr_cell = &cell; curr_cell; curr_cell = next_cell) { |
433 | 0 | next_cell = HA_ATOMIC_LOAD(&curr_cell->next); |
434 | 0 | HA_ATOMIC_STORE(&curr_cell->to_send_self, 0); |
435 | 0 | _HA_ATOMIC_STORE(&curr_cell->next, curr_cell); |
436 | 0 | } |
437 | 0 | } |
438 | | |
439 | | /* we must not write the trailing read counter, it was already done, |
440 | | * plus we could ruin the one of the next writer. And the front was |
441 | | * unlocked either at the top if the ring was full, or just above if it |
442 | | * could be properly filled. |
443 | | */ |
444 | |
445 | 0 | sent = cell.to_send_self; |
446 | | |
447 | | /* notify potential readers */ |
448 | 0 | if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) { |
449 | 0 | HA_ATOMIC_INC(&ring->pending); |
450 | 0 | while (HA_ATOMIC_LOAD(&ring->pending) && HA_ATOMIC_XCHG(&ring->waking, 1) == 0) { |
451 | 0 | struct mt_list back; |
452 | 0 | struct appctx *appctx; |
453 | |
454 | 0 | HA_ATOMIC_STORE(&ring->pending, 0); |
455 | 0 | MT_LIST_FOR_EACH_ENTRY_LOCKED(appctx, &ring->waiters, wait_entry, back) |
456 | 0 | appctx_wakeup(appctx); |
457 | 0 | HA_ATOMIC_STORE(&ring->waking, 0); |
458 | 0 | } |
459 | 0 | } |
460 | |
461 | 0 | leave: |
462 | 0 | return sent; |
463 | | |
464 | 0 | wait_for_flush: |
465 | | /* if we arrive here, it means we found another leader */ |
466 | | |
467 | | /* The leader will write our own pointer in the cell's next to |
468 | | * mark it as released. Let's wait for this. |
469 | | */ |
470 | 0 | do { |
471 | 0 | next_cell = HA_ATOMIC_LOAD(&cell.next); |
472 | 0 | } while (next_cell != &cell && __ha_cpu_relax()); |
473 | | |
474 | | /* OK our message was queued. Retrieving the sent size in the ring cell |
475 | | * allows another leader thread to zero it if it finally couldn't send |
476 | | * it (should only happen when using too small ring buffers to store |
477 | | * all competing threads' messages at once). |
478 | | */ |
479 | 0 | return HA_ATOMIC_LOAD(&cell.to_send_self); |
480 | 0 | } |
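
The space accounting used above (needed = lenlen + msglen + 1, then checked with one extra spare byte so the buffer can never become completely full) can be summed up in a small helper. This is an illustration only, not a function of this file; varint_bytes() and ring_size() are the real helpers already used here.

/* illustration: could a payload of <len> bytes ever fit into ring <r>?
 * Mirrors the "needed + 1 > ring_size" check in ring_write(): one
 * readers-count/marker byte, the varint-encoded length, the payload,
 * plus one spare byte.
 */
static inline int ring_payload_would_fit(const struct ring *r, size_t len)
{
        size_t needed = 1 + varint_bytes(len) + len;

        return needed + 1 <= ring_size(r);
}
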
481 | | |
482 | | /* Tries to attach a new reader (such as an appctx) on ring <ring>. This is |
483 | | * meant to be used by low level appctx code such as CLI or ring forwarding. |
484 | | * For higher level functions, please see the relevant parts in appctx or CLI. |
485 | | * It returns non-zero on success or zero on failure if too many users are |
486 | | * already attached. On success, the caller MUST call ring_detach_appctx() |
487 | | * to detach itself, even if it was never woken up. |
488 | | */ |
489 | | int ring_attach(struct ring *ring) |
490 | 0 | { |
491 | 0 | int users = ring->readers_count; |
492 | |
493 | 0 | do { |
494 | 0 | if (users >= RING_MAX_READERS) |
495 | 0 | return 0; |
496 | 0 | } while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1)); |
497 | 0 | return 1; |
498 | 0 | } |
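
The loop above is a bounded increment: load the counter, give up once the limit is reached, otherwise CAS it one higher and retry on races. A standalone sketch of the same pattern with plain C11 atomics, purely for illustration (HAProxy itself uses its _HA_ATOMIC_* wrappers):

#include <stdatomic.h>

/* returns 1 if <counter> was incremented while below <limit>, 0 otherwise */
static int bounded_inc(_Atomic int *counter, int limit)
{
        int cur = atomic_load(counter);

        do {
                if (cur >= limit)
                        return 0;
        } while (!atomic_compare_exchange_weak(counter, &cur, cur + 1));
        return 1;
}
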
499 | | |
500 | | /* detach an appctx from a ring. The appctx is expected to be waiting at offset |
501 | | * <ofs> relative to the beginning of the storage, or ~0 if not waiting yet. |
502 | | * Nothing is done if <ring> is NULL. |
503 | | */ |
504 | | void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs) |
505 | 0 | { |
506 | 0 | if (!ring) |
507 | 0 | return; |
508 | | |
509 | 0 | HA_ATOMIC_DEC(&ring->readers_count); |
510 | |
511 | 0 | if (ofs != ~0) { |
512 | | /* reader was still attached */ |
513 | 0 | uint8_t *area = (uint8_t *)ring_area(ring); |
514 | 0 | uint8_t readers; |
515 | |
516 | 0 | BUG_ON(ofs >= ring_size(ring)); |
517 | 0 | MT_LIST_DELETE(&appctx->wait_entry); |
518 | | |
519 | | /* dec readers count */ |
520 | 0 | do { |
521 | 0 | readers = _HA_ATOMIC_LOAD(area + ofs); |
522 | 0 | } while ((readers > RING_MAX_READERS || |
523 | 0 | !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax()); |
524 | 0 | } |
525 | 0 | } |
526 | | |
527 | | /* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is |
528 | | * meant to be used when registering a CLI function to dump a buffer, so it |
529 | | * returns zero on success, or non-zero on failure with a message in the appctx |
530 | | * CLI context. It automatically sets the io_handler and io_release callbacks if |
531 | | * they were not set. The <flags> take a combination of RING_WF_*. |
532 | | */ |
533 | | int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags) |
534 | 0 | { |
535 | 0 | struct show_ring_ctx *ctx = applet_reserve_svcctx(appctx, sizeof(*ctx)); |
536 | |
537 | 0 | if (!ring_attach(ring)) |
538 | 0 | return cli_err(appctx, |
539 | 0 | "Sorry, too many watchers (" TOSTR(RING_MAX_READERS) ") on this ring buffer. " |
540 | 0 | "What could it have so interesting to attract so many watchers ?"); |
541 | | |
542 | 0 | if (!appctx->cli_ctx.io_handler) |
543 | 0 | appctx->cli_ctx.io_handler = cli_io_handler_show_ring; |
544 | 0 | if (!appctx->cli_ctx.io_release) |
545 | 0 | appctx->cli_ctx.io_release = cli_io_release_show_ring; |
546 | |
547 | 0 | memset(ctx, 0, sizeof(*ctx)); |
548 | 0 | ctx->ring = ring; |
549 | 0 | ctx->ofs = ~0; // start from the oldest event |
550 | 0 | ctx->flags = flags; |
551 | 0 | return 0; |
552 | 0 | } |
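
A hedged sketch of a caller: a CLI keyword parser only needs to call ring_attach_cli() and return its result, since the show_ring_ctx and the io_handler/io_release callbacks are set up here. The "show myring" keyword, the my_ring variable and the argument positions are assumptions for illustration; the real users are the "show events"/"show errors" parsers.

/* assumes <string.h>, haproxy/cli.h and haproxy/ring.h are included */
static struct ring *my_ring; /* hypothetical ring created elsewhere with ring_new() */

static int cli_parse_show_myring(char **args, char *payload,
                                 struct appctx *appctx, void *private)
{
        uint flags = 0;

        /* e.g. a "-w" option could request wait mode */
        if (strcmp(args[2], "-w") == 0)
                flags |= RING_WF_WAIT_MODE;

        return ring_attach_cli(my_ring, appctx, flags);
}
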
553 | | |
554 | | |
555 | | /* parses as many messages as possible from ring <ring>, starting at the offset |
556 | | * stored at *ofs_ptr, with RING_WF_* flags in <flags>, and passes them to |
557 | | * the message handler <msg_handler>. If <last_ofs_ptr> is not NULL, the last |
558 | | * known tail offset will be copied there so that the caller may use it to |
559 | | * detect that new data has arrived since we left the function. Returns 0 |
560 | | * if it needs to pause, 1 once finished. |
561 | | * |
562 | | * If <processed> is not NULL, it will be set to the number of messages |
563 | | * processed by the function (even when the function returns 0) |
564 | | */ |
565 | | int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, |
566 | | ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len, char delim), |
567 | | char delim, |
568 | | size_t *processed) |
569 | 0 | { |
570 | 0 | size_t head_ofs, tail_ofs, prev_ofs; |
571 | 0 | size_t ring_size; |
572 | 0 | uint8_t *ring_area; |
573 | 0 | struct ist v1, v2; |
574 | 0 | uint64_t msg_len; |
575 | 0 | size_t len, cnt; |
576 | 0 | size_t msg_count = 0; |
577 | 0 | ssize_t copied; |
578 | 0 | uint8_t readers; |
579 | 0 | int ret; |
580 | |
581 | 0 | ring_area = (uint8_t *)ring->storage->area; |
582 | 0 | ring_size = ring->storage->size; |
583 | | |
584 | | /* explanation for the initialization below: it would be better to do |
585 | | * this in the parsing function but this would occasionally result in |
586 | | * dropped events because we'd take a reference on the oldest message |
587 | | * and keep it while being scheduled. Thus instead let's take it the |
588 | | * first time we enter here so that we have a chance to pass many |
589 | | * existing messages before grabbing a reference to a location. This |
590 | | * value cannot be produced after initialization. The first offset |
591 | | * needs to be taken under isolation as it must not move while we're |
592 | | * trying to catch it. |
593 | | */ |
594 | 0 | if (unlikely(*ofs_ptr == ~0)) { |
595 | 0 | thread_isolate(); |
596 | |
597 | 0 | head_ofs = HA_ATOMIC_LOAD(&ring->storage->head); |
598 | 0 | tail_ofs = ring_tail(ring); |
599 | |
600 | 0 | if (flags & RING_WF_SEEK_NEW) { |
601 | | /* going to the end means looking at tail-1 */ |
602 | 0 | head_ofs = tail_ofs + ring_size - 1; |
603 | 0 | if (head_ofs >= ring_size) |
604 | 0 | head_ofs -= ring_size; |
605 | 0 | } |
606 | | |
607 | | /* reserve our slot here (inc readers count) */ |
608 | 0 | do { |
609 | 0 | readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); |
610 | 0 | } while ((readers > RING_MAX_READERS || |
611 | 0 | !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); |
612 | |
613 | 0 | thread_release(); |
614 | | |
615 | | /* store this precious offset in our context, and we're done */ |
616 | 0 | *ofs_ptr = head_ofs; |
617 | 0 | } |
618 | | |
619 | | /* we have the guarantee we can restart from our own head */ |
620 | 0 | head_ofs = *ofs_ptr; |
621 | 0 | BUG_ON(head_ofs >= ring_size); |
622 | | |
623 | | /* the tail will continue to move but we're getting a safe value |
624 | | * here that will continue to work. |
625 | | */ |
626 | 0 | tail_ofs = ring_tail(ring); |
627 | | |
628 | | /* we keep track of where we were and we don't release it before |
629 | | * we've protected the next place. |
630 | | */ |
631 | 0 | prev_ofs = head_ofs; |
632 | | |
633 | | /* in this loop, head_ofs always points to the counter byte that precedes |
634 | | * the message so that we can take our reference there if we have to |
635 | | * stop before the end (ret=0). The reference is relative to the ring's |
636 | | * origin, while pos is relative to the ring's head. |
637 | | */ |
638 | 0 | ret = 1; |
639 | 0 | vp_ring_to_data(&v1, &v2, (char *)ring_area, ring_size, head_ofs, tail_ofs); |
640 | |
641 | 0 | while (1) { |
642 | 0 | if (vp_size(v1, v2) <= 1) { |
643 | | /* no more data */ |
644 | 0 | break; |
645 | 0 | } |
646 | | |
647 | 0 | readers = _HA_ATOMIC_LOAD(_vp_addr(v1, v2, 0)); |
648 | 0 | if (readers > RING_MAX_READERS) { |
649 | | /* we just met a writer which hasn't finished */ |
650 | 0 | break; |
651 | 0 | } |
652 | | |
653 | 0 | cnt = 1; |
654 | 0 | len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len); |
655 | 0 | if (!len) |
656 | 0 | break; |
657 | 0 | cnt += len; |
658 | |
659 | 0 | BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2)); |
660 | |
|
661 | 0 | copied = msg_handler(ctx, v1, v2, cnt, msg_len, delim); |
662 | 0 | if (copied == -2) { |
663 | | /* too large a message to ever fit, let's skip it */ |
664 | 0 | goto skip; |
665 | 0 | } |
666 | 0 | else if (copied == -1) { |
667 | | /* output full */ |
668 | 0 | ret = 0; |
669 | 0 | break; |
670 | 0 | } |
671 | 0 | skip: |
672 | 0 | msg_count += 1; |
673 | 0 | vp_skip(&v1, &v2, cnt + msg_len); |
674 | 0 | } |
675 | | |
676 | 0 | vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs); |
677 | |
678 | 0 | if (head_ofs != prev_ofs) { |
679 | | /* inc readers count on new place */ |
680 | 0 | do { |
681 | 0 | readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); |
682 | 0 | } while ((readers > RING_MAX_READERS || |
683 | 0 | !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); |
684 | | |
685 | | /* dec readers count on old place */ |
686 | 0 | do { |
687 | 0 | readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs); |
688 | 0 | } while ((readers > RING_MAX_READERS || |
689 | 0 | !_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax()); |
690 | 0 | } |
691 | |
692 | 0 | if (last_ofs_ptr) |
693 | 0 | *last_ofs_ptr = tail_ofs; |
694 | 0 | *ofs_ptr = head_ofs; |
695 | 0 | if (processed) |
696 | 0 | *processed = msg_count; |
697 | 0 | return ret; |
698 | 0 | } |
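
A sketch of a msg_handler callback compatible with ring_dispatch_messages(), for illustration only (the CLI handler below uses applet_append_line() instead). It copies each message from the two data vectors into a flat buffer described by a hypothetical dump_ctx, returning the number of bytes consumed, -1 when the output is temporarily full, or -2 when the message can never fit.

struct dump_ctx {
        char  *buf;   /* destination buffer */
        size_t size;  /* total capacity */
        size_t used;  /* bytes already written */
};

static ssize_t dump_one_msg(void *ctx, struct ist v1, struct ist v2,
                            size_t ofs, size_t len, char delim)
{
        struct dump_ctx *d = ctx;

        if (len + 1 > d->size)
                return -2;                 /* will never fit, let the caller skip it */
        if (d->used + len + 1 > d->size)
                return -1;                 /* output full for now */

        /* copy <len> payload bytes starting at <ofs> within the v1/v2 pair */
        vp_peek_ofs(v1, v2, ofs, d->buf + d->used, len);
        d->used += len;
        d->buf[d->used++] = delim;         /* '\n', or 0 with RING_WF_END_ZERO */
        return len + 1;
}
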
699 | | |
700 | | /* This function dumps all events from the ring referenced by the appctx's |
701 | | * show_ring_ctx into the appctx's output buffer, resuming from the offset |
702 | | * stored in the context (~0 for the oldest known event). It looks at the |
703 | | * context's flags for options: RING_WF_WAIT_MODE means it must wait for new |
704 | | * data or any key to be pressed, RING_WF_SEEK_NEW means it must seek directly |
705 | | * to the end to wait for new contents. It returns 0 if the output buffer is |
706 | | * full or if events are missing and it needs to be called again, otherwise |
707 | | * non-zero. It is meant to be used with cli_io_release_show_ring() to clean up. |
708 | | */ |
709 | | int cli_io_handler_show_ring(struct appctx *appctx) |
710 | 0 | { |
711 | 0 | struct show_ring_ctx *ctx = appctx->svcctx; |
712 | 0 | struct ring *ring = ctx->ring; |
713 | 0 | size_t last_ofs; |
714 | 0 | size_t ofs; |
715 | 0 | int ret; |
716 | |
717 | 0 | MT_LIST_DELETE(&appctx->wait_entry); |
718 | |
719 | 0 | ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line, |
720 | 0 | (ctx->flags & RING_WF_END_ZERO) ? 0 : '\n', NULL); |
721 | |
722 | 0 | if (ret && (ctx->flags & RING_WF_WAIT_MODE)) { |
723 | | /* we've drained everything and are configured to wait for more |
724 | | * data or an event (keypress, close) |
725 | | */ |
726 | 0 | if (!b_data(&appctx->inbuf) && !se_fl_test(appctx->sedesc, SE_FL_SHW)) { |
727 | | /* let's be woken up once new data arrive */ |
728 | 0 | MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry); |
729 | 0 | ofs = ring_tail(ring); |
730 | 0 | if (ofs != last_ofs) { |
731 | | /* more data was added into the ring between the |
732 | | * unlock and the lock, and the writer might not |
733 | | * have seen us. We need to reschedule a read. |
734 | | */ |
735 | 0 | applet_have_more_data(appctx); |
736 | 0 | } else |
737 | 0 | applet_have_no_more_data(appctx); |
738 | 0 | ret = 0; |
739 | 0 | } |
740 | | /* always drain the whole request */ |
741 | 0 | b_reset(&appctx->inbuf); |
742 | 0 | applet_fl_clr(appctx, APPCTX_FL_INBLK_FULL); |
743 | 0 | } |
744 | |
745 | 0 | applet_will_consume(appctx); |
746 | 0 | applet_expect_no_data(appctx); |
747 | 0 | return ret; |
748 | 0 | } |
749 | | |
750 | | /* must be called after cli_io_handler_show_ring() above */ |
751 | | void cli_io_release_show_ring(struct appctx *appctx) |
752 | 0 | { |
753 | 0 | struct show_ring_ctx *ctx = appctx->svcctx; |
754 | 0 | struct ring *ring = ctx->ring; |
755 | 0 | size_t ofs = ctx->ofs; |
756 | |
757 | 0 | ring_detach_appctx(ring, appctx, ofs); |
758 | 0 | } |
759 | | |
760 | | /* Returns the MAXIMUM payload len that could theoretically fit into the ring |
761 | | * based on ring buffer size. |
762 | | * |
763 | | * Computation logic relies on implementation details from 'ring-t.h'. |
764 | | */ |
765 | | size_t ring_max_payload(const struct ring *ring) |
766 | 0 | { |
767 | 0 | size_t max; |
768 | | |
769 | | /* initial max = bufsize - 1 (initial RC) - 1 (payload RC) */ |
770 | 0 | max = ring_size(ring) - 1 - 1; |
771 | | |
772 | | /* subtract payload VI (varint-encoded size) */ |
773 | 0 | max -= varint_bytes(max); |
774 | 0 | return max; |
775 | 0 | } |
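
A worked example of the computation above: for a ring whose usable storage (ring_size()) is 100 bytes, max starts at 100 - 1 - 1 = 98; a length of 98 is encoded in a single varint byte (values below 240 are), so the maximum payload is 98 - 1 = 97 bytes. Larger rings lose a couple more bytes to the wider varint, e.g. a 16 kB storage needs a 3-byte length field.
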
776 | | |
777 | | /* config parser for global "tune.ring.queues", accepts a number from 0 to RING_WAIT_QUEUES */ |
778 | | static int cfg_parse_tune_ring_queues(char **args, int section_type, struct proxy *curpx, |
779 | | const struct proxy *defpx, const char *file, int line, |
780 | | char **err) |
781 | 0 | { |
782 | 0 | int queues; |
783 | |
784 | 0 | if (too_many_args(1, args, err, NULL)) |
785 | 0 | return -1; |
786 | | |
787 | 0 | queues = atoi(args[1]); |
788 | 0 | if (queues < 0 || queues > RING_WAIT_QUEUES) { |
789 | 0 | memprintf(err, "'%s' expects a number between 0 and %d but got '%s'.", args[0], RING_WAIT_QUEUES, args[1]); |
790 | 0 | return -1; |
791 | 0 | } |
792 | | |
793 | 0 | global.tune.ring_queues = queues; |
794 | 0 | return 0; |
795 | 0 | } |
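
For reference, the directive goes into the "global" section of the configuration, for instance:

    global
        tune.ring.queues 4

The value 4 is purely illustrative; as parsed above, the accepted range is 0 to RING_WAIT_QUEUES (a build-time constant from ring-t.h).
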
796 | | |
797 | | /* config keyword parsers */ |
798 | | static struct cfg_kw_list cfg_kws = {ILH, { |
799 | | { CFG_GLOBAL, "tune.ring.queues", cfg_parse_tune_ring_queues }, |
800 | | { 0, NULL, NULL } |
801 | | }}; |
802 | | |
803 | | INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws); |
804 | | |
805 | | /* |
806 | | * Local variables: |
807 | | * c-indent-level: 8 |
808 | | * c-basic-offset: 8 |
809 | | * End: |
810 | | */ |