Coverage Report

Created: 2025-07-12 06:22

/src/h2/src/codec/framed_write.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::codec::UserError;
2
use crate::codec::UserError::*;
3
use crate::frame::{self, Frame, FrameSize};
4
use crate::hpack;
5
6
use bytes::{Buf, BufMut, BytesMut};
7
use std::pin::Pin;
8
use std::task::{Context, Poll};
9
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
10
use tokio_util::io::poll_write_buf;
11
12
use std::io::{self, Cursor};
13
14
// A macro to get around a method needing to borrow &mut self
15
macro_rules! limited_write_buf {
16
    ($self:expr) => {{
17
        let limit = $self.max_frame_size() + frame::HEADER_LEN;
18
        $self.buf.get_mut().limit(limit)
19
    }};
20
}
21
22
#[derive(Debug)]
23
pub struct FramedWrite<T, B> {
24
    /// Upstream `AsyncWrite`
25
    inner: T,
26
    final_flush_done: bool,
27
28
    encoder: Encoder<B>,
29
}
30
31
#[derive(Debug)]
32
struct Encoder<B> {
33
    /// HPACK encoder
34
    hpack: hpack::Encoder,
35
36
    /// Write buffer
37
    ///
38
    /// TODO: Should this be a ring buffer?
39
    buf: Cursor<BytesMut>,
40
41
    /// Next frame to encode
42
    next: Option<Next<B>>,
43
44
    /// Last data frame
45
    last_data_frame: Option<frame::Data<B>>,
46
47
    /// Max frame size, this is specified by the peer
48
    max_frame_size: FrameSize,
49
50
    /// Chain payloads bigger than this.
51
    chain_threshold: usize,
52
53
    /// Min buffer required to attempt to write a frame
54
    min_buffer_capacity: usize,
55
}
56
57
#[derive(Debug)]
58
enum Next<B> {
59
    Data(frame::Data<B>),
60
    Continuation(frame::Continuation),
61
}
62
63
/// Initialize the connection with this amount of write buffer.
64
///
65
/// The minimum MAX_FRAME_SIZE is 16kb, so always be able to send a HEADERS
66
/// frame that big.
67
const DEFAULT_BUFFER_CAPACITY: usize = 16 * 1_024;
68
69
/// Chain payloads bigger than this when vectored I/O is enabled. The remote
70
/// will never advertise a max frame size less than this (well, the spec says
71
/// the max frame size can't be less than 16kb, so not even close).
72
const CHAIN_THRESHOLD: usize = 256;
73
74
/// Chain payloads bigger than this when vectored I/O is **not** enabled.
75
/// A larger value in this scenario will reduce the number of small and
76
/// fragmented data being sent, and hereby improve the throughput.
77
const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024;
78
79
// TODO: Make generic
80
impl<T, B> FramedWrite<T, B>
81
where
82
    T: AsyncWrite + Unpin,
83
    B: Buf,
84
{
85
12.5k
    pub fn new(inner: T) -> FramedWrite<T, B> {
86
12.5k
        let chain_threshold = if inner.is_write_vectored() {
87
0
            CHAIN_THRESHOLD
88
        } else {
89
12.5k
            CHAIN_THRESHOLD_WITHOUT_VECTORED_IO
90
        };
91
12.5k
        FramedWrite {
92
12.5k
            inner,
93
12.5k
            final_flush_done: false,
94
12.5k
            encoder: Encoder {
95
12.5k
                hpack: hpack::Encoder::default(),
96
12.5k
                buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)),
97
12.5k
                next: None,
98
12.5k
                last_data_frame: None,
99
12.5k
                max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
100
12.5k
                chain_threshold,
101
12.5k
                min_buffer_capacity: chain_threshold + frame::HEADER_LEN,
102
12.5k
            },
103
12.5k
        }
