Coverage Report

Created: 2024-12-17 06:15

/rust/registry/src/index.crates.io-6f17d22bba15001f/h2-0.3.26/src/server.rs
Line
Count
Source (jump to first uncovered line)
1
//! Server implementation of the HTTP/2 protocol.
2
//!
3
//! # Getting started
4
//!
5
//! Running an HTTP/2 server requires the caller to manage accepting the
6
//! connections as well as getting the connections to a state that is ready to
7
//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8
//! details.
9
//!
10
//! This could be as basic as using Tokio's [`TcpListener`] to accept
11
//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12
//! upgrades.
13
//!
14
//! Once a connection is obtained, it is passed to [`handshake`],
15
//! which will begin the [HTTP/2 handshake]. This returns a future that
16
//! completes once the handshake process is performed and HTTP/2 streams may
17
//! be received.
18
//!
19
//! [`handshake`] uses default configuration values. There are a number of
20
//! settings that can be changed by using [`Builder`] instead.
21
//!
22
//! # Inbound streams
23
//!
24
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25
//! does this by implementing [`futures::Stream`]. When a new stream is
26
//! received, a call to [`Connection::accept`] will return `(request, response)`.
27
//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28
//! HTTP request head as well as provides a way to receive the inbound data
29
//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30
//! allows responding to the request, stream the response payload, send
31
//! trailers, and send push promises.
32
//!
33
//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34
//! can be operated independently.
35
//!
36
//! # Managing the connection
37
//!
38
//! The [`Connection`] instance is used to manage connection state. The caller
39
//! is required to call either [`Connection::accept`] or
40
//! [`Connection::poll_close`] in order to advance the connection state. Simply
41
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42
//! connection state is advanced.
43
//!
44
//! It is not required to call **both** [`Connection::accept`] and
45
//! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46
//! then only [`Connection::accept`] should be called. When the caller **does
47
//! not** want to accept a new stream, [`Connection::poll_close`] should be
48
//! called.
49
//!
50
//! The [`Connection`] instance should only be dropped once
51
//! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52
//! returns `Ready(None)`, there will no longer be any more inbound streams. At
53
//! this point, only [`Connection::poll_close`] should be called.
54
//!
55
//! # Shutting down the server
56
//!
57
//! A graceful shutdown is started by calling
58
//! [`Connection::graceful_shutdown`]; keep polling the connection until it closes.
59
//!
60
//! # Example
61
//!
62
//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63
//! knowledge], i.e. both the client and the server assume that the TCP socket
64
//! will use the HTTP/2 protocol without prior negotiation.
65
//!
66
//! ```no_run
67
//! use h2::server;
68
//! use http::{Response, StatusCode};
69
//! use tokio::net::TcpListener;
70
//!
71
//! #[tokio::main]
72
//! pub async fn main() {
73
//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74
//!
75
//!     // Accept all incoming TCP connections.
76
//!     loop {
77
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78
//!             // Spawn a new task to process each connection.
79
//!             tokio::spawn(async {
80
//!                 // Start the HTTP/2 connection handshake
81
//!                 let mut h2 = server::handshake(socket).await.unwrap();
82
//!                 // Accept all inbound HTTP/2 streams sent over the
83
//!                 // connection.
84
//!                 while let Some(request) = h2.accept().await {
85
//!                     let (request, mut respond) = request.unwrap();
86
//!                     println!("Received request: {:?}", request);
87
//!
88
//!                     // Build a response with no body
89
//!                     let response = Response::builder()
90
//!                         .status(StatusCode::OK)
91
//!                         .body(())
92
//!                         .unwrap();
93
//!
94
//!                     // Send the response back to the client
95
//!                     respond.send_response(response, true)
96
//!                         .unwrap();
97
//!                 }
98
//!
99
//!             });
100
//!         }
101
//!     }
102
//! }
103
//! ```
104
//!
105
//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106
//! [`handshake`]: fn.handshake.html
107
//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108
//! [`Builder`]: struct.Builder.html
109
//! [`Connection`]: struct.Connection.html
110
//! [`Connection::poll`]: struct.Connection.html#method.poll
111
//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112
//! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113
//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114
//! [`RecvStream`]: ../struct.RecvStream.html
115
//! [`SendStream`]: ../struct.SendStream.html
116
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118
use crate::codec::{Codec, UserError};
119
use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120
use crate::proto::{self, Config, Error, Prioritized};
121
use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123
use bytes::{Buf, Bytes};
124
use http::{HeaderMap, Method, Request, Response};
125
use std::future::Future;
126
use std::pin::Pin;
127
use std::task::{Context, Poll};
128
use std::time::Duration;
129
use std::{fmt, io};
130
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131
use tracing::instrument::{Instrument, Instrumented};
132
133
/// In progress HTTP/2 connection handshake future.
134
///
135
/// This type implements `Future`, yielding a `Connection` instance once the
136
/// handshake has completed.
137
///
138
/// The handshake is completed once the connection preface is fully received
139
/// from the client **and** the initial settings frame is sent to the client.
140
///
141
/// The handshake future does not wait for the initial settings frame from the
142
/// client.
143
///
144
/// See [module] level docs for more details.
145
///
146
/// [module]: index.html
147
#[must_use = "futures do nothing unless polled"]
148
pub struct Handshake<T, B: Buf = Bytes> {
149
    /// The config to pass to Connection::new after handshake succeeds.
150
    builder: Builder,
151
    /// The current state of the handshake.
152
    state: Handshaking<T, B>,
153
    /// Span tracking the handshake
154
    span: tracing::Span,
155
}
156
157
/// Accepts inbound HTTP/2 streams on a connection.
158
///
159
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160
/// implements the HTTP/2 server logic for that connection. It is responsible
161
/// for receiving inbound streams initiated by the client as well as driving the
162
/// internal state forward.
163
///
164
/// `Connection` values are created by calling [`handshake`]. Once a
165
/// `Connection` value is obtained, the caller must call [`poll`] or
166
/// [`poll_close`] in order to drive the internal connection state forward.
167
///
168
/// See [module level] documentation for more details
169
///
170
/// [module level]: index.html
171
/// [`handshake`]: struct.Connection.html#method.handshake
172
/// [`poll`]: struct.Connection.html#method.poll
173
/// [`poll_close`]: struct.Connection.html#method.poll_close
174
///
175
/// # Examples
176
///
177
/// ```
178
/// # use tokio::io::{AsyncRead, AsyncWrite};
179
/// # use h2::server;
180
/// # use h2::server::*;
181
/// #
182
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183
/// let mut server = server::handshake(my_io).await.unwrap();
184
/// while let Some(request) = server.accept().await {
185
///     tokio::spawn(async move {
186
///         let (request, respond) = request.unwrap();
187
///         // Process the request and send the response back to the client
188
///         // using `respond`.
189
///     });
190
/// }
191
/// # }
192
/// #
193
/// # pub fn main() {}
194
/// ```
195
#[must_use = "streams do nothing unless polled"]
196
pub struct Connection<T, B: Buf> {
197
    connection: proto::Connection<T, Peer, B>,
198
}
199
200
/// Builds server connections with custom configuration values.
201
///
202
/// Methods can be chained in order to set the configuration values.
203
///
204
/// The server is constructed by calling [`handshake`] and passing the I/O
205
/// handle that will back the HTTP/2 server.
206
///
207
/// New instances of `Builder` are obtained via [`Builder::new`].
208
///
209
/// See function level documentation for details on the various server
210
/// configuration settings.
211
///
212
/// [`Builder::new`]: struct.Builder.html#method.new
213
/// [`handshake`]: struct.Builder.html#method.handshake
214
///
215
/// # Examples
216
///
217
/// ```
218
/// # use tokio::io::{AsyncRead, AsyncWrite};
219
/// # use h2::server::*;
220
/// #
221
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222
/// # -> Handshake<T>
223
/// # {
224
/// // `server_fut` is a future representing the completion of the HTTP/2
225
/// // handshake.
226
/// let server_fut = Builder::new()
227
///     .initial_window_size(1_000_000)
228
///     .max_concurrent_streams(1000)
229
///     .handshake(my_io);
230
/// # server_fut
231
/// # }
232
/// #
233
/// # pub fn main() {}
234
/// ```
235
#[derive(Clone, Debug)]
236
pub struct Builder {
237
    /// Time to keep locally reset streams around before reaping.
238
    reset_stream_duration: Duration,
239
240
    /// Maximum number of locally reset streams to keep at a time.
241
    reset_stream_max: usize,
242
243
    /// Maximum number of remotely reset streams to allow in the pending
244
    /// accept queue.
245
    pending_accept_reset_stream_max: usize,
246
247
    /// Initial `Settings` frame to send as part of the handshake.
248
    settings: Settings,
249
250
    /// Initial target window size for new connections.
251
    initial_target_connection_window_size: Option<u32>,
252
253
    /// Maximum amount of bytes to "buffer" for writing per stream.
254
    max_send_buffer_size: usize,
255
256
    /// Maximum number of locally reset streams due to protocol error across
257
    /// the lifetime of the connection.
258
    ///
259
    /// When this gets exceeded, we issue GOAWAYs.
260
    local_max_error_reset_streams: Option<usize>,
261
}
262
263
/// Send a response back to the client
264
///
265
/// A `SendResponse` instance is provided when receiving a request and is used
266
/// to send the associated response back to the client. It is also used to
267
/// explicitly reset the stream with a custom reason.
268
///
269
/// It will also be used to initiate push promises linked with the associated
270
/// stream.
271
///
272
/// If the `SendResponse` instance is dropped without sending a response, then
273
/// the HTTP/2 stream will be reset.
274
///
275
/// See [module] level docs for more details.
276
///
277
/// [module]: index.html
278
#[derive(Debug)]
279
pub struct SendResponse<B: Buf> {
280
    inner: proto::StreamRef<B>,
281
}
282
283
/// Send a response to a promised request
284
///
285
/// A `SendPushedResponse` instance is provided when promising a request and is used
286
/// to send the associated response to the client. It is also used to
287
/// explicitly reset the stream with a custom reason.
288
///
289
/// It can not be used to initiate push promises.
290
///
291
/// If the `SendPushedResponse` instance is dropped without sending a response, then
292
/// the HTTP/2 stream will be reset.
293
///
294
/// See [module] level docs for more details.
295
///
296
/// [module]: index.html
297
pub struct SendPushedResponse<B: Buf> {
298
    inner: SendResponse<B>,
299
}
300
301
// Manual implementation necessary because of rust-lang/rust#26925
302
impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
303
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304
0
        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305
