Coverage Report

Created: 2025-11-16 06:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-stream-0.1.17/src/stream_close.rs
Line
Count
Source
1
use crate::Stream;
2
use pin_project_lite::pin_project;
3
use std::pin::Pin;
4
use std::task::{Context, Poll};
5
6
pin_project! {
7
    /// A `Stream` adapter that wraps each item of the inner stream in an `Option`.
8
    ///
9
    /// Whenever the wrapped stream yields an item, this stream yields that item
10
    /// wrapped in `Some`. When the inner stream ends, this stream first
11
    /// yields a single `None` item, after which this stream also ends.
12
    ///
13
    /// # Example
14
    ///
15
    /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
16
    ///
17
    /// ```
18
    /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
19
    ///
20
    /// #[tokio::main]
21
    /// async fn main() {
22
    ///     let mut map = StreamMap::new();
23
    ///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
24
    ///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
25
    ///     map.insert(0, stream);
26
    ///     map.insert(1, stream2);
27
    ///     while let Some((key, val)) = map.next().await {
28
    ///         match val {
29
    ///             Some(val) => println!("got {val:?} from stream {key:?}"),
30
    ///             None => println!("stream {key:?} closed"),
31
    ///         }
32
    ///     }
33
    /// }
34
    /// ```
35
    #[must_use = "streams do nothing unless polled"]
36
    pub struct StreamNotifyClose<S> {
37
        #[pin]
38
        inner: Option<S>,
39
    }
40
}
41
42
impl<S> StreamNotifyClose<S> {
43
    /// Create a new `StreamNotifyClose` wrapping `stream`.
44
0
    pub fn new(stream: S) -> Self {
45
0
        Self {
46
0
            inner: Some(stream),
47
0
        }
48
0
    }
49
50
    /// Consume the wrapper and get back the inner `Stream`.
51
    ///
52
    /// Returns `None` if the inner stream has already been polled to its end.
53
0
    pub fn into_inner(self) -> Option<S> {
54
0
        self.inner
55
0
    }
56
}
57
58
impl<S> Stream for StreamNotifyClose<S>
59
where
60
    S: Stream,
61
{
62
    type Item = Option<S::Item>;
63
64
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65
        // The inner stream must not be polled again after it ends, so `inner` is set to `None` as an end marker.
66
0
        match self
67
0
            .as_mut()
68
0
            .project()
69
0
            .inner
70
0
            .as_pin_mut()
71
0
            .map(|stream| S::poll_next(stream, cx))
72
        {
73
0
            Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))),
74
            Some(Poll::Ready(None)) => {
75
0
                self.project().inner.set(None);
76
0
                Poll::Ready(Some(None))
77
            }
78
0
            Some(Poll::Pending) => Poll::Pending,
79
0
            None => Poll::Ready(None),
80
        }
81
0
    }
82
83
    #[inline]
84
0
    fn size_hint(&self) -> (usize, Option<usize>) {
85
0
        if let Some(inner) = &self.inner {
86
            // Add 1 to both bounds: while the inner stream is present, the closing `None` item is still to come.
87
0
            let (l, u) = inner.size_hint();
88
0
            (l.saturating_add(1), u.and_then(|u| u.checked_add(1)))
89
        } else {
90
0
            (0, Some(0))
91
        }
92
0
    }
93
}