Coverage Report

Created: 2025-07-11 06:53

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/sink/buffer.rs
Line
Count
Source (jump to first uncovered line)
1
use alloc::collections::VecDeque;
2
use core::pin::Pin;
3
use futures_core::ready;
4
use futures_core::stream::{FusedStream, Stream};
5
use futures_core::task::{Context, Poll};
6
use futures_sink::Sink;
7
use pin_project_lite::pin_project;
8
9
pin_project! {
10
    /// Sink for the [`buffer`](super::SinkExt::buffer) method.
11
    #[derive(Debug)]
12
    #[must_use = "sinks do nothing unless polled"]
13
    pub struct Buffer<Si, Item> {
14
        #[pin]
15
        sink: Si,
16
        buf: VecDeque<Item>,
17
18
        // Track capacity separately from the `VecDeque`, which may be rounded up
19
        capacity: usize,
20
    }
21
}
22
23
impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
24
0
    pub(super) fn new(sink: Si, capacity: usize) -> Self {
25
0
        Self { sink, buf: VecDeque::with_capacity(capacity), capacity }
26
0
    }
27
28
    delegate_access_inner!(sink, Si, ());
29
30
0
    fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> {
31
0
        let mut this = self.project();
32
0
        ready!(this.sink.as_mut().poll_ready(cx))?;
33
0
        while let Some(item) = this.buf.pop_front() {
34
0
            this.sink.as_mut().start_send(item)?;
35
0
            if !this.buf.is_empty() {
36
0
                ready!(this.sink.as_mut().poll_ready(cx))?;
37
0
            }
38
        }
39
0
        Poll::Ready(Ok(()))
40
0
    }
41
}
42
43
// Forwarding impl of Stream from the underlying sink
44
impl<S, Item> Stream for Buffer<S, Item>
45
where
46
    S: Sink<Item> + Stream,
47
{
48
    type Item = S::Item;
49
50
0
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
51
0
        self.project().sink.poll_next(cx)
52
0
    }
53
54
0
    fn size_hint(&self) -> (usize, Option<usize>) {
55
0
        self.sink.size_hint()
56
0
    }
57
}
58
59
impl<S, Item> FusedStream for Buffer<S, Item>
60
where
61
    S: Sink<Item> + FusedStream,
62
{
63
0
    fn is_terminated(&self) -> bool {
64
0
        self.sink.is_terminated()
65
0
    }
66
}
67
68
impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
69
    type Error = Si::Error;
70
71
0
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72
0
        if self.capacity == 0 {
73
0
            return self.project().sink.poll_ready(cx);
74
0
        }
75
0
76
0
        let _ = self.as_mut().try_empty_buffer(cx)?;
77
78
0
        if self.buf.len() >= self.capacity {
79
0
            Poll::Pending
80
        } else {
81
0
            Poll::Ready(Ok(()))
82
        }
83
0
    }
84
85
0
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
86
0
        if self.capacity == 0 {
87
0
            self.project().sink.start_send(item)
88
        } else {
89
0
            self.project().buf.push_back(item);
90
0
            Ok(())
91
        }
92
0
    }
93
94
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
95
0
        ready!(self.as_mut().try_empty_buffer(cx))?;
96
0
        debug_assert!(self.buf.is_empty());
97
0
        self.project().sink.poll_flush(cx)
98
0
    }
99
100
0
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101
0
        ready!(self.as_mut().try_empty_buffer(cx))?;
102
0
        debug_assert!(self.buf.is_empty());
103
0
        self.project().sink.poll_close(cx)
104
0
    }
105
}