Coverage Report

Created: 2026-05-30 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.32/src/sink/unfold.rs
Line
Count
Source
1
use super::assert_sink;
2
use crate::unfold_state::UnfoldState;
3
use core::{future::Future, pin::Pin};
4
use futures_core::ready;
5
use futures_core::task::{Context, Poll};
6
use futures_sink::Sink;
7
use pin_project_lite::pin_project;
8
9
pin_project! {
10
    /// Sink for the [`unfold`] function.
11
    #[derive(Debug)]
12
    #[must_use = "sinks do nothing unless polled"]
13
    pub struct Unfold<T, F, R> {
14
        function: F,
15
        #[pin]
16
        state: UnfoldState<T, R>,
17
    }
18
}
19
20
/// Create a sink from a function which processes one item at a time.
21
///
22
/// # Examples
23
///
24
/// ```
25
/// # futures::executor::block_on(async {
26
/// use core::pin::pin;
27
///
28
/// use futures::sink;
29
/// use futures::sink::SinkExt;
30
///
31
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
32
///     async move {
33
///         sum += i;
34
///         eprintln!("{}", i);
35
///         Ok::<_, futures::never::Never>(sum)
36
///     }
37
/// });
38
/// let mut unfold = pin!(unfold);
39
/// unfold.send(5).await?;
40
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
41
/// ```
42
0
pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
43
0
where
44
0
    F: FnMut(T, Item) -> R,
45
0
    R: Future<Output = Result<T, E>>,
46
{
47
0
    assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
48
0
}
49
50
impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
51
where
52
    F: FnMut(T, Item) -> R,
53
    R: Future<Output = Result<T, E>>,
54
{
55
    type Error = E;
56
57
0
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58
0
        self.poll_flush(cx)
59
0
    }
60
61
0
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
62
0
        let mut this = self.project();
63
0
        let future = match this.state.as_mut().take_value() {
64
0
            Some(value) => (this.function)(value, item),
65
0
            None => panic!("start_send called without poll_ready being called first"),
66
        };
67
0
        this.state.set(UnfoldState::Future { future });
68
0
        Ok(())
69
0
    }
70
71
0
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72
0
        let mut this = self.project();
73
0
        Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
74
0
            match ready!(future.poll(cx)) {
75
0
                Ok(state) => {
76
0
                    this.state.set(UnfoldState::Value { value: state });
77
0
                    Ok(())
78
                }
79
0
                Err(err) => {
80
0
                    this.state.set(UnfoldState::Empty);
81
0
                    Err(err)
82
                }
83
            }
84
        } else {
85
0
            Ok(())
86
        })
87
0
    }
88
89
0
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90
0
        self.poll_flush(cx)
91
0
    }
92
}