Coverage Report

Created: 2024-10-16 07:58

/rust/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-queue-0.3.11/src/seg_queue.rs
Line
Count
Source (jump to first uncovered line)
1
use alloc::boxed::Box;
2
use core::cell::UnsafeCell;
3
use core::fmt;
4
use core::marker::PhantomData;
5
use core::mem::MaybeUninit;
6
use core::panic::{RefUnwindSafe, UnwindSafe};
7
use core::ptr;
8
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
9
10
use crossbeam_utils::{Backoff, CachePadded};
11
12
// Bits indicating the state of a slot:
13
// * If a value has been written into the slot, `WRITE` is set.
14
// * If a value has been read from the slot, `READ` is set.
15
// * If the block is being destroyed, `DESTROY` is set.
16
const WRITE: usize = 1;
17
const READ: usize = 2;
18
const DESTROY: usize = 4;
19
20
// Each block covers one "lap" of indices.
21
const LAP: usize = 32;
22
// The maximum number of values a block can hold.
23
const BLOCK_CAP: usize = LAP - 1;
24
// How many lower bits are reserved for metadata.
25
const SHIFT: usize = 1;
26
// Indicates that the block is not the last one.
27
const HAS_NEXT: usize = 1;
28
29
/// A slot in a block.
30
struct Slot<T> {
31
    /// The value.
32
    value: UnsafeCell<MaybeUninit<T>>,
33
34
    /// The state of the slot.
35
    state: AtomicUsize,
36
}
37
38
impl<T> Slot<T> {
39
    const UNINIT: Self = Self {
40
        value: UnsafeCell::new(MaybeUninit::uninit()),
41
        state: AtomicUsize::new(0),
42
    };
43
44
    /// Waits until a value is written into the slot.
45
0
    fn wait_write(&self) {
46
0
        let backoff = Backoff::new();
47
0
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
48
0
            backoff.snooze();
49
0
        }
50
0
    }
