Coverage Report

Created: 2026-02-11 07:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/sharded-slab-0.1.7/src/shard.rs
Line
Count
Source
1
use crate::{
2
    cfg::{self, CfgPrivate},
3
    clear::Clear,
4
    page,
5
    sync::{
6
        alloc,
7
        atomic::{
8
            AtomicPtr, AtomicUsize,
9
            Ordering::{self, *},
10
        },
11
    },
12
    tid::Tid,
13
    Pack,
14
};
15
16
use std::{fmt, ptr, slice};
17
18
// ┌─────────────┐      ┌────────┐
19
// │ page 1      │      │        │
20
// ├─────────────┤ ┌───▶│  next──┼─┐
21
// │ page 2      │ │    ├────────┤ │
22
// │             │ │    │XXXXXXXX│ │
23
// │ local_free──┼─┘    ├────────┤ │
24
// │ global_free─┼─┐    │        │◀┘
25
// ├─────────────┤ └───▶│  next──┼─┐
26
// │   page 3    │      ├────────┤ │
27
// └─────────────┘      │XXXXXXXX│ │
28
//       ...            ├────────┤ │
29
// ┌─────────────┐      │XXXXXXXX│ │
30
// │ page n      │      ├────────┤ │
31
// └─────────────┘      │        │◀┘
32
//                      │  next──┼───▶
33
//                      ├────────┤
34
//                      │XXXXXXXX│
35
//                      └────────┘
36
//                         ...
37
pub(crate) struct Shard<T, C: cfg::Config> {
38
    /// The shard's parent thread ID.
39
    pub(crate) tid: usize,
40
    /// The local free list for each page.
41
    ///
42
    /// These are only ever accessed from this shard's thread, so they are
43
    /// stored separately from the shared state for the page that can be
44
    /// accessed concurrently, to minimize false sharing.
45
    local: Box<[page::Local]>,
46
    /// The shared state for each page in this shard.
47
    ///
48
    /// This consists of the page's metadata (size, previous size), remote free
49
    /// list, and a pointer to the actual array backing that page.
50
    shared: Box<[page::Shared<T, C>]>,
51
}
52
53
pub(crate) struct Array<T, C: cfg::Config> {
54
    shards: Box<[Ptr<T, C>]>,
55
    max: AtomicUsize,
56
}
57
58
#[derive(Debug)]
59
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);
60
61
#[derive(Debug)]
62
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
63
64
// === impl Shard ===
65
66
impl<T, C> Shard<T, C>
67
where
68
    C: cfg::Config,
