Coverage Report

Created: 2025-08-29 06:13

/src/h2/src/proto/connection.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::codec::UserError;
2
use crate::frame::{Reason, StreamId};
3
use crate::{client, server};
4
5
use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
6
use crate::proto::*;
7
8
use bytes::Bytes;
9
use futures_core::Stream;
10
use std::io;
11
use std::marker::PhantomData;
12
use std::pin::Pin;
13
use std::task::{Context, Poll};
14
use std::time::Duration;
15
use tokio::io::AsyncRead;
16
17
/// An H2 connection
18
#[derive(Debug)]
19
pub(crate) struct Connection<T, P, B: Buf = Bytes>
20
where
21
    P: Peer,
22
{
23
    /// Read / write frame values
24
    codec: Codec<T, Prioritized<B>>,
25
26
    inner: ConnectionInner<P, B>,
27
}
28
29
// Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
30
// method instantiations.
31
#[derive(Debug)]
32
struct ConnectionInner<P, B: Buf = Bytes>
33
where
34
    P: Peer,
35
{
36
    /// Tracks the connection level state transitions.
37
    state: State,
38
39
    /// An error to report back once complete.
40
    ///
41
    /// This exists separately from State in order to support
42
    /// graceful shutdown.
43
    error: Option<frame::GoAway>,
44
45
    /// Pending GOAWAY frames to write.
46
    go_away: GoAway,
47
48
    /// Ping/pong handler
49
    ping_pong: PingPong,
50
51
    /// Connection settings
52
    settings: Settings,
53
54
    /// Stream state handler
55
    streams: Streams<B, P>,
56
57
    /// A `tracing` span tracking the lifetime of the connection.
58
    span: tracing::Span,
59
60
    /// Client or server
61
    _phantom: PhantomData<P>,
62
}
63
64
struct DynConnection<'a, B: Buf = Bytes> {
65
    state: &'a mut State,
66
67
    go_away: &'a mut GoAway,
68
69
    streams: DynStreams<'a, B>,
70
71
    error: &'a mut Option<frame::GoAway>,
72
73
    ping_pong: &'a mut PingPong,
74
}
75
76
#[derive(Debug, Clone)]
77
pub(crate) struct Config {
78
    pub next_stream_id: StreamId,
79
    pub initial_max_send_streams: usize,
80
    pub max_send_buffer_size: usize,
81
    pub reset_stream_duration: Duration,
82
    pub reset_stream_max: usize,
83
    pub remote_reset_stream_max: usize,
84
    pub local_error_reset_streams_max: Option<usize>,
85
    pub settings: frame::Settings,
86
}
87
88
#[derive(Debug)]
89
enum State {
90
    /// Currently open in a sane state
91
    Open,
92
93
    /// The codec must be flushed
94
    Closing(Reason, Initiator),
95
96
    /// In a closed state
97
    Closed(Reason, Initiator),
98
}
99
100
impl<T, P, B> Connection<T, P, B>
101
where
102
    T: AsyncRead + AsyncWrite + Unpin,
103
    P: Peer,
104
    B: Buf,
105
{
106
12.6k
    pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107
12.6k
        fn streams_config(config: &Config) -> streams::Config {
108
12.6k
            streams::Config {
109
12.6k
                initial_max_send_streams: config.initial_max_send_streams,
110
12.6k
                local_max_buffer_size: config.max_send_buffer_size,
111
12.6k
                local_next_stream_id: config.next_stream_id,
112
12.6k
                local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
113
12.6k
                extended_connect_protocol_enabled: config
114
12.6k
                    .settings
115
12.6k
                    .is_extended_connect_protocol_enabled()
116
12.6k
                    .unwrap_or(false),
117
12.6k
                local_reset_duration: config.reset_stream_duration,
118
12.6k
                local_reset_max: config.reset_stream_max,
119
12.6k
                remote_reset_max: config.remote_reset_stream_max,
120
12.6k
                remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
121
12.6k
                remote_max_initiated: config
122
12.6k
                    .settings
123
12.6k
                    .max_concurrent_streams()
124
12.6k
                    .map(|max| max as usize),
125
12.6k
                local_max_error_reset_streams: config.local_error_reset_streams_max,
126
12.6k
            }
127
12.6k
        }
128
12.6k
        let streams = Streams::new(streams_config(&config));
129
12.6k
        Connection {
130
12.6k
            codec,
131
12.6k
            inner: ConnectionInner {
132
12.6k
                state: State::Open,
133
12.6k
                error: None,
134
12.6k
                go_away: GoAway::new(),
135
12.6k
                ping_pong: PingPong::new(),
136
12.6k
                settings: Settings::new(config.settings),
137
12.6k
                streams,
138
12.6k
                span: tracing::debug_span!("Connection", peer = %P::NAME),
139
12.6k
                _phantom: PhantomData,
140
12.6k
            },
141
12.6k
        }
142
12.6k
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::new
<h2::proto::connection::Connection<h2_support::mock::Mock, h2::client::Peer>>::new
Line
Count
Source
106
411
    pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107
        fn streams_config(config: &Config) -> streams::Config {
108
            streams::Config {
109
                initial_max_send_streams: config.initial_max_send_streams,
110
                local_max_buffer_size: config.max_send_buffer_size,
111
                local_next_stream_id: config.next_stream_id,
112
                local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
113
                extended_connect_protocol_enabled: config
114
                    .settings
115
                    .is_extended_connect_protocol_enabled()
116
                    .unwrap_or(false),
117
                local_reset_duration: config.reset_stream_duration,
118
                local_reset_max: config.reset_stream_max,
119
                remote_reset_max: config.remote_reset_stream_max,
120
                remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
121
                remote_max_initiated: config
122
                    .settings
123
                    .max_concurrent_streams()
124
                    .map(|max| max as usize),
125
                local_max_error_reset_streams: config.local_error_reset_streams_max,
126
            }
127
        }
128
411
        let streams = Streams::new(streams_config(&config));
129
411
        Connection {
130
411
            codec,
131
411
            inner: ConnectionInner {
132
411
                state: State::Open,
133
411
                error: None,
134
411
                go_away: GoAway::new(),
135
411
                ping_pong: PingPong::new(),
136
411
                settings: Settings::new(config.settings),
137
411
                streams,
138
411
                span: tracing::debug_span!("Connection", peer = %P::NAME),
139
411
                _phantom: PhantomData,
140
411
            },
141
411
        }
142
411
    }
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::new
Line
Count
Source
106
12.2k
    pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107
        fn streams_config(config: &Config) -> streams::Config {
108
            streams::Config {
109
                initial_max_send_streams: config.initial_max_send_streams,
110
                local_max_buffer_size: config.max_send_buffer_size,
111
                local_next_stream_id: config.next_stream_id,
112
                local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
113
                extended_connect_protocol_enabled: config
114
                    .settings
115
                    .is_extended_connect_protocol_enabled()
116
                    .unwrap_or(false),
117
                local_reset_duration: config.reset_stream_duration,
118
                local_reset_max: config.reset_stream_max,
119
                remote_reset_max: config.remote_reset_stream_max,
120
                remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
121
                remote_max_initiated: config
122
                    .settings
123
                    .max_concurrent_streams()
124
                    .map(|max| max as usize),
125
                local_max_error_reset_streams: config.local_error_reset_streams_max,
126
            }
127
        }
