Coverage Report

Created: 2026-01-22 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/flume-0.12.0/src/lib.rs
Line
Count
Source
1
//! # Flume
2
//!
3
//! A blazingly fast multi-producer, multi-consumer channel.
4
//!
5
//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
6
//!
7
//! ## Why Flume?
8
//!
9
//! - **Featureful**: Unbounded, bounded and rendezvous queues
10
//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel`
11
//! - **Safe**: No `unsafe` code anywhere in the codebase!
12
//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
13
//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
14
//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
15
//! - **Simple**: Few dependencies, minimal codebase, fast to compile
16
//! - **Asynchronous**: `async` support, including mix 'n match with sync code
17
//! - **Ergonomic**: Powerful `select`-like interface
18
//!
19
//! ## Example
20
//!
21
//! ```
22
//! let (tx, rx) = flume::unbounded();
23
//!
24
//! tx.send(42).unwrap();
25
//! assert_eq!(rx.recv().unwrap(), 42);
26
//! ```
27
28
#![cfg_attr(docsrs, feature(doc_cfg))]
29
#![deny(missing_docs)]
30
31
#[cfg(feature = "async")]
32
pub mod r#async;
33
#[cfg(feature = "select")]
34
pub mod select;
35
36
mod signal;
37
38
// Reexports
39
#[cfg(feature = "select")]
40
pub use select::Selector;
41
42
use crate::signal::{Signal, SyncSignal};
43
#[cfg(feature = "spin")]
44
use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
45
use std::fmt::Formatter;
46
use std::{
47
    collections::VecDeque,
48
    fmt,
49
    marker::PhantomData,
50
    sync::{
51
        atomic::{AtomicBool, AtomicUsize, Ordering},
52
        Arc, Weak,
53
    },
54
    thread,
55
    time::{Duration, Instant},
56
};
57
58
/// An error that may be emitted when attempting to send a value into a channel on a sender when
59
/// all receivers are dropped.
60
#[derive(Copy, Clone, PartialEq, Eq)]
61
pub struct SendError<T>(pub T);
62
63
impl<T> SendError<T> {
64
    /// Consume the error, yielding the message that failed to send.
65
0
    pub fn into_inner(self) -> T {
66
0
        self.0
67
0
    }
68
}
69
70
impl<T> fmt::Debug for SendError<T> {
71
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72
0
        "SendError(..)".fmt(f)
73
0
    }
74
}
75
76
impl<T> fmt::Display for SendError<T> {
77
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78
0
        "sending on a closed channel".fmt(f)
79
0
    }
80
}
81
82
impl<T> std::error::Error for SendError<T> {}
83
84
/// An error that may be emitted when attempting to send a value into a channel on a sender when
85
/// the channel is full or all receivers are dropped.
86
#[derive(Copy, Clone, PartialEq, Eq)]
87
pub enum TrySendError<T> {
88
    /// The channel the message is sent on has a finite capacity and was full when the send was attempted.
89
    Full(T),
90
    /// All channel receivers were dropped and so the message has nobody to receive it.
91
    Disconnected(T),
92
}
93
94
impl<T> TrySendError<T> {
95
    /// Consume the error, yielding the message that failed to send.
96
0
    pub fn into_inner(self) -> T {
97
0
        match self {
98
0
            Self::Full(msg) | Self::Disconnected(msg) => msg,
99
        }
100
0
    }
101
}
102
103
impl<T> fmt::Debug for TrySendError<T> {
104
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105
0
        match *self {
106
0
            TrySendError::Full(..) => "Full(..)".fmt(f),
107
0
            TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
108
        }
109
0
    }
110
}
111
112
impl<T> fmt::Display for TrySendError<T> {
113
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114
0
        match self {
115
0
            TrySendError::Full(..) => "sending on a full channel".fmt(f),
116
0
            TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
117
        }
118
0
    }
119
}
120
121
impl<T> std::error::Error for TrySendError<T> {}
122
123
impl<T> From<SendError<T>> for TrySendError<T> {
124
0
    fn from(err: SendError<T>) -> Self {
125
0
        match err {
126
0
            SendError(item) => Self::Disconnected(item),
127
        }
128
0
    }
129
}
130
131
/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
132
/// the send operation times out or all receivers are dropped.
133
#[derive(Copy, Clone, PartialEq, Eq)]
134
pub enum SendTimeoutError<T> {
135
    /// A timeout occurred when attempting to send the message.
136
    Timeout(T),
137
    /// All channel receivers were dropped and so the message has nobody to receive it.
138
    Disconnected(T),
139
}
140
141
impl<T> SendTimeoutError<T> {
142
    /// Consume the error, yielding the message that failed to send.
143
0
    pub fn into_inner(self) -> T {
144
0
        match self {
145
0
            Self::Timeout(msg) | Self::Disconnected(msg) => msg,
146
        }
147
0
    }
148
}
149
150
impl<T> fmt::Debug for SendTimeoutError<T> {
151
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152
0
        "SendTimeoutError(..)".fmt(f)
153
0
    }
154
}
155
156
impl<T> fmt::Display for SendTimeoutError<T> {
157
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158
0
        match self {
159
0
            SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
160
0
            SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
161
        }
162
0
    }
163
}
164
165
impl<T> std::error::Error for SendTimeoutError<T> {}
166
167
impl<T> From<SendError<T>> for SendTimeoutError<T> {
168
0
    fn from(err: SendError<T>) -> Self {
169
0
        match err {
170
0
            SendError(item) => Self::Disconnected(item),
171
        }
172
0
    }
173
}
174
175
enum TrySendTimeoutError<T> {
176
    Full(T),
177
    Disconnected(T),
178
    Timeout(T),
179
}
180
181
/// An error that may be emitted when attempting to wait for a value on a receiver when all senders
182
/// are dropped and there are no more messages in the channel.
183
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
184
pub enum RecvError {
185
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
186
    Disconnected,
187
}
188
189
impl fmt::Display for RecvError {
190
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191
0
        match self {
192
0
            RecvError::Disconnected => "receiving on a closed channel".fmt(f),
193
        }
194
0
    }
