Coverage Report

Created: 2025-07-12 06:22

/src/h2/src/client.rs
Line
Count
Source (jump to first uncovered line)
1
//! Client implementation of the HTTP/2 protocol.
2
//!
3
//! # Getting started
4
//!
5
//! Running an HTTP/2 client requires the caller to establish the underlying
6
//! connection as well as get the connection to a state that is ready to begin
7
//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8
//! details.
9
//!
10
//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11
//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12
//!
13
//! Once a connection is obtained, it is passed to [`handshake`], which will
14
//! begin the [HTTP/2 handshake]. This returns a future that completes once
15
//! the handshake process is performed and HTTP/2 streams may be initialized.
16
//!
17
//! [`handshake`] uses default configuration values. There are a number of
18
//! settings that can be changed by using [`Builder`] instead.
19
//!
20
//! Once the handshake future completes, the caller is provided with a
21
//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22
//! instance is used to drive the connection (see [Managing the connection]).
23
//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24
//! requests]).
25
//!
26
//! # Making requests
27
//!
28
//! Requests are made using the [`SendRequest`] handle provided by the handshake
29
//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30
//! the request is sent to the server.
31
//!
32
//! A request body and request trailers are sent using [`SendStream`] and the
33
//! server's response is returned once the [`ResponseFuture`] future completes.
34
//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35
//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36
//! initialized by the sent request.
37
//!
38
//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39
//! stream can be created, i.e. as long as the current number of active streams
40
//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41
//! caller will be notified once an existing stream closes, freeing capacity for
42
//! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43
//! capacity before sending a request to the server.
44
//!
45
//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46
//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47
//! to do so will result in an [`Error`] being returned.
48
//!
49
//! # Managing the connection
50
//!
51
//! The [`Connection`] instance is used to manage connection state. The caller
52
//! is required to call [`Connection::poll`] in order to advance state.
53
//! [`SendRequest::send_request`] and other functions have no effect unless
54
//! [`Connection::poll`] is called.
55
//!
56
//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57
//! returns `Ready`. At this point, the underlying socket has been closed and no
58
//! further work needs to be done.
59
//!
60
//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61
//! submit the [`Connection`] instance to an [executor]. The executor will then
62
//! manage polling the connection until the connection is complete.
63
//! Alternatively, the caller can call `poll` manually.
64
//!
65
//! # Example
66
//!
67
//! ```rust, no_run
68
//!
69
//! use h2::client;
70
//!
71
//! use http::{Request, Method};
72
//! use std::error::Error;
73
//! use tokio::net::TcpStream;
74
//!
75
//! #[tokio::main]
76
//! pub async fn main() -> Result<(), Box<dyn Error>> {
77
//!     // Establish TCP connection to the server.
78
//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79
//!     let (h2, connection) = client::handshake(tcp).await?;
80
//!     tokio::spawn(async move {
81
//!         connection.await.unwrap();
82
//!     });
83
//!
84
//!     let mut h2 = h2.ready().await?;
85
//!     // Prepare the HTTP request to send to the server.
86
//!     let request = Request::builder()
87
//!                     .method(Method::GET)
88
//!                     .uri("https://www.example.com/")
89
//!                     .body(())
90
//!                     .unwrap();
91
//!
92
//!     // Send the request. The second tuple item allows the caller
93
//!     // to stream a request body.
94
//!     let (response, _) = h2.send_request(request, true).unwrap();
95
//!
96
//!     let (head, mut body) = response.await?.into_parts();
97
//!
98
//!     println!("Received response: {:?}", head);
99
//!
100
//!     // The `flow_control` handle allows the caller to manage
101
//!     // flow control.
102
//!     //
103
//!     // Whenever data is received, the caller is responsible for
104
//!     // releasing capacity back to the server once it has freed
105
//!     // the data from memory.
106
//!     let mut flow_control = body.flow_control().clone();
107
//!
108
//!     while let Some(chunk) = body.data().await {
109
//!         let chunk = chunk?;
110
//!         println!("RX: {:?}", chunk);
111
//!
112
//!         // Let the server send more data.
113
//!         let _ = flow_control.release_capacity(chunk.len());
114
//!     }
115
//!
116
//!     Ok(())
117
//! }
118
//! ```
119
//!
120
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121
//! [`handshake`]: fn.handshake.html
122
//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123
//! [`SendRequest`]: struct.SendRequest.html
124
//! [`SendStream`]: ../struct.SendStream.html
125
//! [Making requests]: #making-requests
126
//! [Managing the connection]: #managing-the-connection
127
//! [`Connection`]: struct.Connection.html
128
//! [`Connection::poll`]: struct.Connection.html#method.poll
129
//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130
//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131
//! [`SendRequest`]: struct.SendRequest.html
132
//! [`ResponseFuture`]: struct.ResponseFuture.html
133
//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134
//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135
//! [`Builder`]: struct.Builder.html
136
//! [`Error`]: ../struct.Error.html
137
138
use crate::codec::{Codec, SendError, UserError};
139
use crate::ext::Protocol;
140
use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
141
use crate::proto::{self, Error};
142
use crate::{FlowControl, PingPong, RecvStream, SendStream};
143
144
use bytes::{Buf, Bytes};
145
use http::{uri, HeaderMap, Method, Request, Response, Version};
146
use std::fmt;
147
use std::future::Future;
148
use std::pin::Pin;
149
use std::task::{Context, Poll};
150
use std::time::Duration;
151
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
152
use tracing::Instrument;
153
154
/// Initializes new HTTP/2 streams on a connection by sending a request.
155
///
156
/// This type does no work itself. Instead, it is a handle to the inner
157
/// connection state held by [`Connection`]. If the associated connection
158
/// instance is dropped, all `SendRequest` functions will return [`Error`].
159
///
160
/// [`SendRequest`] instances are able to move to and operate on separate tasks
161
/// / threads than their associated [`Connection`] instance. Internally, there
162
/// is a buffer used to stage requests before they get written to the
163
/// connection. There is no guarantee that requests get written to the
164
/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
165
///
166
/// [`SendRequest`] implements [`Clone`], enabling the creation of many
167
/// instances that are backed by a single connection.
168
///
169
/// See [module] level documentation for more details.
170
///
171
/// [module]: index.html
172
/// [`Connection`]: struct.Connection.html
173
/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
174
/// [`Error`]: ../struct.Error.html
175
pub struct SendRequest<B: Buf> {
176
    inner: proto::Streams<B, Peer>,
177
    pending: Option<proto::OpaqueStreamRef>,
178
}
179
180
/// Returns a `SendRequest` instance once it is ready to send at least one
181
/// request.
182
#[derive(Debug)]
183
pub struct ReadySendRequest<B: Buf> {
184
    inner: Option<SendRequest<B>>,
185
}
186
187
/// Manages all state associated with an HTTP/2 client connection.
188
///
189
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
190
/// implements the HTTP/2 client logic for that connection. It is responsible
191
/// for driving the internal state forward, performing the work requested of the
192
/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
193
/// [`RecvStream`]).
194
///
195
/// `Connection` values are created by calling [`handshake`]. Once a
196
/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
197
/// until `Ready` is returned. The easiest way to do this is to submit the
198
/// `Connection` instance to an [executor].
199
///
200
/// [module]: index.html
201
/// [`handshake`]: fn.handshake.html
202
/// [`SendRequest`]: struct.SendRequest.html
203
/// [`ResponseFuture`]: struct.ResponseFuture.html
204
/// [`SendStream`]: ../struct.SendStream.html
205
/// [`RecvStream`]: ../struct.RecvStream.html
206
/// [`poll`]: #method.poll
207
/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
208
///
209
/// # Examples
210
///
211
/// ```
212
/// # use tokio::io::{AsyncRead, AsyncWrite};
213
/// # use h2::client;
214
/// # use h2::client::*;
215
/// #
216
/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
217
/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
218
/// # {
219
///     let (send_request, connection) = client::handshake(my_io).await?;
220
///     // Submit the connection handle to an executor.
221
///     tokio::spawn(async { connection.await.expect("connection failed"); });
222
///
223
///     // Now, use `send_request` to initialize HTTP/2 streams.
224
///     // ...
225
/// # Ok(())
226
/// # }
227
/// #
228
/// # pub fn main() {}
229
/// ```
230
#[must_use = "futures do nothing unless polled"]
231
pub struct Connection<T, B: Buf = Bytes> {
232
    inner: proto::Connection<T, Peer, B>,
233
}
234
235
/// A future of an HTTP response.
236
#[derive(Debug)]
237
#[must_use = "futures do nothing unless polled"]
238
pub struct ResponseFuture {
239
    inner: proto::OpaqueStreamRef,
240
    push_promise_consumed: bool,
241
}
242
243
/// A future of a pushed HTTP response.
244
///
245
/// We have to differentiate between pushed and non pushed because of the spec
246
/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
247
/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
248
/// > that is in either the "open" or "half-closed (remote)" state.
249
#[derive(Debug)]
250
#[must_use = "futures do nothing unless polled"]
251
pub struct PushedResponseFuture {
252
    inner: ResponseFuture,
253
}
254
255
/// A pushed response and corresponding request headers
256
#[derive(Debug)]
257
pub struct PushPromise {
258
    /// The request headers
259
    request: Request<()>,
260
261
    /// The pushed response
262
    response: PushedResponseFuture,
263
}
264
265
/// A stream of pushed responses and corresponding promised requests
266
#[derive(Debug)]
267
#[must_use = "streams do nothing unless polled"]
268
pub struct PushPromises {
269
    inner: proto::OpaqueStreamRef,
270
}
271
272
/// Builds client connections with custom configuration values.
273
///
274
/// Methods can be chained in order to set the configuration values.
275
///
276
/// The client is constructed by calling [`handshake`] and passing the I/O
277
/// handle that will back the HTTP/2 client connection.
278
///
279
/// New instances of `Builder` are obtained via [`Builder::new`].
280
///
281
/// See function level documentation for details on the various client
282
/// configuration settings.
283
///
284
/// [`Builder::new`]: struct.Builder.html#method.new
285
/// [`handshake`]: struct.Builder.html#method.handshake
286
///
287
/// # Examples
288
///
289
/// ```
290
/// # use tokio::io::{AsyncRead, AsyncWrite};
291
/// # use h2::client::*;
292
/// # use bytes::Bytes;
293
/// #
294
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
295
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
296
/// # {
297
/// // `client_fut` is a future representing the completion of the HTTP/2
298
/// // handshake.
299
/// let client_fut = Builder::new()
300
///     .initial_window_size(1_000_000)
301
///     .max_concurrent_streams(1000)
302
///     .handshake(my_io);
303
/// # client_fut.await
304
/// # }
305
/// #
306
/// # pub fn main() {}
307
/// ```
308
#[derive(Clone, Debug)]
309
pub struct Builder {
310
    /// Time to keep locally reset streams around before reaping.
311
    reset_stream_duration: Duration,
312
313
    /// Initial maximum number of locally initiated (send) streams.
314
    /// After receiving a SETTINGS frame from the remote peer,
315
    /// the connection will overwrite this value with the
316
    /// MAX_CONCURRENT_STREAMS specified in the frame.
317
    /// If no value is advertised by the remote peer in the initial SETTINGS
318
    /// frame, it will be set to usize::MAX.
319
    initial_max_send_streams: usize,
320
321
    /// Initial target window size for new connections.
322
    initial_target_connection_window_size: Option<u32>,
323
324
    /// Maximum amount of bytes to "buffer" for writing per stream.
325
    max_send_buffer_size: usize,
326
327
    /// Maximum number of locally reset streams to keep at a time.
328
    reset_stream_max: usize,
329
330
    /// Maximum number of remotely reset streams to allow in the pending
331
    /// accept queue.
332
    pending_accept_reset_stream_max: usize,
333
334
    /// Initial `Settings` frame to send as part of the handshake.
335
    settings: Settings,
336
337
    /// The stream ID of the first (lowest) stream. Subsequent streams will use
338
    /// monotonically increasing stream IDs.
339
    stream_id: StreamId,
340
341
    /// Maximum number of locally reset streams due to protocol error across
342
    /// the lifetime of the connection.
343
    ///
344
    /// When this gets exceeded, we issue GOAWAYs.
345
    local_max_error_reset_streams: Option<usize>,
346
}
347
348
#[derive(Debug)]
349
pub(crate) struct Peer;
350
351
// ===== impl SendRequest =====
352
353
impl<B> SendRequest<B>
354
where
355
    B: Buf,