128
12.2k
        let streams = Streams::new(streams_config(&config));
129
12.2k
        Connection {
130
12.2k
            codec,
131
12.2k
            inner: ConnectionInner {
132
12.2k
                state: State::Open,
133
12.2k
                error: None,
134
12.2k
                go_away: GoAway::new(),
135
12.2k
                ping_pong: PingPong::new(),
136
12.2k
                settings: Settings::new(config.settings),
137
12.2k
                streams,
138
12.2k
                span: tracing::debug_span!("Connection", peer = %P::NAME),
139
12.2k
                _phantom: PhantomData,
140
12.2k
            },
141
12.2k
        }
142
12.2k
    }
143
144
    /// connection flow control
145
0
    pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
146
0
        let _res = self.inner.streams.set_target_connection_window_size(size);
147
0
        // TODO: proper error handling
148
0
        debug_assert!(_res.is_ok());
149
0
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::set_target_window_size
Unexecuted instantiation: <h2::proto::connection::Connection<h2_support::mock::Mock, h2::client::Peer>>::set_target_window_size
Unexecuted instantiation: <h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::set_target_window_size
150
151
    /// Send a new SETTINGS frame with an updated initial window size.
152
0
    pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
153
0
        let mut settings = frame::Settings::default();
154
0
        settings.set_initial_window_size(Some(size));
155
0
        self.inner.settings.send_settings(settings)
156
0
    }
157
158
    /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
159
0
    pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
160
0
        let mut settings = frame::Settings::default();
161
0
        settings.set_enable_connect_protocol(Some(1));
162
0
        self.inner.settings.send_settings(settings)
163
0
    }
164
165
    /// Returns the maximum number of concurrent streams that may be initiated
166
    /// by this peer.
167
0
    pub(crate) fn max_send_streams(&self) -> usize {
168
0
        self.inner.streams.max_send_streams()
169
0
    }
170
171
    /// Returns the maximum number of concurrent streams that may be initiated
172
    /// by the remote peer.
173
0
    pub(crate) fn max_recv_streams(&self) -> usize {
174
0
        self.inner.streams.max_recv_streams()
175
0
    }
176
177
    #[cfg(feature = "unstable")]
178
0
    pub fn num_wired_streams(&self) -> usize {
179
0
        self.inner.streams.num_wired_streams()
180
0
    }
181
182
    /// Returns `Ready` when the connection is ready to receive a frame.
183
    ///
184
    /// Returns `Error` as this may raise errors that are caused by delayed
185
    /// processing of received frames.
186
1.71M
    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
187
1.71M
        let _e = self.inner.span.enter();
188
1.71M
        let span = tracing::trace_span!("poll_ready");
189
1.71M
        let _e = span.enter();
190
        // The order of these calls don't really matter too much
191
1.71M
        ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
192
1.71M
        ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
193
1.71M
        ready!(self
194
1.71M
            .inner
195
1.71M
            .settings
196
1.71M
            .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
197
1.71M
        ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
198
199
1.71M
        Poll::Ready(Ok(()))
200
1.71M
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::poll_ready
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::poll_ready
Line
Count
Source
186
1.71M
    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
187
1.71M
        let _e = self.inner.span.enter();
188
1.71M
        let span = tracing::trace_span!("poll_ready");
189
1.71M
        let _e = span.enter();
190
        // The order of these calls don't really matter too much
191
1.71M
        ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
192
1.71M
        ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
193
1.71M
        ready!(self
194
1.71M
            .inner
195
1.71M
            .settings
196
1.71M
            .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
197
1.71M
        ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
198
199
1.71M
        Poll::Ready(Ok(()))
200
1.71M
    }
201
202
    /// Send any pending GOAWAY frames.
203
    ///
204
    /// This will return `Some(reason)` if the connection should be closed
205
    /// afterwards. If this is a graceful shutdown, this returns `None`.
206
1.71M
    fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
207
1.71M
        self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
208
1.71M
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::poll_go_away
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::poll_go_away
Line
Count
Source
206
1.71M
    fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
207
1.71M
        self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
208
1.71M
    }
