/src/h2/src/codec/framed_write.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use crate::codec::UserError; |
2 | | use crate::codec::UserError::*; |
3 | | use crate::frame::{self, Frame, FrameSize}; |
4 | | use crate::hpack; |
5 | | |
6 | | use bytes::{Buf, BufMut, BytesMut}; |
7 | | use std::pin::Pin; |
8 | | use std::task::{Context, Poll}; |
9 | | use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; |
10 | | use tokio_util::io::poll_write_buf; |
11 | | |
12 | | use std::io::{self, Cursor}; |
13 | | |
14 | | // A macro to get around a method needing to borrow &mut self |
15 | | macro_rules! limited_write_buf { |
16 | | ($self:expr) => {{ |
17 | | let limit = $self.max_frame_size() + frame::HEADER_LEN; |
18 | | $self.buf.get_mut().limit(limit) |
19 | | }}; |
20 | | } |
21 | | |
22 | | #[derive(Debug)] |
23 | | pub struct FramedWrite<T, B> { |
24 | | /// Upstream `AsyncWrite` |
25 | | inner: T, |
26 | | final_flush_done: bool, |
27 | | |
28 | | encoder: Encoder<B>, |
29 | | } |
30 | | |
31 | | #[derive(Debug)] |
32 | | struct Encoder<B> { |
33 | | /// HPACK encoder |
34 | | hpack: hpack::Encoder, |
35 | | |
36 | | /// Write buffer |
37 | | /// |
38 | | /// TODO: Should this be a ring buffer? |
39 | | buf: Cursor<BytesMut>, |
40 | | |
41 | | /// Next frame to encode |
42 | | next: Option<Next<B>>, |
43 | | |
44 | | /// Last data frame |
45 | | last_data_frame: Option<frame::Data<B>>, |
46 | | |
47 | | /// Max frame size, this is specified by the peer |
48 | | max_frame_size: FrameSize, |
49 | | |
50 | | /// Chain payloads bigger than this. |
51 | | chain_threshold: usize, |
52 | | |
53 | | /// Min buffer required to attempt to write a frame |
54 | | min_buffer_capacity: usize, |
55 | | } |
56 | | |
57 | | #[derive(Debug)] |
58 | | enum Next<B> { |
59 | | Data(frame::Data<B>), |
60 | | Continuation(frame::Continuation), |
61 | | } |
62 | | |
63 | | /// Initialize the connection with this amount of write buffer. |
64 | | /// |
65 | | /// The minimum MAX_FRAME_SIZE is 16kb, so always be able to send a HEADERS |
66 | | /// frame that big. |
67 | | const DEFAULT_BUFFER_CAPACITY: usize = 16 * 1_024; |
68 | | |
69 | | /// Chain payloads bigger than this when vectored I/O is enabled. The remote |
70 | | /// will never advertise a max frame size less than this (well, the spec says |
71 | | /// the max frame size can't be less than 16kb, so not even close). |
72 | | const CHAIN_THRESHOLD: usize = 256; |
73 | | |
74 | | /// Chain payloads bigger than this when vectored I/O is **not** enabled. |
75 | | /// A larger value in this scenario will reduce the number of small and |
76 | | /// fragmented data being sent, and hereby improve the throughput. |
77 | | const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024; |
78 | | |
79 | | // TODO: Make generic |
80 | | impl<T, B> FramedWrite<T, B> |
81 | | where |
82 | | T: AsyncWrite + Unpin, |
83 | | B: Buf, |
84 | | { |
85 | 12.5k | pub fn new(inner: T) -> FramedWrite<T, B> { |
86 | 12.5k | let chain_threshold = if inner.is_write_vectored() { |
87 | 0 | CHAIN_THRESHOLD |
88 | | } else { |
89 | 12.5k | CHAIN_THRESHOLD_WITHOUT_VECTORED_IO |
90 | | }; |
91 | 12.5k | FramedWrite { |
92 | 12.5k | inner, |
93 | 12.5k | final_flush_done: false, |
94 | 12.5k | encoder: Encoder { |
95 | 12.5k | hpack: hpack::Encoder::default(), |
96 | 12.5k | buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), |
97 | 12.5k | next: None, |
98 | 12.5k | last_data_frame: None, |
99 | 12.5k | max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, |
100 | 12.5k | chain_threshold, |
101 | 12.5k | min_buffer_capacity: chain_threshold + frame::HEADER_LEN, |
102 | 12.5k | }, |
103 | 12.5k | } |
104 | 12.5k | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::new <h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::new Line | Count | Source | 85 | 448 | pub fn new(inner: T) -> FramedWrite<T, B> { | 86 | 448 | let chain_threshold = if inner.is_write_vectored() { | 87 | 0 | CHAIN_THRESHOLD | 88 | | } else { | 89 | 448 | CHAIN_THRESHOLD_WITHOUT_VECTORED_IO | 90 | | }; | 91 | 448 | FramedWrite { | 92 | 448 | inner, | 93 | 448 | final_flush_done: false, | 94 | 448 | encoder: Encoder { | 95 | 448 | hpack: hpack::Encoder::default(), | 96 | 448 | buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), | 97 | 448 | next: None, | 98 | 448 | last_data_frame: None, | 99 | 448 | max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, | 100 | 448 | chain_threshold, | 101 | 448 | min_buffer_capacity: chain_threshold + frame::HEADER_LEN, | 102 | 448 | }, | 103 | 448 | } | 104 | 448 | } |
<h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::new Line | Count | Source | 85 | 448 | pub fn new(inner: T) -> FramedWrite<T, B> { | 86 | 448 | let chain_threshold = if inner.is_write_vectored() { | 87 | 0 | CHAIN_THRESHOLD | 88 | | } else { | 89 | 448 | CHAIN_THRESHOLD_WITHOUT_VECTORED_IO | 90 | | }; | 91 | 448 | FramedWrite { | 92 | 448 | inner, | 93 | 448 | final_flush_done: false, | 94 | 448 | encoder: Encoder { | 95 | 448 | hpack: hpack::Encoder::default(), | 96 | 448 | buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), | 97 | 448 | next: None, | 98 | 448 | last_data_frame: None, | 99 | 448 | max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, | 100 | 448 | chain_threshold, | 101 | 448 | min_buffer_capacity: chain_threshold + frame::HEADER_LEN, | 102 | 448 | }, | 103 | 448 | } | 104 | 448 | } |
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::new Line | Count | Source | 85 | 11.6k | pub fn new(inner: T) -> FramedWrite<T, B> { | 86 | 11.6k | let chain_threshold = if inner.is_write_vectored() { | 87 | 0 | CHAIN_THRESHOLD | 88 | | } else { | 89 | 11.6k | CHAIN_THRESHOLD_WITHOUT_VECTORED_IO | 90 | | }; | 91 | 11.6k | FramedWrite { | 92 | 11.6k | inner, | 93 | 11.6k | final_flush_done: false, | 94 | 11.6k | encoder: Encoder { | 95 | 11.6k | hpack: hpack::Encoder::default(), | 96 | 11.6k | buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), | 97 | 11.6k | next: None, | 98 | 11.6k | last_data_frame: None, | 99 | 11.6k | max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, | 100 | 11.6k | chain_threshold, | 101 | 11.6k | min_buffer_capacity: chain_threshold + frame::HEADER_LEN, | 102 | 11.6k | }, | 103 | 11.6k | } | 104 | 11.6k | } |
|
105 | | |
106 | | /// Returns `Ready` when `send` is able to accept a frame |
107 | | /// |
108 | | /// Calling this function may result in the current contents of the buffer |
109 | | /// to be flushed to `T`. |
110 | 1.33M | pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { |
111 | 1.33M | if !self.encoder.has_capacity() { |
112 | | // Try flushing |
113 | 919k | ready!(self.flush(cx))?; |
114 | | |
115 | 2.66k | if !self.encoder.has_capacity() { |
116 | 0 | return Poll::Pending; |
117 | 2.66k | } |
118 | 419k | } |
119 | | |
120 | 422k | Poll::Ready(Ok(())) |
121 | 1.33M | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::poll_ready <h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::poll_ready Line | Count | Source | 110 | 1.33M | pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { | 111 | 1.33M | if !self.encoder.has_capacity() { | 112 | | // Try flushing | 113 | 919k | ready!(self.flush(cx))?; | 114 | | | 115 | 2.66k | if !self.encoder.has_capacity() { | 116 | 0 | return Poll::Pending; | 117 | 2.66k | } | 118 | 419k | } | 119 | | | 120 | 422k | Poll::Ready(Ok(())) | 121 | 1.33M | } |
|
122 | | |
123 | | /// Buffer a frame. |
124 | | /// |
125 | | /// `poll_ready` must be called first to ensure that a frame may be |
126 | | /// accepted. |
127 | 231k | pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> { |
128 | 231k | self.encoder.buffer(item) |
129 | 231k | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::buffer <h2::codec::framed_write::FramedWrite<h2_support::mock::Mock, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer Line | Count | Source | 127 | 448 | pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> { | 128 | 448 | self.encoder.buffer(item) | 129 | 448 | } |
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer Line | Count | Source | 127 | 231k | pub fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> { | 128 | 231k | self.encoder.buffer(item) | 129 | 231k | } |
|
130 | | |
131 | | /// Flush buffered data to the wire |
132 | 1.03M | pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { |
133 | 1.03M | let span = tracing::trace_span!("FramedWrite::flush"); |
134 | 1.03M | let _e = span.enter(); |
135 | | |
136 | | loop { |
137 | 1.04M | while !self.encoder.is_empty() { |
138 | 854k | match self.encoder.next { |
139 | 854k | Some(Next::Data(ref mut frame)) => { |
140 | 854k | tracing::trace!(queued_data_frame = true); |
141 | 854k | let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut()); |
142 | 854k | ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))? |
143 | | } |
144 | | _ => { |
145 | 162k | tracing::trace!(queued_data_frame = false); |
146 | 162k | ready!(poll_write_buf( |
147 | 162k | Pin::new(&mut self.inner), |
148 | 162k | cx, |
149 | 162k | &mut self.encoder.buf |
150 | 162k | ))? |
151 | | } |
152 | | }; |
153 | | } |
154 | | |
155 | 25.3k | match self.encoder.unset_frame() { |
156 | 0 | ControlFlow::Continue => (), |
157 | 25.3k | ControlFlow::Break => break, |
158 | 25.3k | } |
159 | 25.3k | } |
160 | 25.3k | |
161 | 25.3k | tracing::trace!("flushing buffer"); |
162 | | // Flush the upstream |
163 | 25.3k | ready!(Pin::new(&mut self.inner).poll_flush(cx))?; |
164 | | |
165 | 25.3k | Poll::Ready(Ok(())) |
166 | 1.03M | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::flush <h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::flush Line | Count | Source | 132 | 448 | pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { | 133 | 448 | let span = tracing::trace_span!("FramedWrite::flush"); | 134 | 448 | let _e = span.enter(); | 135 | | | 136 | | loop { | 137 | 448 | while !self.encoder.is_empty() { | 138 | 0 | match self.encoder.next { | 139 | 0 | Some(Next::Data(ref mut frame)) => { | 140 | 0 | tracing::trace!(queued_data_frame = true); | 141 | 0 | let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut()); | 142 | 0 | ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))? | 143 | | } | 144 | | _ => { | 145 | 0 | tracing::trace!(queued_data_frame = false); | 146 | 0 | ready!(poll_write_buf( | 147 | 0 | Pin::new(&mut self.inner), | 148 | 0 | cx, | 149 | 0 | &mut self.encoder.buf | 150 | 0 | ))? | 151 | | } | 152 | | }; | 153 | | } | 154 | | | 155 | 448 | match self.encoder.unset_frame() { | 156 | 0 | ControlFlow::Continue => (), | 157 | 448 | ControlFlow::Break => break, | 158 | 448 | } | 159 | 448 | } | 160 | 448 | | 161 | 448 | tracing::trace!("flushing buffer"); | 162 | | // Flush the upstream | 163 | 448 | ready!(Pin::new(&mut self.inner).poll_flush(cx))?; | 164 | | | 165 | 448 | Poll::Ready(Ok(())) | 166 | 448 | } |
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::flush Line | Count | Source | 132 | 1.03M | pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { | 133 | 1.03M | let span = tracing::trace_span!("FramedWrite::flush"); | 134 | 1.03M | let _e = span.enter(); | 135 | | | 136 | | loop { | 137 | 1.04M | while !self.encoder.is_empty() { | 138 | 854k | match self.encoder.next { | 139 | 854k | Some(Next::Data(ref mut frame)) => { | 140 | 854k | tracing::trace!(queued_data_frame = true); | 141 | 854k | let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut()); | 142 | 854k | ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))? | 143 | | } | 144 | | _ => { | 145 | 162k | tracing::trace!(queued_data_frame = false); | 146 | 162k | ready!(poll_write_buf( | 147 | 162k | Pin::new(&mut self.inner), | 148 | 162k | cx, | 149 | 162k | &mut self.encoder.buf | 150 | 162k | ))? | 151 | | } | 152 | | }; | 153 | | } | 154 | | | 155 | 24.9k | match self.encoder.unset_frame() { | 156 | 0 | ControlFlow::Continue => (), | 157 | 24.9k | ControlFlow::Break => break, | 158 | 24.9k | } | 159 | 24.9k | } | 160 | 24.9k | | 161 | 24.9k | tracing::trace!("flushing buffer"); | 162 | | // Flush the upstream | 163 | 24.9k | ready!(Pin::new(&mut self.inner).poll_flush(cx))?; | 164 | | | 165 | 24.9k | Poll::Ready(Ok(())) | 166 | 1.03M | } |
|
167 | | |
168 | | /// Close the codec |
169 | 10.4k | pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { |
170 | 10.4k | if !self.final_flush_done { |
171 | 10.4k | ready!(self.flush(cx))?; |
172 | 1.34k | self.final_flush_done = true; |
173 | 0 | } |
174 | 1.34k | Pin::new(&mut self.inner).poll_shutdown(cx) |
175 | 10.4k | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::shutdown <h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::shutdown Line | Count | Source | 169 | 448 | pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { | 170 | 448 | if !self.final_flush_done { | 171 | 448 | ready!(self.flush(cx))?; | 172 | 448 | self.final_flush_done = true; | 173 | 0 | } | 174 | 448 | Pin::new(&mut self.inner).poll_shutdown(cx) | 175 | 448 | } |
<h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::shutdown Line | Count | Source | 169 | 9.96k | pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { | 170 | 9.96k | if !self.final_flush_done { | 171 | 9.96k | ready!(self.flush(cx))?; | 172 | 895 | self.final_flush_done = true; | 173 | 0 | } | 174 | 895 | Pin::new(&mut self.inner).poll_shutdown(cx) | 175 | 9.96k | } |
|
176 | | } |
177 | | |
178 | | #[must_use] |
179 | | enum ControlFlow { |
180 | | Continue, |
181 | | Break, |
182 | | } |
183 | | |
184 | | impl<B> Encoder<B> |
185 | | where |
186 | | B: Buf, |
187 | | { |
188 | 25.3k | fn unset_frame(&mut self) -> ControlFlow { |
189 | 25.3k | // Clear internal buffer |
190 | 25.3k | self.buf.set_position(0); |
191 | 25.3k | self.buf.get_mut().clear(); |
192 | 25.3k | |
193 | 25.3k | // The data frame has been written, so unset it |
194 | 25.3k | match self.next.take() { |
195 | 2.56k | Some(Next::Data(frame)) => { |
196 | 2.56k | self.last_data_frame = Some(frame); |
197 | 2.56k | debug_assert!(self.is_empty()); |
198 | 2.56k | ControlFlow::Break |
199 | | } |
200 | 0 | Some(Next::Continuation(frame)) => { |
201 | 0 | // Buffer the continuation frame, then try to write again |
202 | 0 | let mut buf = limited_write_buf!(self); |
203 | 0 | if let Some(continuation) = frame.encode(&mut buf) { |
204 | 0 | self.next = Some(Next::Continuation(continuation)); |
205 | 0 | } |
206 | 0 | ControlFlow::Continue |
207 | | } |
208 | 22.8k | None => ControlFlow::Break, |
209 | | } |
210 | 25.3k | } Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::unset_frame <h2::codec::framed_write::Encoder<bytes::bytes::Bytes>>::unset_frame Line | Count | Source | 188 | 448 | fn unset_frame(&mut self) -> ControlFlow { | 189 | 448 | // Clear internal buffer | 190 | 448 | self.buf.set_position(0); | 191 | 448 | self.buf.get_mut().clear(); | 192 | 448 | | 193 | 448 | // The data frame has been written, so unset it | 194 | 448 | match self.next.take() { | 195 | 0 | Some(Next::Data(frame)) => { | 196 | 0 | self.last_data_frame = Some(frame); | 197 | 0 | debug_assert!(self.is_empty()); | 198 | 0 | ControlFlow::Break | 199 | | } | 200 | 0 | Some(Next::Continuation(frame)) => { | 201 | 0 | // Buffer the continuation frame, then try to write again | 202 | 0 | let mut buf = limited_write_buf!(self); | 203 | 0 | if let Some(continuation) = frame.encode(&mut buf) { | 204 | 0 | self.next = Some(Next::Continuation(continuation)); | 205 | 0 | } | 206 | 0 | ControlFlow::Continue | 207 | | } | 208 | 448 | None => ControlFlow::Break, | 209 | | } | 210 | 448 | } |
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::unset_frame Line | Count | Source | 188 | 24.9k | fn unset_frame(&mut self) -> ControlFlow { | 189 | 24.9k | // Clear internal buffer | 190 | 24.9k | self.buf.set_position(0); | 191 | 24.9k | self.buf.get_mut().clear(); | 192 | 24.9k | | 193 | 24.9k | // The data frame has been written, so unset it | 194 | 24.9k | match self.next.take() { | 195 | 2.56k | Some(Next::Data(frame)) => { | 196 | 2.56k | self.last_data_frame = Some(frame); | 197 | 2.56k | debug_assert!(self.is_empty()); | 198 | 2.56k | ControlFlow::Break | 199 | | } | 200 | 0 | Some(Next::Continuation(frame)) => { | 201 | 0 | // Buffer the continuation frame, then try to write again | 202 | 0 | let mut buf = limited_write_buf!(self); | 203 | 0 | if let Some(continuation) = frame.encode(&mut buf) { | 204 | 0 | self.next = Some(Next::Continuation(continuation)); | 205 | 0 | } | 206 | 0 | ControlFlow::Continue | 207 | | } | 208 | 22.3k | None => ControlFlow::Break, | 209 | | } | 210 | 24.9k | } |
|
211 | | |
212 | 231k | fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> { |
213 | 231k | // Ensure that we have enough capacity to accept the write. |
214 | 231k | assert!(self.has_capacity()); |
215 | 231k | let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item); |
216 | 231k | let _e = span.enter(); |
217 | 231k | |
218 | 231k | tracing::debug!(frame = ?item, "send"); |
219 | | |
220 | 231k | match item { |
221 | 18.2k | Frame::Data(mut v) => { |
222 | 18.2k | // Ensure that the payload is not greater than the max frame. |
223 | 18.2k | let len = v.payload().remaining(); |
224 | 18.2k | |
225 | 18.2k | if len > self.max_frame_size() { |
226 | 0 | return Err(PayloadTooBig); |
227 | 18.2k | } |
228 | 18.2k | |
229 | 18.2k | if len >= self.chain_threshold { |
230 | 3.76k | let head = v.head(); |
231 | 3.76k | |
232 | 3.76k | // Encode the frame head to the buffer |
233 | 3.76k | head.encode(len, self.buf.get_mut()); |
234 | 3.76k | |
235 | 3.76k | if self.buf.get_ref().remaining() < self.chain_threshold { |
236 | 2.43k | let extra_bytes = self.chain_threshold - self.buf.remaining(); |
237 | 2.43k | self.buf.get_mut().put(v.payload_mut().take(extra_bytes)); |
238 | 2.43k | } |
239 | | |
240 | | // Save the data frame |
241 | 3.76k | self.next = Some(Next::Data(v)); |
242 | | } else { |
243 | 14.4k | v.encode_chunk(self.buf.get_mut()); |
244 | 14.4k | |
245 | 14.4k | // The chunk has been fully encoded, so there is no need to |
246 | 14.4k | // keep it around |
247 | 14.4k | assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded"); |
248 | | |
249 | | // Save off the last frame... |
250 | 14.4k | self.last_data_frame = Some(v); |
251 | | } |
252 | | } |
253 | 185k | Frame::Headers(v) => { |
254 | 185k | let mut buf = limited_write_buf!(self); |
255 | 185k | if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { |
256 | 0 | self.next = Some(Next::Continuation(continuation)); |
257 | 185k | } |
258 | | } |
259 | 0 | Frame::PushPromise(v) => { |
260 | 0 | let mut buf = limited_write_buf!(self); |
261 | 0 | if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { |
262 | 0 | self.next = Some(Next::Continuation(continuation)); |
263 | 0 | } |
264 | | } |
265 | 19.1k | Frame::Settings(v) => { |
266 | 19.1k | v.encode(self.buf.get_mut()); |
267 | 19.1k | tracing::trace!(rem = self.buf.remaining(), "encoded settings"); |
268 | | } |
269 | 5.69k | Frame::GoAway(v) => { |
270 | 5.69k | v.encode(self.buf.get_mut()); |
271 | 5.69k | tracing::trace!(rem = self.buf.remaining(), "encoded go_away"); |
272 | | } |
273 | 207 | Frame::Ping(v) => { |
274 | 207 | v.encode(self.buf.get_mut()); |
275 | 207 | tracing::trace!(rem = self.buf.remaining(), "encoded ping"); |
276 | | } |
277 | 108 | Frame::WindowUpdate(v) => { |
278 | 108 | v.encode(self.buf.get_mut()); |
279 | 108 | tracing::trace!(rem = self.buf.remaining(), "encoded window_update"); |
280 | | } |
281 | | |
282 | | Frame::Priority(_) => { |
283 | | /* |
284 | | v.encode(self.buf.get_mut()); |
285 | | tracing::trace!("encoded priority; rem={:?}", self.buf.remaining()); |
286 | | */ |
287 | 0 | unimplemented!(); |
288 | | } |
289 | 2.35k | Frame::Reset(v) => { |
290 | 2.35k | v.encode(self.buf.get_mut()); |
291 | 2.35k | tracing::trace!(rem = self.buf.remaining(), "encoded reset"); |
292 | | } |
293 | | } |
294 | | |
295 | 231k | Ok(()) |
296 | 231k | } Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::buffer <h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer Line | Count | Source | 212 | 448 | fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> { | 213 | 448 | // Ensure that we have enough capacity to accept the write. | 214 | 448 | assert!(self.has_capacity()); | 215 | 448 | let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item); | 216 | 448 | let _e = span.enter(); | 217 | 448 | | 218 | 448 | tracing::debug!(frame = ?item, "send"); | 219 | | | 220 | 448 | match item { | 221 | 0 | Frame::Data(mut v) => { | 222 | 0 | // Ensure that the payload is not greater than the max frame. | 223 | 0 | let len = v.payload().remaining(); | 224 | 0 |
| 225 | 0 | if len > self.max_frame_size() { | 226 | 0 | return Err(PayloadTooBig); | 227 | 0 | } | 228 | 0 |
| 229 | 0 | if len >= self.chain_threshold { | 230 | 0 | let head = v.head(); | 231 | 0 |
| 232 | 0 | // Encode the frame head to the buffer | 233 | 0 | head.encode(len, self.buf.get_mut()); | 234 | 0 |
| 235 | 0 | if self.buf.get_ref().remaining() < self.chain_threshold { | 236 | 0 | let extra_bytes = self.chain_threshold - self.buf.remaining(); | 237 | 0 | self.buf.get_mut().put(v.payload_mut().take(extra_bytes)); | 238 | 0 | } | 239 | | | 240 | | // Save the data frame | 241 | 0 | self.next = Some(Next::Data(v)); | 242 | | } else { | 243 | 0 | v.encode_chunk(self.buf.get_mut()); | 244 | 0 |
| 245 | 0 | // The chunk has been fully encoded, so there is no need to | 246 | 0 | // keep it around | 247 | 0 | assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded"); | 248 | | | 249 | | // Save off the last frame... | 250 | 0 | self.last_data_frame = Some(v); | 251 | | } | 252 | | } | 253 | 0 | Frame::Headers(v) => { | 254 | 0 | let mut buf = limited_write_buf!(self); | 255 | 0 | if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { | 256 | 0 | self.next = Some(Next::Continuation(continuation)); | 257 | 0 | } | 258 | | } | 259 | 0 | Frame::PushPromise(v) => { | 260 | 0 | let mut buf = limited_write_buf!(self); | 261 | 0 | if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { | 262 | 0 | self.next = Some(Next::Continuation(continuation)); | 263 | 0 | } | 264 | | } | 265 | 448 | Frame::Settings(v) => { | 266 | 448 | v.encode(self.buf.get_mut()); | 267 | 448 | tracing::trace!(rem = self.buf.remaining(), "encoded settings"); | 268 | | } | 269 | 0 | Frame::GoAway(v) => { | 270 | 0 | v.encode(self.buf.get_mut()); | 271 | 0 | tracing::trace!(rem = self.buf.remaining(), "encoded go_away"); | 272 | | } | 273 | 0 | Frame::Ping(v) => { | 274 | 0 | v.encode(self.buf.get_mut()); | 275 | 0 | tracing::trace!(rem = self.buf.remaining(), "encoded ping"); | 276 | | } | 277 | 0 | Frame::WindowUpdate(v) => { | 278 | 0 | v.encode(self.buf.get_mut()); | 279 | 0 | tracing::trace!(rem = self.buf.remaining(), "encoded window_update"); | 280 | | } | 281 | | | 282 | | Frame::Priority(_) => { | 283 | | /* | 284 | | v.encode(self.buf.get_mut()); | 285 | | tracing::trace!("encoded priority; rem={:?}", self.buf.remaining()); | 286 | | */ | 287 | 0 | unimplemented!(); | 288 | | } | 289 | 0 | Frame::Reset(v) => { | 290 | 0 | v.encode(self.buf.get_mut()); | 291 | 0 | tracing::trace!(rem = self.buf.remaining(), "encoded reset"); | 292 | | } | 293 | | } | 294 | | | 295 | 448 | Ok(()) | 296 | 448 | } |
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::buffer Line | Count | Source | 212 | 231k | fn buffer(&mut self, item: Frame<B>) -> Result<(), UserError> { | 213 | 231k | // Ensure that we have enough capacity to accept the write. | 214 | 231k | assert!(self.has_capacity()); | 215 | 231k | let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item); | 216 | 231k | let _e = span.enter(); | 217 | 231k | | 218 | 231k | tracing::debug!(frame = ?item, "send"); | 219 | | | 220 | 231k | match item { | 221 | 18.2k | Frame::Data(mut v) => { | 222 | 18.2k | // Ensure that the payload is not greater than the max frame. | 223 | 18.2k | let len = v.payload().remaining(); | 224 | 18.2k | | 225 | 18.2k | if len > self.max_frame_size() { | 226 | 0 | return Err(PayloadTooBig); | 227 | 18.2k | } | 228 | 18.2k | | 229 | 18.2k | if len >= self.chain_threshold { | 230 | 3.76k | let head = v.head(); | 231 | 3.76k | | 232 | 3.76k | // Encode the frame head to the buffer | 233 | 3.76k | head.encode(len, self.buf.get_mut()); | 234 | 3.76k | | 235 | 3.76k | if self.buf.get_ref().remaining() < self.chain_threshold { | 236 | 2.43k | let extra_bytes = self.chain_threshold - self.buf.remaining(); | 237 | 2.43k | self.buf.get_mut().put(v.payload_mut().take(extra_bytes)); | 238 | 2.43k | } | 239 | | | 240 | | // Save the data frame | 241 | 3.76k | self.next = Some(Next::Data(v)); | 242 | | } else { | 243 | 14.4k | v.encode_chunk(self.buf.get_mut()); | 244 | 14.4k | | 245 | 14.4k | // The chunk has been fully encoded, so there is no need to | 246 | 14.4k | // keep it around | 247 | 14.4k | assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded"); | 248 | | | 249 | | // Save off the last frame... | 250 | 14.4k | self.last_data_frame = Some(v); | 251 | | } | 252 | | } | 253 | 185k | Frame::Headers(v) => { | 254 | 185k | let mut buf = limited_write_buf!(self); | 255 | 185k | if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { | 256 | 0 | self.next = Some(Next::Continuation(continuation)); | 257 | 185k | } | 258 | | } | 259 | 0 | Frame::PushPromise(v) => { | 260 | 0 | let mut buf = limited_write_buf!(self); | 261 | 0 | if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { | 262 | 0 | self.next = Some(Next::Continuation(continuation)); | 263 | 0 | } | 264 | | } | 265 | 18.7k | Frame::Settings(v) => { | 266 | 18.7k | v.encode(self.buf.get_mut()); | 267 | 18.7k | tracing::trace!(rem = self.buf.remaining(), "encoded settings"); | 268 | | } | 269 | 5.69k | Frame::GoAway(v) => { | 270 | 5.69k | v.encode(self.buf.get_mut()); | 271 | 5.69k | tracing::trace!(rem = self.buf.remaining(), "encoded go_away"); | 272 | | } | 273 | 207 | Frame::Ping(v) => { | 274 | 207 | v.encode(self.buf.get_mut()); | 275 | 207 | tracing::trace!(rem = self.buf.remaining(), "encoded ping"); | 276 | | } | 277 | 108 | Frame::WindowUpdate(v) => { | 278 | 108 | v.encode(self.buf.get_mut()); | 279 | 108 | tracing::trace!(rem = self.buf.remaining(), "encoded window_update"); | 280 | | } | 281 | | | 282 | | Frame::Priority(_) => { | 283 | | /* | 284 | | v.encode(self.buf.get_mut()); | 285 | | tracing::trace!("encoded priority; rem={:?}", self.buf.remaining()); | 286 | | */ | 287 | 0 | unimplemented!(); | 288 | | } | 289 | 2.35k | Frame::Reset(v) => { | 290 | 2.35k | v.encode(self.buf.get_mut()); | 291 | 2.35k | tracing::trace!(rem = self.buf.remaining(), "encoded reset"); | 292 | | } | 293 | | } | 294 | | | 295 | 231k | Ok(()) | 296 | 231k | } |
|
297 | | |
298 | 1.57M | fn has_capacity(&self) -> bool { |
299 | 1.57M | self.next.is_none() |
300 | 724k | && (self.buf.get_ref().capacity() - self.buf.get_ref().len() |
301 | 724k | >= self.min_buffer_capacity) |
302 | 1.57M | } Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::has_capacity <h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::has_capacity Line | Count | Source | 298 | 448 | fn has_capacity(&self) -> bool { | 299 | 448 | self.next.is_none() | 300 | 448 | && (self.buf.get_ref().capacity() - self.buf.get_ref().len() | 301 | 448 | >= self.min_buffer_capacity) | 302 | 448 | } |
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::has_capacity Line | Count | Source | 298 | 1.57M | fn has_capacity(&self) -> bool { | 299 | 1.57M | self.next.is_none() | 300 | 724k | && (self.buf.get_ref().capacity() - self.buf.get_ref().len() | 301 | 724k | >= self.min_buffer_capacity) | 302 | 1.57M | } |
|
303 | | |
304 | 1.04M | fn is_empty(&self) -> bool { |
305 | 856k | match self.next { |
306 | 856k | Some(Next::Data(ref frame)) => !frame.payload().has_remaining(), |
307 | 185k | _ => !self.buf.has_remaining(), |
308 | | } |
309 | 1.04M | } Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::is_empty <h2::codec::framed_write::Encoder<bytes::bytes::Bytes>>::is_empty Line | Count | Source | 304 | 448 | fn is_empty(&self) -> bool { | 305 | 0 | match self.next { | 306 | 0 | Some(Next::Data(ref frame)) => !frame.payload().has_remaining(), | 307 | 448 | _ => !self.buf.has_remaining(), | 308 | | } | 309 | 448 | } |
<h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::is_empty Line | Count | Source | 304 | 1.04M | fn is_empty(&self) -> bool { | 305 | 856k | match self.next { | 306 | 856k | Some(Next::Data(ref frame)) => !frame.payload().has_remaining(), | 307 | 184k | _ => !self.buf.has_remaining(), | 308 | | } | 309 | 1.04M | } |
|
310 | | } |
311 | | |
312 | | impl<B> Encoder<B> { |
313 | 306k | fn max_frame_size(&self) -> usize { |
314 | 306k | self.max_frame_size as usize |
315 | 306k | } Unexecuted instantiation: <h2::codec::framed_write::Encoder<_>>::max_frame_size Unexecuted instantiation: <h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::max_frame_size Unexecuted instantiation: <h2::codec::framed_write::Encoder<bytes::bytes::Bytes>>::max_frame_size <h2::codec::framed_write::Encoder<h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::max_frame_size Line | Count | Source | 313 | 306k | fn max_frame_size(&self) -> usize { | 314 | 306k | self.max_frame_size as usize | 315 | 306k | } |
|
316 | | } |
317 | | |
318 | | impl<T, B> FramedWrite<T, B> { |
319 | | /// Returns the max frame size that can be sent |
320 | 102k | pub fn max_frame_size(&self) -> usize { |
321 | 102k | self.encoder.max_frame_size() |
322 | 102k | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::max_frame_size <h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::max_frame_size Line | Count | Source | 320 | 102k | pub fn max_frame_size(&self) -> usize { | 321 | 102k | self.encoder.max_frame_size() | 322 | 102k | } |
|
323 | | |
324 | | /// Set the peer's max frame size. |
325 | 463 | pub fn set_max_frame_size(&mut self, val: usize) { |
326 | 463 | assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize); |
327 | 463 | self.encoder.max_frame_size = val as FrameSize; |
328 | 463 | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::set_max_frame_size <h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_max_frame_size Line | Count | Source | 325 | 463 | pub fn set_max_frame_size(&mut self, val: usize) { | 326 | 463 | assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize); | 327 | 463 | self.encoder.max_frame_size = val as FrameSize; | 328 | 463 | } |
|
329 | | |
330 | | /// Set the peer's header table size. |
331 | 1.85k | pub fn set_header_table_size(&mut self, val: usize) { |
332 | 1.85k | self.encoder.hpack.update_max_size(val); |
333 | 1.85k | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::set_header_table_size <h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::set_header_table_size Line | Count | Source | 331 | 1.85k | pub fn set_header_table_size(&mut self, val: usize) { | 332 | 1.85k | self.encoder.hpack.update_max_size(val); | 333 | 1.85k | } |
|
334 | | |
335 | | /// Retrieve the last data frame that has been sent |
336 | 328k | pub fn take_last_data_frame(&mut self) -> Option<frame::Data<B>> { |
337 | 328k | self.encoder.last_data_frame.take() |
338 | 328k | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::take_last_data_frame <h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>>::take_last_data_frame Line | Count | Source | 336 | 328k | pub fn take_last_data_frame(&mut self) -> Option<frame::Data<B>> { | 337 | 328k | self.encoder.last_data_frame.take() | 338 | 328k | } |
|
339 | | |
340 | 448 | pub fn get_mut(&mut self) -> &mut T { |
341 | 448 | &mut self.inner |
342 | 448 | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _>>::get_mut <h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes>>::get_mut Line | Count | Source | 340 | 448 | pub fn get_mut(&mut self) -> &mut T { | 341 | 448 | &mut self.inner | 342 | 448 | } |
|
343 | | } |
344 | | |
345 | | impl<T: AsyncRead + Unpin, B> AsyncRead for FramedWrite<T, B> { |
346 | 1.02M | fn poll_read( |
347 | 1.02M | mut self: Pin<&mut Self>, |
348 | 1.02M | cx: &mut Context<'_>, |
349 | 1.02M | buf: &mut ReadBuf, |
350 | 1.02M | ) -> Poll<io::Result<()>> { |
351 | 1.02M | Pin::new(&mut self.inner).poll_read(cx, buf) |
352 | 1.02M | } Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<_, _> as tokio::io::async_read::AsyncRead>::poll_read Unexecuted instantiation: <h2::codec::framed_write::FramedWrite<h2_support::mock::Pipe, bytes::bytes::Bytes> as tokio::io::async_read::AsyncRead>::poll_read <h2::codec::framed_write::FramedWrite<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>> as tokio::io::async_read::AsyncRead>::poll_read Line | Count | Source | 346 | 1.02M | fn poll_read( | 347 | 1.02M | mut self: Pin<&mut Self>, | 348 | 1.02M | cx: &mut Context<'_>, | 349 | 1.02M | buf: &mut ReadBuf, | 350 | 1.02M | ) -> Poll<io::Result<()>> { | 351 | 1.02M | Pin::new(&mut self.inner).poll_read(cx, buf) | 352 | 1.02M | } |
|
353 | | } |
354 | | |
355 | | // We never project the Pin to `B`. |
356 | | impl<T: Unpin, B> Unpin for FramedWrite<T, B> {} |
357 | | |
358 | | #[cfg(feature = "unstable")] |
359 | | mod unstable { |
360 | | use super::*; |
361 | | |
362 | | impl<T, B> FramedWrite<T, B> { |
363 | 0 | pub fn get_ref(&self) -> &T { |
364 | 0 | &self.inner |
365 | 0 | } |
366 | | } |
367 | | } |