104
12.5k
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::new
<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::new
Line
Count
Source
85
448
    pub fn new(inner: T) -> FramedWrite<T, B> {
86
448
        let chain_threshold = if inner.is_write_vectored() {
87
0
            CHAIN_THRESHOLD
88
        } else {
89
448
            CHAIN_THRESHOLD_WITHOUT_VECTORED_IO
90
        };
91
448
        FramedWrite {
92
448
            inner,
93
448
            final_flush_done: false,
94
448
            encoder: Encoder {
95
448
                hpack: hpack::Encoder::default(),
96
448
                buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)),
97
448
                next: None,
98
448
                last_data_frame: None,
99
448
                max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
100
448
                chain_threshold,
101
448
                min_buffer_capacity: chain_threshold + frame::HEADER_LEN,
102
448
            },
103
448
        }
104
448
    }
<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::new
Line
Count
Source
85
448
    pub fn new(inner: T) -> FramedWrite<T, B> {
86
448
        let chain_threshold = if inner.is_write_vectored() {
87
0
            CHAIN_THRESHOLD
88
        } else {
89
448
            CHAIN_THRESHOLD_WITHOUT_VECTORED_IO
90
        };
91
448
        FramedWrite {
92
448
            inner,
93
448
            final_flush_done: false,
94
448
            encoder: Encoder {
95
448
                hpack: hpack::Encoder::default(),
96
448
                buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)),
97
448
                next: None,
98
448
                last_data_frame: None,
99
448
                max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
100
448
                chain_threshold,
101
448
                min_buffer_capacity: chain_threshold + frame::HEADER_LEN,
102
448
            },
103
448
        }
104
448
    }
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::new
Line
Count
Source
85
11.6k
    pub fn new(inner: T) -> FramedWrite<T, B> {
86
11.6k
        let chain_threshold = if inner.is_write_vectored() {
87
0
            CHAIN_THRESHOLD
88
        } else {
89
11.6k
            CHAIN_THRESHOLD_WITHOUT_VECTORED_IO
90
        };
91
11.6k
        FramedWrite {
92
11.6k
            inner,
93
11.6k
            final_flush_done: false,
94
11.6k
            encoder: Encoder {
95
11.6k
                hpack: hpack::Encoder::default(),
96
11.6k
                buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)),
97
11.6k
                next: None,
98
11.6k
                last_data_frame: None,
99
11.6k
                max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
100
11.6k
                chain_threshold,
101
11.6k
                min_buffer_capacity: chain_threshold + frame::HEADER_LEN,
102
11.6k
            },
103
11.6k
        }
104
11.6k
    }
105
106
    /// Returns `Ready` when `send` is able to accept a frame
107
    ///
108
    /// Calling this function may result in the current contents of the buffer
109
    /// to be flushed to `T`.
110
1.33M
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
111
1.33M
        if !self.encoder.has_capacity() {
112
            // Try flushing
113
919k
            ready!(self.flush(cx))?;
114
115
2.66k
            if !self.encoder.has_capacity() {
116
0
                return Poll::Pending;
117
2.66k
            }
118
419k
        }
119
120
422k
        Poll::Ready(Ok(()))
121
1.33M
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::poll_ready
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::poll_ready
Line
Count
Source
110
1.33M
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
111
1.33M
        if !self.encoder.has_capacity() {
112
            // Try flushing
113
919k
            ready!(self.flush(cx))?;
114
115
2.66k
            if !self.encoder.has_capacity() {
116
0
                return Poll::Pending;
117
2.66k
            }
118
419k
        }
119
120
422k
        Poll::Ready(Ok(()))
121
1.33M
    }
122
123
    /// Buffer a frame.
124
    ///
125
    /// `poll_ready` must be called first to ensure that a frame may be
126
    /// accepted.
127
231k
    pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
128
231k
        self.encoder.buffer(item)
129
231k
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::buffer
<h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer
Line
Count
Source
127
448
    pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
128
448
        self.encoder.buffer(item)
129
448
    }
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer
Line
Count
Source
127
231k
    pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
128
231k
        self.encoder.buffer(item)
129
231k
    }
130
131
    /// Flush buffered data to the wire
132
1.03M
    pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
133
1.03M
        let span = tracing::trace_span!("FramedWrite::flush");
134
1.03M
        let _e = span.enter();
