Coverage Report

Created: 2026-02-26 06:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/server.rs
Line
Count
Source
1
//! Server implementation of the HTTP/2 protocol.
2
//!
3
//! # Getting started
4
//!
5
//! Running an HTTP/2 server requires the caller to manage accepting the
6
//! connections as well as getting the connections to a state that is ready to
7
//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8
//! details.
9
//!
10
//! This could be as basic as using Tokio's [`TcpListener`] to accept
11
//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12
//! upgrades.
13
//!
14
//! Once a connection is obtained, it is passed to [`handshake`],
15
//! which will begin the [HTTP/2 handshake]. This returns a future that
16
//! completes once the handshake process is performed and HTTP/2 streams may
17
//! be received.
18
//!
19
//! [`handshake`] uses default configuration values. There are a number of
20
//! settings that can be changed by using [`Builder`] instead.
21
//!
22
//! # Inbound streams
23
//!
24
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25
//! does this by implementing [`futures::Stream`]. When a new stream is
26
//! received, a call to [`Connection::accept`] will return `(request, response)`.
27
//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28
//! HTTP request head as well as provides a way to receive the inbound data
29
//! stream and the trailers. The `response` handle (of type [`SendResponse`])
30
//! allows responding to the request, streaming the response payload, sending
30
//! trailers, and sending push promises.
32
//!
33
//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34
//! can be operated independently.
35
//!
36
//! # Managing the connection
37
//!
38
//! The [`Connection`] instance is used to manage connection state. The caller
39
//! is required to call either [`Connection::accept`] or
40
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
41
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42
//! connection state is advanced.
43
//!
44
//! It is not required to call **both** [`Connection::accept`] and
45
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
46
//! then only [`Connection::accept`] should be called. When the caller **does
47
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
48
//! called.
49
//!
50
//! The [`Connection`] instance should only be dropped once
51
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
52
//! returns `None`, there will no longer be any more inbound streams. At
53
//! this point, only [`Connection::poll_closed`] should be called.
54
//!
55
//! # Shutting down the server
56
//!
57
//! Graceful shutdown is started by calling [`Connection::graceful_shutdown`];
58
//! the connection must then continue to be polled until it closes.
59
//!
60
//! # Example
61
//!
62
//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63
//! knowledge], i.e. both the client and the server assume that the TCP socket
64
//! will use the HTTP/2 protocol without prior negotiation.
65
//!
66
//! ```no_run
67
//! use h2::server;
68
//! use http::{Response, StatusCode};
69
//! use tokio::net::TcpListener;
70
//!
71
//! #[tokio::main]
72
//! pub async fn main() {
73
//!     let listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74
//!
75
//!     // Accept all incoming TCP connections.
76
//!     loop {
77
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78
//!             // Spawn a new task to process each connection.
79
//!             tokio::spawn(async {
80
//!                 // Start the HTTP/2 connection handshake
81
//!                 let mut h2 = server::handshake(socket).await.unwrap();
82
//!                 // Accept all inbound HTTP/2 streams sent over the
83
//!                 // connection.
84
//!                 while let Some(request) = h2.accept().await {
85
//!                     let (request, mut respond) = request.unwrap();
86
//!                     println!("Received request: {:?}", request);
87
//!
88
//!                     // Build a response with no body
89
//!                     let response = Response::builder()
90
//!                         .status(StatusCode::OK)
91
//!                         .body(())
92
//!                         .unwrap();
93
//!
94
//!                     // Send the response back to the client
95
//!                     respond.send_response(response, true)
96
//!                         .unwrap();
97
//!                 }
98
//!
99
//!             });
100
//!         }
101
//!     }
102
//! }
103
//! ```
104
//!
105
//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106
//! [`handshake`]: fn.handshake.html
107
//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108
//! [`Builder`]: struct.Builder.html
109
//! [`Connection`]: struct.Connection.html
110
//! [`Connection::accept`]: struct.Connection.html#method.accept
111
//! [`Connection::poll_close`]: struct.Connection.html#method.poll_closed
112
//! [`futures::Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
113
//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114
//! [`RecvStream`]: ../struct.RecvStream.html
115
//! [`SendStream`]: ../struct.SendStream.html
116
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118
use crate::codec::{Codec, UserError};
119
use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120
use crate::proto::{self, Config, Error, Prioritized};
121
use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123
use bytes::{Buf, Bytes};
124
use http::{HeaderMap, Method, Request, Response};
125
use std::future::Future;
126
use std::pin::Pin;
127
use std::task::{Context, Poll};
128
use std::time::Duration;
129
use std::{fmt, io};
130
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131
use tracing::instrument::{Instrument, Instrumented};
132
133
/// In progress HTTP/2 connection handshake future.
134
///
135
/// This type implements `Future`, yielding a `Connection` instance once the
136
/// handshake has completed.
137
///
138
/// The handshake is completed once the connection preface is fully received
139
/// from the client **and** the initial settings frame is sent to the client.
140
///
141
/// The handshake future does not wait for the initial settings frame from the
142
/// client.
143
///
144
/// See [module] level docs for more details.
145
///
146
/// [module]: index.html
147
#[must_use = "futures do nothing unless polled"]
148
pub struct Handshake<T, B: Buf = Bytes> {
149
    /// The config to pass to Connection::new after handshake succeeds.
150
    builder: Builder,
151
    /// The current state of the handshake.
152
    state: Handshaking<T, B>,
153
    /// Span tracking the handshake
154
    span: tracing::Span,
155
}
156
157
/// Accepts inbound HTTP/2 streams on a connection.
158
///
159
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160
/// implements the HTTP/2 server logic for that connection. It is responsible
161
/// for receiving inbound streams initiated by the client as well as driving the
162
/// internal state forward.
163
///
164
/// `Connection` values are created by calling [`handshake`]. Once a
165
/// `Connection` value is obtained, the caller must call [`Connection::accept`] or
166
/// [`Connection::poll_closed`] in order to drive the internal connection state forward.
167
///
168
/// See [module level] documentation for more details
169
///
170
/// [module level]: index.html
171
/// [`handshake`]: fn.handshake.html
171
/// [`poll`]: struct.Connection.html#method.poll_accept
172
/// [`poll_close`]: struct.Connection.html#method.poll_closed
174
///
175
/// # Examples
176
///
177
/// ```
178
/// # use tokio::io::{AsyncRead, AsyncWrite};
179
/// # use h2::server;
180
/// # use h2::server::*;
181
/// #
182
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183
/// let mut server = server::handshake(my_io).await.unwrap();
184
/// while let Some(request) = server.accept().await {
185
///     tokio::spawn(async move {
186
///         let (request, respond) = request.unwrap();
187
///         // Process the request and send the response back to the client
188
///         // using `respond`.
189
///     });
190
/// }
191
/// # }
192
/// #
193
/// # pub fn main() {}
194
/// ```
195
#[must_use = "streams do nothing unless polled"]
196
pub struct Connection<T, B: Buf> {
197
    connection: proto::Connection<T, Peer, B>,
198
}
199
200
/// Builds server connections with custom configuration values.
201
///
202
/// Methods can be chained in order to set the configuration values.
203
///
204
/// The server is constructed by calling [`handshake`] and passing the I/O
205
/// handle that will back the HTTP/2 server.
206
///
207
/// New instances of `Builder` are obtained via [`Builder::new`].
208
///
209
/// See function level documentation for details on the various server
210
/// configuration settings.
211
///
212
/// [`Builder::new`]: struct.Builder.html#method.new
213
/// [`handshake`]: struct.Builder.html#method.handshake
214
///
215
/// # Examples
216
///
217
/// ```
218
/// # use tokio::io::{AsyncRead, AsyncWrite};
219
/// # use h2::server::*;
220
/// #
221
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222
/// # -> Handshake<T>
223
/// # {
224
/// // `server_fut` is a future representing the completion of the HTTP/2
225
/// // handshake.
226
/// let server_fut = Builder::new()
227
///     .initial_window_size(1_000_000)
228
///     .max_concurrent_streams(1000)
229
///     .handshake(my_io);
230
/// # server_fut
231
/// # }
232
/// #
233
/// # pub fn main() {}
234
/// ```
235
#[derive(Clone, Debug)]
236
pub struct Builder {
237
    /// Time to keep locally reset streams around before reaping.
238
    reset_stream_duration: Duration,
239
240
    /// Maximum number of locally reset streams to keep at a time.
241
    reset_stream_max: usize,
242
243
    /// Maximum number of remotely reset streams to allow in the pending
244
    /// accept queue.
245
    pending_accept_reset_stream_max: usize,
246
247
    /// Initial `Settings` frame to send as part of the handshake.
248
    settings: Settings,
249
250
    /// Initial target window size for new connections.
251
    initial_target_connection_window_size: Option<u32>,
252
253
    /// Maximum amount of bytes to "buffer" for writing per stream.
254
    max_send_buffer_size: usize,
255
256
    /// Maximum number of locally reset streams due to protocol error across
257
    /// the lifetime of the connection.
258
    ///
259
    /// When this gets exceeded, we issue GOAWAYs.
260
    local_max_error_reset_streams: Option<usize>,
261
}
262
263
/// Send a response back to the client
264
///
265
/// A `SendResponse` instance is provided when receiving a request and is used
266
/// to send the associated response back to the client. It is also used to
267
/// explicitly reset the stream with a custom reason.
268
///
269
/// It will also be used to initiate push promises linked with the associated
270
/// stream.
271
///
272
/// If the `SendResponse` instance is dropped without sending a response, then
273
/// the HTTP/2 stream will be reset.
274
///
275
/// See [module] level docs for more details.
276
///
277
/// [module]: index.html
278
#[derive(Debug)]
279
pub struct SendResponse<B: Buf> {
280
    inner: proto::StreamRef<B>,
281
}
282
283
/// Send a response to a promised request
284
///
285
/// A `SendPushedResponse` instance is provided when promising a request and is used
286
/// to send the associated response to the client. It is also used to
287
/// explicitly reset the stream with a custom reason.
288
///
289
/// It can not be used to initiate push promises.
290
///
291
/// If the `SendPushedResponse` instance is dropped without sending a response, then
292
/// the HTTP/2 stream will be reset.
293
///
294
/// See [module] level docs for more details.
295
///
296
/// [module]: index.html
297
pub struct SendPushedResponse<B: Buf> {
298
    inner: SendResponse<B>,
299
}
300
301
// Manual implementation necessary because of rust-lang/rust#26925
302
impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
303
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304
        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305
    }