195
}
196
197
impl std::error::Error for RecvError {}
198
199
/// An error that may be emitted when attempting to fetch a value on a receiver when there are no
200
/// messages in the channel. If there are no messages in the channel and all senders are dropped,
201
/// then `TryRecvError::Disconnected` will be returned.
202
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
203
pub enum TryRecvError {
204
    /// The channel was empty when the receive was attempted.
205
    Empty,
206
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
207
    Disconnected,
208
}
209
210
impl fmt::Display for TryRecvError {
211
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212
0
        match self {
213
0
            TryRecvError::Empty => "receiving on an empty channel".fmt(f),
214
0
            TryRecvError::Disconnected => "channel is empty and closed".fmt(f),
215
        }
216
0
    }
217
}
218
219
impl std::error::Error for TryRecvError {}
220
221
impl From<RecvError> for TryRecvError {
222
0
    fn from(err: RecvError) -> Self {
223
0
        match err {
224
0
            RecvError::Disconnected => Self::Disconnected,
225
        }
226
0
    }
227
}
228
229
/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout
230
/// when the receive operation times out or all senders are dropped and there are no values left
231
/// in the channel.
232
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
233
pub enum RecvTimeoutError {
234
    /// A timeout occurred when attempting to receive a message.
235
    Timeout,
236
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
237
    Disconnected,
238
}
239
240
impl fmt::Display for RecvTimeoutError {
241
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242
0
        match self {
243
0
            RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f),
244
0
            RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f),
245
        }
246
0
    }
247
}
248
249
impl std::error::Error for RecvTimeoutError {}
250
251
impl From<RecvError> for RecvTimeoutError {
252
0
    fn from(err: RecvError) -> Self {
253
0
        match err {
254
0
            RecvError::Disconnected => Self::Disconnected,
255
        }
256
0
    }
257
}
258
259
enum TryRecvTimeoutError {
260
    Empty,
261
    Timeout,
262
    Disconnected,
263
}
264
265
// TODO: Investigate some sort of invalidation flag for timeouts
266
#[cfg(feature = "spin")]
267
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
268
269
#[cfg(not(feature = "spin"))]
270
struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
271
272
#[cfg(feature = "spin")]
273
impl<T, S: ?Sized + Signal> Hook<T, S> {
274
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
275
    where
276
        S: Sized,
277
    {
278
        Arc::new(Self(Some(Spinlock::new(msg)), signal))
279
    }
280
281
    fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
282
        self.0.as_ref().map(|s| s.lock())
283
    }