356
{
357
    /// Returns `Ready` when the connection can initialize a new HTTP/2
358
    /// stream.
359
    ///
360
    /// This function must return `Ready` before `send_request` is called. When
361
    /// `Poll::Pending` is returned, the task will be notified once the readiness
362
    /// state changes.
363
    ///
364
    /// See [module] level docs for more details.
365
    ///
366
    /// [module]: index.html
367
425k
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
368
425k
        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
369
413k
        self.pending = None;
370
413k
        Poll::Ready(Ok(()))
371
425k
    }
Unexecuted instantiation: <h2::client::SendRequest<_>>::poll_ready
<h2::client::SendRequest<bytes::bytes::Bytes>>::poll_ready
Line
Count
Source
367
425k
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
368
425k
        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
369
413k
        self.pending = None;
370
413k
        Poll::Ready(Ok(()))
371
425k
    }
372
373
    /// Consumes `self`, returning a future that returns `self` back once it is
374
    /// ready to send a request.
375
    ///
376
    /// This function should be called before calling `send_request`.
377
    ///
378
    /// This is a functional combinator for [`poll_ready`]. The returned future
379
    /// will call `SendRequest::poll_ready` until `Ready`, then returns `self` to
380
    /// the caller.
381
    ///
382
    /// # Examples
383
    ///
384
    /// ```rust
385
    /// # use h2::client::*;
386
    /// # use http::*;
387
    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
388
    /// # {
389
    /// // First, wait until the `send_request` handle is ready to send a new
390
    /// // request
391
    /// let mut send_request = send_request.ready().await.unwrap();
392
    /// // Use `send_request` here.
393
    /// # }
394
    /// # pub fn main() {}
395
    /// ```
396
    ///
397
    /// See [module] level docs for more details.
398
    ///
399
    /// [`poll_ready`]: #method.poll_ready
400
    /// [module]: index.html
401
0
    pub fn ready(self) -> ReadySendRequest<B> {
402
0
        ReadySendRequest { inner: Some(self) }
403
0
    }
404
405
    /// Sends a HTTP/2 request to the server.
406
    ///
407
    /// `send_request` initializes a new HTTP/2 stream on the associated
408
    /// connection, then sends the given request using this new stream. Only the
409
    /// request head is sent.
410
    ///
411
    /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
412
    /// are returned. The [`ResponseFuture`] instance is used to get the
413
    /// server's response and the [`SendStream`] instance is used to send a
414
    /// request body or trailers to the server over the same HTTP/2 stream.
415
    ///
416
    /// To send a request body or trailers, set `end_of_stream` to `false`.
417
    /// Then, use the returned [`SendStream`] instance to stream request body
418
    /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
419
    /// then attempting to call [`SendStream::send_data`] or
420
    /// [`SendStream::send_trailers`] will result in an error.
421
    ///
422
    /// If no request body or trailers are to be sent, set `end_of_stream` to
423
    /// `true` and drop the returned [`SendStream`] instance.
424
    ///
425
    /// # A note on HTTP versions
426
    ///
427
    /// The provided `Request` will be encoded differently depending on the
428
    /// value of its version field. If the version is set to 2.0, then the
429
    /// request is encoded as the specification recommends.
430
    ///
431
    /// If the version is set to a lower value, then the request is encoded to
432
    /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
433
    /// headers are permitted and the `:authority` pseudo header is not
434
    /// included.
435
    ///
436
    /// The caller should always set the request's version field to 2.0 unless
437
    /// specifically transmitting an HTTP 1.1 request over 2.0.
438
    ///
439
    /// # Examples
440
    ///
441
    /// Sending a request with no body
442
    ///
443
    /// ```rust
444
    /// # use h2::client::*;
445
    /// # use http::*;
446
    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
447
    /// # {
448
    /// // First, wait until the `send_request` handle is ready to send a new
449
    /// // request
450
    /// let mut send_request = send_request.ready().await.unwrap();
451
    /// // Prepare the HTTP request to send to the server.
452
    /// let request = Request::get("https://www.example.com/")
453
    ///     .body(())
454
    ///     .unwrap();
455
    ///
456
    /// // Send the request to the server. Since we are not sending a
457
    /// // body or trailers, we can drop the `SendStream` instance.
458
    /// let (response, _) = send_request.send_request(request, true).unwrap();
459
    /// let response = response.await.unwrap();
460
    /// // Process the response
461
    /// # }
462
    /// # pub fn main() {}
463
    /// ```
464
    ///
465
    /// Sending a request with a body and trailers
466
    ///
467
    /// ```rust
468
    /// # use h2::client::*;
469
    /// # use http::*;
470
    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
471
    /// # {
472
    /// // First, wait until the `send_request` handle is ready to send a new
473
    /// // request
474
    /// let mut send_request = send_request.ready().await.unwrap();
475
    ///
476
    /// // Prepare the HTTP request to send to the server.
477
    /// let request = Request::get("https://www.example.com/")
478
    ///     .body(())
479
    ///     .unwrap();
480
    ///
481
    /// // Send the request to the server. If we are not sending a
482
    /// // body or trailers, we can drop the `SendStream` instance.
483
    /// let (response, mut send_stream) = send_request
484
    ///     .send_request(request, false).unwrap();
485
    ///
486
    /// // At this point, one option would be to wait for send capacity.
487
    /// // Doing so would allow us to not hold data in memory that
488
    /// // cannot be sent. However, this is not a requirement, so this
489
    /// // example will skip that step. See `SendStream` documentation
490
    /// // for more details.
