Coverage Report

Created: 2025-11-24 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/haproxy/src/ring.c
Line
Count
Source
1
/*
2
 * Ring buffer management
3
 *
4
 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5
 *
6
 * This library is free software; you can redistribute it and/or
7
 * modify it under the terms of the GNU Lesser General Public
8
 * License as published by the Free Software Foundation, version 2.1
9
 * exclusively.
10
 *
11
 * This library is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
 * Lesser General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Lesser General Public
17
 * License along with this library; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
21
#include <stdlib.h>
22
#include <haproxy/api.h>
23
#include <haproxy/applet.h>
24
#include <haproxy/buf.h>
25
#include <haproxy/cfgparse.h>
26
#include <haproxy/cli.h>
27
#include <haproxy/ring.h>
28
#include <haproxy/sc_strm.h>
29
#include <haproxy/stconn.h>
30
#include <haproxy/thread.h>
31
#include <haproxy/vecpair.h>
32
33
/* context used to dump the contents of a ring via "show events" or "show errors" */
34
struct show_ring_ctx {
35
  struct ring *ring; /* ring to be dumped */
36
  size_t ofs;        /* storage offset to restart from; ~0=oldest */
37
  uint flags;        /* set of RING_WF_* */
38
};
39
40
/* Initialize a pre-allocated ring with the buffer area of size <size>.
41
 * Makes the storage point to the indicated area and adjusts the declared
42
 * ring size according to the position of the area in the storage. If <reset>
43
 * is non-zero, the storage area is reset, otherwise it's left intact (except
44
 * for the area origin pointer which is updated so that the area can come from
45
 * an mmap()).
46
 */
47
void ring_init(struct ring *ring, void *area, size_t size, int reset)
48
0
{
49
0
  MT_LIST_INIT(&ring->waiters);
50
0
  ring->readers_count = 0;
51
0
  ring->flags = 0;
52
0
  ring->storage = area;
53
0
  ring->pending = 0;
54
0
  ring->waking = 0;
55
0
  memset(&ring->queue, 0, sizeof(ring->queue));
56
57
0
  if (reset) {
58
0
    ring->storage->size = size - sizeof(*ring->storage);
59
0
    ring->storage->rsvd = sizeof(*ring->storage);
60
0
    ring->storage->head = 0;
61
0
    ring->storage->tail = 0;
62
63
    /* write the initial RC byte */
64
0
    *ring->storage->area = 0;
65
0
    ring->storage->tail = 1;
66
0
  }
67
0
}
68
69
/* Creates a ring and its storage area at address <area> for <size> bytes.
70
 * If <area> is null, then it's allocated of the requested size. The ring
71
 * storage struct is part of the area so the usable area is slightly reduced.
72
 * However the storage is immediately adjacent to the struct so that the ring
73
 * remains consistent on-disk. ring_free() will ignore such ring storages and
74
 * will only release the ring part, so the caller is responsible for releasing
75
 * them. If <reset> is non-zero, the storage area is reset, otherwise it's left
76
 * intact.
77
 */
78
struct ring *ring_make_from_area(void *area, size_t size, int reset)
79
0
{
80
0
  struct ring *ring = NULL;
81
0
  uint flags = 0;
82
83
0
  if (size < sizeof(*ring->storage) + 2)
84
0
    return NULL;
85
86
0
  ring = ha_aligned_alloc_typed(1, typeof(*ring));
87
0
  if (!ring)
88
0
    goto fail;
89
90
0
  if (!area)
91
0
    area = ha_aligned_alloc(__alignof__(*ring->storage), size);
92
0
  else
93
0
    flags |= RING_FL_MAPPED;
94
95
0
  if (!area)
96
0
    goto fail;
97
98
0
  ring_init(ring, area, size, reset);
99
0
  ring->flags |= flags;
100
0
  return ring;
101
0
 fail:
102
0
  ha_aligned_free(ring);
103
0
  return NULL;
104
0
}
105
106
/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
107
 * allocation failure. The size is the area size, not the usable size.