0
    }
Unexecuted instantiation: <h2::server::SendPushedResponse<_> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::SendPushedResponse<_> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::SendPushedResponse<_> as core::fmt::Debug>::fmt
306
}
307
308
/// Stages of an in-progress handshake.
309
enum Handshaking<T, B: Buf> {
310
    /// State 1. Connection is flushing pending SETTINGS frame.
311
    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
312
    /// State 2. Connection is waiting for the client preface.
313
    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
314
    /// State 3. Handshake is done, polling again would panic.
315
    Done,
316
}
317
318
/// Flush a Sink
319
struct Flush<T, B> {
320
    codec: Option<Codec<T, B>>,
321
}
322
323
/// Read the client connection preface
324
struct ReadPreface<T, B> {
325
    codec: Option<Codec<T, B>>,
326
    pos: usize,
327
}
328
329
/// Unit type used as the peer parameter of `proto::Connection` for server
/// connections.
#[derive(Debug)]
pub(crate) struct Peer;
331
332
/// The 24-byte HTTP/2 client connection preface.
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333
334
/// Creates a new configured HTTP/2 server with default configuration
335
/// values backed by `io`.
336
///
337
/// It is expected that `io` already be in an appropriate state to commence
338
/// the [HTTP/2 handshake]. See [Handshake] for more details.
339
///
340
/// Returns a future which resolves to the [`Connection`] instance once the
341
/// HTTP/2 handshake has been completed. The returned [`Connection`]
342
/// instance will be using default configuration values. Use [`Builder`] to
343
/// customize the configuration values used by a [`Connection`] instance.
344
///
345
/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346
/// [Handshake]: ../index.html#handshake
347
/// [`Connection`]: struct.Connection.html
348
///
349
/// # Examples
350
///
351
/// ```
352
/// # use tokio::io::{AsyncRead, AsyncWrite};
353
/// # use h2::server;
354
/// # use h2::server::*;
355
/// #
356
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357
/// # {
358
/// let connection = server::handshake(my_io).await.unwrap();
359
/// // The HTTP/2 handshake has completed, now use `connection` to
360
/// // accept inbound HTTP/2 streams.
361
/// # }
362
/// #
363
/// # pub fn main() {}
364
/// ```
365
0
pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366
0
where
367
0
    T: AsyncRead + AsyncWrite + Unpin,
368
0
{
369
0
    Builder::new().handshake(io)
370
0
}
Unexecuted instantiation: h2::server::handshake::<_>
Unexecuted instantiation: h2::server::handshake::<_>
Unexecuted instantiation: h2::server::handshake::<_>
371
372
// ===== impl Connection =====
373
374
impl<T, B> Connection<T, B>
375
where
376
    T: AsyncRead + AsyncWrite + Unpin,
377
    B: Buf,
