Coverage Report

Created: 2025-12-31 06:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/store.rs
Line
Count
Source
1
use super::*;
2
3
use indexmap::{self, IndexMap};
4
5
use std::convert::Infallible;
6
use std::fmt;
7
use std::marker::PhantomData;
8
use std::ops;
9
10
/// Storage for streams
11
#[derive(Debug)]
12
pub(super) struct Store {
13
    slab: slab::Slab<Stream>,
14
    ids: IndexMap<StreamId, SlabIndex>,
15
}
16
17
/// "Pointer" to an entry in the store
18
pub(super) struct Ptr<'a> {
19
    key: Key,
20
    store: &'a mut Store,
21
}
22
23
/// References an entry in the store.
24
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25
pub(crate) struct Key {
26
    index: SlabIndex,
27
    /// Keep the stream ID in the key as an ABA guard, since slab indices
28
    /// could be re-used with a new stream.
29
    stream_id: StreamId,
30
}
31
32
// We can never have more than `StreamId::MAX` streams in the store,
33
// so we can save a smaller index (u32 vs usize).
34
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35
struct SlabIndex(u32);
36
37
pub(super) struct Queue<N> {
38
    indices: Option<store::Indices>,
39
    _p: PhantomData<N>,
40
}
41
42
pub(super) trait Next {
43
    fn next(stream: &Stream) -> Option<Key>;
44
45
    fn set_next(stream: &mut Stream, key: Option<Key>);
46
47
    fn take_next(stream: &mut Stream) -> Option<Key>;
48
49
    fn is_queued(stream: &Stream) -> bool;
50
51
    fn set_queued(stream: &mut Stream, val: bool);
52
}
53
54
/// A linked list
55
#[derive(Debug, Clone, Copy)]
56
struct Indices {
57
    pub head: Key,
58
    pub tail: Key,
59
}
60
61
pub(super) enum Entry<'a> {
62
    Occupied(OccupiedEntry<'a>),
63
    Vacant(VacantEntry<'a>),
64
}
65
66
pub(super) struct OccupiedEntry<'a> {
67
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
68
}
69
70
pub(super) struct VacantEntry<'a> {
71
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
72
    slab: &'a mut slab::Slab<Stream>,
73
}
74
75
pub(super) trait Resolve {
76
    fn resolve(&mut self, key: Key) -> Ptr<'_>;
77
}
78
79
// ===== impl Store =====
80
81
impl Store {
82
13.7k
    pub fn new() -> Self {
83
13.7k
        Store {
84
13.7k
            slab: slab::Slab::new(),
85
13.7k
            ids: IndexMap::new(),
86
13.7k
        }
87
13.7k
    }
88
89
76.2k
    pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<'_>> {
90
76.2k
        let index = match self.ids.get(id) {
91
25.5k
            Some(key) => *key,
92
50.6k
            None => return None,
93
        };
94
95
25.5k
        Some(Ptr {
96
25.5k
            key: Key {
97
25.5k
                index,
98
25.5k
                stream_id: *id,
99
25.5k
            },
100
25.5k
            store: self,
101
25.5k
        })
102
76.2k
    }
103
104
464k
    pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr<'_> {
105
464k
        let index = SlabIndex(self.slab.insert(val) as u32);
106
464k
        assert!(self.ids.insert(id, index).is_none());
107
108
464k
        Ptr {
109
464k
            key: Key {
110
464k
                index,
111
464k
                stream_id: id,
112
464k
            },
113
464k
            store: self,
114
464k
        }
115
464k
    }
116
117
93.6k
    pub fn find_entry(&mut self, id: StreamId) -> Entry<'_> {
118
        use self::indexmap::map::Entry::*;
119
120
93.6k
        match self.ids.entry(id) {
121
30.9k
            Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
122
62.7k
            Vacant(e) => Entry::Vacant(VacantEntry {
123
62.7k
                ids: e,
124
62.7k
                slab: &mut self.slab,
125
62.7k
            }),
126
        }
