Coverage Report

Created: 2025-08-28 06:06

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/io/take.rs
Line
Count
Source (jump to first uncovered line)
1
use futures_core::ready;
2
use futures_core::task::{Context, Poll};
3
use futures_io::{AsyncBufRead, AsyncRead};
4
use pin_project_lite::pin_project;
5
use std::pin::Pin;
6
use std::{cmp, io};
7
8
pin_project! {
9
    /// Reader for the [`take`](super::AsyncReadExt::take) method.
10
    #[derive(Debug)]
11
    #[must_use = "readers do nothing unless you `.await` or poll them"]
12
    pub struct Take<R> {
13
        #[pin]
14
        inner: R,
15
        limit: u64,
16
    }
17
}
18
19
impl<R: AsyncRead> Take<R> {
20
0
    pub(super) fn new(inner: R, limit: u64) -> Self {
21
0
        Self { inner, limit }
22
0
    }
23
24
    /// Returns the remaining number of bytes that can be
25
    /// read before this instance will return EOF.
26
    ///
27
    /// # Note
28
    ///
29
    /// This instance may reach `EOF` after reading fewer bytes than indicated by
30
    /// this method if the underlying [`AsyncRead`] instance reaches EOF.
31
    ///
32
    /// # Examples
33
    ///
34
    /// ```
35
    /// # futures::executor::block_on(async {
36
    /// use futures::io::{AsyncReadExt, Cursor};
37
    ///
38
    /// let reader = Cursor::new(&b"12345678"[..]);
39
    /// let mut buffer = [0; 2];
40
    ///
41
    /// let mut take = reader.take(4);
42
    /// let n = take.read(&mut buffer).await?;
43
    ///
44
    /// assert_eq!(take.limit(), 2);
45
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
46
    /// ```
47
0
    pub fn limit(&self) -> u64 {
48
0
        self.limit
49
0
    }
50
51
    /// Sets the number of bytes that can be read before this instance will
52
    /// return EOF. This is the same as constructing a new `Take` instance, so
53
    /// the amount of bytes read and the previous limit value don't matter when
54
    /// calling this method.
55
    ///
56
    /// # Examples
57
    ///
58
    /// ```
59
    /// # futures::executor::block_on(async {
60
    /// use futures::io::{AsyncReadExt, Cursor};
61
    ///
62
    /// let reader = Cursor::new(&b"12345678"[..]);
63
    /// let mut buffer = [0; 4];
64
    ///
65
    /// let mut take = reader.take(4);
66
    /// let n = take.read(&mut buffer).await?;
67
    ///
68
    /// assert_eq!(n, 4);
69
    /// assert_eq!(take.limit(), 0);
70
    ///
71
    /// take.set_limit(10);
72
    /// let n = take.read(&mut buffer).await?;
73
    /// assert_eq!(n, 4);
74
    ///
75
    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
76
    /// ```
77
0
    pub fn set_limit(&mut self, limit: u64) {
78
0
        self.limit = limit
79
0
    }
80
81
    delegate_access_inner!(inner, R, ());
82
}
83
84
impl<R: AsyncRead> AsyncRead for Take<R> {
85
0
    fn poll_read(
86
0
        self: Pin<&mut Self>,
87
0
        cx: &mut Context<'_>,
88
0
        buf: &mut [u8],
89
0
    ) -> Poll<Result<usize, io::Error>> {
90
0
        let this = self.project();
91
0
92
0
        if *this.limit == 0 {
93
0
            return Poll::Ready(Ok(0));
94
0
        }
95
0
96
0
        let max = cmp::min(buf.len() as u64, *this.limit) as usize;
97
0
        let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?;
98
0
        *this.limit -= n as u64;
99
0
        Poll::Ready(Ok(n))
100
0
    }
101
}
102
103
impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
104
0
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
105
0
        let this = self.project();
106
0
107
0
        // Don't call into inner reader at all at EOF because it may still block
108
0
        if *this.limit == 0 {
109
0
            return Poll::Ready(Ok(&[]));
110
0
        }
111
112
0
        let buf = ready!(this.inner.poll_fill_buf(cx)?);
113
0
        let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
114
0
        Poll::Ready(Ok(&buf[..cap]))
115
0
    }
116
117
0
    fn consume(self: Pin<&mut Self>, amt: usize) {
118
0
        let this = self.project();
119
0
120
0
        // Don't let callers reset the limit by passing an overlarge value
121
0
        let amt = cmp::min(amt as u64, *this.limit) as usize;
122
0
        *this.limit -= amt as u64;
123
0
        this.inner.consume(amt);
124
0
    }
125
}