Coverage Report

Created: 2026-01-09 06:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.49.0/src/signal/registry.rs
Line
Count
Source
1
use crate::signal::os::{OsExtraData, OsStorage};
2
use crate::sync::watch;
3
4
use std::ops;
5
use std::sync::atomic::{AtomicBool, Ordering};
6
use std::sync::OnceLock;
7
8
pub(crate) type EventId = usize;
9
10
/// State for a specific event, whether a notification is pending delivery,
11
/// and what listeners are registered.
12
#[derive(Debug)]
13
pub(crate) struct EventInfo {
14
    pending: AtomicBool,
15
    tx: watch::Sender<()>,
16
}
17
18
impl Default for EventInfo {
19
128
    fn default() -> Self {
20
128
        let (tx, _rx) = watch::channel(());
21
22
128
        Self {
23
128
            pending: AtomicBool::new(false),
24
128
            tx,
25
128
        }
26
128
    }
27
}
28
29
/// An interface for retrieving the `EventInfo` for a particular `eventId`.
30
pub(crate) trait Storage {
31
    /// Gets the `EventInfo` for `id` if it exists.
32
    fn event_info(&self, id: EventId) -> Option<&EventInfo>;
33
34
    /// Invokes `f` once for each defined `EventInfo` in this storage.
35
    fn for_each<'a, F>(&'a self, f: F)
36
    where
37
        F: FnMut(&'a EventInfo);
38
}
39
40
impl Storage for Vec<EventInfo> {
41
0
    fn event_info(&self, id: EventId) -> Option<&EventInfo> {
42
0
        self.get(id)
43
0
    }
44
45
0
    fn for_each<'a, F>(&'a self, f: F)
46
0
    where
47
0
        F: FnMut(&'a EventInfo),
48
    {
49
0
        self.iter().for_each(f);
50
0
    }
51
}
52
53
/// Manages and distributes event notifications to any registered listeners.
54
///
55
/// Generic over the underlying storage to allow for domain specific
56
/// optimizations (e.g. `eventIds` may or may not be contiguous).
57
#[derive(Debug)]
58
pub(crate) struct Registry<S> {
59
    storage: S,
60
}
61
62
impl<S> Registry<S> {
63
2
    fn new(storage: S) -> Self {
64
2
        Self { storage }
65
2
    }
66
}
67
68
impl<S: Storage> Registry<S> {
69
    /// Registers a new listener for `event_id`.
70
0
    fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
71
0
        self.storage
72
0
            .event_info(event_id)
73
0
            .unwrap_or_else(|| panic!("invalid event_id: {event_id}"))
74
            .tx
75
0
            .subscribe()
76
0
    }
77
78
    /// Marks `event_id` as having been delivered, without broadcasting it to
79
    /// any listeners.
80
0
    fn record_event(&self, event_id: EventId) {
81
0
        if let Some(event_info) = self.storage.event_info(event_id) {
82
0
            event_info.pending.store(true, Ordering::SeqCst);
83
0
        }
84
0
    }
85
86
    /// Broadcasts all previously recorded events to their respective listeners.
87
    ///
88
    /// Returns `true` if an event was delivered to at least one listener.
89
0
    fn broadcast(&self) -> bool {
90
0
        let mut did_notify = false;
91
0
        self.storage.for_each(|event_info| {
92
            // Any signal of this kind arrived since we checked last?
93
0
            if !event_info.pending.swap(false, Ordering::SeqCst) {
94
0
                return;
95
0
            }
96
97
            // Ignore errors if there are no listeners
98
0
            if event_info.tx.send(()).is_ok() {
99
0
                did_notify = true;
100
0
            }
101
0
        });
102
103
0
        did_notify
104
0
    }
105
}
106
107
pub(crate) struct Globals {
108
    extra: OsExtraData,
109
    registry: Registry<OsStorage>,
110
}
111
112
impl ops::Deref for Globals {
113
    type Target = OsExtraData;
114
115
17.7k
    fn deref(&self) -> &Self::Target {
116
17.7k
        &self.extra
117
17.7k
    }
118
}
119
120
impl Globals {
121
    /// Registers a new listener for `event_id`.
122
0
    pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
123
0
        self.registry.register_listener(event_id)
124
0
    }
125
126
    /// Marks `event_id` as having been delivered, without broadcasting it to
127
    /// any listeners.
