/src/tokio/tokio-stream/src/wrappers/mpsc_unbounded.rs
Line | Count | Source |
1 | | use crate::Stream; |
2 | | use std::pin::Pin; |
3 | | use std::task::{Context, Poll}; |
4 | | use tokio::sync::mpsc::UnboundedReceiver; |
5 | | |
6 | | /// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. |
7 | | /// |
8 | | /// # Example |
9 | | /// |
10 | | /// ``` |
11 | | /// use tokio::sync::mpsc; |
12 | | /// use tokio_stream::wrappers::UnboundedReceiverStream; |
13 | | /// use tokio_stream::StreamExt; |
14 | | /// |
15 | | /// # #[tokio::main(flavor = "current_thread")] |
16 | | /// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> { |
17 | | /// let (tx, rx) = mpsc::unbounded_channel(); |
18 | | /// tx.send(10)?; |
19 | | /// tx.send(20)?; |
20 | | /// # // prevent the doc test from hanging |
21 | | /// drop(tx); |
22 | | /// |
23 | | /// let mut stream = UnboundedReceiverStream::new(rx); |
24 | | /// assert_eq!(stream.next().await, Some(10)); |
25 | | /// assert_eq!(stream.next().await, Some(20)); |
26 | | /// assert_eq!(stream.next().await, None); |
27 | | /// # Ok(()) |
28 | | /// # } |
29 | | /// ``` |
30 | | /// |
31 | | /// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver |
32 | | /// [`Stream`]: trait@crate::Stream |
33 | | #[derive(Debug)] |
34 | | pub struct UnboundedReceiverStream<T> { |
35 | | inner: UnboundedReceiver<T>, |
36 | | } |
37 | | |
38 | | impl<T> UnboundedReceiverStream<T> { |
39 | | /// Create a new `UnboundedReceiverStream`. |
40 | 0 | pub fn new(recv: UnboundedReceiver<T>) -> Self { |
41 | 0 | Self { inner: recv } |
42 | 0 | } |
43 | | |
44 | | /// Get back the inner `UnboundedReceiver`. |
45 | | pub fn into_inner(self) -> UnboundedReceiver<T> { |
46 | | self.inner |
47 | | } |
48 | | |
49 | | /// Closes the receiving half of a channel without dropping it. |
50 | | /// |
51 | | /// This prevents any further messages from being sent on the channel while |
52 | | /// still enabling the receiver to drain messages that are buffered. |
53 | | pub fn close(&mut self) { |
54 | | self.inner.close(); |
55 | | } |
56 | | } |
57 | | |
58 | | impl<T> Stream for UnboundedReceiverStream<T> { |
59 | | type Item = T; |
60 | | |
61 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
62 | 0 | self.inner.poll_recv(cx) |
63 | 0 | } |
64 | | |
65 | | /// Returns the bounds of the stream based on the underlying receiver. |
66 | | /// |
67 | | /// For open channels, it returns `(receiver.len(), None)`. |
68 | | /// |
69 | | /// For closed channels, it returns `(receiver.len(), receiver.len())`. |
70 | | fn size_hint(&self) -> (usize, Option<usize>) { |
71 | | if self.inner.is_closed() { |
72 | | let len = self.inner.len(); |
73 | | (len, Some(len)) |
74 | | } else { |
75 | | (self.inner.len(), None) |
76 | | } |
77 | | } |
78 | | } |
79 | | |
80 | | impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { |
81 | | fn as_ref(&self) -> &UnboundedReceiver<T> { |
82 | | &self.inner |
83 | | } |
84 | | } |
85 | | |
86 | | impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { |
87 | | fn as_mut(&mut self) -> &mut UnboundedReceiver<T> { |
88 | | &mut self.inner |
89 | | } |
90 | | } |
91 | | |
92 | | impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { |
93 | | fn from(recv: UnboundedReceiver<T>) -> Self { |
94 | | Self::new(recv) |
95 | | } |
96 | | } |