491
    /// send_stream.send_data(b"hello", false).unwrap();
492
    /// send_stream.send_data(b"world", false).unwrap();
493
    ///
494
    /// // Send the trailers.
495
    /// let mut trailers = HeaderMap::new();
496
    /// trailers.insert(
497
    ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
498
    ///     header::HeaderValue::from_bytes(b"hello").unwrap());
499
    ///
500
    /// send_stream.send_trailers(trailers).unwrap();
501
    ///
502
    /// let response = response.await.unwrap();
503
    /// // Process the response
504
    /// # }
505
    /// # pub fn main() {}
506
    /// ```
507
    ///
508
    /// [`ResponseFuture`]: struct.ResponseFuture.html
509
    /// [`SendStream`]: ../struct.SendStream.html
510
    /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
511
    /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
512
414k
    pub fn send_request(
513
414k
        &mut self,
514
414k
        request: Request<()>,
515
414k
        end_of_stream: bool,
516
414k
    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
517
414k
        self.inner
518
414k
            .send_request(request, end_of_stream, self.pending.as_ref())
519
414k
            .map_err(Into::into)
520
414k
            .map(|(stream, is_full)| {
521
414k
                if stream.is_pending_open() && is_full {
522
234
                    // Only prevent sending another request when the request queue
523
234
                    // is not full.
524
234
                    self.pending = Some(stream.clone_to_opaque());
525
413k
                }
526
527
414k
                let response = ResponseFuture {
528
414k
                    inner: stream.clone_to_opaque(),
529
414k
                    push_promise_consumed: false,
530
414k
                };
531
414k
532
414k
                let stream = SendStream::new(stream);
533
414k
534
414k
                (response, stream)
535
414k
            })
Unexecuted instantiation: <h2::client::SendRequest<_>>::send_request::{closure#0}
<h2::client::SendRequest<bytes::bytes::Bytes>>::send_request::{closure#0}
Line
Count
Source
520
362
            .map(|(stream, is_full)| {
521
362
                if stream.is_pending_open() && is_full {
522
0
                    // Only prevent sending another request when the request queue
523
0
                    // is not full.
524
0
                    self.pending = Some(stream.clone_to_opaque());
525
362
                }
526
527
362
                let response = ResponseFuture {
528
362
                    inner: stream.clone_to_opaque(),
529
362
                    push_promise_consumed: false,
530
362
                };
531
362
532
362
                let stream = SendStream::new(stream);
533
362
534
362
                (response, stream)
535
362
            })
<h2::client::SendRequest<bytes::bytes::Bytes>>::send_request::{closure#0}
Line
Count
Source
520
413k
            .map(|(stream, is_full)| {
521
413k
                if stream.is_pending_open() && is_full {
522
234
                    // Only prevent sending another request when the request queue
523
234
                    // is not full.
524
234
                    self.pending = Some(stream.clone_to_opaque());
525
413k
                }
526
527
413k
                let response = ResponseFuture {
528
413k
                    inner: stream.clone_to_opaque(),
529
413k
                    push_promise_consumed: false,
530
413k
                };
531
413k
532
413k
                let stream = SendStream::new(stream);
533
413k
534
413k
                (response, stream)
535
413k
            })
536
414k
    }
Unexecuted instantiation: <h2::client::SendRequest<_>>::send_request
<h2::client::SendRequest<bytes::bytes::Bytes>>::send_request
Line
Count
Source
512
448
    pub fn send_request(
513
448
        &mut self,
514
448
        request: Request<()>,
515
448
        end_of_stream: bool,
516
448
    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
517
448
        self.inner
518
448
            .send_request(request, end_of_stream, self.pending.as_ref())
519
448
            .map_err(Into::into)
520
448
            .map(|(stream, is_full)| {
521
                if stream.is_pending_open() && is_full {
522
                    // Only prevent sending another request when the request queue
523
                    // is not full.
524
                    self.pending = Some(stream.clone_to_opaque());
525
                }
526
527
                let response = ResponseFuture {
528
                    inner: stream.clone_to_opaque(),
529
                    push_promise_consumed: false,
530
                };
531
532
                let stream = SendStream::new(stream);
533
534
                (response, stream)
535
448
            })
536
448
    }
<h2::client::SendRequest<bytes::bytes::Bytes>>::send_request
Line
Count
Source
512
413k
    pub fn send_request(
513
413k
        &mut self,
514
413k
        request: Request<()>,
515
413k
        end_of_stream: bool,
516
413k
    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
517
413k
        self.inner
518
413k
            .send_request(request, end_of_stream, self.pending.as_ref())
519
413k
            .map_err(Into::into)
520
413k
            .map(|(stream, is_full)| {
521
                if stream.is_pending_open() && is_full {
522
                    // Only prevent sending another request when the request queue
523
                    // is not full.
524
                    self.pending = Some(stream.clone_to_opaque());
525
                }
526
527
                let response = ResponseFuture {
528
                    inner: stream.clone_to_opaque(),
529
                    push_promise_consumed: false,
530
                };
531
532
                let stream = SendStream::new(stream);
533
534
                (response, stream)
535
413k
            })
536
413k
    }
537
538
    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
539
    ///
540
    /// This setting is configured by the server peer by sending the
541
    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
542
    /// This method returns the currently acknowledged value received from the
543
    /// remote.
544
    ///
545
    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
546
    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
547
0
    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
548
0
        self.inner.is_extended_connect_protocol_enabled()
549
0
    }
550
551
    /// Returns the current max send streams
552
0
    pub fn current_max_send_streams(&self) -> usize {
553
0
        self.inner.current_max_send_streams()
554
0
    }
555
556
    /// Returns the current max recv streams
557
0
    pub fn current_max_recv_streams(&self) -> usize {
558
0
        self.inner.current_max_recv_streams()
559
0
    }
560
}
561
562
impl<B> fmt::Debug for SendRequest<B>
563
where
564
    B: Buf,
565
{
566
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
567
0
        fmt.debug_struct("SendRequest").finish()
568
0
    }
569
}
570
571
impl<B> Clone for SendRequest<B>
572
where
573
    B: Buf,
574
{
575
0
    fn clone(&self) -> Self {
576
0
        SendRequest {
577
0
            inner: self.inner.clone(),
578
0
            pending: None,
579
0
        }
580
0
    }
581
}
582
583
#[cfg(feature = "unstable")]
584
impl<B> SendRequest<B>
585
where
586
    B: Buf,
587
{
588
    /// Returns the number of active streams.
589
    ///
590
    /// An active stream is a stream that has not yet transitioned to a closed
591
    /// state.
592
0
    pub fn num_active_streams(&self) -> usize {
593
0
        self.inner.num_active_streams()
594
0
    }
595
596
    /// Returns the number of streams that are held in memory.
597
    ///
598
    /// A wired stream is a stream that is either active or is closed but must
599
    /// stay in memory for some reason. For example, there are still outstanding
600
    /// userspace handles pointing to the slot.
601
0
    pub fn num_wired_streams(&self) -> usize {
602
0
        self.inner.num_wired_streams()
603
0
    }
604
}
605
606
// ===== impl ReadySendRequest =====
607
608
impl<B> Future for ReadySendRequest<B>
609
where
610
    B: Buf,
611
{
612
    type Output = Result<SendRequest<B>, crate::Error>;
613
614
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
615
0
        match &mut self.inner {
616
0
            Some(send_request) => {
617
0
                ready!(send_request.poll_ready(cx))?;
618
            }
619
0
            None => panic!("called `poll` after future completed"),
620
        }
621
622
0
        Poll::Ready(Ok(self.inner.take().unwrap()))
623
0
    }
624
}
625
626
// ===== impl Builder =====
627
628
impl Builder {
629
    /// Returns a new client builder instance initialized with default
630
    /// configuration values.
631
    ///
632
    /// Configuration methods can be chained on the return value.
633
    ///
634
    /// # Examples
635
    ///
636
    /// ```
637
    /// # use tokio::io::{AsyncRead, AsyncWrite};
638
    /// # use h2::client::*;
639
    /// # use bytes::Bytes;
640
    /// #
641
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
642
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
643
    /// # {
644
    /// // `client_fut` is a future representing the completion of the HTTP/2
645
    /// // handshake.
646
    /// let client_fut = Builder::new()
647
    ///     .initial_window_size(1_000_000)
648
    ///     .max_concurrent_streams(1000)
649
    ///     .handshake(my_io);
650
    /// # client_fut.await
651
    /// # }
652
    /// #
653
    /// # pub fn main() {}
654
    /// ```
655
12.1k
    pub fn new() -> Builder {
656
12.1k
        Builder {
657
12.1k
            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
658
12.1k
            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
659
12.1k
            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
660
12.1k
            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
661
12.1k
            initial_target_connection_window_size: None,
662
12.1k
            initial_max_send_streams: usize::MAX,
663
12.1k
            settings: Default::default(),
664
12.1k
            stream_id: 1.into(),
665
12.1k
            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
666
12.1k
        }
667
12.1k
    }
668
669
    /// Indicates the initial window size (in octets) for stream-level
670
    /// flow control for received data.
671
    ///
672
    /// The initial window of a stream is used as part of flow control. For more
673
    /// details, see [`FlowControl`].
674
    ///
675
    /// The default value is 65,535.
676
    ///
677
    /// [`FlowControl`]: ../struct.FlowControl.html
678
    ///
679
    /// # Examples
680
    ///
681
    /// ```
682
    /// # use tokio::io::{AsyncRead, AsyncWrite};
683
    /// # use h2::client::*;
684
    /// # use bytes::Bytes;
685
    /// #
686
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
687
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
688
    /// # {
689
    /// // `client_fut` is a future representing the completion of the HTTP/2
690
    /// // handshake.
691
    /// let client_fut = Builder::new()
692
    ///     .initial_window_size(1_000_000)
693
    ///     .handshake(my_io);
694
    /// # client_fut.await
695
    /// # }
696
    /// #
697
    /// # pub fn main() {}
698
    /// ```
699
0
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
700
0
        self.settings.set_initial_window_size(Some(size));
701
0
        self
702
0
    }
