Coverage Report

Created: 2026-01-17 07:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/streams.rs
Line
Count
Source
1
use super::recv::RecvHeaderBlockError;
2
use super::store::{self, Entry, Resolve, Store};
3
use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
4
use crate::codec::{Codec, SendError, UserError};
5
use crate::ext::Protocol;
6
use crate::frame::{self, Frame, Reason};
7
use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
8
use crate::{client, proto, server};
9
10
use bytes::{Buf, Bytes};
11
use http::{HeaderMap, Request, Response};
12
use std::task::{Context, Poll, Waker};
13
use tokio::io::AsyncWrite;
14
15
use std::sync::{Arc, Mutex};
16
use std::{fmt, io};
17
18
#[derive(Debug)]
19
pub(crate) struct Streams<B, P>
20
where
21
    P: Peer,
22
{
23
    /// Holds most of the connection and stream related state for processing
24
    /// HTTP/2 frames associated with streams.
25
    inner: Arc<Mutex<Inner>>,
26
27
    /// This is the queue of frames to be written to the wire. This is split out
28
    /// to avoid requiring a `B` generic on all public API types even if `B` is
29
    /// not technically required.
30
    ///
31
    /// Currently, splitting this out requires a second `Arc` + `Mutex`.
32
    /// However, it should be possible to avoid this duplication with a little
33
    /// bit of unsafe code. This optimization has been postponed until it has
34
    /// been shown to be necessary.
35
    send_buffer: Arc<SendBuffer<B>>,
36
37
    _p: ::std::marker::PhantomData<P>,
38
}
39
40
// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter.
41
// Ensures that the methods only get one instantiation, instead of two (client and server)
42
#[derive(Debug)]
43
pub(crate) struct DynStreams<'a, B> {
44
    inner: &'a Mutex<Inner>,
45
46
    send_buffer: &'a SendBuffer<B>,
47
48
    peer: peer::Dyn,
49
}
50
51
/// Reference to the stream state
52
#[derive(Debug)]
53
pub(crate) struct StreamRef<B> {
54
    opaque: OpaqueStreamRef,
55
    send_buffer: Arc<SendBuffer<B>>,
56
}
57
58
/// Reference to the stream state that hides the send data chunk generic
59
pub(crate) struct OpaqueStreamRef {
60
    inner: Arc<Mutex<Inner>>,
61
    key: store::Key,
62
}
63
64
/// Fields needed to manage state related to managing the set of streams. This
65
/// is mostly split out to make ownership happy.
66
///
67
/// TODO: better name
68
#[derive(Debug)]
69
struct Inner {
70
    /// Tracks send & recv stream concurrency.
71
    counts: Counts,
72
73
    /// Connection level state and performs actions on streams
74
    actions: Actions,
75
76
    /// Stores stream state
77
    store: Store,
78
79
    /// The number of stream refs to this shared state.
80
    refs: usize,
81
}
82
83
#[derive(Debug)]
84
struct Actions {
85
    /// Manages state transitions initiated by receiving frames
86
    recv: Recv,
87
88
    /// Manages state transitions initiated by sending frames
89
    send: Send,
90
91
    /// Task that calls `poll_complete`.
92
    task: Option<Waker>,
93
94
    /// If the connection errors, a copy is kept for any StreamRefs.
95
    conn_error: Option<proto::Error>,
96
}
97
98
/// Contains the buffer of frames to be written to the wire.
99
#[derive(Debug)]
100
struct SendBuffer<B> {
101
    inner: Mutex<Buffer<Frame<B>>>,
102
}
103
104
// ===== impl Streams =====
105
106
impl<B, P> Streams<B, P>
107
where
108
    B: Buf,
109
    P: Peer,