135
136
        loop {
137
1.04M
            while !self.encoder.is_empty() {
138
854k
                match self.encoder.next {
139
854k
                    Some(Next::Data(ref mut frame)) => {
140
854k
                        tracing::trace!(queued_data_frame = true);
141
854k
                        let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
142
854k
                        ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?
143
                    }
144
                    _ => {
145
162k
                        tracing::trace!(queued_data_frame = false);
146
162k
                        ready!(poll_write_buf(
147
162k
                            Pin::new(&mut self.inner),
148
162k
                            cx,
149
162k
                            &mut self.encoder.buf
150
162k
                        ))?
151
                    }
152
                };
153
            }
154
155
25.3k
            match self.encoder.unset_frame() {
156
0
                ControlFlow::Continue => (),
157
25.3k
                ControlFlow::Break => break,
158
25.3k
            }
159
25.3k
        }
160
25.3k
161
25.3k
        tracing::trace!("flushing buffer");
162
        // Flush the upstream
163
25.3k
        ready!(Pin::new(&mut self.inner).poll_flush(cx))?;
164
165
25.3k
        Poll::Ready(Ok(()))
166
1.03M
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::flush
<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::flush
Line
Count
Source
132
448
    pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
133
448
        let span = tracing::trace_span!("FramedWrite::flush");
134
448
        let _e = span.enter();
135
136
        loop {
137
448
            while !self.encoder.is_empty() {
138
0
                match self.encoder.next {
139
0
                    Some(Next::Data(ref mut frame)) => {
140
0
                        tracing::trace!(queued_data_frame = true);
141
0
                        let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
142
0
                        ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?
143
                    }
144
                    _ => {
145
0
                        tracing::trace!(queued_data_frame = false);
146
0
                        ready!(poll_write_buf(
147
0
                            Pin::new(&mut self.inner),
148
0
                            cx,
149
0
                            &mut self.encoder.buf
150
0
                        ))?
151
                    }
152
                };
153
            }
154
155
448
            match self.encoder.unset_frame() {
156
0
                ControlFlow::Continue => (),
157
448
                ControlFlow::Break => break,
158
448
            }
159
448
        }
160
448
161
448
        tracing::trace!("flushing buffer");
162
        // Flush the upstream
163
448
        ready!(Pin::new(&mut self.inner).poll_flush(cx))?;
164
165
448
        Poll::Ready(Ok(()))
166
448
    }
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::flush
Line
Count
Source
132
1.03M
    pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
133
1.03M
        let span = tracing::trace_span!("FramedWrite::flush");
134
1.03M
        let _e = span.enter();
135
136
        loop {
137
1.04M
            while !self.encoder.is_empty() {
138
854k
                match self.encoder.next {
139
854k
                    Some(Next::Data(ref mut frame)) => {
140
854k
                        tracing::trace!(queued_data_frame = true);
141
854k
                        let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
142
854k
                        ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?
143
                    }
144
                    _ => {
145
162k
                        tracing::trace!(queued_data_frame = false);
146
162k
                        ready!(poll_write_buf(
147
162k
                            Pin::new(&mut self.inner),
148
162k
                            cx,
149
162k
                            &mut self.encoder.buf
150
162k
                        ))?
151
                    }
152
                };
153
            }
154
155
24.9k
            match self.encoder.unset_frame() {
156
0
                ControlFlow::Continue => (),
157
24.9k
                ControlFlow::Break => break,
158
24.9k
            }
159
24.9k
        }
160
24.9k
161
24.9k
        tracing::trace!("flushing buffer");
162
        // Flush the upstream
163
24.9k
        ready!(Pin::new(&mut self.inner).poll_flush(cx))?;
164
165
24.9k
        Poll::Ready(Ok(()))
166
1.03M
    }
167
168
    /// Close the codec
169
10.4k
    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
170
10.4k
        if !self.final_flush_done {
171
10.4k
            ready!(self.flush(cx))?;
172
1.34k
            self.final_flush_done = true;
173
0
        }
174
1.34k
        Pin::new(&mut self.inner).poll_shutdown(cx)
175
10.4k
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::shutdown
<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::shutdown
Line
Count
Source
169
448
    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
