Coverage Report

Created: 2025-07-18 06:42

/src/h2/tests/h2-support/src/mock.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::SendFrame;
2
3
use h2::frame::{self, Frame};
4
use h2::proto::Error;
5
use h2::SendError;
6
7
use futures::future::poll_fn;
8
use futures::{ready, Stream, StreamExt};
9
10
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
11
12
use super::assert::assert_frame_eq;
13
use std::pin::Pin;
14
use std::sync::{Arc, Mutex};
15
use std::task::{Context, Poll, Waker};
16
use std::time::Duration;
17
use std::{cmp, io, usize};
18
19
/// A mock I/O
20
#[derive(Debug)]
21
pub struct Mock {
22
    pipe: Pipe,
23
}
24
25
#[derive(Debug)]
26
pub struct Handle {
27
    codec: crate::Codec<Pipe>,
28
}
29
30
#[derive(Debug)]
31
pub struct Pipe {
32
    inner: Arc<Mutex<Inner>>,
33
}
34
35
/// Buffers, wakers and flags that make up the state of a [`Pipe`].
#[derive(Debug)]
struct Inner {
    /// Bytes queued by the test side (`Pipe::poll_write`) for the `h2` library
    /// to consume through `Mock::poll_read`.
    rx: Vec<u8>,

    /// Waker of the reader parked until `rx` has data.
    rx_task: Option<Waker>,

    /// Bytes produced by the `h2` library (`Mock::poll_write`) for the test
    /// case to consume through `Pipe::poll_read`.
    tx: Vec<u8>,

    /// Waker of the test-side task waiting for something to land in `tx`.
    tx_task: Option<Waker>,

    /// Remaining write budget: once it reaches zero, `Mock::poll_write`
    /// returns `Poll::Pending`.
    tx_rem: usize,

    /// Waker of the writer parked until write budget is available again.
    tx_rem_task: Option<Waker>,

    /// Set once either end of the pipe has been dropped.
    closed: bool,

