Coverage Report

Created: 2024-12-17 06:15

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/sink/unfold.rs
Line
Count
Source (jump to first uncovered line)
1
use super::assert_sink;
2
use crate::unfold_state::UnfoldState;
3
use core::{future::Future, pin::Pin};
4
use futures_core::ready;
5
use futures_core::task::{Context, Poll};
6
use futures_sink::Sink;
7
use pin_project_lite::pin_project;
8
9
pin_project! {
10
    /// Sink for the [`unfold`] function.
11
    #[derive(Debug)]
12
    #[must_use = "sinks do nothing unless polled"]
13
    pub struct Unfold<T, F, R> {
14
        function: F,
15
        #[pin]
16
        state: UnfoldState<T, R>,
17
    }
18
}
19
20
/// Create a sink from a function which processes one item at a time.
21
///
22
/// # Examples
23
///
24
/// ```
25
/// # futures::executor::block_on(async {
26
/// use futures::sink::{self, SinkExt};
27
///
28
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
29
///     async move {
30
///         sum += i;
31
///         eprintln!("{}", i);
32
///         Ok::<_, futures::never::Never>(sum)
33
///     }
34
/// });
35
/// futures::pin_mut!(unfold);
36
/// unfold.send(5).await?;
37
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
38
/// ```
39
0
pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
40
0
where
41
0
    F: FnMut(T, Item) -> R,
42
0
    R: Future<Output = Result<T, E>>,
43
0
{
44
0
    assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
45
0
}
Unexecuted instantiation: futures_util::sink::unfold::unfold::<_, _, _, _, _>
Unexecuted instantiation: futures_util::sink::unfold::unfold::<_, _, _, _, _>
Unexecuted instantiation: futures_util::sink::unfold::unfold::<_, _, _, _, _>
46
47
impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
48
where
49
    F: FnMut(T, Item) -> R,
50
    R: Future<Output = Result<T, E>>,
51
{
52
    type Error = E;
53
54
0
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55
0
        self.poll_flush(cx)
56
0
    }
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_ready
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_ready
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_ready
57
58
0
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
59
0
        let mut this = self.project();
60
0
        let future = match this.state.as_mut().take_value() {
61
0
            Some(value) => (this.function)(value, item),
62
0
            None => panic!("start_send called without poll_ready being called first"),
63
        };
64
0
        this.state.set(UnfoldState::Future { future });
65
0
        Ok(())
66
0
    }
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::start_send
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::start_send
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::start_send
67
68
0
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69
0
        let mut this = self.project();
70
0
        Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
71
0
            match ready!(future.poll(cx)) {
72
0
                Ok(state) => {
73
0
                    this.state.set(UnfoldState::Value { value: state });
74
0
                    Ok(())
75
                }
76
0
                Err(err) => {
77
0
                    this.state.set(UnfoldState::Empty);
78
0
                    Err(err)
79
                }
80
            }
81
        } else {
82
0
            Ok(())
83
        })
84
0
    }
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_flush
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_flush
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_flush
85
86
0
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87
0
        self.poll_flush(cx)
88
0
    }
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_close
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_close
Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_close
89
}