284
}
285
286
#[cfg(not(feature = "spin"))]
287
impl<T, S: ?Sized + Signal> Hook<T, S> {
288
0
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
289
0
    where
290
0
        S: Sized,
291
    {
292
0
        Arc::new(Self(Some(Mutex::new(msg)), signal))
293
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::slot
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::slot
Unexecuted instantiation: <flume::Hook<_, _>>::slot
294
295
0
    fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
296
0
        self.0.as_ref().map(|s| s.lock().unwrap())
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<_, _>>::lock::{closure#0}
297
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::lock
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::lock
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::lock
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::lock
Unexecuted instantiation: <flume::Hook<_, _>>::lock
298
}
299
300
impl<T, S: ?Sized + Signal> Hook<T, S> {
301
0
    pub fn fire_recv(&self) -> (T, &S) {
302
0
        let msg = self.lock().unwrap().take().unwrap();
303
0
        (msg, self.signal())
304
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::fire_recv
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::fire_recv
Unexecuted instantiation: <flume::Hook<_, _>>::fire_recv
305
306
0
    pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
307
0
        let ret = match self.lock() {
308
0
            Some(mut lock) => {
309
0
                *lock = Some(msg);
310
0
                None
311
            }
312
0
            None => Some(msg),
313
        };
314
0
        (ret, self.signal())
315
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::fire_send
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::fire_send
Unexecuted instantiation: <flume::Hook<_, _>>::fire_send
316
317
0
    pub fn is_empty(&self) -> bool {
318
0
        self.lock().map(|s| s.is_none()).unwrap_or(true)
319
0
    }
320
321
0
    pub fn try_take(&self) -> Option<T> {
322
0
        self.lock().unwrap().take()
323
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::try_take
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::try_take
Unexecuted instantiation: <flume::Hook<_, _>>::try_take
324
325
0
    pub fn trigger(signal: S) -> Arc<Self>
326
0
    where
327
0
        S: Sized,
328
    {
329
0
        Arc::new(Self(None, signal))
330
0
    }
331
332
0
    pub fn signal(&self) -> &S {
333
0
        &self.1
334
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::signal
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::signal
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::signal
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::signal
Unexecuted instantiation: <flume::Hook<_, _>>::signal
335
336
0
    pub fn fire_nothing(&self) -> bool {
337
0
        self.signal().fire()
338
0
    }
339
}
340
341
impl<T> Hook<T, SyncSignal> {
342
0
    pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
343
        loop {
344
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
345
0
            let msg = self.lock().unwrap().take();
346
0
            if let Some(msg) = msg {
347
0
                break Some(msg);
348
0
            } else if disconnected {
349
0
                break None;
350
            } else {
351
0
                self.signal().wait()
352
            }
353
        }
354
0
    }
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_recv
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_recv
355
356
    // Err(true) if timeout
357
0
    pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
358
        loop {
359
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
360
0
            let msg = self.lock().unwrap().take();
361
0
            if let Some(msg) = msg {
362
0
                break Ok(msg);
363
0
            } else if disconnected {
364
0
                break Err(false);
365
0
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
366
0
                self.signal().wait_timeout(dur);
367
0
            } else {
368
0
                break Err(true);
369
            }
370
        }
371
0
    }
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_deadline_recv
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_deadline_recv
372
373
0
    pub fn wait_send(&self, abort: &AtomicBool) {
374
        loop {
375
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
376
0
            if disconnected || self.lock().unwrap().is_none() {
377
0
                break;
378
0
            }
379
380
0
            self.signal().wait();
381
        }
382
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::wait_send
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_send
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_send
383
384
    // Err(true) if timeout
385
0
    pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
386
        loop {
387
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
388
0
            if self.lock().unwrap().is_none() {
389
0
                break Ok(());
390
0
            } else if disconnected {
391
0
                break Err(false);
392
0
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
393
0
                self.signal().wait_timeout(dur);
394
0
            } else {
395
0
                break Err(true);
396
            }
397
        }
398
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::wait_deadline_send
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_deadline_send
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_deadline_send
399
}
400
401
#[cfg(feature = "spin")]
402
#[inline]
403
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<'_, T> {
404
    // Some targets don't support `thread::sleep` (e.g. the `wasm32-unknown-unknown` target when
405
    // running in the main thread of a web browser) so we only use it on targets where we know it
406
    // will work
407
    #[cfg(any(target_family = "unix", target_family = "windows"))]
408
    {
409
        let mut i = 4;
410
        loop {
411
            for _ in 0..10 {
412
                if let Some(guard) = lock.try_lock() {
413
                    return guard;
414
                }
415
                thread::yield_now();
416
            }
417
            // Sleep for at most ~1 ms
418
            thread::sleep(Duration::from_nanos(1 << i.min(20)));
419
            i += 1;
420
        }
421
    }
422
    #[cfg(not(any(target_family = "unix", target_family = "windows")))]
423
    lock.lock()
424
}
425
426
#[cfg(not(feature = "spin"))]
427
#[inline]
428
0
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
429
0
    lock.lock().unwrap()
430
0
}
Unexecuted instantiation: flume::wait_lock::<flume::Chan<alloc::sync::Arc<alloc::string::String>>>
Unexecuted instantiation: flume::wait_lock::<flume::Chan<alloc::string::String>>
Unexecuted instantiation: flume::wait_lock::<_>
431
432
#[cfg(not(feature = "spin"))]
433
use std::sync::{Mutex, MutexGuard};
434
435
#[cfg(feature = "spin")]
436
type ChanLock<T> = Spinlock<T>;
437
#[cfg(not(feature = "spin"))]
438
type ChanLock<T> = Mutex<T>;
439
440
type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
441
struct Chan<T> {
442
    sending: Option<(usize, SignalVec<T>)>,
443
    queue: VecDeque<T>,
444
    waiting: SignalVec<T>,
445
}
446
447
impl<T> Chan<T> {
448
0
    fn pull_pending(&mut self, pull_extra: bool) {
449
0
        if let Some((cap, sending)) = &mut self.sending {
450
0
            let effective_cap = *cap + pull_extra as usize;
451
452
0
            while self.queue.len() < effective_cap {
453
0
                if let Some(s) = sending.pop_front() {
454
0
                    let (msg, signal) = s.fire_recv();
455
0
                    signal.fire();
456
0
                    self.queue.push_back(msg);
457
0
                } else {
458
0
                    break;
459
                }
460
            }
461
0
        }
462
0
    }
Unexecuted instantiation: <flume::Chan<alloc::sync::Arc<alloc::string::String>>>::pull_pending
Unexecuted instantiation: <flume::Chan<alloc::string::String>>::pull_pending
Unexecuted instantiation: <flume::Chan<_>>::pull_pending
463
464
0
    fn try_wake_receiver_if_pending(&mut self) {
465
0
        if !self.queue.is_empty() {
466
0
            while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
467
0
        }
468
0
    }
469
}
470
471
struct Shared<T> {
472
    chan: ChanLock<Chan<T>>,
473
    disconnected: AtomicBool,
474
    sender_count: AtomicUsize,
475
    receiver_count: AtomicUsize,
476
}
477
478
impl<T> Shared<T> {
479
0
    fn new(cap: Option<usize>) -> Self {
480
        Self {
481
0
            chan: ChanLock::new(Chan {
482
0
                sending: cap.map(|cap| (cap, VecDeque::new())),
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::new::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::new::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::new::{closure#0}
483
0
                queue: VecDeque::new(),
484
0
                waiting: VecDeque::new(),
485
            }),
486
0
            disconnected: AtomicBool::new(false),
487
0
            sender_count: AtomicUsize::new(1),
488
0
            receiver_count: AtomicUsize::new(1),
489
        }
490
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::new
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::new
Unexecuted instantiation: <flume::Shared<_>>::new
491
492
0
    fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
493
0
        &self,
494
0
        msg: T,
495
0
        should_block: bool,
496
0
        make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
497
0
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
498
0
    ) -> R {
499
0
        let mut chan = wait_lock(&self.chan);
500
501
0
        if self.is_disconnected() {
502
0
            Err(TrySendTimeoutError::Disconnected(msg)).into()
503
0
        } else if !chan.waiting.is_empty() {
504
0
            let mut msg = Some(msg);
505
506
            loop {
507
0
                let slot = chan.waiting.pop_front();
508
0
                match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::sync::Arc<alloc::string::String>>>, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}>::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::string::String>>, <flume::Shared<alloc::string::String>>::send_sync::{closure#0}, <flume::Shared<alloc::string::String>>::send_sync::{closure#1}>::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send::<_, _, _, _>::{closure#0}
509
                    // No more waiting receivers and msg in queue, so break out of the loop
510
0
                    None if msg.is_none() => break,
511
                    // No more waiting receivers, so add msg to queue and break out of the loop
512
                    None => {
513
0
                        chan.queue.push_back(msg.unwrap());
514
0
                        break;
515
                    }
516
0
                    Some((Some(m), signal)) => {
517
0
                        if signal.fire() {
518
                            // Was async and a stream, so didn't acquire the message. Wake another
519
                            // receiver, and do not yet push the message.
520
0
                            msg.replace(m);
521
0
                            continue;
522
                        } else {
523
                            // Was async and not a stream, so it did acquire the message. Push the
524
                            // message to the queue for it to be received.
525
0
                            chan.queue.push_back(m);
526
0
                            drop(chan);
527
0
                            break;
528
                        }
529
                    }
530
0
                    Some((None, signal)) => {
531
0
                        drop(chan);
532
0
                        signal.fire();
533
0
                        break; // Was sync, so it has acquired the message
534
                    }
535
                }
536
            }
537
538
0
            Ok(()).into()
539
0
        } else if chan
540
0
            .sending
541
0
            .as_ref()
542
0
            .map(|(cap, _)| chan.queue.len() < *cap)
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::sync::Arc<alloc::string::String>>>, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}>::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::string::String>>, <flume::Shared<alloc::string::String>>::send_sync::{closure#0}, <flume::Shared<alloc::string::String>>::send_sync::{closure#1}>::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::send::<_, _, _, _>::{closure#1}
543
0
            .unwrap_or(true)
544
        {
545
0
            chan.queue.push_back(msg);
546
0
            Ok(()).into()
547
0
        } else if should_block {
548
            // Only bounded from here on
549
0
            let hook = make_signal(msg);
550
0
            chan.sending.as_mut().unwrap().1.push_back(hook.clone());
551
0
            drop(chan);
552
553
0
            do_block(hook)
554
        } else {
555
0
            Err(TrySendTimeoutError::Full(msg)).into()
556
        }
557
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::sync::Arc<alloc::string::String>>>, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}>
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::string::String>>, <flume::Shared<alloc::string::String>>::send_sync::{closure#0}, <flume::Shared<alloc::string::String>>::send_sync::{closure#1}>
Unexecuted instantiation: <flume::Shared<_>>::send::<_, _, _, _>
558
559
0
    fn send_sync(
560
0
        &self,
561
0
        msg: T,
562
0
        block: Option<Option<Instant>>,
563
0
    ) -> Result<(), TrySendTimeoutError<T>> {
564
0
        self.send(
565
            // msg
566
0
            msg,
567
            // should_block
568
0
            block.is_some(),
569
            // make_signal
570
0
            |msg| Hook::slot(Some(msg), SyncSignal::default()),
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#0}
571
            // do_block
572
0
            |hook| {
573
0
                if let Some(deadline) = block.unwrap() {
574
0
                    hook.wait_deadline_send(&self.disconnected, deadline)
575
0
                        .or_else(|timed_out| {
576
0
                            if timed_out {
577
                                // Remove our signal
578
0
                                let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
579
0
                                wait_lock(&self.chan)
580
0
                                    .sending
581
0
                                    .as_mut()
582
0
                                    .unwrap()
583
0
                                    .1
584
0
                                    .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}::{closure#0}::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}::{closure#0}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}::{closure#0}::{closure#0}
585
0
                            }
586
0
                            hook.try_take()
587
0
                                .map(|msg| {
588
0
                                    if self.is_disconnected() {
589
0
                                        Err(TrySendTimeoutError::Disconnected(msg))
590
                                    } else {
591
0
                                        Err(TrySendTimeoutError::Timeout(msg))
592
                                    }
593
0
                                })
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}::{closure#0}::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}::{closure#0}::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}::{closure#0}::{closure#1}
594
0
                                .unwrap_or(Ok(()))
595
0
                        })
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}::{closure#0}
596
                } else {
597
0
                    hook.wait_send(&self.disconnected);
598
599
0
                    match hook.try_take() {
600
0
                        Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
601
0
                        None => Ok(()),
602
                    }
603
                }
604
0
            },
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}
605
        )
606
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync
Unexecuted instantiation: <flume::Shared<_>>::send_sync
607
608
0
    fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
609
0
        &self,
610
0
        should_block: bool,
611
0
        make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
612
0
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
613
0
    ) -> R {
614
0
        let mut chan = wait_lock(&self.chan);
615
0
        chan.pull_pending(true);
616
617
0
        if let Some(msg) = chan.queue.pop_front() {
618
0
            drop(chan);
619
0
            Ok(msg).into()
620
0
        } else if self.is_disconnected() {
621
0
            drop(chan);
622
0
            Err(TryRecvTimeoutError::Disconnected).into()
623
0
        } else if should_block {
624
0
            let hook = make_signal();
625
0
            chan.waiting.push_back(hook.clone());
626
0
            drop(chan);
627
628
0
            do_block(hook)
629
        } else {
630
0
            drop(chan);
631
0
            Err(TryRecvTimeoutError::Empty).into()
632
        }
633
0
    }
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv::<flume::signal::SyncSignal, core::result::Result<alloc::string::String, flume::TryRecvTimeoutError>, <flume::Shared<alloc::string::String>>::recv_sync::{closure#0}, <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}>
Unexecuted instantiation: <flume::Shared<_>>::recv::<_, _, _, _>
634
635
0
    fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
636
0
        self.recv(
637
            // should_block
638
0
            block.is_some(),
639
            // make_signal
640
0
            || Hook::slot(None, SyncSignal::default()),
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#0}
641
            // do_block
642
0
            |hook| {
643
0
                if let Some(deadline) = block.unwrap() {
644
0
                    hook.wait_deadline_recv(&self.disconnected, deadline)
645
0
                        .or_else(|timed_out| {
646
0
                            if timed_out {
647
                                // Remove our signal
648
0
                                let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
649
0
                                wait_lock(&self.chan)
650
0
                                    .waiting
651
0
                                    .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}::{closure#0}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}::{closure#0}::{closure#0}
652
0
                            }
653
0
                            match hook.try_take() {
654
0
                                Some(msg) => Ok(msg),
655
                                None => {
656
0
                                    let disconnected = self.is_disconnected(); // Check disconnect *before* msg
657
0
                                    if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
658
0
                                        Ok(msg)
659
0
                                    } else if disconnected {
660
0
                                        Err(TryRecvTimeoutError::Disconnected)
661
                                    } else {
662
0
                                        Err(TryRecvTimeoutError::Timeout)
663
                                    }
664
                                }
665
                            }
666
0
                        })
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}::{closure#0}
667
                } else {
668
0
                    hook.wait_recv(&self.disconnected)
669
0
                        .or_else(|| wait_lock(&self.chan).queue.pop_front())
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}::{closure#1}
670
0
                        .ok_or(TryRecvTimeoutError::Disconnected)
671
                }
672
0
            },
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}
673
        )
674
0
    }
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync
Unexecuted instantiation: <flume::Shared<_>>::recv_sync
675
676
    /// Disconnect anything listening on this channel (this will not prevent receivers receiving
677
    /// msgs that have already been sent)
678
0
    fn disconnect_all(&self) {
679
0
        self.disconnected.store(true, Ordering::Relaxed);
680
681
0
        let mut chan = wait_lock(&self.chan);
682
0
        chan.pull_pending(false);
683
0
        if let Some((_, sending)) = chan.sending.as_ref() {
684
0
            sending.iter().for_each(|hook| {
685
0
                hook.signal().fire();
686
0
            })
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::disconnect_all::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::disconnect_all::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::disconnect_all::{closure#0}
687
0
        }
688
0
        chan.waiting.iter().for_each(|hook| {
689
0
            hook.signal().fire();
690
0
        });
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::disconnect_all::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::disconnect_all::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::disconnect_all::{closure#1}
691
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::disconnect_all
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::disconnect_all
Unexecuted instantiation: <flume::Shared<_>>::disconnect_all
692
693
0
    fn is_disconnected(&self) -> bool {
694
0
        self.disconnected.load(Ordering::SeqCst)
695
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::is_disconnected
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::is_disconnected
Unexecuted instantiation: <flume::Shared<_>>::is_disconnected
696
697
0
    fn is_empty(&self) -> bool {
698
0
        self.len() == 0
699
0
    }
700
701
0
    fn is_full(&self) -> bool {
702
0
        self.capacity()
703
0
            .map(|cap| cap == self.len())
704
0
            .unwrap_or(false)
705
0
    }
706
707
0
    fn len(&self) -> usize {
708
0
        let mut chan = wait_lock(&self.chan);
709
0
        chan.pull_pending(false);
710
0
        chan.queue.len()
711
0
    }
712
713
0
    fn capacity(&self) -> Option<usize> {
714
0
        wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
715
0
    }
716
717
0
    fn sender_count(&self) -> usize {
718
0
        self.sender_count.load(Ordering::Relaxed)
719
0
    }
720
721
0
    fn receiver_count(&self) -> usize {
722
0
        self.receiver_count.load(Ordering::Relaxed)
723
0
    }
724
}
725
726
/// A transmitting end of a channel.
727
pub struct Sender<T> {
728
    shared: Arc<Shared<T>>,
729
}
730
731
impl<T> Sender<T> {
732
    /// Attempt to send a value into the channel. If the channel is bounded and full, or all
733
    /// receivers have been dropped, an error is returned. If the channel associated with this
734
    /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
735
0
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
736
0
        self.shared.send_sync(msg, None).map_err(|err| match err {
737
0
            TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
738
0
            TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
739
0
            _ => unreachable!(),
740
0
        })
741
0
    }
742
743
    /// Send a value into the channel, returning an error if all receivers have been dropped.
744
    /// If the channel is bounded and is full, this method will block until space is available
745
    /// or all receivers have been dropped. If the channel is unbounded, this method will not
746
    /// block.
747
0
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
748
0
        self.shared
749
0
            .send_sync(msg, Some(None))
750
0
            .map_err(|err| match err {
751
0
                TrySendTimeoutError::Disconnected(msg) => SendError(msg),
752
0
                _ => unreachable!(),
753
0
            })
Unexecuted instantiation: <flume::Sender<alloc::sync::Arc<alloc::string::String>>>::send::{closure#0}
Unexecuted instantiation: <flume::Sender<alloc::string::String>>::send::{closure#0}
Unexecuted instantiation: <flume::Sender<_>>::send::{closure#0}
754
0
    }
Unexecuted instantiation: <flume::Sender<alloc::sync::Arc<alloc::string::String>>>::send
Unexecuted instantiation: <flume::Sender<alloc::string::String>>::send
Unexecuted instantiation: <flume::Sender<_>>::send
755
756
0
    fn send_deadline_inner(
757
0
        &self,
758
0
        msg: T,
759
0
        deadline: Option<Instant>,
760
0
    ) -> Result<(), SendTimeoutError<T>> {
761
0
        self.shared
762
0
            .send_sync(msg, Some(deadline))
763
0
            .map_err(|err| match err {
764
0
                TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
765
0
                TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
766
0
                _ => unreachable!(),
767
0
            })
768
0
    }
769
770
    /// Send a value into the channel, returning an error if all receivers have been dropped
771
    /// or the deadline has passed. If the channel is bounded and is full, this method will
772
    /// block until space is available, the deadline is reached, or all receivers have been
773
    /// dropped.
774
0
    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
775
0
        self.send_deadline_inner(msg, Some(deadline))
776
0
    }
777
778
    /// Send a value into the channel, returning an error if all receivers have been dropped
779
    /// or the timeout has expired. If the channel is bounded and is full, this method will
780
    /// block until space is available, the timeout has expired, or all receivers have been
781
    /// dropped.
782
0
    pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
783
0
        self.send_deadline_inner(msg, Instant::now().checked_add(dur))
784
0
    }
785
786
    /// Returns true if all receivers for this channel have been dropped.
787
0
    pub fn is_disconnected(&self) -> bool {
788
0
        self.shared.is_disconnected()
789
0
    }
790
791
    /// Returns true if the channel is empty.
792
    /// Note: Zero-capacity channels are always empty.
793
0
    pub fn is_empty(&self) -> bool {
794
0
        self.shared.is_empty()
795
0
    }
796
797
    /// Returns true if the channel is full.
798
    /// Note: Zero-capacity channels are always full.
799
0
    pub fn is_full(&self) -> bool {
800
0
        self.shared.is_full()
801
0
    }
802
803
    /// Returns the number of messages in the channel
804
0
    pub fn len(&self) -> usize {
805
0
        self.shared.len()
806
0
    }
807
808
    /// If the channel is bounded, returns its capacity.
809
0
    pub fn capacity(&self) -> Option<usize> {
810
0
        self.shared.capacity()
811
0
    }
812
813
    /// Get the number of senders that currently exist, including this one.
814
0
    pub fn sender_count(&self) -> usize {
815
0
        self.shared.sender_count()
816
0
    }
817
818
    /// Get the number of receivers that currently exist.
819
    ///
820
    /// Note that this method makes no guarantees that a subsequent send will succeed; it's
821
    /// possible that between `receiver_count()` being called and a `send()`, all open receivers
822
    /// could drop.
823
0
    pub fn receiver_count(&self) -> usize {
824
0
        self.shared.receiver_count()
825
0
    }
826
827
    /// Creates a [`WeakSender`] that does not keep the channel open.
828
    ///
829
    /// The channel is closed once all `Sender`s are dropped, even if there
830
    /// are still active `WeakSender`s.
831
0
    pub fn downgrade(&self) -> WeakSender<T> {
832
0
        WeakSender {
833
0
            shared: Arc::downgrade(&self.shared),
834
0
        }
835
0
    }
836
837
    /// Returns whether the senders are belong to the same channel.
838
0
    pub fn same_channel(&self, other: &Sender<T>) -> bool {
839
0
        Arc::ptr_eq(&self.shared, &other.shared)
840
0
    }
841
}
842
843
impl<T> Clone for Sender<T> {
844
    /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel
845
    /// contents will only be cleaned up when all senders and the receiver have been dropped.
846
0
    fn clone(&self) -> Self {
847
0
        self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
848
0
        Self {
849
0
            shared: self.shared.clone(),
850
0
        }
851
0
    }
852
}
853
854
impl<T> fmt::Debug for Sender<T> {
855
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
856
0
        f.debug_struct("Sender").finish()
857
0
    }
858
}
859
860
impl<T> Drop for Sender<T> {
861
0
    fn drop(&mut self) {
862
        // Notify receivers that all senders have been dropped if the number of senders drops to 0.
863
0
        if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
864
0
            self.shared.disconnect_all();
865
0
        }
866
0
    }
Unexecuted instantiation: <flume::Sender<alloc::sync::Arc<alloc::string::String>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Sender<alloc::string::String> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Sender<_> as core::ops::drop::Drop>::drop
867
}
868
869
/// A sender that does not prevent the channel from being closed.
870
///
871
/// Weak senders do not count towards the number of active senders on the channel. As soon as
872
/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
873
/// `WeakSender`.
874
///
875
/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`WeakSender::upgrade`]
876
/// method.
877
pub struct WeakSender<T> {
878
    shared: Weak<Shared<T>>,
879
}
880
881
impl<T> WeakSender<T> {
882
    /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
883
    ///
884
    /// Returns `None` if the channel was closed already. Note that a `Some` return value
885
    /// does not guarantee that the channel is still open.
886
0
    pub fn upgrade(&self) -> Option<Sender<T>> {
887
0
        self.shared
888
0
            .upgrade()
889
            // check that there are still live senders
890
0
            .filter(|shared| {
891
0
                shared
892
0
                    .sender_count
893
0
                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
894
0
                        if count == 0 {
895
                            // all senders are closed already -> don't increase the sender count
896
0
                            None
897
                        } else {
898
                            // there is still at least one active sender
899
0
                            Some(count + 1)
900
                        }
901
0
                    })
902
0
                    .is_ok()
903
0
            })
