/rust/registry/src/index.crates.io-1949cf8c6b5b557f/crossbeam-epoch-0.9.18/src/internal.rs
Line | Count | Source |
1 | | //! The global data and participant for garbage collection. |
2 | | //! |
3 | | //! # Registration |
4 | | //! |
5 | | //! In order to track all participants in one place, we need some form of participant |
6 | | //! registration. When a participant is created, it is registered in a global lock-free |
7 | | //! singly-linked list of registries; when a participant leaves, it is unregistered from the |
8 | | //! list. |
9 | | //! |
10 | | //! # Pinning |
11 | | //! |
12 | | //! Every participant contains an integer that tells whether the participant is pinned and, if so, |
13 | | //! what the global epoch was at the time it was pinned. Participants also hold a pin counter that |
14 | | //! aids in periodic global epoch advancement. |
15 | | //! |
16 | | //! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned. |
17 | | //! Guards are necessary for performing atomic operations, and for freeing/dropping locations. |
18 | | //! |
19 | | //! # Thread-local bag |
20 | | //! |
21 | | //! Objects that get unlinked from concurrent data structures must be stashed away until the global |
22 | | //! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects |
23 | | //! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current |
24 | | //! global epoch and pushed into the global queue of bags. We store objects in thread-local storage |
25 | | //! to amortize the synchronization cost of pushing garbage to the global queue. |
26 | | //! |
27 | | //! # Global queue |
28 | | //! |
29 | | //! Whenever a bag is pushed into the queue, the objects in some bags in the queue are collected and |
30 | | //! destroyed along the way. This design reduces contention on data structures. The global queue |
31 | | //! cannot be accessed directly: the only way to interact with it is by calling `defer()`, which |
32 | | //! adds an object to the thread-local bag, or `collect()`, which manually triggers garbage |
33 | | //! collection. |
34 | | //! |
35 | | //! Ideally, each instance of a concurrent data structure would have its own queue that gets fully |
36 | | //! destroyed as soon as the data structure is dropped. |
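Before the implementation details, here is a brief editorial sketch (not part of the covered file) of how the concepts above — pinning, guards, and deferred destruction — surface through the public crossbeam-epoch API; the values and variable names are illustrative only.

```rust
// Editorial sketch: pinning and deferred destruction via the public API.
use std::sync::atomic::Ordering;

use crossbeam_epoch::{self as epoch, Atomic, Owned};

fn main() {
    let a = Atomic::new(1234);
    {
        // Pinning the current thread returns a `Guard` (see "Pinning" above).
        let guard = epoch::pin();
        // Swap in a new value; the old allocation may still be read by other
        // pinned threads, so it cannot be freed immediately.
        let old = a.swap(Owned::new(5678), Ordering::AcqRel, &guard);
        // Defer destruction: the pointer goes into the thread-local bag and is
        // reclaimed once the global epoch has advanced far enough.
        unsafe { guard.defer_destroy(old) };
    }
    // Clean up the remaining value (no other threads exist in this sketch).
    unsafe { drop(a.into_owned()) };
}
```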
37 | | |
38 | | use crate::primitive::cell::UnsafeCell; |
39 | | use crate::primitive::sync::atomic::{self, Ordering}; |
40 | | use core::cell::Cell; |
41 | | use core::mem::{self, ManuallyDrop}; |
42 | | use core::num::Wrapping; |
43 | | use core::{fmt, ptr}; |
44 | | |
45 | | use crossbeam_utils::CachePadded; |
46 | | |
47 | | use crate::atomic::{Owned, Shared}; |
48 | | use crate::collector::{Collector, LocalHandle}; |
49 | | use crate::deferred::Deferred; |
50 | | use crate::epoch::{AtomicEpoch, Epoch}; |
51 | | use crate::guard::{unprotected, Guard}; |
52 | | use crate::sync::list::{Entry, IsElement, IterError, List}; |
53 | | use crate::sync::queue::Queue; |
54 | | |
55 | | /// Maximum number of objects a bag can contain. |
56 | | #[cfg(not(any(crossbeam_sanitize, miri)))] |
57 | | const MAX_OBJECTS: usize = 64; |
58 | | // Makes it more likely to trigger any potential data races. |
59 | | #[cfg(any(crossbeam_sanitize, miri))] |
60 | | const MAX_OBJECTS: usize = 4; |
61 | | |
62 | | /// A bag of deferred functions. |
63 | | pub(crate) struct Bag { |
64 | | /// Stashed objects. |
65 | | deferreds: [Deferred; MAX_OBJECTS], |
66 | | len: usize, |
67 | | } |
68 | | |
69 | | /// `Bag::try_push()` requires that it is safe for another thread to execute the given functions. |
70 | | unsafe impl Send for Bag {} |
71 | | |
72 | | impl Bag { |
73 | | /// Returns a new, empty bag. |
74 | 100k | pub(crate) fn new() -> Self { |
75 | 100k | Self::default() |
76 | 100k | } |
77 | | |
78 | | /// Returns `true` if the bag is empty. |
79 | 0 | pub(crate) fn is_empty(&self) -> bool { |
80 | 0 | self.len == 0 |
81 | 0 | } |
82 | | |
83 | | /// Attempts to insert a deferred function into the bag. |
84 | | /// |
85 | | /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is |
86 | | /// full. |
87 | | /// |
88 | | /// # Safety |
89 | | /// |
90 | | /// It should be safe for another thread to execute the given function. |
91 | 100k | pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> { |
92 | 100k | if self.len < MAX_OBJECTS { |
93 | 100k | self.deferreds[self.len] = deferred; |
94 | 100k | self.len += 1; |
95 | 100k | Ok(()) |
96 | | } else { |
97 | 0 | Err(deferred) |
98 | | } |
99 | 100k | } |
100 | | |
101 | | /// Seals the bag with the given epoch. |
102 | 50.3k | fn seal(self, epoch: Epoch) -> SealedBag { |
103 | 50.3k | SealedBag { epoch, _bag: self } |
104 | 50.3k | } |
105 | | } |
106 | | |
107 | | impl Default for Bag { |
108 | 100k | fn default() -> Self { |
109 | 100k | Bag { |
110 | 100k | len: 0, |
111 | 100k | deferreds: [Deferred::NO_OP; MAX_OBJECTS], |
112 | 100k | } |
113 | 100k | } |
114 | | } |
115 | | |
116 | | impl Drop for Bag { |
117 | 100k | fn drop(&mut self) { |
118 | | // Call all deferred functions. |
119 | 100k | for deferred in &mut self.deferreds[..self.len] { |
120 | 100k | let no_op = Deferred::NO_OP; |
121 | 100k | let owned_deferred = mem::replace(deferred, no_op); |
122 | 100k | owned_deferred.call(); |
123 | 100k | } |
124 | 100k | } |
125 | | } |
126 | | |
127 | | // can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long |
128 | | impl fmt::Debug for Bag { |
129 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
130 | 0 | f.debug_struct("Bag") |
131 | 0 | .field("deferreds", &&self.deferreds[..self.len]) |
132 | 0 | .finish() |
133 | 0 | } |
134 | | } |
135 | | |
136 | | /// A pair of an epoch and a bag. |
137 | | #[derive(Default, Debug)] |
138 | | struct SealedBag { |
139 | | epoch: Epoch, |
140 | | _bag: Bag, |
141 | | } |
142 | | |
143 | | /// It is safe to share `SealedBag` because `is_expired` only inspects the epoch. |
144 | | unsafe impl Sync for SealedBag {} |
145 | | |
146 | | impl SealedBag { |
147 | | /// Checks if it is safe to drop the bag w.r.t. the given global epoch. |
148 | 82.2k | fn is_expired(&self, global_epoch: Epoch) -> bool { |
149 | | // A pinned participant can witness at most one epoch advancement. Therefore, any bag that |
150 | | // is within one epoch of the current one cannot be destroyed yet. |
151 | 82.2k | global_epoch.wrapping_sub(self.epoch) >= 2 |
152 | 82.2k | } |
153 | | } |
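The `>= 2` rule can be read as: a bag sealed at epoch `e` becomes reclaimable only once the global epoch reaches `e + 2`, since a pinned participant may lag the global epoch by at most one step. A standalone editorial model of that rule, with plain integers standing in for the crate's `Epoch` type:

```rust
// Editorial model of the expiry rule above; wrapping subtraction mirrors
// `Epoch::wrapping_sub`. Not the crate's code.
fn is_expired_model(global_epoch: u64, sealed_at: u64) -> bool {
    global_epoch.wrapping_sub(sealed_at) >= 2
}

fn main() {
    assert!(!is_expired_model(5, 5)); // same epoch: participants may still use it
    assert!(!is_expired_model(6, 5)); // one step behind: a pinned participant may lag by one
    assert!(is_expired_model(7, 5));  // two steps behind: no pinned participant can reach it
}
```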
154 | | |
155 | | /// The global data for a garbage collector. |
156 | | pub(crate) struct Global { |
157 | | /// The intrusive linked list of `Local`s. |
158 | | locals: List<Local>, |
159 | | |
160 | | /// The global queue of bags of deferred functions. |
161 | | queue: Queue<SealedBag>, |
162 | | |
163 | | /// The global epoch. |
164 | | pub(crate) epoch: CachePadded<AtomicEpoch>, |
165 | | } |
166 | | |
167 | | impl Global { |
169 | | /// Maximum number of bags to collect from the global queue per call to `collect()`. |
169 | | const COLLECT_STEPS: usize = 8; |
170 | | |
171 | | /// Creates a new global data for garbage collection. |
172 | | #[inline] |
173 | 2 | pub(crate) fn new() -> Self { |
174 | 2 | Self { |
175 | 2 | locals: List::new(), |
176 | 2 | queue: Queue::new(), |
177 | 2 | epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), |
178 | 2 | } |
179 | 2 | } |
180 | | |
181 | | /// Pushes the bag into the global queue and replaces the bag with a new empty bag. |
182 | 50.3k | pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) { |
183 | 50.3k | let bag = mem::replace(bag, Bag::new()); |
184 | | |
185 | 50.3k | atomic::fence(Ordering::SeqCst); |
186 | | |
187 | 50.3k | let epoch = self.epoch.load(Ordering::Relaxed); |
188 | 50.3k | self.queue.push(bag.seal(epoch), guard); |
189 | 50.3k | } |
190 | | |
191 | | /// Collects several bags from the global queue and executes deferred functions in them. |
192 | | /// |
193 | | /// Note: This may itself produce garbage and in turn allocate new bags. |
194 | | /// |
195 | | /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold |
196 | | /// path. In other words, we want the compiler to optimize branching for the case when |
197 | | /// `collect()` is not called. |
198 | | #[cold] |
199 | 878k | pub(crate) fn collect(&self, guard: &Guard) { |
200 | 878k | let global_epoch = self.try_advance(guard); |
201 | | |
202 | 878k | let steps = if cfg!(crossbeam_sanitize) { |
203 | 0 | usize::max_value() |
204 | | } else { |
205 | 878k | Self::COLLECT_STEPS |
206 | | }; |
207 | | |
208 | 878k | for _ in 0..steps { |
209 | 925k | match self.queue.try_pop_if( |
210 | 82.2k | &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch), |
211 | 925k | guard, |
212 | | ) { |
213 | 875k | None => break, |
214 | 50.2k | Some(sealed_bag) => drop(sealed_bag), |
215 | | } |
216 | | } |
217 | 878k | } |
218 | | |
219 | | /// Attempts to advance the global epoch. |
220 | | /// |
221 | | /// The global epoch can advance only if all currently pinned participants have been pinned in |
222 | | /// the current epoch. |
223 | | /// |
224 | | /// Returns the current global epoch. |
225 | | /// |
226 | | /// `try_advance()` is annotated `#[cold]` because it is rarely called. |
227 | | #[cold] |
228 | 878k | pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch { |
229 | 878k | let global_epoch = self.epoch.load(Ordering::Relaxed); |
230 | 878k | atomic::fence(Ordering::SeqCst); |
231 | | |
232 | | // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly |
233 | | // easy to implement in a lock-free manner. However, traversal can be slow due to cache |
234 | | // misses and data dependencies. We should experiment with other data structures as well. |
235 | 24.3M | for local in self.locals.iter(guard) { |
236 | 24.3M | match local { |
237 | | Err(IterError::Stalled) => { |
238 | | // A concurrent thread stalled this iteration. That thread might also try to |
239 | | // advance the epoch, in which case we leave the job to it. Otherwise, the |
240 | | // epoch will not be advanced. |
241 | 1.47k | return global_epoch; |
242 | | } |
243 | 24.3M | Ok(local) => { |
244 | 24.3M | let local_epoch = local.epoch.load(Ordering::Relaxed); |
245 | | |
246 | | // If the participant was pinned in a different epoch, we cannot advance the |
247 | | // global epoch just yet. |
248 | 24.3M | if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { |
249 | 252k | return global_epoch; |
250 | 24.0M | } |
251 | | } |
252 | | } |
253 | | } |
254 | 623k | atomic::fence(Ordering::Acquire); |
255 | | |
256 | | // All pinned participants were pinned in the current global epoch. |
257 | | // Now let's advance the global epoch... |
258 | | // |
259 | | // Note that if another thread already advanced it before us, this store will simply |
260 | | // overwrite the global epoch with the same value. This is true because `try_advance` was |
261 | | // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be |
262 | | // advanced two steps ahead of it. |
263 | 623k | let new_epoch = global_epoch.successor(); |
264 | 623k | self.epoch.store(new_epoch, Ordering::Release); |
265 | 623k | new_epoch |
266 | 878k | } |
267 | | } |
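The loop in `try_advance` enforces a simple invariant: the global epoch may advance only if every currently pinned participant is pinned in the current global epoch. A hypothetical standalone model of that check (plain integers for epochs, `None` for an unpinned participant):

```rust
// Editorial model of the rule enforced by `try_advance`; not the crate's code.
fn can_advance(global_epoch: u64, local_epochs: &[Option<u64>]) -> bool {
    // An unpinned participant (`None`) never blocks advancement.
    local_epochs
        .iter()
        .all(|&pinned| pinned.map_or(true, |e| e == global_epoch))
}

fn main() {
    assert!(can_advance(3, &[None, Some(3), Some(3)]));
    assert!(!can_advance(3, &[Some(2), Some(3)])); // one lagging participant blocks it
}
```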
268 | | |
269 | | /// Participant for garbage collection. |
270 | | #[repr(C)] // Note: `entry` must be the first field |
271 | | pub(crate) struct Local { |
272 | | /// A node in the intrusive linked list of `Local`s. |
273 | | entry: Entry, |
274 | | |
275 | | /// A reference to the global data. |
276 | | /// |
277 | | /// When all guards and handles get dropped, this reference is destroyed. |
278 | | collector: UnsafeCell<ManuallyDrop<Collector>>, |
279 | | |
280 | | /// The local bag of deferred functions. |
281 | | pub(crate) bag: UnsafeCell<Bag>, |
282 | | |
283 | | /// The number of guards keeping this participant pinned. |
284 | | guard_count: Cell<usize>, |
285 | | |
286 | | /// The number of active handles. |
287 | | handle_count: Cell<usize>, |
288 | | |
289 | | /// Total number of pinnings performed. |
290 | | /// |
291 | | /// This is just an auxiliary counter that sometimes kicks off collection. |
292 | | pin_count: Cell<Wrapping<usize>>, |
293 | | |
294 | | /// The local epoch. |
295 | | epoch: CachePadded<AtomicEpoch>, |
296 | | } |
297 | | |
298 | | // Make sure `Local` is less than or equal to 2048 bytes. |
299 | | // https://github.com/crossbeam-rs/crossbeam/issues/551 |
300 | | #[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local` |
301 | | #[test] |
302 | | fn local_size() { |
303 | | // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869 |
304 | | // assert!( |
305 | | // core::mem::size_of::<Local>() <= 2048, |
306 | | // "An allocation of `Local` should be <= 2048 bytes." |
307 | | // ); |
308 | | } |
309 | | |
310 | | impl Local { |
311 | | /// Number of pinnings after which a participant will execute some deferred functions from the |
312 | | /// global queue. |
313 | | const PINNINGS_BETWEEN_COLLECT: usize = 128; |
314 | | |
315 | | /// Registers a new `Local` in the provided `Global`. |
316 | 50.3k | pub(crate) fn register(collector: &Collector) -> LocalHandle { |
317 | | unsafe { |
318 | | // Since we dereference no pointers in this block, it is safe to use `unprotected`. |
319 | | |
320 | 50.3k | let local = Owned::new(Local { |
321 | 50.3k | entry: Entry::default(), |
322 | 50.3k | collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())), |
323 | 50.3k | bag: UnsafeCell::new(Bag::new()), |
324 | 50.3k | guard_count: Cell::new(0), |
325 | 50.3k | handle_count: Cell::new(1), |
326 | 50.3k | pin_count: Cell::new(Wrapping(0)), |
327 | 50.3k | epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), |
328 | 50.3k | }) |
329 | 50.3k | .into_shared(unprotected()); |
330 | 50.3k | collector.global.locals.insert(local, unprotected()); |
331 | 50.3k | LocalHandle { |
332 | 50.3k | local: local.as_raw(), |
333 | 50.3k | } |
334 | | } |
335 | 50.3k | } |
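For context, `Local::register` is reached from the public API through `Collector::register`; the following editorial sketch (not part of this file) shows the registration and deregistration life cycle end to end.

```rust
// Editorial sketch: registration and deregistration through the public API.
use crossbeam_epoch::Collector;

fn main() {
    let collector = Collector::new();  // owns a `Global`
    let handle = collector.register(); // calls `Local::register` internally
    let guard = handle.pin();          // pins the newly registered `Local`
    drop(guard);                       // unpins
    drop(handle);                      // releases the handle; `finalize` runs
}
```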
336 | | |
337 | | /// Returns a reference to the `Global` in which this `Local` resides. |
338 | | #[inline] |
339 | 109M | pub(crate) fn global(&self) -> &Global { |
340 | 109M | &self.collector().global |
341 | 109M | } |
342 | | |
343 | | /// Returns a reference to the `Collector` in which this `Local` resides. |
344 | | #[inline] |
345 | 109M | pub(crate) fn collector(&self) -> &Collector { |
346 | 109M | self.collector.with(|c| unsafe { &**c }) |
347 | 109M | } |
348 | | |
349 | | /// Returns `true` if the current participant is pinned. |
350 | | #[inline] |
351 | 108M | pub(crate) fn is_pinned(&self) -> bool { |
352 | 108M | self.guard_count.get() > 0 |
353 | 108M | } |
354 | | |
355 | | /// Adds `deferred` to the thread-local bag. |
356 | | /// |
357 | | /// # Safety |
358 | | /// |
359 | | /// It should be safe for another thread to execute the given function. |
360 | 100k | pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) { |
361 | 100k | let bag = self.bag.with_mut(|b| &mut *b); |
362 | | |
363 | 100k | while let Err(d) = bag.try_push(deferred) { |
364 | 0 | self.global().push_bag(bag, guard); |
365 | 0 | deferred = d; |
366 | 0 | } |
367 | 100k | } |
368 | | |
369 | 0 | pub(crate) fn flush(&self, guard: &Guard) { |
370 | 0 | let bag = self.bag.with_mut(|b| unsafe { &mut *b }); |
371 | | |
372 | 0 | if !bag.is_empty() { |
373 | 0 | self.global().push_bag(bag, guard); |
374 | 0 | } |
375 | | |
376 | 0 | self.global().collect(guard); |
377 | 0 | } |
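Externally, this path is reached through `Guard::flush`, which pushes the thread-local bag into the global queue and then runs a collection pass; a minimal editorial sketch:

```rust
// Editorial sketch: flushing the thread-local bag via the public API.
fn main() {
    let guard = crossbeam_epoch::pin();
    guard.flush();
}
```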
378 | | |
379 | | /// Pins the `Local`. |
380 | | #[inline] |
381 | 108M | pub(crate) fn pin(&self) -> Guard { |
382 | 108M | let guard = Guard { local: self }; |
383 | | |
384 | 108M | let guard_count = self.guard_count.get(); |
385 | 108M | self.guard_count.set(guard_count.checked_add(1).unwrap()); |
386 | | |
387 | 108M | if guard_count == 0 { |
388 | 108M | let global_epoch = self.global().epoch.load(Ordering::Relaxed); |
389 | 108M | let new_epoch = global_epoch.pinned(); |
390 | | |
391 | | // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence. |
392 | | // The fence makes sure that any future loads from `Atomic`s will not happen before |
393 | | // this store. |
394 | 108M | if cfg!(all( |
395 | | any(target_arch = "x86", target_arch = "x86_64"), |
396 | | not(miri) |
397 | | )) { |
398 | | // HACK(stjepang): On x86 architectures there are two different ways of executing |
399 | | // a `SeqCst` fence. |
400 | | // |
401 | | // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. |
402 | | // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` |
403 | | // instruction. |
404 | | // |
405 | | // Both instructions have the effect of a full barrier, but benchmarks have shown |
406 | | // that the second one makes pinning faster in this particular case. It is not |
407 | | // clear that this is permitted by the C++ memory model (SC fences work very |
408 | | // differently from SC accesses), but experimental evidence suggests that this |
409 | | // works fine. Using inline assembly would be a viable (and correct) alternative, |
410 | | // but alas, that is not possible on stable Rust. |
411 | 108M | let current = Epoch::starting(); |
412 | 108M | let res = self.epoch.compare_exchange( |
413 | 108M | current, |
414 | 108M | new_epoch, |
415 | 108M | Ordering::SeqCst, |
416 | 108M | Ordering::SeqCst, |
417 | 108M | ); |
418 | 108M | debug_assert!(res.is_ok(), "participant was expected to be unpinned"); |
419 | | // We add a compiler fence to make it less likely for LLVM to do something wrong |
420 | | // here. Formally, this is not enough to get rid of data races; practically, |
421 | | // it should go a long way. |
422 | 108M | atomic::compiler_fence(Ordering::SeqCst); |
423 | 0 | } else { |
424 | 0 | self.epoch.store(new_epoch, Ordering::Relaxed); |
425 | 0 | atomic::fence(Ordering::SeqCst); |
426 | 0 | } |
427 | | |
428 | | // Increment the pin counter. |
429 | 108M | let count = self.pin_count.get(); |
430 | 108M | self.pin_count.set(count + Wrapping(1)); |
431 | | |
432 | | // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting |
433 | | // some garbage. |
434 | 108M | if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 { |
435 | 878k | self.global().collect(&guard); |
436 | 107M | } |
437 | 0 | } |
438 | | |
439 | 108M | guard |
440 | 108M | } |
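For reference, the two full-barrier idioms named in the HACK comment above look as follows on an ordinary atomic; this is an editorial illustration, not the crate's code.

```rust
// Editorial illustration of the two full-barrier idioms used by `pin`.
use std::sync::atomic::{fence, AtomicUsize, Ordering};

fn main() {
    let flag = AtomicUsize::new(0);

    // 1. Relaxed store followed by an explicit SeqCst fence (an `mfence` on x86);
    //    this is what `pin` does on non-x86 targets and under Miri.
    flag.store(1, Ordering::Relaxed);
    fence(Ordering::SeqCst);

    // 2. A SeqCst compare_exchange (a `lock cmpxchg` on x86), which also acts as
    //    a full barrier and is what `pin` uses on x86.
    let _ = flag.compare_exchange(1, 2, Ordering::SeqCst, Ordering::SeqCst);
}
```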
441 | | |
442 | | /// Unpins the `Local`. |
443 | | #[inline] |
444 | 108M | pub(crate) fn unpin(&self) { |
445 | 108M | let guard_count = self.guard_count.get(); |
446 | 108M | self.guard_count.set(guard_count - 1); |
447 | | |
448 | 108M | if guard_count == 1 { |
449 | 108M | self.epoch.store(Epoch::starting(), Ordering::Release); |
450 | | |
451 | 108M | if self.handle_count.get() == 0 { |
452 | 0 | self.finalize(); |
453 | 108M | } |
454 | 0 | } |
455 | 108M | } |
456 | | |
457 | | /// Unpins and then pins the `Local`. |
458 | | #[inline] |
459 | 0 | pub(crate) fn repin(&self) { |
460 | 0 | let guard_count = self.guard_count.get(); |
461 | | |
462 | | // Update the local epoch only if there's only one guard. |
463 | 0 | if guard_count == 1 { |
464 | 0 | let epoch = self.epoch.load(Ordering::Relaxed); |
465 | 0 | let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned(); |
466 | | |
467 | | // Update the local epoch only if the global epoch is greater than the local epoch. |
468 | 0 | if epoch != global_epoch { |
469 | 0 | // We store the new epoch with `Release` because we need to ensure any memory |
470 | 0 | // accesses from the previous epoch do not leak into the new one. |
471 | 0 | self.epoch.store(global_epoch, Ordering::Release); |
472 | 0 |
|
473 | 0 | // However, we don't need a following `SeqCst` fence, because it is safe for memory |
474 | 0 | // accesses from the new epoch to be executed before updating the local epoch. At |
475 | 0 | // worse, other threads will see the new epoch late and delay GC slightly. |
476 | 0 | } |
477 | 0 | } |
478 | 0 | } |
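The public entry point for this path is `Guard::repin`; a hypothetical editorial sketch of a long-running thread using it so it does not hold back the global epoch:

```rust
// Editorial sketch: periodically re-pinning a long-lived guard.
fn main() {
    let mut guard = crossbeam_epoch::pin();
    for _batch in 0..1_000 {
        // ... process a batch of work under the guard ...
        guard.repin(); // move the pin up to the current global epoch
    }
}
```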
479 | | |
480 | | /// Increments the handle count. |
481 | | #[inline] |
482 | 0 | pub(crate) fn acquire_handle(&self) { |
483 | 0 | let handle_count = self.handle_count.get(); |
484 | 0 | debug_assert!(handle_count >= 1); |
485 | 0 | self.handle_count.set(handle_count + 1); |
486 | 0 | } |
487 | | |
488 | | /// Decrements the handle count. |
489 | | #[inline] |
490 | 50.3k | pub(crate) fn release_handle(&self) { |
491 | 50.3k | let guard_count = self.guard_count.get(); |
492 | 50.3k | let handle_count = self.handle_count.get(); |
493 | 50.3k | debug_assert!(handle_count >= 1); |
494 | 50.3k | self.handle_count.set(handle_count - 1); |
495 | | |
496 | 50.3k | if guard_count == 0 && handle_count == 1 { |
497 | 50.3k | self.finalize(); |
498 | 50.3k | } |
499 | 50.3k | } |
500 | | |
501 | | /// Removes the `Local` from the global linked list. |
502 | | #[cold] |
503 | 50.3k | fn finalize(&self) { |
504 | 50.3k | debug_assert_eq!(self.guard_count.get(), 0); |
505 | 50.3k | debug_assert_eq!(self.handle_count.get(), 0); |
506 | | |
507 | | // Temporarily increment handle count. This is required so that the following call to `pin` |
508 | | // doesn't call `finalize` again. |
509 | 50.3k | self.handle_count.set(1); |
510 | | unsafe { |
511 | | // Pin and move the local bag into the global queue. It's important that `push_bag` |
512 | | // doesn't defer destruction on any new garbage. |
513 | 50.3k | let guard = &self.pin(); |
514 | 50.3k | self.global() |
515 | 50.3k | .push_bag(self.bag.with_mut(|b| &mut *b), guard); |
516 | | } |
517 | | // Revert the handle count back to zero. |
518 | 50.3k | self.handle_count.set(0); |
519 | | |
520 | | unsafe { |
521 | | // Take the reference to the `Global` out of this `Local`. Since we're not protected |
522 | | // by a guard at this time, it's crucial that the reference is read before marking the |
523 | | // `Local` as deleted. |
524 | 50.3k | let collector: Collector = ptr::read(self.collector.with(|c| &*(*c))); |
525 | | |
526 | | // Mark this node in the linked list as deleted. |
527 | 50.3k | self.entry.delete(unprotected()); |
528 | | |
529 | | // Finally, drop the reference to the global. Note that this might be the last reference |
530 | | // to the `Global`. If so, the global data will be destroyed and all deferred functions |
531 | | // in its queue will be executed. |
532 | 50.3k | drop(collector); |
533 | | } |
534 | 50.3k | } |
535 | | } |
536 | | |
537 | | impl IsElement<Self> for Local { |
538 | 50.3k | fn entry_of(local: &Self) -> &Entry { |
539 | | // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it. |
540 | | unsafe { |
541 | 50.3k | let entry_ptr = (local as *const Self).cast::<Entry>(); |
542 | 50.3k | &*entry_ptr |
543 | | } |
544 | 50.3k | } |
545 | | |
546 | 24.4M | unsafe fn element_of(entry: &Entry) -> &Self { |
547 | | // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it. |
548 | 24.4M | let local_ptr = (entry as *const Entry).cast::<Self>(); |
549 | 24.4M | &*local_ptr |
550 | 24.4M | } |
551 | | |
552 | 50.3k | unsafe fn finalize(entry: &Entry, guard: &Guard) { |
553 | 50.3k | guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _)); |
554 | 50.3k | } |
555 | | } |
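The casts above rely on the `#[repr(C)]` guarantee that a pointer to a struct may be cast to a pointer to its first field and back. A small editorial sketch of that trick, with illustrative names:

```rust
// Editorial sketch of the repr(C) "first field" trick relied on by `IsElement`.
#[repr(C)]
struct Node {
    header: u64, // plays the role of `entry` in `Local`
    payload: u32,
}

fn main() {
    let node = Node { header: 7, payload: 42 };
    let header_ptr = (&node as *const Node).cast::<u64>();
    // SAFETY: `Node` is repr(C) and `header` is its first field, so the cast
    // pointer refers to `node.header`.
    assert_eq!(unsafe { *header_ptr }, 7);
}
```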
556 | | |
557 | | #[cfg(all(test, not(crossbeam_loom)))] |
558 | | mod tests { |
559 | | use std::sync::atomic::{AtomicUsize, Ordering}; |
560 | | |
561 | | use super::*; |
562 | | |
563 | | #[test] |
564 | | fn check_defer() { |
565 | | static FLAG: AtomicUsize = AtomicUsize::new(0); |
566 | | fn set() { |
567 | | FLAG.store(42, Ordering::Relaxed); |
568 | | } |
569 | | |
570 | | let d = Deferred::new(set); |
571 | | assert_eq!(FLAG.load(Ordering::Relaxed), 0); |
572 | | d.call(); |
573 | | assert_eq!(FLAG.load(Ordering::Relaxed), 42); |
574 | | } |
575 | | |
576 | | #[test] |
577 | | fn check_bag() { |
578 | | static FLAG: AtomicUsize = AtomicUsize::new(0); |
579 | | fn incr() { |
580 | | FLAG.fetch_add(1, Ordering::Relaxed); |
581 | | } |
582 | | |
583 | | let mut bag = Bag::new(); |
584 | | assert!(bag.is_empty()); |
585 | | |
586 | | for _ in 0..MAX_OBJECTS { |
587 | | assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() }); |
588 | | assert!(!bag.is_empty()); |
589 | | assert_eq!(FLAG.load(Ordering::Relaxed), 0); |
590 | | } |
591 | | |
592 | | let result = unsafe { bag.try_push(Deferred::new(incr)) }; |
593 | | assert!(result.is_err()); |
594 | | assert!(!bag.is_empty()); |
595 | | assert_eq!(FLAG.load(Ordering::Relaxed), 0); |
596 | | |
597 | | drop(bag); |
598 | | assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS); |
599 | | } |
600 | | } |