170
448
        if !self.final_flush_done {
171
448
            ready!(self.flush(cx))?;
172
448
            self.final_flush_done = true;
173
0
        }
174
448
        Pin::new(&mut self.inner).poll_shutdown(cx)
175
448
    }
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::shutdown
Line
Count
Source
169
9.96k
    pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
170
9.96k
        if !self.final_flush_done {
171
9.96k
            ready!(self.flush(cx))?;
172
895
            self.final_flush_done = true;
173
0
        }
174
895
        Pin::new(&mut self.inner).poll_shutdown(cx)
175
9.96k
    }
176
}
177
178
#[must_use]
179
enum ControlFlow {
180
    Continue,
181
    Break,
182
}
183
184
impl<B> Encoder<B>
185
where
186
    B: Buf,
187
{
188
25.3k
    fn unset_frame(&mut self) -> ControlFlow {
189
25.3k
        // Clear internal buffer
190
25.3k
        self.buf.set_position(0);
191
25.3k
        self.buf.get_mut().clear();
192
25.3k
193
25.3k
        // The data frame has been written, so unset it
194
25.3k
        match self.next.take() {
195
2.56k
            Some(Next::Data(frame)) => {
196
2.56k
                self.last_data_frame = Some(frame);
197
2.56k
                debug_assert!(self.is_empty());
198
2.56k
                ControlFlow::Break
199
            }
200
0
            Some(Next::Continuation(frame)) => {
201
0
                // Buffer the continuation frame, then try to write again
202
0
                let mut buf = limited_write_buf!(self);
203
0
                if let Some(continuation) = frame.encode(&mut buf) {
204
0
                    self.next = Some(Next::Continuation(continuation));
205
0
                }
206
0
                ControlFlow::Continue
207
            }
208
22.8k
            None => ControlFlow::Break,
209
        }
210
25.3k
    }
Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::unset_frame
<h2::codec::framed_write::Encoder<bytes::bytes::Bytes>>::unset_frame
Line
Count
Source
188
448
    fn unset_frame(&mut self) -> ControlFlow {
189
448
        // Clear internal buffer
190
448
        self.buf.set_position(0);
191
448
        self.buf.get_mut().clear();
192
448
193
448
        // The data frame has been written, so unset it
194
448
        match self.next.take() {
195
0
            Some(Next::Data(frame)) => {
196
0
                self.last_data_frame = Some(frame);
197
0
                debug_assert!(self.is_empty());
198
0
                ControlFlow::Break
199
            }
200
0
            Some(Next::Continuation(frame)) => {
201
0
                // Buffer the continuation frame, then try to write again
202
0
                let mut buf = limited_write_buf!(self);
203
0
                if let Some(continuation) = frame.encode(&mut buf) {
204
0
                    self.next = Some(Next::Continuation(continuation));
205
0
                }
206
0
                ControlFlow::Continue
207
            }
208
448
            None => ControlFlow::Break,
209
        }
210
448
    }
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::unset_frame
Line
Count
Source
188
24.9k
    fn unset_frame(&mut self) -> ControlFlow {
189
24.9k
        // Clear internal buffer
190
24.9k
        self.buf.set_position(0);
191
24.9k
        self.buf.get_mut().clear();
192
24.9k
193
24.9k
        // The data frame has been written, so unset it
194
24.9k
        match self.next.take() {
195
2.56k
            Some(Next::Data(frame)) => {
196
2.56k
                self.last_data_frame = Some(frame);
197
2.56k
                debug_assert!(self.is_empty());
198
2.56k
                ControlFlow::Break
199
            }
200
0
            Some(Next::Continuation(frame)) => {
201
0
                // Buffer the continuation frame, then try to write again
202
0
                let mut buf = limited_write_buf!(self);
203
0
                if let Some(continuation) = frame.encode(&mut buf) {
204
0
                    self.next = Some(Next::Continuation(continuation));
205
0
                }
206
0
                ControlFlow::Continue
207
            }
208
22.3k
            None => ControlFlow::Break,
209
        }
210
24.9k
    }
211
212
231k
    fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
213
231k
        // Ensure that we have enough capacity to accept the write.
214
231k
        assert!(self.has_capacity());
215
231k
        let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item);