904
0
            .map(|shared| Sender { shared })
905
0
    }
906
}
907
908
impl<T> fmt::Debug for WeakSender<T> {
909
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
910
0
        f.debug_struct("WeakSender").finish()
911
0
    }
912
}
913
914
impl<T> Clone for WeakSender<T> {
915
    /// Clones this [`WeakSender`].
916
0
    fn clone(&self) -> Self {
917
0
        Self {
918
0
            shared: self.shared.clone(),
919
0
        }
920
0
    }
921
}
922
923
/// The receiving end of a channel.
924
///
925
/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
926
/// Each message will only be received by a single receiver. This is useful for
927
/// implementing work stealing for concurrent programs.
928
pub struct Receiver<T> {
929
    shared: Arc<Shared<T>>,
930
}
931
932
impl<T> Receiver<T> {
933
    /// Attempt to fetch an incoming value from the channel associated with this receiver,
934
    /// returning an error if the channel is empty or if all senders have been dropped.
935
0
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
936
0
        self.shared.recv_sync(None).map_err(|err| match err {
937
0
            TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
938
0
            TryRecvTimeoutError::Empty => TryRecvError::Empty,
939
0
            _ => unreachable!(),
940
0
        })
941
0
    }
942
943
    /// Wait for an incoming value from the channel associated with this receiver, returning an