108
 */
109
struct ring *ring_new(size_t size)
{
  /* no pre-existing area: let the helper allocate one and reset it */
  struct ring *fresh = ring_make_from_area(NULL, size, 1);

  return fresh;
}
113
114
/* Resizes existing ring <ring> to <size> which must be larger, without losing
115
 * its contents. The new size must be at least as large as the previous one or
116
 * no change will be performed. The pointer to the ring is returned on success,
117
 * or NULL on allocation failure. This will lock the ring for writes. The size
118
 * is the allocated area size, and includes the ring_storage header.
119
 */
120
struct ring *ring_resize(struct ring *ring, size_t size)
121
0
{
122
0
  struct ring_storage *old, *new;
123
124
0
  if (size <= ring_data(ring) + sizeof(*ring->storage))
125
0
    return ring;
126
127
0
  old = ring->storage;
128
0
  new = ha_aligned_alloc(__alignof__(*ring->storage), size);
129
0
  if (!new)
130
0
    return NULL;
131
132
0
  thread_isolate();
133
134
  /* recheck the ring's size, it may have changed during the malloc */
135
0
  if (size > ring_data(ring) + sizeof(*ring->storage)) {
136
    /* copy old contents */
137
0
    struct ist v1, v2;
138
0
    size_t len;
139
140
0
    vp_ring_to_data(&v1, &v2, old->area, old->size, old->head, old->tail);
141
0
    len = vp_size(v1, v2);
142
0
    vp_peek_ofs(v1, v2, 0, new->area, len);
143
0
    new->size = size - sizeof(*ring->storage);
144
0
    new->rsvd = sizeof(*ring->storage);
145
0
    new->head = 0;
146
0
    new->tail = len;
147
0
    new = HA_ATOMIC_XCHG(&ring->storage, new);
148
0
  }
149
150
0
  thread_release();
151
152
  /* free the unused one */
153
0
  ha_aligned_free(new);
154
0
  return ring;
155
0
}
156
157
/* destroys and frees ring <ring> */
158
void ring_free(struct ring *ring)
159
0
{
160
0
  if (!ring)
161
0
    return;
162
163
  /* make sure it was not allocated by ring_make_from_area */
164
0
  if (!(ring->flags & RING_FL_MAPPED))
165
0
    ha_aligned_free(ring->storage);
166
0
  ha_aligned_free(ring);
167
0
}
168
169
/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>
170
 * to ring <ring>. The message is sent atomically. It may be truncated to
171
 * <maxlen> bytes if <maxlen> is non-null. There is no distinction between the
172
 * two lists, it's just a convenience to help the caller prepend some prefixes
173
 * when necessary. It takes the ring's write lock to make sure no other thread
174
 * will touch the buffer during the update. Returns the number of bytes sent,
175
 * or <=0 on failure.
176
 */
