Coverage Report

Created: 2024-12-17 06:15

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/sink/fanout.rs
Line
Count
Source (jump to first uncovered line)
1
use core::fmt::{Debug, Formatter, Result as FmtResult};
2
use core::pin::Pin;
3
use futures_core::task::{Context, Poll};
4
use futures_sink::Sink;
5
use pin_project_lite::pin_project;
6
7
pin_project! {
8
    /// Sink that clones incoming items and forwards them to two sinks at the same time.
9
    ///
10
    /// Backpressure from any downstream sink propagates up, which means that this sink
11
    /// can only process items as fast as its _slowest_ downstream sink.
12
    #[must_use = "sinks do nothing unless polled"]
13
    pub struct Fanout<Si1, Si2> {
14
        #[pin]
15
        sink1: Si1,
16
        #[pin]
17
        sink2: Si2
18
    }
19
}
20
21
impl<Si1, Si2> Fanout<Si1, Si2> {
22
0
    pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
23
0
        Self { sink1, sink2 }
24
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::new
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::new
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::new
25
26
    /// Get a shared reference to the inner sinks.
27
0
    pub fn get_ref(&self) -> (&Si1, &Si2) {
28
0
        (&self.sink1, &self.sink2)
29
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_ref
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_ref
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_ref
30
31
    /// Get a mutable reference to the inner sinks.
32
0
    pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
33
0
        (&mut self.sink1, &mut self.sink2)
34
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_mut
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_mut
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_mut
35
36
    /// Get a pinned mutable reference to the inner sinks.
37
0
    pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
38
0
        let this = self.project();
39
0
        (this.sink1, this.sink2)
40
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_pin_mut
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_pin_mut
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::get_pin_mut
41
42
    /// Consumes this combinator, returning the underlying sinks.
43
    ///
44
    /// Note that this may discard intermediate state of this combinator,
45
    /// so care should be taken to avoid losing resources when this is called.
46
0
    pub fn into_inner(self) -> (Si1, Si2) {
47
0
        (self.sink1, self.sink2)
48
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::into_inner
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::into_inner
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _>>::into_inner
49
}
50
51
impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
52
0
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
53
0
        f.debug_struct("Fanout").field("sink1", &self.sink1).field("sink2", &self.sink2).finish()
54
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as core::fmt::Debug>::fmt
55
}
56
57
impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
58
where
59
    Si1: Sink<Item>,
60
    Item: Clone,
61
    Si2: Sink<Item, Error = Si1::Error>,
62
{
63
    type Error = Si1::Error;
64
65
0
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66
0
        let this = self.project();
67
68
0
        let sink1_ready = this.sink1.poll_ready(cx)?.is_ready();
69
0
        let sink2_ready = this.sink2.poll_ready(cx)?.is_ready();
70
0
        let ready = sink1_ready && sink2_ready;
71
0
        if ready {
72
0
            Poll::Ready(Ok(()))
73
        } else {
74
0
            Poll::Pending
75
        }
76
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_ready
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_ready
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_ready
77
78
0
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
79
0
        let this = self.project();
80
0
81
0
        this.sink1.start_send(item.clone())?;
82
0
        this.sink2.start_send(item)?;
83
0
        Ok(())
84
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::start_send
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::start_send
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::start_send
85
86
0
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87
0
        let this = self.project();
88
89
0
        let sink1_ready = this.sink1.poll_flush(cx)?.is_ready();
90
0
        let sink2_ready = this.sink2.poll_flush(cx)?.is_ready();
91
0
        let ready = sink1_ready && sink2_ready;
92
0
        if ready {
93
0
            Poll::Ready(Ok(()))
94
        } else {
95
0
            Poll::Pending
96
        }
97
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_flush
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_flush
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_flush
98
99
0
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100
0
        let this = self.project();
101
102
0
        let sink1_ready = this.sink1.poll_close(cx)?.is_ready();
103
0
        let sink2_ready = this.sink2.poll_close(cx)?.is_ready();
104
0
        let ready = sink1_ready && sink2_ready;
105
0
        if ready {
106
0
            Poll::Ready(Ok(()))
107
        } else {
108
0
            Poll::Pending
109
        }
110
0
    }
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_close
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_close
Unexecuted instantiation: <futures_util::sink::fanout::Fanout<_, _> as futures_sink::Sink<_>>::poll_close
111
}