944
    /// error if all senders have been dropped.
945
0
    pub fn recv(&self) -> Result<T, RecvError> {
946
0
        self.shared.recv_sync(Some(None)).map_err(|err| match err {
947
0
            TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
948
0
            _ => unreachable!(),
949
0
        })
Unexecuted instantiation: <flume::Receiver<alloc::string::String>>::recv::{closure#0}
Unexecuted instantiation: <flume::Receiver<_>>::recv::{closure#0}
950
0
    }
Unexecuted instantiation: <flume::Receiver<alloc::string::String>>::recv
Unexecuted instantiation: <flume::Receiver<_>>::recv
951
952
0
    fn recv_deadline_inner(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
953
0
        self.shared
954
0
            .recv_sync(Some(deadline))
955
0
            .map_err(|err| match err {
956
0
                TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
957
0
                TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
958
0
                _ => unreachable!(),
959
0
            })
960
0
    }
961
962
    /// Wait for an incoming value from the channel associated with this receiver, returning an
963
    /// error if all senders have been dropped or the deadline has passed.
964
0
    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
965
0
        self.recv_deadline_inner(Some(deadline))
966
0
    }
967
968
    /// Wait for an incoming value from the channel associated with this receiver, returning an
969
    /// error if all senders have been dropped or the timeout has expired.
970
0
    pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
971
0
        self.recv_deadline_inner(Instant::now().checked_add(dur))
972
0
    }
