Coverage Report

Created: 2026-04-12 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2/src/proto/streams/store.rs
Line
Count
Source
1
use super::*;
2
3
use indexmap::{self, IndexMap};
4
5
use std::convert::Infallible;
6
use std::fmt;
7
use std::marker::PhantomData;
8
use std::ops;
9
10
/// Storage for streams
11
#[derive(Debug)]
12
pub(super) struct Store {
13
    slab: slab::Slab<Stream>,
14
    ids: IndexMap<StreamId, SlabIndex>,
15
}
16
17
/// "Pointer" to an entry in the store
18
pub(super) struct Ptr<'a> {
19
    key: Key,
20
    store: &'a mut Store,
21
}
22
23
/// References an entry in the store.
24
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25
pub(crate) struct Key {
26
    index: SlabIndex,
27
    /// Keep the stream ID in the key as an ABA guard, since slab indices
28
    /// could be re-used with a new stream.
29
    stream_id: StreamId,
30
}
31
32
// We can never have more than `StreamId::MAX` streams in the store,
33
// so we can save a smaller index (u32 vs usize).
34
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35
struct SlabIndex(u32);
36
37
pub(super) struct Queue<N> {
38
    indices: Option<store::Indices>,
39
    _p: PhantomData<N>,
40
}
41
42
pub(super) trait Next {
43
    fn next(stream: &Stream) -> Option<Key>;
44
45
    fn set_next(stream: &mut Stream, key: Option<Key>);
46
47
    fn take_next(stream: &mut Stream) -> Option<Key>;
48
49
    fn is_queued(stream: &Stream) -> bool;
50
51
    fn set_queued(stream: &mut Stream, val: bool);
52
}
53
54
/// A linked list
55
#[derive(Debug, Clone, Copy)]
56
struct Indices {
57
    pub head: Key,
58
    pub tail: Key,
59
}
60
61
pub(super) enum Entry<'a> {
62
    Occupied(OccupiedEntry<'a>),
63
    Vacant(VacantEntry<'a>),
64
}
65
66
pub(super) struct OccupiedEntry<'a> {
67
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
68
}
69
70
pub(super) struct VacantEntry<'a> {
71
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
72
    slab: &'a mut slab::Slab<Stream>,
73
}
74
75
pub(super) trait Resolve {
76
    fn resolve(&mut self, key: Key) -> Ptr<'_>;
77
}
78
79
// ===== impl Store =====
80
81
impl Store {
82
14.5k
    pub fn new() -> Self {
83
14.5k
        Store {
84
14.5k
            slab: slab::Slab::new(),
85
14.5k
            ids: IndexMap::new(),
86
14.5k
        }
87
14.5k
    }
88
89
68.2k
    pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<'_>> {
90
68.2k
        let index = match self.ids.get(id) {
91
50.8k
            Some(key) => *key,
92
17.3k
            None => return None,
93
        };
94
95
50.8k
        Some(Ptr {
96
50.8k
            key: Key {
97
50.8k
                index,
98
50.8k
                stream_id: *id,
99
50.8k
            },
100
50.8k
            store: self,
101
50.8k
        })
102
68.2k
    }
103
104
489k
    pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr<'_> {
105
489k
        let index = SlabIndex(self.slab.insert(val) as u32);
106
489k
        assert!(self.ids.insert(id, index).is_none());
107
108
489k
        Ptr {
109
489k
            key: Key {
110
489k
                index,
111
489k
                stream_id: id,
112
489k
            },
113
489k
            store: self,
114
489k
        }
115
489k
    }
116
117
61.6k
    pub fn find_entry(&mut self, id: StreamId) -> Entry<'_> {
118
        use self::indexmap::map::Entry::*;
119
120
61.6k
        match self.ids.entry(id) {
121
31.6k
            Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
122
29.9k
            Vacant(e) => Entry::Vacant(VacantEntry {
123
29.9k
                ids: e,
124
29.9k
                slab: &mut self.slab,
125
29.9k
            }),
126
        }
127
61.6k
    }
128
129
    #[allow(clippy::blocks_in_conditions)]
130
32.9k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
32.9k
    where