    /// When set, `Mock::poll_read` fails with an `UnexpectedEof` error.
    unexpected_eof: bool,
}
61
62
/// Byte sequence used as the client preface: written verbatim by
/// `Handle::write_preface` and expected verbatim by `Handle::read_preface`.
const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
63
64
/// Create a new mock and handle
65
467
pub fn new() -> (Mock, Handle) {
66
467
    new_with_write_capacity(usize::MAX)
67
467
}
68
69
/// Create a new mock and handle allowing up to `cap` bytes to be written.
70
467
pub fn new_with_write_capacity(cap: usize) -> (Mock, Handle) {
71
467
    let inner = Arc::new(Mutex::new(Inner {
72
467
        rx: vec![],
73
467
        rx_task: None,
74
467
        tx: vec![],
75
467
        tx_task: None,
76
467
        tx_rem: cap,
77
467
        tx_rem_task: None,
78
467
        closed: false,
79
467
        unexpected_eof: false,
80
467
    }));
81
467
82
467
    let mock = Mock {
83
467
        pipe: Pipe {
84
467
            inner: inner.clone(),
85
467
        },
86
467
    };
87
467
88
467
    let handle = Handle {
89
467
        codec: h2::Codec::new(Pipe { inner }),
90
467
    };
91
467
92
467
    (mock, handle)
93
467
}
94
95
// ===== impl Handle =====
96
97
impl Handle {
98
    /// Get a mutable reference to inner Codec.
99
0
    pub fn codec_mut(&mut self) -> &mut crate::Codec<Pipe> {
100
0
        &mut self.codec
101
0
    }
102
103
0
    pub fn close_without_notify(&mut self) {
104
0
        let mut me = self.codec.get_mut().inner.lock().unwrap();
105
0
        me.unexpected_eof = true;
106
0
    }
107
108
    /// Send a frame
109
0
    pub async fn send(&mut self, item: SendFrame) -> Result<(), SendError> {
110
0
        // Queue the frame
111
0
        self.codec.buffer(item).unwrap();
112
0
113
0
        // Flush the frame
114
0
        poll_fn(|cx| {
115
0
            let p = self.codec.flush(cx);
116
0
            assert!(p.is_ready());
117
0
            p
118
0
        })
119
0
        .await?;
120
0
        Ok(())
121
0
    }
122
123
    /// Writes the client preface
124
0
    pub async fn write_preface(&mut self) {
125
0
        self.codec.get_mut().write_all(PREFACE).await.unwrap();
126
0
    }
127
128
    /// Read the client preface
129
0
    pub async fn read_preface(&mut self) -> io::Result<()> {
130
0
        let mut buf = vec![0u8; PREFACE.len()];
131
0
        self.read_exact(&mut buf).await?;
132
0
        assert_eq!(buf, PREFACE);
133
0
        Ok(())
134
0
    }
135
136
0
    pub async fn recv_frame<F: Into<Frame>>(&mut self, expected: F) {
137
0
        let frame = self.next().await.unwrap().unwrap();
138
0
        assert_frame_eq(frame, expected);
139
0
    }
140
141
0
    pub async fn send_frame<F: Into<SendFrame>>(&mut self, frame: F) {
142
0
        self.send(frame.into()).await.unwrap();
143
0
    }
144
145
0
    pub async fn recv_eof(&mut self) {
146
0
        let frame = self.next().await;
147
0
        assert!(frame.is_none());
148
0
    }
149
150
0
    pub async fn send_bytes(&mut self, data: &[u8]) {
151
        use bytes::Buf;
152
        use std::io::Cursor;
153
154
0
        let buf: Vec<_> = data.into();
155
0
        let mut buf = Cursor::new(buf);
156
0
157
0
        poll_fn(move |cx| {
158
0
            while buf.has_remaining() {
159
0
                let res = Pin::new(self.codec.get_mut())
160
0
                    .poll_write(cx, buf.chunk())
161
0
                    .map_err(|e| panic!("write err={:?}", e));
162
163
0
                let n = ready!(res).unwrap();
164
0
                buf.advance(n);
165
            }
166
167
0
            Poll::Ready(())
168
0
        })
169
0
        .await;
170
0
    }
171
172
    /// Perform the H2 handshake
173
0
    pub async fn assert_client_handshake(&mut self) -> frame::Settings {
174
0
        self.assert_client_handshake_with_settings(frame::Settings::default())
175
0
            .await
176
0
    }
177
178
    /// Perform the H2 handshake
179
0
    pub async fn assert_client_handshake_with_settings<T>(&mut self, settings: T) -> frame::Settings
180
0
    where
181
0
        T: Into<frame::Settings>,
182
0
    {
183
0
        let settings = settings.into();
184
0
        // Send a settings frame
185
0
        self.send(settings.into()).await.unwrap();
186
0
        self.read_preface().await.unwrap();
187
188
0
        let settings = match self.next().await {
189
0
            Some(frame) => match frame.unwrap() {
190
0
                Frame::Settings(settings) => {
191
0
                    // Send the ACK
192
0
                    let ack = frame::Settings::ack();
193
0
194
0
                    // TODO: Don't unwrap?
195
0
                    self.send(ack.into()).await.unwrap();
196
0
197
0
                    settings
198
                }
199
0
                frame => {
200
0
                    panic!("unexpected frame; frame={:?}", frame);
201
                }
202
            },
203
            None => {
204
0
                panic!("unexpected EOF");
205
            }
206
        };
207
208
0
        let frame = self.next().await.unwrap().unwrap();
209
0
        let f = assert_settings!(frame);
210
211
        // Is ACK
212
0
        assert!(f.is_ack());
213
214
0
        settings
215
0
    }
216
217
    /// Perform the H2 handshake
218
0
    pub async fn assert_server_handshake(&mut self) -> frame::Settings {
219
0
        self.assert_server_handshake_with_settings(frame::Settings::default())
220
0
            .await
221
0
    }
222
223
    /// Perform the H2 handshake
224
0
    pub async fn assert_server_handshake_with_settings<T>(&mut self, settings: T) -> frame::Settings
225
0
    where
226
0
        T: Into<frame::Settings>,
227
0
    {
228
0
        self.write_preface().await;
229
230
0
        let settings = settings.into();
231
0
        self.send(settings.into()).await.unwrap();
232
233
0
        let frame = self.next().await.expect("unexpected EOF").unwrap();
234
0
        let settings = assert_settings!(frame);
235
236
        // Send the ACK
237
0
        let ack = frame::Settings::ack();
238
0
239
0
        // TODO: Don't unwrap?
240
0
        self.send(ack.into()).await.unwrap();
241
242
0
        let frame = self.next().await;
243
0
        let f = assert_settings!(frame.unwrap().unwrap());
244
245
        // Is ACK
246
0
        assert!(f.is_ack());
247
248
0
        settings
249
0
    }
250
251
0
    pub async fn ping_pong(&mut self, payload: [u8; 8]) {
252
0
        self.send_frame(crate::frames::ping(payload)).await;
253
0
        self.recv_frame(crate::frames::ping(payload).pong()).await;
254
0
    }
255
256
0
    pub async fn buffer_bytes(&mut self, num: usize) {
257
0
        // Set tx_rem to num
258
0
        {
259
0
            let mut i = self.codec.get_mut().inner.lock().unwrap();
260
0
            i.tx_rem = num;
261
0
        }
262
0
263
0
        poll_fn(move |cx| {
264
0
            {
265
0
                let mut inner = self.codec.get_mut().inner.lock().unwrap();
266
0
                if inner.tx_rem == 0 {
267
0
                    inner.tx_rem = usize::MAX;
268
0
                } else {
269
0
                    inner.tx_task = Some(cx.waker().clone());
270
0
                    return Poll::Pending;
271
                }
272
            }
273
274
0
            Poll::Ready(())
275
0
        })
276
0
        .await;
277
0
    }
278
279
0
    pub async fn unbounded_bytes(&mut self) {
280
0
        let mut i = self.codec.get_mut().inner.lock().unwrap();
281
0
        i.tx_rem = usize::MAX;
282
283
0
        if let Some(task) = i.tx_rem_task.take() {
284
0
            task.wake();
285
0
        }
286
0
    }
287
}
288
289
impl Stream for Handle {
290
    type Item = Result<Frame, Error>;
291
292
0
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
293
0
        Pin::new(&mut self.codec).poll_next(cx)
294
0
    }
