Coverage Report

Created: 2025-02-25 06:39

/rust/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-deque-0.8.6/src/deque.rs
Line
Count
Source (jump to first uncovered line)
1
use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};
2
use std::boxed::Box;
3
use std::cell::{Cell, UnsafeCell};
4
use std::cmp;
5
use std::fmt;
6
use std::marker::PhantomData;
7
use std::mem::{self, MaybeUninit};
8
use std::ptr;
9
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
10
use std::sync::Arc;
11
12
use crossbeam_epoch::{self as epoch, Atomic, Owned};
13
use crossbeam_utils::{Backoff, CachePadded};
14
15
// Minimum buffer capacity. Every new worker starts with a buffer of this many slots, and `pop`
// never shrinks a buffer that is already at this capacity. Must be a power of two, because
// `Buffer::alloc` asserts that (in debug builds).
16
const MIN_CAP: usize = 64;
17
// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
18
const MAX_BATCH: usize = 32;
19
// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
20
// deallocated as soon as possible.
//
// The value is in bytes (`1 << 10` = 1024); `resize` compares it against
// `size_of::<T>() * new_cap`.
21
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
22
23
/// A buffer that holds tasks in a worker queue.
24
///
25
/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
26
/// *not* deallocate the buffer.
27
struct Buffer<T> {
28
    /// Pointer to the allocated memory.
    ///
    /// Points at the first of `cap` slots. The slots are allocated as `MaybeUninit<T>`, so a
    /// slot is not necessarily initialized.
29
    ptr: *mut T,
30
31
    /// Capacity of the buffer. Always a power of two.
    ///
    /// `at` relies on this to turn an index into a slot offset with a bit mask.
32
    cap: usize,
33
}
34
35
// NOTE(review): there is no `T: Send` bound here. Presumably this is acceptable because `Buffer`
// is a private type that only crosses threads through `Worker`/`Stealer`, whose own `Send`/`Sync`
// impls do require `T: Send` — confirm.
unsafe impl<T> Send for Buffer<T> {}
36
37
impl<T> Buffer<T> {
38
    /// Allocates a new buffer with the specified capacity.
    ///
    /// `cap` must be a power of two; this is only checked in debug builds. The slots are left
    /// uninitialized.
39
0
    fn alloc(cap: usize) -> Buffer<T> {
40
0
        debug_assert_eq!(cap, cap.next_power_of_two());
41
42
0
        // Build a boxed slice of `cap` uninitialized slots and turn it into a raw pointer; the
        // memory is reclaimed by `dealloc`.
        let ptr = Box::into_raw(
43
0
            (0..cap)
44
0
                .map(|_| MaybeUninit::<T>::uninit())
45
0
                .collect::<Box<[_]>>(),
46
0
        )
47
0
        .cast::<T>();
48
0
49
0
        Buffer { ptr, cap }
50
0
    }
51
52
    /// Deallocates the buffer.
    ///
    /// The memory is freed as a slice of `MaybeUninit<T>`, so no destructors run for tasks that
    /// are still stored in it.
    ///
    /// # Safety
    ///
    /// `self` must describe an allocation created by `alloc`, and the allocation must not be
    /// accessed afterwards through this or any other copy of the `Buffer`.
53
0
    unsafe fn dealloc(self) {
54
0
        drop(Box::from_raw(ptr::slice_from_raw_parts_mut(
55
0
            self.ptr.cast::<MaybeUninit<T>>(),
56
0
            self.cap,
57
0
        )));
58
0
    }
59
60
    /// Returns a pointer to the task at the specified `index`.
    ///
    /// The index is masked with `cap - 1`, so any `isize` maps to a slot inside the buffer.
61
0
    unsafe fn at(&self, index: isize) -> *mut T {
62
0
        // `self.cap` is always a power of two, so masking with `cap - 1` wraps `index` around.
63
0
        // Callers (`read` and `write`) access the slot as `MaybeUninit<T>` because a reader might
64
0
        // realize, after loading, that it doesn't actually have the right to access this memory.
65
0
        self.ptr.offset(index & (self.cap - 1) as isize)
66
0
    }
67
68
    /// Writes `task` into the specified `index`.
69
    ///
70
    /// This method might be concurrently called with another `read` at the same index, which is
71
    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
72
    /// that would be more expensive and difficult to implement generically for all types `T`.
73
    /// Hence, as a hack, we use a volatile write instead.
    ///
    /// The previous contents of the slot are overwritten without being dropped.
74
0
    unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
75
0
        ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
76
0
    }
77
78
    /// Reads a task from the specified `index`.
79
    ///
80
    /// This method might be concurrently called with another `write` at the same index, which is
81
    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
82
    /// that would be more expensive and difficult to implement generically for all types `T`.
83
    /// Hence, as a hack, we use a volatile load instead.
    ///
    /// The slot is copied out bitwise and left untouched; the caller decides whether the returned
    /// value may be treated as initialized.
84
0
    unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
85
0
        ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
86
0
    }
87
}
88
89
// Implemented by hand rather than derived, presumably so that `Buffer<T>` is `Clone` without
// requiring `T: Clone` — confirm.
impl<T> Clone for Buffer<T> {
90
    // Copies the pointer and the capacity only; the clone refers to the same allocation.
    fn clone(&self) -> Buffer<T> {
91
        *self
92
    }
93
}
94
95
// A `Buffer` is only a pointer and a capacity, so it can be copied bitwise. Copies alias the same
// allocation.
impl<T> Copy for Buffer<T> {}
96
97
/// Internal queue data shared between the worker and stealers.
98
///
99
/// The implementation is based on the following work:
100
///
101
/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
102
/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
103
///    PPoPP 2013.][weak-mem]
104
/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
105
///    atomics. OOPSLA 2013.][checker]
106
///
107
/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
108
/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
109
/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
110
struct Inner<T> {
111
    /// The front index.
    ///
    /// Advanced when a task is taken from the front: by stealers and by `Worker::pop`.
112
    front: AtomicIsize,
113
114
    /// The back index.
    ///
    /// Advanced when tasks are added at the back (`Worker::push`, and the destination side of a
    /// batch steal); a LIFO `Worker::pop` moves it backwards.
115
    back: AtomicIsize,
116
117
    /// The underlying buffer.
    ///
    /// Replaced by `Worker::resize`; stealers load it under an epoch guard.
118
    buffer: CachePadded<Atomic<Buffer<T>>>,
119
}
120
121
// Drops every task that is still in the queue and then frees the buffer.
impl<T> Drop for Inner<T> {
122
0
    fn drop(&mut self) {
123
0
        // Load the back index, front index, and buffer.
        // `&mut self` gives exclusive access, so `get_mut` is used instead of atomic loads.
124
0
        let b = *self.back.get_mut();
125
0
        let f = *self.front.get_mut();
126
0
127
0
        unsafe {
            // NOTE(review): `epoch::unprotected()` is presumably fine here because no other
            // handle can touch the buffer while `Inner` is being dropped — confirm.
128
0
            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
129
0
130
0
            // Go through the buffer from front to back and drop all tasks in the queue.
131
0
            let mut i = f;
132
0
            while i != b {
133
0
                buffer.deref().at(i).drop_in_place();
134
0
                i = i.wrapping_add(1);
135
0
            }
136
137
            // Free the memory allocated by the buffer.
            // `dealloc` itself runs no destructors, which is why the tasks were dropped above.
138
0
            buffer.into_owned().into_box().dealloc();
139
0
        }
140
0
    }
141
}
142
143
/// Worker queue flavor: FIFO or LIFO.
144
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
145
enum Flavor {
146
    /// The first-in first-out flavor.
    ///
    /// `Worker::pop` takes tasks from the front of the queue.
147
    Fifo,
148
149
    /// The last-in first-out flavor.
    ///
    /// `Worker::pop` takes tasks from the back of the queue.
150
    Lifo,
151
}
152
153
/// A worker queue.
154
///
155
/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
156
/// tasks from it. Task schedulers typically create a single worker queue per thread.
157
///
158
/// # Examples
159
///
160
/// A FIFO worker:
161
///
162
/// ```
163
/// use crossbeam_deque::{Steal, Worker};
164
///
165
/// let w = Worker::new_fifo();
166
/// let s = w.stealer();
167
///
168
/// w.push(1);
169
/// w.push(2);
170
/// w.push(3);
171
///
172
/// assert_eq!(s.steal(), Steal::Success(1));
173
/// assert_eq!(w.pop(), Some(2));
174
/// assert_eq!(w.pop(), Some(3));
175
/// ```
176
///
177
/// A LIFO worker:
178
///
179
/// ```
180
/// use crossbeam_deque::{Steal, Worker};
181
///
182
/// let w = Worker::new_lifo();
183
/// let s = w.stealer();
184
///
185
/// w.push(1);
186
/// w.push(2);
187
/// w.push(3);
188
///
189
/// assert_eq!(s.steal(), Steal::Success(1));
190
/// assert_eq!(w.pop(), Some(3));
191
/// assert_eq!(w.pop(), Some(2));
192
/// ```
193
pub struct Worker<T> {
194
    /// A reference to the inner representation of the queue.
195
    inner: Arc<CachePadded<Inner<T>>>,
196
197
    /// A copy of `inner.buffer` for quick access.
    ///
    /// Kept in a `Cell` so that `resize` can replace it through `&self`.
198
    buffer: Cell<Buffer<T>>,
199
200
    /// The flavor of the queue.
201
    flavor: Flavor,
202
203
    /// Indicates that the worker cannot be shared among threads.
204
    _marker: PhantomData<*mut ()>, // `!Send + !Sync` by default; `Send` is re-added by an explicit `unsafe impl`
205
}
206
207
// A `Worker` may be moved to another thread as long as its tasks are `Send`. Its `Cell` and
// `PhantomData<*mut ()>` fields keep it `!Sync`, so it cannot be shared by reference.
unsafe impl<T: Send> Send for Worker<T> {}
208
209
impl<T> Worker<T> {
210
    /// Creates a FIFO worker queue.
211
    ///
212
    /// Tasks are pushed and popped from opposite ends.
213
    ///
214
    /// # Examples
215
    ///
216
    /// ```
217
    /// use crossbeam_deque::Worker;
218
    ///
219
    /// let w = Worker::<i32>::new_fifo();
220
    /// ```
221
0
    pub fn new_fifo() -> Worker<T> {
222
0
        // Start with the smallest allowed buffer.
        let buffer = Buffer::alloc(MIN_CAP);
223
0
224
0
        // `front == back == 0` is the empty queue.
        let inner = Arc::new(CachePadded::new(Inner {
225
0
            front: AtomicIsize::new(0),
226
0
            back: AtomicIsize::new(0),
227
0
            buffer: CachePadded::new(Atomic::new(buffer)),
228
0
        }));
229
0
230
0
        // `Buffer` is `Copy`: the worker keeps its own copy of the descriptor next to the shared
        // one stored in `inner`.
        Worker {
231
0
            inner,
232
0
            buffer: Cell::new(buffer),
233
0
            flavor: Flavor::Fifo,
234
0
            _marker: PhantomData,
235
0
        }
236
0
    }
237
238
    /// Creates a LIFO worker queue.
239
    ///
240
    /// Tasks are pushed and popped from the same end.
241
    ///
242
    /// # Examples
243
    ///
244
    /// ```
245
    /// use crossbeam_deque::Worker;
246
    ///
247
    /// let w = Worker::<i32>::new_lifo();
248
    /// ```
249
0
    pub fn new_lifo() -> Worker<T> {
250
0
        // Start with the smallest allowed buffer.
        let buffer = Buffer::alloc(MIN_CAP);
251
0
252
0
        // `front == back == 0` is the empty queue.
        let inner = Arc::new(CachePadded::new(Inner {
253
0
            front: AtomicIsize::new(0),
254
0
            back: AtomicIsize::new(0),
255
0
            buffer: CachePadded::new(Atomic::new(buffer)),
256
0
        }));
257
0
258
0
        // Identical to `new_fifo` except for the flavor recorded here.
        Worker {
259
0
            inner,
260
0
            buffer: Cell::new(buffer),
261
0
            flavor: Flavor::Lifo,
262
0
            _marker: PhantomData,
263
0
        }
264
0
    }
265
266
    /// Creates a stealer for this queue.
267
    ///
268
    /// The returned stealer can be shared among threads and cloned.
269
    ///
270
    /// # Examples
271
    ///
272
    /// ```
273
    /// use crossbeam_deque::Worker;
274
    ///
275
    /// let w = Worker::<i32>::new_lifo();
276
    /// let s = w.stealer();
277
    /// ```
278
0
    pub fn stealer(&self) -> Stealer<T> {
279
0
        // The stealer shares the same `Inner` (cloning the `Arc`) and remembers the flavor so
        // that batch steals know how the source queue is ordered.
        Stealer {
280
0
            inner: self.inner.clone(),
281
0
            flavor: self.flavor,
282
0
        }
283
0
    }
284
285
    /// Resizes the internal buffer to the new capacity of `new_cap`.
    ///
    /// The tasks in `front..back` are copied bitwise into a freshly allocated buffer at the same
    /// indices, the new buffer is published to stealers, and deallocation of the old buffer is
    /// deferred through the epoch guard.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in the source. Presumably `new_cap` must be
    /// a power of two large enough to hold the current contents, and the call must come from the
    /// thread that owns this worker — confirm.
286
    #[cold]
287
0
    unsafe fn resize(&self, new_cap: usize) {
288
0
        // Load the back index, front index, and buffer.
289
0
        let b = self.inner.back.load(Ordering::Relaxed);
290
0
        let f = self.inner.front.load(Ordering::Relaxed);
291
0
        let buffer = self.buffer.get();
292
0
293
0
        // Allocate a new buffer and copy data from the old buffer to the new one.
294
0
        let new = Buffer::alloc(new_cap);
295
0
        let mut i = f;
296
0
        while i != b {
            // Both `at` calls mask `i` with their own capacity, so a task keeps its logical index.
297
0
            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
298
0
            i = i.wrapping_add(1);
299
0
        }
300
301
0
        let guard = &epoch::pin();
302
0
303
0
        // Replace the old buffer with the new one.
        // First the worker-local copy, then the shared pointer that stealers load.
304
0
        self.buffer.replace(new);
305
0
        let old =
306
0
            self.inner
307
0
                .buffer
308
0
                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
309
0
310
0
        // Destroy the old buffer later.
311
0
        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
312
0
313
0
        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
314
0
        // it as soon as possible.
        // Note that the size tested here is that of the new buffer.
315
0
        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
316
0
            guard.flush();
317
0
        }
318
0
    }