216
231k
        let _e = span.enter();
217
231k
218
231k
        tracing::debug!(frame = ?item, "send");
219
220
231k
        match item {
221
18.2k
            Frame::Data(mut v) => {
222
18.2k
                // Ensure that the payload is not greater than the max frame.
223
18.2k
                let len = v.payload().remaining();
224
18.2k
225
18.2k
                if len > self.max_frame_size() {
226
0
                    return Err(PayloadTooBig);
227
18.2k
                }
228
18.2k
229
18.2k
                if len >= self.chain_threshold {
230
3.76k
                    let head = v.head();
231
3.76k
232
3.76k
                    // Encode the frame head to the buffer
233
3.76k
                    head.encode(len, self.buf.get_mut());
234
3.76k
235
3.76k
                    if self.buf.get_ref().remaining() < self.chain_threshold {
236
2.43k
                        let extra_bytes = self.chain_threshold - self.buf.remaining();
237
2.43k
                        self.buf.get_mut().put(v.payload_mut().take(extra_bytes));
238
2.43k
                    }
239
240
                    // Save the data frame
241
3.76k
                    self.next = Some(Next::Data(v));
242
                } else {
243
14.4k
                    v.encode_chunk(self.buf.get_mut());
244
14.4k
245
14.4k
                    // The chunk has been fully encoded, so there is no need to
246
14.4k
                    // keep it around
247
14.4k
                    assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded");
248
249
                    // Save off the last frame...
250
14.4k
                    self.last_data_frame = Some(v);
251
                }
252
            }
253
185k
            Frame::Headers(v) => {
254
185k
                let mut buf = limited_write_buf!(self);
255
185k
                if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) {
256
0
                    self.next = Some(Next::Continuation(continuation));
257
185k
                }
258
            }
259
0
            Frame::PushPromise(v) => {
260
0
                let mut buf = limited_write_buf!(self);
261
0
                if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) {
262
0
                    self.next = Some(Next::Continuation(continuation));
263
0
                }
264
            }
265
19.1k
            Frame::Settings(v) => {
266
19.1k
                v.encode(self.buf.get_mut());
267
19.1k
                tracing::trace!(rem = self.buf.remaining(), "encoded settings");
268
            }
269
5.69k
            Frame::GoAway(v) => {
270
5.69k
                v.encode(self.buf.get_mut());
271
5.69k
                tracing::trace!(rem = self.buf.remaining(), "encoded go_away");
272
            }
273
207
            Frame::Ping(v) => {
274
207
                v.encode(self.buf.get_mut());
275
207
                tracing::trace!(rem = self.buf.remaining(), "encoded ping");
276
            }
277
108
            Frame::WindowUpdate(v) => {
278
108
                v.encode(self.buf.get_mut());
279
108
                tracing::trace!(rem = self.buf.remaining(), "encoded window_update");
280
            }
281
282
            Frame::Priority(_) => {
283
                /*
284
                v.encode(self.buf.get_mut());
285
                tracing::trace!("encoded priority; rem={:?}", self.buf.remaining());
286
                */
287
0
                unimplemented!();
288
            }
289
2.35k
            Frame::Reset(v) => {
290
2.35k
                v.encode(self.buf.get_mut());
291
2.35k
                tracing::trace!(rem = self.buf.remaining(), "encoded reset");
292
            }
293
        }
294
295
231k
        Ok(())
296
231k
    }
Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::buffer
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer
Line
Count
Source
212
448
    fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
213
448
        // Ensure that we have enough capacity to accept the write.
214
448
        assert!(self.has_capacity());
215
448
        let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item);
216
448
        let _e = span.enter();
217
448
218
448
        tracing::debug!(frame = ?item, "send");
