/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-test-0.4.4/src/stream_mock.rs
Line | Count | Source (jump to first uncovered line) |
1 | | #![cfg(not(loom))] |
2 | | |
3 | | //! A mock stream implementing [`Stream`]. |
4 | | //! |
5 | | //! # Overview |
6 | | //! This crate provides a `StreamMock` that can be used to test code that interacts with streams. |
7 | | //! It allows you to mock the behavior of a stream and control the items it yields and the waiting |
8 | | //! intervals between items. |
9 | | //! |
10 | | //! # Usage |
11 | | //! To use the `StreamMock`, you need to create a builder using [`StreamMockBuilder`]. The builder |
12 | | //! allows you to enqueue actions such as returning items or waiting for a certain duration. |
13 | | //! |
14 | | //! # Example |
15 | | //! ```rust |
16 | | //! |
17 | | //! use futures_util::StreamExt; |
18 | | //! use std::time::Duration; |
19 | | //! use tokio_test::stream_mock::StreamMockBuilder; |
20 | | //! |
21 | | //! async fn test_stream_mock_wait() { |
22 | | //! let mut stream_mock = StreamMockBuilder::new() |
23 | | //! .next(1) |
24 | | //! .wait(Duration::from_millis(300)) |
25 | | //! .next(2) |
26 | | //! .build(); |
27 | | //! |
28 | | //! assert_eq!(stream_mock.next().await, Some(1)); |
29 | | //! let start = std::time::Instant::now(); |
30 | | //! assert_eq!(stream_mock.next().await, Some(2)); |
31 | | //! let elapsed = start.elapsed(); |
32 | | //! assert!(elapsed >= Duration::from_millis(300)); |
33 | | //! assert_eq!(stream_mock.next().await, None); |
34 | | //! } |
35 | | //! ``` |
36 | | |
37 | | use std::collections::VecDeque; |
38 | | use std::pin::Pin; |
39 | | use std::task::Poll; |
40 | | use std::time::Duration; |
41 | | |
42 | | use futures_core::{ready, Stream}; |
43 | | use std::future::Future; |
44 | | use tokio::time::{sleep_until, Instant, Sleep}; |
45 | | |
46 | | #[derive(Debug, Clone)] |
47 | | enum Action<T: Unpin> { |
48 | | Next(T), |
49 | | Wait(Duration), |
50 | | } |
51 | | |
52 | | /// A builder for [`StreamMock`] |
53 | | #[derive(Debug, Clone)] |
54 | | pub struct StreamMockBuilder<T: Unpin> { |
55 | | actions: VecDeque<Action<T>>, |
56 | | } |
57 | | |
58 | | impl<T: Unpin> StreamMockBuilder<T> { |
59 | | /// Create a new empty [`StreamMockBuilder`] |
60 | 0 | pub fn new() -> Self { |
61 | 0 | StreamMockBuilder::default() |
62 | 0 | } |
63 | | |
64 | | /// Queue an item to be returned by the stream |
65 | 0 | pub fn next(mut self, value: T) -> Self { |
66 | 0 | self.actions.push_back(Action::Next(value)); |
67 | 0 | self |
68 | 0 | } |
69 | | |
70 | | // Queue an item to be consumed by the sink, |
71 | | // commented out until Sink is implemented. |
72 | | // |
73 | | // pub fn consume(mut self, value: T) -> Self { |
74 | | // self.actions.push_back(Action::Consume(value)); |
75 | | // self |
76 | | // } |
77 | | |
78 | | /// Queue the stream to wait for a duration |
79 | 0 | pub fn wait(mut self, duration: Duration) -> Self { |
80 | 0 | self.actions.push_back(Action::Wait(duration)); |
81 | 0 | self |
82 | 0 | } |
83 | | |
84 | | /// Build the [`StreamMock`] |
85 | 0 | pub fn build(self) -> StreamMock<T> { |
86 | 0 | StreamMock { |
87 | 0 | actions: self.actions, |
88 | 0 | sleep: None, |
89 | 0 | } |
90 | 0 | } |
91 | | } |
92 | | |
93 | | impl<T: Unpin> Default for StreamMockBuilder<T> { |
94 | 0 | fn default() -> Self { |
95 | 0 | StreamMockBuilder { |
96 | 0 | actions: VecDeque::new(), |
97 | 0 | } |
98 | 0 | } |
99 | | } |
100 | | |
101 | | /// A mock stream implementing [`Stream`] |
102 | | /// |
103 | | /// See [`StreamMockBuilder`] for more information. |
104 | | #[derive(Debug)] |
105 | | pub struct StreamMock<T: Unpin> { |
106 | | actions: VecDeque<Action<T>>, |
107 | | sleep: Option<Pin<Box<Sleep>>>, |
108 | | } |
109 | | |
110 | | impl<T: Unpin> StreamMock<T> { |
111 | 0 | fn next_action(&mut self) -> Option<Action<T>> { |
112 | 0 | self.actions.pop_front() |
113 | 0 | } |
114 | | } |
115 | | |
116 | | impl<T: Unpin> Stream for StreamMock<T> { |
117 | | type Item = T; |
118 | | |
119 | 0 | fn poll_next( |
120 | 0 | mut self: std::pin::Pin<&mut Self>, |
121 | 0 | cx: &mut std::task::Context<'_>, |
122 | 0 | ) -> std::task::Poll<Option<Self::Item>> { |
123 | | // Try polling the sleep future first |
124 | 0 | if let Some(ref mut sleep) = self.sleep { |
125 | 0 | ready!(Pin::new(sleep).poll(cx)); |
126 | | // Since we're ready, discard the sleep future |
127 | 0 | self.sleep.take(); |
128 | 0 | } |
129 | | |
130 | 0 | match self.next_action() { |
131 | 0 | Some(action) => match action { |
132 | 0 | Action::Next(item) => Poll::Ready(Some(item)), |
133 | 0 | Action::Wait(duration) => { |
134 | 0 | // Set up a sleep future and schedule this future to be polled again for it. |
135 | 0 | self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration))); |
136 | 0 | cx.waker().wake_by_ref(); |
137 | 0 |
|
138 | 0 | Poll::Pending |
139 | | } |
140 | | }, |
141 | 0 | None => Poll::Ready(None), |
142 | | } |
143 | 0 | } |
144 | | } |
145 | | |
146 | | impl<T: Unpin> Drop for StreamMock<T> { |
147 | 0 | fn drop(&mut self) { |
148 | 0 | // Avoid double panicking to make debugging easier. |
149 | 0 | if std::thread::panicking() { |
150 | 0 | return; |
151 | 0 | } |
152 | 0 |
|
153 | 0 | let undropped_count = self |
154 | 0 | .actions |
155 | 0 | .iter() |
156 | 0 | .filter(|action| match action { |
157 | 0 | Action::Next(_) => true, |
158 | 0 | Action::Wait(_) => false, |
159 | 0 | }) |
160 | 0 | .count(); |
161 | 0 |
|
162 | 0 | assert!( |
163 | 0 | undropped_count == 0, |
164 | 0 | "StreamMock was dropped before all actions were consumed, {} actions were not consumed", |
165 | | undropped_count |
166 | | ); |
167 | 0 | } |
168 | | } |