/rust/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/latch.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use std::marker::PhantomData; |
2 | | use std::ops::Deref; |
3 | | use std::sync::atomic::{AtomicUsize, Ordering}; |
4 | | use std::sync::Arc; |
5 | | use std::usize; |
6 | | |
7 | | use crate::registry::{Registry, WorkerThread}; |
8 | | use crate::sync::{Condvar, Mutex}; |
9 | | |
/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}
53 | | |
/// Conversion trait for latch types that embed a [`CoreLatch`], so generic
/// sleep/wake code can operate on the shared atomic state machine.
pub(super) trait AsCoreLatch {
    /// Returns a reference to the embedded `CoreLatch`.
    fn as_core_latch(&self) -> &CoreLatch;
}
57 | | |
// State machine for `CoreLatch`. Legal transitions (see the methods on
// `CoreLatch` below): UNSET -> SLEEPY (`get_sleepy`), SLEEPY -> SLEEPING
// (`fall_asleep`), SLEEPING -> UNSET (`wake_up`), and any state -> SET
// (`set`). SET is terminal.

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;
71 | | |
/// The core latch state machine shared by several latch kinds: a single
/// atomic word holding one of `UNSET`/`SLEEPY`/`SLEEPING`/`SET`. It tracks
/// both whether the latch has been set and whether the owning thread is
/// awake, about to sleep, or asleep on it.
///
/// NOTE(review): the previous doc comment here described spin latches and
/// appears to have been copy-pasted from `SpinLatch` below.
#[derive(Debug)]
pub(super) struct CoreLatch {
    // One of UNSET, SLEEPY, SLEEPING, or SET.
    state: AtomicUsize,
}
79 | | |
impl CoreLatch {
    /// Creates a latch in the `UNSET` state (owning thread awake).
    #[inline]
    fn new() -> Self {
        Self {
            // 0 == UNSET
            state: AtomicUsize::new(0),
        }
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by the owning thread once it has been awoken: rolls the
    /// state back from `SLEEPING` to `UNSET`, unless the latch was set in
    /// the meantime (in which case the terminal `SET` state is left alone).
    ///
    /// NOTE(review): the previous doc comment here was a copy-paste of
    /// `fall_asleep`'s ("Returns true if the owning thread should block"),
    /// but this method returns `()`.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            // A failed exchange means someone set the latch between the
            // probe and here; `SET` must be preserved, so the failure is
            // deliberately ignored.
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        // AcqRel swap: releases the setter's prior writes (paired with the
        // Acquire load in `probe`) and unconditionally moves to the
        // terminal SET state, reporting what state it replaced.
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        // Acquire pairs with the AcqRel swap in `set`, making the setter's
        // prior writes visible once this returns true.
        self.state.load(Ordering::Acquire) == SET
    }
}
138 | | |
impl AsCoreLatch for CoreLatch {
    /// A `CoreLatch` is trivially its own core latch.
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}
145 | | |
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    // Shared state machine; the owning worker sleeps on this.
    core_latch: CoreLatch,
    // Registry of the owning worker thread; used by `Latch::set` to
    // notify that worker when it is sleeping.
    registry: &'r Arc<Registry>,
    // Index of the owning worker within `registry`.
    target_worker_index: usize,
    // True for cross-threadpool latches (see `SpinLatch::cross`): makes
    // `Latch::set` clone the `Arc` to keep the registry alive while
    // notifying.
    cross: bool,
}
155 | | |
156 | | impl<'r> SpinLatch<'r> { |
157 | | /// Creates a new spin latch that is owned by `thread`. This means |
158 | | /// that `thread` is the only thread that should be blocking on |
159 | | /// this latch -- it also means that when the latch is set, we |
160 | | /// will wake `thread` if it is sleeping. |
161 | | #[inline] |
162 | 574k | pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { |
163 | 574k | SpinLatch { |
164 | 574k | core_latch: CoreLatch::new(), |
165 | 574k | registry: thread.registry(), |
166 | 574k | target_worker_index: thread.index(), |
167 | 574k | cross: false, |
168 | 574k | } |
169 | 574k | } <rayon_core::latch::SpinLatch>::new Line | Count | Source | 162 | 141k | pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { | 163 | 141k | SpinLatch { | 164 | 141k | core_latch: CoreLatch::new(), | 165 | 141k | registry: thread.registry(), | 166 | 141k | target_worker_index: thread.index(), | 167 | 141k | cross: false, | 168 | 141k | } | 169 | 141k | } |
<rayon_core::latch::SpinLatch>::new Line | Count | Source | 162 | 121k | pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { | 163 | 121k | SpinLatch { | 164 | 121k | core_latch: CoreLatch::new(), | 165 | 121k | registry: thread.registry(), | 166 | 121k | target_worker_index: thread.index(), | 167 | 121k | cross: false, | 168 | 121k | } | 169 | 121k | } |
Unexecuted instantiation: <rayon_core::latch::SpinLatch>::new <rayon_core::latch::SpinLatch>::new Line | Count | Source | 162 | 189k | pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { | 163 | 189k | SpinLatch { | 164 | 189k | core_latch: CoreLatch::new(), | 165 | 189k | registry: thread.registry(), | 166 | 189k | target_worker_index: thread.index(), | 167 | 189k | cross: false, | 168 | 189k | } | 169 | 189k | } |
Unexecuted instantiation: <rayon_core::latch::SpinLatch>::new <rayon_core::latch::SpinLatch>::new Line | Count | Source | 162 | 121k | pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { | 163 | 121k | SpinLatch { | 164 | 121k | core_latch: CoreLatch::new(), | 165 | 121k | registry: thread.registry(), | 166 | 121k | target_worker_index: thread.index(), | 167 | 121k | cross: false, | 168 | 121k | } | 169 | 121k | } |
|
170 | | |
171 | | /// Creates a new spin latch for cross-threadpool blocking. Notably, we |
172 | | /// need to make sure the registry is kept alive after setting, so we can |
173 | | /// safely call the notification. |
174 | | #[inline] |
175 | 0 | pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { |
176 | 0 | SpinLatch { |
177 | 0 | cross: true, |
178 | 0 | ..SpinLatch::new(thread) |
179 | 0 | } |
180 | 0 | } Unexecuted instantiation: <rayon_core::latch::SpinLatch>::cross Unexecuted instantiation: <rayon_core::latch::SpinLatch>::cross Unexecuted instantiation: <rayon_core::latch::SpinLatch>::cross Unexecuted instantiation: <rayon_core::latch::SpinLatch>::cross Unexecuted instantiation: <rayon_core::latch::SpinLatch>::cross Unexecuted instantiation: <rayon_core::latch::SpinLatch>::cross |
181 | | |
182 | | #[inline] |
183 | 574k | pub(super) fn probe(&self) -> bool { |
184 | 574k | self.core_latch.probe() |
185 | 574k | } <rayon_core::latch::SpinLatch>::probe Line | Count | Source | 183 | 141k | pub(super) fn probe(&self) -> bool { | 184 | 141k | self.core_latch.probe() | 185 | 141k | } |
<rayon_core::latch::SpinLatch>::probe Line | Count | Source | 183 | 121k | pub(super) fn probe(&self) -> bool { | 184 | 121k | self.core_latch.probe() | 185 | 121k | } |
Unexecuted instantiation: <rayon_core::latch::SpinLatch>::probe <rayon_core::latch::SpinLatch>::probe Line | Count | Source | 183 | 189k | pub(super) fn probe(&self) -> bool { | 184 | 189k | self.core_latch.probe() | 185 | 189k | } |
Unexecuted instantiation: <rayon_core::latch::SpinLatch>::probe <rayon_core::latch::SpinLatch>::probe Line | Count | Source | 183 | 121k | pub(super) fn probe(&self) -> bool { | 184 | 121k | self.core_latch.probe() | 185 | 121k | } |
|
186 | | } |
187 | | |
impl<'r> AsCoreLatch for SpinLatch<'r> {
    /// Exposes the embedded state machine to generic sleep/wake code.
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}
194 | | |
impl<'r> Latch for SpinLatch<'r> {
    /// Sets the latch and, if the owning worker was asleep, notifies its
    /// registry to wake it. All fields of `*this` are read *before* the
    /// latch is set, because setting it may invalidate `this`.
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
228 | | |
/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    // Guards the "is set" flag.
    m: Mutex<bool>,
    // Signaled whenever the flag is set; waiters re-check under the mutex.
    v: Condvar,
}
236 | | |
237 | | impl LockLatch { |
238 | | #[inline] |
239 | 195 | pub(super) fn new() -> LockLatch { |
240 | 195 | LockLatch { |
241 | 195 | m: Mutex::new(false), |
242 | 195 | v: Condvar::new(), |
243 | 195 | } |
244 | 195 | } <rayon_core::latch::LockLatch>::new Line | Count | Source | 239 | 1 | pub(super) fn new() -> LockLatch { | 240 | 1 | LockLatch { | 241 | 1 | m: Mutex::new(false), | 242 | 1 | v: Condvar::new(), | 243 | 1 | } | 244 | 1 | } |
<rayon_core::latch::LockLatch>::new Line | Count | Source | 239 | 1 | pub(super) fn new() -> LockLatch { | 240 | 1 | LockLatch { | 241 | 1 | m: Mutex::new(false), | 242 | 1 | v: Condvar::new(), | 243 | 1 | } | 244 | 1 | } |
<rayon_core::latch::LockLatch>::new Line | Count | Source | 239 | 192 | pub(super) fn new() -> LockLatch { | 240 | 192 | LockLatch { | 241 | 192 | m: Mutex::new(false), | 242 | 192 | v: Condvar::new(), | 243 | 192 | } | 244 | 192 | } |
<rayon_core::latch::LockLatch>::new Line | Count | Source | 239 | 1 | pub(super) fn new() -> LockLatch { | 240 | 1 | LockLatch { | 241 | 1 | m: Mutex::new(false), | 242 | 1 | v: Condvar::new(), | 243 | 1 | } | 244 | 1 | } |
Unexecuted instantiation: <rayon_core::latch::LockLatch>::new Unexecuted instantiation: <rayon_core::latch::LockLatch>::new |
245 | | |
246 | | /// Block until latch is set, then resets this lock latch so it can be reused again. |
247 | 33.4k | pub(super) fn wait_and_reset(&self) { |
248 | 33.4k | let mut guard = self.m.lock().unwrap(); |
249 | 66.6k | while !*guard { |
250 | 33.2k | guard = self.v.wait(guard).unwrap(); |
251 | 33.2k | } |
252 | 33.4k | *guard = false; |
253 | 33.4k | } |
254 | | |
255 | | /// Block until latch is set. |
256 | 0 | pub(super) fn wait(&self) { |
257 | 0 | let mut guard = self.m.lock().unwrap(); |
258 | 0 | while !*guard { |
259 | 0 | guard = self.v.wait(guard).unwrap(); |
260 | 0 | } |
261 | 0 | } |
262 | | } |
263 | | |
264 | | impl Latch for LockLatch { |
265 | | #[inline] |
266 | 33.5k | unsafe fn set(this: *const Self) { |
267 | 33.5k | let mut guard = (*this).m.lock().unwrap(); |
268 | 33.5k | *guard = true; |
269 | 33.5k | (*this).v.notify_all(); |
270 | 33.5k | } <rayon_core::latch::LockLatch as rayon_core::latch::Latch>::set Line | Count | Source | 266 | 9.00k | unsafe fn set(this: *const Self) { | 267 | 9.00k | let mut guard = (*this).m.lock().unwrap(); | 268 | 9.00k | *guard = true; | 269 | 9.00k | (*this).v.notify_all(); | 270 | 9.00k | } |
<rayon_core::latch::LockLatch as rayon_core::latch::Latch>::set Line | Count | Source | 266 | 8.70k | unsafe fn set(this: *const Self) { | 267 | 8.70k | let mut guard = (*this).m.lock().unwrap(); | 268 | 8.70k | *guard = true; | 269 | 8.70k | (*this).v.notify_all(); | 270 | 8.70k | } |
<rayon_core::latch::LockLatch as rayon_core::latch::Latch>::set Line | Count | Source | 266 | 96 | unsafe fn set(this: *const Self) { | 267 | 96 | let mut guard = (*this).m.lock().unwrap(); | 268 | 96 | *guard = true; | 269 | 96 | (*this).v.notify_all(); | 270 | 96 | } |
<rayon_core::latch::LockLatch as rayon_core::latch::Latch>::set Line | Count | Source | 266 | 7.08k | unsafe fn set(this: *const Self) { | 267 | 7.08k | let mut guard = (*this).m.lock().unwrap(); | 268 | 7.08k | *guard = true; | 269 | 7.08k | (*this).v.notify_all(); | 270 | 7.08k | } |
Unexecuted instantiation: <rayon_core::latch::LockLatch as rayon_core::latch::Latch>::set <rayon_core::latch::LockLatch as rayon_core::latch::Latch>::set Line | Count | Source | 266 | 8.70k | unsafe fn set(this: *const Self) { | 267 | 8.70k | let mut guard = (*this).m.lock().unwrap(); | 268 | 8.70k | *guard = true; | 269 | 8.70k | (*this).v.notify_all(); | 270 | 8.70k | } |
|
271 | | } |
272 | | |
/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    // Shared state machine; see `CoreLatch`.
    core_latch: CoreLatch,
}
289 | | |
impl OnceLatch {
    /// Creates an unset once-latch.
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self {
            core_latch: CoreLatch::new(),
        }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    ///
    /// `registry` is passed in rather than stored (see the type-level
    /// comment) to avoid an ownership cycle when the registry owns
    /// the latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        // Only notify if the owner was actually sleeping.
        if CoreLatch::set(&(*this).core_latch) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
311 | | |
impl AsCoreLatch for OnceLatch {
    /// Exposes the embedded state machine to generic sleep/wake code.
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}
318 | | |
/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLatch {
    // Remaining count; `Latch::set` decrements and only the final
    // decrement sets the inner latch.
    counter: AtomicUsize,
    // How the waiter blocks once the count reaches zero.
    kind: CountLatchKind,
}
329 | | |
enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}
349 | | |
350 | | impl std::fmt::Debug for CountLatchKind { |
351 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
352 | 0 | match self { |
353 | 0 | CountLatchKind::Stealing { latch, .. } => { |
354 | 0 | f.debug_tuple("Stealing").field(latch).finish() |
355 | | } |
356 | 0 | CountLatchKind::Blocking { latch, .. } => { |
357 | 0 | f.debug_tuple("Blocking").field(latch).finish() |
358 | | } |
359 | | } |
360 | 0 | } |
361 | | } |
362 | | |
impl CountLatch {
    /// Creates a counting latch with an initial count of one. See
    /// `with_count` for the meaning of `owner`.
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    /// Creates a counting latch starting at `count`. If `owner` is a rayon
    /// worker thread, the waiter will work-steal while waiting
    /// (`Stealing`); otherwise it will block on a `LockLatch` (`Blocking`).
    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    // Keep our own handle so `set` can notify this registry
                    // even when called from a different registry's thread.
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking {
                    latch: LockLatch::new(),
                },
            },
        }
    }

    /// Adds one to the count. Must only be called while the count is still
    /// nonzero (i.e. before the latch could have become set), hence the
    /// debug assertion.
    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    /// Blocks the calling thread until the count reaches zero. For a
    /// `Stealing` latch, `owner` must be the worker thread recorded at
    /// construction, and it work-steals via `wait_until`; for a `Blocking`
    /// latch it simply blocks on the lock latch.
    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}
