/src/openvswitch/lib/ovs-rcu.c
/*
 * Copyright (c) 2014, 2017 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>
#include <errno.h>
#include "ovs-rcu.h"
#include "fatal-signal.h"
#include "guarded-list.h"
#include "latch.h"
#include "openvswitch/list.h"
#include "ovs-thread.h"
#include "openvswitch/poll-loop.h"
#include "seq.h"
#include "timeval.h"
#include "util.h"
#include "openvswitch/vlog.h"

VLOG_DEFINE_THIS_MODULE(ovs_rcu);

#define MIN_CBS 16

struct ovsrcu_cb {
    void (*function)(void *aux);
    void *aux;
};

struct ovsrcu_cbset {
    struct ovs_list list_node;
    struct ovsrcu_cb *cbs;
    size_t n_allocated;
    int n_cbs;
};

struct ovsrcu_perthread {
    struct ovs_list list_node;  /* In global list. */

    uint64_t seqno;
    struct ovsrcu_cbset *cbset;
    char name[16];              /* This thread's name. */
};

static struct seq *global_seqno;

static pthread_key_t perthread_key;
static struct ovs_list ovsrcu_threads;
static struct ovs_mutex ovsrcu_threads_mutex;

static struct guarded_list flushed_cbsets;
static struct seq *flushed_cbsets_seq;

static struct latch postpone_exit;
static struct ovs_barrier postpone_barrier;

static void ovsrcu_init_module(void);
static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
static void ovsrcu_unregister__(struct ovsrcu_perthread *);
static bool ovsrcu_call_postponed(void);
static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);

/* Returns the calling thread's 'struct ovsrcu_perthread', creating and
 * registering it first if necessary.  A registered thread counts as
 * non-quiescent until it unregisters again. */
static struct ovsrcu_perthread *
ovsrcu_perthread_get(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();

    perthread = pthread_getspecific(perthread_key);
    if (!perthread) {
        const char *name = get_subprogram_name();

        perthread = xmalloc(sizeof *perthread);
        perthread->seqno = seq_read(global_seqno);
        perthread->cbset = NULL;
        ovs_strlcpy(perthread->name, name[0] ? name : "main",
                    sizeof perthread->name);

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        ovs_list_push_back(&ovsrcu_threads, &perthread->list_node);
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        pthread_setspecific(perthread_key, perthread);
    }
    return perthread;
}

/* Indicates the end of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h.
 *
 * Quiescent states don't stack or nest, so this always ends a quiescent state
 * even if ovsrcu_quiesce_start() was called multiple times in a row. */
void
ovsrcu_quiesce_end(void)
{
    ovsrcu_perthread_get();
}
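
/* Usage sketch (illustrative only; 'wait_for_input' is a hypothetical name
 * for whatever blocks): a thread about to block for a long time would
 * typically mark itself quiescent for the duration, so that it does not
 * hold up grace periods:
 *
 *     ovsrcu_quiesce_start();
 *     wait_for_input();
 *     ovsrcu_quiesce_end();
 */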

/* Helper for the quiescing functions: in a single-threaded process, runs
 * postponed callbacks directly; otherwise, spawns (once) the "urcu"
 * background thread that runs them. */
static void
ovsrcu_quiesced(void)
{
    if (single_threaded()) {
        ovsrcu_call_postponed();
    } else {
        static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
        if (ovsthread_once_start(&once)) {
            latch_init(&postpone_exit);
            ovs_barrier_init(&postpone_barrier, 2);
            ovs_thread_create("urcu", ovsrcu_postpone_thread, NULL);
            ovsthread_once_done(&once);
        }
    }
}

/* Indicates the beginning of a quiescent state.  See "Details" near the top
 * of ovs-rcu.h. */
void
ovsrcu_quiesce_start(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();
    perthread = pthread_getspecific(perthread_key);
    if (perthread) {
        pthread_setspecific(perthread_key, NULL);
        ovsrcu_unregister__(perthread);
    }

    ovsrcu_quiesced();
}

/* Indicates a momentary quiescent state.  See "Details" near the top of
 * ovs-rcu.h.
 *
 * Provides a full memory barrier via seq_change(). */
void
ovsrcu_quiesce(void)
{
    struct ovsrcu_perthread *perthread;

    perthread = ovsrcu_perthread_get();
    perthread->seqno = seq_read(global_seqno);
    if (perthread->cbset) {
        ovsrcu_flush_cbset(perthread);
    }
    seq_change(global_seqno);

    ovsrcu_quiesced();
}
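
/* Usage sketch (illustrative only; 'process_batch' is a hypothetical name):
 * a long-running worker loop typically quiesces once per iteration, at a
 * point where it holds no RCU-protected pointers:
 *
 *     for (;;) {
 *         process_batch();
 *         ovsrcu_quiesce();
 *     }
 */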

/* Like ovsrcu_quiesce(), but nonblocking: quiesces only if the global seq
 * lock can be acquired without waiting.  Returns 0 on success, EBUSY if the
 * lock was unavailable and no quiescent state was recorded. */
int
ovsrcu_try_quiesce(void)
{
    struct ovsrcu_perthread *perthread;
    int ret = EBUSY;

    ovs_assert(!single_threaded());
    perthread = ovsrcu_perthread_get();
    if (!seq_try_lock()) {
        perthread->seqno = seq_read_protected(global_seqno);
        if (perthread->cbset) {
            ovsrcu_flush_cbset__(perthread, true);
        }
        seq_change_protected(global_seqno);
        seq_unlock();
        ovsrcu_quiesced();
        ret = 0;
    }
    return ret;
}
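
/* Usage sketch (illustrative only): a latency-sensitive busy loop can use
 * the nonblocking variant, and on EBUSY simply try again on a later
 * iteration instead of stalling its fast path:
 *
 *     if (ovsrcu_try_quiesce()) {
 *         (EBUSY: could not quiesce cheaply right now; retry later.)
 *     }
 */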

/* Returns true if the current thread is in a quiescent state, i.e. it is not
 * registered as an RCU reader. */
bool
ovsrcu_is_quiescent(void)
{
    ovsrcu_init_module();
    return pthread_getspecific(perthread_key) == NULL;
}

/* Blocks until every other registered thread has quiesced at least once
 * after the time of the call, i.e. waits for a full grace period.  The
 * calling thread itself quiesces for the duration and logs a warning,
 * naming the stalled thread, if the wait drags on. */
void
ovsrcu_synchronize(void)
{
    unsigned int warning_threshold = 1000;
    uint64_t target_seqno;
    long long int start;

    if (single_threaded()) {
        return;
    }

    target_seqno = seq_read(global_seqno);
    ovsrcu_quiesce_start();
    start = time_msec();

    for (;;) {
        uint64_t cur_seqno = seq_read(global_seqno);
        struct ovsrcu_perthread *perthread;
        char stalled_thread[16];
        unsigned int elapsed;
        bool done = true;

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) {
            if (perthread->seqno <= target_seqno) {
                ovs_strlcpy_arrays(stalled_thread, perthread->name);
                done = false;
                break;
            }
        }
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        if (done) {
            break;
        }

        elapsed = time_msec() - start;
        if (elapsed >= warning_threshold) {
            VLOG_WARN("blocked %u ms waiting for %s to quiesce",
                      elapsed, stalled_thread);
            warning_threshold *= 2;
        }
        poll_timer_wait_until(start + warning_threshold);

        seq_wait(global_seqno, cur_seqno);
        poll_block();
    }
    ovsrcu_quiesce_end();
}
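
/* Usage sketch (illustrative only; 'foop' and 'new_foo' are hypothetical):
 * a writer may publish a replacement for an RCU-protected pointer, then use
 * ovsrcu_synchronize() to wait until no reader can still see the old
 * version before freeing it directly:
 *
 *     struct foo *old = ovsrcu_get_protected(struct foo *, &foop);
 *     ovsrcu_set(&foop, new_foo);
 *     ovsrcu_synchronize();
 *     free(old);
 *
 * Postponing the free with ovsrcu_postpone() is usually preferable, since
 * it does not block the writer for a grace period. */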

/* Waits until as many postponed callbacks as possible have executed.
 *
 * As a side effect, stops the background thread that calls the callbacks and
 * prevents it from being restarted.  This means that this function should
 * only be called shortly before a process exits, as a mechanism for
 * releasing memory to make memory leaks easier to detect, since any further
 * postponed callbacks won't actually get called.
 *
 * This function can only wait for callbacks registered by the current thread
 * and the background thread that calls the callbacks.  Thus, it will be most
 * effective if other threads have already exited. */
void
ovsrcu_exit(void)
{
    /* Stop the postpone thread and wait for it to exit.  Otherwise, there's
     * no way to wait for that thread to finish calling callbacks itself. */
    if (!single_threaded()) {
        ovsrcu_quiesced();      /* Ensure that the postpone thread exists. */
        latch_set(&postpone_exit);
        ovs_barrier_block(&postpone_barrier);
    }

    /* Repeatedly:
     *
     *    - Wait for a grace period.  One important side effect is to push
     *      the running thread's cbset into 'flushed_cbsets' so that the
     *      next call has something to call.
     *
     *    - Call all the callbacks in 'flushed_cbsets'.  If there aren't any,
     *      we're done, otherwise the callbacks themselves might have
     *      requested more deferred callbacks so we go around again.
     *
     * We limit the number of iterations just in case some bug causes an
     * infinite loop.  This function is just for making memory leaks easier
     * to spot so there's no point in breaking things on that basis. */
    for (int i = 0; i < 8; i++) {
        ovsrcu_synchronize();
        if (!ovsrcu_call_postponed()) {
            break;
        }
    }
}
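
/* Usage sketch (illustrative only): a process that wants clean leak reports
 * would call this right before exiting, after its other threads are gone:
 *
 *     ...join or stop worker threads...
 *     ovsrcu_exit();
 *     return 0;                  (end of main())
 */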

/* Registers 'function' to be called, passing 'aux' as argument, after the
 * next grace period.
 *
 * The call is guaranteed to happen after the next time all participating
 * threads have quiesced at least once, but there is no guarantee that all
 * registered functions are called as early as possible, or that the
 * functions registered by different threads would be called in the order
 * the registrations took place.  In particular, even if two threads
 * provably register a function each in a specific order, the functions may
 * still be called in the opposite order, depending on the timing of when
 * the threads call ovsrcu_quiesce(), how many functions they postpone, and
 * when the ovs-rcu thread happens to grab the functions to be called.
 *
 * All functions registered by a single thread are guaranteed to execute in
 * the registering order, however.
 *
 * This function is more conveniently called through the ovsrcu_postpone()
 * macro, which provides a type-safe way to allow 'function''s parameter to
 * be any pointer type. */
void
ovsrcu_postpone__(void (*function)(void *aux), void *aux)
{
    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
    struct ovsrcu_cbset *cbset;
    struct ovsrcu_cb *cb;

    cbset = perthread->cbset;
    if (!cbset) {
        cbset = perthread->cbset = xmalloc(sizeof *perthread->cbset);
        cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs);
        cbset->n_allocated = MIN_CBS;
        cbset->n_cbs = 0;
    }

    if (cbset->n_cbs == cbset->n_allocated) {
        cbset->cbs = x2nrealloc(cbset->cbs, &cbset->n_allocated,
                                sizeof *cbset->cbs);
    }

    cb = &cbset->cbs[cbset->n_cbs++];
    cb->function = function;
    cb->aux = aux;
}
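
/* Usage sketch (illustrative only; 'foop' and 'new_foo' are hypothetical):
 * the usual way to retire an RCU-protected structure is to publish its
 * replacement and postpone the free, via the type-safe macro in ovs-rcu.h:
 *
 *     struct foo *old = ovsrcu_get_protected(struct foo *, &foop);
 *     ovsrcu_set(&foop, new_foo);
 *     ovsrcu_postpone(free, old);
 *
 * Any destructor taking a single pointer works in place of free(). */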

/* Runs all flushed callback sets, after first waiting for a grace period.
 * Returns false if there was nothing to call, true otherwise. */
static bool
ovsrcu_call_postponed(void)
{
    struct ovsrcu_cbset *cbset;
    struct ovs_list cbsets;

    guarded_list_pop_all(&flushed_cbsets, &cbsets);
    if (ovs_list_is_empty(&cbsets)) {
        return false;
    }

    ovsrcu_synchronize();

    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
        struct ovsrcu_cb *cb;

        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
            cb->function(cb->aux);
        }
        free(cbset->cbs);
        free(cbset);
    }

    return true;
}

/* Main loop of the "urcu" background thread: runs postponed callbacks as
 * they are flushed, until ovsrcu_exit() sets 'postpone_exit'. */
static void *
ovsrcu_postpone_thread(void *arg OVS_UNUSED)
{
    pthread_detach(pthread_self());

    while (!latch_is_set(&postpone_exit)) {
        uint64_t seqno = seq_read(flushed_cbsets_seq);
        if (!ovsrcu_call_postponed()) {
            seq_wait(flushed_cbsets_seq, seqno);
            latch_wait(&postpone_exit);
            poll_block();
        }
    }

    ovs_barrier_block(&postpone_barrier);
    return NULL;
}

/* Moves 'perthread''s accumulated callback set, if any, onto the global
 * 'flushed_cbsets' list and wakes up whoever runs the callbacks.
 * 'protected' selects the seq_change() variant, depending on whether the
 * caller already holds the seq lock. */
static void
ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
{
    struct ovsrcu_cbset *cbset = perthread->cbset;

    if (cbset) {
        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
        perthread->cbset = NULL;

        if (protected) {
            seq_change_protected(flushed_cbsets_seq);
        } else {
            seq_change(flushed_cbsets_seq);
        }
    }
}

static void
ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
{
    ovsrcu_flush_cbset__(perthread, false);
}

/* Unregisters 'perthread' and frees it, flushing any pending callbacks
 * first.  The final seq_change() wakes up any ovsrcu_synchronize() caller
 * that might have been waiting on this thread. */
static void
ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
{
    if (perthread->cbset) {
        ovsrcu_flush_cbset(perthread);
    }

    ovs_mutex_lock(&ovsrcu_threads_mutex);
    ovs_list_remove(&perthread->list_node);
    ovs_mutex_unlock(&ovsrcu_threads_mutex);

    free(perthread);

    seq_change(global_seqno);
}

static void
ovsrcu_thread_exit_cb(void *perthread)
{
    ovsrcu_unregister__(perthread);
}

/* Cancels the callback to ovsrcu_thread_exit_cb().
 *
 * Cancelling the destructor call during main-thread exit is needed when
 * using the pthreads-win32 library on Windows: there, a call to the
 * destructor during main-thread exit has been observed to cause undefined
 * behavior. */
static void
ovsrcu_cancel_thread_exit_cb(void *aux OVS_UNUSED)
{
    pthread_setspecific(perthread_key, NULL);
}

static void
ovsrcu_init_module(void)
{
    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
    if (ovsthread_once_start(&once)) {
        global_seqno = seq_create();
        xpthread_key_create(&perthread_key, ovsrcu_thread_exit_cb);
        fatal_signal_add_hook(ovsrcu_cancel_thread_exit_cb, NULL, NULL, true);
        ovs_list_init(&ovsrcu_threads);
        ovs_mutex_init(&ovsrcu_threads_mutex);

        guarded_list_init(&flushed_cbsets);
        flushed_cbsets_seq = seq_create();

        ovsthread_once_done(&once);
    }
}

static void
ovsrcu_barrier_func(void *seq_)
{
    struct seq *seq = (struct seq *) seq_;
    seq_change(seq);
}

/* Similar to the kernel rcu_barrier, ovsrcu_barrier waits for all
 * outstanding RCU callbacks to complete.  However, unlike the kernel
 * rcu_barrier, which may return immediately if there are no outstanding RCU
 * callbacks, this function always waits for at least one grace period.
 *
 * Callers should also be aware that the barrier is "one-shot": if an RCU
 * callback itself registers another RCU callback, this function only
 * guarantees that the first round of callbacks has executed by the time it
 * returns. */
void
ovsrcu_barrier(void)
{
    struct seq *seq = seq_create();
    /* First let all threads flush their cbsets. */
    ovsrcu_synchronize();

    /* Then register a new cbset, ensuring that this cbset is at the tail of
     * the global list. */
    uint64_t seqno = seq_read(seq);
    ovsrcu_postpone__(ovsrcu_barrier_func, (void *) seq);

    do {
        seq_wait(seq, seqno);
        poll_block();
    } while (seqno == seq_read(seq));

    seq_destroy(seq);
}
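
/* Usage sketch (illustrative only; 'free_foo' and 'foo' are hypothetical
 * names): a module about to be torn down can make sure every destructor it
 * postponed has finished running before it releases shared state:
 *
 *     ovsrcu_postpone(free_foo, foo);   (earlier, on the update path)
 *     ...
 *     ovsrcu_barrier();                 (now all postponed free_foo calls
 *                                        have completed)
 */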