/rust/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.4/src/waker.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Waking mechanism for threads blocked on channel operations. |
2 | | |
3 | | use std::ptr; |
4 | | use std::sync::atomic::{AtomicBool, Ordering}; |
5 | | use std::thread::{self, ThreadId}; |
6 | | |
7 | | use crate::context::Context; |
8 | | use crate::select::{Operation, Selected}; |
9 | | use crate::utils::Spinlock; |
10 | | |
11 | | /// Represents a thread blocked on a specific channel operation. |
12 | | pub(crate) struct Entry { |
13 | | /// The operation. |
14 | | pub(crate) oper: Operation, |
15 | | |
16 | | /// Optional packet. |
17 | | pub(crate) packet: *mut (), |
18 | | |
19 | | /// Context associated with the thread owning this operation. |
20 | | pub(crate) cx: Context, |
21 | | } |
22 | | |
23 | | /// A queue of threads blocked on channel operations. |
24 | | /// |
25 | | /// This data structure is used by threads to register blocking operations and get woken up once |
26 | | /// an operation becomes ready. |
27 | | pub(crate) struct Waker { |
28 | | /// A list of select operations. |
29 | | selectors: Vec<Entry>, |
30 | | |
31 | | /// A list of operations waiting to be ready. |
32 | | observers: Vec<Entry>, |
33 | | } |
34 | | |
35 | | impl Waker { |
36 | | /// Creates a new `Waker`. |
37 | | #[inline] |
38 | 0 | pub(crate) fn new() -> Self { |
39 | 0 | Waker { |
40 | 0 | selectors: Vec::new(), |
41 | 0 | observers: Vec::new(), |
42 | 0 | } |
43 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker>::new Unexecuted instantiation: <crossbeam_channel::waker::Waker>::new |
44 | | |
45 | | /// Registers a select operation. |
46 | | #[inline] |
47 | 0 | pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { |
48 | 0 | self.register_with_packet(oper, ptr::null_mut(), cx); |
49 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register |
50 | | |
51 | | /// Registers a select operation and a packet. |
52 | | #[inline] |
53 | 0 | pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { |
54 | 0 | self.selectors.push(Entry { |
55 | 0 | oper, |
56 | 0 | packet, |
57 | 0 | cx: cx.clone(), |
58 | 0 | }); |
59 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register_with_packet Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register_with_packet |
60 | | |
61 | | /// Unregisters a select operation. |
62 | | #[inline] |
63 | | pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> { |
64 | 0 | if let Some((i, _)) = self |
65 | 0 | .selectors |
66 | 0 | .iter() |
67 | 0 | .enumerate() |
68 | 0 | .find(|&(_, entry)| entry.oper == oper) Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0}Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0} |
69 | | { |
70 | 0 | let entry = self.selectors.remove(i); |
71 | 0 | Some(entry) |
72 | | } else { |
73 | 0 | None |
74 | | } |
75 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister |
76 | | |
77 | | /// Attempts to find another thread's entry, select the operation, and wake it up. |
78 | | #[inline] |
79 | 0 | pub(crate) fn try_select(&mut self) -> Option<Entry> { |
80 | 0 | if self.selectors.is_empty() { |
81 | 0 | None |
82 | | } else { |
83 | 0 | let thread_id = current_thread_id(); |
84 | 0 |
|
85 | 0 | self.selectors |
86 | 0 | .iter() |
87 | 0 | .position(|selector| { |
88 | 0 | // Does the entry belong to a different thread? |
89 | 0 | selector.cx.thread_id() != thread_id |
90 | 0 | && selector // Try selecting this operation. |
91 | 0 | .cx |
92 | 0 | .try_select(Selected::Operation(selector.oper)) |
93 | 0 | .is_ok() |
94 | | && { |
95 | | // Provide the packet. |
96 | 0 | selector.cx.store_packet(selector.packet); |
97 | 0 | // Wake the thread up. |
98 | 0 | selector.cx.unpark(); |
99 | 0 | true |
100 | | } |
101 | 0 | }) Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0}Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0} |
102 | 0 | // Remove the entry from the queue to keep it clean and improve |
103 | 0 | // performance. |
104 | 0 | .map(|pos| self.selectors.remove(pos)) Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1}Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1} |
105 | | } |
106 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select |
107 | | |
108 | | /// Returns `true` if there is an entry which can be selected by the current thread. |
109 | | #[inline] |
110 | 0 | pub(crate) fn can_select(&self) -> bool { |
111 | 0 | if self.selectors.is_empty() { |
112 | 0 | false |
113 | | } else { |
114 | 0 | let thread_id = current_thread_id(); |
115 | 0 |
|
116 | 0 | self.selectors.iter().any(|entry| { |
117 | 0 | entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting |
118 | 0 | }) |
119 | | } |
120 | 0 | } |
121 | | |
122 | | /// Registers an operation waiting to be ready. |
123 | | #[inline] |
124 | 0 | pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) { |
125 | 0 | self.observers.push(Entry { |
126 | 0 | oper, |
127 | 0 | packet: ptr::null_mut(), |
128 | 0 | cx: cx.clone(), |
129 | 0 | }); |
130 | 0 | } |
131 | | |
132 | | /// Unregisters an operation waiting to be ready. |
133 | | #[inline] |
134 | 0 | pub(crate) fn unwatch(&mut self, oper: Operation) { |
135 | 0 | self.observers.retain(|e| e.oper != oper); |
136 | 0 | } |
137 | | |
138 | | /// Notifies all operations waiting to be ready. |
139 | | #[inline] |
140 | 0 | pub(crate) fn notify(&mut self) { |
141 | 0 | for entry in self.observers.drain(..) { |
142 | 0 | if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { |
143 | 0 | entry.cx.unpark(); |
144 | 0 | } |
145 | | } |
146 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker>::notify Unexecuted instantiation: <crossbeam_channel::waker::Waker>::notify |
147 | | |
148 | | /// Notifies all registered operations that the channel is disconnected. |
149 | | #[inline] |
150 | 0 | pub(crate) fn disconnect(&mut self) { |
151 | 0 | for entry in self.selectors.iter() { |
152 | 0 | if entry.cx.try_select(Selected::Disconnected).is_ok() { |
153 | 0 | // Wake the thread up. |
154 | 0 | // |
155 | 0 | // Here we don't remove the entry from the queue. Registered threads must |
156 | 0 | // unregister from the waker by themselves. They might also want to recover the |
157 | 0 | // packet value and destroy it, if necessary. |
158 | 0 | entry.cx.unpark(); |
159 | 0 | } |
160 | | } |
161 | | |
162 | 0 | self.notify(); |
163 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker>::disconnect Unexecuted instantiation: <crossbeam_channel::waker::Waker>::disconnect |
164 | | } |
165 | | |
166 | | impl Drop for Waker { |
167 | | #[inline] |
168 | 0 | fn drop(&mut self) { |
169 | 0 | debug_assert_eq!(self.selectors.len(), 0); |
170 | 0 | debug_assert_eq!(self.observers.len(), 0); |
171 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop Unexecuted instantiation: <crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop |
172 | | } |
173 | | |
174 | | /// A waker that can be shared among threads without locking. |
175 | | /// |
176 | | /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. |
177 | | pub(crate) struct SyncWaker { |
178 | | /// The inner `Waker`. |
179 | | inner: Spinlock<Waker>, |
180 | | |
181 | | /// `true` if the waker is empty. |
182 | | is_empty: AtomicBool, |
183 | | } |
184 | | |
185 | | impl SyncWaker { |
186 | | /// Creates a new `SyncWaker`. |
187 | | #[inline] |
188 | 0 | pub(crate) fn new() -> Self { |
189 | 0 | SyncWaker { |
190 | 0 | inner: Spinlock::new(Waker::new()), |
191 | 0 | is_empty: AtomicBool::new(true), |
192 | 0 | } |
193 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::new Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::new |
194 | | |
195 | | /// Registers the current thread with an operation. |
196 | | #[inline] |
197 | 0 | pub(crate) fn register(&self, oper: Operation, cx: &Context) { |
198 | 0 | let mut inner = self.inner.lock(); |
199 | 0 | inner.register(oper, cx); |
200 | 0 | self.is_empty.store( |
201 | 0 | inner.selectors.is_empty() && inner.observers.is_empty(), |
202 | 0 | Ordering::SeqCst, |
203 | 0 | ); |
204 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register |
205 | | |
206 | | /// Unregisters an operation previously registered by the current thread. |
207 | | #[inline] |
208 | 0 | pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { |
209 | 0 | let mut inner = self.inner.lock(); |
210 | 0 | let entry = inner.unregister(oper); |
211 | 0 | self.is_empty.store( |
212 | 0 | inner.selectors.is_empty() && inner.observers.is_empty(), |
213 | 0 | Ordering::SeqCst, |
214 | 0 | ); |
215 | 0 | entry |
216 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister |
217 | | |
218 | | /// Attempts to find one thread (not the current one), select its operation, and wake it up. |
219 | | #[inline] |
220 | 0 | pub(crate) fn notify(&self) { |
221 | 0 | if !self.is_empty.load(Ordering::SeqCst) { |
222 | 0 | let mut inner = self.inner.lock(); |
223 | 0 | if !self.is_empty.load(Ordering::SeqCst) { |
224 | 0 | inner.try_select(); |
225 | 0 | inner.notify(); |
226 | 0 | self.is_empty.store( |
227 | 0 | inner.selectors.is_empty() && inner.observers.is_empty(), |
228 | 0 | Ordering::SeqCst, |
229 | | ); |
230 | 0 | } |
231 | 0 | } |
232 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::notify Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::notify |
233 | | |
234 | | /// Registers an operation waiting to be ready. |
235 | | #[inline] |
236 | 0 | pub(crate) fn watch(&self, oper: Operation, cx: &Context) { |
237 | 0 | let mut inner = self.inner.lock(); |
238 | 0 | inner.watch(oper, cx); |
239 | 0 | self.is_empty.store( |
240 | 0 | inner.selectors.is_empty() && inner.observers.is_empty(), |
241 | 0 | Ordering::SeqCst, |
242 | 0 | ); |
243 | 0 | } |
244 | | |
245 | | /// Unregisters an operation waiting to be ready. |
246 | | #[inline] |
247 | 0 | pub(crate) fn unwatch(&self, oper: Operation) { |
248 | 0 | let mut inner = self.inner.lock(); |
249 | 0 | inner.unwatch(oper); |
250 | 0 | self.is_empty.store( |
251 | 0 | inner.selectors.is_empty() && inner.observers.is_empty(), |
252 | 0 | Ordering::SeqCst, |
253 | 0 | ); |
254 | 0 | } |
255 | | |
256 | | /// Notifies all threads that the channel is disconnected. |
257 | | #[inline] |
258 | 0 | pub(crate) fn disconnect(&self) { |
259 | 0 | let mut inner = self.inner.lock(); |
260 | 0 | inner.disconnect(); |
261 | 0 | self.is_empty.store( |
262 | 0 | inner.selectors.is_empty() && inner.observers.is_empty(), |
263 | 0 | Ordering::SeqCst, |
264 | 0 | ); |
265 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::disconnect Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::disconnect |
266 | | } |
267 | | |
268 | | impl Drop for SyncWaker { |
269 | | #[inline] |
270 | 0 | fn drop(&mut self) { |
271 | 0 | debug_assert!(self.is_empty.load(Ordering::SeqCst)); |
272 | 0 | } Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop |
273 | | } |
274 | | |
275 | | /// Returns the id of the current thread. |
276 | | #[inline] |
277 | 0 | fn current_thread_id() -> ThreadId { |
278 | 0 | thread_local! { |
279 | 0 | /// Cached thread-local id. |
280 | 0 | static THREAD_ID: ThreadId = thread::current().id(); |
281 | 0 | } |
282 | 0 |
|
283 | 0 | THREAD_ID |
284 | 0 | .try_with(|id| *id) Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#0}Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#0} |
285 | 0 | .unwrap_or_else(|_| thread::current().id()) Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1}Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1} |
286 | 0 | } Unexecuted instantiation: crossbeam_channel::waker::current_thread_id Unexecuted instantiation: crossbeam_channel::waker::current_thread_id |