Coverage Report

Created: 2025-08-26 07:09

/src/h2/src/proto/streams/store.rs
Line
Count
Source (jump to first uncovered line)
1
use super::*;
2
3
use indexmap::{self, IndexMap};
4
5
use std::convert::Infallible;
6
use std::fmt;
7
use std::marker::PhantomData;
8
use std::ops;
9
10
/// Storage for streams
11
#[derive(Debug)]
12
pub(super) struct Store {
13
    slab: slab::Slab<Stream>,
14
    ids: IndexMap<StreamId, SlabIndex>,
15
}
16
17
/// "Pointer" to an entry in the store
18
pub(super) struct Ptr<'a> {
19
    key: Key,
20
    store: &'a mut Store,
21
}
22
23
/// References an entry in the store.
24
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25
pub(crate) struct Key {
26
    index: SlabIndex,
27
    /// Keep the stream ID in the key as an ABA guard, since slab indices
28
    /// could be re-used with a new stream.
29
    stream_id: StreamId,
30
}
31
32
// We can never have more than `StreamId::MAX` streams in the store,
33
// so we can save a smaller index (u32 vs usize).
34
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35
struct SlabIndex(u32);
36
37
pub(super) struct Queue<N> {
38
    indices: Option<store::Indices>,
39
    _p: PhantomData<N>,
40
}
41
42
pub(super) trait Next {
43
    fn next(stream: &Stream) -> Option<Key>;
44
45
    fn set_next(stream: &mut Stream, key: Option<Key>);
46
47
    fn take_next(stream: &mut Stream) -> Option<Key>;
48
49
    fn is_queued(stream: &Stream) -> bool;
50
51
    fn set_queued(stream: &mut Stream, val: bool);
52
}
53
54
/// A linked list
55
#[derive(Debug, Clone, Copy)]
56
struct Indices {
57
    pub head: Key,
58
    pub tail: Key,
59
}
60
61
pub(super) enum Entry<'a> {
62
    Occupied(OccupiedEntry<'a>),
63
    Vacant(VacantEntry<'a>),
64
}
65
66
pub(super) struct OccupiedEntry<'a> {
67
    ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
68
}
69
70
pub(super) struct VacantEntry<'a> {
71
    ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
72
    slab: &'a mut slab::Slab<Stream>,
73
}
74
75
pub(super) trait Resolve {
76
    fn resolve(&mut self, key: Key) -> Ptr<'_>;
77
}
78
79
// ===== impl Store =====
80
81
impl Store {
82
12.7k
    pub fn new() -> Self {
83
12.7k
        Store {
84
12.7k
            slab: slab::Slab::new(),
85
12.7k
            ids: IndexMap::new(),
86
12.7k
        }
87
12.7k
    }
88
89
41.1k
    pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<'_>> {
90
41.1k
        let index = match self.ids.get(id) {
91
26.4k
            Some(key) => *key,
92
14.7k
            None => return None,
93
        };
94
95
26.4k
        Some(Ptr {
96
26.4k
            key: Key {
97
26.4k
                index,
98
26.4k
                stream_id: *id,
99
26.4k
            },
100
26.4k
            store: self,
101
26.4k
        })
102
41.1k
    }
103
104
427k
    pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr<'_> {
105
427k
        let index = SlabIndex(self.slab.insert(val) as u32);
106
427k
        assert!(self.ids.insert(id, index).is_none());
107
108
427k
        Ptr {
109
427k
            key: Key {
110
427k
                index,
111
427k
                stream_id: id,
112
427k
            },
113
427k
            store: self,
114
427k
        }
115
427k
    }
116
117
53.9k
    pub fn find_entry(&mut self, id: StreamId) -> Entry<'_> {
118
        use self::indexmap::map::Entry::*;
119
120
53.9k
        match self.ids.entry(id) {
121
28.4k
            Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
122
25.5k
            Vacant(e) => Entry::Vacant(VacantEntry {
123
25.5k
                ids: e,
124
25.5k
                slab: &mut self.slab,
125
25.5k
            }),
126
        }
127
53.9k
    }
128
129
    #[allow(clippy::blocks_in_conditions)]
130
27.7k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
27.7k
    where
132
27.7k
        F: FnMut(Ptr),