110
{
111
13.3k
    pub fn new(config: Config) -> Self {
112
13.3k
        let peer = P::r#dyn();
113
114
13.3k
        Streams {
115
13.3k
            inner: Inner::new(peer, config),
116
13.3k
            send_buffer: Arc::new(SendBuffer::new()),
117
13.3k
            _p: ::std::marker::PhantomData,
118
13.3k
        }
119
13.3k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::new
Line
Count
Source
111
695
    pub fn new(config: Config) -> Self {
112
695
        let peer = P::r#dyn();
113
114
695
        Streams {
115
695
            inner: Inner::new(peer, config),
116
695
            send_buffer: Arc::new(SendBuffer::new()),
117
695
            _p: ::std::marker::PhantomData,
118
695
        }
119
695
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::new
Line
Count
Source
111
12.6k
    pub fn new(config: Config) -> Self {
112
12.6k
        let peer = P::r#dyn();
113
114
12.6k
        Streams {
115
12.6k
            inner: Inner::new(peer, config),
116
12.6k
            send_buffer: Arc::new(SendBuffer::new()),
117
12.6k
            _p: ::std::marker::PhantomData,
118
12.6k
        }
119
12.6k
    }
120
121
0
    pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> {
122
0
        let mut me = self.inner.lock().unwrap();
123
0
        let me = &mut *me;
124
125
0
        me.actions
126
0
            .recv
127
0
            .set_target_connection_window(size, &mut me.actions.task)
128
0
    }
Unexecuted instantiation: <h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::set_target_connection_window_size
Unexecuted instantiation: <h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::set_target_connection_window_size
129
130
    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
131
        let mut me = self.inner.lock().unwrap();
132
        let me = &mut *me;
133
        me.actions.recv.next_incoming(&mut me.store).map(|key| {
134
            let stream = &mut me.store.resolve(key);
135
            tracing::trace!(
136
                "next_incoming; id={:?}, state={:?}",
137
                stream.id,
138
                stream.state
139
            );
140
            // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
141
            // the lock, so it can't.
142
            me.refs += 1;
143
144
            // Pending-accepted remotely-reset streams are counted.
145
            if stream.state.is_remote_reset() {
146
                me.counts.dec_num_remote_reset_streams();
147
            }
148
149
            StreamRef {
150
                opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
151
                send_buffer: self.send_buffer.clone(),
152
            }
153
        })
154
    }
155
156
1.12M
    pub fn send_pending_refusal<T>(
157
1.12M
        &mut self,
158
1.12M
        cx: &mut Context,
159
1.12M
        dst: &mut Codec<T, Prioritized<B>>,
160
1.12M
    ) -> Poll<io::Result<()>>
161
1.12M
    where
162
1.12M
        T: AsyncWrite + Unpin,
163
    {
164
1.12M
        let mut me = self.inner.lock().unwrap();
165
1.12M
        let me = &mut *me;
166
1.12M
        me.actions.recv.send_pending_refusal(cx, dst)
167
1.12M
    }
168
169
1.09M
    pub fn clear_expired_reset_streams(&mut self) {
170
1.09M
        let mut me = self.inner.lock().unwrap();
171
1.09M
        let me = &mut *me;
172
1.09M
        me.actions
173
1.09M
            .recv
174
1.09M
            .clear_expired_reset_streams(&mut me.store, &mut me.counts);
175
1.09M
    }
176
177
1.00M
    pub fn poll_complete<T>(
178
1.00M
        &mut self,
179
1.00M
        cx: &mut Context,
180
1.00M
        dst: &mut Codec<T, Prioritized<B>>,
181
1.00M
    ) -> Poll<io::Result<()>>
182
1.00M
    where
183
1.00M
        T: AsyncWrite + Unpin,
184
    {
185
1.00M
        let mut me = self.inner.lock().unwrap();
186
1.00M
        me.poll_complete(&self.send_buffer, cx, dst)
187
1.00M
    }
188
189
6.71k
    pub fn apply_remote_settings(
190
6.71k
        &mut self,
191
6.71k
        frame: &frame::Settings,
192
6.71k
        is_initial: bool,
193
6.71k
    ) -> Result<(), Error> {
194
6.71k
        let mut me = self.inner.lock().unwrap();
195
6.71k
        let me = &mut *me;
196
197
6.71k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
198
6.71k
        let send_buffer = &mut *send_buffer;
199
200
6.71k
        me.counts.apply_remote_settings(frame, is_initial);
201
202
6.71k
        me.actions.send.apply_remote_settings(
203
6.71k
            frame,
204
6.71k
            send_buffer,
205
6.71k
            &mut me.store,
206
6.71k
            &mut me.counts,
207
6.71k
            &mut me.actions.task,
208
        )
209
6.71k
    }
210
211
11
    pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
212
11
        let mut me = self.inner.lock().unwrap();
213
11
        let me = &mut *me;
214
215
11
        me.actions.recv.apply_local_settings(frame, &mut me.store)
216
11
    }
217
218
419k
    pub fn send_request(
219
419k
        &mut self,
220
419k
        mut request: Request<()>,
221
419k
        end_of_stream: bool,
222
419k
        pending: Option<&OpaqueStreamRef>,
223
419k
    ) -> Result<(StreamRef<B>, bool), SendError> {
224
        use super::stream::ContentLength;
225
        use http::Method;
226
227
419k
        let protocol = request.extensions_mut().remove::<Protocol>();
228
229
        // Clear before taking lock, incase extensions contain a StreamRef.
230
419k
        request.extensions_mut().clear();
231
232
        // TODO: There is a hazard with assigning a stream ID before the
233
        // prioritize layer. If prioritization reorders new streams, this
234
        // implicitly closes the earlier stream IDs.
235
        //
236
        // See: hyperium/h2#11
237
419k
        let mut me = self.inner.lock().unwrap();
238
419k
        let me = &mut *me;
239
240
419k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
241
419k
        let send_buffer = &mut *send_buffer;
242
243
419k
        me.actions.ensure_no_conn_error()?;
244
419k
        me.actions.send.ensure_next_stream_id()?;
245
246
        // The `pending` argument is provided by the `Client`, and holds
247
        // a store `Key` of a `Stream` that may have been not been opened
248
        // yet.
249
        //
250
        // If that stream is still pending, the Client isn't allowed to
251
        // queue up another pending stream. They should use `poll_ready`.
252
419k
        if let Some(stream) = pending {
253
0
            if me.store.resolve(stream.key).is_pending_open {
254
0
                return Err(UserError::Rejected.into());
255
0
            }
256
419k
        }
257
258
419k
        if me.counts.peer().is_server() {
259
            // Servers cannot open streams. PushPromise must first be reserved.
260
0
            return Err(UserError::UnexpectedFrameType.into());
261
419k
        }
262
263
419k
        let stream_id = me.actions.send.open()?;
264
265
419k
        let mut stream = Stream::new(
266
419k
            stream_id,
267
419k
            me.actions.send.init_window_sz(),
268
419k
            me.actions.recv.init_window_sz(),
269
        );
270
271
419k
        if *request.method() == Method::HEAD {
272
0
            stream.content_length = ContentLength::Head;
273
419k
        }
274
275
        // Convert the message
276
419k
        let headers =
277
419k
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
278
279
419k
        let mut stream = me.store.insert(stream.id, stream);
280
281
419k
        let sent = me.actions.send.send_headers(
282
419k
            headers,
283
419k
            send_buffer,
284
419k
            &mut stream,
285
419k
            &mut me.counts,
286
419k
            &mut me.actions.task,
287
        );
288
289
        // send_headers can return a UserError, if it does,
290
        // we should forget about this stream.
291
419k
        if let Err(err) = sent {
292
81
            stream.unlink();
293
81
            stream.remove();
294
81
            return Err(err.into());
295
419k
        }
296
297
        // Given that the stream has been initialized, it should not be in the
298
        // closed state.
299
419k
        debug_assert!(!stream.state.is_closed());
300
301
        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
302
        // the lock, so it can't.
303
419k
        me.refs += 1;
304
305
419k
        let is_full = me.counts.next_send_stream_will_reach_capacity();
306
419k
        Ok((
307
419k
            StreamRef {
308
419k
                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
309
419k
                send_buffer: self.send_buffer.clone(),
310
419k
            },
311
419k
            is_full,
312
419k
        ))
313
419k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::send_request
Line
Count
Source
218
695
    pub fn send_request(
219
695
        &mut self,
220
695
        mut request: Request<()>,
221
695
        end_of_stream: bool,
222
695
        pending: Option<&OpaqueStreamRef>,
223
695
    ) -> Result<(StreamRef<B>, bool), SendError> {
224
        use super::stream::ContentLength;
225
        use http::Method;
226
227
695
        let protocol = request.extensions_mut().remove::<Protocol>();
228
229
        // Clear before taking lock, incase extensions contain a StreamRef.
230
695
        request.extensions_mut().clear();
231
232
        // TODO: There is a hazard with assigning a stream ID before the
233
        // prioritize layer. If prioritization reorders new streams, this
234
        // implicitly closes the earlier stream IDs.
235
        //
236
        // See: hyperium/h2#11
237
695
        let mut me = self.inner.lock().unwrap();
238
695
        let me = &mut *me;
239
240
695
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
241
695
        let send_buffer = &mut *send_buffer;
242
243
695
        me.actions.ensure_no_conn_error()?;
244
695
        me.actions.send.ensure_next_stream_id()?;
245
246
        // The `pending` argument is provided by the `Client`, and holds
247
        // a store `Key` of a `Stream` that may have been not been opened
248
        // yet.
249
        //
250
        // If that stream is still pending, the Client isn't allowed to
251
        // queue up another pending stream. They should use `poll_ready`.
252
695
        if let Some(stream) = pending {
253
0
            if me.store.resolve(stream.key).is_pending_open {
254
0
                return Err(UserError::Rejected.into());
255
0
            }
256
695
        }
257
258
695
        if me.counts.peer().is_server() {
259
            // Servers cannot open streams. PushPromise must first be reserved.
260
0
            return Err(UserError::UnexpectedFrameType.into());
261
695
        }
262
263
695
        let stream_id = me.actions.send.open()?;
264
265
695
        let mut stream = Stream::new(
266
695
            stream_id,
267
695
            me.actions.send.init_window_sz(),
268
695
            me.actions.recv.init_window_sz(),
269
        );
270
271
695
        if *request.method() == Method::HEAD {
272
0
            stream.content_length = ContentLength::Head;
273
695
        }
274
275
        // Convert the message
276
695
        let headers =
277
695
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
278
279
695
        let mut stream = me.store.insert(stream.id, stream);
280
281
695
        let sent = me.actions.send.send_headers(
282
695
            headers,
283
695
            send_buffer,
284
695
            &mut stream,
285
695
            &mut me.counts,
286
695
            &mut me.actions.task,
287
        );
288
289
        // send_headers can return a UserError, if it does,
290
        // we should forget about this stream.
291
695
        if let Err(err) = sent {
292
81
            stream.unlink();
293
81
            stream.remove();
294
81
            return Err(err.into());
295
614
        }
296
297
        // Given that the stream has been initialized, it should not be in the
298
        // closed state.
299
614
        debug_assert!(!stream.state.is_closed());
300
301
        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
302
        // the lock, so it can't.
303
614
        me.refs += 1;
304
305
614
        let is_full = me.counts.next_send_stream_will_reach_capacity();
306
614
        Ok((
307
614
            StreamRef {
308
614
                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
309
614
                send_buffer: self.send_buffer.clone(),
310
614
            },
311
614
            is_full,
312
614
        ))
313
695
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::send_request
Line
Count
Source
218
418k
    pub fn send_request(
219
418k
        &mut self,
220
418k
        mut request: Request<()>,
221
418k
        end_of_stream: bool,
222
418k
        pending: Option<&OpaqueStreamRef>,
223
418k
    ) -> Result<(StreamRef<B>, bool), SendError> {
224
        use super::stream::ContentLength;
225
        use http::Method;
226
227
418k
        let protocol = request.extensions_mut().remove::<Protocol>();
228
229
        // Clear before taking lock, incase extensions contain a StreamRef.
230
418k
        request.extensions_mut().clear();
231
232
        // TODO: There is a hazard with assigning a stream ID before the
233
        // prioritize layer. If prioritization reorders new streams, this
234
        // implicitly closes the earlier stream IDs.
235
        //
236
        // See: hyperium/h2#11
237
418k
        let mut me = self.inner.lock().unwrap();
238
418k
        let me = &mut *me;
239
240
418k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
241
418k
        let send_buffer = &mut *send_buffer;
242
243
418k
        me.actions.ensure_no_conn_error()?;
244
418k
        me.actions.send.ensure_next_stream_id()?;
245
246
        // The `pending` argument is provided by the `Client`, and holds
247
        // a store `Key` of a `Stream` that may have been not been opened
248
        // yet.
249
        //
250
        // If that stream is still pending, the Client isn't allowed to
251
        // queue up another pending stream. They should use `poll_ready`.
252
418k
        if let Some(stream) = pending {
253
0
            if me.store.resolve(stream.key).is_pending_open {
254
0
                return Err(UserError::Rejected.into());
255
0
            }
256
418k
        }
257
258
418k
        if me.counts.peer().is_server() {
259
            // Servers cannot open streams. PushPromise must first be reserved.
260
0
            return Err(UserError::UnexpectedFrameType.into());
261
418k
        }
262
263
418k
        let stream_id = me.actions.send.open()?;
264
265
418k
        let mut stream = Stream::new(
266
418k
            stream_id,
267
418k
            me.actions.send.init_window_sz(),
268
418k
            me.actions.recv.init_window_sz(),
269
        );
270
271
418k
        if *request.method() == Method::HEAD {
272
0
            stream.content_length = ContentLength::Head;
273
418k
        }
274
275
        // Convert the message
276
418k
        let headers =
277
418k
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
278
279
418k
        let mut stream = me.store.insert(stream.id, stream);
280
281
418k
        let sent = me.actions.send.send_headers(
282
418k
            headers,
283
418k
            send_buffer,
284
418k
            &mut stream,
285
418k
            &mut me.counts,
286
418k
            &mut me.actions.task,
287
        );
288
289
        // send_headers can return a UserError, if it does,
290
        // we should forget about this stream.
291
418k
        if let Err(err) = sent {
292
0
            stream.unlink();
293
0
            stream.remove();
294
0
            return Err(err.into());
295
418k
        }
296
297
        // Given that the stream has been initialized, it should not be in the
298
        // closed state.
299
418k
        debug_assert!(!stream.state.is_closed());
300
301
        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
302
        // the lock, so it can't.
303
418k
        me.refs += 1;
304
305
418k
        let is_full = me.counts.next_send_stream_will_reach_capacity();
306
418k
        Ok((
307
418k
            StreamRef {
308
418k
                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
309
418k
                send_buffer: self.send_buffer.clone(),
310
418k
            },
311
418k
            is_full,
312
418k
        ))
313
418k
    }
314
315
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
316
        self.inner
317
            .lock()
318
            .unwrap()
319
            .actions
320
            .send
321
            .is_extended_connect_protocol_enabled()
322
    }
323
324
    pub fn current_max_send_streams(&self) -> usize {
325
        let me = self.inner.lock().unwrap();
326
        me.counts.max_send_streams()
327
    }
328
329
    pub fn current_max_recv_streams(&self) -> usize {
330
        let me = self.inner.lock().unwrap();
331
        me.counts.max_recv_streams()
332
    }
333
}
334
335
impl<B> DynStreams<'_, B> {
336
1.06k
    pub fn is_buffer_empty(&self) -> bool {
337
1.06k
        self.send_buffer.is_empty()
338
1.06k
    }
339
340
0
    pub fn is_server(&self) -> bool {
341
0
        self.peer.is_server()
342
0
    }
343
344
8.29k
    pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
345
8.29k
        let mut me = self.inner.lock().unwrap();
346
347
8.29k
        me.recv_headers(self.peer, self.send_buffer, frame)
348
8.29k
    }
349
350
60.2k
    pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
351
60.2k
        let mut me = self.inner.lock().unwrap();
