Coverage Report

Created: 2025-08-29 06:43

/src/tokio/tokio-stream/src/stream_ext/all.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::Stream;
2
3
use core::future::Future;
4
use core::marker::PhantomPinned;
5
use core::pin::Pin;
6
use core::task::{ready, Context, Poll};
7
use pin_project_lite::pin_project;
8
9
pin_project! {
10
    /// Future for the [`all`](super::StreamExt::all) method; yields `false` at the first item the predicate rejects, `true` if the stream ends first.
11
    #[derive(Debug)]
12
    #[must_use = "futures do nothing unless you `.await` or poll them"]
13
    pub struct AllFuture<'a, St: ?Sized, F> {
14
        stream: &'a mut St, // Exclusive borrow of the stream whose items are tested.
15
        f: F, // Predicate called on each item; the `Future` impl requires `FnMut(St::Item) -> bool`.
16
        // Make this future `!Unpin` for compatibility with async trait methods.
17
        #[pin]
18
        _pin: PhantomPinned, // Zero-sized marker; carries no data.
19
    }
20
}
21
22
impl<'a, St: ?Sized, F> AllFuture<'a, St, F> { // Constructor only; no `Stream`/`FnMut` bounds are needed here.
23
0
    pub(super) fn new(stream: &'a mut St, f: F) -> Self { // Stores the borrowed stream and the predicate; nothing is polled here.
24
0
        Self {
25
0
            stream,
26
0
            f,
27
0
            _pin: PhantomPinned,
28
0
        }
29
0
    }
30
}
31
32
impl<St, F> Future for AllFuture<'_, St, F>
33
where
34
    St: ?Sized + Stream + Unpin,
35
    F: FnMut(St::Item) -> bool,
36
{
37
    type Output = bool; // `true` if the stream ended with every item accepted, `false` on the first rejection.
38
39
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Tests up to 32 stream items per call.
40
0
        let me = self.project(); // Field projection provided by `pin_project!`.
41
0
        let mut stream = Pin::new(me.stream); // Pin the stream borrow so `poll_next` can be called on it.
42
43
        // Take a maximum of 32 items from the stream before yielding, so a single `poll` call handles a bounded number of items.
44
0
        for _ in 0..32 {
45
0
            match ready!(stream.as_mut().poll_next(cx)) { // `ready!` returns `Poll::Pending` from `poll` when the stream has no item yet.
46
0
                Some(v) => {
47
0
                    if !(me.f)(v) { // Predicate rejected this item: short-circuit without polling further.
48
0
                        return Poll::Ready(false);
49
0
                    }
50
                }
51
0
                None => return Poll::Ready(true), // Stream ended with no rejected item (an empty stream lands here too).
52
            }
53
        }
54
55
0
        cx.waker().wake_by_ref(); // Budget used up while the stream kept producing items: wake this task so it is polled again.
56
0
        Poll::Pending // Return control to the caller; the remaining items are examined on the next poll.
57
0
    }
58
}