Coverage Report

Created: 2025-02-25 06:39

/rust/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-epoch-0.9.18/src/collector.rs
Line
Count
Source (jump to first uncovered line)
1
/// Epoch-based garbage collector.
2
///
3
/// # Examples
4
///
5
/// ```
6
/// use crossbeam_epoch::Collector;
7
///
8
/// let collector = Collector::new();
9
///
10
/// let handle = collector.register();
11
/// drop(collector); // `handle` still works after dropping `collector`
12
///
13
/// handle.pin().flush();
14
/// ```
15
use core::fmt;
16
17
use crate::guard::Guard;
18
use crate::internal::{Global, Local};
19
use crate::primitive::sync::Arc;
20
21
/// An epoch-based garbage collector.
22
pub struct Collector {
23
    pub(crate) global: Arc<Global>,
24
}
25
26
unsafe impl Send for Collector {}
27
unsafe impl Sync for Collector {}
28
29
impl Default for Collector {
30
0
    fn default() -> Self {
31
0
        Self {
32
0
            global: Arc::new(Global::new()),
33
0
        }
34
0
    }
35
}
36
37
impl Collector {
38
    /// Creates a new collector.
39
0
    pub fn new() -> Self {
40
0
        Self::default()
41
0
    }
42
43
    /// Registers a new handle for the collector.
44
0
    pub fn register(&self) -> LocalHandle {
45
0
        Local::register(self)
46
0
    }
47
}
48
49
impl Clone for Collector {
50
    /// Creates another reference to the same garbage collector.
51
0
    fn clone(&self) -> Self {
52
0
        Collector {
53
0
            global: self.global.clone(),
54
0
        }
55
0
    }
56
}
57
58
impl fmt::Debug for Collector {
59
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60
0
        f.pad("Collector { .. }")
61
0
    }
62
}
63
64
impl PartialEq for Collector {
65
    /// Checks if both handles point to the same collector.
66
0
    fn eq(&self, rhs: &Collector) -> bool {
67
0
        Arc::ptr_eq(&self.global, &rhs.global)
68
0
    }
69
}
70
impl Eq for Collector {}
71
72
/// A handle to a garbage collector.
73
pub struct LocalHandle {
74
    pub(crate) local: *const Local,
75
}
76
77
impl LocalHandle {
78
    /// Pins the handle.
79
    #[inline]
80
0
    pub fn pin(&self) -> Guard {
81
0
        unsafe { (*self.local).pin() }
82
0
    }
Unexecuted instantiation: <crossbeam_epoch::collector::LocalHandle>::pin
Unexecuted instantiation: <crossbeam_epoch::collector::LocalHandle>::pin
83
84
    /// Returns `true` if the handle is pinned.
85
    #[inline]
86
0
    pub fn is_pinned(&self) -> bool {
87
0
        unsafe { (*self.local).is_pinned() }
88
0
    }
Unexecuted instantiation: <crossbeam_epoch::collector::LocalHandle>::is_pinned
Unexecuted instantiation: <crossbeam_epoch::collector::LocalHandle>::is_pinned
89
90
    /// Returns the `Collector` associated with this handle.
91
    #[inline]
92
0
    pub fn collector(&self) -> &Collector {
93
0
        unsafe { (*self.local).collector() }
94
0
    }
95
}
96
97
impl Drop for LocalHandle {
98
    #[inline]
99
0
    fn drop(&mut self) {
100
0
        unsafe {
101
0
            Local::release_handle(&*self.local);
102
0
        }
103
0
    }
104
}
105
106
impl fmt::Debug for LocalHandle {
107
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108
0
        f.pad("LocalHandle { .. }")
109
0
    }