319
320
    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
321
    /// buffer.
    ///
    /// Does nothing when `reserve_cap` is zero or when the free space already suffices.
322
    fn reserve(&self, reserve_cap: usize) {
323
        if reserve_cap > 0 {
324
            // Compute the current length.
325
            let b = self.inner.back.load(Ordering::Relaxed);
326
            let f = self.inner.front.load(Ordering::SeqCst);
327
            let len = b.wrapping_sub(f) as usize;
328
329
            // The current capacity.
330
            let cap = self.buffer.get().cap;
331
332
            // Is there enough capacity to push `reserve_cap` tasks?
333
            if cap - len < reserve_cap {
334
                // Keep doubling the capacity as much as is needed.
                // Doubling keeps the capacity a power of two, as `Buffer::alloc` expects.
335
                let mut new_cap = cap * 2;
336
                while new_cap - len < reserve_cap {
337
                    new_cap *= 2;
338
                }
339
340
                // Resize the buffer.
341
                unsafe {
342
                    self.resize(new_cap);
343
                }
344
            }
345
        }
346
    }
347
348
    /// Returns `true` if the queue is empty.
349
    ///
350
    /// ```
351
    /// use crossbeam_deque::Worker;
352
    ///
353
    /// let w = Worker::new_lifo();
354
    ///
355
    /// assert!(w.is_empty());
356
    /// w.push(1);
357
    /// assert!(!w.is_empty());
358
    /// ```
359
0
    pub fn is_empty(&self) -> bool {
360
0
        let b = self.inner.back.load(Ordering::Relaxed);
361
0
        let f = self.inner.front.load(Ordering::SeqCst);
        // Signed wrapping difference: zero or negative means there is no task to pop.
362
0
        b.wrapping_sub(f) <= 0
363
0
    }
364
365
    /// Returns the number of tasks in the deque.
366
    ///
367
    /// ```
368
    /// use crossbeam_deque::Worker;
369
    ///
370
    /// let w = Worker::new_lifo();
371
    ///
372
    /// assert_eq!(w.len(), 0);
373
    /// w.push(1);
374
    /// assert_eq!(w.len(), 1);
375
    /// w.push(1);
376
    /// assert_eq!(w.len(), 2);
377
    /// ```
378
    pub fn len(&self) -> usize {
379
        let b = self.inner.back.load(Ordering::Relaxed);
380
        let f = self.inner.front.load(Ordering::SeqCst);
        // Clamp a negative difference to zero before converting to `usize`.
381
        b.wrapping_sub(f).max(0) as usize
382
    }
383
384
    /// Pushes a task into the queue.
385
    ///
386
    /// # Examples
387
    ///
388
    /// ```
389
    /// use crossbeam_deque::Worker;
390
    ///
391
    /// let w = Worker::new_lifo();
392
    /// w.push(1);
393
    /// w.push(2);
394
    /// ```
395
0
    pub fn push(&self, task: T) {
396
0
        // Load the back index, front index, and buffer.
397
0
        let b = self.inner.back.load(Ordering::Relaxed);
398
0
        let f = self.inner.front.load(Ordering::Acquire);
399
0
        let mut buffer = self.buffer.get();
400
0
401
0
        // Calculate the length of the queue.
402
0
        let len = b.wrapping_sub(f);
403
0
404
0
        // Is the queue full?
405
0
        if len >= buffer.cap as isize {
406
0
            // Yes. Grow the underlying buffer.
407
0
            unsafe {
408
0
                self.resize(2 * buffer.cap);
409
0
            }
            // `resize` replaced the worker-local buffer, so re-read it before writing.
410
0
            buffer = self.buffer.get();
411
0
        }
412
413
        // Write `task` into the slot.
414
0
        unsafe {
415
0
            buffer.write(b, MaybeUninit::new(task));
416
0
        }
417
0
418
0
        // The slot is written before the new back index is published below.
        atomic::fence(Ordering::Release);
419
0
420
0
        // Increment the back index.
421
0
        //
422
0
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
423
0
        // races because it doesn't understand fences.
424
0
        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
425
0
    }
426
427
    /// Pops a task from the queue.
428
    ///
429
    /// # Examples
430
    ///
431
    /// ```
432
    /// use crossbeam_deque::Worker;
433
    ///
434
    /// let w = Worker::new_fifo();
435
    /// w.push(1);
436
    /// w.push(2);
437
    ///
438
    /// assert_eq!(w.pop(), Some(1));
439
    /// assert_eq!(w.pop(), Some(2));
440
    /// assert_eq!(w.pop(), None);
441
    /// ```
