Coverage Report

Created: 2025-03-07 06:49

/rust/registry/src/index.crates.io-6f17d22bba15001f/flume-0.11.1/src/lib.rs
Line
Count
Source (jump to first uncovered line)
1
//! # Flume
2
//!
3
//! A blazingly fast multi-producer, multi-consumer channel.
4
//!
5
//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
6
//!
7
//! ## Why Flume?
8
//!
9
//! - **Featureful**: Unbounded, bounded and rendezvous queues
10
//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel`
11
//! - **Safe**: No `unsafe` code anywhere in the codebase!
12
//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
13
//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
14
//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
15
//! - **Simple**: Few dependencies, minimal codebase, fast to compile
16
//! - **Asynchronous**: `async` support, including mix 'n match with sync code
17
//! - **Ergonomic**: Powerful `select`-like interface
18
//!
19
//! ## Example
20
//!
21
//! ```
22
//! let (tx, rx) = flume::unbounded();
23
//!
24
//! tx.send(42).unwrap();
25
//! assert_eq!(rx.recv().unwrap(), 42);
26
//! ```
27
28
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
29
#![deny(missing_docs)]
30
31
#[cfg(feature = "select")]
32
pub mod select;
33
#[cfg(feature = "async")]
34
pub mod r#async;
35
36
mod signal;
37
38
// Reexports
39
#[cfg(feature = "select")]
40
pub use select::Selector;
41
42
use std::{
43
    collections::VecDeque,
44
    sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak},
45
    time::{Duration, Instant},
46
    marker::PhantomData,
47
    thread,
48
    fmt,
49
};
50
use std::fmt::Formatter;
51
#[cfg(feature = "spin")]
52
use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
53
use crate::signal::{Signal, SyncSignal};
54
55
/// An error that may be emitted when attempting to send a value into a channel on a sender when
56
/// all receivers are dropped.
57
#[derive(Copy, Clone, PartialEq, Eq)]
58
pub struct SendError<T>(pub T);
59
60
impl<T> SendError<T> {
61
    /// Consume the error, yielding the message that failed to send.
62
0
    pub fn into_inner(self) -> T { self.0 }
63
}
64
65
impl<T> fmt::Debug for SendError<T> {
66
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67
0
        "SendError(..)".fmt(f)
68
0
    }
69
}
70
71
impl<T> fmt::Display for SendError<T> {
72
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73
0
        "sending on a closed channel".fmt(f)
74
0
    }
75
}
76
77
impl<T> std::error::Error for SendError<T> {}
78
79
/// An error that may be emitted when attempting to send a value into a channel on a sender when
80
/// the channel is full or all receivers are dropped.
81
#[derive(Copy, Clone, PartialEq, Eq)]
82
pub enum TrySendError<T> {
83
    /// The channel the message is sent on has a finite capacity and was full when the send was attempted.
84
    Full(T),
85
    /// All channel receivers were dropped and so the message has nobody to receive it.
86
    Disconnected(T),
87
}
88
89
impl<T> TrySendError<T> {
90
    /// Consume the error, yielding the message that failed to send.
91
0
    pub fn into_inner(self) -> T {
92
0
        match self {
93
0
            Self::Full(msg) | Self::Disconnected(msg) => msg,
94
0
        }
95
0
    }
96
}
97
98
impl<T> fmt::Debug for TrySendError<T> {
99
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100
0
        match *self {
101
0
            TrySendError::Full(..) => "Full(..)".fmt(f),
102
0
            TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
103
        }
104
0
    }
105
}
106
107
impl<T> fmt::Display for TrySendError<T> {
108
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109
0
        match self {
110
0
            TrySendError::Full(..) => "sending on a full channel".fmt(f),
111
0
            TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
112
        }
113
0
    }
114
}
115
116
impl<T> std::error::Error for TrySendError<T> {}
117
118
impl<T> From<SendError<T>> for TrySendError<T> {
119
0
    fn from(err: SendError<T>) -> Self {
120
0
        match err {
121
0
            SendError(item) => Self::Disconnected(item),
122
0
        }
123
0
    }
124
}
125
126
/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
127
/// the send operation times out or all receivers are dropped.
128
#[derive(Copy, Clone, PartialEq, Eq)]
129
pub enum SendTimeoutError<T> {
130
    /// A timeout occurred when attempting to send the message.
131
    Timeout(T),
132
    /// All channel receivers were dropped and so the message has nobody to receive it.
133
    Disconnected(T),
134
}
135
136
impl<T> SendTimeoutError<T> {
137
    /// Consume the error, yielding the message that failed to send.
138
0
    pub fn into_inner(self) -> T {
139
0
        match self {
140
0
            Self::Timeout(msg) | Self::Disconnected(msg) => msg,
141
0
        }
142
0
    }
143
}
144
145
impl<T> fmt::Debug for SendTimeoutError<T> {
146
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147
0
        "SendTimeoutError(..)".fmt(f)
148
0
    }
149
}
150
151
impl<T> fmt::Display for SendTimeoutError<T> {
152
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153
0
        match self {
154
0
            SendTimeoutError::Timeout(..) => "timed out sending on a full channel".fmt(f),
155
0
            SendTimeoutError::Disconnected(..) => "sending on a closed channel".fmt(f),
156
        }
157
0
    }
