Coverage Report

Created: 2025-12-31 06:49

/src/h2/src/proto/streams/streams.rs
Line
Count
Source
1
use super::recv::RecvHeaderBlockError;
2
use super::store::{self, Entry, Resolve, Store};
3
use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
4
use crate::codec::{Codec, SendError, UserError};
5
use crate::ext::Protocol;
6
use crate::frame::{self, Frame, Reason};
7
use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
8
use crate::{client, proto, server};
9
10
use bytes::{Buf, Bytes};
11
use http::{HeaderMap, Request, Response};
12
use std::task::{Context, Poll, Waker};
13
use tokio::io::AsyncWrite;
14
15
use std::sync::{Arc, Mutex};
16
use std::{fmt, io};
17
18
#[derive(Debug)]
19
pub(crate) struct Streams<B, P>
20
where
21
    P: Peer,
22
{
23
    /// Holds most of the connection and stream related state for processing
24
    /// HTTP/2 frames associated with streams.
25
    inner: Arc<Mutex<Inner>>,
26
27
    /// This is the queue of frames to be written to the wire. This is split out
28
    /// to avoid requiring a `B` generic on all public API types even if `B` is
29
    /// not technically required.
30
    ///
31
    /// Currently, splitting this out requires a second `Arc` + `Mutex`.
32
    /// However, it should be possible to avoid this duplication with a little
33
    /// bit of unsafe code. This optimization has been postponed until it has
34
    /// been shown to be necessary.
35
    send_buffer: Arc<SendBuffer<B>>,
36
37
    _p: ::std::marker::PhantomData<P>,
38
}
39
40
// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter.
41
// Ensures that the methods only get one instantiation, instead of two (client and server)
42
#[derive(Debug)]
43
pub(crate) struct DynStreams<'a, B> {
44
    inner: &'a Mutex<Inner>,
45
46
    send_buffer: &'a SendBuffer<B>,
47
48
    peer: peer::Dyn,
49
}
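The comment above explains that `DynStreams` exists so the frame-handling methods are compiled once against a runtime `peer::Dyn` value instead of once per static `P: Peer` type. A minimal standalone sketch of that trade-off, with hypothetical names standing in for the real types:

```rust
#[derive(Clone, Copy)]
enum Dyn { Client, Server }          // stands in for `peer::Dyn`

trait Peer { fn r#dyn() -> Dyn; }

struct Client;
struct Server;
impl Peer for Client { fn r#dyn() -> Dyn { Dyn::Client } }
impl Peer for Server { fn r#dyn() -> Dyn { Dyn::Server } }

// One copy of this function is monomorphized per `P` that is actually used...
fn generic_is_server<P: Peer>() -> bool {
    matches!(P::r#dyn(), Dyn::Server)
}

// ...while this one is compiled exactly once, whichever peer is passed in.
fn dynamic_is_server(peer: Dyn) -> bool {
    matches!(peer, Dyn::Server)
}

fn main() {
    assert!(!generic_is_server::<Client>());
    assert!(generic_is_server::<Server>());
    assert!(dynamic_is_server(Dyn::Server));
}
```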
50
51
/// Reference to the stream state
52
#[derive(Debug)]
53
pub(crate) struct StreamRef<B> {
54
    opaque: OpaqueStreamRef,
55
    send_buffer: Arc<SendBuffer<B>>,
56
}
57
58
/// Reference to the stream state that hides the send data chunk generic
59
pub(crate) struct OpaqueStreamRef {
60
    inner: Arc<Mutex<Inner>>,
61
    key: store::Key,
62
}
63
64
/// Fields needed to manage state related to the set of streams. This
65
/// is mostly split out to make ownership happy.
66
///
67
/// TODO: better name
68
#[derive(Debug)]
69
struct Inner {
70
    /// Tracks send & recv stream concurrency.
71
    counts: Counts,
72
73
    /// Connection level state and performs actions on streams
74
    actions: Actions,
75
76
    /// Stores stream state
77
    store: Store,
78
79
    /// The number of stream refs to this shared state.
80
    refs: usize,
81
}
82
83
#[derive(Debug)]
84
struct Actions {
85
    /// Manages state transitions initiated by receiving frames
86
    recv: Recv,
87
88
    /// Manages state transitions initiated by sending frames
89
    send: Send,
90
91
    /// Task that calls `poll_complete`.
92
    task: Option<Waker>,
93
94
    /// If the connection errors, a copy is kept for any StreamRefs.
95
    conn_error: Option<proto::Error>,
96
}
97
98
/// Contains the buffer of frames to be written to the wire.
99
#[derive(Debug)]
100
struct SendBuffer<B> {
101
    inner: Mutex<Buffer<Frame<B>>>,
102
}
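The doc comments on `Streams` above describe why the outgoing frame buffer lives behind its own `Arc`/`Mutex`: handles that do not need the `B` buffer generic (such as `OpaqueStreamRef`) can then drop it. A simplified sketch of that layering, using hypothetical stand-in types:

```rust
use std::sync::{Arc, Mutex};

struct SharedState;                  // stands in for `Inner`
struct FrameBuffer<B>(Vec<B>);       // stands in for `SendBuffer<B>`

// Analogous to `StreamRef<B>`: needs both handles, so it carries `B`.
struct TypedHandle<B> {
    state: Arc<Mutex<SharedState>>,
    frames: Arc<Mutex<FrameBuffer<B>>>,
}

// Analogous to `OpaqueStreamRef`: only the shared state, so no `B` generic.
struct ErasedHandle {
    state: Arc<Mutex<SharedState>>,
}

impl<B> TypedHandle<B> {
    fn erase(&self) -> ErasedHandle {
        ErasedHandle { state: self.state.clone() }
    }
}

fn main() {
    let typed: TypedHandle<Vec<u8>> = TypedHandle {
        state: Arc::new(Mutex::new(SharedState)),
        frames: Arc::new(Mutex::new(FrameBuffer(Vec::new()))),
    };
    let _erased = typed.erase();
}
```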
103
104
// ===== impl Streams =====
105
106
impl<B, P> Streams<B, P>
107
where
108
    B: Buf,
109
    P: Peer,
110
{
111
13.7k
    pub fn new(config: Config) -> Self {
112
13.7k
        let peer = P::r#dyn();
113
114
13.7k
        Streams {
115
13.7k
            inner: Inner::new(peer, config),
116
13.7k
            send_buffer: Arc::new(SendBuffer::new()),
117
13.7k
            _p: ::std::marker::PhantomData,
118
13.7k
        }
119
13.7k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::new
Line
Count
Source
111
733
    pub fn new(config: Config) -> Self {
112
733
        let peer = P::r#dyn();
113
114
733
        Streams {
115
733
            inner: Inner::new(peer, config),
116
733
            send_buffer: Arc::new(SendBuffer::new()),
117
733
            _p: ::std::marker::PhantomData,
118
733
        }
119
733
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::new
Line
Count
Source
111
12.9k
    pub fn new(config: Config) -> Self {
112
12.9k
        let peer = P::r#dyn();
113
114
12.9k
        Streams {
115
12.9k
            inner: Inner::new(peer, config),
116
12.9k
            send_buffer: Arc::new(SendBuffer::new()),
117
12.9k
            _p: ::std::marker::PhantomData,
118
12.9k
        }
119
12.9k
    }
120
121
0
    pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> {
122
0
        let mut me = self.inner.lock().unwrap();
123
0
        let me = &mut *me;
124
125
0
        me.actions
126
0
            .recv
127
0
            .set_target_connection_window(size, &mut me.actions.task)
128
0
    }
Unexecuted instantiation: <h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::set_target_connection_window_size
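`set_target_connection_window_size` is never executed in this run. From the public surface, this path appears to be reached via `Connection::set_target_window_size`; a minimal sketch of exercising it (the address and surrounding setup are illustrative assumptions):

```rust
use h2::client;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative endpoint; any prior-knowledge HTTP/2 server would do.
    let tcp = TcpStream::connect("127.0.0.1:8080").await?;
    let (_send_request, mut connection) = client::handshake(tcp).await?;

    // Raise the target connection-level flow-control window; internally this
    // should route down to Streams::set_target_connection_window_size.
    connection.set_target_window_size(1024 * 1024);

    connection.await?;
    Ok(())
}
```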
129
130
    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
131
        let mut me = self.inner.lock().unwrap();
132
        let me = &mut *me;
133
        me.actions.recv.next_incoming(&mut me.store).map(|key| {
134
            let stream = &mut me.store.resolve(key);
135
            tracing::trace!(
136
                "next_incoming; id={:?}, state={:?}",
137
                stream.id,
138
                stream.state
139
            );
140
            // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
141
            // the lock, so it can't.
142
            me.refs += 1;
143
144
            // Pending-accepted remotely-reset streams are counted.
145
            if stream.state.is_remote_reset() {
146
                me.counts.dec_num_remote_reset_streams();
147
            }
148
149
            StreamRef {
150
                opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
151
                send_buffer: self.send_buffer.clone(),
152
            }
153
        })
154
    }
155
156
2.44M
    pub fn send_pending_refusal<T>(
157
2.44M
        &mut self,
158
2.44M
        cx: &mut Context,
159
2.44M
        dst: &mut Codec<T, Prioritized<B>>,
160
2.44M
    ) -> Poll<io::Result<()>>
161
2.44M
    where
162
2.44M
        T: AsyncWrite + Unpin,
163
    {
164
2.44M
        let mut me = self.inner.lock().unwrap();
165
2.44M
        let me = &mut *me;
166
2.44M
        me.actions.recv.send_pending_refusal(cx, dst)
167
2.44M
    }
168
169
2.46M
    pub fn clear_expired_reset_streams(&mut self) {
170
2.46M
        let mut me = self.inner.lock().unwrap();
171
2.46M
        let me = &mut *me;
172
2.46M
        me.actions
173
2.46M
            .recv
174
2.46M
            .clear_expired_reset_streams(&mut me.store, &mut me.counts);
175
2.46M
    }
176
177
2.36M
    pub fn poll_complete<T>(
178
2.36M
        &mut self,
179
2.36M
        cx: &mut Context,
180
2.36M
        dst: &mut Codec<T, Prioritized<B>>,
181
2.36M
    ) -> Poll<io::Result<()>>
182
2.36M
    where
183
2.36M
        T: AsyncWrite + Unpin,
184
    {
185
2.36M
        let mut me = self.inner.lock().unwrap();
186
2.36M
        me.poll_complete(&self.send_buffer, cx, dst)
187
2.36M
    }
188
189
7.03k
    pub fn apply_remote_settings(
190
7.03k
        &mut self,
191
7.03k
        frame: &frame::Settings,
192
7.03k
        is_initial: bool,
193
7.03k
    ) -> Result<(), Error> {
194
7.03k
        let mut me = self.inner.lock().unwrap();
195
7.03k
        let me = &mut *me;
196
197
7.03k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
198
7.03k
        let send_buffer = &mut *send_buffer;
199
200
7.03k
        me.counts.apply_remote_settings(frame, is_initial);
201
202
7.03k
        me.actions.send.apply_remote_settings(
203
7.03k
            frame,
204
7.03k
            send_buffer,
205
7.03k
            &mut me.store,
206
7.03k
            &mut me.counts,
207
7.03k
            &mut me.actions.task,
208
        )
209
7.03k
    }
210
211
5
    pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
212
5
        let mut me = self.inner.lock().unwrap();
213
5
        let me = &mut *me;
214
215
5
        me.actions.recv.apply_local_settings(frame, &mut me.store)
216
5
    }
217
218
463k
    pub fn send_request(
219
463k
        &mut self,
220
463k
        mut request: Request<()>,
221
463k
        end_of_stream: bool,
222
463k
        pending: Option<&OpaqueStreamRef>,
223
463k
    ) -> Result<(StreamRef<B>, bool), SendError> {
224
        use super::stream::ContentLength;
225
        use http::Method;
226
227
463k
        let protocol = request.extensions_mut().remove::<Protocol>();
228
229
        // Clear before taking the lock, in case extensions contain a StreamRef.
230
463k
        request.extensions_mut().clear();
231
232
        // TODO: There is a hazard with assigning a stream ID before the
233
        // prioritize layer. If prioritization reorders new streams, this
234
        // implicitly closes the earlier stream IDs.
235
        //
236
        // See: hyperium/h2#11
237
463k
        let mut me = self.inner.lock().unwrap();
238
463k
        let me = &mut *me;
239
240
463k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
241
463k
        let send_buffer = &mut *send_buffer;
242
243
463k
        me.actions.ensure_no_conn_error()?;
244
463k
        me.actions.send.ensure_next_stream_id()?;
245
246
        // The `pending` argument is provided by the `Client`, and holds
247
        // a store `Key` of a `Stream` that may not have been opened
248
        // yet.
249
        //
250
        // If that stream is still pending, the Client isn't allowed to
251
        // queue up another pending stream. They should use `poll_ready`.
252
463k
        if let Some(stream) = pending {
253
0
            if me.store.resolve(stream.key).is_pending_open {
254
0
                return Err(UserError::Rejected.into());
255
0
            }
256
463k
        }
257
258
463k
        if me.counts.peer().is_server() {
259
            // Servers cannot open streams. PushPromise must first be reserved.
260
0
            return Err(UserError::UnexpectedFrameType.into());
261
463k
        }
262
263
463k
        let stream_id = me.actions.send.open()?;
264
265
463k
        let mut stream = Stream::new(
266
463k
            stream_id,
267
463k
            me.actions.send.init_window_sz(),
268
463k
            me.actions.recv.init_window_sz(),
269
        );
270
271
463k
        if *request.method() == Method::HEAD {
272
0
            stream.content_length = ContentLength::Head;
273
463k
        }
274
275
        // Convert the message
276
463k
        let headers =
277
463k
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
278
279
463k
        let mut stream = me.store.insert(stream.id, stream);
280
281
463k
        let sent = me.actions.send.send_headers(
282
463k
            headers,
283
463k
            send_buffer,
284
463k
            &mut stream,
285
463k
            &mut me.counts,
286
463k
            &mut me.actions.task,
287
        );
288
289
        // send_headers can return a UserError, if it does,
290
        // we should forget about this stream.
291
463k
        if let Err(err) = sent {
292
90
            stream.unlink();
293
90
            stream.remove();
294
90
            return Err(err.into());
295
463k
        }
296
297
        // Given that the stream has been initialized, it should not be in the
298
        // closed state.
299
463k
        debug_assert!(!stream.state.is_closed());
300
301
        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
302
        // the lock, so it can't.
303
463k
        me.refs += 1;
304
305
463k
        let is_full = me.counts.next_send_stream_will_reach_capacity();
306
463k
        Ok((
307
463k
            StreamRef {
308
463k
                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
309
463k
                send_buffer: self.send_buffer.clone(),
310
463k
            },
311
463k
            is_full,
312
463k
        ))
313
463k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::send_request
Line
Count
Source
218
733
    pub fn send_request(
219
733
        &mut self,
220
733
        mut request: Request<()>,
221
733
        end_of_stream: bool,
222
733
        pending: Option<&OpaqueStreamRef>,
223
733
    ) -> Result<(StreamRef<B>, bool), SendError> {
224
        use super::stream::ContentLength;
225
        use http::Method;
226
227
733
        let protocol = request.extensions_mut().remove::<Protocol>();
228
229
        // Clear before taking the lock, in case extensions contain a StreamRef.
230
733
        request.extensions_mut().clear();
231
232
        // TODO: There is a hazard with assigning a stream ID before the
233
        // prioritize layer. If prioritization reorders new streams, this
234
        // implicitly closes the earlier stream IDs.
235
        //
236
        // See: hyperium/h2#11
237
733
        let mut me = self.inner.lock().unwrap();
238
733
        let me = &mut *me;
239
240
733
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
241
733
        let send_buffer = &mut *send_buffer;
242
243
733
        me.actions.ensure_no_conn_error()?;
244
733
        me.actions.send.ensure_next_stream_id()?;
245
246
        // The `pending` argument is provided by the `Client`, and holds
247
        // a store `Key` of a `Stream` that may not have been opened
248
        // yet.
249
        //
250
        // If that stream is still pending, the Client isn't allowed to
251
        // queue up another pending stream. They should use `poll_ready`.
252
733
        if let Some(stream) = pending {
253
0
            if me.store.resolve(stream.key).is_pending_open {
254
0
                return Err(UserError::Rejected.into());
255
0
            }
256
733
        }
257
258
733
        if me.counts.peer().is_server() {
259
            // Servers cannot open streams. PushPromise must first be reserved.
260
0
            return Err(UserError::UnexpectedFrameType.into());
261
733
        }
262
263
733
        let stream_id = me.actions.send.open()?;
264
265
733
        let mut stream = Stream::new(
266
733
            stream_id,
267
733
            me.actions.send.init_window_sz(),
268
733
            me.actions.recv.init_window_sz(),
269
        );
270
271
733
        if *request.method() == Method::HEAD {
272
0
            stream.content_length = ContentLength::Head;
273
733
        }
274
275
        // Convert the message
276
733
        let headers =
277
733
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
278
279
733
        let mut stream = me.store.insert(stream.id, stream);
280
281
733
        let sent = me.actions.send.send_headers(
282
733
            headers,
283
733
            send_buffer,
284
733
            &mut stream,
285
733
            &mut me.counts,
286
733
            &mut me.actions.task,
287
        );
288
289
        // send_headers can return a UserError, if it does,
290
        // we should forget about this stream.
291
733
        if let Err(err) = sent {
292
90
            stream.unlink();
293
90
            stream.remove();
294
90
            return Err(err.into());
295
643
        }
296
297
        // Given that the stream has been initialized, it should not be in the
298
        // closed state.
299
643
        debug_assert!(!stream.state.is_closed());
300
301
        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
302
        // the lock, so it can't.
303
643
        me.refs += 1;
304
305
643
        let is_full = me.counts.next_send_stream_will_reach_capacity();
306
643
        Ok((
307
643
            StreamRef {
308
643
                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
309
643
                send_buffer: self.send_buffer.clone(),
310
643
            },
311
643
            is_full,
312
643
        ))
313
733
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::send_request
Line
Count
Source
218
462k
    pub fn send_request(
219
462k
        &mut self,
220
462k
        mut request: Request<()>,
221
462k
        end_of_stream: bool,
222
462k
        pending: Option<&OpaqueStreamRef>,
223
462k
    ) -> Result<(StreamRef<B>, bool), SendError> {
224
        use super::stream::ContentLength;
225
        use http::Method;
226
227
462k
        let protocol = request.extensions_mut().remove::<Protocol>();
228
229
        // Clear before taking the lock, in case extensions contain a StreamRef.
230
462k
        request.extensions_mut().clear();
231
232
        // TODO: There is a hazard with assigning a stream ID before the
233
        // prioritize layer. If prioritization reorders new streams, this
234
        // implicitly closes the earlier stream IDs.
235
        //
236
        // See: hyperium/h2#11
237
462k
        let mut me = self.inner.lock().unwrap();
238
462k
        let me = &mut *me;
239
240
462k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
241
462k
        let send_buffer = &mut *send_buffer;
242
243
462k
        me.actions.ensure_no_conn_error()?;
244
462k
        me.actions.send.ensure_next_stream_id()?;
245
246
        // The `pending` argument is provided by the `Client`, and holds
247
        // a store `Key` of a `Stream` that may not have been opened
248
        // yet.
249
        //
250
        // If that stream is still pending, the Client isn't allowed to
251
        // queue up another pending stream. They should use `poll_ready`.
252
462k
        if let Some(stream) = pending {
253
0
            if me.store.resolve(stream.key).is_pending_open {
254
0
                return Err(UserError::Rejected.into());
255
0
            }
256
462k
        }
257
258
462k
        if me.counts.peer().is_server() {
259
            // Servers cannot open streams. PushPromise must first be reserved.
260
0
            return Err(UserError::UnexpectedFrameType.into());
261
462k
        }
262
263
462k
        let stream_id = me.actions.send.open()?;
264
265
462k
        let mut stream = Stream::new(
266
462k
            stream_id,
267
462k
            me.actions.send.init_window_sz(),
268
462k
            me.actions.recv.init_window_sz(),
269
        );
270
271
462k
        if *request.method() == Method::HEAD {
272
0
            stream.content_length = ContentLength::Head;
273
462k
        }
274
275
        // Convert the message
276
462k
        let headers =
277
462k
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
278
279
462k
        let mut stream = me.store.insert(stream.id, stream);
280
281
462k
        let sent = me.actions.send.send_headers(
282
462k
            headers,
283
462k
            send_buffer,
284
462k
            &mut stream,
285
462k
            &mut me.counts,
286
462k
            &mut me.actions.task,
287
        );
288
289
        // send_headers can return a UserError, if it does,
290
        // we should forget about this stream.
291
462k
        if let Err(err) = sent {
292
0
            stream.unlink();
293
0
            stream.remove();
294
0
            return Err(err.into());
295
462k
        }
296
297
        // Given that the stream has been initialized, it should not be in the
298
        // closed state.
299
462k
        debug_assert!(!stream.state.is_closed());
300
301
        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
302
        // the lock, so it can't.
303
462k
        me.refs += 1;
304
305
462k
        let is_full = me.counts.next_send_stream_will_reach_capacity();
306
462k
        Ok((
307
462k
            StreamRef {
308
462k
                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
309
462k
                send_buffer: self.send_buffer.clone(),
310
462k
            },
311
462k
            is_full,
312
462k
        ))
313
462k
    }
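The comments in `send_request` above say that a client whose previous stream is still pending-open must not queue another request and should use `poll_ready` instead. A hedged sketch of that calling pattern through the public client API (the URIs and surrounding connection setup are assumptions):

```rust
use bytes::Bytes;
use h2::client::SendRequest;
use http::Request;

async fn send_two(mut client: SendRequest<Bytes>) -> Result<(), h2::Error> {
    for uri in ["https://example.com/a", "https://example.com/b"] {
        // ready() resolves once the previously queued stream is no longer
        // pending-open, mirroring the `pending`/`is_pending_open` check above.
        client = client.ready().await?;

        let request = Request::get(uri).body(()).expect("valid request");
        let (response, _send_stream) = client.send_request(request, true)?;
        let _ = response.await?;
    }
    Ok(())
}
```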
314
315
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
316
        self.inner
317
            .lock()
318
            .unwrap()
319
            .actions
320
            .send
321
            .is_extended_connect_protocol_enabled()
322
    }
323
324
    pub fn current_max_send_streams(&self) -> usize {
325
        let me = self.inner.lock().unwrap();
326
        me.counts.max_send_streams()
327
    }
328
329
    pub fn current_max_recv_streams(&self) -> usize {
330
        let me = self.inner.lock().unwrap();
331
        me.counts.max_recv_streams()
332
    }
333
}
334
335
impl<B> DynStreams<'_, B> {
336
1.15k
    pub fn is_buffer_empty(&self) -> bool {
337
1.15k
        self.send_buffer.is_empty()
338
1.15k
    }
339
340
0
    pub fn is_server(&self) -> bool {
341
0
        self.peer.is_server()
342
0
    }
343
344
8.91k
    pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
345
8.91k
        let mut me = self.inner.lock().unwrap();
346
347
8.91k
        me.recv_headers(self.peer, self.send_buffer, frame)
348
8.91k
    }
349
350
72.5k
    pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
351
72.5k
        let mut me = self.inner.lock().unwrap();
352
72.5k
        me.recv_data(self.peer, self.send_buffer, frame)
353
72.5k
    }
354
355
1.49k
    pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
356
1.49k
        let mut me = self.inner.lock().unwrap();
357
358
1.49k
        me.recv_reset(self.send_buffer, frame)
359
1.49k
    }
360
361
    /// Notify all streams that a connection-level error happened.
362
7.55k
    pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
363
7.55k
        let mut me = self.inner.lock().unwrap();
364
7.55k
        me.handle_error(self.send_buffer, err)
365
7.55k
    }
366
367
6.01k
    pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
368
6.01k
        let mut me = self.inner.lock().unwrap();
369
6.01k
        me.recv_go_away(self.send_buffer, frame)
370
6.01k
    }
371
372
6.43k
    pub fn last_processed_id(&self) -> StreamId {
373
6.43k
        self.inner.lock().unwrap().actions.recv.last_processed_id()
374
6.43k
    }
375
376
1.66k
    pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
377
1.66k
        let mut me = self.inner.lock().unwrap();
378
1.66k
        me.recv_window_update(self.send_buffer, frame)
379
1.66k
    }
380
381
1.32k
    pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
382
1.32k
        let mut me = self.inner.lock().unwrap();
383
1.32k
        me.recv_push_promise(self.send_buffer, frame)
384
1.32k
    }
385
386
18.5k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
387
18.5k
        let mut me = self.inner.lock().map_err(|_| ())?;
388
18.5k
        me.recv_eof(self.send_buffer, clear_pending_accept)
389
18.5k
    }
<h2::proto::streams::streams::DynStreams<bytes::bytes::Bytes>>::recv_eof
Line
Count
Source
386
733
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
387
733
        let mut me = self.inner.lock().map_err(|_| ())?;
388
733
        me.recv_eof(self.send_buffer, clear_pending_accept)
389
733
    }
<h2::proto::streams::streams::DynStreams<bytes::bytes::Bytes>>::recv_eof
Line
Count
Source
386
17.8k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
387
17.8k
        let mut me = self.inner.lock().map_err(|_| ())?;
388
17.8k
        me.recv_eof(self.send_buffer, clear_pending_accept)
389
17.8k
    }
390
391
84.7k
    pub fn send_reset(
392
84.7k
        &mut self,
393
84.7k
        id: StreamId,
394
84.7k
        reason: Reason,
395
84.7k
    ) -> Result<(), crate::proto::error::GoAway> {
396
84.7k
        let mut me = self.inner.lock().unwrap();
397
84.7k
        me.send_reset(self.send_buffer, id, reason)
398
84.7k
    }
399
400
0
    pub fn send_go_away(&mut self, last_processed_id: StreamId) {
401
0
        let mut me = self.inner.lock().unwrap();
402
0
        me.actions.recv.go_away(last_processed_id);
403
0
    }
404
}
405
406
impl Inner {
407
13.7k
    fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> {
408
13.7k
        Arc::new(Mutex::new(Inner {
409
13.7k
            counts: Counts::new(peer, &config),
410
13.7k
            actions: Actions {
411
13.7k
                recv: Recv::new(peer, &config),
412
13.7k
                send: Send::new(&config),
413
13.7k
                task: None,
414
13.7k
                conn_error: None,
415
13.7k
            },
416
13.7k
            store: Store::new(),
417
13.7k
            refs: 1,
418
13.7k
        }))
419
13.7k
    }
420
421
8.91k
    fn recv_headers<B>(
422
8.91k
        &mut self,
423
8.91k
        peer: peer::Dyn,
424
8.91k
        send_buffer: &SendBuffer<B>,
425
8.91k
        frame: frame::Headers,
426
8.91k
    ) -> Result<(), Error> {
427
8.91k
        let id = frame.stream_id();
428
429
        // The GOAWAY process has begun. All streams with a greater ID than
430
        // specified as part of GOAWAY should be ignored.
431
8.91k
        if id > self.actions.recv.max_stream_id() {
432
0
            tracing::trace!(
433
0
                "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
434
                id,
435
0
                self.actions.recv.max_stream_id()
436
            );
437
0
            return Ok(());
438
8.91k
        }
439
440
8.91k
        let key = match self.store.find_entry(id) {
441
6.75k
            Entry::Occupied(e) => e.key(),
442
2.15k
            Entry::Vacant(e) => {
443
                // Client: it's possible to send a request, and then send
444
                // a RST_STREAM while the response HEADERS were in transit.
445
                //
446
                // Server: we can't reset a stream before having received
447
                // the request headers, so don't allow.
448
2.15k
                if !peer.is_server() {
449
                    // This may be response headers for a stream we've already
450
                    // forgotten about...
451
2.15k
                    if self.actions.may_have_forgotten_stream(peer, id) {
452
1.82k
                        tracing::debug!(
453
0
                            "recv_headers for old stream={:?}, sending STREAM_CLOSED",
454
                            id,
455
                        );
456
1.82k
                        return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
457
331
                    }
458
0
                }
459
460
331
                match self
461
331
                    .actions
462
331
                    .recv
463
331
                    .open(id, Open::Headers, &mut self.counts)?
464
                {
465
0
                    Some(stream_id) => {
466
0
                        let stream = Stream::new(
467
0
                            stream_id,
468
0
                            self.actions.send.init_window_sz(),
469
0
                            self.actions.recv.init_window_sz(),
470
                        );
471
472
0
                        e.insert(stream)
473
                    }
474
0
                    None => return Ok(()),
475
                }
476
            }
477
        };
478
479
6.75k
        let stream = self.store.resolve(key);
480
481
6.75k
        if stream.state.is_local_error() {
482
            // Locally reset streams must ignore frames "for some time".
483
            // This is because the remote may have sent trailers before
484
            // receiving the RST_STREAM frame.
485
3.56k
            tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
486
3.56k
            return Ok(());
487
3.19k
        }
488
489
3.19k
        let actions = &mut self.actions;
490
3.19k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
491
3.19k
        let send_buffer = &mut *send_buffer;
492
493
3.19k
        self.counts.transition(stream, |counts, stream| {
494
3.19k
            tracing::trace!(
495
0
                "recv_headers; stream={:?}; state={:?}",
496
0
                stream.id,
497
0
                stream.state
498
            );
499
500
3.19k
            let res = if stream.state.is_recv_headers() {
501
2.32k
                match actions.recv.recv_headers(frame, stream, counts) {
502
2.27k
                    Ok(()) => Ok(()),
503
0
                    Err(RecvHeaderBlockError::Oversize(resp)) => {
504
0
                        if let Some(resp) = resp {
505
0
                            let sent = actions.send.send_headers(
506
0
                                resp, send_buffer, stream, counts, &mut actions.task);
507
0
                            debug_assert!(sent.is_ok(), "oversize response should not fail");
508
509
0
                            actions.send.schedule_implicit_reset(
510
0
                                stream,
511
                                Reason::PROTOCOL_ERROR,
512
0
                                counts,
513
0
                                &mut actions.task);
514
515
0
                            actions.recv.enqueue_reset_expiration(stream, counts);
516
517
0
                            Ok(())
518
                        } else {
519
0
                            Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR))
520
                        }
521
                    },
522
47
                    Err(RecvHeaderBlockError::State(err)) => Err(err),
523
                }
524
            } else {
525
872
                if !frame.is_end_stream() {
526
                    // Receiving trailers that don't set EOS is a "malformed"
527
                    // message. Malformed messages are a stream error.
528
817
                    proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
529
817
                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
530
55
                }
531
532
55
                actions.recv.recv_trailers(frame, stream)
533
            };
534
535
2.37k
            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
536
3.19k
        })
537
8.91k
    }
538
539
72.5k
    fn recv_data<B>(
540
72.5k
        &mut self,
541
72.5k
        peer: peer::Dyn,
542
72.5k
        send_buffer: &SendBuffer<B>,
543
72.5k
        frame: frame::Data,
544
72.5k
    ) -> Result<(), Error> {
545
72.5k
        let id = frame.stream_id();
546
547
72.5k
        let stream = match self.store.find_mut(&id) {
548
23.2k
            Some(stream) => stream,
549
            None => {
550
                // The GOAWAY process has begun. All streams with a greater ID
551
                // than specified as part of GOAWAY should be ignored.
552
49.3k
                if id > self.actions.recv.max_stream_id() {
553
0
                    tracing::trace!(
554
0
                        "id ({:?}) > max_stream_id ({:?}), ignoring DATA",
555
                        id,
556
0
                        self.actions.recv.max_stream_id()
557
                    );
558
0
                    return Ok(());
559
49.3k
                }
560
561
49.3k
                if self.actions.may_have_forgotten_stream(peer, id) {
562
49.1k
                    tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
563
564
49.1k
                    let sz = frame.payload().len();
565
                    // This should have been enforced at the codec::FramedRead layer, so
566
                    // this is just a sanity check.
567
49.1k
                    assert!(sz <= super::MAX_WINDOW_SIZE as usize);
568
49.1k
                    let sz = sz as WindowSize;
569
570
49.1k
                    self.actions.recv.ignore_data(sz)?;
571
49.1k
                    return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
572
150
                }
573
574
150
                proto_err!(conn: "recv_data: stream not found; id={:?}", id);
575
150
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
576
            }
577
        };
578
579
23.2k
        let actions = &mut self.actions;
580
23.2k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
581
23.2k
        let send_buffer = &mut *send_buffer;
582
583
23.2k
        self.counts.transition(stream, |counts, stream| {
584
23.2k
            let sz = frame.payload().len();
585
23.2k
            let res = actions.recv.recv_data(frame, stream);
586
587
            // Any stream error after receiving a DATA frame means
588
            // we won't give the data to the user, and so they can't
589
            // release the capacity. We do it automatically.
590
64
            if let Err(Error::Reset(..)) = res {
591
34
                actions
592
34
                    .recv
593
34
                    .release_connection_capacity(sz as WindowSize, &mut None);
594
23.2k
            }
595
23.2k
            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
596
23.2k
        })
597
72.5k
    }
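The closure in `recv_data` releases connection capacity automatically when a stream error means the data will never reach the user. For contrast, a hedged sketch of the normal user-side pattern the comment alludes to, where the application releases capacity itself after consuming each chunk (stream setup assumed elsewhere):

```rust
use h2::RecvStream;

async fn drain(mut body: RecvStream) -> Result<(), h2::Error> {
    // FlowControl is a cheap handle that can be cloned out of the stream.
    let mut flow = body.flow_control().clone();

    while let Some(chunk) = body.data().await {
        let chunk = chunk?;
        // Tell the peer these bytes were consumed so it may send more.
        flow.release_capacity(chunk.len())?;
    }
    Ok(())
}
```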
598
599
1.49k
    fn recv_reset<B>(
600
1.49k
        &mut self,
601
1.49k
        send_buffer: &SendBuffer<B>,
602
1.49k
        frame: frame::Reset,
603
1.49k
    ) -> Result<(), Error> {
604
1.49k
        let id = frame.stream_id();
605
606
1.49k
        if id.is_zero() {
607
2
            proto_err!(conn: "recv_reset: invalid stream ID 0");
608
2
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
609
1.48k
        }
610
611
        // The GOAWAY process has begun. All streams with a greater ID than
612
        // specified as part of GOAWAY should be ignored.
613
1.48k
        if id > self.actions.recv.max_stream_id() {
614
0
            tracing::trace!(
615
0
                "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
616
                id,
617
0
                self.actions.recv.max_stream_id()
618
            );
619
0
            return Ok(());
620
1.48k
        }
621
622
1.48k
        let stream = match self.store.find_mut(&id) {
623
666
            Some(stream) => stream,
624
            None => {
625
                // TODO: Are there other error cases?
626
823
                self.actions
627
823
                    .ensure_not_idle(self.counts.peer(), id)
628
823
                    .map_err(Error::library_go_away)?;
629
630
782
                return Ok(());
631
            }
632
        };
633
634
666
        let mut send_buffer = send_buffer.inner.lock().unwrap();
635
666
        let send_buffer = &mut *send_buffer;
636
637
666
        let actions = &mut self.actions;
638
639
666
        self.counts.transition(stream, |counts, stream| {
640
666
            actions.recv.recv_reset(frame, stream, counts)?;
641
666
            actions.send.handle_error(send_buffer, stream, counts);
642
666
            assert!(stream.state.is_closed());
643
666
            Ok(())
644
666
        })
645
1.49k
    }
646
647
1.66k
    fn recv_window_update<B>(
648
1.66k
        &mut self,
649
1.66k
        send_buffer: &SendBuffer<B>,
650
1.66k
        frame: frame::WindowUpdate,
651
1.66k
    ) -> Result<(), Error> {
652
1.66k
        let id = frame.stream_id();
653
654
1.66k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
655
1.66k
        let send_buffer = &mut *send_buffer;
656
657
1.66k
        if id.is_zero() {
658
872
            self.actions
659
872
                .send
660
872
                .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
661
872
                .map_err(Error::library_go_away)?;
662
        } else {
663
            // The remote may send window updates for streams that the local now
664
            // considers closed. It's ok...
665
792
            if let Some(mut stream) = self.store.find_mut(&id) {
666
336
                let res = self
667
336
                    .actions
668
336
                    .send
669
336
                    .recv_stream_window_update(
670
336
                        frame.size_increment(),
671
336
                        send_buffer,
672
336
                        &mut stream,
673
336
                        &mut self.counts,
674
336
                        &mut self.actions.task,
675
                    )
676
336
                    .map_err(|reason| Error::library_reset(id, reason));
677
678
336
                return self.actions.reset_on_recv_stream_err(
679
336
                    send_buffer,
680
336
                    &mut stream,
681
336
                    &mut self.counts,
682
336
                    res,
683
                );
684
            } else {
685
456
                self.actions
686
456
                    .ensure_not_idle(self.counts.peer(), id)
687
456
                    .map_err(Error::library_go_away)?;
688
            }
689
        }
690
691
1.27k
        Ok(())
692
1.66k
    }
693
694
7.55k
    fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
695
7.55k
        let actions = &mut self.actions;
696
7.55k
        let counts = &mut self.counts;
697
7.55k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
698
7.55k
        let send_buffer = &mut *send_buffer;
699
700
7.55k
        let last_processed_id = actions.recv.last_processed_id();
701
702
200k
        self.store.for_each(|stream| {
703
200k
            counts.transition(stream, |counts, stream| {
704
200k
                actions.recv.handle_error(&err, &mut *stream);
705
200k
                actions.send.handle_error(send_buffer, stream, counts);
706
200k
            })
707
200k
        });
708
709
7.55k
        actions.conn_error = Some(err);
710
711
7.55k
        last_processed_id
712
7.55k
    }
713
714
6.01k
    fn recv_go_away<B>(
715
6.01k
        &mut self,
716
6.01k
        send_buffer: &SendBuffer<B>,
717
6.01k
        frame: &frame::GoAway,
718
6.01k
    ) -> Result<(), Error> {
719
6.01k
        let actions = &mut self.actions;
720
6.01k
        let counts = &mut self.counts;
721
6.01k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
722
6.01k
        let send_buffer = &mut *send_buffer;
723
724
6.01k
        let last_stream_id = frame.last_stream_id();
725
726
6.01k
        actions.send.recv_go_away(last_stream_id)?;
727
728
5.98k
        let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
729
730
991k
        self.store.for_each(|stream| {
731
991k
            if stream.id > last_stream_id {
732
280k
                counts.transition(stream, |counts, stream| {
733
280k
                    actions.recv.handle_error(&err, &mut *stream);
734
280k
                    actions.send.handle_error(send_buffer, stream, counts);
735
280k
                })
736
710k
            }
737
991k
        });
738
739
5.98k
        actions.conn_error = Some(err);
740
741
5.98k
        Ok(())
742
6.01k
    }
743
744
1.32k
    fn recv_push_promise<B>(
745
1.32k
        &mut self,
746
1.32k
        send_buffer: &SendBuffer<B>,
747
1.32k
        frame: frame::PushPromise,
748
1.32k
    ) -> Result<(), Error> {
749
1.32k
        let id = frame.stream_id();
750
1.32k
        let promised_id = frame.promised_id();
751
752
        // First, ensure that the initiating stream is still in a valid state.
753
1.32k
        let parent_key = match self.store.find_mut(&id) {
754
1.30k
            Some(stream) => {
755
                // The GOAWAY process has begun. All streams with a greater ID
756
                // than specified as part of GOAWAY should be ignored.
757
1.30k
                if id > self.actions.recv.max_stream_id() {
758
0
                    tracing::trace!(
759
0
                        "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
760
                        id,
761
0
                        self.actions.recv.max_stream_id()
762
                    );
763
0
                    return Ok(());
764
1.30k
                }
765
766
                // The stream must be receive open
767
1.30k
                if !stream.state.ensure_recv_open()? {
768
1
                    proto_err!(conn: "recv_push_promise: initiating stream is not opened");
769
1
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
770
1.19k
                }
771
772
1.19k
                stream.key()
773
            }
774
            None => {
775
13
                proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
776
13
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
777
            }
778
        };
779
780
        // TODO: Streams in the reserved states do not count towards the concurrency
781
        // limit. However, it seems like there should be a cap otherwise this
782
        // could grow in memory indefinitely.
783
784
        // Ensure that we can reserve streams
785
1.19k
        self.actions.recv.ensure_can_reserve()?;
786
787
        // Next, open the stream.
788
        //
789
        // If `None` is returned, then the stream is being refused. There is no
790
        // further work to be done.
791
1.19k
        if self
792
1.19k
            .actions
793
1.19k
            .recv
794
1.19k
            .open(promised_id, Open::PushPromise, &mut self.counts)?
795
1.18k
            .is_none()
796
        {
797
0
            return Ok(());
798
1.18k
        }
799
800
        // Try to handle the frame and create a corresponding key for the pushed stream;
801
        // this requires a bit of indirection to make the borrow checker happy.
802
1.18k
        let child_key: Option<store::Key> = {
803
            // Create state for the stream
804
1.18k
            let stream = self.store.insert(promised_id, {
805
1.18k
                Stream::new(
806
1.18k
                    promised_id,
807
1.18k
                    self.actions.send.init_window_sz(),
808
1.18k
                    self.actions.recv.init_window_sz(),
809
                )
810
            });
811
812
1.18k
            let actions = &mut self.actions;
813
814
1.18k
            self.counts.transition(stream, |counts, stream| {
815
1.18k
                let stream_valid = actions.recv.recv_push_promise(frame, stream);
816
817
1.18k
                match stream_valid {
818
192
                    Ok(()) => Ok(Some(stream.key())),
819
                    _ => {
820
997
                        let mut send_buffer = send_buffer.inner.lock().unwrap();
821
997
                        actions
822
997
                            .reset_on_recv_stream_err(
823
997
                                &mut *send_buffer,
824
997
                                stream,
825
997
                                counts,
826
997
                                stream_valid,
827
                            )
828
997
                            .map(|()| None)
829
                    }
830
                }
831
1.18k
            })?
832
        };
833
        // If we're successful, push the headers and stream...
834
1.18k
        if let Some(child) = child_key {
835
192
            let mut ppp = self.store[parent_key].pending_push_promises.take();
836
192
            ppp.push(&mut self.store.resolve(child));
837
192
838
192
            let parent = &mut self.store.resolve(parent_key);
839
192
            parent.pending_push_promises = ppp;
840
192
            parent.notify_push();
841
997
        };
842
843
1.18k
        Ok(())
844
1.32k
    }
845
846
18.5k
    fn recv_eof<B>(
847
18.5k
        &mut self,
848
18.5k
        send_buffer: &SendBuffer<B>,
849
18.5k
        clear_pending_accept: bool,
850
18.5k
    ) -> Result<(), ()> {
851
18.5k
        let actions = &mut self.actions;
852
18.5k
        let counts = &mut self.counts;
853
18.5k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
854
18.5k
        let send_buffer = &mut *send_buffer;
855
856
18.5k
        if actions.conn_error.is_none() {
857
5.97k
            actions.conn_error = Some(
858
5.97k
                io::Error::new(
859
5.97k
                    io::ErrorKind::BrokenPipe,
860
5.97k
                    "connection closed because of a broken pipe",
861
5.97k
                )
862
5.97k
                .into(),
863
5.97k
            );
864
12.6k
        }
865
866
18.5k
        tracing::trace!("Streams::recv_eof");
867
868
275k
        self.store.for_each(|stream| {
869
275k
            counts.transition(stream, |counts, stream| {
870
275k
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
275k
                actions.send.handle_error(send_buffer, stream, counts);
875
275k
            })
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}::{closure#0}
Line
Count
Source
869
643
            counts.transition(stream, |counts, stream| {
870
643
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
643
                actions.send.handle_error(send_buffer, stream, counts);
875
643
            })
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}::{closure#0}
Line
Count
Source
869
274k
            counts.transition(stream, |counts, stream| {
870
274k
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
274k
                actions.send.handle_error(send_buffer, stream, counts);
875
274k
            })
876
275k
        });
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}
Line
Count
Source
868
643
        self.store.for_each(|stream| {
869
643
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
643
        });
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>::{closure#0}
Line
Count
Source
868
274k
        self.store.for_each(|stream| {
869
274k
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
274k
        });
877
878
18.5k
        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
879
18.5k
        Ok(())
880
18.5k
    }
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>
Line
Count
Source
846
733
    fn recv_eof<B>(
847
733
        &mut self,
848
733
        send_buffer: &SendBuffer<B>,
849
733
        clear_pending_accept: bool,
850
733
    ) -> Result<(), ()> {
851
733
        let actions = &mut self.actions;
852
733
        let counts = &mut self.counts;
853
733
        let mut send_buffer = send_buffer.inner.lock().unwrap();
854
733
        let send_buffer = &mut *send_buffer;
855
856
733
        if actions.conn_error.is_none() {
857
733
            actions.conn_error = Some(
858
733
                io::Error::new(
859
733
                    io::ErrorKind::BrokenPipe,
860
733
                    "connection closed because of a broken pipe",
861
733
                )
862
733
                .into(),
863
733
            );
864
733
        }
865
866
733
        tracing::trace!("Streams::recv_eof");
867
868
733
        self.store.for_each(|stream| {
869
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
        });
877
878
733
        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
879
733
        Ok(())
880
733
    }
<h2::proto::streams::streams::Inner>::recv_eof::<bytes::bytes::Bytes>
Line
Count
Source
846
17.8k
    fn recv_eof<B>(
847
17.8k
        &mut self,
848
17.8k
        send_buffer: &SendBuffer<B>,
849
17.8k
        clear_pending_accept: bool,
850
17.8k
    ) -> Result<(), ()> {
851
17.8k
        let actions = &mut self.actions;
852
17.8k
        let counts = &mut self.counts;
853
17.8k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
854
17.8k
        let send_buffer = &mut *send_buffer;
855
856
17.8k
        if actions.conn_error.is_none() {
857
5.24k
            actions.conn_error = Some(
858
5.24k
                io::Error::new(
859
5.24k
                    io::ErrorKind::BrokenPipe,
860
5.24k
                    "connection closed because of a broken pipe",
861
5.24k
                )
862
5.24k
                .into(),
863
5.24k
            );
864
12.6k
        }
865
866
17.8k
        tracing::trace!("Streams::recv_eof");
867
868
17.8k
        self.store.for_each(|stream| {
869
            counts.transition(stream, |counts, stream| {
870
                actions.recv.recv_eof(stream);
871
872
                // This handles resetting send state associated with the
873
                // stream
874
                actions.send.handle_error(send_buffer, stream, counts);
875
            })
876
        });
877
878
17.8k
        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
879
17.8k
        Ok(())
880
17.8k
    }
881
882
2.36M
    fn poll_complete<T, B>(
883
2.36M
        &mut self,
884
2.36M
        send_buffer: &SendBuffer<B>,
885
2.36M
        cx: &mut Context,
886
2.36M
        dst: &mut Codec<T, Prioritized<B>>,
887
2.36M
    ) -> Poll<io::Result<()>>
888
2.36M
    where
889
2.36M
        T: AsyncWrite + Unpin,
890
2.36M
        B: Buf,
891
    {
892
2.36M
        let mut send_buffer = send_buffer.inner.lock().unwrap();
893
2.36M
        let send_buffer = &mut *send_buffer;
894
895
        // Send WINDOW_UPDATE frames first
896
        //
897
        // TODO: It would probably be better to interleave updates w/ data
898
        // frames.
899
2.36M
        ready!(self
900
2.36M
            .actions
901
2.36M
            .recv
902
2.36M
            .poll_complete(cx, &mut self.store, &mut self.counts, dst))?;
903
904
        // Send any other pending frames
905
1.51M
        ready!(self.actions.send.poll_complete(
906
1.51M
            cx,
907
1.51M
            send_buffer,
908
1.51M
            &mut self.store,
909
1.51M
            &mut self.counts,
910
1.51M
            dst
911
1.51M
        ))?;
912
913
        // Nothing else to do, track the task
914
1.50M
        self.actions.task = Some(cx.waker().clone());
915
916
1.50M
        Poll::Ready(Ok(()))
917
2.36M
    }
918
919
84.7k
    fn send_reset<B>(
920
84.7k
        &mut self,
921
84.7k
        send_buffer: &SendBuffer<B>,
922
84.7k
        id: StreamId,
923
84.7k
        reason: Reason,
924
84.7k
    ) -> Result<(), crate::proto::error::GoAway> {
925
84.7k
        let key = match self.store.find_entry(id) {
926
24.1k
            Entry::Occupied(e) => e.key(),
927
60.5k
            Entry::Vacant(e) => {
928
                // Resetting a stream we don't know about? That could be OK...
929
                //
930
                // 1. As a server, we just received a request, but that request
931
                //    was bad, so we're resetting before even accepting it.
932
                //    This is totally fine.
933
                //
934
                // 2. The remote may have sent us a frame on a new stream that
935
                //    it's *not* supposed to have done, and thus, we don't know
936
                //    the stream. In that case, sending a reset will "open" the
937
                //    stream in our store. Maybe that should be a connection
938
                //    error instead? At least for now, we need to update what
939
                //    our vision of the next stream is.
940
60.5k
                if self.counts.peer().is_local_init(id) {
941
11.4k
                    // We normally would open this stream, so update our
942
11.4k
                    // next-send-id record.
943
11.4k
                    self.actions.send.maybe_reset_next_stream_id(id);
944
49.0k
                } else {
945
49.0k
                    // We normally would recv this stream, so update our
946
49.0k
                    // next-recv-id record.
947
49.0k
                    self.actions.recv.maybe_reset_next_stream_id(id);
948
49.0k
                }
949
950
60.5k
                let stream = Stream::new(id, 0, 0);
951
952
60.5k
                e.insert(stream)
953
            }
954
        };
955
956
84.7k
        let stream = self.store.resolve(key);
957
84.7k
        let mut send_buffer = send_buffer.inner.lock().unwrap();
958
84.7k
        let send_buffer = &mut *send_buffer;
959
84.7k
        self.actions.send_reset(
960
84.7k
            stream,
961
84.7k
            reason,
962
84.7k
            Initiator::Library,
963
84.7k
            &mut self.counts,
964
84.7k
            send_buffer,
965
        )
966
84.7k
    }
967
}
968
969
impl<B> Streams<B, client::Peer>
970
where
971
    B: Buf,
972
{
973
465k
    pub fn poll_pending_open(
974
465k
        &mut self,
975
465k
        cx: &Context,
976
465k
        pending: Option<&OpaqueStreamRef>,
977
465k
    ) -> Poll<Result<(), crate::Error>> {
978
465k
        let mut me = self.inner.lock().unwrap();
979
465k
        let me = &mut *me;
980
981
465k
        me.actions.ensure_no_conn_error()?;
982
464k
        me.actions.send.ensure_next_stream_id()?;
983
984
464k
        if let Some(pending) = pending {
985
2.07k
            let mut stream = me.store.resolve(pending.key);
986
2.07k
            tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
987
2.07k
            if stream.is_pending_open {
988
1.94k
                stream.wait_send(cx);
989
1.94k
                return Poll::Pending;
990
128
            }
991
462k
        }
992
462k
        Poll::Ready(Ok(()))
993
465k
    }
994
}
995
996
impl<B, P> Streams<B, P>
997
where
998
    P: Peer,
999
{
1000
2.56M
    pub fn as_dyn(&self) -> DynStreams<'_, B> {
1001
        let Self {
1002
2.56M
            inner,
1003
2.56M
            send_buffer,
1004
2.56M
            _p,
1005
2.56M
        } = self;
1006
2.56M
        DynStreams {
1007
2.56M
            inner,
1008
2.56M
            send_buffer,
1009
2.56M
            peer: P::r#dyn(),
1010
2.56M
        }
1011
2.56M
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::as_dyn
Line
Count
Source
1000
733
    pub fn as_dyn(&self) -> DynStreams<'_, B> {
1001
        let Self {
1002
733
            inner,
1003
733
            send_buffer,
1004
733
            _p,
1005
733
        } = self;
1006
733
        DynStreams {
1007
733
            inner,
1008
733
            send_buffer,
1009
733
            peer: P::r#dyn(),
1010
733
        }
1011
733
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::as_dyn
Line
Count
Source
1000
2.56M
    pub fn as_dyn(&self) -> DynStreams<'_, B> {
1001
        let Self {
1002
2.56M
            inner,
1003
2.56M
            send_buffer,
1004
2.56M
            _p,
1005
2.56M
        } = self;
1006
2.56M
        DynStreams {
1007
2.56M
            inner,
1008
2.56M
            send_buffer,
1009
2.56M
            peer: P::r#dyn(),
1010
2.56M
        }
1011
2.56M
    }
1012
1013
    /// This function is safe to call multiple times.
1014
    ///
1015
    /// A `Result` is returned to avoid panicking if the mutex is poisoned.
1016
13.7k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
1017
13.7k
        self.as_dyn().recv_eof(clear_pending_accept)
1018
13.7k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::recv_eof
Line
Count
Source
1016
733
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
1017
733
        self.as_dyn().recv_eof(clear_pending_accept)
1018
733
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer>>::recv_eof
Line
Count
Source
1016
12.9k
    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
1017
12.9k
        self.as_dyn().recv_eof(clear_pending_accept)
1018
12.9k
    }
1019
1020
    pub(crate) fn max_send_streams(&self) -> usize {
1021
        self.inner.lock().unwrap().counts.max_send_streams()
1022
    }
1023
1024
    pub(crate) fn max_recv_streams(&self) -> usize {
1025
        self.inner.lock().unwrap().counts.max_recv_streams()
1026
    }
1027
1028
    #[cfg(feature = "unstable")]
1029
    pub fn num_active_streams(&self) -> usize {
1030
        let me = self.inner.lock().unwrap();
1031
        me.store.num_active_streams()
1032
    }
1033
1034
414
    pub fn has_streams(&self) -> bool {
1035
414
        let me = self.inner.lock().unwrap();
1036
414
        me.counts.has_streams()
1037
414
    }
1038
1039
7.10M
    pub fn has_streams_or_other_references(&self) -> bool {
1040
7.10M
        let me = self.inner.lock().unwrap();
1041
7.10M
        me.counts.has_streams() || me.refs > 1
1042
7.10M
    }
1043
1044
    #[cfg(feature = "unstable")]
1045
    pub fn num_wired_streams(&self) -> usize {
1046
        let me = self.inner.lock().unwrap();
1047
        me.store.num_wired_streams()
1048
    }
1049
}
1050
1051
// no derive because we don't need B and P to be Clone.
1052
impl<B, P> Clone for Streams<B, P>
1053
where
1054
    P: Peer,
1055
{
1056
13.7k
    fn clone(&self) -> Self {
1057
13.7k
        self.inner.lock().unwrap().refs += 1;
1058
13.7k
        Streams {
1059
13.7k
            inner: self.inner.clone(),
1060
13.7k
            send_buffer: self.send_buffer.clone(),
1061
13.7k
            _p: ::std::marker::PhantomData,
1062
13.7k
        }
1063
13.7k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::clone::Clone>::clone
Line
Count
Source
1056
733
    fn clone(&self) -> Self {
1057
733
        self.inner.lock().unwrap().refs += 1;
1058
733
        Streams {
1059
733
            inner: self.inner.clone(),
1060
733
            send_buffer: self.send_buffer.clone(),
1061
733
            _p: ::std::marker::PhantomData,
1062
733
        }
1063
733
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::clone::Clone>::clone
Line
Count
Source
1056
12.9k
    fn clone(&self) -> Self {
1057
12.9k
        self.inner.lock().unwrap().refs += 1;
1058
12.9k
        Streams {
1059
12.9k
            inner: self.inner.clone(),
1060
12.9k
            send_buffer: self.send_buffer.clone(),
1061
12.9k
            _p: ::std::marker::PhantomData,
1062
12.9k
        }
1063
12.9k
    }
1064
}
1065
1066
impl<B, P> Drop for Streams<B, P>
1067
where
1068
    P: Peer,
1069
{
1070
27.4k
    fn drop(&mut self) {
1071
27.4k
        if let Ok(mut inner) = self.inner.lock() {
1072
27.4k
            inner.refs -= 1;
1073
27.4k
            if inner.refs == 1 {
1074
13.7k
                if let Some(task) = inner.actions.task.take() {
1075
81
                    task.wake();
1076
13.6k
                }
1077
13.7k
            }
1078
0
        }
1079
27.4k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::ops::drop::Drop>::drop
Line
Count
Source
1070
1.46k
    fn drop(&mut self) {
1071
1.46k
        if let Ok(mut inner) = self.inner.lock() {
1072
1.46k
            inner.refs -= 1;
1073
1.46k
            if inner.refs == 1 {
1074
733
                if let Some(task) = inner.actions.task.take() {
1075
0
                    task.wake();
1076
733
                }
1077
733
            }
1078
0
        }
1079
1.46k
    }
<h2::proto::streams::streams::Streams<bytes::bytes::Bytes, h2::client::Peer> as core::ops::drop::Drop>::drop
Line
Count
Source
1070
25.9k
    fn drop(&mut self) {
1071
25.9k
        if let Ok(mut inner) = self.inner.lock() {
1072
25.9k
            inner.refs -= 1;
1073
25.9k
            if inner.refs == 1 {
1074
12.9k
                if let Some(task) = inner.actions.task.take() {
1075
81
                    task.wake();
1076
12.9k
                }
1077
12.9k
            }
1078
0
        }
1079
25.9k
    }
1080
}
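
The `Clone` and `Drop` impls above maintain a handle count (`refs`) inside the shared state and wake the parked connection task once only the connection's own reference remains. Below is a minimal sketch of that bookkeeping using a hypothetical `Shared`/`Handle` pair rather than h2's real `Inner`/`Streams` types.

use std::sync::{Arc, Mutex};
use std::task::Waker;

// Hypothetical shared state: a handle count plus an optional parked task.
struct Shared {
    refs: usize,
    task: Option<Waker>,
}

struct Handle {
    inner: Arc<Mutex<Shared>>,
}

impl Clone for Handle {
    fn clone(&self) -> Self {
        self.inner.lock().unwrap().refs += 1;
        Handle { inner: self.inner.clone() }
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        if let Ok(mut shared) = self.inner.lock() {
            shared.refs -= 1;
            // Once only one reference is left, the owner of that last
            // reference may be able to shut down, so wake it if it is parked.
            if shared.refs == 1 {
                if let Some(task) = shared.task.take() {
                    task.wake();
                }
            }
        }
    }
}
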
1081
1082
// ===== impl StreamRef =====
1083
1084
impl<B> StreamRef<B> {
1085
462k
    pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
1086
462k
    where
1087
462k
        B: Buf,
1088
    {
1089
462k
        let mut me = self.opaque.inner.lock().unwrap();
1090
462k
        let me = &mut *me;
1091
1092
462k
        let stream = me.store.resolve(self.opaque.key);
1093
462k
        let actions = &mut me.actions;
1094
462k
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1095
462k
        let send_buffer = &mut *send_buffer;
1096
1097
462k
        me.counts.transition(stream, |counts, stream| {
1098
            // Create the data frame
1099
462k
            let mut frame = frame::Data::new(stream.id, data);
1100
462k
            frame.set_end_stream(end_stream);
1101
1102
            // Send the data frame
1103
462k
            actions
1104
462k
                .send
1105
462k
                .send_data(frame, send_buffer, stream, counts, &mut actions.task)
1106
462k
        })
1107
462k
    }
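
`send_data` wraps the actual frame send in `counts.transition(stream, |counts, stream| ...)`, so any state change made inside the closure is reflected in the concurrency counters afterwards. The sketch below shows one way such a transition helper can be shaped; `Counts` and `Stream` here are simplified, hypothetical stand-ins, not the real types.

// Hypothetical, simplified counterparts to the real Counts/Stream types.
struct Counts {
    active: usize,
}

struct Stream {
    open: bool,
}

impl Counts {
    fn transition<F, R>(&mut self, stream: &mut Stream, f: F) -> R
    where
        F: FnOnce(&mut Counts, &mut Stream) -> R,
    {
        let was_open = stream.open;
        let result = f(self, stream);
        // If the closure closed the stream, release its concurrency slot.
        if was_open && !stream.open {
            self.active -= 1;
        }
        result
    }
}
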
1108
1109
    pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
1110
        let mut me = self.opaque.inner.lock().unwrap();
1111
        let me = &mut *me;
1112
1113
        let stream = me.store.resolve(self.opaque.key);
1114
        let actions = &mut me.actions;
1115
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1116
        let send_buffer = &mut *send_buffer;
1117
1118
        me.counts.transition(stream, |counts, stream| {
1119
            // Create the trailers frame
1120
            let frame = frame::Headers::trailers(stream.id, trailers);
1121
1122
            // Send the trailers frame
1123
            actions
1124
                .send
1125
                .send_trailers(frame, send_buffer, stream, counts, &mut actions.task)
1126
        })
1127
    }
1128
1129
    pub fn send_reset(&mut self, reason: Reason) {
1130
        let mut me = self.opaque.inner.lock().unwrap();
1131
        let me = &mut *me;
1132
1133
        let stream = me.store.resolve(self.opaque.key);
1134
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1135
        let send_buffer = &mut *send_buffer;
1136
1137
        match me
1138
            .actions
1139
            .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer)
1140
        {
1141
            Ok(()) => (),
1142
            Err(crate::proto::error::GoAway { .. }) => {
1143
            // This should never happen, because Initiator::User resets do
1144
            // not count toward the local limit.
1145
            // We could perhaps make this state impossible by making the
1146
            // initiator argument generic, so this could return
1147
            // Infallible instead of an impossible GoAway.
1148
                unreachable!("Initiator::User should not error sending reset");
1149
            }
1150
        }
1151
    }
1152
1153
    pub fn send_informational_headers(&mut self, frame: frame::Headers) -> Result<(), UserError> {
1154
        let mut me = self.opaque.inner.lock().unwrap();
1155
        let me = &mut *me;
1156
1157
        let stream = me.store.resolve(self.opaque.key);
1158
        let actions = &mut me.actions;
1159
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1160
        let send_buffer = &mut *send_buffer;
1161
1162
        me.counts.transition(stream, |counts, stream| {
1163
            // For informational responses (1xx), we need to send headers without
1164
            // changing the stream state. This allows multiple informational responses
1165
            // to be sent before the final response.
1166
1167
            // Validate that this is actually an informational response
1168
            debug_assert!(
1169
                frame.is_informational(),
1170
                "Frame must be informational after conversion from informational response"
1171
            );
1172
1173
            // Ensure the frame is not marked as end_stream for informational responses
1174
            if frame.is_end_stream() {
1175
                return Err(UserError::UnexpectedFrameType);
1176
            }
1177
1178
            // Send the interim informational headers directly to the buffer without state changes
1179
            // This bypasses the normal send_headers flow that would transition the stream state
1180
            actions.send.send_interim_informational_headers(
1181
                frame,
1182
                send_buffer,
1183
                stream,
1184
                counts,
1185
                &mut actions.task,
1186
            )
1187
        })
1188
    }
1189
1190
    pub fn send_response(
1191
        &mut self,
1192
        mut response: Response<()>,
1193
        end_of_stream: bool,
1194
    ) -> Result<(), UserError> {
1195
        // Clear before taking the lock, in case the extensions contain a StreamRef.
1196
        response.extensions_mut().clear();
1197
        let mut me = self.opaque.inner.lock().unwrap();
1198
        let me = &mut *me;
1199
1200
        let stream = me.store.resolve(self.opaque.key);
1201
        let actions = &mut me.actions;
1202
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1203
        let send_buffer = &mut *send_buffer;
1204
1205
        me.counts.transition(stream, |counts, stream| {
1206
            let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);
1207
1208
            actions
1209
                .send
1210
                .send_headers(frame, send_buffer, stream, counts, &mut actions.task)
1211
        })
1212
    }
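
`send_response` clears the response extensions before locking `inner`: if an extension held a `StreamRef`, dropping it while the lock is held would try to take the same (non-reentrant) mutex again. A minimal sketch of why that ordering matters, with a hypothetical `Handle` whose destructor locks the shared state:

use std::sync::{Arc, Mutex};

struct Inner {
    refs: usize,
}

// Hypothetical handle: its destructor locks the same shared state.
struct Handle {
    inner: Arc<Mutex<Inner>>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        if let Ok(mut inner) = self.inner.lock() {
            inner.refs -= 1;
        }
    }
}

fn send(shared: &Arc<Mutex<Inner>>, mut attachments: Vec<Handle>) {
    // Drop anything that might lock `shared` *before* we lock it ourselves;
    // doing it afterwards would deadlock on a non-reentrant mutex.
    attachments.clear();
    let _guard = shared.lock().unwrap();
    // ... build and queue the frame while holding the lock ...
}
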
1213
1214
    pub fn send_push_promise(
1215
        &mut self,
1216
        mut request: Request<()>,
1217
    ) -> Result<StreamRef<B>, UserError> {
1218
        // Clear before taking the lock, in case the extensions contain a StreamRef.
1219
        request.extensions_mut().clear();
1220
        let mut me = self.opaque.inner.lock().unwrap();
1221
        let me = &mut *me;
1222
1223
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1224
        let send_buffer = &mut *send_buffer;
1225
1226
        let actions = &mut me.actions;
1227
        let promised_id = actions.send.reserve_local()?;
1228
1229
        let child_key = {
1230
            let mut child_stream = me.store.insert(
1231
                promised_id,
1232
                Stream::new(
1233
                    promised_id,
1234
                    actions.send.init_window_sz(),
1235
                    actions.recv.init_window_sz(),
1236
                ),
1237
            );
1238
            child_stream.state.reserve_local()?;
1239
            child_stream.is_pending_push = true;
1240
            child_stream.key()
1241
        };
1242
1243
        let pushed = {
1244
            let mut stream = me.store.resolve(self.opaque.key);
1245
1246
            let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;
1247
1248
            actions
1249
                .send
1250
                .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
1251
        };
1252
1253
        if let Err(err) = pushed {
1254
            let mut child_stream = me.store.resolve(child_key);
1255
            child_stream.unlink();
1256
            child_stream.remove();
1257
            return Err(err);
1258
        }
1259
1260
        me.refs += 1;
1261
        let opaque =
1262
            OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));
1263
1264
        Ok(StreamRef {
1265
            opaque,
1266
            send_buffer: self.send_buffer.clone(),
1267
        })
1268
    }
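
`send_push_promise` first reserves the promised stream in the store and only then tries to queue the PUSH_PROMISE frame; on failure it unlinks and removes the child so the reservation leaves no trace. The sketch below shows that reserve-then-roll-back shape using a plain `HashMap` as a hypothetical stand-in for the stream store.

use std::collections::HashMap;

// Hypothetical stand-in for the stream store: id -> state.
fn push_promise(
    store: &mut HashMap<u32, &'static str>,
    promised_id: u32,
    send_frame: impl FnOnce() -> Result<(), &'static str>,
) -> Result<(), &'static str> {
    // Reserve the child stream first so the id is accounted for.
    store.insert(promised_id, "reserved");

    if let Err(err) = send_frame() {
        // Sending failed: undo the reservation so no orphaned state remains.
        store.remove(&promised_id);
        return Err(err);
    }
    Ok(())
}
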
1269
1270
    /// Called by the server after the stream is accepted. Given that clients
1271
    /// initialize streams by sending HEADERS, the request will always be
1272
    /// available.
1273
    ///
1274
    /// # Panics
1275
    ///
1276
    /// This function panics if the request isn't present.
1277
    pub fn take_request(&self) -> Request<()> {
1278
        let mut me = self.opaque.inner.lock().unwrap();
1279
        let me = &mut *me;
1280
1281
        let mut stream = me.store.resolve(self.opaque.key);
1282
        me.actions.recv.take_request(&mut stream)
1283
    }
1284
1285
    /// Called by a client to see if the current stream is pending open
1286
463k
    pub fn is_pending_open(&self) -> bool {
1287
463k
        let mut me = self.opaque.inner.lock().unwrap();
1288
463k
        me.store.resolve(self.opaque.key).is_pending_open
1289
463k
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::is_pending_open
Line
Count
Source
1286
643
    pub fn is_pending_open(&self) -> bool {
1287
643
        let mut me = self.opaque.inner.lock().unwrap();
1288
643
        me.store.resolve(self.opaque.key).is_pending_open
1289
643
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::is_pending_open
Line
Count
Source
1286
462k
    pub fn is_pending_open(&self) -> bool {
1287
462k
        let mut me = self.opaque.inner.lock().unwrap();
1288
462k
        me.store.resolve(self.opaque.key).is_pending_open
1289
462k
    }
1290
1291
    /// Request capacity to send data
1292
    pub fn reserve_capacity(&mut self, capacity: WindowSize) {
1293
        let mut me = self.opaque.inner.lock().unwrap();
1294
        let me = &mut *me;
1295
1296
        let mut stream = me.store.resolve(self.opaque.key);
1297
1298
        me.actions
1299
            .send
1300
            .reserve_capacity(capacity, &mut stream, &mut me.counts)
1301
    }
1302
1303
    /// Returns the stream's current send capacity.
1304
0
    pub fn capacity(&self) -> WindowSize {
1305
0
        let mut me = self.opaque.inner.lock().unwrap();
1306
0
        let me = &mut *me;
1307
1308
0
        let mut stream = me.store.resolve(self.opaque.key);
1309
1310
0
        me.actions.send.capacity(&mut stream)
1311
0
    }
1312
1313
    /// Request to be notified when the stream's capacity increases
1314
0
    pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> {
1315
0
        let mut me = self.opaque.inner.lock().unwrap();
1316
0
        let me = &mut *me;
1317
1318
0
        let mut stream = me.store.resolve(self.opaque.key);
1319
1320
0
        me.actions.send.poll_capacity(cx, &mut stream)
1321
0
    }
1322
1323
    /// Request to be notified if a `RST_STREAM` is received for this stream.

1324
    pub(crate) fn poll_reset(
1325
        &mut self,
1326
        cx: &Context,
1327
        mode: proto::PollReset,
1328
    ) -> Poll<Result<Reason, crate::Error>> {
1329
        let mut me = self.opaque.inner.lock().unwrap();
1330
        let me = &mut *me;
1331
1332
        let mut stream = me.store.resolve(self.opaque.key);
1333
1334
        me.actions.send.poll_reset(cx, &mut stream, mode)
1335
    }
1336
1337
463k
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1338
463k
        self.opaque.clone()
1339
463k
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::clone_to_opaque
Line
Count
Source
1337
643
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1338
643
        self.opaque.clone()
1339
643
    }
<h2::proto::streams::streams::StreamRef<bytes::bytes::Bytes>>::clone_to_opaque
Line
Count
Source
1337
463k
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1338
463k
        self.opaque.clone()
1339
463k
    }
1340
1341
    pub fn stream_id(&self) -> StreamId {
1342
        self.opaque.stream_id()
1343
    }
1344
}
1345
1346
impl<B> Clone for StreamRef<B> {
1347
    fn clone(&self) -> Self {
1348
        StreamRef {
1349
            opaque: self.opaque.clone(),
1350
            send_buffer: self.send_buffer.clone(),
1351
        }
1352
    }
1353
}
1354
1355
// ===== impl OpaqueStreamRef =====
1356
1357
impl OpaqueStreamRef {
1358
463k
    fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
1359
463k
        stream.ref_inc();
1360
463k
        OpaqueStreamRef {
1361
463k
            inner,
1362
463k
            key: stream.key(),
1363
463k
        }
1364
463k
    }
1365
    /// Called by a client to check for a received response.
1366
500k
    pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> {
1367
500k
        let mut me = self.inner.lock().unwrap();
1368
500k
        let me = &mut *me;
1369
1370
500k
        let mut stream = me.store.resolve(self.key);
1371
1372
500k
        me.actions.recv.poll_response(cx, &mut stream)
1373
500k
    }
1374
1375
    /// Called by a client to check for informational responses (1xx status codes)
1376
0
    pub fn poll_informational(
1377
0
        &mut self,
1378
0
        cx: &Context,
1379
0
    ) -> Poll<Option<Result<Response<()>, proto::Error>>> {
1380
0
        let mut me = self.inner.lock().unwrap();
1381
0
        let me = &mut *me;
1382
1383
0
        let mut stream = me.store.resolve(self.key);
1384
1385
0
        me.actions.recv.poll_informational(cx, &mut stream)
1386
0
    }
1387
    /// Called by a client to check for a pushed request.
1388
0
    pub fn poll_pushed(
1389
0
        &mut self,
1390
0
        cx: &Context,
1391
0
    ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> {
1392
0
        let mut me = self.inner.lock().unwrap();
1393
0
        let me = &mut *me;
1394
1395
0
        let mut stream = me.store.resolve(self.key);
1396
0
        me.actions
1397
0
            .recv
1398
0
            .poll_pushed(cx, &mut stream)
1399
0
            .map_ok(|(h, key)| {
1400
0
                me.refs += 1;
1401
0
                let opaque_ref =
1402
0
                    OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
1403
0
                (h, opaque_ref)
1404
0
            })
1405
0
    }
1406
1407
0
    pub fn is_end_stream(&self) -> bool {
1408
0
        let mut me = self.inner.lock().unwrap();
1409
0
        let me = &mut *me;
1410
1411
0
        let stream = me.store.resolve(self.key);
1412
1413
0
        me.actions.recv.is_end_stream(&stream)
1414
0
    }
1415
1416
0
    pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> {
1417
0
        let mut me = self.inner.lock().unwrap();
1418
0
        let me = &mut *me;
1419
1420
0
        let mut stream = me.store.resolve(self.key);
1421
1422
0
        me.actions.recv.poll_data(cx, &mut stream)
1423
0
    }
1424
1425
0
    pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1426
0
        let mut me = self.inner.lock().unwrap();
1427
0
        let me = &mut *me;
1428
1429
0
        let mut stream = me.store.resolve(self.key);
1430
1431
0
        me.actions.recv.poll_trailers(cx, &mut stream)
1432
0
    }
1433
1434
0
    pub(crate) fn available_recv_capacity(&self) -> isize {
1435
0
        let me = self.inner.lock().unwrap();
1436
0
        let me = &*me;
1437
1438
0
        let stream = &me.store[self.key];
1439
0
        stream.recv_flow.available().into()
1440
0
    }
1441
1442
0
    pub(crate) fn used_recv_capacity(&self) -> WindowSize {
1443
0
        let me = self.inner.lock().unwrap();
1444
0
        let me = &*me;
1445
1446
0
        let stream = &me.store[self.key];
1447
0
        stream.in_flight_recv_data
1448
0
    }
1449
1450
    /// Releases recv capacity back to the peer. This may result in sending
1451
    /// WINDOW_UPDATE frames on both the stream and connection.
1452
0
    pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
1453
0
        let mut me = self.inner.lock().unwrap();
1454
0
        let me = &mut *me;
1455
1456
0
        let mut stream = me.store.resolve(self.key);
1457
1458
0
        me.actions
1459
0
            .recv
1460
0
            .release_capacity(capacity, &mut stream, &mut me.actions.task)
1461
0
    }
1462
1463
    /// Clear the receive queue and set the status to no longer receive data frames.
1464
1.29k
    pub(crate) fn clear_recv_buffer(&mut self) {
1465
1.29k
        let mut me = self.inner.lock().unwrap();
1466
1.29k
        let me = &mut *me;
1467
1468
1.29k
        let mut stream = me.store.resolve(self.key);
1469
1.29k
        stream.is_recv = false;
1470
1.29k
        me.actions.recv.clear_recv_buffer(&mut stream);
1471
1.29k
    }
1472
1473
0
    pub fn stream_id(&self) -> StreamId {
1474
0
        self.inner.lock().unwrap().store[self.key].id
1475
0
    }
1476
}
1477
1478
impl fmt::Debug for OpaqueStreamRef {
1479
1.29k
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1480
        use std::sync::TryLockError::*;
1481
1482
1.29k
        match self.inner.try_lock() {
1483
1.29k
            Ok(me) => {
1484
1.29k
                let stream = &me.store[self.key];
1485
1.29k
                fmt.debug_struct("OpaqueStreamRef")
1486
1.29k
                    .field("stream_id", &stream.id)
1487
1.29k
                    .field("ref_count", &stream.ref_count)
1488
1.29k
                    .finish()
1489
            }
1490
0
            Err(Poisoned(_)) => fmt
1491
0
                .debug_struct("OpaqueStreamRef")
1492
0
                .field("inner", &"<Poisoned>")
1493
0
                .finish(),
1494
0
            Err(WouldBlock) => fmt
1495
0
                .debug_struct("OpaqueStreamRef")
1496
0
                .field("inner", &"<Locked>")
1497
0
                .finish(),
1498
        }
1499
1.29k
    }
1500
}
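
The `Debug` impl above uses `try_lock` rather than `lock` so that formatting a stream reference never blocks (or deadlocks) if the shared state is already held, falling back to a placeholder field instead. A minimal sketch of the same approach on a hypothetical wrapper:

use std::fmt;
use std::sync::{Arc, Mutex, TryLockError};

// Hypothetical wrapper around state that lives behind a mutex.
struct LockedDebug(Arc<Mutex<u32>>);

impl fmt::Debug for LockedDebug {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.0.try_lock() {
            Ok(value) => fmt
                .debug_struct("LockedDebug")
                .field("value", &*value)
                .finish(),
            Err(TryLockError::Poisoned(_)) => fmt
                .debug_struct("LockedDebug")
                .field("inner", &"<Poisoned>")
                .finish(),
            Err(TryLockError::WouldBlock) => fmt
                .debug_struct("LockedDebug")
                .field("inner", &"<Locked>")
                .finish(),
        }
    }
}
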
1501
1502
impl Clone for OpaqueStreamRef {
1503
465k
    fn clone(&self) -> Self {
1504
        // Increment the ref count
1505
465k
        let mut inner = self.inner.lock().unwrap();
1506
465k
        inner.store.resolve(self.key).ref_inc();
1507
465k
        inner.refs += 1;
1508
1509
465k
        OpaqueStreamRef {
1510
465k
            inner: self.inner.clone(),
1511
465k
            key: self.key,
1512
465k
        }
1513
465k
    }
1514
}
1515
1516
impl Drop for OpaqueStreamRef {
1517
928k
    fn drop(&mut self) {
1518
928k
        drop_stream_ref(&self.inner, self.key);
1519
928k
    }
1520
}
1521
1522
// TODO: Move back in fn above
1523
928k
fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
1524
928k
    let mut me = match inner.lock() {
1525
928k
        Ok(inner) => inner,
1526
        Err(_) => {
1527
0
            if ::std::thread::panicking() {
1528
0
                tracing::trace!("StreamRef::drop; mutex poisoned");
1529
0
                return;
1530
            } else {
1531
0
                panic!("StreamRef::drop; mutex poisoned");
1532
            }
1533
        }
1534
    };
1535
1536
928k
    let me = &mut *me;
1537
928k
    me.refs -= 1;
1538
928k
    let mut stream = me.store.resolve(key);
1539
1540
928k
    tracing::trace!("drop_stream_ref; stream={:?}", stream);
1541
1542
    // decrement the stream's ref count by 1.
1543
928k
    stream.ref_dec();
1544
1545
928k
    let actions = &mut me.actions;
1546
1547
    // If the stream is no longer referenced and is already
1548
    // closed (so it does not need to go through the cancel
1549
    // logic below), notify the connection task so that it
1550
    // can shut down properly.
1551
928k
    if stream.ref_count == 0 && stream.is_closed() {
1552
385k
        if let Some(task) = actions.task.take() {
1553
151
            task.wake();
1554
385k
        }
1555
543k
    }
1556
1557
928k
    me.counts.transition(stream, |counts, stream| {
1558
928k
        maybe_cancel(stream, actions, counts);
1559
1560
928k
        if stream.ref_count == 0 {
1561
            // Release any recv window back to connection, no one can access
1562
            // it anymore.
1563
463k
            actions
1564
463k
                .recv
1565
463k
                .release_closed_capacity(stream, &mut actions.task);
1566
1567
            // We won't be able to reach our push promises anymore
1568
463k
            let mut ppp = stream.pending_push_promises.take();
1569
463k
            while let Some(promise) = ppp.pop(stream.store_mut()) {
1570
185
                counts.transition(promise, |counts, stream| {
1571
185
                    maybe_cancel(stream, actions, counts);
1572
185
                });
1573
            }
1574
465k
        }
1575
928k
    });
1576
928k
}
1577
1578
929k
fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
1579
929k
    if stream.is_canceled_interest() {
1580
        // A server is allowed to respond early without fully consuming the client's input stream,
1581
        // but per the RFC it must send a RST_STREAM(NO_ERROR) in such cases. https://www.rfc-editor.org/rfc/rfc7540#section-8.1
1582
        // Some other HTTP/2 implementations treat any other error code as fatal if this is not respected (e.g. nginx: https://trac.nginx.org/nginx/ticket/2376)
1583
77.7k
        let reason = if counts.peer().is_server()
1584
0
            && stream.state.is_send_closed()
1585
0
            && stream.state.is_recv_streaming()
1586
        {
1587
0
            Reason::NO_ERROR
1588
        } else {
1589
77.7k
            Reason::CANCEL
1590
        };
1591
1592
77.7k
        actions
1593
77.7k
            .send
1594
77.7k
            .schedule_implicit_reset(stream, reason, counts, &mut actions.task);
1595
77.7k
        actions.recv.enqueue_reset_expiration(stream, counts);
1596
851k
    }
1597
929k
}
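
`maybe_cancel` picks the reset code based on who we are and what state the stream is in: NO_ERROR for a server that has finished sending but is still receiving the request, CANCEL otherwise. A standalone sketch of that decision, using hypothetical boolean flags in place of the real peer/stream state checks:

#[derive(Debug, PartialEq)]
enum ResetReason {
    NoError,
    Cancel,
}

// Hypothetical flags standing in for the real peer/stream state checks.
fn reset_reason(is_server: bool, send_closed: bool, recv_streaming: bool) -> ResetReason {
    // RFC 7540 §8.1: a server that finished its response without consuming
    // the whole request should reset the remainder with NO_ERROR.
    if is_server && send_closed && recv_streaming {
        ResetReason::NoError
    } else {
        ResetReason::Cancel
    }
}

fn main() {
    assert_eq!(reset_reason(true, true, true), ResetReason::NoError);
    assert_eq!(reset_reason(false, false, true), ResetReason::Cancel);
}
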
1598
1599
// ===== impl SendBuffer =====
1600
1601
impl<B> SendBuffer<B> {
1602
13.7k
    fn new() -> Self {
1603
13.7k
        let inner = Mutex::new(Buffer::new());
1604
13.7k
        SendBuffer { inner }
1605
13.7k
    }
<h2::proto::streams::streams::SendBuffer<bytes::bytes::Bytes>>::new
Line
Count
Source
1602
733
    fn new() -> Self {
1603
733
        let inner = Mutex::new(Buffer::new());
1604
733
        SendBuffer { inner }
1605
733
    }
<h2::proto::streams::streams::SendBuffer<bytes::bytes::Bytes>>::new
Line
Count
Source
1602
12.9k
    fn new() -> Self {
1603
12.9k
        let inner = Mutex::new(Buffer::new());
1604
12.9k
        SendBuffer { inner }
1605
12.9k
    }
1606
1607
1.15k
    pub fn is_empty(&self) -> bool {
1608
1.15k
        let buf = self.inner.lock().unwrap();
1609
1.15k
        buf.is_empty()
1610
1.15k
    }
1611
}
1612
1613
// ===== impl Actions =====
1614
1615
impl Actions {
1616
84.7k
    fn send_reset<B>(
1617
84.7k
        &mut self,
1618
84.7k
        stream: store::Ptr,
1619
84.7k
        reason: Reason,
1620
84.7k
        initiator: Initiator,
1621
84.7k
        counts: &mut Counts,
1622
84.7k
        send_buffer: &mut Buffer<Frame<B>>,
1623
84.7k
    ) -> Result<(), crate::proto::error::GoAway> {
1624
84.7k
        counts.transition(stream, |counts, stream| {
1625
84.7k
            if initiator.is_library() {
1626
84.7k
                if counts.can_inc_num_local_error_resets() {
1627
84.7k
                    counts.inc_num_local_error_resets();
1628
84.7k
                } else {
1629
2
                    tracing::warn!(
1630
0
                        "locally-reset streams reached limit ({:?})",
1631
0
                        counts.max_local_error_resets().unwrap(),
1632
                    );
1633
2
                    return Err(crate::proto::error::GoAway {
1634
2
                        reason: Reason::ENHANCE_YOUR_CALM,
1635
2
                        debug_data: "too_many_internal_resets".into(),
1636
2
                    });
1637
                }
1638
0
            }
1639
1640
84.7k
            self.send.send_reset(
1641
84.7k
                reason,
1642
84.7k
                initiator,
1643
84.7k
                send_buffer,
1644
84.7k
                stream,
1645
84.7k
                counts,
1646
84.7k
                &mut self.task,
1647
            );
1648
84.7k
            self.recv.enqueue_reset_expiration(stream, counts);
1649
            // if a RecvStream is parked, ensure it's notified
1650
84.7k
            stream.notify_recv();
1651
1652
84.7k
            Ok(())
1653
84.7k
        })
1654
84.7k
    }
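
`Actions::send_reset` only lets the library initiate a bounded number of error resets; past the limit it escalates to a GOAWAY with ENHANCE_YOUR_CALM instead of sending yet another RST_STREAM. A minimal counter sketch of that guard, with hypothetical names mirroring the "can_inc / inc" pair used above:

// Hypothetical counter, not h2's Counts.
struct LocalErrorResets {
    sent: usize,
    max: Option<usize>,
}

enum ResetOutcome {
    SendReset,
    GoAwayEnhanceYourCalm,
}

impl LocalErrorResets {
    fn try_send_reset(&mut self) -> ResetOutcome {
        match self.max {
            // Limit reached: stop resetting individual streams and tear the
            // connection down instead.
            Some(max) if self.sent >= max => ResetOutcome::GoAwayEnhanceYourCalm,
            _ => {
                self.sent += 1;
                ResetOutcome::SendReset
            }
        }
    }
}
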
1655
1656
26.9k
    fn reset_on_recv_stream_err<B>(
1657
26.9k
        &mut self,
1658
26.9k
        buffer: &mut Buffer<Frame<B>>,
1659
26.9k
        stream: &mut store::Ptr,
1660
26.9k
        counts: &mut Counts,
1661
26.9k
        res: Result<(), Error>,
1662
26.9k
    ) -> Result<(), Error> {
1663
1.14k
        if let Err(Error::Reset(stream_id, reason, initiator)) = res {
1664
1.10k
            debug_assert_eq!(stream_id, stream.id);
1665
1666
1.10k
            if counts.can_inc_num_local_error_resets() {
1667
1.10k
                counts.inc_num_local_error_resets();
1668
1669
                // Reset the stream.
1670
1.10k
                self.send
1671
1.10k
                    .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
1672
1.10k
                self.recv.enqueue_reset_expiration(stream, counts);
1673
                // if a RecvStream is parked, ensure it's notified
1674
1.10k
                stream.notify_recv();
1675
1.10k
                Ok(())
1676
            } else {
1677
0
                tracing::warn!(
1678
0
                    "reset_on_recv_stream_err; locally-reset streams reached limit ({:?})",
1679
0
                    counts.max_local_error_resets().unwrap(),
1680
                );
1681
0
                Err(Error::library_go_away_data(
1682
0
                    Reason::ENHANCE_YOUR_CALM,
1683
0
                    "too_many_internal_resets",
1684
0
                ))
1685
            }
1686
        } else {
1687
25.8k
            res
1688
        }
1689
26.9k
    }
1690
1691
1.27k
    fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
1692
1.27k
        if peer.is_local_init(id) {
1693
806
            self.send.ensure_not_idle(id)
1694
        } else {
1695
473
            self.recv.ensure_not_idle(id)
1696
        }
1697
1.27k
    }
1698
1699
928k
    fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
1700
928k
        if let Some(ref err) = self.conn_error {
1701
107
            Err(err.clone())
1702
        } else {
1703
928k
            Ok(())
1704
        }
1705
928k
    }
1706
1707
    /// Check if we possibly could have processed and since forgotten this stream.
1708
    ///
1709
    /// If we send a RST_STREAM for a stream, we will eventually "forget" about
1710
    /// the stream to free up memory. It's possible that the remote peer had
1711
    /// frames in-flight, and by the time we receive them, our own state is
1712
    /// gone. We *could* tear everything down by sending a GOAWAY, but it
1713
    /// is more likely to be latency/memory constraints that caused this,
1714
    /// and not a bad actor. So, to be less catastrophic, the spec allows
1715
    /// us to send another RST_STREAM with STREAM_CLOSED.
1716
51.4k
    fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool {
1717
51.4k
        if id.is_zero() {
1718
0
            return false;
1719
51.4k
        }
1720
51.4k
        if peer.is_local_init(id) {
1721
6.63k
            self.send.may_have_created_stream(id)
1722
        } else {
1723
44.8k
            self.recv.may_have_created_stream(id)
1724
        }
1725
51.4k
    }
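
`may_have_forgotten_stream` answers "could this id belong to a stream we already reset and evicted?" by routing the check to the send or recv side depending on which peer initiated the id. The sketch below illustrates the underlying parity-plus-high-water-mark idea with a single hypothetical helper; it is not the real `may_have_created_stream`.

// Hypothetical helper: HTTP/2 clients use odd stream ids, servers even ones.
fn may_have_created(
    id: u32,
    local_is_client: bool,
    next_local_id: u32,
    max_remote_id: u32,
) -> bool {
    if id == 0 {
        // Stream 0 is the connection itself and is never "forgotten".
        return false;
    }
    let id_initiated_by_client = id % 2 == 1;
    if id_initiated_by_client == local_is_client {
        // Locally initiated: anything below the next id we would hand out
        // has existed at some point.
        id < next_local_id
    } else {
        // Remotely initiated: compare against the highest id seen so far.
        id <= max_remote_id
    }
}
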
1726
1727
18.5k
    fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) {
1728
18.5k
        self.recv.clear_queues(clear_pending_accept, store, counts);
1729
18.5k
        self.send.clear_queues(store, counts);
1730
18.5k
    }
1731
}