Unexecuted instantiation: <crossbeam_queue::seg_queue::Slot<corosensei::stack::unix::DefaultStack>>::wait_write
Unexecuted instantiation: <crossbeam_queue::seg_queue::Slot<corosensei::stack::unix::DefaultStack>>::wait_write
51
}
52
53
/// A block in a linked list.
54
///
55
/// Each block in the list can hold up to `BLOCK_CAP` values.
56
struct Block<T> {
57
    /// The next block in the linked list.
58
    next: AtomicPtr<Block<T>>,
59
60
    /// Slots for values.
61
    slots: [Slot<T>; BLOCK_CAP],
62
}
63
64
impl<T> Block<T> {
65
    /// Creates an empty block: the `next` pointer is null and every slot is uninitialized.
66
0
    fn new() -> Block<T> {
67
0
        Self {
68
0
            next: AtomicPtr::new(ptr::null_mut()),
69
0
            slots: [Slot::UNINIT; BLOCK_CAP],
70
0
        }
71
0
    }
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::new
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::new
72
73
    /// Waits until the next pointer is set.
74
0
    fn wait_next(&self) -> *mut Block<T> {
75
0
        let backoff = Backoff::new();
76
        loop {
77
0
            let next = self.next.load(Ordering::Acquire);
78
0
            if !next.is_null() {
79
0
                return next;
80
0
            }
81
0
            backoff.snooze();
82
        }
83
0
    }
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::wait_next
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::wait_next
84
85
    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
86
0
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
87
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
88
        // begun destruction of the block.
89
0
        for i in start..BLOCK_CAP - 1 {
90
0
            let slot = (*this).slots.get_unchecked(i);
91
0
92
0
            // Mark the `DESTROY` bit if a thread is still using the slot.
93
0
            if slot.state.load(Ordering::Acquire) & READ == 0
94
0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
95
            {
96
                // If a thread is still using the slot, it will continue destruction of the block.
97
0
                return;
98
0
            }
99
        }
100
101
        // No thread is using the block, now it is safe to destroy it.
102
0
        drop(Box::from_raw(this));
103
0
    }
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::destroy
Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::destroy
104
}
105
106
/// A position in a queue.
107
struct Position<T> {
108
    /// The index in the queue.
109
    index: AtomicUsize,
110
111
    /// The block in the linked list.
112
    block: AtomicPtr<Block<T>>,
113
}
114
115
/// An unbounded multi-producer multi-consumer queue.
116
///
117
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
118
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
119
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
120
/// this queue is somewhat slower than [`ArrayQueue`].
121
///
122
/// [`ArrayQueue`]: super::ArrayQueue
123
///
124
/// # Examples
125
///
126
/// ```
127
/// use crossbeam_queue::SegQueue;
128
///
129
/// let q = SegQueue::new();
130
///
131
/// q.push('a');
132
/// q.push('b');
133
///
134
/// assert_eq!(q.pop(), Some('a'));
135
/// assert_eq!(q.pop(), Some('b'));
136
/// assert!(q.pop().is_none());
137
/// ```
138
pub struct SegQueue<T> {
139
    /// The head of the queue.
140
    head: CachePadded<Position<T>>,
141
142
    /// The tail of the queue.
143
    tail: CachePadded<Position<T>>,
144
145
    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
146
    _marker: PhantomData<T>,
147
}
148
149
unsafe impl<T: Send> Send for SegQueue<T> {}
150
unsafe impl<T: Send> Sync for SegQueue<T> {}
151
152
impl<T> UnwindSafe for SegQueue<T> {}
153
impl<T> RefUnwindSafe for SegQueue<T> {}
154
155
impl<T> SegQueue<T> {
156
    /// Creates a new unbounded queue.
157
    ///
158
    /// # Examples
159
    ///
160
    /// ```
161
    /// use crossbeam_queue::SegQueue;
162
    ///
163
    /// let q = SegQueue::<i32>::new();
164
    /// ```
165
0
    pub const fn new() -> SegQueue<T> {
166
0
        SegQueue {
167
0
            head: CachePadded::new(Position {
168
0
                block: AtomicPtr::new(ptr::null_mut()),
169
0
                index: AtomicUsize::new(0),
170
0
            }),
171
0
            tail: CachePadded::new(Position {
172
0
                block: AtomicPtr::new(ptr::null_mut()),
173
0
                index: AtomicUsize::new(0),
174
0
            }),
175
0
            _marker: PhantomData,
176
0
        }
177
0
    }
Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::new
Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::new
178
179
    /// Pushes an element into the queue.
180
    ///
181
    /// # Examples
182
    ///
183
    /// ```
184
    /// use crossbeam_queue::SegQueue;
185
    ///
186
    /// let q = SegQueue::new();
187
    ///
188
    /// q.push(10);
189
    /// q.push(20);
190
    /// ```
191
0
    pub fn push(&self, value: T) {
192
0
        let backoff = Backoff::new();
193
0
        let mut tail = self.tail.index.load(Ordering::Acquire);
194
0
        let mut block = self.tail.block.load(Ordering::Acquire);
195
0
        let mut next_block = None;
196
197
0
        loop {
198
0
            // Calculate the offset of the index into the block.
199
0
            let offset = (tail >> SHIFT) % LAP;
200
0
201
0
            // If we reached the end of the block, wait until the next one is installed.
202
0
            if offset == BLOCK_CAP {
203
0
                backoff.snooze();
204
0
                tail = self.tail.index.load(Ordering::Acquire);
205
0
                block = self.tail.block.load(Ordering::Acquire);
206
0
                continue;
207
0
            }
208
0
209
0
            // If we're going to have to install the next block, allocate it in advance in order to
210
0
            // make the wait for other threads as short as possible.
211
0
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
212
0
                next_block = Some(Box::new(Block::<T>::new()));
213
0
            }
214
215
            // If this is the first push operation, we need to allocate the first block.
216
0
            if block.is_null() {
217
0
                let new = Box::into_raw(Box::new(Block::<T>::new()));
218
0
219
0
                if self
220
0
                    .tail
221
0
                    .block
222
0
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
223
0
                    .is_ok()
224
0
                {
225
0
                    self.head.block.store(new, Ordering::Release);
226
0
                    block = new;
227
0
                } else {
228
0
                    next_block = unsafe { Some(Box::from_raw(new)) };
229
0
                    tail = self.tail.index.load(Ordering::Acquire);
230
0
                    block = self.tail.block.load(Ordering::Acquire);
231
0
                    continue;
232
                }
233
0
            }
234
235
0
            let new_tail = tail + (1 << SHIFT);
236
0
237
0
            // Try advancing the tail forward.
238
0
            match self.tail.index.compare_exchange_weak(
239
0
                tail,
240
0
                new_tail,
241
0
                Ordering::SeqCst,
242
0
                Ordering::Acquire,
243
0
            ) {
244
                Ok(_) => unsafe {
245
                    // If we've reached the end of the block, install the next one.
246
0
                    if offset + 1 == BLOCK_CAP {
247
0
                        let next_block = Box::into_raw(next_block.unwrap());
248
0
                        let next_index = new_tail.wrapping_add(1 << SHIFT);
249
0
250
0
                        self.tail.block.store(next_block, Ordering::Release);
251
0
                        self.tail.index.store(next_index, Ordering::Release);
252
0
                        (*block).next.store(next_block, Ordering::Release);
253
0
                    }
254
255
                    // Write the value into the slot.
256
0
                    let slot = (*block).slots.get_unchecked(offset);
257
0
                    slot.value.get().write(MaybeUninit::new(value));
258
0
                    slot.state.fetch_or(WRITE, Ordering::Release);
259
0
260
0
                    return;
261
                },
262
0
                Err(t) => {
263
0
                    tail = t;
264
0
                    block = self.tail.block.load(Ordering::Acquire);
265
0
                    backoff.spin();
266
0
                }
267
            }
268
        }
269
0
    }
Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::push
Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::push
270
271
    /// Pops an element from the queue.