352
60.2k
        me.recv_data(self.peer, self.send_buffer, frame)
353
60.2k
    }
354
355
1.34k
    pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
356
1.34k
        let mut me = self.inner.lock().unwrap();
357
358
1.34k
        me.recv_reset(self.send_buffer, frame)
359
1.34k
    }
360
361
    /// Notify all streams that a connection-level error happened.
362
7.32k
    pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
363
7.32k
        let mut me = self.inner.lock().unwrap();
364
7.32k
        me.handle_error(self.send_buffer, err)
365
7.32k
    }
366
367
4.98k
    pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
368
4.98k
        let mut me = self.inner.lock().unwrap();
369
4.98k
        me.recv_go_away(self.send_buffer, frame)
370
4.98k
    }
371
372
6.29k
    pub fn last_processed_id(&self) -> StreamId {
373
6.29k
        self.inner.lock().unwrap().actions.recv.last_processed_id()
374
6.29k
    }
375
376
1.35k
    pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
377
1.35k
        let mut me = self.inner.lock().unwrap();
378
1.35k
        me.recv_window_update(self.send_buffer, frame)
379
1.35k
    }
380
381
1.23k
    pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
382
1.23k
        let mut me = self.inner.lock().unwrap();
383
1.23k
        me.recv_push_promise(self.send_buffer, frame)
384
1.23k
    }
385
386
18.1k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
387
18.1k
        let mut me = self.inner.lock().map_err(|_| ())?;
388
18.1k
        me.recv_eof(self.send_buffer, clear_pending_accept)
389
18.1k
    }
<h2::proto::streams::streams::DynStreams<bytes::bytes::Bytes>>::recv_eof
Line
Count
Source
386
695
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
387
695
        let mut me = self.inner.lock().map_err(|_| ())?;
388
695
        me.recv_eof(self.send_buffer, clear_pending_accept)
389
695
    }
<h2::proto::streams::streams::DynStreams<bytes::bytes::Bytes>>::recv_eof
Line
Count
Source
386
17.4k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
387
17.4k
        let mut me = self.inner.lock().map_err(|_| ())?;
388
17.4k
        me.recv_eof(self.send_buffer, clear_pending_accept)
389
17.4k
    }
390
391
76.6k
    pub fn send_reset(
392
76.6k
        &mut self,
393
76.6k
        id: StreamId,
394
76.6k
        reason: Reason,
395
76.6k
    ) -> Result<(), crate::proto::error::GoAway> {
396
76.6k
        let mut me = self.inner.lock().unwrap();
397
76.6k
        me.send_reset(self.send_buffer, id, reason)
398
76.6k
    }
399
400
0
    pub fn send_go_away(&mut self, last_processed_id: StreamId) {
401
0
        let mut me = self.inner.lock().unwrap();
402
0
        me.actions.recv.go_away(last_processed_id);
403
0
    }
404
}
405
406
impl Inner {
407
13.3k
    fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> {
408
13.3k
        Arc::new(Mutex::new(Inner {
409
13.3k
            counts: Counts::new(peer, &config),
410
13.3k
            actions: Actions {
411
13.3k
                recv: Recv::new(peer, &config),
412
13.3k
                send: Send::new(&config),
413
13.3k
                task: None,
414
13.3k
                conn_error: None,
415
13.3k
            },
416
13.3k
            store: Store::new(),
417
13.3k
            refs: 1,
418
13.3k
        }))
419
13.3k
    }
420
421
8.29k
    fn recv_headers<B>(
422
8.29k
        &mut self,
423
8.29k
        peer: peer::Dyn,
424
8.29k
        send_buffer: &SendBuffer<B>,
425
8.29k
        frame: frame::Headers,
426
8.29k
    ) -> Result<(), Error> {
427
8.29k
        let id = frame.stream_id();
428
429
        // The GOAWAY process has begun. All streams with a greater ID than
430
        // specified as part of GOAWAY should be ignored.
431
8.29k
        if id > self.actions.recv.max_stream_id() {
432
0
            tracing::trace!(
433
0
                "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
434
                id,
435
0
                self.actions.recv.max_stream_id()
436
            );
437
0
            return Ok(());
438
8.29k
        }
439
440
8.29k
        let key = match self.store.find_entry(id) {
441
6.30k
            Entry::Occupied(e) => e.key(),
442
1.99k
            Entry::Vacant(e) => {
443
                // Client: it's possible to send a request, and then send
444
                // a RST_STREAM while the response HEADERS were in transit.
445
                //
446
                // Server: we can't reset a stream before having received
447
                // the request headers, so don't allow.
448
1.99k
                if !peer.is_server() {
449
                    // This may be response headers for a stream we've already
450
                    // forgotten about...
451
1.99k
                    if self.actions.may_have_forgotten_stream(peer, id) {
452
1.65k
                        tracing::debug!(
453
0
                            "recv_headers for old stream={:?}, sending STREAM_CLOSED",
454
                            id,
455
                        );
456
1.65k
                        return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
457
336
                    }
458
0
                }
459
460
336
                match self
461
336
                    .actions
462
336
                    .recv
463
336
                    .open(id, Open::Headers, &mut self.counts)?
464
                {
465
0
                    Some(stream_id) => {
466
0
                        let stream = Stream::new(
467
0
                            stream_id,
468
0
                            self.actions.send.init_window_sz(),
469
0
                            self.actions.recv.init_window_sz(),
470
                        );
471
472
0
                        e.insert(stream)
473
                    }
474
0
                    None => return Ok(()),
475
                }
476
            }
477
        };
478
479
6.30k
        let stream = self.store.resolve(key);
480
481
6.30k
        if stream.state.is_local_error() {
482
            // Locally reset streams must ignore frames "for some time".
483
            // This is because the remote may have sent trailers before
484
            // receiving the RST_STREAM frame.
485
3.62k
            tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
486
3.62k
            return Ok(());
487
2.67k
        }
488
489
2.67k
        let actions = &mut self.actions;
490
2.67k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
491
2.67k
        let send_buffer = &mut *send_buffer;
492
493
2.67k
        self.counts.transition(stream, |counts, stream| {
494
2.67k
            tracing::trace!(
495
0
                "recv_headers; stream={:?}; state={:?}",
496
0
                stream.id,
497
0
                stream.state
498
            );
499
500
2.67k
            let res = if stream.state.is_recv_headers() {
501
1.84k
                match actions.recv.recv_headers(frame, stream, counts) {
502
1.80k
                    Ok(()) => Ok(()),
503
0
                    Err(RecvHeaderBlockError::Oversize(resp)) => {
504
0
                        if let Some(resp) = resp {
505
0
                            let sent = actions.send.send_headers(
506
0
                                resp, send_buffer, stream, counts, &mut actions.task);
507
0
                            debug_assert!(sent.is_ok(), "oversize response should not fail");
508
509
0
                            actions.send.schedule_implicit_reset(
510
0
                                stream,
511
                                Reason::PROTOCOL_ERROR,
512
0
                                counts,
513
0
                                &mut actions.task);
514
515
0
                            actions.recv.enqueue_reset_expiration(stream, counts);
516
517
0
                            Ok(())
518
                        } else {
519
0
                            Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR))
520
                        }
521
                    },
522
38
                    Err(RecvHeaderBlockError::State(err)) => Err(err),
523
                }
524
            } else {
525
829
                if !frame.is_end_stream() {
526
                    // Receiving trailers that don't set EOS is a "malformed"
527
                    // message. Malformed messages are a stream error.
528
782
                    proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
529
782
                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
530
47
                }
531
532
47
                actions.recv.recv_trailers(frame, stream)
533
            };
534
535
1.89k
            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
536
2.67k
        })
537
8.29k
    }
538
539
60.2k
    fn recv_data<B>(
540
60.2k
        &mut self,
541
60.2k
        peer: peer::Dyn,
542
60.2k
        send_buffer: &SendBuffer<B>,
543
60.2k
        frame: frame::Data,
544
60.2k
    ) -> Result<(), Error> {
545
60.2k
        let id = frame.stream_id();
546
547
60.2k
        let stream = match self.store.find_mut(&id) {
548
17.5k
            Some(stream) => stream,
549
            None => {
550
                // The GOAWAY process has begun. All streams with a greater ID
551
                // than specified as part of GOAWAY should be ignored.
552
42.6k
                if id > self.actions.recv.max_stream_id() {
553
0
                    tracing::trace!(
554
0
                        "id ({:?}) > max_stream_id ({:?}), ignoring DATA",
555
                        id,
556
0
                        self.actions.recv.max_stream_id()
557
                    );
558
0
                    return Ok(());
559
42.6k
                }
560
561
42.6k
                if self.actions.may_have_forgotten_stream(peer, id) {
562
42.5k
                    tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
563
564
42.5k
                    let sz = frame.payload().len();
565
                    // This should have been enforced at the codec::FramedRead layer, so
566
                    // this is just a sanity check.
567
42.5k
                    assert!(sz <= super::MAX_WINDOW_SIZE as usize);
568
42.5k
                    let sz = sz as WindowSize;
569
570
42.5k
                    self.actions.recv.ignore_data(sz)?;
571
42.5k
                    return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
572
109
                }
573
574
109
                proto_err!(conn: "recv_data: stream not found; id={:?}", id);
575
109
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
576
            }
577
        };
578
579
17.5k
        let actions = &mut self.actions;
580
17.5k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
581
17.5k
        let send_buffer = &mut *send_buffer;
582
583
17.5k
        self.counts.transition(stream, |counts, stream| {
584
17.5k
            let sz = frame.payload().len();
585
17.5k
            let res = actions.recv.recv_data(frame, stream);
586
587
            // Any stream error after receiving a DATA frame means
588
            // we won't give the data to the user, and so they can't
589
            // release the capacity. We do it automatically.
590
58
            if let Err(Error::Reset(..)) = res {
591
36
                actions
592
36
                    .recv
593
36
                    .release_connection_capacity(sz as WindowSize, &mut None);
594
17.5k
            }
595
17.5k
            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
596
17.5k
        })
597
60.2k
    }