132
32.9k
        F: FnMut(Ptr),
133
    {
134
1.45M
        match self.try_for_each(|ptr| {
135
1.45M
            f(ptr);
136
1.45M
            Ok::<_, Infallible>(())
137
1.45M
        }) {
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>::{closure#0}
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
708
        match self.try_for_each(|ptr| {
135
708
            f(ptr);
136
708
            Ok::<_, Infallible>(())
137
708
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
202k
        match self.try_for_each(|ptr| {
135
202k
            f(ptr);
136
202k
            Ok::<_, Infallible>(())
137
202k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
956k
        match self.try_for_each(|ptr| {
135
956k
            f(ptr);
136
956k
            Ok::<_, Infallible>(())
137
956k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
297k
        match self.try_for_each(|ptr| {
135
297k
            f(ptr);
136
297k
            Ok::<_, Infallible>(())
137
297k
        }) {
138
32.9k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
32.9k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
807
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
807
    where
132
807
        F: FnMut(Ptr),
133
    {
134
807
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
807
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
807
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
7.94k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
7.94k
    where
132
7.94k
        F: FnMut(Ptr),
133
    {
134
7.94k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
7.94k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
7.94k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
5.21k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
5.21k
    where
132
5.21k
        F: FnMut(Ptr),
133
    {
134
5.21k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
5.21k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
5.21k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
18.9k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
18.9k
    where
132
18.9k
        F: FnMut(Ptr),
133
    {
134
18.9k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
        }) {
138
18.9k
            Ok(()) => (),
139
            #[allow(unused)]
140
            Err(infallible) => match infallible {},
141
        }
142
18.9k
    }
143
144
35.5k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
35.5k
    where
146
35.5k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
35.5k
        let mut len = self.ids.len();
149
35.5k
        let mut i = 0;
150
151
1.71M
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
1.68M
            let (stream_id, index) = {
154
1.68M
                let entry = self.ids.get_index(i).unwrap();
155
1.68M
                (*entry.0, *entry.1)
156
1.68M
            };
157
158
1.68M
            f(Ptr {
159
1.68M
                key: Key { index, stream_id },
160
1.68M
                store: self,
161
1.68M
            })?;
162
163
            // TODO: This logic probably could be better...
164
1.68M
            let new_len = self.ids.len();
165
166
1.68M
            if new_len < len {
167
458k
                debug_assert!(new_len == len - 1);
168
458k
                len -= 1;
169
1.22M
            } else {
170
1.22M
                i += 1;
171
1.22M
            }
172
        }
173
174
35.5k
        Ok(())
175
35.5k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#0}, h2::proto::error::Error>
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#1}, h2::proto::error::Error>
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
807
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
807
    where
146
807
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
807
        let mut len = self.ids.len();
149
807
        let mut i = 0;
150
151
1.51k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
708
            let (stream_id, index) = {
154
708
                let entry = self.ids.get_index(i).unwrap();
155
708
                (*entry.0, *entry.1)
156
708
            };
157
158
708
            f(Ptr {
159
708
                key: Key { index, stream_id },
160
708
                store: self,
161
708
            })?;
162
163
            // TODO: This logic probably could be better...
164
708
            let new_len = self.ids.len();
165
166
708
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
708
            } else {
170
708
                i += 1;
171
708
            }
172
        }
173
174
807
        Ok(())
175
807
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
7.94k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
7.94k
    where
146
7.94k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
7.94k
        let mut len = self.ids.len();
149
7.94k
        let mut i = 0;
150
151
210k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
202k
            let (stream_id, index) = {
154
202k
                let entry = self.ids.get_index(i).unwrap();
155
202k
                (*entry.0, *entry.1)
156
202k
            };
157
158
202k
            f(Ptr {
159
202k
                key: Key { index, stream_id },
160
202k
                store: self,
161
202k
            })?;
162
163
            // TODO: This logic probably could be better...
164
202k
            let new_len = self.ids.len();
165
166
202k
            if new_len < len {
167
185k
                debug_assert!(new_len == len - 1);
168
185k
                len -= 1;
169
17.0k
            } else {
170
17.0k
                i += 1;
171
17.0k
            }
172
        }
173
174
7.94k
        Ok(())
175
7.94k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
5.21k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
5.21k
    where
146
5.21k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
5.21k
        let mut len = self.ids.len();
149
5.21k
        let mut i = 0;
150
151
962k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
956k
            let (stream_id, index) = {
154
956k
                let entry = self.ids.get_index(i).unwrap();
155
956k
                (*entry.0, *entry.1)
156
956k
            };
157
158
956k
            f(Ptr {
159
956k
                key: Key { index, stream_id },
160
956k
                store: self,
161
956k
            })?;
162
163
            // TODO: This logic probably could be better...
164
956k
            let new_len = self.ids.len();
165
166
956k
            if new_len < len {
167
30.5k
                debug_assert!(new_len == len - 1);
168
30.5k
                len -= 1;
169
926k
            } else {
170
926k
                i += 1;
171
926k
            }
172
        }
173
174
5.21k
        Ok(())
175
5.21k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
18.9k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
18.9k
    where
146
18.9k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
18.9k
        let mut len = self.ids.len();
149
18.9k
        let mut i = 0;
150
151
316k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
297k
            let (stream_id, index) = {
154
297k
                let entry = self.ids.get_index(i).unwrap();
155
297k
                (*entry.0, *entry.1)
156
297k
            };
157
158
297k
            f(Ptr {
159
297k
                key: Key { index, stream_id },
160
297k
                store: self,
161
297k
            })?;
162
163
            // TODO: This logic probably could be better...
164
297k
            let new_len = self.ids.len();
165
166
297k
            if new_len < len {
167
242k
                debug_assert!(new_len == len - 1);
168
242k
                len -= 1;
169
54.6k
            } else {
170
54.6k
                i += 1;
171
54.6k
            }
172
        }
173
174
18.9k
        Ok(())
175
18.9k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#0}, h2::proto::error::Error>
Line
Count
Source
144
1.50k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.50k
    where
146
1.50k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
1.50k
        let mut len = self.ids.len();
149
1.50k
        let mut i = 0;
150
151
125k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
123k
            let (stream_id, index) = {
154
123k
                let entry = self.ids.get_index(i).unwrap();
155
123k
                (*entry.0, *entry.1)
156
123k
            };
157
158
123k
            f(Ptr {
159
123k
                key: Key { index, stream_id },
160
123k
                store: self,
161
123k
            })?;
162
163
            // TODO: This logic probably could be better...
164
123k
            let new_len = self.ids.len();
165
166
123k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
123k
            } else {
170
123k
                i += 1;
171
123k
            }
172
        }
173
174
1.50k
        Ok(())
175
1.50k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#1}, h2::proto::error::Error>
Line
Count
Source
144
1.10k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.10k
    where
146
1.10k
        F: FnMut(Ptr) -> Result<(), E>,
147
    {
148
1.10k
        let mut len = self.ids.len();
149
1.10k
        let mut i = 0;
150
151
100k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
99.4k
            let (stream_id, index) = {
154
99.4k
                let entry = self.ids.get_index(i).unwrap();
155
99.4k
                (*entry.0, *entry.1)
156
99.4k
            };
157
158
99.4k
            f(Ptr {
159
99.4k
                key: Key { index, stream_id },
160
99.4k
                store: self,
161
99.4k
            })?;
162
163
            // TODO: This logic probably could be better...
164
99.4k
            let new_len = self.ids.len();
165
166
99.4k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
99.4k
            } else {
170
99.4k
                i += 1;
171
99.4k
            }
172
        }
173
174
1.10k
        Ok(())
175
1.10k
    }
176
}
177
178
impl Resolve for Store {
179
4.35M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
180
4.35M
        Ptr { key, store: self }
181
4.35M
    }