272
    ///
273
    /// If the queue is empty, `None` is returned.
274
    ///
275
    /// # Examples
276
    ///
277
    /// ```
278
    /// use crossbeam_queue::SegQueue;
279
    ///
280
    /// let q = SegQueue::new();
281
    ///
282
    /// q.push(10);
283
    /// assert_eq!(q.pop(), Some(10));
284
    /// assert!(q.pop().is_none());
285
    /// ```
286
0
    pub fn pop(&self) -> Option<T> {
287
0
        let backoff = Backoff::new();
288
0
        let mut head = self.head.index.load(Ordering::Acquire);
289
0
        let mut block = self.head.block.load(Ordering::Acquire);
290
291
0
        loop {
292
0
            // Calculate the offset of the index into the block.
293
0
            let offset = (head >> SHIFT) % LAP;
294
0
295
0
            // If we reached the end of the block, wait until the next one is installed.
296
0
            if offset == BLOCK_CAP {
297
0
                backoff.snooze();
298
0
                head = self.head.index.load(Ordering::Acquire);
299
0
                block = self.head.block.load(Ordering::Acquire);
300
0
                continue;
301
0
            }
302
0
303
0
            let mut new_head = head + (1 << SHIFT);
304
0
305
0
            if new_head & HAS_NEXT == 0 {
306
0
                atomic::fence(Ordering::SeqCst);
307
0
                let tail = self.tail.index.load(Ordering::Relaxed);
308
0
309
0
                // If the tail equals the head, that means the queue is empty.
310
0
                if head >> SHIFT == tail >> SHIFT {
311
0
                    return None;
312
0
                }
313
0
314
0
                // If head and tail are not in the same block, set `HAS_NEXT` in head.
315
0
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
316
0
                    new_head |= HAS_NEXT;
317
0
                }
318
0
            }
319
320
            // The block can be null here only if the first push operation is in progress. In that
321
            // case, just wait until it gets initialized.
322
0
            if block.is_null() {
323
0
                backoff.snooze();
324
0
                head = self.head.index.load(Ordering::Acquire);
325
0
                block = self.head.block.load(Ordering::Acquire);
326
0
                continue;
327
0
            }
328
0
329
0
            // Try moving the head index forward.
330
0
            match self.head.index.compare_exchange_weak(
331
0
                head,
332
0
                new_head,
333
0
                Ordering::SeqCst,
334
0
                Ordering::Acquire,
335
0
            ) {
336
                Ok(_) => unsafe {
337
                    // If we've reached the end of the block, move to the next one.
338
0
                    if offset + 1 == BLOCK_CAP {
339
0
                        let next = (*block).wait_next();
340
0
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
341
0
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
342
0
                            next_index |= HAS_NEXT;
343
0
                        }
344
345
0
                        self.head.block.store(next, Ordering::Release);
346
0
                        self.head.index.store(next_index, Ordering::Release);
347
0
                    }
348
349
                    // Read the value.
350
0
                    let slot = (*block).slots.get_unchecked(offset);
351
0
                    slot.wait_write();
352
0
                    let value = slot.value.get().read().assume_init();
353
0
354
0
                    // Destroy the block if we've reached the end, or if another thread wanted to
355
0
                    // destroy but couldn't because we were busy reading from the slot.
356
0
                    if offset + 1 == BLOCK_CAP {
357
0
                        Block::destroy(block, 0);
358
0
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
359
0
                        Block::destroy(block, offset + 1);
360
0
                    }
361
362
0
                    return Some(value);
363
                },
364
0
                Err(h) => {
365
0
                    head = h;
366
0
                    block = self.head.block.load(Ordering::Acquire);
367
0
                    backoff.spin();
368
0
                }
369
            }