378
{
379
0
    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380
0
        let span = tracing::trace_span!("server_handshake");
381
0
        let entered = span.enter();
382
0
383
0
        // Create the codec.
384
0
        let mut codec = Codec::new(io);
385
386
0
        if let Some(max) = builder.settings.max_frame_size() {
387
0
            codec.set_max_recv_frame_size(max as usize);
388
0
        }
389
390
0
        if let Some(max) = builder.settings.max_header_list_size() {
391
0
            codec.set_max_recv_header_list_size(max as usize);
392
0
        }
393
394
        // Send initial settings frame.
395
0
        codec
396
0
            .buffer(builder.settings.clone().into())
397
0
            .expect("invalid SETTINGS frame");
398
399
        // Create the handshake future.
400
0
        let state =
401
0
            Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402
403
0
        drop(entered);
404
0
405
0
        Handshake {
406
0
            builder,
407
0
            state,
408
0
            span,
409
0
        }
410
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::handshake2
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::handshake2
Unexecuted instantiation: <h2::server::Connection<_, _>>::handshake2
Unexecuted instantiation: <h2::server::Connection<_, _>>::handshake2
411
412
    /// Accept the next incoming request on this connection.
413
0
    pub async fn accept(
414
0
        &mut self,
415
0
    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept
416
0
        futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept::{closure#0}::{closure#0}
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept::{closure#0}::{closure#0}
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept::{closure#0}::{closure#0}
417
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept::{closure#0}
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept::{closure#0}
Unexecuted instantiation: <h2::server::Connection<_, _>>::accept::{closure#0}
418
419
    #[doc(hidden)]
420
0
    pub fn poll_accept(
421
0
        &mut self,
422
0
        cx: &mut Context<'_>,
423
0
    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
424
0
        // Always try to advance the internal state. Getting Pending also is
425
0
        // needed to allow this function to return Pending.
426
0
        if self.poll_closed(cx)?.is_ready() {
427
            // If the socket is closed, don't return anything
428
            // TODO: drop any pending streams
429
0
            return Poll::Ready(None);
430
0
        }
431
432
0
        if let Some(inner) = self.connection.next_incoming() {
433
0
            tracing::trace!("received incoming");
434
0
            let (head, _) = inner.take_request().into_parts();
435
0
            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436
0
437
0
            let request = Request::from_parts(head, body);
438
0
            let respond = SendResponse { inner };
439
0
440
0
            return Poll::Ready(Some(Ok((request, respond))));
441
0
        }
442
0
443
0
        Poll::Pending
444
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_accept
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::poll_accept
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_accept
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_accept
445
446
    /// Sets the target window size for the whole connection.
447
    ///
448
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
449
    /// frame will be immediately sent to the remote, increasing the connection
450
    /// level window by `size - current_value`.
451
    ///
452
    /// If `size` is less than the current value, nothing will happen
453
    /// immediately. However, as window capacity is released by
454
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
455
    /// out until the number of "in flight" bytes drops below `size`.
456
    ///
457
    /// The default value is 65,535.
458
    ///
459
    /// See [`FlowControl`] documentation for more details.
460
    ///
461
    /// [`FlowControl`]: ../struct.FlowControl.html
462
    /// [library level]: ../index.html#flow-control
463
0
    pub fn set_target_window_size(&mut self, size: u32) {
464
0
        assert!(size <= proto::MAX_WINDOW_SIZE);
465
0
        self.connection.set_target_window_size(size);
466
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::set_target_window_size
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::set_target_window_size
Unexecuted instantiation: <h2::server::Connection<_, _>>::set_target_window_size
Unexecuted instantiation: <h2::server::Connection<_, _>>::set_target_window_size
467
468
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
469
    /// flow control for received data.
470
    ///
471
    /// The `SETTINGS` will be sent to the remote, and only applied once the
472
    /// remote acknowledges the change.
473
    ///
474
    /// This can be used to increase or decrease the window size for existing
475
    /// streams.
476
    ///
477
    /// # Errors
478
    ///
479
    /// Returns an error if a previous call is still pending acknowledgement
480
    /// from the remote endpoint.
481
0
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
482
0
        assert!(size <= proto::MAX_WINDOW_SIZE);
483
0
        self.connection.set_initial_window_size(size)?;
484
0
        Ok(())
485
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::set_initial_window_size
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::set_initial_window_size
Unexecuted instantiation: <h2::server::Connection<_, _>>::set_initial_window_size
Unexecuted instantiation: <h2::server::Connection<_, _>>::set_initial_window_size
486
487
    /// Enables the [extended CONNECT protocol].
488
    ///
489
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
490
    ///
491
    /// # Errors
492
    ///
493
    /// Returns an error if a previous call is still pending acknowledgement
494
    /// from the remote endpoint.
495
0
    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
496
0
        self.connection.set_enable_connect_protocol()?;
497
0
        Ok(())
498
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::enable_connect_protocol
Unexecuted instantiation: <h2::server::Connection<_, _>>::enable_connect_protocol
Unexecuted instantiation: <h2::server::Connection<_, _>>::enable_connect_protocol
499
500
    /// Returns `Ready` when the underlying connection has closed.
501
    ///
502
    /// If any new inbound streams are received during a call to `poll_closed`,
503
    /// they will be queued and returned on the next call to [`poll_accept`].
504
    ///
505
    /// This function will advance the internal connection state, driving
506
    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
507
    ///
508
    /// See [here](index.html#managing-the-connection) for more details.
509
    ///
510
    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
511
    /// [`RecvStream`]: ../struct.RecvStream.html
512
    /// [`SendStream`]: ../struct.SendStream.html
513
0
    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514
0
        self.connection.poll(cx).map_err(Into::into)
515
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_closed
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::poll_closed
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_closed
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_closed
516
517
    #[doc(hidden)]
518
    #[deprecated(note = "renamed to poll_closed")]
519
0
    pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
520
0
        self.poll_closed(cx)
521
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_close
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_close
Unexecuted instantiation: <h2::server::Connection<_, _>>::poll_close
522
523
    /// Sets the connection to a GOAWAY state.
524
    ///
525
    /// Does not terminate the connection. Must continue being polled to close
526
    /// connection.
527
    ///
528
    /// After flushing the GOAWAY frame, the connection is closed. Any
529
    /// outstanding streams do not prevent the connection from closing. This
530
    /// should usually be reserved for shutting down when something bad
531
    /// external to `h2` has happened, and open streams cannot be properly
532
    /// handled.
533
    ///
534
    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
535
0
    pub fn abrupt_shutdown(&mut self, reason: Reason) {
536
0
        self.connection.go_away_from_user(reason);
537
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::abrupt_shutdown
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::abrupt_shutdown
Unexecuted instantiation: <h2::server::Connection<_, _>>::abrupt_shutdown
Unexecuted instantiation: <h2::server::Connection<_, _>>::abrupt_shutdown
538
539
    /// Starts a [graceful shutdown][1] process.
540
    ///
541
    /// Must continue being polled to close connection.
542
    ///
543
    /// It's possible to receive more requests after calling this method, since
544
    /// they might have been in-flight from the client already. After about
545
    /// 1 RTT, no new requests should be accepted. Once all active streams
546
    /// have completed, the connection is closed.
547
    ///
548
    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
549
0
    pub fn graceful_shutdown(&mut self) {
550
0
        self.connection.go_away_gracefully();
551
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::graceful_shutdown
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::graceful_shutdown
Unexecuted instantiation: <h2::server::Connection<_, _>>::graceful_shutdown
Unexecuted instantiation: <h2::server::Connection<_, _>>::graceful_shutdown
552
553
    /// Takes a `PingPong` instance from the connection.
554
    ///
555
    /// # Note
556
    ///
557
    /// This may only be called once. Calling multiple times will return `None`.
558
0
    pub fn ping_pong(&mut self) -> Option<PingPong> {
559
0
        self.connection.take_user_pings().map(PingPong::new)
560
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::ping_pong
Unexecuted instantiation: <h2::server::Connection<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::ping_pong
Unexecuted instantiation: <h2::server::Connection<_, _>>::ping_pong
Unexecuted instantiation: <h2::server::Connection<_, _>>::ping_pong
561
562
    /// Returns the maximum number of concurrent streams that may be initiated
563
    /// by the server on this connection.
564
    ///
565
    /// This limit is configured by the client peer by sending the
566
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
567
    /// This method returns the currently acknowledged value received from the
568
    /// remote.
569
    ///
570
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
571
0
    pub fn max_concurrent_send_streams(&self) -> usize {
572
0
        self.connection.max_send_streams()
573
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::max_concurrent_send_streams
Unexecuted instantiation: <h2::server::Connection<_, _>>::max_concurrent_send_streams
Unexecuted instantiation: <h2::server::Connection<_, _>>::max_concurrent_send_streams
574
575
    /// Returns the maximum number of concurrent streams that may be initiated
576
    /// by the client on this connection.
577
    ///
578
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
579
    /// parameter][1] sent in a `SETTINGS` frame that has been
580
    /// acknowledged by the remote peer. The value to be sent is configured by
581
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
582
    /// with the remote peer.
583
    ///
584
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
585
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
586
0
    pub fn max_concurrent_recv_streams(&self) -> usize {
587
0
        self.connection.max_recv_streams()
588
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _>>::max_concurrent_recv_streams
Unexecuted instantiation: <h2::server::Connection<_, _>>::max_concurrent_recv_streams
Unexecuted instantiation: <h2::server::Connection<_, _>>::max_concurrent_recv_streams
589
590
    // Unstable: this accessor could disappear at anytime. It forwards to
    // `proto::Connection::num_wired_streams` and is only compiled with the
    // `unstable` feature.
    #[doc(hidden)]
    #[cfg(feature = "unstable")]
    pub fn num_wired_streams(&self) -> usize {
        let conn = &self.connection;
        conn.num_wired_streams()
    }
596
}
597
598
/// `Stream` adapter for `Connection`, compiled only with the `stream`
/// feature. Every item is whatever `poll_accept` yields.
#[cfg(feature = "stream")]
impl<T, B> futures_core::Stream for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;

    /// Delegates to `poll_accept` on the pinned connection.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::get_mut(self);
        this.poll_accept(cx)
    }
}
610
611
impl<T, B> fmt::Debug for Connection<T, B>
612
where
613
    T: fmt::Debug,
614
    B: fmt::Debug + Buf,
615
{
616
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
617
0
        fmt.debug_struct("Connection")
618
0
            .field("connection", &self.connection)
619
0
            .finish()
620
0
    }
Unexecuted instantiation: <h2::server::Connection<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::Connection<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::Connection<_, _> as core::fmt::Debug>::fmt
621
}
622
623
// ===== impl Builder =====
624
625
impl Builder {
626
    /// Returns a new server builder instance initialized with default
627
    /// configuration values.
628
    ///
629
    /// Configuration methods can be chained on the return value.
630
    ///
631
    /// # Examples
632
    ///
633
    /// ```
634
    /// # use tokio::io::{AsyncRead, AsyncWrite};
635
    /// # use h2::server::*;
636
    /// #
637
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
638
    /// # -> Handshake<T>
639
    /// # {
640
    /// // `server_fut` is a future representing the completion of the HTTP/2
641
    /// // handshake.
642
    /// let server_fut = Builder::new()
643
    ///     .initial_window_size(1_000_000)
644
    ///     .max_concurrent_streams(1000)
645
    ///     .handshake(my_io);
646
    /// # server_fut
647
    /// # }
648
    /// #
649
    /// # pub fn main() {}
650
    /// ```
651
0
    pub fn new() -> Builder {
652
0
        Builder {
653
0
            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
654
0
            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
655
0
            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
656
0
            settings: Settings::default(),
657
0
            initial_target_connection_window_size: None,
658
0
            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
659
0
660
0
            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
661
0
        }
662
0
    }
Unexecuted instantiation: <h2::server::Builder>::new
Unexecuted instantiation: <h2::server::Builder>::new
Unexecuted instantiation: <h2::server::Builder>::new
663
664
    /// Indicates the initial window size (in octets) for stream-level
665
    /// flow control for received data.
666
    ///
667
    /// The initial window of a stream is used as part of flow control. For more
668
    /// details, see [`FlowControl`].
669
    ///
670
    /// The default value is 65,535.
671
    ///
672
    /// [`FlowControl`]: ../struct.FlowControl.html
673
    ///
674
    /// # Examples
675
    ///
676
    /// ```
677
    /// # use tokio::io::{AsyncRead, AsyncWrite};
678
    /// # use h2::server::*;
679
    /// #
680
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
681
    /// # -> Handshake<T>
682
    /// # {
683
    /// // `server_fut` is a future representing the completion of the HTTP/2
684
    /// // handshake.
685
    /// let server_fut = Builder::new()
686
    ///     .initial_window_size(1_000_000)
687
    ///     .handshake(my_io);
688
    /// # server_fut
689
    /// # }
690
    /// #
691
    /// # pub fn main() {}
692
    /// ```
693
0
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
694
0
        self.settings.set_initial_window_size(Some(size));
695
0
        self
696
0
    }
Unexecuted instantiation: <h2::server::Builder>::initial_window_size
Unexecuted instantiation: <h2::server::Builder>::initial_window_size
Unexecuted instantiation: <h2::server::Builder>::initial_window_size
697
698
    /// Indicates the initial window size (in octets) for connection-level flow control
699
    /// for received data.
700
    ///
701
    /// The initial window of a connection is used as part of flow control. For more details,
702
    /// see [`FlowControl`].
703
    ///
704
    /// The default value is 65,535.
705
    ///
706
    /// [`FlowControl`]: ../struct.FlowControl.html
707
    ///
708
    /// # Examples
709
    ///
710
    /// ```
711
    /// # use tokio::io::{AsyncRead, AsyncWrite};
712
    /// # use h2::server::*;
713
    /// #
714
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
715
    /// # -> Handshake<T>
716
    /// # {
717
    /// // `server_fut` is a future representing the completion of the HTTP/2
718
    /// // handshake.
719
    /// let server_fut = Builder::new()
720
    ///     .initial_connection_window_size(1_000_000)
721
    ///     .handshake(my_io);
722
    /// # server_fut
723
    /// # }
724
    /// #
725
    /// # pub fn main() {}
726
    /// ```
727
0
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
728
0
        self.initial_target_connection_window_size = Some(size);
729
0
        self
730
0
    }
Unexecuted instantiation: <h2::server::Builder>::initial_connection_window_size
Unexecuted instantiation: <h2::server::Builder>::initial_connection_window_size
Unexecuted instantiation: <h2::server::Builder>::initial_connection_window_size
731
732
    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
733
    /// configured server is able to accept.
734
    ///
735
    /// The sender may send data frames that are **smaller** than this value,
736
    /// but any data larger than `max` will be broken up into multiple `DATA`
737
    /// frames.
738
    ///
739
    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
740
    ///
741
    /// # Examples
742
    ///
743
    /// ```
744
    /// # use tokio::io::{AsyncRead, AsyncWrite};
745
    /// # use h2::server::*;
746
    /// #
747
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
748
    /// # -> Handshake<T>
749
    /// # {
750
    /// // `server_fut` is a future representing the completion of the HTTP/2
751
    /// // handshake.
752
    /// let server_fut = Builder::new()
753
    ///     .max_frame_size(1_000_000)
754
    ///     .handshake(my_io);
755
    /// # server_fut
756
    /// # }
757
    /// #
758
    /// # pub fn main() {}
759
    /// ```
760
    ///
761
    /// # Panics
762
    ///
763
    /// This function panics if `max` is not within the legal range specified
764
    /// above.
765
0
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
766
0
        self.settings.set_max_frame_size(Some(max));
767
0
        self
768
0
    }
Unexecuted instantiation: <h2::server::Builder>::max_frame_size
Unexecuted instantiation: <h2::server::Builder>::max_frame_size
Unexecuted instantiation: <h2::server::Builder>::max_frame_size
769
770
    /// Sets the max size of received header frames.
771
    ///
772
    /// This advisory setting informs a peer of the maximum size of header list
773
    /// that the sender is prepared to accept, in octets. The value is based on
774
    /// the uncompressed size of header fields, including the length of the name
775
    /// and value in octets plus an overhead of 32 octets for each header field.
776
    ///
777
    /// This setting is also used to limit the maximum amount of data that is
778
    /// buffered to decode HEADERS frames.
779
    ///
780
    /// # Examples
781
    ///
782
    /// ```
783
    /// # use tokio::io::{AsyncRead, AsyncWrite};
784
    /// # use h2::server::*;
785
    /// #
786
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
787
    /// # -> Handshake<T>
788
    /// # {
789
    /// // `server_fut` is a future representing the completion of the HTTP/2
790
    /// // handshake.
791
    /// let server_fut = Builder::new()
792
    ///     .max_header_list_size(16 * 1024)
793
    ///     .handshake(my_io);
794
    /// # server_fut
795
    /// # }
796
    /// #
797
    /// # pub fn main() {}
798
    /// ```
799
0
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
800
0
        self.settings.set_max_header_list_size(Some(max));
801
0
        self
802
0
    }
Unexecuted instantiation: <h2::server::Builder>::max_header_list_size
Unexecuted instantiation: <h2::server::Builder>::max_header_list_size
Unexecuted instantiation: <h2::server::Builder>::max_header_list_size
803
804
    /// Sets the maximum number of concurrent streams.
805
    ///
806
    /// The maximum concurrent streams setting only controls the maximum number
807
    /// of streams that can be initiated by the remote peer. In other words,
808
    /// when this setting is set to 100, this does not limit the number of
809
    /// concurrent streams that can be created by the caller.
810
    ///
811
    /// It is recommended that this value be no smaller than 100, so as to not
812
    /// unnecessarily limit parallelism. However, any value is legal, including
813
    /// 0. If `max` is set to 0, then the remote will not be permitted to
814
    /// initiate streams.
815
    ///
816
    /// Note that streams in the reserved state, i.e., push promises that have
817
    /// been reserved but the stream has not started, do not count against this
818
    /// setting.
819
    ///
820
    /// Also note that if the remote *does* exceed the value set here, it is not
821
    /// a protocol level error. Instead, the `h2` library will immediately reset
822
    /// the stream.
823
    ///
824
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
825
    ///
826
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
827
    ///
828
    /// # Examples
829
    ///
830
    /// ```
831
    /// # use tokio::io::{AsyncRead, AsyncWrite};
832
    /// # use h2::server::*;
833
    /// #
834
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
835
    /// # -> Handshake<T>
836
    /// # {
837
    /// // `server_fut` is a future representing the completion of the HTTP/2
838
    /// // handshake.
839
    /// let server_fut = Builder::new()
840
    ///     .max_concurrent_streams(1000)
841
    ///     .handshake(my_io);
842
    /// # server_fut
843
    /// # }
844
    /// #
845
    /// # pub fn main() {}
846
    /// ```
847
0
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
848
0
        self.settings.set_max_concurrent_streams(Some(max));
849
0
        self
850
0
    }
Unexecuted instantiation: <h2::server::Builder>::max_concurrent_streams
Unexecuted instantiation: <h2::server::Builder>::max_concurrent_streams
Unexecuted instantiation: <h2::server::Builder>::max_concurrent_streams
851
852
    /// Sets the maximum number of concurrent locally reset streams.
853
    ///
854
    /// When a stream is explicitly reset by either calling
855
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
856
    /// before completing the stream, the HTTP/2 specification requires that
857
    /// any further frames received for that stream must be ignored for "some
858
    /// time".
859
    ///
860
    /// In order to satisfy the specification, internal state must be maintained
861
    /// to implement the behavior. This state grows linearly with the number of
862
    /// streams that are locally reset.
863
    ///
864
    /// The `max_concurrent_reset_streams` setting sets an upper
865
    /// bound on the amount of state that is maintained. When this max value is
866
    /// reached, the oldest reset stream is purged from memory.
867
    ///
868
    /// Once the stream has been fully purged from memory, any additional frames
869
    /// received for that stream will result in a connection level protocol
870
    /// error, forcing the connection to terminate.
871
    ///
872
    /// The default value is 10.
873
    ///
874
    /// # Examples
875
    ///
876
    /// ```
877
    /// # use tokio::io::{AsyncRead, AsyncWrite};
878
    /// # use h2::server::*;
879
    /// #
880
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
881
    /// # -> Handshake<T>
882
    /// # {
883
    /// // `server_fut` is a future representing the completion of the HTTP/2
884
    /// // handshake.
885
    /// let server_fut = Builder::new()
886
    ///     .max_concurrent_reset_streams(1000)
887
    ///     .handshake(my_io);
888
    /// # server_fut
889
    /// # }
890
    /// #
891
    /// # pub fn main() {}
892
    /// ```
893
0
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
894
0
        self.reset_stream_max = max;
895
0
        self
896
0
    }
Unexecuted instantiation: <h2::server::Builder>::max_concurrent_reset_streams
Unexecuted instantiation: <h2::server::Builder>::max_concurrent_reset_streams
Unexecuted instantiation: <h2::server::Builder>::max_concurrent_reset_streams
897
898
    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
899
    ///
900
    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
901
    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DoS servers.
902
    /// This limit protects against these DoS attacks by limiting the number of resets we can be forced to generate.
903
    ///
904
    /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
905
    /// `ENHANCE_YOUR_CALM` to the client.
906
    ///
907
    /// If you really want to disable this, supply [`Option::None`] here.
908
    /// Disabling this is not recommended and may expose you to DoS attacks.
909
    ///
910
    /// The default value is currently 1024, but could change.
911
0
    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
912
0
        self.local_max_error_reset_streams = max;
913
0
        self
914
0
    }
Unexecuted instantiation: <h2::server::Builder>::max_local_error_reset_streams
Unexecuted instantiation: <h2::server::Builder>::max_local_error_reset_streams
Unexecuted instantiation: <h2::server::Builder>::max_local_error_reset_streams
915
916
    /// Sets the maximum number of pending-accept remotely-reset streams.
917
    ///
918
    /// Streams that have been received by the peer, but not accepted by the
919
    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
920
    /// could send a request and then shortly after, realize it is not needed,
921
    /// sending a CANCEL.
922
    ///
923
    /// However, since those streams are now "closed", they don't count towards
924
    /// the max concurrent streams. So, they will sit in the accept queue,
925
    /// using memory.
926
    ///
927
    /// When the number of remotely-reset streams sitting in the pending-accept
928
    /// queue reaches this maximum value, a connection error with the code of
929
    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
930
    /// `Future`.
931
    ///
932
    /// The default value is currently 20, but could change.
933
    ///
934
    /// # Examples
935
    ///
936
    ///
937
    /// ```
938
    /// # use tokio::io::{AsyncRead, AsyncWrite};
939
    /// # use h2::server::*;
940
    /// #
941
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
942
    /// # -> Handshake<T>
943
    /// # {
944
    /// // `server_fut` is a future representing the completion of the HTTP/2
945
    /// // handshake.
946
    /// let server_fut = Builder::new()
947
    ///     .max_pending_accept_reset_streams(100)
948
    ///     .handshake(my_io);
949
    /// # server_fut
950
    /// # }
951
    /// #
952
    /// # pub fn main() {}
953
    /// ```
954
0
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
955
0
        self.pending_accept_reset_stream_max = max;
956
0
        self
957
0
    }
Unexecuted instantiation: <h2::server::Builder>::max_pending_accept_reset_streams
Unexecuted instantiation: <h2::server::Builder>::max_pending_accept_reset_streams
Unexecuted instantiation: <h2::server::Builder>::max_pending_accept_reset_streams
958
959
    /// Sets the maximum send buffer size per stream.
960
    ///
961
    /// Once a stream has buffered up to (or over) the maximum, the stream's
962
    /// flow control will not "poll" additional capacity. Once bytes for the
963
    /// stream have been written to the connection, the send buffer capacity
964
    /// will be freed up again.
965
    ///
966
    /// The default is currently ~400KB, but may change.
967
    ///
968
    /// # Panics
969
    ///
970
    /// This function panics if `max` is larger than `u32::MAX`.
971
0
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
972
0
        assert!(max <= std::u32::MAX as usize);
973
0
        self.max_send_buffer_size = max;
974
0
        self
975
0
    }
Unexecuted instantiation: <h2::server::Builder>::max_send_buffer_size
Unexecuted instantiation: <h2::server::Builder>::max_send_buffer_size
Unexecuted instantiation: <h2::server::Builder>::max_send_buffer_size
976
977
    /// Sets the duration to remember locally reset streams.
978
    ///
979
    /// When a stream is explicitly reset by either calling
980
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
981
    /// before completing the stream, the HTTP/2 specification requires that
982
    /// any further frames received for that stream must be ignored for "some
983
    /// time".
984
    ///
985
    /// In order to satisfy the specification, internal state must be maintained
986
    /// to implement the behavior. This state grows linearly with the number of
987
    /// streams that are locally reset.
988
    ///
989
    /// The `reset_stream_duration` setting configures the max amount of time
990
    /// this state will be maintained in memory. Once the duration elapses, the
991
    /// stream state is purged from memory.
992
    ///
993
    /// Once the stream has been fully purged from memory, any additional frames
994
    /// received for that stream will result in a connection level protocol
995
    /// error, forcing the connection to terminate.
996
    ///
997
    /// The default value is 30 seconds.
998
    ///
999
    /// # Examples
1000
    ///
1001
    /// ```
1002
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1003
    /// # use h2::server::*;
1004
    /// # use std::time::Duration;
1005
    /// #
1006
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1007
    /// # -> Handshake<T>
1008
    /// # {
1009
    /// // `server_fut` is a future representing the completion of the HTTP/2
1010
    /// // handshake.
1011
    /// let server_fut = Builder::new()
1012
    ///     .reset_stream_duration(Duration::from_secs(10))
1013
    ///     .handshake(my_io);
1014
    /// # server_fut
1015
    /// # }
1016
    /// #
1017
    /// # pub fn main() {}
1018
    /// ```
1019
0
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1020
0
        self.reset_stream_duration = dur;
1021
0
        self
1022
0
    }
Unexecuted instantiation: <h2::server::Builder>::reset_stream_duration
Unexecuted instantiation: <h2::server::Builder>::reset_stream_duration
Unexecuted instantiation: <h2::server::Builder>::reset_stream_duration
1023
1024
    /// Enables the [extended CONNECT protocol].
1025
    ///
1026
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
1027
0
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
1028
0
        self.settings.set_enable_connect_protocol(Some(1));
1029
0
        self
1030
0
    }
Unexecuted instantiation: <h2::server::Builder>::enable_connect_protocol
Unexecuted instantiation: <h2::server::Builder>::enable_connect_protocol
Unexecuted instantiation: <h2::server::Builder>::enable_connect_protocol
1031
1032
    /// Creates a new configured HTTP/2 server backed by `io`.
1033
    ///
1034
    /// It is expected that `io` already be in an appropriate state to commence
1035
    /// the [HTTP/2 handshake]. See [Handshake] for more details.
1036
    ///
1037
    /// Returns a future which resolves to the [`Connection`] instance once the
1038
    /// HTTP/2 handshake has been completed.
1039
    ///
1040
    /// This function also allows the caller to configure the send payload data
1041
    /// type. See [Outbound data type] for more details.
1042
    ///
1043
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1044
    /// [Handshake]: ../index.html#handshake
1045
    /// [`Connection`]: struct.Connection.html
1046
    /// [Outbound data type]: ../index.html#outbound-data-type
1047
    ///
1048
    /// # Examples
1049
    ///
1050
    /// Basic usage:
1051
    ///
1052
    /// ```
1053
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1054
    /// # use h2::server::*;
1055
    /// #
1056
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1057
    /// # -> Handshake<T>
1058
    /// # {
1059
    /// // `server_fut` is a future representing the completion of the HTTP/2
1060
    /// // handshake.
1061
    /// let server_fut = Builder::new()
1062
    ///     .handshake(my_io);
1063
    /// # server_fut
1064
    /// # }
1065
    /// #
1066
    /// # pub fn main() {}
1067
    /// ```
1068
    ///
1069
    /// Configures the send-payload data type. In this case, the outbound data
1070
    /// type will be `&'static [u8]`.
1071
    ///
1072
    /// ```
1073
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1074
    /// # use h2::server::*;
1075
    /// #
1076
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1077
    /// # -> Handshake<T, &'static [u8]>
1078
    /// # {
1079
    /// // `server_fut` is a future representing the completion of the HTTP/2
1080
    /// // handshake.
1081
    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1082
    ///     .handshake(my_io);
1083
    /// # server_fut
1084
    /// # }
1085
    /// #
1086
    /// # pub fn main() {}
1087
    /// ```
1088
0
    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1089
0
    where
1090
0
        T: AsyncRead + AsyncWrite + Unpin,
1091
0
        B: Buf,
1092
0
    {
1093
0
        Connection::handshake2(io, self.clone())
1094
0
    }
Unexecuted instantiation: <h2::server::Builder>::handshake::<_, _>
Unexecuted instantiation: <h2::server::Builder>::handshake::<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>
Unexecuted instantiation: <h2::server::Builder>::handshake::<_, _>
Unexecuted instantiation: <h2::server::Builder>::handshake::<_, _>
1095
}
1096
1097
impl Default for Builder {
1098
0
    fn default() -> Builder {
1099
0
        Builder::new()
1100
0
    }
Unexecuted instantiation: <h2::server::Builder as core::default::Default>::default
Unexecuted instantiation: <h2::server::Builder as core::default::Default>::default
Unexecuted instantiation: <h2::server::Builder as core::default::Default>::default
1101
}
1102
1103
// ===== impl SendResponse =====
1104
1105
impl<B: Buf> SendResponse<B> {
1106
    /// Send a response to a client request.
1107
    ///
1108
    /// On success, a [`SendStream`] instance is returned. This instance can be
1109
    /// used to stream the response body and send trailers.
1110
    ///
1111
    /// If a body or trailers will be sent on the returned [`SendStream`]
1112
    /// instance, then `end_of_stream` must be set to `false` when calling this
1113
    /// function.
1114
    ///
1115
    /// The [`SendResponse`] instance is already associated with a received
1116
    /// request.  This function may only be called once per instance and only if
1117
    /// [`send_reset`] has not been previously called.
1118
    ///
1119
    /// [`SendResponse`]: #
1120
    /// [`SendStream`]: ../struct.SendStream.html
1121
    /// [`send_reset`]: #method.send_reset
1122
0
    pub fn send_response(
1123
0
        &mut self,
1124
0
        response: Response<()>,
1125
0
        end_of_stream: bool,
1126
0
    ) -> Result<SendStream<B>, crate::Error> {
1127
0
        self.inner
1128
0
            .send_response(response, end_of_stream)
1129
0
            .map(|_| SendStream::new(self.inner.clone()))
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_response::{closure#0}
Unexecuted instantiation: <h2::server::SendResponse<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::send_response::{closure#0}
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_response::{closure#0}
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_response::{closure#0}
1130
0
            .map_err(Into::into)
1131
0
    }
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_response
Unexecuted instantiation: <h2::server::SendResponse<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::send_response
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_response
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_response
1132
1133
    /// Push a request and response to the client
1134
    ///
1135
    /// On success, a [`SendResponse`] instance is returned.
1136
    ///
1137
    /// [`SendResponse`]: #
1138
0
    pub fn push_request(
1139
0
        &mut self,
1140
0
        request: Request<()>,
1141
0
    ) -> Result<SendPushedResponse<B>, crate::Error> {
1142
0
        self.inner
1143
0
            .send_push_promise(request)
1144
0
            .map(|inner| SendPushedResponse {
1145
0
                inner: SendResponse { inner },
1146
0
            })
Unexecuted instantiation: <h2::server::SendResponse<_>>::push_request::{closure#0}
Unexecuted instantiation: <h2::server::SendResponse<_>>::push_request::{closure#0}
Unexecuted instantiation: <h2::server::SendResponse<_>>::push_request::{closure#0}
1147
0
            .map_err(Into::into)
1148
0
    }
Unexecuted instantiation: <h2::server::SendResponse<_>>::push_request
Unexecuted instantiation: <h2::server::SendResponse<_>>::push_request
Unexecuted instantiation: <h2::server::SendResponse<_>>::push_request
1149
1150
    /// Send a stream reset to the peer.
1151
    ///
1152
    /// This essentially cancels the stream, including any inbound or outbound
1153
    /// data streams.
1154
    ///
1155
    /// If this function is called before [`send_response`], a call to
1156
    /// [`send_response`] will result in an error.
1157
    ///
1158
    /// If this function is called while a [`SendStream`] instance is active,
1159
    /// any further use of the instance will result in an error.
1160
    ///
1161
    /// This function should only be called once.
1162
    ///
1163
    /// [`send_response`]: #method.send_response
1164
    /// [`SendStream`]: ../struct.SendStream.html
1165
0
    pub fn send_reset(&mut self, reason: Reason) {
1166
0
        self.inner.send_reset(reason)
1167
0
    }
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_reset
Unexecuted instantiation: <h2::server::SendResponse<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::send_reset
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_reset
Unexecuted instantiation: <h2::server::SendResponse<_>>::send_reset
1168
1169
    /// Polls to be notified when the client resets this stream.
1170
    ///
1171
    /// If the stream is still open, this returns `Poll::Pending`, and
1172
    /// registers the task to be notified if a `RST_STREAM` is received.
1173
    ///
1174
    /// If a `RST_STREAM` frame is received for this stream, calling this
1175
    /// method will yield the `Reason` for the reset.
1176
    ///
1177
    /// # Errors
1178
    ///
1179
    /// Calling this method after having called `send_response` will return
1180
    /// a user error.
1181
0
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1182
0
        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1183
0
    }
Unexecuted instantiation: <h2::server::SendResponse<_>>::poll_reset
Unexecuted instantiation: <h2::server::SendResponse<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>::poll_reset
Unexecuted instantiation: <h2::server::SendResponse<_>>::poll_reset
Unexecuted instantiation: <h2::server::SendResponse<_>>::poll_reset
1184
1185
    /// Returns the stream ID of the response stream.
1186
    ///
1187
    /// # Panics
1188
    ///
1189
    /// If the lock on the stream store has been poisoned.
1190
0
    pub fn stream_id(&self) -> crate::StreamId {
1191
0
        crate::StreamId::from_internal(self.inner.stream_id())
1192
0
    }
Unexecuted instantiation: <h2::server::SendResponse<_>>::stream_id
Unexecuted instantiation: <h2::server::SendResponse<_>>::stream_id
Unexecuted instantiation: <h2::server::SendResponse<_>>::stream_id
1193
}
1194
1195
// ===== impl SendPushedResponse =====
1196
1197
impl<B: Buf> SendPushedResponse<B> {
1198
    /// Send a response to a promised request.
1199
    ///
1200
    /// On success, a [`SendStream`] instance is returned. This instance can be
1201
    /// used to stream the response body and send trailers.
1202
    ///
1203
    /// If a body or trailers will be sent on the returned [`SendStream`]
1204
    /// instance, then `end_of_stream` must be set to `false` when calling this
1205
    /// function.
1206
    ///
1207
    /// The [`SendPushedResponse`] instance is associated with a promised
1208
    /// request.  This function may only be called once per instance and only if
1209
    /// [`send_reset`] has not been previously called.
1210
    ///
1211
    /// [`SendPushedResponse`]: #
1212
    /// [`SendStream`]: ../struct.SendStream.html
1213
    /// [`send_reset`]: #method.send_reset
1214
0
    pub fn send_response(
1215
0
        &mut self,
1216
0
        response: Response<()>,
1217
0
        end_of_stream: bool,
1218
0
    ) -> Result<SendStream<B>, crate::Error> {
1219
0
        self.inner.send_response(response, end_of_stream)
1220
0
    }
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::send_response
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::send_response
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::send_response
1221
1222
    /// Send a stream reset to the peer.
1223
    ///
1224
    /// This essentially cancels the stream, including any inbound or outbound
1225
    /// data streams.
1226
    ///
1227
    /// If this function is called before [`send_response`], a call to
1228
    /// [`send_response`] will result in an error.
1229
    ///
1230
    /// If this function is called while a [`SendStream`] instance is active,
1231
    /// any further use of the instance will result in an error.
1232
    ///
1233
    /// This function should only be called once.
1234
    ///
1235
    /// [`send_response`]: #method.send_response
1236
    /// [`SendStream`]: ../struct.SendStream.html
1237
0
    pub fn send_reset(&mut self, reason: Reason) {
1238
0
        self.inner.send_reset(reason)
1239
0
    }
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::send_reset
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::send_reset
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::send_reset
1240
1241
    /// Polls to be notified when the client resets this stream.
1242
    ///
1243
    /// If the stream is still open, this returns `Poll::Pending`, and
1244
    /// registers the task to be notified if a `RST_STREAM` is received.
1245
    ///
1246
    /// If a `RST_STREAM` frame is received for this stream, calling this
1247
    /// method will yield the `Reason` for the reset.
1248
    ///
1249
    /// # Errors
1250
    ///
1251
    /// Calling this method after having called `send_response` will return
1252
    /// a user error.
1253
0
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1254
0
        self.inner.poll_reset(cx)
1255
0
    }
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::poll_reset
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::poll_reset
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::poll_reset
1256
1257
    /// Returns the stream ID of the response stream.
1258
    ///
1259
    /// # Panics
1260
    ///
1261
    /// If the lock on the stream store has been poisoned.
1262
0
    pub fn stream_id(&self) -> crate::StreamId {
1263
0
        self.inner.stream_id()
1264
0
    }
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::stream_id
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::stream_id
Unexecuted instantiation: <h2::server::SendPushedResponse<_>>::stream_id
1265
}
1266
1267
// ===== impl Flush =====
1268
1269
impl<T, B: Buf> Flush<T, B> {
1270
0
    fn new(codec: Codec<T, B>) -> Self {
1271
0
        Flush { codec: Some(codec) }
1272
0
    }
Unexecuted instantiation: <h2::server::Flush<_, _>>::new
Unexecuted instantiation: <h2::server::Flush<tokio::io::util::mem::DuplexStream, h2::proto::streams::prioritize::Prioritized<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>>::new
Unexecuted instantiation: <h2::server::Flush<_, _>>::new
Unexecuted instantiation: <h2::server::Flush<_, _>>::new
1273
}
1274
1275
impl<T, B> Future for Flush<T, B>
1276
where
1277
    T: AsyncWrite + Unpin,
1278
    B: Buf,
1279
{
1280
    type Output = Result<Codec<T, B>, crate::Error>;
1281
1282
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1283
        // Flush the codec
1284
0
        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1285
1286
        // Return the codec
1287
0
        Poll::Ready(Ok(self.codec.take().unwrap()))
1288
0
    }
Unexecuted instantiation: <h2::server::Flush<_, _> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::Flush<tokio::io::util::mem::DuplexStream, h2::proto::streams::prioritize::Prioritized<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::Flush<_, _> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::Flush<_, _> as core::future::future::Future>::poll
1289
}
1290
1291
impl<T, B: Buf> ReadPreface<T, B> {
1292
0
    fn new(codec: Codec<T, B>) -> Self {
1293
0
        ReadPreface {
1294
0
            codec: Some(codec),
1295
0
            pos: 0,
1296
0
        }
1297
0
    }
Unexecuted instantiation: <h2::server::ReadPreface<_, _>>::new
Unexecuted instantiation: <h2::server::ReadPreface<tokio::io::util::mem::DuplexStream, h2::proto::streams::prioritize::Prioritized<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>>::new
Unexecuted instantiation: <h2::server::ReadPreface<_, _>>::new
Unexecuted instantiation: <h2::server::ReadPreface<_, _>>::new
1298
1299
0
    fn inner_mut(&mut self) -> &mut T {
1300
0
        self.codec.as_mut().unwrap().get_mut()
1301
0
    }
Unexecuted instantiation: <h2::server::ReadPreface<_, _>>::inner_mut
Unexecuted instantiation: <h2::server::ReadPreface<tokio::io::util::mem::DuplexStream, h2::proto::streams::prioritize::Prioritized<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>>>::inner_mut
Unexecuted instantiation: <h2::server::ReadPreface<_, _>>::inner_mut
Unexecuted instantiation: <h2::server::ReadPreface<_, _>>::inner_mut
1302
}
1303
1304
impl<T, B> Future for ReadPreface<T, B>
1305
where
1306
    T: AsyncRead + Unpin,
1307
    B: Buf,
1308
{
1309
    type Output = Result<Codec<T, B>, crate::Error>;
1310
1311
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1312
0
        let mut buf = [0; 24];
1313
0
        let mut rem = PREFACE.len() - self.pos;
1314
1315
0
        while rem > 0 {
1316
0
            let mut buf = ReadBuf::new(&mut buf[..rem]);
1317
0
            ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1318
0
                .map_err(crate::Error::from_io)?;
1319
0
            let n = buf.filled().len();
1320
0
            if n == 0 {
1321
0
                return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1322
0
                    io::ErrorKind::UnexpectedEof,
1323
0
                    "connection closed before reading preface",
1324
0
                ))));
1325
0
            }
1326
0
1327
0
            if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1328
0
                proto_err!(conn: "read_preface: invalid preface");
1329
                // TODO: Should this just write the GO_AWAY frame directly?
1330
0
                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1331
0
            }
1332
0
1333
0
            self.pos += n;
1334
0
            rem -= n; // TODO test
1335
        }