598
599
1.34k
    fn recv_reset<B>(
600
1.34k
        &mut self,
601
1.34k
        send_buffer: &SendBuffer<B>,
602
1.34k
        frame: frame::Reset,
603
1.34k
    ) -> Result<(), Error> {
604
1.34k
        let id = frame.stream_id();
605
606
1.34k
        if id.is_zero() {
607
1
            proto_err!(conn: "recv_reset: invalid stream ID 0");
608
1
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
609
1.34k
        }
610
611
        // The GOAWAY process has begun. All streams with a greater ID than
612
        // specified as part of GOAWAY should be ignored.
613
1.34k
        if id > self.actions.recv.max_stream_id() {
614
0
            tracing::trace!(
615
0
                "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
616
                id,
617
0
                self.actions.recv.max_stream_id()
618
            );
619
0
            return Ok(());
620
1.34k
        }
621
622
1.34k
        let stream = match self.store.find_mut(&id) {
623
663
            Some(stream) => stream,
624
            None => {
625
                // TODO: Are there other error cases?
626
680
                self.actions
627
680
                    .ensure_not_idle(self.counts.peer(), id)
628
680
                    .map_err(Error::library_go_away)?;
629
630
641
                return Ok(());
631
            }
632
        };
633
634
663
        let mut send_buffer = send_buffer.inner.lock().unwrap();
635
663
        let send_buffer = &mut *send_buffer;
636
637
663
        let actions = &mut self.actions;
638
639
663
        self.counts.transition(stream, |counts, stream| {
640
663
            actions.recv.recv_reset(frame, stream, counts)?;
641
663
            actions.send.handle_error(send_buffer, stream, counts);
642
663
            assert!(stream.state.is_closed());
643
663
            Ok(())
644
663
        })
645
1.34k
    }
646
647
1.35k
    fn recv_window_update<B>(
648
1.35k
        &mut self,
649
1.35k
        send_buffer: &SendBuffer<B>,
650
1.35k
        frame: frame::WindowUpdate,
651
1.35k
    ) -> Result<(), Error> {
652
1.35k
        let id = frame.stream_id();
653
654
1.35k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
655
1.35k
        let send_buffer = &mut *send_buffer;
656
657
1.35k
        if id.is_zero() {
658
651
            self.actions
659
651
                .send
660
651
                .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
661
651
                .map_err(Error::library_go_away)?;
662
        } else {
663
            // The remote may send window updates for streams that the local now
664
            // considers closed. It's ok...
665
701
            if let Some(mut stream) = self.store.find_mut(&id) {
666
246
                let res = self
667
246
                    .actions
668
246
                    .send
669
246
                    .recv_stream_window_update(
670
246
                        frame.size_increment(),
671
246
                        send_buffer,
672
246
                        &mut stream,
673
246
                        &mut self.counts,
674
246
                        &mut self.actions.task,
675
                    )
676
246
                    .map_err(|reason| Error::library_reset(id, reason));
677
678
246
                return self.actions.reset_on_recv_stream_err(
679
246
                    send_buffer,
680
246
                    &mut stream,
681
246
                    &mut self.counts,
682
246
                    res,
683
                );
684
            } else {
685
455
                self.actions
686
455
                    .ensure_not_idle(self.counts.peer(), id)
687
455
                    .map_err(Error::library_go_away)?;
688
            }
689
        }
690
691
1.05k
        Ok(())
692
1.35k
    }
693
694
7.32k
    fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
695
7.32k
        let actions = &mut self.actions;
696
7.32k
        let counts = &mut self.counts;
697
7.32k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
698
7.32k
        let send_buffer = &mut *send_buffer;
699
700
7.32k
        let last_processed_id = actions.recv.last_processed_id();
701
702
169k
        self.store.for_each(|stream| {
703
169k
            counts.transition(stream, |counts, stream| {
704
169k
                actions.recv.handle_error(&err, &mut *stream);
705
169k
                actions.send.handle_error(send_buffer, stream, counts);
706
169k
            })
707
169k
        });
708
709
7.32k
        actions.conn_error = Some(err);
710
711
7.32k
        last_processed_id
712
7.32k
    }
713
714
4.98k
    fn recv_go_away<B>(
715
4.98k
        &mut self,
716
4.98k
        send_buffer: &SendBuffer<B>,
717
4.98k
        frame: &frame::GoAway,
718
4.98k
    ) -> Result<(), Error> {
719
4.98k
        let actions = &mut self.actions;
720
4.98k
        let counts = &mut self.counts;
721
4.98k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
722
4.98k
        let send_buffer = &mut *send_buffer;
723
724
4.98k
        let last_stream_id = frame.last_stream_id();
725
726
4.98k
        actions.send.recv_go_away(last_stream_id)?;
727
728
4.95k
        let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
729
730
822k
        self.store.for_each(|stream| {
731
822k
            if stream.id > last_stream_id {
732
234k
                counts.transition(stream, |counts, stream| {
733
234k
                    actions.recv.handle_error(&err, &mut *stream);
734
234k
                    actions.send.handle_error(send_buffer, stream, counts);
735
234k
                })
736
587k
            }
737
822k
        });
738
739
4.95k
        actions.conn_error = Some(err);
740
741
4.95k
        Ok(())
742
4.98k
    }
743
744
1.23k
    fn recv_push_promise<B>(
745
1.23k
        &mut self,
746
1.23k
        send_buffer: &SendBuffer<B>,
747
1.23k
        frame: frame::PushPromise,
748
1.23k
    ) -> Result<(), Error> {
749
1.23k
        let id = frame.stream_id();
750
1.23k
        let promised_id = frame.promised_id();
751
752
        // First, ensure that the initiating stream is still in a valid state.
753
1.23k
        let parent_key = match self.store.find_mut(&id) {
754
1.22k
            Some(stream) => {
755
                // The GOAWAY process has begun. All streams with a greater ID
756
                // than specified as part of GOAWAY should be ignored.
757
1.22k
                if id > self.actions.recv.max_stream_id() {
758
0
                    tracing::trace!(
759
0
                        "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
760
                        id,
761
0
                        self.actions.recv.max_stream_id()
762
                    );
763
0
                    return Ok(());
764
1.22k
                }
765
766
                // The stream must be receive open
767
1.22k
                if !stream.state.ensure_recv_open()? {
768
1
                    proto_err!(conn: "recv_push_promise: initiating stream is not opened");
769
1
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
770
1.10k
                }
771
772
1.10k
                stream.key()
773
            }
774
            None => {
775
14
                proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
776
14
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
777
            }
778
        };
779
780
        // TODO: Streams in the reserved states do not count towards the concurrency
781
        // limit. However, it seems like there should be a cap otherwise this
782
        // could grow in memory indefinitely.
783
784
        // Ensure that we can reserve streams
785
1.10k
        self.actions.recv.ensure_can_reserve()?;
786
787
        // Next, open the stream.
788
        //
789
        // If `None` is returned, then the stream is being refused. There is no
790
        // further work to be done.
791
1.10k
        if self
792
1.10k
            .actions
793
1.10k
            .recv
794
1.10k
            .open(promised_id, Open::PushPromise, &mut self.counts)?
795
1.10k
            .is_none()
796
        {
797
0
            return Ok(());
798
1.10k
        }
799
800
        // Try to handle the frame and create a corresponding key for the pushed stream
801
        // this requires a bit of indirection to make the borrow checker happy.
802
1.10k
        let child_key: Option<store::Key> = {
803
            // Create state for the stream
804
1.10k
            let stream = self.store.insert(promised_id, {
805
1.10k
                Stream::new(
806
1.10k
                    promised_id,
807
1.10k
                    self.actions.send.init_window_sz(),
808
1.10k
                    self.actions.recv.init_window_sz(),
809
                )
810
            });
811
812
1.10k
            let actions = &mut self.actions;
813
814
1.10k
            self.counts.transition(stream, |counts, stream| {
815
1.10k
                let stream_valid = actions.recv.recv_push_promise(frame, stream);
816
817
1.10k
                match stream_valid {
818
189
                    Ok(()) => Ok(Some(stream.key())),
819
                    _ => {
820
912
                        let mut send_buffer = send_buffer.inner.lock().unwrap();
821
912
                        actions
822
912
                            .reset_on_recv_stream_err(
823
912
                                &mut *send_buffer,
824
912
                                stream,
825
912
                                counts,
826
912
                                stream_valid,
827
                            )
828
912
                            .map(|()| None)
829
                    }
830
                }
831
1.10k
            })?
832
        };
833
        // If we're successful, push the headers and stream...
834
1.10k
        if let Some(child) = child_key {
835
189
            let mut ppp = self.store[parent_key].pending_push_promises.take();
836
189
            ppp.push(&mut self.store.resolve(child));
837
189
838
189
            let parent = &mut self.store.resolve(parent_key);
839
189
            parent.pending_push_promises = ppp;
840
189
            parent.notify_push();
841
912
        };
842
843
1.10k
        Ok(())
844
1.23k
    }
845
846
18.1k
    fn recv_eof<B>(
847
18.1k
        &mut self,
848
18.1k
        send_buffer: &SendBuffer<B>,
849
18.1k
        clear_pending_accept: bool,
850
18.1k
    ) -> Result<(), ()> {
851
18.1k
        let actions = &mut self.actions;
852
18.1k
        let counts = &mut self.counts;
853
18.1k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
854
18.1k
        let send_buffer = &mut *send_buffer;
855
856
18.1k
        if actions.conn_error.is_none() {
857
5.85k
            actions.conn_error = Some(
858
5.85k
                io::Error::new(
859
5.85k
                    io::ErrorKind::BrokenPipe,
860
5.85k
                    "connection closed because of a broken pipe",
861
5.85k
                )
862
5.85k
                .into(),
863
5.85k
            );
864
12.2k
        }
865
866
18.1k
        tracing::trace!("Streams::recv_eof");
867
868
261k
        self.store.for_each(|stream| {
869
261k
            counts.transition(stream, |counts, stream| {
870
261k
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
261k
                actions.send.handle_error(send_buffer, stream, counts);
875
261k
            })
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}::{closure#0}
Line
Count
Source
869
614
            counts.transition(stream, |counts, stream| {
870
614
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
614
                actions.send.handle_error(send_buffer, stream, counts);
875
614
            })
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}::{closure#0}
Line
Count
Source
869
260k
            counts.transition(stream, |counts, stream| {
870
260k
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
260k
                actions.send.handle_error(send_buffer, stream, counts);
875
260k
            })
876
261k
        });
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}
Line
Count
Source
868
614
        self.store.for_each(|stream| {
869
614
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
614
        });
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}
Line
Count
Source
868
260k
        self.store.for_each(|stream| {
869
260k
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
260k
        });