110
}
111
112
#[cfg(all(test, not(crossbeam_loom)))]
113
mod tests {
114
    use std::mem::ManuallyDrop;
115
    use std::sync::atomic::{AtomicUsize, Ordering};
116
117
    use crossbeam_utils::thread;
118
119
    use crate::{Collector, Owned};
120
121
    const NUM_THREADS: usize = 8;
122
123
    #[test]
124
    fn pin_reentrant() {
125
        let collector = Collector::new();
126
        let handle = collector.register();
127
        drop(collector);
128
129
        assert!(!handle.is_pinned());
130
        {
131
            let _guard = &handle.pin();
132
            assert!(handle.is_pinned());
133
            {
134
                let _guard = &handle.pin();
135
                assert!(handle.is_pinned());
136
            }
137
            assert!(handle.is_pinned());
138
        }
139
        assert!(!handle.is_pinned());
140
    }
141
142
    #[test]
143
    fn flush_local_bag() {
144
        let collector = Collector::new();
145
        let handle = collector.register();
146
        drop(collector);
147
148
        for _ in 0..100 {
149
            let guard = &handle.pin();
150
            unsafe {
151
                let a = Owned::new(7).into_shared(guard);
152
                guard.defer_destroy(a);
153
154
                assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
155
156
                while !(*guard.local).bag.with(|b| (*b).is_empty()) {
157
                    guard.flush();
158
                }
159
            }
160
        }
161
    }
162
163
    #[test]
164
    fn garbage_buffering() {
165
        let collector = Collector::new();
166
        let handle = collector.register();
167
        drop(collector);
168
169
        let guard = &handle.pin();
170
        unsafe {
171
            for _ in 0..10 {
172
                let a = Owned::new(7).into_shared(guard);
173
                guard.defer_destroy(a);
174
            }
175
            assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
176
        }
177
    }
178
179
    #[test]
180
    fn pin_holds_advance() {
181
        #[cfg(miri)]
182
        const N: usize = 500;
183
        #[cfg(not(miri))]
184
        const N: usize = 500_000;
185
186
        let collector = Collector::new();
187
188
        thread::scope(|scope| {
189
            for _ in 0..NUM_THREADS {
190
                scope.spawn(|_| {
191
                    let handle = collector.register();
192
                    for _ in 0..N {
193
                        let guard = &handle.pin();
194
195
                        let before = collector.global.epoch.load(Ordering::Relaxed);
196
                        collector.global.collect(guard);
197
                        let after = collector.global.epoch.load(Ordering::Relaxed);
198
199
                        assert!(after.wrapping_sub(before) <= 2);
200
                    }
201
                });
202
            }
203
        })
204
        .unwrap();
205
    }
206
207
    #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
208
    #[test]
209
    fn incremental() {
210
        #[cfg(miri)]
211
        const COUNT: usize = 500;
212
        #[cfg(not(miri))]
213
        const COUNT: usize = 100_000;
214
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
215
216
        let collector = Collector::new();
217
        let handle = collector.register();
218
219
        unsafe {
220
            let guard = &handle.pin();
221
            for _ in 0..COUNT {
222
                let a = Owned::new(7i32).into_shared(guard);
223
                guard.defer_unchecked(move || {
224
                    drop(a.into_owned());
225
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
226
                });
227
            }
228
            guard.flush();
229
        }
230
231
        let mut last = 0;
232
233
        while last < COUNT {
234
            let curr = DESTROYS.load(Ordering::Relaxed);
235
            assert!(curr - last <= 1024);
236
            last = curr;
237
238
            let guard = &handle.pin();
239
            collector.global.collect(guard);
240
        }
241
        assert!(DESTROYS.load(Ordering::Relaxed) == COUNT);
242
    }
243
244
    #[test]
245
    fn buffering() {
246
        const COUNT: usize = 10;
247
        #[cfg(miri)]
248
        const N: usize = 500;
249
        #[cfg(not(miri))]
250
        const N: usize = 100_000;
251
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
252
253
        let collector = Collector::new();
254
        let handle = collector.register();
255
256
        unsafe {
257
            let guard = &handle.pin();
258
            for _ in 0..COUNT {
259
                let a = Owned::new(7i32).into_shared(guard);
260
                guard.defer_unchecked(move || {
261
                    drop(a.into_owned());
262
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
263
                });
264
            }
265
        }
266
267
        for _ in 0..N {
268
            collector.global.collect(&handle.pin());
269
        }
270
        assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);
271
272
        handle.pin().flush();
273
274
        while DESTROYS.load(Ordering::Relaxed) < COUNT {
275
            let guard = &handle.pin();
276
            collector.global.collect(guard);
277
        }
278
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
279
    }
280
281
    #[test]
