Coverage Report

Created: 2026-03-23 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/concurrent-queue-2.5.0/src/unbounded.rs
Line
Count
Source
1
use alloc::boxed::Box;
2
use core::mem::MaybeUninit;
3
use core::ptr;
4
5
use crossbeam_utils::CachePadded;
6
7
use crate::const_fn;
8
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
9
use crate::sync::cell::UnsafeCell;
10
#[allow(unused_imports)]
11
use crate::sync::prelude::*;
12
use crate::{busy_wait, PopError, PushError};
13
14
// Bits indicating the state of a slot:
15
// * If a value has been written into the slot, `WRITE` is set.
16
// * If a value has been read from the slot, `READ` is set.
17
// * If the block is being destroyed, `DESTROY` is set.
18
const WRITE: usize = 1;
19
const READ: usize = 2;
20
const DESTROY: usize = 4;
21
22
// Each block covers one "lap" of indices.
23
const LAP: usize = 32;
24
// The maximum number of items a block can hold.
25
const BLOCK_CAP: usize = LAP - 1;
26
// How many lower bits are reserved for metadata.
27
const SHIFT: usize = 1;
28
// Has two different purposes:
29
// * If set in head, indicates that the block is not the last one.
30
// * If set in tail, indicates that the queue is closed.
31
const MARK_BIT: usize = 1;
32
33
/// A slot in a block.
34
struct Slot<T> {
35
    /// The value.
36
    value: UnsafeCell<MaybeUninit<T>>,
37
38
    /// The state of the slot.
39
    state: AtomicUsize,
40
}
41
42
impl<T> Slot<T> {
43
    #[cfg(not(loom))]
44
    const UNINIT: Slot<T> = Slot {
45
        value: UnsafeCell::new(MaybeUninit::uninit()),
46
        state: AtomicUsize::new(0),
47
    };
48
49
    #[cfg(not(loom))]
50
0
    fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
51
0
        [Self::UNINIT; BLOCK_CAP]
52
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Slot<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::uninit_block
Unexecuted instantiation: <concurrent_queue::unbounded::Slot<surrealdb_types::notification::Notification>>::uninit_block
53
54
    #[cfg(loom)]
55
    fn uninit_block() -> [Slot<T>; BLOCK_CAP] {
56
        // Repeat this expression 31 times.
57
        // Update if we change BLOCK_CAP
58
        macro_rules! repeat_31 {
59
            ($e: expr) => {
60
                [
61
                    $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
62
                    $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e,
63
                ]
64
            };
65
        }
66
67
        repeat_31!(Slot {
68
            value: UnsafeCell::new(MaybeUninit::uninit()),
69
            state: AtomicUsize::new(0),
70
        })
71
    }
72
73
    /// Waits until a value is written into the slot.
74
0
    fn wait_write(&self) {
75
0
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
76
0
            busy_wait();
77
0
        }
78
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Slot<surrealdb_types::notification::Notification>>::wait_write
Unexecuted instantiation: <concurrent_queue::unbounded::Slot<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::wait_write
Unexecuted instantiation: <concurrent_queue::unbounded::Slot<surrealdb_types::notification::Notification>>::wait_write
79
}
80
81
/// A block in a linked list.
82
///
83
/// Each block in the list can hold up to `BLOCK_CAP` values.
84
struct Block<T> {
85
    /// The next block in the linked list.
86
    next: AtomicPtr<Block<T>>,
87
88
    /// Slots for values.
89
    slots: [Slot<T>; BLOCK_CAP],
90
}
91
92
impl<T> Block<T> {
93
    /// Creates an empty block.
94
0
    fn new() -> Block<T> {
95
0
        Block {
96
0
            next: AtomicPtr::new(ptr::null_mut()),
97
0
            slots: Slot::uninit_block(),
98
0
        }
99
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Block<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::new
Unexecuted instantiation: <concurrent_queue::unbounded::Block<surrealdb_types::notification::Notification>>::new
100
101
    /// Waits until the next pointer is set.
102
0
    fn wait_next(&self) -> *mut Block<T> {
103
        loop {
104
0
            let next = self.next.load(Ordering::Acquire);
105
0
            if !next.is_null() {
106
0
                return next;
107
0
            }
108
0
            busy_wait();
109
        }
110
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Block<surrealdb_types::notification::Notification>>::wait_next
Unexecuted instantiation: <concurrent_queue::unbounded::Block<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::wait_next
Unexecuted instantiation: <concurrent_queue::unbounded::Block<surrealdb_types::notification::Notification>>::wait_next
111
112
    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
113
0
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
114
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
115
        // begun destruction of the block.
116
0
        for i in start..BLOCK_CAP - 1 {
117
0
            let slot = (*this).slots.get_unchecked(i);
118
119
            // Mark the `DESTROY` bit if a thread is still using the slot.
120
0
            if slot.state.load(Ordering::Acquire) & READ == 0
121
0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
122
            {
123
                // If a thread is still using the slot, it will continue destruction of the block.
124
0
                return;
125
0
            }
126
        }
127
128
        // No thread is using the block, now it is safe to destroy it.
129
0
        drop(Box::from_raw(this));
130
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Block<surrealdb_types::notification::Notification>>::destroy
Unexecuted instantiation: <concurrent_queue::unbounded::Block<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::destroy
Unexecuted instantiation: <concurrent_queue::unbounded::Block<surrealdb_types::notification::Notification>>::destroy
131
}
132
133
/// A position in a queue.
134
struct Position<T> {
135
    /// The index in the queue.
136
    index: AtomicUsize,
137
138
    /// The block in the linked list.
139
    block: AtomicPtr<Block<T>>,
140
}
141
142
/// An unbounded queue.
143
pub struct Unbounded<T> {
144
    /// The head of the queue.
145
    head: CachePadded<Position<T>>,
146
147
    /// The tail of the queue.
148
    tail: CachePadded<Position<T>>,
149
}
150
151
impl<T> Unbounded<T> {
152
    const_fn!(
153
        const_if: #[cfg(not(loom))];
154
        /// Creates a new unbounded queue.
155
0
        pub const fn new() -> Unbounded<T> {
156
0
            Unbounded {
157
0
                head: CachePadded::new(Position {
158
0
                    block: AtomicPtr::new(ptr::null_mut()),
159
0
                    index: AtomicUsize::new(0),
160
0
                }),
161
0
                tail: CachePadded::new(Position {
162
0
                    block: AtomicPtr::new(ptr::null_mut()),
163
0
                    index: AtomicUsize::new(0),
164
0
                }),
165
0
            }
166
        }
167
    );
168
169
    /// Pushes an item into the queue.
170
0
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
171
0
        let mut tail = self.tail.index.load(Ordering::Acquire);
172
0
        let mut block = self.tail.block.load(Ordering::Acquire);
173
0
        let mut next_block = None;
174
175
        loop {
176
            // Check if the queue is closed.
177
0
            if tail & MARK_BIT != 0 {
178
0
                return Err(PushError::Closed(value));
179
0
            }
180
181
            // Calculate the offset of the index into the block.
182
0
            let offset = (tail >> SHIFT) % LAP;
183
184
            // If we reached the end of the block, wait until the next one is installed.
185
0
            if offset == BLOCK_CAP {
186
0
                busy_wait();
187
0
                tail = self.tail.index.load(Ordering::Acquire);
188
0
                block = self.tail.block.load(Ordering::Acquire);
189
0
                continue;
190
0
            }
191
192
            // If we're going to have to install the next block, allocate it in advance in order to
193
            // make the wait for other threads as short as possible.
194
0
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
195
0
                next_block = Some(Box::new(Block::<T>::new()));
196
0
            }
197
198
            // If this is the first value to be pushed into the queue, we need to allocate the
199
            // first block and install it.
200
0
            if block.is_null() {
201
0
                let new = Box::into_raw(Box::new(Block::<T>::new()));
202
203
0
                if self
204
0
                    .tail
205
0
                    .block
206
0
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
207
0
                    .is_ok()
208
0
                {
209
0
                    self.head.block.store(new, Ordering::Release);
210
0
                    block = new;
211
0
                } else {
212
0
                    next_block = unsafe { Some(Box::from_raw(new)) };
213
0
                    tail = self.tail.index.load(Ordering::Acquire);
214
0
                    block = self.tail.block.load(Ordering::Acquire);
215
0
                    continue;
216
                }
217
0
            }
218
219
0
            let new_tail = tail + (1 << SHIFT);
220
221
            // Try advancing the tail forward.
222
0
            match self.tail.index.compare_exchange_weak(
223
0
                tail,
224
0
                new_tail,
225
0
                Ordering::SeqCst,
226
0
                Ordering::Acquire,
227
0
            ) {
228
                Ok(_) => unsafe {
229
                    // If we've reached the end of the block, install the next one.
230
0
                    if offset + 1 == BLOCK_CAP {
231
0
                        let next_block = Box::into_raw(next_block.unwrap());
232
0
                        self.tail.block.store(next_block, Ordering::Release);
233
0
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
234
0
                        (*block).next.store(next_block, Ordering::Release);
235
0
                    }
236
237
                    // Write the value into the slot.
238
0
                    let slot = (*block).slots.get_unchecked(offset);
239
0
                    slot.value.with_mut(|slot| {
240
0
                        slot.write(MaybeUninit::new(value));
241
0
                    });
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::push::{closure#0}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification>>::push::{closure#0}
242
0
                    slot.state.fetch_or(WRITE, Ordering::Release);
243
0
                    return Ok(());
244
                },
245
0
                Err(t) => {
246
0
                    tail = t;
247
0
                    block = self.tail.block.load(Ordering::Acquire);
248
0
                }
249
            }
250
        }
251
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::push
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification>>::push
252
253
    /// Pops an item from the queue.
254
0
    pub fn pop(&self) -> Result<T, PopError> {
255
0
        let mut head = self.head.index.load(Ordering::Acquire);
256
0
        let mut block = self.head.block.load(Ordering::Acquire);
257
258
        loop {
259
            // Calculate the offset of the index into the block.
260
0
            let offset = (head >> SHIFT) % LAP;
261
262
            // If we reached the end of the block, wait until the next one is installed.
263
0
            if offset == BLOCK_CAP {
264
0
                busy_wait();
265
0
                head = self.head.index.load(Ordering::Acquire);
266
0
                block = self.head.block.load(Ordering::Acquire);
267
0
                continue;
268
0
            }
269
270
0
            let mut new_head = head + (1 << SHIFT);
271
272
0
            if new_head & MARK_BIT == 0 {
273
0
                crate::full_fence();
274
0
                let tail = self.tail.index.load(Ordering::Relaxed);
275
276
                // If the tail equals the head, that means the queue is empty.
277
0
                if head >> SHIFT == tail >> SHIFT {
278
                    // Check if the queue is closed.
279
0
                    if tail & MARK_BIT != 0 {
280
0
                        return Err(PopError::Closed);
281
                    } else {
282
0
                        return Err(PopError::Empty);
283
                    }
284
0
                }
285
286
                // If head and tail are not in the same block, set `MARK_BIT` in head.
287
0
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
288
0
                    new_head |= MARK_BIT;
289
0
                }
290
0
            }
291
292
            // The block can be null here only if the first push operation is in progress.
293
0
            if block.is_null() {
294
0
                busy_wait();
295
0
                head = self.head.index.load(Ordering::Acquire);
296
0
                block = self.head.block.load(Ordering::Acquire);
297
0
                continue;
298
0
            }
299
300
            // Try moving the head index forward.
301
0
            match self.head.index.compare_exchange_weak(
302
0
                head,
303
0
                new_head,
304
0
                Ordering::SeqCst,
305
0
                Ordering::Acquire,
306
0
            ) {
307
                Ok(_) => unsafe {
308
                    // If we've reached the end of the block, move to the next one.
309
0
                    if offset + 1 == BLOCK_CAP {
310
0
                        let next = (*block).wait_next();
311
0
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
312
0
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
313
0
                            next_index |= MARK_BIT;
314
0
                        }
315
316
0
                        self.head.block.store(next, Ordering::Release);
317
0
                        self.head.index.store(next_index, Ordering::Release);
318
0
                    }
319
320
                    // Read the value.
321
0
                    let slot = (*block).slots.get_unchecked(offset);
322
0
                    slot.wait_write();
323
0
                    let value = slot.value.with_mut(|slot| slot.read().assume_init());
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification>>::pop::{closure#0}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::pop::{closure#0}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification>>::pop::{closure#0}
324
325
                    // Destroy the block if we've reached the end, or if another thread wanted to
326
                    // destroy but couldn't because we were busy reading from the slot.
327
0
                    if offset + 1 == BLOCK_CAP {
328
0
                        Block::destroy(block, 0);
329
0
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
330
0
                        Block::destroy(block, offset + 1);
331
0
                    }
332
333
0
                    return Ok(value);
334
                },
335
0
                Err(h) => {
336
0
                    head = h;
337
0
                    block = self.head.block.load(Ordering::Acquire);
338
0
                }
339
            }
340
        }
341
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification>>::pop
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::pop
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification>>::pop
342
343
    /// Returns the number of items in the queue.
344
    pub fn len(&self) -> usize {
345
        loop {
346
            // Load the tail index, then load the head index.
347
            let mut tail = self.tail.index.load(Ordering::SeqCst);
348
            let mut head = self.head.index.load(Ordering::SeqCst);
349
350
            // If the tail index didn't change, we've got consistent indices to work with.
351
            if self.tail.index.load(Ordering::SeqCst) == tail {
352
                // Erase the lower bits.
353
                tail &= !((1 << SHIFT) - 1);
354
                head &= !((1 << SHIFT) - 1);
355
356
                // Fix up indices if they fall onto block ends.
357
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
358
                    tail = tail.wrapping_add(1 << SHIFT);
359
                }
360
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
361
                    head = head.wrapping_add(1 << SHIFT);
362
                }
363
364
                // Rotate indices so that head falls into the first block.
365
                let lap = (head >> SHIFT) / LAP;
366
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
367
                head = head.wrapping_sub((lap * LAP) << SHIFT);
368
369
                // Remove the lower bits.
370
                tail >>= SHIFT;
371
                head >>= SHIFT;
372
373
                // Return the difference minus the number of blocks between tail and head.
374
                return tail - head - tail / LAP;
375
            }
376
        }
377
    }
378
379
    /// Returns `true` if the queue is empty.
380
    pub fn is_empty(&self) -> bool {
381
        let head = self.head.index.load(Ordering::SeqCst);
382
        let tail = self.tail.index.load(Ordering::SeqCst);
383
        head >> SHIFT == tail >> SHIFT
384
    }
385
386
    /// Returns `true` if the queue is full.
387
    pub fn is_full(&self) -> bool {
388
        false
389
    }
390
391
    /// Closes the queue.
392
    ///
393
    /// Returns `true` if this call closed the queue.
394
0
    pub fn close(&self) -> bool {
395
0
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
396
0
        tail & MARK_BIT == 0
397
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>>>::close
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification>>::close
398
399
    /// Returns `true` if the queue is closed.
400
    pub fn is_closed(&self) -> bool {
401
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
402
    }
403
}
404
405
impl<T> Drop for Unbounded<T> {
406
0
    fn drop(&mut self) {
407
0
        let Self { head, tail } = self;
408
0
        let Position { index: head, block } = &mut **head;
409
410
0
        head.with_mut(|&mut mut head| {
411
0
            tail.index.with_mut(|&mut mut tail| {
412
                // Erase the lower bits.
413
0
                head &= !((1 << SHIFT) - 1);
414
0
                tail &= !((1 << SHIFT) - 1);
415
416
                unsafe {
417
                    // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
418
0
                    while head != tail {
419
0
                        let offset = (head >> SHIFT) % LAP;
420
421
0
                        if offset < BLOCK_CAP {
422
                            // Drop the value in the slot.
423
0
                            block.with_mut(|block| {
424
0
                                let slot = (**block).slots.get_unchecked(offset);
425
0
                                slot.value.with_mut(|slot| {
426
0
                                    let value = &mut *slot;
427
0
                                    value.as_mut_ptr().drop_in_place();
428
0
                                });
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#0}::{closure#0}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#0}::{closure#0}
429
0
                            });
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#0}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#0}
430
                        } else {
431
                            // Deallocate the block and move to the next one.
432
0
                            block.with_mut(|block| {
433
0
                                let next_block = (**block).next.with_mut(|next| *next);
434
0
                                drop(Box::from_raw(*block));
435
0
                                *block = next_block;
436
0
                            });
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#1}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#1}
437
                        }
438
439
0
                        head = head.wrapping_add(1 << SHIFT);
440
                    }
441
442
                    // Deallocate the last remaining block.
443
0
                    block.with_mut(|block| {
444
0
                        if !block.is_null() {
445
0
                            drop(Box::from_raw(*block));
446
0
                        }
447
0
                    });
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#2}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}::{closure#2}
448
                }
449
0
            });
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop::{closure#0}::{closure#0}
450
0
        });
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop::{closure#0}
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop::{closure#0}
451
0
    }
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<core::result::Result<surrealdb_core::exec::ValueBatch, surrealdb_core::expr::ControlFlow>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <concurrent_queue::unbounded::Unbounded<surrealdb_types::notification::Notification> as core::ops::drop::Drop>::drop
452
}