306
}
307
308
/// Stages of an in-progress handshake.
309
enum Handshaking<T, B: Buf> {
310
    /// State 1. Connection is flushing pending SETTINGS frame.
311
    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
312
    /// State 2. Connection is waiting for the client preface.
313
    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
314
    /// State 3. Handshake is done, polling again would panic.
315
    Done,
316
}
317
318
/// Flush a Sink
319
struct Flush<T, B> {
320
    codec: Option<Codec<T, B>>,
321
}
322
323
/// Read the client connection preface
324
struct ReadPreface<T, B> {
325
    codec: Option<Codec<T, B>>,
326
    pos: usize,
327
}
328
329
#[derive(Debug)]
330
pub(crate) struct Peer;
331
332
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333
334
/// Creates a new configured HTTP/2 server with default configuration
335
/// values backed by `io`.
336
///
337
/// It is expected that `io` already be in an appropriate state to commence
338
/// the [HTTP/2 handshake]. See [Handshake] for more details.
339
///
340
/// Returns a future which resolves to the [`Connection`] instance once the
341
/// HTTP/2 handshake has been completed. The returned [`Connection`]
342
/// instance will be using default configuration values. Use [`Builder`] to
343
/// customize the configuration values used by a [`Connection`] instance.
344
///
345
/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346
/// [Handshake]: ../index.html#handshake
347
/// [`Connection`]: struct.Connection.html
348
///
349
/// # Examples
350
///
351
/// ```
352
/// # use tokio::io::{AsyncRead, AsyncWrite};
353
/// # use h2::server;
354
/// # use h2::server::*;
355
/// #
356
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357
/// # {
358
/// let connection = server::handshake(my_io).await.unwrap();
359
/// // The HTTP/2 handshake has completed, now use `connection` to
360
/// // accept inbound HTTP/2 streams.
361
/// # }
362
/// #
363
/// # pub fn main() {}
364
/// ```
365
pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366
where
367
    T: AsyncRead + AsyncWrite + Unpin,
368
{
369
    Builder::new().handshake(io)
370
}
371
372
// ===== impl Connection =====
373
374
impl<T, B> Connection<T, B>
375
where
376
    T: AsyncRead + AsyncWrite + Unpin,
377
    B: Buf,
378
{
379
    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380
        let span = tracing::trace_span!("server_handshake");
381
        let entered = span.enter();
382
383
        // Create the codec.
384
        let mut codec = Codec::new(io);
385
386
        if let Some(max) = builder.settings.max_frame_size() {
387
            codec.set_max_recv_frame_size(max as usize);
388
        }
389
390
        if let Some(max) = builder.settings.max_header_list_size() {
391
            codec.set_max_recv_header_list_size(max as usize);
392
        }
393
394
        // Send initial settings frame.
395
        codec
396
            .buffer(builder.settings.clone().into())
397
            .expect("invalid SETTINGS frame");
398
399
        // Create the handshake future.
400
        let state =
401
            Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402
403
        drop(entered);
404
405
        Handshake {
406
            builder,
407
            state,
408
            span,
409
        }
410
    }
411
412
    /// Accept the next incoming request on this connection.
413
    pub async fn accept(
414
        &mut self,
415
    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
416
        crate::poll_fn(move |cx| self.poll_accept(cx)).await
417
    }
418
419
    #[doc(hidden)]
420
    pub fn poll_accept(
421
        &mut self,
422
        cx: &mut Context<'_>,
423
    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
424
        // Always try to advance the internal state. Getting Pending also is
425
        // needed to allow this function to return Pending.
426
        if self.poll_closed(cx)?.is_ready() {
427
            // If the socket is closed, don't return anything
428
            // TODO: drop any pending streams
429
            return Poll::Ready(None);
430
        }
431
432
        if let Some(inner) = self.connection.next_incoming() {
433
            tracing::trace!("received incoming");
434
            let (head, _) = inner.take_request().into_parts();
435
            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436
437
            let request = Request::from_parts(head, body);
438
            let respond = SendResponse { inner };
439
440
            return Poll::Ready(Some(Ok((request, respond))));
441
        }
442
443
        Poll::Pending
444
    }
445
446
    /// Sets the target window size for the whole connection.
447
    ///
448
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
449
    /// frame will be immediately sent to the remote, increasing the connection
450
    /// level window by `size - current_value`.
451
    ///