133
27.7k
    {
134
870k
        match self.try_for_each(|ptr| {
135
870k
            f(ptr);
136
870k
            Ok::<_, Infallible>(())
137
870k
        }) {
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>::{closure#0}
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
357
        match self.try_for_each(|ptr| {
135
357
            f(ptr);
136
357
            Ok::<_, Infallible>(())
137
357
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
164k
        match self.try_for_each(|ptr| {
135
164k
            f(ptr);
136
164k
            Ok::<_, Infallible>(())
137
164k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
437k
        match self.try_for_each(|ptr| {
135
437k
            f(ptr);
136
437k
            Ok::<_, Infallible>(())
137
437k
        }) {
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}
Line
Count
Source
134
268k
        match self.try_for_each(|ptr| {
135
268k
            f(ptr);
136
268k
            Ok::<_, Infallible>(())
137
268k
        }) {
138
27.7k
            Ok(()) => (),
139
27.7k
            #[allow(unused)]
140
27.7k
            Err(infallible) => match infallible {},
141
27.7k
        }
142
27.7k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::for_each::<_>
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
445
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
445
    where
132
445
        F: FnMut(Ptr),
133
445
    {
134
445
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
445
        }) {
138
445
            Ok(()) => (),
139
445
            #[allow(unused)]
140
445
            Err(infallible) => match infallible {},
141
445
        }
142
445
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
7.01k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
7.01k
    where
132
7.01k
        F: FnMut(Ptr),
133
7.01k
    {
134
7.01k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
7.01k
        }) {
138
7.01k
            Ok(()) => (),
139
7.01k
            #[allow(unused)]
140
7.01k
            Err(infallible) => match infallible {},
141
7.01k
        }
142
7.01k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
3.20k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
3.20k
    where
132
3.20k
        F: FnMut(Ptr),
133
3.20k
    {
134
3.20k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
3.20k
        }) {
138
3.20k
            Ok(()) => (),
139
3.20k
            #[allow(unused)]
140
3.20k
            Err(infallible) => match infallible {},
141
3.20k
        }
142
3.20k
    }
<h2::proto::streams::store::Store>::for_each::<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>
Line
Count
Source
130
17.0k
    pub(crate) fn for_each<F>(&mut self, mut f: F)
131
17.0k
    where
132
17.0k
        F: FnMut(Ptr),
133
17.0k
    {
134
17.0k
        match self.try_for_each(|ptr| {
135
            f(ptr);
136
            Ok::<_, Infallible>(())
137
17.0k
        }) {
138
17.0k
            Ok(()) => (),
139
17.0k
            #[allow(unused)]
140
17.0k
            Err(infallible) => match infallible {},
141
17.0k
        }
142
17.0k
    }
143
144
30.9k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
30.9k
    where
146
30.9k
        F: FnMut(Ptr) -> Result<(), E>,
147
30.9k
    {
148
30.9k
        let mut len = self.ids.len();
149
30.9k
        let mut i = 0;
150
151
1.20M
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
1.16M
            let (stream_id, index) = {
154
1.16M
                let entry = self.ids.get_index(i).unwrap();
155
1.16M
                (*entry.0, *entry.1)
156
1.16M
            };
157
1.16M
158
1.16M
            f(Ptr {
159
1.16M
                key: Key { index, stream_id },
160
1.16M
                store: self,
161
1.16M
            })?;
162
163
            // TODO: This logic probably could be better...
164
1.16M
            let new_len = self.ids.len();
165
1.16M
166
1.16M
            if new_len < len {
167
401k
                debug_assert!(new_len == len - 1);
168
401k
                len -= 1;
169
768k
            } else {
170
768k
                i += 1;
171
768k
            }
172
        }
173
174
30.9k
        Ok(())
175
30.9k
    }
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#0}, h2::proto::error::Error>
Unexecuted instantiation: <h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::recv::Recv>::apply_local_settings::{closure#1}, h2::proto::error::Error>
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
445
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
445
    where
146
445
        F: FnMut(Ptr) -> Result<(), E>,
147
445
    {
148
445
        let mut len = self.ids.len();
149
445
        let mut i = 0;
150
151
802
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
357
            let (stream_id, index) = {
154
357
                let entry = self.ids.get_index(i).unwrap();
155
357
                (*entry.0, *entry.1)
156
357
            };
157
357
158
357
            f(Ptr {
159
357
                key: Key { index, stream_id },
160
357
                store: self,
161
357
            })?;
162
163
            // TODO: This logic probably could be better...
164
357
            let new_len = self.ids.len();
165
357
166
357
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
357
            } else {
170
357
                i += 1;
171
357
            }
172
        }
173
174
445
        Ok(())
175
445
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::handle_error<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
7.01k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
7.01k
    where
146
7.01k
        F: FnMut(Ptr) -> Result<(), E>,
147
7.01k
    {
148
7.01k
        let mut len = self.ids.len();
149
7.01k
        let mut i = 0;
150
151
171k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
164k
            let (stream_id, index) = {
154
164k
                let entry = self.ids.get_index(i).unwrap();
155
164k
                (*entry.0, *entry.1)
156
164k
            };
157
164k
158
164k
            f(Ptr {
159
164k
                key: Key { index, stream_id },
160
164k
                store: self,
161
164k
            })?;
162
163
            // TODO: This logic probably could be better...
164
164k
            let new_len = self.ids.len();
165
164k
166
164k
            if new_len < len {
167
151k
                debug_assert!(new_len == len - 1);
168
151k
                len -= 1;
169
12.6k
            } else {
170
12.6k
                i += 1;
171
12.6k
            }
172
        }
173
174
7.01k
        Ok(())
175
7.01k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_go_away<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
3.20k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
3.20k
    where
146
3.20k
        F: FnMut(Ptr) -> Result<(), E>,
147
3.20k
    {
148
3.20k
        let mut len = self.ids.len();
149
3.20k
        let mut i = 0;
150
151
440k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
437k
            let (stream_id, index) = {
154
437k
                let entry = self.ids.get_index(i).unwrap();
155
437k
                (*entry.0, *entry.1)
156
437k
            };
157
437k
158
437k
            f(Ptr {
159
437k
                key: Key { index, stream_id },
160
437k
                store: self,
161
437k
            })?;
162
163
            // TODO: This logic probably could be better...
164
437k
            let new_len = self.ids.len();
165
437k
166
437k
            if new_len < len {
167
29.4k
                debug_assert!(new_len == len - 1);
168
29.4k
                len -= 1;
169
407k
            } else {
170
407k
                i += 1;
171
407k
            }
172
        }
173
174
3.20k
        Ok(())
175
3.20k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::store::Store>::for_each<<h2::proto::streams::streams::Inner>::recv_eof<bytes::bytes::Bytes>::{closure#0}>::{closure#0}, core::convert::Infallible>
Line
Count
Source
144
17.0k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
17.0k
    where
146
17.0k
        F: FnMut(Ptr) -> Result<(), E>,
147
17.0k
    {
148
17.0k
        let mut len = self.ids.len();
149
17.0k
        let mut i = 0;
150
151
285k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
268k
            let (stream_id, index) = {
154
268k
                let entry = self.ids.get_index(i).unwrap();
155
268k
                (*entry.0, *entry.1)
156
268k
            };
157
268k
158
268k
            f(Ptr {
159
268k
                key: Key { index, stream_id },
160
268k
                store: self,
161
268k
            })?;
162
163
            // TODO: This logic probably could be better...
164
268k
            let new_len = self.ids.len();
165
268k
166
268k
            if new_len < len {
167
219k
                debug_assert!(new_len == len - 1);
168
219k
                len -= 1;
169
48.3k
            } else {
170
48.3k
                i += 1;
171
48.3k
            }
172
        }
173
174
17.0k
        Ok(())
175
17.0k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#0}, h2::proto::error::Error>
Line
Count
Source
144
1.87k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.87k
    where
146
1.87k
        F: FnMut(Ptr) -> Result<(), E>,
147
1.87k
    {
148
1.87k
        let mut len = self.ids.len();
149
1.87k
        let mut i = 0;
150
151
174k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
172k
            let (stream_id, index) = {
154
172k
                let entry = self.ids.get_index(i).unwrap();
155
172k
                (*entry.0, *entry.1)
156
172k
            };
157
172k
158
172k
            f(Ptr {
159
172k
                key: Key { index, stream_id },
160
172k
                store: self,
161
172k
            })?;
162
163
            // TODO: This logic probably could be better...
164
172k
            let new_len = self.ids.len();
165
172k
166
172k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
172k
            } else {
170
172k
                i += 1;
171
172k
            }
172
        }
173
174
1.87k
        Ok(())
175
1.87k
    }
<h2::proto::streams::store::Store>::try_for_each::<<h2::proto::streams::send::Send>::apply_remote_settings<bytes::bytes::Bytes>::{closure#1}, h2::proto::error::Error>
Line
Count
Source
144
1.35k
    pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145
1.35k
    where
146
1.35k
        F: FnMut(Ptr) -> Result<(), E>,
147
1.35k
    {
148
1.35k
        let mut len = self.ids.len();
149
1.35k
        let mut i = 0;
150
151
127k
        while i < len {
152
            // Get the key by index, this makes the borrow checker happy
153
126k
            let (stream_id, index) = {
154
126k
                let entry = self.ids.get_index(i).unwrap();
155
126k
                (*entry.0, *entry.1)
156
126k
            };
157
126k
158
126k
            f(Ptr {
159
126k
                key: Key { index, stream_id },
160
126k
                store: self,
161
126k
            })?;
162
163
            // TODO: This logic probably could be better...
164
126k
            let new_len = self.ids.len();
165
126k
166
126k
            if new_len < len {
167
0
                debug_assert!(new_len == len - 1);
168
0
                len -= 1;
169
126k
            } else {
170
126k
                i += 1;
171
126k
            }
172
        }
173
174
1.35k
        Ok(())
175
1.35k
    }
176
}
177
178
impl Resolve for Store {
179
4.62M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
180
4.62M
        Ptr { key, store: self }
181
4.62M
    }
182
}
183
184
impl ops::Index<Key> for Store {
185
    type Output = Stream;
186
187
30.9M
    fn index(&self, key: Key) -> &Self::Output {
188
30.9M
        self.slab
189
30.9M
            .get(key.index.0 as usize)
190
30.9M
            .filter(|s| s.id == key.stream_id)
191
30.9M
            .unwrap_or_else(|| {
192
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
193
30.9M
            })
194
30.9M
    }
195
}
196
197
impl ops::IndexMut<Key> for Store {
198
14.5M
    fn index_mut(&mut self, key: Key) -> &mut Self::Output {
199
14.5M
        self.slab
200
14.5M
            .get_mut(key.index.0 as usize)
201
14.5M
            .filter(|s| s.id == key.stream_id)
202
14.5M
            .unwrap_or_else(|| {
203
0
                panic!("dangling store key for stream_id={:?}", key.stream_id);
204
14.5M
            })
205
14.5M
    }
206
}
207
208
impl Store {
209
    #[cfg(feature = "unstable")]
210
0
    pub fn num_active_streams(&self) -> usize {
211
0
        self.ids.len()
212
0
    }
213
214
    #[cfg(feature = "unstable")]
215
0
    pub fn num_wired_streams(&self) -> usize {
216
0
        self.slab.len()
217
0
    }
218
}
219
220
// While running h2 unit/integration tests, enable this debug assertion.
221
//
222
// In practice, we don't need to ensure this. But the integration tests
223
// help to make sure we've cleaned up in cases where we could (like, the
224
// runtime isn't suddenly dropping the task for unknown reasons).
225
#[cfg(feature = "unstable")]
226
impl Drop for Store {
227
12.7k
    fn drop(&mut self) {
228
        use std::thread;
229
230
12.7k
        if !thread::panicking() {
231
12.7k
            debug_assert!(self.slab.is_empty());
232
0
        }
233
12.7k
    }
234
}
235
236
// ===== impl Queue =====
237
238
impl<N> Queue<N>
239
where
240
    N: Next,