209
210
0
    pub fn go_away_from_user(&mut self, e: Reason) {
211
0
        self.inner.as_dyn().go_away_from_user(e)
212
0
    }
213
214
1.04k
    fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
215
1.04k
        let (debug_data, theirs) = self
216
1.04k
            .inner
217
1.04k
            .error
218
1.04k
            .take()
219
1.04k
            .as_ref()
220
1.04k
            .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
221
104
                (frame.debug_data().clone(), frame.reason())
222
1.04k
            });
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::take_error::{closure#0}
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::take_error::{closure#0}
Line
Count
Source
220
104
            .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
221
104
                (frame.debug_data().clone(), frame.reason())
222
104
            });
223
1.04k
224
1.04k
        match (ours, theirs) {
225
830
            (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
226
113
            (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
227
            // If both sides reported an error, give their
228
            // error back to th user. We assume our error
229
            // was a consequence of their error, and less
230
            // important.
231
101
            (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
232
        }
233
1.04k
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::take_error
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::take_error
Line
Count
Source
214
1.04k
    fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
215
1.04k
        let (debug_data, theirs) = self
216
1.04k
            .inner
217
1.04k
            .error
218
1.04k
            .take()
219
1.04k
            .as_ref()
220
1.04k
            .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
221
                (frame.debug_data().clone(), frame.reason())
222
1.04k
            });
223
1.04k
224
1.04k
        match (ours, theirs) {
225
830
            (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
226
113
            (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
227
            // If both sides reported an error, give their
228
            // error back to th user. We assume our error
229
            // was a consequence of their error, and less
230
            // important.
231
101
            (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
232
        }
233
1.04k
    }
234
235
    /// Closes the connection by transitioning to a GOAWAY state
236
    /// iff there are no streams or references
237
1.61M
    pub fn maybe_close_connection_if_no_streams(&mut self) {
238
1.61M
        // If we poll() and realize that there are no streams or references
239
1.61M
        // then we can close the connection by transitioning to GOAWAY
240
1.61M
        if !self.inner.streams.has_streams_or_other_references() {
241
0
            self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
242
1.61M
        }
243
1.61M
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::maybe_close_connection_if_no_streams
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::maybe_close_connection_if_no_streams
Line
Count
Source
237
1.61M
    pub fn maybe_close_connection_if_no_streams(&mut self) {
238
1.61M
        // If we poll() and realize that there are no streams or references
239
1.61M
        // then we can close the connection by transitioning to GOAWAY
240
1.61M
        if !self.inner.streams.has_streams_or_other_references() {
241
0
            self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
242
1.61M
        }
243
1.61M
    }
244
245
    /// Checks if there are any streams
246
0
    pub fn has_streams(&self) -> bool {
247
0
        self.inner.streams.has_streams()
248
0
    }
249
250
    /// Checks if there are any streams or references left
251
3.22M
    pub fn has_streams_or_other_references(&self) -> bool {
252
3.22M
        // If we poll() and realize that there are no streams or references
253
3.22M
        // then we can close the connection by transitioning to GOAWAY
254
3.22M
        self.inner.streams.has_streams_or_other_references()
255
3.22M
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::has_streams_or_other_references
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::has_streams_or_other_references
Line
Count
Source
251
3.22M
    pub fn has_streams_or_other_references(&self) -> bool {
252
3.22M
        // If we poll() and realize that there are no streams or references
253
3.22M
        // then we can close the connection by transitioning to GOAWAY
254
3.22M
        self.inner.streams.has_streams_or_other_references()
255
3.22M
    }
256
257
0
    pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
258
0
        self.inner.ping_pong.take_user_pings()
259
0
    }
260
261
    /// Advances the internal state of the connection.
262
1.61M
    pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
263
1.61M
        // XXX(eliza): cloning the span is unfortunately necessary here in
264
1.61M
        // order to placate the borrow checker — `self` is mutably borrowed by
265
1.61M
        // `poll2`, which means that we can't borrow `self.span` to enter it.
266
1.61M
        // The clone is just an atomic ref bump.
267
1.61M
        let span = self.inner.span.clone();
268
1.61M
        let _e = span.enter();
269
1.61M
        let span = tracing::trace_span!("poll");
270
1.61M
        let _e = span.enter();
271
272
        loop {
273
1.67M
            tracing::trace!(connection.state = ?self.inner.state);
274
            // TODO: probably clean up this glob of code
275
1.67M
            match self.inner.state {
276
                // When open, continue to poll a frame
277
                State::Open => {
278
1.66M
                    let result = match self.poll2(cx) {
279
60.9k
                        Poll::Ready(result) => result,
280
                        // The connection is not ready to make progress
281
                        Poll::Pending => {
282
                            // Ensure all window updates have been sent.
283
                            //
284
                            // This will also handle flushing `self.codec`
285
1.60M
                            ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
286
287
731k
                            if (self.inner.error.is_some()
288
730k
                                || self.inner.go_away.should_close_on_idle())
289
379
                                && !self.inner.streams.has_streams()
290
                            {
291
41
                                self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
292
41
                                continue;
293
730k
                            }
294
730k
295
730k
                            return Poll::Pending;
296
                        }
297
                    };
298
299
60.9k
                    self.inner.as_dyn().handle_poll2_result(result)?
300
                }
301
10.9k
                State::Closing(reason, initiator) => {
302
10.9k
                    tracing::trace!("connection closing after flush");
303
                    // Flush/shutdown the codec
304
10.9k
                    ready!(self.codec.shutdown(cx))?;
305
306
                    // Transition the state to error
307
1.04k
                    self.inner.state = State::Closed(reason, initiator);
308
                }
309
1.04k
                State::Closed(reason, initiator) => {
310
1.04k
                    return Poll::Ready(self.take_error(reason, initiator));
311
                }
312
            }
313
        }
314
1.61M
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::poll
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::poll
Line
Count
Source
262
1.61M
    pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
263
1.61M
        // XXX(eliza): cloning the span is unfortunately necessary here in
264
1.61M
        // order to placate the borrow checker — `self` is mutably borrowed by
265
1.61M
        // `poll2`, which means that we can't borrow `self.span` to enter it.
266
1.61M
        // The clone is just an atomic ref bump.
267
1.61M
        let span = self.inner.span.clone();
268
1.61M
        let _e = span.enter();
269
1.61M
        let span = tracing::trace_span!("poll");
270
1.61M
        let _e = span.enter();
271
272
        loop {
273
1.67M
            tracing::trace!(connection.state = ?self.inner.state);
274
            // TODO: probably clean up this glob of code
275
1.67M
            match self.inner.state {
276
                // When open, continue to poll a frame
277
                State::Open => {
278
1.66M
                    let result = match self.poll2(cx) {
279
60.9k
                        Poll::Ready(result) => result,
280
                        // The connection is not ready to make progress
281
                        Poll::Pending => {
282
                            // Ensure all window updates have been sent.
283
                            //
284
                            // This will also handle flushing `self.codec`
285
1.60M
                            ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
286
287
731k
                            if (self.inner.error.is_some()
288
730k
                                || self.inner.go_away.should_close_on_idle())
289
379
                                && !self.inner.streams.has_streams()
290
                            {
291
41
                                self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
292
41
                                continue;
293
730k
                            }
294
730k
295
730k
                            return Poll::Pending;
296
                        }
297
                    };
298
299
60.9k
                    self.inner.as_dyn().handle_poll2_result(result)?
300
                }
301
10.9k
                State::Closing(reason, initiator) => {
302
10.9k
                    tracing::trace!("connection closing after flush");
303
                    // Flush/shutdown the codec
304
10.9k
                    ready!(self.codec.shutdown(cx))?;
305
306
                    // Transition the state to error
307
1.04k
                    self.inner.state = State::Closed(reason, initiator);
308
                }
309
1.04k
                State::Closed(reason, initiator) => {
310
1.04k
                    return Poll::Ready(self.take_error(reason, initiator));
311
                }
312
            }
313
        }
314
1.61M
    }
315
316
1.66M
    fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
317
1.66M
        // This happens outside of the loop to prevent needing to do a clock
318
1.66M
        // check and then comparison of the queue possibly multiple times a
319
1.66M
        // second (and thus, the clock wouldn't have changed enough to matter).
320
1.66M
        self.clear_expired_reset_streams();
321
322
        loop {
323
            // First, ensure that the `Connection` is able to receive a frame
324
            //
325
            // The order here matters:
326
            // - poll_go_away may buffer a graceful shutdown GOAWAY frame
327
            // - If it has, we've also added a PING to be sent in poll_ready
328
1.71M
            if let Some(reason) = ready!(self.poll_go_away(cx)?) {
329
5.77k
                if self.inner.go_away.should_close_now() {
330
5.77k
                    if self.inner.go_away.is_user_initiated() {
331
                        // A user initiated abrupt shutdown shouldn't return
332
                        // the same error back to the user.
333
0
                        return Poll::Ready(Ok(()));
334
                    } else {
335
5.77k
                        return Poll::Ready(Err(Error::library_go_away(reason)));
336
                    }
337
0
                }
338
0
                // Only NO_ERROR should be waiting for idle
339
0
                debug_assert_eq!(
340
                    reason,
341
                    Reason::NO_ERROR,
342
0
                    "graceful GOAWAY should be NO_ERROR"
343
                );
344
1.71M
            }
345
1.71M
            ready!(self.poll_ready(cx))?;
346
347
1.71M
            match self
348
1.71M
                .inner
349
1.71M
                .as_dyn()
350
1.71M
                .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
351
            {
352
7.46k
                ReceivedFrame::Settings(frame) => {
353
7.46k
                    self.inner.settings.recv_settings(
354
7.46k
                        frame,
355
7.46k
                        &mut self.codec,
356
7.46k
                        &mut self.inner.streams,
357
7.46k
                    )?;
358
                }
359
46.0k
                ReceivedFrame::Continue => (),
360
                ReceivedFrame::Done => {
361
4.66k
                    return Poll::Ready(Ok(()));
362
                }
363
            }
364
        }
365
1.66M
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::poll2
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::poll2
Line
Count
Source
316
1.66M
    fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
317
1.66M
        // This happens outside of the loop to prevent needing to do a clock
318
1.66M
        // check and then comparison of the queue possibly multiple times a
319
1.66M
        // second (and thus, the clock wouldn't have changed enough to matter).
320
1.66M
        self.clear_expired_reset_streams();
321
322
        loop {
323
            // First, ensure that the `Connection` is able to receive a frame
324
            //
325
            // The order here matters:
326
            // - poll_go_away may buffer a graceful shutdown GOAWAY frame
327
            // - If it has, we've also added a PING to be sent in poll_ready
328
1.71M
            if let Some(reason) = ready!(self.poll_go_away(cx)?) {
329
5.77k
                if self.inner.go_away.should_close_now() {
330
5.77k
                    if self.inner.go_away.is_user_initiated() {
331
                        // A user initiated abrupt shutdown shouldn't return
332
                        // the same error back to the user.
333
0
                        return Poll::Ready(Ok(()));
334
                    } else {
335
5.77k
                        return Poll::Ready(Err(Error::library_go_away(reason)));
336
                    }
337
0
                }
338
0
                // Only NO_ERROR should be waiting for idle
339
0
                debug_assert_eq!(
340
                    reason,
341
                    Reason::NO_ERROR,
342
0
                    "graceful GOAWAY should be NO_ERROR"
343
                );
344
1.71M
            }
345
1.71M
            ready!(self.poll_ready(cx))?;
346
347
1.71M
            match self
348
1.71M
                .inner
349
1.71M
                .as_dyn()
350
1.71M
                .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
351
            {
352
7.46k
                ReceivedFrame::Settings(frame) => {
353
7.46k
                    self.inner.settings.recv_settings(
354
7.46k
                        frame,
355
7.46k
                        &mut self.codec,
356
7.46k
                        &mut self.inner.streams,
357
7.46k
                    )?;
358
                }
359
46.0k
                ReceivedFrame::Continue => (),
360
                ReceivedFrame::Done => {
361
4.66k
                    return Poll::Ready(Ok(()));
362
                }
363
            }
364
        }
365
1.66M
    }
366
367
1.66M
    fn clear_expired_reset_streams(&mut self) {
368
1.66M
        self.inner.streams.clear_expired_reset_streams();
369
1.66M
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _>>::clear_expired_reset_streams
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::clear_expired_reset_streams
Line
Count
Source
367
1.66M
    fn clear_expired_reset_streams(&mut self) {
368
1.66M
        self.inner.streams.clear_expired_reset_streams();
369
1.66M
    }
370
}
371
372
impl<P, B> ConnectionInner<P, B>
373
where
374
    P: Peer,
375
    B: Buf,
376
{
377
1.77M
    fn as_dyn(&mut self) -> DynConnection<'_, B> {
378
1.77M
        let ConnectionInner {
379
1.77M
            state,
380
1.77M
            go_away,
381
1.77M
            streams,
382
1.77M
            error,
383
1.77M
            ping_pong,
384
1.77M
            ..
385
1.77M
        } = self;
386
1.77M
        let streams = streams.as_dyn();
387
1.77M
        DynConnection {
388
1.77M
            state,
389
1.77M
            go_away,
390
1.77M
            streams,
391
1.77M
            error,
392
1.77M
            ping_pong,
393
1.77M
        }
394
1.77M
    }
Unexecuted instantiation: <h2::proto::connection::ConnectionInner<_, _>>::as_dyn
<h2::proto::connection::ConnectionInner<h2::client::Peer>>::as_dyn
Line
Count
Source
377
1.77M
    fn as_dyn(&mut self) -> DynConnection<'_, B> {
378
1.77M
        let ConnectionInner {
379
1.77M
            state,
380
1.77M
            go_away,
381
1.77M
            streams,
382
1.77M
            error,
383
1.77M
            ping_pong,
384
1.77M
            ..
385
1.77M
        } = self;
386
1.77M
        let streams = streams.as_dyn();
387
1.77M
        DynConnection {
388
1.77M
            state,
389
1.77M
            go_away,
390
1.77M
            streams,
391
1.77M
            error,
392
1.77M
            ping_pong,
393
1.77M
        }
394
1.77M
    }
395
}
396
397
impl<B> DynConnection<'_, B>
398
where
399
    B: Buf,
