Coverage Report

Created: 2026-03-31 07:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/flume-0.12.0/src/select.rs
Line
Count
Source
1
//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface.
2
3
use crate::*;
4
use spin1::Mutex as Spinlock;
5
use std::{any::Any, marker::PhantomData};
6
7
// A unique token corresponding to an event in a selector
8
type Token = usize;
9
10
struct SelectSignal(
11
    thread::Thread,
12
    Token,
13
    AtomicBool,
14
    Arc<Spinlock<VecDeque<Token>>>,
15
);
16
17
impl Signal for SelectSignal {
18
0
    fn fire(&self) -> bool {
19
0
        self.2.store(true, Ordering::SeqCst);
20
0
        self.3.lock().push_back(self.1);
21
0
        self.0.unpark();
22
0
        false
23
0
    }
24
25
0
    fn as_any(&self) -> &(dyn Any + 'static) {
26
0
        self
27
0
    }
28
0
    fn as_ptr(&self) -> *const () {
29
0
        self as *const _ as *const ()
30
0
    }
31
}
32
33
trait Selection<'a, T> {
34
    fn init(&mut self) -> Option<T>;
35
    fn poll(&mut self) -> Option<T>;
36
    fn deinit(&mut self);
37
}
38
39
/// An error that may be emitted when attempting to wait for a value on a receiver.
40
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
41
pub enum SelectError {
42
    /// A timeout occurred when waiting on a `Selector`.
43
    Timeout,
44
}
45
46
impl fmt::Display for SelectError {
47
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48
0
        match self {
49
0
            SelectError::Timeout => "timeout occurred".fmt(f),
50
        }
51
0
    }
52
}
53
54
impl std::error::Error for SelectError {}
55
56
/// A type used to wait upon multiple blocking operations at once.
57
///
58
/// A [`Selector`] implements [`select`](https://en.wikipedia.org/wiki/Select_(Unix))-like behaviour,
59
/// allowing a thread to wait upon the result of more than one operation at once.
60
///
61
/// # Examples
62
/// ```
63
/// let (tx0, rx0) = flume::unbounded();
64
/// let (tx1, rx1) = flume::unbounded();
65
///
66
/// std::thread::spawn(move || {
67
///     tx0.send(true).unwrap();
68
///     tx1.send(42).unwrap();
69
/// });
70
///
71
/// flume::Selector::new()
72
///     .recv(&rx0, |b| println!("Received {:?}", b))
73
///     .recv(&rx1, |n| println!("Received {:?}", n))
74
///     .wait();
75
/// ```
76
pub struct Selector<'a, T: 'a> {
77
    selections: Vec<Box<dyn Selection<'a, T> + 'a>>,
78
    next_poll: usize,
79
    signalled: Arc<Spinlock<VecDeque<Token>>>,
80
    #[cfg(feature = "eventual-fairness")]
81
    rng: fastrand::Rng,
82
    phantom: PhantomData<*const ()>,
83
}
84
85
impl<'a, T: 'a> Default for Selector<'a, T> {
86
0
    fn default() -> Self {
87
0
        Self::new()
88
0
    }
89
}
90
91
impl<'a, T: 'a> fmt::Debug for Selector<'a, T> {
92
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93
0
        f.debug_struct("Selector").finish()
94
0
    }
95
}
96
97
impl<'a, T> Selector<'a, T> {
98
    /// Create a new selector.
99
0
    pub fn new() -> Self {
100
0
        Self {
101
0
            selections: Vec::new(),
102
0
            next_poll: 0,
103
0
            signalled: Arc::default(),
104
0
            phantom: PhantomData,
105
0
            #[cfg(feature = "eventual-fairness")]
106
0
            rng: fastrand::Rng::new(),
107
0
        }
108
0
    }
109
110
    /// Add a send operation to the selector that sends the provided value.
111
    ///
112
    /// Once added, the selector can be used to run the provided handler function on completion of this operation.
113
0
    pub fn send<U, F: FnMut(Result<(), SendError<U>>) -> T + 'a>(
114
0
        mut self,
115
0
        sender: &'a Sender<U>,
116
0
        msg: U,
117
0
        mapper: F,
118
0
    ) -> Self {
119
        struct SendSelection<'a, T, F, U> {
120
            sender: &'a Sender<U>,
121
            msg: Option<U>,
122
            token: Token,
123
            signalled: Arc<Spinlock<VecDeque<Token>>>,
124
            hook: Option<Arc<Hook<U, SelectSignal>>>,
125
            mapper: F,
126
            phantom: PhantomData<T>,
127
        }
128
129
        impl<'a, T, F, U> Selection<'a, T> for SendSelection<'a, T, F, U>
130
        where
131
            F: FnMut(Result<(), SendError<U>>) -> T,
