Coverage Report

Created: 2025-07-12 06:22

/src/h2/src/proto/streams/store.rs
Line
Count
Source (jump to first uncovered line)
1
use super::*;
2
3
use indexmap::{self, IndexMap};
4
5
use std::convert::Infallible;
6
use std::fmt;
7
use std::marker::PhantomData;
8
use std::ops;
9
10
/// Storage for streams
11
#[derive(Debug)]
12
pub(super) struct Store {
13
    slab: slab::Slab<Stream>,
14
    ids: IndexMap<StreamId, SlabIndex>,
15
}
16
17
/// "Pointer" to an entry in the store
18
pub(super) struct Ptr<'a> {
19
    key: Key,
20
    store: &'a mut Store,
21
}
22
23
/// References an entry in the store.
24
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25
pub(crate) struct Key {
26
    index: SlabIndex,
27
    /// Keep the stream ID in the key as an ABA guard, since slab indices
28
    /// could be re-used with a new stream.
29
    stream_id: StreamId,
30
}
31
32
// We can never have more than `StreamId::MAX` streams in the store,
33
// so we can save a smaller index (u32 vs usize).
34
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35
struct SlabIndex(u32);
36
37
#[derive(Debug)]
38
pub(super) struct Queue<N> {
39
    indices: Option<store::Indices>,
40
    _p: PhantomData<N>,
41
}
42
43
pub(super) trait Next {
44
    fn next(stream: &Stream) -> Option<Key>;
45
46
    fn set_next(stream: &mut Stream, key: Option<Key>);
47
48
    fn take_next(stream: &mut Stream) -> Option<Key>;
49
50
    fn is_queued(stream: &Stream) -> bool;
51
52
    fn set_queued(stream: &mut Stream, val: bool);
53
}
54
55
/// A linked list
56
#[derive(Debug, Clone, Copy)]
57
struct Indices {
58
    pub head: Key,
59
    pub tail: Key,
60
}
61
62
pub(super) enum Entry<'a> {
63
    Occupied(OccupiedEntry<'a>),
64
    Vacant(VacantEntry<'a>),
65
}
66
67
pub(super) struct OccupiedEntry<'a> {
68
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
69
}
70
71
pub(super) struct VacantEntry<'a> {
72
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
73
    slab: &'a mut slab::Slab<Stream>,
74
}
75
76
pub(super) trait Resolve {
77
    fn resolve(&mut self, key: Key) -> Ptr<'_>;
78
}
79
80
// ===== impl Store =====
81
82
impl Store {
83
12.0k
    pub fn new() -> Self {
84
12.0k
        Store {
85
12.0k
            slab: slab::Slab::new(),
86
12.0k
            ids: IndexMap::new(),
87
12.0k
        }
88
12.0k
    }
89
90
68.8k
    pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<'_>> {
91
68.8k
        let index = match self.ids.get(id) {
92
14.1k
            Some(key) => *key,
93
54.6k
            None => return None,
94
        };
95
96
14.1k
        Some(Ptr {
97
14.1k
            key: Key {
98
14.1k
                index,
99
14.1k
                stream_id: *id,
100
14.1k
            },
101
14.1k
            store: self,
102
14.1k
        })
103
68.8k
    }
104
105
415k
    pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr<'_> {
106
415k
        let index = SlabIndex(self.slab.insert(val) as u32);
107
415k
        assert!(self.ids.insert(id, index).is_none());
108
109
415k
        Ptr {
110
415k
            key: Key {
111
415k
                index,
112
415k
                stream_id: id,
113
415k
            },
114
415k
            store: self,
115
415k
        }
116
415k
    }
117
118
82.4k
    pub fn find_entry(&mut self, id: StreamId) -> Entry<'_> {
119
        use self::indexmap::map::Entry::*;
120
121
82.4k
        match self.ids.entry(id) {
122
19.5k
            Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
123
62.9k
            Vacant(e) => Entry::Vacant(VacantEntry {
124
62.9k
                ids: e,
125
62.9k
                slab: &mut self.slab,
126
62.9k
            }),
127
        }
128
82.4k
    }
129
130
    #[allow(clippy::blocks_in_conditions)]
131
29.4k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
132
29.4k
    where
133
29.4k
        F: FnMut(Ptr),