442
0
    pub fn pop(&self) -> Option<T> {
443
0
        // Load the back and front index.
444
0
        let b = self.inner.back.load(Ordering::Relaxed);
445
0
        let f = self.inner.front.load(Ordering::Relaxed);
446
0
447
0
        // Calculate the length of the queue.
448
0
        let len = b.wrapping_sub(f);
449
0
450
0
        // Is the queue empty?
451
0
        if len <= 0 {
452
0
            return None;
453
0
        }
454
0
455
0
        match self.flavor {
456
            // Pop from the front of the queue.
457
            Flavor::Fifo => {
458
                // Try incrementing the front index to pop the task.
                // The increment is unconditional; if it moved `front` past `back`, the queue was
                // already empty and the increment is undone just below.
459
0
                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
460
0
                let new_f = f.wrapping_add(1);
461
0
462
0
                if b.wrapping_sub(new_f) < 0 {
463
0
                    self.inner.front.store(f, Ordering::Relaxed);
464
0
                    return None;
465
0
                }
466
0
467
0
                unsafe {
468
0
                    // Read the popped task.
469
0
                    let buffer = self.buffer.get();
470
0
                    let task = buffer.read(f).assume_init();
471
0
472
0
                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
473
0
                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
474
0
                        self.resize(buffer.cap / 2);
475
0
                    }
476
477
0
                    Some(task)
478
                }
479
            }
480
481
            // Pop from the back of the queue.
482
            Flavor::Lifo => {
483
                // Decrement the back index.
484
0
                let b = b.wrapping_sub(1);
485
0
                self.inner.back.store(b, Ordering::Relaxed);
486
0
487
0
                atomic::fence(Ordering::SeqCst);
488
0
489
0
                // Load the front index.
490
0
                let f = self.inner.front.load(Ordering::Relaxed);
491
0
492
0
                // Compute the length after the back index was decremented.
493
0
                let len = b.wrapping_sub(f);
494
0
495
0
                if len < 0 {
496
                    // The queue is empty. Restore the back index to its original value.
497
0
                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
498
0
                    None
499
                } else {
500
                    // Read the task to be popped.
                    // It stays `MaybeUninit` until we know the pop succeeded; only then is it
                    // converted with `assume_init` at the end.
501
0
                    let buffer = self.buffer.get();
502
0
                    let mut task = unsafe { Some(buffer.read(b)) };
503
0
504
0
                    // Are we popping the last task from the queue?
505
0
                    if len == 0 {
506
                        // Try incrementing the front index.
507
0
                        if self
508
0
                            .inner
509
0
                            .front
510
0
                            .compare_exchange(
511
0
                                f,
512
0
                                f.wrapping_add(1),
513
0
                                Ordering::SeqCst,
514
0
                                Ordering::Relaxed,
515
0
                            )
516
0
                            .is_err()
517
0
                        {
518
0
                            // Failed. We didn't pop anything. Reset to `None`.
519
0
                            task.take();
520
0
                        }
521
522
                        // Restore the back index to its original value.
523
0
                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
524
                    } else {
525
                        // Shrink the buffer if `len` is less than one fourth of the capacity.
526
0
                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
527
0
                            unsafe {
528
0
                                self.resize(buffer.cap / 2);
529
0
                            }
530
0
                        }
531
                    }
532
533
0
                    task.map(|t| unsafe { t.assume_init() })
534
                }
535
            }
536
        }
537
0
    }
538
}
539
540
// Opaque `Debug` output: the queue contents are not printed, and no `T: Debug` bound is needed.
impl<T> fmt::Debug for Worker<T> {
541
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` honours the width/alignment flags of the format specifier.
542
        f.pad("Worker { .. }")
543
    }
544
}
545
546
/// A stealer handle of a worker queue.
547
///
548
/// Stealers can be shared among threads.
549
///
550
/// Task schedulers typically have a single worker queue per worker thread.
551
///
552
/// # Examples
553
///
554
/// ```
555
/// use crossbeam_deque::{Steal, Worker};
556
///
557
/// let w = Worker::new_lifo();
558
/// w.push(1);
559
/// w.push(2);
560
///
561
/// let s = w.stealer();
562
/// assert_eq!(s.steal(), Steal::Success(1));
563
/// assert_eq!(s.steal(), Steal::Success(2));
564
/// assert_eq!(s.steal(), Steal::Empty);
565
/// ```
566
pub struct Stealer<T> {
567
    /// A reference to the inner representation of the queue.
    ///
    /// Shared with the `Worker` that created this stealer.
568
    inner: Arc<CachePadded<Inner<T>>>,
569
570
    /// The flavor of the queue.
    ///
    /// Copied from the worker in `Worker::stealer`; batch steals branch on it.
571
    flavor: Flavor,
572
}
573
574
// A `Stealer` can be both moved to and shared between threads. Only `T: Send` is required,
// presumably because stealing moves tasks across threads by value and never hands out `&T` —
// confirm.
unsafe impl<T: Send> Send for Stealer<T> {}
575
unsafe impl<T: Send> Sync for Stealer<T> {}
576
577
impl<T> Stealer<T> {
578
    /// Returns `true` if the queue is empty.
579
    ///
580
    /// ```
581
    /// use crossbeam_deque::Worker;
582
    ///
583
    /// let w = Worker::new_lifo();
584
    /// let s = w.stealer();
585
    ///
586
    /// assert!(s.is_empty());
587
    /// w.push(1);
588
    /// assert!(!s.is_empty());
589
    /// ```
590
0
    pub fn is_empty(&self) -> bool {
        // Same load sequence as `steal`: front, a `SeqCst` fence, then back.
591
0
        let f = self.inner.front.load(Ordering::Acquire);
592
0
        atomic::fence(Ordering::SeqCst);
593
0
        let b = self.inner.back.load(Ordering::Acquire);
        // Signed wrapping difference: zero or negative means there is nothing to steal.
594
0
        b.wrapping_sub(f) <= 0
595
0
    }
596
597
    /// Returns the number of tasks in the deque.
598
    ///
599
    /// ```
600
    /// use crossbeam_deque::Worker;
601
    ///
602
    /// let w = Worker::new_lifo();
603
    /// let s = w.stealer();
604
    ///
605
    /// assert_eq!(s.len(), 0);
606
    /// w.push(1);
607
    /// assert_eq!(s.len(), 1);
608
    /// w.push(2);
609
    /// assert_eq!(s.len(), 2);
610
    /// ```
611
    pub fn len(&self) -> usize {
        // Same load sequence as `is_empty`: front, a `SeqCst` fence, then back.
612
        let f = self.inner.front.load(Ordering::Acquire);
613
        atomic::fence(Ordering::SeqCst);
614
        let b = self.inner.back.load(Ordering::Acquire);
        // Clamp a negative difference to zero before converting to `usize`.
615
        b.wrapping_sub(f).max(0) as usize
616
    }
617
618
    /// Steals a task from the queue.
619
    ///
620
    /// # Examples
621
    ///
622
    /// ```
623
    /// use crossbeam_deque::{Steal, Worker};
624
    ///
625
    /// let w = Worker::new_lifo();
626
    /// w.push(1);
627
    /// w.push(2);
628
    ///
629
    /// let s = w.stealer();
630
    /// assert_eq!(s.steal(), Steal::Success(1));
631
    /// assert_eq!(s.steal(), Steal::Success(2));
632
    /// ```
633
0
    pub fn steal(&self) -> Steal<T> {
634
0
        // Load the front index.
635
0
        let f = self.inner.front.load(Ordering::Acquire);
636
0
637
0
        // A SeqCst fence is needed here.
638
0
        //
639
0
        // If the current thread is already pinned (reentrantly), we must manually issue the
640
0
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
641
0
        // have to.
642
0
        if epoch::is_pinned() {
643
0
            atomic::fence(Ordering::SeqCst);
644
0
        }
645
646
0
        let guard = &epoch::pin();
647
0
648
0
        // Load the back index.
649
0
        let b = self.inner.back.load(Ordering::Acquire);
650
0
651
0
        // Is the queue empty?
652
0
        if b.wrapping_sub(f) <= 0 {
653
0
            return Steal::Empty;
654
0
        }
655
0
656
0
        // Load the buffer and read the task at the front.
        // The task is read before the CAS below, so at this point we don't know yet whether it is
        // ours; it stays `MaybeUninit` until the CAS succeeds.
657
0
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
658
0
        let task = unsafe { buffer.deref().read(f) };
659
0
660
0
        // Try incrementing the front index to steal the task.
661
0
        // If the buffer has been swapped or the increment fails, we retry.
662
0
        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
663
0
            || self
664
0
                .inner
665
0
                .front
666
0
                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
667
0
                .is_err()
668
        {
669
            // We didn't steal this task, forget it.
            // `task` is a `MaybeUninit`, so letting it go out of scope runs no destructor.
670
0
            return Steal::Retry;
671
0
        }
672
0
673
0
        // Return the stolen task.
674
0
        Steal::Success(unsafe { task.assume_init() })
675
0
    }
676
677
    /// Steals a batch of tasks and pushes them into another worker.
678
    ///
679
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
680
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
681
    ///
682
    /// # Examples
683
    ///
684
    /// ```
685
    /// use crossbeam_deque::Worker;
686
    ///
687
    /// let w1 = Worker::new_fifo();
688
    /// w1.push(1);
689
    /// w1.push(2);
690
    /// w1.push(3);
691
    /// w1.push(4);
692
    ///
693
    /// let s = w1.stealer();
694
    /// let w2 = Worker::new_fifo();
695
    ///
696
    /// let _ = s.steal_batch(&w2);
697
    /// assert_eq!(w2.pop(), Some(1));
698
    /// assert_eq!(w2.pop(), Some(2));
699
    /// ```
700
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
701
        // Same as `steal_batch_with_limit`, capped at the crate-wide `MAX_BATCH`.
        self.steal_batch_with_limit(dest, MAX_BATCH)
702
    }
703
704
    /// Steals no more than `limit` tasks and pushes them into another worker.
705
    ///
706
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
707
    /// steal around half of the tasks in the queue, but also not more than the given limit.
708
    ///
709
    /// # Examples
710
    ///
711
    /// ```
712
    /// use crossbeam_deque::Worker;
713
    ///
714
    /// let w1 = Worker::new_fifo();
715
    /// w1.push(1);
716
    /// w1.push(2);
717
    /// w1.push(3);
718
    /// w1.push(4);
719
    /// w1.push(5);
720
    /// w1.push(6);
721
    ///
722
    /// let s = w1.stealer();
723
    /// let w2 = Worker::new_fifo();
724
    ///
725
    /// let _ = s.steal_batch_with_limit(&w2, 2);
726
    /// assert_eq!(w2.pop(), Some(1));
727
    /// assert_eq!(w2.pop(), Some(2));
728
    /// assert_eq!(w2.pop(), None);
729
    ///
730
    /// w1.push(7);
731
    /// w1.push(8);
732
    /// // Setting a large limit does not guarantee that all elements will be stolen. In this case,
733
    /// // half of the elements are currently stolen, but the number of stolen elements is considered
734
    /// // an implementation detail that may be changed in the future.
735
    /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
736
    /// assert_eq!(w2.len(), 3);
737
    /// ```
738
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
739
        assert!(limit > 0);
740
        // Stealing from a queue into itself: nothing is moved, we only report whether the queue
        // currently holds any tasks.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
741
            if dest.is_empty() {
742
                return Steal::Empty;
743
            } else {
744
                return Steal::Success(());
745
            }
746
        }
747
748
        // Load the front index.
749
        let mut f = self.inner.front.load(Ordering::Acquire);
750
751
        // A SeqCst fence is needed here.
752
        //
753
        // If the current thread is already pinned (reentrantly), we must manually issue the