400
{
401
0
    fn go_away(&mut self, id: StreamId, e: Reason) {
402
0
        let frame = frame::GoAway::new(id, e);
403
0
        self.streams.send_go_away(id);
404
0
        self.go_away.go_away(frame);
405
0
    }
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::go_away
Unexecuted instantiation: <h2::proto::connection::DynConnection>::go_away
406
407
41
    fn go_away_now(&mut self, e: Reason) {
408
41
        let last_processed_id = self.streams.last_processed_id();
409
41
        let frame = frame::GoAway::new(last_processed_id, e);
410
41
        self.go_away.go_away_now(frame);
411
41
    }
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::go_away_now
<h2::proto::connection::DynConnection>::go_away_now
Line
Count
Source
407
41
    fn go_away_now(&mut self, e: Reason) {
408
41
        let last_processed_id = self.streams.last_processed_id();
409
41
        let frame = frame::GoAway::new(last_processed_id, e);
410
41
        self.go_away.go_away_now(frame);
411
41
    }
412
413
5.90k
    fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
414
5.90k
        let last_processed_id = self.streams.last_processed_id();
415
5.90k
        let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
416
5.90k
        self.go_away.go_away_now(frame);
417
5.90k
    }
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::go_away_now_data
<h2::proto::connection::DynConnection>::go_away_now_data
Line
Count
Source
413
5.90k
    fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