452
    /// If `size` is less than the current value, nothing will happen
453
    /// immediately. However, as window capacity is released by
454
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
455
    /// out until the number of "in flight" bytes drops below `size`.
456
    ///
457
    /// The default value is 65,535.
458
    ///
459
    /// See [`FlowControl`] documentation for more details.
460
    ///
461
    /// [`FlowControl`]: ../struct.FlowControl.html
462
    /// [library level]: ../index.html#flow-control
463
    pub fn set_target_window_size(&mut self, size: u32) {
464
        assert!(size <= proto::MAX_WINDOW_SIZE);
465
        self.connection.set_target_window_size(size);
466
    }
467
468
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
469
    /// flow control for received data.
470
    ///
471
    /// The `SETTINGS` will be sent to the remote, and only applied once the
472
    /// remote acknowledges the change.
473
    ///
474
    /// This can be used to increase or decrease the window size for existing
475
    /// streams.
476
    ///
477
    /// # Errors
478
    ///
479
    /// Returns an error if a previous call is still pending acknowledgement
480
    /// from the remote endpoint.
481
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
482
        assert!(size <= proto::MAX_WINDOW_SIZE);
483
        self.connection.set_initial_window_size(size)?;
484
        Ok(())
485
    }
486
487
    /// Enables the [extended CONNECT protocol].
488
    ///
489
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
490
    ///
491
    /// # Errors
492
    ///
493
    /// Returns an error if a previous call is still pending acknowledgement
494
    /// from the remote endpoint.
495
    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
496
        self.connection.set_enable_connect_protocol()?;
497
        Ok(())
498
    }
499
500
    /// Returns `Ready` when the underlying connection has closed.
501
    ///
502
    /// If any new inbound streams are received during a call to `poll_closed`,
503
    /// they will be queued and returned on the next call to [`poll_accept`].
504
    ///
505
    /// This function will advance the internal connection state, driving
506
    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
507
    ///
508
    /// See [here](index.html#managing-the-connection) for more details.
509
    ///
510
    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
511
    /// [`RecvStream`]: ../struct.RecvStream.html
512
    /// [`SendStream`]: ../struct.SendStream.html
513
    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514
        self.connection.poll(cx).map_err(Into::into)
515
    }
516
517
    /// Sets the connection to a GOAWAY state.
518
    ///
519
    /// Does not terminate the connection. Must continue being polled to close
520
    /// connection.
521
    ///
522
    /// After flushing the GOAWAY frame, the connection is closed. Any
523
    /// outstanding streams do not prevent the connection from closing. This
524
    /// should usually be reserved for shutting down when something bad
525
    /// external to `h2` has happened, and open streams cannot be properly
526
    /// handled.
527
    ///
528
    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
529
    pub fn abrupt_shutdown(&mut self, reason: Reason) {
530
        self.connection.go_away_from_user(reason);
531
    }
532
533
    /// Starts a [graceful shutdown][1] process.
534
    ///
535
    /// Must continue being polled to close connection.
536
    ///
537
    /// It's possible to receive more requests after calling this method, since
538
    /// they might have been in-flight from the client already. After about
539
    /// 1 RTT, no new requests should be accepted. Once all active streams
540
    /// have completed, the connection is closed.
541
    ///
542
    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
543
    pub fn graceful_shutdown(&mut self) {
544
        self.connection.go_away_gracefully();
545
    }
546
547
    /// Takes a `PingPong` instance from the connection.
548
    ///
549
    /// # Note
550
    ///
551
    /// This may only be called once. Calling multiple times will return `None`.
552
    pub fn ping_pong(&mut self) -> Option<PingPong> {
553
        self.connection.take_user_pings().map(PingPong::new)
554
    }
555
556
    /// Checks if there are any streams
557
    pub fn has_streams(&self) -> bool {
558
        self.connection.has_streams()
559
    }
560
561
    /// Returns the maximum number of concurrent streams that may be initiated
562
    /// by the server on this connection.
563
    ///
564
    /// This limit is configured by the client peer by sending the
565
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
566
    /// This method returns the currently acknowledged value received from the
567
    /// remote.
568
    ///
569
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
570
    pub fn max_concurrent_send_streams(&self) -> usize {
571
        self.connection.max_send_streams()
572
    }
573
574
    /// Returns the maximum number of concurrent streams that may be initiated
575
    /// by the client on this connection.
576
    ///
577
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
578
    /// parameter][1] sent in a `SETTINGS` frame that has been
579
    /// acknowledged by the remote peer. The value to be sent is configured by
580
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
581
    /// with the remote peer.
582
    ///
583
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
584
    /// [2]: struct.Builder.html#method.max_concurrent_streams
585
    pub fn max_concurrent_recv_streams(&self) -> usize {
586
        self.connection.max_recv_streams()
587
    }
588
589
    // Could disappear at anytime.
590
    #[doc(hidden)]
591
    #[cfg(feature = "unstable")]
592
    pub fn num_wired_streams(&self) -> usize {
593
        self.connection.num_wired_streams()
594
    }
595
}
596
597
#[cfg(feature = "stream")]
598
impl<T, B> futures_core::Stream for Connection<T, B>
599
where
600
    T: AsyncRead + AsyncWrite + Unpin,
601
    B: Buf,
602
{
603
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
604
605
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
606
        self.poll_accept(cx)
607
    }
608
}
609
610
impl<T, B> fmt::Debug for Connection<T, B>
611
where
612
    T: fmt::Debug,
613
    B: fmt::Debug + Buf,
614
{
615
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
616
        fmt.debug_struct("Connection")
617
            .field("connection", &self.connection)
618
            .finish()
619
    }
620
}
621
622
// ===== impl Builder =====
623
624
impl Builder {
625
    /// Returns a new server builder instance initialized with default
626
    /// configuration values.
627
    ///
628
    /// Configuration methods can be chained on the return value.
629
    ///
630
    /// # Examples
631
    ///
632
    /// ```
633
    /// # use tokio::io::{AsyncRead, AsyncWrite};
634
    /// # use h2::server::*;
635
    /// #
636
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
637
    /// # -> Handshake<T>
638
    /// # {
639
    /// // `server_fut` is a future representing the completion of the HTTP/2
640
    /// // handshake.
641
    /// let server_fut = Builder::new()
642
    ///     .initial_window_size(1_000_000)
643
    ///     .max_concurrent_streams(1000)
644
    ///     .handshake(my_io);
645
    /// # server_fut
646
    /// # }
647
    /// #
648
    /// # pub fn main() {}
649
    /// ```
650
0
    pub fn new() -> Builder {
651
0
        Builder {
652
0
            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
653
0
            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
654
0
            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
655
0
            settings: Settings::default(),
656
0
            initial_target_connection_window_size: None,
657
0
            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
658
0
659
0
            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
660
0
        }
661
0
    }
662
663
    /// Indicates the initial window size (in octets) for stream-level
664
    /// flow control for received data.
665
    ///
666
    /// The initial window of a stream is used as part of flow control. For more
667
    /// details, see [`FlowControl`].
668
    ///
669
    /// The default value is 65,535.
670
    ///
671
    /// [`FlowControl`]: ../struct.FlowControl.html
672
    ///
673
    /// # Examples
674
    ///
675
    /// ```
676
    /// # use tokio::io::{AsyncRead, AsyncWrite};
677
    /// # use h2::server::*;
678
    /// #
679
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
680
    /// # -> Handshake<T>
681
    /// # {
682
    /// // `server_fut` is a future representing the completion of the HTTP/2
683
    /// // handshake.
684
    /// let server_fut = Builder::new()
685
    ///     .initial_window_size(1_000_000)
686
    ///     .handshake(my_io);
687
    /// # server_fut
688
    /// # }
689
    /// #
690
    /// # pub fn main() {}
691
    /// ```
692
0
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
693
0
        self.settings.set_initial_window_size(Some(size));
694
0
        self
695
0
    }