282
    fn count_drops() {
283
        #[cfg(miri)]
284
        const COUNT: usize = 500;
285
        #[cfg(not(miri))]
286
        const COUNT: usize = 100_000;
287
        static DROPS: AtomicUsize = AtomicUsize::new(0);
288
289
        struct Elem(#[allow(dead_code)] i32);
290
291
        impl Drop for Elem {
292
            fn drop(&mut self) {
293
                DROPS.fetch_add(1, Ordering::Relaxed);
294
            }
295
        }
296
297
        let collector = Collector::new();
298
        let handle = collector.register();
299
300
        unsafe {
301
            let guard = &handle.pin();
302
303
            for _ in 0..COUNT {
304
                let a = Owned::new(Elem(7i32)).into_shared(guard);
305
                guard.defer_destroy(a);
306
            }
307
            guard.flush();
308
        }
309
310
        while DROPS.load(Ordering::Relaxed) < COUNT {
311
            let guard = &handle.pin();
312
            collector.global.collect(guard);
313
        }
314
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
315
    }
316
317
    #[test]
318
    fn count_destroy() {
319
        #[cfg(miri)]
320
        const COUNT: usize = 500;
321
        #[cfg(not(miri))]
322
        const COUNT: usize = 100_000;
323
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
324
325
        let collector = Collector::new();
326
        let handle = collector.register();
327
328
        unsafe {
329
            let guard = &handle.pin();
330
331
            for _ in 0..COUNT {
332
                let a = Owned::new(7i32).into_shared(guard);
333
                guard.defer_unchecked(move || {
334
                    drop(a.into_owned());
335
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
336
                });
337
            }
338
            guard.flush();
339
        }
340
341
        while DESTROYS.load(Ordering::Relaxed) < COUNT {
342
            let guard = &handle.pin();
343
            collector.global.collect(guard);
344
        }
345
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
346
    }
347
348
    #[test]
349
    fn drop_array() {
350
        const COUNT: usize = 700;
351
        static DROPS: AtomicUsize = AtomicUsize::new(0);
352
353
        struct Elem(#[allow(dead_code)] i32);
354
355
        impl Drop for Elem {
356
            fn drop(&mut self) {
357
                DROPS.fetch_add(1, Ordering::Relaxed);
358
            }
359
        }
360
361
        let collector = Collector::new();
362
        let handle = collector.register();
363
364
        let mut guard = handle.pin();
365
366
        let mut v = Vec::with_capacity(COUNT);
367
        for i in 0..COUNT {
368
            v.push(Elem(i as i32));
369
        }
370
371
        {
372
            let a = Owned::new(v).into_shared(&guard);
373
            unsafe {
374
                guard.defer_destroy(a);
375
            }
376
            guard.flush();
377
        }
378
379
        while DROPS.load(Ordering::Relaxed) < COUNT {
380
            guard.repin();
381
            collector.global.collect(&guard);
382
        }
383
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
384
    }
385
386
    #[test]
387
    fn destroy_array() {
388
        #[cfg(miri)]
389
        const COUNT: usize = 500;
390
        #[cfg(not(miri))]
391
        const COUNT: usize = 100_000;
392
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
393
394
        let collector = Collector::new();
395
        let handle = collector.register();
396
397
        unsafe {
398
            let guard = &handle.pin();
399
400
            let mut v = Vec::with_capacity(COUNT);
401
            for i in 0..COUNT {
402
                v.push(i as i32);
403
            }
404
405
            let len = v.len();
406
            let cap = v.capacity();
407
            let ptr = ManuallyDrop::new(v).as_mut_ptr();
408
            guard.defer_unchecked(move || {
409
                drop(Vec::from_raw_parts(ptr, len, cap));
410
                DESTROYS.fetch_add(len, Ordering::Relaxed);
411
            });
412
            guard.flush();
413
        }
414
415
        while DESTROYS.load(Ordering::Relaxed) < COUNT {
416
            let guard = &handle.pin();
417
            collector.global.collect(guard);
418
        }
419
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
420
    }
421
422
    #[test]
423
    fn stress() {
424
        const THREADS: usize = 8;
425
        #[cfg(miri)]
426
        const COUNT: usize = 500;
427
        #[cfg(not(miri))]
428
        const COUNT: usize = 100_000;
429
        static DROPS: AtomicUsize = AtomicUsize::new(0);
430
431
        struct Elem(#[allow(dead_code)] i32);
432
433
        impl Drop for Elem {
434
            fn drop(&mut self) {
435
                DROPS.fetch_add(1, Ordering::Relaxed);
436
            }
437
        }
438
439
        let collector = Collector::new();
440
441
        thread::scope(|scope| {
442
            for _ in 0..THREADS {
443
                scope.spawn(|_| {
444
                    let handle = collector.register();
445
                    for _ in 0..COUNT {
446
                        let guard = &handle.pin();
447
                        unsafe {
448
                            let a = Owned::new(Elem(7i32)).into_shared(guard);
449
                            guard.defer_destroy(a);
450
                        }
451
                    }
452
                });
453
            }
454
        })
455
        .unwrap();
456
457
        let handle = collector.register();
458
        while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
459
            let guard = &handle.pin();
460
            collector.global.collect(guard);
461
        }
462
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
463
    }
464
}