177
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
178
0
{
179
0
  struct ring_wait_cell **ring_queue_ptr = DISGUISE(&ring->queue[ti->ring_queue].ptr);
180
0
  struct ring_wait_cell cell, *next_cell, *curr_cell;
181
0
  size_t *tail_ptr = &ring->storage->tail;
182
0
  size_t head_ofs, tail_ofs, new_tail_ofs;
183
0
  size_t ring_size;
184
0
  char *ring_area;
185
0
  struct ist v1, v2;
186
0
  size_t msglen = 0;
187
0
  size_t lenlen;
188
0
  size_t needed;
189
0
  uint64_t dellen;
190
0
  int dellenlen;
191
0
  uint8_t *lock_ptr;
192
0
  uint8_t readers;
193
0
  ssize_t sent = 0;
194
0
  int i;
195
196
  /* we have to find some room to add our message (the buffer is
197
   * never empty and at least contains the previous counter) and
198
   * to update both the buffer contents and heads at the same
199
   * time (it's doable using atomic ops but not worth the
200
   * trouble, let's just lock). For this we first need to know
201
   * the total message's length. We cannot measure it while
202
   * copying due to the varint encoding of the length.
203
   */
204
0
  for (i = 0; i < npfx; i++)
205
0
    msglen += pfx[i].len;
206
0
  for (i = 0; i < nmsg; i++)
207
0
    msglen += msg[i].len;
208
209
0
  if (msglen > maxlen)
210
0
    msglen = maxlen;
211
212
0
  lenlen = varint_bytes(msglen);
213
214
  /* We need:
215
   *   - lenlen bytes for the size encoding
216
   *   - msglen for the message
217
   *   - one byte for the new marker
218
   *
219
   * Note that we'll also reserve one extra byte to make sure we never
220
   * leave a full buffer (the vec-to-ring conversion cannot be done if
221
   * both areas are of size 0).
222
   */
223
0
  needed = lenlen + msglen + 1;
224
225
  /* these ones do not change under us (only resize affects them and it
226
   * must be done under thread isolation).
227
   */
228
0
  ring_area = ring->storage->area;
229
0
  ring_size = ring->storage->size;
230
231
0
  if (needed + 1 > ring_size)
232
0
    goto leave;
233
234
0
  cell.to_send_self = needed;
235
0
  cell.needed_tot = 0; // only when non-zero the cell is considered ready.
236
0
  cell.maxlen = msglen;
237
0
  cell.pfx = pfx;
238
0
  cell.npfx = npfx;
239
0
  cell.msg = msg;
240
0
  cell.nmsg = nmsg;
241
242
  /* insert our cell into the queue before the previous one. We may have
243
   * to wait a bit if the queue's leader is attempting an election to win
244
   * the tail, hence the busy value (should be rare enough).
245
   */
246
0
  next_cell = HA_ATOMIC_XCHG(ring_queue_ptr, &cell);
247
248
  /* let's add the cumulated size of pending messages to ours */
249
0
  cell.next = next_cell;
250
0
  if (next_cell) {
251
0
    size_t next_needed;
252
253
0
    while ((next_needed = HA_ATOMIC_LOAD(&next_cell->needed_tot)) == 0)
254
0
      __ha_cpu_relax_for_read();
255
0
    needed += next_needed;
256
0
  }
257
258
  /* now <needed> will represent the size to store *all* messages. The
259
   * atomic store may unlock a subsequent thread waiting for this one.
260
   */
261
0
  HA_ATOMIC_STORE(&cell.needed_tot, needed);
262
263
  /* OK now we're the queue leader, it's our job to try to get ownership
264
   * of the tail, if we succeeded above, we don't even enter the loop. If
265
   * we failed, we set ourselves at the top the queue, waiting for the
266
   * tail to be unlocked again. We stop doing that if another thread
267
   * comes in and becomes the leader in turn.
268
   */
269
270
  /* Wait for another thread to take the lead or for the tail to
271
   * be available again. It's critical to be read-only in this
272
   * loop so as not to lose time synchronizing cache lines. Also,
273
   * we must detect a new leader ASAP so that the fewest possible
274
   * threads check the tail.
275
   */
276
277
0
  tail_ofs = 0;
278
0
  while (1) {
279
0
#if defined(__x86_64__)
280
    /* read using a CAS on x86, as it will keep the cache line
281
     * in exclusive state for a few more cycles that will allow
282
     * us to release the queue without waiting after the loop.
283
     */
284
0
    curr_cell = &cell;
285
0
    HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, curr_cell);
286
#else
287
    curr_cell = HA_ATOMIC_LOAD(ring_queue_ptr);
288
#endif
289
    /* give up if another thread took the leadership of the queue */
290
0
    if (curr_cell != &cell)
291
0
      goto wait_for_flush;
292
293
    /* OK the queue is locked, let's attempt to get the tail lock.
294
     * we'll atomically OR the lock on the pointer and check if
295
     * someone else had it already, otherwise we own it.
296
     */
297
298
#if defined(__ARM_FEATURE_ATOMICS)
299
    /* ARMv8.1-a has a true atomic OR and doesn't need the preliminary read */
300
    tail_ofs = HA_ATOMIC_FETCH_OR(tail_ptr, RING_TAIL_LOCK);
301
    if (!(tail_ofs & RING_TAIL_LOCK))
302
      break;
303
#else
304
0
    if (HA_ATOMIC_CAS(tail_ptr, &tail_ofs, tail_ofs | RING_TAIL_LOCK))
305
0
      break;
306
0
    tail_ofs &= ~RING_TAIL_LOCK;
307
0
#endif
308
0
    __ha_cpu_relax();
309
0
  }
