/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/sink/buffer.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use alloc::collections::VecDeque; |
2 | | use core::pin::Pin; |
3 | | use futures_core::ready; |
4 | | use futures_core::stream::{FusedStream, Stream}; |
5 | | use futures_core::task::{Context, Poll}; |
6 | | use futures_sink::Sink; |
7 | | use pin_project_lite::pin_project; |
8 | | |
9 | | pin_project! { |
10 | | /// Sink for the [`buffer`](super::SinkExt::buffer) method. |
11 | | #[derive(Debug)] |
12 | | #[must_use = "sinks do nothing unless polled"] |
13 | | pub struct Buffer<Si, Item> { |
14 | | #[pin] |
15 | | sink: Si, |
16 | | buf: VecDeque<Item>, |
17 | | |
18 | | // Track capacity separately from the `VecDeque`, which may be rounded up |
19 | | capacity: usize, |
20 | | } |
21 | | } |
22 | | |
23 | | impl<Si: Sink<Item>, Item> Buffer<Si, Item> { |
24 | 0 | pub(super) fn new(sink: Si, capacity: usize) -> Self { |
25 | 0 | Self { sink, buf: VecDeque::with_capacity(capacity), capacity } |
26 | 0 | } |
27 | | |
28 | | delegate_access_inner!(sink, Si, ()); |
29 | | |
30 | 0 | fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { |
31 | 0 | let mut this = self.project(); |
32 | 0 | ready!(this.sink.as_mut().poll_ready(cx))?; |
33 | 0 | while let Some(item) = this.buf.pop_front() { |
34 | 0 | this.sink.as_mut().start_send(item)?; |
35 | 0 | if !this.buf.is_empty() { |
36 | 0 | ready!(this.sink.as_mut().poll_ready(cx))?; |
37 | 0 | } |
38 | | } |
39 | 0 | Poll::Ready(Ok(())) |
40 | 0 | } |
41 | | } |
42 | | |
43 | | // Forwarding impl of Stream from the underlying sink |
44 | | impl<S, Item> Stream for Buffer<S, Item> |
45 | | where |
46 | | S: Sink<Item> + Stream, |
47 | | { |
48 | | type Item = S::Item; |
49 | | |
50 | 0 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
51 | 0 | self.project().sink.poll_next(cx) |
52 | 0 | } |
53 | | |
54 | 0 | fn size_hint(&self) -> (usize, Option<usize>) { |
55 | 0 | self.sink.size_hint() |
56 | 0 | } |
57 | | } |
58 | | |
59 | | impl<S, Item> FusedStream for Buffer<S, Item> |
60 | | where |
61 | | S: Sink<Item> + FusedStream, |
62 | | { |
63 | 0 | fn is_terminated(&self) -> bool { |
64 | 0 | self.sink.is_terminated() |
65 | 0 | } |
66 | | } |
67 | | |
68 | | impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { |
69 | | type Error = Si::Error; |
70 | | |
71 | 0 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
72 | 0 | if self.capacity == 0 { |
73 | 0 | return self.project().sink.poll_ready(cx); |
74 | 0 | } |
75 | 0 |
|
76 | 0 | let _ = self.as_mut().try_empty_buffer(cx)?; |
77 | | |
78 | 0 | if self.buf.len() >= self.capacity { |
79 | 0 | Poll::Pending |
80 | | } else { |
81 | 0 | Poll::Ready(Ok(())) |
82 | | } |
83 | 0 | } |
84 | | |
85 | 0 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
86 | 0 | if self.capacity == 0 { |
87 | 0 | self.project().sink.start_send(item) |
88 | | } else { |
89 | 0 | self.project().buf.push_back(item); |
90 | 0 | Ok(()) |
91 | | } |
92 | 0 | } |
93 | | |
94 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
95 | 0 | ready!(self.as_mut().try_empty_buffer(cx))?; |
96 | 0 | debug_assert!(self.buf.is_empty()); |
97 | 0 | self.project().sink.poll_flush(cx) |
98 | 0 | } |
99 | | |
100 | 0 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
101 | 0 | ready!(self.as_mut().try_empty_buffer(cx))?; |
102 | 0 | debug_assert!(self.buf.is_empty()); |
103 | 0 | self.project().sink.poll_close(cx) |
104 | 0 | } |
105 | | } |