/src/h2/src/proto/ping_pong.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use crate::codec::Codec; |
2 | | use crate::frame::Ping; |
3 | | use crate::proto::{self, PingPayload}; |
4 | | |
5 | | use atomic_waker::AtomicWaker; |
6 | | use bytes::Buf; |
7 | | use std::io; |
8 | | use std::sync::atomic::{AtomicUsize, Ordering}; |
9 | | use std::sync::Arc; |
10 | | use std::task::{Context, Poll}; |
11 | | use tokio::io::AsyncWrite; |
12 | | |
13 | | /// Acknowledges ping requests from the remote. |
14 | | #[derive(Debug)] |
15 | | pub(crate) struct PingPong { |
16 | | pending_ping: Option<PendingPing>, |
17 | | pending_pong: Option<PingPayload>, |
18 | | user_pings: Option<UserPingsRx>, |
19 | | } |
20 | | |
21 | | #[derive(Debug)] |
22 | | pub(crate) struct UserPings(Arc<UserPingsInner>); |
23 | | |
24 | | #[derive(Debug)] |
25 | | struct UserPingsRx(Arc<UserPingsInner>); |
26 | | |
27 | | #[derive(Debug)] |
28 | | struct UserPingsInner { |
29 | | state: AtomicUsize, |
30 | | /// Task to wake up the main `Connection`. |
31 | | ping_task: AtomicWaker, |
32 | | /// Task to wake up `share::PingPong::poll_pong`. |
33 | | pong_task: AtomicWaker, |
34 | | } |
35 | | |
36 | | #[derive(Debug)] |
37 | | struct PendingPing { |
38 | | payload: PingPayload, |
39 | | sent: bool, |
40 | | } |
41 | | |
/// Outcome reported by `PingPong::recv_ping`.
#[derive(Debug)]
pub(crate) enum ReceivedPing {
    /// A non-ack PING arrived; its payload has been stored so that a pong
    /// can be sent back.
    MustAck,
    /// An ack arrived that is not the shutdown acknowledgement.
    Unknown,
    /// The ack for the pending shutdown PING arrived.
    Shutdown,
}
49 | | |
// Values stored in `UserPingsInner::state`, listed in the order a user ping
// normally moves through them.

