Coverage Report

Created: 2025-11-24 06:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/store.rs
Line
Count
Source
1
use super::*;
2
3
use indexmap::{self, IndexMap};
4
5
use std::convert::Infallible;
6
use std::fmt;
7
use std::marker::PhantomData;
8
use std::ops;
9
10
/// Storage for streams
11
#[derive(Debug)]
12
pub(super) struct Store {
13
    slab: slab::Slab<Stream>,
14
    ids: IndexMap<StreamId, SlabIndex>,
15
}
16
17
/// "Pointer" to an entry in the store
18
pub(super) struct Ptr<'a> {
19
    key: Key,
20
    store: &'a mut Store,
21
}
22
23
/// References an entry in the store.
24
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25
pub(crate) struct Key {
26
    index: SlabIndex,
27
    /// Keep the stream ID in the key as an ABA guard, since slab indices
28
    /// could be re-used with a new stream.
29
    stream_id: StreamId,
30
}
31
32
// We can never have more than `StreamId::MAX` streams in the store,
33
// so we can save a smaller index (u32 vs usize).
34
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35
struct SlabIndex(u32);
36
37
pub(super) struct Queue<N> {
38
    indices: Option<store::Indices>,
39
    _p: PhantomData<N>,
40
}
41
42
pub(super) trait Next {
43
    fn next(stream: &Stream) -> Option<Key>;
44
45
    fn set_next(stream: &mut Stream, key: Option<Key>);
46
47
    fn take_next(stream: &mut Stream) -> Option<Key>;
48
49
    fn is_queued(stream: &Stream) -> bool;
50
51
    fn set_queued(stream: &mut Stream, val: bool);
52
}
53
54
/// A linked list
55
#[derive(Debug, Clone, Copy)]
56
struct Indices {
57
    pub head: Key,
58
    pub tail: Key,
59
}
60
61
pub(super) enum Entry<'a> {
62
    Occupied(OccupiedEntry<'a>),
63
    Vacant(VacantEntry<'a>),
64
}
65
66
pub(super) struct OccupiedEntry<'a> {
67
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
68
}
69
70
pub(super) struct VacantEntry<'a> {
71
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
72
    slab: &'a mut slab::Slab<Stream>,
73
}
74
75
pub(super) trait Resolve {
76
    fn resolve(&mut self, key: Key) -> Ptr<'_>;
77
}
78
79
// ===== impl Store =====
80
81
impl Store {
82
12.8k
    pub fn new() -> Self {
83
12.8k
        Store {
84
12.8k
            slab: slab::Slab::new(),
85
12.8k
            ids: IndexMap::new(),
86
12.8k
        }
87
12.8k
    }
88
89
68.4k
    pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<'_>> {
90
68.4k
        let index = match self.ids.get(id) {
91
24.8k
            Some(key) => *key,
92
43.5k
            None => return None,
93
        };
94
95
24.8k
        Some(Ptr {
96
24.8k
            key: Key {
97
24.8k
                index,
98
24.8k
                stream_id: *id,
99
24.8k
            },
100
24.8k
            store: self,
101
24.8k
        })
102
68.4k
    }
103
104
415k
    pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr<'_> {
105
415k
        let index = SlabIndex(self.slab.insert(val) as u32);
106
415k
        assert!(self.ids.insert(id, index).is_none());
107
108
415k
        Ptr {
109
415k
            key: Key {
110
415k
                index,
111
415k
                stream_id: id,
112
415k
            },
113
415k
            store: self,
114
415k
        }
115
415k
    }
116
117
83.8k
    pub fn find_entry(&mut self, id: StreamId) -> Entry<'_> {
118
        use self::indexmap::map::Entry::*;
119
120
83.8k
        match self.ids.entry(id) {
121
28.7k
            Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
122
55.0k
            Vacant(e) => Entry::Vacant(VacantEntry {
123
55.0k
                ids: e,
124
55.0k
                slab: &mut self.slab,
125
55.0k
            }),
126
        }
127
83.8k
    }
128
129
    #[allow(clippy::blocks_in_conditions)]