877
878
18.1k
        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
879
18.1k
        Ok(())
880
18.1k
    }
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>
Line
Count
Source
846
695
    fn recv_eof<B>(
847
695
        &mut self,
848
695
        send_buffer: &SendBuffer<B>,
849
695
        clear_pending_accept: bool,
850
695
    ) -> Result<(), ()> {
851
695
        let actions = &mut self.actions;
852
695
        let counts = &mut self.counts;
853
695
        let mut send_buffer = send_buffer.inner.lock().unwrap();
854
695
        let send_buffer = &mut *send_buffer;
855
856
695
        if actions.conn_error.is_none() {
857
695
            actions.conn_error = Some(
858
695
                io::Error::new(
859
695
                    io::ErrorKind::BrokenPipe,
860
695
                    "connection closed because of a broken pipe",
861
695
                )
862
695
                .into(),
863
695
            );
864
695
        }
865
866
695
        tracing::trace!("Streams::recv_eof");
867
868
695
        self.store.for_each(|stream| {
869
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
        });
877
878
695
        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
879
695
        Ok(())
880
695
    }
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>
Line
Count
Source
846
17.4k
    fn recv_eof<B>(
847
17.4k
        &mut self,
848
17.4k
        send_buffer: &SendBuffer<B>,
849
17.4k
        clear_pending_accept: bool,
850
17.4k
    ) -> Result<(), ()> {
851
17.4k
        let actions = &mut self.actions;
852
17.4k
        let counts = &mut self.counts;
853
17.4k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
854
17.4k
        let send_buffer = &mut *send_buffer;
855
856
17.4k
        if actions.conn_error.is_none() {
857
5.15k
            actions.conn_error = Some(
858
5.15k
                io::Error::new(
859
5.15k
                    io::ErrorKind::BrokenPipe,
860
5.15k
                    "connection closed because of a broken pipe",
861
5.15k
                )
862
5.15k
                .into(),
863
5.15k
            );
864
12.2k
        }
865
866
17.4k
        tracing::trace!("Streams::recv_eof");
867
868
17.4k
        self.store.for_each(|stream| {
869
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
        });
877
878
17.4k
        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
879
17.4k
        Ok(())
880
17.4k
    }
881
882
1.00M
    fn poll_complete<T, B>(
883
1.00M
        &mut self,
884
1.00M
        send_buffer: &SendBuffer<B>,
885
1.00M
        cx: &mut Context,
886
1.00M
        dst: &mut Codec<T, Prioritized<B>>,
887
1.00M
    ) -> Poll<io::Result<()>>
888
1.00M
    where
889
1.00M
        T: AsyncWrite + Unpin,
890
1.00M
        B: Buf,
891
    {
892
1.00M
        let mut send_buffer = send_buffer.inner.lock().unwrap();
893
1.00M
        let send_buffer = &mut *send_buffer;
894
895
        // Send WINDOW_UPDATE frames first
896
        //
897
        // TODO: It would probably be better to interleave updates w/ data
898
        // frames.
899
1.00M
        ready!(self
900
1.00M
            .actions
901
1.00M
            .recv
902
1.00M
            .poll_complete(cx, &mut self.store, &mut self.counts, dst))?;
903
904
        // Send any other pending frames
905
596k
        ready!(self.actions.send.poll_complete(
906
596k
            cx,
907
596k
            send_buffer,
908
596k
            &mut self.store,
909
596k
            &mut self.counts,
910
596k
            dst
911
596k
        ))?;
912
913
        // Nothing else to do, track the task
914
580k
        self.actions.task = Some(cx.waker().clone());
915
916
580k
        Poll::Ready(Ok(()))
917
1.00M
    }
918
919
76.6k
    fn send_reset<B>(
920
76.6k
        &mut self,
921
76.6k
        send_buffer: &SendBuffer<B>,
922
76.6k
        id: StreamId,
923
76.6k
        reason: Reason,
924
76.6k
    ) -> Result<(), crate::proto::error::GoAway> {
925
76.6k
        let key = match self.store.find_entry(id) {
926
23.3k
            Entry::Occupied(e) => e.key(),
927
53.2k
            Entry::Vacant(e) => {
928
                // Resetting a stream we don't know about? That could be OK...
929
                //
930
                // 1. As a server, we just received a request, but that request
931
                //    was bad, so we're resetting before even accepting it.
932
                //    This is totally fine.
933
                //
934
                // 2. The remote may have sent us a frame on new stream that
935
                //    it's *not* supposed to have done, and thus, we don't know
936
                //    the stream. In that case, sending a reset will "open" the
937
                //    stream in our store. Maybe that should be a connection
938
                //    error instead? At least for now, we need to update what
939
                //    our vision of the next stream is.
940
53.2k
                if self.counts.peer().is_local_init(id) {
941
10.4k
                    // We normally would open this stream, so update our
942
10.4k
                    // next-send-id record.
943
10.4k
                    self.actions.send.maybe_reset_next_stream_id(id);
944
42.7k
                } else {
945
42.7k
                    // We normally would recv this stream, so update our
946
42.7k
                    // next-recv-id record.
947
42.7k
                    self.actions.recv.maybe_reset_next_stream_id(id);
948
42.7k
                }
949
950
53.2k
                let stream = Stream::new(id, 0, 0);
951
952
53.2k
                e.insert(stream)
953
            }
954
        };
955
956
76.6k
        let stream = self.store.resolve(key);
957
76.6k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
958
76.6k
        let send_buffer = &mut *send_buffer;
959
76.6k
        self.actions.send_reset(
960
76.6k
            stream,
961
76.6k
            reason,
962
76.6k
            Initiator::Library,
963
76.6k
            &mut self.counts,
964
76.6k
            send_buffer,
965
        )
966
76.6k
    }
967
}
968
969
impl<B> Streams<B, client::Peer>
970
where
971
    B: Buf,
972
{
973
420k
    pub fn poll_pending_open(
974
420k
        &mut self,
975
420k
        cx: &Context,
976
420k
        pending: Option<&OpaqueStreamRef>,
977
420k
    ) -> Poll<Result<(), crate::Error>> {
978
420k
        let mut me = self.inner.lock().unwrap();
979
420k
        let me = &mut *me;
980
981
420k
        me.actions.ensure_no_conn_error()?;
982
420k
        me.actions.send.ensure_next_stream_id()?;
983
984
420k
        if let Some(pending) = pending {
985
1.98k
            let mut stream = me.store.resolve(pending.key);
986
1.98k
            tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
987
1.98k
            if stream.is_pending_open {
988
1.86k
                stream.wait_send(cx);
989
1.86k
                return Poll::Pending;
990
114
            }
991
418k
        }
992
418k
        Poll::Ready(Ok(()))
993
420k
    }
994
}
995
996
impl<B, P> Streams<B, P>
997
where
998
    P: Peer,
999
{
1000
1.23M
    pub fn as_dyn(&self) -> DynStreams<'_, B> {
1001
        let Self {
1002
1.23M
            inner,
1003
1.23M
            send_buffer,
1004
1.23M
            _p,
1005
1.23M
        } = self;
1006
1.23M
        DynStreams {
1007
1.23M
            inner,
1008
1.23M
            send_buffer,
1009
1.23M
            peer: P::r#dyn(),
1010
1.23M
        }
1011
1.23M
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::as_dyn
Line
Count
Source
1000
695
    pub fn as_dyn(&self) -> DynStreams<'_, B> {
1001
        let Self {
1002
695
            inner,
1003
695
            send_buffer,
1004
695
            _p,
1005
695
        } = self;
1006
695
        DynStreams {
1007
695
            inner,
1008
695
            send_buffer,
1009
695
            peer: P::r#dyn(),
1010
695
        }
1011
695
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::as_dyn
Line
Count
Source
1000
1.23M
    pub fn as_dyn(&self) -> DynStreams<'_, B> {
1001
        let Self {
1002
1.23M
            inner,
1003
1.23M
            send_buffer,
1004
1.23M
            _p,
1005
1.23M
        } = self;
1006
1.23M
        DynStreams {
1007
1.23M
            inner,
1008
1.23M
            send_buffer,
1009
1.23M
            peer: P::r#dyn(),
1010
1.23M
        }
1011
1.23M
    }
1012
1013
    /// This function is safe to call multiple times.
1014
    ///
1015
    /// A `Result` is returned to avoid panicking if the mutex is poisoned.
1016
13.3k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
1017
13.3k
        self.as_dyn().recv_eof(clear_pending_accept)
1018
13.3k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::recv_eof
Line
Count
Source
1016
695
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
1017
695
        self.as_dyn().recv_eof(clear_pending_accept)
1018
695
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::recv_eof
Line
Count
Source
1016
12.6k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
1017
12.6k
        self.as_dyn().recv_eof(clear_pending_accept)
1018
12.6k
    }
1019
1020
    pub(crate) fn max_send_streams(&self) -> usize {
1021
        self.inner.lock().unwrap().counts.max_send_streams()
1022
    }
1023
1024
    pub(crate) fn max_recv_streams(&self) -> usize {
1025
        self.inner.lock().unwrap().counts.max_recv_streams()
1026
    }
