/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/sink/fanout.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use core::fmt::{Debug, Formatter, Result as FmtResult}; |
2 | | use core::pin::Pin; |
3 | | use futures_core::task::{Context, Poll}; |
4 | | use futures_sink::Sink; |
5 | | use pin_project_lite::pin_project; |
6 | | |
7 | | pin_project! { |
8 | | /// Sink that clones incoming items and forwards them to two sinks at the same time. |
9 | | /// |
10 | | /// Backpressure from any downstream sink propagates up, which means that this sink |
11 | | /// can only process items as fast as its _slowest_ downstream sink. |
12 | | #[must_use = "sinks do nothing unless polled"] |
13 | | pub struct Fanout<Si1, Si2> { |
14 | | #[pin] |
15 | | sink1: Si1, |
16 | | #[pin] |
17 | | sink2: Si2 |
18 | | } |
19 | | } |
20 | | |
21 | | impl<Si1, Si2> Fanout<Si1, Si2> { |
22 | 0 | pub(super) fn new(sink1: Si1, sink2: Si2) -> Self { |
23 | 0 | Self { sink1, sink2 } |
24 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::new Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::new Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::new |
25 | | |
26 | | /// Get a shared reference to the inner sinks. |
27 | 0 | pub fn get_ref(&self) -> (&Si1, &Si2) { |
28 | 0 | (&self.sink1, &self.sink2) |
29 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_ref Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_ref Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_ref |
30 | | |
31 | | /// Get a mutable reference to the inner sinks. |
32 | 0 | pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) { |
33 | 0 | (&mut self.sink1, &mut self.sink2) |
34 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_mut Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_mut Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_mut |
35 | | |
36 | | /// Get a pinned mutable reference to the inner sinks. |
37 | 0 | pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) { |
38 | 0 | let this = self.project(); |
39 | 0 | (this.sink1, this.sink2) |
40 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_pin_mut Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_pin_mut Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_pin_mut |
41 | | |
42 | | /// Consumes this combinator, returning the underlying sinks. |
43 | | /// |
44 | | /// Note that this may discard intermediate state of this combinator, |
45 | | /// so care should be taken to avoid losing resources when this is called. |
46 | 0 | pub fn into_inner(self) -> (Si1, Si2) { |
47 | 0 | (self.sink1, self.sink2) |
48 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::into_inner Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::into_inner Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::into_inner |
49 | | } |
50 | | |
51 | | impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> { |
52 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { |
53 | 0 | f.debug_struct("Fanout").field("sink1", &self.sink1).field("sink2", &self.sink2).finish() |
54 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as core::fmt::Debug>::fmt Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as core::fmt::Debug>::fmt Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as core::fmt::Debug>::fmt |
55 | | } |
56 | | |
57 | | impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2> |
58 | | where |
59 | | Si1: Sink<Item>, |
60 | | Item: Clone, |
61 | | Si2: Sink<Item, Error = Si1::Error>, |
62 | | { |
63 | | type Error = Si1::Error; |
64 | | |
65 | 0 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
66 | 0 | let this = self.project(); |
67 | | |
68 | 0 | let sink1_ready = this.sink1.poll_ready(cx)?.is_ready(); |
69 | 0 | let sink2_ready = this.sink2.poll_ready(cx)?.is_ready(); |
70 | 0 | let ready = sink1_ready && sink2_ready; |
71 | 0 | if ready { |
72 | 0 | Poll::Ready(Ok(())) |
73 | | } else { |
74 | 0 | Poll::Pending |
75 | | } |
76 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_ready Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_ready Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_ready |
77 | | |
78 | 0 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
79 | 0 | let this = self.project(); |
80 | 0 |
|
81 | 0 | this.sink1.start_send(item.clone())?; |
82 | 0 | this.sink2.start_send(item)?; |
83 | 0 | Ok(()) |
84 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::start_send Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::start_send Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::start_send |
85 | | |
86 | 0 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
87 | 0 | let this = self.project(); |
88 | | |
89 | 0 | let sink1_ready = this.sink1.poll_flush(cx)?.is_ready(); |
90 | 0 | let sink2_ready = this.sink2.poll_flush(cx)?.is_ready(); |
91 | 0 | let ready = sink1_ready && sink2_ready; |
92 | 0 | if ready { |
93 | 0 | Poll::Ready(Ok(())) |
94 | | } else { |
95 | 0 | Poll::Pending |
96 | | } |
97 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_flush Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_flush Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_flush |
98 | | |
99 | 0 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
100 | 0 | let this = self.project(); |
101 | | |
102 | 0 | let sink1_ready = this.sink1.poll_close(cx)?.is_ready(); |
103 | 0 | let sink2_ready = this.sink2.poll_close(cx)?.is_ready(); |
104 | 0 | let ready = sink1_ready && sink2_ready; |
105 | 0 | if ready { |
106 | 0 | Poll::Ready(Ok(())) |
107 | | } else { |
108 | 0 | Poll::Pending |
109 | | } |
110 | 0 | } Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_close Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_close Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_close |
111 | | } |