130
29.9k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
29.9k
    where
132
29.9k
        F: FnMut(Ptr),
133
    {
134
1.31M
        match self.try_for_each(|ptr| {
135
1.31M
            f(ptr);
136
1.31M
            Ok::<_, Infallible>(())
137
1.31M
        }) {
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>::{closure#0}
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
587
        match self.try_for_each(|ptr| {
135
587
            f(ptr);
136
587
            Ok::<_, Infallible>(())
137
587
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
171k
        match self.try_for_each(|ptr| {
135
171k
            f(ptr);
136
171k
            Ok::<_, Infallible>(())
137
171k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
887k
        match self.try_for_each(|ptr| {
135
887k
            f(ptr);
136
887k
            Ok::<_, Infallible>(())
137
887k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
254k
        match self.try_for_each(|ptr| {
135
254k
            f(ptr);
136
254k
            Ok::<_, Infallible>(())
137
254k
        }) {
138
29.9k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
29.9k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
678
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
678
    where
132
678
        F: FnMut(Ptr),
133
    {
134
678
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
678
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
678
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
7.02k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
7.02k
    where
132
7.02k
        F: FnMut(Ptr),
133
    {
134
7.02k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
7.02k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
7.02k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
5.38k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
5.38k
    where
132
5.38k
        F: FnMut(Ptr),
133
    {
134
5.38k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
5.38k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
5.38k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
16.8k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
16.8k
    where
132
16.8k
        F: FnMut(Ptr),
133
    {
134
16.8k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
16.8k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
16.8k
    }
143
144
32.9k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
32.9k
    where
146
32.9k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
32.9k
        let mut len = self.ids.len();
149
32.9k
        let mut i = 0;
150
151
1.63M
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
1.59M
            let (stream_id, index) = {
154
1.59M
                let entry = self.ids.get_index(i).unwrap();
155
1.59M
                (*entry.0, *entry.1)
156
1.59M
            };
157
158
1.59M
            f(Ptr {
159
1.59M
                key: Key { index, stream_id },
160
1.59M
                store: self,
161
1.59M
            })?;
162
163
            // TODO: This logic probably could be better...
164
1.59M
            let new_len = self.ids.len();
165
166
1.59M
            if new_len < len {
167
417k
                debug_assert!(new_len == len - 1);
168
417k
                len -= 1;
169
1.18M
            } else {
170
1.18M
                i += 1;
171
1.18M
            }
172
        }
173
174
32.9k
        Ok(())
175
32.9k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#0}, h2::proto::error::Error>
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#1}, h2::proto::error::Error>
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
678
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
678
    where
146
678
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
678
        let mut len = self.ids.len();
149
678
        let mut i = 0;
150
151
1.26k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
587
            let (stream_id, index) = {
154
587
                let entry = self.ids.get_index(i).unwrap();
155
587
                (*entry.0, *entry.1)
156
587
            };
157
158
587
            f(Ptr {
159
587
                key: Key { index, stream_id },
160
587
                store: self,
161
587
            })?;
162
163
            // TODO: This logic probably could be better...
164
587
            let new_len = self.ids.len();
165
166
587
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
587
            } else {
170
587
                i += 1;
171
587
            }
172
        }
173
174
678
        Ok(())
175
678
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
7.02k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
7.02k
    where
146
7.02k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
7.02k
        let mut len = self.ids.len();
149
7.02k
        let mut i = 0;
150
151
178k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
171k
            let (stream_id, index) = {
154
171k
                let entry = self.ids.get_index(i).unwrap();
155
171k
                (*entry.0, *entry.1)
156
171k
            };
157
158
171k
            f(Ptr {
159
171k
                key: Key { index, stream_id },
160
171k
                store: self,
161
171k
            })?;
162
163
            // TODO: This logic probably could be better...
164
171k
            let new_len = self.ids.len();
165
166
171k
            if new_len < len {
167
156k
                debug_assert!(new_len == len - 1);
168
156k
                len -= 1;
169
15.5k
            } else {
170
15.5k
                i += 1;
171
15.5k
            }
172
        }
173
174
7.02k
        Ok(())
175
7.02k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
5.38k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
5.38k
    where
146
5.38k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
5.38k
        let mut len = self.ids.len();
149
5.38k
        let mut i = 0;
150
151
893k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
887k
            let (stream_id, index) = {
154
887k
                let entry = self.ids.get_index(i).unwrap();
155
887k
                (*entry.0, *entry.1)
156
887k
            };
157
158
887k
            f(Ptr {
159
887k
                key: Key { index, stream_id },
160
887k
                store: self,
161
887k
            })?;
162
163
            // TODO: This logic probably could be better...
164
887k
            let new_len = self.ids.len();
165
166
887k
            if new_len < len {
167
55.9k
                debug_assert!(new_len == len - 1);
168
55.9k
                len -= 1;
169
831k
            } else {
170
831k
                i += 1;
171
831k
            }
172
        }
173
174
5.38k
        Ok(())
175
5.38k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
16.8k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
16.8k
    where
146
16.8k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
16.8k
        let mut len = self.ids.len();
149
16.8k
        let mut i = 0;
150
151
270k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
254k
            let (stream_id, index) = {
154
254k
                let entry = self.ids.get_index(i).unwrap();
155
254k
                (*entry.0, *entry.1)
156
254k
            };
157
158
254k
            f(Ptr {
159
254k
                key: Key { index, stream_id },
160
254k
                store: self,
161
254k
            })?;
162
163
            // TODO: This logic probably could be better...
164
254k
            let new_len = self.ids.len();
165
166
254k
            if new_len < len {
167
205k
                debug_assert!(new_len == len - 1);
168
205k
                len -= 1;
169
48.8k
            } else {
170
48.8k
                i += 1;
171
48.8k
            }
172
        }
173
174
16.8k
        Ok(())
175
16.8k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#0}, h2::proto::error::Error>
Line
Count
Source
144
1.73k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.73k
    where
146
1.73k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
1.73k
        let mut len = self.ids.len();
149
1.73k
        let mut i = 0;
150
151
159k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
157k
            let (stream_id, index) = {
154
157k
                let entry = self.ids.get_index(i).unwrap();
155
157k
                (*entry.0, *entry.1)
156
157k
            };
157
158
157k
            f(Ptr {
159
157k
                key: Key { index, stream_id },
160
157k
                store: self,
161
157k
            })?;
162
163
            // TODO: This logic probably could be better...
164
157k
            let new_len = self.ids.len();
165
166
157k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
157k
            } else {
170
157k
                i += 1;
171
157k
            }
172
        }
173
174
1.73k
        Ok(())
175
1.73k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#1}, h2::proto::error::Error>
Line
Count
Source
144
1.31k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.31k
    where
146
1.31k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
1.31k
        let mut len = self.ids.len();
149
1.31k
        let mut i = 0;
150
151
127k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
126k
            let (stream_id, index) = {
154
126k
                let entry = self.ids.get_index(i).unwrap();
155
126k
                (*entry.0, *entry.1)
156
126k
            };
157
158
126k
            f(Ptr {
159
126k
                key: Key { index, stream_id },
160
126k
                store: self,
161
126k
            })?;
162
163
            // TODO: This logic probably could be better...
164
126k
            let new_len = self.ids.len();
165
166
126k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
126k
            } else {
170
126k
                i += 1;
171
126k
            }
172
        }
173
174
1.31k
        Ok(())
175
1.31k
    }
176
}
177
178
impl Resolve for Store {
179
5.59M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
180
5.59M
        Ptr { key, store: self }
181
5.59M
    }
182
}
183
184
impl ops::Index<Key> for Store {
185
    type Output = Stream;
186
187
33.1M
    fn index(&self, key: Key) -> &Self::Output {
188
33.1M
        self.slab
189
33.1M
            .get(key.index.0 as usize)
190
33.1M
            .filter(|s| s.id == key.stream_id)
191
33.1M
            .unwrap_or_else(|| {
192
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
193
            })
194
33.1M
    }
195
}
196
197
impl ops::IndexMut<Key> for Store {
198
15.0M
    fn index_mut(&mut self, key: Key) -> &mut Self::Output {
199
15.0M
        self.slab
200
15.0M
            .get_mut(key.index.0 as usize)
201
15.0M
            .filter(|s| s.id == key.stream_id)
202
15.0M
            .unwrap_or_else(|| {
203
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
204
            })
205
15.0M
    }
206
}
207
208
impl Store {
209
    #[cfg(feature = "unstable")]
210
0
    pub fn num_active_streams(&self) -> usize {
211
0
        self.ids.len()
212
0
    }
213
214
    #[cfg(feature = "unstable")]
215
0
    pub fn num_wired_streams(&self) -> usize {
216
0
        self.slab.len()
217
0
    }
218
}
219
220
// While running h2 unit/integration tests, enable this debug assertion.
221
//
222
// In practice, we don't need to ensure this. But the integration tests
223
// help to make sure we've cleaned up in cases where we could (like, the
224
// runtime isn't suddenly dropping the task for unknown reasons).
225
#[cfg(feature = "unstable")]
226
impl Drop for Store {
227
12.8k
    fn drop(&mut self) {
228
        use std::thread;
229
230
12.8k
        if !thread::panicking() {
231
12.8k
            debug_assert!(self.slab.is_empty());
232
0
        }
233
12.8k
    }
234
}
235
236
// ===== impl Queue =====
237
238
impl<N> Queue<N>
239
where
240
    N: Next,
