/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-stream-0.1.17/src/stream_close.rs
Line | Count | Source |
1 | | use crate::Stream; |
2 | | use pin_project_lite::pin_project; |
3 | | use std::pin::Pin; |
4 | | use std::task::{Context, Poll}; |
5 | | |
6 | | pin_project! { |
7 | | /// A `Stream` that wraps the values in an `Option`. |
8 | | /// |
9 | | /// Whenever the wrapped stream yields an item, this stream yields that item |
10 | | /// wrapped in `Some`. When the inner stream ends, then this stream first |
11 | | /// yields a `None` item, and then this stream will also end. |
12 | | /// |
13 | | /// # Example |
14 | | /// |
15 | | /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. |
16 | | /// |
17 | | /// ``` |
18 | | /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; |
19 | | /// |
20 | | /// #[tokio::main] |
21 | | /// async fn main() { |
22 | | /// let mut map = StreamMap::new(); |
23 | | /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
24 | | /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
25 | | /// map.insert(0, stream); |
26 | | /// map.insert(1, stream2); |
27 | | /// while let Some((key, val)) = map.next().await { |
28 | | /// match val { |
29 | | /// Some(val) => println!("got {val:?} from stream {key:?}"), |
30 | | /// None => println!("stream {key:?} closed"), |
31 | | /// } |
32 | | /// } |
33 | | /// } |
34 | | /// ``` |
35 | | #[must_use = "streams do nothing unless polled"] |
36 | | pub struct StreamNotifyClose<S> { |
37 | | #[pin] |
38 | | inner: Option<S>, |
39 | | } |
40 | | } |
41 | | |
42 | | impl<S> StreamNotifyClose<S> { |
43 | | /// Create a new `StreamNotifyClose`. |
44 | 0 | pub fn new(stream: S) -> Self { |
45 | 0 | Self { |
46 | 0 | inner: Some(stream), |
47 | 0 | } |
48 | 0 | } |
49 | | |
50 | | /// Get back the inner `Stream`. |
51 | | /// |
52 | | /// Returns `None` if the stream has reached its end. |
53 | 0 | pub fn into_inner(self) -> Option<S> { |
54 | 0 | self.inner |
55 | 0 | } |
56 | | } |
57 | | |
58 | | impl<S> Stream for StreamNotifyClose<S> |
59 | | where |
60 | | S: Stream, |
61 | | { |
62 | | type Item = Option<S::Item>; |
63 | | |
64 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
65 | | // We can't invoke poll_next after it ended, so we unset the inner stream as a marker. |
66 | 0 | match self |
67 | 0 | .as_mut() |
68 | 0 | .project() |
69 | 0 | .inner |
70 | 0 | .as_pin_mut() |
71 | 0 | .map(|stream| S::poll_next(stream, cx)) |
72 | | { |
73 | 0 | Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))), |
74 | | Some(Poll::Ready(None)) => { |
75 | 0 | self.project().inner.set(None); |
76 | 0 | Poll::Ready(Some(None)) |
77 | | } |
78 | 0 | Some(Poll::Pending) => Poll::Pending, |
79 | 0 | None => Poll::Ready(None), |
80 | | } |
81 | 0 | } |
82 | | |
83 | | #[inline] |
84 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
85 | 0 | if let Some(inner) = &self.inner { |
86 | | // We always return +1 because when there's stream there's atleast one more item. |
87 | 0 | let (l, u) = inner.size_hint(); |
88 | 0 | (l.saturating_add(1), u.and_then(|u| u.checked_add(1))) |
89 | | } else { |
90 | 0 | (0, Some(0)) |
91 | | } |
92 | 0 | } |
93 | | } |