696
697
    /// Indicates the initial window size (in octets) for connection-level flow control
698
    /// for received data.
699
    ///
700
    /// The initial window of a connection is used as part of flow control. For more details,
701
    /// see [`FlowControl`].
702
    ///
703
    /// The default value is 65,535.
704
    ///
705
    /// [`FlowControl`]: ../struct.FlowControl.html
706
    ///
707
    /// # Examples
708
    ///
709
    /// ```
710
    /// # use tokio::io::{AsyncRead, AsyncWrite};
711
    /// # use h2::server::*;
712
    /// #
713
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
714
    /// # -> Handshake<T>
715
    /// # {
716
    /// // `server_fut` is a future representing the completion of the HTTP/2
717
    /// // handshake.
718
    /// let server_fut = Builder::new()
719
    ///     .initial_connection_window_size(1_000_000)
720
    ///     .handshake(my_io);
721
    /// # server_fut
722
    /// # }
723
    /// #
724
    /// # pub fn main() {}
725
    /// ```
726
0
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
727
0
        self.initial_target_connection_window_size = Some(size);
728
0
        self
729
0
    }
730
731
    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
732
    /// configured server is able to accept.
733
    ///
734
    /// The sender may send data frames that are **smaller** than this value,
735
    /// but any data larger than `max` will be broken up into multiple `DATA`
736
    /// frames.
737
    ///
738
    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
739
    ///
740
    /// # Examples
741
    ///
742
    /// ```
743
    /// # use tokio::io::{AsyncRead, AsyncWrite};
744
    /// # use h2::server::*;
745
    /// #
746
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
747
    /// # -> Handshake<T>
748
    /// # {
749
    /// // `server_fut` is a future representing the completion of the HTTP/2
750
    /// // handshake.
751
    /// let server_fut = Builder::new()
752
    ///     .max_frame_size(1_000_000)
753
    ///     .handshake(my_io);
754
    /// # server_fut
755
    /// # }
756
    /// #
757
    /// # pub fn main() {}
758
    /// ```
759
    ///
760
    /// # Panics
761
    ///
762
    /// This function panics if `max` is not within the legal range specified
763
    /// above.
764
0
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
765
0
        self.settings.set_max_frame_size(Some(max));
766
0
        self
767
0
    }
768
769
    /// Sets the max size of received header frames.
770
    ///
771
    /// This advisory setting informs a peer of the maximum size of header list
772
    /// that the sender is prepared to accept, in octets. The value is based on
773
    /// the uncompressed size of header fields, including the length of the name
774
    /// and value in octets plus an overhead of 32 octets for each header field.
775
    ///
776
    /// This setting is also used to limit the maximum amount of data that is
777
    /// buffered to decode HEADERS frames.
778
    ///
779
    /// # Examples
780
    ///
781
    /// ```
782
    /// # use tokio::io::{AsyncRead, AsyncWrite};
783
    /// # use h2::server::*;
784
    /// #
785
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
786
    /// # -> Handshake<T>
787
    /// # {
788
    /// // `server_fut` is a future representing the completion of the HTTP/2
789
    /// // handshake.
790
    /// let server_fut = Builder::new()
791
    ///     .max_header_list_size(16 * 1024)
792
    ///     .handshake(my_io);
793
    /// # server_fut
794
    /// # }
795
    /// #
796
    /// # pub fn main() {}
797
    /// ```
798
0
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
799
0
        self.settings.set_max_header_list_size(Some(max));
800
0
        self
801
0
    }
802
803
    /// Sets the maximum number of concurrent streams.
804
    ///
805
    /// The maximum concurrent streams setting only controls the maximum number
806
    /// of streams that can be initiated by the remote peer. In other words,
807
    /// when this setting is set to 100, this does not limit the number of
808
    /// concurrent streams that can be created by the caller.
809
    ///
810
    /// It is recommended that this value be no smaller than 100, so as to not
811
    /// unnecessarily limit parallelism. However, any value is legal, including
812
    /// 0. If `max` is set to 0, then the remote will not be permitted to
813
    /// initiate streams.
814
    ///
815
    /// Note that streams in the reserved state, i.e., push promises that have
816
    /// been reserved but the stream has not started, do not count against this
817
    /// setting.
818
    ///
819
    /// Also note that if the remote *does* exceed the value set here, it is not
820
    /// a protocol level error. Instead, the `h2` library will immediately reset
821
    /// the stream.
822
    ///
823
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
824
    ///
825
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
826
    ///
827
    /// # Examples
828
    ///
829
    /// ```
830
    /// # use tokio::io::{AsyncRead, AsyncWrite};
831
    /// # use h2::server::*;
832
    /// #
833
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
834
    /// # -> Handshake<T>
835
    /// # {
836
    /// // `server_fut` is a future representing the completion of the HTTP/2
837
    /// // handshake.
838
    /// let server_fut = Builder::new()
839
    ///     .max_concurrent_streams(1000)
840
    ///     .handshake(my_io);
841
    /// # server_fut
842
    /// # }
843
    /// #
844
    /// # pub fn main() {}
845
    /// ```
846
0
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
        // The settings setter takes an `Option`, so wrap the caller's value.
        let limit = Some(max);
        self.settings.set_max_concurrent_streams(limit);
        self
    }
850
851
    /// Sets the maximum number of concurrent locally reset streams.
852
    ///
853
    /// When a stream is explicitly reset by either calling
854
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
855
    /// before completing the stream, the HTTP/2 specification requires that
856
    /// any further frames received for that stream must be ignored for "some
857
    /// time".
858
    ///
859
    /// In order to satisfy the specification, internal state must be maintained
860
    /// to implement the behavior. This state grows linearly with the number of
861
    /// streams that are locally reset.
862
    ///
863
    /// The `max_concurrent_reset_streams` setting configures an upper
864
    /// bound on the amount of state that is maintained. When this max value is
865
    /// reached, the oldest reset stream is purged from memory.
866
    ///
867
    /// Once the stream has been fully purged from memory, any additional frames
868
    /// received for that stream will result in a connection level protocol
869
    /// error, forcing the connection to terminate.
870
    ///
871
    /// The default value is currently 50.
872
    ///
873
    /// # Examples
874
    ///
875
    /// ```
876
    /// # use tokio::io::{AsyncRead, AsyncWrite};
877
    /// # use h2::server::*;
878
    /// #
879
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
880
    /// # -> Handshake<T>
881
    /// # {
882
    /// // `server_fut` is a future representing the completion of the HTTP/2
883
    /// // handshake.
884
    /// let server_fut = Builder::new()
885
    ///     .max_concurrent_reset_streams(1000)
886
    ///     .handshake(my_io);
887
    /// # server_fut
888
    /// # }
889
    /// #
890
    /// # pub fn main() {}
891
    /// ```
892
0
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        // Stored on the builder itself rather than in the SETTINGS frame.
        let builder = self;
        builder.reset_stream_max = max;
        builder
    }
896
897
    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
898
    ///