132
        {
133
0
            fn init(&mut self) -> Option<T> {
134
0
                let token = self.token;
135
0
                let signalled = self.signalled.clone();
136
0
                let r = self.sender.shared.send(
137
0
                    self.msg.take().unwrap(),
138
                    true,
139
0
                    |msg| {
140
0
                        Hook::slot(
141
0
                            Some(msg),
142
0
                            SelectSignal(
143
0
                                thread::current(),
144
0
                                token,
145
0
                                AtomicBool::new(false),
146
0
                                signalled,
147
0
                            ),
148
                        )
149
0
                    },
150
                    // Always runs
151
0
                    |h| {
152
0
                        self.hook = Some(h);
153
0
                        Ok(())
154
0
                    },
155
                );
156
157
0
                if self.hook.is_none() {
158
0
                    Some((self.mapper)(match r {
159
0
                        Ok(()) => Ok(()),
160
0
                        Err(TrySendTimeoutError::Disconnected(msg)) => Err(SendError(msg)),
161
0
                        _ => unreachable!(),
162
                    }))
163
                } else {
164
0
                    None
165
                }
166
0
            }
167
168
0
            fn poll(&mut self) -> Option<T> {
169
0
                let res = if self.sender.shared.is_disconnected() {
170
                    // Check the hook one last time
171
0
                    if let Some(msg) = self.hook.as_ref()?.try_take() {
172
0
                        Err(SendError(msg))
173
                    } else {
174
0
                        Ok(())
175
                    }
176
0
                } else if self.hook.as_ref().unwrap().is_empty() {
177
                    // The message was sent
178
0
                    Ok(())
179
                } else {
180
0
                    return None;
181
                };
182
183
0
                Some((self.mapper)(res))
184
0
            }
185
186
0
            fn deinit(&mut self) {
187
0
                if let Some(hook) = self.hook.take() {
188
                    // Remove hook
189
0
                    let hook: Arc<Hook<U, dyn Signal>> = hook;
190
0
                    wait_lock(&self.sender.shared.chan)
191
0
                        .sending
192
0
                        .as_mut()
193
0
                        .unwrap()
194
0
                        .1
195
0
                        .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
196
0
                }
197
0
            }
198
        }
199
200
0
        let token = self.selections.len();
201
0
        self.selections.push(Box::new(SendSelection {
202
0
            sender,
203
0
            msg: Some(msg),
204
0
            token,
205
0
            signalled: self.signalled.clone(),
206
0
            hook: None,
207
0
            mapper,
208
0
            phantom: Default::default(),
209
0
        }));
210
211
0
        self
212
0
    }
213
214
    /// Add a receive operation to the selector.
215
    ///
216
    /// Once added, the selector can be used to run the provided handler function on completion of this operation.
217
0
    pub fn recv<U, F: FnMut(Result<U, RecvError>) -> T + 'a>(
218
0
        mut self,
219
0
        receiver: &'a Receiver<U>,
220
0
        mapper: F,
221
0
    ) -> Self {
222
        struct RecvSelection<'a, T, F, U> {
223
            receiver: &'a Receiver<U>,
224
            token: Token,
225
            signalled: Arc<Spinlock<VecDeque<Token>>>,
226
            hook: Option<Arc<Hook<U, SelectSignal>>>,
227
            mapper: F,
228
            received: bool,
229
            phantom: PhantomData<T>,
230
        }
231
232
        impl<'a, T, F, U> Selection<'a, T> for RecvSelection<'a, T, F, U>
233
        where
234
            F: FnMut(Result<U, RecvError>) -> T,
235
        {
236
0
            fn init(&mut self) -> Option<T> {
237
0
                let token = self.token;
238
0
                let signalled = self.signalled.clone();
239
0
                let r = self.receiver.shared.recv(
240
                    true,
241
0
                    || {
242
0
                        Hook::trigger(SelectSignal(
243
0
                            thread::current(),
244
0
                            token,
245
0
                            AtomicBool::new(false),
246
0
                            signalled,
247
0
                        ))
248
0
                    },
249
                    // Always runs
250
0
                    |h| {
251
0
                        self.hook = Some(h);
252
0
                        Err(TryRecvTimeoutError::Timeout)
253
0
                    },
254
                );
255
256
0
                if self.hook.is_none() {
257
0
                    Some((self.mapper)(match r {
258
0
                        Ok(msg) => Ok(msg),
259
0
                        Err(TryRecvTimeoutError::Disconnected) => Err(RecvError::Disconnected),
260
0
                        _ => unreachable!(),
261
                    }))
262
                } else {
263
0
                    None
264
                }
265
0
            }
266
267
0
            fn poll(&mut self) -> Option<T> {
268
0
                let res = if let Ok(msg) = self.receiver.try_recv() {
269
0
                    self.received = true;
270
0
                    Ok(msg)
271
0
                } else if self.receiver.shared.is_disconnected() {
272
0
                    Err(RecvError::Disconnected)
273
                } else {
274
0
                    return None;
275
                };
276
277
0
                Some((self.mapper)(res))
278
0
            }
279
280
0
            fn deinit(&mut self) {
281
0
                if let Some(hook) = self.hook.take() {
282
                    // Remove hook
283
0
                    let hook: Arc<Hook<U, dyn Signal>> = hook;
284
0
                    wait_lock(&self.receiver.shared.chan)
285
0
                        .waiting
286
0
                        .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
287
                    // If we were woken, but never polled, wake up another
288
0
                    if !self.received
289
0
                        && hook
290
0
                            .signal()
291
0
                            .as_any()
292
0
                            .downcast_ref::<SelectSignal>()
293
0
                            .unwrap()
294
0
                            .2
295
0
                            .load(Ordering::SeqCst)
296
0
                    {
297
0
                        wait_lock(&self.receiver.shared.chan).try_wake_receiver_if_pending();
298
0
                    }
299
0
                }
300
0
            }
301
        }