219
220
448
        match item {
221
0
            Frame::Data(mut v) => {
222
0
                // Ensure that the payload is not greater than the max frame.
223
0
                let len = v.payload().remaining();
224
0
225
0
                if len > self.max_frame_size() {
226
0
                    return Err(PayloadTooBig);
227
0
                }
228
0
229
0
                if len >= self.chain_threshold {
230
0
                    let head = v.head();
231
0
232
0
                    // Encode the frame head to the buffer
233
0
                    head.encode(len, self.buf.get_mut());
234
0
235
0
                    if self.buf.get_ref().remaining() < self.chain_threshold {
236
0
                        let extra_bytes = self.chain_threshold - self.buf.remaining();
237
0
                        self.buf.get_mut().put(v.payload_mut().take(extra_bytes));
238
0
                    }
239
240
                    // Save the data frame
241
0
                    self.next = Some(Next::Data(v));
242
                } else {
243
0
                    v.encode_chunk(self.buf.get_mut());
244
0
245
0
                    // The chunk has been fully encoded, so there is no need to
246
0
                    // keep it around
247
0
                    assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded");
248
249
                    // Save off the last frame...
250
0
                    self.last_data_frame = Some(v);
251
                }
252
            }
253
0
            Frame::Headers(v) => {
254
0
                let mut buf = limited_write_buf!(self);
255
0
                if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) {
256
0
                    self.next = Some(Next::Continuation(continuation));
257
0
                }
258
            }
259
0
            Frame::PushPromise(v) => {
260
0
                let mut buf = limited_write_buf!(self);
261
0
                if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) {
262
0
                    self.next = Some(Next::Continuation(continuation));
263
0
                }
264
            }
265
448
            Frame::Settings(v) => {
266
448
                v.encode(self.buf.get_mut());
267
448
                tracing::trace!(rem = self.buf.remaining(), "encoded settings");
268
            }
269
0
            Frame::GoAway(v) => {
270
0
                v.encode(self.buf.get_mut());
271
0
                tracing::trace!(rem = self.buf.remaining(), "encoded go_away");
272
            }
273
0
            Frame::Ping(v) => {
274
0
                v.encode(self.buf.get_mut());
275
0
                tracing::trace!(rem = self.buf.remaining(), "encoded ping");
276
            }
277
0
            Frame::WindowUpdate(v) => {
278
0
                v.encode(self.buf.get_mut());
279
0
                tracing::trace!(rem = self.buf.remaining(), "encoded window_update");
280
            }
281
282
            Frame::Priority(_) => {
283
                /*
284
                v.encode(self.buf.get_mut());
285
                tracing::trace!("encoded priority; rem={:?}", self.buf.remaining());
286
                */
287
0
                unimplemented!();
288
            }
289
0
            Frame::Reset(v) => {
290
0
                v.encode(self.buf.get_mut());
291
0
                tracing::trace!(rem = self.buf.remaining(), "encoded reset");
292
            }
293
        }
294
295
448
        Ok(())
296
448
    }
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer
Line
Count
Source
212
231k
    fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> {
213
231k
        // Ensure that we have enough capacity to accept the write.
214
231k
        assert!(self.has_capacity());
215
231k
        let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item);
216
231k
        let _e = span.enter();
217
231k
218
231k
        tracing::debug!(frame = ?item, "send");
219
220
231k
        match item {
221
18.2k
            Frame::Data(mut v) => {
222
18.2k
                // Ensure that the payload is not greater than the max frame.
223
18.2k
                let len = v.payload().remaining();
224
18.2k
225
18.2k
                if len > self.max_frame_size() {
226
0
                    return Err(PayloadTooBig);
227
18.2k
                }
228
18.2k
229
18.2k
                if len >= self.chain_threshold {
230
3.76k
                    let head = v.head();
231
3.76k
232
3.76k
                    // Encode the frame head to the buffer
233
3.76k
                    head.encode(len, self.buf.get_mut());
234
3.76k
235
3.76k
                    if self.buf.get_ref().remaining() < self.chain_threshold {
236
2.43k
                        let extra_bytes = self.chain_threshold - self.buf.remaining();
237
2.43k
                        self.buf.get_mut().put(v.payload_mut().take(extra_bytes));
238
2.43k
                    }
239
240
                    // Save the data frame
241
3.76k
                    self.next = Some(Next::Data(v));
242
                } else {
243
14.4k
                    v.encode_chunk(self.buf.get_mut());
244
14.4k
245
14.4k
                    // The chunk has been fully encoded, so there is no need to
246
14.4k
                    // keep it around
247
14.4k
                    assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded");
248
249
                    // Save off the last frame...
250
14.4k
                    self.last_data_frame = Some(v);
251
                }
252
            }
253
185k
            Frame::Headers(v) => {
254
185k
                let mut buf = limited_write_buf!(self);
255
185k
                if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) {
256
0
                    self.next = Some(Next::Continuation(continuation));
257
185k
                }
258
            }
