Line | Count | Source (jump to first uncovered line) |
1 | | //! Client implementation of the HTTP/2 protocol. |
2 | | //! |
3 | | //! # Getting started |
4 | | //! |
5 | | //! Running an HTTP/2 client requires the caller to establish the underlying |
6 | | //! connection as well as get the connection to a state that is ready to begin |
7 | | //! the HTTP/2 handshake. See [here](../index.html#handshake) for more |
8 | | //! details. |
9 | | //! |
10 | | //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote |
11 | | //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades. |
12 | | //! |
13 | | //! Once a connection is obtained, it is passed to [`handshake`], which will |
14 | | //! begin the [HTTP/2 handshake]. This returns a future that completes once |
15 | | //! the handshake process is performed and HTTP/2 streams may be initialized. |
16 | | //! |
17 | | //! [`handshake`] uses default configuration values. There are a number of |
18 | | //! settings that can be changed by using [`Builder`] instead. |
19 | | //! |
20 | | //! Once the handshake future completes, the caller is provided with a |
21 | | //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`] |
22 | | //! instance is used to drive the connection (see [Managing the connection]). |
23 | | //! The [`SendRequest`] instance is used to initialize new streams (see [Making |
24 | | //! requests]). |
25 | | //! |
26 | | //! # Making requests |
27 | | //! |
28 | | //! Requests are made using the [`SendRequest`] handle provided by the handshake |
29 | | //! future. Once a request is submitted, an HTTP/2 stream is initialized and |
30 | | //! the request is sent to the server. |
31 | | //! |
32 | | //! A request body and request trailers are sent using [`SendStream`] and the |
33 | | //! server's response is returned once the [`ResponseFuture`] future completes. |
34 | | //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by |
35 | | //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream |
36 | | //! initialized by the sent request. |
37 | | //! |
38 | | //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2 |
39 | | //! stream can be created, i.e. as long as the current number of active streams |
40 | | //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the |
41 | | //! caller will be notified once an existing stream closes, freeing capacity for |
42 | | //! the caller. The caller should use [`SendRequest::poll_ready`] to check for |
43 | | //! capacity before sending a request to the server. |
44 | | //! |
45 | | //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user |
46 | | //! must not send a request if `poll_ready` does not return `Ready`. Attempting |
47 | | //! to do so will result in an [`Error`] being returned. |
48 | | //! |
49 | | //! # Managing the connection |
50 | | //! |
51 | | //! The [`Connection`] instance is used to manage connection state. The caller |
52 | | //! is required to call [`Connection::poll`] in order to advance state. |
53 | | //! [`SendRequest::send_request`] and other functions have no effect unless |
54 | | //! [`Connection::poll`] is called. |
55 | | //! |
56 | | //! The [`Connection`] instance should only be dropped once [`Connection::poll`] |
57 | | //! returns `Ready`. At this point, the underlying socket has been closed and no |
58 | | //! further work needs to be done. |
59 | | //! |
60 | | //! The easiest way to ensure that the [`Connection`] instance gets polled is to |
61 | | //! submit the [`Connection`] instance to an [executor]. The executor will then |
62 | | //! manage polling the connection until the connection is complete. |
63 | | //! Alternatively, the caller can call `poll` manually. |
64 | | //! |
65 | | //! # Example |
66 | | //! |
67 | | //! ```rust, no_run |
68 | | //! |
69 | | //! use h2::client; |
70 | | //! |
71 | | //! use http::{Request, Method}; |
72 | | //! use std::error::Error; |
73 | | //! use tokio::net::TcpStream; |
74 | | //! |
75 | | //! #[tokio::main] |
76 | | //! pub async fn main() -> Result<(), Box<dyn Error>> { |
77 | | //! // Establish TCP connection to the server. |
78 | | //! let tcp = TcpStream::connect("127.0.0.1:5928").await?; |
79 | | //! let (h2, connection) = client::handshake(tcp).await?; |
80 | | //! tokio::spawn(async move { |
81 | | //! connection.await.unwrap(); |
82 | | //! }); |
83 | | //! |
84 | | //! let mut h2 = h2.ready().await?; |
85 | | //! // Prepare the HTTP request to send to the server. |
86 | | //! let request = Request::builder() |
87 | | //! .method(Method::GET) |
88 | | //! .uri("https://www.example.com/") |
89 | | //! .body(()) |
90 | | //! .unwrap(); |
91 | | //! |
92 | | //! // Send the request. The second tuple item allows the caller |
93 | | //! // to stream a request body. |
94 | | //! let (response, _) = h2.send_request(request, true).unwrap(); |
95 | | //! |
96 | | //! let (head, mut body) = response.await?.into_parts(); |
97 | | //! |
98 | | //! println!("Received response: {:?}", head); |
99 | | //! |
100 | | //! // The `flow_control` handle allows the caller to manage |
101 | | //! // flow control. |
102 | | //! // |
103 | | //! // Whenever data is received, the caller is responsible for |
104 | | //! // releasing capacity back to the server once it has freed |
105 | | //! // the data from memory. |
106 | | //! let mut flow_control = body.flow_control().clone(); |
107 | | //! |
108 | | //! while let Some(chunk) = body.data().await { |
109 | | //! let chunk = chunk?; |
110 | | //! println!("RX: {:?}", chunk); |
111 | | //! |
112 | | //! // Let the server send more data. |
113 | | //! let _ = flow_control.release_capacity(chunk.len()); |
114 | | //! } |
115 | | //! |
116 | | //! Ok(()) |
117 | | //! } |
118 | | //! ``` |
119 | | //! |
120 | | //! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html |
121 | | //! [`handshake`]: fn.handshake.html |
122 | | //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html |
123 | | //! [`SendRequest`]: struct.SendRequest.html |
124 | | //! [`SendStream`]: ../struct.SendStream.html |
125 | | //! [Making requests]: #making-requests |
126 | | //! [Managing the connection]: #managing-the-connection |
127 | | //! [`Connection`]: struct.Connection.html |
128 | | //! [`Connection::poll`]: struct.Connection.html#method.poll |
129 | | //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request |
130 | | //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues |
131 | | //! [`SendRequest`]: struct.SendRequest.html |
132 | | //! [`ResponseFuture`]: struct.ResponseFuture.html |
133 | | //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready |
134 | | //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
135 | | //! [`Builder`]: struct.Builder.html |
136 | | //! [`Error`]: ../struct.Error.html |
137 | | |
138 | | use crate::codec::{Codec, SendError, UserError}; |
139 | | use crate::ext::Protocol; |
140 | | use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; |
141 | | use crate::proto::{self, Error}; |
142 | | use crate::{FlowControl, PingPong, RecvStream, SendStream}; |
143 | | |
144 | | use bytes::{Buf, Bytes}; |
145 | | use http::{uri, HeaderMap, Method, Request, Response, Version}; |
146 | | use std::fmt; |
147 | | use std::future::Future; |
148 | | use std::pin::Pin; |
149 | | use std::task::{Context, Poll}; |
150 | | use std::time::Duration; |
151 | | use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; |
152 | | use tracing::Instrument; |
153 | | |
154 | | /// Initializes new HTTP/2 streams on a connection by sending a request. |
155 | | /// |
156 | | /// This type does no work itself. Instead, it is a handle to the inner |
157 | | /// connection state held by [`Connection`]. If the associated connection |
158 | | /// instance is dropped, all `SendRequest` functions will return [`Error`]. |
159 | | /// |
160 | | /// [`SendRequest`] instances can be moved to, and operate on, different tasks |
161 | | /// / threads than their associated [`Connection`] instance. Internally, there |
162 | | /// is a buffer used to stage requests before they get written to the |
163 | | /// connection. There is no guarantee that requests get written to the |
164 | | /// connection in FIFO order as HTTP/2 prioritization logic can play a role. |
165 | | /// |
166 | | /// [`SendRequest`] implements [`Clone`], enabling the creation of many |
167 | | /// instances that are backed by a single connection. |
168 | | /// |
169 | | /// See [module] level documentation for more details. |
170 | | /// |
171 | | /// [module]: index.html |
172 | | /// [`Connection`]: struct.Connection.html |
173 | | /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html |
174 | | /// [`Error`]: ../struct.Error.html |
175 | | pub struct SendRequest<B: Buf> { |
176 | | inner: proto::Streams<B, Peer>, |
177 | | pending: Option<proto::OpaqueStreamRef>, |
178 | | } |
179 | | |
180 | | /// Returns a `SendRequest` instance once it is ready to send at least one |
181 | | /// request. |
182 | | #[derive(Debug)] |
183 | | pub struct ReadySendRequest<B: Buf> { |
184 | | inner: Option<SendRequest<B>>, |
185 | | } |
186 | | |
187 | | /// Manages all state associated with an HTTP/2 client connection. |
188 | | /// |
189 | | /// A `Connection` is backed by an I/O resource (usually a TCP socket) and |
190 | | /// implements the HTTP/2 client logic for that connection. It is responsible |
191 | | /// for driving the internal state forward, performing the work requested of the |
192 | | /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`], |
193 | | /// [`RecvStream`]). |
194 | | /// |
195 | | /// `Connection` values are created by calling [`handshake`]. Once a |
196 | | /// `Connection` value is obtained, the caller must repeatedly call [`poll`] |
197 | | /// until `Ready` is returned. The easiest way to do this is to submit the |
198 | | /// `Connection` instance to an [executor]. |
199 | | /// |
200 | | /// [module]: index.html |
201 | | /// [`handshake`]: fn.handshake.html |
202 | | /// [`SendRequest`]: struct.SendRequest.html |
203 | | /// [`ResponseFuture`]: struct.ResponseFuture.html |
204 | | /// [`SendStream`]: ../struct.SendStream.html |
205 | | /// [`RecvStream`]: ../struct.RecvStream.html |
206 | | /// [`poll`]: #method.poll |
207 | | /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html |
208 | | /// |
209 | | /// # Examples |
210 | | /// |
211 | | /// ``` |
212 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
213 | | /// # use h2::client; |
214 | | /// # use h2::client::*; |
215 | | /// # |
216 | | /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error> |
217 | | /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, |
218 | | /// # { |
219 | | /// let (send_request, connection) = client::handshake(my_io).await?; |
220 | | /// // Submit the connection handle to an executor. |
221 | | /// tokio::spawn(async { connection.await.expect("connection failed"); }); |
222 | | /// |
223 | | /// // Now, use `send_request` to initialize HTTP/2 streams. |
224 | | /// // ... |
225 | | /// # Ok(()) |
226 | | /// # } |
227 | | /// # |
228 | | /// # pub fn main() {} |
229 | | /// ``` |
230 | | #[must_use = "futures do nothing unless polled"] |
231 | | pub struct Connection<T, B: Buf = Bytes> { |
232 | | inner: proto::Connection<T, Peer, B>, |
233 | | } |
234 | | |
235 | | /// A future of an HTTP response. |
236 | | #[derive(Debug)] |
237 | | #[must_use = "futures do nothing unless polled"] |
238 | | pub struct ResponseFuture { |
239 | | inner: proto::OpaqueStreamRef, |
240 | | push_promise_consumed: bool, |
241 | | } |
242 | | |
243 | | /// A future of a pushed HTTP response. |
244 | | /// |
245 | | /// We have to differentiate between pushed and non pushed because of the spec |
246 | | /// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE> |
247 | | /// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream |
248 | | /// > that is in either the "open" or "half-closed (remote)" state. |
249 | | #[derive(Debug)] |
250 | | #[must_use = "futures do nothing unless polled"] |
251 | | pub struct PushedResponseFuture { |
252 | | inner: ResponseFuture, |
253 | | } |
254 | | |
255 | | /// A pushed response and corresponding request headers |
256 | | #[derive(Debug)] |
257 | | pub struct PushPromise { |
258 | | /// The request headers |
259 | | request: Request<()>, |
260 | | |
261 | | /// The pushed response |
262 | | response: PushedResponseFuture, |
263 | | } |
264 | | |
265 | | /// A stream of pushed responses and corresponding promised requests |
266 | | #[derive(Debug)] |
267 | | #[must_use = "streams do nothing unless polled"] |
268 | | pub struct PushPromises { |
269 | | inner: proto::OpaqueStreamRef, |
270 | | } |
271 | | |
272 | | /// Builds client connections with custom configuration values. |
273 | | /// |
274 | | /// Methods can be chained in order to set the configuration values. |
275 | | /// |
276 | | /// The client is constructed by calling [`handshake`] and passing the I/O |
277 | | /// handle that will back the HTTP/2 connection. |
278 | | /// |
279 | | /// New instances of `Builder` are obtained via [`Builder::new`]. |
280 | | /// |
281 | | /// See function level documentation for details on the various client |
282 | | /// configuration settings. |
283 | | /// |
284 | | /// [`Builder::new`]: struct.Builder.html#method.new |
285 | | /// [`handshake`]: struct.Builder.html#method.handshake |
286 | | /// |
287 | | /// # Examples |
288 | | /// |
289 | | /// ``` |
290 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
291 | | /// # use h2::client::*; |
292 | | /// # use bytes::Bytes; |
293 | | /// # |
294 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
295 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
296 | | /// # { |
297 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
298 | | /// // handshake. |
299 | | /// let client_fut = Builder::new() |
300 | | /// .initial_window_size(1_000_000) |
301 | | /// .max_concurrent_streams(1000) |
302 | | /// .handshake(my_io); |
303 | | /// # client_fut.await |
304 | | /// # } |
305 | | /// # |
306 | | /// # pub fn main() {} |
307 | | /// ``` |
308 | | #[derive(Clone, Debug)] |
309 | | pub struct Builder { |
310 | | /// Time to keep locally reset streams around before reaping. |
311 | | reset_stream_duration: Duration, |
312 | | |
313 | | /// Initial maximum number of locally initiated (send) streams. |
314 | | /// After receiving a SETTINGS frame from the remote peer, |
315 | | /// the connection will overwrite this value with the |
316 | | /// MAX_CONCURRENT_STREAMS specified in the frame. |
317 | | /// If no value is advertised by the remote peer in the initial SETTINGS |
318 | | /// frame, it will be set to usize::MAX. |
319 | | initial_max_send_streams: usize, |
320 | | |
321 | | /// Initial target window size for new connections. |
322 | | initial_target_connection_window_size: Option<u32>, |
323 | | |
324 | | /// Maximum amount of bytes to "buffer" for writing per stream. |
325 | | max_send_buffer_size: usize, |
326 | | |
327 | | /// Maximum number of locally reset streams to keep at a time. |
328 | | reset_stream_max: usize, |
329 | | |
330 | | /// Maximum number of remotely reset streams to allow in the pending |
331 | | /// accept queue. |
332 | | pending_accept_reset_stream_max: usize, |
333 | | |
334 | | /// Initial `Settings` frame to send as part of the handshake. |
335 | | settings: Settings, |
336 | | |
337 | | /// The stream ID of the first (lowest) stream. Subsequent streams will use |
338 | | /// monotonically increasing stream IDs. |
339 | | stream_id: StreamId, |
340 | | |
341 | | /// Maximum number of locally reset streams due to protocol error across |
342 | | /// the lifetime of the connection. |
343 | | /// |
344 | | /// When this gets exceeded, we issue GOAWAYs. |
345 | | local_max_error_reset_streams: Option<usize>, |
346 | | } |
347 | | |
348 | | #[derive(Debug)] |
349 | | pub(crate) struct Peer; |
350 | | |
351 | | // ===== impl SendRequest ===== |
352 | | |
353 | | impl<B> SendRequest<B> |
354 | | where |
355 | | B: Buf, |
356 | | { |
357 | | /// Returns `Ready` when the connection can initialize a new HTTP/2 |
358 | | /// stream. |
359 | | /// |
360 | | /// This function must return `Ready` before `send_request` is called. When |
361 | | /// `Poll::Pending` is returned, the task will be notified once the readiness |
362 | | /// state changes. |
363 | | /// |
364 | | /// See [module] level docs for more details. |
365 | | /// |
366 | | /// [module]: index.html |
367 | 425k | pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { |
368 | 425k | ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?; |
369 | 413k | self.pending = None; |
370 | 413k | Poll::Ready(Ok(())) |
371 | 425k | } Unexecuted instantiation: <h2::client::SendRequest<_>>::poll_ready <h2::client::SendRequest<bytes::bytes::Bytes>>::poll_ready Line | Count | Source | 367 | 425k | pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { | 368 | 425k | ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?; | 369 | 413k | self.pending = None; | 370 | 413k | Poll::Ready(Ok(())) | 371 | 425k | } |
|
372 | | |
373 | | /// Consumes `self`, returning a future that returns `self` back once it is |
374 | | /// ready to send a request. |
375 | | /// |
376 | | /// This function should be called before calling `send_request`. |
377 | | /// |
378 | | /// This is a functional combinator for [`poll_ready`]. The returned future |
379 | | /// will call `SendRequest::poll_ready` until `Ready`, then return `self` to |
380 | | /// the caller. |
381 | | /// |
382 | | /// # Examples |
383 | | /// |
384 | | /// ```rust |
385 | | /// # use h2::client::*; |
386 | | /// # use http::*; |
387 | | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
388 | | /// # { |
389 | | /// // First, wait until the `send_request` handle is ready to send a new |
390 | | /// // request |
391 | | /// let mut send_request = send_request.ready().await.unwrap(); |
392 | | /// // Use `send_request` here. |
393 | | /// # } |
394 | | /// # pub fn main() {} |
395 | | /// ``` |
396 | | /// |
397 | | /// See [module] level docs for more details. |
398 | | /// |
399 | | /// [`poll_ready`]: #method.poll_ready |
400 | | /// [module]: index.html |
401 | 0 | pub fn ready(self) -> ReadySendRequest<B> { |
402 | 0 | ReadySendRequest { inner: Some(self) } |
403 | 0 | } |
404 | | |
405 | | /// Sends an HTTP/2 request to the server. |
406 | | /// |
407 | | /// `send_request` initializes a new HTTP/2 stream on the associated |
408 | | /// connection, then sends the given request using this new stream. Only the |
409 | | /// request head is sent. |
410 | | /// |
411 | | /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance |
412 | | /// are returned. The [`ResponseFuture`] instance is used to get the |
413 | | /// server's response and the [`SendStream`] instance is used to send a |
414 | | /// request body or trailers to the server over the same HTTP/2 stream. |
415 | | /// |
416 | | /// To send a request body or trailers, set `end_of_stream` to `false`. |
417 | | /// Then, use the returned [`SendStream`] instance to stream request body |
418 | | /// chunks or send trailers. If `end_of_stream` is **not** set to `false` |
419 | | /// then attempting to call [`SendStream::send_data`] or |
420 | | /// [`SendStream::send_trailers`] will result in an error. |
421 | | /// |
422 | | /// If no request body or trailers are to be sent, set `end_of_stream` to |
423 | | /// `true` and drop the returned [`SendStream`] instance. |
424 | | /// |
425 | | /// # A note on HTTP versions |
426 | | /// |
427 | | /// The provided `Request` will be encoded differently depending on the |
428 | | /// value of its version field. If the version is set to 2.0, then the |
429 | | /// request is encoded as the specification recommends. |
430 | | /// |
431 | | /// If the version is set to a lower value, then the request is encoded to |
432 | | /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host |
433 | | /// headers are permitted and the `:authority` pseudo header is not |
434 | | /// included. |
435 | | /// |
436 | | /// The caller should always set the request's version field to 2.0 unless |
437 | | /// specifically transmitting an HTTP 1.1 request over 2.0. |
438 | | /// |
439 | | /// # Examples |
440 | | /// |
441 | | /// Sending a request with no body |
442 | | /// |
443 | | /// ```rust |
444 | | /// # use h2::client::*; |
445 | | /// # use http::*; |
446 | | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
447 | | /// # { |
448 | | /// // First, wait until the `send_request` handle is ready to send a new |
449 | | /// // request |
450 | | /// let mut send_request = send_request.ready().await.unwrap(); |
451 | | /// // Prepare the HTTP request to send to the server. |
452 | | /// let request = Request::get("https://www.example.com/") |
453 | | /// .body(()) |
454 | | /// .unwrap(); |
455 | | /// |
456 | | /// // Send the request to the server. Since we are not sending a |
457 | | /// // body or trailers, we can drop the `SendStream` instance. |
458 | | /// let (response, _) = send_request.send_request(request, true).unwrap(); |
459 | | /// let response = response.await.unwrap(); |
460 | | /// // Process the response |
461 | | /// # } |
462 | | /// # pub fn main() {} |
463 | | /// ``` |
464 | | /// |
465 | | /// Sending a request with a body and trailers |
466 | | /// |
467 | | /// ```rust |
468 | | /// # use h2::client::*; |
469 | | /// # use http::*; |
470 | | /// # async fn doc(send_request: SendRequest<&'static [u8]>) |
471 | | /// # { |
472 | | /// // First, wait until the `send_request` handle is ready to send a new |
473 | | /// // request |
474 | | /// let mut send_request = send_request.ready().await.unwrap(); |
475 | | /// |
476 | | /// // Prepare the HTTP request to send to the server. |
477 | | /// let request = Request::get("https://www.example.com/") |
478 | | /// .body(()) |
479 | | /// .unwrap(); |
480 | | /// |
481 | | /// // Send the request to the server. Since we are sending a body |
482 | | /// // and trailers, we keep the returned `SendStream` instance. |
483 | | /// let (response, mut send_stream) = send_request |
484 | | /// .send_request(request, false).unwrap(); |
485 | | /// |
486 | | /// // At this point, one option would be to wait for send capacity. |
487 | | /// // Doing so would allow us to not hold data in memory that |
488 | | /// // cannot be sent. However, this is not a requirement, so this |
489 | | /// // example will skip that step. See `SendStream` documentation |
490 | | /// // for more details. |
491 | | /// send_stream.send_data(b"hello", false).unwrap(); |
492 | | /// send_stream.send_data(b"world", false).unwrap(); |
493 | | /// |
494 | | /// // Send the trailers. |
495 | | /// let mut trailers = HeaderMap::new(); |
496 | | /// trailers.insert( |
497 | | /// header::HeaderName::from_bytes(b"my-trailer").unwrap(), |
498 | | /// header::HeaderValue::from_bytes(b"hello").unwrap()); |
499 | | /// |
500 | | /// send_stream.send_trailers(trailers).unwrap(); |
501 | | /// |
502 | | /// let response = response.await.unwrap(); |
503 | | /// // Process the response |
504 | | /// # } |
505 | | /// # pub fn main() {} |
506 | | /// ``` |
507 | | /// |
508 | | /// [`ResponseFuture`]: struct.ResponseFuture.html |
509 | | /// [`SendStream`]: ../struct.SendStream.html |
510 | | /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data |
511 | | /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers |
512 | 414k | pub fn send_request( |
513 | 414k | &mut self, |
514 | 414k | request: Request<()>, |
515 | 414k | end_of_stream: bool, |
516 | 414k | ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> { |
517 | 414k | self.inner |
518 | 414k | .send_request(request, end_of_stream, self.pending.as_ref()) |
519 | 414k | .map_err(Into::into) |
520 | 414k | .map(|(stream, is_full)| { |
521 | 414k | if stream.is_pending_open() && is_full { |
522 | 234 | // Only prevent sending another request when the request queue |
523 | 234 | // is full. |
524 | 234 | self.pending = Some(stream.clone_to_opaque()); |
525 | 413k | } |
526 | | |
527 | 414k | let response = ResponseFuture { |
528 | 414k | inner: stream.clone_to_opaque(), |
529 | 414k | push_promise_consumed: false, |
530 | 414k | }; |
531 | 414k | |
532 | 414k | let stream = SendStream::new(stream); |
533 | 414k | |
534 | 414k | (response, stream) |
535 | 414k | }) Unexecuted instantiation: <h2::client::SendRequest<_>>::send_request::{closure#0} <h2::client::SendRequest<bytes::bytes::Bytes>>::send_request::{closure#0} Line | Count | Source | 520 | 362 | .map(|(stream, is_full)| { | 521 | 362 | if stream.is_pending_open() && is_full { | 522 | 0 | // Only prevent sending another request when the request queue | 523 | 0 | // is not full. | 524 | 0 | self.pending = Some(stream.clone_to_opaque()); | 525 | 362 | } | 526 | | | 527 | 362 | let response = ResponseFuture { | 528 | 362 | inner: stream.clone_to_opaque(), | 529 | 362 | push_promise_consumed: false, | 530 | 362 | }; | 531 | 362 | | 532 | 362 | let stream = SendStream::new(stream); | 533 | 362 | | 534 | 362 | (response, stream) | 535 | 362 | }) |
<h2::client::SendRequest<bytes::bytes::Bytes>>::send_request::{closure#0} Line | Count | Source | 520 | 413k | .map(|(stream, is_full)| { | 521 | 413k | if stream.is_pending_open() && is_full { | 522 | 234 | // Only prevent sending another request when the request queue | 523 | 234 | // is not full. | 524 | 234 | self.pending = Some(stream.clone_to_opaque()); | 525 | 413k | } | 526 | | | 527 | 413k | let response = ResponseFuture { | 528 | 413k | inner: stream.clone_to_opaque(), | 529 | 413k | push_promise_consumed: false, | 530 | 413k | }; | 531 | 413k | | 532 | 413k | let stream = SendStream::new(stream); | 533 | 413k | | 534 | 413k | (response, stream) | 535 | 413k | }) |
|
536 | 414k | } Unexecuted instantiation: <h2::client::SendRequest<_>>::send_request <h2::client::SendRequest<bytes::bytes::Bytes>>::send_request Line | Count | Source | 512 | 448 | pub fn send_request( | 513 | 448 | &mut self, | 514 | 448 | request: Request<()>, | 515 | 448 | end_of_stream: bool, | 516 | 448 | ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> { | 517 | 448 | self.inner | 518 | 448 | .send_request(request, end_of_stream, self.pending.as_ref()) | 519 | 448 | .map_err(Into::into) | 520 | 448 | .map(|(stream, is_full)| { | 521 | | if stream.is_pending_open() && is_full { | 522 | | // Only prevent sending another request when the request queue | 523 | | // is not full. | 524 | | self.pending = Some(stream.clone_to_opaque()); | 525 | | } | 526 | | | 527 | | let response = ResponseFuture { | 528 | | inner: stream.clone_to_opaque(), | 529 | | push_promise_consumed: false, | 530 | | }; | 531 | | | 532 | | let stream = SendStream::new(stream); | 533 | | | 534 | | (response, stream) | 535 | 448 | }) | 536 | 448 | } |
<h2::client::SendRequest<bytes::bytes::Bytes>>::send_request Line | Count | Source | 512 | 413k | pub fn send_request( | 513 | 413k | &mut self, | 514 | 413k | request: Request<()>, | 515 | 413k | end_of_stream: bool, | 516 | 413k | ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> { | 517 | 413k | self.inner | 518 | 413k | .send_request(request, end_of_stream, self.pending.as_ref()) | 519 | 413k | .map_err(Into::into) | 520 | 413k | .map(|(stream, is_full)| { | 521 | | if stream.is_pending_open() && is_full { | 522 | | // Only prevent sending another request when the request queue | 523 | | // is not full. | 524 | | self.pending = Some(stream.clone_to_opaque()); | 525 | | } | 526 | | | 527 | | let response = ResponseFuture { | 528 | | inner: stream.clone_to_opaque(), | 529 | | push_promise_consumed: false, | 530 | | }; | 531 | | | 532 | | let stream = SendStream::new(stream); | 533 | | | 534 | | (response, stream) | 535 | 413k | }) | 536 | 413k | } |
|
537 | | |
538 | | /// Returns whether the [extended CONNECT protocol][1] is enabled or not. |
539 | | /// |
540 | | /// This setting is configured by the server peer by sending the |
541 | | /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. |
542 | | /// This method returns the currently acknowledged value received from the |
543 | | /// remote. |
544 | | /// |
545 | | /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 |
546 | | /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 |
547 | 0 | pub fn is_extended_connect_protocol_enabled(&self) -> bool { |
548 | 0 | self.inner.is_extended_connect_protocol_enabled() |
549 | 0 | } |
550 | | |
551 | | /// Returns the current max send streams |
552 | 0 | pub fn current_max_send_streams(&self) -> usize { |
553 | 0 | self.inner.current_max_send_streams() |
554 | 0 | } |
555 | | |
556 | | /// Returns the current max recv streams |
557 | 0 | pub fn current_max_recv_streams(&self) -> usize { |
558 | 0 | self.inner.current_max_recv_streams() |
559 | 0 | } |
560 | | } |
561 | | |
562 | | impl<B> fmt::Debug for SendRequest<B> |
563 | | where |
564 | | B: Buf, |
565 | | { |
566 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
567 | 0 | fmt.debug_struct("SendRequest").finish() |
568 | 0 | } |
569 | | } |
570 | | |
571 | | impl<B> Clone for SendRequest<B> |
572 | | where |
573 | | B: Buf, |
574 | | { |
575 | 0 | fn clone(&self) -> Self { |
576 | 0 | SendRequest { |
577 | 0 | inner: self.inner.clone(), |
578 | 0 | pending: None, |
579 | 0 | } |
580 | 0 | } |
581 | | } |
582 | | |
583 | | #[cfg(feature = "unstable")] |
584 | | impl<B> SendRequest<B> |
585 | | where |
586 | | B: Buf, |
587 | | { |
588 | | /// Returns the number of active streams. |
589 | | /// |
590 | | /// An active stream is a stream that has not yet transitioned to a closed |
591 | | /// state. |
592 | 0 | pub fn num_active_streams(&self) -> usize { |
593 | 0 | self.inner.num_active_streams() |
594 | 0 | } |
595 | | |
596 | | /// Returns the number of streams that are held in memory. |
597 | | /// |
598 | | /// A wired stream is a stream that is either active or is closed but must |
599 | | /// stay in memory for some reason. For example, there are still outstanding |
600 | | /// userspace handles pointing to the slot. |
601 | 0 | pub fn num_wired_streams(&self) -> usize { |
602 | 0 | self.inner.num_wired_streams() |
603 | 0 | } |
604 | | } |
605 | | |
606 | | // ===== impl ReadySendRequest ===== |
607 | | |
608 | | impl<B> Future for ReadySendRequest<B> |
609 | | where |
610 | | B: Buf, |
611 | | { |
612 | | type Output = Result<SendRequest<B>, crate::Error>; |
613 | | |
614 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
615 | 0 | match &mut self.inner { |
616 | 0 | Some(send_request) => { |
617 | 0 | ready!(send_request.poll_ready(cx))?; |
618 | | } |
619 | 0 | None => panic!("called `poll` after future completed"), |
620 | | } |
621 | | |
622 | 0 | Poll::Ready(Ok(self.inner.take().unwrap())) |
623 | 0 | } |
624 | | } |
625 | | |
626 | | // ===== impl Builder ===== |
627 | | |
628 | | impl Builder { |
629 | | /// Returns a new client builder instance initialized with default |
630 | | /// configuration values. |
631 | | /// |
632 | | /// Configuration methods can be chained on the return value. |
633 | | /// |
634 | | /// # Examples |
635 | | /// |
636 | | /// ``` |
637 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
638 | | /// # use h2::client::*; |
639 | | /// # use bytes::Bytes; |
640 | | /// # |
641 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
642 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
643 | | /// # { |
644 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
645 | | /// // handshake. |
646 | | /// let client_fut = Builder::new() |
647 | | /// .initial_window_size(1_000_000) |
648 | | /// .max_concurrent_streams(1000) |
649 | | /// .handshake(my_io); |
650 | | /// # client_fut.await |
651 | | /// # } |
652 | | /// # |
653 | | /// # pub fn main() {} |
654 | | /// ``` |
655 | 12.1k | pub fn new() -> Builder { |
656 | 12.1k | Builder { |
657 | 12.1k | max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, |
658 | 12.1k | reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), |
659 | 12.1k | reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, |
660 | 12.1k | pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, |
661 | 12.1k | initial_target_connection_window_size: None, |
662 | 12.1k | initial_max_send_streams: usize::MAX, |
663 | 12.1k | settings: Default::default(), |
664 | 12.1k | stream_id: 1.into(), |
665 | 12.1k | local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), |
666 | 12.1k | } |
667 | 12.1k | } |
668 | | |
669 | | /// Indicates the initial window size (in octets) for stream-level |
670 | | /// flow control for received data. |
671 | | /// |
672 | | /// The initial window of a stream is used as part of flow control. For more |
673 | | /// details, see [`FlowControl`]. |
674 | | /// |
675 | | /// The default value is 65,535. |
676 | | /// |
677 | | /// [`FlowControl`]: ../struct.FlowControl.html |
678 | | /// |
679 | | /// # Examples |
680 | | /// |
681 | | /// ``` |
682 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
683 | | /// # use h2::client::*; |
684 | | /// # use bytes::Bytes; |
685 | | /// # |
686 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
687 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
688 | | /// # { |
689 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
690 | | /// // handshake. |
691 | | /// let client_fut = Builder::new() |
692 | | /// .initial_window_size(1_000_000) |
693 | | /// .handshake(my_io); |
694 | | /// # client_fut.await |
695 | | /// # } |
696 | | /// # |
697 | | /// # pub fn main() {} |
698 | | /// ``` |
699 | 0 | pub fn initial_window_size(&mut self, size: u32) -> &mut Self { |
700 | 0 | self.settings.set_initial_window_size(Some(size)); |
701 | 0 | self |
702 | 0 | } |
703 | | |
704 | | /// Indicates the initial window size (in octets) for connection-level flow control |
705 | | /// for received data. |
706 | | /// |
707 | | /// The initial window of a connection is used as part of flow control. For more details, |
708 | | /// see [`FlowControl`]. |
709 | | /// |
710 | | /// The default value is 65,535. |
711 | | /// |
712 | | /// [`FlowControl`]: ../struct.FlowControl.html |
713 | | /// |
714 | | /// # Examples |
715 | | /// |
716 | | /// ``` |
717 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
718 | | /// # use h2::client::*; |
719 | | /// # use bytes::Bytes; |
720 | | /// # |
721 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
722 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
723 | | /// # { |
724 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
725 | | /// // handshake. |
726 | | /// let client_fut = Builder::new() |
727 | | /// .initial_connection_window_size(1_000_000) |
728 | | /// .handshake(my_io); |
729 | | /// # client_fut.await |
730 | | /// # } |
731 | | /// # |
732 | | /// # pub fn main() {} |
733 | | /// ``` |
734 | 0 | pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self { |
735 | 0 | self.initial_target_connection_window_size = Some(size); |
736 | 0 | self |
737 | 0 | } |
738 | | |
739 | | /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the |
740 | | /// configured client is able to accept. |
741 | | /// |
742 | | /// The sender may send data frames that are **smaller** than this value, |
743 | | /// but any data larger than `max` will be broken up into multiple `DATA` |
744 | | /// frames. |
745 | | /// |
746 | | /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384. |
747 | | /// |
748 | | /// # Examples |
749 | | /// |
750 | | /// ``` |
751 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
752 | | /// # use h2::client::*; |
753 | | /// # use bytes::Bytes; |
754 | | /// # |
755 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
756 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
757 | | /// # { |
758 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
759 | | /// // handshake. |
760 | | /// let client_fut = Builder::new() |
761 | | /// .max_frame_size(1_000_000) |
762 | | /// .handshake(my_io); |
763 | | /// # client_fut.await |
764 | | /// # } |
765 | | /// # |
766 | | /// # pub fn main() {} |
767 | | /// ``` |
768 | | /// |
769 | | /// # Panics |
770 | | /// |
771 | | /// This function panics if `max` is not within the legal range specified |
772 | | /// above. |
773 | 0 | pub fn max_frame_size(&mut self, max: u32) -> &mut Self { |
774 | 0 | self.settings.set_max_frame_size(Some(max)); |
775 | 0 | self |
776 | 0 | } |
777 | | |
778 | | /// Sets the max size of received header frames. |
779 | | /// |
780 | | /// This advisory setting informs a peer of the maximum size of header list |
781 | | /// that the sender is prepared to accept, in octets. The value is based on |
782 | | /// the uncompressed size of header fields, including the length of the name |
783 | | /// and value in octets plus an overhead of 32 octets for each header field. |
784 | | /// |
785 | | /// This setting is also used to limit the maximum amount of data that is |
786 | | /// buffered to decode HEADERS frames. |
787 | | /// |
788 | | /// # Examples |
789 | | /// |
790 | | /// ``` |
791 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
792 | | /// # use h2::client::*; |
793 | | /// # use bytes::Bytes; |
794 | | /// # |
795 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
796 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
797 | | /// # { |
798 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
799 | | /// // handshake. |
800 | | /// let client_fut = Builder::new() |
801 | | /// .max_header_list_size(16 * 1024) |
802 | | /// .handshake(my_io); |
803 | | /// # client_fut.await |
804 | | /// # } |
805 | | /// # |
806 | | /// # pub fn main() {} |
807 | | /// ``` |
808 | 0 | pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { |
809 | 0 | self.settings.set_max_header_list_size(Some(max)); |
810 | 0 | self |
811 | 0 | } |
812 | | |
813 | | /// Sets the maximum number of concurrent streams. |
814 | | /// |
815 | | /// The maximum concurrent streams setting only controls the maximum number |
816 | | /// of streams that can be initiated by the remote peer. In other words, |
817 | | /// when this setting is set to 100, this does not limit the number of |
818 | | /// concurrent streams that can be created by the caller. |
819 | | /// |
820 | | /// It is recommended that this value be no smaller than 100, so as to not |
821 | | /// unnecessarily limit parallelism. However, any value is legal, including |
822 | | /// 0. If `max` is set to 0, then the remote will not be permitted to |
823 | | /// initiate streams. |
824 | | /// |
825 | | /// Note that streams in the reserved state, i.e., push promises that have |
826 | | /// been reserved but the stream has not started, do not count against this |
827 | | /// setting. |
828 | | /// |
829 | | /// Also note that if the remote *does* exceed the value set here, it is not |
830 | | /// a protocol level error. Instead, the `h2` library will immediately reset |
831 | | /// the stream. |
832 | | /// |
833 | | /// See [Section 5.1.2] in the HTTP/2 spec for more details. |
834 | | /// |
835 | | /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 |
836 | | /// |
837 | | /// # Examples |
838 | | /// |
839 | | /// ``` |
840 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
841 | | /// # use h2::client::*; |
842 | | /// # use bytes::Bytes; |
843 | | /// # |
844 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
845 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
846 | | /// # { |
847 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
848 | | /// // handshake. |
849 | | /// let client_fut = Builder::new() |
850 | | /// .max_concurrent_streams(1000) |
851 | | /// .handshake(my_io); |
852 | | /// # client_fut.await |
853 | | /// # } |
854 | | /// # |
855 | | /// # pub fn main() {} |
856 | | /// ``` |
857 | 0 | pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self { |
858 | 0 | self.settings.set_max_concurrent_streams(Some(max)); |
859 | 0 | self |
860 | 0 | } |
861 | | |
862 | | /// Sets the initial maximum of locally initiated (send) streams. |
863 | | /// |
864 | | /// The initial settings will be overwritten by the remote peer when |
865 | | /// the SETTINGS frame is received. The new value will be set to the |
866 | | /// `max_concurrent_streams()` from the frame. If no value is advertised in |
867 | | /// the initial SETTINGS frame from the remote peer as part of |
868 | | /// [HTTP/2 Connection Preface], `usize::MAX` will be set. |
869 | | /// |
870 | | /// This setting prevents the caller from exceeding this number of |
871 | | /// streams that are counted towards the concurrency limit. |
872 | | /// |
873 | | /// Sending streams past the limit returned by the peer will be treated |
874 | | /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM. |
875 | | /// |
876 | | /// See [Section 5.1.2] in the HTTP/2 spec for more details. |
877 | | /// |
878 | | /// The default value is `usize::MAX`. |
879 | | /// |
880 | | /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface |
881 | | /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2 |
882 | | /// |
883 | | /// # Examples |
884 | | /// |
885 | | /// ``` |
886 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
887 | | /// # use h2::client::*; |
888 | | /// # use bytes::Bytes; |
889 | | /// # |
890 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
891 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
892 | | /// # { |
893 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
894 | | /// // handshake. |
895 | | /// let client_fut = Builder::new() |
896 | | /// .initial_max_send_streams(1000) |
897 | | /// .handshake(my_io); |
898 | | /// # client_fut.await |
899 | | /// # } |
900 | | /// # |
901 | | /// # pub fn main() {} |
902 | | /// ``` |
903 | 0 | pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self { |
904 | 0 | self.initial_max_send_streams = initial; |
905 | 0 | self |
906 | 0 | } |
907 | | |
908 | | /// Sets the maximum number of concurrent locally reset streams. |
909 | | /// |
910 | | /// When a stream is explicitly reset, the HTTP/2 specification requires |
911 | | /// that any further frames received for that stream must be ignored for |
912 | | /// "some time". |
913 | | /// |
914 | | /// In order to satisfy the specification, internal state must be maintained |
915 | | /// to implement the behavior. This state grows linearly with the number of |
916 | | /// streams that are locally reset. |
917 | | /// |
918 | | /// The `max_concurrent_reset_streams` setting sets an upper |
919 | | /// bound on the amount of state that is maintained. When this max value is |
920 | | /// reached, the oldest reset stream is purged from memory. |
921 | | /// |
922 | | /// Once the stream has been fully purged from memory, any additional frames |
923 | | /// received for that stream will result in a connection level protocol |
924 | | /// error, forcing the connection to terminate. |
925 | | /// |
926 | | /// The default value is 10. |
927 | | /// |
928 | | /// # Examples |
929 | | /// |
930 | | /// ``` |
931 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
932 | | /// # use h2::client::*; |
933 | | /// # use bytes::Bytes; |
934 | | /// # |
935 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
936 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
937 | | /// # { |
938 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
939 | | /// // handshake. |
940 | | /// let client_fut = Builder::new() |
941 | | /// .max_concurrent_reset_streams(1000) |
942 | | /// .handshake(my_io); |
943 | | /// # client_fut.await |
944 | | /// # } |
945 | | /// # |
946 | | /// # pub fn main() {} |
947 | | /// ``` |
948 | 0 | pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { |
949 | 0 | self.reset_stream_max = max; |
950 | 0 | self |
951 | 0 | } |
952 | | |
953 | | /// Sets the duration to remember locally reset streams. |
954 | | /// |
955 | | /// When a stream is explicitly reset, the HTTP/2 specification requires |
956 | | /// that any further frames received for that stream must be ignored for |
957 | | /// "some time". |
958 | | /// |
959 | | /// In order to satisfy the specification, internal state must be maintained |
960 | | /// to implement the behavior. This state grows linearly with the number of |
961 | | /// streams that are locally reset. |
962 | | /// |
963 | | /// The `reset_stream_duration` setting configures the max amount of time |
964 | | /// this state will be maintained in memory. Once the duration elapses, the |
965 | | /// stream state is purged from memory. |
966 | | /// |
967 | | /// Once the stream has been fully purged from memory, any additional frames |
968 | | /// received for that stream will result in a connection level protocol |
969 | | /// error, forcing the connection to terminate. |
970 | | /// |
971 | | /// The default value is 30 seconds. |
972 | | /// |
973 | | /// # Examples |
974 | | /// |
975 | | /// ``` |
976 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
977 | | /// # use h2::client::*; |
978 | | /// # use std::time::Duration; |
979 | | /// # use bytes::Bytes; |
980 | | /// # |
981 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
982 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
983 | | /// # { |
984 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
985 | | /// // handshake. |
986 | | /// let client_fut = Builder::new() |
987 | | /// .reset_stream_duration(Duration::from_secs(10)) |
988 | | /// .handshake(my_io); |
989 | | /// # client_fut.await |
990 | | /// # } |
991 | | /// # |
992 | | /// # pub fn main() {} |
993 | | /// ``` |
994 | 0 | pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { |
995 | 0 | self.reset_stream_duration = dur; |
996 | 0 | self |
997 | 0 | } |
998 | | |
999 | | /// Sets the maximum number of local resets due to protocol errors made by the remote end. |
1000 | | /// |
1001 | | /// Invalid frames and many other protocol errors will lead to resets being generated for those streams. |
1002 | | /// Too many of these often indicate a malicious remote end, and there are attacks which can abuse this to cause a denial of service (DOS). |
1003 | | /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate. |
1004 | | /// |
1005 | | /// When the number of local resets exceeds this threshold, the client will close the connection. |
1006 | | /// |
1007 | | /// If you really want to disable this, supply [`Option::None`] here. |
1008 | | /// Disabling this is not recommended and may expose you to DOS attacks. |
1009 | | /// |
1010 | | /// The default value is currently 1024, but could change. |
1011 | 0 | pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self { |
1012 | 0 | self.local_max_error_reset_streams = max; |
1013 | 0 | self |
1014 | 0 | } |
1015 | | |
1016 | | /// Sets the maximum number of pending-accept remotely-reset streams. |
1017 | | /// |
1018 | | /// Streams that have been received by the peer, but not accepted by the |
1019 | | /// user, can also receive a RST_STREAM. This is a legitimate pattern: one |
1020 | | /// could send a request and then shortly after, realize it is not needed, |
1021 | | /// sending a CANCEL. |
1022 | | /// |
1023 | | /// However, since those streams are now "closed", they don't count towards |
1024 | | /// the max concurrent streams. So, they will sit in the accept queue, |
1025 | | /// using memory. |
1026 | | /// |
1027 | | /// When the number of remotely-reset streams sitting in the pending-accept |
1028 | | /// queue reaches this maximum value, a connection error with the code of |
1029 | | /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the |
1030 | | /// `Future`. |
1031 | | /// |
1032 | | /// The default value is currently 20, but could change. |
1033 | | /// |
1034 | | /// # Examples |
1035 | | /// |
1036 | | /// ``` |
1037 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1038 | | /// # use h2::client::*; |
1039 | | /// # use bytes::Bytes; |
1040 | | /// # |
1041 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1042 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1043 | | /// # { |
1044 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1045 | | /// // handshake. |
1046 | | /// let client_fut = Builder::new() |
1047 | | /// .max_pending_accept_reset_streams(100) |
1048 | | /// .handshake(my_io); |
1049 | | /// # client_fut.await |
1050 | | /// # } |
1051 | | /// # |
1052 | | /// # pub fn main() {} |
1053 | | /// ``` |
1054 | 0 | pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self { |
1055 | 0 | self.pending_accept_reset_stream_max = max; |
1056 | 0 | self |
1057 | 0 | } |
1058 | | |
1059 | | /// Sets the maximum send buffer size per stream. |
1060 | | /// |
1061 | | /// Once a stream has buffered up to (or over) the maximum, the stream's |
1062 | | /// flow control will not "poll" additional capacity. Once bytes for the |
1063 | | /// stream have been written to the connection, the send buffer capacity |
1064 | | /// will be freed up again. |
1065 | | /// |
1066 | | /// The default is currently ~400KB, but may change. |
1067 | | /// |
1068 | | /// # Panics |
1069 | | /// |
1070 | | /// This function panics if `max` is larger than `u32::MAX`. |
1071 | 0 | pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { |
1072 | 0 | assert!(max <= u32::MAX as usize); |
1073 | 0 | self.max_send_buffer_size = max; |
1074 | 0 | self |
1075 | 0 | } |
1076 | | |
1077 | | /// Enables or disables server push promises. |
1078 | | /// |
1079 | | /// This value is included in the initial SETTINGS handshake. |
1080 | | /// Setting this value to `false` in the initial SETTINGS handshake |
1081 | | /// guarantees that the remote server will never send a push |
1082 | | /// promise. |
1083 | | /// |
1084 | | /// This setting can be changed during the life of a single HTTP/2 |
1085 | | /// connection by sending another settings frame updating the value. |
1086 | | /// |
1087 | | /// Default value: `true`. |
1088 | | /// |
1089 | | /// # Examples |
1090 | | /// |
1091 | | /// ``` |
1092 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1093 | | /// # use h2::client::*; |
1094 | | /// # use std::time::Duration; |
1095 | | /// # use bytes::Bytes; |
1096 | | /// # |
1097 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1098 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1099 | | /// # { |
1100 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1101 | | /// // handshake. |
1102 | | /// let client_fut = Builder::new() |
1103 | | /// .enable_push(false) |
1104 | | /// .handshake(my_io); |
1105 | | /// # client_fut.await |
1106 | | /// # } |
1107 | | /// # |
1108 | | /// # pub fn main() {} |
1109 | | /// ``` |
1110 | 0 | pub fn enable_push(&mut self, enabled: bool) -> &mut Self { |
1111 | 0 | self.settings.set_enable_push(enabled); |
1112 | 0 | self |
1113 | 0 | } |
1114 | | |
1115 | | /// Sets the header table size. |
1116 | | /// |
1117 | | /// This setting informs the peer of the maximum size of the header compression |
1118 | | /// table used to encode header blocks, in octets. The encoder may select any value |
1119 | | /// equal to or less than the header table size specified by the sender. |
1120 | | /// |
1121 | | /// The default value is 4,096. |
1122 | | /// |
1123 | | /// # Examples |
1124 | | /// |
1125 | | /// ``` |
1126 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1127 | | /// # use h2::client::*; |
1128 | | /// # use bytes::Bytes; |
1129 | | /// # |
1130 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1131 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1132 | | /// # { |
1133 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1134 | | /// // handshake. |
1135 | | /// let client_fut = Builder::new() |
1136 | | /// .header_table_size(1_000_000) |
1137 | | /// .handshake(my_io); |
1138 | | /// # client_fut.await |
1139 | | /// # } |
1140 | | /// # |
1141 | | /// # pub fn main() {} |
1142 | | /// ``` |
1143 | 0 | pub fn header_table_size(&mut self, size: u32) -> &mut Self { |
1144 | 0 | self.settings.set_header_table_size(Some(size)); |
1145 | 0 | self |
1146 | 0 | } |
1147 | | |
1148 | | /// Sets the first stream ID to something other than 1. |
1149 | | #[cfg(feature = "unstable")] |
1150 | 0 | pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self { |
1151 | 0 | self.stream_id = stream_id.into(); |
1152 | 0 | assert!( |
1153 | 0 | self.stream_id.is_client_initiated(), |
1154 | 0 | "stream id must be odd" |
1155 | | ); |
1156 | 0 | self |
1157 | 0 | } |
1158 | | |
1159 | | /// Creates a new configured HTTP/2 client backed by `io`. |
1160 | | /// |
1161 | | /// It is expected that `io` already be in an appropriate state to commence |
1162 | | /// the [HTTP/2 handshake]. The handshake is completed once both the connection |
1163 | | /// preface and the initial settings frame have been sent by the client. |
1164 | | /// |
1165 | | /// The handshake future does not wait for the initial settings frame from the |
1166 | | /// server. |
1167 | | /// |
1168 | | /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] |
1169 | | /// tuple once the HTTP/2 handshake has been completed. |
1170 | | /// |
1171 | | /// This function also allows the caller to configure the send payload data |
1172 | | /// type. See [Outbound data type] for more details. |
1173 | | /// |
1174 | | /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
1175 | | /// [`Connection`]: struct.Connection.html |
1176 | | /// [`SendRequest`]: struct.SendRequest.html |
1177 | | /// [Outbound data type]: ../index.html#outbound-data-type |
1178 | | /// |
1179 | | /// # Examples |
1180 | | /// |
1181 | | /// Basic usage: |
1182 | | /// |
1183 | | /// ``` |
1184 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1185 | | /// # use h2::client::*; |
1186 | | /// # use bytes::Bytes; |
1187 | | /// # |
1188 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1189 | | /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> |
1190 | | /// # { |
1191 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1192 | | /// // handshake. |
1193 | | /// let client_fut = Builder::new() |
1194 | | /// .handshake(my_io); |
1195 | | /// # client_fut.await |
1196 | | /// # } |
1197 | | /// # |
1198 | | /// # pub fn main() {} |
1199 | | /// ``` |
1200 | | /// |
1201 | | /// Configures the send-payload data type. In this case, the outbound data |
1202 | | /// type will be `&'static [u8]`. |
1203 | | /// |
1204 | | /// ``` |
1205 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1206 | | /// # use h2::client::*; |
1207 | | /// # |
1208 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) |
1209 | | /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error> |
1210 | | /// # { |
1211 | | /// // `client_fut` is a future representing the completion of the HTTP/2 |
1212 | | /// // handshake. |
1213 | | /// let client_fut = Builder::new() |
1214 | | /// .handshake::<_, &'static [u8]>(my_io); |
1215 | | /// # client_fut.await |
1216 | | /// # } |
1217 | | /// # |
1218 | | /// # pub fn main() {} |
1219 | | /// ``` |
1220 | 12.1k | pub fn handshake<T, B>( |
1221 | 12.1k | &self, |
1222 | 12.1k | io: T, |
1223 | 12.1k | ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> |
1224 | 12.1k | where |
1225 | 12.1k | T: AsyncRead + AsyncWrite + Unpin, |
1226 | 12.1k | B: Buf, |
1227 | 12.1k | { |
1228 | 12.1k | Connection::handshake2(io, self.clone()) |
1229 | 12.1k | } Unexecuted instantiation: <h2::client::Builder>::handshake::<_, _> <h2::client::Builder>::handshake::<h2_support::mock::Mock, bytes::bytes::Bytes> Line | Count | Source | 1220 | 448 | pub fn handshake<T, B>( | 1221 | 448 | &self, | 1222 | 448 | io: T, | 1223 | 448 | ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> | 1224 | 448 | where | 1225 | 448 | T: AsyncRead + AsyncWrite + Unpin, | 1226 | 448 | B: Buf, | 1227 | 448 | { | 1228 | 448 | Connection::handshake2(io, self.clone()) | 1229 | 448 | } |
<h2::client::Builder>::handshake::<fuzz_e2e::MockIo, bytes::bytes::Bytes> Line | Count | Source | 1220 | 11.6k | pub fn handshake<T, B>( | 1221 | 11.6k | &self, | 1222 | 11.6k | io: T, | 1223 | 11.6k | ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> | 1224 | 11.6k | where | 1225 | 11.6k | T: AsyncRead + AsyncWrite + Unpin, | 1226 | 11.6k | B: Buf, | 1227 | 11.6k | { | 1228 | 11.6k | Connection::handshake2(io, self.clone()) | 1229 | 11.6k | } |
|
1230 | | } |
1231 | | |
1232 | | impl Default for Builder { |
1233 | 0 | fn default() -> Builder { |
1234 | 0 | Builder::new() |
1235 | 0 | } |
1236 | | } |
1237 | | |
1238 | | /// Creates a new configured HTTP/2 client with default configuration |
1239 | | /// values backed by `io`. |
1240 | | /// |
1241 | | /// It is expected that `io` already be in an appropriate state to commence |
1242 | | /// the [HTTP/2 handshake]. See [Handshake] for more details. |
1243 | | /// |
1244 | | /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] |
1245 | | /// tuple once the HTTP/2 handshake has been completed. The returned |
1246 | | /// [`Connection`] instance will be using default configuration values. Use |
1247 | | /// [`Builder`] to customize the configuration values used by a [`Connection`] |
1248 | | /// instance. |
1249 | | /// |
1250 | | /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader |
1251 | | /// [Handshake]: ../index.html#handshake |
1252 | | /// [`Connection`]: struct.Connection.html |
1253 | | /// [`SendRequest`]: struct.SendRequest.html |
1254 | | /// |
1255 | | /// # Examples |
1256 | | /// |
1257 | | /// ``` |
1258 | | /// # use tokio::io::{AsyncRead, AsyncWrite}; |
1259 | | /// # use h2::client; |
1260 | | /// # use h2::client::*; |
1261 | | /// # |
1262 | | /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error> |
1263 | | /// # { |
1264 | | /// let (send_request, connection) = client::handshake(my_io).await?; |
1265 | | /// // The HTTP/2 handshake has completed, now start polling |
1266 | | /// // `connection` and use `send_request` to send requests to the |
1267 | | /// // server. |
1268 | | /// # Ok(()) |
1269 | | /// # } |
1270 | | /// # |
1271 | | /// # pub fn main() {} |
1272 | | /// ``` |
1273 | 11.6k | pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error> |
1274 | 11.6k | where |
1275 | 11.6k | T: AsyncRead + AsyncWrite + Unpin, |
1276 | 11.6k | { Unexecuted instantiation: h2::client::handshake::<_> h2::client::handshake::<fuzz_e2e::MockIo> Line | Count | Source | 1273 | 11.6k | pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error> | 1274 | 11.6k | where | 1275 | 11.6k | T: AsyncRead + AsyncWrite + Unpin, | 1276 | 11.6k | { |
|
1277 | 11.6k | let builder = Builder::new(); |
1278 | 11.6k | builder |
1279 | 11.6k | .handshake(io) |
1280 | 11.6k | .instrument(tracing::trace_span!("client_handshake")) |
1281 | 57.1k | .await |
1282 | 11.6k | } Unexecuted instantiation: h2::client::handshake::<_>::{closure#0} h2::client::handshake::<fuzz_e2e::MockIo>::{closure#0} Line | Count | Source | 1276 | 11.6k | { | 1277 | 11.6k | let builder = Builder::new(); | 1278 | 11.6k | builder | 1279 | 11.6k | .handshake(io) | 1280 | 11.6k | .instrument(tracing::trace_span!("client_handshake")) | 1281 | 57.1k | .await | 1282 | 11.6k | } |
|
1283 | | |
1284 | | // ===== impl Connection ===== |
1285 | | |
1286 | 12.1k | async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error> |
1287 | 12.1k | where |
1288 | 12.1k | T: AsyncRead + AsyncWrite + Unpin, |
1289 | 12.1k | { Unexecuted instantiation: h2::client::bind_connection::<_> h2::client::bind_connection::<h2_support::mock::Mock> Line | Count | Source | 1286 | 448 | async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error> | 1287 | 448 | where | 1288 | 448 | T: AsyncRead + AsyncWrite + Unpin, | 1289 | 448 | { |
h2::client::bind_connection::<fuzz_e2e::MockIo> Line | Count | Source | 1286 | 11.6k | async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error> | 1287 | 11.6k | where | 1288 | 11.6k | T: AsyncRead + AsyncWrite + Unpin, | 1289 | 11.6k | { |
|
1290 | 12.1k | tracing::debug!("binding client connection"); |
1291 | | |
1292 | 12.1k | let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; |
1293 | 57.1k | io.write_all(msg).await.map_err(crate::Error::from_io)?; |
1294 | | |
1295 | 12.0k | tracing::debug!("client connection bound"); |
1296 | | |
1297 | 12.0k | Ok(()) |
1298 | 12.1k | } Unexecuted instantiation: h2::client::bind_connection::<_>::{closure#0} h2::client::bind_connection::<h2_support::mock::Mock>::{closure#0} Line | Count | Source | 1289 | 448 | { | 1290 | 448 | tracing::debug!("binding client connection"); | 1291 | | | 1292 | 448 | let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; | 1293 | 448 | io.write_all(msg).await.map_err(crate::Error::from_io)?; | 1294 | | | 1295 | 448 | tracing::debug!("client connection bound"); | 1296 | | | 1297 | 448 | Ok(()) | 1298 | 448 | } |
h2::client::bind_connection::<fuzz_e2e::MockIo>::{closure#0} Line | Count | Source | 1289 | 11.6k | { | 1290 | 11.6k | tracing::debug!("binding client connection"); | 1291 | | | 1292 | 11.6k | let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; | 1293 | 57.1k | io.write_all(msg).await.map_err(crate::Error::from_io)?; | 1294 | | | 1295 | 11.6k | tracing::debug!("client connection bound"); | 1296 | | | 1297 | 11.6k | Ok(()) | 1298 | 11.6k | } |
|
1299 | | |
1300 | | impl<T, B> Connection<T, B> |
1301 | | where |
1302 | | T: AsyncRead + AsyncWrite + Unpin, |
1303 | | B: Buf, |
1304 | | { |
1305 | 12.1k | async fn handshake2( |
1306 | 12.1k | mut io: T, |
1307 | 12.1k | builder: Builder, |
1308 | 12.1k | ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> { Unexecuted instantiation: <h2::client::Connection<_, _>>::handshake2 <h2::client::Connection<h2_support::mock::Mock>>::handshake2 Line | Count | Source | 1305 | 448 | async fn handshake2( | 1306 | 448 | mut io: T, | 1307 | 448 | builder: Builder, | 1308 | 448 | ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> { |
<h2::client::Connection<fuzz_e2e::MockIo>>::handshake2 Line | Count | Source | 1305 | 11.6k | async fn handshake2( | 1306 | 11.6k | mut io: T, | 1307 | 11.6k | builder: Builder, | 1308 | 11.6k | ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> { |
|
1309 | 57.1k | bind_connection(&mut io).await?; |
1310 | | |
1311 | | // Create the codec |
1312 | 12.0k | let mut codec = Codec::new(io); |
1313 | | |
1314 | 12.0k | if let Some(max) = builder.settings.max_frame_size() { |
1315 | 0 | codec.set_max_recv_frame_size(max as usize); |
1316 | 12.0k | } |
1317 | | |
1318 | 12.0k | if let Some(max) = builder.settings.max_header_list_size() { |
1319 | 0 | codec.set_max_recv_header_list_size(max as usize); |
1320 | 12.0k | } |
1321 | | |
1322 | | // Send initial settings frame |
1323 | 12.0k | codec |
1324 | 12.0k | .buffer(builder.settings.clone().into()) |
1325 | 12.0k | .expect("invalid SETTINGS frame"); |
1326 | 12.0k | |
1327 | 12.0k | let inner = proto::Connection::new( |
1328 | 12.0k | codec, |
1329 | 12.0k | proto::Config { |
1330 | 12.0k | next_stream_id: builder.stream_id, |
1331 | 12.0k | initial_max_send_streams: builder.initial_max_send_streams, |
1332 | 12.0k | max_send_buffer_size: builder.max_send_buffer_size, |
1333 | 12.0k | reset_stream_duration: builder.reset_stream_duration, |
1334 | 12.0k | reset_stream_max: builder.reset_stream_max, |
1335 | 12.0k | remote_reset_stream_max: builder.pending_accept_reset_stream_max, |
1336 | 12.0k | local_error_reset_streams_max: builder.local_max_error_reset_streams, |
1337 | 12.0k | settings: builder.settings.clone(), |
1338 | 12.0k | }, |
1339 | 12.0k | ); |
1340 | 12.0k | let send_request = SendRequest { |
1341 | 12.0k | inner: inner.streams().clone(), |
1342 | 12.0k | pending: None, |
1343 | 12.0k | }; |
1344 | 12.0k | |
1345 | 12.0k | let mut connection = Connection { inner }; |
1346 | 12.0k | if let Some(sz) = builder.initial_target_connection_window_size { |
1347 | 0 | connection.set_target_window_size(sz); |
1348 | 12.0k | } |
1349 | | |
1350 | 12.0k | Ok((send_request, connection)) |
1351 | 12.1k | } Unexecuted instantiation: <h2::client::Connection<_, _>>::handshake2::{closure#0} <h2::client::Connection<h2_support::mock::Mock>>::handshake2::{closure#0} Line | Count | Source | 1308 | 448 | ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> { | 1309 | 448 | bind_connection(&mut io).await?; | 1310 | | | 1311 | | // Create the codec | 1312 | 448 | let mut codec = Codec::new(io); | 1313 | | | 1314 | 448 | if let Some(max) = builder.settings.max_frame_size() { | 1315 | 0 | codec.set_max_recv_frame_size(max as usize); | 1316 | 448 | } | 1317 | | | 1318 | 448 | if let Some(max) = builder.settings.max_header_list_size() { | 1319 | 0 | codec.set_max_recv_header_list_size(max as usize); | 1320 | 448 | } | 1321 | | | 1322 | | // Send initial settings frame | 1323 | 448 | codec | 1324 | 448 | .buffer(builder.settings.clone().into()) | 1325 | 448 | .expect("invalid SETTINGS frame"); | 1326 | 448 | | 1327 | 448 | let inner = proto::Connection::new( | 1328 | 448 | codec, | 1329 | 448 | proto::Config { | 1330 | 448 | next_stream_id: builder.stream_id, | 1331 | 448 | initial_max_send_streams: builder.initial_max_send_streams, | 1332 | 448 | max_send_buffer_size: builder.max_send_buffer_size, | 1333 | 448 | reset_stream_duration: builder.reset_stream_duration, | 1334 | 448 | reset_stream_max: builder.reset_stream_max, | 1335 | 448 | remote_reset_stream_max: builder.pending_accept_reset_stream_max, | 1336 | 448 | local_error_reset_streams_max: builder.local_max_error_reset_streams, | 1337 | 448 | settings: builder.settings.clone(), | 1338 | 448 | }, | 1339 | 448 | ); | 1340 | 448 | let send_request = SendRequest { | 1341 | 448 | inner: inner.streams().clone(), | 1342 | 448 | pending: None, | 1343 | 448 | }; | 1344 | 448 | | 1345 | 448 | let mut connection = Connection { inner }; | 1346 | 448 | if let Some(sz) = builder.initial_target_connection_window_size { | 1347 | 0 | connection.set_target_window_size(sz); | 1348 | 448 | } | 1349 | | | 1350 | 448 | 
Ok((send_request, connection)) | 1351 | 448 | } |
<h2::client::Connection<fuzz_e2e::MockIo>>::handshake2::{closure#0} Line | Count | Source | 1308 | 11.6k | ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> { | 1309 | 57.1k | bind_connection(&mut io).await?; | 1310 | | | 1311 | | // Create the codec | 1312 | 11.6k | let mut codec = Codec::new(io); | 1313 | | | 1314 | 11.6k | if let Some(max) = builder.settings.max_frame_size() { | 1315 | 0 | codec.set_max_recv_frame_size(max as usize); | 1316 | 11.6k | } | 1317 | | | 1318 | 11.6k | if let Some(max) = builder.settings.max_header_list_size() { | 1319 | 0 | codec.set_max_recv_header_list_size(max as usize); | 1320 | 11.6k | } | 1321 | | | 1322 | | // Send initial settings frame | 1323 | 11.6k | codec | 1324 | 11.6k | .buffer(builder.settings.clone().into()) | 1325 | 11.6k | .expect("invalid SETTINGS frame"); | 1326 | 11.6k | | 1327 | 11.6k | let inner = proto::Connection::new( | 1328 | 11.6k | codec, | 1329 | 11.6k | proto::Config { | 1330 | 11.6k | next_stream_id: builder.stream_id, | 1331 | 11.6k | initial_max_send_streams: builder.initial_max_send_streams, | 1332 | 11.6k | max_send_buffer_size: builder.max_send_buffer_size, | 1333 | 11.6k | reset_stream_duration: builder.reset_stream_duration, | 1334 | 11.6k | reset_stream_max: builder.reset_stream_max, | 1335 | 11.6k | remote_reset_stream_max: builder.pending_accept_reset_stream_max, | 1336 | 11.6k | local_error_reset_streams_max: builder.local_max_error_reset_streams, | 1337 | 11.6k | settings: builder.settings.clone(), | 1338 | 11.6k | }, | 1339 | 11.6k | ); | 1340 | 11.6k | let send_request = SendRequest { | 1341 | 11.6k | inner: inner.streams().clone(), | 1342 | 11.6k | pending: None, | 1343 | 11.6k | }; | 1344 | 11.6k | | 1345 | 11.6k | let mut connection = Connection { inner }; | 1346 | 11.6k | if let Some(sz) = builder.initial_target_connection_window_size { | 1347 | 0 | connection.set_target_window_size(sz); | 1348 | 11.6k | } | 1349 | | | 1350 | 11.6k | Ok((send_request, connection)) | 1351 | 
11.6k | } |
|
1352 | | |
1353 | | /// Sets the target window size for the whole connection. |
1354 | | /// |
1355 | | /// If `size` is greater than the current value, then a `WINDOW_UPDATE` |
1356 | | /// frame will be immediately sent to the remote, increasing the connection |
1357 | | /// level window by `size - current_value`. |
1358 | | /// |
1359 | | /// If `size` is less than the current value, nothing will happen |
1360 | | /// immediately. However, as window capacity is released by |
1361 | | /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent |
1362 | | /// out until the number of "in flight" bytes drops below `size`. |
1363 | | /// |
1364 | | /// The default value is 65,535. |
1365 | | /// |
1366 | | /// See [`FlowControl`] documentation for more details. |
1367 | | /// |
1368 | | /// [`FlowControl`]: ../struct.FlowControl.html |
1369 | | /// [library level]: ../index.html#flow-control |
1370 | 0 | pub fn set_target_window_size(&mut self, size: u32) { |
1371 | 0 | assert!(size <= proto::MAX_WINDOW_SIZE); |
1372 | 0 | self.inner.set_target_window_size(size); |
1373 | 0 | } Unexecuted instantiation: <h2::client::Connection<_, _>>::set_target_window_size Unexecuted instantiation: <h2::client::Connection<h2_support::mock::Mock>>::set_target_window_size Unexecuted instantiation: <h2::client::Connection<fuzz_e2e::MockIo>>::set_target_window_size |
1374 | | |
1375 | | /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level |
1376 | | /// flow control for received data. |
1377 | | /// |
1378 | | /// The `SETTINGS` will be sent to the remote, and only applied once the |
1379 | | /// remote acknowledges the change. |
1380 | | /// |
1381 | | /// This can be used to increase or decrease the window size for existing |
1382 | | /// streams. |
1383 | | /// |
1384 | | /// # Errors |
1385 | | /// |
1386 | | /// Returns an error if a previous call is still pending acknowledgement |
1387 | | /// from the remote endpoint. |
1388 | 0 | pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { |
1389 | 0 | assert!(size <= proto::MAX_WINDOW_SIZE); |
1390 | 0 | self.inner.set_initial_window_size(size)?; |
1391 | 0 | Ok(()) |
1392 | 0 | } |
1393 | | |
1394 | | /// Takes a `PingPong` instance from the connection. |
1395 | | /// |
1396 | | /// # Note |
1397 | | /// |
1398 | | /// This may only be called once. Calling multiple times will return `None`. |
1399 | 0 | pub fn ping_pong(&mut self) -> Option<PingPong> { |
1400 | 0 | self.inner.take_user_pings().map(PingPong::new) |
1401 | 0 | } |
1402 | | |
1403 | | /// Returns the maximum number of concurrent streams that may be initiated |
1404 | | /// by this client. |
1405 | | /// |
1406 | | /// This limit is configured by the server peer by sending the |
1407 | | /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame. |
1408 | | /// This method returns the currently acknowledged value received from the |
1409 | | /// remote. |
1410 | | /// |
1411 | | /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 |
1412 | 0 | pub fn max_concurrent_send_streams(&self) -> usize { |
1413 | 0 | self.inner.max_send_streams() |
1414 | 0 | } |
1415 | | /// Returns the maximum number of concurrent streams that may be initiated |
1416 | | /// by the server on this connection. |
1417 | | /// |
1418 | | /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS` |
1419 | | /// parameter][1] sent in a `SETTINGS` frame that has been |
1420 | | /// acknowledged by the remote peer. The value to be sent is configured by |
1421 | | /// the [`Builder::max_concurrent_streams`][2] method before handshaking |
1422 | | /// with the remote peer. |
1423 | | /// |
1424 | | /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 |
1425 | | /// [2]: ../struct.Builder.html#method.max_concurrent_streams |
1426 | 0 | pub fn max_concurrent_recv_streams(&self) -> usize { |
1427 | 0 | self.inner.max_recv_streams() |
1428 | 0 | } |
1429 | | } |
1430 | | |
1431 | | impl<T, B> Future for Connection<T, B> |
1432 | | where |
1433 | | T: AsyncRead + AsyncWrite + Unpin, |
1434 | | B: Buf, |
1435 | | { |
1436 | | type Output = Result<(), crate::Error>; |
1437 | | |
1438 | 1.02M | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1439 | 1.02M | self.inner.maybe_close_connection_if_no_streams(); |
1440 | 1.02M | let had_streams_or_refs = self.inner.has_streams_or_other_references(); |
1441 | 1.02M | let result = self.inner.poll(cx).map_err(Into::into); |
1442 | 1.02M | // if we had streams/refs, and don't anymore, wake up one more time to |
1443 | 1.02M | // ensure proper shutdown |
1444 | 1.02M | if result.is_pending() |
1445 | 1.01M | && had_streams_or_refs |
1446 | 1.01M | && !self.inner.has_streams_or_other_references() |
1447 | | { |
1448 | 0 | tracing::trace!("last stream closed during poll, wake again"); |
1449 | 0 | cx.waker().wake_by_ref(); |
1450 | 1.02M | } |
1451 | 1.02M | result |
1452 | 1.02M | } Unexecuted instantiation: <h2::client::Connection<_, _> as core::future::future::Future>::poll <h2::client::Connection<fuzz_e2e::MockIo> as core::future::future::Future>::poll Line | Count | Source | 1438 | 1.02M | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | 1439 | 1.02M | self.inner.maybe_close_connection_if_no_streams(); | 1440 | 1.02M | let had_streams_or_refs = self.inner.has_streams_or_other_references(); | 1441 | 1.02M | let result = self.inner.poll(cx).map_err(Into::into); | 1442 | 1.02M | // if we had streams/refs, and don't anymore, wake up one more time to | 1443 | 1.02M | // ensure proper shutdown | 1444 | 1.02M | if result.is_pending() | 1445 | 1.01M | && had_streams_or_refs | 1446 | 1.01M | && !self.inner.has_streams_or_other_references() | 1447 | | { | 1448 | 0 | tracing::trace!("last stream closed during poll, wake again"); | 1449 | 0 | cx.waker().wake_by_ref(); | 1450 | 1.02M | } | 1451 | 1.02M | result | 1452 | 1.02M | } |
|
1453 | | } |
1454 | | |
1455 | | impl<T, B> fmt::Debug for Connection<T, B> |
1456 | | where |
1457 | | T: AsyncRead + AsyncWrite, |
1458 | | T: fmt::Debug, |
1459 | | B: fmt::Debug + Buf, |
1460 | | { |
1461 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
1462 | 0 | fmt::Debug::fmt(&self.inner, fmt) |
1463 | 0 | } |
1464 | | } |
1465 | | |
1466 | | // ===== impl ResponseFuture ===== |
1467 | | |
1468 | | impl Future for ResponseFuture { |
1469 | | type Output = Result<Response<RecvStream>, crate::Error>; |
1470 | | |
1471 | 441k | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1472 | 441k | let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts(); |
1473 | 689 | let body = RecvStream::new(FlowControl::new(self.inner.clone())); |
1474 | 689 | |
1475 | 689 | Poll::Ready(Ok(Response::from_parts(parts, body))) |
1476 | 441k | } |
1477 | | } |
1478 | | |
1479 | | impl ResponseFuture { |
1480 | | /// Returns the stream ID of the response stream. |
1481 | | /// |
1482 | | /// # Panics |
1483 | | /// |
1484 | | /// If the lock on the stream store has been poisoned. |
1485 | 0 | pub fn stream_id(&self) -> crate::StreamId { |
1486 | 0 | crate::StreamId::from_internal(self.inner.stream_id()) |
1487 | 0 | } |
1488 | | /// Returns a stream of PushPromises |
1489 | | /// |
1490 | | /// # Panics |
1491 | | /// |
1492 | | /// If this method has been called before |
1493 | | /// or the stream was itself was pushed |
1494 | 0 | pub fn push_promises(&mut self) -> PushPromises { |
1495 | 0 | if self.push_promise_consumed { |
1496 | 0 | panic!("Reference to push promises stream taken!"); |
1497 | 0 | } |
1498 | 0 | self.push_promise_consumed = true; |
1499 | 0 | PushPromises { |
1500 | 0 | inner: self.inner.clone(), |
1501 | 0 | } |
1502 | 0 | } |
1503 | | } |
1504 | | |
1505 | | // ===== impl PushPromises ===== |
1506 | | |
1507 | | impl PushPromises { |
1508 | | /// Get the next `PushPromise`. |
1509 | 0 | pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> { |
1510 | 0 | crate::poll_fn(move |cx| self.poll_push_promise(cx)).await |
1511 | 0 | } |
1512 | | |
1513 | | #[doc(hidden)] |
1514 | 0 | pub fn poll_push_promise( |
1515 | 0 | &mut self, |
1516 | 0 | cx: &mut Context<'_>, |
1517 | 0 | ) -> Poll<Option<Result<PushPromise, crate::Error>>> { |
1518 | 0 | match self.inner.poll_pushed(cx) { |
1519 | 0 | Poll::Ready(Some(Ok((request, response)))) => { |
1520 | 0 | let response = PushedResponseFuture { |
1521 | 0 | inner: ResponseFuture { |
1522 | 0 | inner: response, |
1523 | 0 | push_promise_consumed: false, |
1524 | 0 | }, |
1525 | 0 | }; |
1526 | 0 | Poll::Ready(Some(Ok(PushPromise { request, response }))) |
1527 | | } |
1528 | 0 | Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))), |
1529 | 0 | Poll::Ready(None) => Poll::Ready(None), |
1530 | 0 | Poll::Pending => Poll::Pending, |
1531 | | } |
1532 | 0 | } |
1533 | | } |
1534 | | |
1535 | | #[cfg(feature = "stream")] |
1536 | | impl futures_core::Stream for PushPromises { |
1537 | | type Item = Result<PushPromise, crate::Error>; |
1538 | | |
1539 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1540 | 0 | self.poll_push_promise(cx) |
1541 | 0 | } |
1542 | | } |
1543 | | |
1544 | | // ===== impl PushPromise ===== |
1545 | | |
1546 | | impl PushPromise { |
1547 | | /// Returns a reference to the push promise's request headers. |
1548 | 0 | pub fn request(&self) -> &Request<()> { |
1549 | 0 | &self.request |
1550 | 0 | } |
1551 | | |
1552 | | /// Returns a mutable reference to the push promise's request headers. |
1553 | 0 | pub fn request_mut(&mut self) -> &mut Request<()> { |
1554 | 0 | &mut self.request |
1555 | 0 | } |
1556 | | |
1557 | | /// Consumes `self`, returning the push promise's request headers and |
1558 | | /// response future. |
1559 | 0 | pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) { |
1560 | 0 | (self.request, self.response) |
1561 | 0 | } |
1562 | | } |
1563 | | |
1564 | | // ===== impl PushedResponseFuture ===== |
1565 | | |
1566 | | impl Future for PushedResponseFuture { |
1567 | | type Output = Result<Response<RecvStream>, crate::Error>; |
1568 | | |
1569 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1570 | 0 | Pin::new(&mut self.inner).poll(cx) |
1571 | 0 | } |
1572 | | } |
1573 | | |
1574 | | impl PushedResponseFuture { |
1575 | | /// Returns the stream ID of the response stream. |
1576 | | /// |
1577 | | /// # Panics |
1578 | | /// |
1579 | | /// If the lock on the stream store has been poisoned. |
1580 | 0 | pub fn stream_id(&self) -> crate::StreamId { |
1581 | 0 | self.inner.stream_id() |
1582 | 0 | } |
1583 | | } |
1584 | | |
1585 | | // ===== impl Peer ===== |
1586 | | |
1587 | | impl Peer { |
1588 | 414k | pub fn convert_send_message( |
1589 | 414k | id: StreamId, |
1590 | 414k | request: Request<()>, |
1591 | 414k | protocol: Option<Protocol>, |
1592 | 414k | end_of_stream: bool, |
1593 | 414k | ) -> Result<Headers, SendError> { |
1594 | | use http::request::Parts; |
1595 | | |
1596 | | let ( |
1597 | | Parts { |
1598 | 414k | method, |
1599 | 414k | uri, |
1600 | 414k | headers, |
1601 | 414k | version, |
1602 | 414k | .. |
1603 | 414k | }, |
1604 | 414k | _, |
1605 | 414k | ) = request.into_parts(); |
1606 | 414k | |
1607 | 414k | let is_connect = method == Method::CONNECT; |
1608 | 414k | |
1609 | 414k | // Build the set pseudo header set. All requests will include `method` |
1610 | 414k | // and `path`. |
1611 | 414k | let mut pseudo = Pseudo::request(method, uri, protocol); |
1612 | 414k | |
1613 | 414k | if pseudo.scheme.is_none() { |
1614 | | // If the scheme is not set, then there are a two options. |
1615 | | // |
1616 | | // 1) Authority is not set. In this case, a request was issued with |
1617 | | // a relative URI. This is permitted **only** when forwarding |
1618 | | // HTTP 1.x requests. If the HTTP version is set to 2.0, then |
1619 | | // this is an error. |
1620 | | // |
1621 | | // 2) Authority is set, then the HTTP method *must* be CONNECT. |
1622 | | // |
1623 | | // It is not possible to have a scheme but not an authority set (the |
1624 | | // `http` crate does not allow it). |
1625 | | // |
1626 | 400 | if pseudo.authority.is_none() { |
1627 | 74 | if version == Version::HTTP_2 { |
1628 | 0 | return Err(UserError::MissingUriSchemeAndAuthority.into()); |
1629 | 74 | } else { |
1630 | 74 | // This is acceptable as per the above comment. However, |
1631 | 74 | // HTTP/2 requires that a scheme is set. Since we are |
1632 | 74 | // forwarding an HTTP 1.1 request, the scheme is set to |
1633 | 74 | // "http". |
1634 | 74 | pseudo.set_scheme(uri::Scheme::HTTP); |
1635 | 74 | } |
1636 | 326 | } else if !is_connect { |
1637 | 326 | // TODO: Error |
1638 | 326 | } |
1639 | 413k | } |
1640 | | |
1641 | | // Create the HEADERS frame |
1642 | 414k | let mut frame = Headers::new(id, pseudo, headers); |
1643 | 414k | |
1644 | 414k | if end_of_stream { |
1645 | 448 | frame.set_end_stream() |
1646 | 413k | } |
1647 | | |
1648 | 414k | Ok(frame) |
1649 | 414k | } |
1650 | | } |
1651 | | |
1652 | | impl proto::Peer for Peer { |
1653 | | type Poll = Response<()>; |
1654 | | |
1655 | | const NAME: &'static str = "Client"; |
1656 | | |
1657 | 1.24M | fn r#dyn() -> proto::DynPeer { |
1658 | 1.24M | proto::DynPeer::Client |
1659 | 1.24M | } |
1660 | | |
1661 | | /* |
1662 | | fn is_server() -> bool { |
1663 | | false |
1664 | | } |
1665 | | */ |
1666 | | |
1667 | 1.23k | fn convert_poll_message( |
1668 | 1.23k | pseudo: Pseudo, |
1669 | 1.23k | fields: HeaderMap, |
1670 | 1.23k | stream_id: StreamId, |
1671 | 1.23k | ) -> Result<Self::Poll, Error> { |
1672 | 1.23k | let mut b = Response::builder(); |
1673 | 1.23k | |
1674 | 1.23k | b = b.version(Version::HTTP_2); |
1675 | | |
1676 | 1.23k | if let Some(status) = pseudo.status { |
1677 | 25 | b = b.status(status); |
1678 | 1.20k | } |
1679 | | |
1680 | 1.23k | let mut response = match b.body(()) { |
1681 | 1.23k | Ok(response) => response, |
1682 | | Err(_) => { |
1683 | | // TODO: Should there be more specialized handling for different |
1684 | | // kinds of errors |
1685 | 0 | return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); |
1686 | | } |
1687 | | }; |
1688 | | |
1689 | 1.23k | *response.headers_mut() = fields; |
1690 | 1.23k | |
1691 | 1.23k | Ok(response) |
1692 | 1.23k | } |
1693 | | } |