310
311
  /* Here we own the tail. We can go on if we're still the leader,
312
   * which we'll confirm by trying to reset the queue. If we're
313
   * still the leader, we're done.
314
   */
315
0
  if (!HA_ATOMIC_CAS(ring_queue_ptr, &curr_cell, NULL)) {
316
    /* oops, no, let's give it back to another thread and wait.
317
     * This does not happen often enough to warrant more complex
318
     * approaches (tried already).
319
     */
320
0
    HA_ATOMIC_STORE(tail_ptr, tail_ofs);
321
0
    goto wait_for_flush;
322
0
  }
323
324
0
  head_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
325
326
  /* this is the byte before tail, it contains the users count */
327
0
  lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
328
329
  /* Take the lock on the area. We're guaranteed to be the only writer
330
   * here.
331
   */
332
0
  readers = HA_ATOMIC_XCHG(lock_ptr, RING_WRITING_SIZE);
333
334
0
  vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
335
336
0
  while (vp_size(v1, v2) + needed + 1 + 1 > ring_size) {
337
    /* we need to delete the oldest message (from the end),
338
     * and we have to stop if there's a reader stuck there.
339
     * Unless there's corruption in the buffer it's guaranteed
340
     * that we have enough data to find 1 counter byte, a
341
     * varint-encoded length (1 byte min) and the message
342
     * payload (0 bytes min).
343
     */
344
0
    if (*_vp_head(v1, v2))
345
0
      break;
346
0
    dellenlen = vp_peek_varint_ofs(v1, v2, 1, &dellen);
347
0
    if (!dellenlen)
348
0
      break;
349
0
    BUG_ON_HOT(vp_size(v1, v2) < 1 + dellenlen + dellen);
350
0
    vp_skip(&v1, &v2, 1 + dellenlen + dellen);
351
0
  }
352
353
  /* now let's update the buffer with the new tail if our message will fit */
354
0
  new_tail_ofs = tail_ofs;
355
0
  if (vp_size(v1, v2) + needed + 1 + 1 <= ring_size) {
356
0
    vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
357
358
    /* update the new space in the buffer */
359
0
    HA_ATOMIC_STORE(&ring->storage->head, head_ofs);
360
361
    /* calculate next tail pointer */
362
0
    new_tail_ofs += needed;
363
0
    if (new_tail_ofs >= ring_size)
364
0
      new_tail_ofs -= ring_size;
365
366
    /* reset next read counter before releasing writers */
367
0
    HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
368
0
  }
369
0
  else {
370
    /* release readers right now, before writing the tail, so as
371
     * not to expose the readers count byte to another writer.
372
     */
373
0
    HA_ATOMIC_STORE(lock_ptr, readers);
374
0
  }
375
376
  /* and release other writers */
377
0
  HA_ATOMIC_STORE(tail_ptr, new_tail_ofs);
378
379
0
  vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