754
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
755
        // have to.
756
        if epoch::is_pinned() {
757
            atomic::fence(Ordering::SeqCst);
758
        }
759
760
        let guard = &epoch::pin();
761
762
        // Load the back index.
763
        let b = self.inner.back.load(Ordering::Acquire);
764
765
        // Is the queue empty?
766
        let len = b.wrapping_sub(f);
767
        if len <= 0 {
768
            return Steal::Empty;
769
        }
770
771
        // Reserve capacity for the stolen batch.
        // The batch is half of the observed tasks (rounded up), capped by `limit`.
772
        let batch_size = cmp::min((len as usize + 1) / 2, limit);
773
        dest.reserve(batch_size);
774
        let mut batch_size = batch_size as isize;
775
776
        // Get the destination buffer and back index.
        // This is read after `reserve`, which may have replaced the destination buffer.
777
        let dest_buffer = dest.buffer.get();
778
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
779
780
        // Load the buffer.
781
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
782
783
        match self.flavor {
784
            // Steal a batch of tasks from the front at once.
785
            Flavor::Fifo => {
786
                // Copy the batch from the source to the destination buffer.
                // The copies are speculative: they only count once the CAS on `front` below
                // succeeds, and `dest.back` is not advanced before that.
787
                match dest.flavor {
788
                    Flavor::Fifo => {
789
                        for i in 0..batch_size {
790
                            unsafe {
791
                                let task = buffer.deref().read(f.wrapping_add(i));
792
                                dest_buffer.write(dest_b.wrapping_add(i), task);
793
                            }
794
                        }
795
                    }
796
                    Flavor::Lifo => {
                        // Written in reverse so that the task stolen first ends up at the back of
                        // the destination, where a LIFO `pop` takes from.
797
                        for i in 0..batch_size {
798
                            unsafe {
799
                                let task = buffer.deref().read(f.wrapping_add(i));
800
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
801
                            }
802
                        }
803
                    }
804
                }
805
806
                // Try incrementing the front index to steal the batch.
807
                // If the buffer has been swapped or the increment fails, we retry.
808
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
809
                    || self
810
                        .inner
811
                        .front
812
                        .compare_exchange(
813
                            f,
814
                            f.wrapping_add(batch_size),
815
                            Ordering::SeqCst,
816
                            Ordering::Relaxed,
817
                        )
818
                        .is_err()
819
                {
820
                    return Steal::Retry;
821
                }
822
823
                dest_b = dest_b.wrapping_add(batch_size);
824
            }
825
826
            // Steal a batch of tasks from the front one by one.
827
            Flavor::Lifo => {
828
                // This loop may modify the batch_size, which triggers a clippy lint warning.
829
                // Use a new variable to avoid the warning, and to make it clear we aren't
830
                // modifying the loop exit condition during iteration.
831
                let original_batch_size = batch_size;
832
833
                for i in 0..original_batch_size {
834
                    // If this is not the first steal, check whether the queue is empty.
835
                    if i > 0 {
836
                        // We've already got the current front index. Now execute the fence to
837
                        // synchronize with other threads.
838
                        atomic::fence(Ordering::SeqCst);
839
840
                        // Load the back index.
841
                        let b = self.inner.back.load(Ordering::Acquire);
842
843
                        // Is the queue empty?
844
                        if b.wrapping_sub(f) <= 0 {
845
                            batch_size = i;
846
                            break;
847
                        }
848
                    }
849
850
                    // Read the task at the front.
851
                    let task = unsafe { buffer.deref().read(f) };
852
853
                    // Try incrementing the front index to steal the task.
854
                    // If the buffer has been swapped or the increment fails, we retry.
855
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
856
                        || self
857
                            .inner
858
                            .front
859
                            .compare_exchange(
860
                                f,
861
                                f.wrapping_add(1),
862
                                Ordering::SeqCst,
863
                                Ordering::Relaxed,
864
                            )
865
                            .is_err()
866
                    {
867
                        // We didn't steal this task, forget it and break from the loop.
                        // The `i` tasks stolen in earlier iterations are kept.
868
                        batch_size = i;
869
                        break;
870
                    }
871
872
                    // Write the stolen task into the destination buffer.
873
                    unsafe {
874
                        dest_buffer.write(dest_b, task);
875
                    }
876
877
                    // Move the source front index and the destination back index one step forward.
878
                    f = f.wrapping_add(1);
879
                    dest_b = dest_b.wrapping_add(1);
880
                }
881
882
                // If we didn't steal anything, the operation needs to be retried.
883
                if batch_size == 0 {
884
                    return Steal::Retry;
885
                }
886
887
                // If stealing into a FIFO queue, stolen tasks need to be reversed.
888
                if dest.flavor == Flavor::Fifo {
                    // Swap the stolen range `dest_b - batch_size .. dest_b` end for end, in place.
889
                    for i in 0..batch_size / 2 {
890
                        unsafe {
891
                            let i1 = dest_b.wrapping_sub(batch_size - i);
892
                            let i2 = dest_b.wrapping_sub(i + 1);
893
                            let t1 = dest_buffer.read(i1);
894
                            let t2 = dest_buffer.read(i2);
895
                            dest_buffer.write(i1, t2);
896
                            dest_buffer.write(i2, t1);
897
                        }
898
                    }
899
                }
900
            }
901
        }
902
903
        // The destination slots are written before the new back index is published below.
        atomic::fence(Ordering::Release);
904
905
        // Update the back index in the destination queue.
906
        //
907
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
908
        // races because it doesn't understand fences.
909
        dest.inner.back.store(dest_b, Ordering::Release);
910
911
        // Return with success.
912
        Steal::Success(())
913
    }
914
915
    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
916
    ///
917
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
918
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
919
    ///
920
    /// # Examples
921
    ///
922
    /// ```
923
    /// use crossbeam_deque::{Steal, Worker};
924
    ///
925
    /// let w1 = Worker::new_fifo();
926
    /// w1.push(1);
927
    /// w1.push(2);
928
    /// w1.push(3);
929
    /// w1.push(4);
930
    ///
931
    /// let s = w1.stealer();
932
    /// let w2 = Worker::new_fifo();
933
    ///
934
    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
935
    /// assert_eq!(w2.pop(), Some(2));
936
    /// ```
937
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
938
        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
939
    }
940
941
    /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
942
    /// that worker.
943
    ///
944
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
945
    /// steal around half of the tasks in the queue, but also not more than the given limit.
946
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
947
    /// # Examples
948
    ///
949
    /// ```
950
    /// use crossbeam_deque::{Steal, Worker};
951
    ///
952
    /// let w1 = Worker::new_fifo();
953
    /// w1.push(1);
954
    /// w1.push(2);
955
    /// w1.push(3);
956
    /// w1.push(4);
957
    /// w1.push(5);
958
    /// w1.push(6);
959
    ///
960
    /// let s = w1.stealer();
961
    /// let w2 = Worker::new_fifo();
962
    ///
963
    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
964
    /// assert_eq!(w2.pop(), Some(2));
965
    /// assert_eq!(w2.pop(), None);
966
    ///
967
    /// w1.push(7);
968
    /// w1.push(8);
969
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
970
    /// // half of the elements are currently popped, but the number of popped elements is considered
971
    /// // an implementation detail that may be changed in the future.
972
    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
973
    /// assert_eq!(w2.pop(), Some(4));
974
    /// assert_eq!(w2.pop(), Some(5));
975
    /// assert_eq!(w2.pop(), None);
976
    /// ```
977
    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
978
        assert!(limit > 0);
979
        // Source and destination share the same queue: there is nothing to transfer, so this
        // reduces to a plain pop on the worker.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
980
            match dest.pop() {
981
                None => return Steal::Empty,
982
                Some(task) => return Steal::Success(task),
983
            }
984
        }
985
986
        // Load the front index.
987
        let mut f = self.inner.front.load(Ordering::Acquire);
988
989
        // A SeqCst fence is needed here.
990
        //
991
        // If the current thread is already pinned (reentrantly), we must manually issue the
992
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
993
        // have to.
994
        if epoch::is_pinned() {
995
            atomic::fence(Ordering::SeqCst);
996
        }
997
998
        let guard = &epoch::pin();
999
1000
        // Load the back index.
1001
        let b = self.inner.back.load(Ordering::Acquire);
1002
1003
        // Is the queue empty?
1004
        let len = b.wrapping_sub(f);
1005
        if len <= 0 {
1006
            return Steal::Empty;
1007
        }
1008
1009
        // Reserve capacity for the stolen batch.
        //
        // One task is handed straight back to the caller, hence the `- 1` on both bounds: the
        // batch written into `dest` is at most half of the remaining tasks and at most
        // `limit - 1` tasks.
1010
        let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1011
        dest.reserve(batch_size);
1012
        let mut batch_size = batch_size as isize;
1013
1014
        // Get the destination buffer and back index.
1015
        let dest_buffer = dest.buffer.get();
1016
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1017
1018
        // Load the buffer
1019
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1020
1021
        // Read the task at the front.
        //
        // The value is only treated as initialized at the very end (`assume_init`), after the
        // front index has been advanced successfully; every failure path returns before that.
1022
        let mut task = unsafe { buffer.deref().read(f) };