158
}
159
160
impl<T> std::error::Error for SendTimeoutError<T> {}
161
162
impl<T> From<SendError<T>> for SendTimeoutError<T> {
163
0
    fn from(err: SendError<T>) -> Self {
164
0
        match err {
165
0
            SendError(item) => Self::Disconnected(item),
166
0
        }
167
0
    }
168
}
169
170
enum TrySendTimeoutError<T> {
171
    Full(T),
172
    Disconnected(T),
173
    Timeout(T),
174
}
175
176
/// An error that may be emitted when attempting to wait for a value on a receiver when all senders
177
/// are dropped and there are no more messages in the channel.
178
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
179
pub enum RecvError {
180
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
181
    Disconnected,
182
}
183
184
impl fmt::Display for RecvError {
185
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186
0
        match self {
187
0
            RecvError::Disconnected => "receiving on a closed channel".fmt(f),
188
0
        }
189
0
    }
190
}
191
192
impl std::error::Error for RecvError {}
193
194
/// An error that may be emitted when attempting to fetch a value on a receiver when there are no
195
/// messages in the channel. If there are no messages in the channel and all senders are dropped,
196
/// then `TryRecvError::Disconnected` will be returned.
197
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
198
pub enum TryRecvError {
199
    /// The channel was empty when the receive was attempted.
200
    Empty,
201
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
202
    Disconnected,
203
}
204
205
impl fmt::Display for TryRecvError {
206
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207
0
        match self {
208
0
            TryRecvError::Empty => "receiving on an empty channel".fmt(f),
209
0
            TryRecvError::Disconnected => "channel is empty and closed".fmt(f),
210
        }
211
0
    }
212
}
213
214
impl std::error::Error for TryRecvError {}
215
216
impl From<RecvError> for TryRecvError {
217
0
    fn from(err: RecvError) -> Self {
218
0
        match err {
219
0
            RecvError::Disconnected => Self::Disconnected,
220
0
        }
221
0
    }
222
}
223
224
/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout
225
/// when the receive operation times out or all senders are dropped and there are no values left
226
/// in the channel.
227
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
228
pub enum RecvTimeoutError {
229
    /// A timeout occurred when attempting to receive a message.
230
    Timeout,
231
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
232
    Disconnected,
233
}
234
235
impl fmt::Display for RecvTimeoutError {
236
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237
0
        match self {
238
0
            RecvTimeoutError::Timeout => "timed out waiting on a channel".fmt(f),
239
0
            RecvTimeoutError::Disconnected => "channel is empty and closed".fmt(f),
240
        }
241
0
    }
242
}
243
244
impl std::error::Error for RecvTimeoutError {}
245
246
impl From<RecvError> for RecvTimeoutError {
247
0
    fn from(err: RecvError) -> Self {
248
0
        match err {
249
0
            RecvError::Disconnected => Self::Disconnected,
250
0
        }
251
0
    }
252
}
253
254
enum TryRecvTimeoutError {
255
    Empty,
256
    Timeout,
257
    Disconnected,
258
}
259
260
// TODO: Investigate some sort of invalidation flag for timeouts
261
#[cfg(feature = "spin")]
262
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
263
264
#[cfg(not(feature = "spin"))]
265
struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
266
267
#[cfg(feature = "spin")]
268
impl<T, S: ?Sized + Signal> Hook<T, S> {
269
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
270
    where
271
        S: Sized,
272
    {
273
        Arc::new(Self(Some(Spinlock::new(msg)), signal))
274
    }
275
276
    fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
277
        self.0.as_ref().map(|s| s.lock())
278
    }
