Coverage Report

Created: 2025-08-26 07:09

/src/h2/src/codec/mod.rs
Line
Count
Source (jump to first uncovered line)
1
mod error;
2
mod framed_read;
3
mod framed_write;
4
5
pub use self::error::{SendError, UserError};
6
7
use self::framed_read::FramedRead;
8
use self::framed_write::FramedWrite;
9
10
use crate::frame::{self, Data, Frame};
11
use crate::proto::Error;
12
13
use bytes::Buf;
14
use futures_core::Stream;
15
use futures_sink::Sink;
16
use std::pin::Pin;
17
use std::task::{Context, Poll};
18
use tokio::io::{AsyncRead, AsyncWrite};
19
use tokio_util::codec::length_delimited;
20
21
use std::io;
22
23
#[derive(Debug)]
24
pub struct Codec<T, B> {
25
    inner: FramedRead<FramedWrite<T, B>>,
26
}
27
28
impl<T, B> Codec<T, B>
29
where
30
    T: AsyncRead + AsyncWrite + Unpin,
31
    B: Buf,
32
{
33
    /// Returns a new `Codec` with the default max frame size
34
    #[inline]
35
13.2k
    pub fn new(io: T) -> Self {
36
13.2k
        Self::with_max_recv_frame_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize)
37
13.2k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::new
<h2::codec::Codec<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::new
Line
Count
Source
35
445
    pub fn new(io: T) -> Self {
36
445
        Self::with_max_recv_frame_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize)
37
445
    }
<h2::codec::Codec<h2_support::mock::Pipe, bytes::bytes::Bytes>>::new
Line
Count
Source
35
445
    pub fn new(io: T) -> Self {
36
445
        Self::with_max_recv_frame_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize)
37
445
    }
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::new
Line
Count
Source
35
12.3k
    pub fn new(io: T) -> Self {
36
12.3k
        Self::with_max_recv_frame_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize)
37
12.3k
    }
38
39
    /// Returns a new `Codec` with the given maximum frame size
40
13.2k
    pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
41
13.2k
        // Wrap with writer
42
13.2k
        let framed_write = FramedWrite::new(io);
43
13.2k
44
13.2k
        // Delimit the frames
45
13.2k
        let delimited = length_delimited::Builder::new()
46
13.2k
            .big_endian()
47
13.2k
            .length_field_length(3)
48
13.2k
            .length_adjustment(9)
49
13.2k
            .num_skip(0) // Don't skip the header
50
13.2k
            .new_read(framed_write);
51
13.2k
52
13.2k
        let mut inner = FramedRead::new(delimited);
53
13.2k
54
13.2k
        // Use FramedRead's method since it checks the value is within range.
55
13.2k
        inner.set_max_frame_size(max_frame_size);
56
13.2k
57
13.2k
        Codec { inner }
58
13.2k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::with_max_recv_frame_size
<h2::codec::Codec<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::with_max_recv_frame_size
Line
Count
Source
40
445
    pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
41
445
        // Wrap with writer
42
445
        let framed_write = FramedWrite::new(io);
43
445
44
445
        // Delimit the frames
45
445
        let delimited = length_delimited::Builder::new()
46
445
            .big_endian()
47
445
            .length_field_length(3)
48
445
            .length_adjustment(9)
49
445
            .num_skip(0) // Don't skip the header
50
445
            .new_read(framed_write);
51
445
52
445
        let mut inner = FramedRead::new(delimited);
53
445
54
445
        // Use FramedRead's method since it checks the value is within range.
55
445
        inner.set_max_frame_size(max_frame_size);
56
445
57
445
        Codec { inner }
58
445
    }
<h2::codec::Codec<h2_support::mock::Pipe, bytes::bytes::Bytes>>::with_max_recv_frame_size
Line
Count
Source
40
445
    pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
41
445
        // Wrap with writer
42
445
        let framed_write = FramedWrite::new(io);
43
445
44
445
        // Delimit the frames
45
445
        let delimited = length_delimited::Builder::new()
46
445
            .big_endian()
47
445
            .length_field_length(3)
48
445
            .length_adjustment(9)
49
445
            .num_skip(0) // Don't skip the header
50
445
            .new_read(framed_write);
51
445
52
445
        let mut inner = FramedRead::new(delimited);
53
445
54
445
        // Use FramedRead's method since it checks the value is within range.
55
445
        inner.set_max_frame_size(max_frame_size);
56
445
57
445
        Codec { inner }
58
445
    }
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::with_max_recv_frame_size
Line
Count
Source
40
12.3k
    pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
41
12.3k
        // Wrap with writer
42
12.3k
        let framed_write = FramedWrite::new(io);
43
12.3k
44
12.3k
        // Delimit the frames
45
12.3k
        let delimited = length_delimited::Builder::new()
46
12.3k
            .big_endian()
47
12.3k
            .length_field_length(3)