414
5.90k
        let last_processed_id = self.streams.last_processed_id();
415
5.90k
        let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
416
5.90k
        self.go_away.go_away_now(frame);
417
5.90k
    }
418
419
0
    fn go_away_from_user(&mut self, e: Reason) {
420
0
        let last_processed_id = self.streams.last_processed_id();
421
0
        let frame = frame::GoAway::new(last_processed_id, e);
422
0
        self.go_away.go_away_from_user(frame);
423
0
424
0
        // Notify all streams of reason we're abruptly closing.
425
0
        self.streams.handle_error(Error::user_go_away(e));
426
0
    }
427
428
60.9k
    fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
429
56.2k
        match result {
430
            // The connection has shutdown normally
431
            Ok(()) => {
432
4.66k
                *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
433
4.66k
                Ok(())
434
            }
435
            // Attempting to read a frame resulted in a connection level
436
            // error. This is handled by setting a GOAWAY frame followed by
437
            // terminating the connection.
438
11.6k
            Err(Error::GoAway(debug_data, reason, initiator)) => {
439
11.6k
                self.handle_go_away(reason, debug_data, initiator);
440
11.6k
                Ok(())
441
            }
442
            // Attempting to read a frame resulted in a stream level error.
443
            // This is handled by resetting the frame then trying to read
444
            // another frame.
445
43.5k
            Err(Error::Reset(id, reason, initiator)) => {
446
43.5k
                debug_assert_eq!(initiator, Initiator::Library);
447
43.5k
                tracing::trace!(?id, ?reason, "stream error");
448
43.5k
                match self.streams.send_reset(id, reason) {
449
43.5k
                    Ok(()) => (),
450
0
                    Err(crate::proto::error::GoAway { debug_data, reason }) => {
451
0
                        self.handle_go_away(reason, debug_data, Initiator::Library);
452
0
                    }
453
                }
454
43.5k
                Ok(())
455
            }
456
            // Attempting to read a frame resulted in an I/O error. All
457
            // active streams must be reset.
458
            //
459
            // TODO: Are I/O errors recoverable?
460
1.08k
            Err(Error::Io(kind, inner)) => {
461
1.08k
                tracing::debug!(error = ?kind, "Connection::poll; IO error");
462
1.08k
                let e = Error::Io(kind, inner);
463
1.08k
464
1.08k
                // Reset all active streams
465
1.08k
                self.streams.handle_error(e.clone());
466
1.08k
467
1.08k
                // Some client implementations drop the connections without notifying its peer
468
1.08k
                // Attempting to read after the client dropped the connection results in UnexpectedEof
469
1.08k
                // If as a server, we don't have anything more to send, just close the connection
470
1.08k
                // without error
471
1.08k
                //
472
1.08k
                // See https://github.com/hyperium/hyper/issues/3427
473
1.08k
                if self.streams.is_buffer_empty()
474
1.08k
                    && matches!(kind, io::ErrorKind::UnexpectedEof)
475
0
                    && (self.streams.is_server()
476
0
                        || self.error.as_ref().map(|f| f.reason() == Reason::NO_ERROR)
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::handle_poll2_result::{closure#0}
Unexecuted instantiation: <h2::proto::connection::DynConnection>::handle_poll2_result::{closure#0}
477
0
                            == Some(true))
478
                {
479
0
                    *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library);
480
0
                    return Ok(());
481
1.08k
                }
482
1.08k
483
1.08k
                // Return the error
484
1.08k
                Err(e)
485
            }
486
        }
487
60.9k
    }
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::handle_poll2_result
<h2::proto::connection::DynConnection>::handle_poll2_result
Line
Count
Source
428
60.9k
    fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