703
704
    /// Indicates the initial window size (in octets) for connection-level flow control
705
    /// for received data.
706
    ///
707
    /// The initial window of a connection is used as part of flow control. For more details,
708
    /// see [`FlowControl`].
709
    ///
710
    /// The default value is 65,535.
711
    ///
712
    /// [`FlowControl`]: ../struct.FlowControl.html
713
    ///
714
    /// # Examples
715
    ///
716
    /// ```
717
    /// # use tokio::io::{AsyncRead, AsyncWrite};
718
    /// # use h2::client::*;
719
    /// # use bytes::Bytes;
720
    /// #
721
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
722
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
723
    /// # {
724
    /// // `client_fut` is a future representing the completion of the HTTP/2
725
    /// // handshake.
726
    /// let client_fut = Builder::new()
727
    ///     .initial_connection_window_size(1_000_000)
728
    ///     .handshake(my_io);
729
    /// # client_fut.await
730
    /// # }
731
    /// #
732
    /// # pub fn main() {}
733
    /// ```
734
0
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
735
0
        self.initial_target_connection_window_size = Some(size);
736
0
        self
737
0
    }
738
739
    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
740
    /// configured client is able to accept.
741
    ///
742
    /// The sender may send data frames that are **smaller** than this value,
743
    /// but any data larger than `max` will be broken up into multiple `DATA`
744
    /// frames.
745
    ///
746
    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
747
    ///
748
    /// # Examples
749
    ///
750
    /// ```
751
    /// # use tokio::io::{AsyncRead, AsyncWrite};
752
    /// # use h2::client::*;
753
    /// # use bytes::Bytes;
754
    /// #
755
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
756
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
757
    /// # {
758
    /// // `client_fut` is a future representing the completion of the HTTP/2
759
    /// // handshake.
760
    /// let client_fut = Builder::new()
761
    ///     .max_frame_size(1_000_000)
762
    ///     .handshake(my_io);
763
    /// # client_fut.await
764
    /// # }
765
    /// #
766
    /// # pub fn main() {}
767
    /// ```
768
    ///
769
    /// # Panics
770
    ///
771
    /// This function panics if `max` is not within the legal range specified
772
    /// above.
773
0
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
774
0
        self.settings.set_max_frame_size(Some(max));
775
0
        self
776
0
    }
777
778
    /// Sets the max size of received header frames.
779
    ///
780
    /// This advisory setting informs a peer of the maximum size of header list
781
    /// that the sender is prepared to accept, in octets. The value is based on
782
    /// the uncompressed size of header fields, including the length of the name
783
    /// and value in octets plus an overhead of 32 octets for each header field.
784
    ///
785
    /// This setting is also used to limit the maximum amount of data that is
786
    /// buffered to decode HEADERS frames.
787
    ///
788
    /// # Examples
789
    ///
790
    /// ```
791
    /// # use tokio::io::{AsyncRead, AsyncWrite};
792
    /// # use h2::client::*;
793
    /// # use bytes::Bytes;
794
    /// #
795
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
796
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
797
    /// # {
798
    /// // `client_fut` is a future representing the completion of the HTTP/2
799
    /// // handshake.
800
    /// let client_fut = Builder::new()
801
    ///     .max_header_list_size(16 * 1024)
802
    ///     .handshake(my_io);
803
    /// # client_fut.await
804
    /// # }
805
    /// #
806
    /// # pub fn main() {}
807
    /// ```
808
0
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
809
0
        self.settings.set_max_header_list_size(Some(max));
810
0
        self
811
0
    }
812
813
    /// Sets the maximum number of concurrent streams.
814
    ///
815
    /// The maximum concurrent streams setting only controls the maximum number
816
    /// of streams that can be initiated by the remote peer. In other words,
817
    /// when this setting is set to 100, this does not limit the number of
818
    /// concurrent streams that can be created by the caller.
819
    ///
820
    /// It is recommended that this value be no smaller than 100, so as to not
821
    /// unnecessarily limit parallelism. However, any value is legal, including
822
    /// 0. If `max` is set to 0, then the remote will not be permitted to
823
    /// initiate streams.
824
    ///
825
    /// Note that streams in the reserved state, i.e., push promises that have
826
    /// been reserved but the stream has not started, do not count against this
827
    /// setting.
828
    ///
829
    /// Also note that if the remote *does* exceed the value set here, it is not
830
    /// a protocol level error. Instead, the `h2` library will immediately reset
831
    /// the stream.
832
    ///
833
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
834
    ///
835
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
836
    ///
837
    /// # Examples
838
    ///
839
    /// ```
840
    /// # use tokio::io::{AsyncRead, AsyncWrite};
841
    /// # use h2::client::*;
842
    /// # use bytes::Bytes;
843
    /// #
844
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
845
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
846
    /// # {
847
    /// // `client_fut` is a future representing the completion of the HTTP/2
848
    /// // handshake.
849
    /// let client_fut = Builder::new()
850
    ///     .max_concurrent_streams(1000)
851
    ///     .handshake(my_io);
852
    /// # client_fut.await
853
    /// # }
854
    /// #
855
    /// # pub fn main() {}
856
    /// ```
857
0
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
858
0
        self.settings.set_max_concurrent_streams(Some(max));
859
0
        self
860
0
    }
861
862
    /// Sets the initial maximum of locally initiated (send) streams.
863
    ///
864
    /// The initial settings will be overwritten by the remote peer when
865
    /// the SETTINGS frame is received. The new value will be set to the
866
    /// `max_concurrent_streams()` from the frame. If no value is advertised in
867
    /// the initial SETTINGS frame from the remote peer as part of
868
    /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
869
    ///
870
    /// This setting prevents the caller from exceeding this number of
871
    /// streams that are counted towards the concurrency limit.
872
    ///
873
    /// Sending streams past the limit returned by the peer will be treated
874
    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
875
    ///
876
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
877
    ///
878
    /// The default value is `usize::MAX`.
879
    ///
880
    /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
881
    /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
882
    ///
883
    /// # Examples
884
    ///
885
    /// ```
886
    /// # use tokio::io::{AsyncRead, AsyncWrite};
887
    /// # use h2::client::*;
888
    /// # use bytes::Bytes;
889
    /// #
890
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
891
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
892
    /// # {
893
    /// // `client_fut` is a future representing the completion of the HTTP/2
894
    /// // handshake.
895
    /// let client_fut = Builder::new()
896
    ///     .initial_max_send_streams(1000)
897
    ///     .handshake(my_io);
898
    /// # client_fut.await
899
    /// # }
900
    /// #
901
    /// # pub fn main() {}
902
    /// ```
903
0
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
904
0
        self.initial_max_send_streams = initial;
905
0
        self
906
0
    }
907
908
    /// Sets the maximum number of concurrent locally reset streams.
909
    ///
910
    /// When a stream is explicitly reset, the HTTP/2 specification requires
911
    /// that any further frames received for that stream must be ignored for
912
    /// "some time".
913
    ///
914
    /// In order to satisfy the specification, internal state must be maintained
915
    /// to implement the behavior. This state grows linearly with the number of
916
    /// streams that are locally reset.
917
    ///
918
    /// The `max_concurrent_reset_streams` setting sets an upper
919
    /// bound on the amount of state that is maintained. When this max value is
920
    /// reached, the oldest reset stream is purged from memory.
921
    ///
922
    /// Once the stream has been fully purged from memory, any additional frames
923
    /// received for that stream will result in a connection level protocol
924
    /// error, forcing the connection to terminate.
925
    ///
926
    /// The default value is 10.
927
    ///
928
    /// # Examples
929
    ///
930
    /// ```
931
    /// # use tokio::io::{AsyncRead, AsyncWrite};
932
    /// # use h2::client::*;
933
    /// # use bytes::Bytes;
934
    /// #
935
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
936
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
937
    /// # {
938
    /// // `client_fut` is a future representing the completion of the HTTP/2
939
    /// // handshake.