182
}
183
184
impl ops::Index<Key> for Store {
185
    type Output = Stream;
186
187
34.6M
    fn index(&self, key: Key) -> &Self::Output {
188
34.6M
        self.slab
189
34.6M
            .get(key.index.0 as usize)
190
34.6M
            .filter(|s| s.id == key.stream_id)
191
34.6M
            .unwrap_or_else(|| {
192
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
193
            })
194
34.6M
    }
195
}
196
197
impl ops::IndexMut<Key> for Store {
198
16.3M
    fn index_mut(&mut self, key: Key) -> &mut Self::Output {
199
16.3M
        self.slab
200
16.3M
            .get_mut(key.index.0 as usize)
201
16.3M
            .filter(|s| s.id == key.stream_id)
202
16.3M
            .unwrap_or_else(|| {
203
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
204
            })
205
16.3M
    }
206
}
207
208
impl Store {
209
    #[cfg(feature = "unstable")]
210
0
    pub fn num_active_streams(&self) -> usize {
211
0
        self.ids.len()
212
0
    }
213
214
    #[cfg(feature = "unstable")]
215
0
    pub fn num_wired_streams(&self) -> usize {
216
0
        self.slab.len()
217
0
    }
218
}
219
220
// While running h2 unit/integration tests, enable this debug assertion.
221
//
222
// In practice, we don't need to ensure this. But the integration tests
223
// help to make sure we've cleaned up in cases where we could (like, the
224
// runtime isn't suddenly dropping the task for unknown reasons).
225
#[cfg(feature = "unstable")]
226
impl Drop for Store {
227
14.5k
    fn drop(&mut self) {
228
        use std::thread;
229
230
14.5k
        if !thread::panicking() {
231
14.5k
            debug_assert!(self.slab.is_empty());
232
0
        }
233
14.5k
    }
234
}
235
236
// ===== impl Queue =====
237
238
impl<N> Queue<N>
239
where
240
    N: Next,