241
{
242
545k
    pub fn new() -> Self {
243
545k
        Queue {
244
545k
            indices: None,
245
545k
            _p: PhantomData,
246
545k
        }
247
545k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::new
Line
Count
Source
242
481k
    pub fn new() -> Self {
243
481k
        Queue {
244
481k
            indices: None,
245
481k
            _p: PhantomData,
246
481k
        }
247
481k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::new
Line
Count
Source
242
12.8k
    pub fn new() -> Self {
243
12.8k
        Queue {
244
12.8k
            indices: None,
245
12.8k
            _p: PhantomData,
246
12.8k
        }
247
12.8k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::new
Line
Count
Source
242
12.8k
    pub fn new() -> Self {
243
12.8k
        Queue {
244
12.8k
            indices: None,
245
12.8k
            _p: PhantomData,
246
12.8k
        }
247
12.8k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::new
Line
Count
Source
242
12.8k
    pub fn new() -> Self {
243
12.8k
        Queue {
244
12.8k
            indices: None,
245
12.8k
            _p: PhantomData,
246
12.8k
        }
247
12.8k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::new
Line
Count
Source
242
12.8k
    pub fn new() -> Self {
243
12.8k
        Queue {
244
12.8k
            indices: None,
245
12.8k
            _p: PhantomData,
246
12.8k
        }
247
12.8k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::new
Line
Count
Source
242
12.8k
    pub fn new() -> Self {
243
12.8k
        Queue {
244
12.8k
            indices: None,
245
12.8k
            _p: PhantomData,
246
12.8k
        }
247
12.8k
    }
248
249
414k
    pub fn take(&mut self) -> Self {
250
414k
        Queue {
251
414k
            indices: self.indices.take(),
252
414k
            _p: PhantomData,
253
414k
        }
254
414k
    }
255
256
    /// Queue the stream.
257
    ///
258
    /// If the stream is already contained by the list, return `false`.
259
1.29M
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
1.29M
        tracing::trace!("Queue::push_back");
261
262
1.29M
        if N::is_queued(stream) {
263
354k
            tracing::trace!(" -> already queued");
264
354k
            return false;
265
943k
        }
266
267
943k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
943k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
943k
        match self.indices {
274
926k
            Some(ref mut idxs) => {
275
926k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
926k
                let key = stream.key();
279
926k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
926k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
16.3k
                tracing::trace!(" -> first entry");
286
16.3k
                self.indices = Some(store::Indices {
287
16.3k
                    head: stream.key(),
288
16.3k
                    tail: stream.key(),
289
16.3k
                });
290
            }
291
        }
292
293
943k
        true
294
1.29M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::push
Line
Count
Source
259
192
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
192
        tracing::trace!("Queue::push_back");
261
262
192
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
192
        }
266
267
192
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
192
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
192
        match self.indices {
274
30
            Some(ref mut idxs) => {
275
30
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
30
                let key = stream.key();
279
30
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
30
                idxs.tail = stream.key();
283
            }
284
            None => {
285
162
                tracing::trace!(" -> first entry");
286
162
                self.indices = Some(store::Indices {
287
162
                    head: stream.key(),
288
162
                    tail: stream.key(),
289
162
                });
290
            }
291
        }
292
293
192
        true
294
192
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::push
Line
Count
Source
259
50.0k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
50.0k
        tracing::trace!("Queue::push_back");
261
262
50.0k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
50.0k
        }
266
267
50.0k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
50.0k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
50.0k
        match self.indices {
274
45.1k
            Some(ref mut idxs) => {
275
45.1k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
45.1k
                let key = stream.key();
279
45.1k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
45.1k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
4.91k
                tracing::trace!(" -> first entry");
286
4.91k
                self.indices = Some(store::Indices {
287
4.91k
                    head: stream.key(),
288
4.91k
                    tail: stream.key(),
289
4.91k
                });
290
            }
291
        }
292
293
50.0k
        true
294
50.0k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::push
Line
Count
Source
259
279k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
279k
        tracing::trace!("Queue::push_back");
261
262
279k
        if N::is_queued(stream) {
263
48.6k
            tracing::trace!(" -> already queued");
264
48.6k
            return false;
265
230k
        }
266
267
230k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
230k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
230k
        match self.indices {
274
228k
            Some(ref mut idxs) => {
275
228k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
228k
                let key = stream.key();
279
228k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
228k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
2.05k
                tracing::trace!(" -> first entry");
286
2.05k
                self.indices = Some(store::Indices {
287
2.05k
                    head: stream.key(),
288
2.05k
                    tail: stream.key(),
289
2.05k
                });
290
            }
291
        }
292
293
230k
        true
294
279k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::push
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::push
Line
Count
Source
259
414k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
414k
        tracing::trace!("Queue::push_back");
261
262
414k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
414k
        }
266
267
414k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
414k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
414k
        match self.indices {
274
410k
            Some(ref mut idxs) => {
275
410k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
410k
                let key = stream.key();
279
410k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
410k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
4.16k
                tracing::trace!(" -> first entry");
286
4.16k
                self.indices = Some(store::Indices {
287
4.16k
                    head: stream.key(),
288
4.16k
                    tail: stream.key(),
289
4.16k
                });
290
            }
291
        }
292
293
414k
        true
294
414k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push
Line
Count
Source
259
554k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
554k
        tracing::trace!("Queue::push_back");
261
262
554k
        if N::is_queued(stream) {
263
305k
            tracing::trace!(" -> already queued");
264
305k
            return false;
265
248k
        }
266
267
248k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
248k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
248k
        match self.indices {
274
243k
            Some(ref mut idxs) => {
275
243k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
243k
                let key = stream.key();
279
243k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
243k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
5.01k
                tracing::trace!(" -> first entry");
286
5.01k
                self.indices = Some(store::Indices {
287
5.01k
                    head: stream.key(),
288
5.01k
                    tail: stream.key(),
289
5.01k
                });
290
            }
291
        }
292
293
248k
        true
294
554k
    }
295
296
    /// Queue the stream
297
    ///
298
    /// If the stream is already contained by the list, return `false`.
299
174k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300
174k
        tracing::trace!("Queue::push_front");
301
302
174k
        if N::is_queued(stream) {
303
0
            tracing::trace!(" -> already queued");
304
0
            return false;
305
174k
        }
306
307
174k
        N::set_queued(stream, true);
308
309
        // The next pointer shouldn't be set
310
174k
        debug_assert!(N::next(stream).is_none());
311
312
        // Queue the stream
313
174k
        match self.indices {
314
173k
            Some(ref mut idxs) => {
315
173k
                tracing::trace!(" -> existing entries");
316
317
                // Update the provided stream to point to the head node
318
173k
                let head_key = stream.resolve(idxs.head).key();
319
173k
                N::set_next(stream, Some(head_key));
320
321
                // Update the head pointer
322
173k
                idxs.head = stream.key();
323
            }
324
            None => {
325
1.46k
                tracing::trace!(" -> first entry");
326
1.46k
                self.indices = Some(store::Indices {
327
1.46k
                    head: stream.key(),
328
1.46k
                    tail: stream.key(),
329
1.46k
                });
330
            }
331
        }
332
333
174k
        true
334
174k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<_>>::push_front
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push_front
Line
Count
Source
299
174k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300
174k
        tracing::trace!("Queue::push_front");
301
302
174k
        if N::is_queued(stream) {
303
0
            tracing::trace!(" -> already queued");
304
0
            return false;
305
174k
        }
306
307
174k
        N::set_queued(stream, true);
308
309
        // The next pointer shouldn't be set
310
174k
        debug_assert!(N::next(stream).is_none());
311
312
        // Queue the stream
313
174k
        match self.indices {
314
173k
            Some(ref mut idxs) => {
315
173k
                tracing::trace!(" -> existing entries");
316
317
                // Update the provided stream to point to the head node
318
173k
                let head_key = stream.resolve(idxs.head).key();
319
173k
                N::set_next(stream, Some(head_key));
320
321
                // Update the head pointer
322
173k
                idxs.head = stream.key();
323
            }
324
            None => {
325
1.46k
                tracing::trace!(" -> first entry");
326
1.46k
                self.indices = Some(store::Indices {
327
1.46k
                    head: stream.key(),
328
1.46k
                    tail: stream.key(),
329
1.46k
                });
330
            }
331
        }
332
333
174k
        true
334
174k
    }
335
336
7.24M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
7.24M
    where
338
7.24M
        R: Resolve,
339
    {
340
7.24M
        if let Some(mut idxs) = self.indices {
341
1.11M
            let mut stream = store.resolve(idxs.head);
342
343
1.11M
            if idxs.head == idxs.tail {
344
17.7k
                assert!(N::next(&stream).is_none());
345
17.7k
                self.indices = None;
346
1.10M
            } else {
347
1.10M
                idxs.head = N::take_next(&mut stream).unwrap();
348
1.10M
                self.indices = Some(idxs);
349
1.10M
            }
350
351
1.11M
            debug_assert!(N::is_queued(&stream));
352
1.11M
            N::set_queued(&mut stream, false);
353
354
1.11M
            return Some(stream);
355
6.12M
        }
356
357
6.12M
        None
358
7.24M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
427k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
427k
    where
338
427k
        R: Resolve,
339
    {
340
427k
        if let Some(mut idxs) = self.indices {
341
187
            let mut stream = store.resolve(idxs.head);
342
343
187
            if idxs.head == idxs.tail {
344
160
                assert!(N::next(&stream).is_none());
345
160
                self.indices = None;
346
27
            } else {
347
27
                idxs.head = N::take_next(&mut stream).unwrap();
348
27
                self.indices = Some(idxs);
349
27
            }
350
351
187
            debug_assert!(N::is_queued(&stream));
352
187
            N::set_queued(&mut stream, false);
353
354
187
            return Some(stream);
355
427k
        }
356
357
427k
        None
358
427k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
67.5k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
67.5k
    where
338
67.5k
        R: Resolve,
339
    {
340
67.5k
        if let Some(mut idxs) = self.indices {
341
50.0k
            let mut stream = store.resolve(idxs.head);
342
343
50.0k
            if idxs.head == idxs.tail {
344
4.91k
                assert!(N::next(&stream).is_none());
345
4.91k
                self.indices = None;
346
45.1k
            } else {
347
45.1k
                idxs.head = N::take_next(&mut stream).unwrap();
348
45.1k
                self.indices = Some(idxs);
349
45.1k
            }
350
351
50.0k
            debug_assert!(N::is_queued(&stream));
352
50.0k
            N::set_queued(&mut stream, false);
353
354
50.0k
            return Some(stream);
355
17.4k
        }
356
357
17.4k
        None
358
67.5k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Ptr>
Line
Count
Source
336
172k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
172k
    where
338
172k
        R: Resolve,
339
    {
340
172k
        if let Some(mut idxs) = self.indices {
341
154k
            let mut stream = store.resolve(idxs.head);
342
343
154k
            if idxs.head == idxs.tail {
344
1.42k
                assert!(N::next(&stream).is_none());
345
1.42k
                self.indices = None;
346
153k
            } else {
347
153k
                idxs.head = N::take_next(&mut stream).unwrap();
348
153k
                self.indices = Some(idxs);
349
153k
            }
350
351
154k
            debug_assert!(N::is_queued(&stream));
352
154k
            N::set_queued(&mut stream, false);
353
354
154k
            return Some(stream);
355
17.3k
        }
356
357
17.3k
        None
358
172k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
94.8k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
94.8k
    where
338
94.8k
        R: Resolve,
339
    {
340
94.8k
        if let Some(mut idxs) = self.indices {
341
75.6k
            let mut stream = store.resolve(idxs.head);
342
343
75.6k
            if idxs.head == idxs.tail {
344
635
                assert!(N::next(&stream).is_none());
345
635
                self.indices = None;
346
75.0k
            } else {
347
75.0k
                idxs.head = N::take_next(&mut stream).unwrap();
348
75.0k
                self.indices = Some(idxs);
349
75.0k
            }
350
351
75.6k
            debug_assert!(N::is_queued(&stream));
352
75.6k
            N::set_queued(&mut stream, false);
353
354
75.6k
            return Some(stream);
355
19.2k
        }
356
357
19.2k
        None
358
94.8k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
1.87M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
1.87M
    where
338
1.87M
        R: Resolve,
339
    {
340
1.87M
        if let Some(mut idxs) = self.indices {
341
0
            let mut stream = store.resolve(idxs.head);
342
343
0
            if idxs.head == idxs.tail {
344
0
                assert!(N::next(&stream).is_none());
345
0
                self.indices = None;
346
0
            } else {
347
0
                idxs.head = N::take_next(&mut stream).unwrap();
348
0
                self.indices = Some(idxs);
349
0
            }
350
351
0
            debug_assert!(N::is_queued(&stream));
352
0
            N::set_queued(&mut stream, false);
353
354
0
            return Some(stream);
355
1.87M
        }
356
357
1.87M
        None
358
1.87M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
2.30M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
2.30M
    where
338
2.30M
        R: Resolve,
339
    {
340
2.30M
        if let Some(mut idxs) = self.indices {
341
414k
            let mut stream = store.resolve(idxs.head);
342
343
414k
            if idxs.head == idxs.tail {
344
4.16k
                assert!(N::next(&stream).is_none());
345
4.16k
                self.indices = None;
346
410k
            } else {
347
410k
                idxs.head = N::take_next(&mut stream).unwrap();
348
410k
                self.indices = Some(idxs);
349
410k
            }
350
351
414k
            debug_assert!(N::is_queued(&stream));
352
414k
            N::set_queued(&mut stream, false);
353
354
414k
            return Some(stream);
355
1.89M
        }
356
357
1.89M
        None
358
2.30M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
2.29M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
2.29M
    where
338
2.29M
        R: Resolve,
339
    {
340
2.29M
        if let Some(mut idxs) = self.indices {
341
423k
            let mut stream = store.resolve(idxs.head);
342
343
423k
            if idxs.head == idxs.tail {
344
6.48k
                assert!(N::next(&stream).is_none());
345
6.48k
                self.indices = None;
346
416k
            } else {
347
416k
                idxs.head = N::take_next(&mut stream).unwrap();
348
416k
                self.indices = Some(idxs);
349
416k
            }
350
351
423k
            debug_assert!(N::is_queued(&stream));
352
423k
            N::set_queued(&mut stream, false);
353
354
423k
            return Some(stream);
355
1.87M
        }
356
357
1.87M
        None
358
2.29M
    }
359
360
2.33M
    pub fn is_empty(&self) -> bool {
361
2.33M
        self.indices.is_none()
362
2.33M
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::is_empty
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::is_empty
Line
Count
Source
360
2.33M
    pub fn is_empty(&self) -> bool {
361
2.33M
        self.indices.is_none()
362
2.33M
    }
363
364
2.01M
    pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
365
2.01M
    where
366
2.01M
        R: Resolve,
367
2.01M
        F: Fn(&Stream) -> bool,
368
    {
369
2.01M
        if let Some(idxs) = self.indices {
370
2.01M
            let should_pop = f(&store.resolve(idxs.head));
371
2.01M
            if should_pop {
372
586
                return self.pop(store);
373
2.01M
            }
374
32
        }
375
376
2.01M
        None
377
2.01M
    }
378
}
379
380
impl<N> fmt::Debug for Queue<N> {
381
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
382
0
        f.debug_struct("Queue")
383
0
            .field("indices", &self.indices)
384
            // skip phantom data
385
0
            .finish()
386
0
    }
387
}
388
389
// ===== impl Ptr =====
390
391
impl<'a> Ptr<'a> {
392
    /// Returns the Key associated with the stream
393
2.72M
    pub fn key(&self) -> Key {
394
2.72M
        self.key
395
2.72M
    }
396
397
414k
    pub fn store_mut(&mut self) -> &mut Store {
398
414k
        self.store
399
414k
    }
400
401
    /// Remove the stream from the store
402
467k
    pub fn remove(self) -> StreamId {
403
        // The stream must have been unlinked before this point
404
467k
        debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
405
406
        // Remove the stream state
407
467k
        let stream = self.store.slab.remove(self.key.index.0 as usize);
408
467k
        assert_eq!(stream.id, self.key.stream_id);
409
467k
        stream.id
410
467k
    }
411
412
    /// Remove the StreamId -> stream state association.
413
    ///
414
    /// This will effectively remove the stream as far as the H2 protocol is
415
    /// concerned.
416
1.25M
    pub fn unlink(&mut self) {
417
1.25M
        let id = self.key.stream_id;
418
1.25M
        self.store.ids.swap_remove(&id);
419
1.25M
    }
420
}
421
422
impl<'a> Resolve for Ptr<'a> {
423
1.25M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
424
1.25M
        Ptr {
425
1.25M
            key,
426
1.25M
            store: &mut *self.store,
427
1.25M
        }
428
1.25M
    }
429
}
430
431
impl<'a> ops::Deref for Ptr<'a> {
432
    type Target = Stream;
433
434
33.1M
    fn deref(&self) -> &Stream {
435
33.1M
        &self.store[self.key]
436
33.1M
    }
437
}
438
439
impl<'a> ops::DerefMut for Ptr<'a> {
440
15.0M
    fn deref_mut(&mut self) -> &mut Stream {
441
15.0M
        &mut self.store[self.key]
442
15.0M
    }
443
}
444
445
impl<'a> fmt::Debug for Ptr<'a> {
446
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
447
0
        (**self).fmt(fmt)
448
0
    }
449
}
450
451
// ===== impl OccupiedEntry =====
452
453
impl<'a> OccupiedEntry<'a> {
454
28.7k
    pub fn key(&self) -> Key {
455
28.7k
        let stream_id = *self.ids.key();
456
28.7k
        let index = *self.ids.get();
457
28.7k
        Key { index, stream_id }
458
28.7k
    }
459
}
460
461
// ===== impl VacantEntry =====
462
463
impl<'a> VacantEntry<'a> {
464
53.0k
    pub fn insert(self, value: Stream) -> Key {
465
        // Insert the value in the slab
466
53.0k
        let stream_id = value.id;
467
53.0k
        let index = SlabIndex(self.slab.insert(value) as u32);
468
469
        // Insert the handle in the ID map
470
53.0k
        self.ids.insert(index);
471
472
53.0k
        Key { index, stream_id }
473
53.0k
    }
474
}