241
{
242
527k
    pub fn new() -> Self {
243
527k
        Queue {
244
527k
            indices: None,
245
527k
            _p: PhantomData,
246
527k
        }
247
527k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::new
Line
Count
Source
242
463k
    pub fn new() -> Self {
243
463k
        Queue {
244
463k
            indices: None,
245
463k
            _p: PhantomData,
246
463k
        }
247
463k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::new
Line
Count
Source
242
12.7k
    pub fn new() -> Self {
243
12.7k
        Queue {
244
12.7k
            indices: None,
245
12.7k
            _p: PhantomData,
246
12.7k
        }
247
12.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::new
Line
Count
Source
242
12.7k
    pub fn new() -> Self {
243
12.7k
        Queue {
244
12.7k
            indices: None,
245
12.7k
            _p: PhantomData,
246
12.7k
        }
247
12.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::new
Line
Count
Source
242
12.7k
    pub fn new() -> Self {
243
12.7k
        Queue {
244
12.7k
            indices: None,
245
12.7k
            _p: PhantomData,
246
12.7k
        }
247
12.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::new
Line
Count
Source
242
12.7k
    pub fn new() -> Self {
243
12.7k
        Queue {
244
12.7k
            indices: None,
245
12.7k
            _p: PhantomData,
246
12.7k
        }
247
12.7k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::new
Line
Count
Source
242
12.7k
    pub fn new() -> Self {
243
12.7k
        Queue {
244
12.7k
            indices: None,
245
12.7k
            _p: PhantomData,
246
12.7k
        }
247
12.7k
    }
248
249
426k
    pub fn take(&mut self) -> Self {
250
426k
        Queue {
251
426k
            indices: self.indices.take(),
252
426k
            _p: PhantomData,
253
426k
        }
254
426k
    }
255
256
    /// Queue the stream.
257
    ///
258
    /// If the stream is already contained by the list, return `false`.
259
1.31M
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
1.31M
        tracing::trace!("Queue::push_back");
261
262
1.31M
        if N::is_queued(stream) {
263
368k
            tracing::trace!(" -> already queued");
264
368k
            return false;
265
941k
        }
266
941k
267
941k
        N::set_queued(stream, true);
268
941k
269
941k
        // The next pointer shouldn't be set
270
941k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
941k
        match self.indices {
274
926k
            Some(ref mut idxs) => {
275
926k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
926k
                let key = stream.key();
279
926k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
926k
281
926k
                // Update the tail pointer
282
926k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
15.8k
                tracing::trace!(" -> first entry");
286
15.8k
                self.indices = Some(store::Indices {
287
15.8k
                    head: stream.key(),
288
15.8k
                    tail: stream.key(),
289
15.8k
                });
290
            }
291
        }
292
293
941k
        true
294
1.31M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::push
Line
Count
Source
259
190
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
190
        tracing::trace!("Queue::push_back");
261
262
190
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
190
        }
266
190
267
190
        N::set_queued(stream, true);
268
190
269
190
        // The next pointer shouldn't be set
270
190
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
190
        match self.indices {
274
42
            Some(ref mut idxs) => {
275
42
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
42
                let key = stream.key();
279
42
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
42
281
42
                // Update the tail pointer
282
42
                idxs.tail = stream.key();
283
            }
284
            None => {
285
148
                tracing::trace!(" -> first entry");
286
148
                self.indices = Some(store::Indices {
287
148
                    head: stream.key(),
288
148
                    tail: stream.key(),
289
148
                });
290
            }
291
        }
292
293
190
        true
294
190
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::push
Line
Count
Source
259
48.8k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
48.8k
        tracing::trace!("Queue::push_back");
261
262
48.8k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
48.8k
        }
266
48.8k
267
48.8k
        N::set_queued(stream, true);
268
48.8k
269
48.8k
        // The next pointer shouldn't be set
270
48.8k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
48.8k
        match self.indices {
274
44.3k
            Some(ref mut idxs) => {
275
44.3k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
44.3k
                let key = stream.key();
279
44.3k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
44.3k
281
44.3k
                // Update the tail pointer
282
44.3k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
4.57k
                tracing::trace!(" -> first entry");
286
4.57k
                self.indices = Some(store::Indices {
287
4.57k
                    head: stream.key(),
288
4.57k
                    tail: stream.key(),
289
4.57k
                });
290
            }
291
        }
292
293
48.8k
        true
294
48.8k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::push
Line
Count
Source
259
292k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
292k
        tracing::trace!("Queue::push_back");
261
262
292k
        if N::is_queued(stream) {
263
57.6k
            tracing::trace!(" -> already queued");
264
57.6k
            return false;
265
234k
        }
266
234k
267
234k
        N::set_queued(stream, true);
268
234k
269
234k
        // The next pointer shouldn't be set
270
234k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
234k
        match self.indices {
274
232k
            Some(ref mut idxs) => {
275
232k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
232k
                let key = stream.key();
279
232k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
232k
281
232k
                // Update the tail pointer
282
232k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
2.13k
                tracing::trace!(" -> first entry");
286
2.13k
                self.indices = Some(store::Indices {
287
2.13k
                    head: stream.key(),
288
2.13k
                    tail: stream.key(),
289
2.13k
                });
290
            }
291
        }
292
293
234k
        true
294
292k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::push
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::push
Line
Count
Source
259
426k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
426k
        tracing::trace!("Queue::push_back");
261
262
426k
        if N::is_queued(stream) {
263
0
            tracing::trace!(" -> already queued");
264
0
            return false;
265
426k
        }
266
426k
267
426k
        N::set_queued(stream, true);
268
426k
269
426k
        // The next pointer shouldn't be set
270
426k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
426k
        match self.indices {
274
422k
            Some(ref mut idxs) => {
275
422k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
422k
                let key = stream.key();
279
422k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
422k
281
422k
                // Update the tail pointer
282
422k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
4.07k
                tracing::trace!(" -> first entry");
286
4.07k
                self.indices = Some(store::Indices {
287
4.07k
                    head: stream.key(),
288
4.07k
                    tail: stream.key(),
289
4.07k
                });
290
            }
291
        }
292
293
426k
        true
294
426k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push
Line
Count
Source
259
543k
    pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260
543k
        tracing::trace!("Queue::push_back");
261
262
543k
        if N::is_queued(stream) {
263
311k
            tracing::trace!(" -> already queued");
264
311k
            return false;
265
232k
        }
266
232k
267
232k
        N::set_queued(stream, true);
268
232k
269
232k
        // The next pointer shouldn't be set
270
232k
        debug_assert!(N::next(stream).is_none());
271
272
        // Queue the stream
273
232k
        match self.indices {
274
227k
            Some(ref mut idxs) => {
275
227k
                tracing::trace!(" -> existing entries");
276
277
                // Update the current tail node to point to `stream`
278
227k
                let key = stream.key();
279
227k
                N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280
227k
281
227k
                // Update the tail pointer
282
227k
                idxs.tail = stream.key();
283
            }
284
            None => {
285
4.89k
                tracing::trace!(" -> first entry");
286
4.89k
                self.indices = Some(store::Indices {
287
4.89k
                    head: stream.key(),
288
4.89k
                    tail: stream.key(),
289
4.89k
                });
290
            }
291
        }
292
293
232k
        true
294
543k
    }
295
296
    /// Queue the stream
297
    ///
298
    /// If the stream is already contained by the list, return `false`.
299
184k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300
184k
        tracing::trace!("Queue::push_front");
301
302
184k
        if N::is_queued(stream) {
303
0
            tracing::trace!(" -> already queued");
304
0
            return false;
305
184k
        }
306
184k
307
184k
        N::set_queued(stream, true);
308
184k
309
184k
        // The next pointer shouldn't be set
310
184k
        debug_assert!(N::next(stream).is_none());
311
312
        // Queue the stream
313
184k
        match self.indices {
314
183k
            Some(ref mut idxs) => {
315
183k
                tracing::trace!(" -> existing entries");
316
317
                // Update the provided stream to point to the head node
318
183k
                let head_key = stream.resolve(idxs.head).key();
319
183k
                N::set_next(stream, Some(head_key));
320
183k
321
183k
                // Update the head pointer
322
183k
                idxs.head = stream.key();
323
            }
324
            None => {
325
1.60k
                tracing::trace!(" -> first entry");
326
1.60k
                self.indices = Some(store::Indices {
327
1.60k
                    head: stream.key(),
328
1.60k
                    tail: stream.key(),
329
1.60k
                });
330
            }
331
        }
332
333
184k
        true
334
184k
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<_>>::push_front
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::push_front
Line
Count
Source
299
184k
    pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300
184k
        tracing::trace!("Queue::push_front");
301
302
184k
        if N::is_queued(stream) {
303
0
            tracing::trace!(" -> already queued");
304
0
            return false;
305
184k
        }
306
184k
307
184k
        N::set_queued(stream, true);
308
184k
309
184k
        // The next pointer shouldn't be set
310
184k
        debug_assert!(N::next(stream).is_none());
311
312
        // Queue the stream
313
184k
        match self.indices {
314
183k
            Some(ref mut idxs) => {
315
183k
                tracing::trace!(" -> existing entries");
316
317
                // Update the provided stream to point to the head node
318
183k
                let head_key = stream.resolve(idxs.head).key();
319
183k
                N::set_next(stream, Some(head_key));
320
183k
321
183k
                // Update the head pointer
322
183k
                idxs.head = stream.key();
323
            }
324
            None => {
325
1.60k
                tracing::trace!(" -> first entry");
326
1.60k
                self.indices = Some(store::Indices {
327
1.60k
                    head: stream.key(),
328
1.60k
                    tail: stream.key(),
329
1.60k
                });
330
            }
331
        }
332
333
184k
        true
334
184k
    }
335
336
2.82M
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
2.82M
    where
338
2.82M
        R: Resolve,
339
2.82M
    {
340
2.82M
        if let Some(mut idxs) = self.indices {
341
1.12M
            let mut stream = store.resolve(idxs.head);
342
1.12M
343
1.12M
            if idxs.head == idxs.tail {
344
17.4k
                assert!(N::next(&stream).is_none());
345
17.4k
                self.indices = None;
346
1.10M
            } else {
347
1.10M
                idxs.head = N::take_next(&mut stream).unwrap();
348
1.10M
                self.indices = Some(idxs);
349
1.10M
            }
350
351
1.12M
            debug_assert!(N::is_queued(&stream));
352
1.12M
            N::set_queued(&mut stream, false);
353
1.12M
354
1.12M
            return Some(stream);
355
1.69M
        }
356
1.69M
357
1.69M
        None
358
2.82M
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
439k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
439k
    where
338
439k
        R: Resolve,
339
439k
    {
340
439k
        if let Some(mut idxs) = self.indices {
341
181
            let mut stream = store.resolve(idxs.head);
342
181
343
181
            if idxs.head == idxs.tail {
344
142
                assert!(N::next(&stream).is_none());
345
142
                self.indices = None;
346
39
            } else {
347
39
                idxs.head = N::take_next(&mut stream).unwrap();
348
39
                self.indices = Some(idxs);
349
39
            }
350
351
181
            debug_assert!(N::is_queued(&stream));
352
181
            N::set_queued(&mut stream, false);
353
181
354
181
            return Some(stream);
355
438k
        }
356
438k
357
438k
        None
358
439k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
66.3k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
66.3k
    where
338
66.3k
        R: Resolve,
339
66.3k
    {
340
66.3k
        if let Some(mut idxs) = self.indices {
341
48.8k
            let mut stream = store.resolve(idxs.head);
342
48.8k
343
48.8k
            if idxs.head == idxs.tail {
344
4.57k
                assert!(N::next(&stream).is_none());
345
4.57k
                self.indices = None;
346
44.3k
            } else {
347
44.3k
                idxs.head = N::take_next(&mut stream).unwrap();
348
44.3k
                self.indices = Some(idxs);
349
44.3k
            }
350
351
48.8k
            debug_assert!(N::is_queued(&stream));
352
48.8k
            N::set_queued(&mut stream, false);
353
48.8k
354
48.8k
            return Some(stream);
355
17.4k
        }
356
17.4k
357
17.4k
        None
358
66.3k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Ptr>
Line
Count
Source
336
184k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
184k
    where
338
184k
        R: Resolve,
339
184k
    {
340
184k
        if let Some(mut idxs) = self.indices {
341
163k
            let mut stream = store.resolve(idxs.head);
342
163k
343
163k
            if idxs.head == idxs.tail {
344
1.55k
                assert!(N::next(&stream).is_none());
345
1.55k
                self.indices = None;
346
162k
            } else {
347
162k
                idxs.head = N::take_next(&mut stream).unwrap();
348
162k
                self.indices = Some(idxs);
349
162k
            }
350
351
163k
            debug_assert!(N::is_queued(&stream));
352
163k
            N::set_queued(&mut stream, false);
353
163k
354
163k
            return Some(stream);
355
20.7k
        }
356
20.7k
357
20.7k
        None
358
184k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSendCapacity>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
89.6k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
89.6k
    where
338
89.6k
        R: Resolve,
339
89.6k
    {
340
89.6k
        if let Some(mut idxs) = self.indices {
341
70.4k
            let mut stream = store.resolve(idxs.head);
342
70.4k
343
70.4k
            if idxs.head == idxs.tail {
344
587
                assert!(N::next(&stream).is_none());
345
587
                self.indices = None;
346
69.8k
            } else {
347
69.8k
                idxs.head = N::take_next(&mut stream).unwrap();
348
69.8k
                self.indices = Some(idxs);
349
69.8k
            }
350
351
70.4k
            debug_assert!(N::is_queued(&stream));
352
70.4k
            N::set_queued(&mut stream, false);
353
70.4k
354
70.4k
            return Some(stream);
355
19.2k
        }
356
19.2k
357
19.2k
        None
358
89.6k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextWindowUpdate>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
399k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
399k
    where
338
399k
        R: Resolve,
339
399k
    {
340
399k
        if let Some(mut idxs) = self.indices {
341
0
            let mut stream = store.resolve(idxs.head);
342
0
343
0
            if idxs.head == idxs.tail {
344
0
                assert!(N::next(&stream).is_none());
345
0
                self.indices = None;
346
0
            } else {
347
0
                idxs.head = N::take_next(&mut stream).unwrap();
348
0
                self.indices = Some(idxs);
349
0
            }
350
351
0
            debug_assert!(N::is_queued(&stream));
352
0
            N::set_queued(&mut stream, false);
353
0
354
0
            return Some(stream);
355
399k
        }
356
399k
357
399k
        None
358
399k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextOpen>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
827k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
827k
    where
338
827k
        R: Resolve,
339
827k
    {
340
827k
        if let Some(mut idxs) = self.indices {
341
426k
            let mut stream = store.resolve(idxs.head);
342
426k
343
426k
            if idxs.head == idxs.tail {
344
4.07k
                assert!(N::next(&stream).is_none());
345
4.07k
                self.indices = None;
346
422k
            } else {
347
422k
                idxs.head = N::take_next(&mut stream).unwrap();
348
422k
                self.indices = Some(idxs);
349
422k
            }
350
351
426k
            debug_assert!(N::is_queued(&stream));
352
426k
            N::set_queued(&mut stream, false);
353
426k
354
426k
            return Some(stream);
355
401k
        }
356
401k
357
401k
        None
358
827k
    }
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextSend>>::pop::<h2::proto::streams::store::Store>
Line
Count
Source
336
814k
    pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337
814k
    where
338
814k
        R: Resolve,
339
814k
    {
340
814k
        if let Some(mut idxs) = self.indices {
341
416k
            let mut stream = store.resolve(idxs.head);
342
416k
343
416k
            if idxs.head == idxs.tail {
344
6.49k
                assert!(N::next(&stream).is_none());
345
6.49k
                self.indices = None;
346
410k
            } else {
347
410k
                idxs.head = N::take_next(&mut stream).unwrap();
348
410k
                self.indices = Some(idxs);
349
410k
            }
350
351
416k
            debug_assert!(N::is_queued(&stream));
352
416k
            N::set_queued(&mut stream, false);
353
416k
354
416k
            return Some(stream);
355
397k
        }
356
397k
357
397k
        None
358
814k
    }
359
360
1.67M
    pub fn is_empty(&self) -> bool {
361
1.67M
        self.indices.is_none()
362
1.67M
    }
Unexecuted instantiation: <h2::proto::streams::store::Queue<h2::proto::streams::stream::NextAccept>>::is_empty
<h2::proto::streams::store::Queue<h2::proto::streams::stream::NextResetExpire>>::is_empty
Line
Count
Source
360
1.67M
    pub fn is_empty(&self) -> bool {
361
1.67M
        self.indices.is_none()
362
1.67M
    }
363
364
987k
    pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
365
987k
    where
366
987k
        R: Resolve,
367
987k
        F: Fn(&Stream) -> bool,
368
987k
    {
369
987k
        if let Some(idxs) = self.indices {
370
987k
            let should_pop = f(&store.resolve(idxs.head));
371
987k
            if should_pop {
372
150
                return self.pop(store);
373
987k
            }
374
17
        }
375
376
987k
        None
377
987k
    }
378
}
379
380
impl<N> fmt::Debug for Queue<N> {
381
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
382
0
        f.debug_struct("Queue")
383
0
            .field("indices", &self.indices)
384
0
            // skip phantom data
385
0
            .finish()
386
0
    }
387
}
388
389
// ===== impl Ptr =====
390
391
impl<'a> Ptr<'a> {
392
    /// Returns the Key associated with the stream
393
2.73M
    pub fn key(&self) -> Key {
394
2.73M
        self.key
395
2.73M
    }
396
397
426k
    pub fn store_mut(&mut self) -> &mut Store {
398
426k
        self.store
399
426k
    }
400
401
    /// Remove the stream from the store
402
450k
    pub fn remove(self) -> StreamId {
403
450k
        // The stream must have been unlinked before this point
404
450k
        debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
405
406
        // Remove the stream state
407
450k
        let stream = self.store.slab.remove(self.key.index.0 as usize);
408
450k
        assert_eq!(stream.id, self.key.stream_id);
409
450k
        stream.id
410
450k
    }
411
412
    /// Remove the StreamId -> stream state association.
413
    ///
414
    /// This will effectively remove the stream as far as the H2 protocol is
415
    /// concerned.
416
1.23M
    pub fn unlink(&mut self) {
417
1.23M
        let id = self.key.stream_id;
418
1.23M
        self.store.ids.swap_remove(&id);
419
1.23M
    }
420
}
421
422
impl<'a> Resolve for Ptr<'a> {
423
1.27M
    fn resolve(&mut self, key: Key) -> Ptr<'_> {
424
1.27M
        Ptr {
425
1.27M
            key,
426
1.27M
            store: &mut *self.store,
427
1.27M
        }
428
1.27M
    }
429
}
430
431
impl<'a> ops::Deref for Ptr<'a> {
432
    type Target = Stream;
433
434
30.9M
    fn deref(&self) -> &Stream {
435
30.9M
        &self.store[self.key]
436
30.9M
    }
437
}
438
439
impl<'a> ops::DerefMut for Ptr<'a> {
440
14.5M
    fn deref_mut(&mut self) -> &mut Stream {
441
14.5M
        &mut self.store[self.key]
442
14.5M
    }
443
}
444
445
impl<'a> fmt::Debug for Ptr<'a> {
446
0
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
447
0
        (**self).fmt(fmt)
448
0
    }
449
}
450
451
// ===== impl OccupiedEntry =====
452
453
impl<'a> OccupiedEntry<'a> {
454
28.4k
    pub fn key(&self) -> Key {
455
28.4k
        let stream_id = *self.ids.key();
456
28.4k
        let index = *self.ids.get();
457
28.4k
        Key { index, stream_id }
458
28.4k
    }
459
}
460
461
// ===== impl VacantEntry =====
462
463
impl<'a> VacantEntry<'a> {
464
23.5k
    pub fn insert(self, value: Stream) -> Key {
465
23.5k
        // Insert the value in the slab
466
23.5k
        let stream_id = value.id;
467
23.5k
        let index = SlabIndex(self.slab.insert(value) as u32);
468
23.5k
469
23.5k
        // Insert the handle in the ID map
470
23.5k
        self.ids.insert(index);
471
23.5k
472
23.5k
        Key { index, stream_id }
473
23.5k
    }
474
}