134
29.4k
    {
135
1.31M
        match self.try_for_each(|ptr| {
136
1.31M
            f(ptr);
137
1.31M
            Ok::<_, Infallible>(())
138
1.31M
        }) {
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>::{closure#0}
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
135
362
        match self.try_for_each(|ptr| {
136
362
            f(ptr);
137
362
            Ok::<_, Infallible>(())
138
362
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
135
184k
        match self.try_for_each(|ptr| {
136
184k
            f(ptr);
137
184k
            Ok::<_, Infallible>(())
138
184k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
135
902k
        match self.try_for_each(|ptr| {
136
902k
            f(ptr);
137
902k
            Ok::<_, Infallible>(())
138
902k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
135
231k
        match self.try_for_each(|ptr| {
136
231k
            f(ptr);
137
231k
            Ok::<_, Infallible>(())
138
231k
        }) {
139
29.4k
            Ok(()) => (),
140
29.4k
            #[allow(unused)]
141
29.4k
            Err(infallible) => match infallible {},
142
29.4k
        }
143
29.4k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
131
448
    pub(crate) fn for_each<F>(&mut self, mut f: F)
132
448
    where
133
448
        F: FnMut(Ptr),
134
448
    {
135
448
        match self.try_for_each(|ptr| {
136
            f(ptr);
137
            Ok::<_, Infallible>(())
138
448
        }) {
139
448
            Ok(()) => (),
140
448
            #[allow(unused)]
141
448
            Err(infallible) => match infallible {},
142
448
        }
143
448
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
131
7.16k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
132
7.16k
    where
133
7.16k
        F: FnMut(Ptr),
134
7.16k
    {
135
7.16k
        match self.try_for_each(|ptr| {
136
            f(ptr);
137
            Ok::<_, Infallible>(())
138
7.16k
        }) {
139
7.16k
            Ok(()) => (),
140
7.16k
            #[allow(unused)]
141
7.16k
            Err(infallible) => match infallible {},
142
7.16k
        }
143
7.16k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
131
6.32k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
132
6.32k
    where
133
6.32k
        F: FnMut(Ptr),
134
6.32k
    {
135
6.32k
        match self.try_for_each(|ptr| {
136
            f(ptr);
137
            Ok::<_, Infallible>(())
138
6.32k
        }) {
139
6.32k
            Ok(()) => (),
140
6.32k
            #[allow(unused)]
141
6.32k
            Err(infallible) => match infallible {},
142
6.32k
        }
143
6.32k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
131
15.5k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
132
15.5k
    where
133
15.5k
        F: FnMut(Ptr),
134
15.5k
    {
135
15.5k
        match self.try_for_each(|ptr| {
136
            f(ptr);
137
            Ok::<_, Infallible>(())
138
15.5k
        }) {
139
15.5k
            Ok(()) => (),
140
15.5k
            #[allow(unused)]
141
15.5k
            Err(infallible) => match infallible {},
142
15.5k
        }
143
15.5k
    }
144
145
32.2k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146
32.2k
    where
147
32.2k
        F: FnMut(Ptr) -> Result<(), E>,
148
32.2k
    {
149
32.2k
        let mut len = self.ids.len();
150
32.2k
        let mut i = 0;
151
152
1.59M
        while i < len {
153
            // Get the key by index, this makes the borrow checker happy
154
1.56M
            let (stream_id, index) = {
155
1.56M
                let entry = self.ids.get_index(i).unwrap();
156
1.56M
                (*entry.0, *entry.1)
157
1.56M
            };
158
1.56M
159
1.56M
            f(Ptr {
160
1.56M
                key: Key { index, stream_id },
161
1.56M
                store: self,
162
1.56M
            })?;
163
164
            // TODO: This logic probably could be better...
165
1.56M
            let new_len = self.ids.len();
166
1.56M
167
1.56M
            if new_len < len {
168
457k
                debug_assert!(new_len == len - 1);
169
457k
                len -= 1;
170
1.10M
            } else {
171
1.10M
                i += 1;
172
1.10M
            }
173
        }
174
175
32.2k
        Ok(())
176
32.2k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#0}, h2::proto::error::Error>
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#1}, h2::proto::error::Error>
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
145
448
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146
448
    where
147
448
        F: FnMut(Ptr) -> Result<(), E>,
148
448
    {
149
448
        let mut len = self.ids.len();
150
448
        let mut i = 0;
151
152
810
        while i < len {
153
            // Get the key by index, this makes the borrow checker happy
154
362
            let (stream_id, index) = {
155
362
                let entry = self.ids.get_index(i).unwrap();
156
362
                (*entry.0, *entry.1)
157
362
            };
158
362
159
362
            f(Ptr {
160
362
                key: Key { index, stream_id },
161
362
                store: self,
162
362
            })?;
163
164
            // TODO: This logic probably could be better...
165
362
            let new_len = self.ids.len();
166
362
167
362
            if new_len < len {
168
0
                debug_assert!(new_len == len - 1);
169
0
                len -= 1;
170
362
            } else {
171
362
                i += 1;
172
362
            }
173
        }
174
175
448
        Ok(())
176
448
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
145
7.16k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146
7.16k
    where
147
7.16k
        F: FnMut(Ptr) -> Result<(), E>,
148
7.16k
    {
149
7.16k
        let mut len = self.ids.len();
150
7.16k
        let mut i = 0;
151
152
191k
        while i < len {
153
            // Get the key by index, this makes the borrow checker happy
154
184k
            let (stream_id, index) = {
155
184k
                let entry = self.ids.get_index(i).unwrap();
156
184k
                (*entry.0, *entry.1)
157
184k
            };
158
184k
159
184k
            f(Ptr {
160
184k
                key: Key { index, stream_id },
161
184k
                store: self,
162
184k
            })?;
163
164
            // TODO: This logic probably could be better...
165
184k
            let new_len = self.ids.len();
166
184k
167
184k
            if new_len < len {
168
177k
                debug_assert!(new_len == len - 1);
169
177k
                len -= 1;
170
7.43k
            } else {
171
7.43k
                i += 1;
172
7.43k
            }
173
        }
174
175
7.16k
        Ok(())
176
7.16k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
145
6.32k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146
6.32k
    where
147
6.32k
        F: FnMut(Ptr) -> Result<(), E>,
148
6.32k
    {
149
6.32k
        let mut len = self.ids.len();
150
6.32k
        let mut i = 0;
151
152
908k
        while i < len {
153
            // Get the key by index, this makes the borrow checker happy
154
902k
            let (stream_id, index) = {
155
902k
                let entry = self.ids.get_index(i).unwrap();
156
902k
                (*entry.0, *entry.1)
157
902k
            };
158
902k
159
902k
            f(Ptr {
160
902k
                key: Key { index, stream_id },
161
902k
                store: self,
162
902k
            })?;
163
164
            // TODO: This logic probably could be better...
165
902k
            let new_len = self.ids.len();
166
902k
167
902k
            if new_len < len {
168
65.2k
                debug_assert!(new_len == len - 1);
169
65.2k
                len -= 1;
170
837k
            } else {
171
837k
                i += 1;
172
837k
            }
173
        }
174
175
6.32k
        Ok(())
176
6.32k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
145
15.5k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146
15.5k
    where
147
15.5k
        F: FnMut(Ptr) -> Result<(), E>,
148
15.5k
    {
149
15.5k
        let mut len = self.ids.len();
150
15.5k
        let mut i = 0;
151
152
247k
        while i < len {
153
            // Get the key by index, this makes the borrow checker happy
154
231k
            let (stream_id, index) = {
155
231k
                let entry = self.ids.get_index(i).unwrap();
156
231k
                (*entry.0, *entry.1)
157
231k
            };
158
231k
159
231k
            f(Ptr {
160
231k
                key: Key { index, stream_id },
161
231k
                store: self,
162
231k
            })?;
163
164
            // TODO: This logic probably could be better...
165
231k
            let new_len = self.ids.len();
166
231k
167
231k
            if new_len < len {
168
215k
                debug_assert!(new_len == len - 1);
169
215k
                len -= 1;
170
16.1k
            } else {
171
16.1k
                i += 1;
172
16.1k
            }
173
        }
174
175
15.5k
        Ok(())
176
15.5k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#0}, h2::proto::error::Error>
Line
Count
Source
145
1.60k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146
1.60k
    where
147
1.60k
        F: FnMut(Ptr) -> Result<(), E>,
148
1.60k
    {
149
1.60k
        let mut len = self.ids.len();
150
1.60k
        let mut i = 0;
151
152
141k
        while i < len {
153
            // Get the key by index, this makes the borrow checker happy
154
139k
            let (stream_id, index) = {
155
139k
                let entry = self.ids.get_index(i).unwrap();
156
139k
                (*entry.0, *entry.1)
157
139k
            };
158
139k
159
139k
            f(Ptr {
160
139k
                key: Key { index, stream_id },
161
139k
                store: self,
162
139k
            })?;
163
164
            // TODO: This logic probably could be better...
165
139k
            let new_len = self.ids.len();
166
139k
167
139k
            if new_len < len {
168
0
                debug_assert!(new_len == len - 1);
169
0
                len -= 1;
170
139k
            } else {
171
139k
                i += 1;
172
139k
            }
173
        }
174
175
1.60k
        Ok(())
176
1.60k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#1}, h2::proto::error::Error>
Line
Count
Source
145
1.18k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146
1.18k
    where
147
1.18k
        F: FnMut(Ptr) -> Result<(), E>,
148
1.18k
    {
149
1.18k
        let mut len = self.ids.len();
150
1.18k
        let mut i = 0;
151
152
102k
        while i < len {
153
            // Get the key by index, this makes the borrow checker happy
154
101k
            let (stream_id, index) = {
155
101k
                let entry = self.ids.get_index(i).unwrap();
156
101k
                (*entry.0, *entry.1)
157
101k
            };
158
101k
159
101k
            f(Ptr {
160
101k
                key: Key { index, stream_id },
161
101k
                store: self,
162
101k
            })?;
163
164
            // TODO: This logic probably could be better...
165
101k
            let new_len = self.ids.len();
166
101k
167
101k
            if new_len < len {
168
0
                debug_assert!(new_len == len - 1);
169
0
                len -= 1;
170
101k
            } else {
171
101k
                i += 1;
172
101k
            }
173
        }
174
175
1.18k
        Ok(())
176
1.18k
    }
177
}
178
179
impl Resolve for Store {
180
3.80M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
181
3.80M
        Ptr { key, store: self }
182
3.80M
    }
183
}
184
185
impl ops::Index<Key> for Store {
186
    type Output = Stream;
187
188
29.8M
    fn index(&self, key: Key) -> &Self::Output {
189
29.8M
        self.slab
190
29.8M
            .get(key.index.0 as usize)
191
29.8M
            .filter(|s| s.id == key.stream_id)
192
29.8M
            .unwrap_or_else(|| {
193
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
194
29.8M
            })
195
29.8M
    }
196
}
197
198
impl ops::IndexMut<Key> for Store {
199
14.3M
    fn index_mut(&mut self, key: Key) -> &mut Self::Output {
200
14.3M
        self.slab
201
14.3M
            .get_mut(key.index.0 as usize)
202
14.3M
            .filter(|s| s.id == key.stream_id)
203
14.3M
            .unwrap_or_else(|| {
204
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
205
14.3M
            })
206
14.3M
    }
207
}
208
209
impl Store {
210
    #[cfg(feature = "unstable")]
211
0
    pub fn num_active_streams(&self) -> usize {
212
0
        self.ids.len()
213
0
    }
214
215
    #[cfg(feature = "unstable")]
216
0
    pub fn num_wired_streams(&self) -> usize {
217
0
        self.slab.len()
218
0
    }
219
}
220
221
// While running h2 unit/integration tests, enable this debug assertion.
222
//
223
// In practice, we don't need to ensure this. But the integration tests
224
// help to make sure we've cleaned up in cases where we could (like, the
225
// runtime isn't suddenly dropping the task for unknown reasons).
226
#[cfg(feature = "unstable")]
227
impl Drop for Store {
228
12.0k
    fn drop(&mut self) {
229
        use std::thread;
230
231
12.0k
        if !thread::panicking() {
232
12.0k
            debug_assert!(self.slab.is_empty());
233
0
        }
234
12.0k
    }
235
}
236
237
// ===== impl Queue =====
238
239
impl<N> Queue<N>
240
where
241
    N: Next,
242
{
243
549k
    pub fn new() -> Self {
244
549k
        Queue {
245
549k
            indices: None,
246
549k
            _p: PhantomData,
247
549k
        }
248
549k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::new
Line
Count
Source
243
488k
    pub fn new() -> Self {
244
488k
        Queue {
245
488k
            indices: None,
246
488k
            _p: PhantomData,
247
488k
        }
248
488k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::new
Line
Count
Source
243
12.0k
    pub fn new() -> Self {
244
12.0k
        Queue {
245
12.0k
            indices: None,
246
12.0k
            _p: PhantomData,
247
12.0k
        }
248
12.0k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::new
Line
Count
Source
243
12.0k
    pub fn new() -> Self {
244
12.0k
        Queue {
245
12.0k
            indices: None,
246
12.0k
            _p: PhantomData,
247
12.0k
        }
248
12.0k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::new
Line
Count
Source
243
12.0k
    pub fn new() -> Self {
244
12.0k
        Queue {
245
12.0k
            indices: None,
246
12.0k
            _p: PhantomData,
247
12.0k
        }
248
12.0k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::new
Line
Count
Source
243
12.0k
    pub fn new() -> Self {
244
12.0k
        Queue {
245
12.0k
            indices: None,
246
12.0k
            _p: PhantomData,
247
12.0k
        }
248
12.0k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::new
Line
Count
Source
243
12.0k
    pub fn new() -> Self {
244
12.0k
        Queue {
245
12.0k
            indices: None,
246
12.0k
            _p: PhantomData,
247
12.0k
        }
248
12.0k
    }
249
250
414k
    pub fn take(&mut self) -> Self {
251
414k
        Queue {
252
414k
            indices: self.indices.take(),
253
414k
            _p: PhantomData,
254
414k
        }
255
414k
    }
256
257
    /// Queue the stream.
258
    ///
259
    /// If the stream is already contained by the list, return `false`.
260
1.26M
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261
1.26M
        tracing::trace!("Queue::push_back");
262
263
1.26M
        if N::is_queued(stream) {
264
347k
            tracing::trace!(" -> already queued");
265
347k
            return false;
266
915k
        }
267
915k
268
915k
        N::set_queued(stream, true);
269
915k
270
915k
        // The next pointer shouldn't be set
271
915k
        debug_assert!(N::next(stream).is_none());
272
273
        // Queue the stream
274
915k
        match self.indices {
275
900k
            Some(ref mut idxs) => {
276
900k
                tracing::trace!(" -> existing entries");
277
278
                // Update the current tail node to point to `stream`
279
900k
                let key = stream.key();
280
900k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
900k
282
900k
                // Update the tail pointer
283
900k
                idxs.tail = stream.key();
284
            }
285
            None => {
286
15.4k
                tracing::trace!(" -> first entry");
287
15.4k
                self.indices = Some(store::Indices {
288
15.4k
                    head: stream.key(),
289
15.4k
                    tail: stream.key(),
290
15.4k
                });
291
            }
292
        }
293
294
915k
        true
295
1.26M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::push
Line
Count
Source
260
226
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261
226
        tracing::trace!("Queue::push_back");
262
263
226
        if N::is_queued(stream) {
264
0
            tracing::trace!(" -> already queued");
265
0
            return false;
266
226
        }
267
226
268
226
        N::set_queued(stream, true);
269
226
270
226
        // The next pointer shouldn't be set
271
226
        debug_assert!(N::next(stream).is_none());
272
273
        // Queue the stream
274
226
        match self.indices {
275
71
            Some(ref mut idxs) => {
276
71
                tracing::trace!(" -> existing entries");
277
278
                // Update the current tail node to point to `stream`
279
71
                let key = stream.key();
280
71
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
71
282
71
                // Update the tail pointer
283
71
                idxs.tail = stream.key();
284
            }
285
            None => {
286
155
                tracing::trace!(" -> first entry");
287
155
                self.indices = Some(store::Indices {
288
155
                    head: stream.key(),
289
155
                    tail: stream.key(),
290
155
                });
291
            }
292
        }
293
294
226
        true
295
226
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::push
Line
Count
Source
260
16.5k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261
16.5k
        tracing::trace!("Queue::push_back");
262
263
16.5k
        if N::is_queued(stream) {
264
0
            tracing::trace!(" -> already queued");
265
0
            return false;
266
16.5k
        }
267
16.5k
268
16.5k
        N::set_queued(stream, true);
269
16.5k
270
16.5k
        // The next pointer shouldn't be set
271
16.5k
        debug_assert!(N::next(stream).is_none());
272
273
        // Queue the stream
274
16.5k
        match self.indices {
275
12.2k
            Some(ref mut idxs) => {
276
12.2k
                tracing::trace!(" -> existing entries");
277
278
                // Update the current tail node to point to `stream`
279
12.2k
                let key = stream.key();
280
12.2k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
12.2k
282
12.2k
                // Update the tail pointer
283
12.2k
                idxs.tail = stream.key();
284
            }
285
            None => {
286
4.31k
                tracing::trace!(" -> first entry");
287
4.31k
                self.indices = Some(store::Indices {
288
4.31k
                    head: stream.key(),
289
4.31k
                    tail: stream.key(),
290
4.31k
                });
291
            }
292
        }
293
294
16.5k
        true
295
16.5k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::push
Line
Count
Source
260
258k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261
258k
        tracing::trace!("Queue::push_back");
262
263
258k
        if N::is_queued(stream) {
264
45.8k
            tracing::trace!(" -> already queued");
265
45.8k
            return false;
266
212k
        }
267
212k
268
212k
        N::set_queued(stream, true);
269
212k
270
212k
        // The next pointer shouldn't be set
271
212k
        debug_assert!(N::next(stream).is_none());
272
273
        // Queue the stream
274
212k
        match self.indices {
275
210k
            Some(ref mut idxs) => {
276
210k
                tracing::trace!(" -> existing entries");
277
278
                // Update the current tail node to point to `stream`
279
210k
                let key = stream.key();
280
210k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
210k
282
210k
                // Update the tail pointer
283
210k
                idxs.tail = stream.key();
284
            }
285
            None => {
286
2.08k
                tracing::trace!(" -> first entry");
287
2.08k
                self.indices = Some(store::Indices {
288
2.08k
                    head: stream.key(),
289
2.08k
                    tail: stream.key(),
290
2.08k
                });
291
            }
292
        }
293
294
212k
        true
295
258k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::push
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::push
Line
Count
Source
260
414k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261
414k
        tracing::trace!("Queue::push_back");
262
263
414k
        if N::is_queued(stream) {
264
0
            tracing::trace!(" -> already queued");
265
0
            return false;
266
414k
        }
267
414k
268
414k
        N::set_queued(stream, true);
269
414k
270
414k
        // The next pointer shouldn't be set
271
414k
        debug_assert!(N::next(stream).is_none());
272
273
        // Queue the stream
274
414k
        match self.indices {
275
410k
            Some(ref mut idxs) => {
276
410k
                tracing::trace!(" -> existing entries");
277
278
                // Update the current tail node to point to `stream`
279
410k
                let key = stream.key();
280
410k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
410k
282
410k
                // Update the tail pointer
283
410k
                idxs.tail = stream.key();
284
            }
285
            None => {
286
3.95k
                tracing::trace!(" -> first entry");
287
3.95k
                self.indices = Some(store::Indices {
288
3.95k
                    head: stream.key(),
289
3.95k
                    tail: stream.key(),
290
3.95k
                });
291
            }
292
        }
293
294
414k
        true
295
414k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push
Line
Count
Source
260
573k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261
573k
        tracing::trace!("Queue::push_back");
262
263
573k
        if N::is_queued(stream) {
264
301k
            tracing::trace!(" -> already queued");
265
301k
            return false;
266
271k
        }
267
271k
268
271k
        N::set_queued(stream, true);
269
271k
270
271k
        // The next pointer shouldn't be set
271
271k
        debug_assert!(N::next(stream).is_none());
272
273
        // Queue the stream
274
271k
        match self.indices {
275
266k
            Some(ref mut idxs) => {
276
266k
                tracing::trace!(" -> existing entries");
277
278
                // Update the current tail node to point to `stream`
279
266k
                let key = stream.key();
280
266k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
266k
282
266k
                // Update the tail pointer
283
266k
                idxs.tail = stream.key();
284
            }
285
            None => {
286
4.93k
                tracing::trace!(" -> first entry");
287
4.93k
                self.indices = Some(store::Indices {
288
4.93k
                    head: stream.key(),
289
4.93k
                    tail: stream.key(),
290
4.93k
                });
291
            }
292
        }
293
294
271k
        true
295
573k
    }
296
297
    /// Queue the stream
298
    ///
299
    /// If the stream is already contained by the list, return `false`.
300
186k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
301
186k
        tracing::trace!("Queue::push_front");
302
303
186k
        if N::is_queued(stream) {
304
0
            tracing::trace!(" -> already queued");
305
0
            return false;
306
186k
        }
307
186k
308
186k
        N::set_queued(stream, true);
309
186k
310
186k
        // The next pointer shouldn't be set
311
186k
        debug_assert!(N::next(stream).is_none());
312
313
        // Queue the stream
314
186k
        match self.indices {
315
184k
            Some(ref mut idxs) => {
316
184k
                tracing::trace!(" -> existing entries");
317
318
                // Update the provided stream to point to the head node
319
184k
                let head_key = stream.resolve(idxs.head).key();
320
184k
                N::set_next(stream, Some(head_key));
321
184k
322
184k
                // Update the head pointer
323
184k
                idxs.head = stream.key();
324
            }
325
            None => {
326
1.59k
                tracing::trace!(" -> first entry");
327
1.59k
                self.indices = Some(store::Indices {
328
1.59k
                    head: stream.key(),
329
1.59k
                    tail: stream.key(),
330
1.59k
                });
331
            }
332
        }
333
334
186k
        true
335
186k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<_>>::push_front
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push_front
Line
Count
Source
300
186k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
301
186k
        tracing::trace!("Queue::push_front");
302
303
186k
        if N::is_queued(stream) {
304
0
            tracing::trace!(" -> already queued");
305
0
            return false;
306
186k
        }
307
186k
308
186k
        N::set_queued(stream, true);
309
186k
310
186k
        // The next pointer shouldn't be set
311
186k
        debug_assert!(N::next(stream).is_none());
312
313
        // Queue the stream
314
186k
        match self.indices {
315
184k
            Some(ref mut idxs) => {
316
184k
                tracing::trace!(" -> existing entries");
317
318
                // Update the provided stream to point to the head node
319
184k
                let head_key = stream.resolve(idxs.head).key();
320
184k
                N::set_next(stream, Some(head_key));
321
184k
322
184k
                // Update the head pointer
323
184k
                idxs.head = stream.key();
324
            }
325
            None => {
326
1.59k
                tracing::trace!(" -> first entry");
327
1.59k
                self.indices = Some(store::Indices {
328
1.59k
                    head: stream.key(),
329
1.59k
                    tail: stream.key(),
330
1.59k
                });
331
            }
332
        }
333
334
186k
        true
335
186k
    }
336
337
1.93M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
1.93M
    where
339
1.93M
        R: Resolve,
340
1.93M
    {
341
1.93M
        if let Some(mut idxs) = self.indices {
342
1.10M
            let mut stream = store.resolve(idxs.head);
343
1.10M
344
1.10M
            if idxs.head == idxs.tail {
345
17.0k
                assert!(N::next(&stream).is_none());
346
17.0k
                self.indices = None;
347
1.08M
            } else {
348
1.08M
                idxs.head = N::take_next(&mut stream).unwrap();
349
1.08M
                self.indices = Some(idxs);
350
1.08M
            }
351
352
1.10M
            debug_assert!(N::is_queued(&stream));
353
1.10M
            N::set_queued(&mut stream, false);
354
1.10M
355
1.10M
            return Some(stream);
356
837k
        }
357
837k
358
837k
        None
359
1.93M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
337
426k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
426k
    where
339
426k
        R: Resolve,
340
426k
    {
341
426k
        if let Some(mut idxs) = self.indices {
342
218
            let mut stream = store.resolve(idxs.head);
343
218
344
218
            if idxs.head == idxs.tail {
345
150
                assert!(N::next(&stream).is_none());
346
150
                self.indices = None;
347
68
            } else {
348
68
                idxs.head = N::take_next(&mut stream).unwrap();
349
68
                self.indices = Some(idxs);
350
68
            }
351
352
218
            debug_assert!(N::is_queued(&stream));
353
218
            N::set_queued(&mut stream, false);
354
218
355
218
            return Some(stream);
356
426k
        }
357
426k
358
426k
        None
359
426k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
337
32.5k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
32.5k
    where
339
32.5k
        R: Resolve,
340
32.5k
    {
341
32.5k
        if let Some(mut idxs) = self.indices {
342
16.5k
            let mut stream = store.resolve(idxs.head);
343
16.5k
344
16.5k
            if idxs.head == idxs.tail {
345
4.31k
                assert!(N::next(&stream).is_none());
346
4.31k
                self.indices = None;
347
12.2k
            } else {
348
12.2k
                idxs.head = N::take_next(&mut stream).unwrap();
349
12.2k
                self.indices = Some(idxs);
350
12.2k
            }
351
352
16.5k
            debug_assert!(N::is_queued(&stream));
353
16.5k
            N::set_queued(&mut stream, false);
354
16.5k
355
16.5k
            return Some(stream);
356
15.9k
        }
357
15.9k
358
15.9k
        None
359
32.5k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Ptr>
Line
Count
Source
337
172k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
172k
    where
339
172k
        R: Resolve,
340
172k
    {
341
172k
        if let Some(mut idxs) = self.indices {
342
154k
            let mut stream = store.resolve(idxs.head);
343
154k
344
154k
            if idxs.head == idxs.tail {
345
1.59k
                assert!(N::next(&stream).is_none());
346
1.59k
                self.indices = None;
347
153k
            } else {
348
153k
                idxs.head = N::take_next(&mut stream).unwrap();
349
153k
                self.indices = Some(idxs);
350
153k
            }
351
352
154k
            debug_assert!(N::is_queued(&stream));
353
154k
            N::set_queued(&mut stream, false);
354
154k
355
154k
            return Some(stream);
356
17.9k
        }
357
17.9k
358
17.9k
        None
359
172k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
337
75.4k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
75.4k
    where
339
75.4k
        R: Resolve,
340
75.4k
    {
341
75.4k
        if let Some(mut idxs) = self.indices {
342
57.9k
            let mut stream = store.resolve(idxs.head);
343
57.9k
344
57.9k
            if idxs.head == idxs.tail {
345
485
                assert!(N::next(&stream).is_none());
346
485
                self.indices = None;
347
57.5k
            } else {
348
57.5k
                idxs.head = N::take_next(&mut stream).unwrap();
349
57.5k
                self.indices = Some(idxs);
350
57.5k
            }
351
352
57.9k
            debug_assert!(N::is_queued(&stream));
353
57.9k
            N::set_queued(&mut stream, false);
354
57.9k
355
57.9k
            return Some(stream);
356
17.4k
        }
357
17.4k
358
17.4k
        None
359
75.4k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
337
118k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
118k
    where
339
118k
        R: Resolve,
340
118k
    {
341
118k
        if let Some(mut idxs) = self.indices {
342
0
            let mut stream = store.resolve(idxs.head);
343
0
344
0
            if idxs.head == idxs.tail {
345
0
                assert!(N::next(&stream).is_none());
346
0
                self.indices = None;
347
0
            } else {
348
0
                idxs.head = N::take_next(&mut stream).unwrap();
349
0
                self.indices = Some(idxs);
350
0
            }
351
352
0
            debug_assert!(N::is_queued(&stream));
353
0
            N::set_queued(&mut stream, false);
354
0
355
0
            return Some(stream);
356
118k
        }
357
118k
358
118k
        None
359
118k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
337
539k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
539k
    where
339
539k
        R: Resolve,
340
539k
    {
341
539k
        if let Some(mut idxs) = self.indices {
342
414k
            let mut stream = store.resolve(idxs.head);
343
414k
344
414k
            if idxs.head == idxs.tail {
345
3.95k
                assert!(N::next(&stream).is_none());
346
3.95k
                self.indices = None;
347
410k
            } else {
348
410k
                idxs.head = N::take_next(&mut stream).unwrap();
349
410k
                self.indices = Some(idxs);
350
410k
            }
351
352
414k
            debug_assert!(N::is_queued(&stream));
353
414k
            N::set_queued(&mut stream, false);
354
414k
355
414k
            return Some(stream);
356
124k
        }
357
124k
358
124k
        None
359
539k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
337
574k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338
574k
    where
339
574k
        R: Resolve,
340
574k
    {
341
574k
        if let Some(mut idxs) = self.indices {
342
458k
            let mut stream = store.resolve(idxs.head);
343
458k
344
458k
            if idxs.head == idxs.tail {
345
6.52k
                assert!(N::next(&stream).is_none());
346
6.52k
                self.indices = None;
347
451k
            } else {
348
451k
                idxs.head = N::take_next(&mut stream).unwrap();
349
451k
                self.indices = Some(idxs);
350
451k
            }
351
352
458k
            debug_assert!(N::is_queued(&stream));
353
458k
            N::set_queued(&mut stream, false);
354
458k
355
458k
            return Some(stream);
356
116k
        }
357
116k
358
116k
        None
359
574k
    }
360
361
1.10M
    pub fn is_empty(&self) -> bool {
362
1.10M
        self.indices.is_none()
363
1.10M
    }
364
365
231k
    pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
366
231k
    where
367
231k
        R: Resolve,
368
231k
        F: Fn(&Stream) -> bool,
369
231k
    {
370
231k
        if let Some(idxs) = self.indices {
371
231k
            let should_pop = f(&store.resolve(idxs.head));
372
231k
            if should_pop {
373
0
                return self.pop(store);
374
231k
            }
375
0
        }
376
377
231k
        None
378
231k
    }
379
}
380
381
// ===== impl Ptr =====
382
383
impl<'a> Ptr<'a> {
384
    /// Returns the Key associated with the stream
385
2.70M
    pub fn key(&self) -> Key {
386
2.70M
        self.key
387
2.70M
    }
388
389
414k
    pub fn store_mut(&mut self) -> &mut Store {
390
414k
        self.store
391
414k
    }
392
393
    /// Remove the stream from the store
394
476k
    pub fn remove(self) -> StreamId {
395
476k
        // The stream must have been unlinked before this point
396
476k
        debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
397
398
        // Remove the stream state
399
476k
        let stream = self.store.slab.remove(self.key.index.0 as usize);
400
476k
        assert_eq!(stream.id, self.key.stream_id);
401
476k
        stream.id
402
476k
    }
403
404
    /// Remove the StreamId -> stream state association.
405
    ///
406
    /// This will effectively remove the stream as far as the H2 protocol is
407
    /// concerned.
408
1.27M
    pub fn unlink(&mut self) {
409
1.27M
        let id = self.key.stream_id;
410
1.27M
        self.store.ids.swap_remove(&id);
411
1.27M
    }
412
}
413
414
impl<'a> Resolve for Ptr<'a> {
415
1.23M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
416
1.23M
        Ptr {
417
1.23M
            key,
418
1.23M
            store: &mut *self.store,
419
1.23M
        }
420
1.23M
    }
421
}
422
423
impl<'a> ops::Deref for Ptr<'a> {
424
    type Target = Stream;
425
426
29.8M
    fn deref(&self) -> &Stream {
427
29.8M
        &self.store[self.key]
428
29.8M
    }
429
}
430
431
impl<'a> ops::DerefMut for Ptr<'a> {
432
14.3M
    fn deref_mut(&mut self) -> &mut Stream {
433
14.3M
        &mut self.store[self.key]
434
14.3M
    }
435
}
436
437
impl<'a> fmt::Debug for Ptr<'a> {
438
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
439
0
        (**self).fmt(fmt)
440
0
    }
441
}
442
443
// ===== impl OccupiedEntry =====
444
445
impl<'a> OccupiedEntry<'a> {
446
19.5k
    pub fn key(&self) -> Key {
447
19.5k
        let stream_id = *self.ids.key();
448
19.5k
        let index = *self.ids.get();
449
19.5k
        Key { index, stream_id }
450
19.5k
    }
451
}
452
453
// ===== impl VacantEntry =====
454
455
impl<'a> VacantEntry<'a> {
456
61.1k
    pub fn insert(self, value: Stream) -> Key {
457
61.1k
        // Insert the value in the slab
458
61.1k
        let stream_id = value.id;
459
61.1k
        let index = SlabIndex(self.slab.insert(value) as u32);
460
61.1k
461
61.1k
        // Insert the handle in the ID map
462
61.1k
        self.ids.insert(index);
463
61.1k
464
61.1k
        Key { index, stream_id }
465
61.1k
    }
466
}