899
    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
900
    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
901
    /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
902
    ///
903
    /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
904
    /// `ENHANCE_YOUR_CALM` to the client.
905
    ///
906
    /// If you really want to disable this, supply [`Option::None`] here.
907
    /// Disabling this is not recommended and may expose you to DOS attacks.
908
    ///
909
    /// The default value is currently 1024, but could change.
910
0
    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
        // The `Option` is stored unchanged on the builder.
        let builder = self;
        builder.local_max_error_reset_streams = max;
        builder
    }
914
915
    /// Sets the maximum number of pending-accept remotely-reset streams.
916
    ///
917
    /// Streams that have been received by the peer, but not accepted by the
918
    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
919
    /// could send a request and then shortly after, realize it is not needed,
920
    /// sending a CANCEL.
921
    ///
922
    /// However, since those streams are now "closed", they don't count towards
923
    /// the max concurrent streams. So, they will sit in the accept queue,
924
    /// using memory.
925
    ///
926
    /// When the number of remotely-reset streams sitting in the pending-accept
927
    /// queue reaches this maximum value, a connection error with the code of
928
    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
929
    /// `Future`.
930
    ///
931
    /// The default value is currently 20, but could change.
932
    ///
933
    /// # Examples
934
    ///
935
    ///
936
    /// ```
937
    /// # use tokio::io::{AsyncRead, AsyncWrite};
938
    /// # use h2::server::*;
939
    /// #
940
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
941
    /// # -> Handshake<T>
942
    /// # {
943
    /// // `server_fut` is a future representing the completion of the HTTP/2
944
    /// // handshake.
945
    /// let server_fut = Builder::new()
946
    ///     .max_pending_accept_reset_streams(100)
947
    ///     .handshake(my_io);
948
    /// # server_fut
949
    /// # }
950
    /// #
951
    /// # pub fn main() {}
952
    /// ```
953
0
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
        // Stored on the builder itself rather than in the SETTINGS frame.
        let builder = self;
        builder.pending_accept_reset_stream_max = max;
        builder
    }
957
958
    /// Sets the maximum send buffer size per stream.
959
    ///
960
    /// Once a stream has buffered up to (or over) the maximum, the stream's
961
    /// flow control will not "poll" additional capacity. Once bytes for the
962
    /// stream have been written to the connection, the send buffer capacity
963
    /// will be freed up again.
964
    ///
965
    /// The default is currently ~400KB, but may change.
966
    ///
967
    /// # Panics
968
    ///
969
    /// This function panics if `max` is larger than `u32::MAX`.
970
0
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
        // Documented precondition: the limit must be representable as a `u32`.
        assert!(max <= u32::MAX as usize);

        let builder = self;
        builder.max_send_buffer_size = max;
        builder
    }
975
976
    /// Sets the duration to remember locally reset streams.
977
    ///
978
    /// When a stream is explicitly reset by either calling
979
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
980
    /// before completing the stream, the HTTP/2 specification requires that
981
    /// any further frames received for that stream must be ignored for "some
982
    /// time".
983
    ///
984
    /// In order to satisfy the specification, internal state must be maintained
985
    /// to implement the behavior. This state grows linearly with the number of
986
    /// streams that are locally reset.
987
    ///
988
    /// The `reset_stream_duration` setting configures the max amount of time
989
    /// this state will be maintained in memory. Once the duration elapses, the
990
    /// stream state is purged from memory.
991
    ///
992
    /// Once the stream has been fully purged from memory, any additional frames
993
    /// received for that stream will result in a connection level protocol
994
    /// error, forcing the connection to terminate.
995
    ///
996
    /// The default value is currently 1 second.
997
    ///
998
    /// # Examples
999
    ///
1000
    /// ```
1001
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1002
    /// # use h2::server::*;
1003
    /// # use std::time::Duration;
1004
    /// #
1005
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1006
    /// # -> Handshake<T>
1007
    /// # {
1008
    /// // `server_fut` is a future representing the completion of the HTTP/2
1009
    /// // handshake.
1010
    /// let server_fut = Builder::new()
1011
    ///     .reset_stream_duration(Duration::from_secs(10))
1012
    ///     .handshake(my_io);
1013
    /// # server_fut
1014
    /// # }
1015
    /// #
1016
    /// # pub fn main() {}
1017
    /// ```
1018
0
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        // Stored on the builder itself rather than in the SETTINGS frame.
        let builder = self;
        builder.reset_stream_duration = dur;
        builder
    }
1022
1023
    /// Enables the [extended CONNECT protocol].
1024
    ///
1025
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
1026
0
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
        // The setting is recorded with the value `1`.
        let enabled = Some(1);
        self.settings.set_enable_connect_protocol(enabled);
        self
    }
1030
1031
    /// Creates a new configured HTTP/2 server backed by `io`.
1032
    ///
1033
    /// It is expected that `io` already be in an appropriate state to commence
1034
    /// the [HTTP/2 handshake]. See [Handshake] for more details.
1035
    ///
1036
    /// Returns a future which resolves to the [`Connection`] instance once the
1037
    /// HTTP/2 handshake has been completed.
1038
    ///
1039
    /// This function also allows the caller to configure the send payload data
1040
    /// type. See [Outbound data type] for more details.
1041
    ///
1042
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1043
    /// [Handshake]: ../index.html#handshake
1044
    /// [`Connection`]: struct.Connection.html
1045
    /// [Outbound data type]: ../index.html#outbound-data-type
1046
    ///
1047
    /// # Examples
1048
    ///
1049
    /// Basic usage:
1050
    ///
1051
    /// ```
1052
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1053
    /// # use h2::server::*;
1054
    /// #
1055
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1056
    /// # -> Handshake<T>
1057
    /// # {
1058
    /// // `server_fut` is a future representing the completion of the HTTP/2
1059
    /// // handshake.
1060
    /// let server_fut = Builder::new()
1061
    ///     .handshake(my_io);
1062
    /// # server_fut
1063
    /// # }
1064
    /// #
1065
    /// # pub fn main() {}
1066
    /// ```
1067
    ///
1068
    /// Configures the send-payload data type. In this case, the outbound data
1069
    /// type will be `&'static [u8]`.
1070
    ///
1071
    /// ```
1072
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1073
    /// # use h2::server::*;
1074
    /// #
1075
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1076
    /// # -> Handshake<T, &'static [u8]>
1077
    /// # {
1078
    /// // `server_fut` is a future representing the completion of the HTTP/2
1079
    /// // handshake.
1080
    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1081
    ///     .handshake(my_io);
1082
    /// # server_fut
1083
    /// # }
1084
    /// #
1085
    /// # pub fn main() {}
1086
    /// ```
1087
    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1088
    where
1089
        T: AsyncRead + AsyncWrite + Unpin,
1090
        B: Buf,
1091
    {
1092
        Connection::handshake2(io, self.clone())
1093
    }