940
    /// let client_fut = Builder::new()
941
    ///     .max_concurrent_reset_streams(1000)
942
    ///     .handshake(my_io);
943
    /// # client_fut.await
944
    /// # }
945
    /// #
946
    /// # pub fn main() {}
947
    /// ```
948
0
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
949
0
        self.reset_stream_max = max;
950
0
        self
951
0
    }
952
953
    /// Sets the duration to remember locally reset streams.
954
    ///
955
    /// When a stream is explicitly reset, the HTTP/2 specification requires
956
    /// that any further frames received for that stream must be ignored for
957
    /// "some time".
958
    ///
959
    /// In order to satisfy the specification, internal state must be maintained
960
    /// to implement the behavior. This state grows linearly with the number of
961
    /// streams that are locally reset.
962
    ///
963
    /// The `reset_stream_duration` setting configures the max amount of time
964
    /// this state will be maintained in memory. Once the duration elapses, the
965
    /// stream state is purged from memory.
966
    ///
967
    /// Once the stream has been fully purged from memory, any additional frames
968
    /// received for that stream will result in a connection level protocol
969
    /// error, forcing the connection to terminate.
970
    ///
971
    /// The default value is 30 seconds.
972
    ///
973
    /// # Examples
974
    ///
975
    /// ```
976
    /// # use tokio::io::{AsyncRead, AsyncWrite};
977
    /// # use h2::client::*;
978
    /// # use std::time::Duration;
979
    /// # use bytes::Bytes;
980
    /// #
981
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
982
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
983
    /// # {
984
    /// // `client_fut` is a future representing the completion of the HTTP/2
985
    /// // handshake.
986
    /// let client_fut = Builder::new()
987
    ///     .reset_stream_duration(Duration::from_secs(10))
988
    ///     .handshake(my_io);
989
    /// # client_fut.await
990
    /// # }
991
    /// #
992
    /// # pub fn main() {}
993
    /// ```
994
0
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
995
0
        self.reset_stream_duration = dur;
996
0
        self
997
0
    }
998
999
    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
1000
    ///
1001
    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
1002
    /// Too many of these often indicate a malicious peer, and there are attacks which can abuse this to DoS the other end.
1003
    /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
1004
    ///
1005
    /// When the number of local resets exceeds this threshold, the client will close the connection.
1006
    ///
1007
    /// If you really want to disable this, supply [`Option::None`] here.
1008
    /// Disabling this is not recommended and may expose you to DOS attacks.
1009
    ///
1010
    /// The default value is currently 1024, but could change.
1011
0
    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1012
0
        self.local_max_error_reset_streams = max;
1013
0
        self
1014
0
    }
1015
1016
    /// Sets the maximum number of pending-accept remotely-reset streams.
1017
    ///
1018
    /// Streams that have been received by the peer, but not accepted by the
1019
    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
1020
    /// could send a request and then shortly after, realize it is not needed,
1021
    /// sending a CANCEL.
1022
    ///
1023
    /// However, since those streams are now "closed", they don't count towards
1024
    /// the max concurrent streams. So, they will sit in the accept queue,
1025
    /// using memory.
1026
    ///
1027
    /// When the number of remotely-reset streams sitting in the pending-accept
1028
    /// queue reaches this maximum value, a connection error with the code of
1029
    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1030
    /// `Future`.
1031
    ///
1032
    /// The default value is currently 20, but could change.
1033
    ///
1034
    /// # Examples
1035
    ///
1036
    /// ```
1037
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1038
    /// # use h2::client::*;
1039
    /// # use bytes::Bytes;
1040
    /// #
1041
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1042
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1043
    /// # {
1044
    /// // `client_fut` is a future representing the completion of the HTTP/2
1045
    /// // handshake.
1046
    /// let client_fut = Builder::new()
1047
    ///     .max_pending_accept_reset_streams(100)
1048
    ///     .handshake(my_io);
1049
    /// # client_fut.await
1050
    /// # }
1051
    /// #
1052
    /// # pub fn main() {}
1053
    /// ```
1054
0
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1055
0
        self.pending_accept_reset_stream_max = max;
1056
0
        self
1057
0
    }
1058
1059
    /// Sets the maximum send buffer size per stream.
1060
    ///
1061
    /// Once a stream has buffered up to (or over) the maximum, the stream's
1062
    /// flow control will not "poll" additional capacity. Once bytes for the
1063
    /// stream have been written to the connection, the send buffer capacity
1064
    /// will be freed up again.
1065
    ///
1066
    /// The default is currently ~400KB, but may change.
1067
    ///
1068
    /// # Panics
1069
    ///
1070
    /// This function panics if `max` is larger than `u32::MAX`.
1071
0
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1072
0
        assert!(max <= u32::MAX as usize);
1073
0
        self.max_send_buffer_size = max;
1074
0
        self
1075
0
    }
1076
1077
    /// Enables or disables server push promises.
1078
    ///
1079
    /// This value is included in the initial SETTINGS handshake.
1080
    /// Setting this value to
1081
    /// false in the initial SETTINGS handshake guarantees that the remote server
1082
    /// will never send a push promise.
1083
    ///
1084
    /// This setting can be changed during the life of a single HTTP/2
1085
    /// connection by sending another settings frame updating the value.
1086
    ///
1087
    /// Default value: `true`.
1088
    ///
1089
    /// # Examples
1090
    ///
1091
    /// ```
1092
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1093
    /// # use h2::client::*;
1094
    /// # use std::time::Duration;
1095
    /// # use bytes::Bytes;
1096
    /// #
1097
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1098
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1099
    /// # {
1100
    /// // `client_fut` is a future representing the completion of the HTTP/2
1101
    /// // handshake.
1102
    /// let client_fut = Builder::new()
1103
    ///     .enable_push(false)
1104
    ///     .handshake(my_io);
1105
    /// # client_fut.await
1106
    /// # }
1107
    /// #
1108
    /// # pub fn main() {}
1109
    /// ```
1110
0
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1111
0
        self.settings.set_enable_push(enabled);
1112
0
        self
1113
0
    }
1114
1115
    /// Sets the header table size.
1116
    ///
1117
    /// This setting informs the peer of the maximum size of the header compression
1118
    /// table used to encode header blocks, in octets. The encoder may select any value
1119
    /// equal to or less than the header table size specified by the sender.
1120
    ///
1121
    /// The default value is 4,096.
1122
    ///
1123
    /// # Examples
1124
    ///
1125
    /// ```
1126
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1127
    /// # use h2::client::*;
1128
    /// # use bytes::Bytes;
1129
    /// #
1130
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1131
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1132
    /// # {
1133
    /// // `client_fut` is a future representing the completion of the HTTP/2
1134
    /// // handshake.
1135
    /// let client_fut = Builder::new()
1136
    ///     .header_table_size(1_000_000)
1137
    ///     .handshake(my_io);
1138
    /// # client_fut.await
1139
    /// # }
1140
    /// #
1141
    /// # pub fn main() {}
1142
    /// ```
1143
0
    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1144
0
        self.settings.set_header_table_size(Some(size));
1145
0
        self
1146
0
    }
1147
1148
    /// Sets the first stream ID to something other than 1.
1149
    #[cfg(feature = "unstable")]
1150
0
    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1151
0
        self.stream_id = stream_id.into();
1152
0
        assert!(
1153
0
            self.stream_id.is_client_initiated(),
1154
0
            "stream id must be odd"
1155
        );
1156
0
        self
1157
0
    }
1158
1159
    /// Creates a new configured HTTP/2 client backed by `io`.
1160
    ///
1161
    /// It is expected that `io` already be in an appropriate state to commence
1162
    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
1163
    /// preface and the initial settings frame are sent by the client.
1164
    ///
1165
    /// The handshake future does not wait for the initial settings frame from the
1166
    /// server.
1167
    ///
1168
    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1169
    /// tuple once the HTTP/2 handshake has been completed.
1170
    ///
1171
    /// This function also allows the caller to configure the send payload data
1172
    /// type. See [Outbound data type] for more details.
1173
    ///
1174
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1175
    /// [`Connection`]: struct.Connection.html
1176
    /// [`SendRequest`]: struct.SendRequest.html
1177
    /// [Outbound data type]: ../index.html#outbound-data-type
1178
    ///
1179
    /// # Examples
1180
    ///
1181
    /// Basic usage:
1182
    ///
1183
    /// ```
1184
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1185
    /// # use h2::client::*;
1186
    /// # use bytes::Bytes;
1187
    /// #
1188
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1189
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1190
    /// # {
1191
    /// // `client_fut` is a future representing the completion of the HTTP/2
1192
    /// // handshake.
1193
    /// let client_fut = Builder::new()
1194
    ///     .handshake(my_io);
1195
    /// # client_fut.await
1196
    /// # }
1197
    /// #
1198
    /// # pub fn main() {}
1199
    /// ```
1200
    ///
1201
    /// Configures the send-payload data type. In this case, the outbound data
1202
    /// type will be `&'static [u8]`.
1203
    ///