1023
1024
        match self.flavor {
1025
            // Steal a batch of tasks from the front at once.
1026
            Flavor::Fifo => {
1027
                // Copy the batch from the source to the destination buffer.
1028
                match dest.flavor {
1029
                    Flavor::Fifo => {
1030
                        for i in 0..batch_size {
1031
                            unsafe {
1032
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1033
                                dest_buffer.write(dest_b.wrapping_add(i), task);
1034
                            }
1035
                        }
1036
                    }
1037
                    Flavor::Lifo => {
1038
                        for i in 0..batch_size {
1039
                            unsafe {
1040
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1041
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1042
                            }
1043
                        }
1044
                    }
1045
                }
1046
1047
                // Try incrementing the front index to steal the task.
1048
                // If the buffer has been swapped or the increment fails, we retry.
1049
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1050
                    || self
1051
                        .inner
1052
                        .front
1053
                        .compare_exchange(
1054
                            f,
1055
                            f.wrapping_add(batch_size + 1),
1056
                            Ordering::SeqCst,
1057
                            Ordering::Relaxed,
1058
                        )
1059
                        .is_err()
1060
                {
1061
                    // We didn't steal this task, forget it.
1062
                    return Steal::Retry;
1063
                }
1064
1065
                dest_b = dest_b.wrapping_add(batch_size);
1066
            }
1067
1068
            // Steal a batch of tasks from the front one by one.
1069
            Flavor::Lifo => {
1070
                // Try incrementing the front index to steal the task.
1071
                if self
1072
                    .inner
1073
                    .front
1074
                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1075
                    .is_err()
1076
                {
1077
                    // We didn't steal this task, forget it.
1078
                    return Steal::Retry;
1079
                }
1080
1081
                // Move the front index one step forward.
1082
                f = f.wrapping_add(1);
1083
1084
                // Repeat the same procedure for the batch steals.
1085
                //
1086
                // This loop may modify the batch_size, which triggers a clippy lint warning.
1087
                // Use a new variable to avoid the warning, and to make it clear we aren't
1088
                // modifying the loop exit condition during iteration.
1089
                let original_batch_size = batch_size;
1090
                for i in 0..original_batch_size {
1091
                    // We've already got the current front index. Now execute the fence to
1092
                    // synchronize with other threads.
1093
                    atomic::fence(Ordering::SeqCst);
1094
1095
                    // Load the back index.
1096
                    let b = self.inner.back.load(Ordering::Acquire);
1097
1098
                    // Is the queue empty?
1099
                    if b.wrapping_sub(f) <= 0 {
1100
                        batch_size = i;
1101
                        break;
1102
                    }
1103
1104
                    // Read the task at the front.
1105
                    let tmp = unsafe { buffer.deref().read(f) };
1106
1107
                    // Try incrementing the front index to steal the task.
1108
                    // If the buffer has been swapped or the increment fails, we retry.
1109
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1110
                        || self
1111
                            .inner
1112
                            .front
1113
                            .compare_exchange(
1114
                                f,
1115
                                f.wrapping_add(1),
1116
                                Ordering::SeqCst,
1117
                                Ordering::Relaxed,
1118
                            )
1119
                            .is_err()
1120
                    {
1121
                        // We didn't steal this task, forget it and break from the loop.
1122
                        batch_size = i;
1123
                        break;
1124
                    }
1125
1126
                    // Write the previously stolen task into the destination buffer.
                    //
                    // `task` always ends up holding the most recently stolen task (the one
                    // returned to the caller); the task it held before moves into `dest`.
1127
                    unsafe {
1128
                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1129
                    }
1130
1131
                    // Move the source front index and the destination back index one step forward.
1132
                    f = f.wrapping_add(1);
1133
                    dest_b = dest_b.wrapping_add(1);
1134
                }
1135
1136
                // If stealing into a FIFO queue, stolen tasks need to be reversed.
1137
                if dest.flavor == Flavor::Fifo {
1138
                    for i in 0..batch_size / 2 {
1139
                        unsafe {
1140
                            let i1 = dest_b.wrapping_sub(batch_size - i);
1141
                            let i2 = dest_b.wrapping_sub(i + 1);
1142
                            let t1 = dest_buffer.read(i1);
1143
                            let t2 = dest_buffer.read(i2);
1144
                            dest_buffer.write(i1, t2);
1145
                            dest_buffer.write(i2, t1);
1146
                        }
1147
                    }
1148
                }
1149
            }
1150
        }
1151
1152
        atomic::fence(Ordering::Release);
1153
1154
        // Update the back index in the destination queue.
1155
        //
1156
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1157
        // races because it doesn't understand fences.
1158
        dest.inner.back.store(dest_b, Ordering::Release);
1159
1160
        // Return with success.
1161
        Steal::Success(unsafe { task.assume_init() })
1162
    }
1163
}
1164
1165
impl<T> Clone for Stealer<T> {
1166
    fn clone(&self) -> Stealer<T> {
1167
        Stealer {
1168
            inner: self.inner.clone(),
1169
            flavor: self.flavor,
1170
        }
1171
    }
1172
}
1173
1174
impl<T> fmt::Debug for Stealer<T> {
1175
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1176
        f.pad("Stealer { .. }")
1177
    }
1178
}
1179
1180
// Bits indicating the state of a slot:
1181
// * If a task has been written into the slot, `WRITE` is set.
1182
// * If a task has been read from the slot, `READ` is set.
1183
// * If the block is being destroyed, `DESTROY` is set.
//
// The three flags occupy distinct bits, so they are combined with `fetch_or` and tested
// independently with `&`.
1184
const WRITE: usize = 1;
1185
const READ: usize = 2;
1186
const DESTROY: usize = 4;
1187
1188
// Each block covers one "lap" of indices.
1189
const LAP: usize = 64;
1190
// The maximum number of values a block can hold.
//
// This is one less than `LAP`: an index whose in-block offset equals `BLOCK_CAP` refers to no
// slot, and `push`/`steal` back off and reload while they observe such an index.
1191
const BLOCK_CAP: usize = LAP - 1;
1192
// How many lower bits are reserved for metadata.
//
// Queue indices are stored shifted left by `SHIFT`; the position is recovered with `>> SHIFT`.
1193
const SHIFT: usize = 1;
1194
// Indicates that the block is not the last one.
//
// Kept in the metadata bit of the head index.
1195
const HAS_NEXT: usize = 1;
1196
1197
/// A slot in a block.
1198
struct Slot<T> {
1199
    /// The task.
    ///
    /// Wrapped in `MaybeUninit` because a slot exists before any task has been written into it.
1200
    task: UnsafeCell<MaybeUninit<T>>,
1201
1202
    /// The state of the slot.
    ///
    /// A combination of the `WRITE`, `READ` and `DESTROY` bits.
1203
    state: AtomicUsize,
1204
}
1205
1206
impl<T> Slot<T> {
1207
    /// Waits until a task is written into the slot.
1208
0
    fn wait_write(&self) {
1209
0
        let backoff = Backoff::new();
1210
0
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
1211
0
            backoff.snooze();
1212
0
        }
1213
0
    }
