Coverage Report

Created: 2025-10-12 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/tokio/tokio-stream/src/wrappers/mpsc_unbounded.rs
Line
Count
Source
1
use crate::Stream;
2
use std::pin::Pin;
3
use std::task::{Context, Poll};
4
use tokio::sync::mpsc::UnboundedReceiver;
5
6
/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
7
///
8
/// # Example
9
///
10
/// ```
11
/// use tokio::sync::mpsc;
12
/// use tokio_stream::wrappers::UnboundedReceiverStream;
13
/// use tokio_stream::StreamExt;
14
///
15
/// # #[tokio::main(flavor = "current_thread")]
16
/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
17
/// let (tx, rx) = mpsc::unbounded_channel();
18
/// tx.send(10)?;
19
/// tx.send(20)?;
20
/// # // prevent the doc test from hanging
21
/// drop(tx);
22
///
23
/// let mut stream = UnboundedReceiverStream::new(rx);
24
/// assert_eq!(stream.next().await, Some(10));
25
/// assert_eq!(stream.next().await, Some(20));
26
/// assert_eq!(stream.next().await, None);
27
/// # Ok(())
28
/// # }
29
/// ```
30
///
31
/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
32
/// [`Stream`]: trait@crate::Stream
33
#[derive(Debug)]
34
pub struct UnboundedReceiverStream<T> {
35
    inner: UnboundedReceiver<T>,
36
}
37
38
impl<T> UnboundedReceiverStream<T> {
39
    /// Create a new `UnboundedReceiverStream`.
40
0
    pub fn new(recv: UnboundedReceiver<T>) -> Self {
41
0
        Self { inner: recv }
42
0
    }
43
44
    /// Get back the inner `UnboundedReceiver`.
45
    pub fn into_inner(self) -> UnboundedReceiver<T> {
46
        self.inner
47
    }
48
49
    /// Closes the receiving half of a channel without dropping it.
50
    ///
51
    /// This prevents any further messages from being sent on the channel while
52
    /// still enabling the receiver to drain messages that are buffered.
53
    pub fn close(&mut self) {
54
        self.inner.close();
55
    }
56
}
57
58
impl<T> Stream for UnboundedReceiverStream<T> {
59
    type Item = T;
60
61
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62
0
        self.inner.poll_recv(cx)
63
0
    }
64
65
    /// Returns the bounds of the stream based on the underlying receiver.
66
    ///
67
    /// For open channels, it returns `(receiver.len(), None)`.
68
    ///
69
    /// For closed channels, it returns `(receiver.len(), receiver.len())`.
70
    fn size_hint(&self) -> (usize, Option<usize>) {
71
        if self.inner.is_closed() {
72
            let len = self.inner.len();
73
            (len, Some(len))
74
        } else {
75
            (self.inner.len(), None)
76
        }
77
    }
78
}
79
80
impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
81
    fn as_ref(&self) -> &UnboundedReceiver<T> {
82
        &self.inner
83
    }
84
}
85
86
impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
87
    fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
88
        &mut self.inner
89
    }
90
}
91
92
impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
93
    fn from(recv: UnboundedReceiver<T>) -> Self {
94
        Self::new(recv)
95
    }
96
}