1336
1337
0
        Poll::Ready(Ok(self.codec.take().unwrap()))
1338
0
    }
Unexecuted instantiation: <h2::server::ReadPreface<_, _> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::ReadPreface<tokio::io::util::mem::DuplexStream, h2::proto::streams::prioritize::Prioritized<hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>>> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::ReadPreface<_, _> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::ReadPreface<_, _> as core::future::future::Future>::poll
1339
}
1340
1341
// ===== impl Handshake =====
1342
1343
impl<T, B: Buf> Future for Handshake<T, B>
1344
where
1345
    T: AsyncRead + AsyncWrite + Unpin,
1346
    B: Buf,
1347
{
1348
    type Output = Result<Connection<T, B>, crate::Error>;
1349
1350
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1351
0
        let span = self.span.clone(); // XXX(eliza): T_T
1352
0
        let _e = span.enter();
1353
0
        tracing::trace!(state = ?self.state);
1354
1355
        loop {
1356
0
            match &mut self.state {
1357
0
                Handshaking::Flushing(flush) => {
1358
                    // We're currently flushing a pending SETTINGS frame. Poll the
1359
                    // flush future, and, if it's completed, advance our state to wait
1360
                    // for the client preface.
1361
0
                    let codec = match Pin::new(flush).poll(cx)? {
1362
                        Poll::Pending => {
1363
0
                            tracing::trace!(flush.poll = %"Pending");
1364
0
                            return Poll::Pending;
1365
                        }
1366
0
                        Poll::Ready(flushed) => {
1367
0
                            tracing::trace!(flush.poll = %"Ready");
1368
0
                            flushed
1369
                        }
1370
                    };
1371
0
                    self.state = Handshaking::ReadingPreface(
1372
0
                        ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1373
                    );
1374
                }
1375
0
                Handshaking::ReadingPreface(read) => {
1376
0
                    let codec = ready!(Pin::new(read).poll(cx)?);
1377
1378
0
                    self.state = Handshaking::Done;
1379
0
1380
0
                    let connection = proto::Connection::new(
1381
0
                        codec,
1382
0
                        Config {
1383
0
                            next_stream_id: 2.into(),
1384
0
                            // Server does not need to locally initiate any streams
1385
0
                            initial_max_send_streams: 0,
1386
0
                            max_send_buffer_size: self.builder.max_send_buffer_size,
1387
0
                            reset_stream_duration: self.builder.reset_stream_duration,
1388
0
                            reset_stream_max: self.builder.reset_stream_max,
1389
0
                            remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1390
0
                            local_error_reset_streams_max: self
1391
0
                                .builder
1392
0
                                .local_max_error_reset_streams,
1393
0
                            settings: self.builder.settings.clone(),
1394
0
                        },
1395
0
                    );
1396
0
1397
0
                    tracing::trace!("connection established!");
1398
0
                    let mut c = Connection { connection };
1399
0
                    if let Some(sz) = self.builder.initial_target_connection_window_size {
1400
0
                        c.set_target_window_size(sz);
1401
0
                    }
1402
1403
0
                    return Poll::Ready(Ok(c));
1404
                }
1405
                Handshaking::Done => {
1406
0
                    panic!("Handshaking::poll() called again after handshaking was complete")
1407
                }
1408
            }
1409
        }
1410
0
    }