127
93.6k
    }
128
129
    #[allow(clippy::blocks_in_conditions)]
130
32.1k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
32.1k
    where
132
32.1k
        F: FnMut(Ptr),
133
    {
134
1.46M
        match self.try_for_each(|ptr| {
135
1.46M
            f(ptr);
136
1.46M
            Ok::<_, Infallible>(())
137
1.46M
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
643
        match self.try_for_each(|ptr| {
135
643
            f(ptr);
136
643
            Ok::<_, Infallible>(())
137
643
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
200k
        match self.try_for_each(|ptr| {
135
200k
            f(ptr);
136
200k
            Ok::<_, Infallible>(())
137
200k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
991k
        match self.try_for_each(|ptr| {
135
991k
            f(ptr);
136
991k
            Ok::<_, Infallible>(())
137
991k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
274k
        match self.try_for_each(|ptr| {
135
274k
            f(ptr);
136
274k
            Ok::<_, Infallible>(())
137
274k
        }) {
138
32.1k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
32.1k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
733
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
733
    where
132
733
        F: FnMut(Ptr),
133
    {
134
733
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
733
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
733
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
7.55k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
7.55k
    where
132
7.55k
        F: FnMut(Ptr),
133
    {
134
7.55k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
7.55k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
7.55k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
5.98k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
5.98k
    where
132
5.98k
        F: FnMut(Ptr),
133
    {
134
5.98k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
5.98k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
5.98k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
17.8k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
17.8k
    where
132
17.8k
        F: FnMut(Ptr),
133
    {
134
17.8k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
17.8k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
17.8k
    }
143
144
34.9k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
34.9k
    where
146
34.9k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
34.9k
        let mut len = self.ids.len();
149
34.9k
        let mut i = 0;
150
151
1.75M
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
1.72M
            let (stream_id, index) = {
154
1.72M
                let entry = self.ids.get_index(i).unwrap();
155
1.72M
                (*entry.0, *entry.1)
156
1.72M
            };
157
158
1.72M
            f(Ptr {
159
1.72M
                key: Key { index, stream_id },
160
1.72M
                store: self,
161
1.72M
            })?;
162
163
            // TODO: This logic probably could be better...
164
1.72M
            let new_len = self.ids.len();
165
166
1.72M
            if new_len < len {
167
472k
                debug_assert!(new_len == len - 1);
168
472k
                len -= 1;
169
1.25M
            } else {
170
1.25M
                i += 1;
171
1.25M
            }
172
        }
173
174
34.9k
        Ok(())
175
34.9k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#0}, h2::proto::error::Error>
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#1}, h2::proto::error::Error>
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
733
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
733
    where
146
733
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
733
        let mut len = self.ids.len();
149
733
        let mut i = 0;
150
151
1.37k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
643
            let (stream_id, index) = {
154
643
                let entry = self.ids.get_index(i).unwrap();
155
643
                (*entry.0, *entry.1)
156
643
            };
157
158
643
            f(Ptr {
159
643
                key: Key { index, stream_id },
160
643
                store: self,
161
643
            })?;
162
163
            // TODO: This logic probably could be better...
164
643
            let new_len = self.ids.len();
165
166
643
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
643
            } else {
170
643
                i += 1;
171
643
            }
172
        }
173
174
733
        Ok(())
175
733
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
7.55k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
7.55k
    where
146
7.55k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
7.55k
        let mut len = self.ids.len();
149
7.55k
        let mut i = 0;
150
151
207k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
200k
            let (stream_id, index) = {
154
200k
                let entry = self.ids.get_index(i).unwrap();
155
200k
                (*entry.0, *entry.1)
156
200k
            };
157
158
200k
            f(Ptr {
159
200k
                key: Key { index, stream_id },
160
200k
                store: self,
161
200k
            })?;
162
163
            // TODO: This logic probably could be better...
164
200k
            let new_len = self.ids.len();
165
166
200k
            if new_len < len {
167
183k
                debug_assert!(new_len == len - 1);
168
183k
                len -= 1;
169
16.8k
            } else {
170
16.8k
                i += 1;
171
16.8k
            }
172
        }
173
174
7.55k
        Ok(())
175
7.55k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
5.98k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
5.98k
    where
146
5.98k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
5.98k
        let mut len = self.ids.len();
149
5.98k
        let mut i = 0;
150
151
997k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
991k
            let (stream_id, index) = {
154
991k
                let entry = self.ids.get_index(i).unwrap();
155
991k
                (*entry.0, *entry.1)
156
991k
            };
157
158
991k
            f(Ptr {
159
991k
                key: Key { index, stream_id },
160
991k
                store: self,
161
991k
            })?;
162
163
            // TODO: This logic probably could be better...
164
991k
            let new_len = self.ids.len();
165
166
991k
            if new_len < len {
167
65.2k
                debug_assert!(new_len == len - 1);
168
65.2k
                len -= 1;
169
925k
            } else {
170
925k
                i += 1;
171
925k
            }
172
        }
173
174
5.98k
        Ok(())
175
5.98k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
17.8k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
17.8k
    where
146
17.8k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
17.8k
        let mut len = self.ids.len();
149
17.8k
        let mut i = 0;
150
151
292k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
274k
            let (stream_id, index) = {
154
274k
                let entry = self.ids.get_index(i).unwrap();
155
274k
                (*entry.0, *entry.1)
156
274k
            };
157
158
274k
            f(Ptr {
159
274k
                key: Key { index, stream_id },
160
274k
                store: self,
161
274k
            })?;
162
163
            // TODO: This logic probably could be better...
164
274k
            let new_len = self.ids.len();
165
166
274k
            if new_len < len {
167
224k
                debug_assert!(new_len == len - 1);
168
224k
                len -= 1;
169
50.3k
            } else {
170
50.3k
                i += 1;
171
50.3k
            }
172
        }
173
174
17.8k
        Ok(())
175
17.8k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#0}, h2::proto::error::Error>
Line
Count
Source
144
1.62k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.62k
    where
146
1.62k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
1.62k
        let mut len = self.ids.len();
149
1.62k
        let mut i = 0;
150
151
142k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
141k
            let (stream_id, index) = {
154
141k
                let entry = self.ids.get_index(i).unwrap();
155
141k
                (*entry.0, *entry.1)
156
141k
            };
157
158
141k
            f(Ptr {
159
141k
                key: Key { index, stream_id },
160
141k
                store: self,
161
141k
            })?;
162
163
            // TODO: This logic probably could be better...
164
141k
            let new_len = self.ids.len();
165
166
141k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
141k
            } else {
170
141k
                i += 1;
171
141k
            }
172
        }
173
174
1.62k
        Ok(())
175
1.62k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#1}, h2::proto::error::Error>
Line
Count
Source
144
1.22k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.22k
    where
146
1.22k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
1.22k
        let mut len = self.ids.len();
149
1.22k
        let mut i = 0;
150
151
116k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
115k
            let (stream_id, index) = {
154
115k
                let entry = self.ids.get_index(i).unwrap();
155
115k
                (*entry.0, *entry.1)
156
115k
            };
157
158
115k
            f(Ptr {
159
115k
                key: Key { index, stream_id },
160
115k
                store: self,
161
115k
            })?;
162
163
            // TODO: This logic probably could be better...
164
115k
            let new_len = self.ids.len();
165
166
115k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
115k
            } else {
170
115k
                i += 1;
171
115k
            }
172
        }
173
174
1.22k
        Ok(())
175
1.22k
    }
176
}
177
178
impl Resolve for Store {
179
6.15M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
180
6.15M
        Ptr { key, store: self }
181
6.15M
    }
182
}
183
184
impl ops::Index<Key> for Store {
185
    type Output = Stream;
186
187
36.5M
    fn index(&self, key: Key) -> &Self::Output {
188
36.5M
        self.slab
189
36.5M
            .get(key.index.0 as usize)
190
36.5M
            .filter(|s| s.id == key.stream_id)
191
36.5M
            .unwrap_or_else(|| {
192
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
193
            })
194
36.5M
    }
195
}
196
197
impl ops::IndexMut<Key> for Store {
198
16.6M
    fn index_mut(&mut self, key: Key) -> &mut Self::Output {
199
16.6M
        self.slab
200
16.6M
            .get_mut(key.index.0 as usize)
201
16.6M
            .filter(|s| s.id == key.stream_id)
202
16.6M
            .unwrap_or_else(|| {
203
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
204
            })
205
16.6M
    }
206
}
207
208
impl Store {
209
    #[cfg(feature = "unstable")]
210
0
    pub fn num_active_streams(&self) -> usize {
211
0
        self.ids.len()
212
0
    }
213
214
    #[cfg(feature = "unstable")]
215
0
    pub fn num_wired_streams(&self) -> usize {
216
0
        self.slab.len()
217
0
    }
218
}
219
220
// While running h2 unit/integration tests, enable this debug assertion.
221
//
222
// In practice, we don't need to ensure this. But the integration tests
223
// help to make sure we've cleaned up in cases where we could (like, the
224
// runtime isn't suddenly dropping the task for unknown reasons).
225
#[cfg(feature = "unstable")]
226
impl Drop for Store {
227
13.7k
    fn drop(&mut self) {
228
        use std::thread;
229
230
13.7k
        if !thread::panicking() {
231
13.7k
            debug_assert!(self.slab.is_empty());
232
0
        }
233
13.7k
    }
234
}
235
236
// ===== impl Queue =====
237
238
impl<N> Queue<N>
239
where
240
    N: Next,