1094
}
1095
1096
impl Default for Builder {
1097
0
    fn default() -> Builder {
1098
0
        Builder::new()
1099
0
    }
1100
}
1101
1102
// ===== impl SendResponse =====
1103
1104
impl<B: Buf> SendResponse<B> {
1105
    /// Send an interim informational response (1xx status codes)
1106
    ///
1107
    /// This method can be called multiple times before calling `send_response()`
1108
    /// to send the final response. Only 1xx status codes are allowed.
1109
    ///
1110
    /// Interim informational responses are used to provide early feedback to the client
1111
    /// before the final response is ready. Common examples include:
1112
    /// - 100 Continue: Indicates the client should continue with the request
1113
    /// - 103 Early Hints: Provides early hints about resources to preload
1114
    ///
1115
    /// # Arguments
1116
    /// * `response` - HTTP response with 1xx status code and headers
1117
    ///
1118
    /// # Returns
1119
    /// * `Ok(())` - Interim Informational response sent successfully
1120
    /// * `Err(Error)` - Failed to send (invalid status code, connection error, etc.)
1121
    ///
1122
    /// # Examples
1123
    /// ```rust
1124
    /// use h2::server;
1125
    /// use http::{Response, StatusCode};
1126
    ///
1127
    /// # async fn example(mut send_response: h2::server::SendResponse<bytes::Bytes>) -> Result<(), h2::Error> {
1128
    /// // Send 100 Continue before processing request body
1129
    /// let continue_response = Response::builder()
1130
    ///     .status(StatusCode::CONTINUE)
1131
    ///     .body(())
1132
    ///     .unwrap();
1133
    /// send_response.send_informational(continue_response)?;
1134
    ///
1135
    /// // Later send the final response
1136
    /// let final_response = Response::builder()
1137
    ///     .status(StatusCode::OK)
1138
    ///     .body(())
1139
    ///     .unwrap();
1140
    /// let _stream = send_response.send_response(final_response, false)?;
1141
    /// # Ok(())
1142
    /// # }
1143
    /// ```
1144
    ///
1145
    /// # Errors
1146
    /// This method will return an error if:
1147
    /// - The response status code is not in the 1xx range
1148
    /// - The final response has already been sent
1149
    /// - There is a connection-level error
1150
    pub fn send_informational(&mut self, response: Response<()>) -> Result<(), crate::Error> {
1151
        let stream_id = self.inner.stream_id();
1152
        let status = response.status();
1153
1154
        tracing::trace!(
1155
            "send_informational called with status: {} on stream: {:?}",
1156
            status,
1157
            stream_id
1158
        );
1159
1160
        // Validate that this is an informational response (1xx status code)
1161
        if !response.status().is_informational() {
1162
            tracing::trace!(
1163
                "invalid informational status code: {} on stream: {:?}",
1164
                status,
1165
                stream_id
1166
            );
1167
            return Err(crate::Error::from(
1168
                UserError::InvalidInformationalStatusCode,
1169
            ));
1170
        }
1171
1172
        tracing::trace!(
1173
            "converting informational response to HEADERS frame without END_STREAM flag for stream: {:?}",
1174
            stream_id
1175
        );
1176
1177
        let frame = Peer::convert_send_message(
1178
            stream_id, response, false, // NOT end_of_stream for informational responses
1179
        );
1180
1181
        tracing::trace!(
1182
            "sending interim informational headers frame for stream: {:?}",
1183
            stream_id
1184
        );
1185
1186
        // Use the proper H2 streams API for sending interim informational headers
1187
        // This bypasses the normal response flow and allows multiple informational responses
1188
        let result = self
1189
            .inner
1190
            .send_informational_headers(frame)
1191
            .map_err(Into::into);
1192
1193
        match &result {
1194
            Ok(()) => tracing::trace!(
1195
                "Successfully sent informational headers for stream: {:?}",
1196
                stream_id
1197
            ),
1198
            Err(e) => tracing::trace!(
1199
                "Failed to send informational headers for stream: {:?}: {:?}",
1200
                stream_id,
1201
                e
1202
            ),
1203
        }
1204
1205
        result
1206
    }
1207
1208
    /// Send a response to a client request.
1209
    ///
1210
    /// On success, a [`SendStream`] instance is returned. This instance can be
1211
    /// used to stream the response body and send trailers.
1212
    ///
1213
    /// If a body or trailers will be sent on the returned [`SendStream`]
1214
    /// instance, then `end_of_stream` must be set to `false` when calling this
1215
    /// function.
1216
    ///
1217
    /// The [`SendResponse`] instance is already associated with a received
1218
    /// request.  This function may only be called once per instance and only if
1219
    /// [`send_reset`] has not been previously called.
1220
    ///
1221
    /// [`SendResponse`]: #
1222
    /// [`SendStream`]: ../struct.SendStream.html
1223
    /// [`send_reset`]: #method.send_reset
1224
    pub fn send_response(
1225
        &mut self,
1226
        response: Response<()>,
1227
        end_of_stream: bool,
1228
    ) -> Result<SendStream<B>, crate::Error> {
1229
        self.inner
1230
            .send_response(response, end_of_stream)
1231
            .map(|_| SendStream::new(self.inner.clone()))
1232
            .map_err(Into::into)
1233
    }
1234
1235
    /// Push a request and response to the client
1236
    ///
1237
    /// On success, a [`SendResponse`] instance is returned.
1238
    ///
1239
    /// [`SendResponse`]: #
1240
    pub fn push_request(
1241
        &mut self,
1242
        request: Request<()>,
1243
    ) -> Result<SendPushedResponse<B>, crate::Error> {
1244
        self.inner
1245
            .send_push_promise(request)
1246
            .map(|inner| SendPushedResponse {
1247
                inner: SendResponse { inner },
1248
            })
1249
            .map_err(Into::into)
1250
    }
1251
1252
    /// Send a stream reset to the peer.
1253
    ///
1254
    /// This essentially cancels the stream, including any inbound or outbound
1255
    /// data streams.
1256
    ///
1257
    /// If this function is called before [`send_response`], a call to
1258
    /// [`send_response`] will result in an error.
1259
    ///
1260
    /// If this function is called while a [`SendStream`] instance is active,
1261
    /// any further use of the instance will result in an error.
1262
    ///
1263
    /// This function should only be called once.
1264
    ///
1265
    /// [`send_response`]: #method.send_response
1266
    /// [`SendStream`]: ../struct.SendStream.html
1267
    pub fn send_reset(&mut self, reason: Reason) {
1268
        self.inner.send_reset(reason)
1269
    }
1270
1271
    /// Polls to be notified when the client resets this stream.
1272
    ///
1273
    /// If stream is still open, this returns `Poll::Pending`, and
1274
    /// registers the task to be notified if a `RST_STREAM` is received.
1275
    ///
1276
    /// If a `RST_STREAM` frame is received for this stream, calling this
1277
    /// method will yield the `Reason` for the reset.
1278
    ///
1279
    /// # Errors
1280
    ///
1281
    /// Calling this method after having called `send_response` will return
1282
    /// a user error.
1283
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1284
        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1285
    }
1286
1287
    /// Returns the stream ID of the response stream.
1288
    ///
1289
    /// # Panics
1290
    ///
1291
    /// If the lock on the stream store has been poisoned.
1292
    pub fn stream_id(&self) -> crate::StreamId {
1293
        crate::StreamId::from_internal(self.inner.stream_id())
1294
    }