/// Idle: no user ping is in flight.
const USER_STATE_EMPTY: usize = 0;
/// `send_ping` was called by the user; the PING frame is not written yet.
const USER_STATE_PENDING_PING: usize = 1;
/// The user PING frame has been written; its PONG has not arrived yet.
const USER_STATE_PENDING_PONG: usize = 2;
/// The user PONG arrived; the user has not yet observed it via `poll_pong`.
const USER_STATE_RECEIVED_PONG: usize = 3;
/// The connection is closed.
const USER_STATE_CLOSED: usize = 4;
60 | | |
61 | | // ===== impl PingPong ===== |
62 | | |
63 | | impl PingPong { |
64 | 12.9k | pub(crate) fn new() -> Self { |
65 | 12.9k | PingPong { |
66 | 12.9k | pending_ping: None, |
67 | 12.9k | pending_pong: None, |
68 | 12.9k | user_pings: None, |
69 | 12.9k | } |
70 | 12.9k | } |
71 | | |
72 | | /// Can only be called once. If called a second time, returns `None`. |
73 | 0 | pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> { |
74 | 0 | if self.user_pings.is_some() { |
75 | 0 | return None; |
76 | 0 | } |
77 | 0 |
|
78 | 0 | let user_pings = Arc::new(UserPingsInner { |
79 | 0 | state: AtomicUsize::new(USER_STATE_EMPTY), |
80 | 0 | ping_task: AtomicWaker::new(), |
81 | 0 | pong_task: AtomicWaker::new(), |
82 | 0 | }); |
83 | 0 | self.user_pings = Some(UserPingsRx(user_pings.clone())); |
84 | 0 | Some(UserPings(user_pings)) |
85 | 0 | } |
86 | | |
87 | 0 | pub(crate) fn ping_shutdown(&mut self) { |
88 | 0 | assert!(self.pending_ping.is_none()); |
89 | | |
90 | 0 | self.pending_ping = Some(PendingPing { |
91 | 0 | payload: Ping::SHUTDOWN, |
92 | 0 | sent: false, |
93 | 0 | }); |
94 | 0 | } |
95 | | |
96 | | /// Process a ping |
97 | 311 | pub(crate) fn recv_ping(&mut self, ping: Ping) -> ReceivedPing { |
98 | 311 | // The caller should always check that `send_pongs` returns ready before |
99 | 311 | // calling `recv_ping`. |
100 | 311 | assert!(self.pending_pong.is_none()); |
101 | | |
102 | 311 | if ping.is_ack() { |
103 | 67 | if let Some(pending) = self.pending_ping.take() { |
104 | 0 | if &pending.payload == ping.payload() { |
105 | 0 | assert_eq!( |
106 | 0 | &pending.payload, |
107 | | &Ping::SHUTDOWN, |
108 | 0 | "pending_ping should be for shutdown", |
109 | | ); |
110 | 0 | tracing::trace!("recv PING SHUTDOWN ack"); |
111 | 0 | return ReceivedPing::Shutdown; |
112 | 0 | } |
113 | 0 |
|
114 | 0 | // if not the payload we expected, put it back. |
115 | 0 | self.pending_ping = Some(pending); |
116 | 67 | } |
117 | | |
118 | 67 | if let Some(ref users) = self.user_pings { |
119 | 0 | if ping.payload() == &Ping::USER && users.receive_pong() { |
120 | 0 | tracing::trace!("recv PING USER ack"); |
121 | 0 | return ReceivedPing::Unknown; |
122 | 0 | } |
123 | 67 | } |
124 | | |
125 | | // else we were acked a ping we didn't send? |
126 | | // The spec doesn't require us to do anything about this, |
127 | | // so for resiliency, just ignore it for now. |
128 | 67 | tracing::warn!("recv PING ack that we never sent: {:?}", ping); |
129 | 67 | ReceivedPing::Unknown |
130 | | } else { |
131 | | // Save the ping's payload to be sent as an acknowledgement. |
132 | 244 | self.pending_pong = Some(ping.into_payload()); |
133 | 244 | ReceivedPing::MustAck |
134 | | } |
135 | 311 | } |
136 | | |
137 | | /// Send any pending pongs. |
138 | 2.41M | pub(crate) fn send_pending_pong<T, B>( |
139 | 2.41M | &mut self, |
140 | 2.41M | cx: &mut Context, |
141 | 2.41M | dst: &mut Codec<T, B>, |
142 | 2.41M | ) -> Poll<io::Result<()>> |
143 | 2.41M | where |
144 | 2.41M | T: AsyncWrite + Unpin, |
145 | 2.41M | B: Buf, |
146 | 2.41M | { |
147 | 2.41M | if let Some(pong) = self.pending_pong.take() { |
148 | 829 | if !dst.poll_ready(cx)?.is_ready() { |
149 | 593 | self.pending_pong = Some(pong); |
150 | 593 | return Poll::Pending; |
151 | 219 | } |
152 | 219 | |
153 | 219 | dst.buffer(Ping::pong(pong).into()) |
154 | 219 | .expect("invalid pong frame"); |
155 | 2.41M | } |
156 | | |
157 | 2.41M | Poll::Ready(Ok(())) |
158 | 2.41M | } Unexecuted instantiation: <h2::proto::ping_pong::PingPong>::send_pending_pong::<_, _> <h2::proto::ping_pong::PingPong>::send_pending_pong::<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>> Line | Count | Source | 138 | 2.41M | pub(crate) fn send_pending_pong<T, B>( | 139 | 2.41M | &mut self, | 140 | 2.41M | cx: &mut Context, | 141 | 2.41M | dst: &mut Codec<T, B>, | 142 | 2.41M | ) -> Poll<io::Result<()>> | 143 | 2.41M | where | 144 | 2.41M | T: AsyncWrite + Unpin, | 145 | 2.41M | B: Buf, | 146 | 2.41M | { | 147 | 2.41M | if let Some(pong) = self.pending_pong.take() { | 148 | 829 | if !dst.poll_ready(cx)?.is_ready() { | 149 | 593 | self.pending_pong = Some(pong); | 150 | 593 | return Poll::Pending; | 151 | 219 | } | 152 | 219 | | 153 | 219 | dst.buffer(Ping::pong(pong).into()) | 154 | 219 | .expect("invalid pong frame"); | 155 | 2.41M | } | 156 | | | 157 | 2.41M | Poll::Ready(Ok(())) | 158 | 2.41M | } |
|
159 | | |
160 | | /// Send any pending pings. |
161 | 2.41M | pub(crate) fn send_pending_ping<T, B>( |
162 | 2.41M | &mut self, |
163 | 2.41M | cx: &mut Context, |
164 | 2.41M | dst: &mut Codec<T, B>, |
165 | 2.41M | ) -> Poll<io::Result<()>> |
166 | 2.41M | where |
167 | 2.41M | T: AsyncWrite + Unpin, |
168 | 2.41M | B: Buf, |
169 | 2.41M | { |
170 | 2.41M | if let Some(ref mut ping) = self.pending_ping { |
171 | 0 | if !ping.sent { |
172 | 0 | if !dst.poll_ready(cx)?.is_ready() { |
173 | 0 | return Poll::Pending; |
174 | 0 | } |
175 | 0 |
|
176 | 0 | dst.buffer(Ping::new(ping.payload).into()) |
177 | 0 | .expect("invalid ping frame"); |
178 | 0 | ping.sent = true; |
179 | 0 | } |
180 | 2.41M | } else if let Some(ref users) = self.user_pings { |
181 | 0 | if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING { |
182 | 0 | if !dst.poll_ready(cx)?.is_ready() { |
183 | 0 | return Poll::Pending; |
184 | 0 | } |
185 | 0 |
|
186 | 0 | dst.buffer(Ping::new(Ping::USER).into()) |
187 | 0 | .expect("invalid ping frame"); |
188 | 0 | users |
189 | 0 | .0 |
190 | 0 | .state |
191 | 0 | .store(USER_STATE_PENDING_PONG, Ordering::Release); |
192 | 0 | } else { |
193 | 0 | users.0.ping_task.register(cx.waker()); |
194 | 0 | } |
195 | 2.41M | } |
196 | | |
197 | 2.41M | Poll::Ready(Ok(())) |
198 | 2.41M | } Unexecuted instantiation: <h2::proto::ping_pong::PingPong>::send_pending_ping::<_, _> <h2::proto::ping_pong::PingPong>::send_pending_ping::<fuzz_e2e::MockIo, h2::proto::streams::prioritize::Prioritized<bytes::bytes::Bytes>> Line | Count | Source | 161 | 2.41M | pub(crate) fn send_pending_ping<T, B>( | 162 | 2.41M | &mut self, | 163 | 2.41M | cx: &mut Context, | 164 | 2.41M | dst: &mut Codec<T, B>, | 165 | 2.41M | ) -> Poll<io::Result<()>> | 166 | 2.41M | where | 167 | 2.41M | T: AsyncWrite + Unpin, | 168 | 2.41M | B: Buf, | 169 | 2.41M | { | 170 | 2.41M | if let Some(ref mut ping) = self.pending_ping { | 171 | 0 | if !ping.sent { | 172 | 0 | if !dst.poll_ready(cx)?.is_ready() { | 173 | 0 | return Poll::Pending; | 174 | 0 | } | 175 | 0 |
| 176 | 0 | dst.buffer(Ping::new(ping.payload).into()) | 177 | 0 | .expect("invalid ping frame"); | 178 | 0 | ping.sent = true; | 179 | 0 | } | 180 | 2.41M | } else if let Some(ref users) = self.user_pings { | 181 | 0 | if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING { | 182 | 0 | if !dst.poll_ready(cx)?.is_ready() { | 183 | 0 | return Poll::Pending; | 184 | 0 | } | 185 | 0 |
| 186 | 0 | dst.buffer(Ping::new(Ping::USER).into()) | 187 | 0 | .expect("invalid ping frame"); | 188 | 0 | users | 189 | 0 | .0 | 190 | 0 | .state | 191 | 0 | .store(USER_STATE_PENDING_PONG, Ordering::Release); | 192 | 0 | } else { | 193 | 0 | users.0.ping_task.register(cx.waker()); | 194 | 0 | } | 195 | 2.41M | } | 196 | | | 197 | 2.41M | Poll::Ready(Ok(())) | 198 | 2.41M | } |
|
199 | | } |
200 | | |
201 | | impl ReceivedPing { |
202 | 311 | pub(crate) fn is_shutdown(&self) -> bool { |
203 | 311 | matches!(*self, Self::Shutdown) |
204 | 311 | } |
205 | | } |
206 | | |
207 | | // ===== impl UserPings ===== |
208 | | |
209 | | impl UserPings { |
210 | 0 | pub(crate) fn send_ping(&self) -> Result<(), Option<proto::Error>> { |
211 | 0 | let prev = self |
212 | 0 | .0 |
213 | 0 | .state |
214 | 0 | .compare_exchange( |
215 | 0 | USER_STATE_EMPTY, // current |
216 | 0 | USER_STATE_PENDING_PING, // new |
217 | 0 | Ordering::AcqRel, |
218 | 0 | Ordering::Acquire, |
219 | 0 | ) |
220 | 0 | .unwrap_or_else(|v| v); |
221 | 0 |
|
222 | 0 | match prev { |
223 | | USER_STATE_EMPTY => { |
224 | 0 | self.0.ping_task.wake(); |
225 | 0 | Ok(()) |
226 | | } |
227 | 0 | USER_STATE_CLOSED => Err(Some(broken_pipe().into())), |
228 | | _ => { |
229 | | // Was already pending, user error! |
230 | 0 | Err(None) |
231 | | } |
232 | | } |
233 | 0 | } |
234 | | |
235 | 0 | pub(crate) fn poll_pong(&self, cx: &mut Context) -> Poll<Result<(), proto::Error>> { |
236 | 0 | // Must register before checking state, in case state were to change |
237 | 0 | // before we could register, and then the ping would just be lost. |
238 | 0 | self.0.pong_task.register(cx.waker()); |
239 | 0 | let prev = self |
240 | 0 | .0 |
241 | 0 | .state |
242 | 0 | .compare_exchange( |
243 | 0 | USER_STATE_RECEIVED_PONG, // current |
244 | 0 | USER_STATE_EMPTY, // new |
245 | 0 | Ordering::AcqRel, |
246 | 0 | Ordering::Acquire, |
247 | 0 | ) |
248 | 0 | .unwrap_or_else(|v| v); |
249 | 0 |
|
250 | 0 | match prev { |
251 | 0 | USER_STATE_RECEIVED_PONG => Poll::Ready(Ok(())), |
252 | 0 | USER_STATE_CLOSED => Poll::Ready(Err(broken_pipe().into())), |
253 | 0 | _ => Poll::Pending, |
254 | | } |
255 | 0 | } |
256 | | } |
257 | | |
258 | | // ===== impl UserPingsRx ===== |
259 | | |
260 | | impl UserPingsRx { |
261 | 0 | fn receive_pong(&self) -> bool { |
262 | 0 | let prev = self |
263 | 0 | .0 |
264 | 0 | .state |
265 | 0 | .compare_exchange( |
266 | 0 | USER_STATE_PENDING_PONG, // current |
267 | 0 | USER_STATE_RECEIVED_PONG, // new |
268 | 0 | Ordering::AcqRel, |
269 | 0 | Ordering::Acquire, |
270 | 0 | ) |
271 | 0 | .unwrap_or_else(|v| v); |
272 | 0 |
|
273 | 0 | if prev == USER_STATE_PENDING_PONG { |
274 | 0 | self.0.pong_task.wake(); |
275 | 0 | true |
276 | | } else { |
277 | 0 | false |
278 | | } |
279 | 0 | } |
280 | | } |
281 | | |
282 | | impl Drop for UserPingsRx { |
283 | 0 | fn drop(&mut self) { |
284 | 0 | self.0.state.store(USER_STATE_CLOSED, Ordering::Release); |
285 | 0 | self.0.pong_task.wake(); |
286 | 0 | } |
287 | | } |
288 | | |
/// Builds an `io::Error` of kind `BrokenPipe`.
fn broken_pipe() -> io::Error {
    io::Error::from(io::ErrorKind::BrokenPipe)
}