1214
}
1215
1216
/// A block in a linked list.
1217
///
1218
/// Each block in the list can hold up to `BLOCK_CAP` values.
1219
struct Block<T> {
1220
    /// The next block in the linked list.
    ///
    /// Null until a successor block has been installed.
1221
    next: AtomicPtr<Block<T>>,
1222
1223
    /// Slots for values.
1224
    slots: [Slot<T>; BLOCK_CAP],
1225
}
1226
1227
impl<T> Block<T> {
1228
    /// Allocation layout of a block.
    ///
    /// The assertion guarantees a non-zero size, which `new` relies on when calling
    /// `alloc_zeroed`.
    const LAYOUT: Layout = {
1229
        let layout = Layout::new::<Self>();
1230
        assert!(
1231
            layout.size() != 0,
1232
            "Block should never be zero-sized, as it has an AtomicPtr field"
1233
        );
1234
        layout
1235
    };
1236
1237
    /// Creates an empty block.
    ///
    /// The block is allocated zeroed: `next` starts out null and every slot state starts with no
    /// bits set. Allocation failure is forwarded to `handle_alloc_error`.
1238
0
    fn new() -> Box<Self> {
1239
0
        // SAFETY: layout is not zero-sized
1240
0
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
1241
0
        // Handle allocation failure
1242
0
        if ptr.is_null() {
1243
0
            handle_alloc_error(Self::LAYOUT)
1244
0
        }
1245
0
        // SAFETY: This is safe because:
1246
0
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1247
0
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1248
0
        //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1249
0
        //       holds a MaybeUninit.
1250
0
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1251
0
        // TODO: unsafe { Box::new_zeroed().assume_init() }
1252
0
        unsafe { Box::from_raw(ptr.cast()) }
1253
0
    }
1254
1255
    /// Waits until the next pointer is set.
    ///
    /// Spins with backoff and returns the first non-null value observed in `next`.
1256
0
    fn wait_next(&self) -> *mut Block<T> {
1257
0
        let backoff = Backoff::new();
1258
        loop {
1259
0
            let next = self.next.load(Ordering::Acquire);
1260
0
            if !next.is_null() {
1261
0
                return next;
1262
0
            }
1263
0
            backoff.snooze();
1264
        }
1265
0
    }
1266
1267
    /// Sets the `DESTROY` bit in the first `count` slots and destroys the block.
    ///
    /// Slots are visited from index `count - 1` down to `0`. If a slot has not been read yet,
    /// the function returns early and leaves the rest of the destruction to the thread that
    /// reads that slot; otherwise the block is deallocated once all `count` slots are processed.
    ///
    /// # Safety
    ///
    /// `this` is dereferenced and finally passed to `Box::from_raw`, so it must point to a live
    /// block that originated from a `Box`, and `count` must not exceed `BLOCK_CAP` because the
    /// slots are indexed unchecked.
1268
0
    unsafe fn destroy(this: *mut Block<T>, count: usize) {
1269
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1270
        // begun destruction of the block.
1271
0
        for i in (0..count).rev() {
1272
0
            let slot = (*this).slots.get_unchecked(i);
1273
0
1274
0
            // Mark the `DESTROY` bit if a thread is still using the slot.
            //
            // The plain load is a cheap pre-check; the `fetch_or` repeats the `READ` test
            // atomically while setting `DESTROY`.
1275
0
            if slot.state.load(Ordering::Acquire) & READ == 0
1276
0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1277
            {
1278
                // If a thread is still using the slot, it will continue destruction of the block.
1279
0
                return;
1280
0
            }
1281
        }
1282
1283
        // No thread is using the block, now it is safe to destroy it.
1284
0
        drop(Box::from_raw(this));
1285
0
    }
1286
}
1287
1288
/// A position in a queue.
///
/// Used for both the head and the tail of an `Injector`.
1289
struct Position<T> {
1290
    /// The index in the queue.
    ///
    /// Stored shifted left by `SHIFT`; the low bit is reserved for metadata.
1291
    index: AtomicUsize,
1292
1293
    /// The block in the linked list.
1294
    block: AtomicPtr<Block<T>>,
1295
}
1296
1297
/// An injector queue.
1298
///
1299
/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1300
/// a single injector queue, which is the entry point for new tasks.
1301
///
1302
/// # Examples
1303
///
1304
/// ```
1305
/// use crossbeam_deque::{Injector, Steal};
1306
///
1307
/// let q = Injector::new();
1308
/// q.push(1);
1309
/// q.push(2);
1310
///
1311
/// assert_eq!(q.steal(), Steal::Success(1));
1312
/// assert_eq!(q.steal(), Steal::Success(2));
1313
/// assert_eq!(q.steal(), Steal::Empty);
1314
/// ```
1315
pub struct Injector<T> {
1316
    /// The head of the queue.
    ///
    /// Advanced by the stealing operations.
1317
    head: CachePadded<Position<T>>,
1318
1319
    /// The tail of the queue.
    ///
    /// Advanced by `push`.
1320
    tail: CachePadded<Position<T>>,
1321
1322
    /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1323
    _marker: PhantomData<T>,
1324
}
1325
1326
// NOTE(review): both impls are bounded only by `T: Send`. Presumably this is sound because tasks
// are moved between threads by value and never shared by reference — confirm against the queue's
// invariants.
unsafe impl<T: Send> Send for Injector<T> {}
1327
unsafe impl<T: Send> Sync for Injector<T> {}
1328
1329
impl<T> Default for Injector<T> {
1330
0
    fn default() -> Self {
1331
0
        let block = Box::into_raw(Block::<T>::new());
1332
0
        Self {
1333
0
            head: CachePadded::new(Position {
1334
0
                block: AtomicPtr::new(block),
1335
0
                index: AtomicUsize::new(0),
1336
0
            }),
1337
0
            tail: CachePadded::new(Position {
1338
0
                block: AtomicPtr::new(block),
1339
0
                index: AtomicUsize::new(0),
1340
0
            }),
1341
0
            _marker: PhantomData,
1342
0
        }
1343
0
    }
1344
}
1345
1346
impl<T> Injector<T> {
1347
    /// Creates a new injector queue.
1348
    ///
1349
    /// # Examples
1350
    ///
1351
    /// ```
1352
    /// use crossbeam_deque::Injector;
1353
    ///
1354
    /// let q = Injector::<i32>::new();
1355
    /// ```
1356
0
    pub fn new() -> Injector<T> {
1357
0
        Self::default()
1358
0
    }
1359
1360
    /// Pushes a task into the queue.
    ///
    /// The tail index is claimed with a compare-exchange loop; the thread that claims the last
    /// slot of a block also installs the next block.
1361
    ///
1362
    /// # Examples
1363
    ///
1364
    /// ```
1365
    /// use crossbeam_deque::Injector;
1366
    ///
1367
    /// let w = Injector::new();
1368
    /// w.push(1);
1369
    /// w.push(2);
1370
    /// ```
1371
0
    pub fn push(&self, task: T) {
1372
0
        let backoff = Backoff::new();
1373
0
        let mut tail = self.tail.index.load(Ordering::Acquire);
1374
0
        let mut block = self.tail.block.load(Ordering::Acquire);
        // Successor block, allocated at most once and kept across retries of the loop below.
1375
0
        let mut next_block = None;
1376
1377
        loop {
1378
            // Calculate the offset of the index into the block.
1379
0
            let offset = (tail >> SHIFT) % LAP;
1380
0
1381
0
            // If we reached the end of the block, wait until the next one is installed.
1382
0
            if offset == BLOCK_CAP {
1383
0
                backoff.snooze();
1384
0
                tail = self.tail.index.load(Ordering::Acquire);
1385
0
                block = self.tail.block.load(Ordering::Acquire);
1386
0
                continue;
1387
0
            }
1388
0
1389
0
            // If we're going to have to install the next block, allocate it in advance in order to
1390
0
            // make the wait for other threads as short as possible.
1391
0
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
1392
0
                next_block = Some(Block::<T>::new());
1393
0
            }
1394
1395
0
            let new_tail = tail + (1 << SHIFT);
1396
0
1397
0
            // Try advancing the tail forward.
1398
0
            match self.tail.index.compare_exchange_weak(
1399
0
                tail,
1400
0
                new_tail,
1401
0
                Ordering::SeqCst,
1402
0
                Ordering::Acquire,
1403
0
            ) {
1404
                Ok(_) => unsafe {
1405
                    // If we've reached the end of the block, install the next one.
                    //
                    // `new_tail` has offset `BLOCK_CAP`, which makes other pushers wait; the
                    // index stored below skips past it to the first slot of the new block.
1406
0
                    if offset + 1 == BLOCK_CAP {
1407
0
                        let next_block = Box::into_raw(next_block.unwrap());
1408
0
                        let next_index = new_tail.wrapping_add(1 << SHIFT);
1409
0
1410
0
                        self.tail.block.store(next_block, Ordering::Release);
1411
0
                        self.tail.index.store(next_index, Ordering::Release);
1412
0
                        (*block).next.store(next_block, Ordering::Release);
1413
0
                    }
1414
1415
                    // Write the task into the slot.
                    //
                    // Setting `WRITE` afterwards publishes the task to readers spinning in
                    // `wait_write`.
1416
0
                    let slot = (*block).slots.get_unchecked(offset);
1417
0
                    slot.task.get().write(MaybeUninit::new(task));
1418
0
                    slot.state.fetch_or(WRITE, Ordering::Release);
1419
0
1420
0
                    return;
1421
                },
                // The exchange failed: continue from the tail value it reported and reload the
                // block pointer before retrying.
1422
0
                Err(t) => {
1423
0
                    tail = t;
1424
0
                    block = self.tail.block.load(Ordering::Acquire);
1425
0
                    backoff.spin();
1426
0
                }
1427
            }
1428
        }
1429
0
    }
1430
1431
    /// Steals a task from the queue.
    ///
    /// Returns `Steal::Empty` if the queue is observed to be empty, `Steal::Retry` if the head
    /// index could not be advanced, and `Steal::Success` with the task otherwise.
1432
    ///
1433
    /// # Examples
1434
    ///
1435
    /// ```
1436
    /// use crossbeam_deque::{Injector, Steal};
1437
    ///
1438
    /// let q = Injector::new();
1439
    /// q.push(1);
1440
    /// q.push(2);
1441
    ///
1442
    /// assert_eq!(q.steal(), Steal::Success(1));
1443
    /// assert_eq!(q.steal(), Steal::Success(2));
1444
    /// assert_eq!(q.steal(), Steal::Empty);
1445
    /// ```
1446
0
    pub fn steal(&self) -> Steal<T> {
1447
0
        let mut head;
1448
0
        let mut block;
1449
0
        let mut offset;
1450
0
1451
0
        let backoff = Backoff::new();
1452
        loop {
1453
0
            head = self.head.index.load(Ordering::Acquire);
1454
0
            block = self.head.block.load(Ordering::Acquire);
1455
0
1456
0
            // Calculate the offset of the index into the block.
1457
0
            offset = (head >> SHIFT) % LAP;
1458
0
1459
0
            // If we reached the end of the block, wait until the next one is installed.
1460
0
            if offset == BLOCK_CAP {
1461
0
                backoff.snooze();
1462
0
            } else {
1463
0
                break;
1464
0
            }
1465
0
        }
1466
0
1467
0
        let mut new_head = head + (1 << SHIFT);
1468
0
        // Without `HAS_NEXT` the head block is not known to have a successor, so the tail has
        // to be consulted to detect an empty queue.
1469
0
        if new_head & HAS_NEXT == 0 {
1470
0
            atomic::fence(Ordering::SeqCst);
1471
0
            let tail = self.tail.index.load(Ordering::Relaxed);
1472
0
1473
0
            // If the tail equals the head, that means the queue is empty.
1474
0
            if head >> SHIFT == tail >> SHIFT {
1475
0
                return Steal::Empty;
1476
0
            }
1477
0
1478
0
            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1479
0
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1480
0
                new_head |= HAS_NEXT;
1481
0
            }
1482
0
        }
1483
1484
        // Try moving the head index forward.
1485
0
        if self
1486
0
            .head
1487
0
            .index
1488
0
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1489
0
            .is_err()
1490
        {
1491
0
            return Steal::Retry;
1492
0
        }
1493
0
1494
0
        unsafe {
1495
0
            // If we've reached the end of the block, move to the next one.
1496
0
            if offset + 1 == BLOCK_CAP {
1497
0
                let next = (*block).wait_next();
                // Skip the index with offset `BLOCK_CAP`, which refers to no slot.
1498
0
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1499
0
                if !(*next).next.load(Ordering::Relaxed).is_null() {
1500
0
                    next_index |= HAS_NEXT;
1501
0
                }
1502
1503
0
                self.head.block.store(next, Ordering::Release);
1504
0
                self.head.index.store(next_index, Ordering::Release);
1505
0
            }
1506
1507
            // Read the task.
            //
            // The slot was claimed by advancing the head, but the pusher may not have finished
            // writing yet, hence the wait.
1508
0
            let slot = (*block).slots.get_unchecked(offset);
1509
0
            slot.wait_write();
1510
0
            let task = slot.task.get().read().assume_init();
1511
0
1512
0
            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1513
0
            // but couldn't because we were busy reading from the slot.
1514
0
            if (offset + 1 == BLOCK_CAP)
1515
0
                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1516
0
            {
1517
0
                Block::destroy(block, offset);
1518
0
            }
1519
1520
0
            Steal::Success(task)
1521
        }
1522
0
    }
1523
1524
    /// Steals a batch of tasks and pushes them into a worker.
1525
    ///
1526
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1527
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1528
    ///
1529
    /// # Examples
1530
    ///
1531
    /// ```
1532
    /// use crossbeam_deque::{Injector, Worker};
1533
    ///
1534
    /// let q = Injector::new();
1535
    /// q.push(1);
1536
    /// q.push(2);
1537
    /// q.push(3);
1538
    /// q.push(4);
1539
    ///
1540
    /// let w = Worker::new_fifo();
1541
    /// let _ = q.steal_batch(&w);
1542
    /// assert_eq!(w.pop(), Some(1));
1543
    /// assert_eq!(w.pop(), Some(2));
1544
    /// ```
1545
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1546
        self.steal_batch_with_limit(dest, MAX_BATCH)
1547
    }
1548
1549
    /// Steals no more than `limit` of tasks and pushes them into a worker.
1550
    ///
1551
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1552
    /// steal around half of the tasks in the queue, but also not more than the given limit.
1553
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
1554
    /// # Examples
1555
    ///
1556
    /// ```
