/rust/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-queue-0.3.11/src/seg_queue.rs
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | use alloc::boxed::Box; | 
| 2 |  | use core::cell::UnsafeCell; | 
| 3 |  | use core::fmt; | 
| 4 |  | use core::marker::PhantomData; | 
| 5 |  | use core::mem::MaybeUninit; | 
| 6 |  | use core::panic::{RefUnwindSafe, UnwindSafe}; | 
| 7 |  | use core::ptr; | 
| 8 |  | use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; | 
| 9 |  |  | 
| 10 |  | use crossbeam_utils::{Backoff, CachePadded}; | 
| 11 |  |  | 
| 12 |  | // Bits indicating the state of a slot: | 
| 13 |  | // * If a value has been written into the slot, `WRITE` is set. | 
| 14 |  | // * If a value has been read from the slot, `READ` is set. | 
| 15 |  | // * If the block is being destroyed, `DESTROY` is set. | 
| 16 |  | const WRITE: usize = 1; | 
| 17 |  | const READ: usize = 2; | 
| 18 |  | const DESTROY: usize = 4; | 
| 19 |  |  | 
| 20 |  | // Each block covers one "lap" of indices. | 
| 21 |  | const LAP: usize = 32; | 
| 22 |  | // The maximum number of values a block can hold. | 
| 23 |  | const BLOCK_CAP: usize = LAP - 1; | 
| 24 |  | // How many lower bits are reserved for metadata. | 
| 25 |  | const SHIFT: usize = 1; | 
| 26 |  | // Indicates that the block is not the last one. | 
| 27 |  | const HAS_NEXT: usize = 1; | 
| 28 |  |  | 
| 29 |  | /// A slot in a block. | 
| 30 |  | struct Slot<T> { | 
| 31 |  |     /// The value. | 
| 32 |  |     value: UnsafeCell<MaybeUninit<T>>, | 
| 33 |  |  | 
| 34 |  |     /// The state of the slot. | 
| 35 |  |     state: AtomicUsize, | 
| 36 |  | } | 
| 37 |  |  | 
| 38 |  | impl<T> Slot<T> { | 
| 39 |  |     const UNINIT: Self = Self { | 
| 40 |  |         value: UnsafeCell::new(MaybeUninit::uninit()), | 
| 41 |  |         state: AtomicUsize::new(0), | 
| 42 |  |     }; | 
| 43 |  |  | 
| 44 |  |     /// Waits until a value is written into the slot. | 
| 45 | 0 |     fn wait_write(&self) { | 
| 46 | 0 |         let backoff = Backoff::new(); | 
| 47 | 0 |         while self.state.load(Ordering::Acquire) & WRITE == 0 { | 
| 48 | 0 |             backoff.snooze(); | 
| 49 | 0 |         } | 
| 50 | 0 |     } Unexecuted instantiation: <crossbeam_queue::seg_queue::Slot<corosensei::stack::unix::DefaultStack>>::wait_writeUnexecuted instantiation: <crossbeam_queue::seg_queue::Slot<corosensei::stack::unix::DefaultStack>>::wait_write | 
| 51 |  | } | 
| 52 |  |  | 
| 53 |  | /// A block in a linked list. | 
| 54 |  | /// | 
| 55 |  | /// Each block in the list can hold up to `BLOCK_CAP` values. | 
| 56 |  | struct Block<T> { | 
| 57 |  |     /// The next block in the linked list. | 
| 58 |  |     next: AtomicPtr<Block<T>>, | 
| 59 |  |  | 
| 60 |  |     /// Slots for values. | 
| 61 |  |     slots: [Slot<T>; BLOCK_CAP], | 
| 62 |  | } | 
| 63 |  |  | 
| 64 |  | impl<T> Block<T> { | 
| 65 |  |     /// Creates an empty block that starts at `start_index`. | 
| 66 | 0 |     fn new() -> Block<T> { | 
| 67 | 0 |         Self { | 
| 68 | 0 |             next: AtomicPtr::new(ptr::null_mut()), | 
| 69 | 0 |             slots: [Slot::UNINIT; BLOCK_CAP], | 
| 70 | 0 |         } | 
| 71 | 0 |     } Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::newUnexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::new | 
| 72 |  |  | 
| 73 |  |     /// Waits until the next pointer is set. | 
| 74 | 0 |     fn wait_next(&self) -> *mut Block<T> { | 
| 75 | 0 |         let backoff = Backoff::new(); | 
| 76 |  |         loop { | 
| 77 | 0 |             let next = self.next.load(Ordering::Acquire); | 
| 78 | 0 |             if !next.is_null() { | 
| 79 | 0 |                 return next; | 
| 80 | 0 |             } | 
| 81 | 0 |             backoff.snooze(); | 
| 82 |  |         } | 
| 83 | 0 |     } Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::wait_nextUnexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::wait_next | 
| 84 |  |  | 
| 85 |  |     /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. | 
| 86 | 0 |     unsafe fn destroy(this: *mut Block<T>, start: usize) { | 
| 87 |  |         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has | 
| 88 |  |         // begun destruction of the block. | 
| 89 | 0 |         for i in start..BLOCK_CAP - 1 { | 
| 90 | 0 |             let slot = (*this).slots.get_unchecked(i); | 
| 91 | 0 | 
 | 
| 92 | 0 |             // Mark the `DESTROY` bit if a thread is still using the slot. | 
| 93 | 0 |             if slot.state.load(Ordering::Acquire) & READ == 0 | 
| 94 | 0 |                 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 | 
| 95 |  |             { | 
| 96 |  |                 // If a thread is still using the slot, it will continue destruction of the block. | 
| 97 | 0 |                 return; | 
| 98 | 0 |             } | 
| 99 |  |         } | 
| 100 |  |  | 
| 101 |  |         // No thread is using the block, now it is safe to destroy it. | 
| 102 | 0 |         drop(Box::from_raw(this)); | 
| 103 | 0 |     } Unexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::destroyUnexecuted instantiation: <crossbeam_queue::seg_queue::Block<corosensei::stack::unix::DefaultStack>>::destroy | 
| 104 |  | } | 
| 105 |  |  | 
| 106 |  | /// A position in a queue. | 
| 107 |  | struct Position<T> { | 
| 108 |  |     /// The index in the queue. | 
| 109 |  |     index: AtomicUsize, | 
| 110 |  |  | 
| 111 |  |     /// The block in the linked list. | 
| 112 |  |     block: AtomicPtr<Block<T>>, | 
| 113 |  | } | 
| 114 |  |  | 
| 115 |  | /// An unbounded multi-producer multi-consumer queue. | 
| 116 |  | /// | 
| 117 |  | /// This queue is implemented as a linked list of segments, where each segment is a small buffer | 
| 118 |  | /// that can hold a handful of elements. There is no limit to how many elements can be in the queue | 
| 119 |  | /// at a time. However, since segments need to be dynamically allocated as elements get pushed, | 
| 120 |  | /// this queue is somewhat slower than [`ArrayQueue`]. | 
| 121 |  | /// | 
| 122 |  | /// [`ArrayQueue`]: super::ArrayQueue | 
| 123 |  | /// | 
| 124 |  | /// # Examples | 
| 125 |  | /// | 
| 126 |  | /// ``` | 
| 127 |  | /// use crossbeam_queue::SegQueue; | 
| 128 |  | /// | 
| 129 |  | /// let q = SegQueue::new(); | 
| 130 |  | /// | 
| 131 |  | /// q.push('a'); | 
| 132 |  | /// q.push('b'); | 
| 133 |  | /// | 
| 134 |  | /// assert_eq!(q.pop(), Some('a')); | 
| 135 |  | /// assert_eq!(q.pop(), Some('b')); | 
| 136 |  | /// assert!(q.pop().is_none()); | 
| 137 |  | /// ``` | 
| 138 |  | pub struct SegQueue<T> { | 
| 139 |  |     /// The head of the queue. | 
| 140 |  |     head: CachePadded<Position<T>>, | 
| 141 |  |  | 
| 142 |  |     /// The tail of the queue. | 
| 143 |  |     tail: CachePadded<Position<T>>, | 
| 144 |  |  | 
| 145 |  |     /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`. | 
| 146 |  |     _marker: PhantomData<T>, | 
| 147 |  | } | 
| 148 |  |  | 
| 149 |  | unsafe impl<T: Send> Send for SegQueue<T> {} | 
| 150 |  | unsafe impl<T: Send> Sync for SegQueue<T> {} | 
| 151 |  |  | 
| 152 |  | impl<T> UnwindSafe for SegQueue<T> {} | 
| 153 |  | impl<T> RefUnwindSafe for SegQueue<T> {} | 
| 154 |  |  | 
| 155 |  | impl<T> SegQueue<T> { | 
| 156 |  |     /// Creates a new unbounded queue. | 
| 157 |  |     /// | 
| 158 |  |     /// # Examples | 
| 159 |  |     /// | 
| 160 |  |     /// ``` | 
| 161 |  |     /// use crossbeam_queue::SegQueue; | 
| 162 |  |     /// | 
| 163 |  |     /// let q = SegQueue::<i32>::new(); | 
| 164 |  |     /// ``` | 
| 165 | 0 |     pub const fn new() -> SegQueue<T> { | 
| 166 | 0 |         SegQueue { | 
| 167 | 0 |             head: CachePadded::new(Position { | 
| 168 | 0 |                 block: AtomicPtr::new(ptr::null_mut()), | 
| 169 | 0 |                 index: AtomicUsize::new(0), | 
| 170 | 0 |             }), | 
| 171 | 0 |             tail: CachePadded::new(Position { | 
| 172 | 0 |                 block: AtomicPtr::new(ptr::null_mut()), | 
| 173 | 0 |                 index: AtomicUsize::new(0), | 
| 174 | 0 |             }), | 
| 175 | 0 |             _marker: PhantomData, | 
| 176 | 0 |         } | 
| 177 | 0 |     } Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::newUnexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::new | 
| 178 |  |  | 
| 179 |  |     /// Pushes an element into the queue. | 
| 180 |  |     /// | 
| 181 |  |     /// # Examples | 
| 182 |  |     /// | 
| 183 |  |     /// ``` | 
| 184 |  |     /// use crossbeam_queue::SegQueue; | 
| 185 |  |     /// | 
| 186 |  |     /// let q = SegQueue::new(); | 
| 187 |  |     /// | 
| 188 |  |     /// q.push(10); | 
| 189 |  |     /// q.push(20); | 
| 190 |  |     /// ``` | 
| 191 | 0 |     pub fn push(&self, value: T) { | 
| 192 | 0 |         let backoff = Backoff::new(); | 
| 193 | 0 |         let mut tail = self.tail.index.load(Ordering::Acquire); | 
| 194 | 0 |         let mut block = self.tail.block.load(Ordering::Acquire); | 
| 195 | 0 |         let mut next_block = None; | 
| 196 |  |  | 
| 197 | 0 |         loop { | 
| 198 | 0 |             // Calculate the offset of the index into the block. | 
| 199 | 0 |             let offset = (tail >> SHIFT) % LAP; | 
| 200 | 0 | 
 | 
| 201 | 0 |             // If we reached the end of the block, wait until the next one is installed. | 
| 202 | 0 |             if offset == BLOCK_CAP { | 
| 203 | 0 |                 backoff.snooze(); | 
| 204 | 0 |                 tail = self.tail.index.load(Ordering::Acquire); | 
| 205 | 0 |                 block = self.tail.block.load(Ordering::Acquire); | 
| 206 | 0 |                 continue; | 
| 207 | 0 |             } | 
| 208 | 0 | 
 | 
| 209 | 0 |             // If we're going to have to install the next block, allocate it in advance in order to | 
| 210 | 0 |             // make the wait for other threads as short as possible. | 
| 211 | 0 |             if offset + 1 == BLOCK_CAP && next_block.is_none() { | 
| 212 | 0 |                 next_block = Some(Box::new(Block::<T>::new())); | 
| 213 | 0 |             } | 
| 214 |  |  | 
| 215 |  |             // If this is the first push operation, we need to allocate the first block. | 
| 216 | 0 |             if block.is_null() { | 
| 217 | 0 |                 let new = Box::into_raw(Box::new(Block::<T>::new())); | 
| 218 | 0 | 
 | 
| 219 | 0 |                 if self | 
| 220 | 0 |                     .tail | 
| 221 | 0 |                     .block | 
| 222 | 0 |                     .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) | 
| 223 | 0 |                     .is_ok() | 
| 224 | 0 |                 { | 
| 225 | 0 |                     self.head.block.store(new, Ordering::Release); | 
| 226 | 0 |                     block = new; | 
| 227 | 0 |                 } else { | 
| 228 | 0 |                     next_block = unsafe { Some(Box::from_raw(new)) }; | 
| 229 | 0 |                     tail = self.tail.index.load(Ordering::Acquire); | 
| 230 | 0 |                     block = self.tail.block.load(Ordering::Acquire); | 
| 231 | 0 |                     continue; | 
| 232 |  |                 } | 
| 233 | 0 |             } | 
| 234 |  |  | 
| 235 | 0 |             let new_tail = tail + (1 << SHIFT); | 
| 236 | 0 | 
 | 
| 237 | 0 |             // Try advancing the tail forward. | 
| 238 | 0 |             match self.tail.index.compare_exchange_weak( | 
| 239 | 0 |                 tail, | 
| 240 | 0 |                 new_tail, | 
| 241 | 0 |                 Ordering::SeqCst, | 
| 242 | 0 |                 Ordering::Acquire, | 
| 243 | 0 |             ) { | 
| 244 |  |                 Ok(_) => unsafe { | 
| 245 |  |                     // If we've reached the end of the block, install the next one. | 
| 246 | 0 |                     if offset + 1 == BLOCK_CAP { | 
| 247 | 0 |                         let next_block = Box::into_raw(next_block.unwrap()); | 
| 248 | 0 |                         let next_index = new_tail.wrapping_add(1 << SHIFT); | 
| 249 | 0 | 
 | 
| 250 | 0 |                         self.tail.block.store(next_block, Ordering::Release); | 
| 251 | 0 |                         self.tail.index.store(next_index, Ordering::Release); | 
| 252 | 0 |                         (*block).next.store(next_block, Ordering::Release); | 
| 253 | 0 |                     } | 
| 254 |  |  | 
| 255 |  |                     // Write the value into the slot. | 
| 256 | 0 |                     let slot = (*block).slots.get_unchecked(offset); | 
| 257 | 0 |                     slot.value.get().write(MaybeUninit::new(value)); | 
| 258 | 0 |                     slot.state.fetch_or(WRITE, Ordering::Release); | 
| 259 | 0 | 
 | 
| 260 | 0 |                     return; | 
| 261 |  |                 }, | 
| 262 | 0 |                 Err(t) => { | 
| 263 | 0 |                     tail = t; | 
| 264 | 0 |                     block = self.tail.block.load(Ordering::Acquire); | 
| 265 | 0 |                     backoff.spin(); | 
| 266 | 0 |                 } | 
| 267 |  |             } | 
| 268 |  |         } | 
| 269 | 0 |     } Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::pushUnexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::push | 
| 270 |  |  | 
| 271 |  |     /// Pops an element from the queue. | 
| 272 |  |     /// | 
| 273 |  |     /// If the queue is empty, `None` is returned. | 
| 274 |  |     /// | 
| 275 |  |     /// # Examples | 
| 276 |  |     /// | 
| 277 |  |     /// ``` | 
| 278 |  |     /// use crossbeam_queue::SegQueue; | 
| 279 |  |     /// | 
| 280 |  |     /// let q = SegQueue::new(); | 
| 281 |  |     /// | 
| 282 |  |     /// q.push(10); | 
| 283 |  |     /// assert_eq!(q.pop(), Some(10)); | 
| 284 |  |     /// assert!(q.pop().is_none()); | 
| 285 |  |     /// ``` | 
| 286 | 0 |     pub fn pop(&self) -> Option<T> { | 
| 287 | 0 |         let backoff = Backoff::new(); | 
| 288 | 0 |         let mut head = self.head.index.load(Ordering::Acquire); | 
| 289 | 0 |         let mut block = self.head.block.load(Ordering::Acquire); | 
| 290 |  |  | 
| 291 | 0 |         loop { | 
| 292 | 0 |             // Calculate the offset of the index into the block. | 
| 293 | 0 |             let offset = (head >> SHIFT) % LAP; | 
| 294 | 0 | 
 | 
| 295 | 0 |             // If we reached the end of the block, wait until the next one is installed. | 
| 296 | 0 |             if offset == BLOCK_CAP { | 
| 297 | 0 |                 backoff.snooze(); | 
| 298 | 0 |                 head = self.head.index.load(Ordering::Acquire); | 
| 299 | 0 |                 block = self.head.block.load(Ordering::Acquire); | 
| 300 | 0 |                 continue; | 
| 301 | 0 |             } | 
| 302 | 0 | 
 | 
| 303 | 0 |             let mut new_head = head + (1 << SHIFT); | 
| 304 | 0 | 
 | 
| 305 | 0 |             if new_head & HAS_NEXT == 0 { | 
| 306 | 0 |                 atomic::fence(Ordering::SeqCst); | 
| 307 | 0 |                 let tail = self.tail.index.load(Ordering::Relaxed); | 
| 308 | 0 | 
 | 
| 309 | 0 |                 // If the tail equals the head, that means the queue is empty. | 
| 310 | 0 |                 if head >> SHIFT == tail >> SHIFT { | 
| 311 | 0 |                     return None; | 
| 312 | 0 |                 } | 
| 313 | 0 | 
 | 
| 314 | 0 |                 // If head and tail are not in the same block, set `HAS_NEXT` in head. | 
| 315 | 0 |                 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { | 
| 316 | 0 |                     new_head |= HAS_NEXT; | 
| 317 | 0 |                 } | 
| 318 | 0 |             } | 
| 319 |  |  | 
| 320 |  |             // The block can be null here only if the first push operation is in progress. In that | 
| 321 |  |             // case, just wait until it gets initialized. | 
| 322 | 0 |             if block.is_null() { | 
| 323 | 0 |                 backoff.snooze(); | 
| 324 | 0 |                 head = self.head.index.load(Ordering::Acquire); | 
| 325 | 0 |                 block = self.head.block.load(Ordering::Acquire); | 
| 326 | 0 |                 continue; | 
| 327 | 0 |             } | 
| 328 | 0 | 
 | 
| 329 | 0 |             // Try moving the head index forward. | 
| 330 | 0 |             match self.head.index.compare_exchange_weak( | 
| 331 | 0 |                 head, | 
| 332 | 0 |                 new_head, | 
| 333 | 0 |                 Ordering::SeqCst, | 
| 334 | 0 |                 Ordering::Acquire, | 
| 335 | 0 |             ) { | 
| 336 |  |                 Ok(_) => unsafe { | 
| 337 |  |                     // If we've reached the end of the block, move to the next one. | 
| 338 | 0 |                     if offset + 1 == BLOCK_CAP { | 
| 339 | 0 |                         let next = (*block).wait_next(); | 
| 340 | 0 |                         let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); | 
| 341 | 0 |                         if !(*next).next.load(Ordering::Relaxed).is_null() { | 
| 342 | 0 |                             next_index |= HAS_NEXT; | 
| 343 | 0 |                         } | 
| 344 |  |  | 
| 345 | 0 |                         self.head.block.store(next, Ordering::Release); | 
| 346 | 0 |                         self.head.index.store(next_index, Ordering::Release); | 
| 347 | 0 |                     } | 
| 348 |  |  | 
| 349 |  |                     // Read the value. | 
| 350 | 0 |                     let slot = (*block).slots.get_unchecked(offset); | 
| 351 | 0 |                     slot.wait_write(); | 
| 352 | 0 |                     let value = slot.value.get().read().assume_init(); | 
| 353 | 0 | 
 | 
| 354 | 0 |                     // Destroy the block if we've reached the end, or if another thread wanted to | 
| 355 | 0 |                     // destroy but couldn't because we were busy reading from the slot. | 
| 356 | 0 |                     if offset + 1 == BLOCK_CAP { | 
| 357 | 0 |                         Block::destroy(block, 0); | 
| 358 | 0 |                     } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { | 
| 359 | 0 |                         Block::destroy(block, offset + 1); | 
| 360 | 0 |                     } | 
| 361 |  |  | 
| 362 | 0 |                     return Some(value); | 
| 363 |  |                 }, | 
| 364 | 0 |                 Err(h) => { | 
| 365 | 0 |                     head = h; | 
| 366 | 0 |                     block = self.head.block.load(Ordering::Acquire); | 
| 367 | 0 |                     backoff.spin(); | 
| 368 | 0 |                 } | 
| 369 |  |             } | 
| 370 |  |         } | 
| 371 | 0 |     } Unexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::popUnexecuted instantiation: <crossbeam_queue::seg_queue::SegQueue<corosensei::stack::unix::DefaultStack>>::pop | 
| 372 |  |  | 
| 373 |  |     /// Returns `true` if the queue is empty. | 
| 374 |  |     /// | 
| 375 |  |     /// # Examples | 
| 376 |  |     /// | 
| 377 |  |     /// ``` | 
| 378 |  |     /// use crossbeam_queue::SegQueue; | 
| 379 |  |     /// | 
| 380 |  |     /// let q = SegQueue::new(); | 
| 381 |  |     /// | 
| 382 |  |     /// assert!(q.is_empty()); | 
| 383 |  |     /// q.push(1); | 
| 384 |  |     /// assert!(!q.is_empty()); | 
| 385 |  |     /// ``` | 
| 386 |  |     pub fn is_empty(&self) -> bool { | 
| 387 |  |         let head = self.head.index.load(Ordering::SeqCst); | 
| 388 |  |         let tail = self.tail.index.load(Ordering::SeqCst); | 
| 389 |  |         head >> SHIFT == tail >> SHIFT | 
| 390 |  |     } | 
| 391 |  |  | 
| 392 |  |     /// Returns the number of elements in the queue. | 
| 393 |  |     /// | 
| 394 |  |     /// # Examples | 
| 395 |  |     /// | 
| 396 |  |     /// ``` | 
| 397 |  |     /// use crossbeam_queue::SegQueue; | 
| 398 |  |     /// | 
| 399 |  |     /// let q = SegQueue::new(); | 
| 400 |  |     /// assert_eq!(q.len(), 0); | 
| 401 |  |     /// | 
| 402 |  |     /// q.push(10); | 
| 403 |  |     /// assert_eq!(q.len(), 1); | 
| 404 |  |     /// | 
| 405 |  |     /// q.push(20); | 
| 406 |  |     /// assert_eq!(q.len(), 2); | 
| 407 |  |     /// ``` | 
| 408 |  |     pub fn len(&self) -> usize { | 
| 409 |  |         loop { | 
| 410 |  |             // Load the tail index, then load the head index. | 
| 411 |  |             let mut tail = self.tail.index.load(Ordering::SeqCst); | 
| 412 |  |             let mut head = self.head.index.load(Ordering::SeqCst); | 
| 413 |  |  | 
| 414 |  |             // If the tail index didn't change, we've got consistent indices to work with. | 
| 415 |  |             if self.tail.index.load(Ordering::SeqCst) == tail { | 
| 416 |  |                 // Erase the lower bits. | 
| 417 |  |                 tail &= !((1 << SHIFT) - 1); | 
| 418 |  |                 head &= !((1 << SHIFT) - 1); | 
| 419 |  |  | 
| 420 |  |                 // Fix up indices if they fall onto block ends. | 
| 421 |  |                 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { | 
| 422 |  |                     tail = tail.wrapping_add(1 << SHIFT); | 
| 423 |  |                 } | 
| 424 |  |                 if (head >> SHIFT) & (LAP - 1) == LAP - 1 { | 
| 425 |  |                     head = head.wrapping_add(1 << SHIFT); | 
| 426 |  |                 } | 
| 427 |  |  | 
| 428 |  |                 // Rotate indices so that head falls into the first block. | 
| 429 |  |                 let lap = (head >> SHIFT) / LAP; | 
| 430 |  |                 tail = tail.wrapping_sub((lap * LAP) << SHIFT); | 
| 431 |  |                 head = head.wrapping_sub((lap * LAP) << SHIFT); | 
| 432 |  |  | 
| 433 |  |                 // Remove the lower bits. | 
| 434 |  |                 tail >>= SHIFT; | 
| 435 |  |                 head >>= SHIFT; | 
| 436 |  |  | 
| 437 |  |                 // Return the difference minus the number of blocks between tail and head. | 
| 438 |  |                 return tail - head - tail / LAP; | 
| 439 |  |             } | 
| 440 |  |         } | 
| 441 |  |     } | 
| 442 |  | } | 
| 443 |  |  | 
| 444 |  | impl<T> Drop for SegQueue<T> { | 
| 445 |  |     fn drop(&mut self) { | 
| 446 |  |         let mut head = *self.head.index.get_mut(); | 
| 447 |  |         let mut tail = *self.tail.index.get_mut(); | 
| 448 |  |         let mut block = *self.head.block.get_mut(); | 
| 449 |  |  | 
| 450 |  |         // Erase the lower bits. | 
| 451 |  |         head &= !((1 << SHIFT) - 1); | 
| 452 |  |         tail &= !((1 << SHIFT) - 1); | 
| 453 |  |  | 
| 454 |  |         unsafe { | 
| 455 |  |             // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. | 
| 456 |  |             while head != tail { | 
| 457 |  |                 let offset = (head >> SHIFT) % LAP; | 
| 458 |  |  | 
| 459 |  |                 if offset < BLOCK_CAP { | 
| 460 |  |                     // Drop the value in the slot. | 
| 461 |  |                     let slot = (*block).slots.get_unchecked(offset); | 
| 462 |  |                     (*slot.value.get()).assume_init_drop(); | 
| 463 |  |                 } else { | 
| 464 |  |                     // Deallocate the block and move to the next one. | 
| 465 |  |                     let next = *(*block).next.get_mut(); | 
| 466 |  |                     drop(Box::from_raw(block)); | 
| 467 |  |                     block = next; | 
| 468 |  |                 } | 
| 469 |  |  | 
| 470 |  |                 head = head.wrapping_add(1 << SHIFT); | 
| 471 |  |             } | 
| 472 |  |  | 
| 473 |  |             // Deallocate the last remaining block. | 
| 474 |  |             if !block.is_null() { | 
| 475 |  |                 drop(Box::from_raw(block)); | 
| 476 |  |             } | 
| 477 |  |         } | 
| 478 |  |     } | 
| 479 |  | } | 
| 480 |  |  | 
| 481 |  | impl<T> fmt::Debug for SegQueue<T> { | 
| 482 |  |     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
| 483 |  |         f.pad("SegQueue { .. }") | 
| 484 |  |     } | 
| 485 |  | } | 
| 486 |  |  | 
| 487 |  | impl<T> Default for SegQueue<T> { | 
| 488 |  |     fn default() -> SegQueue<T> { | 
| 489 |  |         SegQueue::new() | 
| 490 |  |     } | 
| 491 |  | } | 
| 492 |  |  | 
| 493 |  | impl<T> IntoIterator for SegQueue<T> { | 
| 494 |  |     type Item = T; | 
| 495 |  |  | 
| 496 |  |     type IntoIter = IntoIter<T>; | 
| 497 |  |  | 
| 498 |  |     fn into_iter(self) -> Self::IntoIter { | 
| 499 |  |         IntoIter { value: self } | 
| 500 |  |     } | 
| 501 |  | } | 
| 502 |  |  | 
| 503 |  | #[derive(Debug)] | 
| 504 |  | pub struct IntoIter<T> { | 
| 505 |  |     value: SegQueue<T>, | 
| 506 |  | } | 
| 507 |  |  | 
| 508 |  | impl<T> Iterator for IntoIter<T> { | 
| 509 |  |     type Item = T; | 
| 510 |  |  | 
| 511 |  |     fn next(&mut self) -> Option<Self::Item> { | 
| 512 |  |         let value = &mut self.value; | 
| 513 |  |         let head = *value.head.index.get_mut(); | 
| 514 |  |         let tail = *value.tail.index.get_mut(); | 
| 515 |  |         if head >> SHIFT == tail >> SHIFT { | 
| 516 |  |             None | 
| 517 |  |         } else { | 
| 518 |  |             let block = *value.head.block.get_mut(); | 
| 519 |  |             let offset = (head >> SHIFT) % LAP; | 
| 520 |  |  | 
| 521 |  |             // SAFETY: We have mutable access to this, so we can read without | 
| 522 |  |             // worrying about concurrency. Furthermore, we know this is | 
| 523 |  |             // initialized because it is the value pointed at by `value.head` | 
| 524 |  |             // and this is a non-empty queue. | 
| 525 |  |             let item = unsafe { | 
| 526 |  |                 let slot = (*block).slots.get_unchecked(offset); | 
| 527 |  |                 slot.value.get().read().assume_init() | 
| 528 |  |             }; | 
| 529 |  |             if offset + 1 == BLOCK_CAP { | 
| 530 |  |                 // Deallocate the block and move to the next one. | 
| 531 |  |                 // SAFETY: The block is initialized because we've been reading | 
| 532 |  |                 // from it this entire time. We can drop it b/c everything has | 
| 533 |  |                 // been read out of it, so nothing is pointing to it anymore. | 
| 534 |  |                 unsafe { | 
| 535 |  |                     let next = *(*block).next.get_mut(); | 
| 536 |  |                     drop(Box::from_raw(block)); | 
| 537 |  |                     *value.head.block.get_mut() = next; | 
| 538 |  |                 } | 
| 539 |  |                 // The last value in a block is empty, so skip it | 
| 540 |  |                 *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT); | 
| 541 |  |                 // Double-check that we're pointing to the first item in a block. | 
| 542 |  |                 debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0); | 
| 543 |  |             } else { | 
| 544 |  |                 *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT); | 
| 545 |  |             } | 
| 546 |  |             Some(item) | 
| 547 |  |         } | 
| 548 |  |     } | 
| 549 |  | } |