Coverage Report

Created: 2026-01-10 07:01

/rust/registry/src/index.crates.io-1949cf8c6b5b557f/crossbeam-epoch-0.9.18/src/internal.rs
Line
Count
Source
1
//! The global data and participant for garbage collection.
2
//!
3
//! # Registration
4
//!
5
//! In order to track all participants in one place, we need some form of participant
6
//! registration. When a participant is created, it is registered to a global lock-free
7
//! singly-linked list of registries; and when a participant leaves, it is unregistered from the
8
//! list.
9
//!
10
//! # Pinning
11
//!
12
//! Every participant contains an integer that tells whether the participant is pinned and if so,
13
//! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
14
//! aids in periodic global epoch advancement.
15
//!
16
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
17
//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
18
//!
19
//! # Thread-local bag
20
//!
21
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
22
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
23
//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
24
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
25
//! to amortize the synchronization cost of pushing garbage to the global queue.
26
//!
27
//! # Global queue
28
//!
29
//! Whenever a bag is pushed into the queue, the objects in some bags in the queue are collected and
30
//! destroyed along the way. This design reduces contention on data structures. The global queue
31
//! cannot be accessed directly: the only way to interact with it is by calling
32
//! `defer()`, which adds an object to the thread-local bag, or `collect()`, which manually triggers
33
//! garbage collection.
34
//!
35
//! Ideally, each instance of a concurrent data structure would have its own queue that gets fully
36
//! destroyed as soon as the data structure gets dropped.
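
To see how these pieces fit together from the outside, here is a minimal usage sketch against the crate's public API (`crossbeam_epoch::{pin, Atomic, Owned, Shared}` and `Guard::defer_destroy`). It is not part of the covered file; the surrounding program and the stored `u64` values are illustrative only.

    use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
    use std::sync::atomic::Ordering;

    fn main() {
        // A lock-free cell holding a heap-allocated integer.
        let cell = Atomic::new(42_u64);

        // Pinning registers this thread as a participant and returns a `Guard`.
        let guard = &epoch::pin();

        // Swap in a new value. The old allocation cannot be freed right away,
        // because other pinned threads may still be reading it.
        let old: Shared<'_, u64> = cell.swap(Owned::new(7), Ordering::AcqRel, guard);

        // Defer its destruction instead: the pointer goes into the thread-local
        // bag and is destroyed once the global epoch has advanced far enough.
        unsafe { guard.defer_destroy(old) };

        // Tear-down: with no other threads able to reach `cell`, the remaining
        // value can be reclaimed directly.
        unsafe { drop(cell.into_owned()) };
    }
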
37
38
use crate::primitive::cell::UnsafeCell;
39
use crate::primitive::sync::atomic::{self, Ordering};
40
use core::cell::Cell;
41
use core::mem::{self, ManuallyDrop};
42
use core::num::Wrapping;
43
use core::{fmt, ptr};
44
45
use crossbeam_utils::CachePadded;
46
47
use crate::atomic::{Owned, Shared};
48
use crate::collector::{Collector, LocalHandle};
49
use crate::deferred::Deferred;
50
use crate::epoch::{AtomicEpoch, Epoch};
51
use crate::guard::{unprotected, Guard};
52
use crate::sync::list::{Entry, IsElement, IterError, List};
53
use crate::sync::queue::Queue;
54
55
/// Maximum number of objects a bag can contain.
56
#[cfg(not(any(crossbeam_sanitize, miri)))]
57
const MAX_OBJECTS: usize = 64;
58
// Makes it more likely to trigger any potential data races.
59
#[cfg(any(crossbeam_sanitize, miri))]
60
const MAX_OBJECTS: usize = 4;
61
62
/// A bag of deferred functions.
63
pub(crate) struct Bag {
64
    /// Stashed objects.
65
    deferreds: [Deferred; MAX_OBJECTS],
66
    len: usize,
67
}
68
69
/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
70
unsafe impl Send for Bag {}
71
72
impl Bag {
73
    /// Returns a new, empty bag.
74
100k
    pub(crate) fn new() -> Self {
75
100k
        Self::default()
76
100k
    }
77
78
    /// Returns `true` if the bag is empty.
79
0
    pub(crate) fn is_empty(&self) -> bool {
80
0
        self.len == 0
81
0
    }
82
83
    /// Attempts to insert a deferred function into the bag.
84
    ///
85
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
86
    /// full.
87
    ///
88
    /// # Safety
89
    ///
90
    /// It should be safe for another thread to execute the given function.
91
100k
    pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
92
100k
        if self.len < MAX_OBJECTS {
93
100k
            self.deferreds[self.len] = deferred;
94
100k
            self.len += 1;
95
100k
            Ok(())
96
        } else {
97
0
            Err(deferred)
98
        }
99
100k
    }
100
101
    /// Seals the bag with the given epoch.
102
50.3k
    fn seal(self, epoch: Epoch) -> SealedBag {
103
50.3k
        SealedBag { epoch, _bag: self }
104
50.3k
    }
105
}
106
107
impl Default for Bag {
108
100k
    fn default() -> Self {
109
100k
        Bag {
110
100k
            len: 0,
111
100k
            deferreds: [Deferred::NO_OP; MAX_OBJECTS],
112
100k
        }
113
100k
    }
114
}
115
116
impl Drop for Bag {
117
100k
    fn drop(&mut self) {
118
        // Call all deferred functions.
119
100k
        for deferred in &mut self.deferreds[..self.len] {
120
100k
            let no_op = Deferred::NO_OP;
121
100k
            let owned_deferred = mem::replace(deferred, no_op);
122
100k
            owned_deferred.call();
123
100k
        }
124
100k
    }
125
}
126
127
// can't #[derive(Debug)] because `Debug` is not implemented for arrays of 64 items
128
impl fmt::Debug for Bag {
129
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130
0
        f.debug_struct("Bag")
131
0
            .field("deferreds", &&self.deferreds[..self.len])
132
0
            .finish()
133
0
    }
134
}
135
136
/// A pair of an epoch and a bag.
137
#[derive(Default, Debug)]
138
struct SealedBag {
139
    epoch: Epoch,
140
    _bag: Bag,
141
}
142
143
/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
144
unsafe impl Sync for SealedBag {}
145
146
impl SealedBag {
147
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
148
82.2k
    fn is_expired(&self, global_epoch: Epoch) -> bool {
149
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
150
        // is within one epoch of the current one cannot be destroyed yet.
151
82.2k
        global_epoch.wrapping_sub(self.epoch) >= 2
152
82.2k
    }
153
}
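
A worked example of the rule in `is_expired`: a bag sealed at epoch 4 may still be referenced while the global epoch is 4 or 5, and only becomes reclaimable at epoch 6. The sketch below models the same arithmetic with a plain `usize`; the real `Epoch` also carries a pinned bit and its own wrapping helpers, so this is illustrative only.

    // Model of `SealedBag::is_expired` with plain usize epochs.
    fn is_expired(sealed_epoch: usize, global_epoch: usize) -> bool {
        global_epoch.wrapping_sub(sealed_epoch) >= 2
    }

    fn main() {
        assert!(!is_expired(4, 4)); // same epoch: a pinned thread may still read it
        assert!(!is_expired(4, 5)); // one step ahead: still potentially reachable
        assert!(is_expired(4, 6));  // two steps ahead: no pinned thread can see it
        assert!(is_expired(usize::MAX, 1)); // wraparound behaves the same way
    }
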
154
155
/// The global data for a garbage collector.
156
pub(crate) struct Global {
157
    /// The intrusive linked list of `Local`s.
158
    locals: List<Local>,
159
160
    /// The global queue of bags of deferred functions.
161
    queue: Queue<SealedBag>,
162
163
    /// The global epoch.
164
    pub(crate) epoch: CachePadded<AtomicEpoch>,
165
}
166
167
impl Global {
168
    /// Number of bags to destroy.
169
    const COLLECT_STEPS: usize = 8;
170
171
    /// Creates new global data for garbage collection.
172
    #[inline]
173
2
    pub(crate) fn new() -> Self {
174
2
        Self {
175
2
            locals: List::new(),
176
2
            queue: Queue::new(),
177
2
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
178
2
        }
179
2
    }
180
181
    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
182
50.3k
    pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
183
50.3k
        let bag = mem::replace(bag, Bag::new());
184
185
50.3k
        atomic::fence(Ordering::SeqCst);
186
187
50.3k
        let epoch = self.epoch.load(Ordering::Relaxed);
188
50.3k
        self.queue.push(bag.seal(epoch), guard);
189
50.3k
    }
190
191
    /// Collects several bags from the global queue and executes deferred functions in them.
192
    ///
193
    /// Note: This may itself produce garbage and in turn allocate new bags.
194
    ///
195
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
196
    /// path. In other words, we want the compiler to optimize branching for the case when
197
    /// `collect()` is not called.
198
    #[cold]
199
878k
    pub(crate) fn collect(&self, guard: &Guard) {
200
878k
        let global_epoch = self.try_advance(guard);
201
202
878k
        let steps = if cfg!(crossbeam_sanitize) {
203
0
            usize::max_value()
204
        } else {
205
878k
            Self::COLLECT_STEPS
206
        };
207
208
878k
        for _ in 0..steps {
209
925k
            match self.queue.try_pop_if(
210
82.2k
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
211
925k
                guard,
212
            ) {
213
875k
                None => break,
214
50.2k
                Some(sealed_bag) => drop(sealed_bag),
215
            }
216
        }
217
878k
    }
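
The control flow of `collect` can be summarized as: pop at most `COLLECT_STEPS` bags, stopping at the first bag that is not yet expired. The sketch below models that loop with a `VecDeque` in place of the lock-free `Queue` and `try_pop_if`; types and numbers are illustrative only.

    use std::collections::VecDeque;

    type Bag = Vec<u8>; // stand-in for a bag of deferred functions

    const COLLECT_STEPS: usize = 8;

    fn collect(queue: &mut VecDeque<(usize, Bag)>, global_epoch: usize) {
        for _ in 0..COLLECT_STEPS {
            match queue.front() {
                Some((epoch, _)) if global_epoch.wrapping_sub(*epoch) >= 2 => {
                    // Expired: in the real code, dropping the SealedBag runs
                    // its deferred destructors.
                    let (_, bag) = queue.pop_front().unwrap();
                    drop(bag);
                }
                // Queue empty, or the oldest bag not yet expired: stop early.
                _ => break,
            }
        }
    }

    fn main() {
        let mut queue: VecDeque<(usize, Bag)> = (0..20).map(|e| (e, Vec::new())).collect();
        collect(&mut queue, 10);
        // Only COLLECT_STEPS bags (epochs 0..=7) were reclaimed in this call;
        // the bag sealed at epoch 8 is expired but left for a later call.
        assert_eq!(queue.front().unwrap().0, 8);
    }
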
218
219
    /// Attempts to advance the global epoch.
220
    ///
221
    /// The global epoch can advance only if all currently pinned participants have been pinned in
222
    /// the current epoch.
223
    ///
224
    /// Returns the current global epoch.
225
    ///
226
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
227
    #[cold]
228
878k
    pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
229
878k
        let global_epoch = self.epoch.load(Ordering::Relaxed);
230
878k
        atomic::fence(Ordering::SeqCst);
231
232
        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
233
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
234
        // misses and data dependencies. We should experiment with other data structures as well.
235
24.3M
        for local in self.locals.iter(guard) {
236
24.3M
            match local {
237
                Err(IterError::Stalled) => {
238
                    // A concurrent thread stalled this iteration. That thread might also try to
239
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
240
                    // epoch will not be advanced.
241
1.47k
                    return global_epoch;
242
                }
243
24.3M
                Ok(local) => {
244
24.3M
                    let local_epoch = local.epoch.load(Ordering::Relaxed);
245
246
                    // If the participant was pinned in a different epoch, we cannot advance the
247
                    // global epoch just yet.
248
24.3M
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
249
252k
                        return global_epoch;
250
24.0M
                    }
251
                }
252
            }
253
        }
254
623k
        atomic::fence(Ordering::Acquire);
255
256
        // All pinned participants were pinned in the current global epoch.
257
        // Now let's advance the global epoch...
258
        //
259
        // Note that if another thread already advanced it before us, this store will simply
260
        // overwrite the global epoch with the same value. This is true because `try_advance` was
261
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
262
        // advanced two steps ahead of it.
263
623k
        let new_epoch = global_epoch.successor();
264
623k
        self.epoch.store(new_epoch, Ordering::Release);
265
623k
        new_epoch
266
878k
    }
267
}
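
The advancement rule in `try_advance` boils down to: the global epoch can move forward only if every currently pinned participant is pinned in the current epoch. A simplified model, ignoring epoch wrapping, the lock-free list traversal, and the stalled-iteration case (names here are illustrative, not the crate's types):

    struct LocalState {
        pinned: bool,
        epoch: usize, // epoch observed at pin time; meaningful only if pinned
    }

    fn try_advance(global_epoch: usize, locals: &[LocalState]) -> usize {
        for local in locals {
            if local.pinned && local.epoch != global_epoch {
                // Someone is still pinned in an older epoch: cannot advance.
                return global_epoch;
            }
        }
        global_epoch + 1
    }

    fn main() {
        let locals = [
            LocalState { pinned: true, epoch: 7 },
            LocalState { pinned: false, epoch: 3 }, // unpinned participants don't block advancement
        ];
        assert_eq!(try_advance(7, &locals), 8); // everyone pinned is at epoch 7
        assert_eq!(try_advance(8, &locals), 8); // the participant pinned at 7 blocks 8 -> 9
    }
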
268
269
/// Participant for garbage collection.
270
#[repr(C)] // Note: `entry` must be the first field
271
pub(crate) struct Local {
272
    /// A node in the intrusive linked list of `Local`s.
273
    entry: Entry,
274
275
    /// A reference to the global data.
276
    ///
277
    /// When all guards and handles get dropped, this reference is destroyed.
278
    collector: UnsafeCell<ManuallyDrop<Collector>>,
279
280
    /// The local bag of deferred functions.
281
    pub(crate) bag: UnsafeCell<Bag>,
282
283
    /// The number of guards keeping this participant pinned.
284
    guard_count: Cell<usize>,
285
286
    /// The number of active handles.
287
    handle_count: Cell<usize>,
288
289
    /// Total number of pinnings performed.
290
    ///
291
    /// This is just an auxiliary counter that sometimes kicks off collection.
292
    pin_count: Cell<Wrapping<usize>>,
293
294
    /// The local epoch.
295
    epoch: CachePadded<AtomicEpoch>,
296
}
297
298
// Make sure `Local` is less than or equal to 2048 bytes.
299
// https://github.com/crossbeam-rs/crossbeam/issues/551
300
#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
301
#[test]
302
fn local_size() {
303
    // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
304
    // assert!(
305
    //     core::mem::size_of::<Local>() <= 2048,
306
    //     "An allocation of `Local` should be <= 2048 bytes."
307
    // );
308
}
309
310
impl Local {
311
    /// Number of pinnings after which a participant will execute some deferred functions from the
312
    /// global queue.
313
    const PINNINGS_BETWEEN_COLLECT: usize = 128;
314
315
    /// Registers a new `Local` in the provided `Global`.
316
50.3k
    pub(crate) fn register(collector: &Collector) -> LocalHandle {
317
        unsafe {
318
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.
319
320
50.3k
            let local = Owned::new(Local {
321
50.3k
                entry: Entry::default(),
322
50.3k
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
323
50.3k
                bag: UnsafeCell::new(Bag::new()),
324
50.3k
                guard_count: Cell::new(0),
325
50.3k
                handle_count: Cell::new(1),
326
50.3k
                pin_count: Cell::new(Wrapping(0)),
327
50.3k
                epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
328
50.3k
            })
329
50.3k
            .into_shared(unprotected());
330
50.3k
            collector.global.locals.insert(local, unprotected());
331
50.3k
            LocalHandle {
332
50.3k
                local: local.as_raw(),
333
50.3k
            }
334
        }
335
50.3k
    }
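
`register` is reached through the crate's public `Collector` and `LocalHandle` types. A minimal sketch of that entry path (public API only; no internals):

    use crossbeam_epoch::Collector;

    fn main() {
        // `Collector::new()` creates the `Global` above; `register()` calls
        // `Local::register` and returns a `LocalHandle` for this thread.
        let collector = Collector::new();
        let handle = collector.register();

        // Pinning through the handle goes through `Local::pin` below: it bumps
        // `guard_count` and publishes the local epoch.
        let guard = handle.pin();
        drop(guard);

        // Dropping the handle decrements `handle_count`; once both counts are
        // zero, the `Local` is finalized and unlinked from the global list.
        drop(handle);
    }
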
336
337
    /// Returns a reference to the `Global` in which this `Local` resides.
338
    #[inline]
339
109M
    pub(crate) fn global(&self) -> &Global {
340
109M
        &self.collector().global
341
109M
    }
<crossbeam_epoch::internal::Local>::global (instantiation: lines 339-341, 109M hits)
<crossbeam_epoch::internal::Local>::global (instantiation: lines 339-341, 100k hits)
342
343
    /// Returns a reference to the `Collector` in which this `Local` resides.
344
    #[inline]
345
109M
    pub(crate) fn collector(&self) -> &Collector {
346
109M
        self.collector.with(|c| unsafe { &**c })
347
109M
    }
<crossbeam_epoch::internal::Local>::collector (instantiation: lines 345-347, 109M hits)
<crossbeam_epoch::internal::Local>::collector (instantiation: lines 345-347, 100k hits)
348
349
    /// Returns `true` if the current participant is pinned.
350
    #[inline]
351
108M
    pub(crate) fn is_pinned(&self) -> bool {
352
108M
        self.guard_count.get() > 0
353
108M
    }
<crossbeam_epoch::internal::Local>::is_pinned (instantiation: lines 351-353, 108M hits)
Unexecuted instantiation: <crossbeam_epoch::internal::Local>::is_pinned
354
355
    /// Adds `deferred` to the thread-local bag.
356
    ///
357
    /// # Safety
358
    ///
359
    /// It should be safe for another thread to execute the given function.
360
100k
    pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
361
100k
        let bag = self.bag.with_mut(|b| &mut *b);
362
363
100k
        while let Err(d) = bag.try_push(deferred) {
364
0
            self.global().push_bag(bag, guard);
365
0
            deferred = d;
366
0
        }
367
100k
    }
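
The retry loop in `defer` relies on `try_push` failing only when the bag is full; the full bag is then pushed to the global queue (which leaves an empty bag behind) and the same item is retried. A standalone model of that pattern, with a `Vec<u64>` standing in for `Bag` and a closure standing in for `push_bag`:

    const MAX_OBJECTS: usize = 4;

    fn defer(bag: &mut Vec<u64>, mut item: u64, mut push_bag: impl FnMut(&mut Vec<u64>)) {
        // try_push: Err(item) when the bag is full, Ok(()) otherwise.
        let try_push = |bag: &mut Vec<u64>, item: u64| -> Result<(), u64> {
            if bag.len() < MAX_OBJECTS {
                bag.push(item);
                Ok(())
            } else {
                Err(item)
            }
        };

        while let Err(rejected) = try_push(bag, item) {
            // Bag full: flush it to the "global queue", then retry the item.
            push_bag(bag);
            item = rejected;
        }
    }

    fn main() {
        let mut bag = vec![1, 2, 3, 4]; // already at capacity
        let mut flushed = Vec::new();
        defer(&mut bag, 5, |b: &mut Vec<u64>| flushed.push(std::mem::take(b)));
        assert_eq!(bag, vec![5]);
        assert_eq!(flushed.len(), 1);
    }
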
368
369
0
    pub(crate) fn flush(&self, guard: &Guard) {
370
0
        let bag = self.bag.with_mut(|b| unsafe { &mut *b });
371
372
0
        if !bag.is_empty() {
373
0
            self.global().push_bag(bag, guard);
374
0
        }
375
376
0
        self.global().collect(guard);
377
0
    }
378
379
    /// Pins the `Local`.
380
    #[inline]
381
108M
    pub(crate) fn pin(&self) -> Guard {
382
108M
        let guard = Guard { local: self };
383
384
108M
        let guard_count = self.guard_count.get();
385
108M
        self.guard_count.set(guard_count.checked_add(1).unwrap());
386
387
108M
        if guard_count == 0 {
388
108M
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
389
108M
            let new_epoch = global_epoch.pinned();
390
391
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
392
            // The fence makes sure that any future loads from `Atomic`s will not happen before
393
            // this store.
394
108M
            if cfg!(all(
395
                any(target_arch = "x86", target_arch = "x86_64"),
396
                not(miri)
397
            )) {
398
                // HACK(stjepang): On x86 architectures there are two different ways of executing
399
                // a `SeqCst` fence.
400
                //
401
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
402
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
403
                //    instruction.
404
                //
405
                // Both instructions have the effect of a full barrier, but benchmarks have shown
406
                // that the second one makes pinning faster in this particular case.  It is not
407
                // clear that this is permitted by the C++ memory model (SC fences work very
408
                // differently from SC accesses), but experimental evidence suggests that this
409
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
410
                // but alas, that is not possible on stable Rust.
411
108M
                let current = Epoch::starting();
412
108M
                let res = self.epoch.compare_exchange(
413
108M
                    current,
414
108M
                    new_epoch,
415
108M
                    Ordering::SeqCst,
416
108M
                    Ordering::SeqCst,
417
108M
                );
418
108M
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
419
                // We add a compiler fence to make it less likely for LLVM to do something wrong
420
                // here.  Formally, this is not enough to get rid of data races; practically,
421
                // it should go a long way.
422
108M
                atomic::compiler_fence(Ordering::SeqCst);
423
0
            } else {
424
0
                self.epoch.store(new_epoch, Ordering::Relaxed);
425
0
                atomic::fence(Ordering::SeqCst);
426
0
            }
427
428
            // Increment the pin counter.
429
108M
            let count = self.pin_count.get();
430
108M
            self.pin_count.set(count + Wrapping(1));
431
432
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
433
            // some garbage.
434
108M
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
435
878k
                self.global().collect(&guard);
436
107M
            }
437
0
        }
438
439
108M
        guard
440
108M
    }
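
The HACK comment above describes two interchangeable ways to get a full barrier when publishing the pinned epoch. The sketch below contrasts them on a plain `AtomicUsize` (illustrative only; the real code operates on the participant's `AtomicEpoch`):

    use std::sync::atomic::{fence, AtomicUsize, Ordering};

    fn pin_with_fence(slot: &AtomicUsize, new: usize) {
        // Portable variant: relaxed store followed by a SeqCst fence.
        slot.store(new, Ordering::Relaxed);
        fence(Ordering::SeqCst);
    }

    fn pin_with_cas(slot: &AtomicUsize, expected: usize, new: usize) {
        // x86 variant: a SeqCst compare_exchange compiles to `lock cmpxchg`,
        // which also acts as a full barrier and was measured to be faster here.
        let res = slot.compare_exchange(expected, new, Ordering::SeqCst, Ordering::SeqCst);
        debug_assert!(res.is_ok(), "participant was expected to be unpinned");
    }

    fn main() {
        let slot = AtomicUsize::new(0);
        pin_with_cas(&slot, 0, 1);
        pin_with_fence(&slot, 2);
        assert_eq!(slot.load(Ordering::Relaxed), 2);
    }
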
<crossbeam_epoch::internal::Local>::pin (instantiation: lines 381-440, 108M hits on the hot path)
<crossbeam_epoch::internal::Local>::pin (instantiation: lines 381-440, 50.3k hits on the hot path)
441
442
    /// Unpins the `Local`.
443
    #[inline]
444
108M
    pub(crate) fn unpin(&self) {
445
108M
        let guard_count = self.guard_count.get();
446
108M
        self.guard_count.set(guard_count - 1);
447
448
108M
        if guard_count == 1 {
449
108M
            self.epoch.store(Epoch::starting(), Ordering::Release);
450
451
108M
            if self.handle_count.get() == 0 {
452
0
                self.finalize();
453
108M
            }
454
0
        }
455
108M
    }
456
457
    /// Unpins and then pins the `Local`.
458
    #[inline]
459
0
    pub(crate) fn repin(&self) {
460
0
        let guard_count = self.guard_count.get();
461
462
        // Update the local epoch only if there's only one guard.
463
0
        if guard_count == 1 {
464
0
            let epoch = self.epoch.load(Ordering::Relaxed);
465
0
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
466
467
            // Update the local epoch only if the global epoch is greater than the local epoch.
468
0
            if epoch != global_epoch {
469
0
                // We store the new epoch with `Release` because we need to ensure any memory
470
0
                // accesses from the previous epoch do not leak into the new one.
471
0
                self.epoch.store(global_epoch, Ordering::Release);
472
0
473
0
                // However, we don't need a following `SeqCst` fence, because it is safe for memory
474
0
                // accesses from the new epoch to be executed before updating the local epoch. At
475
0
                // worst, other threads will see the new epoch late and delay GC slightly.
476
0
            }
477
0
        }
478
0
    }
479
480
    /// Increments the handle count.
481
    #[inline]
482
0
    pub(crate) fn acquire_handle(&self) {
483
0
        let handle_count = self.handle_count.get();
484
0
        debug_assert!(handle_count >= 1);
485
0
        self.handle_count.set(handle_count + 1);
486
0
    }
487
488
    /// Decrements the handle count.
489
    #[inline]
490
50.3k
    pub(crate) fn release_handle(&self) {
491
50.3k
        let guard_count = self.guard_count.get();
492
50.3k
        let handle_count = self.handle_count.get();
493
50.3k
        debug_assert!(handle_count >= 1);
494
50.3k
        self.handle_count.set(handle_count - 1);
495
496
50.3k
        if guard_count == 0 && handle_count == 1 {
497
50.3k
            self.finalize();
498
50.3k
        }
499
50.3k
    }
<crossbeam_epoch::internal::Local>::release_handle (instantiation: lines 490-499, 50.3k hits)
Unexecuted instantiation: <crossbeam_epoch::internal::Local>::release_handle
500
501
    /// Removes the `Local` from the global linked list.
502
    #[cold]
503
50.3k
    fn finalize(&self) {
504
50.3k
        debug_assert_eq!(self.guard_count.get(), 0);
505
50.3k
        debug_assert_eq!(self.handle_count.get(), 0);
506
507
        // Temporarily increment handle count. This is required so that the following call to `pin`
508
        // doesn't call `finalize` again.
509
50.3k
        self.handle_count.set(1);
510
        unsafe {
511
            // Pin and move the local bag into the global queue. It's important that `push_bag`
512
            // doesn't defer destruction on any new garbage.
513
50.3k
            let guard = &self.pin();
514
50.3k
            self.global()
515
50.3k
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
516
        }
517
        // Revert the handle count back to zero.
518
50.3k
        self.handle_count.set(0);
519
520
        unsafe {
521
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
522
            // by a guard at this time, it's crucial that the reference is read before marking the
523
            // `Local` as deleted.
524
50.3k
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
525
526
            // Mark this node in the linked list as deleted.
527
50.3k
            self.entry.delete(unprotected());
528
529
            // Finally, drop the reference to the global. Note that this might be the last reference
530
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
531
            // in its queue will be executed.
532
50.3k
            drop(collector);
533
        }
534
50.3k
    }
535
}
536
537
impl IsElement<Self> for Local {
538
50.3k
    fn entry_of(local: &Self) -> &Entry {
539
        // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it.
540
        unsafe {
541
50.3k
            let entry_ptr = (local as *const Self).cast::<Entry>();
542
50.3k
            &*entry_ptr
543
        }
544
50.3k
    }
545
546
24.4M
    unsafe fn element_of(entry: &Entry) -> &Self {
547
        // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it.
548
24.4M
        let local_ptr = (entry as *const Entry).cast::<Self>();
549
24.4M
        &*local_ptr
550
24.4M
    }
551
552
50.3k
    unsafe fn finalize(entry: &Entry, guard: &Guard) {
553
50.3k
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
554
50.3k
    }
555
}
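
`entry_of` and `element_of` rely on the `repr(C)` guarantee that `entry` is at offset zero, so a pointer to the `Local` and a pointer to its `entry` field share an address and can be converted with plain casts. A minimal sketch of the same pattern with illustrative stand-in types:

    struct Entry {
        tag: u32,
    }

    #[repr(C)] // `entry` must stay the first field for the casts to be sound
    struct Node {
        entry: Entry,
        value: u64,
    }

    fn entry_of(node: &Node) -> &Entry {
        // SAFETY: `Node` is `repr(C)` and `entry` is its first field.
        unsafe { &*(node as *const Node).cast::<Entry>() }
    }

    unsafe fn element_of(entry: &Entry) -> &Node {
        // SAFETY: the caller guarantees `entry` is the first field of a live `Node`.
        &*(entry as *const Entry).cast::<Node>()
    }

    fn main() {
        let node = Node { entry: Entry { tag: 7 }, value: 42 };
        let entry = entry_of(&node);
        assert_eq!(entry.tag, 7);
        let back = unsafe { element_of(entry) };
        assert_eq!(back.value, 42);
    }
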
556
557
#[cfg(all(test, not(crossbeam_loom)))]
558
mod tests {
559
    use std::sync::atomic::{AtomicUsize, Ordering};
560
561
    use super::*;
562
563
    #[test]
564
    fn check_defer() {
565
        static FLAG: AtomicUsize = AtomicUsize::new(0);
566
        fn set() {
567
            FLAG.store(42, Ordering::Relaxed);
568
        }
569
570
        let d = Deferred::new(set);
571
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
572
        d.call();
573
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
574
    }
575
576
    #[test]
577
    fn check_bag() {
578
        static FLAG: AtomicUsize = AtomicUsize::new(0);
579
        fn incr() {
580
            FLAG.fetch_add(1, Ordering::Relaxed);
581
        }
582
583
        let mut bag = Bag::new();
584
        assert!(bag.is_empty());
585
586
        for _ in 0..MAX_OBJECTS {
587
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
588
            assert!(!bag.is_empty());
589
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
590
        }
591
592
        let result = unsafe { bag.try_push(Deferred::new(incr)) };
593
        assert!(result.is_err());
594
        assert!(!bag.is_empty());
595
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
596
597
        drop(bag);
598
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
599
    }
600
}