/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.49.0/src/signal/registry.rs
Line | Count | Source |
1 | | use crate::signal::os::{OsExtraData, OsStorage}; |
2 | | use crate::sync::watch; |
3 | | |
4 | | use std::ops; |
5 | | use std::sync::atomic::{AtomicBool, Ordering}; |
6 | | use std::sync::OnceLock; |
7 | | |
8 | | pub(crate) type EventId = usize; |
9 | | |
10 | | /// State for a specific event, whether a notification is pending delivery, |
11 | | /// and what listeners are registered. |
12 | | #[derive(Debug)] |
13 | | pub(crate) struct EventInfo { |
14 | | pending: AtomicBool, |
15 | | tx: watch::Sender<()>, |
16 | | } |
17 | | |
18 | | impl Default for EventInfo { |
19 | 128 | fn default() -> Self { |
20 | 128 | let (tx, _rx) = watch::channel(()); |
21 | | |
22 | 128 | Self { |
23 | 128 | pending: AtomicBool::new(false), |
24 | 128 | tx, |
25 | 128 | } |
26 | 128 | } |
27 | | } |
28 | | |
29 | | /// An interface for retrieving the `EventInfo` for a particular `eventId`. |
30 | | pub(crate) trait Storage { |
31 | | /// Gets the `EventInfo` for `id` if it exists. |
32 | | fn event_info(&self, id: EventId) -> Option<&EventInfo>; |
33 | | |
34 | | /// Invokes `f` once for each defined `EventInfo` in this storage. |
35 | | fn for_each<'a, F>(&'a self, f: F) |
36 | | where |
37 | | F: FnMut(&'a EventInfo); |
38 | | } |
39 | | |
40 | | impl Storage for Vec<EventInfo> { |
41 | 0 | fn event_info(&self, id: EventId) -> Option<&EventInfo> { |
42 | 0 | self.get(id) |
43 | 0 | } |
44 | | |
45 | 0 | fn for_each<'a, F>(&'a self, f: F) |
46 | 0 | where |
47 | 0 | F: FnMut(&'a EventInfo), |
48 | | { |
49 | 0 | self.iter().for_each(f); |
50 | 0 | } |
51 | | } |
52 | | |
53 | | /// Manages and distributes event notifications to any registered listeners. |
54 | | /// |
55 | | /// Generic over the underlying storage to allow for domain specific |
56 | | /// optimizations (e.g. `eventIds` may or may not be contiguous). |
57 | | #[derive(Debug)] |
58 | | pub(crate) struct Registry<S> { |
59 | | storage: S, |
60 | | } |
61 | | |
62 | | impl<S> Registry<S> { |
63 | 2 | fn new(storage: S) -> Self { |
64 | 2 | Self { storage } |
65 | 2 | } |
66 | | } |
67 | | |
68 | | impl<S: Storage> Registry<S> { |
69 | | /// Registers a new listener for `event_id`. |
70 | 0 | fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
71 | 0 | self.storage |
72 | 0 | .event_info(event_id) |
73 | 0 | .unwrap_or_else(|| panic!("invalid event_id: {event_id}")) |
74 | | .tx |
75 | 0 | .subscribe() |
76 | 0 | } |
77 | | |
78 | | /// Marks `event_id` as having been delivered, without broadcasting it to |
79 | | /// any listeners. |
80 | 0 | fn record_event(&self, event_id: EventId) { |
81 | 0 | if let Some(event_info) = self.storage.event_info(event_id) { |
82 | 0 | event_info.pending.store(true, Ordering::SeqCst); |
83 | 0 | } |
84 | 0 | } |
85 | | |
86 | | /// Broadcasts all previously recorded events to their respective listeners. |
87 | | /// |
88 | | /// Returns `true` if an event was delivered to at least one listener. |
89 | 0 | fn broadcast(&self) -> bool { |
90 | 0 | let mut did_notify = false; |
91 | 0 | self.storage.for_each(|event_info| { |
92 | | // Any signal of this kind arrived since we checked last? |
93 | 0 | if !event_info.pending.swap(false, Ordering::SeqCst) { |
94 | 0 | return; |
95 | 0 | } |
96 | | |
97 | | // Ignore errors if there are no listeners |
98 | 0 | if event_info.tx.send(()).is_ok() { |
99 | 0 | did_notify = true; |
100 | 0 | } |
101 | 0 | }); |
102 | | |
103 | 0 | did_notify |
104 | 0 | } |
105 | | } |
106 | | |
107 | | pub(crate) struct Globals { |
108 | | extra: OsExtraData, |
109 | | registry: Registry<OsStorage>, |
110 | | } |
111 | | |
112 | | impl ops::Deref for Globals { |
113 | | type Target = OsExtraData; |
114 | | |
115 | 17.7k | fn deref(&self) -> &Self::Target { |
116 | 17.7k | &self.extra |
117 | 17.7k | } |
118 | | } |
119 | | |
120 | | impl Globals { |
121 | | /// Registers a new listener for `event_id`. |
122 | 0 | pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { |
123 | 0 | self.registry.register_listener(event_id) |
124 | 0 | } |
125 | | |
126 | | /// Marks `event_id` as having been delivered, without broadcasting it to |
127 | | /// any listeners. |
128 | 0 | pub(crate) fn record_event(&self, event_id: EventId) { |
129 | 0 | self.registry.record_event(event_id); |
130 | 0 | } |
131 | | |
132 | | /// Broadcasts all previously recorded events to their respective listeners. |
133 | | /// |
134 | | /// Returns `true` if an event was delivered to at least one listener. |
135 | 0 | pub(crate) fn broadcast(&self) -> bool { |
136 | 0 | self.registry.broadcast() |
137 | 0 | } |
138 | | |
139 | | #[cfg(unix)] |
140 | 0 | pub(crate) fn storage(&self) -> &OsStorage { |
141 | 0 | &self.registry.storage |
142 | 0 | } |
143 | | } |
144 | | |
145 | 2 | fn globals_init() -> Globals |
146 | 2 | where |
147 | 2 | OsExtraData: 'static + Send + Sync + Default, |
148 | 2 | OsStorage: 'static + Send + Sync + Default, |
149 | | { |
150 | 2 | Globals { |
151 | 2 | extra: OsExtraData::default(), |
152 | 2 | registry: Registry::new(OsStorage::default()), |
153 | 2 | } |
154 | 2 | } |
155 | | |
156 | 17.7k | pub(crate) fn globals() -> &'static Globals |
157 | 17.7k | where |
158 | 17.7k | OsExtraData: 'static + Send + Sync + Default, |
159 | 17.7k | OsStorage: 'static + Send + Sync + Default, |
160 | | { |
161 | | static GLOBALS: OnceLock<Globals> = OnceLock::new(); |
162 | | |
163 | 17.7k | GLOBALS.get_or_init(globals_init) |
164 | 17.7k | } |
165 | | |
166 | | #[cfg(all(test, not(loom)))] |
167 | | mod tests { |
168 | | use super::*; |
169 | | use crate::runtime::{self, Runtime}; |
170 | | use crate::sync::{oneshot, watch}; |
171 | | |
172 | | use futures::future; |
173 | | |
174 | | #[test] |
175 | | fn smoke() { |
176 | | let rt = rt(); |
177 | | rt.block_on(async move { |
178 | | let registry = Registry::new(vec![ |
179 | | EventInfo::default(), |
180 | | EventInfo::default(), |
181 | | EventInfo::default(), |
182 | | ]); |
183 | | |
184 | | let first = registry.register_listener(0); |
185 | | let second = registry.register_listener(1); |
186 | | let third = registry.register_listener(2); |
187 | | |
188 | | let (fire, wait) = oneshot::channel(); |
189 | | |
190 | | crate::spawn(async { |
191 | | wait.await.expect("wait failed"); |
192 | | |
193 | | // Record some events which should get coalesced |
194 | | registry.record_event(0); |
195 | | registry.record_event(0); |
196 | | registry.record_event(1); |
197 | | registry.record_event(1); |
198 | | registry.broadcast(); |
199 | | |
200 | | // Yield so the previous broadcast can get received |
201 | | // |
202 | | // This yields many times since the block_on task is only polled every 61 |
203 | | // ticks. |
204 | | for _ in 0..100 { |
205 | | crate::task::yield_now().await; |
206 | | } |
207 | | |
208 | | // Send subsequent signal |
209 | | registry.record_event(0); |
210 | | registry.broadcast(); |
211 | | |
212 | | drop(registry); |
213 | | }); |
214 | | |
215 | | let _ = fire.send(()); |
216 | | let all = future::join3(collect(first), collect(second), collect(third)); |
217 | | |
218 | | let (first_results, second_results, third_results) = all.await; |
219 | | assert_eq!(2, first_results.len()); |
220 | | assert_eq!(1, second_results.len()); |
221 | | assert_eq!(0, third_results.len()); |
222 | | }); |
223 | | } |
224 | | |
225 | | #[test] |
226 | | #[should_panic = "invalid event_id: 1"] |
227 | | fn register_panics_on_invalid_input() { |
228 | | let registry = Registry::new(vec![EventInfo::default()]); |
229 | | |
230 | | registry.register_listener(1); |
231 | | } |
232 | | |
233 | | #[test] |
234 | | fn record_invalid_event_does_nothing() { |
235 | | let registry = Registry::new(vec![EventInfo::default()]); |
236 | | registry.record_event(1302); |
237 | | } |
238 | | |
239 | | #[test] |
240 | | fn broadcast_returns_if_at_least_one_event_fired() { |
241 | | let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]); |
242 | | |
243 | | registry.record_event(0); |
244 | | assert!(!registry.broadcast()); |
245 | | |
246 | | let first = registry.register_listener(0); |
247 | | let second = registry.register_listener(1); |
248 | | |
249 | | registry.record_event(0); |
250 | | assert!(registry.broadcast()); |
251 | | |
252 | | drop(first); |
253 | | registry.record_event(0); |
254 | | assert!(!registry.broadcast()); |
255 | | |
256 | | drop(second); |
257 | | } |
258 | | |
259 | | fn rt() -> Runtime { |
260 | | runtime::Builder::new_current_thread() |
261 | | .enable_time() |
262 | | .build() |
263 | | .unwrap() |
264 | | } |
265 | | |
266 | | async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> { |
267 | | let mut ret = vec![]; |
268 | | |
269 | | while let Ok(v) = rx.changed().await { |
270 | | ret.push(v); |
271 | | } |
272 | | |
273 | | ret |
274 | | } |
275 | | } |