1295
}
1296
1297
// ===== impl SendPushedResponse =====
1298
1299
impl<B: Buf> SendPushedResponse<B> {
1300
    /// Send a response to a promised request.
1301
    ///
1302
    /// On success, a [`SendStream`] instance is returned. This instance can be
1303
    /// used to stream the response body and send trailers.
1304
    ///
1305
    /// If a body or trailers will be sent on the returned [`SendStream`]
1306
    /// instance, then `end_of_stream` must be set to `false` when calling this
1307
    /// function.
1308
    ///
1309
    /// The [`SendPushedResponse`] instance is associated with a promised
1310
    /// request.  This function may only be called once per instance and only if
1311
    /// [`send_reset`] has not been previously called.
1312
    ///
1313
    /// [`SendPushedResponse`]: #
1314
    /// [`SendStream`]: ../struct.SendStream.html
1315
    /// [`send_reset`]: #method.send_reset
1316
    pub fn send_response(
1317
        &mut self,
1318
        response: Response<()>,
1319
        end_of_stream: bool,
1320
    ) -> Result<SendStream<B>, crate::Error> {
1321
        self.inner.send_response(response, end_of_stream)
1322
    }
1323
1324
    /// Send a stream reset to the peer.
1325
    ///
1326
    /// This essentially cancels the stream, including any inbound or outbound
1327
    /// data streams.
1328
    ///
1329
    /// If this function is called before [`send_response`], a call to
1330
    /// [`send_response`] will result in an error.
1331
    ///
1332
    /// If this function is called while a [`SendStream`] instance is active,
1333
    /// any further use of the instance will result in an error.
1334
    ///
1335
    /// This function should only be called once.
1336
    ///
1337
    /// [`send_response`]: #method.send_response
1338
    /// [`SendStream`]: ../struct.SendStream.html
1339
    pub fn send_reset(&mut self, reason: Reason) {
1340
        self.inner.send_reset(reason)
1341
    }
1342
1343
    /// Polls to be notified when the client resets this stream.
1344
    ///
1345
    /// If stream is still open, this returns `Poll::Pending`, and
1346
    /// registers the task to be notified if a `RST_STREAM` is received.
1347
    ///
1348
    /// If a `RST_STREAM` frame is received for this stream, calling this
1349
    /// method will yield the `Reason` for the reset.
1350
    ///
1351
    /// # Errors
1352
    ///
1353
    /// Calling this method after having called `send_response` will return
1354
    /// a user error.
1355
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
        // Delegates to the wrapped `SendResponse`.
        let promised = &mut self.inner;
        promised.poll_reset(cx)
    }
1358
1359
    /// Returns the stream ID of the response stream.
1360
    ///
1361
    /// # Panics
1362
    ///
1363
    /// If the lock on the stream store has been poisoned.
1364
    pub fn stream_id(&self) -> crate::StreamId {
        // Delegates to the wrapped `SendResponse`.
        let promised = &self.inner;
        promised.stream_id()
    }
1367
}
1368
1369
// ===== impl Flush =====
1370
1371
impl<T, B: Buf> Flush<T, B> {
1372
    fn new(codec: Codec<T, B>) -> Self {
1373
        Flush { codec: Some(codec) }
1374
    }
1375
}
1376
1377
impl<T, B> Future for Flush<T, B>
1378
where
1379
    T: AsyncWrite + Unpin,
1380
    B: Buf,
1381
{
1382
    type Output = Result<Codec<T, B>, crate::Error>;
1383
1384
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1385
        // Flush the codec
1386
        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1387
1388
        // Return the codec
1389
        Poll::Ready(Ok(self.codec.take().unwrap()))
1390
    }
1391
}
1392
1393
impl<T, B: Buf> ReadPreface<T, B> {
1394
    fn new(codec: Codec<T, B>) -> Self {
1395
        ReadPreface {
1396
            codec: Some(codec),
1397
            pos: 0,
1398
        }
1399
    }
1400
1401
    fn inner_mut(&mut self) -> &mut T {
1402
        self.codec.as_mut().unwrap().get_mut()
1403
    }
1404
}
1405
1406
impl<T, B> Future for ReadPreface<T, B>
1407
where
1408
    T: AsyncRead + Unpin,
1409
    B: Buf,
1410
{
1411
    type Output = Result<Codec<T, B>, crate::Error>;
1412
1413
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1414
        let mut buf = [0; 24];
1415
        let mut rem = PREFACE.len() - self.pos;
1416
1417
        while rem > 0 {
1418
            let mut buf = ReadBuf::new(&mut buf[..rem]);
1419
            ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1420
                .map_err(crate::Error::from_io)?;
1421
            let n = buf.filled().len();
1422
            if n == 0 {
1423
                return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1424
                    io::ErrorKind::UnexpectedEof,
1425
                    "connection closed before reading preface",
1426
                ))));
1427
            }
1428
1429
            if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1430
                proto_err!(conn: "read_preface: invalid preface");
1431
                // TODO: Should this just write the GO_AWAY frame directly?
1432
                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1433
            }
1434
1435
            self.pos += n;
1436
            rem -= n; // TODO test
1437
        }
1438
1439
        Poll::Ready(Ok(self.codec.take().unwrap()))
1440
    }
1441
}
1442
1443
// ===== impl Handshake =====
1444
1445
impl<T, B: Buf> Future for Handshake<T, B>
1446
where
1447
    T: AsyncRead + AsyncWrite + Unpin,
1448
    B: Buf,
1449
{
1450
    type Output = Result<Connection<T, B>, crate::Error>;
1451
1452
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1453
        let span = self.span.clone(); // XXX(eliza): T_T
1454
        let _e = span.enter();
1455
        tracing::trace!(state = ?self.state);
1456
1457
        loop {
1458
            match &mut self.state {
1459
                Handshaking::Flushing(flush) => {
1460
                    // We're currently flushing a pending SETTINGS frame. Poll the
1461
                    // flush future, and, if it's completed, advance our state to wait
1462
                    // for the client preface.
1463
                    let codec = match Pin::new(flush).poll(cx)? {
1464
                        Poll::Pending => {
1465
                            tracing::trace!(flush.poll = %"Pending");
1466
                            return Poll::Pending;
1467
                        }
1468
                        Poll::Ready(flushed) => {
1469
                            tracing::trace!(flush.poll = %"Ready");
1470
                            flushed
1471
                        }
1472
                    };
1473
                    self.state = Handshaking::ReadingPreface(
1474
                        ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1475
                    );
1476
                }
1477
                Handshaking::ReadingPreface(read) => {
1478
                    let codec = ready!(Pin::new(read).poll(cx)?);
1479
1480
                    self.state = Handshaking::Done;
1481
1482
                    let connection = proto::Connection::new(
1483
                        codec,
1484
                        Config {
1485
                            next_stream_id: 2.into(),
1486
                            // Server does not need to locally initiate any streams
1487
                            initial_max_send_streams: 0,
1488
                            max_send_buffer_size: self.builder.max_send_buffer_size,
1489
                            reset_stream_duration: self.builder.reset_stream_duration,
1490
                            reset_stream_max: self.builder.reset_stream_max,
1491
                            remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1492
                            local_error_reset_streams_max: self
1493
                                .builder
1494
                                .local_max_error_reset_streams,
1495
                            settings: self.builder.settings.clone(),
1496
                        },
1497
                    );
1498
1499
                    tracing::trace!("connection established!");
1500
                    let mut c = Connection { connection };
1501
                    if let Some(sz) = self.builder.initial_target_connection_window_size {
1502
                        c.set_target_window_size(sz);
1503
                    }
1504
1505
                    return Poll::Ready(Ok(c));
1506
                }
1507
                Handshaking::Done => {
1508
                    panic!("Handshaking::poll() called again after handshaking was complete")
1509
                }
1510
            }
1511
        }
1512
    }
1513
}
1514
1515
impl<T, B> fmt::Debug for Handshake<T, B>
1516
where
1517
    T: AsyncRead + AsyncWrite + fmt::Debug,