973
974
    /// Create a blocking iterator over the values received on the channel that finishes iteration
975
    /// when all senders have been dropped.
976
    ///
977
    /// You can also create a self-owned iterator with [`Receiver::into_iter`].
978
0
    pub fn iter(&self) -> Iter<'_, T> {
979
0
        Iter { receiver: self }
980
0
    }
981
982
    /// A non-blocking iterator over the values received on the channel that finishes iteration
983
    /// when all senders have been dropped or the channel is empty.
984
0
    pub fn try_iter(&self) -> TryIter<'_, T> {
985
0
        TryIter { receiver: self }
986
0
    }
987
988
    /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
989
    /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
990
    /// the function has been called.
991
0
    pub fn drain(&self) -> Drain<'_, T> {
992
0
        let mut chan = wait_lock(&self.shared.chan);
993
0
        chan.pull_pending(false);
994
0
        let queue = std::mem::take(&mut chan.queue);
995
996
0
        Drain {
997
0
            queue,
998
0
            _phantom: PhantomData,
999
0
        }
1000
0
    }
1001
1002
    /// Returns true if all senders for this channel have been dropped.
1003
0
    pub fn is_disconnected(&self) -> bool {
1004
0
        self.shared.is_disconnected()
1005
0
    }
1006
1007
    /// Returns true if the channel is empty.