429
56.2k
        match result {
430
            // The connection has shutdown normally
431
            Ok(()) => {
432
4.66k
                *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
433
4.66k
                Ok(())
434
            }
435
            // Attempting to read a frame resulted in a connection level
436
            // error. This is handled by setting a GOAWAY frame followed by
437
            // terminating the connection.
438
11.6k
            Err(Error::GoAway(debug_data, reason, initiator)) => {
439
11.6k
                self.handle_go_away(reason, debug_data, initiator);
440
11.6k
                Ok(())
441
            }
442
            // Attempting to read a frame resulted in a stream level error.
443
            // This is handled by resetting the frame then trying to read
444
            // another frame.
445
43.5k
            Err(Error::Reset(id, reason, initiator)) => {
446
43.5k
                debug_assert_eq!(initiator, Initiator::Library);
447
43.5k
                tracing::trace!(?id, ?reason, "stream error");
448
43.5k
                match self.streams.send_reset(id, reason) {
449
43.5k
                    Ok(()) => (),
450
0
                    Err(crate::proto::error::GoAway { debug_data, reason }) => {
451
0
                        self.handle_go_away(reason, debug_data, Initiator::Library);
452
0
                    }
453
                }
454
43.5k
                Ok(())
455
            }
456
            // Attempting to read a frame resulted in an I/O error. All
457
            // active streams must be reset.
458
            //
459
            // TODO: Are I/O errors recoverable?
460
1.08k
            Err(Error::Io(kind, inner)) => {
461
1.08k
                tracing::debug!(error = ?kind, "Connection::poll; IO error");
462
1.08k
                let e = Error::Io(kind, inner);
463
1.08k
464
1.08k
                // Reset all active streams
465
1.08k
                self.streams.handle_error(e.clone());
466
1.08k
467
1.08k
                // Some client implementations drop the connections without notifying its peer
468
1.08k
                // Attempting to read after the client dropped the connection results in UnexpectedEof
469
1.08k
                // If as a server, we don't have anything more to send, just close the connection
470
1.08k
                // without error
471
1.08k
                //
472
1.08k
                // See https://github.com/hyperium/hyper/issues/3427
473
1.08k
                if self.streams.is_buffer_empty()
474
1.08k
                    && matches!(kind, io::ErrorKind::UnexpectedEof)
475
0
                    && (self.streams.is_server()
476
0
                        || self.error.as_ref().map(|f| f.reason() == Reason::NO_ERROR)
477
0
                            == Some(true))
478
                {
479
0
                    *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library);
480
0
                    return Ok(());
481
1.08k
                }
482
1.08k
483
1.08k
                // Return the error
484
1.08k
                Err(e)
485
            }
486
        }
487
60.9k
    }