1204
    /// ```
1205
    /// # use tokio::io::{AsyncRead, AsyncWrite};
1206
    /// # use h2::client::*;
1207
    /// #
1208
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1209
    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1210
    /// # {
1211
    /// // `client_fut` is a future representing the completion of the HTTP/2
1212
    /// // handshake.
1213
    /// let client_fut = Builder::new()
1214
    ///     .handshake::<_, &'static [u8]>(my_io);
1215
    /// # client_fut.await
1216
    /// # }
1217
    /// #
1218
    /// # pub fn main() {}
1219
    /// ```
1220
12.1k
    pub fn handshake<T, B>(
1221
12.1k
        &self,
1222
12.1k
        io: T,
1223
12.1k
    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1224
12.1k
    where
1225
12.1k
        T: AsyncRead + AsyncWrite + Unpin,
1226
12.1k
        B: Buf,
1227
12.1k
    {
1228
12.1k
        Connection::handshake2(io, self.clone())
1229
12.1k
    }
Unexecuted instantiation: <h2::client::Builder>::handshake::<_, _>
<h2::client::Builder>::handshake::<h2_support::mock::Mock, bytes::bytes::Bytes>
Line
Count
Source
1220
448
    pub fn handshake<T, B>(
1221
448
        &self,
1222
448
        io: T,
1223
448
    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1224
448
    where
1225
448
        T: AsyncRead + AsyncWrite + Unpin,
1226
448
        B: Buf,
1227
448
    {
1228
448
        Connection::handshake2(io, self.clone())
1229
448
    }
<h2::client::Builder>::handshake::<fuzz_e2e::MockIo, bytes::bytes::Bytes>
Line
Count
Source
1220
11.6k
    pub fn handshake<T, B>(
1221
11.6k
        &self,
1222
11.6k
        io: T,
1223
11.6k
    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1224
11.6k
    where
1225
11.6k
        T: AsyncRead + AsyncWrite + Unpin,
1226
11.6k
        B: Buf,
1227
11.6k
    {
1228
11.6k
        Connection::handshake2(io, self.clone())
1229
11.6k
    }
1230
}
1231
1232
impl Default for Builder {
1233
0
    fn default() -> Builder {
1234
0
        Builder::new()
1235
0
    }
1236
}
1237
1238
/// Creates a new configured HTTP/2 client with default configuration
1239
/// values backed by `io`.
1240
///
1241
/// It is expected that `io` already be in an appropriate state to commence
1242
/// the [HTTP/2 handshake]. See [Handshake] for more details.
1243
///
1244
/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1245
/// tuple once the HTTP/2 handshake has been completed. The returned
1246
/// [`Connection`] instance will be using default configuration values. Use
1247
/// [`Builder`] to customize the configuration values used by a [`Connection`]
1248
/// instance.
1249
///
1250
/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1251
/// [Handshake]: ../index.html#handshake
1252
/// [`Connection`]: struct.Connection.html
1253
/// [`SendRequest`]: struct.SendRequest.html
1254
///
1255
/// # Examples
1256
///
1257
/// ```
1258
/// # use tokio::io::{AsyncRead, AsyncWrite};
1259
/// # use h2::client;
1260
/// # use h2::client::*;
1261
/// #
1262
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1263
/// # {
1264
/// let (send_request, connection) = client::handshake(my_io).await?;
1265
/// // The HTTP/2 handshake has completed, now start polling
1266
/// // `connection` and use `send_request` to send requests to the
1267
/// // server.
1268
/// # Ok(())
1269
/// # }
1270
/// #
1271
/// # pub fn main() {}
1272
/// ```
1273
11.6k
pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1274
11.6k
where
1275
11.6k
    T: AsyncRead + AsyncWrite + Unpin,
1276
11.6k
{
Unexecuted instantiation: h2::client::handshake::<_>
h2::client::handshake::<fuzz_e2e::MockIo>
Line
Count
Source
1273
11.6k
pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1274
11.6k
where
1275
11.6k
    T: AsyncRead + AsyncWrite + Unpin,
1276
11.6k
{
1277
11.6k
    let builder = Builder::new();
1278
11.6k
    builder
1279
11.6k
        .handshake(io)
1280
11.6k
        .instrument(tracing::trace_span!("client_handshake"))
1281
57.1k
        .await
1282
11.6k
}
Unexecuted instantiation: h2::client::handshake::<_>::{closure#0}
h2::client::handshake::<fuzz_e2e::MockIo>::{closure#0}
Line
Count
Source
1276
11.6k
{
1277
11.6k
    let builder = Builder::new();
1278
11.6k
    builder
1279
11.6k
        .handshake(io)
1280
11.6k
        .instrument(tracing::trace_span!("client_handshake"))
1281
57.1k
        .await
1282
11.6k
}
1283
1284
// ===== impl Connection =====
1285
1286
12.1k
async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1287
12.1k
where
1288
12.1k
    T: AsyncRead + AsyncWrite + Unpin,
1289
12.1k
{
Unexecuted instantiation: h2::client::bind_connection::<_>
h2::client::bind_connection::<h2_support::mock::Mock>
Line
Count
Source
1286
448
async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1287
448
where
1288
448
    T: AsyncRead + AsyncWrite + Unpin,
1289
448
{
h2::client::bind_connection::<fuzz_e2e::MockIo>
Line
Count
Source
1286
11.6k
async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1287
11.6k
where
1288
11.6k
    T: AsyncRead + AsyncWrite + Unpin,
1289
11.6k
{
1290
12.1k
    tracing::debug!("binding client connection");
1291
1292
12.1k
    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1293
57.1k
    io.write_all(msg).await.map_err(crate::Error::from_io)?;
1294
1295
12.0k
    tracing::debug!("client connection bound");
1296
1297
12.0k
    Ok(())
1298
12.1k
}
Unexecuted instantiation: h2::client::bind_connection::<_>::{closure#0}
h2::client::bind_connection::<h2_support::mock::Mock>::{closure#0}
Line
Count
Source
1289
448
{
1290
448
    tracing::debug!("binding client connection");
1291
1292
448
    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1293
448
    io.write_all(msg).await.map_err(crate::Error::from_io)?;
1294
1295
448
    tracing::debug!("client connection bound");
1296
1297
448
    Ok(())
1298
448
}
h2::client::bind_connection::<fuzz_e2e::MockIo>::{closure#0}
Line
Count
Source
1289
11.6k
{
1290
11.6k
    tracing::debug!("binding client connection");
1291
1292
11.6k
    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1293
57.1k
    io.write_all(msg).await.map_err(crate::Error::from_io)?;
1294
1295
11.6k
    tracing::debug!("client connection bound");
1296
1297
11.6k
    Ok(())
1298
11.6k
}
1299
1300
impl<T, B> Connection<T, B>
1301
where
1302
    T: AsyncRead + AsyncWrite + Unpin,
1303
    B: Buf,
1304
{
1305
12.1k
    async fn handshake2(
1306
12.1k
        mut io: T,
1307
12.1k
        builder: Builder,
1308
12.1k
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
Unexecuted instantiation: <h2::client::Connection<_, _>>::handshake2
<h2::client::Connection<h2_support::mock::Mock>>::handshake2
Line
Count
Source
1305
448
    async fn handshake2(
1306
448
        mut io: T,
1307
448
        builder: Builder,
1308
448
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
<h2::client::Connection<fuzz_e2e::MockIo>>::handshake2
Line
Count
Source
1305
11.6k
    async fn handshake2(
1306
11.6k
        mut io: T,
1307
11.6k
        builder: Builder,
1308
11.6k
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1309
57.1k
        bind_connection(&mut io).await?;
1310
1311
        // Create the codec
1312
12.0k
        let mut codec = Codec::new(io);
1313
1314
12.0k
        if let Some(max) = builder.settings.max_frame_size() {
1315
0
            codec.set_max_recv_frame_size(max as usize);
1316
12.0k
        }
1317
1318
12.0k
        if let Some(max) = builder.settings.max_header_list_size() {
1319
0
            codec.set_max_recv_header_list_size(max as usize);
1320
12.0k
        }
1321
1322
        // Send initial settings frame
1323
12.0k
        codec
1324
12.0k
            .buffer(builder.settings.clone().into())
1325
12.0k
            .expect("invalid SETTINGS frame");
1326
12.0k
1327
12.0k
        let inner = proto::Connection::new(
1328
12.0k
            codec,
1329
12.0k
            proto::Config {
1330
12.0k
                next_stream_id: builder.stream_id,
1331
12.0k
                initial_max_send_streams: builder.initial_max_send_streams,
1332
12.0k
                max_send_buffer_size: builder.max_send_buffer_size,
1333
12.0k
                reset_stream_duration: builder.reset_stream_duration,
1334
12.0k
                reset_stream_max: builder.reset_stream_max,
1335
12.0k
                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1336
12.0k
                local_error_reset_streams_max: builder.local_max_error_reset_streams,
1337
12.0k
                settings: builder.settings.clone(),
1338
12.0k
            },
1339
12.0k
        );
1340
12.0k
        let send_request = SendRequest {
1341
12.0k
            inner: inner.streams().clone(),
1342
12.0k
            pending: None,
1343
12.0k
        };
1344
12.0k
1345
12.0k
        let mut connection = Connection { inner };
1346
12.0k
        if let Some(sz) = builder.initial_target_connection_window_size {
1347
0
            connection.set_target_window_size(sz);
1348
12.0k
        }
1349
1350
12.0k
        Ok((send_request, connection))
1351
12.1k
    }
Unexecuted instantiation: <h2::client::Connection<_, _>>::handshake2::{closure#0}
<h2::client::Connection<h2_support::mock::Mock>>::handshake2::{closure#0}
Line
Count
Source
1308
448
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1309
448
        bind_connection(&mut io).await?;
1310
1311
        // Create the codec
1312
448
        let mut codec = Codec::new(io);
1313
1314
448
        if let Some(max) = builder.settings.max_frame_size() {
1315
0
            codec.set_max_recv_frame_size(max as usize);
1316
448
        }
1317
1318
448
        if let Some(max) = builder.settings.max_header_list_size() {
1319
0
            codec.set_max_recv_header_list_size(max as usize);
1320
448
        }
1321
1322
        // Send initial settings frame
1323
448
        codec
1324
448
            .buffer(builder.settings.clone().into())
1325
448
            .expect("invalid SETTINGS frame");
1326
448
1327
448
        let inner = proto::Connection::new(
1328
448
            codec,
1329
448
            proto::Config {
1330
448
                next_stream_id: builder.stream_id,
1331
448
                initial_max_send_streams: builder.initial_max_send_streams,
1332
448
                max_send_buffer_size: builder.max_send_buffer_size,
1333
448
                reset_stream_duration: builder.reset_stream_duration,
1334
448
                reset_stream_max: builder.reset_stream_max,
1335
448
                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1336
448
                local_error_reset_streams_max: builder.local_max_error_reset_streams,
1337
448
                settings: builder.settings.clone(),
1338
448
            },
1339
448
        );
1340
448
        let send_request = SendRequest {
1341
448
            inner: inner.streams().clone(),
1342
448
            pending: None,
1343
448
        };
1344
448
1345
448
        let mut connection = Connection { inner };
1346
448
        if let Some(sz) = builder.initial_target_connection_window_size {
1347
0
            connection.set_target_window_size(sz);
1348
448
        }
1349
1350
448
        Ok((send_request, connection))
1351
448
    }
<h2::client::Connection<fuzz_e2e::MockIo>>::handshake2::{closure#0}
Line
Count
Source
1308
11.6k
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1309
57.1k
        bind_connection(&mut io).await?;
1310
1311
        // Create the codec
1312
11.6k
        let mut codec = Codec::new(io);
1313
1314
11.6k
        if let Some(max) = builder.settings.max_frame_size() {
1315
0
            codec.set_max_recv_frame_size(max as usize);
1316
11.6k
        }
1317
1318
11.6k
        if let Some(max) = builder.settings.max_header_list_size() {
1319
0
            codec.set_max_recv_header_list_size(max as usize);
1320
11.6k
        }
1321
1322
        // Send initial settings frame
1323
11.6k
        codec
1324
11.6k
            .buffer(builder.settings.clone().into())
1325
11.6k
            .expect("invalid SETTINGS frame");
1326
11.6k
1327
11.6k
        let inner = proto::Connection::new(
1328
11.6k
            codec,
1329
11.6k
            proto::Config {
1330
11.6k
                next_stream_id: builder.stream_id,
1331
11.6k
                initial_max_send_streams: builder.initial_max_send_streams,
1332
11.6k
                max_send_buffer_size: builder.max_send_buffer_size,
1333
11.6k
                reset_stream_duration: builder.reset_stream_duration,
1334
11.6k
                reset_stream_max: builder.reset_stream_max,
1335
11.6k
                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1336
11.6k
                local_error_reset_streams_max: builder.local_max_error_reset_streams,
1337
11.6k
                settings: builder.settings.clone(),
1338
11.6k
            },
1339
11.6k
        );
1340
11.6k
        let send_request = SendRequest {
1341
11.6k
            inner: inner.streams().clone(),
1342
11.6k
            pending: None,
1343
11.6k
        };
1344
11.6k
1345
11.6k
        let mut connection = Connection { inner };
1346
11.6k
        if let Some(sz) = builder.initial_target_connection_window_size {
1347
0
            connection.set_target_window_size(sz);
1348
11.6k
        }
1349
1350
11.6k
        Ok((send_request, connection))
1351
11.6k
    }
1352
1353
    /// Sets the target window size for the whole connection.
1354
    ///
1355
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1356
    /// frame will be immediately sent to the remote, increasing the connection
1357
    /// level window by `size - current_value`.
1358
    ///
1359
    /// If `size` is less than the current value, nothing will happen
1360
    /// immediately. However, as window capacity is released by
1361
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1362
    /// out until the number of "in flight" bytes drops below `size`.
1363
    ///
1364
    /// The default value is 65,535.
1365
    ///
1366
    /// See [`FlowControl`] documentation for more details.
1367
    ///
1368
    /// [`FlowControl`]: ../struct.FlowControl.html
1369
    /// [library level]: ../index.html#flow-control
1370
0
    pub fn set_target_window_size(&mut self, size: u32) {
1371
0
        assert!(size <= proto::MAX_WINDOW_SIZE);
1372
0
        self.inner.set_target_window_size(size);
1373
0
    }
Unexecuted instantiation: <h2::client::Connection<_, _>>::set_target_window_size
Unexecuted instantiation: <h2::client::Connection<h2_support::mock::Mock>>::set_target_window_size
Unexecuted instantiation: <h2::client::Connection<fuzz_e2e::MockIo>>::set_target_window_size
1374
1375
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1376
    /// flow control for received data.
1377
    ///
1378
    /// The `SETTINGS` will be sent to the remote, and only applied once the
1379
    /// remote acknowledges the change.
1380
    ///
1381
    /// This can be used to increase or decrease the window size for existing
1382
    /// streams.
1383
    ///
1384
    /// # Errors
1385
    ///
1386
    /// Returns an error if a previous call is still pending acknowledgement
1387
    /// from the remote endpoint.
1388
0
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1389
0
        assert!(size <= proto::MAX_WINDOW_SIZE);
1390
0
        self.inner.set_initial_window_size(size)?;
1391
0
        Ok(())
1392
0
    }
1393
1394
    /// Takes a `PingPong` instance from the connection.
1395
    ///
1396
    /// # Note
1397
    ///
1398
    /// This may only be called once. Calling multiple times will return `None`.
1399
0
    pub fn ping_pong(&mut self) -> Option<PingPong> {
1400
0
        self.inner.take_user_pings().map(PingPong::new)
1401
0
    }
1402
1403
    /// Returns the maximum number of concurrent streams that may be initiated
1404
    /// by this client.
1405
    ///
1406
    /// This limit is configured by the server peer by sending the
1407
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1408
    /// This method returns the currently acknowledged value received from the
1409
    /// remote.
1410
    ///
1411
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1412
0
    pub fn max_concurrent_send_streams(&self) -> usize {
1413
0
        self.inner.max_send_streams()
1414
0
    }
1415
    /// Returns the maximum number of concurrent streams that may be initiated
1416
    /// by the server on this connection.
1417
    ///
1418
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1419
    /// parameter][1] sent in a `SETTINGS` frame that has been
1420
    /// acknowledged by the remote peer. The value to be sent is configured by
1421
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1422
    /// with the remote peer.
1423
    ///
1424
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1425
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
1426
0
    pub fn max_concurrent_recv_streams(&self) -> usize {
1427
0
        self.inner.max_recv_streams()
1428
0
    }
1429
}
1430
1431
impl<T, B> Future for Connection<T, B>
1432
where
1433
    T: AsyncRead + AsyncWrite + Unpin,
1434
    B: Buf,
1435
{
1436
    type Output = Result<(), crate::Error>;
1437
1438
1.02M
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1439
1.02M
        self.inner.maybe_close_connection_if_no_streams();
1440
1.02M
        let had_streams_or_refs = self.inner.has_streams_or_other_references();
1441
1.02M
        let result = self.inner.poll(cx).map_err(Into::into);
1442
1.02M
        // if we had streams/refs, and don't anymore, wake up one more time to
1443
1.02M
        // ensure proper shutdown
1444
1.02M
        if result.is_pending()
1445
1.01M
            && had_streams_or_refs
1446
1.01M
            && !self.inner.has_streams_or_other_references()
1447
        {
1448
0
            tracing::trace!("last stream closed during poll, wake again");
1449
0
            cx.waker().wake_by_ref();
1450
1.02M
        }
1451
1.02M
        result
1452
1.02M
    }
Unexecuted instantiation: <h2::client::Connection<_, _> as core::future::future::Future>::poll
<h2::client::Connection<fuzz_e2e::MockIo> as core::future::future::Future>::poll
Line
Count
Source
1438
1.02M
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1439
1.02M
        self.inner.maybe_close_connection_if_no_streams();
1440
1.02M
        let had_streams_or_refs = self.inner.has_streams_or_other_references();
1441
1.02M
        let result = self.inner.poll(cx).map_err(Into::into);
1442
1.02M
        // if we had streams/refs, and don't anymore, wake up one more time to
1443
1.02M
        // ensure proper shutdown
1444
1.02M
        if result.is_pending()
1445
1.01M
            && had_streams_or_refs
1446
1.01M
            && !self.inner.has_streams_or_other_references()
1447
        {
1448
0
            tracing::trace!("last stream closed during poll, wake again");
1449
0
            cx.waker().wake_by_ref();
1450
1.02M
        }
1451
1.02M
        result
1452
1.02M
    }
1453
}
1454
1455
impl<T, B> fmt::Debug for Connection<T, B>
1456
where
1457
    T: AsyncRead + AsyncWrite,
1458
    T: fmt::Debug,
1459
    B: fmt::Debug + Buf,
1460
{
1461
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1462
0
        fmt::Debug::fmt(&self.inner, fmt)
1463
0
    }
1464
}
1465
1466
// ===== impl ResponseFuture =====
1467
1468
impl Future for ResponseFuture {
1469
    type Output = Result<Response<RecvStream>, crate::Error>;
1470
1471
441k
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1472
441k
        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1473
689
        let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1474
689
1475
689
        Poll::Ready(Ok(Response::from_parts(parts, body)))
1476
441k
    }
1477
}
1478
1479
impl ResponseFuture {
1480
    /// Returns the stream ID of the response stream.
1481
    ///
1482
    /// # Panics
1483
    ///
1484
    /// If the lock on the stream store has been poisoned.
1485
0
    pub fn stream_id(&self) -> crate::StreamId {
1486
0
        crate::StreamId::from_internal(self.inner.stream_id())
1487
0
    }
1488
    /// Returns a stream of PushPromises
1489
    ///
1490
    /// # Panics
1491
    ///
1492
    /// If this method has been called before
1493
    /// or the stream itself was pushed
1494
0
    pub fn push_promises(&mut self) -> PushPromises {
1495
0
        if self.push_promise_consumed {
1496
0
            panic!("Reference to push promises stream taken!");
1497
0
        }
1498
0
        self.push_promise_consumed = true;
1499
0
        PushPromises {
1500
0
            inner: self.inner.clone(),
1501
0
        }
1502
0
    }
1503
}
1504
1505
// ===== impl PushPromises =====
1506
1507
impl PushPromises {
1508
    /// Get the next `PushPromise`.
1509
0
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1510
0
        crate::poll_fn(move |cx| self.poll_push_promise(cx)).await
1511
0
    }
1512
1513
    #[doc(hidden)]
1514
0
    pub fn poll_push_promise(
1515
0
        &mut self,
1516
0
        cx: &mut Context<'_>,
1517
0
    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1518
0
        match self.inner.poll_pushed(cx) {
1519
0
            Poll::Ready(Some(Ok((request, response)))) => {
1520
0
                let response = PushedResponseFuture {
1521
0
                    inner: ResponseFuture {
1522
0
                        inner: response,
1523
0
                        push_promise_consumed: false,
1524
0
                    },
1525
0
                };
1526
0
                Poll::Ready(Some(Ok(PushPromise { request, response })))
1527
            }
1528
0
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1529
0
            Poll::Ready(None) => Poll::Ready(None),
1530
0
            Poll::Pending => Poll::Pending,
1531
        }
1532
0
    }
1533
}
1534
1535
#[cfg(feature = "stream")]
1536
impl futures_core::Stream for PushPromises {
1537
    type Item = Result<PushPromise, crate::Error>;
1538
1539
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1540
0
        self.poll_push_promise(cx)
1541
0
    }
1542
}
1543
1544
// ===== impl PushPromise =====
1545
1546
impl PushPromise {
1547
    /// Returns a reference to the push promise's request headers.
1548
0
    pub fn request(&self) -> &Request<()> {
1549
0
        &self.request
1550
0
    }
1551
1552
    /// Returns a mutable reference to the push promise's request headers.
1553
0
    pub fn request_mut(&mut self) -> &mut Request<()> {
1554
0
        &mut self.request
1555
0
    }
1556
1557
    /// Consumes `self`, returning the push promise's request headers and
1558
    /// response future.
1559
0
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1560
0
        (self.request, self.response)
1561
0
    }
1562
}
1563
1564
// ===== impl PushedResponseFuture =====
1565
1566
impl Future for PushedResponseFuture {
1567
    type Output = Result<Response<RecvStream>, crate::Error>;
1568
1569
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1570
0
        Pin::new(&mut self.inner).poll(cx)
1571
0
    }
1572
}
1573
1574
impl PushedResponseFuture {
1575
    /// Returns the stream ID of the response stream.
1576
    ///
1577
    /// # Panics
1578
    ///
1579
    /// If the lock on the stream store has been poisoned.
1580
0
    pub fn stream_id(&self) -> crate::StreamId {
1581
0
        self.inner.stream_id()
1582
0
    }
1583
}
1584
1585
// ===== impl Peer =====
1586
1587
impl Peer {
1588
414k
    pub fn convert_send_message(
1589
414k
        id: StreamId,
1590
414k
        request: Request<()>,
1591
414k
        protocol: Option<Protocol>,
1592
414k
        end_of_stream: bool,
1593
414k
    ) -> Result<Headers, SendError> {
1594
        use http::request::Parts;
1595
1596
        let (
1597
            Parts {
1598
414k
                method,
1599
414k
                uri,
1600
414k
                headers,
1601
414k
                version,
1602
414k
                ..
1603
414k
            },
1604
414k
            _,
1605
414k
        ) = request.into_parts();
1606
414k
1607
414k
        let is_connect = method == Method::CONNECT;
1608
414k
1609
414k
        // Build the pseudo header set. All requests will include `method`
1610
414k
        // and `path`.
1611
414k
        let mut pseudo = Pseudo::request(method, uri, protocol);
1612
414k
1613
414k
        if pseudo.scheme.is_none() {
1614
            // If the scheme is not set, then there are two options.
1615
            //
1616
            // 1) Authority is not set. In this case, a request was issued with
1617
            //    a relative URI. This is permitted **only** when forwarding
1618
            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1619
            //    this is an error.
1620
            //
1621
            // 2) Authority is set, then the HTTP method *must* be CONNECT.
1622
            //
1623
            // It is not possible to have a scheme but not an authority set (the
1624
            // `http` crate does not allow it).
1625
            //
1626
400
            if pseudo.authority.is_none() {
1627
74
                if version == Version::HTTP_2 {
1628
0
                    return Err(UserError::MissingUriSchemeAndAuthority.into());
1629
74
                } else {
1630
74
                    // This is acceptable as per the above comment. However,
1631
74
                    // HTTP/2 requires that a scheme is set. Since we are
1632
74
                    // forwarding an HTTP 1.1 request, the scheme is set to
1633
74
                    // "http".
1634
74
                    pseudo.set_scheme(uri::Scheme::HTTP);
1635
74
                }
1636
326
            } else if !is_connect {
1637
326
                // TODO: Error
1638
326
            }
1639
413k
        }
1640
1641
        // Create the HEADERS frame
1642
414k
        let mut frame = Headers::new(id, pseudo, headers);
1643
414k
1644
414k
        if end_of_stream {
1645
448
            frame.set_end_stream()
1646
413k
        }
1647
1648
414k
        Ok(frame)
1649
414k
    }
1650
}
1651
1652
impl proto::Peer for Peer {
1653
    type Poll = Response<()>;
1654
1655
    const NAME: &'static str = "Client";
1656
1657
1.24M
    fn r#dyn() -> proto::DynPeer {
1658
1.24M
        proto::DynPeer::Client
1659
1.24M
    }
1660
1661
    /*
1662
    fn is_server() -> bool {
1663
        false
1664
    }
1665
    */
1666
1667
1.23k
    fn convert_poll_message(
1668
1.23k
        pseudo: Pseudo,
1669
1.23k
        fields: HeaderMap,
1670
1.23k
        stream_id: StreamId,
1671
1.23k
    ) -> Result<Self::Poll, Error> {
1672
1.23k
        let mut b = Response::builder();
1673
1.23k
1674
1.23k
        b = b.version(Version::HTTP_2);
1675
1676
1.23k
        if let Some(status) = pseudo.status {
1677
25
            b = b.status(status);
1678
1.20k
        }
1679
1680
1.23k
        let mut response = match b.body(()) {
1681
1.23k
            Ok(response) => response,
1682
            Err(_) => {
1683
                // TODO: Should there be more specialized handling for different
1684
                // kinds of errors?
1685
0
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1686
            }
1687
        };
1688
1689
1.23k
        *response.headers_mut() = fields;
1690
1.23k
1691
1.23k
        Ok(response)
1692
1.23k
    }
1693
}