Coverage Report

Created: 2023-03-26 07:41

/src/openvswitch/lib/ovs-rcu.c
Every instrumented line in this file has an execution count of 0: the file was not exercised by this test run. The per-line number and hit-count columns from the report table are collapsed into the plain source listing below.
/*
 * Copyright (c) 2014, 2017 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>
#include <errno.h>
#include "ovs-rcu.h"
#include "fatal-signal.h"
#include "guarded-list.h"
#include "latch.h"
#include "openvswitch/list.h"
#include "ovs-thread.h"
#include "openvswitch/poll-loop.h"
#include "seq.h"
#include "timeval.h"
#include "util.h"
#include "openvswitch/vlog.h"

VLOG_DEFINE_THIS_MODULE(ovs_rcu);

#define MIN_CBS 16

struct ovsrcu_cb {
    void (*function)(void *aux);
    void *aux;
};

struct ovsrcu_cbset {
    struct ovs_list list_node;
    struct ovsrcu_cb *cbs;
    size_t n_allocated;
    int n_cbs;
};

struct ovsrcu_perthread {
    struct ovs_list list_node;  /* In global list. */

    uint64_t seqno;
    struct ovsrcu_cbset *cbset;
    char name[16];              /* This thread's name. */
};

static struct seq *global_seqno;

static pthread_key_t perthread_key;
static struct ovs_list ovsrcu_threads;
static struct ovs_mutex ovsrcu_threads_mutex;

static struct guarded_list flushed_cbsets;
static struct seq *flushed_cbsets_seq;

static struct latch postpone_exit;
static struct ovs_barrier postpone_barrier;

static void ovsrcu_init_module(void);
static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
static void ovsrcu_unregister__(struct ovsrcu_perthread *);
static bool ovsrcu_call_postponed(void);
static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);

static struct ovsrcu_perthread *
ovsrcu_perthread_get(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();

    perthread = pthread_getspecific(perthread_key);
    if (!perthread) {
        const char *name = get_subprogram_name();

        perthread = xmalloc(sizeof *perthread);
        perthread->seqno = seq_read(global_seqno);
        perthread->cbset = NULL;
        ovs_strlcpy(perthread->name, name[0] ? name : "main",
                    sizeof perthread->name);

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        ovs_list_push_back(&ovsrcu_threads, &perthread->list_node);
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        pthread_setspecific(perthread_key, perthread);
    }
    return perthread;
}

/* Indicates the end of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h.
 *
 * Quiescent states don't stack or nest, so this always ends a quiescent state
 * even if ovsrcu_quiesce_start() was called multiple times in a row. */
void
ovsrcu_quiesce_end(void)
{
    ovsrcu_perthread_get();
}

static void
ovsrcu_quiesced(void)
{
    if (single_threaded()) {
        ovsrcu_call_postponed();
    } else {
        static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
        if (ovsthread_once_start(&once)) {
            latch_init(&postpone_exit);
            ovs_barrier_init(&postpone_barrier, 2);
            ovs_thread_create("urcu", ovsrcu_postpone_thread, NULL);
            ovsthread_once_done(&once);
        }
    }
}

/* Indicates the beginning of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h. */
void
ovsrcu_quiesce_start(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();
    perthread = pthread_getspecific(perthread_key);
    if (perthread) {
        pthread_setspecific(perthread_key, NULL);
        ovsrcu_unregister__(perthread);
    }

    ovsrcu_quiesced();
}

/* Indicates a momentary quiescent state.  See "Details" near the top of
 * ovs-rcu.h.
 *
 * Provides a full memory barrier via seq_change().
 */
void
ovsrcu_quiesce(void)
{
    struct ovsrcu_perthread *perthread;

    perthread = ovsrcu_perthread_get();
    perthread->seqno = seq_read(global_seqno);
    if (perthread->cbset) {
        ovsrcu_flush_cbset(perthread);
    }
    seq_change(global_seqno);

    ovsrcu_quiesced();
}
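
/* Illustrative sketch, not part of ovs-rcu.c: how a long-running worker
 * thread typically participates in grace periods.  The helpers
 * example_should_exit(), example_process_batch(), and example_wait_for_work()
 * are hypothetical.  A thread that fails to quiesce regularly stalls
 * ovsrcu_synchronize() and all postponed callbacks behind it. */
extern bool example_should_exit(void);   /* Hypothetical. */
extern void example_process_batch(void); /* Hypothetical; may read
                                          * RCU-protected data. */
extern void example_wait_for_work(void); /* Hypothetical; may block. */

static void *
example_worker_main(void *arg OVS_UNUSED)
{
    while (!example_should_exit()) {
        example_process_batch();
        ovsrcu_quiesce();        /* Momentary quiescent state between
                                  * batches. */

        ovsrcu_quiesce_start();  /* Quiesce across the blocking call... */
        example_wait_for_work();
        ovsrcu_quiesce_end();    /* ...then resume accessing RCU data. */
    }
    return NULL;
}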

int
ovsrcu_try_quiesce(void)
{
    struct ovsrcu_perthread *perthread;
    int ret = EBUSY;

    ovs_assert(!single_threaded());
    perthread = ovsrcu_perthread_get();
    if (!seq_try_lock()) {
        perthread->seqno = seq_read_protected(global_seqno);
        if (perthread->cbset) {
            ovsrcu_flush_cbset__(perthread, true);
        }
        seq_change_protected(global_seqno);
        seq_unlock();
        ovsrcu_quiesced();
        ret = 0;
    }
    return ret;
}

bool
ovsrcu_is_quiescent(void)
{
    ovsrcu_init_module();
    return pthread_getspecific(perthread_key) == NULL;
}

void
ovsrcu_synchronize(void)
{
    unsigned int warning_threshold = 1000;
    uint64_t target_seqno;
    long long int start;

    if (single_threaded()) {
        return;
    }

    target_seqno = seq_read(global_seqno);
    ovsrcu_quiesce_start();
    start = time_msec();

    for (;;) {
        uint64_t cur_seqno = seq_read(global_seqno);
        struct ovsrcu_perthread *perthread;
        char stalled_thread[16];
        unsigned int elapsed;
        bool done = true;

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) {
            if (perthread->seqno <= target_seqno) {
                ovs_strlcpy_arrays(stalled_thread, perthread->name);
                done = false;
                break;
            }
        }
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        if (done) {
            break;
        }

        elapsed = time_msec() - start;
        if (elapsed >= warning_threshold) {
            VLOG_WARN("blocked %u ms waiting for %s to quiesce",
                      elapsed, stalled_thread);
            warning_threshold *= 2;
        }
        poll_timer_wait_until(start + warning_threshold);

        seq_wait(global_seqno, cur_seqno);
        poll_block();
    }
    ovsrcu_quiesce_end();
}
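
/* Illustrative sketch, not part of ovs-rcu.c: a writer that uses
 * ovsrcu_synchronize() to block until all readers have quiesced, so it can
 * free immediately instead of postponing.  struct example_node is
 * hypothetical; OVSRCU_TYPE, ovsrcu_get_protected(), and ovsrcu_set() come
 * from ovs-rcu.h. */
struct example_node {
    int value;
};

static OVSRCU_TYPE(struct example_node *) example_head;

static void
example_remove_and_free(void)
{
    struct example_node *old
        = ovsrcu_get_protected(struct example_node *, &example_head);

    ovsrcu_set(&example_head, NULL); /* Unpublish the node. */
    ovsrcu_synchronize();            /* Wait out every current reader. */
    free(old);                       /* No reader can still hold 'old'. */
}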

/* Waits until as many postponed callbacks as possible have executed.
 *
 * As a side effect, stops the background thread that calls the callbacks and
 * prevents it from being restarted.  This means that this function should only
 * be called soon before a process exits, as a mechanism for releasing memory
 * to make memory leaks easier to detect, since any further postponed callbacks
 * won't actually get called.
 *
 * This function can only wait for callbacks registered by the current thread
 * and the background thread that calls the callbacks.  Thus, it will be most
 * effective if other threads have already exited. */
void
ovsrcu_exit(void)
{
    /* Stop the postpone thread and wait for it to exit.  Otherwise, there's no
     * way to wait for that thread to finish calling callbacks itself. */
    if (!single_threaded()) {
        ovsrcu_quiesced();      /* Ensure that the postpone thread exists. */
        latch_set(&postpone_exit);
        ovs_barrier_block(&postpone_barrier);
    }

    /* Repeatedly:
     *
     *    - Wait for a grace period.  One important side effect is to push the
     *      running thread's cbset into 'flushed_cbsets' so that the next call
     *      has something to call.
     *
     *    - Call all the callbacks in 'flushed_cbsets'.  If there aren't any,
     *      we're done, otherwise the callbacks themselves might have requested
     *      more deferred callbacks so we go around again.
     *
     * We limit the number of iterations just in case some bug causes an
     * infinite loop.  This function is just for making memory leaks easier to
     * spot so there's no point in breaking things on that basis. */
    for (int i = 0; i < 8; i++) {
        ovsrcu_synchronize();
        if (!ovsrcu_call_postponed()) {
            break;
        }
    }
}
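
/* Illustrative sketch, not part of ovs-rcu.c: calling ovsrcu_exit() as the
 * last step before a process exits, after worker threads have been joined,
 * so deferred frees actually run and leak checkers stay quiet.
 * example_shutdown() is hypothetical. */
static void
example_shutdown(void)
{
    /* ... stop and join worker threads first ... */
    ovsrcu_exit();   /* Stops the "urcu" thread and drains what it can. */
}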

/* Registers 'function' to be called, passing 'aux' as argument, after the
 * next grace period.
 *
 * The call is guaranteed to happen after the next time all participating
 * threads have quiesced at least once, but there is no guarantee that all
 * registered functions are called as early as possible, or that the functions
 * registered by different threads would be called in the order the
 * registrations took place.  In particular, even if two threads provably
 * register a function each in a specific order, the functions may still be
 * called in the opposite order, depending on the timing of when the threads
 * call ovsrcu_quiesce(), how many functions they postpone, and when the
 * ovs-rcu thread happens to grab the functions to be called.
 *
 * All functions registered by a single thread are guaranteed to execute in the
 * registering order, however.
 *
 * This function is more conveniently called through the ovsrcu_postpone()
 * macro, which provides a type-safe way to allow 'function''s parameter to be
 * any pointer type. */
void
ovsrcu_postpone__(void (*function)(void *aux), void *aux)
{
    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
    struct ovsrcu_cbset *cbset;
    struct ovsrcu_cb *cb;

    cbset = perthread->cbset;
    if (!cbset) {
        cbset = perthread->cbset = xmalloc(sizeof *perthread->cbset);
        cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs);
        cbset->n_allocated = MIN_CBS;
        cbset->n_cbs = 0;
    }

    if (cbset->n_cbs == cbset->n_allocated) {
        cbset->cbs = x2nrealloc(cbset->cbs, &cbset->n_allocated,
                                sizeof *cbset->cbs);
    }

    cb = &cbset->cbs[cbset->n_cbs++];
    cb->function = function;
    cb->aux = aux;
}
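
/* Illustrative sketch, not part of ovs-rcu.c: the usual pattern with the
 * type-safe ovsrcu_postpone() macro from ovs-rcu.h.  struct example_table
 * and example_table_destroy() are hypothetical. */
struct example_table {
    int n_entries;
};

static OVSRCU_TYPE(struct example_table *) example_table;

static void
example_table_destroy(struct example_table *table)
{
    free(table);
}

static void
example_replace_table(struct example_table *new_table)
{
    struct example_table *old
        = ovsrcu_get_protected(struct example_table *, &example_table);

    ovsrcu_set(&example_table, new_table);
    if (old) {
        /* Runs with 'old' once every registered thread has quiesced. */
        ovsrcu_postpone(example_table_destroy, old);
    }
}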

static bool
ovsrcu_call_postponed(void)
{
    struct ovsrcu_cbset *cbset;
    struct ovs_list cbsets;

    guarded_list_pop_all(&flushed_cbsets, &cbsets);
    if (ovs_list_is_empty(&cbsets)) {
        return false;
    }

    ovsrcu_synchronize();

    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
        struct ovsrcu_cb *cb;

        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
            cb->function(cb->aux);
        }
        free(cbset->cbs);
        free(cbset);
    }

    return true;
}

static void *
ovsrcu_postpone_thread(void *arg OVS_UNUSED)
{
    pthread_detach(pthread_self());

    while (!latch_is_set(&postpone_exit)) {
        uint64_t seqno = seq_read(flushed_cbsets_seq);
        if (!ovsrcu_call_postponed()) {
            seq_wait(flushed_cbsets_seq, seqno);
            latch_wait(&postpone_exit);
            poll_block();
        }
    }

    ovs_barrier_block(&postpone_barrier);
    return NULL;
}

static void
ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
{
    struct ovsrcu_cbset *cbset = perthread->cbset;

    if (cbset) {
        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
        perthread->cbset = NULL;

        if (protected) {
            seq_change_protected(flushed_cbsets_seq);
        } else {
            seq_change(flushed_cbsets_seq);
        }
    }
}

static void
ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
{
    ovsrcu_flush_cbset__(perthread, false);
}

static void
ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
{
    if (perthread->cbset) {
        ovsrcu_flush_cbset(perthread);
    }

    ovs_mutex_lock(&ovsrcu_threads_mutex);
    ovs_list_remove(&perthread->list_node);
    ovs_mutex_unlock(&ovsrcu_threads_mutex);

    free(perthread);

    seq_change(global_seqno);
}

static void
ovsrcu_thread_exit_cb(void *perthread)
{
    ovsrcu_unregister__(perthread);
}

/* Cancels the callback to ovsrcu_thread_exit_cb().
 *
 * Cancelling the call to the destructor during main thread exit is needed
 * when using the pthreads-win32 library on Windows.  It has been observed
 * that in pthreads-win32, a call to the destructor during main thread exit
 * causes undefined behavior. */
static void
ovsrcu_cancel_thread_exit_cb(void *aux OVS_UNUSED)
{
    pthread_setspecific(perthread_key, NULL);
}

static void
ovsrcu_init_module(void)
{
    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
    if (ovsthread_once_start(&once)) {
        global_seqno = seq_create();
        xpthread_key_create(&perthread_key, ovsrcu_thread_exit_cb);
        fatal_signal_add_hook(ovsrcu_cancel_thread_exit_cb, NULL, NULL, true);
        ovs_list_init(&ovsrcu_threads);
        ovs_mutex_init(&ovsrcu_threads_mutex);

        guarded_list_init(&flushed_cbsets);
        flushed_cbsets_seq = seq_create();

        ovsthread_once_done(&once);
    }
}

static void
ovsrcu_barrier_func(void *seq_)
{
    struct seq *seq = (struct seq *) seq_;
    seq_change(seq);
}

/* Similar to the kernel rcu_barrier, ovsrcu_barrier waits for all outstanding
 * RCU callbacks to complete.  However, unlike the kernel rcu_barrier, which
 * might return immediately if there are no outstanding RCU callbacks,
 * this API will wait for at least one grace period.
 *
 * Another caveat is that the barrier is "one-shot": if an RCU callback
 * registers a further RCU callback, this API only guarantees that the first
 * round of RCU callbacks has been executed by the time it returns.
 */
void
ovsrcu_barrier(void)
{
    struct seq *seq = seq_create();
    /* First let all threads flush their cbsets. */
    ovsrcu_synchronize();

    /* Then register a new cbset, ensuring that this cbset
     * is at the tail of the global list. */
    uint64_t seqno = seq_read(seq);
    ovsrcu_postpone__(ovsrcu_barrier_func, (void *) seq);

    do {
        seq_wait(seq, seqno);
        poll_block();
    } while (seqno == seq_read(seq));

    seq_destroy(seq);
}
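
/* Illustrative sketch, not part of ovs-rcu.c: draining postponed callbacks
 * before tearing down state they might still touch.  Because the barrier is
 * one-shot, a subsystem whose callbacks can postpone further callbacks may
 * need to call it more than once.  example_stop_writers() is hypothetical. */
extern void example_stop_writers(void); /* Hypothetical. */

static void
example_teardown(void)
{
    example_stop_writers(); /* Nothing may postpone new callbacks now. */
    ovsrcu_barrier();       /* All previously postponed callbacks have run. */
}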