241
{
242
607k
    pub fn new() -> Self {
243
607k
        Queue {
244
607k
            indices: None,
245
607k
            _p: PhantomData,
246
607k
        }
247
607k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::new
Line
Count
Source
242
539k
    pub fn new() -> Self {
243
539k
        Queue {
244
539k
            indices: None,
245
539k
            _p: PhantomData,
246
539k
        }
247
539k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::new
Line
Count
Source
242
13.7k
    pub fn new() -> Self {
243
13.7k
        Queue {
244
13.7k
            indices: None,
245
13.7k
            _p: PhantomData,
246
13.7k
        }
247
13.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::new
Line
Count
Source
242
13.7k
    pub fn new() -> Self {
243
13.7k
        Queue {
244
13.7k
            indices: None,
245
13.7k
            _p: PhantomData,
246
13.7k
        }
247
13.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::new
Line
Count
Source
242
13.7k
    pub fn new() -> Self {
243
13.7k
        Queue {
244
13.7k
            indices: None,
245
13.7k
            _p: PhantomData,
246
13.7k
        }
247
13.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::new
Line
Count
Source
242
13.7k
    pub fn new() -> Self {
243
13.7k
        Queue {
244
13.7k
            indices: None,
245
13.7k
            _p: PhantomData,
246
13.7k
        }
247
13.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::new
Line
Count
Source
242
13.7k
    pub fn new() -> Self {
243
13.7k
        Queue {
244
13.7k
            indices: None,
245
13.7k
            _p: PhantomData,
246
13.7k
        }
247
13.7k
    }
248
249
463k
    pub fn take(&mut self) -> Self {
250
463k
        Queue {
251
463k
            indices: self.indices.take(),
252
463k
            _p: PhantomData,
253
463k
        }
254
463k
    }
255
256
    /// Queue the stream.
257
    ///
258
    /// If the stream is already contained by the list, return `false`.
259
1.40M
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
1.40M
        tracing::trace!("Queue::push_back");
261
262
1.40M
        if N::is_queued(stream) {
263
362k
            tracing::trace!(" -> already queued");
264
362k
            return false;
265
1.04M
        }
266
267
1.04M
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
1.04M
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
1.04M
        match self.indices {
274
1.02M
            Some(ref mut idxs) => {
275
1.02M
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
1.02M
                let key = stream.key();
279
1.02M
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
1.02M
                idxs.tail = stream.key();
283
            }
284
            None => {
285
17.7k
                tracing::trace!(" -> first entry");
286
17.7k
                self.indices = Some(store::Indices {
287
17.7k
                    head: stream.key(),
288
17.7k
                    tail: stream.key(),
289
17.7k
                });
290
            }
291
        }
292
293
1.04M
        true
294
1.40M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::push
Line
Count
Source
259
192
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
192
        tracing::trace!("Queue::push_back");
261
262
192
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
192
        }
266
267
192
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
192
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
192
        match self.indices {
274
33
            Some(ref mut idxs) => {
275
33
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
33
                let key = stream.key();
279
33
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
33
                idxs.tail = stream.key();
283
            }
284
            None => {
285
159
                tracing::trace!(" -> first entry");
286
159
                self.indices = Some(store::Indices {
287
159
                    head: stream.key(),
288
159
                    tail: stream.key(),
289
159
                });
290
            }
291
        }
292
293
192
        true
294
192
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::push
Line
Count
Source
259
51.8k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
51.8k
        tracing::trace!("Queue::push_back");
261
262
51.8k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
51.8k
        }
266
267
51.8k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
51.8k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
51.8k
        match self.indices {
274
46.5k
            Some(ref mut idxs) => {
275
46.5k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
46.5k
                let key = stream.key();
279
46.5k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
46.5k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
5.27k
                tracing::trace!(" -> first entry");
286
5.27k
                self.indices = Some(store::Indices {
287
5.27k
                    head: stream.key(),
288
5.27k
                    tail: stream.key(),
289
5.27k
                });
290
            }
291
        }
292
293
51.8k
        true
294
51.8k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::push
Line
Count
Source
259
283k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
283k
        tracing::trace!("Queue::push_back");
261
262
283k
        if N::is_queued(stream) {
263
37.5k
            tracing::trace!(" -> already queued");
264
37.5k
            return false;
265
246k
        }
266
267
246k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
246k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
246k
        match self.indices {
274
243k
            Some(ref mut idxs) => {
275
243k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
243k
                let key = stream.key();
279
243k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
243k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
2.24k
                tracing::trace!(" -> first entry");
286
2.24k
                self.indices = Some(store::Indices {
287
2.24k
                    head: stream.key(),
288
2.24k
                    tail: stream.key(),
289
2.24k
                });
290
            }
291
        }
292
293
246k
        true
294
283k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::push
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::push
Line
Count
Source
259
463k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
463k
        tracing::trace!("Queue::push_back");
261
262
463k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
463k
        }
266
267
463k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
463k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
463k
        match self.indices {
274
458k
            Some(ref mut idxs) => {
275
458k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
458k
                let key = stream.key();
279
458k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
458k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
4.68k
                tracing::trace!(" -> first entry");
286
4.68k
                self.indices = Some(store::Indices {
287
4.68k
                    head: stream.key(),
288
4.68k
                    tail: stream.key(),
289
4.68k
                });
290
            }
291
        }
292
293
463k
        true
294
463k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push
Line
Count
Source
259
605k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
605k
        tracing::trace!("Queue::push_back");
261
262
605k
        if N::is_queued(stream) {
263
325k
            tracing::trace!(" -> already queued");
264
325k
            return false;
265
280k
        }
266
267
280k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
280k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
280k
        match self.indices {
274
274k
            Some(ref mut idxs) => {
275
274k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
274k
                let key = stream.key();
279
274k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
274k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
5.36k
                tracing::trace!(" -> first entry");
286
5.36k
                self.indices = Some(store::Indices {
287
5.36k
                    head: stream.key(),
288
5.36k
                    tail: stream.key(),
289
5.36k
                });
290
            }
291
        }
292
293
280k
        true
294
605k
    }
295
296
    /// Queue the stream
297
    ///
298
    /// If the stream is already contained by the list, return `false`.
299
191k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300
191k
        tracing::trace!("Queue::push_front");
301
302
191k
        if N::is_queued(stream) {
303
0
            tracing::trace!(" -> already queued");
304
0
            return false;
305
191k
        }
306
307
191k
        N::set_queued(stream, true);
308
309
        // The next pointer shouldn't be set
310
191k
        debug_assert!(N::next(stream).is_none());
311
312
        // Queue the stream
313
191k
        match self.indices {
314
189k
            Some(ref mut idxs) => {
315
189k
                tracing::trace!(" -> existing entries");
316
317
                // Update the provided stream to point to the head node
318
189k
                let head_key = stream.resolve(idxs.head).key();
319
189k
                N::set_next(stream, Some(head_key));
320
321
                // Update the head pointer
322
189k
                idxs.head = stream.key();
323
            }
324
            None => {
325
1.60k
                tracing::trace!(" -> first entry");
326
1.60k
                self.indices = Some(store::Indices {
327
1.60k
                    head: stream.key(),
328
1.60k
                    tail: stream.key(),
329
1.60k
                });
330
            }
331
        }
332
333
191k
        true
334
191k
    }
335
336
6.38M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
6.38M
    where
338
6.38M
        R: Resolve,
339
    {
340
6.38M
        if let Some(mut idxs) = self.indices {
341
1.23M
            let mut stream = store.resolve(idxs.head);
342
343
1.23M
            if idxs.head == idxs.tail {
344
19.3k
                assert!(N::next(&stream).is_none());
345
19.3k
                self.indices = None;
346
1.21M
            } else {
347
1.21M
                idxs.head = N::take_next(&mut stream).unwrap();
348
1.21M
                self.indices = Some(idxs);
349
1.21M
            }
350
351
1.23M
            debug_assert!(N::is_queued(&stream));
352
1.23M
            N::set_queued(&mut stream, false);
353
354
1.23M
            return Some(stream);
355
5.14M
        }
356
357
5.14M
        None
358
6.38M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
477k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
477k
    where
338
477k
        R: Resolve,
339
    {
340
477k
        if let Some(mut idxs) = self.indices {
341
185
            let mut stream = store.resolve(idxs.head);
342
343
185
            if idxs.head == idxs.tail {
344
156
                assert!(N::next(&stream).is_none());
345
156
                self.indices = None;
346
29
            } else {
347
29
                idxs.head = N::take_next(&mut stream).unwrap();
348
29
                self.indices = Some(idxs);
349
29
            }
350
351
185
            debug_assert!(N::is_queued(&stream));
352
185
            N::set_queued(&mut stream, false);
353
354
185
            return Some(stream);
355
477k
        }
356
357
477k
        None
358
477k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
70.4k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
70.4k
    where
338
70.4k
        R: Resolve,
339
    {
340
70.4k
        if let Some(mut idxs) = self.indices {
341
51.8k
            let mut stream = store.resolve(idxs.head);
342
343
51.8k
            if idxs.head == idxs.tail {
344
5.27k
                assert!(N::next(&stream).is_none());
345
5.27k
                self.indices = None;
346
46.5k
            } else {
347
46.5k
                idxs.head = N::take_next(&mut stream).unwrap();
348
46.5k
                self.indices = Some(idxs);
349
46.5k
            }
350
351
51.8k
            debug_assert!(N::is_queued(&stream));
352
51.8k
            N::set_queued(&mut stream, false);
353
354
51.8k
            return Some(stream);
355
18.5k
        }
356
357
18.5k
        None
358
70.4k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Ptr>
Line
Count
Source
336
189k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
189k
    where
338
189k
        R: Resolve,
339
    {
340
189k
        if let Some(mut idxs) = self.indices {
341
167k
            let mut stream = store.resolve(idxs.head);
342
343
167k
            if idxs.head == idxs.tail {
344
1.60k
                assert!(N::next(&stream).is_none());
345
1.60k
                self.indices = None;
346
165k
            } else {
347
165k
                idxs.head = N::take_next(&mut stream).unwrap();
348
165k
                self.indices = Some(idxs);
349
165k
            }
350
351
167k
            debug_assert!(N::is_queued(&stream));
352
167k
            N::set_queued(&mut stream, false);
353
354
167k
            return Some(stream);
355
21.8k
        }
356
357
21.8k
        None
358
189k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
98.9k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
98.9k
    where
338
98.9k
        R: Resolve,
339
    {
340
98.9k
        if let Some(mut idxs) = self.indices {
341
78.6k
            let mut stream = store.resolve(idxs.head);
342
343
78.6k
            if idxs.head == idxs.tail {
344
649
                assert!(N::next(&stream).is_none());
345
649
                self.indices = None;
346
77.9k
            } else {
347
77.9k
                idxs.head = N::take_next(&mut stream).unwrap();
348
77.9k
                self.indices = Some(idxs);
349
77.9k
            }
350
351
78.6k
            debug_assert!(N::is_queued(&stream));
352
78.6k
            N::set_queued(&mut stream, false);
353
354
78.6k
            return Some(stream);
355
20.3k
        }
356
357
20.3k
        None
358
98.9k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
1.53M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
1.53M
    where
338
1.53M
        R: Resolve,
339
    {
340
1.53M
        if let Some(mut idxs) = self.indices {
341
0
            let mut stream = store.resolve(idxs.head);
342
343
0
            if idxs.head == idxs.tail {
344
0
                assert!(N::next(&stream).is_none());
345
0
                self.indices = None;
346
0
            } else {
347
0
                idxs.head = N::take_next(&mut stream).unwrap();
348
0
                self.indices = Some(idxs);
349
0
            }
350
351
0
            debug_assert!(N::is_queued(&stream));
352
0
            N::set_queued(&mut stream, false);
353
354
0
            return Some(stream);
355
1.53M
        }
356
357
1.53M
        None
358
1.53M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
2.01M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
2.01M
    where
338
2.01M
        R: Resolve,
339
    {
340
2.01M
        if let Some(mut idxs) = self.indices {
341
463k
            let mut stream = store.resolve(idxs.head);
342
343
463k
            if idxs.head == idxs.tail {
344
4.68k
                assert!(N::next(&stream).is_none());
345
4.68k
                self.indices = None;
346
458k
            } else {
347
458k
                idxs.head = N::take_next(&mut stream).unwrap();
348
458k
                self.indices = Some(idxs);
349
458k
            }
350
351
463k
            debug_assert!(N::is_queued(&stream));
352
463k
            N::set_queued(&mut stream, false);
353
354
463k
            return Some(stream);
355
1.54M
        }
356
357
1.54M
        None
358
2.01M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
2.00M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
2.00M
    where
338
2.00M
        R: Resolve,
339
    {
340
2.00M
        if let Some(mut idxs) = self.indices {
341
471k
            let mut stream = store.resolve(idxs.head);
342
343
471k
            if idxs.head == idxs.tail {
344
6.97k
                assert!(N::next(&stream).is_none());
345
6.97k
                self.indices = None;
346
464k
            } else {
347
464k
                idxs.head = N::take_next(&mut stream).unwrap();
348
464k
                self.indices = Some(idxs);
349
464k
            }
350
351
471k
            debug_assert!(N::is_queued(&stream));
352
471k
            N::set_queued(&mut stream, false);
353
354
471k
            return Some(stream);
355
1.52M
        }
356
357
1.52M
        None
358
2.00M
    }
359
360
2.46M
    pub fn is_empty(&self) -> bool {
361
2.46M
        self.indices.is_none()
362
2.46M
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::is_empty
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::is_empty
Line
Count
Source
360
2.46M
    pub fn is_empty(&self) -> bool {
361
2.46M
        self.indices.is_none()
362
2.46M
    }
363
364
2.15M
    pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
365
2.15M
    where
366
2.15M
        R: Resolve,
367
2.15M
        F: Fn(&Stream) -> bool,
368
    {
369
2.15M
        if let Some(idxs) = self.indices {
370
2.15M
            let should_pop = f(&store.resolve(idxs.head));
371
2.15M
            if should_pop {
372
823
                return self.pop(store);
373
2.15M
            }
374
55
        }
375
376
2.15M
        None
377
2.15M
    }
378
}
379
380
impl<N> fmt::Debug for Queue<N> {
381
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
382
0
        f.debug_struct("Queue")
383
0
            .field("indices", &self.indices)
384
            // skip phantom data
385
0
            .finish()
386
0
    }
387
}
388
389
// ===== impl Ptr =====
390
391
impl<'a> Ptr<'a> {
392
    /// Returns the Key associated with the stream
393
3.02M
    pub fn key(&self) -> Key {
394
3.02M
        self.key
395
3.02M
    }
396
397
463k
    pub fn store_mut(&mut self) -> &mut Store {
398
463k
        self.store
399
463k
    }
400
401
    /// Remove the stream from the store
402
523k
    pub fn remove(self) -> StreamId {
403
        // The stream must have been unlinked before this point
404
523k
        debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
405
406
        // Remove the stream state
407
523k
        let stream = self.store.slab.remove(self.key.index.0 as usize);
408
523k
        assert_eq!(stream.id, self.key.stream_id);
409
523k
        stream.id
410
523k
    }
411
412
    /// Remove the StreamId -> stream state association.
413
    ///
414
    /// This will effectively remove the stream as far as the H2 protocol is
415
    /// concerned.
416
1.42M
    pub fn unlink(&mut self) {
417
1.42M
        let id = self.key.stream_id;
418
1.42M
        self.store.ids.swap_remove(&id);
419
1.42M
    }
420
}
421
422
impl<'a> Resolve for Ptr<'a> {
423
1.38M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
424
1.38M
        Ptr {
425
1.38M
            key,
426
1.38M
            store: &mut *self.store,
427
1.38M
        }
428
1.38M
    }
429
}
430
431
impl<'a> ops::Deref for Ptr<'a> {
432
    type Target = Stream;
433
434
36.5M
    fn deref(&self) -> &Stream {
435
36.5M
        &self.store[self.key]
436
36.5M
    }
437
}
438
439
impl<'a> ops::DerefMut for Ptr<'a> {
440
16.6M
    fn deref_mut(&mut self) -> &mut Stream {
441
16.6M
        &mut self.store[self.key]
442
16.6M
    }
443
}
444
445
impl<'a> fmt::Debug for Ptr<'a> {
446
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
447
0
        (**self).fmt(fmt)
448
0
    }
449
}
450
451
// ===== impl OccupiedEntry =====
452
453
impl<'a> OccupiedEntry<'a> {
454
30.9k
    pub fn key(&self) -> Key {
455
30.9k
        let stream_id = *self.ids.key();
456
30.9k
        let index = *self.ids.get();
457
30.9k
        Key { index, stream_id }
458
30.9k
    }
459
}
460
461
// ===== impl VacantEntry =====
462
463
impl<'a> VacantEntry<'a> {
464
60.5k
    pub fn insert(self, value: Stream) -> Key {
465
        // Insert the value in the slab
466
60.5k
        let stream_id = value.id;
467
60.5k
        let index = SlabIndex(self.slab.insert(value) as u32);
468
469
        // Insert the handle in the ID map
470
60.5k
        self.ids.insert(index);
471
472
60.5k
        Key { index, stream_id }
473
60.5k
    }
474
}