380
381
0
  if (likely(tail_ofs != new_tail_ofs)) {
382
    /* the list stops on a NULL */
383
0
    for (curr_cell = &cell; curr_cell; curr_cell = HA_ATOMIC_LOAD(&curr_cell->next)) {
384
0
      maxlen = curr_cell->maxlen;
385
0
      pfx = curr_cell->pfx;
386
0
      npfx = curr_cell->npfx;
387
0
      msg = curr_cell->msg;
388
0
      nmsg = curr_cell->nmsg;
389
390
      /* let's write the message size */
391
0
      vp_put_varint(&v1, &v2, maxlen);
392
393
      /* then write the messages */
394
0
      msglen = 0;
395
0
      for (i = 0; i < npfx; i++) {
396
0
        size_t len = pfx[i].len;
397
398
0
        if (len + msglen > maxlen)
399
0
          len = maxlen - msglen;
400
0
        if (len)
401
0
          vp_putblk(&v1, &v2, pfx[i].ptr, len);
402
0
        msglen += len;
403
0
      }
404
405
0
      for (i = 0; i < nmsg; i++) {
406
0
        size_t len = msg[i].len;
407
408
0
        if (len + msglen > maxlen)
409
0
          len = maxlen - msglen;
410
0
        if (len)
411
0
          vp_putblk(&v1, &v2, msg[i].ptr, len);
412
0
        msglen += len;
413
0
      }
414
415
      /* for all but the last message we need to write the
416
       * readers count byte.
417
       */
418
0
      if (curr_cell->next)
419
0
        vp_putchr(&v1, &v2, 0);
420
0
    }
421
422
    /* now release */
423
0
    for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
424
0
      next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
425
0
      _HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
426
0
    }
427
428
    /* unlock the message area */
429
0
    HA_ATOMIC_STORE(lock_ptr, readers);
430
0
  } else {
431
    /* messages were dropped, notify about this and release them  */
432
0
    for (curr_cell = &cell; curr_cell; curr_cell = next_cell) {
433
0
      next_cell = HA_ATOMIC_LOAD(&curr_cell->next);
434
0
      HA_ATOMIC_STORE(&curr_cell->to_send_self, 0);
435
0
      _HA_ATOMIC_STORE(&curr_cell->next, curr_cell);
436
0
    }
437
0
  }
438
439
  /* we must not write the trailing read counter, it was already done,
440
   * plus we could ruin the one of the next writer. And the front was
441
   * unlocked either at the top if the ring was full, or just above if it
442
   * could be properly filled.
443
   */
444
445
0
  sent = cell.to_send_self;
446
447
  /* notify potential readers */
448
0
  if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) {
449
0
    HA_ATOMIC_INC(&ring->pending);
450
0
    while (HA_ATOMIC_LOAD(&ring->pending) && HA_ATOMIC_XCHG(&ring->waking, 1) == 0) {
451
0
      struct mt_list back;
452
0
      struct appctx *appctx;
453
454
0
      HA_ATOMIC_STORE(&ring->pending, 0);
455
0
      MT_LIST_FOR_EACH_ENTRY_LOCKED(appctx, &ring->waiters, wait_entry, back)
456
0
        appctx_wakeup(appctx);
457
0
      HA_ATOMIC_STORE(&ring->waking, 0);
458
0
    }
459
0
  }
460
461
0
 leave:
462
0
  return sent;
463
464
0
 wait_for_flush:
465
  /* if we arrive here, it means we found another leader */
466
467
  /* The leader will write our own pointer in the cell's next to
468
   * mark it as released. Let's wait for this.
469
   */
470
0
  do {
471
0
    next_cell = HA_ATOMIC_LOAD(&cell.next);
472
0
  } while (next_cell != &cell && __ha_cpu_relax());
473
474
  /* OK our message was queued. Retrieving the sent size in the ring cell
475
   * allows another leader thread to zero it if it finally couldn't send
476
   * it (should only happen when using too small ring buffers to store
477
   * all competing threads' messages at once).
478
   */
479
0
  return HA_ATOMIC_LOAD(&cell.to_send_self);
480
0
}
481
482
/* Tries to register one more reader on ring <ring> (no appctx is needed). This is
483
 * meant to be used by low level appctx code such as CLI or ring forwarding.
484
 * For higher level functions, please see the relevant parts in appctx or CLI.
485
 * It returns non-zero on success or zero on failure if too many users are
486
 * already attached. On success, the caller MUST call ring_detach_appctx()
487
 * to detach itself, even if it was never woken up.
488
 */