1027
1028
    #[cfg(feature = "unstable")]
1029
    pub fn num_active_streams(&self) -> usize {
1030
        let me = self.inner.lock().unwrap();
1031
        me.store.num_active_streams()
1032
    }
1033
1034
403
    pub fn has_streams(&self) -> bool {
1035
403
        let me = self.inner.lock().unwrap();
1036
403
        me.counts.has_streams()
1037
403
    }
1038
1039
3.03M
    pub fn has_streams_or_other_references(&self) -> bool {
1040
3.03M
        let me = self.inner.lock().unwrap();
1041
3.03M
        me.counts.has_streams() || me.refs > 1
1042
3.03M
    }
1043
1044
    #[cfg(feature = "unstable")]
1045
    pub fn num_wired_streams(&self) -> usize {
1046
        let me = self.inner.lock().unwrap();
1047
        me.store.num_wired_streams()
1048
    }
1049
}
1050
1051
// no derive because we don't need B and P to be Clone.
1052
impl<B, P> Clone for Streams<B, P>
1053
where
1054
    P: Peer,
1055
{
1056
13.3k
    fn clone(&self) -> Self {
1057
13.3k
        self.inner.lock().unwrap().refs += 1;
1058
13.3k
        Streams {
1059
13.3k
            inner: self.inner.clone(),
1060
13.3k
            send_buffer: self.send_buffer.clone(),
1061
13.3k
            _p: ::std::marker::PhantomData,
1062
13.3k
        }
1063
13.3k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::clone::Clone>::clone
Line
Count
Source
1056
695
    fn clone(&self) -> Self {
1057
695
        self.inner.lock().unwrap().refs += 1;
1058
695
        Streams {
1059
695
            inner: self.inner.clone(),
1060
695
            send_buffer: self.send_buffer.clone(),
1061
695
            _p: ::std::marker::PhantomData,
1062
695
        }
1063
695
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::clone::Clone>::clone
Line
Count
Source
1056
12.6k
    fn clone(&self) -> Self {
1057
12.6k
        self.inner.lock().unwrap().refs += 1;
1058
12.6k
        Streams {
1059
12.6k
            inner: self.inner.clone(),
1060
12.6k
            send_buffer: self.send_buffer.clone(),
1061
12.6k
            _p: ::std::marker::PhantomData,
1062
12.6k
        }
1063
12.6k
    }
1064
}
1065
1066
impl<B, P> Drop for Streams<B, P>
1067
where
1068
    P: Peer,
1069
{
1070
26.7k
    fn drop(&mut self) {
1071
26.7k
        if let Ok(mut inner) = self.inner.lock() {
1072
26.7k
            inner.refs -= 1;
1073
26.7k
            if inner.refs == 1 {
1074
13.3k
                if let Some(task) = inner.actions.task.take() {
1075
69
                    task.wake();
1076
13.2k
                }
1077
13.3k
            }
1078
0
        }
1079
26.7k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::ops::drop::Drop>::drop
Line
Count
Source
1070
1.39k
    fn drop(&mut self) {
1071
1.39k
        if let Ok(mut inner) = self.inner.lock() {
1072
1.39k
            inner.refs -= 1;
1073
1.39k
            if inner.refs == 1 {
1074
695
                if let Some(task) = inner.actions.task.take() {
1075
0
                    task.wake();
1076
695
                }
1077
695
            }
1078
0
        }
1079
1.39k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::ops::drop::Drop>::drop
Line
Count
Source
1070
25.3k
    fn drop(&mut self) {
1071
25.3k
        if let Ok(mut inner) = self.inner.lock() {
1072
25.3k
            inner.refs -= 1;
1073
25.3k
            if inner.refs == 1 {
1074
12.6k
                if let Some(task) = inner.actions.task.take() {
1075
69
                    task.wake();
1076
12.6k
                }
1077
12.6k
            }
1078
0
        }
1079
25.3k
    }
1080
}
1081
1082
// ===== impl StreamRef =====
1083
1084
impl<B> StreamRef<B> {
1085
418k
    pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
1086
418k
    where
1087
418k
        B: Buf,
1088
    {
1089
418k
        let mut me = self.opaque.inner.lock().unwrap();
1090
418k
        let me = &mut *me;
1091
1092
418k
        let stream = me.store.resolve(self.opaque.key);
1093
418k
        let actions = &mut me.actions;
1094
418k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1095
418k
        let send_buffer = &mut *send_buffer;
1096
1097
418k
        me.counts.transition(stream, |counts, stream| {
1098
            // Create the data frame
1099
418k
            let mut frame = frame::Data::new(stream.id, data);
1100
418k
            frame.set_end_stream(end_stream);
1101
1102
            // Send the data frame
1103
418k
            actions
1104
418k
                .send
1105
418k
                .send_data(frame, send_buffer, stream, counts, &mut actions.task)
1106
418k
        })
1107
418k
    }
1108
1109
    pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
1110
        let mut me = self.opaque.inner.lock().unwrap();
1111
        let me = &mut *me;
1112
1113
        let stream = me.store.resolve(self.opaque.key);
1114
        let actions = &mut me.actions;
1115
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1116
        let send_buffer = &mut *send_buffer;
1117
1118
        me.counts.transition(stream, |counts, stream| {
1119
            // Create the trailers frame
1120
            let frame = frame::Headers::trailers(stream.id, trailers);
1121
1122
            // Send the trailers frame
1123
            actions
1124
                .send
1125
                .send_trailers(frame, send_buffer, stream, counts, &mut actions.task)
1126
        })
1127
    }
1128
1129
    pub fn send_reset(&mut self, reason: Reason) {
1130
        let mut me = self.opaque.inner.lock().unwrap();
1131
        let me = &mut *me;
1132
1133
        let stream = me.store.resolve(self.opaque.key);
1134
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1135
        let send_buffer = &mut *send_buffer;
1136
1137
        match me
1138
            .actions
1139
            .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer)
1140
        {
1141
            Ok(()) => (),
1142
            Err(crate::proto::error::GoAway { .. }) => {
1143
                // this should never happen, because Initiator::User resets do
1144
                // not count toward the local limit.
1145
                // we could perhaps make this state impossible, if we made the
1146
                // initiator argument a generic, and so this could return
1147
                // Infallible instead of an impossible GoAway, but oh well.
1148
                unreachable!("Initiator::User should not error sending reset");
1149
            }
1150
        }
1151
    }
1152
1153
    pub fn send_informational_headers(&mut self, frame: frame::Headers) -> Result<(), UserError> {
1154
        let mut me = self.opaque.inner.lock().unwrap();
1155
        let me = &mut *me;
1156
1157
        let stream = me.store.resolve(self.opaque.key);
1158
        let actions = &mut me.actions;
1159
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1160
        let send_buffer = &mut *send_buffer;
1161
1162
        me.counts.transition(stream, |counts, stream| {
1163
            // For informational responses (1xx), we need to send headers without
1164
            // changing the stream state. This allows multiple informational responses
1165
            // to be sent before the final response.
1166
1167
            // Validate that this is actually an informational response
1168
            debug_assert!(
1169
                frame.is_informational(),
1170
                "Frame must be informational after conversion from informational response"
1171
            );
1172
1173
            // Ensure the frame is not marked as end_stream for informational responses
1174
            if frame.is_end_stream() {
1175
                return Err(UserError::UnexpectedFrameType);
1176
            }
1177
1178
            // Send the interim informational headers directly to the buffer without state changes
1179
            // This bypasses the normal send_headers flow that would transition the stream state
1180
            actions.send.send_interim_informational_headers(
1181
                frame,
1182
                send_buffer,
1183
                stream,
1184
                counts,
1185
                &mut actions.task,
1186
            )
1187
        })
1188
    }
1189
1190
    pub fn send_response(
1191
        &mut self,
1192
        mut response: Response<()>,
1193
        end_of_stream: bool,
1194
    ) -> Result<(), UserError> {
1195
        // Clear before taking lock, incase extensions contain a StreamRef.
1196
        response.extensions_mut().clear();
1197
        let mut me = self.opaque.inner.lock().unwrap();
1198
        let me = &mut *me;
1199
1200
        let stream = me.store.resolve(self.opaque.key);
1201
        let actions = &mut me.actions;
1202
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1203
        let send_buffer = &mut *send_buffer;
1204
1205
        me.counts.transition(stream, |counts, stream| {
1206
            let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);
1207
1208
            actions
1209
                .send
1210
                .send_headers(frame, send_buffer, stream, counts, &mut actions.task)
1211
        })
1212
    }
1213
1214
    pub fn send_push_promise(
1215
        &mut self,
1216
        mut request: Request<()>,
1217
    ) -> Result<StreamRef<B>, UserError> {
1218
        // Clear before taking lock, incase extensions contain a StreamRef.
1219
        request.extensions_mut().clear();
1220
        let mut me = self.opaque.inner.lock().unwrap();
1221
        let me = &mut *me;
1222
1223
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1224
        let send_buffer = &mut *send_buffer;
1225
1226
        let actions = &mut me.actions;
1227
        let promised_id = actions.send.reserve_local()?;
1228
1229
        let child_key = {
1230
            let mut child_stream = me.store.insert(
1231
                promised_id,
1232
                Stream::new(
1233
                    promised_id,
1234
                    actions.send.init_window_sz(),
1235
                    actions.recv.init_window_sz(),
1236
                ),
1237
            );
1238
            child_stream.state.reserve_local()?;
1239
            child_stream.is_pending_push = true;
1240
            child_stream.key()
1241
        };
1242
1243
        let pushed = {
1244
            let mut stream = me.store.resolve(self.opaque.key);
1245
1246
            let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;
1247
1248
            actions
1249
                .send
1250
                .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
1251
        };
1252
1253
        if let Err(err) = pushed {
1254
            let mut child_stream = me.store.resolve(child_key);
1255
            child_stream.unlink();
1256
            child_stream.remove();
1257
            return Err(err);
1258
        }
1259
1260
        me.refs += 1;
1261
        let opaque =
1262
            OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));
1263
1264
        Ok(StreamRef {
1265
            opaque,
1266
            send_buffer: self.send_buffer.clone(),
1267
        })
1268
    }
