Coverage Report

Created: 2025-08-03 06:36

/src/frr/lib/frrcu.c

Coverage summary: only the lines in rcu_preinit() (the constructor) were executed (hit count 6 each); every other instrumented line in this file has a hit count of 0.

// SPDX-License-Identifier: ISC
/*
 * Copyright (c) 2017-19  David Lamparter, for NetDEF, Inc.
 */

/* implementation notes:  this is an epoch-based RCU implementation.  rcu_seq
 * (global variable) counts the current epoch.  Threads hold a specific epoch
 * in rcu_read_lock().  This is the oldest epoch a thread might be accessing
 * data from.
 *
 * The rcu_seq global is only pushed forward on rcu_read_lock() and
 * rcu_read_unlock() calls.  This makes things a tad more efficient since
 * those are the only places it matters:
 * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
 * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
 *   when heading into a long idle period where no thread holds RCU
 *
 * rcu_thread structures themselves are RCU-free'd.
 *
 * rcu_head structures are the most iffy;  normally for an ATOMLIST we would
 * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
 * to prevent ABA or use-after-free problems.  However, our ATOMLIST code
 * guarantees that if the list remains non-empty in all cases, we only need
 * the "last" pointer to do an "add_tail()", i.e. we can't run into ABA/UAF
 * issues - but we do need to keep at least 1 item on the list.
 *
 * (Search the atomlist code for all uses of "last")
 */
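
/* Illustrative sketch, not part of frrcu.c: the reader-side pattern the
 * notes above describe.  rcu_read_lock()/rcu_read_unlock() are the real API
 * (declared in frrcu.h); "struct my_config", my_config_ptr and the atomic
 * load are hypothetical caller-side choices.
 */
struct my_config {
	int holdtime;
	struct rcu_head rcu_head;	/* needed if the object is RCU-free'd later */
};

static struct my_config *_Atomic my_config_ptr;

static int my_config_get_holdtime(void)
{
	struct my_config *cfg;
	int ret = 0;

	rcu_read_lock();	/* pin the current epoch (recursive via depth) */
	cfg = atomic_load_explicit(&my_config_ptr, memory_order_acquire);
	if (cfg)
		ret = cfg->holdtime;	/* cfg cannot be freed while the epoch is held */
	rcu_read_unlock();	/* may push rcu_seq forward if cleanup was queued */

	return ret;
}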

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <pthread.h>
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif
#include <string.h>
#include <unistd.h>
#include <signal.h>

#include "frrcu.h"
#include "seqlock.h"
#include "atomlist.h"

DEFINE_MTYPE_STATIC(LIB, RCU_THREAD,    "RCU thread");
DEFINE_MTYPE_STATIC(LIB, RCU_NEXT,      "RCU sequence barrier");

DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head);

PREDECL_ATOMLIST(rcu_threads);
struct rcu_thread {
	struct rcu_threads_item head;

	struct rcu_head rcu_head;

	struct seqlock rcu;

	/* only accessed by thread itself, not atomic */
	unsigned depth;
};
DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head);
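
/* Sketch, not part of frrcu.c: the per-thread "depth" counter above makes
 * the read lock recursive - only the outermost rcu_read_lock()/unlock()
 * pair touches the thread's seqlock.  Hypothetical nested caller:
 */
static void nested_reader_sketch(void)
{
	rcu_read_lock();	/* depth 0 -> 1: acquires this thread's rt->rcu */
	rcu_read_lock();	/* depth 1 -> 2: no seqlock operation */
	/* ... read RCU-protected data ... */
	rcu_read_unlock();	/* depth 2 -> 1: epoch still held */
	rcu_read_unlock();	/* depth 1 -> 0: releases rt->rcu */
}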

static const struct rcu_action rcua_next  = { .type = RCUA_NEXT };
static const struct rcu_action rcua_end   = { .type = RCUA_END };
static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };

struct rcu_next {
	struct rcu_head head_free;
	struct rcu_head head_next;
};

#define rcu_free_internal(mtype, ptr, field)                                   \
	do {                                                                   \
		typeof(ptr) _ptr = (ptr);                                      \
		struct rcu_head *_rcu_head = &_ptr->field;                     \
		static const struct rcu_action _rcu_action = {                 \
			.type = RCUA_FREE,                                     \
			.u.free = {                                            \
				.mt = mtype,                                   \
				.offset = offsetof(typeof(*_ptr), field),      \
			},                                                     \
		};                                                             \
		_rcu_head->action = &_rcu_action;                              \
		rcu_heads_add_tail(&rcu_heads, _rcu_head);                     \
	} while (0)
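
/* Sketch, not part of frrcu.c: how a deferred free is queued through the
 * macro above.  The object embeds a struct rcu_head; the action records the
 * MTYPE plus the offset of that head, which is how rcu_do() later recovers
 * the object pointer.  "struct my_item" and MTYPE_MY_ITEM are hypothetical;
 * external callers would use the equivalent public macro from frrcu.h
 * rather than rcu_free_internal().
 */
struct my_item {
	int value;
	struct rcu_head rcu_head;
};

static void my_item_retire(struct my_item *item)
{
	/* caller holds rcu_read_lock() and has already unlinked "item" so
	 * that no new reader can find it; the actual qfree() happens once
	 * every thread has moved past the current epoch
	 */
	rcu_free_internal(MTYPE_MY_ITEM, item, rcu_head);
}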

/* primary global RCU position */
static struct seqlock rcu_seq;
/* this is set to rcu_seq whenever something is added on the RCU queue.
 * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
 */
static _Atomic seqlock_val_t rcu_dirty;

static struct rcu_threads_head rcu_threads;
static struct rcu_heads_head rcu_heads;

/* main thread & RCU sweeper have pre-setup rcu_thread structures.  The
 * reasons are different:
 *
 * - rcu_thread_main is there because the main thread isn't started like
 *   other threads, it's implicitly created when the program is started.  So
 *   rcu_thread_main matches up implicitly.
 *
 * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
 *   sense really), it only exists so we can call RCU-using functions from
 *   the RCU thread without special handling in rcu_read_lock/unlock.
 */
static struct rcu_thread rcu_thread_main;
static struct rcu_thread rcu_thread_rcu;

static pthread_t rcu_pthread;
static pthread_key_t rcu_thread_key;
static bool rcu_active;

static void rcu_start(void);
static void rcu_bump(void);

/*
 * preinitialization for main thread
 */
static void rcu_thread_end(void *rcu_thread);

static void rcu_preinit(void) __attribute__((constructor));
static void rcu_preinit(void)
{
	struct rcu_thread *rt;

	rt = &rcu_thread_main;
	rt->depth = 1;
	seqlock_init(&rt->rcu);
	seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);

	pthread_key_create(&rcu_thread_key, rcu_thread_end);
	pthread_setspecific(rcu_thread_key, rt);

	rcu_threads_add_tail(&rcu_threads, rt);

	/* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
	rt = &rcu_thread_rcu;
	rt->depth = 1;

	seqlock_init(&rcu_seq);
	seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
}

static struct rcu_thread *rcu_self(void)
{
	return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
}

/*
 * thread management (for the non-main thread)
 */
struct rcu_thread *rcu_thread_prepare(void)
{
	struct rcu_thread *rt, *cur;

	rcu_assert_read_locked();

	if (!rcu_active)
		rcu_start();

	cur = rcu_self();
	assert(cur->depth);

	/* new thread always starts with rcu_read_lock held at depth 1, and
	 * holding the same epoch as the parent (this makes it possible to
	 * use RCU for things passed into the thread through its arg)
	 */
	rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
	rt->depth = 1;

	seqlock_init(&rt->rcu);
	seqlock_acquire(&rt->rcu, &cur->rcu);

	rcu_threads_add_tail(&rcu_threads, rt);

	return rt;
}

void rcu_thread_start(struct rcu_thread *rt)
{
	pthread_setspecific(rcu_thread_key, rt);
}
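
/* Sketch, not part of frrcu.c: pairing rcu_thread_prepare() in the parent
 * with rcu_thread_start() in the child, as described in the comment inside
 * rcu_thread_prepare().  The child starts at depth 1 holding the parent's
 * epoch, so RCU-protected data handed over via the argument stays valid
 * until the child drops the lock.  Everything except the frrcu calls
 * (struct child_args, child_main, spawn_child) is hypothetical.
 */
struct child_args {
	struct rcu_thread *rcu_thread;
	void *data;		/* RCU-protected data passed from the parent */
};

static void *child_main(void *arg)
{
	struct child_args *ca = arg;

	rcu_thread_start(ca->rcu_thread);	/* bind the prepared rcu_thread to this pthread */

	/* ca->data is safe to dereference here: the inherited read lock is held */
	rcu_read_unlock();			/* drop the depth-1 lock from rcu_thread_prepare() */

	/* ... thread body, taking rcu_read_lock()/unlock() as needed ... */
	return NULL;
}

static void spawn_child(pthread_t *thread, struct child_args *ca, void *data)
{
	rcu_read_lock();			/* rcu_thread_prepare() asserts this */
	ca->rcu_thread = rcu_thread_prepare();
	ca->data = data;
	pthread_create(thread, NULL, child_main, ca);
	rcu_read_unlock();
}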

void rcu_thread_unprepare(struct rcu_thread *rt)
{
	if (rt == &rcu_thread_rcu)
		return;

	rt->depth = 1;
	seqlock_acquire(&rt->rcu, &rcu_seq);

	rcu_bump();
	if (rt != &rcu_thread_main)
		/* this free() happens after seqlock_release() below */
		rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);

	rcu_threads_del(&rcu_threads, rt);
	seqlock_release(&rt->rcu);
}

static void rcu_thread_end(void *rtvoid)
{
	struct rcu_thread *rt = rtvoid;
	rcu_thread_unprepare(rt);
}

/*
 * main RCU control aspects
 */

static void rcu_bump(void)
{
	struct rcu_next *rn;

	rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));

	/* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
	 * This means we don't need to communicate which seqno is which
	 * RCUA_NEXT, since we really don't care.
	 */

	/*
	 * Important race condition:  while rcu_heads_add_tail is executing,
	 * there is an intermediate point where the rcu_heads "last" pointer
	 * already points to rn->head_next, but rn->head_next isn't added to
	 * the list yet.  That means any other "add_tail" calls append to this
	 * item, which isn't fully on the list yet.  Freeze this thread at
	 * that point and look at another thread doing a rcu_bump.  It adds
	 * these two items and then does a seqlock_bump.  But the rcu_heads
	 * list is still "interrupted" and there's no RCUA_NEXT on the list
	 * yet (from either the frozen thread or the second thread).  So
	 * rcu_main() might actually hit the end of the list at the
	 * "interrupt".
	 *
	 * This situation is prevented by requiring that rcu_read_lock is held
	 * for any calls to rcu_bump, since if we're holding the current RCU
	 * epoch, that means rcu_main can't be chewing on rcu_heads and hit
	 * that interruption point.  Only by the time the thread has continued
	 * to rcu_read_unlock() - and therefore completed the add_tail - the
	 * RCU sweeper gobbles up the epoch and can be sure to find at least
	 * the RCUA_NEXT and RCUA_FREE items on rcu_heads.
	 */
	rn->head_next.action = &rcua_next;
	rcu_heads_add_tail(&rcu_heads, &rn->head_next);

	/* free rn that we allocated above.
	 *
	 * This is INTENTIONALLY not built into the RCUA_NEXT action.  This
	 * ensures that after the action above is popped off the queue, there
	 * is still at least 1 item on the RCU queue.  This means we never
	 * delete the last item, which is extremely important since it keeps
	 * the atomlist ->last pointer alive and well.
	 *
	 * If we were to "run dry" on the RCU queue, add_tail may run into the
	 * "last item is being deleted - start over" case, and then we may end
	 * up accessing old RCU queue items that are already free'd.
	 */
	rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);

	/* Only allow the RCU sweeper to run after these 2 items are queued.
	 *
	 * If another thread enqueues some RCU action in the intermediate
	 * window here, nothing bad happens - the queued action is associated
	 * with a larger seq# than strictly necessary.  Thus, it might get
	 * executed a bit later, but that's not a problem.
	 *
	 * If another thread acquires the read lock in this window, it holds
	 * the previous epoch, but its RCU queue actions will be in the next
	 * epoch.  This isn't a problem either, just a tad inefficient.
	 */
	seqlock_bump(&rcu_seq);
}

static void rcu_bump_maybe(void)
{
	seqlock_val_t dirty;

	dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
	/* no problem if we race here and multiple threads bump rcu_seq;
	 * bumping too much causes no issues while not bumping enough will
	 * result in delayed cleanup
	 */
	if (dirty == seqlock_cur(&rcu_seq))
		rcu_bump();
}

void rcu_read_lock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt);
	if (rt->depth++ > 0)
		return;

	seqlock_acquire(&rt->rcu, &rcu_seq);
	/* need to hold RCU for bump ... */
	rcu_bump_maybe();
	/* ... but no point in holding the old epoch if we just bumped */
	seqlock_acquire(&rt->rcu, &rcu_seq);
}

void rcu_read_unlock(void)
{
	struct rcu_thread *rt = rcu_self();

	assert(rt && rt->depth);
	if (--rt->depth > 0)
		return;
	rcu_bump_maybe();
	seqlock_release(&rt->rcu);
}

void rcu_assert_read_locked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && rt->depth && seqlock_held(&rt->rcu));
}

void rcu_assert_read_unlocked(void)
{
	struct rcu_thread *rt = rcu_self();
	assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
}

/*
 * RCU resource-release thread
 */

static void *rcu_main(void *arg);

static void rcu_start(void)
{
	/* ensure we never handle signals on the RCU thread by blocking
	 * everything here (new thread inherits signal mask)
	 */
	sigset_t oldsigs, blocksigs;

	sigfillset(&blocksigs);
	pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);

	rcu_active = true;

	assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));

	pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#ifdef HAVE_PTHREAD_SETNAME_NP
# ifdef GNU_LINUX
	pthread_setname_np(rcu_pthread, "RCU sweeper");
# elif defined(__NetBSD__)
	pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
# endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
	pthread_set_name_np(rcu_pthread, "RCU sweeper");
#endif
}

static void rcu_do(struct rcu_head *rh)
{
	struct rcu_head_close *rhc;
	void *p;

	switch (rh->action->type) {
	case RCUA_FREE:
		p = (char *)rh - rh->action->u.free.offset;
		if (rh->action->u.free.mt)
			qfree(rh->action->u.free.mt, p);
		else
			free(p);
		break;
	case RCUA_CLOSE:
		rhc = container_of(rh, struct rcu_head_close,
				   rcu_head);
		close(rhc->fd);
		break;
	case RCUA_CALL:
		p = (char *)rh - rh->action->u.call.offset;
		rh->action->u.call.fptr(p);
		break;

	case RCUA_INVALID:
	case RCUA_NEXT:
	case RCUA_END:
	default:
		assert(0);
	}
}
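
/* Sketch, not part of frrcu.c: the RCUA_CALL branch above invokes an
 * arbitrary callback after the grace period.  A caller can build such an
 * action and hand it to rcu_enqueue() together with an embedded rcu_head.
 * "struct conn" and conn_destroy() are hypothetical; only the rcu_action
 * fields used by rcu_do() above are assumed, the full definition lives in
 * frrcu.h.
 */
struct conn {
	char *peername;
	struct rcu_head rcu_head;
};

static void conn_destroy(void *obj)
{
	struct conn *conn = obj;

	free(conn->peername);
	free(conn);
}

static void conn_retire(struct conn *conn)
{
	static const struct rcu_action conn_destroy_action = {
		.type = RCUA_CALL,
		.u.call = {
			.fptr = conn_destroy,
			.offset = offsetof(struct conn, rcu_head),
		},
	};

	/* rcu_enqueue() asserts that rcu_read_lock() is held */
	rcu_enqueue(&conn->rcu_head, &conn_destroy_action);
}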

static void rcu_watchdog(struct rcu_thread *rt)
{
#if 0
	/* future work: print a backtrace for the thread that's holding up
	 * RCU.  The only (good) way of doing that is to send a signal to the
	 * other thread, save away the backtrace in the signal handler, and
	 * block here until the signal is done processing.
	 *
	 * Just haven't implemented that yet.
	 */
	fprintf(stderr, "RCU watchdog %p\n", rt);
#endif
}

static void *rcu_main(void *arg)
{
	struct rcu_thread *rt;
	struct rcu_head *rh = NULL;
	bool end = false;
	struct timespec maxwait;

	seqlock_val_t rcuval = SEQLOCK_STARTVAL;

	pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);

	while (!end) {
		seqlock_wait(&rcu_seq, rcuval);

		/* RCU watchdog timeout, TODO: configurable value */
		clock_gettime(CLOCK_MONOTONIC, &maxwait);
		maxwait.tv_nsec += 100 * 1000 * 1000;
		if (maxwait.tv_nsec >= 1000000000) {
			maxwait.tv_sec++;
			maxwait.tv_nsec -= 1000000000;
		}

		frr_each (rcu_threads, &rcu_threads, rt)
			if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
				rcu_watchdog(rt);
				seqlock_wait(&rt->rcu, rcuval);
			}

		while ((rh = rcu_heads_pop(&rcu_heads))) {
			if (rh->action->type == RCUA_NEXT)
				break;
			else if (rh->action->type == RCUA_END)
				end = true;
			else
				rcu_do(rh);
		}

		rcuval += SEQLOCK_INCR;
	}

	/* rcu_shutdown can only be called singlethreaded, and it does a
	 * pthread_join, so it should be impossible that anything ended up
	 * on the queue after RCUA_END
	 */
#if 1
	assert(!rcu_heads_first(&rcu_heads));
#else
	while ((rh = rcu_heads_pop(&rcu_heads)))
		if (rh->action->type >= RCUA_FREE)
			rcu_do(rh);
#endif
	return NULL;
}

void rcu_shutdown(void)
{
	static struct rcu_head rcu_head_end;
	struct rcu_thread *rt = rcu_self();
	void *retval;

	if (!rcu_active)
		return;

	rcu_assert_read_locked();
	assert(rcu_threads_count(&rcu_threads) == 1);

	rcu_enqueue(&rcu_head_end, &rcua_end);

	rt->depth = 0;
	seqlock_release(&rt->rcu);
	seqlock_release(&rcu_seq);
	rcu_active = false;

	/* clearing rcu_active is before pthread_join in case we hang in
	 * pthread_join & get a SIGTERM or something - in that case, just
	 * ignore the maybe-still-running RCU thread
	 */
	if (pthread_join(rcu_pthread, &retval) == 0) {
		seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
		seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
		rt->depth = 1;
	}
}
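
/* Sketch, not part of frrcu.c: the shutdown order the asserts above imply.
 * All other RCU-registered threads must already be gone (rcu_threads_count
 * == 1) and the main thread must still hold the read lock it has held since
 * rcu_preinit().  program_teardown() is a hypothetical caller.
 */
static void program_teardown(void)
{
	/* ... stop and join all other pthreads first ... */

	rcu_assert_read_locked();	/* main thread: depth 1 from rcu_preinit() */
	rcu_shutdown();			/* queues RCUA_END and joins the sweeper */

	/* no further RCU activity after this point; safe to exit */
}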

/*
 * RCU'd free functions
 */

void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
{
	/* refer to rcu_bump() for why we need to hold RCU when adding items
	 * to rcu_heads
	 */
	rcu_assert_read_locked();

	rh->action = action;

	if (!rcu_active) {
		rcu_do(rh);
		return;
	}
	rcu_heads_add_tail(&rcu_heads, rh);
	atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
			      memory_order_relaxed);
}

void rcu_close(struct rcu_head_close *rhc, int fd)
{
	rhc->fd = fd;
	rcu_enqueue(&rhc->rcu_head, &rcua_close);
}
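
/* Sketch, not part of frrcu.c: typical use of rcu_close() - unlink the
 * object holding the fd, then defer the close() past the grace period so a
 * concurrent reader can never see the descriptor number being reused.
 * "struct session" and session_shutdown() are hypothetical.
 */
struct session {
	int fd;
	struct rcu_head_close rcu_close;
};

static void session_shutdown(struct session *sess)
{
	rcu_read_lock();
	/* ... remove sess from the RCU-protected structure readers use ... */
	rcu_close(&sess->rcu_close, sess->fd);	/* close(fd) runs after the epoch */
	sess->fd = -1;
	rcu_read_unlock();
}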