/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/sink/unfold.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use super::assert_sink; |
2 | | use crate::unfold_state::UnfoldState; |
3 | | use core::{future::Future, pin::Pin}; |
4 | | use futures_core::ready; |
5 | | use futures_core::task::{Context, Poll}; |
6 | | use futures_sink::Sink; |
7 | | use pin_project_lite::pin_project; |
8 | | |
pin_project! {
    /// Sink for the [`unfold`] function.
    ///
    /// Pairs the user-supplied function with an `UnfoldState` that holds either
    /// the current value, the future produced by the most recent `start_send`,
    /// or nothing at all once that future has resolved to an error.
    #[derive(Debug)]
    #[must_use = "sinks do nothing unless polled"]
    pub struct Unfold<T, F, R> {
        // Invoked by `start_send` with the current value and the item being sent.
        function: F,
        // Structurally pinned: the `Future` variant stores an `R` that is polled in place.
        #[pin]
        state: UnfoldState<T, R>,
    }
}
19 | | |
20 | | /// Create a sink from a function which processes one item at a time. |
21 | | /// |
22 | | /// # Examples |
23 | | /// |
24 | | /// ``` |
25 | | /// # futures::executor::block_on(async { |
26 | | /// use futures::sink::{self, SinkExt}; |
27 | | /// |
28 | | /// let unfold = sink::unfold(0, |mut sum, i: i32| { |
29 | | /// async move { |
30 | | /// sum += i; |
31 | | /// eprintln!("{}", i); |
32 | | /// Ok::<_, futures::never::Never>(sum) |
33 | | /// } |
34 | | /// }); |
35 | | /// futures::pin_mut!(unfold); |
36 | | /// unfold.send(5).await?; |
37 | | /// # Ok::<(), futures::never::Never>(()) }).unwrap(); |
38 | | /// ``` |
39 | 0 | pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R> |
40 | 0 | where |
41 | 0 | F: FnMut(T, Item) -> R, |
42 | 0 | R: Future<Output = Result<T, E>>, |
43 | 0 | { |
44 | 0 | assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } }) |
45 | 0 | } Unexecuted instantiation: futures_util::sink::unfold::unfold::<_, _, _, _, _> Unexecuted instantiation: futures_util::sink::unfold::unfold::<_, _, _, _, _> Unexecuted instantiation: futures_util::sink::unfold::unfold::<_, _, _, _, _> |
46 | | |
47 | | impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R> |
48 | | where |
49 | | F: FnMut(T, Item) -> R, |
50 | | R: Future<Output = Result<T, E>>, |
51 | | { |
52 | | type Error = E; |
53 | | |
54 | 0 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
55 | 0 | self.poll_flush(cx) |
56 | 0 | } Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_ready Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_ready Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_ready |
57 | | |
58 | 0 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
59 | 0 | let mut this = self.project(); |
60 | 0 | let future = match this.state.as_mut().take_value() { |
61 | 0 | Some(value) => (this.function)(value, item), |
62 | 0 | None => panic!("start_send called without poll_ready being called first"), |
63 | | }; |
64 | 0 | this.state.set(UnfoldState::Future { future }); |
65 | 0 | Ok(()) |
66 | 0 | } Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::start_send Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::start_send Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::start_send |
67 | | |
68 | 0 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
69 | 0 | let mut this = self.project(); |
70 | 0 | Poll::Ready(if let Some(future) = this.state.as_mut().project_future() { |
71 | 0 | match ready!(future.poll(cx)) { |
72 | 0 | Ok(state) => { |
73 | 0 | this.state.set(UnfoldState::Value { value: state }); |
74 | 0 | Ok(()) |
75 | | } |
76 | 0 | Err(err) => { |
77 | 0 | this.state.set(UnfoldState::Empty); |
78 | 0 | Err(err) |
79 | | } |
80 | | } |
81 | | } else { |
82 | 0 | Ok(()) |
83 | | }) |
84 | 0 | } Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_flush Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_flush Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_flush |
85 | | |
86 | 0 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
87 | 0 | self.poll_flush(cx) |
88 | 0 | } Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_close Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_close Unexecuted instantiation: <futures_util::sink::unfold::Unfold<_, _, _> as futures_sink::Sink<_>>::poll_close |
89 | | } |