489
int ring_attach(struct ring *ring)
490
0
{
491
0
  int users = ring->readers_count;
492
493
0
  do {
494
0
    if (users >= RING_MAX_READERS)
495
0
      return 0;
496
0
  } while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1));
497
0
  return 1;
498
0
}
499
500
/* detach an appctx from a ring. The appctx is expected to be waiting at offset
501
 * <ofs> relative to the beginning of the storage, or ~0 if not waiting yet.
502
 * Nothing is done if <ring> is NULL.
503
 */
504
void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
505
0
{
506
0
  if (!ring)
507
0
    return;
508
509
0
  HA_ATOMIC_DEC(&ring->readers_count);
510
511
0
  if (ofs != ~0) {
512
    /* reader was still attached */
513
0
    uint8_t *area = (uint8_t *)ring_area(ring);
514
0
    uint8_t readers;
515
516
0
    BUG_ON(ofs >= ring_size(ring));
517
0
    MT_LIST_DELETE(&appctx->wait_entry);
518
519
    /* dec readers count */
520
0
    do {
521
0
      readers = _HA_ATOMIC_LOAD(area + ofs);
522
0
    } while ((readers > RING_MAX_READERS ||
523
0
        !_HA_ATOMIC_CAS(area + ofs, &readers, readers - 1)) && __ha_cpu_relax());
524
0
  }
525
0
}
526
527
/* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is
528
 * meant to be used when registering a CLI function to dump a buffer, so it
529
 * returns zero on success, or non-zero on failure with a message in the appctx
530
 * CLI context. It automatically sets the io_handler and io_release callbacks if
531
 * they were not set. The <flags> take a combination of RING_WF_*.
532
 */
533
int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
{
  struct show_ring_ctx *ctx = applet_reserve_svcctx(appctx, sizeof(*ctx));

  if (!ring_attach(ring)) {
    return cli_err(appctx,
                   "Sorry, too many watchers (" TOSTR(RING_MAX_READERS) ") on this ring buffer. "
                   "What could it have so interesting to attract so many watchers ?");
  }

  /* only install the default callbacks when the caller set none */
  if (!appctx->cli_ctx.io_handler)
    appctx->cli_ctx.io_handler = cli_io_handler_show_ring;

  if (!appctx->cli_ctx.io_release)
    appctx->cli_ctx.io_release = cli_io_release_show_ring;

  /* start from a clean context */
  memset(ctx, 0, sizeof(*ctx));
  ctx->flags = flags;
  ctx->ofs   = ~0; // start from the oldest event
  ctx->ring  = ring;
  return 0;
}
553
554
555
/* parses as many messages as possible from ring <ring>, starting at the offset
556
 * stored at *ofs_ptr, with RING_WF_* flags in <flags>, and passes them to
557
 * the message handler <msg_handler>. If <last_ofs_ptr> is not NULL, a copy of
558
 * the last known tail pointer will be copied there so that the caller may use
559
 * this to detect new data have arrived since we left the function. Returns 0
560
 * if it needs to pause, 1 once finished.
561
 *
562
 * If <processed> is not NULL, it will be set to the number of messages
563
 * processed by the function (even when the function returns 0)
564
 */
