/src/h2/tests/h2-support/src/mock.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use crate::SendFrame; |
2 | | |
3 | | use h2::frame::{self, Frame}; |
4 | | use h2::proto::Error; |
5 | | use h2::SendError; |
6 | | |
7 | | use futures::future::poll_fn; |
8 | | use futures::{ready, Stream, StreamExt}; |
9 | | |
10 | | use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; |
11 | | |
12 | | use super::assert::assert_frame_eq; |
13 | | use std::pin::Pin; |
14 | | use std::sync::{Arc, Mutex}; |
15 | | use std::task::{Context, Poll, Waker}; |
16 | | use std::time::Duration; |
17 | | use std::{cmp, io, usize}; |
18 | | |
19 | | /// A mock I/O |
20 | | #[derive(Debug)] |
21 | | pub struct Mock { |
22 | | pipe: Pipe, |
23 | | } |
24 | | |
25 | | #[derive(Debug)] |
26 | | pub struct Handle { |
27 | | codec: crate::Codec<Pipe>, |
28 | | } |
29 | | |
30 | | #[derive(Debug)] |
31 | | pub struct Pipe { |
32 | | inner: Arc<Mutex<Inner>>, |
33 | | } |
34 | | |
35 | | #[derive(Debug)] |
36 | | struct Inner { |
37 | | /// Data written by the test case to the h2 lib. |
38 | | rx: Vec<u8>, |
39 | | |
40 | | /// Notify when data is ready to be received. |
41 | | rx_task: Option<Waker>, |
42 | | |
43 | | /// Data written by the `h2` library to be read by the test case. |
44 | | tx: Vec<u8>, |
45 | | |
46 | | /// Notify when data is written. This notifies the test case waiters. |
47 | | tx_task: Option<Waker>, |
48 | | |
49 | | /// Number of bytes that can be written before `write` returns `Poll::Pending`. |
50 | | tx_rem: usize, |
51 | | |
52 | | /// Task to notify when write capacity becomes available. |
53 | | tx_rem_task: Option<Waker>, |
54 | | |
55 | | /// True when the pipe is closed. |
56 | | closed: bool, |
57 | | |
58 | | /// Trigger an `UnexpectedEof` error on read |
59 | | unexpected_eof: bool, |
60 | | } |
61 | | |
62 | | const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; |
63 | | |
64 | | /// Create a new mock and handle |
65 | 467 | pub fn new() -> (Mock, Handle) { |
66 | 467 | new_with_write_capacity(usize::MAX) |
67 | 467 | } |
68 | | |
69 | | /// Create a new mock and handle allowing up to `cap` bytes to be written. |
70 | 467 | pub fn new_with_write_capacity(cap: usize) -> (Mock, Handle) { |
71 | 467 | let inner = Arc::new(Mutex::new(Inner { |
72 | 467 | rx: vec![], |
73 | 467 | rx_task: None, |
74 | 467 | tx: vec![], |
75 | 467 | tx_task: None, |
76 | 467 | tx_rem: cap, |
77 | 467 | tx_rem_task: None, |
78 | 467 | closed: false, |
79 | 467 | unexpected_eof: false, |
80 | 467 | })); |
81 | 467 | |
82 | 467 | let mock = Mock { |
83 | 467 | pipe: Pipe { |
84 | 467 | inner: inner.clone(), |
85 | 467 | }, |
86 | 467 | }; |
87 | 467 | |
88 | 467 | let handle = Handle { |
89 | 467 | codec: h2::Codec::new(Pipe { inner }), |
90 | 467 | }; |
91 | 467 | |
92 | 467 | (mock, handle) |
93 | 467 | } |
94 | | |
95 | | // ===== impl Handle ===== |
96 | | |
97 | | impl Handle { |
98 | | /// Get a mutable reference to inner Codec. |
99 | 0 | pub fn codec_mut(&mut self) -> &mut crate::Codec<Pipe> { |
100 | 0 | &mut self.codec |
101 | 0 | } |
102 | | |
103 | 0 | pub fn close_without_notify(&mut self) { |
104 | 0 | let mut me = self.codec.get_mut().inner.lock().unwrap(); |
105 | 0 | me.unexpected_eof = true; |
106 | 0 | } |
107 | | |
108 | | /// Send a frame |
109 | 0 | pub async fn send(&mut self, item: SendFrame) -> Result<(), SendError> { |
110 | 0 | // Queue the frame |
111 | 0 | self.codec.buffer(item).unwrap(); |
112 | 0 |
|
113 | 0 | // Flush the frame |
114 | 0 | poll_fn(|cx| { |
115 | 0 | let p = self.codec.flush(cx); |
116 | 0 | assert!(p.is_ready()); |
117 | 0 | p |
118 | 0 | }) |
119 | 0 | .await?; |
120 | 0 | Ok(()) |
121 | 0 | } |
122 | | |
123 | | /// Writes the client preface |
124 | 0 | pub async fn write_preface(&mut self) { |
125 | 0 | self.codec.get_mut().write_all(PREFACE).await.unwrap(); |
126 | 0 | } |
127 | | |
128 | | /// Read the client preface |
129 | 0 | pub async fn read_preface(&mut self) -> io::Result<()> { |
130 | 0 | let mut buf = vec![0u8; PREFACE.len()]; |
131 | 0 | self.read_exact(&mut buf).await?; |
132 | 0 | assert_eq!(buf, PREFACE); |
133 | 0 | Ok(()) |
134 | 0 | } |
135 | | |
136 | 0 | pub async fn recv_frame<F: Into<Frame>>(&mut self, expected: F) { |
137 | 0 | let frame = self.next().await.unwrap().unwrap(); |
138 | 0 | assert_frame_eq(frame, expected); |
139 | 0 | } |
140 | | |
141 | 0 | pub async fn send_frame<F: Into<SendFrame>>(&mut self, frame: F) { |
142 | 0 | self.send(frame.into()).await.unwrap(); |
143 | 0 | } |
144 | | |
145 | 0 | pub async fn recv_eof(&mut self) { |
146 | 0 | let frame = self.next().await; |
147 | 0 | assert!(frame.is_none()); |
148 | 0 | } |
149 | | |
150 | 0 | pub async fn send_bytes(&mut self, data: &[u8]) { |
151 | | use bytes::Buf; |
152 | | use std::io::Cursor; |
153 | | |
154 | 0 | let buf: Vec<_> = data.into(); |
155 | 0 | let mut buf = Cursor::new(buf); |
156 | 0 |
|
157 | 0 | poll_fn(move |cx| { |
158 | 0 | while buf.has_remaining() { |
159 | 0 | let res = Pin::new(self.codec.get_mut()) |
160 | 0 | .poll_write(cx, buf.chunk()) |
161 | 0 | .map_err(|e| panic!("write err={:?}", e)); |
162 | | |
163 | 0 | let n = ready!(res).unwrap(); |
164 | 0 | buf.advance(n); |
165 | | } |
166 | | |
167 | 0 | Poll::Ready(()) |
168 | 0 | }) |
169 | 0 | .await; |
170 | 0 | } |
171 | | |
172 | | /// Perform the H2 handshake |
173 | 0 | pub async fn assert_client_handshake(&mut self) -> frame::Settings { |
174 | 0 | self.assert_client_handshake_with_settings(frame::Settings::default()) |
175 | 0 | .await |
176 | 0 | } |
177 | | |
178 | | /// Perform the H2 handshake |
179 | 0 | pub async fn assert_client_handshake_with_settings<T>(&mut self, settings: T) -> frame::Settings |
180 | 0 | where |
181 | 0 | T: Into<frame::Settings>, |
182 | 0 | { |
183 | 0 | let settings = settings.into(); |
184 | 0 | // Send a settings frame |
185 | 0 | self.send(settings.into()).await.unwrap(); |
186 | 0 | self.read_preface().await.unwrap(); |
187 | | |
188 | 0 | let settings = match self.next().await { |
189 | 0 | Some(frame) => match frame.unwrap() { |
190 | 0 | Frame::Settings(settings) => { |
191 | 0 | // Send the ACK |
192 | 0 | let ack = frame::Settings::ack(); |
193 | 0 |
|
194 | 0 | // TODO: Don't unwrap? |
195 | 0 | self.send(ack.into()).await.unwrap(); |
196 | 0 |
|
197 | 0 | settings |
198 | | } |
199 | 0 | frame => { |
200 | 0 | panic!("unexpected frame; frame={:?}", frame); |
201 | | } |
202 | | }, |
203 | | None => { |
204 | 0 | panic!("unexpected EOF"); |
205 | | } |
206 | | }; |
207 | | |
208 | 0 | let frame = self.next().await.unwrap().unwrap(); |
209 | 0 | let f = assert_settings!(frame); |
210 | | |
211 | | // Is ACK |
212 | 0 | assert!(f.is_ack()); |
213 | | |
214 | 0 | settings |
215 | 0 | } |
216 | | |
217 | | /// Perform the H2 handshake |
218 | 0 | pub async fn assert_server_handshake(&mut self) -> frame::Settings { |
219 | 0 | self.assert_server_handshake_with_settings(frame::Settings::default()) |
220 | 0 | .await |
221 | 0 | } |
222 | | |
223 | | /// Perform the H2 handshake |
224 | 0 | pub async fn assert_server_handshake_with_settings<T>(&mut self, settings: T) -> frame::Settings |
225 | 0 | where |
226 | 0 | T: Into<frame::Settings>, |
227 | 0 | { |
228 | 0 | self.write_preface().await; |
229 | | |
230 | 0 | let settings = settings.into(); |
231 | 0 | self.send(settings.into()).await.unwrap(); |
232 | | |
233 | 0 | let frame = self.next().await.expect("unexpected EOF").unwrap(); |
234 | 0 | let settings = assert_settings!(frame); |
235 | | |
236 | | // Send the ACK |
237 | 0 | let ack = frame::Settings::ack(); |
238 | 0 |
|
239 | 0 | // TODO: Don't unwrap? |
240 | 0 | self.send(ack.into()).await.unwrap(); |
241 | | |
242 | 0 | let frame = self.next().await; |
243 | 0 | let f = assert_settings!(frame.unwrap().unwrap()); |
244 | | |
245 | | // Is ACK |
246 | 0 | assert!(f.is_ack()); |
247 | | |
248 | 0 | settings |
249 | 0 | } |
250 | | |
251 | 0 | pub async fn ping_pong(&mut self, payload: [u8; 8]) { |
252 | 0 | self.send_frame(crate::frames::ping(payload)).await; |
253 | 0 | self.recv_frame(crate::frames::ping(payload).pong()).await; |
254 | 0 | } |
255 | | |
256 | 0 | pub async fn buffer_bytes(&mut self, num: usize) { |
257 | 0 | // Set tx_rem to num |
258 | 0 | { |
259 | 0 | let mut i = self.codec.get_mut().inner.lock().unwrap(); |
260 | 0 | i.tx_rem = num; |
261 | 0 | } |
262 | 0 |
|
263 | 0 | poll_fn(move |cx| { |
264 | 0 | { |
265 | 0 | let mut inner = self.codec.get_mut().inner.lock().unwrap(); |
266 | 0 | if inner.tx_rem == 0 { |
267 | 0 | inner.tx_rem = usize::MAX; |
268 | 0 | } else { |
269 | 0 | inner.tx_task = Some(cx.waker().clone()); |
270 | 0 | return Poll::Pending; |
271 | | } |
272 | | } |
273 | | |
274 | 0 | Poll::Ready(()) |
275 | 0 | }) |
276 | 0 | .await; |
277 | 0 | } |
278 | | |
279 | 0 | pub async fn unbounded_bytes(&mut self) { |
280 | 0 | let mut i = self.codec.get_mut().inner.lock().unwrap(); |
281 | 0 | i.tx_rem = usize::MAX; |
282 | | |
283 | 0 | if let Some(task) = i.tx_rem_task.take() { |
284 | 0 | task.wake(); |
285 | 0 | } |
286 | 0 | } |
287 | | } |
288 | | |
289 | | impl Stream for Handle { |
290 | | type Item = Result<Frame, Error>; |
291 | | |
292 | 0 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
293 | 0 | Pin::new(&mut self.codec).poll_next(cx) |
294 | 0 | } |
295 | | } |
296 | | |
297 | | impl AsyncRead for Handle { |
298 | 0 | fn poll_read( |
299 | 0 | mut self: Pin<&mut Self>, |
300 | 0 | cx: &mut Context<'_>, |
301 | 0 | buf: &mut ReadBuf, |
302 | 0 | ) -> Poll<io::Result<()>> { |
303 | 0 | Pin::new(self.codec.get_mut()).poll_read(cx, buf) |
304 | 0 | } |
305 | | } |
306 | | |
307 | | impl AsyncWrite for Handle { |
308 | 0 | fn poll_write( |
309 | 0 | mut self: Pin<&mut Self>, |
310 | 0 | cx: &mut Context<'_>, |
311 | 0 | buf: &[u8], |
312 | 0 | ) -> Poll<Result<usize, io::Error>> { |
313 | 0 | Pin::new(self.codec.get_mut()).poll_write(cx, buf) |
314 | 0 | } |
315 | | |
316 | 0 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
317 | 0 | Pin::new(self.codec.get_mut()).poll_flush(cx) |
318 | 0 | } |
319 | | |
320 | 0 | fn poll_shutdown( |
321 | 0 | mut self: Pin<&mut Self>, |
322 | 0 | cx: &mut Context<'_>, |
323 | 0 | ) -> Poll<Result<(), io::Error>> { |
324 | 0 | Pin::new(self.codec.get_mut()).poll_shutdown(cx) |
325 | 0 | } |
326 | | } |
327 | | |
328 | | impl Drop for Handle { |
329 | 467 | fn drop(&mut self) { |
330 | 467 | // Shutdown *shouldn't* need a real Waker... |
331 | 467 | let waker = futures::task::noop_waker(); |
332 | 467 | let mut cx = Context::from_waker(&waker); |
333 | 467 | assert!(self.codec.shutdown(&mut cx).is_ready()); |
334 | | |
335 | 467 | if let Ok(mut me) = self.codec.get_mut().inner.lock() { |
336 | 467 | me.closed = true; |
337 | | |
338 | 467 | if let Some(task) = me.rx_task.take() { |
339 | 0 | task.wake(); |
340 | 467 | } |
341 | 0 | } |
342 | 467 | } |
343 | | } |
344 | | |
345 | | // ===== impl Mock ===== |
346 | | |
347 | | impl AsyncRead for Mock { |
348 | 0 | fn poll_read( |
349 | 0 | self: Pin<&mut Self>, |
350 | 0 | cx: &mut Context<'_>, |
351 | 0 | buf: &mut ReadBuf, |
352 | 0 | ) -> Poll<io::Result<()>> { |
353 | 0 | assert!( |
354 | 0 | buf.remaining() > 0, |
355 | 0 | "attempted read with zero length buffer... wut?" |
356 | 0 | ); |
357 | | |
358 | 0 | let mut me = self.pipe.inner.lock().unwrap(); |
359 | 0 |
|
360 | 0 | if me.unexpected_eof { |
361 | 0 | return Poll::Ready(Err(io::Error::new( |
362 | 0 | io::ErrorKind::UnexpectedEof, |
363 | 0 | "Simulate an unexpected eof error", |
364 | 0 | ))); |
365 | 0 | } |
366 | 0 |
|
367 | 0 | if me.rx.is_empty() { |
368 | 0 | if me.closed { |
369 | 0 | return Poll::Ready(Ok(())); |
370 | 0 | } |
371 | 0 |
|
372 | 0 | me.rx_task = Some(cx.waker().clone()); |
373 | 0 | return Poll::Pending; |
374 | 0 | } |
375 | 0 |
|
376 | 0 | let n = cmp::min(buf.remaining(), me.rx.len()); |
377 | 0 | buf.put_slice(&me.rx[..n]); |
378 | 0 | me.rx.drain(..n); |
379 | 0 |
|
380 | 0 | Poll::Ready(Ok(())) |
381 | 0 | } |
382 | | } |
383 | | |
384 | | impl AsyncWrite for Mock { |
385 | 467 | fn poll_write( |
386 | 467 | self: Pin<&mut Self>, |
387 | 467 | cx: &mut Context<'_>, |
388 | 467 | mut buf: &[u8], |
389 | 467 | ) -> Poll<Result<usize, io::Error>> { |
390 | 467 | let mut me = self.pipe.inner.lock().unwrap(); |
391 | 467 | |
392 | 467 | if me.closed { |
393 | 0 | return Poll::Ready(Ok(buf.len())); |
394 | 467 | } |
395 | 467 | |
396 | 467 | if me.tx_rem == 0 { |
397 | 0 | me.tx_rem_task = Some(cx.waker().clone()); |
398 | 0 | return Poll::Pending; |
399 | 467 | } |
400 | 467 | |
401 | 467 | if buf.len() > me.tx_rem { |
402 | 0 | buf = &buf[..me.tx_rem]; |
403 | 467 | } |
404 | | |
405 | 467 | me.tx.extend(buf); |
406 | 467 | me.tx_rem -= buf.len(); |
407 | | |
408 | 467 | if let Some(task) = me.tx_task.take() { |
409 | 0 | task.wake(); |
410 | 467 | } |
411 | | |
412 | 467 | Poll::Ready(Ok(buf.len())) |
413 | 467 | } |
414 | | |
415 | 0 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
416 | 0 | Poll::Ready(Ok(())) |
417 | 0 | } |
418 | | |
419 | 0 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
420 | 0 | Poll::Ready(Ok(())) |
421 | 0 | } |
422 | | } |
423 | | |
424 | | impl Drop for Mock { |
425 | 467 | fn drop(&mut self) { |
426 | 467 | let mut me = self.pipe.inner.lock().unwrap(); |
427 | 467 | me.closed = true; |
428 | | |
429 | 467 | if let Some(task) = me.tx_task.take() { |
430 | 0 | task.wake(); |
431 | 467 | } |
432 | 467 | } |
433 | | } |
434 | | |
435 | | // ===== impl Pipe ===== |
436 | | |
437 | | impl AsyncRead for Pipe { |
438 | 0 | fn poll_read( |
439 | 0 | self: Pin<&mut Self>, |
440 | 0 | cx: &mut Context<'_>, |
441 | 0 | buf: &mut ReadBuf, |
442 | 0 | ) -> Poll<io::Result<()>> { |
443 | 0 | assert!( |
444 | 0 | buf.remaining() > 0, |
445 | 0 | "attempted read with zero length buffer... wut?" |
446 | 0 | ); |
447 | | |
448 | 0 | let mut me = self.inner.lock().unwrap(); |
449 | 0 |
|
450 | 0 | if me.tx.is_empty() { |
451 | 0 | if me.closed { |
452 | 0 | return Poll::Ready(Ok(())); |
453 | 0 | } |
454 | 0 |
|
455 | 0 | me.tx_task = Some(cx.waker().clone()); |
456 | 0 | return Poll::Pending; |
457 | 0 | } |
458 | 0 |
|
459 | 0 | let n = cmp::min(buf.remaining(), me.tx.len()); |
460 | 0 | buf.put_slice(&me.tx[..n]); |
461 | 0 | me.tx.drain(..n); |
462 | 0 |
|
463 | 0 | Poll::Ready(Ok(())) |
464 | 0 | } |
465 | | } |
466 | | |
467 | | impl AsyncWrite for Pipe { |
468 | 0 | fn poll_write( |
469 | 0 | self: Pin<&mut Self>, |
470 | 0 | _cx: &mut Context<'_>, |
471 | 0 | buf: &[u8], |
472 | 0 | ) -> Poll<Result<usize, io::Error>> { |
473 | 0 | let mut me = self.inner.lock().unwrap(); |
474 | 0 | me.rx.extend(buf); |
475 | | |
476 | 0 | if let Some(task) = me.rx_task.take() { |
477 | 0 | task.wake(); |
478 | 0 | } |
479 | | |
480 | 0 | Poll::Ready(Ok(buf.len())) |
481 | 0 | } |
482 | | |
483 | 467 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
484 | 467 | Poll::Ready(Ok(())) |
485 | 467 | } |
486 | | |
487 | 467 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
488 | 467 | Poll::Ready(Ok(())) |
489 | 467 | } |
490 | | } |
491 | | |
492 | 0 | pub async fn idle_ms(ms: u64) { |
493 | 0 | tokio::time::sleep(Duration::from_millis(ms)).await |
494 | 0 | } |