241
{
242
604k
    pub fn new() -> Self {
243
604k
        Queue {
244
604k
            indices: None,
245
604k
            _p: PhantomData,
246
604k
        }
247
604k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::new
Line
Count
Source
242
531k
    pub fn new() -> Self {
243
531k
        Queue {
244
531k
            indices: None,
245
531k
            _p: PhantomData,
246
531k
        }
247
531k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::new
Line
Count
Source
242
14.5k
    pub fn new() -> Self {
243
14.5k
        Queue {
244
14.5k
            indices: None,
245
14.5k
            _p: PhantomData,
246
14.5k
        }
247
14.5k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::new
Line
Count
Source
242
14.5k
    pub fn new() -> Self {
243
14.5k
        Queue {
244
14.5k
            indices: None,
245
14.5k
            _p: PhantomData,
246
14.5k
        }
247
14.5k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::new
Line
Count
Source
242
14.5k
    pub fn new() -> Self {
243
14.5k
        Queue {
244
14.5k
            indices: None,
245
14.5k
            _p: PhantomData,
246
14.5k
        }
247
14.5k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::new
Line
Count
Source
242
14.5k
    pub fn new() -> Self {
243
14.5k
        Queue {
244
14.5k
            indices: None,
245
14.5k
            _p: PhantomData,
246
14.5k
        }
247
14.5k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::new
Line
Count
Source
242
14.5k
    pub fn new() -> Self {
243
14.5k
        Queue {
244
14.5k
            indices: None,
245
14.5k
            _p: PhantomData,
246
14.5k
        }
247
14.5k
    }
248
249
488k
    pub fn take(&mut self) -> Self {
250
488k
        Queue {
251
488k
            indices: self.indices.take(),
252
488k
            _p: PhantomData,
253
488k
        }
254
488k
    }
255
256
    /// Queue the stream.
257
    ///
258
    /// If the stream is already contained by the list, return `false`.
259
1.42M
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
1.42M
        tracing::trace!("Queue::push_back");
261
262
1.42M
        if N::is_queued(stream) {
263
395k
            tracing::trace!(" -> already queued");
264
395k
            return false;
265
1.03M
        }
266
267
1.03M
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
1.03M
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
1.03M
        match self.indices {
274
1.01M
            Some(ref mut idxs) => {
275
1.01M
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
1.01M
                let key = stream.key();
279
1.01M
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
1.01M
                idxs.tail = stream.key();
283
            }
284
            None => {
285
18.4k
                tracing::trace!(" -> first entry");
286
18.4k
                self.indices = Some(store::Indices {
287
18.4k
                    head: stream.key(),
288
18.4k
                    tail: stream.key(),
289
18.4k
                });
290
            }
291
        }
292
293
1.03M
        true
294
1.42M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::push
Line
Count
Source
259
199
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
199
        tracing::trace!("Queue::push_back");
261
262
199
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
199
        }
266
267
199
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
199
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
199
        match self.indices {
274
25
            Some(ref mut idxs) => {
275
25
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
25
                let key = stream.key();
279
25
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
25
                idxs.tail = stream.key();
283
            }
284
            None => {
285
174
                tracing::trace!(" -> first entry");
286
174
                self.indices = Some(store::Indices {
287
174
                    head: stream.key(),
288
174
                    tail: stream.key(),
289
174
                });
290
            }
291
        }
292
293
199
        true
294
199
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::push
Line
Count
Source
259
56.0k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
56.0k
        tracing::trace!("Queue::push_back");
261
262
56.0k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
56.0k
        }
266
267
56.0k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
56.0k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
56.0k
        match self.indices {
274
50.4k
            Some(ref mut idxs) => {
275
50.4k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
50.4k
                let key = stream.key();
279
50.4k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
50.4k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
5.55k
                tracing::trace!(" -> first entry");
286
5.55k
                self.indices = Some(store::Indices {
287
5.55k
                    head: stream.key(),
288
5.55k
                    tail: stream.key(),
289
5.55k
                });
290
            }
291
        }
292
293
56.0k
        true
294
56.0k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::push
Line
Count
Source
259
252k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
252k
        tracing::trace!("Queue::push_back");
261
262
252k
        if N::is_queued(stream) {
263
26.3k
            tracing::trace!(" -> already queued");
264
26.3k
            return false;
265
225k
        }
266
267
225k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
225k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
225k
        match self.indices {
274
223k
            Some(ref mut idxs) => {
275
223k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
223k
                let key = stream.key();
279
223k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
223k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
2.11k
                tracing::trace!(" -> first entry");
286
2.11k
                self.indices = Some(store::Indices {
287
2.11k
                    head: stream.key(),
288
2.11k
                    tail: stream.key(),
289
2.11k
                });
290
            }
291
        }
292
293
225k
        true
294
252k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::push
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::push
Line
Count
Source
259
488k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
488k
        tracing::trace!("Queue::push_back");
261
262
488k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
488k
        }
266
267
488k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
488k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
488k
        match self.indices {
274
483k
            Some(ref mut idxs) => {
275
483k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
483k
                let key = stream.key();
279
483k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
483k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
4.98k
                tracing::trace!(" -> first entry");
286
4.98k
                self.indices = Some(store::Indices {
287
4.98k
                    head: stream.key(),
288
4.98k
                    tail: stream.key(),
289
4.98k
                });
290
            }
291
        }
292
293
488k
        true
294
488k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push
Line
Count
Source
259
632k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
632k
        tracing::trace!("Queue::push_back");
261
262
632k
        if N::is_queued(stream) {
263
369k
            tracing::trace!(" -> already queued");
264
369k
            return false;
265
263k
        }
266
267
263k
        N::set_queued(stream, true);
268
269
        // The next pointer shouldn't be set
270
263k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
263k
        match self.indices {
274
257k
            Some(ref mut idxs) => {
275
257k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
257k
                let key = stream.key();
279
257k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
281
                // Update the tail pointer
282
257k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
5.61k
                tracing::trace!(" -> first entry");
286
5.61k
                self.indices = Some(store::Indices {
287
5.61k
                    head: stream.key(),
288
5.61k
                    tail: stream.key(),
289
5.61k
                });
290
            }
291
        }
292
293
263k
        true
294
632k
    }
295
296
    /// Queue the stream
297
    ///
298
    /// If the stream is already contained by the list, return `false`.
299
208k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300
208k
        tracing::trace!("Queue::push_front");
301
302
208k
        if N::is_queued(stream) {
303
0
            tracing::trace!(" -> already queued");
304
0
            return false;
305
208k
        }
306
307
208k
        N::set_queued(stream, true);
308
309
        // The next pointer shouldn't be set
310
208k
        debug_assert!(N::next(stream).is_none());
311
312
        // Queue the stream
313
208k
        match self.indices {
314
207k
            Some(ref mut idxs) => {
315
207k
                tracing::trace!(" -> existing entries");
316
317
                // Update the provided stream to point to the head node
318
207k
                let head_key = stream.resolve(idxs.head).key();
319
207k
                N::set_next(stream, Some(head_key));
320
321
                // Update the head pointer
322
207k
                idxs.head = stream.key();
323
            }
324
            None => {
325
1.78k
                tracing::trace!(" -> first entry");
326
1.78k
                self.indices = Some(store::Indices {
327
1.78k
                    head: stream.key(),
328
1.78k
                    tail: stream.key(),
329
1.78k
                });
330
            }
331
        }
332
333
208k
        true
334
208k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<_>>::push_front
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push_front
Line
Count
Source
299
208k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300
208k
        tracing::trace!("Queue::push_front");
301
302
208k
        if N::is_queued(stream) {
303
0
            tracing::trace!(" -> already queued");
304
0
            return false;
305
208k
        }
306
307
208k
        N::set_queued(stream, true);
308
309
        // The next pointer shouldn't be set
310
208k
        debug_assert!(N::next(stream).is_none());
311
312
        // Queue the stream
313
208k
        match self.indices {
314
207k
            Some(ref mut idxs) => {
315
207k
                tracing::trace!(" -> existing entries");
316
317
                // Update the provided stream to point to the head node
318
207k
                let head_key = stream.resolve(idxs.head).key();
319
207k
                N::set_next(stream, Some(head_key));
320
321
                // Update the head pointer
322
207k
                idxs.head = stream.key();
323
            }
324
            None => {
325
1.78k
                tracing::trace!(" -> first entry");
326
1.78k
                self.indices = Some(store::Indices {
327
1.78k
                    head: stream.key(),
328
1.78k
                    tail: stream.key(),
329
1.78k
                });
330
            }
331
        }
332
333
208k
        true
334
208k
    }
335
336
2.34M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
2.34M
    where
338
2.34M
        R: Resolve,
339
    {
340
2.34M
        if let Some(mut idxs) = self.indices {
341
1.24M
            let mut stream = store.resolve(idxs.head);
342
343
1.24M
            if idxs.head == idxs.tail {
344
20.2k
                assert!(N::next(&stream).is_none());
345
20.2k
                self.indices = None;
346
1.22M
            } else {
347
1.22M
                idxs.head = N::take_next(&mut stream).unwrap();
348
1.22M
                self.indices = Some(idxs);
349
1.22M
            }
350
351
1.24M
            debug_assert!(N::is_queued(&stream));
352
1.24M
            N::set_queued(&mut stream, false);
353
354
1.24M
            return Some(stream);
355
1.10M
        }
356
357
1.10M
        None
358
2.34M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
503k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
503k
    where
338
503k
        R: Resolve,
339
    {
340
503k
        if let Some(mut idxs) = self.indices {
341
193
            let mut stream = store.resolve(idxs.head);
342
343
193
            if idxs.head == idxs.tail {
344
171
                assert!(N::next(&stream).is_none());
345
171
                self.indices = None;
346
22
            } else {
347
22
                idxs.head = N::take_next(&mut stream).unwrap();
348
22
                self.indices = Some(idxs);
349
22
            }
350
351
193
            debug_assert!(N::is_queued(&stream));
352
193
            N::set_queued(&mut stream, false);
353
354
193
            return Some(stream);
355
502k
        }
356
357
502k
        None
358
503k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
75.7k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
75.7k
    where
338
75.7k
        R: Resolve,
339
    {
340
75.7k
        if let Some(mut idxs) = self.indices {
341
56.0k
            let mut stream = store.resolve(idxs.head);
342
343
56.0k
            if idxs.head == idxs.tail {
344
5.55k
                assert!(N::next(&stream).is_none());
345
5.55k
                self.indices = None;
346
50.4k
            } else {
347
50.4k
                idxs.head = N::take_next(&mut stream).unwrap();
348
50.4k
                self.indices = Some(idxs);
349
50.4k
            }
350
351
56.0k
            debug_assert!(N::is_queued(&stream));
352
56.0k
            N::set_queued(&mut stream, false);
353
354
56.0k
            return Some(stream);
355
19.7k
        }
356
357
19.7k
        None
358
75.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Ptr>
Line
Count
Source
336
196k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
196k
    where
338
196k
        R: Resolve,
339
    {
340
196k
        if let Some(mut idxs) = self.indices {
341
172k
            let mut stream = store.resolve(idxs.head);
342
343
172k
            if idxs.head == idxs.tail {
344
1.65k
                assert!(N::next(&stream).is_none());
345
1.65k
                self.indices = None;
346
170k
            } else {
347
170k
                idxs.head = N::take_next(&mut stream).unwrap();
348
170k
                self.indices = Some(idxs);
349
170k
            }
350
351
172k
            debug_assert!(N::is_queued(&stream));
352
172k
            N::set_queued(&mut stream, false);
353
354
172k
            return Some(stream);
355
23.5k
        }
356
357
23.5k
        None
358
196k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
74.9k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
74.9k
    where
338
74.9k
        R: Resolve,
339
    {
340
74.9k
        if let Some(mut idxs) = self.indices {
341
53.4k
            let mut stream = store.resolve(idxs.head);
342
343
53.4k
            if idxs.head == idxs.tail {
344
452
                assert!(N::next(&stream).is_none());
345
452
                self.indices = None;
346
52.9k
            } else {
347
52.9k
                idxs.head = N::take_next(&mut stream).unwrap();
348
52.9k
                self.indices = Some(idxs);
349
52.9k
            }
350
351
53.4k
            debug_assert!(N::is_queued(&stream));
352
53.4k
            N::set_queued(&mut stream, false);
353
354
53.4k
            return Some(stream);
355
21.5k
        }
356
357
21.5k
        None
358
74.9k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
172k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
172k
    where
338
172k
        R: Resolve,
339
    {
340
172k
        if let Some(mut idxs) = self.indices {
341
0
            let mut stream = store.resolve(idxs.head);
342
343
0
            if idxs.head == idxs.tail {
344
0
                assert!(N::next(&stream).is_none());
345
0
                self.indices = None;
346
0
            } else {
347
0
                idxs.head = N::take_next(&mut stream).unwrap();
348
0
                self.indices = Some(idxs);
349
0
            }
350
351
0
            debug_assert!(N::is_queued(&stream));
352
0
            N::set_queued(&mut stream, false);
353
354
0
            return Some(stream);
355
172k
        }
356
357
172k
        None
358
172k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
680k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
680k
    where
338
680k
        R: Resolve,
339
    {
340
680k
        if let Some(mut idxs) = self.indices {
341
488k
            let mut stream = store.resolve(idxs.head);
342
343
488k
            if idxs.head == idxs.tail {
344
4.98k
                assert!(N::next(&stream).is_none());
345
4.98k
                self.indices = None;
346
483k
            } else {
347
483k
                idxs.head = N::take_next(&mut stream).unwrap();
348
483k
                self.indices = Some(idxs);
349
483k
            }
350
351
488k
            debug_assert!(N::is_queued(&stream));
352
488k
            N::set_queued(&mut stream, false);
353
354
488k
            return Some(stream);
355
191k
        }
356
357
191k
        None
358
680k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
642k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
642k
    where
338
642k
        R: Resolve,
339
    {
340
642k
        if let Some(mut idxs) = self.indices {
341
472k
            let mut stream = store.resolve(idxs.head);
342
343
472k
            if idxs.head == idxs.tail {
344
7.40k
                assert!(N::next(&stream).is_none());
345
7.40k
                self.indices = None;
346
464k
            } else {
347
464k
                idxs.head = N::take_next(&mut stream).unwrap();
348
464k
                self.indices = Some(idxs);
349
464k
            }
350
351
472k
            debug_assert!(N::is_queued(&stream));
352
472k
            N::set_queued(&mut stream, false);
353
354
472k
            return Some(stream);
355
170k
        }
356
357
170k
        None
358
642k
    }
359
360
526k
    pub fn is_empty(&self) -> bool {
361
526k
        self.indices.is_none()
362
526k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::is_empty
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::is_empty
Line
Count
Source
360
526k
    pub fn is_empty(&self) -> bool {
361
526k
        self.indices.is_none()
362
526k
    }
363
364
235k
    pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
365
235k
    where
366
235k
        R: Resolve,
367
235k
        F: Fn(&Stream) -> bool,
368
    {
369
235k
        if let Some(idxs) = self.indices {
370
235k
            let should_pop = f(&store.resolve(idxs.head));
371
235k
            if should_pop {
372
627
                return self.pop(store);
373
234k
            }
374
69
        }
375
376
234k
        None
377
235k
    }
378
}
379
380
impl<N> fmt::Debug for Queue<N> {
381
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
382
0
        f.debug_struct("Queue")
383
0
            .field("indices", &self.indices)
384
            // skip phantom data
385
0
            .finish()
386
0
    }
387
}
388
389
// ===== impl Ptr =====
390
391
impl<'a> Ptr<'a> {
392
    /// Returns the Key associated with the stream
393
3.03M
    pub fn key(&self) -> Key {
394
3.03M
        self.key
395
3.03M
    }
396
397
488k
    pub fn store_mut(&mut self) -> &mut Store {
398
488k
        self.store
399
488k
    }
400
401
    /// Remove the stream from the store
402
516k
    pub fn remove(self) -> StreamId {
403
        // The stream must have been unlinked before this point
404
516k
        debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
405
406
        // Remove the stream state
407
516k
        let stream = self.store.slab.remove(self.key.index.0 as usize);
408
516k
        assert_eq!(stream.id, self.key.stream_id);
409
516k
        stream.id
410
516k
    }
411
412
    /// Remove the StreamId -> stream state association.
413
    ///
414
    /// This will effectively remove the stream as far as the H2 protocol is
415
    /// concerned.
416
1.42M
    pub fn unlink(&mut self) {
417
1.42M
        let id = self.key.stream_id;
418
1.42M
        self.store.ids.swap_remove(&id);
419
1.42M
    }
420
}
421
422
impl<'a> Resolve for Ptr<'a> {
423
1.39M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
424
1.39M
        Ptr {
425
1.39M
            key,
426
1.39M
            store: &mut *self.store,
427
1.39M
        }
428
1.39M
    }
429
}
430
431
impl<'a> ops::Deref for Ptr<'a> {
432
    type Target = Stream;
433
434
34.6M
    fn deref(&self) -> &Stream {
435
34.6M
        &self.store[self.key]
436
34.6M
    }
437
}
438
439
impl<'a> ops::DerefMut for Ptr<'a> {
440
16.3M
    fn deref_mut(&mut self) -> &mut Stream {
441
16.3M
        &mut self.store[self.key]
442
16.3M
    }
443
}
444
445
impl<'a> fmt::Debug for Ptr<'a> {
446
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
447
0
        (**self).fmt(fmt)
448
0
    }
449
}
450
451
// ===== impl OccupiedEntry =====
452
453
impl<'a> OccupiedEntry<'a> {
454
31.6k
    pub fn key(&self) -> Key {
455
31.6k
        let stream_id = *self.ids.key();
456
31.6k
        let index = *self.ids.get();
457
31.6k
        Key { index, stream_id }
458
31.6k
    }
459
}
460
461
// ===== impl VacantEntry =====
462
463
impl<'a> VacantEntry<'a> {
464
27.6k
    pub fn insert(self, value: Stream) -> Key {
465
        // Insert the value in the slab
466
27.6k
        let stream_id = value.id;
467
27.6k
        let index = SlabIndex(self.slab.insert(value) as u32);
468
469
        // Insert the handle in the ID map
470
27.6k
        self.ids.insert(index);
471
472
27.6k
        Key { index, stream_id }
473
27.6k
    }
474
}