259
0
            Frame::PushPromise(v) => {
260
0
                let mut buf = limited_write_buf!(self);
261
0
                if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) {
262
0
                    self.next = Some(Next::Continuation(continuation));
263
0
                }
264
            }
265
18.7k
            Frame::Settings(v) => {
266
18.7k
                v.encode(self.buf.get_mut());
267
18.7k
                tracing::trace!(rem = self.buf.remaining(), "encoded settings");
268
            }
269
5.69k
            Frame::GoAway(v) => {
270
5.69k
                v.encode(self.buf.get_mut());
271
5.69k
                tracing::trace!(rem = self.buf.remaining(), "encoded go_away");
272
            }
273
207
            Frame::Ping(v) => {
274
207
                v.encode(self.buf.get_mut());
275
207
                tracing::trace!(rem = self.buf.remaining(), "encoded ping");
276
            }
277
108
            Frame::WindowUpdate(v) => {
278
108
                v.encode(self.buf.get_mut());
279
108
                tracing::trace!(rem = self.buf.remaining(), "encoded window_update");
280
            }
281
282
            Frame::Priority(_) => {
283
                /*
284
                v.encode(self.buf.get_mut());
285
                tracing::trace!("encoded priority; rem={:?}", self.buf.remaining());
286
                */
287
0
                unimplemented!();
288
            }
289
2.35k
            Frame::Reset(v) => {
290
2.35k
                v.encode(self.buf.get_mut());
291
2.35k
                tracing::trace!(rem = self.buf.remaining(), "encoded reset");
292
            }
293
        }
294
295
231k
        Ok(())
296
231k
    }
297
298
1.57M
    fn has_capacity(&self) -> bool {
299
1.57M
        self.next.is_none()
300
724k
            && (self.buf.get_ref().capacity() - self.buf.get_ref().len()
301
724k
                >= self.min_buffer_capacity)
302
1.57M
    }
Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::has_capacity
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::has_capacity
Line
Count
Source
298
448
    fn has_capacity(&self) -> bool {
299
448
        self.next.is_none()
300
448
            && (self.buf.get_ref().capacity() - self.buf.get_ref().len()
301
448
                >= self.min_buffer_capacity)
302
448
    }
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::has_capacity
Line
Count
Source
298
1.57M
    fn has_capacity(&self) -> bool {
299
1.57M
        self.next.is_none()
300
724k
            && (self.buf.get_ref().capacity() - self.buf.get_ref().len()
301
724k
                >= self.min_buffer_capacity)
302
1.57M
    }
303
304
1.04M
    fn is_empty(&self) -> bool {
305
856k
        match self.next {
306
856k
            Some(Next::Data(ref frame)) => !frame.payload().has_remaining(),
307
185k
            _ => !self.buf.has_remaining(),
308
        }
309
1.04M
    }
Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::is_empty
<h2::codec::framed_write::Encoder<bytes::bytes::Bytes>>::is_empty
Line
Count
Source
304
448
    fn is_empty(&self) -> bool {
305
0
        match self.next {
306
0
            Some(Next::Data(ref frame)) => !frame.payload().has_remaining(),
307
448
            _ => !self.buf.has_remaining(),
308
        }
309
448
    }
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::is_empty
Line
Count
Source
304
1.04M
    fn is_empty(&self) -> bool {
305
856k
        match self.next {
306
856k
            Some(Next::Data(ref frame)) => !frame.payload().has_remaining(),
307
184k
            _ => !self.buf.has_remaining(),
308
        }
309
1.04M
    }