69
{
70
    #[inline(always)]
71
0
    pub(crate) fn with_slot<'a, U>(
72
0
        &'a self,
73
0
        idx: usize,
74
0
        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
75
0
    ) -> Option<U> {
76
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
77
0
        let (addr, page_index) = page::indices::<C>(idx);
78
79
0
        test_println!("-> {:?}", addr);
80
0
        if page_index >= self.shared.len() {
81
0
            return None;
82
0
        }
83
84
0
        self.shared[page_index].with_slot(addr, f)
85
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::with_slot::<sharded_slab::page::slot::Guard<tracing_subscriber::registry::sharded::DataInner>, <sharded_slab::pool::Pool<tracing_subscriber::registry::sharded::DataInner>>::get::{closure#0}>
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::with_slot::<_, _>
86
87
0
    pub(crate) fn new(tid: usize) -> Self {
88
0
        let mut total_sz = 0;
89
0
        let shared = (0..C::MAX_PAGES)
90
0
            .map(|page_num| {
91
0
                let sz = C::page_size(page_num);
92
0
                let prev_sz = total_sz;
93
0
                total_sz += sz;
94
0
                page::Shared::new(sz, prev_sz)
95
0
            })
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::new::{closure#0}
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::new::{closure#0}
96
0
            .collect();
97
0
        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::new::{closure#1}
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::new::{closure#1}
98
0
        Self { tid, local, shared }
99
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::new
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::new
100
}
101
102
impl<T, C> Shard<Option<T>, C>
103
where
104
    C: cfg::Config,
105
{
106
    /// Remove an item on the shard's local thread.
107
0
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
108
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
109
0
        let (addr, page_index) = page::indices::<C>(idx);
110
111
0
        test_println!("-> remove_local {:?}", addr);
112
113
0
        self.shared
114
0
            .get(page_index)?
115
0
            .take(addr, C::unpack_gen(idx), self.local(page_index))
116
0
    }
117
118
    /// Remove an item, while on a different thread from the shard's local thread.
119
0
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
120
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
121
0
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);
122
123
0
        let (addr, page_index) = page::indices::<C>(idx);
124
125
0
        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);
126
127
0
        let shared = self.shared.get(page_index)?;
128
0
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
129
0
    }
130
131
0
    pub(crate) fn remove_local(&self, idx: usize) -> bool {
132
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
133
0
        let (addr, page_index) = page::indices::<C>(idx);
134
135
0
        if page_index >= self.shared.len() {
136
0
            return false;
137
0
        }
138
139
0
        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
140
0
    }
141
142
0
    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
143
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
144
0
        let (addr, page_index) = page::indices::<C>(idx);
145
146
0
        if page_index >= self.shared.len() {
147
0
            return false;
148
0
        }
149
150
0
        let shared = &self.shared[page_index];
151
0
        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
152
0
    }
153
154
0
    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
155
0
        self.shared.iter()
156
0
    }
157
}
158
159
impl<T, C> Shard<T, C>
160
where
161
    T: Clear + Default,
162
    C: cfg::Config,
