Coverage Report

Created: 2025-08-28 07:05

/src/h2/src/proto/ping_pong.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::codec::Codec;
2
use crate::frame::Ping;
3
use crate::proto::{self, PingPayload};
4
5
use atomic_waker::AtomicWaker;
6
use bytes::Buf;
7
use std::io;
8
use std::sync::atomic::{AtomicUsize, Ordering};
9
use std::sync::Arc;
10
use std::task::{Context, Poll};
11
use tokio::io::AsyncWrite;
12
13
/// Acknowledges ping requests from the remote.
14
#[derive(Debug)]
15
pub(crate) struct PingPong {
16
    pending_ping: Option<PendingPing>,
17
    pending_pong: Option<PingPayload>,
18
    user_pings: Option<UserPingsRx>,
19
}
20
21
#[derive(Debug)]
22
pub(crate) struct UserPings(Arc<UserPingsInner>);
23
24
#[derive(Debug)]
25
struct UserPingsRx(Arc<UserPingsInner>);
26
27
#[derive(Debug)]
28
struct UserPingsInner {
29
    state: AtomicUsize,
30
    /// Task to wake up the main `Connection`.
31
    ping_task: AtomicWaker,
32
    /// Task to wake up `share::PingPong::poll_pong`.
33
    pong_task: AtomicWaker,
34
}
35
36
#[derive(Debug)]
37
struct PendingPing {
38
    payload: PingPayload,
39
    sent: bool,
40
}
41
42
/// Status returned from `PingPong::recv_ping`.
43
#[derive(Debug)]
44
pub(crate) enum ReceivedPing {
45
    MustAck,
46
    Unknown,
47
    Shutdown,
48
}
49
50
/// No user ping pending.
51
const USER_STATE_EMPTY: usize = 0;
52
/// User has called `send_ping`, but PING hasn't been written yet.
53
const USER_STATE_PENDING_PING: usize = 1;
54
/// User PING has been written, waiting for PONG.
55
const USER_STATE_PENDING_PONG: usize = 2;
56
/// We've received user PONG, waiting for user to `poll_pong`.
57
const USER_STATE_RECEIVED_PONG: usize = 3;
58
/// The connection is closed.
59
const USER_STATE_CLOSED: usize = 4;
60
61
// ===== impl PingPong =====
62
63
impl PingPong {
64
12.9k
    pub(crate) fn new() -> Self {
65
12.9k
        PingPong {
66
12.9k
            pending_ping: None,
67
12.9k
            pending_pong: None,
68
12.9k
            user_pings: None,
69
12.9k
        }
70
12.9k
    }
71
72
    /// Can only be called once. If called a second time, returns `None`.
73
0
    pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
74
0
        if self.user_pings.is_some() {
75
0
            return None;
76
0
        }
77
0
78
0
        let user_pings = Arc::new(UserPingsInner {
79
0
            state: AtomicUsize::new(USER_STATE_EMPTY),
80
0
            ping_task: AtomicWaker::new(),
81
0
            pong_task: AtomicWaker::new(),
82
0
        });
83
0
        self.user_pings = Some(UserPingsRx(user_pings.clone()));
84
0
        Some(UserPings(user_pings))
85
0
    }
86
87
0
    pub(crate) fn ping_shutdown(&mut self) {
88
0
        assert!(self.pending_ping.is_none());
89
90
0
        self.pending_ping = Some(PendingPing {
91
0
            payload: Ping::SHUTDOWN,
92
0
            sent: false,
93
0
        });
94
0
    }
95
96
    /// Process a ping
97
311
    pub(crate) fn recv_ping(&mut self, ping: Ping) -> ReceivedPing {
98
311
        // The caller should always check that `send_pongs` returns ready before
99
311
        // calling `recv_ping`.
100
311
        assert!(self.pending_pong.is_none());
101
102
311
        if ping.is_ack() {
103
67
            if let Some(pending) = self.pending_ping.take() {
104
0
                if &pending.payload == ping.payload() {
105
0
                    assert_eq!(
106
0
                        &pending.payload,
107
                        &Ping::SHUTDOWN,
108
0
                        "pending_ping should be for shutdown",
109
                    );
110
0
                    tracing::trace!("recv PING SHUTDOWN ack");
111
0
                    return ReceivedPing::Shutdown;
112
0
                }
113
0
114
0
                // if not the payload we expected, put it back.
115
0
                self.pending_ping = Some(pending);
116
67
            }
117
118
67
            if let Some(ref users) = self.user_pings {
119
0
                if ping.payload() == &Ping::USER && users.receive_pong() {
120
0
                    tracing::trace!("recv PING USER ack");
121
0
                    return ReceivedPing::Unknown;
122
0
                }
123
67
            }
124
125
            // else we were acked a ping we didn't send?
126
            // The spec doesn't require us to do anything about this,
127
            // so for resiliency, just ignore it for now.
128
67
            tracing::warn!("recv PING ack that we never sent: {:?}", ping);
129
67
            ReceivedPing::Unknown
130
        } else {
131
            // Save the ping's payload to be sent as an acknowledgement.
132
244
            self.pending_pong = Some(ping.into_payload());
133
244
            ReceivedPing::MustAck
134
        }
135
311
    }