565
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
                           ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len, char delim),
                           char delim,
                           size_t *processed)
{
  uint8_t *area = (uint8_t *)ring->storage->area;
  size_t size = ring->storage->size;
  size_t cur_ofs, end_ofs, start_ofs;
  size_t nb_msgs = 0;
  struct ist v1, v2;
  uint8_t readers;
  int done = 1;

  /* First call for this reader (offset still ~0): pick the starting
   * position and take a reference on it. This is done here rather than at
   * attach time so that the reference is not held while the reader waits
   * to be scheduled, and under isolation so that the position cannot move
   * while we're catching it.
   */
  if (unlikely(*ofs_ptr == ~0)) {
    thread_isolate();

    cur_ofs = HA_ATOMIC_LOAD(&ring->storage->head);
    end_ofs = ring_tail(ring);

    if (flags & RING_WF_SEEK_NEW) {
      /* going to the end means looking at tail-1 */
      cur_ofs = end_ofs + size - 1;
      if (cur_ofs >= size)
        cur_ofs -= size;
    }

    /* reserve our slot here (inc readers count) */
    for (;;) {
      readers = _HA_ATOMIC_LOAD(area + cur_ofs);
      if (readers <= RING_MAX_READERS &&
          _HA_ATOMIC_CAS(area + cur_ofs, &readers, readers + 1))
        break;
      if (!__ha_cpu_relax())
        break;
    }

    thread_release();

    /* store this precious offset in our context, and we're done */
    *ofs_ptr = cur_ofs;
  }

  /* we have the guarantee we can restart from our own head */
  cur_ofs = *ofs_ptr;
  BUG_ON(cur_ofs >= size);

  /* the tail will continue to move but we're getting a safe value
   * here that will continue to work.
   */
  end_ofs = ring_tail(ring);

  /* remember where we started: that position is only released once the
   * next one is protected.
   */
  start_ofs = cur_ofs;

  /* in this loop, the head of the vector pair always designates the
   * counter byte that precedes the message so that we can take our
   * reference there if we have to stop before the end (done=0).
   */
  vp_ring_to_data(&v1, &v2, (char *)area, size, cur_ofs, end_ofs);

  /* a single remaining byte is only the trailing counter: no more data */
  while (vp_size(v1, v2) > 1) {
    uint64_t msg_len;
    size_t vlen, hdr;

    readers = _HA_ATOMIC_LOAD(_vp_addr(v1, v2, 0));
    if (readers > RING_MAX_READERS) {
      /* we just met a writer which hasn't finished */
      break;
    }

    /* the varint-encoded length follows the counter byte */
    vlen = vp_peek_varint_ofs(v1, v2, 1, &msg_len);
    if (!vlen)
      break;
    hdr = 1 + vlen;

    BUG_ON(msg_len + hdr + 1 > vp_size(v1, v2));

    /* -1 means the output is full and we must pause. Any other
     * result, including -2 (too large a message to ever fit),
     * makes us count the message and move past it.
     */
    if (msg_handler(ctx, v1, v2, hdr, msg_len, delim) == -1) {
      done = 0;
      break;
    }

    nb_msgs += 1;
    vp_skip(&v1, &v2, hdr + msg_len);
  }

  vp_data_to_ring(v1, v2, (char *)area, size, &cur_ofs, &end_ofs);

  if (cur_ofs != start_ofs) {
    /* inc readers count on new place */
    for (;;) {
      readers = _HA_ATOMIC_LOAD(area + cur_ofs);
      if (readers <= RING_MAX_READERS &&
          _HA_ATOMIC_CAS(area + cur_ofs, &readers, readers + 1))
        break;
      if (!__ha_cpu_relax())
        break;
    }

    /* dec readers count on old place */
    for (;;) {
      readers = _HA_ATOMIC_LOAD(area + start_ofs);
      if (readers <= RING_MAX_READERS &&
          _HA_ATOMIC_CAS(area + start_ofs, &readers, readers - 1))
        break;
      if (!__ha_cpu_relax())
        break;
    }
  }

  if (last_ofs_ptr)
    *last_ofs_ptr = end_ofs;
  *ofs_ptr = cur_ofs;
  if (processed)
    *processed = nb_msgs;
  return done;
}
699
700
/* This function dumps all events from the ring whose pointer is in <p0> into
701
 * the appctx's output buffer, and takes from <o0> the seek offset into the
702
 * buffer's history (0 for oldest known event). It looks at <i0> for boolean
703
 * options: bit0 means it must wait for new data or any key to be pressed. Bit1
704
 * means it must seek directly to the end to wait for new contents. It returns
705
 * 0 if the output buffer is full or events are missing and it needs to be
706
 * called again, otherwise non-zero. It is meant to be used with
707
 * cli_io_release_show_ring() to clean up.
708
 */