310
}
311
312
impl<B> Encoder<B> {
313
306k
    fn max_frame_size(&self) -> usize {
314
306k
        self.max_frame_size as usize
315
306k
    }
Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::max_frame_size
Unexecuted instantiation: <h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::max_frame_size
Unexecuted instantiation: <h2::codec::framed_write::Encoder<bytes::bytes::Bytes>>::max_frame_size
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::max_frame_size
Line
Count
Source
313
306k
    fn max_frame_size(&self) -> usize {
314
306k
        self.max_frame_size as usize
315
306k
    }
316
}
317
318
impl<T, B> FramedWrite<T, B> {
319
    /// Returns the max frame size that can be sent
320
102k
    pub fn max_frame_size(&self) -> usize {
321
102k
        self.encoder.max_frame_size()
322
102k
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::max_frame_size
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::max_frame_size
Line
Count
Source
320
102k
    pub fn max_frame_size(&self) -> usize {
321
102k
        self.encoder.max_frame_size()
322
102k
    }
323
324
    /// Set the peer's max frame size.
325
463
    pub fn set_max_frame_size(&mut self, val: usize) {
326
463
        assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize);
327
463
        self.encoder.max_frame_size = val as FrameSize;
328
463
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::set_max_frame_size
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_max_frame_size
Line
Count
Source
325
463
    pub fn set_max_frame_size(&mut self, val: usize) {
326
463
        assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize);
327
463
        self.encoder.max_frame_size = val as FrameSize;
328
463
    }
329
330
    /// Set the peer's header table size.
331
1.85k
    pub fn set_header_table_size(&mut self, val: usize) {
332
1.85k
        self.encoder.hpack.update_max_size(val);
333
1.85k
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::set_header_table_size
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_header_table_size
Line
Count
Source
331
1.85k
    pub fn set_header_table_size(&mut self, val: usize) {
332
1.85k
        self.encoder.hpack.update_max_size(val);
333
1.85k
    }
334
335
    /// Retrieve the last data frame that has been sent
336
328k
    pub fn take_last_data_frame(&mut self) -> Option<frame::Data<B>> {
337
328k
        self.encoder.last_data_frame.take()
338
328k
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::take_last_data_frame
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::take_last_data_frame
Line
Count
Source
336
328k
    pub fn take_last_data_frame(&mut self) -> Option<frame::Data<B>> {
337
328k
        self.encoder.last_data_frame.take()
338
328k
    }
339
340
448
    pub fn get_mut(&mut self) -> &mut T {
341
448
        &mut self.inner
342
448
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::get_mut
<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::get_mut
Line
Count
Source
340
448
    pub fn get_mut(&mut self) -> &mut T {
341
448
        &mut self.inner
342
448
    }
343
}
344
345
impl<T: AsyncRead + Unpin, B> AsyncRead for FramedWrite<T, B> {
346
1.02M
    fn poll_read(
347
1.02M
        mut self: Pin<&mut Self>,
348
1.02M
        cx: &mut Context<'_>,
349
1.02M
        buf: &mut ReadBuf,
350
1.02M
    ) -> Poll<io::Result<()>> {
351
1.02M
        Pin::new(&mut self.inner).poll_read(cx, buf)
352
1.02M
    }
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _> as tokio::io::async_read::AsyncRead>::poll_read
Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes> as tokio::io::async_read::AsyncRead>::poll_read
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>> as tokio::io::async_read::AsyncRead>::poll_read
Line
Count
Source
346
1.02M
    fn poll_read(
347
1.02M
        mut self: Pin<&mut Self>,
348
1.02M
        cx: &mut Context<'_>,
349
1.02M
        buf: &mut ReadBuf,
350
1.02M
    ) -> Poll<io::Result<()>> {
351
1.02M
        Pin::new(&mut self.inner).poll_read(cx, buf)
352
1.02M
    }
353
}
354
355
// We never project the Pin to `B`.
356
impl<T: Unpin, B> Unpin for FramedWrite<T, B> {}
357
358
#[cfg(feature = "unstable")]
359
mod unstable {
360
    use super::*;
361
362
    impl<T, B> FramedWrite<T, B> {
363
0
        pub fn get_ref(&self) -> &T {
364
0
            &self.inner
365
0
        }
366
    }
367
}