136
137
    /// Send any pending pongs.
138
2.41M
    pub(crate) fn send_pending_pong<T, B>(
139
2.41M
        &mut self,
140
2.41M
        cx: &mut Context,
141
2.41M
        dst: &mut Codec<T, B>,
142
2.41M
    ) -> Poll<io::Result<()>>
143
2.41M
    where
144
2.41M
        T: AsyncWrite + Unpin,
145
2.41M
        B: Buf,
146
2.41M
    {
147
2.41M
        if let Some(pong) = self.pending_pong.take() {
148
829
            if !dst.poll_ready(cx)?.is_ready() {
149
593
                self.pending_pong = Some(pong);
150
593
                return Poll::Pending;
151
219
            }
152
219
153
219
            dst.buffer(Ping::pong(pong).into())
154
219
                .expect("invalid pong frame");
155
2.41M
        }
156
157
2.41M
        Poll::Ready(Ok(()))
158
2.41M
    }
Unexecuted instantiation: <h2::proto::ping_pong::PingPong>::send_pending_pong::<_, _>
<h2::proto::ping_pong::PingPong>::send_pending_pong::<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>
Line
Count
Source
138
2.41M
    pub(crate) fn send_pending_pong<T, B>(
139
2.41M
        &mut self,
140
2.41M
        cx: &mut Context,
141
2.41M
        dst: &mut Codec<T, B>,
142
2.41M
    ) -> Poll<io::Result<()>>
143
2.41M
    where
144
2.41M
        T: AsyncWrite + Unpin,
145
2.41M
        B: Buf,
146
2.41M
    {
147
2.41M
        if let Some(pong) = self.pending_pong.take() {
148
829
            if !dst.poll_ready(cx)?.is_ready() {
149
593
                self.pending_pong = Some(pong);
150
593
                return Poll::Pending;
151
219
            }
152
219
153
219
            dst.buffer(Ping::pong(pong).into())
154
219
                .expect("invalid pong frame");
155
2.41M
        }
156
157
2.41M
        Poll::Ready(Ok(()))
158
2.41M
    }
159
160
    /// Send any pending pings.
161
2.41M
    pub(crate) fn send_pending_ping<T, B>(
162
2.41M
        &mut self,
163
2.41M
        cx: &mut Context,
164
2.41M
        dst: &mut Codec<T, B>,
165
2.41M
    ) -> Poll<io::Result<()>>
166
2.41M
    where
167
2.41M
        T: AsyncWrite + Unpin,
168
2.41M
        B: Buf,
169
2.41M
    {
170
2.41M
        if let Some(ref mut ping) = self.pending_ping {
171
0
            if !ping.sent {
172
0
                if !dst.poll_ready(cx)?.is_ready() {
173
0
                    return Poll::Pending;
174
0
                }
175
0
176
0
                dst.buffer(Ping::new(ping.payload).into())
177
0
                    .expect("invalid ping frame");
178
0
                ping.sent = true;
179
0
            }
180
2.41M
        } else if let Some(ref users) = self.user_pings {
181
0
            if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING {
182
0
                if !dst.poll_ready(cx)?.is_ready() {
183
0
                    return Poll::Pending;
184
0
                }
185
0
186
0
                dst.buffer(Ping::new(Ping::USER).into())
187
0
                    .expect("invalid ping frame");
188
0
                users
189
0
                    .0
190
0
                    .state
191
0
                    .store(USER_STATE_PENDING_PONG, Ordering::Release);
192
0
            } else {
193
0
                users.0.ping_task.register(cx.waker());
194
0
            }
195
2.41M
        }
196
197
2.41M
        Poll::Ready(Ok(()))
198
2.41M
    }
Unexecuted instantiation: <h2::proto::ping_pong::PingPong>::send_pending_ping::<_, _>
<h2::proto::ping_pong::PingPong>::send_pending_ping::<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>>
Line
Count
Source
161
2.41M
    pub(crate) fn send_pending_ping<T, B>(
162
2.41M
        &mut self,
163
2.41M
        cx: &mut Context,
164
2.41M
        dst: &mut Codec<T, B>,
165
2.41M
    ) -> Poll<io::Result<()>>
