Line | Count | Source (jump to first uncovered line) |
1 | | use crate::codec::UserError; |
2 | | use crate::frame::Reason; |
3 | | use crate::proto::{self, WindowSize}; |
4 | | |
5 | | use bytes::{Buf, Bytes}; |
6 | | use http::HeaderMap; |
7 | | |
8 | | use std::fmt; |
9 | | #[cfg(feature = "stream")] |
10 | | use std::pin::Pin; |
11 | | use std::task::{Context, Poll}; |
12 | | |
13 | | /// Sends the body stream and trailers to the remote peer. |
14 | | /// |
15 | | /// # Overview |
16 | | /// |
17 | | /// A `SendStream` is provided by [`SendRequest`] and [`SendResponse`] once the |
18 | | /// HTTP/2 message header has been sent sent. It is used to stream the message |
19 | | /// body and send the message trailers. See method level documentation for more |
20 | | /// details. |
21 | | /// |
22 | | /// The `SendStream` instance is also used to manage outbound flow control. |
23 | | /// |
24 | | /// If a `SendStream` is dropped without explicitly closing the send stream, a |
25 | | /// `RST_STREAM` frame will be sent. This essentially cancels the request / |
26 | | /// response exchange. |
27 | | /// |
28 | | /// The ways to explicitly close the send stream are: |
29 | | /// |
30 | | /// * Set `end_of_stream` to true when calling [`send_request`], |
31 | | /// [`send_response`], or [`send_data`]. |
32 | | /// * Send trailers with [`send_trailers`]. |
33 | | /// * Explicitly reset the stream with [`send_reset`]. |
34 | | /// |
35 | | /// # Flow control |
36 | | /// |
37 | | /// In HTTP/2, data cannot be sent to the remote peer unless there is |
38 | | /// available window capacity on both the stream and the connection. When a data |
39 | | /// frame is sent, both the stream window and the connection window are |
40 | | /// decremented. When the stream level window reaches zero, no further data can |
41 | | /// be sent on that stream. When the connection level window reaches zero, no |
42 | | /// further data can be sent on any stream for that connection. |
43 | | /// |
44 | | /// When the remote peer is ready to receive more data, it sends `WINDOW_UPDATE` |
45 | | /// frames. These frames increment the windows. See the [specification] for more |
46 | | /// details on the principles of HTTP/2 flow control. |
47 | | /// |
48 | | /// The implications for sending data are that the caller **should** ensure that |
49 | | /// both the stream and the connection has available window capacity before |
50 | | /// loading the data to send into memory. The `SendStream` instance provides the |
51 | | /// necessary APIs to perform this logic. This, however, is not an obligation. |
52 | | /// If the caller attempts to send data on a stream when there is no available |
53 | | /// window capacity, the library will buffer the data until capacity becomes |
54 | | /// available, at which point the buffer will be flushed to the connection. |
55 | | /// |
56 | | /// **NOTE**: There is no bound on the amount of data that the library will |
57 | | /// buffer. If you are sending large amounts of data, you really should hook |
58 | | /// into the flow control lifecycle. Otherwise, you risk using up significant |
59 | | /// amounts of memory. |
60 | | /// |
61 | | /// To hook into the flow control lifecycle, the caller signals to the library |
62 | | /// that it intends to send data by calling [`reserve_capacity`], specifying the |
63 | | /// amount of data, in octets, that the caller intends to send. After this, |
64 | | /// `poll_capacity` is used to be notified when the requested capacity is |
65 | | /// assigned to the stream. Once [`poll_capacity`] returns `Ready` with the number |
66 | | /// of octets available to the stream, the caller is able to actually send the |
67 | | /// data using [`send_data`]. |
68 | | /// |
69 | | /// Because there is also a connection level window that applies to **all** |
70 | | /// streams on a connection, when capacity is assigned to a stream (indicated by |
71 | | /// `poll_capacity` returning `Ready`), this capacity is reserved on the |
72 | | /// connection and will **not** be assigned to any other stream. If data is |
73 | | /// never written to the stream, that capacity is effectively lost to other |
74 | | /// streams and this introduces the risk of deadlocking a connection. |
75 | | /// |
76 | | /// To avoid throttling data on a connection, the caller should not reserve |
77 | | /// capacity until ready to send data and once any capacity is assigned to the |
78 | | /// stream, the caller should immediately send data consuming this capacity. |
79 | | /// There is no guarantee as to when the full capacity requested will become |
80 | | /// available. For example, if the caller requests 64 KB of data and 512 bytes |
81 | | /// become available, the caller should immediately send 512 bytes of data. |
82 | | /// |
83 | | /// See [`reserve_capacity`] documentation for more details. |
84 | | /// |
85 | | /// [`SendRequest`]: client/struct.SendRequest.html |
86 | | /// [`SendResponse`]: server/struct.SendResponse.html |
87 | | /// [specification]: http://httpwg.org/specs/rfc7540.html#FlowControl |
88 | | /// [`reserve_capacity`]: #method.reserve_capacity |
89 | | /// [`poll_capacity`]: #method.poll_capacity |
90 | | /// [`send_data`]: #method.send_data |
91 | | /// [`send_request`]: client/struct.SendRequest.html#method.send_request |
92 | | /// [`send_response`]: server/struct.SendResponse.html#method.send_response |
93 | | /// [`send_data`]: #method.send_data |
94 | | /// [`send_trailers`]: #method.send_trailers |
95 | | /// [`send_reset`]: #method.send_reset |
96 | | #[derive(Debug)] |
97 | | pub struct SendStream<B> { |
98 | | inner: proto::StreamRef<B>, |
99 | | } |
100 | | |
101 | | /// A stream identifier, as described in [Section 5.1.1] of RFC 7540. |
102 | | /// |
103 | | /// Streams are identified with an unsigned 31-bit integer. Streams |
104 | | /// initiated by a client MUST use odd-numbered stream identifiers; those |
105 | | /// initiated by the server MUST use even-numbered stream identifiers. A |
106 | | /// stream identifier of zero (0x0) is used for connection control |
107 | | /// messages; the stream identifier of zero cannot be used to establish a |
108 | | /// new stream. |
109 | | /// |
110 | | /// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1 |
111 | | #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] |
112 | | pub struct StreamId(u32); |
113 | | |
114 | | impl From<StreamId> for u32 { |
115 | 0 | fn from(src: StreamId) -> Self { |
116 | 0 | src.0 |
117 | 0 | } |
118 | | } |
119 | | |
120 | | /// Receives the body stream and trailers from the remote peer. |
121 | | /// |
122 | | /// A `RecvStream` is provided by [`client::ResponseFuture`] and |
123 | | /// [`server::Connection`] with the received HTTP/2 message head (the response |
124 | | /// and request head respectively). |
125 | | /// |
126 | | /// A `RecvStream` instance is used to receive the streaming message body and |
127 | | /// any trailers from the remote peer. It is also used to manage inbound flow |
128 | | /// control. |
129 | | /// |
130 | | /// See method level documentation for more details on receiving data. See |
131 | | /// [`FlowControl`] for more details on inbound flow control. |
132 | | /// |
133 | | /// [`client::ResponseFuture`]: client/struct.ResponseFuture.html |
134 | | /// [`server::Connection`]: server/struct.Connection.html |
135 | | /// [`FlowControl`]: struct.FlowControl.html |
136 | | /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html |
137 | | #[must_use = "streams do nothing unless polled"] |
138 | | pub struct RecvStream { |
139 | | inner: FlowControl, |
140 | | } |
141 | | |
142 | | /// A handle to release window capacity to a remote stream. |
143 | | /// |
144 | | /// This type allows the caller to manage inbound data [flow control]. The |
145 | | /// caller is expected to call [`release_capacity`] after dropping data frames. |
146 | | /// |
147 | | /// # Overview |
148 | | /// |
149 | | /// Each stream has a window size. This window size is the maximum amount of |
150 | | /// inbound data that can be in-flight. In-flight data is defined as data that |
151 | | /// has been received, but not yet released. |
152 | | /// |
153 | | /// When a stream is created, the window size is set to the connection's initial |
154 | | /// window size value. When a data frame is received, the window size is then |
155 | | /// decremented by size of the data frame before the data is provided to the |
156 | | /// caller. As the caller finishes using the data, [`release_capacity`] must be |
157 | | /// called. This will then increment the window size again, allowing the peer to |
158 | | /// send more data. |
159 | | /// |
160 | | /// There is also a connection level window as well as the stream level window. |
161 | | /// Received data counts against the connection level window as well and calls |
162 | | /// to [`release_capacity`] will also increment the connection level window. |
163 | | /// |
164 | | /// # Sending `WINDOW_UPDATE` frames |
165 | | /// |
166 | | /// `WINDOW_UPDATE` frames will not be sent out for **every** call to |
167 | | /// `release_capacity`, as this would end up slowing down the protocol. Instead, |
168 | | /// `h2` waits until the window size is increased to a certain threshold and |
169 | | /// then sends out a single `WINDOW_UPDATE` frame representing all the calls to |
170 | | /// `release_capacity` since the last `WINDOW_UPDATE` frame. |
171 | | /// |
172 | | /// This essentially batches window updating. |
173 | | /// |
174 | | /// # Scenarios |
175 | | /// |
176 | | /// Following is a basic scenario with an HTTP/2 connection containing a |
177 | | /// single active stream. |
178 | | /// |
179 | | /// * A new stream is activated. The receive window is initialized to 1024 (the |
180 | | /// value of the initial window size for this connection). |
181 | | /// * A `DATA` frame is received containing a payload of 600 bytes. |
182 | | /// * The receive window size is reduced to 424 bytes. |
183 | | /// * [`release_capacity`] is called with 200. |
184 | | /// * The receive window size is now 624 bytes. The peer may send no more than |
185 | | /// this. |
186 | | /// * A `DATA` frame is received with a payload of 624 bytes. |
187 | | /// * The window size is now 0 bytes. The peer may not send any more data. |
188 | | /// * [`release_capacity`] is called with 1024. |
189 | | /// * The receive window size is now 1024 bytes. The peer may now send more |
190 | | /// data. |
191 | | /// |
192 | | /// [flow control]: ../index.html#flow-control |
193 | | /// [`release_capacity`]: struct.FlowControl.html#method.release_capacity |
194 | | #[derive(Clone, Debug)] |
195 | | pub struct FlowControl { |
196 | | inner: proto::OpaqueStreamRef, |
197 | | } |
198 | | |
199 | | /// A handle to send and receive PING frames with the peer. |
200 | | // NOT Clone on purpose |
201 | | pub struct PingPong { |
202 | | inner: proto::UserPings, |
203 | | } |
204 | | |
205 | | /// Sent via [`PingPong`][] to send a PING frame to a peer. |
206 | | /// |
207 | | /// [`PingPong`]: struct.PingPong.html |
208 | | pub struct Ping { |
209 | | _p: (), |
210 | | } |
211 | | |
212 | | /// Received via [`PingPong`][] when a peer acknowledges a [`Ping`][]. |
213 | | /// |
214 | | /// [`PingPong`]: struct.PingPong.html |
215 | | /// [`Ping`]: struct.Ping.html |
216 | | pub struct Pong { |
217 | | _p: (), |
218 | | } |
219 | | |
220 | | // ===== impl SendStream ===== |
221 | | |
222 | | impl<B: Buf> SendStream<B> { |
223 | 426k | pub(crate) fn new(inner: proto::StreamRef<B>) -> Self { |
224 | 426k | SendStream { inner } |
225 | 426k | } Unexecuted instantiation: <h2::share::SendStream<_>>::new <h2::share::SendStream<bytes::bytes::Bytes>>::new Line | Count | Source | 223 | 326 | pub(crate) fn new(inner: proto::StreamRef<B>) -> Self { | 224 | 326 | SendStream { inner } | 225 | 326 | } |
<h2::share::SendStream<bytes::bytes::Bytes>>::new Line | Count | Source | 223 | 425k | pub(crate) fn new(inner: proto::StreamRef<B>) -> Self { | 224 | 425k | SendStream { inner } | 225 | 425k | } |
|
226 | | |
227 | | /// Requests capacity to send data. |
228 | | /// |
229 | | /// This function is used to express intent to send data. This requests |
230 | | /// connection level capacity. Once the capacity is available, it is |
231 | | /// assigned to the stream and not reused by other streams. |
232 | | /// |
233 | | /// This function may be called repeatedly. The `capacity` argument is the |
234 | | /// **total** amount of requested capacity. Sequential calls to |
235 | | /// `reserve_capacity` are *not* additive. Given the following: |
236 | | /// |
237 | | /// ```rust |
238 | | /// # use h2::*; |
239 | | /// # fn doc(mut send_stream: SendStream<&'static [u8]>) { |
240 | | /// send_stream.reserve_capacity(100); |
241 | | /// send_stream.reserve_capacity(200); |
242 | | /// # } |
243 | | /// ``` |
244 | | /// |
245 | | /// After the second call to `reserve_capacity`, the *total* requested |
246 | | /// capacity will be 200. |
247 | | /// |
248 | | /// `reserve_capacity` is also used to cancel previous capacity requests. |
249 | | /// Given the following: |
250 | | /// |
251 | | /// ```rust |
252 | | /// # use h2::*; |
253 | | /// # fn doc(mut send_stream: SendStream<&'static [u8]>) { |
254 | | /// send_stream.reserve_capacity(100); |
255 | | /// send_stream.reserve_capacity(0); |
256 | | /// # } |
257 | | /// ``` |
258 | | /// |
259 | | /// After the second call to `reserve_capacity`, the *total* requested |
260 | | /// capacity will be 0, i.e. there is no requested capacity for the stream. |
261 | | /// |
262 | | /// If `reserve_capacity` is called with a lower value than the amount of |
263 | | /// capacity **currently** assigned to the stream, this capacity will be |
264 | | /// returned to the connection to be re-assigned to other streams. |
265 | | /// |
266 | | /// Also, the amount of capacity that is reserved gets decremented as data |
267 | | /// is sent. For example: |
268 | | /// |
269 | | /// ```rust |
270 | | /// # use h2::*; |
271 | | /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) { |
272 | | /// send_stream.reserve_capacity(100); |
273 | | /// |
274 | | /// send_stream.send_data(b"hello", false).unwrap(); |
275 | | /// // At this point, the total amount of requested capacity is 95 bytes. |
276 | | /// |
277 | | /// // Calling `reserve_capacity` with `100` again essentially requests an |
278 | | /// // additional 5 bytes. |
279 | | /// send_stream.reserve_capacity(100); |
280 | | /// # } |
281 | | /// ``` |
282 | | /// |
283 | | /// See [Flow control](struct.SendStream.html#flow-control) for an overview |
284 | | /// of how send flow control works. |
285 | 0 | pub fn reserve_capacity(&mut self, capacity: usize) { |
286 | 0 | // TODO: Check for overflow |
287 | 0 | self.inner.reserve_capacity(capacity as WindowSize) |
288 | 0 | } |
289 | | |
290 | | /// Returns the stream's current send capacity. |
291 | | /// |
292 | | /// This allows the caller to check the current amount of available capacity |
293 | | /// before sending data. |
294 | 0 | pub fn capacity(&self) -> usize { |
295 | 0 | self.inner.capacity() as usize |
296 | 0 | } Unexecuted instantiation: <h2::share::SendStream<_>>::capacity Unexecuted instantiation: <h2::share::SendStream<bytes::bytes::Bytes>>::capacity |
297 | | |
298 | | /// Requests to be notified when the stream's capacity increases. |
299 | | /// |
300 | | /// Before calling this, capacity should be requested with |
301 | | /// `reserve_capacity`. Once capacity is requested, the connection will |
302 | | /// assign capacity to the stream **as it becomes available**. There is no |
303 | | /// guarantee as to when and in what increments capacity gets assigned to |
304 | | /// the stream. |
305 | | /// |
306 | | /// To get notified when the available capacity increases, the caller calls |
307 | | /// `poll_capacity`, which returns `Ready(Some(n))` when `n` has been |
308 | | /// increased by the connection. Note that `n` here represents the **total** |
309 | | /// amount of assigned capacity at that point in time. It is also possible |
310 | | /// that `n` is lower than the previous call if, since then, the caller has |
311 | | /// sent data. |
312 | 0 | pub fn poll_capacity(&mut self, cx: &mut Context) -> Poll<Option<Result<usize, crate::Error>>> { |
313 | 0 | self.inner |
314 | 0 | .poll_capacity(cx) |
315 | 0 | .map_ok(|w| w as usize) Unexecuted instantiation: <h2::share::SendStream<_>>::poll_capacity::{closure#0} Unexecuted instantiation: <h2::share::SendStream<bytes::bytes::Bytes>>::poll_capacity::{closure#0} |
316 | 0 | .map_err(Into::into) |
317 | 0 | } Unexecuted instantiation: <h2::share::SendStream<_>>::poll_capacity Unexecuted instantiation: <h2::share::SendStream<bytes::bytes::Bytes>>::poll_capacity |
318 | | |
319 | | /// Sends a single data frame to the remote peer. |
320 | | /// |
321 | | /// This function may be called repeatedly as long as `end_of_stream` is set |
322 | | /// to `false`. Setting `end_of_stream` to `true` sets the end stream flag |
323 | | /// on the data frame. Any further calls to `send_data` or `send_trailers` |
324 | | /// will return an [`Error`]. |
325 | | /// |
326 | | /// `send_data` can be called without reserving capacity. In this case, the |
327 | | /// data is buffered and the capacity is implicitly requested. Once the |
328 | | /// capacity becomes available, the data is flushed to the connection. |
329 | | /// However, this buffering is unbounded. As such, sending large amounts of |
330 | | /// data without reserving capacity before hand could result in large |
331 | | /// amounts of data being buffered in memory. |
332 | | /// |
333 | | /// [`Error`]: struct.Error.html |
334 | 425k | pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), crate::Error> { |
335 | 425k | self.inner |
336 | 425k | .send_data(data, end_of_stream) |
337 | 425k | .map_err(Into::into) |
338 | 425k | } Unexecuted instantiation: <h2::share::SendStream<_>>::send_data <h2::share::SendStream<bytes::bytes::Bytes>>::send_data Line | Count | Source | 334 | 425k | pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), crate::Error> { | 335 | 425k | self.inner | 336 | 425k | .send_data(data, end_of_stream) | 337 | 425k | .map_err(Into::into) | 338 | 425k | } |
|
339 | | |
340 | | /// Sends trailers to the remote peer. |
341 | | /// |
342 | | /// Sending trailers implicitly closes the send stream. Once the send stream |
343 | | /// is closed, no more data can be sent. |
344 | 0 | pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), crate::Error> { |
345 | 0 | self.inner.send_trailers(trailers).map_err(Into::into) |
346 | 0 | } |
347 | | |
348 | | /// Resets the stream. |
349 | | /// |
350 | | /// This cancels the request / response exchange. If the response has not |
351 | | /// yet been received, the associated `ResponseFuture` will return an |
352 | | /// [`Error`] to reflect the canceled exchange. |
353 | | /// |
354 | | /// [`Error`]: struct.Error.html |
355 | 0 | pub fn send_reset(&mut self, reason: Reason) { |
356 | 0 | self.inner.send_reset(reason) |
357 | 0 | } |
358 | | |
359 | | /// Polls to be notified when the client resets this stream. |
360 | | /// |
361 | | /// If stream is still open, this returns `Poll::Pending`, and |
362 | | /// registers the task to be notified if a `RST_STREAM` is received. |
363 | | /// |
364 | | /// If a `RST_STREAM` frame is received for this stream, calling this |
365 | | /// method will yield the `Reason` for the reset. |
366 | | /// |
367 | | /// # Error |
368 | | /// |
369 | | /// If connection sees an error, this returns that error instead of a |
370 | | /// `Reason`. |
371 | 0 | pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> { |
372 | 0 | self.inner.poll_reset(cx, proto::PollReset::Streaming) |
373 | 0 | } |
374 | | |
375 | | /// Returns the stream ID of this `SendStream`. |
376 | | /// |
377 | | /// # Panics |
378 | | /// |
379 | | /// If the lock on the stream store has been poisoned. |
380 | 0 | pub fn stream_id(&self) -> StreamId { |
381 | 0 | StreamId::from_internal(self.inner.stream_id()) |
382 | 0 | } |
383 | | } |
384 | | |
385 | | // ===== impl StreamId ===== |
386 | | |
387 | | impl StreamId { |
388 | 0 | pub(crate) fn from_internal(id: crate::frame::StreamId) -> Self { |
389 | 0 | StreamId(id.into()) |
390 | 0 | } |
391 | | |
392 | | /// Returns the `u32` corresponding to this `StreamId` |
393 | | /// |
394 | | /// # Note |
395 | | /// |
396 | | /// This is the same as the `From<StreamId>` implementation, but |
397 | | /// included as an inherent method because that implementation doesn't |
398 | | /// appear in rustdocs, as well as a way to force the type instead of |
399 | | /// relying on inference. |
400 | 0 | pub fn as_u32(&self) -> u32 { |
401 | 0 | (*self).into() |
402 | 0 | } |
403 | | } |
404 | | // ===== impl RecvStream ===== |
405 | | |
406 | | impl RecvStream { |
407 | 745 | pub(crate) fn new(inner: FlowControl) -> Self { |
408 | 745 | RecvStream { inner } |
409 | 745 | } |
410 | | |
411 | | /// Get the next data frame. |
412 | 0 | pub async fn data(&mut self) -> Option<Result<Bytes, crate::Error>> { |
413 | 0 | crate::poll_fn(move |cx| self.poll_data(cx)).await |
414 | 0 | } |
415 | | |
416 | | /// Get optional trailers for this stream. |
417 | 0 | pub async fn trailers(&mut self) -> Result<Option<HeaderMap>, crate::Error> { |
418 | 0 | crate::poll_fn(move |cx| self.poll_trailers(cx)).await |
419 | 0 | } |
420 | | |
421 | | /// Poll for the next data frame. |
422 | 0 | pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, crate::Error>>> { |
423 | 0 | self.inner.inner.poll_data(cx).map_err(Into::into) |
424 | 0 | } |
425 | | |
426 | | #[doc(hidden)] |
427 | 0 | pub fn poll_trailers( |
428 | 0 | &mut self, |
429 | 0 | cx: &mut Context, |
430 | 0 | ) -> Poll<Result<Option<HeaderMap>, crate::Error>> { |
431 | 0 | match ready!(self.inner.inner.poll_trailers(cx)) { |
432 | 0 | Some(Ok(map)) => Poll::Ready(Ok(Some(map))), |
433 | 0 | Some(Err(e)) => Poll::Ready(Err(e.into())), |
434 | 0 | None => Poll::Ready(Ok(None)), |
435 | | } |
436 | 0 | } |
437 | | |
438 | | /// Returns true if the receive half has reached the end of stream. |
439 | | /// |
440 | | /// A return value of `true` means that calls to `poll` and `poll_trailers` |
441 | | /// will both return `None`. |
442 | 0 | pub fn is_end_stream(&self) -> bool { |
443 | 0 | self.inner.inner.is_end_stream() |
444 | 0 | } |
445 | | |
446 | | /// Get a mutable reference to this stream's `FlowControl`. |
447 | | /// |
448 | | /// It can be used immediately, or cloned to be used later. |
449 | 0 | pub fn flow_control(&mut self) -> &mut FlowControl { |
450 | 0 | &mut self.inner |
451 | 0 | } |
452 | | |
453 | | /// Returns the stream ID of this stream. |
454 | | /// |
455 | | /// # Panics |
456 | | /// |
457 | | /// If the lock on the stream store has been poisoned. |
458 | 0 | pub fn stream_id(&self) -> StreamId { |
459 | 0 | self.inner.stream_id() |
460 | 0 | } |
461 | | } |
462 | | |
463 | | #[cfg(feature = "stream")] |
464 | | impl futures_core::Stream for RecvStream { |
465 | | type Item = Result<Bytes, crate::Error>; |
466 | | |
467 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
468 | 0 | self.poll_data(cx) |
469 | 0 | } |
470 | | } |
471 | | |
472 | | impl fmt::Debug for RecvStream { |
473 | 745 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
474 | 745 | fmt.debug_struct("RecvStream") |
475 | 745 | .field("inner", &self.inner) |
476 | 745 | .finish() |
477 | 745 | } |
478 | | } |
479 | | |
480 | | impl Drop for RecvStream { |
481 | 745 | fn drop(&mut self) { |
482 | 745 | // Eagerly clear any received DATA frames now, since its no longer |
483 | 745 | // possible to retrieve them. However, this will be called |
484 | 745 | // again once *all* stream refs have been dropped, since |
485 | 745 | // this won't send a RST_STREAM frame, in case the user wishes to |
486 | 745 | // still *send* DATA. |
487 | 745 | self.inner.inner.clear_recv_buffer(); |
488 | 745 | } |
489 | | } |
490 | | |
491 | | // ===== impl FlowControl ===== |
492 | | |
493 | | impl FlowControl { |
494 | 745 | pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self { |
495 | 745 | FlowControl { inner } |
496 | 745 | } |
497 | | |
498 | | /// Returns the stream ID of the stream whose capacity will |
499 | | /// be released by this `FlowControl`. |
500 | 0 | pub fn stream_id(&self) -> StreamId { |
501 | 0 | StreamId::from_internal(self.inner.stream_id()) |
502 | 0 | } |
503 | | |
504 | | /// Get the current available capacity of data this stream *could* receive. |
505 | 0 | pub fn available_capacity(&self) -> isize { |
506 | 0 | self.inner.available_recv_capacity() |
507 | 0 | } |
508 | | |
509 | | /// Get the currently *used* capacity for this stream. |
510 | | /// |
511 | | /// This is the amount of bytes that can be released back to the remote. |
512 | 0 | pub fn used_capacity(&self) -> usize { |
513 | 0 | self.inner.used_recv_capacity() as usize |
514 | 0 | } |
515 | | |
516 | | /// Release window capacity back to remote stream. |
517 | | /// |
518 | | /// This releases capacity back to the stream level and the connection level |
519 | | /// windows. Both window sizes will be increased by `sz`. |
520 | | /// |
521 | | /// See [struct level] documentation for more details. |
522 | | /// |
523 | | /// # Errors |
524 | | /// |
525 | | /// This function errors if increasing the receive window size by `sz` would |
526 | | /// result in a window size greater than the target window size. In other |
527 | | /// words, the caller cannot release more capacity than data has been |
528 | | /// received. If 1024 bytes of data have been received, at most 1024 bytes |
529 | | /// can be released. |
530 | | /// |
531 | | /// [struct level]: # |
532 | 0 | pub fn release_capacity(&mut self, sz: usize) -> Result<(), crate::Error> { |
533 | 0 | if sz > proto::MAX_WINDOW_SIZE as usize { |
534 | 0 | return Err(UserError::ReleaseCapacityTooBig.into()); |
535 | 0 | } |
536 | 0 | self.inner |
537 | 0 | .release_capacity(sz as proto::WindowSize) |
538 | 0 | .map_err(Into::into) |
539 | 0 | } |
540 | | } |
541 | | |
542 | | // ===== impl PingPong ===== |
543 | | |
544 | | impl PingPong { |
545 | 0 | pub(crate) fn new(inner: proto::UserPings) -> Self { |
546 | 0 | PingPong { inner } |
547 | 0 | } |
548 | | |
549 | | /// Send a PING frame and wait for the peer to send the pong. |
550 | 0 | pub async fn ping(&mut self, ping: Ping) -> Result<Pong, crate::Error> { |
551 | 0 | self.send_ping(ping)?; |
552 | 0 | crate::poll_fn(|cx| self.poll_pong(cx)).await |
553 | 0 | } |
554 | | |
555 | | #[doc(hidden)] |
556 | 0 | pub fn send_ping(&mut self, ping: Ping) -> Result<(), crate::Error> { |
557 | 0 | // Passing a `Ping` here is just to be forwards-compatible with |
558 | 0 | // eventually allowing choosing a ping payload. For now, we can |
559 | 0 | // just ignore it. |
560 | 0 | let _ = ping; |
561 | 0 |
|
562 | 0 | self.inner.send_ping().map_err(|err| match err { |
563 | 0 | Some(err) => err.into(), |
564 | 0 | None => UserError::SendPingWhilePending.into(), |
565 | 0 | }) |
566 | 0 | } |
567 | | |
568 | | #[doc(hidden)] |
569 | 0 | pub fn poll_pong(&mut self, cx: &mut Context) -> Poll<Result<Pong, crate::Error>> { |
570 | 0 | ready!(self.inner.poll_pong(cx))?; |
571 | 0 | Poll::Ready(Ok(Pong { _p: () })) |
572 | 0 | } |
573 | | } |
574 | | |
575 | | impl fmt::Debug for PingPong { |
576 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
577 | 0 | fmt.debug_struct("PingPong").finish() |
578 | 0 | } |
579 | | } |
580 | | |
581 | | // ===== impl Ping ===== |
582 | | |
583 | | impl Ping { |
584 | | /// Creates a new opaque `Ping` to be sent via a [`PingPong`][]. |
585 | | /// |
586 | | /// The payload is "opaque", such that it shouldn't be depended on. |
587 | | /// |
588 | | /// [`PingPong`]: struct.PingPong.html |
589 | 0 | pub fn opaque() -> Ping { |
590 | 0 | Ping { _p: () } |
591 | 0 | } |
592 | | } |
593 | | |
594 | | impl fmt::Debug for Ping { |
595 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
596 | 0 | fmt.debug_struct("Ping").finish() |
597 | 0 | } |
598 | | } |
599 | | |
600 | | // ===== impl Pong ===== |
601 | | |
602 | | impl fmt::Debug for Pong { |
603 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
604 | 0 | fmt.debug_struct("Pong").finish() |
605 | 0 | } |
606 | | } |