1518
    B: fmt::Debug + Buf,
1519
{
1520
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1521
        write!(fmt, "server::Handshake")
1522
    }
1523
}
1524
1525
impl Peer {
1526
0
    pub fn convert_send_message(
1527
0
        id: StreamId,
1528
0
        response: Response<()>,
1529
0
        end_of_stream: bool,
1530
0
    ) -> frame::Headers {
1531
        use http::response::Parts;
1532
1533
        // Extract the components of the HTTP request
1534
        let (
1535
            Parts {
1536
0
                status, headers, ..
1537
            },
1538
            _,
1539
0
        ) = response.into_parts();
1540
1541
        // Build the set pseudo header set. All requests will include `method`
1542
        // and `path`.
1543
0
        let pseudo = Pseudo::response(status);
1544
1545
        // Create the HEADERS frame
1546
0
        let mut frame = frame::Headers::new(id, pseudo, headers);
1547
1548
0
        if end_of_stream {
1549
0
            frame.set_end_stream()
1550
0
        }
1551
1552
0
        frame
1553
0
    }
1554
1555
0
    pub fn convert_push_message(
1556
0
        stream_id: StreamId,
1557
0
        promised_id: StreamId,
1558
0
        request: Request<()>,
1559
0
    ) -> Result<frame::PushPromise, UserError> {
1560
        use http::request::Parts;
1561
1562
0
        if let Err(e) = frame::PushPromise::validate_request(&request) {
1563
            use PushPromiseHeaderError::*;
1564
0
            match e {
1565
0
                NotSafeAndCacheable => tracing::debug!(
1566
                    ?promised_id,
1567
0
                    "convert_push_message: method {} is not safe and cacheable",
1568
0
                    request.method(),
1569
                ),
1570
0
                InvalidContentLength(e) => tracing::debug!(
1571
                    ?promised_id,
1572
0
                    "convert_push_message; promised request has invalid content-length {:?}",
1573
                    e,
1574
                ),
1575
            }
1576
0
            return Err(UserError::MalformedHeaders);
1577
0
        }
1578
1579
        // Extract the components of the HTTP request
1580
        let (
1581
            Parts {
1582
0
                method,
1583
0
                uri,
1584
0
                headers,
1585
                ..
1586
            },
1587
            _,
1588
0
        ) = request.into_parts();
1589
1590
0
        let pseudo = Pseudo::request(method, uri, None);
1591
1592
0
        Ok(frame::PushPromise::new(
1593
0
            stream_id,
1594
0
            promised_id,
1595
0
            pseudo,
1596
0
            headers,
1597
0
        ))
1598
0
    }
1599
}
1600
1601
impl proto::Peer for Peer {
1602
    type Poll = Request<()>;
1603
1604
    const NAME: &'static str = "Server";
1605
1606
    /*
1607
    fn is_server() -> bool {
1608
        true
1609
    }
1610
    */
1611
1612
0
    fn r#dyn() -> proto::DynPeer {
1613
0
        proto::DynPeer::Server
1614
0
    }
1615
1616
1.15k
    fn convert_poll_message(
1617
1.15k
        pseudo: Pseudo,
1618
1.15k
        fields: HeaderMap,
1619
1.15k
        stream_id: StreamId,
1620
1.15k
    ) -> Result<Self::Poll, Error> {
1621
        use http::{uri, Version};
1622
1623
1.15k
        let mut b = Request::builder();
1624
1625
        macro_rules! malformed {
1626
            ($($arg:tt)*) => {{
1627
                tracing::debug!($($arg)*);
1628
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1629
            }}
1630
        }
1631
1632
1.15k
        b = b.version(Version::HTTP_2);
1633
1634
        let is_connect;
1635
1.15k
        if let Some(method) = pseudo.method {
1636
863
            is_connect = method == Method::CONNECT;
1637
863
            b = b.method(method);
1638
863
        } else {
1639
294
            malformed!("malformed headers: missing method");
1640
        }
1641
1642
863
        let has_protocol = pseudo.protocol.is_some();
1643
863
        if has_protocol {
1644
1
            if is_connect {
1645
0
                // Assert that we have the right type.
1646
0
                b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1647
0
            } else {
1648
1
                malformed!("malformed headers: :protocol on non-CONNECT request");
1649
            }
1650
862
        }
1651
1652
862
        if pseudo.status.is_some() {
1653
14
            malformed!("malformed headers: :status field on request");
1654
848
        }
1655
1656
        // Convert the URI
1657
848
        let mut parts = uri::Parts::default();
1658
1659
        // A request translated from HTTP/1 must not include the :authority
1660
        // header
1661
848
        if let Some(authority) = pseudo.authority {
1662
198
            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1663
198
            parts.authority = Some(maybe_authority.or_else(|why| {
1664
156
                malformed!(
1665
0
                    "malformed headers: malformed authority ({:?}): {}",
1666
                    authority,
1667
                    why,
1668
                )
1669
156
            })?);
1670
650
        }
1671
1672
        // A :scheme is required, except CONNECT.
1673
692
        if let Some(scheme) = pseudo.scheme {
1674
651
            if is_connect && !has_protocol {
1675
0
                malformed!("malformed headers: :scheme in CONNECT");
1676
651
            }
1677
651
            let maybe_scheme = scheme.parse();
1678
651
            let scheme = maybe_scheme.or_else(|why| {
1679
48
                malformed!(
1680
0
                    "malformed headers: malformed scheme ({:?}): {}",
1681
                    scheme,
1682
                    why,
1683
                )
1684
48
            })?;
1685
1686
            // It's not possible to build an `Uri` from a scheme and path. So,
1687
            // after validating is was a valid scheme, we just have to drop it
1688
            // if there isn't an :authority.
1689
603
            if parts.authority.is_some() {
1690
29
                parts.scheme = Some(scheme);
1691
574
            }
1692
41
        } else if !is_connect || has_protocol {
1693
41
            malformed!("malformed headers: missing scheme");
1694
0
        }
1695
1696
603
        if let Some(path) = pseudo.path {
1697
368
            if is_connect && !has_protocol {
1698
0
                malformed!("malformed headers: :path in CONNECT");
1699
368
            }
1700
1701
            // This cannot be empty
1702
368
            if path.is_empty() {
1703
16
                malformed!("malformed headers: missing path");
1704
352
            }
1705
1706
352
            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1707
352
            parts.path_and_query = Some(maybe_path.or_else(|why| {
1708
205
                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1709
205
            })?);
1710
235
        } else if is_connect && has_protocol {
1711
0
            malformed!("malformed headers: missing path in extended CONNECT");
1712
235
        }
1713
1714
382
        b = b.uri(parts);
1715
1716
382
        let mut request = match b.body(()) {
1717
368
            Ok(request) => request,
1718
14
            Err(e) => {
1719
                // TODO: Should there be more specialized handling for different
1720
                // kinds of errors
1721
14
                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1722
14
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1723
            }
1724
        };
1725
1726
368
        *request.headers_mut() = fields;
1727
1728
368
        Ok(request)
1729
1.15k
    }
1730
}
1731
1732
// ===== impl Handshaking =====
1733
1734
impl<T, B> fmt::Debug for Handshaking<T, B>
1735
where
1736
    B: Buf,
1737
{
1738
    #[inline]
1739
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1740
        match *self {
1741
            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1742
            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1743
            Handshaking::Done => f.write_str("Done"),
1744
        }
1745
    }
1746
}