Unexecuted instantiation: <h2::server::Handshake<_, _> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::Handshake<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::Handshake<_, _> as core::future::future::Future>::poll
Unexecuted instantiation: <h2::server::Handshake<_, _> as core::future::future::Future>::poll
1411
}
1412
1413
impl<T, B> fmt::Debug for Handshake<T, B>
1414
where
1415
    T: AsyncRead + AsyncWrite + fmt::Debug,
1416
    B: fmt::Debug + Buf,
1417
{
1418
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1419
0
        write!(fmt, "server::Handshake")
1420
0
    }
Unexecuted instantiation: <h2::server::Handshake<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::Handshake<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::Handshake<_, _> as core::fmt::Debug>::fmt
1421
}
1422
1423
impl Peer {
1424
0
    pub fn convert_send_message(
1425
0
        id: StreamId,
1426
0
        response: Response<()>,
1427
0
        end_of_stream: bool,
1428
0
    ) -> frame::Headers {
1429
        use http::response::Parts;
1430
1431
        // Extract the components of the HTTP response
1432
        let (
1433
            Parts {
1434
0
                status, headers, ..
1435
0
            },
1436
0
            _,
1437
0
        ) = response.into_parts();
1438
0
1439
0
        // Build the pseudo header set. For a response it is derived from the
1440
0
        // status code alone.
1441
0
        let pseudo = Pseudo::response(status);
1442
0
1443
0
        // Create the HEADERS frame
1444
0
        let mut frame = frame::Headers::new(id, pseudo, headers);
1445
0
1446
0
        if end_of_stream {
1447
0
            frame.set_end_stream()
1448
0
        }
1449
1450
0
        frame
1451
0
    }
Unexecuted instantiation: <h2::server::Peer>::convert_send_message
Unexecuted instantiation: <h2::server::Peer>::convert_send_message
Unexecuted instantiation: <h2::server::Peer>::convert_send_message
1452
1453
0
    pub fn convert_push_message(
1454
0
        stream_id: StreamId,
1455
0
        promised_id: StreamId,
1456
0
        request: Request<()>,
1457
0
    ) -> Result<frame::PushPromise, UserError> {
1458
        use http::request::Parts;
1459
1460
0
        if let Err(e) = frame::PushPromise::validate_request(&request) {
1461
            use PushPromiseHeaderError::*;
1462
0
            match e {
1463
0
                NotSafeAndCacheable => tracing::debug!(
1464
                    ?promised_id,
1465
0
                    "convert_push_message: method {} is not safe and cacheable",
1466
0
                    request.method(),
1467
                ),
1468
0
                InvalidContentLength(e) => tracing::debug!(
1469
                    ?promised_id,
1470
0
                    "convert_push_message; promised request has invalid content-length {:?}",
1471
                    e,
1472
                ),
1473
            }
1474
0
            return Err(UserError::MalformedHeaders);
1475
0
        }
1476
0
1477
0
        // Extract the components of the HTTP request
1478
0
        let (
1479
0
            Parts {
1480
0
                method,
1481
0
                uri,
1482
0
                headers,
1483
0
                ..
1484
0
            },
1485
0
            _,
1486
0
        ) = request.into_parts();
1487
0
1488
0
        let pseudo = Pseudo::request(method, uri, None);
1489
0
1490
0
        Ok(frame::PushPromise::new(
1491
0
            stream_id,
1492
0
            promised_id,
1493
0
            pseudo,
1494
0
            headers,
1495
0
        ))
1496
0
    }
Unexecuted instantiation: <h2::server::Peer>::convert_push_message
Unexecuted instantiation: <h2::server::Peer>::convert_push_message
Unexecuted instantiation: <h2::server::Peer>::convert_push_message
1497
}
1498
1499
impl proto::Peer for Peer {
1500
    type Poll = Request<()>;
1501
1502
    const NAME: &'static str = "Server";
1503
1504
    /*
1505
    fn is_server() -> bool {
1506
        true
1507
    }
1508
    */
1509
1510
0
    fn r#dyn() -> proto::DynPeer {
1511
0
        proto::DynPeer::Server
1512
0
    }
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::dyn
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::dyn
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::dyn
1513
1514
0
    fn convert_poll_message(
1515
0
        pseudo: Pseudo,
1516
0
        fields: HeaderMap,
1517
0
        stream_id: StreamId,
1518
0
    ) -> Result<Self::Poll, Error> {
1519
        use http::{uri, Version};
1520
1521
0
        let mut b = Request::builder();
1522
1523
        macro_rules! malformed {
1524
            ($($arg:tt)*) => {{
1525
                tracing::debug!($($arg)*);
1526
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1527
            }}
1528
        }
1529
1530
0
        b = b.version(Version::HTTP_2);
1531
1532
        let is_connect;
1533
0
        if let Some(method) = pseudo.method {
1534
0
            is_connect = method == Method::CONNECT;
1535
0
            b = b.method(method);
1536
0
        } else {
1537
0
            malformed!("malformed headers: missing method");
1538
        }
1539
1540
0
        let has_protocol = pseudo.protocol.is_some();
1541
0
        if has_protocol {
1542
0
            if is_connect {
1543
0
                // Assert that we have the right type.
1544
0
                b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1545
0
            } else {
1546
0
                malformed!("malformed headers: :protocol on non-CONNECT request");
1547
            }
1548
0
        }
1549
1550
0
        if pseudo.status.is_some() {
1551
0
            malformed!("malformed headers: :status field on request");
1552
0
        }
1553
0
1554
0
        // Convert the URI
1555
0
        let mut parts = uri::Parts::default();
1556
1557
        // A request translated from HTTP/1 must not include the :authority
1558
        // header
1559
0
        if let Some(authority) = pseudo.authority {
1560
0
            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1561
0
            parts.authority = Some(maybe_authority.or_else(|why| {
1562
0
                malformed!(
1563
0
                    "malformed headers: malformed authority ({:?}): {}",
1564
                    authority,
1565
                    why,
1566
                )
1567
0
            })?);
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#0}
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#0}
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#0}
1568
0
        }
1569
1570
        // A :scheme is required, except for a CONNECT without :protocol.
1571
0
        if let Some(scheme) = pseudo.scheme {
1572
0
            if is_connect && !has_protocol {
1573
0
                malformed!("malformed headers: :scheme in CONNECT");
1574
0
            }
1575
0
            let maybe_scheme = scheme.parse();
1576
0
            let scheme = maybe_scheme.or_else(|why| {
1577
0
                malformed!(
1578
0
                    "malformed headers: malformed scheme ({:?}): {}",
1579
                    scheme,
1580
                    why,
1581
                )
1582
0
            })?;
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#1}
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#1}
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#1}
1583
1584
            // It's not possible to build an `Uri` from a scheme and path. So,
1585
            // after validating that it was a valid scheme, we just have to drop it
1586
            // if there isn't an :authority.
1587
0
            if parts.authority.is_some() {
1588
0
                parts.scheme = Some(scheme);
1589
0
            }
1590
0
        } else if !is_connect || has_protocol {
1591
0
            malformed!("malformed headers: missing scheme");
1592
0
        }
1593
1594
0
        if let Some(path) = pseudo.path {
1595
0
            if is_connect && !has_protocol {
1596
0
                malformed!("malformed headers: :path in CONNECT");
1597
0
            }
1598
0
1599
0
            // This cannot be empty
1600
0
            if path.is_empty() {
1601
0
                malformed!("malformed headers: missing path");
1602
0
            }
1603
0
1604
0
            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1605
0
            parts.path_and_query = Some(maybe_path.or_else(|why| {
1606
0
                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1607
0
            })?);
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#2}
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#2}
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message::{closure#2}
1608
0
        } else if is_connect && has_protocol {
1609
0
            malformed!("malformed headers: missing path in extended CONNECT");
1610
0
        }
1611
1612
0
        b = b.uri(parts);
1613
1614
0
        let mut request = match b.body(()) {
1615
0
            Ok(request) => request,
1616
0
            Err(e) => {
1617
0
                // TODO: Should there be more specialized handling for different
1618
0
                // kinds of errors
1619
0
                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1620
0
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1621
            }
1622
        };
1623
1624
0
        *request.headers_mut() = fields;
1625
0
1626
0
        Ok(request)
1627
0
    }
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message
Unexecuted instantiation: <h2::server::Peer as h2::proto::peer::Peer>::convert_poll_message
1628
}
1629
1630
// ===== impl Handshaking =====
1631
1632
impl<T, B> fmt::Debug for Handshaking<T, B>
1633
where
1634
    B: Buf,
1635
{
1636
    #[inline]
1637
0
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1638
0
        match *self {
1639
0
            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1640
0
            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1641
0
            Handshaking::Done => f.write_str("Done"),
1642
        }
1643
0
    }
Unexecuted instantiation: <h2::server::Handshaking<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::Handshaking<tokio::io::util::mem::DuplexStream, hyper::proto::h2::SendBuf<linkerd_http_box::body::Data>> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::Handshaking<_, _> as core::fmt::Debug>::fmt
Unexecuted instantiation: <h2::server::Handshaking<_, _> as core::fmt::Debug>::fmt
1644
}