Coverage Report

Created: 2025-07-12 06:34

/src/openvswitch/lib/mpsc-queue.c
(Coverage table removed: every executable line in this file has an execution count of 0. The source is reproduced below.)
/*
 * Copyright (c) 2021 NVIDIA Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "ovs-atomic.h"

#include "mpsc-queue.h"

/* Multi-producer, single-consumer queue
 * =====================================
 *
 * This is an implementation of the MPSC queue described by Dmitri Vyukov [1].
 *
 * One atomic exchange operation is done per insertion.  Removal in most
 * cases will not require atomic operations and will use one atomic exchange
 * to close the queue chain.
 *
 * Insertion
 * =========
 *
 * The queue is implemented using a linked-list.  Insertion is done at the
 * back of the queue, by swapping the current end with the new node
 * atomically, then pointing the previous end toward the new node.  To
 * follow Vyukov's nomenclature, the end-node of the chain is called the
 * head.  A producer will only manipulate the head.
 *
 * The head swap is atomic; however, the link from the previous head to the
 * new one is done in a separate operation.  This means that the chain is
 * momentarily broken: the previous head still points to NULL while the
 * current head has already been inserted.
 *
 * Considering a series of insertions, the queue state will remain
 * consistent and the insertion order is compatible with their precedence,
 * thus the queue is serializable.  However, because an insertion consists
 * of two separate memory transactions, it is not linearizable.
 *
 * Removal
 * =======
 *
 * The consumer must deal with the queue inconsistency.  It will manipulate
 * the tail of the queue and move it along the latest consumed elements.
 * When the end of the chain of elements is found (the next pointer is
 * NULL), the tail is compared with the head.
 *
 * If the two point to different addresses, then the queue is in an
 * inconsistent state: the tail cannot move forward as the next pointer is
 * NULL, but the head is not the last element in the chain.  This can only
 * happen if the chain is broken.
 *
 * In this case, the consumer must wait for the producer to finish writing
 * the next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.
 *
 * Removal is thus in most cases (when there are elements in the queue)
 * accomplished without using atomics, until the last element of the queue.
 * There, the head is atomically loaded.  If the queue is in a consistent
 * state, the head is moved back to the queue stub by inserting the stub in
 * the queue: ending the queue is the same as an insertion, which is one
 * atomic XCHG.
 *
 * Forward guarantees
 * ==================
 *
 * Insertion and peeking are wait-free: they will execute in a known,
 * bounded number of instructions, regardless of the state of the queue.
 *
 * However, while removal consists of peeking and a constant-time write to
 * update the tail, it can repeatedly fail until the queue becomes
 * consistent.  It is thus dependent on other threads progressing.  This
 * means that the queue's forward progress is obstruction-free only.  It
 * has a potential for livelocking.
 *
 * The chain will remain broken as long as a producer has not finished
 * writing its next pointer.  If a producer is cancelled, for example, the
 * queue could remain broken for all future readings.  This queue should
 * either be used with cooperative threads, or insertions must only be done
 * outside cancellable sections.
 *
 * Performance
 * ===========
 *
 * In benchmarks this structure performed better than alternatives such as:
 *
 *   * A reversed Treiber stack [2], using 1 CAS per operation
 *     and requiring reversal of the node list on removal.
 *
 *   * The Michael-Scott lock-free queue [3], using 2 CAS per operation.
 *
 * While it is not linearizable, this queue is well-suited for message
 * passing.  If a proper hardware XCHG operation is used, it scales better
 * than CAS-based implementations.
 *
 * References
 * ==========
 *
 * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 *
 * [2]: R. K. Treiber.  Systems programming: Coping with parallelism.
 *      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
 *
 * [3]: M. M. Michael and M. L. Scott.  Simple, Fast, and Practical
 *      Non-Blocking and Blocking Concurrent Queue Algorithms.
 *      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
 */
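
/* Usage sketch
 * ============
 *
 * A minimal, illustrative example of the intended usage, not part of the
 * implementation.  The element type 'my_event', the functions and the
 * 'consume()' handler below are hypothetical; CONTAINER_OF is assumed to
 * be available from "util.h".
 *
 *     struct my_event {
 *         struct mpsc_queue_node node;
 *         int payload;
 *     };
 *
 *     static struct mpsc_queue queue;  // Initialized once with
 *                                      // mpsc_queue_init(&queue).
 *
 *     // Producer side: callable from any number of threads, wait-free.
 *     static void
 *     producer_send(struct my_event *ev)
 *     {
 *         mpsc_queue_insert(&queue, &ev->node);
 *     }
 *
 *     // Consumer side: a single thread at a time, holding the read lock.
 *     static void
 *     consumer_drain(void)
 *     {
 *         struct mpsc_queue_node *node;
 *
 *         ovs_mutex_lock(&queue.read_lock);
 *         while ((node = mpsc_queue_pop(&queue)) != NULL) {
 *             struct my_event *ev =
 *                 CONTAINER_OF(node, struct my_event, node);
 *
 *             consume(ev->payload);
 *         }
 *         ovs_mutex_unlock(&queue.read_lock);
 *     }
 */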

void
mpsc_queue_init(struct mpsc_queue *queue)
{
    atomic_store_relaxed(&queue->head, &queue->stub);
    atomic_store_relaxed(&queue->tail, &queue->stub);
    atomic_store_relaxed(&queue->stub.next, NULL);

    ovs_mutex_init(&queue->read_lock);
}

void
mpsc_queue_destroy(struct mpsc_queue *queue)
    OVS_EXCLUDED(queue->read_lock)
{
    ovs_mutex_destroy(&queue->read_lock);
}

enum mpsc_queue_poll_result
mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;
    struct mpsc_queue_node *next;
    struct mpsc_queue_node *head;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_read_explicit(&tail->next, &next, memory_order_acquire);

    if (tail == &queue->stub) {
        /* The stub is a sentinel and is never returned to the caller:
         * skip it, or report an empty queue if nothing follows it. */
        if (next == NULL) {
            return MPSC_QUEUE_EMPTY;
        }

        atomic_store_relaxed(&queue->tail, next);
        tail = next;
        atomic_read_explicit(&tail->next, &next, memory_order_acquire);
    }

    if (next != NULL) {
        /* The tail has a successor: it can be consumed safely. */
        atomic_store_relaxed(&queue->tail, next);
        *node = tail;
        return MPSC_QUEUE_ITEM;
    }

    atomic_read_explicit(&queue->head, &head, memory_order_acquire);
    if (tail != head) {
        /* The chain is broken: a producer has swapped the head but has
         * not yet written the link to its node.  The caller must retry. */
        return MPSC_QUEUE_RETRY;
    }

    /* The tail is the last element: close the chain by re-inserting the
     * stub, then consume the tail. */
    mpsc_queue_insert(queue, &queue->stub);

    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
    if (next != NULL) {
        atomic_store_relaxed(&queue->tail, next);
        *node = tail;
        return MPSC_QUEUE_ITEM;
    }

    return MPSC_QUEUE_EMPTY;
}
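
/* Because removal is only obstruction-free (see 'Forward guarantees'
 * above), a caller that cannot afford to spin may bound its retries
 * instead of using mpsc_queue_pop().  Illustrative sketch only, not part
 * of the original file; 'poll_bounded' and 'max_retries' are hypothetical.
 *
 *     static struct mpsc_queue_node *
 *     poll_bounded(struct mpsc_queue *queue, int max_retries)
 *         OVS_REQUIRES(queue->read_lock)
 *     {
 *         struct mpsc_queue_node *node;
 *         int i;
 *
 *         for (i = 0; i <= max_retries; i++) {
 *             switch (mpsc_queue_poll(queue, &node)) {
 *             case MPSC_QUEUE_ITEM:
 *                 return node;
 *             case MPSC_QUEUE_EMPTY:
 *                 return NULL;
 *             case MPSC_QUEUE_RETRY:
 *                 continue;  // A producer is mid-insertion: try again.
 *             }
 *         }
 *         return NULL;  // Give up for now; the element stays queued.
 *     }
 */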

struct mpsc_queue_node *
mpsc_queue_pop(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock)
{
    enum mpsc_queue_poll_result result;
    struct mpsc_queue_node *node;

    do {
        result = mpsc_queue_poll(queue, &node);
        if (result == MPSC_QUEUE_EMPTY) {
            return NULL;
        }
    } while (result == MPSC_QUEUE_RETRY);

    return node;
}

/* Put 'node' back at the front of the queue.  Only the consumer, holding
 * the read lock, can safely manipulate the tail this way. */
void
mpsc_queue_push_front(struct mpsc_queue *queue, struct mpsc_queue_node *node)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_store_relaxed(&node->next, tail);
    atomic_store_relaxed(&queue->tail, node);
}
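
/* mpsc_queue_push_front() lets the consumer put an element back at the
 * front, e.g. when it cannot be processed yet.  Illustrative sketch only;
 * 'try_consume' and 'my_event' are hypothetical.
 *
 *     struct mpsc_queue_node *node = mpsc_queue_pop(&queue);
 *
 *     if (node != NULL
 *         && !try_consume(CONTAINER_OF(node, struct my_event, node))) {
 *         // Not ready yet: it will be seen first on the next poll.
 *         mpsc_queue_push_front(&queue, node);
 *     }
 */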

/* Peek at the first (oldest) element of the queue without consuming it.
 * Returns NULL if no element is currently visible. */
struct mpsc_queue_node *
mpsc_queue_tail(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;
    struct mpsc_queue_node *next;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_read_explicit(&tail->next, &next, memory_order_acquire);

    if (tail == &queue->stub) {
        if (next == NULL) {
            return NULL;
        }

        atomic_store_relaxed(&queue->tail, next);
        tail = next;
    }

    return tail;
}

/* Get the next element of a node. */
struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
                                        struct mpsc_queue_node *prev)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *next;

    atomic_read_explicit(&prev->next, &next, memory_order_acquire);
    if (next == &queue->stub) {
        /* Skip the stub: it is not a user element. */
        atomic_read_explicit(&next->next, &next, memory_order_acquire);
    }
    return next;
}
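
/* mpsc_queue_tail() and mpsc_queue_next() together allow a non-destructive
 * walk of the queue from the consumer side.  Illustrative sketch only;
 * 'inspect' and 'my_event' are hypothetical.  The walk ends at the first
 * NULL next pointer, so elements past a broken link (a producer that has
 * not yet finished its insertion) are not visited.
 *
 *     struct mpsc_queue_node *node;
 *
 *     for (node = mpsc_queue_tail(&queue); node != NULL;
 *          node = mpsc_queue_next(&queue, node)) {
 *         inspect(CONTAINER_OF(node, struct my_event, node));
 *     }
 */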

void
mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)
{
    struct mpsc_queue_node *prev;

    atomic_store_relaxed(&node->next, NULL);
    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
    /* Between the exchange above and the store below, the chain is
     * momentarily broken: a consumer reaching 'prev' sees a NULL next
     * pointer and must retry. */
    atomic_store_explicit(&prev->next, node, memory_order_release);
}