1008
    /// Note: Zero-capacity channels are always empty.
1009
0
    pub fn is_empty(&self) -> bool {
1010
0
        self.shared.is_empty()
1011
0
    }
1012
1013
    /// Returns true if the channel is full.
1014
    /// Note: Zero-capacity channels are always full.
1015
0
    pub fn is_full(&self) -> bool {
1016
0
        self.shared.is_full()
1017
0
    }
1018
1019
    /// Returns the number of messages in the channel.
1020
0
    pub fn len(&self) -> usize {
1021
0
        self.shared.len()
1022
0
    }
1023
1024
    /// If the channel is bounded, returns its capacity.
1025
0
    pub fn capacity(&self) -> Option<usize> {
1026
0
        self.shared.capacity()
1027
0
    }
1028
1029
    /// Get the number of senders that currently exist.
1030
0
    pub fn sender_count(&self) -> usize {
1031
0
        self.shared.sender_count()
1032
0
    }
1033
1034
    /// Get the number of receivers that currently exist, including this one.
1035
0
    pub fn receiver_count(&self) -> usize {
1036
0
        self.shared.receiver_count()
1037
0
    }
1038
1039
    /// Returns whether the receivers are belong to the same channel.
1040
0
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1041
0
        Arc::ptr_eq(&self.shared, &other.shared)
1042
0
    }
1043
}
1044
1045
impl<T> Clone for Receiver<T> {
1046
    /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
1047
    /// channel contents will only be cleaned up when all senders and the receiver have been
1048
    /// dropped.
1049
    ///
1050
    /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
1051
    /// Each message will only be received by a single receiver. This is useful for
1052
    /// implementing work stealing for concurrent programs.
1053
0
    fn clone(&self) -> Self {
1054
0
        self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
1055
0
        Self {
1056
0
            shared: self.shared.clone(),
1057
0
        }
1058
0
    }
1059
}
1060
1061
impl<T> fmt::Debug for Receiver<T> {
1062
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1063
0
        f.debug_struct("Receiver").finish()
1064
0
    }
1065
}
1066
1067
impl<T> Drop for Receiver<T> {
1068
0
    fn drop(&mut self) {
1069
        // Notify senders that all receivers have been dropped if the number of receivers drops
1070
        // to 0.
1071
0
        if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
1072
0
            self.shared.disconnect_all();
1073
0
        }
1074
0
    }