1269
1270
    /// Called by the server after the stream is accepted. Given that clients
1271
    /// initialize streams by sending HEADERS, the request will always be
1272
    /// available.
1273
    ///
1274
    /// # Panics
1275
    ///
1276
    /// This function panics if the request isn't present.
1277
    pub fn take_request(&self) -> Request<()> {
1278
        let mut me = self.opaque.inner.lock().unwrap();
1279
        let me = &mut *me;
1280
1281
        let mut stream = me.store.resolve(self.opaque.key);
1282
        me.actions.recv.take_request(&mut stream)
1283
    }
1284
1285
    /// Called by a client to see if the current stream is pending open
1286
419k
    pub fn is_pending_open(&self) -> bool {
1287
419k
        let mut me = self.opaque.inner.lock().unwrap();
1288
419k
        me.store.resolve(self.opaque.key).is_pending_open
1289
419k
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::is_pending_open
Line
Count
Source
1286
614
    pub fn is_pending_open(&self) -> bool {
1287
614
        let mut me = self.opaque.inner.lock().unwrap();
1288
614
        me.store.resolve(self.opaque.key).is_pending_open
1289
614
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::is_pending_open
Line
Count
Source
1286
418k
    pub fn is_pending_open(&self) -> bool {
1287
418k
        let mut me = self.opaque.inner.lock().unwrap();
1288
418k
        me.store.resolve(self.opaque.key).is_pending_open
1289
418k
    }
1290
1291
    /// Request capacity to send data
1292
    pub fn reserve_capacity(&mut self, capacity: WindowSize) {
1293
        let mut me = self.opaque.inner.lock().unwrap();
1294
        let me = &mut *me;
1295
1296
        let mut stream = me.store.resolve(self.opaque.key);
1297
1298
        me.actions
1299
            .send
1300
            .reserve_capacity(capacity, &mut stream, &mut me.counts)
1301
    }
1302
1303
    /// Returns the stream's current send capacity.
1304
0
    pub fn capacity(&self) -> WindowSize {
1305
0
        let mut me = self.opaque.inner.lock().unwrap();
1306
0
        let me = &mut *me;
1307
1308
0
        let mut stream = me.store.resolve(self.opaque.key);
1309
1310
0
        me.actions.send.capacity(&mut stream)
1311
0
    }
1312
1313
    /// Request to be notified when the stream's capacity increases
1314
0
    pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> {
1315
0
        let mut me = self.opaque.inner.lock().unwrap();
1316
0
        let me = &mut *me;
1317
1318
0
        let mut stream = me.store.resolve(self.opaque.key);
1319
1320
0
        me.actions.send.poll_capacity(cx, &mut stream)
1321
0
    }
1322
1323
    /// Request to be notified for if a `RST_STREAM` is received for this stream.
1324
    pub(crate) fn poll_reset(
1325
        &mut self,
1326
        cx: &Context,
1327
        mode: proto::PollReset,
1328
    ) -> Poll<Result<Reason, crate::Error>> {
1329
        let mut me = self.opaque.inner.lock().unwrap();
1330
        let me = &mut *me;
1331
1332
        let mut stream = me.store.resolve(self.opaque.key);
1333
1334
        me.actions.send.poll_reset(cx, &mut stream, mode)
1335
    }
1336
1337
419k
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1338
419k
        self.opaque.clone()
1339
419k
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::clone_to_opaque
Line
Count
Source
1337
614
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1338
614
        self.opaque.clone()
1339
614
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::clone_to_opaque
Line
Count
Source
1337
418k
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1338
418k
        self.opaque.clone()
1339
418k
    }
1340
1341
    pub fn stream_id(&self) -> StreamId {
1342
        self.opaque.stream_id()
1343
    }
1344
}
1345
1346
impl<B> Clone for StreamRef<B> {
1347
    fn clone(&self) -> Self {
1348
        StreamRef {
1349
            opaque: self.opaque.clone(),
1350
            send_buffer: self.send_buffer.clone(),
1351
        }
1352
    }
1353
}
1354
1355
// ===== impl OpaqueStreamRef =====
1356
1357
impl OpaqueStreamRef {
1358
419k
    fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
1359
419k
        stream.ref_inc();
1360
419k
        OpaqueStreamRef {
1361
419k
            inner,
1362
419k
            key: stream.key(),
1363
419k
        }
1364
419k
    }
1365
    /// Called by a client to check for a received response.
1366
445k
    pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> {
1367
445k
        let mut me = self.inner.lock().unwrap();
1368
445k
        let me = &mut *me;
1369
1370
445k
        let mut stream = me.store.resolve(self.key);
1371
1372
445k
        me.actions.recv.poll_response(cx, &mut stream)
1373
445k
    }
1374
1375
    /// Called by a client to check for informational responses (1xx status codes)
1376
0
    pub fn poll_informational(
1377
0
        &mut self,
1378
0
        cx: &Context,
1379
0
    ) -> Poll<Option<Result<Response<()>, proto::Error>>> {
1380
0
        let mut me = self.inner.lock().unwrap();
1381
0
        let me = &mut *me;
1382
1383
0
        let mut stream = me.store.resolve(self.key);
1384
1385
0
        me.actions.recv.poll_informational(cx, &mut stream)
1386
0
    }
1387
    /// Called by a client to check for a pushed request.
1388
0
    pub fn poll_pushed(
1389
0
        &mut self,
1390
0
        cx: &Context,
1391
0
    ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> {
1392
0
        let mut me = self.inner.lock().unwrap();
1393
0
        let me = &mut *me;
1394
1395
0
        let mut stream = me.store.resolve(self.key);
1396
0
        me.actions
1397
0
            .recv
1398
0
            .poll_pushed(cx, &mut stream)
1399
0
            .map_ok(|(h, key)| {
1400
0
                me.refs += 1;
1401
0
                let opaque_ref =
1402
0
                    OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
1403
0
                (h, opaque_ref)
1404
0
            })
1405
0
    }
1406
1407
0
    pub fn is_end_stream(&self) -> bool {
1408
0
        let mut me = self.inner.lock().unwrap();
1409
0
        let me = &mut *me;
1410
1411
0
        let stream = me.store.resolve(self.key);
1412
1413
0
        me.actions.recv.is_end_stream(&stream)
1414
0
    }
1415
1416
0
    pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> {
1417
0
        let mut me = self.inner.lock().unwrap();
1418
0
        let me = &mut *me;
1419
1420
0
        let mut stream = me.store.resolve(self.key);
1421
1422
0
        me.actions.recv.poll_data(cx, &mut stream)
1423
0
    }
1424
1425
0
    pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1426
0
        let mut me = self.inner.lock().unwrap();
1427
0
        let me = &mut *me;
1428
1429
0
        let mut stream = me.store.resolve(self.key);
1430
1431
0
        me.actions.recv.poll_trailers(cx, &mut stream)
1432
0
    }
1433
1434
0
    pub(crate) fn available_recv_capacity(&self) -> isize {
1435
0
        let me = self.inner.lock().unwrap();
1436
0
        let me = &*me;
1437
1438
0
        let stream = &me.store[self.key];
1439
0
        stream.recv_flow.available().into()
1440
0
    }
1441
1442
0
    pub(crate) fn used_recv_capacity(&self) -> WindowSize {
1443
0
        let me = self.inner.lock().unwrap();
1444
0
        let me = &*me;
1445
1446
0
        let stream = &me.store[self.key];
1447
0
        stream.in_flight_recv_data
1448
0
    }
1449
1450
    /// Releases recv capacity back to the peer. This may result in sending
1451
    /// WINDOW_UPDATE frames on both the stream and connection.
1452
0
    pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
1453
0
        let mut me = self.inner.lock().unwrap();
1454
0
        let me = &mut *me;
1455
1456
0
        let mut stream = me.store.resolve(self.key);
1457
1458
0
        me.actions
1459
0
            .recv
1460
0
            .release_capacity(capacity, &mut stream, &mut me.actions.task)
1461
0
    }
1462
1463
    /// Clear the receive queue and set the status to no longer receive data frames.
1464
844
    pub(crate) fn clear_recv_buffer(&mut self) {
1465
844
        let mut me = self.inner.lock().unwrap();
1466
844
        let me = &mut *me;
1467
1468
844
        let mut stream = me.store.resolve(self.key);
1469
844
        stream.is_recv = false;
1470
844
        me.actions.recv.clear_recv_buffer(&mut stream);
1471
844
    }
1472
1473
0
    pub fn stream_id(&self) -> StreamId {
1474
0
        self.inner.lock().unwrap().store[self.key].id
1475
0
    }
1476
}
1477
1478
impl fmt::Debug for OpaqueStreamRef {
1479
844
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1480
        use std::sync::TryLockError::*;
1481
1482
844
        match self.inner.try_lock() {
1483
844
            Ok(me) => {
1484
844
                let stream = &me.store[self.key];
1485
844
                fmt.debug_struct("OpaqueStreamRef")
1486
844
                    .field("stream_id", &stream.id)
1487
844
                    .field("ref_count", &stream.ref_count)
1488
844
                    .finish()
1489
            }
1490
0
            Err(Poisoned(_)) => fmt
1491
0
                .debug_struct("OpaqueStreamRef")
1492
0
                .field("inner", &"<Poisoned>")
1493
0
                .finish(),
1494
0
            Err(WouldBlock) => fmt
1495
0
                .debug_struct("OpaqueStreamRef")
1496
0
                .field("inner", &"<Locked>")
1497
0
                .finish(),
1498
        }
1499
844
    }