48
12.3k
            .length_adjustment(9)
49
12.3k
            .num_skip(0) // Don't skip the header
50
12.3k
            .new_read(framed_write);
51
12.3k
52
12.3k
        let mut inner = FramedRead::new(delimited);
53
12.3k
54
12.3k
        // Use FramedRead's method since it checks the value is within range.
55
12.3k
        inner.set_max_frame_size(max_frame_size);
56
12.3k
57
12.3k
        Codec { inner }
58
12.3k
    }
59
}
60
61
impl<T, B> Codec<T, B> {
62
    /// Updates the max received frame size.
63
    ///
64
    /// The change takes effect the next time a frame is decoded. In other
65
    /// words, if a frame is currently in process of being decoded with a frame
66
    /// size greater than `val` but less than the max frame size in effect
67
    /// before calling this function, then the frame will be allowed.
68
    #[inline]
69
0
    pub fn set_max_recv_frame_size(&mut self, val: usize) {
70
0
        self.inner.set_max_frame_size(val)
71
0
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::set_max_recv_frame_size
Unexecuted instantiation: <h2::codec::Codec<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_max_recv_frame_size
Unexecuted instantiation: <h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_max_recv_frame_size
72
73
    /// Returns the current max received frame size setting.
74
    ///
75
    /// This is the largest size this codec will accept from the wire. Larger
76
    /// frames will be rejected.
77
    #[cfg(feature = "unstable")]
78
    #[inline]
79
0
    pub fn max_recv_frame_size(&self) -> usize {
80
0
        self.inner.max_frame_size()
81
0
    }
82
83
    /// Returns the max frame size that can be sent to the peer.
84
381k
    pub fn max_send_frame_size(&self) -> usize {
85
381k
        self.inner.get_ref().max_frame_size()
86
381k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::max_send_frame_size
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::max_send_frame_size
Line
Count
Source
84
381k
    pub fn max_send_frame_size(&self) -> usize {
85
381k
        self.inner.get_ref().max_frame_size()
86
381k
    }
87
88
    /// Set the peer's max frame size.
89
329
    pub fn set_max_send_frame_size(&mut self, val: usize) {
90
329
        self.framed_write().set_max_frame_size(val)
91
329
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::set_max_send_frame_size
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_max_send_frame_size
Line
Count
Source
89
329
    pub fn set_max_send_frame_size(&mut self, val: usize) {
90
329
        self.framed_write().set_max_frame_size(val)
91
329
    }
92
93
    /// Set the peer's header table size size.
94
1.68k
    pub fn set_send_header_table_size(&mut self, val: usize) {
95
1.68k
        self.framed_write().set_header_table_size(val)
96
1.68k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::set_send_header_table_size
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_send_header_table_size
Line
Count
Source
94
1.68k
    pub fn set_send_header_table_size(&mut self, val: usize) {
95
1.68k
        self.framed_write().set_header_table_size(val)
96
1.68k
    }
97
98
    /// Set the decoder header table size size.
99
0
    pub fn set_recv_header_table_size(&mut self, val: usize) {
100
0
        self.inner.set_header_table_size(val)
101
0
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::set_recv_header_table_size
Unexecuted instantiation: <h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_recv_header_table_size
102
103
    /// Set the max header list size that can be received.
104
0
    pub fn set_max_recv_header_list_size(&mut self, val: usize) {
105
0
        self.inner.set_max_header_list_size(val);
106
0
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::set_max_recv_header_list_size
Unexecuted instantiation: <h2::codec::Codec<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_max_recv_header_list_size
Unexecuted instantiation: <h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_max_recv_header_list_size
107
108
    /// Get a reference to the inner stream.
109
    #[cfg(feature = "unstable")]
110
0
    pub fn get_ref(&self) -> &T {
111
0
        self.inner.get_ref().get_ref()
112
0
    }
113
114
    /// Get a mutable reference to the inner stream.
115
445
    pub fn get_mut(&mut self) -> &mut T {
116
445
        self.inner.get_mut().get_mut()
117
445
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::get_mut
<h2::codec::Codec<h2_support::mock::Pipe, bytes::bytes::Bytes>>::get_mut
Line
Count
Source
115
445
    pub fn get_mut(&mut self) -> &mut T {
116
445
        self.inner.get_mut().get_mut()
117
445
    }
118
119
    /// Takes the data payload value that was fully written to the socket
120
929k
    pub(crate) fn take_last_data_frame(&mut self) -> Option<Data<B>> {
121
929k
        self.framed_write().take_last_data_frame()
122
929k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::take_last_data_frame
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::take_last_data_frame
Line
Count
Source
120
929k
    pub(crate) fn take_last_data_frame(&mut self) -> Option<Data<B>> {
121
929k
        self.framed_write().take_last_data_frame()
122
929k
    }
123
124
3.86M
    fn framed_write(&mut self) -> &mut FramedWrite<T, B> {
125
3.86M
        self.inner.get_mut()
126
3.86M
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::framed_write
<h2::codec::Codec<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::framed_write
Line
Count
Source
124
445
    fn framed_write(&mut self) -> &mut FramedWrite<T, B> {
125
445
        self.inner.get_mut()
126
445
    }
<h2::codec::Codec<h2_support::mock::Pipe, bytes::bytes::Bytes>>::framed_write
Line
Count
Source
124
445
    fn framed_write(&mut self) -> &mut FramedWrite<T, B> {
125
445
        self.inner.get_mut()
126
445
    }
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::framed_write
Line
Count
Source
124
3.86M
    fn framed_write(&mut self) -> &mut FramedWrite<T, B> {
125
3.86M
        self.inner.get_mut()
126
3.86M
    }
127
}
128
129
impl<T, B> Codec<T, B>
130
where
131
    T: AsyncWrite + Unpin,
132
    B: Buf,
133
{
134
    /// Returns `Ready` when the codec can buffer a frame
135
2.30M
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
136
2.30M
        self.framed_write().poll_ready(cx)
137
2.30M
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::poll_ready
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::poll_ready
Line
Count
Source
135
2.30M
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
136
2.30M
        self.framed_write().poll_ready(cx)
137
2.30M
    }
138
139
    /// Buffer a frame.
140
    ///
141
    /// `poll_ready` must be called first to ensure that a frame may be
142
    /// accepted.
143
    ///
144
    /// TODO: Rename this to avoid conflicts with Sink::buffer
145
231k
    pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
146
231k
        self.framed_write().buffer(item)
147
231k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::buffer
<h2::codec::Codec<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer
Line
Count
Source
145
445
    pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
146
445
        self.framed_write().buffer(item)
147
445
    }
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer
Line
Count
Source
145
230k
    pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
146
230k
        self.framed_write().buffer(item)
147
230k
    }
148
149
    /// Flush buffered data to the wire
150
379k
    pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
151
379k
        self.framed_write().flush(cx)
152
379k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::flush
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::flush
Line
Count
Source
150
379k
    pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
151
379k
        self.framed_write().flush(cx)
152
379k
    }
153
154
    /// Shutdown the send half
155
11.4k
    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
156
11.4k
        self.framed_write().shutdown(cx)
157
11.4k
    }
Unexecuted instantiation: <h2::codec::Codec<_, _>>::shutdown
<h2::codec::Codec<h2_support::mock::Pipe, bytes::bytes::Bytes>>::shutdown
Line
Count
Source
155
445
    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
156
445
        self.framed_write().shutdown(cx)
157
445
    }
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::shutdown
Line
Count
Source
155
11.0k
    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
156
11.0k
        self.framed_write().shutdown(cx)
157
11.0k
    }
158
}
159
160
impl<T, B> Stream for Codec<T, B>
161
where
162
    T: AsyncRead + Unpin,
163
{
164
    type Item = Result<Frame, Error>;
165
166
1.61M
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167
1.61M
        Pin::new(&mut self.inner).poll_next(cx)
168
1.61M
    }
Unexecuted instantiation: <h2::codec::Codec<_, _> as futures_core::stream::Stream>::poll_next
Unexecuted instantiation: <h2::codec::Codec<h2_support::mock::Pipe, bytes::bytes::Bytes> as futures_core::stream::Stream>::poll_next
<h2::codec::Codec<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>> as futures_core::stream::Stream>::poll_next
Line
Count
Source
166
1.61M
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167
1.61M
        Pin::new(&mut self.inner).poll_next(cx)
168
1.61M
    }
169
}
170
171
impl<T, B> Sink<Frame<B>> for Codec<T, B>
172
where
173
    T: AsyncWrite + Unpin,
174
    B: Buf,
175
{
176
    type Error = SendError;
177
178
0
    fn start_send(mut self: Pin<&mut Self>, item: Frame<B>) -> Result<(), Self::Error> {
179
0
        Codec::buffer(&mut self, item)?;
180
0
        Ok(())
181
0
    }
182
    /// Returns `Ready` when the codec can buffer a frame
183
0
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
184
0
        self.framed_write().poll_ready(cx).map_err(Into::into)
185
0
    }
186
187
    /// Flush buffered data to the wire
188
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
189
0
        self.framed_write().flush(cx).map_err(Into::into)
190
0
    }
191
192
0
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
193
0
        ready!(self.shutdown(cx))?;
194
0
        Poll::Ready(Ok(()))
195
0
    }
196
}
197
198
// TODO: remove (or improve) this
199
impl<T> From<T> for Codec<T, bytes::Bytes>
200
where
201
    T: AsyncRead + AsyncWrite + Unpin,
202
{
203
0
    fn from(src: T) -> Self {
204
0
        Self::new(src)
205
0
    }
206
}