488
489
11.6k
    fn handle_go_away(&mut self, reason: Reason, debug_data: Bytes, initiator: Initiator) {
490
11.6k
        let e = Error::GoAway(debug_data.clone(), reason, initiator);
491
11.6k
        tracing::debug!(error = ?e, "Connection::poll; connection error");
492
493
        // We may have already sent a GOAWAY for this error,
494
        // if so, don't send another, just flush and close up.
495
11.6k
        if self
496
11.6k
            .go_away
497
11.6k
            .going_away()
498
11.6k
            .map_or(false, |frame| frame.reason() == reason)
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::handle_go_away::{closure#0}
<h2::proto::connection::DynConnection>::handle_go_away::{closure#0}
Line
Count
Source
498
5.77k
            .map_or(false, |frame| frame.reason() == reason)
499
        {
500
5.77k
            tracing::trace!("    -> already going away");
501
5.77k
            *self.state = State::Closing(reason, initiator);
502
5.77k
            return;
503
5.90k
        }
504
5.90k
505
5.90k
        // Reset all active streams
506
5.90k
        self.streams.handle_error(e);
507
5.90k
        self.go_away_now_data(reason, debug_data);
508
11.6k
    }
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::handle_go_away
<h2::proto::connection::DynConnection>::handle_go_away
Line
Count
Source
489
11.6k
    fn handle_go_away(&mut self, reason: Reason, debug_data: Bytes, initiator: Initiator) {
490
11.6k
        let e = Error::GoAway(debug_data.clone(), reason, initiator);
491
11.6k
        tracing::debug!(error = ?e, "Connection::poll; connection error");
492
493
        // We may have already sent a GOAWAY for this error,
494
        // if so, don't send another, just flush and close up.
495
11.6k
        if self
496
11.6k
            .go_away
497
11.6k
            .going_away()
498
11.6k
            .map_or(false, |frame| frame.reason() == reason)
499
        {
500
5.77k
            tracing::trace!("    -> already going away");
501
5.77k
            *self.state = State::Closing(reason, initiator);
502
5.77k
            return;
503
5.90k
        }
504
5.90k
505
5.90k
        // Reset all active streams
506
5.90k
        self.streams.handle_error(e);
507
5.90k
        self.go_away_now_data(reason, debug_data);
508
11.6k
    }
509
510
74.5k
    fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
511
        use crate::frame::Frame::*;
512
69.8k
        match frame {
513
9.48k
            Some(Headers(frame)) => {
514
9.48k
                tracing::trace!(?frame, "recv HEADERS");
515
9.48k
                self.streams.recv_headers(frame)?;
516
            }
517
44.3k
            Some(Data(frame)) => {
518
44.3k
                tracing::trace!(?frame, "recv DATA");
519
44.3k
                self.streams.recv_data(frame)?;
520
            }
521
1.29k
            Some(Reset(frame)) => {
522
1.29k
                tracing::trace!(?frame, "recv RST_STREAM");
523
1.29k
                self.streams.recv_reset(frame)?;
524
            }
525
1.18k
            Some(PushPromise(frame)) => {
526
1.18k
                tracing::trace!(?frame, "recv PUSH_PROMISE");
527
1.18k
                self.streams.recv_push_promise(frame)?;
528
            }
529
7.46k
            Some(Settings(frame)) => {
530
7.46k
                tracing::trace!(?frame, "recv SETTINGS");
531
7.46k
                return Ok(ReceivedFrame::Settings(frame));
532
            }
533
3.86k
            Some(GoAway(frame)) => {
534
3.86k
                tracing::trace!(?frame, "recv GOAWAY");
535
                // This should prevent starting new streams,
536
                // but should allow continuing to process current streams
537
                // until they are all EOS. Once they are, State should
538
                // transition to GoAway.
539
3.86k
                self.streams.recv_go_away(&frame)?;
540
3.86k
                *self.error = Some(frame);
541
            }
542
311
            Some(Ping(frame)) => {
543
311
                tracing::trace!(?frame, "recv PING");
544
311
                let status = self.ping_pong.recv_ping(frame);
545
311
                if status.is_shutdown() {
546
0
                    assert!(
547
0
                        self.go_away.is_going_away(),
548
0
                        "received unexpected shutdown ping"
549
                    );
550
551
0
                    let last_processed_id = self.streams.last_processed_id();
552
0
                    self.go_away(last_processed_id, Reason::NO_ERROR);
553
311
                }
554
            }
555
1.83k
            Some(WindowUpdate(frame)) => {
556
1.83k
                tracing::trace!(?frame, "recv WINDOW_UPDATE");
557
1.83k
                self.streams.recv_window_update(frame)?;
558
            }
559
102
            Some(Priority(frame)) => {
560
102
                tracing::trace!(?frame, "recv PRIORITY");
561
                // TODO: handle
562
            }
563
            None => {
564
4.66k
                tracing::trace!("codec closed");
565
4.66k
                self.streams.recv_eof(false).expect("mutex poisoned");
566
4.66k
                return Ok(ReceivedFrame::Done);
567
            }
568
        }
569
46.0k
        Ok(ReceivedFrame::Continue)
570
74.5k
    }
Unexecuted instantiation: <h2::proto::connection::DynConnection<_>>::recv_frame
<h2::proto::connection::DynConnection>::recv_frame
Line
Count
Source
510
74.5k
    fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
511
        use crate::frame::Frame::*;
512
69.8k
        match frame {
513
9.48k
            Some(Headers(frame)) => {
514
9.48k
                tracing::trace!(?frame, "recv HEADERS");
515
9.48k
                self.streams.recv_headers(frame)?;
516
            }
517
44.3k
            Some(Data(frame)) => {
518
44.3k
                tracing::trace!(?frame, "recv DATA");
519
44.3k
                self.streams.recv_data(frame)?;
520
            }
521
1.29k
            Some(Reset(frame)) => {
522
1.29k
                tracing::trace!(?frame, "recv RST_STREAM");
523
1.29k
                self.streams.recv_reset(frame)?;
524
            }
525
1.18k
            Some(PushPromise(frame)) => {
526
1.18k
                tracing::trace!(?frame, "recv PUSH_PROMISE");
527
1.18k
                self.streams.recv_push_promise(frame)?;
528
            }
529
7.46k
            Some(Settings(frame)) => {
530
7.46k
                tracing::trace!(?frame, "recv SETTINGS");
531
7.46k
                return Ok(ReceivedFrame::Settings(frame));
532
            }
533
3.86k
            Some(GoAway(frame)) => {
534
3.86k
                tracing::trace!(?frame, "recv GOAWAY");
535
                // This should prevent starting new streams,
536
                // but should allow continuing to process current streams
537
                // until they are all EOS. Once they are, State should
538
                // transition to GoAway.
539
3.86k
                self.streams.recv_go_away(&frame)?;
540
3.86k
                *self.error = Some(frame);
541
            }
542
311
            Some(Ping(frame)) => {
543
311
                tracing::trace!(?frame, "recv PING");
544
311
                let status = self.ping_pong.recv_ping(frame);
545
311
                if status.is_shutdown() {
546
0
                    assert!(
547
0
                        self.go_away.is_going_away(),
548
0
                        "received unexpected shutdown ping"
549
                    );
550
551
0
                    let last_processed_id = self.streams.last_processed_id();
552
0
                    self.go_away(last_processed_id, Reason::NO_ERROR);
553
311
                }
554
            }
555
1.83k
            Some(WindowUpdate(frame)) => {
556
1.83k
                tracing::trace!(?frame, "recv WINDOW_UPDATE");
557
1.83k
                self.streams.recv_window_update(frame)?;
558
            }
559
102
            Some(Priority(frame)) => {
560
102
                tracing::trace!(?frame, "recv PRIORITY");
561
                // TODO: handle
562
            }
563
            None => {
564
4.66k
                tracing::trace!("codec closed");
565
4.66k
                self.streams.recv_eof(false).expect("mutex poisoned");
566
4.66k
                return Ok(ReceivedFrame::Done);
567
            }
568
        }
569
46.0k
        Ok(ReceivedFrame::Continue)
570
74.5k
    }