1557
    /// use crossbeam_deque::{Injector, Worker};
1558
    ///
1559
    /// let q = Injector::new();
1560
    /// q.push(1);
1561
    /// q.push(2);
1562
    /// q.push(3);
1563
    /// q.push(4);
1564
    /// q.push(5);
1565
    /// q.push(6);
1566
    ///
1567
    /// let w = Worker::new_fifo();
1568
    /// let _ = q.steal_batch_with_limit(&w, 2);
1569
    /// assert_eq!(w.pop(), Some(1));
1570
    /// assert_eq!(w.pop(), Some(2));
1571
    /// assert_eq!(w.pop(), None);
1572
    ///
1573
    /// q.push(7);
1574
    /// q.push(8);
1575
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1576
    /// // half of the elements are currently popped, but the number of popped elements is considered
1577
    /// // an implementation detail that may be changed in the future.
1578
    /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1579
    /// assert_eq!(w.len(), 3);
1580
    /// ```
1581
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1582
        assert!(limit > 0);
1583
        let mut head;
1584
        let mut block;
1585
        let mut offset;
1586
1587
        let backoff = Backoff::new();
1588
        loop {
1589
            head = self.head.index.load(Ordering::Acquire);
1590
            block = self.head.block.load(Ordering::Acquire);
1591
1592
            // Calculate the offset of the index into the block.
1593
            offset = (head >> SHIFT) % LAP;
1594
1595
            // If we reached the end of the block, wait until the next one is installed.
1596
            if offset == BLOCK_CAP {
1597
                backoff.snooze();
1598
            } else {
1599
                break;
1600
            }
1601
        }
1602
1603
        let mut new_head = head;
        // Number of slots to claim; assigned exactly once in the branches below. A batch never
        // extends past the end of the current block.
1604
        let advance;
1605
1606
        if new_head & HAS_NEXT == 0 {
1607
            atomic::fence(Ordering::SeqCst);
1608
            let tail = self.tail.index.load(Ordering::Relaxed);
1609
1610
            // If the tail equals the head, that means the queue is empty.
1611
            if head >> SHIFT == tail >> SHIFT {
1612
                return Steal::Empty;
1613
            }
1614
1615
            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1616
            // the right batch size to steal.
1617
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1618
                new_head |= HAS_NEXT;
1619
                // We can steal all tasks till the end of the block.
1620
                advance = (BLOCK_CAP - offset).min(limit);
1621
            } else {
1622
                let len = (tail - head) >> SHIFT;
1623
                // Steal half of the available tasks.
                // The half is rounded up, so a non-empty queue yields at least one task.
1624
                advance = ((len + 1) / 2).min(limit);
1625
            }
1626
        } else {
1627
            // We can steal all tasks till the end of the block.
1628
            advance = (BLOCK_CAP - offset).min(limit);
1629
        }
1630
1631
        new_head += advance << SHIFT;
1632
        let new_offset = offset + advance;
1633
1634
        // Try moving the head index forward.
1635
        if self
1636
            .head
1637
            .index
1638
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1639
            .is_err()
1640
        {
1641
            return Steal::Retry;
1642
        }
1643
1644
        // Reserve capacity for the stolen batch.
1645
        let batch_size = new_offset - offset;
1646
        dest.reserve(batch_size);
1647
1648
        // Get the destination buffer and back index.
1649
        let dest_buffer = dest.buffer.get();
1650
        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1651
1652
        unsafe {
1653
            // If we've reached the end of the block, move to the next one.
1654
            if new_offset == BLOCK_CAP {
1655
                let next = (*block).wait_next();
                // Skip the index with offset `BLOCK_CAP`, which refers to no slot.
1656
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1657
                if !(*next).next.load(Ordering::Relaxed).is_null() {
1658
                    next_index |= HAS_NEXT;
1659
                }
1660
1661
                self.head.block.store(next, Ordering::Release);
1662
                self.head.index.store(next_index, Ordering::Release);
1663
            }
1664
1665
            // Copy values from the injector into the destination queue.
            //
            // For a FIFO destination the tasks are written in queue order; for a LIFO
            // destination they are written in reverse.
1666
            match dest.flavor {
1667
                Flavor::Fifo => {
1668
                    for i in 0..batch_size {
1669
                        // Read the task.
1670
                        let slot = (*block).slots.get_unchecked(offset + i);
1671
                        slot.wait_write();
1672
                        let task = slot.task.get().read();
1673
1674
                        // Write it into the destination queue.
1675
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1676
                    }
1677
                }
1678
1679
                Flavor::Lifo => {
1680
                    for i in 0..batch_size {
1681
                        // Read the task.
1682
                        let slot = (*block).slots.get_unchecked(offset + i);
1683
                        slot.wait_write();
1684
                        let task = slot.task.get().read();
1685
1686
                        // Write it into the destination queue.
1687
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1688
                    }
1689
                }
1690
            }
1691
1692
            atomic::fence(Ordering::Release);
1693
1694
            // Update the back index in the destination queue.
1695
            //
1696
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1697
            // data races because it doesn't understand fences.
1698
            dest.inner
1699
                .back
1700
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1701
1702
            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1703
            // but couldn't because we were busy reading from the slot.
1704
            if new_offset == BLOCK_CAP {
1705
                Block::destroy(block, offset);
1706
            } else {
                // Mark every slot of the batch as read. If any of them already carries
                // `DESTROY`, this thread takes over the destruction.
1707
                for i in offset..new_offset {
1708
                    let slot = (*block).slots.get_unchecked(i);
1709
1710
                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1711
                        Block::destroy(block, offset);
1712
                        break;
1713
                    }
1714
                }
1715
            }
1716
1717
            Steal::Success(())
1718
        }
1719
    }
1720
1721
    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1722
    ///
1723
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1724
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1725
    ///
1726
    /// # Examples
1727
    ///
1728
    /// ```
1729
    /// use crossbeam_deque::{Injector, Steal, Worker};
1730
    ///
1731
    /// let q = Injector::new();
1732
    /// q.push(1);
1733
    /// q.push(2);
1734
    /// q.push(3);
1735
    /// q.push(4);
1736
    ///
1737
    /// let w = Worker::new_fifo();
1738
    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1739
    /// assert_eq!(w.pop(), Some(2));
1740
    /// ```
1741
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1742
        // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1743
        // better, but we may change it in the future to be compatible with the same method in Stealer.
1744
        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1745
    }
1746
1747
    /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1748
    ///
1749
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1750
    /// steal around half of the tasks in the queue, but also not more than the given limit.
1751
    ///
1752
    /// # Examples
1753
    ///
1754
    /// ```
1755
    /// use crossbeam_deque::{Injector, Steal, Worker};
1756
    ///
1757
    /// let q = Injector::new();
1758
    /// q.push(1);
1759
    /// q.push(2);
1760
    /// q.push(3);
1761
    /// q.push(4);
1762
    /// q.push(5);
1763
    /// q.push(6);
1764
    ///
1765
    /// let w = Worker::new_fifo();
1766
    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1767
    /// assert_eq!(w.pop(), Some(2));
1768
    /// assert_eq!(w.pop(), None);
1769
    ///
1770
    /// q.push(7);
1771
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1772
    /// // half of the elements are currently popped, but the number of popped elements is considered
1773
    /// // an implementation detail that may be changed in the future.
1774
    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1775
    /// assert_eq!(w.pop(), Some(4));
1776
    /// assert_eq!(w.pop(), Some(5));
1777
    /// assert_eq!(w.pop(), None);
1778
    /// ```
1779
    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1780
        assert!(limit > 0);
1781
        let mut head;
1782
        let mut block;
1783
        let mut offset;
1784
1785
        let backoff = Backoff::new();
1786
        loop {
1787
            head = self.head.index.load(Ordering::Acquire);
1788
            block = self.head.block.load(Ordering::Acquire);
1789
1790
            // Calculate the offset of the index into the block.
1791
            offset = (head >> SHIFT) % LAP;
1792
1793
            // If we reached the end of the block, wait until the next one is installed.
1794
            if offset == BLOCK_CAP {
1795
                backoff.snooze();
1796
            } else {
1797
                break;
1798
            }
1799
        }
1800
1801
        let mut new_head = head;
1802
        let advance;
1803
1804
        if new_head & HAS_NEXT == 0 {
1805
            atomic::fence(Ordering::SeqCst);
1806
            let tail = self.tail.index.load(Ordering::Relaxed);
1807
1808
            // If the tail equals the head, that means the queue is empty.
1809
            if head >> SHIFT == tail >> SHIFT {
1810
                return Steal::Empty;
1811
            }
1812
1813
            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1814
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1815
                new_head |= HAS_NEXT;
1816
                // We can steal all tasks till the end of the block.
1817
                advance = (BLOCK_CAP - offset).min(limit);
1818
            } else {
1819
                let len = (tail - head) >> SHIFT;
1820
                // Steal half of the available tasks.
1821
                advance = ((len + 1) / 2).min(limit);
1822
            }
1823
        } else {
1824
            // We can steal all tasks till the end of the block.
1825
            advance = (BLOCK_CAP - offset).min(limit);
1826
        }
1827
1828
        new_head += advance << SHIFT;
1829
        let new_offset = offset + advance;
1830
1831
        // Try moving the head index forward.
1832
        if self
1833
            .head
1834
            .index
1835
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1836
            .is_err()
1837
        {
1838
            return Steal::Retry;
1839
        }
1840
1841
        // Reserve capacity for the stolen batch.
1842
        let batch_size = new_offset - offset - 1;
1843
        dest.reserve(batch_size);
1844
1845
        // Get the destination buffer and back index.
1846
        let dest_buffer = dest.buffer.get();
1847
        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1848