1500
}
1501
1502
impl Clone for OpaqueStreamRef {
1503
420k
    fn clone(&self) -> Self {
1504
        // Increment the ref count
1505
420k
        let mut inner = self.inner.lock().unwrap();
1506
420k
        inner.store.resolve(self.key).ref_inc();
1507
420k
        inner.refs += 1;
1508
1509
420k
        OpaqueStreamRef {
1510
420k
            inner: self.inner.clone(),
1511
420k
            key: self.key,
1512
420k
        }
1513
420k
    }
1514
}
1515
1516
impl Drop for OpaqueStreamRef {
1517
839k
    fn drop(&mut self) {
1518
839k
        drop_stream_ref(&self.inner, self.key);
1519
839k
    }
1520
}
1521
1522
// TODO: Move back in fn above
1523
839k
fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
1524
839k
    let mut me = match inner.lock() {
1525
839k
        Ok(inner) => inner,
1526
        Err(_) => {
1527
0
            if ::std::thread::panicking() {
1528
0
                tracing::trace!("StreamRef::drop; mutex poisoned");
1529
0
                return;
1530
            } else {
1531
0
                panic!("StreamRef::drop; mutex poisoned");
1532
            }
1533
        }
1534
    };
1535
1536
839k
    let me = &mut *me;
1537
839k
    me.refs -= 1;
1538
839k
    let mut stream = me.store.resolve(key);
1539
1540
839k
    tracing::trace!("drop_stream_ref; stream={:?}", stream);
1541
1542
    // decrement the stream's ref count by 1.
1543
839k
    stream.ref_dec();
1544
1545
839k
    let actions = &mut me.actions;
1546
1547
    // If the stream is not referenced and it is already
1548
    // closed (does not have to go through logic below
1549
    // of canceling the stream), we should notify the task
1550
    // (connection) so that it can close properly
1551
839k
    if stream.ref_count == 0 && stream.is_closed() {
1552
344k
        if let Some(task) = actions.task.take() {
1553
147
            task.wake();
1554
344k
        }
1555
494k
    }
1556
1557
839k
    me.counts.transition(stream, |counts, stream| {
1558
839k
        maybe_cancel(stream, actions, counts);
1559
1560
839k
        if stream.ref_count == 0 {
1561
            // Release any recv window back to connection, no one can access
1562
            // it anymore.
1563
419k
            actions
1564
419k
                .recv
1565
419k
                .release_closed_capacity(stream, &mut actions.task);
1566
1567
            // We won't be able to reach our push promises anymore
1568
419k
            let mut ppp = stream.pending_push_promises.take();
1569
419k
            while let Some(promise) = ppp.pop(stream.store_mut()) {
1570
184
                counts.transition(promise, |counts, stream| {
1571
184
                    maybe_cancel(stream, actions, counts);
1572
184
                });
1573
            }
1574
420k
        }
1575
839k
    });
1576
839k
}
1577
1578
839k
fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
1579
839k
    if stream.is_canceled_interest() {
1580
        // Server is allowed to early respond without fully consuming the client input stream
1581
        // But per the RFC, must send a RST_STREAM(NO_ERROR) in such cases. https://www.rfc-editor.org/rfc/rfc7540#section-8.1
1582
        // Some other http2 implementation may interpret other error code as fatal if not respected (i.e: nginx https://trac.nginx.org/nginx/ticket/2376)
1583
74.5k
        let reason = if counts.peer().is_server()
1584
0
            && stream.state.is_send_closed()
1585
0
            && stream.state.is_recv_streaming()
1586
        {
1587
0
            Reason::NO_ERROR
1588
        } else {
1589
74.5k
            Reason::CANCEL
1590
        };
1591
1592
74.5k
        actions
1593
74.5k
            .send
1594
74.5k
            .schedule_implicit_reset(stream, reason, counts, &mut actions.task);
1595
74.5k
        actions.recv.enqueue_reset_expiration(stream, counts);
1596
765k
    }
1597
839k
}
1598
1599
// ===== impl SendBuffer =====
1600
1601
impl<B> SendBuffer<B> {
1602
13.3k
    fn new() -> Self {
1603
13.3k
        let inner = Mutex::new(Buffer::new());
1604
13.3k
        SendBuffer { inner }
1605
13.3k
    }
<h2::proto::streams::streams::SendBuffer<bytes::bytes::Bytes>>::new
Line
Count
Source
1602
695
    fn new() -> Self {
1603
695
        let inner = Mutex::new(Buffer::new());
1604
695
        SendBuffer { inner }
1605
695
    }
<h2::proto::streams::streams::SendBuffer<bytes::bytes::Bytes>>::new
Line
Count
Source
1602
12.6k
    fn new() -> Self {
1603
12.6k
        let inner = Mutex::new(Buffer::new());
1604
12.6k
        SendBuffer { inner }
1605
12.6k
    }
1606
1607
1.06k
    pub fn is_empty(&self) -> bool {
1608
1.06k
        let buf = self.inner.lock().unwrap();
1609
1.06k
        buf.is_empty()
1610
1.06k
    }
1611
}
1612
1613
// ===== impl Actions =====
1614
1615
impl Actions {
1616
76.6k
    fn send_reset<B>(
1617
76.6k
        &mut self,
1618
76.6k
        stream: store::Ptr,
1619
76.6k
        reason: Reason,
1620
76.6k
        initiator: Initiator,
1621
76.6k
        counts: &mut Counts,
1622
76.6k
        send_buffer: &mut Buffer<Frame<B>>,
1623
76.6k
    ) -> Result<(), crate::proto::error::GoAway> {
1624
76.6k
        counts.transition(stream, |counts, stream| {
1625
76.6k
            if initiator.is_library() {
1626
76.6k
                if counts.can_inc_num_local_error_resets() {
1627
76.6k
                    counts.inc_num_local_error_resets();
1628
76.6k
                } else {
1629
1
                    tracing::warn!(
1630
0
                        "locally-reset streams reached limit ({:?})",
1631
0
                        counts.max_local_error_resets().unwrap(),
1632
                    );
1633
1
                    return Err(crate::proto::error::GoAway {
1634
1
                        reason: Reason::ENHANCE_YOUR_CALM,
1635
1
                        debug_data: "too_many_internal_resets".into(),
1636
1
                    });
1637
                }
1638
0
            }
1639
1640
76.6k
            self.send.send_reset(
1641
76.6k
                reason,
1642
76.6k
                initiator,
1643
76.6k
                send_buffer,
1644
76.6k
                stream,
1645
76.6k
                counts,
1646
76.6k
                &mut self.task,
1647
            );
1648
76.6k
            self.recv.enqueue_reset_expiration(stream, counts);
1649
            // if a RecvStream is parked, ensure it's notified
1650
76.6k
            stream.notify_recv();
1651
1652
76.6k
            Ok(())
1653
76.6k
        })
1654
76.6k
    }
1655
1656
20.6k
    fn reset_on_recv_stream_err<B>(
1657
20.6k
        &mut self,
1658
20.6k
        buffer: &mut Buffer<Frame<B>>,
1659
20.6k
        stream: &mut store::Ptr,
1660
20.6k
        counts: &mut Counts,
1661
20.6k
        res: Result<(), Error>,
1662
20.6k
    ) -> Result<(), Error> {
1663
1.04k
        if let Err(Error::Reset(stream_id, reason, initiator)) = res {
1664
1.01k
            debug_assert_eq!(stream_id, stream.id);
1665
1666
1.01k
            if counts.can_inc_num_local_error_resets() {
1667
1.01k
                counts.inc_num_local_error_resets();
1668
1669
                // Reset the stream.
1670
1.01k
                self.send
1671
1.01k
                    .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
1672
1.01k
                self.recv.enqueue_reset_expiration(stream, counts);
1673
                // if a RecvStream is parked, ensure it's notified
1674
1.01k
                stream.notify_recv();
1675
1.01k
                Ok(())
1676
            } else {
1677
0
                tracing::warn!(
1678
0
                    "reset_on_recv_stream_err; locally-reset streams reached limit ({:?})",
1679
0
                    counts.max_local_error_resets().unwrap(),
1680
                );
1681
0
                Err(Error::library_go_away_data(
1682
0
                    Reason::ENHANCE_YOUR_CALM,
1683
0
                    "too_many_internal_resets",
1684
0
                ))
1685
            }
1686
        } else {
1687
19.6k
            res
1688
        }
1689
20.6k
    }
1690
1691
1.13k
    fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
1692
1.13k
        if peer.is_local_init(id) {
1693
676
            self.send.ensure_not_idle(id)
1694
        } else {
1695
459
            self.recv.ensure_not_idle(id)
1696
        }
1697
1.13k
    }
1698
1699
839k
    fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
1700
839k
        if let Some(ref err) = self.conn_error {
1701
71
            Err(err.clone())
1702
        } else {
1703
839k
            Ok(())
1704
        }
1705
839k
    }
1706
1707
    /// Check if we possibly could have processed and since forgotten this stream.
1708
    ///
1709
    /// If we send a RST_STREAM for a stream, we will eventually "forget" about
1710
    /// the stream to free up memory. It's possible that the remote peer had
1711
    /// frames in-flight, and by the time we receive them, our own state is
1712
    /// gone. We *could* tear everything down by sending a GOAWAY, but it
1713
    /// is more likely to be latency/memory constraints that caused this,
1714
    /// and not a bad actor. So be less catastrophic, the spec allows
1715
    /// us to send another RST_STREAM of STREAM_CLOSED.
1716
44.6k
    fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool {
1717
44.6k
        if id.is_zero() {
1718
0
            return false;
1719
44.6k
        }
1720
44.6k
        if peer.is_local_init(id) {
1721
5.89k
            self.send.may_have_created_stream(id)
1722
        } else {
1723
38.7k
            self.recv.may_have_created_stream(id)
1724
        }
1725
44.6k
    }
1726
1727
18.1k
    fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) {
1728
18.1k
        self.recv.clear_queues(clear_pending_accept, store, counts);
1729
18.1k
        self.send.clear_queues(store, counts);
1730
18.1k
    }
1731
}