302
303
0
        let token = self.selections.len();
304
0
        self.selections.push(Box::new(RecvSelection {
305
0
            receiver,
306
0
            token,
307
0
            signalled: self.signalled.clone(),
308
0
            hook: None,
309
0
            mapper,
310
0
            received: false,
311
0
            phantom: Default::default(),
312
0
        }));
313
314
0
        self
315
0
    }
316
317
0
    fn wait_inner(mut self, deadline: Option<Instant>) -> Option<T> {
318
        #[cfg(feature = "eventual-fairness")]
319
0
        {
320
0
            self.next_poll = self.rng.usize(0..self.selections.len());
321
0
        }
322
323
0
        let res = 'outer: {
324
            // Init signals
325
0
            for _ in 0..self.selections.len() {
326
0
                if let Some(val) = self.selections[self.next_poll].init() {
327
0
                    break 'outer Some(val);
328
0
                }
329
0
                self.next_poll = (self.next_poll + 1) % self.selections.len();
330
            }
331
332
            // Speculatively poll
333
0
            if let Some(msg) = self.poll() {
334
0
                break 'outer Some(msg);
335
0
            }
336
337
            loop {
338
0
                if let Some(deadline) = deadline {
339
0
                    if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
340
0
                        thread::park_timeout(dur);
341
0
                    }
342
0
                } else {
343
0
                    thread::park();
344
0
                }
345
346
0
                if deadline.map(|d| Instant::now() >= d).unwrap_or(false) {
347
0
                    break 'outer self.poll();
348
0
                }
349
350
0
                let token = if let Some(token) = self.signalled.lock().pop_front() {
351
0
                    token
352
                } else {
353
                    // Spurious wakeup, park again
354
0
                    continue;
355
                };
356
357
                // Attempt to receive a message
358
0
                if let Some(msg) = self.selections[token].poll() {
359
0
                    break 'outer Some(msg);
360
0
                }
361
            }
362
        };
363
364
        // Deinit signals
365
0
        for s in &mut self.selections {
366
0
            s.deinit();
367
0
        }
368
369
0
        res
370
0
    }
371
372
0
    fn poll(&mut self) -> Option<T> {
373
0
        for _ in 0..self.selections.len() {
374
0
            if let Some(val) = self.selections[self.next_poll].poll() {
375
0
                return Some(val);
376
0
            }
377
0
            self.next_poll = (self.next_poll + 1) % self.selections.len();
378
        }
379
0
        None
380
0
    }
381
382
    /// Wait until one of the events associated with this [`Selector`] has completed. If the `eventual-fairness`
383
    /// feature flag is enabled, this method is fair and will handle a random event of those that are ready.
384
0
    pub fn wait(self) -> T {
385
0
        self.wait_inner(None).unwrap()
386
0
    }
387
388
    /// Wait until one of the events associated with this [`Selector`] has completed or the timeout has expired. If the
389
    /// `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those that
390
    /// are ready.
391
0
    pub fn wait_timeout(self, dur: Duration) -> Result<T, SelectError> {
392
0
        self.wait_inner(Instant::now().checked_add(dur))
393
0
            .ok_or(SelectError::Timeout)
394
0
    }
395
396
    /// Wait until one of the events associated with this [`Selector`] has completed or the deadline has been reached.
397
    /// If the `eventual-fairness` feature flag is enabled, this method is fair and will handle a random event of those
398
    /// that are ready.
399
0
    pub fn wait_deadline(self, deadline: Instant) -> Result<T, SelectError> {
400
0
        self.wait_inner(Some(deadline)).ok_or(SelectError::Timeout)
401
0
    }
402
}