709
int cli_io_handler_show_ring(struct appctx *appctx)
710
0
{
711
0
  struct show_ring_ctx *ctx = appctx->svcctx;
712
0
  struct ring *ring = ctx->ring;
713
0
  size_t last_ofs;
714
0
  size_t ofs;
715
0
  int ret;
716
717
0
  MT_LIST_DELETE(&appctx->wait_entry);
718
719
0
  ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line,
720
0
             (ctx->flags & RING_WF_END_ZERO) ? 0 : '\n', NULL);
721
722
0
  if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
723
    /* we've drained everything and are configured to wait for more
724
     * data or an event (keypress, close)
725
     */
726
0
    if (!b_data(&appctx->inbuf) && !se_fl_test(appctx->sedesc, SE_FL_SHW)) {
727
      /* let's be woken up once new data arrive */
728
0
      MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
729
0
      ofs = ring_tail(ring);
730
0
      if (ofs != last_ofs) {
731
        /* more data was added into the ring between the
732
         * unlock and the lock, and the writer might not
733
         * have seen us. We need to reschedule a read.
734
         */
735
0
        applet_have_more_data(appctx);
736
0
      } else
737
0
        applet_have_no_more_data(appctx);
738
0
      ret = 0;
739
0
    }
740
    /* always drain all the request */
741
0
    b_reset(&appctx->inbuf);
742
0
    applet_fl_clr(appctx, APPCTX_FL_INBLK_FULL);
743
0
  }
744
745
0
  applet_will_consume(appctx);
746
0
  applet_expect_no_data(appctx);
747
0
  return ret;
748
0
}
749
750
/* must be called after cli_io_handler_show_ring() above */
751
void cli_io_release_show_ring(struct appctx *appctx)
752
0
{
753
0
  struct show_ring_ctx *ctx = appctx->svcctx;
754
0
  struct ring *ring = ctx->ring;
755
0
  size_t ofs = ctx->ofs;
756
757
0
  ring_detach_appctx(ring, appctx, ofs);
758
0
}
759
760
/* Returns the MAXIMUM payload len that could theoretically fit into the ring
761
 * based on ring buffer size.
762
 *
763
 * Computation logic relies on implementation details from 'ring-t.h'.
764
 */
765
size_t ring_max_payload(const struct ring *ring)
{
  /* the initial RC byte and the payload's RC byte are not usable */
  size_t room = ring_size(ring) - 2;

  /* what remains must also hold the payload's varint-encoded size */
  return room - varint_bytes(room);
}
776
777
/* config parser for global "tune.ring.queues", accepts a number from 0 to RING_WAIT_QUEUES */
778
static int cfg_parse_tune_ring_queues(char **args, int section_type, struct proxy *curpx,
779
                                       const struct proxy *defpx, const char *file, int line,
780
                                       char **err)
781
0
{
782
0
  int queues;
783
784
0
  if (too_many_args(1, args, err, NULL))
785
0
    return -1;
786
787
0
  queues = atoi(args[1]);
788
0
  if (queues < 0 || queues > RING_WAIT_QUEUES) {
789
0
    memprintf(err, "'%s' expects a number between 0 and %d but got '%s'.", args[0], RING_WAIT_QUEUES, args[1]);
790
0
    return -1;
791
0
  }
792
793
0
  global.tune.ring_queues = queues;
794
0
  return 0;
795
0
}
796
797
/* config keyword parsers */
798
static struct cfg_kw_list cfg_kws = {ILH, {
799
  { CFG_GLOBAL, "tune.ring.queues", cfg_parse_tune_ring_queues },
800
  { 0, NULL, NULL }
801
}};
802
803
INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
804
805
/*
806
 * Local variables:
807
 *  c-indent-level: 8
808
 *  c-basic-offset: 8
809
 * End:
810
 */