166
2.41M
    where
167
2.41M
        T: AsyncWrite + Unpin,
168
2.41M
        B: Buf,
169
2.41M
    {
170
2.41M
        if let Some(ref mut ping) = self.pending_ping {
171
0
            if !ping.sent {
172
0
                if !dst.poll_ready(cx)?.is_ready() {
173
0
                    return Poll::Pending;
174
0
                }
175
0
176
0
                dst.buffer(Ping::new(ping.payload).into())
177
0
                    .expect("invalid ping frame");
178
0
                ping.sent = true;
179
0
            }
180
2.41M
        } else if let Some(ref users) = self.user_pings {
181
0
            if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING {
182
0
                if !dst.poll_ready(cx)?.is_ready() {
183
0
                    return Poll::Pending;
184
0
                }
185
0
186
0
                dst.buffer(Ping::new(Ping::USER).into())
187
0
                    .expect("invalid ping frame");
188
0
                users
189
0
                    .0
190
0
                    .state
191
0
                    .store(USER_STATE_PENDING_PONG, Ordering::Release);
192
0
            } else {
193
0
                users.0.ping_task.register(cx.waker());
194
0
            }
195
2.41M
        }
196
197
2.41M
        Poll::Ready(Ok(()))
198
2.41M
    }
199
}
200
201
impl ReceivedPing {
202
311
    pub(crate) fn is_shutdown(&self) -> bool {
203
311
        matches!(*self, Self::Shutdown)
204
311
    }
205
}
206
207
// ===== impl UserPings =====
208
209
impl UserPings {
210
0
    pub(crate) fn send_ping(&self) -> Result<(), Option<proto::Error>> {
211
0
        let prev = self
212
0
            .0
213
0
            .state
214
0
            .compare_exchange(
215
0
                USER_STATE_EMPTY,        // current
216
0
                USER_STATE_PENDING_PING, // new
217
0
                Ordering::AcqRel,
218
0
                Ordering::Acquire,
219
0
            )
220
0
            .unwrap_or_else(|v| v);
221
0
222
0
        match prev {
223
            USER_STATE_EMPTY => {
224
0
                self.0.ping_task.wake();
225
0
                Ok(())
226
            }
227
0
            USER_STATE_CLOSED => Err(Some(broken_pipe().into())),
228
            _ => {
229
                // Was already pending, user error!
230
0
                Err(None)
231
            }
232
        }
233
0
    }
234
235
0
    pub(crate) fn poll_pong(&self, cx: &mut Context) -> Poll<Result<(), proto::Error>> {
236
0
        // Must register before checking state, in case state were to change
237
0
        // before we could register, and then the ping would just be lost.
238
0
        self.0.pong_task.register(cx.waker());
239
0
        let prev = self
240
0
            .0
241
0
            .state
242
0
            .compare_exchange(
243
0
                USER_STATE_RECEIVED_PONG, // current
244
0
                USER_STATE_EMPTY,         // new
245
0
                Ordering::AcqRel,
246
0
                Ordering::Acquire,
247
0
            )
248
0
            .unwrap_or_else(|v| v);
249
0
250
0
        match prev {
251
0
            USER_STATE_RECEIVED_PONG => Poll::Ready(Ok(())),
252
0
            USER_STATE_CLOSED => Poll::Ready(Err(broken_pipe().into())),
253
0
            _ => Poll::Pending,
254
        }
255
0
    }
256
}
257
258
// ===== impl UserPingsRx =====
259
260
impl UserPingsRx {
261
0
    fn receive_pong(&self) -> bool {
262
0
        let prev = self
263
0
            .0
264
0
            .state
265
0
            .compare_exchange(
266
0
                USER_STATE_PENDING_PONG,  // current
267
0
                USER_STATE_RECEIVED_PONG, // new
268
0
                Ordering::AcqRel,
269
0
                Ordering::Acquire,
270
0
            )
271
0
            .unwrap_or_else(|v| v);
272
0
273
0
        if prev == USER_STATE_PENDING_PONG {
274
0
            self.0.pong_task.wake();
275
0
            true
276
        } else {
277
0
            false
278
        }
279
0
    }
280
}
281
282
impl Drop for UserPingsRx {
283
0
    fn drop(&mut self) {
284
0
        self.0.state.store(USER_STATE_CLOSED, Ordering::Release);
285
0
        self.0.pong_task.wake();
286
0
    }
287
}
288
289
0
fn broken_pipe() -> io::Error {
290
0
    io::ErrorKind::BrokenPipe.into()
291
0
}