370
        }
371
0
    }
Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::pop
Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::pop
372
373
    /// Returns `true` if the queue is empty.
374
    ///
375
    /// # Examples
376
    ///
377
    /// ```
378
    /// use crossbeam_queue::SegQueue;
379
    ///
380
    /// let q = SegQueue::new();
381
    ///
382
    /// assert!(q.is_empty());
383
    /// q.push(1);
384
    /// assert!(!q.is_empty());
385
    /// ```
386
    pub fn is_empty(&self) -> bool {
387
        let head = self.head.index.load(Ordering::SeqCst);
388
        let tail = self.tail.index.load(Ordering::SeqCst);
389
        head >> SHIFT == tail >> SHIFT
390
    }
391
392
    /// Returns the number of elements in the queue.
393
    ///
394
    /// # Examples
395
    ///
396
    /// ```
397
    /// use crossbeam_queue::SegQueue;
398
    ///
399
    /// let q = SegQueue::new();
400
    /// assert_eq!(q.len(), 0);
401
    ///
402
    /// q.push(10);
403
    /// assert_eq!(q.len(), 1);
404
    ///
405
    /// q.push(20);
406
    /// assert_eq!(q.len(), 2);
407
    /// ```
408
    pub fn len(&self) -> usize {
409
        loop {
410
            // Load the tail index, then load the head index.
411
            let mut tail = self.tail.index.load(Ordering::SeqCst);
412
            let mut head = self.head.index.load(Ordering::SeqCst);
413
414
            // If the tail index didn't change, we've got consistent indices to work with.
415
            if self.tail.index.load(Ordering::SeqCst) == tail {
416
                // Erase the lower bits.
417
                tail &= !((1 << SHIFT) - 1);
418
                head &= !((1 << SHIFT) - 1);
419
420
                // Fix up indices if they fall onto block ends.
421
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
422
                    tail = tail.wrapping_add(1 << SHIFT);
423
                }
424
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
425
                    head = head.wrapping_add(1 << SHIFT);
426
                }
427
428
                // Rotate indices so that head falls into the first block.
429
                let lap = (head >> SHIFT) / LAP;
430
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
431
                head = head.wrapping_sub((lap * LAP) << SHIFT);
432
433
                // Remove the lower bits.
434
                tail >>= SHIFT;
435
                head >>= SHIFT;
436
437
                // Return the difference minus the number of blocks between tail and head.
438
                return tail - head - tail / LAP;
439
            }