Unexecuted instantiation: <flume::Receiver<alloc::sync::Arc<alloc::string::String>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Receiver<alloc::string::String> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Receiver<_> as core::ops::drop::Drop>::drop
1075
}
1076
1077
/// This exists as a shorthand for [`Receiver::iter`].
1078
impl<'a, T> IntoIterator for &'a Receiver<T> {
1079
    type Item = T;
1080
    type IntoIter = Iter<'a, T>;
1081
1082
0
    fn into_iter(self) -> Self::IntoIter {
1083
0
        Iter { receiver: self }
1084
0
    }
1085
}
1086
1087
impl<T> IntoIterator for Receiver<T> {
1088
    type Item = T;
1089
    type IntoIter = IntoIter<T>;
1090
1091
    /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
1092
0
    fn into_iter(self) -> Self::IntoIter {
1093
0
        IntoIter { receiver: self }
1094
0
    }
1095
}
1096
1097
/// An iterator over the msgs received from a channel.
1098
pub struct Iter<'a, T> {
1099
    receiver: &'a Receiver<T>,
1100
}
1101
1102
impl<'a, T> fmt::Debug for Iter<'a, T> {
1103
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1104
0
        f.debug_struct("Iter")
1105
0
            .field("receiver", &self.receiver)
1106
0
            .finish()
1107
0
    }
1108
}
1109
1110
impl<'a, T> Iterator for Iter<'a, T> {
1111
    type Item = T;
1112
1113
0
    fn next(&mut self) -> Option<Self::Item> {
1114
0
        self.receiver.recv().ok()
1115
0
    }
1116
}
1117
1118
/// An non-blocking iterator over the msgs received from a channel.
1119
pub struct TryIter<'a, T> {
1120
    receiver: &'a Receiver<T>,
1121
}
1122
1123
impl<'a, T> fmt::Debug for TryIter<'a, T> {
1124
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1125
0
        f.debug_struct("TryIter")
1126
0
            .field("receiver", &self.receiver)
1127
0
            .finish()
1128
0
    }
1129
}
1130
1131
impl<'a, T> Iterator for TryIter<'a, T> {
1132
    type Item = T;
1133
1134
0
    fn next(&mut self) -> Option<Self::Item> {
1135
0
        self.receiver.try_recv().ok()
1136
0
    }
1137
}
1138
1139
/// An fixed-sized iterator over the msgs drained from a channel.
1140
#[derive(Debug)]
1141
pub struct Drain<'a, T> {
1142
    queue: VecDeque<T>,
1143
    /// A phantom field used to constrain the lifetime of this iterator. We do this because the
1144
    /// implementation may change and we don't want to unintentionally constrain it. Removing this
1145
    /// lifetime later is a possibility.
1146
    _phantom: PhantomData<&'a ()>,
1147
}
1148
1149
impl<'a, T> Iterator for Drain<'a, T> {
1150
    type Item = T;
1151
1152
0
    fn next(&mut self) -> Option<Self::Item> {
1153
0
        self.queue.pop_front()
1154
0
    }
1155
}
1156
1157
impl<'a, T> ExactSizeIterator for Drain<'a, T> {
1158
0
    fn len(&self) -> usize {
1159
0
        self.queue.len()
1160
0
    }
1161
}
1162
1163
/// An owned iterator over the msgs received from a channel.
1164
pub struct IntoIter<T> {
1165
    receiver: Receiver<T>,
1166
}
1167
1168
impl<T> fmt::Debug for IntoIter<T> {
1169
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1170
0
        f.debug_struct("IntoIter")
1171
0
            .field("receiver", &self.receiver)
1172
0
            .finish()
1173
0
    }
1174
}
1175
1176
impl<T> Iterator for IntoIter<T> {
1177
    type Item = T;
1178
1179
0
    fn next(&mut self) -> Option<Self::Item> {
1180
0
        self.receiver.recv().ok()
1181
0
    }
1182
}
1183
1184
/// Create a channel with no maximum capacity.
1185
///
1186
/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
1187
/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1188
/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1189
/// may be cloned.
1190
///
1191
/// # Examples
1192
/// ```
1193
/// let (tx, rx) = flume::unbounded();
1194
///
1195
/// tx.send(42).unwrap();
1196
/// assert_eq!(rx.recv().unwrap(), 42);
1197
/// ```
1198
0
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1199
0
    let shared = Arc::new(Shared::new(None));
1200
0
    (
1201
0
        Sender {
1202
0
            shared: shared.clone(),
1203
0
        },
1204
0
        Receiver { shared },
1205
0
    )
1206
0
}
Unexecuted instantiation: flume::unbounded::<alloc::sync::Arc<alloc::string::String>>
Unexecuted instantiation: flume::unbounded::<alloc::string::String>
Unexecuted instantiation: flume::unbounded::<_>
1207
1208
/// Create a channel with a maximum capacity.
1209
///
1210
/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
1211
/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1212
/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1213
/// may be cloned.
1214
///
1215
/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
1216
/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
1217
/// is not desired, [`Sender::try_send`] may be used.
1218
///
1219
/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
1220
/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
1221
/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
1222
/// perform a handshake and transfer ownership of a value.
1223
///
1224
/// # Examples
1225
/// ```
1226
/// let (tx, rx) = flume::bounded(32);
1227
///
1228
/// for i in 1..33 {
1229
///     tx.send(i).unwrap();
1230
/// }
1231
/// assert!(tx.try_send(33).is_err());
1232
///
1233
/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
1234
/// ```
1235
0
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1236
0
    let shared = Arc::new(Shared::new(Some(cap)));
1237
0
    (
1238
0
        Sender {
1239
0
            shared: shared.clone(),
1240
0
        },
1241
0
        Receiver { shared },
1242
0
    )
1243
0
}