295
}
296
297
impl AsyncRead for Handle {
298
0
    fn poll_read(
299
0
        mut self: Pin<&mut Self>,
300
0
        cx: &mut Context<'_>,
301
0
        buf: &mut ReadBuf,
302
0
    ) -> Poll<io::Result<()>> {
303
0
        Pin::new(self.codec.get_mut()).poll_read(cx, buf)
304
0
    }
305
}
306
307
impl AsyncWrite for Handle {
308
0
    fn poll_write(
309
0
        mut self: Pin<&mut Self>,
310
0
        cx: &mut Context<'_>,
311
0
        buf: &[u8],
312
0
    ) -> Poll<Result<usize, io::Error>> {
313
0
        Pin::new(self.codec.get_mut()).poll_write(cx, buf)
314
0
    }
315
316
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
317
0
        Pin::new(self.codec.get_mut()).poll_flush(cx)
318
0
    }
319
320
0
    fn poll_shutdown(
321
0
        mut self: Pin<&mut Self>,
322
0
        cx: &mut Context<'_>,
323
0
    ) -> Poll<Result<(), io::Error>> {
324
0
        Pin::new(self.codec.get_mut()).poll_shutdown(cx)
325
0
    }
326
}
327
328
impl Drop for Handle {
329
467
    fn drop(&mut self) {
330
467
        // Shutdown *shouldn't* need a real Waker...
331
467
        let waker = futures::task::noop_waker();
332
467
        let mut cx = Context::from_waker(&waker);
333
467
        assert!(self.codec.shutdown(&mut cx).is_ready());
334
335
467
        if let Ok(mut me) = self.codec.get_mut().inner.lock() {
336
467
            me.closed = true;
337
338
467
            if let Some(task) = me.rx_task.take() {
339
0
                task.wake();
340
467
            }
341
0
        }
342
467
    }
343
}
344
345
// ===== impl Mock =====
346
347
impl AsyncRead for Mock {
348
0
    fn poll_read(
349
0
        self: Pin<&mut Self>,
350
0
        cx: &mut Context<'_>,
351
0
        buf: &mut ReadBuf,
352
0
    ) -> Poll<io::Result<()>> {
353
0
        assert!(
354
0
            buf.remaining() > 0,
355
0
            "attempted read with zero length buffer... wut?"
356
0
        );
357
358
0
        let mut me = self.pipe.inner.lock().unwrap();
359
0
360
0
        if me.unexpected_eof {
361
0
            return Poll::Ready(Err(io::Error::new(
362
0
                io::ErrorKind::UnexpectedEof,
363
0
                "Simulate an unexpected eof error",
364
0
            )));
365
0
        }
366
0
367
0
        if me.rx.is_empty() {
368
0
            if me.closed {
369
0
                return Poll::Ready(Ok(()));
370
0
            }
371
0
372
0
            me.rx_task = Some(cx.waker().clone());
373
0
            return Poll::Pending;
374
0
        }
375
0
376
0
        let n = cmp::min(buf.remaining(), me.rx.len());
377
0
        buf.put_slice(&me.rx[..n]);
378
0
        me.rx.drain(..n);
379
0
380
0
        Poll::Ready(Ok(()))
381
0
    }
382
}
383
384
impl AsyncWrite for Mock {
385
467
    fn poll_write(
386
467
        self: Pin<&mut Self>,
387
467
        cx: &mut Context<'_>,
388
467
        mut buf: &[u8],
389
467
    ) -> Poll<Result<usize, io::Error>> {
390
467
        let mut me = self.pipe.inner.lock().unwrap();
391
467
392
467
        if me.closed {
393
0
            return Poll::Ready(Ok(buf.len()));
394
467
        }
395
467
396
467
        if me.tx_rem == 0 {
397
0
            me.tx_rem_task = Some(cx.waker().clone());
398
0
            return Poll::Pending;
399
467
        }
400
467
401
467
        if buf.len() > me.tx_rem {
402
0
            buf = &buf[..me.tx_rem];
403
467
        }
404
405
467
        me.tx.extend(buf);
406
467
        me.tx_rem -= buf.len();
407
408
467
        if let Some(task) = me.tx_task.take() {
409
0
            task.wake();
410
467
        }
411
412
467
        Poll::Ready(Ok(buf.len()))
413
467
    }
414
415
0
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
416
0
        Poll::Ready(Ok(()))
417
0
    }
