Coverage Report

Created: 2025-10-10 07:05

/src/h2/src/server.rs
Line | Count | Source
1
//! Server implementation of the HTTP/2 protocol.
2
//!
3
//! # Getting started
4
//!
5
//! Running an HTTP/2 server requires the caller to manage accepting the
6
//! connections as well as getting the connections to a state that is ready to
7
//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8
//! details.
9
//!
10
//! This could be as basic as using Tokio's [`TcpListener`] to accept
11
//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12
//! upgrades.
13
//!
14
//! Once a connection is obtained, it is passed to [`handshake`],
15
//! which will begin the [HTTP/2 handshake]. This returns a future that
16
//! completes once the handshake process is performed and HTTP/2 streams may
17
//! be received.
18
//!
19
//! [`handshake`] uses default configuration values. There are a number of
20
//! settings that can be changed by using [`Builder`] instead.
21
//!
22
//! # Inbound streams
23
//!
24
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25
//! does this by implementing [`futures::Stream`]. When a new stream is
26
//! received, a call to [`Connection::accept`] will return `(request, response)`.
27
//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28
//! HTTP request head and provides a way to receive the inbound data
29
//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30
//! allows responding to the request, streaming the response payload, sending
31
//! trailers, and sending push promises.
32
//!
33
//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34
//! can be operated independently.
35
//!
36
//! # Managing the connection
37
//!
38
//! The [`Connection`] instance is used to manage connection state. The caller
39
//! is required to call either [`Connection::accept`] or
40
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
41
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42
//! connection state is advanced.
43
//!
44
//! It is not required to call **both** [`Connection::accept`] and
45
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
46
//! then only [`Connection::accept`] should be called. When the caller **does
47
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
48
//! called.
49
//!
50
//! The [`Connection`] instance should only be dropped once
51
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
52
//! returns `None`, there will be no more inbound streams. At
53
//! this point, only [`Connection::poll_closed`] should be called.
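//!
//! For illustration, a minimal accept-then-close loop might look like the
//! following sketch (connection setup via [`handshake`], error handling
//! reduced to `unwrap`):
//!
//! ```
//! # use tokio::io::{AsyncRead, AsyncWrite};
//! # use h2::server;
//! #
//! # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
//! let mut connection = server::handshake(my_io).await.unwrap();
//!
//! // Accept inbound streams until the client stops opening new ones.
//! while let Some(stream) = connection.accept().await {
//!     let (_request, _respond) = stream.unwrap();
//!     // ... handle the stream ...
//! }
//!
//! // `accept` returned `None`: no more inbound streams will arrive. Keep
//! // driving the connection until it has fully closed before dropping it.
//! std::future::poll_fn(|cx| connection.poll_closed(cx)).await.unwrap();
//! # }
//! #
//! # pub fn main() {}
//! ```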
54
//!
55
//! # Shutting down the server
56
//!
57
//! Graceful shutdown of the server is [not yet
58
//! implemented](https://github.com/hyperium/h2/issues/69).
59
//!
60
//! # Example
61
//!
62
//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63
//! knowledge], i.e. both the client and the server assume that the TCP socket
64
//! will use the HTTP/2 protocol without prior negotiation.
65
//!
66
//! ```no_run
67
//! use h2::server;
68
//! use http::{Response, StatusCode};
69
//! use tokio::net::TcpListener;
70
//!
71
//! #[tokio::main]
72
//! pub async fn main() {
73
//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74
//!
75
//!     // Accept all incoming TCP connections.
76
//!     loop {
77
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78
//!             // Spawn a new task to process each connection.
79
//!             tokio::spawn(async {
80
//!                 // Start the HTTP/2 connection handshake
81
//!                 let mut h2 = server::handshake(socket).await.unwrap();
82
//!                 // Accept all inbound HTTP/2 streams sent over the
83
//!                 // connection.
84
//!                 while let Some(request) = h2.accept().await {
85
//!                     let (request, mut respond) = request.unwrap();
86
//!                     println!("Received request: {:?}", request);
87
//!
88
//!                     // Build a response with no body
89
//!                     let response = Response::builder()
90
//!                         .status(StatusCode::OK)
91
//!                         .body(())
92
//!                         .unwrap();
93
//!
94
//!                     // Send the response back to the client
95
//!                     respond.send_response(response, true)
96
//!                         .unwrap();
97
//!                 }
98
//!
99
//!             });
100
//!         }
101
//!     }
102
//! }
103
//! ```
104
//!
105
//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106
//! [`handshake`]: fn.handshake.html
107
//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108
//! [`Builder`]: struct.Builder.html
109
//! [`Connection`]: struct.Connection.html
110
//! [`Connection::poll`]: struct.Connection.html#method.poll
111
//! [`Connection::poll_closed`]: struct.Connection.html#method.poll_closed
112
//! [`futures::Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
113
//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114
//! [`RecvStream`]: ../struct.RecvStream.html
115
//! [`SendStream`]: ../struct.SendStream.html
116
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118
use crate::codec::{Codec, UserError};
119
use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120
use crate::proto::{self, Config, Error, Prioritized};
121
use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123
use bytes::{Buf, Bytes};
124
use http::{HeaderMap, Method, Request, Response};
125
use std::future::Future;
126
use std::pin::Pin;
127
use std::task::{Context, Poll};
128
use std::time::Duration;
129
use std::{fmt, io};
130
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131
use tracing::instrument::{Instrument, Instrumented};
132
133
/// In-progress HTTP/2 connection handshake future.
134
///
135
/// This type implements `Future`, yielding a `Connection` instance once the
136
/// handshake has completed.
137
///
138
/// The handshake is completed once the connection preface is fully received
139
/// from the client **and** the initial settings frame is sent to the client.
140
///
141
/// The handshake future does not wait for the initial settings frame from the
142
/// client.
143
///
144
/// See [module] level docs for more details.
145
///
146
/// [module]: index.html
147
#[must_use = "futures do nothing unless polled"]
148
pub struct Handshake<T, B: Buf = Bytes> {
149
    /// The config to pass to Connection::new after handshake succeeds.
150
    builder: Builder,
151
    /// The current state of the handshake.
152
    state: Handshaking<T, B>,
153
    /// Span tracking the handshake
154
    span: tracing::Span,
155
}
156
157
/// Accepts inbound HTTP/2 streams on a connection.
158
///
159
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160
/// implements the HTTP/2 server logic for that connection. It is responsible
161
/// for receiving inbound streams initiated by the client as well as driving the
162
/// internal state forward.
163
///
164
/// `Connection` values are created by calling [`handshake`]. Once a
165
/// `Connection` value is obtained, the caller must call [`accept`] or
166
/// [`poll_closed`] in order to drive the internal connection state forward.
167
///
168
/// See [module level] documentation for more details
169
///
170
/// [module level]: index.html
171
/// [`handshake`]: fn.handshake.html
172
/// [`accept`]: struct.Connection.html#method.accept
173
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
174
///
175
/// # Examples
176
///
177
/// ```
178
/// # use tokio::io::{AsyncRead, AsyncWrite};
179
/// # use h2::server;
180
/// # use h2::server::*;
181
/// #
182
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183
/// let mut server = server::handshake(my_io).await.unwrap();
184
/// while let Some(request) = server.accept().await {
185
///     tokio::spawn(async move {
186
///         let (request, respond) = request.unwrap();
187
///         // Process the request and send the response back to the client
188
///         // using `respond`.
189
///     });
190
/// }
191
/// # }
192
/// #
193
/// # pub fn main() {}
194
/// ```
195
#[must_use = "streams do nothing unless polled"]
196
pub struct Connection<T, B: Buf> {
197
    connection: proto::Connection<T, Peer, B>,
198
}
199
200
/// Builds server connections with custom configuration values.
201
///
202
/// Methods can be chained in order to set the configuration values.
203
///
204
/// The server is constructed by calling [`handshake`] and passing the I/O
205
/// handle that will back the HTTP/2 server.
206
///
207
/// New instances of `Builder` are obtained via [`Builder::new`].
208
///
209
/// See function level documentation for details on the various server
210
/// configuration settings.
211
///
212
/// [`Builder::new`]: struct.Builder.html#method.new
213
/// [`handshake`]: struct.Builder.html#method.handshake
214
///
215
/// # Examples
216
///
217
/// ```
218
/// # use tokio::io::{AsyncRead, AsyncWrite};
219
/// # use h2::server::*;
220
/// #
221
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222
/// # -> Handshake<T>
223
/// # {
224
/// // `server_fut` is a future representing the completion of the HTTP/2
225
/// // handshake.
226
/// let server_fut = Builder::new()
227
///     .initial_window_size(1_000_000)
228
///     .max_concurrent_streams(1000)
229
///     .handshake(my_io);
230
/// # server_fut
231
/// # }
232
/// #
233
/// # pub fn main() {}
234
/// ```
235
#[derive(Clone, Debug)]
236
pub struct Builder {
237
    /// Time to keep locally reset streams around before reaping.
238
    reset_stream_duration: Duration,
239
240
    /// Maximum number of locally reset streams to keep at a time.
241
    reset_stream_max: usize,
242
243
    /// Maximum number of remotely reset streams to allow in the pending
244
    /// accept queue.
245
    pending_accept_reset_stream_max: usize,
246
247
    /// Initial `Settings` frame to send as part of the handshake.
248
    settings: Settings,
249
250
    /// Initial target window size for new connections.
251
    initial_target_connection_window_size: Option<u32>,
252
253
    /// Maximum number of bytes to buffer for writing per stream.
254
    max_send_buffer_size: usize,
255
256
    /// Maximum number of locally reset streams due to protocol error across
257
    /// the lifetime of the connection.
258
    ///
259
    /// When this gets exceeded, we issue GOAWAYs.
260
    local_max_error_reset_streams: Option<usize>,
261
}
262
263
/// Send a response back to the client
264
///
265
/// A `SendResponse` instance is provided when receiving a request and is used
266
/// to send the associated response back to the client. It is also used to
267
/// explicitly reset the stream with a custom reason.
268
///
269
/// It will also be used to initiate push promises linked with the associated
270
/// stream.
271
///
272
/// If the `SendResponse` instance is dropped without sending a response, then
273
/// the HTTP/2 stream will be reset.
274
///
275
/// See [module] level docs for more details.
276
///
277
/// [module]: index.html
278
#[derive(Debug)]
279
pub struct SendResponse<B: Buf> {
280
    inner: proto::StreamRef<B>,
281
}
282
283
/// Send a response to a promised request
284
///
285
/// A `SendPushedResponse` instance is provided when promising a request and is used
286
/// to send the associated response to the client. It is also used to
287
/// explicitly reset the stream with a custom reason.
288
///
289
/// It cannot be used to initiate push promises.
290
///
291
/// If the `SendPushedResponse` instance is dropped without sending a response, then
292
/// the HTTP/2 stream will be reset.
293
///
294
/// See [module] level docs for more details.
295
///
296
/// [module]: index.html
297
pub struct SendPushedResponse<B: Buf> {
298
    inner: SendResponse<B>,
299
}
300
301
// Manual implementation necessary because of rust-lang/rust#26925
302
impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
303
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304
        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305
    }
306
}
307
308
/// Stages of an in-progress handshake.
309
enum Handshaking<T, B: Buf> {
310
    /// State 1. Connection is flushing pending SETTINGS frame.
311
    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
312
    /// State 2. Connection is waiting for the client preface.
313
    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
314
    /// State 3. Handshake is done, polling again would panic.
315
    Done,
316
}
317
318
/// Flush a Sink
319
struct Flush<T, B> {
320
    codec: Option<Codec<T, B>>,
321
}
322
323
/// Read the client connection preface
324
struct ReadPreface<T, B> {
325
    codec: Option<Codec<T, B>>,
326
    pos: usize,
327
}
328
329
#[derive(Debug)]
330
pub(crate) struct Peer;
331
332
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333
334
/// Creates a new configured HTTP/2 server with default configuration
335
/// values backed by `io`.
336
///
337
/// It is expected that `io` already be in an appropriate state to commence
338
/// the [HTTP/2 handshake]. See [Handshake] for more details.
339
///
340
/// Returns a future which resolves to the [`Connection`] instance once the
341
/// HTTP/2 handshake has been completed. The returned [`Connection`]
342
/// instance will be using default configuration values. Use [`Builder`] to
343
/// customize the configuration values used by a [`Connection`] instance.
344
///
345
/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346
/// [Handshake]: ../index.html#handshake
347
/// [`Connection`]: struct.Connection.html
348
///
349
/// # Examples
350
///
351
/// ```
352
/// # use tokio::io::{AsyncRead, AsyncWrite};
353
/// # use h2::server;
354
/// # use h2::server::*;
355
/// #
356
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357
/// # {
358
/// let connection = server::handshake(my_io).await.unwrap();
359
/// // The HTTP/2 handshake has completed, now use `connection` to
360
/// // accept inbound HTTP/2 streams.
361
/// # }
362
/// #
363
/// # pub fn main() {}
364
/// ```
365
pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366
where
367
    T: AsyncRead + AsyncWrite + Unpin,
368
{
369
    Builder::new().handshake(io)
370
}
371
372
// ===== impl Connection =====
373
374
impl<T, B> Connection<T, B>
375
where
376
    T: AsyncRead + AsyncWrite + Unpin,
377
    B: Buf,
378
{
379
    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380
        let span = tracing::trace_span!("server_handshake");
381
        let entered = span.enter();
382
383
        // Create the codec.
384
        let mut codec = Codec::new(io);
385
386
        if let Some(max) = builder.settings.max_frame_size() {
387
            codec.set_max_recv_frame_size(max as usize);
388
        }
389
390
        if let Some(max) = builder.settings.max_header_list_size() {
391
            codec.set_max_recv_header_list_size(max as usize);
392
        }
393
394
        // Send initial settings frame.
395
        codec
396
            .buffer(builder.settings.clone().into())
397
            .expect("invalid SETTINGS frame");
398
399
        // Create the handshake future.
400
        let state =
401
            Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402
403
        drop(entered);
404
405
        Handshake {
406
            builder,
407
            state,
408
            span,
409
        }
410
    }
411
412
    /// Accept the next incoming request on this connection.
413
    pub async fn accept(
414
        &mut self,
415
    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
416
        crate::poll_fn(move |cx| self.poll_accept(cx)).await
417
    }
418
419
    #[doc(hidden)]
420
    pub fn poll_accept(
421
        &mut self,
422
        cx: &mut Context<'_>,
423
    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
424
        // Always try to advance the internal state. Getting `Pending` is also
425
        // needed to allow this function to return Pending.
426
        if self.poll_closed(cx)?.is_ready() {
427
            // If the socket is closed, don't return anything
428
            // TODO: drop any pending streams
429
            return Poll::Ready(None);
430
        }
431
432
        if let Some(inner) = self.connection.next_incoming() {
433
            tracing::trace!("received incoming");
434
            let (head, _) = inner.take_request().into_parts();
435
            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436
437
            let request = Request::from_parts(head, body);
438
            let respond = SendResponse { inner };
439
440
            return Poll::Ready(Some(Ok((request, respond))));
441
        }
442
443
        Poll::Pending
444
    }
445
446
    /// Sets the target window size for the whole connection.
447
    ///
448
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
449
    /// frame will be immediately sent to the remote, increasing the connection
450
    /// level window by `size - current_value`.
451
    ///
452
    /// If `size` is less than the current value, nothing will happen
453
    /// immediately. However, as window capacity is released by
454
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
455
    /// out until the number of "in flight" bytes drops below `size`.
456
    ///
457
    /// The default value is 65,535.
458
    ///
459
    /// See [`FlowControl`] documentation for more details.
460
    ///
461
    /// [`FlowControl`]: ../struct.FlowControl.html
462
    /// [library level]: ../index.html#flow-control
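    ///
    /// # Examples
    ///
    /// A minimal sketch (the connection is assumed to come from
    /// [`handshake`](fn.handshake.html)):
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    ///
    /// // Allow up to 1 MB of in-flight data across the whole connection.
    /// connection.set_target_window_size(1_000_000);
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```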
463
    pub fn set_target_window_size(&mut self, size: u32) {
464
        assert!(size <= proto::MAX_WINDOW_SIZE);
465
        self.connection.set_target_window_size(size);
466
    }
467
468
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
469
    /// flow control for received data.
470
    ///
471
    /// The `SETTINGS` will be sent to the remote, and only applied once the
472
    /// remote acknowledges the change.
473
    ///
474
    /// This can be used to increase or decrease the window size for existing
475
    /// streams.
476
    ///
477
    /// # Errors
478
    ///
479
    /// Returns an error if a previous call is still pending acknowledgement
480
    /// from the remote endpoint.
481
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
482
        assert!(size <= proto::MAX_WINDOW_SIZE);
483
        self.connection.set_initial_window_size(size)?;
484
        Ok(())
485
    }
486
487
    /// Enables the [extended CONNECT protocol].
488
    ///
489
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
490
    ///
491
    /// # Errors
492
    ///
493
    /// Returns an error if a previous call is still pending acknowledgement
494
    /// from the remote endpoint.
495
    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
496
        self.connection.set_enable_connect_protocol()?;
497
        Ok(())
498
    }
499
500
    /// Returns `Ready` when the underlying connection has closed.
501
    ///
502
    /// If any new inbound streams are received during a call to `poll_closed`,
503
    /// they will be queued and returned on the next call to [`poll_accept`].
504
    ///
505
    /// This function will advance the internal connection state, driving
506
    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
507
    ///
508
    /// See [here](index.html#managing-the-connection) for more details.
509
    ///
510
    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
511
    /// [`RecvStream`]: ../struct.RecvStream.html
512
    /// [`SendStream`]: ../struct.SendStream.html
513
    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514
        self.connection.poll(cx).map_err(Into::into)
515
    }
516
517
    /// Sets the connection to a GOAWAY state.
518
    ///
519
    /// Does not terminate the connection. Must continue being polled to close
520
    /// the connection.
521
    ///
522
    /// After flushing the GOAWAY frame, the connection is closed. Any
523
    /// outstanding streams do not prevent the connection from closing. This
524
    /// should usually be reserved for shutting down when something bad
525
    /// external to `h2` has happened, and open streams cannot be properly
526
    /// handled.
527
    ///
528
    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
529
    pub fn abrupt_shutdown(&mut self, reason: Reason) {
530
        self.connection.go_away_from_user(reason);
531
    }
532
533
    /// Starts a [graceful shutdown][1] process.
534
    ///
535
    /// Must continue being polled to close the connection.
536
    ///
537
    /// It's possible to receive more requests after calling this method, since
538
    /// they might have been in-flight from the client already. After about
539
    /// 1 RTT, no new requests should be accepted. Once all active streams
540
    /// have completed, the connection is closed.
541
    ///
542
    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
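    ///
    /// # Examples
    ///
    /// A minimal sketch (the connection is assumed to come from
    /// [`handshake`](fn.handshake.html); a real server would keep accepting
    /// and handling in-flight streams while shutting down):
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    ///
    /// // Ask the client to stop opening new streams...
    /// connection.graceful_shutdown();
    /// // ...then keep polling the connection until it has fully closed.
    /// std::future::poll_fn(|cx| connection.poll_closed(cx)).await.unwrap();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```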
543
    pub fn graceful_shutdown(&mut self) {
544
        self.connection.go_away_gracefully();
545
    }
546
547
    /// Takes a `PingPong` instance from the connection.
548
    ///
549
    /// # Note
550
    ///
551
    /// This may only be called once. Calling multiple times will return `None`.
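    ///
    /// A minimal sketch, assuming the crate's `Ping::opaque()` constructor for
    /// user-initiated pings (the connection itself must be polled concurrently
    /// for the corresponding `PONG` to be received):
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server;
    /// # use h2::Ping;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
    /// let mut connection = server::handshake(my_io).await.unwrap();
    ///
    /// // The `PingPong` handle can only be taken once.
    /// let mut ping_pong = connection.ping_pong().expect("first call returns Some");
    /// assert!(connection.ping_pong().is_none());
    ///
    /// // Send a PING and wait for the peer's PONG.
    /// ping_pong.ping(Ping::opaque()).await.unwrap();
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```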
552
    pub fn ping_pong(&mut self) -> Option<PingPong> {
553
        self.connection.take_user_pings().map(PingPong::new)
554
    }
555
556
    /// Checks if there are any streams.
557
    pub fn has_streams(&self) -> bool {
558
        self.connection.has_streams()
559
    }
560
561
    /// Returns the maximum number of concurrent streams that may be initiated
562
    /// by the server on this connection.
563
    ///
564
    /// This limit is configured by the client peer by sending the
565
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
566
    /// This method returns the currently acknowledged value received from the
567
    /// remote.
568
    ///
569
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
570
    pub fn max_concurrent_send_streams(&self) -> usize {
571
        self.connection.max_send_streams()
572
    }
573
574
    /// Returns the maximum number of concurrent streams that may be initiated
575
    /// by the client on this connection.
576
    ///
577
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
578
    /// parameter][1] sent in a `SETTINGS` frame that has been
579
    /// acknowledged by the remote peer. The value to be sent is configured by
580
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
581
    /// with the remote peer.
582
    ///
583
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
584
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
585
    pub fn max_concurrent_recv_streams(&self) -> usize {
586
        self.connection.max_recv_streams()
587
    }
588
589
    // Could disappear at any time.
590
    #[doc(hidden)]
591
    #[cfg(feature = "unstable")]
592
    pub fn num_wired_streams(&self) -> usize {
593
        self.connection.num_wired_streams()
594
    }
595
}
596
597
#[cfg(feature = "stream")]
598
impl<T, B> futures_core::Stream for Connection<T, B>
599
where
600
    T: AsyncRead + AsyncWrite + Unpin,
601
    B: Buf,
602
{
603
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
604
605
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
606
        self.poll_accept(cx)
607
    }
608
}
609
610
impl<T, B> fmt::Debug for Connection<T, B>
611
where
612
    T: fmt::Debug,
613
    B: fmt::Debug + Buf,
614
{
615
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
616
        fmt.debug_struct("Connection")
617
            .field("connection", &self.connection)
618
            .finish()
619
    }
620
}
621
622
// ===== impl Builder =====
623
624
impl Builder {
625
    /// Returns a new server builder instance initialized with default
626
    /// configuration values.
627
    ///
628
    /// Configuration methods can be chained on the return value.
629
    ///
630
    /// # Examples
631
    ///
632
    /// ```
633
    /// # use tokio::io::{AsyncRead, AsyncWrite};
634
    /// # use h2::server::*;
635
    /// #
636
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
637
    /// # -> Handshake<T>
638
    /// # {
639
    /// // `server_fut` is a future representing the completion of the HTTP/2
640
    /// // handshake.
641
    /// let server_fut = Builder::new()
642
    ///     .initial_window_size(1_000_000)
643
    ///     .max_concurrent_streams(1000)
644
    ///     .handshake(my_io);
645
    /// # server_fut
646
    /// # }
647
    /// #
648
    /// # pub fn main() {}
649
    /// ```
650
0
    pub fn new() -> Builder {
651
0
        Builder {
652
0
            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
653
0
            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
654
0
            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
655
0
            settings: Settings::default(),
656
0
            initial_target_connection_window_size: None,
657
0
            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
658
0
659
0
            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
660
0
        }
661
0
    }
662
663
    /// Indicates the initial window size (in octets) for stream-level
664
    /// flow control for received data.
665
    ///
666
    /// The initial window of a stream is used as part of flow control. For more
667
    /// details, see [`FlowControl`].
668
    ///
669
    /// The default value is 65,535.
670
    ///
671
    /// [`FlowControl`]: ../struct.FlowControl.html
672
    ///
673
    /// # Examples
674
    ///
675
    /// ```
676
    /// # use tokio::io::{AsyncRead, AsyncWrite};
677
    /// # use h2::server::*;
678
    /// #
679
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
680
    /// # -> Handshake<T>
681
    /// # {
682
    /// // `server_fut` is a future representing the completion of the HTTP/2
683
    /// // handshake.
684
    /// let server_fut = Builder::new()
685
    ///     .initial_window_size(1_000_000)
686
    ///     .handshake(my_io);
687
    /// # server_fut
688
    /// # }
689
    /// #
690
    /// # pub fn main() {}
691
    /// ```
692
0
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
693
0
        self.settings.set_initial_window_size(Some(size));
694
0
        self
695
0
    }
696
697
    /// Indicates the initial window size (in octets) for connection-level flow control
698
    /// for received data.
699
    ///
700
    /// The initial window of a connection is used as part of flow control. For more details,
701
    /// see [`FlowControl`].
702
    ///
703
    /// The default value is 65,535.
704
    ///
705
    /// [`FlowControl`]: ../struct.FlowControl.html
706
    ///
707
    /// # Examples
708
    ///
709
    /// ```
710
    /// # use tokio::io::{AsyncRead, AsyncWrite};
711
    /// # use h2::server::*;
712
    /// #
713
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
714
    /// # -> Handshake<T>
715
    /// # {
716
    /// // `server_fut` is a future representing the completion of the HTTP/2
717
    /// // handshake.
718
    /// let server_fut = Builder::new()
719
    ///     .initial_connection_window_size(1_000_000)
720
    ///     .handshake(my_io);
721
    /// # server_fut
722
    /// # }
723
    /// #
724
    /// # pub fn main() {}
725
    /// ```
726
0
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
727
0
        self.initial_target_connection_window_size = Some(size);
728
0
        self
729
0
    }
730
731
    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
732
    /// configured server is able to accept.
733
    ///
734
    /// The sender may send data frames that are **smaller** than this value,
735
    /// but any data larger than `max` will be broken up into multiple `DATA`
736
    /// frames.
737
    ///
738
    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
739
    ///
740
    /// # Examples
741
    ///
742
    /// ```
743
    /// # use tokio::io::{AsyncRead, AsyncWrite};
744
    /// # use h2::server::*;
745
    /// #
746
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
747
    /// # -> Handshake<T>
748
    /// # {
749
    /// // `server_fut` is a future representing the completion of the HTTP/2
750
    /// // handshake.
751
    /// let server_fut = Builder::new()
752
    ///     .max_frame_size(1_000_000)
753
    ///     .handshake(my_io);
754
    /// # server_fut
755
    /// # }
756
    /// #
757
    /// # pub fn main() {}
758
    /// ```
759
    ///
760
    /// # Panics
761
    ///
762
    /// This function panics if `max` is not within the legal range specified
763
    /// above.
764
0
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
765
0
        self.settings.set_max_frame_size(Some(max));
766
0
        self
767
0
    }
768
769
    /// Sets the max size of received header lists.
770
    ///
771
    /// This advisory setting informs a peer of the maximum size of header list
772
    /// that the sender is prepared to accept, in octets. The value is based on
773
    /// the uncompressed size of header fields, including the length of the name
774
    /// and value in octets plus an overhead of 32 octets for each header field.
775
    ///
776
    /// This setting is also used to limit the maximum amount of data that is
777
    /// buffered to decode HEADERS frames.
778
    ///
779
    /// # Examples
780
    ///
781
    /// ```
782
    /// # use tokio::io::{AsyncRead, AsyncWrite};
783
    /// # use h2::server::*;
784
    /// #
785
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
786
    /// # -> Handshake<T>
787
    /// # {
788
    /// // `server_fut` is a future representing the completion of the HTTP/2
789
    /// // handshake.
790
    /// let server_fut = Builder::new()
791
    ///     .max_header_list_size(16 * 1024)
792
    ///     .handshake(my_io);
793
    /// # server_fut
794
    /// # }
795
    /// #
796
    /// # pub fn main() {}
797
    /// ```
798
0
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
799
0
        self.settings.set_max_header_list_size(Some(max));
800
0
        self
801
0
    }
802
803
    /// Sets the maximum number of concurrent streams.
804
    ///
805
    /// The maximum concurrent streams setting only controls the maximum number
806
    /// of streams that can be initiated by the remote peer. In other words,
807
    /// when this setting is set to 100, this does not limit the number of
808
    /// concurrent streams that can be created by the caller.
809
    ///
810
    /// It is recommended that this value be no smaller than 100, so as to not
811
    /// unnecessarily limit parallelism. However, any value is legal, including
812
    /// 0. If `max` is set to 0, then the remote will not be permitted to
813
    /// initiate streams.
814
    ///
815
    /// Note that streams in the reserved state, i.e., push promises that have
816
    /// been reserved but the stream has not started, do not count against this
817
    /// setting.
818
    ///
819
    /// Also note that if the remote *does* exceed the value set here, it is not
820
    /// a protocol level error. Instead, the `h2` library will immediately reset
821
    /// the stream.
822
    ///
823
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
824
    ///
825
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
826
    ///
827
    /// # Examples
828
    ///
829
    /// ```
830
    /// # use tokio::io::{AsyncRead, AsyncWrite};
831
    /// # use h2::server::*;
832
    /// #
833
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
834
    /// # -> Handshake<T>
835
    /// # {
836
    /// // `server_fut` is a future representing the completion of the HTTP/2
837
    /// // handshake.
838
    /// let server_fut = Builder::new()
839
    ///     .max_concurrent_streams(1000)
840
    ///     .handshake(my_io);
841
    /// # server_fut
842
    /// # }
843
    /// #
844
    /// # pub fn main() {}
845
    /// ```
846
0
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
847
0
        self.settings.set_max_concurrent_streams(Some(max));
848
0
        self
849
0
    }
850
851
    /// Sets the maximum number of concurrent locally reset streams.
852
    ///
853
    /// When a stream is explicitly reset by either calling
854
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
855
    /// before completing the stream, the HTTP/2 specification requires that
856
    /// any further frames received for that stream must be ignored for "some
857
    /// time".
858
    ///
859
    /// In order to satisfy the specification, internal state must be maintained
860
    /// to implement the behavior. This state grows linearly with the number of
861
    /// streams that are locally reset.
862
    ///
863
    /// The `max_concurrent_reset_streams` setting configures an upper
864
    /// bound on the amount of state that is maintained. When this max value is
865
    /// reached, the oldest reset stream is purged from memory.
866
    ///
867
    /// Once the stream has been fully purged from memory, any additional frames
868
    /// received for that stream will result in a connection level protocol
869
    /// error, forcing the connection to terminate.
870
    ///
871
    /// The default value is currently 50.
872
    ///
873
    /// # Examples
874
    ///
875
    /// ```
876
    /// # use tokio::io::{AsyncRead, AsyncWrite};
877
    /// # use h2::server::*;
878
    /// #
879
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
880
    /// # -> Handshake<T>
881
    /// # {
882
    /// // `server_fut` is a future representing the completion of the HTTP/2
883
    /// // handshake.
884
    /// let server_fut = Builder::new()
885
    ///     .max_concurrent_reset_streams(1000)
886
    ///     .handshake(my_io);
887
    /// # server_fut
888
    /// # }
889
    /// #
890
    /// # pub fn main() {}
891
    /// ```
892
0
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
893
0
        self.reset_stream_max = max;
894
0
        self
895
0
    }
896
897
    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
898
    ///
899
    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
900
    /// Too many of these often indicate a malicious client, and there are attacks that can abuse this to DoS servers.
901
    /// This limit protects against these DoS attacks by limiting the number of resets we can be forced to generate.
902
    ///
903
    /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
904
    /// `ENHANCE_YOUR_CALM` to the client.
905
    ///
906
    /// If you really want to disable this, supply [`Option::None`] here.
907
    /// Disabling this is not recommended and may expose you to DoS attacks.
908
    ///
909
    /// The default value is currently 1024, but could change.
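    ///
    /// # Examples
    ///
    /// A sketch in the same style as the other builder examples:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_local_error_reset_streams(Some(1024))
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```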
910
0
    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
911
0
        self.local_max_error_reset_streams = max;
912
0
        self
913
0
    }
914
915
    /// Sets the maximum number of pending-accept remotely-reset streams.
916
    ///
917
    /// Streams that have been received from the peer, but not accepted by the
918
    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
919
    /// could send a request and then shortly after, realize it is not needed,
920
    /// sending a CANCEL.
921
    ///
922
    /// However, since those streams are now "closed", they don't count towards
923
    /// the max concurrent streams. So, they will sit in the accept queue,
924
    /// using memory.
925
    ///
926
    /// When the number of remotely-reset streams sitting in the pending-accept
927
    /// queue reaches this maximum value, a connection error with the code of
928
    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
929
    /// `Future`.
930
    ///
931
    /// The default value is currently 20, but could change.
932
    ///
933
    /// # Examples
934
    ///
935
    ///
936
    /// ```
937
    /// # use tokio::io::{AsyncRead, AsyncWrite};
938
    /// # use h2::server::*;
939
    /// #
940
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
941
    /// # -> Handshake<T>
942
    /// # {
943
    /// // `server_fut` is a future representing the completion of the HTTP/2
944
    /// // handshake.
945
    /// let server_fut = Builder::new()
946
    ///     .max_pending_accept_reset_streams(100)
947
    ///     .handshake(my_io);
948
    /// # server_fut
949
    /// # }
950
    /// #
951
    /// # pub fn main() {}
952
    /// ```
953
0
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
954
0
        self.pending_accept_reset_stream_max = max;
955
0
        self
956
0
    }
957
958
    /// Sets the maximum send buffer size per stream.
959
    ///
960
    /// Once a stream has buffered up to (or over) the maximum, the stream's
961
    /// flow control will not "poll" additional capacity. Once bytes for the
962
    /// stream have been written to the connection, the send buffer capacity
963
    /// will be freed up again.
964
    ///
965
    /// The default is currently ~400KB, but may change.
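    ///
    /// # Examples
    ///
    /// A sketch in the same style as the other builder examples:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_send_buffer_size(1024 * 1024)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```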
966
    ///
967
    /// # Panics
968
    ///
969
    /// This function panics if `max` is larger than `u32::MAX`.
970
0
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
971
0
        assert!(max <= u32::MAX as usize);
972
0
        self.max_send_buffer_size = max;
973
0
        self
974
0
    }
975
976
    /// Sets the duration to remember locally reset streams.
977
    ///
978
    /// When a stream is explicitly reset by either calling
979
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
980
    /// before completing the stream, the HTTP/2 specification requires that
981
    /// any further frames received for that stream must be ignored for "some
982
    /// time".
983
    ///
984
    /// In order to satisfy the specification, internal state must be maintained
985
    /// to implement the behavior. This state grows linearly with the number of
986
    /// streams that are locally reset.
987
    ///
988
    /// The `reset_stream_duration` setting configures the max amount of time
989
    /// this state will be maintained in memory. Once the duration elapses, the
990
    /// stream state is purged from memory.
991
    ///
992
    /// Once the stream has been fully purged from memory, any additional frames
993
    /// received for that stream will result in a connection level protocol
994
    /// error, forcing the connection to terminate.
995
    ///
996
    /// The default value is currently 1 second.
997
    ///
998
    /// # Examples
999
    ///
1000
    /// ```
1001
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1002
    /// # use h2::server::*;
1003
    /// # use std::time::Duration;
1004
    /// #
1005
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1006
    /// # -> Handshake<T>
1007
    /// # {
1008
    /// // `server_fut` is a future representing the completion of the HTTP/2
1009
    /// // handshake.
1010
    /// let server_fut = Builder::new()
1011
    ///     .reset_stream_duration(Duration::from_secs(10))
1012
    ///     .handshake(my_io);
1013
    /// # server_fut
1014
    /// # }
1015
    /// #
1016
    /// # pub fn main() {}
1017
    /// ```
1018
0
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1019
0
        self.reset_stream_duration = dur;
1020
0
        self
1021
0
    }
1022
1023
    /// Enables the [extended CONNECT protocol].
1024
    ///
1025
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
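    ///
    /// # Examples
    ///
    /// A sketch in the same style as the other builder examples:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // Advertise RFC 8441 extended CONNECT support to clients.
    /// let server_fut = Builder::new()
    ///     .enable_connect_protocol()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```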
1026
0
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
1027
0
        self.settings.set_enable_connect_protocol(Some(1));
1028
0
        self
1029
0
    }
1030
1031
    /// Creates a new configured HTTP/2 server backed by `io`.
1032
    ///
1033
    /// It is expected that `io` already be in an appropriate state to commence
1034
    /// the [HTTP/2 handshake]. See [Handshake] for more details.
1035
    ///
1036
    /// Returns a future which resolves to the [`Connection`] instance once the
1037
    /// HTTP/2 handshake has been completed.
1038
    ///
1039
    /// This function also allows the caller to configure the send payload data
1040
    /// type. See [Outbound data type] for more details.
1041
    ///
1042
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1043
    /// [Handshake]: ../index.html#handshake
1044
    /// [`Connection`]: struct.Connection.html
1045
    /// [Outbound data type]: ../index.html#outbound-data-type
1046
    ///
1047
    /// # Examples
1048
    ///
1049
    /// Basic usage:
1050
    ///
1051
    /// ```
1052
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1053
    /// # use h2::server::*;
1054
    /// #
1055
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1056
    /// # -> Handshake<T>
1057
    /// # {
1058
    /// // `server_fut` is a future representing the completion of the HTTP/2
1059
    /// // handshake.
1060
    /// let server_fut = Builder::new()
1061
    ///     .handshake(my_io);
1062
    /// # server_fut
1063
    /// # }
1064
    /// #
1065
    /// # pub fn main() {}
1066
    /// ```
1067
    ///
1068
    /// Configures the send-payload data type. In this case, the outbound data
1069
    /// type will be `&'static [u8]`.
1070
    ///
1071
    /// ```
1072
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1073
    /// # use h2::server::*;
1074
    /// #
1075
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1076
    /// # -> Handshake<T, &'static [u8]>
1077
    /// # {
1078
    /// // `server_fut` is a future representing the completion of the HTTP/2
1079
    /// // handshake.
1080
    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1081
    ///     .handshake(my_io);
1082
    /// # server_fut
1083
    /// # }
1084
    /// #
1085
    /// # pub fn main() {}
1086
    /// ```
1087
    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1088
    where
1089
        T: AsyncRead + AsyncWrite + Unpin,
1090
        B: Buf,
1091
    {
1092
        Connection::handshake2(io, self.clone())
1093
    }
1094
}
1095
1096
impl Default for Builder {
1097
0
    fn default() -> Builder {
1098
0
        Builder::new()
1099
0
    }
1100
}
1101
1102
// ===== impl SendResponse =====
1103
1104
impl<B: Buf> SendResponse<B> {
1105
    /// Send a response to a client request.
1106
    ///
1107
    /// On success, a [`SendStream`] instance is returned. This instance can be
1108
    /// used to stream the response body and send trailers.
1109
    ///
1110
    /// If a body or trailers will be sent on the returned [`SendStream`]
1111
    /// instance, then `end_of_stream` must be set to `false` when calling this
1112
    /// function.
1113
    ///
1114
    /// The [`SendResponse`] instance is already associated with a received
1115
    /// request.  This function may only be called once per instance and only if
1116
    /// [`send_reset`] has not been previously called.
1117
    ///
1118
    /// [`SendResponse`]: #
1119
    /// [`SendStream`]: ../struct.SendStream.html
1120
    /// [`send_reset`]: #method.send_reset
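    ///
    /// # Examples
    ///
    /// A minimal sketch; `respond` is assumed to be the `SendResponse` half
    /// returned by `Connection::accept`:
    ///
    /// ```
    /// # use bytes::Bytes;
    /// # use http::{Response, StatusCode};
    /// #
    /// # fn doc(mut respond: h2::server::SendResponse<Bytes>) -> Result<(), h2::Error> {
    /// let response = Response::builder()
    ///     .status(StatusCode::OK)
    ///     .body(())
    ///     .unwrap();
    ///
    /// // `end_of_stream` is `false` so the body can be streamed afterwards.
    /// let mut send = respond.send_response(response, false)?;
    /// send.send_data(Bytes::from_static(b"hello"), true)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```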
1121
    pub fn send_response(
1122
        &mut self,
1123
        response: Response<()>,
1124
        end_of_stream: bool,
1125
    ) -> Result<SendStream<B>, crate::Error> {
1126
        self.inner
1127
            .send_response(response, end_of_stream)
1128
            .map(|_| SendStream::new(self.inner.clone()))
1129
            .map_err(Into::into)
1130
    }
1131
1132
    /// Push a request and response to the client
1133
    ///
1134
    /// On success, a [`SendResponse`] instance is returned.
1135
    ///
1136
    /// [`SendResponse`]: #
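    ///
    /// # Examples
    ///
    /// A minimal sketch; `respond` is assumed to be the `SendResponse` half of
    /// an accepted request, and the promised request must be safe and cacheable
    /// (e.g. `GET`):
    ///
    /// ```
    /// # use bytes::Bytes;
    /// # use http::{Request, Response, StatusCode};
    /// #
    /// # fn doc(mut respond: h2::server::SendResponse<Bytes>) -> Result<(), h2::Error> {
    /// let pushed_request = Request::builder()
    ///     .method("GET")
    ///     .uri("https://example.com/style.css")
    ///     .body(())
    ///     .unwrap();
    ///
    /// // Promise the request, then send a response on the pushed stream.
    /// let mut pushed = respond.push_request(pushed_request)?;
    /// let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
    /// pushed.send_response(response, true)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```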
1137
    pub fn push_request(
1138
        &mut self,
1139
        request: Request<()>,
1140
    ) -> Result<SendPushedResponse<B>, crate::Error> {
1141
        self.inner
1142
            .send_push_promise(request)
1143
            .map(|inner| SendPushedResponse {
1144
                inner: SendResponse { inner },
1145
            })
1146
            .map_err(Into::into)
1147
    }
1148
1149
    /// Send a stream reset to the peer.
1150
    ///
1151
    /// This essentially cancels the stream, including any inbound or outbound
1152
    /// data streams.
1153
    ///
1154
    /// If this function is called before [`send_response`], a call to
1155
    /// [`send_response`] will result in an error.
1156
    ///
1157
    /// If this function is called while a [`SendStream`] instance is active,
1158
    /// any further use of the instance will result in an error.
1159
    ///
1160
    /// This function should only be called once.
1161
    ///
1162
    /// [`send_response`]: #method.send_response
1163
    /// [`SendStream`]: ../struct.SendStream.html
1164
    pub fn send_reset(&mut self, reason: Reason) {
1165
        self.inner.send_reset(reason)
1166
    }
1167
1168
    /// Polls to be notified when the client resets this stream.
1169
    ///
1170
    /// If the stream is still open, this returns `Poll::Pending`, and
1171
    /// registers the task to be notified if a `RST_STREAM` is received.
1172
    ///
1173
    /// If a `RST_STREAM` frame is received for this stream, calling this
1174
    /// method will yield the `Reason` for the reset.
1175
    ///
1176
    /// # Error
1177
    ///
1178
    /// Calling this method after having called `send_response` will return
1179
    /// a user error.
1180
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1181
        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1182
    }
1183
1184
    /// Returns the stream ID of the response stream.
1185
    ///
1186
    /// # Panics
1187
    ///
1188
    /// If the lock on the stream store has been poisoned.
1189
    pub fn stream_id(&self) -> crate::StreamId {
1190
        crate::StreamId::from_internal(self.inner.stream_id())
1191
    }
1192
}
1193
1194
// ===== impl SendPushedResponse =====
1195
1196
impl<B: Buf> SendPushedResponse<B> {
1197
    /// Send a response to a promised request.
1198
    ///
1199
    /// On success, a [`SendStream`] instance is returned. This instance can be
1200
    /// used to stream the response body and send trailers.
1201
    ///
1202
    /// If a body or trailers will be sent on the returned [`SendStream`]
1203
    /// instance, then `end_of_stream` must be set to `false` when calling this
1204
    /// function.
1205
    ///
1206
    /// The [`SendPushedResponse`] instance is associated with a promised
1207
    /// request.  This function may only be called once per instance and only if
1208
    /// [`send_reset`] has not been previously called.
1209
    ///
1210
    /// [`SendPushedResponse`]: #
1211
    /// [`SendStream`]: ../struct.SendStream.html
1212
    /// [`send_reset`]: #method.send_reset
1213
    pub fn send_response(
1214
        &mut self,
1215
        response: Response<()>,
1216
        end_of_stream: bool,
1217
    ) -> Result<SendStream<B>, crate::Error> {
1218
        self.inner.send_response(response, end_of_stream)
1219
    }
1220
1221
    /// Send a stream reset to the peer.
1222
    ///
1223
    /// This essentially cancels the stream, including any inbound or outbound
1224
    /// data streams.
1225
    ///
1226
    /// If this function is called before [`send_response`], a call to
1227
    /// [`send_response`] will result in an error.
1228
    ///
1229
    /// If this function is called while a [`SendStream`] instance is active,
1230
    /// any further use of the instance will result in an error.
1231
    ///
1232
    /// This function should only be called once.
1233
    ///
1234
    /// [`send_response`]: #method.send_response
1235
    /// [`SendStream`]: ../struct.SendStream.html
1236
    pub fn send_reset(&mut self, reason: Reason) {
1237
        self.inner.send_reset(reason)
1238
    }
1239
1240
    /// Polls to be notified when the client resets this stream.
1241
    ///
1242
    /// If the stream is still open, this returns `Poll::Pending`, and
1243
    /// registers the task to be notified if a `RST_STREAM` is received.
1244
    ///
1245
    /// If a `RST_STREAM` frame is received for this stream, calling this
1246
    /// method will yield the `Reason` for the reset.
1247
    ///
1248
    /// # Error
1249
    ///
1250
    /// Calling this method after having called `send_response` will return
1251
    /// a user error.
1252
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1253
        self.inner.poll_reset(cx)
1254
    }
1255
1256
    /// Returns the stream ID of the response stream.
1257
    ///
1258
    /// # Panics
1259
    ///
1260
    /// If the lock on the stream store has been poisoned.
1261
    pub fn stream_id(&self) -> crate::StreamId {
1262
        self.inner.stream_id()
1263
    }
1264
}
1265
1266
// ===== impl Flush =====
1267
1268
impl<T, B: Buf> Flush<T, B> {
1269
    fn new(codec: Codec<T, B>) -> Self {
1270
        Flush { codec: Some(codec) }
1271
    }
1272
}
1273
1274
impl<T, B> Future for Flush<T, B>
1275
where
1276
    T: AsyncWrite + Unpin,
1277
    B: Buf,
1278
{
1279
    type Output = Result<Codec<T, B>, crate::Error>;
1280
1281
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1282
        // Flush the codec
1283
        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1284
1285
        // Return the codec
1286
        Poll::Ready(Ok(self.codec.take().unwrap()))
1287
    }
1288
}
1289
1290
impl<T, B: Buf> ReadPreface<T, B> {
1291
    fn new(codec: Codec<T, B>) -> Self {
1292
        ReadPreface {
1293
            codec: Some(codec),
1294
            pos: 0,
1295
        }
1296
    }
1297
1298
    fn inner_mut(&mut self) -> &mut T {
1299
        self.codec.as_mut().unwrap().get_mut()
1300
    }
1301
}
1302
1303
impl<T, B> Future for ReadPreface<T, B>
1304
where
1305
    T: AsyncRead + Unpin,
1306
    B: Buf,
1307
{
1308
    type Output = Result<Codec<T, B>, crate::Error>;
1309
1310
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1311
        let mut buf = [0; 24];
1312
        let mut rem = PREFACE.len() - self.pos;
1313
1314
        while rem > 0 {
1315
            let mut buf = ReadBuf::new(&mut buf[..rem]);
1316
            ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1317
                .map_err(crate::Error::from_io)?;
1318
            let n = buf.filled().len();
1319
            if n == 0 {
1320
                return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1321
                    io::ErrorKind::UnexpectedEof,
1322
                    "connection closed before reading preface",
1323
                ))));
1324
            }
1325
1326
            if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1327
                proto_err!(conn: "read_preface: invalid preface");
1328
                // TODO: Should this just write the GO_AWAY frame directly?
1329
                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1330
            }
1331
1332
            self.pos += n;
1333
            rem -= n; // TODO test
1334
        }
1335
1336
        Poll::Ready(Ok(self.codec.take().unwrap()))
1337
    }
1338
}
1339
1340
// ===== impl Handshake =====
1341
1342
impl<T, B: Buf> Future for Handshake<T, B>
1343
where
1344
    T: AsyncRead + AsyncWrite + Unpin,
1345
    B: Buf,
1346
{
1347
    type Output = Result<Connection<T, B>, crate::Error>;
1348
1349
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1350
        let span = self.span.clone(); // XXX(eliza): T_T
1351
        let _e = span.enter();
1352
        tracing::trace!(state = ?self.state);
1353
1354
        loop {
1355
            match &mut self.state {
1356
                Handshaking::Flushing(flush) => {
1357
                    // We're currently flushing a pending SETTINGS frame. Poll the
1358
                    // flush future, and, if it's completed, advance our state to wait
1359
                    // for the client preface.
1360
                    let codec = match Pin::new(flush).poll(cx)? {
1361
                        Poll::Pending => {
1362
                            tracing::trace!(flush.poll = %"Pending");
1363
                            return Poll::Pending;
1364
                        }
1365
                        Poll::Ready(flushed) => {
1366
                            tracing::trace!(flush.poll = %"Ready");
1367
                            flushed
1368
                        }
1369
                    };
1370
                    self.state = Handshaking::ReadingPreface(
1371
                        ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1372
                    );
1373
                }
1374
                Handshaking::ReadingPreface(read) => {
1375
                    let codec = ready!(Pin::new(read).poll(cx)?);
1376
1377
                    self.state = Handshaking::Done;
1378
1379
                    let connection = proto::Connection::new(
1380
                        codec,
1381
                        Config {
1382
                            next_stream_id: 2.into(),
1383
                            // Server does not need to locally initiate any streams
1384
                            initial_max_send_streams: 0,
1385
                            max_send_buffer_size: self.builder.max_send_buffer_size,
1386
                            reset_stream_duration: self.builder.reset_stream_duration,
1387
                            reset_stream_max: self.builder.reset_stream_max,
1388
                            remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1389
                            local_error_reset_streams_max: self
1390
                                .builder
1391
                                .local_max_error_reset_streams,
1392
                            settings: self.builder.settings.clone(),
1393
                        },
1394
                    );
1395
1396
                    tracing::trace!("connection established!");
1397
                    let mut c = Connection { connection };
1398
                    if let Some(sz) = self.builder.initial_target_connection_window_size {
1399
                        c.set_target_window_size(sz);
1400
                    }
1401
1402
                    return Poll::Ready(Ok(c));
1403
                }
1404
                Handshaking::Done => {
1405
                    panic!("Handshaking::poll() called again after handshaking was complete")
1406
                }
1407
            }
1408
        }
1409
    }
1410
}
1411
1412
impl<T, B> fmt::Debug for Handshake<T, B>
1413
where
1414
    T: AsyncRead + AsyncWrite + fmt::Debug,
1415
    B: fmt::Debug + Buf,
1416
{
1417
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1418
        write!(fmt, "server::Handshake")
1419
    }
1420
}
1421
1422
impl Peer {
1423
0
    pub fn convert_send_message(
1424
0
        id: StreamId,
1425
0
        response: Response<()>,
1426
0
        end_of_stream: bool,
1427
0
    ) -> frame::Headers {
1428
        use http::response::Parts;
1429
1430
        // Extract the components of the HTTP request
1431
        let (
1432
            Parts {
1433
0
                status, headers, ..
1434
            },
1435
            _,
1436
0
        ) = response.into_parts();
1437
1438
        // Build the pseudo header set. All responses will include
1439
        // `:status`.
1440
0
        let pseudo = Pseudo::response(status);
1441
1442
        // Create the HEADERS frame
1443
0
        let mut frame = frame::Headers::new(id, pseudo, headers);
1444
1445
0
        if end_of_stream {
1446
0
            frame.set_end_stream()
1447
0
        }
1448
1449
0
        frame
1450
0
    }
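
Editor's note: `convert_send_message` is reached through the public `SendResponse::send_response` API: the response head becomes the HEADERS frame, and `end_of_stream` decides whether END_STREAM is set on it. A minimal sketch of that path, assuming `respond` came from `Connection::accept`; the handler name and body are illustrative only.

```rust
// Editorial sketch: how a handler exercises `convert_send_message`.
use bytes::Bytes;
use http::{Response, StatusCode};

fn reply(respond: &mut h2::server::SendResponse<Bytes>) -> Result<(), h2::Error> {
    let response = Response::builder()
        .status(StatusCode::OK)
        .body(())
        .unwrap();

    // `end_of_stream = false`: the HEADERS frame is sent without END_STREAM
    // so that a body can still be streamed on the returned `SendStream`.
    let mut send = respond.send_response(response, false)?;

    // The final data chunk with `end_of_stream = true` closes the stream.
    send.send_data(Bytes::from_static(b"hello"), true)?;
    Ok(())
}
```
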
1451
1452
0
    pub fn convert_push_message(
1453
0
        stream_id: StreamId,
1454
0
        promised_id: StreamId,
1455
0
        request: Request<()>,
1456
0
    ) -> Result<frame::PushPromise, UserError> {
1457
        use http::request::Parts;
1458
1459
0
        if let Err(e) = frame::PushPromise::validate_request(&request) {
1460
            use PushPromiseHeaderError::*;
1461
0
            match e {
1462
0
                NotSafeAndCacheable => tracing::debug!(
1463
                    ?promised_id,
1464
0
                    "convert_push_message: method {} is not safe and cacheable",
1465
0
                    request.method(),
1466
                ),
1467
0
                InvalidContentLength(e) => tracing::debug!(
1468
                    ?promised_id,
1469
0
                    "convert_push_message; promised request has invalid content-length {:?}",
1470
                    e,
1471
                ),
1472
            }
1473
0
            return Err(UserError::MalformedHeaders);
1474
0
        }
1475
1476
        // Extract the components of the HTTP request
1477
        let (
1478
            Parts {
1479
0
                method,
1480
0
                uri,
1481
0
                headers,
1482
                ..
1483
            },
1484
            _,
1485
0
        ) = request.into_parts();
1486
1487
0
        let pseudo = Pseudo::request(method, uri, None);
1488
1489
0
        Ok(frame::PushPromise::new(
1490
0
            stream_id,
1491
0
            promised_id,
1492
0
            pseudo,
1493
0
            headers,
1494
0
        ))
1495
0
    }
1496
}
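
Editor's note: `convert_push_message` sits behind `SendResponse::push_request`. The promised request must use a safe and cacheable method (e.g. GET) and a well-formed content-length, otherwise the validation above returns `UserError::MalformedHeaders`. A minimal sketch, with a hypothetical asset URI:

```rust
// Editorial sketch: pushing a promised GET request alongside a response.
use bytes::Bytes;
use http::{Request, Response};

fn push_asset(respond: &mut h2::server::SendResponse<Bytes>) -> Result<(), h2::Error> {
    // GET is safe and cacheable, so it passes `PushPromise::validate_request`.
    let promised = Request::builder()
        .method("GET")
        .uri("https://example.com/style.css")
        .body(())
        .unwrap();

    // `push_request` is the public entry point into `convert_push_message`.
    let mut pushed = respond.push_request(promised)?;

    // Then respond on the pushed stream like any other.
    let response = Response::builder().status(200).body(()).unwrap();
    let mut send = pushed.send_response(response, false)?;
    send.send_data(Bytes::from_static(b"body { margin: 0 }"), true)?;
    Ok(())
}
```
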
1497
1498
impl proto::Peer for Peer {
1499
    type Poll = Request<()>;
1500
1501
    const NAME: &'static str = "Server";
1502
1503
    /*
1504
    fn is_server() -> bool {
1505
        true
1506
    }
1507
    */
1508
1509
0
    fn r#dyn() -> proto::DynPeer {
1510
0
        proto::DynPeer::Server
1511
0
    }
1512
1513
1.11k
    fn convert_poll_message(
1514
1.11k
        pseudo: Pseudo,
1515
1.11k
        fields: HeaderMap,
1516
1.11k
        stream_id: StreamId,
1517
1.11k
    ) -> Result<Self::Poll, Error> {
1518
        use http::{uri, Version};
1519
1520
1.11k
        let mut b = Request::builder();
1521
1522
        macro_rules! malformed {
1523
            ($($arg:tt)*) => {{
1524
                tracing::debug!($($arg)*);
1525
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1526
            }}
1527
        }
1528
1529
1.11k
        b = b.version(Version::HTTP_2);
1530
1531
        let is_connect;
1532
1.11k
        if let Some(method) = pseudo.method {
1533
806
            is_connect = method == Method::CONNECT;
1534
806
            b = b.method(method);
1535
806
        } else {
1536
307
            malformed!("malformed headers: missing method");
1537
        }
1538
1539
806
        let has_protocol = pseudo.protocol.is_some();
1540
806
        if has_protocol {
1541
1
            if is_connect {
1542
0
                // Assert that we have the right type.
1543
0
                b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1544
0
            } else {
1545
1
                malformed!("malformed headers: :protocol on non-CONNECT request");
1546
            }
1547
805
        }
1548
1549
805
        if pseudo.status.is_some() {
1550
17
            malformed!("malformed headers: :status field on request");
1551
788
        }
1552
1553
        // Convert the URI
1554
788
        let mut parts = uri::Parts::default();
1555
1556
        // A request translated from HTTP/1 must not include the :authority
1557
        // header
1558
788
        if let Some(authority) = pseudo.authority {
1559
164
            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1560
164
            parts.authority = Some(maybe_authority.or_else(|why| {
1561
118
                malformed!(
1562
0
                    "malformed headers: malformed authority ({:?}): {}",
1563
                    authority,
1564
                    why,
1565
                )
1566
118
            })?);
1567
624
        }
1568
1569
        // A :scheme is required, except on a plain CONNECT request (no :protocol).
1570
670
        if let Some(scheme) = pseudo.scheme {
1571
634
            if is_connect && !has_protocol {
1572
0
                malformed!("malformed headers: :scheme in CONNECT");
1573
634
            }
1574
634
            let maybe_scheme = scheme.parse();
1575
634
            let scheme = maybe_scheme.or_else(|why| {
1576
45
                malformed!(
1577
0
                    "malformed headers: malformed scheme ({:?}): {}",
1578
                    scheme,
1579
                    why,
1580
                )
1581
45
            })?;
1582
1583
            // It's not possible to build a `Uri` from a scheme and path. So,
1584
            // after validating it was a valid scheme, we just have to drop it
1585
            // if there isn't an :authority.
1586
589
            if parts.authority.is_some() {
1587
30
                parts.scheme = Some(scheme);
1588
559
            }
1589
36
        } else if !is_connect || has_protocol {
1590
36
            malformed!("malformed headers: missing scheme");
1591
0
        }
1592
1593
589
        if let Some(path) = pseudo.path {
1594
358
            if is_connect && !has_protocol {
1595
0
                malformed!("malformed headers: :path in CONNECT");
1596
358
            }
1597
1598
            // This cannot be empty
1599
358
            if path.is_empty() {
1600
13
                malformed!("malformed headers: missing path");
1601
345
            }
1602
1603
345
            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1604
345
            parts.path_and_query = Some(maybe_path.or_else(|why| {
1605
209
                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1606
209
            })?);
1607
231
        } else if is_connect && has_protocol {
1608
0
            malformed!("malformed headers: missing path in extended CONNECT");
1609
231
        }
1610
1611
367
        b = b.uri(parts);
1612
1613
367
        let mut request = match b.body(()) {
1614
349
            Ok(request) => request,
1615
18
            Err(e) => {
1616
                // TODO: Should there be more specialized handling for different
1617
                // kinds of errors?
1618
18
                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1619
18
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1620
            }
1621
        };
1622
1623
349
        *request.headers_mut() = fields;
1624
1625
349
        Ok(request)
1626
1.11k
    }
1627
}
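
Editor's note: `convert_poll_message` rejects a request head with a stream-level PROTOCOL_ERROR when `:method` is missing, `:status` is present, `:protocol` appears outside CONNECT, `:scheme` is missing (other than for plain CONNECT), or `:path` is empty or malformed. The surviving pseudo-headers are reassembled into a `Uri` via `http::uri::Parts`; below is a small sketch of that assembly with example values (not taken from the code above).

```rust
// Editorial sketch: rebuilding a `Uri` from pseudo-header values, mirroring
// the `uri::Parts` assembly above. All values are examples.
use http::uri::{Authority, InvalidUriParts, Parts, PathAndQuery, Scheme, Uri};

fn assemble_uri() -> Result<Uri, InvalidUriParts> {
    let mut parts = Parts::default();

    // :authority, when present, becomes the URI authority.
    parts.authority = Some(Authority::from_static("example.com"));

    // The :scheme is only kept when an :authority is present, because a
    // `Uri` cannot be built from a scheme plus a path alone.
    parts.scheme = Some(Scheme::HTTPS);

    // :path must be non-empty and parse as a path-and-query.
    parts.path_and_query = Some(PathAndQuery::from_static("/index.html?x=1"));

    Uri::from_parts(parts)
}
```

In the server path above, a valid `:scheme` without an `:authority` is silently dropped rather than treated as an error, and any failure to build the final `Request` resets the stream with `Reason::PROTOCOL_ERROR`.
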
1628
1629
// ===== impl Handshaking =====
1630
1631
impl<T, B> fmt::Debug for Handshaking<T, B>
1632
where
1633
    B: Buf,
1634
{
1635
    #[inline]
1636
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1637
        match *self {
1638
            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1639
            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1640
            Handshaking::Done => f.write_str("Done"),
1641
        }
1642
    }
1643
}