571
}
572
573
enum ReceivedFrame {
574
    Settings(frame::Settings),
575
    Continue,
576
    Done,
577
}
578
579
impl<T, B> Connection<T, client::Peer, B>
580
where
581
    T: AsyncRead + AsyncWrite,
582
    B: Buf,
583
{
584
12.6k
    pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
585
12.6k
        &self.inner.streams
586
12.6k
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, h2::client::Peer, _>>::streams
<h2::proto::connection::Connection<h2_support::mock::Mock, h2::client::Peer>>::streams
Line
Count
Source
584
411
    pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
585
411
        &self.inner.streams
586
411
    }
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer>>::streams
Line
Count
Source
584
12.2k
    pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
585
12.2k
        &self.inner.streams
586
12.2k
    }
587
}
588
589
impl<T, B> Connection<T, server::Peer, B>
590
where
591
    T: AsyncRead + AsyncWrite + Unpin,
592
    B: Buf,
593
{
594
0
    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
595
0
        self.inner.streams.next_incoming()
596
0
    }
597
598
    // Graceful shutdown only makes sense for server peers.
599
0
    pub fn go_away_gracefully(&mut self) {
600
0
        if self.inner.go_away.is_going_away() {
601
            // No reason to start a new one.
602
0
            return;
603
0
        }
604
0
605
0
        // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
606
0
        //
607
0
        // > A server that is attempting to gracefully shut down a connection
608
0
        // > SHOULD send an initial GOAWAY frame with the last stream
609
0
        // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
610
0
        // > client that a shutdown is imminent and that initiating further
611
0
        // > requests is prohibited. After allowing time for any in-flight
612
0
        // > stream creation (at least one round-trip time), the server can
613
0
        // > send another GOAWAY frame with an updated last stream identifier.
614
0
        // > This ensures that a connection can be cleanly shut down without
615
0
        // > losing requests.
616
0
        self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);
617
0
618
0
        // We take the advice of waiting 1 RTT literally, and wait
619
0
        // for a pong before proceeding.
620
0
        self.inner.ping_pong.ping_shutdown();
621
0
    }
622
}
623
624
impl<T, P, B> Drop for Connection<T, P, B>
625
where
626
    P: Peer,
627
    B: Buf,
628
{
629
12.6k
    fn drop(&mut self) {
630
12.6k
        // Ignore errors as this indicates that the mutex is poisoned.
631
12.6k
        let _ = self.inner.streams.recv_eof(true);
632
12.6k
    }
Unexecuted instantiation: <h2::proto::connection::Connection<_, _, _> as core::ops::drop::Drop>::drop
<h2::proto::connection::Connection<h2_support::mock::Mock, h2::client::Peer> as core::ops::drop::Drop>::drop
Line
Count
Source
629
411
    fn drop(&mut self) {
630
411
        // Ignore errors as this indicates that the mutex is poisoned.
631
411
        let _ = self.inner.streams.recv_eof(true);
632
411
    }
<h2::proto::connection::Connection<fuzz_e2e::MockIo, h2::client::Peer> as core::ops::drop::Drop>::drop
Line
Count
Source
629
12.2k
    fn drop(&mut self) {
630
12.2k
        // Ignore errors as this indicates that the mutex is poisoned.
631
12.2k
        let _ = self.inner.streams.recv_eof(true);
632
12.2k
    }
633
}