406 | | |
impl Latch for CountLatch {
    // Decrements the count; the caller that brings it to zero signals the
    // underlying latch. SeqCst here provides the ordering the trait docs
    // require (the set must be observed by a subsequent probe).
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    // Clone the Arc *before* setting the latch: after
                    // `CoreLatch::set` returns, `this` (and the `registry`
                    // field inside it) may already have been freed by the
                    // awakened owner, so we need our own handle.
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        // `set` reported that the owner needs an explicit
                        // wakeup, delivered through its registry (see the
                        // `Stealing` field docs on `CountLatchKind`).
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}
429 | | |
/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    // Raw pointer rather than `&'a L` so `Latch::set(this)` does not carry
    // a full reference's `dereferenceable` guarantee for the whole call.
    inner: *const L,
    // Ties the pointer to the lifetime of the borrow it was created from.
    marker: PhantomData<&'a L>,
}
435 | | |
436 | | impl<L> LatchRef<'_, L> { |
437 | 33.4k | pub(super) fn new(inner: &L) -> LatchRef<'_, L> { |
438 | 33.4k | LatchRef { |
439 | 33.4k | inner, |
440 | 33.4k | marker: PhantomData, |
441 | 33.4k | } |
442 | 33.4k | } <rayon_core::latch::LatchRef<rayon_core::latch::LockLatch>>::new Line | Count | Source | 437 | 9.00k | pub(super) fn new(inner: &L) -> LatchRef<'_, L> { | 438 | 9.00k | LatchRef { | 439 | 9.00k | inner, | 440 | 9.00k | marker: PhantomData, | 441 | 9.00k | } | 442 | 9.00k | } |
<rayon_core::latch::LatchRef<rayon_core::latch::LockLatch>>::new Line | Count | Source | 437 | 8.70k | pub(super) fn new(inner: &L) -> LatchRef<'_, L> { | 438 | 8.70k | LatchRef { | 439 | 8.70k | inner, | 440 | 8.70k | marker: PhantomData, | 441 | 8.70k | } | 442 | 8.70k | } |
Unexecuted instantiation: <rayon_core::latch::LatchRef<_>>::new <rayon_core::latch::LatchRef<rayon_core::latch::LockLatch>>::new Line | Count | Source | 437 | 7.08k | pub(super) fn new(inner: &L) -> LatchRef<'_, L> { | 438 | 7.08k | LatchRef { | 439 | 7.08k | inner, | 440 | 7.08k | marker: PhantomData, | 441 | 7.08k | } | 442 | 7.08k | } |
Unexecuted instantiation: <rayon_core::latch::LatchRef<rayon_core::latch::LockLatch>>::new <rayon_core::latch::LatchRef<rayon_core::latch::LockLatch>>::new Line | Count | Source | 437 | 8.70k | pub(super) fn new(inner: &L) -> LatchRef<'_, L> { | 438 | 8.70k | LatchRef { | 439 | 8.70k | inner, | 440 | 8.70k | marker: PhantomData, | 441 | 8.70k | } | 442 | 8.70k | } |
|
443 | | } |
444 | | |
// SAFETY: a `LatchRef` only ever hands out `&L` (via `Deref` and
// `Latch::set`), so sharing it across threads is no more than sharing
// `&L` — sound exactly when `L: Sync`.
unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
446 | | |
impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive —
        // the `marker` lifetime ties `inner` to the original borrow.
        unsafe { &*self.inner }
    }
}
455 | | |
impl<L: Latch> Latch for LatchRef<'_, L> {
    // Forwards to the referenced latch. `inner` is read through the raw
    // `this` pointer, per the type's doc comment: no `dereferenceable`
    // guarantee for the duration of the call is implied.
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}