128
0
    pub(crate) fn record_event(&self, event_id: EventId) {
129
0
        self.registry.record_event(event_id);
130
0
    }
131
132
    /// Broadcasts all previously recorded events to their respective listeners.
133
    ///
134
    /// Returns `true` if an event was delivered to at least one listener.
135
0
    pub(crate) fn broadcast(&self) -> bool {
136
0
        self.registry.broadcast()
137
0
    }
138
139
    #[cfg(unix)]
140
0
    pub(crate) fn storage(&self) -> &OsStorage {
141
0
        &self.registry.storage
142
0
    }
143
}
144
145
2
fn globals_init() -> Globals
146
2
where
147
2
    OsExtraData: 'static + Send + Sync + Default,
148
2
    OsStorage: 'static + Send + Sync + Default,
149
{
150
2
    Globals {
151
2
        extra: OsExtraData::default(),
152
2
        registry: Registry::new(OsStorage::default()),
153
2
    }
154
2
}
155
156
17.7k
pub(crate) fn globals() -> &'static Globals
157
17.7k
where
158
17.7k
    OsExtraData: 'static + Send + Sync + Default,
159
17.7k
    OsStorage: 'static + Send + Sync + Default,
160
{
161
    static GLOBALS: OnceLock<Globals> = OnceLock::new();
162
163
17.7k
    GLOBALS.get_or_init(globals_init)
164
17.7k
}
165
166
#[cfg(all(test, not(loom)))]
167
mod tests {
168
    use super::*;
169
    use crate::runtime::{self, Runtime};
170
    use crate::sync::{oneshot, watch};
171
172
    use futures::future;
173
174
    #[test]
175
    fn smoke() {
176
        let rt = rt();
177
        rt.block_on(async move {
178
            let registry = Registry::new(vec![
179
                EventInfo::default(),
180
                EventInfo::default(),
181
                EventInfo::default(),
182
            ]);
183
184
            let first = registry.register_listener(0);
185
            let second = registry.register_listener(1);
186
            let third = registry.register_listener(2);
187
188
            let (fire, wait) = oneshot::channel();
189
190
            crate::spawn(async {
191
                wait.await.expect("wait failed");
192
193
                // Record some events which should get coalesced
194
                registry.record_event(0);
195
                registry.record_event(0);
196
                registry.record_event(1);
197
                registry.record_event(1);
198
                registry.broadcast();
199
200
                // Yield so the previous broadcast can get received
201
                //
202
                // This yields many times since the block_on task is only polled every 61
203
                // ticks.
204
                for _ in 0..100 {
205
                    crate::task::yield_now().await;
206
                }
207
208
                // Send subsequent signal
209
                registry.record_event(0);
210
                registry.broadcast();
211
212
                drop(registry);
213
            });
214
215
            let _ = fire.send(());
216
            let all = future::join3(collect(first), collect(second), collect(third));
217
218
            let (first_results, second_results, third_results) = all.await;
219
            assert_eq!(2, first_results.len());
220
            assert_eq!(1, second_results.len());
221
            assert_eq!(0, third_results.len());
222
        });
223
    }
224
225
    #[test]
226
    #[should_panic = "invalid event_id: 1"]
227
    fn register_panics_on_invalid_input() {
228
        let registry = Registry::new(vec![EventInfo::default()]);
229
230
        registry.register_listener(1);
231
    }
232
233
    #[test]
234
    fn record_invalid_event_does_nothing() {
235
        let registry = Registry::new(vec![EventInfo::default()]);
236
        registry.record_event(1302);
237
    }
238
239
    #[test]
240
    fn broadcast_returns_if_at_least_one_event_fired() {
241
        let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]);
242
243
        registry.record_event(0);
244
        assert!(!registry.broadcast());
245
246
        let first = registry.register_listener(0);
247
        let second = registry.register_listener(1);
248
249
        registry.record_event(0);
250
        assert!(registry.broadcast());
251
252
        drop(first);
253
        registry.record_event(0);
254
        assert!(!registry.broadcast());
255
256
        drop(second);
257
    }
258
259
    fn rt() -> Runtime {
260
        runtime::Builder::new_current_thread()
261
            .enable_time()
262
            .build()
263
            .unwrap()
264
    }
265
266
    async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
267
        let mut ret = vec![];
268
269
        while let Ok(v) = rx.changed().await {
270
            ret.push(v);
271
        }
272
273
        ret
274
    }
275
}