279
}
280
281
#[cfg(not(feature = "spin"))]
282
impl<T, S: ?Sized + Signal> Hook<T, S> {
283
0
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
284
0
    where
285
0
        S: Sized,
286
0
    {
287
0
        Arc::new(Self(Some(Mutex::new(msg)), signal))
288
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::slot
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::slot
Unexecuted instantiation: <flume::Hook<_, _>>::slot
289
290
0
    fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
291
0
        self.0.as_ref().map(|s| s.lock().unwrap())
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::lock::{closure#0}
Unexecuted instantiation: <flume::Hook<_, _>>::lock::{closure#0}
292
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::lock
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::lock
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::lock
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::lock
Unexecuted instantiation: <flume::Hook<_, _>>::lock
293
}
294
295
impl<T, S: ?Sized + Signal> Hook<T, S> {
296
0
    pub fn fire_recv(&self) -> (T, &S) {
297
0
        let msg = self.lock().unwrap().take().unwrap();
298
0
        (msg, self.signal())
299
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::fire_recv
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::fire_recv
Unexecuted instantiation: <flume::Hook<_, _>>::fire_recv
300
301
0
    pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
302
0
        let ret = match self.lock() {
303
0
            Some(mut lock) => {
304
0
                *lock = Some(msg);
305
0
                None
306
            }
307
0
            None => Some(msg),
308
        };
309
0
        (ret, self.signal())
310
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::fire_send
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::fire_send
Unexecuted instantiation: <flume::Hook<_, _>>::fire_send
311
312
0
    pub fn is_empty(&self) -> bool {
313
0
        self.lock().map(|s| s.is_none()).unwrap_or(true)
314
0
    }
315
316
0
    pub fn try_take(&self) -> Option<T> {
317
0
        self.lock().unwrap().take()
318
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::try_take
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::try_take
Unexecuted instantiation: <flume::Hook<_, _>>::try_take
319
320
0
    pub fn trigger(signal: S) -> Arc<Self>
321
0
    where
322
0
        S: Sized,
323
0
    {
324
0
        Arc::new(Self(None, signal))
325
0
    }
326
327
0
    pub fn signal(&self) -> &S {
328
0
        &self.1
329
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::signal
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, dyn flume::signal::Signal>>::signal
Unexecuted instantiation: <flume::Hook<alloc::string::String, dyn flume::signal::Signal>>::signal
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::signal
Unexecuted instantiation: <flume::Hook<_, _>>::signal
330
331
0
    pub fn fire_nothing(&self) -> bool {
332
0
        self.signal().fire()
333
0
    }
334
}
335
336
impl<T> Hook<T, SyncSignal> {
337
0
    pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
338
        loop {
339
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
340
0
            let msg = self.lock().unwrap().take();
341
0
            if let Some(msg) = msg {
342
0
                break Some(msg);
343
0
            } else if disconnected {
344
0
                break None;
345
            } else {
346
0
                self.signal().wait()
347
            }
348
        }
349
0
    }
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_recv
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_recv
350
351
    // Err(true) if timeout
352
0
    pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
353
        loop {
354
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
355
0
            let msg = self.lock().unwrap().take();
356
0
            if let Some(msg) = msg {
357
0
                break Ok(msg);
358
0
            } else if disconnected {
359
0
                break Err(false);
360
0
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
361
0
                self.signal().wait_timeout(dur);
362
0
            } else {
363
0
                break Err(true);
364
            }
365
        }
366
0
    }
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_deadline_recv
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_deadline_recv
367
368
0
    pub fn wait_send(&self, abort: &AtomicBool) {
369
        loop {
370
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
371
0
            if disconnected || self.lock().unwrap().is_none() {
372
0
                break;
373
0
            }
374
0
375
0
            self.signal().wait();
376
        }
377
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::wait_send
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_send
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_send
378
379
    // Err(true) if timeout
380
0
    pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
381
        loop {
382
0
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
383
0
            if self.lock().unwrap().is_none() {
384
0
                break Ok(());
385
0
            } else if disconnected {
386
0
                break Err(false);
387
0
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
388
0
                self.signal().wait_timeout(dur);
389
0
            } else {
390
0
                break Err(true);
391
            }
392
        }
393
0
    }
Unexecuted instantiation: <flume::Hook<alloc::sync::Arc<alloc::string::String>, flume::signal::SyncSignal>>::wait_deadline_send
Unexecuted instantiation: <flume::Hook<alloc::string::String, flume::signal::SyncSignal>>::wait_deadline_send
Unexecuted instantiation: <flume::Hook<_, flume::signal::SyncSignal>>::wait_deadline_send
394
}
395
396
#[cfg(feature = "spin")]
397
#[inline]
398
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
399
    // Some targets don't support `thread::sleep` (e.g. the `wasm32-unknown-unknown` target when
400
    // running in the main thread of a web browser) so we only use it on targets where we know it
401
    // will work
402
    #[cfg(any(target_family = "unix", target_family = "windows"))]
403
    {
404
        let mut i = 4;
405
        loop {
406
            for _ in 0..10 {
407
                if let Some(guard) = lock.try_lock() {
408
                    return guard;
409
                }
410
                thread::yield_now();
411
            }
412
            // Sleep for at most ~1 ms
413
            thread::sleep(Duration::from_nanos(1 << i.min(20)));
414
            i += 1;
415
        }
416
    }
417
    #[cfg(not(any(target_family = "unix", target_family = "windows")))]
418
    lock.lock()
419
}
420
421
#[cfg(not(feature = "spin"))]
422
#[inline]
423
0
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
424
0
    lock.lock().unwrap()
425
0
}
Unexecuted instantiation: flume::wait_lock::<flume::Chan<alloc::sync::Arc<alloc::string::String>>>
Unexecuted instantiation: flume::wait_lock::<flume::Chan<alloc::string::String>>
Unexecuted instantiation: flume::wait_lock::<_>
426
427
#[cfg(not(feature = "spin"))]
428
use std::sync::{Mutex, MutexGuard};
429
430
#[cfg(feature = "spin")]
431
type ChanLock<T> = Spinlock<T>;
432
#[cfg(not(feature = "spin"))]
433
type ChanLock<T> = Mutex<T>;
434
435
436
type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
437
struct Chan<T> {
438
    sending: Option<(usize, SignalVec<T>)>,
439
    queue: VecDeque<T>,
440
    waiting: SignalVec<T>,
441
}
442
443
impl<T> Chan<T> {
444
0
    fn pull_pending(&mut self, pull_extra: bool) {
445
0
        if let Some((cap, sending)) = &mut self.sending {
446
0
            let effective_cap = *cap + pull_extra as usize;
447
448
0
            while self.queue.len() < effective_cap {
449
0
                if let Some(s) = sending.pop_front() {
450
0
                    let (msg, signal) = s.fire_recv();
451
0
                    signal.fire();
452
0
                    self.queue.push_back(msg);
453
0
                } else {
454
0
                    break;
455
                }
456
            }
457
0
        }
458
0
    }
Unexecuted instantiation: <flume::Chan<alloc::sync::Arc<alloc::string::String>>>::pull_pending
Unexecuted instantiation: <flume::Chan<alloc::string::String>>::pull_pending
Unexecuted instantiation: <flume::Chan<_>>::pull_pending
459
460
0
    fn try_wake_receiver_if_pending(&mut self) {
461
0
        if !self.queue.is_empty() {
462
0
            while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
463
0
        }
464
0
    }
465
}
466
467
struct Shared<T> {
468
    chan: ChanLock<Chan<T>>,
469
    disconnected: AtomicBool,
470
    sender_count: AtomicUsize,
471
    receiver_count: AtomicUsize,
472
}
473
474
impl<T> Shared<T> {
475
0
    fn new(cap: Option<usize>) -> Self {
476
0
        Self {
477
0
            chan: ChanLock::new(Chan {
478
0
                sending: cap.map(|cap| (cap, VecDeque::new())),
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::new::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::new::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::new::{closure#0}
479
0
                queue: VecDeque::new(),
480
0
                waiting: VecDeque::new(),
481
0
            }),
482
0
            disconnected: AtomicBool::new(false),
483
0
            sender_count: AtomicUsize::new(1),
484
0
            receiver_count: AtomicUsize::new(1),
485
0
        }
486
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::new
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::new
Unexecuted instantiation: <flume::Shared<_>>::new
487
488
0
    fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
489
0
        &self,
490
0
        msg: T,
491
0
        should_block: bool,
492
0
        make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
493
0
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
494
0
    ) -> R {
495
0
        let mut chan = wait_lock(&self.chan);
496
0
497
0
        if self.is_disconnected() {
498
0
            Err(TrySendTimeoutError::Disconnected(msg)).into()
499
0
        } else if !chan.waiting.is_empty() {
500
0
            let mut msg = Some(msg);
501
502
            loop {
503
0
                let slot = chan.waiting.pop_front();
504
0
                match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::sync::Arc<alloc::string::String>>>, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}>::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::string::String>>, <flume::Shared<alloc::string::String>>::send_sync::{closure#0}, <flume::Shared<alloc::string::String>>::send_sync::{closure#1}>::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send::<_, _, _, _>::{closure#0}
505
                    // No more waiting receivers and msg in queue, so break out of the loop
506
0
                    None if msg.is_none() => break,
507
                    // No more waiting receivers, so add msg to queue and break out of the loop
508
                    None => {
509
0
                        chan.queue.push_back(msg.unwrap());
510
0
                        break;
511
                    }
512
0
                    Some((Some(m), signal)) => {
513
0
                        if signal.fire() {
514
                            // Was async and a stream, so didn't acquire the message. Wake another
515
                            // receiver, and do not yet push the message.
516
0
                            msg.replace(m);
517
0
                            continue;
518
                        } else {
519
                            // Was async and not a stream, so it did acquire the message. Push the
520
                            // message to the queue for it to be received.
521
0
                            chan.queue.push_back(m);
522
0
                            drop(chan);
523
0
                            break;
524
                        }
525
                    },
526
0
                    Some((None, signal)) => {
527
0
                        drop(chan);
528
0
                        signal.fire();
529
0
                        break; // Was sync, so it has acquired the message
530
                    },
531
                }
532
            }
533
534
0
            Ok(()).into()
535
0
        } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::sync::Arc<alloc::string::String>>>, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}>::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::string::String>>, <flume::Shared<alloc::string::String>>::send_sync::{closure#0}, <flume::Shared<alloc::string::String>>::send_sync::{closure#1}>::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::send::<_, _, _, _>::{closure#1}
536
0
            chan.queue.push_back(msg);
537
0
            Ok(()).into()
538
0
        } else if should_block { // Only bounded from here on
539
0
            let hook = make_signal(msg);
540
0
            chan.sending.as_mut().unwrap().1.push_back(hook.clone());
541
0
            drop(chan);
542
0
543
0
            do_block(hook)
544
        } else {
545
0
            Err(TrySendTimeoutError::Full(msg)).into()
546
        }
547
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::sync::Arc<alloc::string::String>>>, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}, <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}>
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send::<flume::signal::SyncSignal, core::result::Result<(), flume::TrySendTimeoutError<alloc::string::String>>, <flume::Shared<alloc::string::String>>::send_sync::{closure#0}, <flume::Shared<alloc::string::String>>::send_sync::{closure#1}>
Unexecuted instantiation: <flume::Shared<_>>::send::<_, _, _, _>
548
549
0
    fn send_sync(
550
0
        &self,
551
0
        msg: T,
552
0
        block: Option<Option<Instant>>,
553
0
    ) -> Result<(), TrySendTimeoutError<T>> {
554
0
        self.send(
555
0
            // msg
556
0
            msg,
557
0
            // should_block
558
0
            block.is_some(),
559
0
            // make_signal
560
0
            |msg| Hook::slot(Some(msg), SyncSignal::default()),
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#0}
561
0
            // do_block
562
0
            |hook| if let Some(deadline) = block.unwrap() {
563
0
                hook.wait_deadline_send(&self.disconnected, deadline)
564
0
                    .or_else(|timed_out| {
565
0
                        if timed_out { // Remove our signal
566
0
                            let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
567
0
                            wait_lock(&self.chan).sending
568
0
                                .as_mut()
569
0
                                .unwrap().1
570
0
                                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}::{closure#0}::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}::{closure#0}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}::{closure#0}::{closure#0}
571
0
                        }
572
0
                        hook.try_take().map(|msg| if self.is_disconnected() {
573
0
                            Err(TrySendTimeoutError::Disconnected(msg))
574
                        } else {
575
0
                            Err(TrySendTimeoutError::Timeout(msg))
576
0
                        })
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}::{closure#0}::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}::{closure#0}::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}::{closure#0}::{closure#1}
577
0
                        .unwrap_or(Ok(()))
578
0
                    })
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}::{closure#0}
579
            } else {
580
0
                hook.wait_send(&self.disconnected);
581
0
582
0
                match hook.try_take() {
583
0
                    Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
584
0
                    None => Ok(()),
585
                }
586
0
            },
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::send_sync::{closure#1}
587
0
        )
588
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::send_sync
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::send_sync
Unexecuted instantiation: <flume::Shared<_>>::send_sync
589
590
0
    fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
591
0
        &self,
592
0
        should_block: bool,
593
0
        make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
594
0
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
595
0
    ) -> R {
596
0
        let mut chan = wait_lock(&self.chan);
597
0
        chan.pull_pending(true);
598
599
0
        if let Some(msg) = chan.queue.pop_front() {
600
0
            drop(chan);
601
0
            Ok(msg).into()
602
0
        } else if self.is_disconnected() {
603
0
            drop(chan);
604
0
            Err(TryRecvTimeoutError::Disconnected).into()
605
0
        } else if should_block {
606
0
            let hook = make_signal();
607
0
            chan.waiting.push_back(hook.clone());
608
0
            drop(chan);
609
0
610
0
            do_block(hook)
611
        } else {
612
0
            drop(chan);
613
0
            Err(TryRecvTimeoutError::Empty).into()
614
        }
615
0
    }
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv::<flume::signal::SyncSignal, core::result::Result<alloc::string::String, flume::TryRecvTimeoutError>, <flume::Shared<alloc::string::String>>::recv_sync::{closure#0}, <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}>
Unexecuted instantiation: <flume::Shared<_>>::recv::<_, _, _, _>
616
617
0
    fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
618
0
        self.recv(
619
0
            // should_block
620
0
            block.is_some(),
621
0
            // make_signal
622
0
            || Hook::slot(None, SyncSignal::default()),
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#0}
623
0
            // do_block
624
0
            |hook| if let Some(deadline) = block.unwrap() {
625
0
                hook.wait_deadline_recv(&self.disconnected, deadline)
626
0
                    .or_else(|timed_out| {
627
0
                        if timed_out { // Remove our signal
628
0
                            let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
629
0
                            wait_lock(&self.chan).waiting
630
0
                                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}::{closure#0}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}::{closure#0}::{closure#0}
631
0
                        }
632
0
                        match hook.try_take() {
633
0
                            Some(msg) => Ok(msg),
634
                            None => {
635
0
                                let disconnected = self.is_disconnected(); // Check disconnect *before* msg
636
0
                                if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
637
0
                                    Ok(msg)
638
0
                                } else if disconnected {
639
0
                                    Err(TryRecvTimeoutError::Disconnected)
640
                                } else {
641
0
                                    Err(TryRecvTimeoutError::Timeout)
642
                                }
643
                            },
644
                        }
645
0
                    })
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}::{closure#0}
646
            } else {
647
0
                hook.wait_recv(&self.disconnected)
648
0
                    .or_else(|| wait_lock(&self.chan).queue.pop_front())
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}::{closure#1}
649
0
                    .ok_or(TryRecvTimeoutError::Disconnected)
650
0
            },
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::recv_sync::{closure#1}
651
0
        )
652
0
    }
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::recv_sync
Unexecuted instantiation: <flume::Shared<_>>::recv_sync
653
654
    /// Disconnect anything listening on this channel (this will not prevent receivers receiving
655
    /// msgs that have already been sent)
656
0
    fn disconnect_all(&self) {
657
0
        self.disconnected.store(true, Ordering::Relaxed);
658
0
659
0
        let mut chan = wait_lock(&self.chan);
660
0
        chan.pull_pending(false);
661
0
        if let Some((_, sending)) = chan.sending.as_ref() {
662
0
            sending.iter().for_each(|hook| {
663
0
                hook.signal().fire();
664
0
            })
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::disconnect_all::{closure#0}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::disconnect_all::{closure#0}
Unexecuted instantiation: <flume::Shared<_>>::disconnect_all::{closure#0}
665
0
        }
666
0
        chan.waiting.iter().for_each(|hook| {
667
0
            hook.signal().fire();
668
0
        });
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::disconnect_all::{closure#1}
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::disconnect_all::{closure#1}
Unexecuted instantiation: <flume::Shared<_>>::disconnect_all::{closure#1}
669
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::disconnect_all
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::disconnect_all
Unexecuted instantiation: <flume::Shared<_>>::disconnect_all
670
671
0
    fn is_disconnected(&self) -> bool {
672
0
        self.disconnected.load(Ordering::SeqCst)
673
0
    }
Unexecuted instantiation: <flume::Shared<alloc::sync::Arc<alloc::string::String>>>::is_disconnected
Unexecuted instantiation: <flume::Shared<alloc::string::String>>::is_disconnected
Unexecuted instantiation: <flume::Shared<_>>::is_disconnected
674
675
0
    fn is_empty(&self) -> bool {
676
0
        self.len() == 0
677
0
    }
678
679
0
    fn is_full(&self) -> bool {
680
0
        self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
681
0
    }
682
683
0
    /// Number of messages in the queue, measured under the channel lock after calling
    /// `pull_pending(false)`.
    fn len(&self) -> usize {
        let mut guard = wait_lock(&self.chan);
        guard.pull_pending(false);
        guard.queue.len()
    }
688
689
0
    fn capacity(&self) -> Option<usize> {
690
0
        wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
691
0
    }
692
693
0
    fn sender_count(&self) -> usize {
694
0
        self.sender_count.load(Ordering::Relaxed)
695
0
    }
696
697
0
    fn receiver_count(&self) -> usize {
698
0
        self.receiver_count.load(Ordering::Relaxed)
699
0
    }
700
}
701
702
/// A transmitting end of a channel.
703
pub struct Sender<T> {
704
    shared: Arc<Shared<T>>,
705
}
706
707
impl<T> Sender<T> {
708
    /// Attempt to send a value into the channel. If the channel is bounded and full, or all
709
    /// receivers have been dropped, an error is returned. If the channel associated with this
710
    /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
711
0
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
712
0
        self.shared.send_sync(msg, None).map_err(|err| match err {
713
0
            TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
714
0
            TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
715
0
            _ => unreachable!(),
716
0
        })
717
0
    }
718
719
    /// Send a value into the channel, returning an error if all receivers have been dropped.
720
    /// If the channel is bounded and is full, this method will block until space is available
721
    /// or all receivers have been dropped. If the channel is unbounded, this method will not
722
    /// block.
723
0
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
724
0
        self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
725
0
            TrySendTimeoutError::Disconnected(msg) => SendError(msg),
726
0
            _ => unreachable!(),
727
0
        })
Unexecuted instantiation: <flume::Sender<alloc::sync::Arc<alloc::string::String>>>::send::{closure#0}
Unexecuted instantiation: <flume::Sender<alloc::string::String>>::send::{closure#0}
Unexecuted instantiation: <flume::Sender<_>>::send::{closure#0}
728
0
    }
Unexecuted instantiation: <flume::Sender<alloc::sync::Arc<alloc::string::String>>>::send
Unexecuted instantiation: <flume::Sender<alloc::string::String>>::send
Unexecuted instantiation: <flume::Sender<_>>::send
729
730
    /// Send a value into the channel, returning an error if all receivers have been dropped
731
    /// or the deadline has passed. If the channel is bounded and is full, this method will
732
    /// block until space is available, the deadline is reached, or all receivers have been
733
    /// dropped.
734
0
    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
735
0
        self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
736
0
            TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
737
0
            TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
738
0
            _ => unreachable!(),
739
0
        })
740
0
    }
741
742
    /// Send a value into the channel, returning an error if all receivers have been dropped
743
    /// or the timeout has expired. If the channel is bounded and is full, this method will
744
    /// block until space is available, the timeout has expired, or all receivers have been
745
    /// dropped.
746
0
    pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
747
0
        self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
748
0
    }
749
750
    /// Returns true if all receivers for this channel have been dropped.
751
0
    pub fn is_disconnected(&self) -> bool {
752
0
        self.shared.is_disconnected()
753
0
    }
754
755
    /// Returns true if the channel is empty.
756
    /// Note: Zero-capacity channels are always empty.
757
0
    pub fn is_empty(&self) -> bool {
758
0
        self.shared.is_empty()
759
0
    }
760
761
    /// Returns true if the channel is full.
762
    /// Note: Zero-capacity channels are always full.
763
0
    pub fn is_full(&self) -> bool {
764
0
        self.shared.is_full()
765
0
    }
766
767
    /// Returns the number of messages in the channel
768
0
    pub fn len(&self) -> usize {
769
0
        self.shared.len()
770
0
    }
771
772
    /// If the channel is bounded, returns its capacity.
773
0
    pub fn capacity(&self) -> Option<usize> {
774
0
        self.shared.capacity()
775
0
    }
776
777
    /// Get the number of senders that currently exist, including this one.
778
0
    pub fn sender_count(&self) -> usize {
779
0
        self.shared.sender_count()
780
0
    }
781
782
    /// Get the number of receivers that currently exist.
783
    ///
784
    /// Note that this method makes no guarantees that a subsequent send will succeed; it's
785
    /// possible that between `receiver_count()` being called and a `send()`, all open receivers
786
    /// could drop.
787
0
    pub fn receiver_count(&self) -> usize {
788
0
        self.shared.receiver_count()
789
0
    }
790
791
    /// Creates a [`WeakSender`] that does not keep the channel open.
792
    ///
793
    /// The channel is closed once all `Sender`s are dropped, even if there
794
    /// are still active `WeakSender`s.
795
0
    pub fn downgrade(&self) -> WeakSender<T> {
796
0
        WeakSender {
797
0
            shared: Arc::downgrade(&self.shared),
798
0
        }
799
0
    }
800
801
    /// Returns whether the senders are belong to the same channel.
802
0
    pub fn same_channel(&self, other: &Sender<T>) -> bool {
803
0
        Arc::ptr_eq(&self.shared, &other.shared)
804
0
    }
805
}
806
807
impl<T> Clone for Sender<T> {
808
    /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel
809
    /// contents will only be cleaned up when all senders and the receiver have been dropped.
810
0
    fn clone(&self) -> Self {
811
0
        self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
812
0
        Self { shared: self.shared.clone() }
813
0
    }
814
}
815
816
impl<T> fmt::Debug for Sender<T> {
817
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
818
0
        f.debug_struct("Sender").finish()
819
0
    }
820
}
821
822
impl<T> Drop for Sender<T> {
823
0
    fn drop(&mut self) {
824
0
        // Notify receivers that all senders have been dropped if the number of senders drops to 0.
825
0
        if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
826
0
            self.shared.disconnect_all();
827
0
        }
828
0
    }
Unexecuted instantiation: <flume::Sender<alloc::sync::Arc<alloc::string::String>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Sender<alloc::string::String> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Sender<_> as core::ops::drop::Drop>::drop
829
}
830
831
/// A sender that does not prevent the channel from being closed.
832
///
833
/// Weak senders do not count towards the number of active senders on the channel. As soon as
834
/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
835
/// `WeakSender`.
836
///
837
/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`]
838
/// method.
839
pub struct WeakSender<T> {
840
    shared: Weak<Shared<T>>,
841
}
842
843
impl<T> WeakSender<T> {
844
    /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
845
    ///
846
    /// Returns `None` if the channel was closed already. Note that a `Some` return value
847
    /// does not guarantee that the channel is still open.
848
0
    pub fn upgrade(&self) -> Option<Sender<T>> {
849
0
        self.shared
850
0
            .upgrade()
851
0
            // check that there are still live senders
852
0
            .filter(|shared| {
853
0
                shared
854
0
                    .sender_count
855
0
                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
856
0
                        if count == 0 {
857
                            // all senders are closed already -> don't increase the sender count
858
0
                            None
859
                        } else {
860
                            // there is still at least one active sender
861
0
                            Some(count + 1)
862
                        }
863
0
                    })
864
0
                    .is_ok()
865
0
            })
866
0
            .map(|shared| Sender { shared })
867
0
    }
868
}
869
870
impl<T> fmt::Debug for WeakSender<T> {
871
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
872
0
        f.debug_struct("WeakSender").finish()
873
0
    }
874
}
875
876
impl<T> Clone for WeakSender<T> {
877
    /// Clones this [`WeakSender`].
878
0
    fn clone(&self) -> Self {
879
0
        Self { shared: self.shared.clone() }
880
0
    }
881
}
882
883
/// The receiving end of a channel.
884
///
885
/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
886
/// Each message will only be received by a single receiver. This is useful for
887
/// implementing work stealing for concurrent programs.
888
pub struct Receiver<T> {
889
    shared: Arc<Shared<T>>,
890
}
891
892
impl<T> Receiver<T> {
893
    /// Attempt to fetch an incoming value from the channel associated with this receiver,
894
    /// returning an error if the channel is empty or if all senders have been dropped.
895
0
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
896
0
        self.shared.recv_sync(None).map_err(|err| match err {
897
0
            TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
898
0
            TryRecvTimeoutError::Empty => TryRecvError::Empty,
899
0
            _ => unreachable!(),
900
0
        })
901
0
    }
902
903
    /// Wait for an incoming value from the channel associated with this receiver, returning an
904
    /// error if all senders have been dropped.
905
0
    pub fn recv(&self) -> Result<T, RecvError> {
906
0
        self.shared.recv_sync(Some(None)).map_err(|err| match err {
907
0
            TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
908
0
            _ => unreachable!(),
909
0
        })
Unexecuted instantiation: <flume::Receiver<alloc::string::String>>::recv::{closure#0}
Unexecuted instantiation: <flume::Receiver<_>>::recv::{closure#0}
910
0
    }
Unexecuted instantiation: <flume::Receiver<alloc::string::String>>::recv
Unexecuted instantiation: <flume::Receiver<_>>::recv
911
912
    /// Wait for an incoming value from the channel associated with this receiver, returning an
913
    /// error if all senders have been dropped or the deadline has passed.
914
0
    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
915
0
        self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
916
0
            TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
917
0
            TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
918
0
            _ => unreachable!(),
919
0
        })
920
0
    }
921
922
    /// Wait for an incoming value from the channel associated with this receiver, returning an
923
    /// error if all senders have been dropped or the timeout has expired.
924
0
    pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
925
0
        self.recv_deadline(Instant::now().checked_add(dur).unwrap())
926
0
    }
927
928
    /// Create a blocking iterator over the values received on the channel that finishes iteration
929
    /// when all senders have been dropped.
930
    ///
931
    /// You can also create a self-owned iterator with [`Receiver::into_iter`].
932
0
    pub fn iter(&self) -> Iter<T> {
933
0
        Iter { receiver: &self }
934
0
    }
935
936
    /// A non-blocking iterator over the values received on the channel that finishes iteration
937
    /// when all senders have been dropped or the channel is empty.
938
0
    pub fn try_iter(&self) -> TryIter<T> {
939
0
        TryIter { receiver: &self }
940
0
    }
941
942
    /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
943
    /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
944
    /// the function has been called.
945
0
    pub fn drain(&self) -> Drain<T> {
946
0
        let mut chan = wait_lock(&self.shared.chan);
947
0
        chan.pull_pending(false);
948
0
        let queue = std::mem::take(&mut chan.queue);
949
0
950
0
        Drain { queue, _phantom: PhantomData }
951
0
    }
952
953
    /// Returns true if all senders for this channel have been dropped.
954
0
    pub fn is_disconnected(&self) -> bool {
955
0
        self.shared.is_disconnected()
956
0
    }
957
958
    /// Returns true if the channel is empty.
959
    /// Note: Zero-capacity channels are always empty.
960
0
    pub fn is_empty(&self) -> bool {
961
0
        self.shared.is_empty()
962
0
    }
963
964
    /// Returns true if the channel is full.
965
    /// Note: Zero-capacity channels are always full.
966
0
    pub fn is_full(&self) -> bool {
967
0
        self.shared.is_full()
968
0
    }
969
970
    /// Returns the number of messages in the channel.
971
0
    pub fn len(&self) -> usize {
972
0
        self.shared.len()
973
0
    }
974
975
    /// If the channel is bounded, returns its capacity.
976
0
    pub fn capacity(&self) -> Option<usize> {
977
0
        self.shared.capacity()
978
0
    }
979
980
    /// Get the number of senders that currently exist.
981
0
    pub fn sender_count(&self) -> usize {
982
0
        self.shared.sender_count()
983
0
    }
984
985
    /// Get the number of receivers that currently exist, including this one.
986
0
    pub fn receiver_count(&self) -> usize {
987
0
        self.shared.receiver_count()
988
0
    }
989
990
    /// Returns whether the receivers are belong to the same channel.
991
0
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
992
0
        Arc::ptr_eq(&self.shared, &other.shared)
993
0
    }
994
}
995
996
impl<T> Clone for Receiver<T> {
997
    /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
998
    /// channel contents will only be cleaned up when all senders and the receiver have been
999
    /// dropped.
1000
    ///
1001
    /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
1002
    /// Each message will only be received by a single receiver. This is useful for
1003
    /// implementing work stealing for concurrent programs.
1004
0
    fn clone(&self) -> Self {
1005
0
        self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
1006
0
        Self { shared: self.shared.clone() }
1007
0
    }
1008
}
1009
1010
impl<T> fmt::Debug for Receiver<T> {
1011
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1012
0
        f.debug_struct("Receiver").finish()
1013
0
    }
1014
}
1015
1016
impl<T> Drop for Receiver<T> {
1017
0
    fn drop(&mut self) {
1018
0
        // Notify senders that all receivers have been dropped if the number of receivers drops
1019
0
        // to 0.
1020
0
        if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
1021
0
            self.shared.disconnect_all();
1022
0
        }
1023
0
    }
Unexecuted instantiation: <flume::Receiver<alloc::sync::Arc<alloc::string::String>> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Receiver<alloc::string::String> as core::ops::drop::Drop>::drop
Unexecuted instantiation: <flume::Receiver<_> as core::ops::drop::Drop>::drop
1024
}
1025
1026
/// This exists as a shorthand for [`Receiver::iter`].
1027
impl<'a, T> IntoIterator for &'a Receiver<T> {
1028
    type Item = T;
1029
    type IntoIter = Iter<'a, T>;
1030
1031
0
    fn into_iter(self) -> Self::IntoIter {
1032
0
        Iter { receiver: self }
1033
0
    }
1034
}
1035
1036
impl<T> IntoIterator for Receiver<T> {
1037
    type Item = T;
1038
    type IntoIter = IntoIter<T>;
1039
1040
    /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
1041
0
    fn into_iter(self) -> Self::IntoIter {
1042
0
        IntoIter { receiver: self }
1043
0
    }
1044
}
1045
1046
/// An iterator over the msgs received from a channel.
1047
pub struct Iter<'a, T> {
1048
    receiver: &'a Receiver<T>,
1049
}
1050
1051
impl<'a, T> fmt::Debug for Iter<'a, T> {
1052
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1053
0
        f.debug_struct("Iter").field("receiver", &self.receiver).finish()
1054
0
    }
1055
}
1056
1057
impl<'a, T> Iterator for Iter<'a, T> {
1058
    type Item = T;
1059
1060
0
    fn next(&mut self) -> Option<Self::Item> {
1061
0
        self.receiver.recv().ok()
1062
0
    }
1063
}
1064
1065
/// An non-blocking iterator over the msgs received from a channel.
1066
pub struct TryIter<'a, T> {
1067
    receiver: &'a Receiver<T>,
1068
}
1069
1070
impl<'a, T> fmt::Debug for TryIter<'a, T> {
1071
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1072
0
        f.debug_struct("TryIter").field("receiver", &self.receiver).finish()
1073
0
    }
1074
}
1075
1076
impl<'a, T> Iterator for TryIter<'a, T> {
1077
    type Item = T;
1078
1079
0
    fn next(&mut self) -> Option<Self::Item> {
1080
0
        self.receiver.try_recv().ok()
1081
0
    }
1082
}
1083
1084
/// A fixed-size iterator over the msgs drained from a channel.
#[derive(Debug)]
pub struct Drain<'a, T> {
    queue: VecDeque<T>,
    /// A phantom field used to constrain the lifetime of this iterator. We do this because the
    /// implementation may change and we don't want to unintentionally constrain it. Removing this
    /// lifetime later is a possibility.
    _phantom: PhantomData<&'a ()>,
}

impl<'a, T> Iterator for Drain<'a, T> {
    type Item = T;

    /// Yields the drained messages front to back.
    fn next(&mut self) -> Option<Self::Item> {
        self.queue.pop_front()
    }

    /// Reports the exact number of remaining messages, as `ExactSizeIterator` requires; the
    /// default `(0, None)` hint would contradict `len`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.queue.len();
        (remaining, Some(remaining))
    }
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
    /// Number of drained messages that have not been yielded yet.
    fn len(&self) -> usize {
        self.queue.len()
    }
}
1107
1108
/// An owned iterator over the msgs received from a channel.
1109
pub struct IntoIter<T> {
1110
    receiver: Receiver<T>,
1111
}
1112
1113
impl<T> fmt::Debug for IntoIter<T> {
1114
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1115
0
        f.debug_struct("IntoIter").field("receiver", &self.receiver).finish()
1116
0
    }
1117
}
1118
1119
impl<T> Iterator for IntoIter<T> {
1120
    type Item = T;
1121
1122
0
    fn next(&mut self) -> Option<Self::Item> {
1123
0
        self.receiver.recv().ok()
1124
0
    }
1125
}
1126
1127
/// Create a channel with no maximum capacity.
1128
///
1129
/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
1130
/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1131
/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1132
/// may be cloned.
1133
///
1134
/// # Examples
1135
/// ```
1136
/// let (tx, rx) = flume::unbounded();
1137
///
1138
/// tx.send(42).unwrap();
1139
/// assert_eq!(rx.recv().unwrap(), 42);
1140
/// ```
1141
0
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1142
0
    let shared = Arc::new(Shared::new(None));
1143
0
    (
1144
0
        Sender { shared: shared.clone() },
1145
0
        Receiver { shared },
1146
0
    )
1147
0
}
Unexecuted instantiation: flume::unbounded::<alloc::sync::Arc<alloc::string::String>>
Unexecuted instantiation: flume::unbounded::<alloc::string::String>
Unexecuted instantiation: flume::unbounded::<_>
1148
1149
/// Create a channel with a maximum capacity.
1150
///
1151
/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
1152
/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1153
/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1154
/// may be cloned.
1155
///
1156
/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
1157
/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
1158
/// is not desired, [`Sender::try_send`] may be used.
1159
///
1160
/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
1161
/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
1162
/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
1163
/// perform a handshake and transfer ownership of a value.
1164
///
1165
/// # Examples
1166
/// ```
1167
/// let (tx, rx) = flume::bounded(32);
1168
///
1169
/// for i in 1..33 {
1170
///     tx.send(i).unwrap();
1171
/// }
1172
/// assert!(tx.try_send(33).is_err());
1173
///
1174
/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
1175
/// ```
1176
0
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1177
0
    let shared = Arc::new(Shared::new(Some(cap)));
1178
0
    (
1179
0
        Sender { shared: shared.clone() },
1180
0
        Receiver { shared },
1181
0
    )
1182
0
}