163
{
164
0
    pub(crate) fn init_with<U>(
165
0
        &self,
166
0
        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
167
0
    ) -> Option<U> {
168
        // Can we fit the value into an existing page?
169
0
        for (page_idx, page) in self.shared.iter().enumerate() {
170
0
            let local = self.local(page_idx);
171
172
0
            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);
173
174
0
            if let Some(res) = page.init_with(local, &mut init) {
175
0
                return Some(res);
176
0
            }
177
        }
178
179
0
        None
180
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::init_with::<(usize, sharded_slab::page::slot::InitGuard<tracing_subscriber::registry::sharded::DataInner>), <sharded_slab::pool::Pool<tracing_subscriber::registry::sharded::DataInner>>::create::{closure#0}>
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::init_with::<_, _>
181
182
0
    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
183
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
184
0
        let (addr, page_index) = page::indices::<C>(idx);
185
186
0
        if page_index >= self.shared.len() {
187
0
            return false;
188
0
        }
189
190
0
        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
191
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::mark_clear_local
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::mark_clear_local
192
193
0
    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
194
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
195
0
        let (addr, page_index) = page::indices::<C>(idx);
196
197
0
        if page_index >= self.shared.len() {
198
0
            return false;
199
0
        }
200
201
0
        let shared = &self.shared[page_index];
202
0
        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
203
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::mark_clear_remote
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::mark_clear_remote
204
205
0
    pub(crate) fn clear_after_release(&self, idx: usize) {
206
0
        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
207
0
        let tid = Tid::<C>::current().as_usize();
208
0
        test_println!(
209
            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
210
            tid,
211
            self.tid
212
        );
213
0
        if tid == self.tid {
214
0
            self.clear_local(idx);
215
0
        } else {
216
0
            self.clear_remote(idx);
217
0
        }
218
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::clear_after_release
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::clear_after_release
219
220
0
    fn clear_local(&self, idx: usize) -> bool {
221
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
222
0
        let (addr, page_index) = page::indices::<C>(idx);
223
224
0
        if page_index >= self.shared.len() {
225
0
            return false;
226
0
        }
227
228
0
        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
229
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::clear_local
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::clear_local
230
231
0
    fn clear_remote(&self, idx: usize) -> bool {
232
0
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
233
0
        let (addr, page_index) = page::indices::<C>(idx);
234
235
0
        if page_index >= self.shared.len() {
236
0
            return false;
237
0
        }
238
239
0
        let shared = &self.shared[page_index];
240
0
        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
241
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::clear_remote
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::clear_remote
242
243
    #[inline(always)]
244
0
    fn local(&self, i: usize) -> &page::Local {
245
        #[cfg(debug_assertions)]
246
        debug_assert_eq_in_drop!(
247
            Tid::<C>::current().as_usize(),
248
            self.tid,
249
            "tried to access local data from another thread!"
250
        );
251
252
0
        &self.local[i]
253
0
    }
Unexecuted instantiation: <sharded_slab::shard::Shard<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::local
Unexecuted instantiation: <sharded_slab::shard::Shard<_, _>>::local
254
}
255
256
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
257
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258
0
        let mut d = f.debug_struct("Shard");
259
260
        #[cfg(debug_assertions)]
261
        d.field("tid", &self.tid);
262
0
        d.field("shared", &self.shared).finish()
263
0
    }
264
}
265
266
// === impl Array ===
267
268
impl<T, C> Array<T, C>
269
where
270
    C: cfg::Config,
271
{
272
0
    pub(crate) fn new() -> Self {
273
0
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
274
0
        for _ in 0..C::MAX_SHARDS {
275
0
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
276
0
            shards.push(Ptr::null());
277
0
        }
278
0
        Self {
279
0
            shards: shards.into(),
280
0
            max: AtomicUsize::new(0),
281
0
        }
282
0
    }
Unexecuted instantiation: <sharded_slab::shard::Array<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::new
Unexecuted instantiation: <sharded_slab::shard::Array<_, _>>::new
283
284
    #[inline]
285
0
    pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
286
0
        test_println!("-> get shard={}", idx);
287
0
        self.shards.get(idx)?.load(Acquire)
288
0
    }
Unexecuted instantiation: <sharded_slab::shard::Array<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::get
Unexecuted instantiation: <sharded_slab::shard::Array<_, _>>::get
289
290
    #[inline]
291
0
    pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
292
0
        let tid = Tid::<C>::current();
293
0
        test_println!("current: {:?}", tid);
294
0
        let idx = tid.as_usize();
295
0
        assert!(
296
0
            idx < self.shards.len(),
297
0
            "Thread count overflowed the configured max count. \
298
0
            Thread index = {}, max threads = {}.",
299
            idx,
300
            C::MAX_SHARDS,
301
        );
302
        // It's okay for this to be relaxed. The value is only ever stored by
303
        // the thread that corresponds to the index, and we are that thread.
304
0
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
305
0
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
306
0
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
307
0
            self.shards[idx].set(ptr);
308
0
            let mut max = self.max.load(Acquire);
309
0
            while max < idx {
310
0
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
311
0
                    Ok(_) => break,
312
0
                    Err(actual) => max = actual,
313
                }
314
            }
315
0
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
316
0
            unsafe {
317
0
                // Safety: we just put it there!
318
0
                &*ptr
319
0
            }
320
0
            .get_ref()
321
0
        });
Unexecuted instantiation: <sharded_slab::shard::Array<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::current::{closure#0}
Unexecuted instantiation: <sharded_slab::shard::Array<_, _>>::current::{closure#0}
322
0
        (tid, shard)
323
0
    }
Unexecuted instantiation: <sharded_slab::shard::Array<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::current
Unexecuted instantiation: <sharded_slab::shard::Array<_, _>>::current
324
325
0
    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
326
0
        test_println!("Array::iter_mut");
327
0
        let max = self.max.load(Acquire);
328
0
        test_println!("-> highest index={}", max);
329
0
        IterMut(self.shards[0..=max].iter_mut())
330
0
    }