1849
        unsafe {
1850
            // If we've reached the end of the block, move to the next one.
1851
            if new_offset == BLOCK_CAP {
1852
                let next = (*block).wait_next();
1853
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1854
                if !(*next).next.load(Ordering::Relaxed).is_null() {
1855
                    next_index |= HAS_NEXT;
1856
                }
1857
1858
                self.head.block.store(next, Ordering::Release);
1859
                self.head.index.store(next_index, Ordering::Release);
1860
            }
1861
1862
            // Read the task.
1863
            let slot = (*block).slots.get_unchecked(offset);
1864
            slot.wait_write();
1865
            let task = slot.task.get().read();
1866
1867
            match dest.flavor {
1868
                Flavor::Fifo => {
1869
                    // Copy values from the injector into the destination queue.
1870
                    for i in 0..batch_size {
1871
                        // Read the task.
1872
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1873
                        slot.wait_write();
1874
                        let task = slot.task.get().read();
1875
1876
                        // Write it into the destination queue.
1877
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1878
                    }
1879
                }
1880
1881
                Flavor::Lifo => {
1882
                    // Copy values from the injector into the destination queue.
1883
                    for i in 0..batch_size {
1884
                        // Read the task.
1885
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1886
                        slot.wait_write();
1887
                        let task = slot.task.get().read();
1888
1889
                        // Write it into the destination queue.
1890
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1891
                    }
1892
                }
1893
            }
1894
1895
            atomic::fence(Ordering::Release);
1896
1897
            // Update the back index in the destination queue.
1898
            //
1899
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1900
            // data races because it doesn't understand fences.
1901
            dest.inner
1902
                .back
1903
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1904
1905
            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1906
            // but couldn't because we were busy reading from the slot.
1907
            if new_offset == BLOCK_CAP {
1908
                Block::destroy(block, offset);
1909
            } else {
1910
                for i in offset..new_offset {
1911
                    let slot = (*block).slots.get_unchecked(i);
1912
1913
                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1914
                        Block::destroy(block, offset);
1915
                        break;
1916
                    }
1917
                }
1918
            }
1919
1920
            Steal::Success(task.assume_init())
1921
        }
1922
    }
1923
1924
    /// Returns `true` if the queue is empty.
1925
    ///
1926
    /// # Examples
1927
    ///
1928
    /// ```
1929
    /// use crossbeam_deque::Injector;
1930
    ///
1931
    /// let q = Injector::new();
1932
    ///
1933
    /// assert!(q.is_empty());
1934
    /// q.push(1);
1935
    /// assert!(!q.is_empty());
1936
    /// ```
1937
0
    pub fn is_empty(&self) -> bool {
1938
0
        let head = self.head.index.load(Ordering::SeqCst);
1939
0
        let tail = self.tail.index.load(Ordering::SeqCst);
1940
0
        head >> SHIFT == tail >> SHIFT
1941
0
    }
1942
1943
    /// Returns the number of tasks in the queue.
1944
    ///
1945
    /// # Examples
1946
    ///
1947
    /// ```
1948
    /// use crossbeam_deque::Injector;
1949
    ///
1950
    /// let q = Injector::new();
1951
    ///
1952
    /// assert_eq!(q.len(), 0);
1953
    /// q.push(1);
1954
    /// assert_eq!(q.len(), 1);
1955
    /// q.push(1);
1956
    /// assert_eq!(q.len(), 2);
1957
    /// ```
1958
    pub fn len(&self) -> usize {
1959
        loop {
1960
            // Load the tail index, then load the head index.
1961
            let mut tail = self.tail.index.load(Ordering::SeqCst);
1962
            let mut head = self.head.index.load(Ordering::SeqCst);
1963
1964
            // If the tail index didn't change, we've got consistent indices to work with.
1965
            if self.tail.index.load(Ordering::SeqCst) == tail {
1966
                // Erase the lower bits.
1967
                tail &= !((1 << SHIFT) - 1);
1968
                head &= !((1 << SHIFT) - 1);
1969
1970
                // Fix up indices if they fall onto block ends.
1971
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1972
                    tail = tail.wrapping_add(1 << SHIFT);
1973
                }
1974
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1975
                    head = head.wrapping_add(1 << SHIFT);
1976
                }
1977
1978
                // Rotate indices so that head falls into the first block.
1979
                let lap = (head >> SHIFT) / LAP;
1980
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1981
                head = head.wrapping_sub((lap * LAP) << SHIFT);
1982
1983
                // Remove the lower bits.
1984
                tail >>= SHIFT;
1985
                head >>= SHIFT;
1986
1987
                // Return the difference minus the number of blocks between tail and head.
1988
                return tail - head - tail / LAP;
1989
            }
1990
        }
1991
    }
1992
}
1993
1994
impl<T> Drop for Injector<T> {
1995
0
    fn drop(&mut self) {
1996
0
        let mut head = *self.head.index.get_mut();
1997
0
        let mut tail = *self.tail.index.get_mut();
1998
0
        let mut block = *self.head.block.get_mut();
1999
0
2000
0
        // Erase the lower bits.
2001
0
        head &= !((1 << SHIFT) - 1);
2002
0
        tail &= !((1 << SHIFT) - 1);
2003
2004
        unsafe {
2005
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
2006
0
            while head != tail {
2007
0
                let offset = (head >> SHIFT) % LAP;
2008
0
2009
0
                if offset < BLOCK_CAP {
2010
0
                    // Drop the task in the slot.
2011
0
                    let slot = (*block).slots.get_unchecked(offset);
2012
0
                    (*slot.task.get()).assume_init_drop();
2013
0
                } else {
2014
0
                    // Deallocate the block and move to the next one.
2015
0
                    let next = *(*block).next.get_mut();
2016
0
                    drop(Box::from_raw(block));
2017
0
                    block = next;
2018
0
                }
2019
2020
0
                head = head.wrapping_add(1 << SHIFT);
2021
            }
2022
2023
            // Deallocate the last remaining block.
2024
0
            drop(Box::from_raw(block));
2025
0
        }
2026
0
    }
2027
}
2028
2029
impl<T> fmt::Debug for Injector<T> {
2030
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2031
        f.pad("Worker { .. }")
2032
    }
2033
}
2034
2035
/// The result of an attempt to steal from a queue.
///
/// # Examples
///
/// Results of several steal operations can be combined in a number of ways:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
#[must_use]
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum Steal<T> {
    /// Nothing was stolen because the queue was empty when the steal was attempted.
    Empty,

    /// The steal succeeded and yielded at least one task.
    Success(T),

    /// The steal did not complete and should be attempted again.
    Retry,
}
2065
2066
impl<T> Steal<T> {
2067
    /// Returns `true` if the queue was empty at the time of stealing.
2068
    ///
2069
    /// # Examples
2070
    ///
2071
    /// ```
2072
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2073
    ///
2074
    /// assert!(!Success(7).is_empty());
2075
    /// assert!(!Retry::<i32>.is_empty());
2076
    ///
2077
    /// assert!(Empty::<i32>.is_empty());
2078
    /// ```
2079
    pub fn is_empty(&self) -> bool {
2080
        match self {
2081
            Steal::Empty => true,
2082
            _ => false,
2083
        }
2084
    }
2085
2086
    /// Returns `true` if at least one task was stolen.
2087
    ///
2088
    /// # Examples
2089
    ///
2090
    /// ```
2091
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2092
    ///
2093
    /// assert!(!Empty::<i32>.is_success());
2094
    /// assert!(!Retry::<i32>.is_success());
2095
    ///
2096
    /// assert!(Success(7).is_success());
2097
    /// ```
2098
    pub fn is_success(&self) -> bool {
2099
        match self {
2100
            Steal::Success(_) => true,
2101
            _ => false,
2102
        }
2103
    }
2104
2105
    /// Returns `true` if the steal operation needs to be retried.
2106
    ///
2107
    /// # Examples
2108
    ///
2109
    /// ```
2110
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2111
    ///
2112
    /// assert!(!Empty::<i32>.is_retry());
2113
    /// assert!(!Success(7).is_retry());
2114
    ///
2115
    /// assert!(Retry::<i32>.is_retry());
2116
    /// ```
2117
    pub fn is_retry(&self) -> bool {
2118
        match self {
2119
            Steal::Retry => true,
2120
            _ => false,
2121
        }
2122
    }
2123
2124
    /// Returns the result of the operation, if successful.
2125
    ///
2126
    /// # Examples
2127
    ///
2128
    /// ```
2129
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2130
    ///
2131
    /// assert_eq!(Empty::<i32>.success(), None);
2132
    /// assert_eq!(Retry::<i32>.success(), None);
2133
    ///
2134
    /// assert_eq!(Success(7).success(), Some(7));
2135
    /// ```
2136
    pub fn success(self) -> Option<T> {
2137
        match self {
2138
            Steal::Success(res) => Some(res),
2139
            _ => None,
2140
        }
2141
    }
2142
2143
    /// If no task was stolen, attempts another steal operation.
2144
    ///
2145
    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2146
    ///
2147
    /// * If the second steal resulted in `Success`, it is returned.
2148
    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2149
    /// * If both resulted in `None`, then `None` is returned.
2150
    ///
2151
    /// # Examples
2152
    ///
2153
    /// ```
2154
    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2155
    ///
2156
    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2157
    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2158
    ///
2159
    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2160
    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2161
    ///
2162
    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2163
    /// ```
2164
    pub fn or_else<F>(self, f: F) -> Steal<T>
2165
    where
2166
        F: FnOnce() -> Steal<T>,
2167
    {
2168
        match self {
2169
            Steal::Empty => f(),
2170
            Steal::Success(_) => self,
2171
            Steal::Retry => {
2172
                if let Steal::Success(res) = f() {
2173
                    Steal::Success(res)
2174
                } else {
2175
                    Steal::Retry
2176
                }
2177
            }
2178
        }
2179
    }
2180
}
2181
2182
impl<T> fmt::Debug for Steal<T> {
2183
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2184
        match self {
2185
            Steal::Empty => f.pad("Empty"),
2186
            Steal::Success(_) => f.pad("Success(..)"),
2187
            Steal::Retry => f.pad("Retry"),
2188
        }
2189
    }
2190
}
2191
2192
impl<T> FromIterator<Steal<T>> for Steal<T> {
2193
    /// Consumes items until a `Success` is found and returns it.
2194
    ///
2195
    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2196
    /// Otherwise, `Empty` is returned.
2197
    fn from_iter<I>(iter: I) -> Steal<T>
2198
    where
2199
        I: IntoIterator<Item = Steal<T>>,
2200
    {
2201
        let mut retry = false;
2202
        for s in iter {
2203
            match &s {
2204
                Steal::Empty => {}
2205
                Steal::Success(_) => return s,
2206
                Steal::Retry => retry = true,
2207
            }
2208
        }
2209
2210
        if retry {
2211
            Steal::Retry
2212
        } else {
2213
            Steal::Empty
2214
        }
2215
    }
2216
}