Coverage Report

Created: 2025-05-07 06:59

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-util-0.7.14/src/io/inspect.rs
Line
Count
Source (jump to first uncovered line)
1
use pin_project_lite::pin_project;
2
use std::io::{IoSlice, Result};
3
use std::pin::Pin;
4
use std::task::{ready, Context, Poll};
5
6
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
7
8
pin_project! {
    /// A reader wrapper that hands every newly read chunk of bytes to a
    /// caller-supplied closure.
    ///
    /// Handy when the bytes need to be observed on their way through, for
    /// example to feed a hasher while the data is being read.
    pub struct InspectReader<R, F> {
        // The wrapped I/O object; structurally pinned so its poll methods
        // can be called through the projection.
        #[pin]
        reader: R,
        // The inspection closure.
        f: F,
    }
}
18
19
impl<R, F> InspectReader<R, F> {
20
    /// Create a new `InspectReader`, wrapping `reader` and calling `f` for the
21
    /// new data supplied by each read call.
22
    ///
23
    /// The closure will only be called with an empty slice if the inner reader
24
    /// returns without reading data into the buffer. This happens at EOF, or if
25
    /// `poll_read` is called with a zero-size buffer.
26
0
    pub fn new(reader: R, f: F) -> InspectReader<R, F>
27
0
    where
28
0
        R: AsyncRead,
29
0
        F: FnMut(&[u8]),
30
0
    {
31
0
        InspectReader { reader, f }
32
0
    }
33
34
    /// Consumes the `InspectReader`, returning the wrapped reader
35
0
    pub fn into_inner(self) -> R {
36
0
        self.reader
37
0
    }
38
}
39
40
impl<R: AsyncRead, F: FnMut(&[u8])> AsyncRead for InspectReader<R, F> {
41
0
    fn poll_read(
42
0
        self: Pin<&mut Self>,
43
0
        cx: &mut Context<'_>,
44
0
        buf: &mut ReadBuf<'_>,
45
0
    ) -> Poll<Result<()>> {
46
0
        let me = self.project();
47
0
        let filled_length = buf.filled().len();
48
0
        ready!(me.reader.poll_read(cx, buf))?;
49
0
        (me.f)(&buf.filled()[filled_length..]);
50
0
        Poll::Ready(Ok(()))
51
0
    }
52
}
53
54
impl<R: AsyncWrite, F> AsyncWrite for InspectReader<R, F> {
55
0
    fn poll_write(
56
0
        self: Pin<&mut Self>,
57
0
        cx: &mut Context<'_>,
58
0
        buf: &[u8],
59
0
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
60
0
        self.project().reader.poll_write(cx, buf)
61
0
    }
62
63
0
    fn poll_flush(
64
0
        self: Pin<&mut Self>,
65
0
        cx: &mut Context<'_>,
66
0
    ) -> Poll<std::result::Result<(), std::io::Error>> {
67
0
        self.project().reader.poll_flush(cx)
68
0
    }
69
70
0
    fn poll_shutdown(
71
0
        self: Pin<&mut Self>,
72
0
        cx: &mut Context<'_>,
73
0
    ) -> Poll<std::result::Result<(), std::io::Error>> {
74
0
        self.project().reader.poll_shutdown(cx)
75
0
    }
76
77
0
    fn poll_write_vectored(
78
0
        self: Pin<&mut Self>,
79
0
        cx: &mut Context<'_>,
80
0
        bufs: &[IoSlice<'_>],
81
0
    ) -> Poll<Result<usize>> {
82
0
        self.project().reader.poll_write_vectored(cx, bufs)
83
0
    }
84
85
0
    fn is_write_vectored(&self) -> bool {
86
0
        self.reader.is_write_vectored()
87
0
    }
88
}
89
90
pin_project! {
    /// A writer wrapper that hands the bytes being written to a
    /// caller-supplied closure.
    ///
    /// Handy when the bytes need to be observed on their way out, for example
    /// to feed a hasher while the data is being written.
    pub struct InspectWriter<W, F> {
        // The wrapped I/O object; structurally pinned so its poll methods
        // can be called through the projection.
        #[pin]
        writer: W,
        // The inspection closure.
        f: F,
    }
}
100
101
impl<W, F> InspectWriter<W, F> {
102
    /// Create a new `InspectWriter`, wrapping `write` and calling `f` for the
103
    /// data successfully written by each write call.
104
    ///
105
    /// The closure `f` will never be called with an empty slice. A vectored
106
    /// write can result in multiple calls to `f` - at most one call to `f` per
107
    /// buffer supplied to `poll_write_vectored`.
108
0
    pub fn new(writer: W, f: F) -> InspectWriter<W, F>
109
0
    where
110
0
        W: AsyncWrite,
111
0
        F: FnMut(&[u8]),
112
0
    {
113
0
        InspectWriter { writer, f }
114
0
    }
115
116
    /// Consumes the `InspectWriter`, returning the wrapped writer
117
0
    pub fn into_inner(self) -> W {
118
0
        self.writer
119
0
    }
120
}
121
122
impl<W: AsyncWrite, F: FnMut(&[u8])> AsyncWrite for InspectWriter<W, F> {
123
0
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
124
0
        let me = self.project();
125
0
        let res = me.writer.poll_write(cx, buf);
126
0
        if let Poll::Ready(Ok(count)) = res {
127
0
            if count != 0 {
128
0
                (me.f)(&buf[..count]);
129
0
            }
130
0
        }
131
0
        res
132
0
    }
133
134
0
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
135
0
        let me = self.project();
136
0
        me.writer.poll_flush(cx)
137
0
    }
138
139
0
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
140
0
        let me = self.project();
141
0
        me.writer.poll_shutdown(cx)
142
0
    }
143
144
0
    fn poll_write_vectored(
145
0
        self: Pin<&mut Self>,
146
0
        cx: &mut Context<'_>,
147
0
        bufs: &[IoSlice<'_>],
148
0
    ) -> Poll<Result<usize>> {
149
0
        let me = self.project();
150
0
        let res = me.writer.poll_write_vectored(cx, bufs);
151
0
        if let Poll::Ready(Ok(mut count)) = res {
152
0
            for buf in bufs {
153
0
                if count == 0 {
154
0
                    break;
155
0
                }
156
0
                let size = count.min(buf.len());
157
0
                if size != 0 {
158
0
                    (me.f)(&buf[..size]);
159
0
                    count -= size;
160
0
                }
161
            }
162
0
        }
163
0
        res
164
0
    }
165
166
0
    fn is_write_vectored(&self) -> bool {
167
0
        self.writer.is_write_vectored()
168
0
    }
169
}
170
171
impl<W: AsyncRead, F> AsyncRead for InspectWriter<W, F> {
172
0
    fn poll_read(
173
0
        self: Pin<&mut Self>,
174
0
        cx: &mut Context<'_>,
175
0
        buf: &mut ReadBuf<'_>,
176
0
    ) -> Poll<std::io::Result<()>> {
177
0
        self.project().writer.poll_read(cx, buf)
178
0
    }
179
}