440
        }
441
    }
442
}
443
444
impl<T> Drop for SegQueue<T> {
445
    fn drop(&mut self) {
446
        let mut head = *self.head.index.get_mut();
447
        let mut tail = *self.tail.index.get_mut();
448
        let mut block = *self.head.block.get_mut();
449
450
        // Erase the lower bits.
451
        head &= !((1 << SHIFT) - 1);
452
        tail &= !((1 << SHIFT) - 1);
453
454
        unsafe {
455
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
456
            while head != tail {
457
                let offset = (head >> SHIFT) % LAP;
458
459
                if offset < BLOCK_CAP {
460
                    // Drop the value in the slot.
461
                    let slot = (*block).slots.get_unchecked(offset);
462
                    (*slot.value.get()).assume_init_drop();
463
                } else {
464
                    // Deallocate the block and move to the next one.
465
                    let next = *(*block).next.get_mut();
466
                    drop(Box::from_raw(block));
467
                    block = next;
468
                }
469
470
                head = head.wrapping_add(1 << SHIFT);
471
            }
472
473
            // Deallocate the last remaining block.
474
            if !block.is_null() {
475
                drop(Box::from_raw(block));
476
            }
477
        }
478
    }
479
}
480
481
impl<T> fmt::Debug for SegQueue<T> {
482
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483
        f.pad("SegQueue { .. }")
484
    }
485
}
486
487
impl<T> Default for SegQueue<T> {
488
    fn default() -> SegQueue<T> {
489
        SegQueue::new()
490
    }
491
}
492
493
impl<T> IntoIterator for SegQueue<T> {
494
    type Item = T;
495
496
    type IntoIter = IntoIter<T>;
497
498
    fn into_iter(self) -> Self::IntoIter {
499
        IntoIter { value: self }
500
    }
501
}
502
503
#[derive(Debug)]
504
pub struct IntoIter<T> {
505
    value: SegQueue<T>,
506
}
507
508
impl<T> Iterator for IntoIter<T> {
509
    type Item = T;
510
511
    fn next(&mut self) -> Option<Self::Item> {
512
        let value = &mut self.value;
513
        let head = *value.head.index.get_mut();
514
        let tail = *value.tail.index.get_mut();
515
        if head >> SHIFT == tail >> SHIFT {
516
            None
517
        } else {
518
            let block = *value.head.block.get_mut();
519
            let offset = (head >> SHIFT) % LAP;
520
521
            // SAFETY: We have mutable access to this, so we can read without
522
            // worrying about concurrency. Furthermore, we know this is
523
            // initialized because it is the value pointed at by `value.head`
524
            // and this is a non-empty queue.
525
            let item = unsafe {
526
                let slot = (*block).slots.get_unchecked(offset);
527
                slot.value.get().read().assume_init()
528
            };
529
            if offset + 1 == BLOCK_CAP {
530
                // Deallocate the block and move to the next one.
531
                // SAFETY: The block is initialized because we've been reading
532
                // from it this entire time. We can drop it because everything has
533
                // been read out of it, so nothing is pointing to it anymore.
534
                unsafe {
535
                    let next = *(*block).next.get_mut();
536
                    drop(Box::from_raw(block));
537
                    *value.head.block.get_mut() = next;
538
                }
539
                // The last index of each lap has no slot behind it, so skip over it
540
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
541
                // Double-check that we're pointing to the first item in a block.
542
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
543
            } else {
544
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
545
            }
546
            Some(item)
547
        }
548
    }
549
}