331
}
332
333
impl<T, C: cfg::Config> Drop for Array<T, C> {
334
0
    fn drop(&mut self) {
335
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
336
0
        let max = self.max.load(Acquire);
337
0
        for shard in &self.shards[0..=max] {
338
            // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
339
0
            let ptr = shard.0.load(Acquire);
340
0
            if ptr.is_null() {
341
0
                continue;
342
0
            }
343
0
            let shard = unsafe {
344
                // Safety: this is the only place where these boxes are
345
                // deallocated, and we have exclusive access to the shard array,
346
                // because...we are dropping it...
347
0
                Box::from_raw(ptr)
348
            };
349
0
            drop(shard)
350
        }
351
0
    }
Unexecuted instantiation: <sharded_slab::shard::Array<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <sharded_slab::shard::Array<_, _> as core::ops::drop::Drop>::drop
352
}
353
354
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
355
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356
0
        let max = self.max.load(Acquire);
357
0
        let mut set = f.debug_map();
358
0
        for shard in &self.shards[0..=max] {
359
0
            let ptr = shard.0.load(Acquire);
360
0
            if let Some(shard) = ptr::NonNull::new(ptr) {
361
0
                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
362
0
            } else {
363
0
                set.entry(&format_args!("{:p}", ptr), &());
364
0
            }
365
        }
366
0
        set.finish()
367
0
    }
368
}
369
370
// === impl Ptr ===
371
372
impl<T, C: cfg::Config> Ptr<T, C> {
373
    #[inline]
374
0
    fn null() -> Self {
375
0
        Self(AtomicPtr::new(ptr::null_mut()))
376
0
    }
Unexecuted instantiation: <sharded_slab::shard::Ptr<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::null
Unexecuted instantiation: <sharded_slab::shard::Ptr<_, _>>::null
377
378
    #[inline]
379
0
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
380
0
        let ptr = self.0.load(order);
381
0
        test_println!("---> loaded={:p} (order={:?})", ptr, order);
382
0
        if ptr.is_null() {
383
0
            test_println!("---> null");
384
0
            return None;
385
0
        }
386
0
        let track = unsafe {
387
            // Safety: The returned reference will have the same lifetime as the
388
            // reference to the shard pointer, which (morally, if not actually)
389
            // owns the shard. The shard is only deallocated when the shard
390
            // array is dropped, and it won't be dropped while this pointer is
391
            // borrowed --- and the returned reference has the same lifetime.
392
            //
393
            // We know that the pointer is not null, because we just
394
            // null-checked it immediately prior.
395
0
            &*ptr
396
        };
397
398
0
        Some(track.get_ref())
399
0
    }
Unexecuted instantiation: <sharded_slab::shard::Ptr<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::load
Unexecuted instantiation: <sharded_slab::shard::Ptr<_, _>>::load
400
401
    #[inline]
402
0
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
403
0
        self.0
404
0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
405
0
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
406
0
    }
Unexecuted instantiation: <sharded_slab::shard::Ptr<tracing_subscriber::registry::sharded::DataInner, sharded_slab::cfg::DefaultConfig>>::set
Unexecuted instantiation: <sharded_slab::shard::Ptr<_, _>>::set
407
}
408
409
// === Iterators ===
410
411
impl<'a, T, C> Iterator for IterMut<'a, T, C>
412
where
413
    T: 'a,
414
    C: cfg::Config + 'a,
415
{
416
    type Item = &'a Shard<T, C>;
417
0
    fn next(&mut self) -> Option<Self::Item> {
418
0
        test_println!("IterMut::next");
419
        loop {
420
            // Skip over empty indices if they are less than the highest
421
            // allocated shard. Some threads may have accessed the slab
422
            // (generating a thread ID) but never actually inserted data, so
423
            // they may have never allocated a shard.
424
0
            let next = self.0.next();
425
0
            test_println!("-> next.is_some={}", next.is_some());
426
0
            if let Some(shard) = next?.load(Acquire) {
427
0
                test_println!("-> done");
428
0
                return Some(shard);
429
0
            }
430
        }
431
0
    }
432
}