/src/openvswitch/lib/mpsc-queue.c
/*
 * Copyright (c) 2021 NVIDIA Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "ovs-atomic.h"

#include "mpsc-queue.h"

/* Multi-producer, single-consumer queue
 * =====================================
 *
 * This is an implementation of the MPSC queue described by Dmitri Vyukov [1].
 *
 * One atomic exchange operation is done per insertion. Removal in most cases
 * does not require any atomic operation; a single atomic exchange is used
 * only to close the queue chain.
 *
 * Insertion
 * =========
 *
 * The queue is implemented using a linked-list. Insertion is done at the
 * back of the queue, by swapping the current end with the new node
 * atomically, then pointing the previous end toward the new node. To follow
 * Vyukov's nomenclature, the end-node of the chain is called the head. A
 * producer will only manipulate the head.
 *
 * The head swap is atomic, however the link from the previous head to the
 * new one is done in a separate operation. This means that the chain is
 * momentarily broken: the new head has been published, but the previous
 * head's next pointer still points to NULL.
 *
 * Considering a series of insertions, the queue state will remain consistent
 * and the insertion order is compatible with their precedence, thus the
 * queue is serializable. However, because an insertion consists of two
 * separate memory transactions, it is not linearizable. (See the usage
 * example after this comment.)
 *
 * Removal
 * =======
 *
 * The consumer must deal with the queue inconsistency. It will manipulate
 * the tail of the queue and move it along the latest consumed elements.
 * When the end of the chain of elements is found (the next pointer is NULL),
 * the tail is compared with the head.
 *
 * If both point to different addresses, then the queue is in an inconsistent
 * state: the tail cannot move forward as the next pointer is NULL, but the
 * head is not the last element in the chain: this can only happen if the
 * chain is broken.
 *
 * In this case, the consumer must wait for the producer to finish writing
 * the next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.
 *
 * Removal is thus in most cases (when there are elements in the queue)
 * accomplished without using atomics, until the last element of the queue.
 * There, the head is atomically loaded. If the queue is in a consistent
 * state, the head is moved back to the queue stub by inserting the stub in
 * the queue: ending the queue is the same as an insertion, which is one
 * atomic XCHG.
 *
 * Forward guarantees
 * ==================
 *
 * Insertion and peeking are wait-free: they will execute in a known, bounded
 * number of instructions, regardless of the state of the queue.
 *
 * However, while removal consists of a peek and a constant-time write to
 * update the tail, it can repeatedly fail until the queue becomes
 * consistent. It is thus dependent on other threads progressing. This means
 * that the queue forward progress is obstruction-free only. It has a
 * potential for livelocking.
 *
 * The chain will remain broken as long as a producer has not finished
 * writing its next pointer. If a producer is cancelled, for example, the
 * queue could remain broken for any future readings. This queue should
 * either be used with cooperative threads, or insertion must only be done
 * outside cancellable sections.
 *
 * Performance
 * ===========
 *
 * In benchmarks this structure performed better than alternatives such as:
 *
 * * A reversed Treiber stack [2], using 1 CAS per operation
 *   and requiring a reversal of the node list on removal.
 *
 * * The Michael-Scott lock-free queue [3], using 2 CAS per operation.
 *
 * While it is not linearizable, this queue is well-suited for message
 * passing. If a proper hardware XCHG operation is used, it scales better
 * than CAS-based implementations.
 *
 * References
 * ==========
 *
 * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 *
 * [2]: R. K. Treiber. Systems programming: Coping with parallelism.
 *      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
 *
 * [3]: M. M. Michael and M. L. Scott. Simple, Fast, and Practical
 *      Non-Blocking and Blocking Concurrent Queue Algorithms.
 *      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
 */
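
/* Usage example
 * =============
 *
 * The sketch below shows how a client could embed 'struct mpsc_queue_node'
 * in its own type and pass elements from producers to a single consumer.
 * 'struct element', its 'data' field and 'process()' are hypothetical and
 * shown only for illustration; CONTAINER_OF is OVS's generic container-of
 * macro. The consumer takes 'read_lock', as required by the reader
 * functions below.
 *
 *     struct element {
 *         struct mpsc_queue_node node;
 *         int data;
 *     };
 *
 *     static struct mpsc_queue queue;
 *
 *     void
 *     producer(struct element *e)
 *     {
 *         mpsc_queue_insert(&queue, &e->node);
 *     }
 *
 *     void
 *     consumer(void)
 *     {
 *         struct mpsc_queue_node *node;
 *
 *         ovs_mutex_lock(&queue.read_lock);
 *         while ((node = mpsc_queue_pop(&queue)) != NULL) {
 *             struct element *e;
 *
 *             e = CONTAINER_OF(node, struct element, node);
 *             process(e->data);
 *         }
 *         ovs_mutex_unlock(&queue.read_lock);
 *     }
 *
 * 'queue' must have been initialized once with mpsc_queue_init() before any
 * of the threads start, and a node must not be re-inserted before it has
 * been consumed: being intrusive, a node can only be in the queue once.
 */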

void
mpsc_queue_init(struct mpsc_queue *queue)
{
    /* An empty queue contains only the stub: both ends point to it. */
    atomic_store_relaxed(&queue->head, &queue->stub);
    atomic_store_relaxed(&queue->tail, &queue->stub);
    atomic_store_relaxed(&queue->stub.next, NULL);

    ovs_mutex_init(&queue->read_lock);
}

void
mpsc_queue_destroy(struct mpsc_queue *queue)
    OVS_EXCLUDED(queue->read_lock)
{
    ovs_mutex_destroy(&queue->read_lock);
}

enum mpsc_queue_poll_result
mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;
    struct mpsc_queue_node *next;
    struct mpsc_queue_node *head;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_read_explicit(&tail->next, &next, memory_order_acquire);

    if (tail == &queue->stub) {
        if (next == NULL) {
            /* The stub is the only node: the queue is empty. */
            return MPSC_QUEUE_EMPTY;
        }

        /* Skip the stub, it carries no user data. */
        atomic_store_relaxed(&queue->tail, next);
        tail = next;
        atomic_read_explicit(&tail->next, &next, memory_order_acquire);
    }

    if (next != NULL) {
        /* The tail has a successor: consume the tail. */
        atomic_store_relaxed(&queue->tail, next);
        *node = tail;
        return MPSC_QUEUE_ITEM;
    }

    atomic_read_explicit(&queue->head, &head, memory_order_acquire);
    if (tail != head) {
        /* The next pointer is NULL but the tail is not the head: a producer
         * has swapped the head but not yet written the link. */
        return MPSC_QUEUE_RETRY;
    }

    /* A single element remains. Re-insert the stub, so that the last
     * element can be consumed while keeping the chain closed. */
    mpsc_queue_insert(queue, &queue->stub);

    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
    if (next != NULL) {
        atomic_store_relaxed(&queue->tail, next);
        *node = tail;
        return MPSC_QUEUE_ITEM;
    }

    return MPSC_QUEUE_EMPTY;
}

struct mpsc_queue_node *
mpsc_queue_pop(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock)
{
    enum mpsc_queue_poll_result result;
    struct mpsc_queue_node *node;

    do {
        result = mpsc_queue_poll(queue, &node);
        if (result == MPSC_QUEUE_EMPTY) {
            return NULL;
        }
        /* Spin on MPSC_QUEUE_RETRY: the chain is repaired as soon as the
         * inserting producer writes a single pointer. */
    } while (result == MPSC_QUEUE_RETRY);

    return node;
}
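
/* mpsc_queue_pop() above spins while the chain is broken. A cooperative
 * consumer might prefer to yield the processor instead of busy-waiting.
 * A possible sketch, assuming a POSIX environment for sched_yield(); the
 * 'pop_yield' helper is hypothetical and not part of this API:
 *
 *     #include <sched.h>
 *
 *     struct mpsc_queue_node *
 *     pop_yield(struct mpsc_queue *queue)
 *         OVS_REQUIRES(queue->read_lock)
 *     {
 *         enum mpsc_queue_poll_result result;
 *         struct mpsc_queue_node *node;
 *
 *         while ((result = mpsc_queue_poll(queue, &node))
 *                == MPSC_QUEUE_RETRY) {
 *             sched_yield();
 *         }
 *
 *         return result == MPSC_QUEUE_ITEM ? node : NULL;
 *     }
 */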

/* Insert a node at the front of the queue: the node becomes the new tail
 * and will be the next one returned by mpsc_queue_poll(). Unlike
 * mpsc_queue_insert(), this is a consumer-side operation and requires the
 * read lock. */
void
mpsc_queue_push_front(struct mpsc_queue *queue, struct mpsc_queue_node *node)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_store_relaxed(&node->next, tail);
    atomic_store_relaxed(&queue->tail, node);
}
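
/* A typical use of mpsc_queue_push_front() is to return an element that the
 * consumer cannot handle yet, so that it is seen first on the next poll.
 * A sketch, where 'can_handle' stands in for caller-specific logic:
 *
 *     struct mpsc_queue_node *node = mpsc_queue_pop(queue);
 *
 *     if (node != NULL && !can_handle(node)) {
 *         mpsc_queue_push_front(queue, node);
 *     }
 */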

/* Peek at the first element of the queue without removing it. Returns NULL
 * if the queue is empty. */
struct mpsc_queue_node *
mpsc_queue_tail(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;
    struct mpsc_queue_node *next;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_read_explicit(&tail->next, &next, memory_order_acquire);

    if (tail == &queue->stub) {
        if (next == NULL) {
            return NULL;
        }

        /* Skip the stub, it carries no user data. */
        atomic_store_relaxed(&queue->tail, next);
        tail = next;
    }

    return tail;
}

/* Get the next element of a node, skipping the internal stub. Returns NULL
 * at the end of the readable chain. */
struct mpsc_queue_node *
mpsc_queue_next(struct mpsc_queue *queue, struct mpsc_queue_node *prev)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *next;

    atomic_read_explicit(&prev->next, &next, memory_order_acquire);
    if (next == &queue->stub) {
        /* The stub is not a user element: skip over it. */
        atomic_read_explicit(&next->next, &next, memory_order_acquire);
    }
    return next;
}
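
/* mpsc_queue_tail() and mpsc_queue_next() allow reading the queue without
 * consuming it. A traversal sketch, reusing the hypothetical
 * 'struct element' from the usage example above:
 *
 *     struct mpsc_queue_node *node;
 *
 *     for (node = mpsc_queue_tail(queue); node != NULL;
 *          node = mpsc_queue_next(queue, node)) {
 *         struct element *e = CONTAINER_OF(node, struct element, node);
 *
 *         inspect(e->data);
 *     }
 *
 * 'inspect' is illustrative only. Note that the traversal can stop early on
 * a momentarily broken chain: mpsc_queue_next() returns NULL as soon as it
 * finds an unwritten next pointer, even if the head lies further. */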

void
mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)
{
    struct mpsc_queue_node *prev;

    atomic_store_relaxed(&node->next, NULL);
    /* Atomically publish the new head, then link the previous head to it.
     * The chain is momentarily broken between these two operations. */
    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
    atomic_store_explicit(&prev->next, node, memory_order_release);
}