418
419
0
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
420
0
        Poll::Ready(Ok(()))
421
0
    }
422
}
423
424
impl Drop for Mock {
425
467
    fn drop(&mut self) {
426
467
        let mut me = self.pipe.inner.lock().unwrap();
427
467
        me.closed = true;
428
429
467
        if let Some(task) = me.tx_task.take() {
430
0
            task.wake();
431
467
        }
432
467
    }
433
}
434
435
// ===== impl Pipe =====
436
437
impl AsyncRead for Pipe {
438
0
    fn poll_read(
439
0
        self: Pin<&mut Self>,
440
0
        cx: &mut Context<'_>,
441
0
        buf: &mut ReadBuf,
442
0
    ) -> Poll<io::Result<()>> {
443
0
        assert!(
444
0
            buf.remaining() > 0,
445
0
            "attempted read with zero length buffer... wut?"
446
0
        );
447
448
0
        let mut me = self.inner.lock().unwrap();
449
0
450
0
        if me.tx.is_empty() {
451
0
            if me.closed {
452
0
                return Poll::Ready(Ok(()));
453
0
            }
454
0
455
0
            me.tx_task = Some(cx.waker().clone());
456
0
            return Poll::Pending;
457
0
        }
458
0
459
0
        let n = cmp::min(buf.remaining(), me.tx.len());
460
0
        buf.put_slice(&me.tx[..n]);
461
0
        me.tx.drain(..n);
462
0
463
0
        Poll::Ready(Ok(()))
464
0
    }
465
}
466
467
impl AsyncWrite for Pipe {
468
0
    fn poll_write(
469
0
        self: Pin<&mut Self>,
470
0
        _cx: &mut Context<'_>,
471
0
        buf: &[u8],
472
0
    ) -> Poll<Result<usize, io::Error>> {
473
0
        let mut me = self.inner.lock().unwrap();
474
0
        me.rx.extend(buf);
475
476
0
        if let Some(task) = me.rx_task.take() {
477
0
            task.wake();
478
0
        }
479
480
0
        Poll::Ready(Ok(buf.len()))
481
0
    }
482
483
467
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
484
467
        Poll::Ready(Ok(()))
485
467
    }
486
487
467
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
488
467
        Poll::Ready(Ok(()))
489
467
    }
490
}
491
492
0
pub async fn idle_ms(ms: u64) {
493
0
    tokio::time::sleep(Duration::from_millis(ms)).await
494
0
}