/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-test-0.4.5/src/io.rs
Line | Count | Source |
1 | | #![cfg(not(loom))] |
2 | | |
3 | | //! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. |
4 | | //! |
5 | | //! |
6 | | //! # Overview |
7 | | //! |
8 | | //! Provides a type that implements [`AsyncRead`] + [`AsyncWrite`] that can be configured |
9 | | //! to handle an arbitrary sequence of read and write operations. This is useful |
10 | | //! for writing unit tests for networking services as using an actual network |
11 | | //! type is fairly non-deterministic. |
12 | | //! |
13 | | //! # Usage |
14 | | //! |
15 | | //! Attempting to write data that the mock isn't expecting will result in a |
16 | | //! panic. |
17 | | //! |
18 | | //! [`AsyncRead`]: tokio::io::AsyncRead |
19 | | //! [`AsyncWrite`]: tokio::io::AsyncWrite |
20 | | |
21 | | use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; |
22 | | use tokio::sync::mpsc; |
23 | | use tokio::time::{self, Duration, Instant, Sleep}; |
24 | | use tokio_stream::wrappers::UnboundedReceiverStream; |
25 | | |
26 | | use futures_core::Stream; |
27 | | use std::collections::VecDeque; |
28 | | use std::fmt; |
29 | | use std::future::Future; |
30 | | use std::pin::Pin; |
31 | | use std::sync::Arc; |
32 | | use std::task::{self, ready, Poll, Waker}; |
33 | | use std::{cmp, io}; |
34 | | |
35 | | /// An I/O object that follows a predefined script. |
36 | | /// |
37 | | /// This value is created by `Builder` and implements `AsyncRead` + `AsyncWrite`. It |
38 | | /// follows the scenario described by the builder and panics otherwise. |
39 | | #[derive(Debug)] |
40 | | pub struct Mock { |
41 | | inner: Inner, |
42 | | } |
43 | | |
44 | | /// A handle to send additional actions to the related `Mock`. |
45 | | #[derive(Debug)] |
46 | | pub struct Handle { |
47 | | tx: mpsc::UnboundedSender<Action>, |
48 | | } |
49 | | |
50 | | /// Builds `Mock` instances. |
51 | | #[derive(Debug, Clone, Default)] |
52 | | pub struct Builder { |
53 | | // Sequence of actions for the Mock to take |
54 | | actions: VecDeque<Action>, |
55 | | name: String, |
56 | | } |
57 | | |
#[derive(Debug, Clone)]
enum Action {
    // Bytes to hand out to the reader.
    Read(Vec<u8>),
    // Bytes the writer is expected to provide.
    Write(Vec<u8>),
    // Pause for the given duration before the script continues.
    Wait(Duration),
    // The errors are wrapped in `Arc` so that `Action` (and with it `Builder`)
    // can be cloned and stay `Send`. The `Option` lets the mock take the error
    // out of the script when it is finally returned to the caller; at that
    // point the `Arc` is expected to be the only reference.
    ReadError(Option<Arc<io::Error>>),
    WriteError(Option<Arc<io::Error>>),
}
68 | | |
69 | | struct Inner { |
70 | | actions: VecDeque<Action>, |
71 | | waiting: Option<Instant>, |
72 | | sleep: Option<Pin<Box<Sleep>>>, |
73 | | read_wait: Option<Waker>, |
74 | | rx: UnboundedReceiverStream<Action>, |
75 | | name: String, |
76 | | } |
77 | | |
78 | | impl Builder { |
79 | | /// Return a new, empty `Builder`. |
80 | 0 | pub fn new() -> Self { |
81 | 0 | Self::default() |
82 | 0 | } |
83 | | |
84 | | /// Sequence a `read` operation. |
85 | | /// |
86 | | /// The next operation in the mock's script will be to expect a `read` call |
87 | | /// and return `buf`. |
88 | 0 | pub fn read(&mut self, buf: &[u8]) -> &mut Self { |
89 | 0 | self.actions.push_back(Action::Read(buf.into())); |
90 | 0 | self |
91 | 0 | } |
92 | | |
93 | | /// Sequence a `read` operation that produces an error. |
94 | | /// |
95 | | /// The next operation in the mock's script will be to expect a `read` call |
96 | | /// and return `error`. |
97 | 0 | pub fn read_error(&mut self, error: io::Error) -> &mut Self { |
98 | 0 | let error = Some(error.into()); |
99 | 0 | self.actions.push_back(Action::ReadError(error)); |
100 | 0 | self |
101 | 0 | } |
102 | | |
103 | | /// Sequence a `write` operation. |
104 | | /// |
105 | | /// The next operation in the mock's script will be to expect a `write` |
106 | | /// call. |
107 | 0 | pub fn write(&mut self, buf: &[u8]) -> &mut Self { |
108 | 0 | self.actions.push_back(Action::Write(buf.into())); |
109 | 0 | self |
110 | 0 | } |
111 | | |
112 | | /// Sequence a `write` operation that produces an error. |
113 | | /// |
114 | | /// The next operation in the mock's script will be to expect a `write` |
115 | | /// call that provides `error`. |
116 | 0 | pub fn write_error(&mut self, error: io::Error) -> &mut Self { |
117 | 0 | let error = Some(error.into()); |
118 | 0 | self.actions.push_back(Action::WriteError(error)); |
119 | 0 | self |
120 | 0 | } |
121 | | |
122 | | /// Sequence a wait. |
123 | | /// |
124 | | /// The next operation in the mock's script will be to wait without doing so |
125 | | /// for `duration` amount of time. |
126 | 0 | pub fn wait(&mut self, duration: Duration) -> &mut Self { |
127 | 0 | let duration = cmp::max(duration, Duration::from_millis(1)); |
128 | 0 | self.actions.push_back(Action::Wait(duration)); |
129 | 0 | self |
130 | 0 | } |
131 | | |
132 | | /// Set name of the mock IO object to include in panic messages and debug output |
133 | 0 | pub fn name(&mut self, name: impl Into<String>) -> &mut Self { |
134 | 0 | self.name = name.into(); |
135 | 0 | self |
136 | 0 | } |
137 | | |
138 | | /// Build a `Mock` value according to the defined script. |
139 | 0 | pub fn build(&mut self) -> Mock { |
140 | 0 | let (mock, _) = self.build_with_handle(); |
141 | 0 | mock |
142 | 0 | } |
143 | | |
144 | | /// Build a `Mock` value paired with a handle |
145 | 0 | pub fn build_with_handle(&mut self) -> (Mock, Handle) { |
146 | 0 | let (inner, handle) = Inner::new(self.actions.clone(), self.name.clone()); |
147 | | |
148 | 0 | let mock = Mock { inner }; |
149 | | |
150 | 0 | (mock, handle) |
151 | 0 | } |
152 | | } |
153 | | |
154 | | impl Handle { |
155 | | /// Sequence a `read` operation. |
156 | | /// |
157 | | /// The next operation in the mock's script will be to expect a `read` call |
158 | | /// and return `buf`. |
159 | 0 | pub fn read(&mut self, buf: &[u8]) -> &mut Self { |
160 | 0 | self.tx.send(Action::Read(buf.into())).unwrap(); |
161 | 0 | self |
162 | 0 | } |
163 | | |
164 | | /// Sequence a `read` operation error. |
165 | | /// |
166 | | /// The next operation in the mock's script will be to expect a `read` call |
167 | | /// and return `error`. |
168 | 0 | pub fn read_error(&mut self, error: io::Error) -> &mut Self { |
169 | 0 | let error = Some(error.into()); |
170 | 0 | self.tx.send(Action::ReadError(error)).unwrap(); |
171 | 0 | self |
172 | 0 | } |
173 | | |
174 | | /// Sequence a `write` operation. |
175 | | /// |
176 | | /// The next operation in the mock's script will be to expect a `write` |
177 | | /// call. |
178 | 0 | pub fn write(&mut self, buf: &[u8]) -> &mut Self { |
179 | 0 | self.tx.send(Action::Write(buf.into())).unwrap(); |
180 | 0 | self |
181 | 0 | } |
182 | | |
183 | | /// Sequence a `write` operation error. |
184 | | /// |
185 | | /// The next operation in the mock's script will be to expect a `write` |
186 | | /// call error. |
187 | 0 | pub fn write_error(&mut self, error: io::Error) -> &mut Self { |
188 | 0 | let error = Some(error.into()); |
189 | 0 | self.tx.send(Action::WriteError(error)).unwrap(); |
190 | 0 | self |
191 | 0 | } |
192 | | } |
193 | | |
194 | | impl Inner { |
195 | 0 | fn new(actions: VecDeque<Action>, name: String) -> (Inner, Handle) { |
196 | 0 | let (tx, rx) = mpsc::unbounded_channel(); |
197 | | |
198 | 0 | let rx = UnboundedReceiverStream::new(rx); |
199 | | |
200 | 0 | let inner = Inner { |
201 | 0 | actions, |
202 | 0 | sleep: None, |
203 | 0 | read_wait: None, |
204 | 0 | rx, |
205 | 0 | waiting: None, |
206 | 0 | name, |
207 | 0 | }; |
208 | | |
209 | 0 | let handle = Handle { tx }; |
210 | | |
211 | 0 | (inner, handle) |
212 | 0 | } |
213 | | |
214 | 0 | fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> { |
215 | 0 | Pin::new(&mut self.rx).poll_next(cx) |
216 | 0 | } |
217 | | |
218 | 0 | fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> { |
219 | 0 | match self.action() { |
220 | 0 | Some(&mut Action::Read(ref mut data)) => { |
221 | | // Figure out how much to copy |
222 | 0 | let n = cmp::min(dst.remaining(), data.len()); |
223 | | |
224 | | // Copy the data into the `dst` slice |
225 | 0 | dst.put_slice(&data[..n]); |
226 | | |
227 | | // Drain the data from the source |
228 | 0 | data.drain(..n); |
229 | | |
230 | 0 | Ok(()) |
231 | | } |
232 | 0 | Some(&mut Action::ReadError(ref mut err)) => { |
233 | | // As the |
234 | 0 | let err = err.take().expect("Should have been removed from actions."); |
235 | 0 | let err = Arc::try_unwrap(err).expect("There are no other references."); |
236 | 0 | Err(err) |
237 | | } |
238 | | Some(_) => { |
239 | | // Either waiting or expecting a write |
240 | 0 | Err(io::ErrorKind::WouldBlock.into()) |
241 | | } |
242 | 0 | None => Ok(()), |
243 | | } |
244 | 0 | } |
245 | | |
246 | 0 | fn write(&mut self, mut src: &[u8]) -> io::Result<usize> { |
247 | 0 | let mut ret = 0; |
248 | | |
249 | 0 | if self.actions.is_empty() { |
250 | 0 | return Err(io::ErrorKind::BrokenPipe.into()); |
251 | 0 | } |
252 | | |
253 | 0 | if let Some(&mut Action::Wait(..)) = self.action() { |
254 | 0 | return Err(io::ErrorKind::WouldBlock.into()); |
255 | 0 | } |
256 | | |
257 | 0 | if let Some(&mut Action::WriteError(ref mut err)) = self.action() { |
258 | 0 | let err = err.take().expect("Should have been removed from actions."); |
259 | 0 | let err = Arc::try_unwrap(err).expect("There are no other references."); |
260 | 0 | return Err(err); |
261 | 0 | } |
262 | | |
263 | 0 | for i in 0..self.actions.len() { |
264 | 0 | match self.actions[i] { |
265 | 0 | Action::Write(ref mut expect) => { |
266 | 0 | let n = cmp::min(src.len(), expect.len()); |
267 | | |
268 | 0 | assert_eq!(&src[..n], &expect[..n], "name={} i={}", self.name, i); |
269 | | |
270 | | // Drop data that was matched |
271 | 0 | expect.drain(..n); |
272 | 0 | src = &src[n..]; |
273 | | |
274 | 0 | ret += n; |
275 | | |
276 | 0 | if src.is_empty() { |
277 | 0 | return Ok(ret); |
278 | 0 | } |
279 | | } |
280 | | Action::Wait(..) | Action::WriteError(..) => { |
281 | 0 | break; |
282 | | } |
283 | 0 | _ => {} |
284 | | } |
285 | | |
286 | | // TODO: remove write |
287 | | } |
288 | | |
289 | 0 | Ok(ret) |
290 | 0 | } |
291 | | |
292 | 0 | fn remaining_wait(&mut self) -> Option<Duration> { |
293 | 0 | match self.action() { |
294 | 0 | Some(&mut Action::Wait(dur)) => Some(dur), |
295 | 0 | _ => None, |
296 | | } |
297 | 0 | } |
298 | | |
299 | 0 | fn action(&mut self) -> Option<&mut Action> { |
300 | | loop { |
301 | 0 | if self.actions.is_empty() { |
302 | 0 | return None; |
303 | 0 | } |
304 | | |
305 | 0 | match self.actions[0] { |
306 | 0 | Action::Read(ref mut data) => { |
307 | 0 | if !data.is_empty() { |
308 | 0 | break; |
309 | 0 | } |
310 | | } |
311 | 0 | Action::Write(ref mut data) => { |
312 | 0 | if !data.is_empty() { |
313 | 0 | break; |
314 | 0 | } |
315 | | } |
316 | 0 | Action::Wait(ref mut dur) => { |
317 | 0 | if let Some(until) = self.waiting { |
318 | 0 | let now = Instant::now(); |
319 | | |
320 | 0 | if now < until { |
321 | 0 | break; |
322 | 0 | } else { |
323 | 0 | self.waiting = None; |
324 | 0 | } |
325 | | } else { |
326 | 0 | self.waiting = Some(Instant::now() + *dur); |
327 | 0 | break; |
328 | | } |
329 | | } |
330 | 0 | Action::ReadError(ref mut error) | Action::WriteError(ref mut error) => { |
331 | 0 | if error.is_some() { |
332 | 0 | break; |
333 | 0 | } |
334 | | } |
335 | | } |
336 | | |
337 | 0 | let _action = self.actions.pop_front(); |
338 | | } |
339 | | |
340 | 0 | self.actions.front_mut() |
341 | 0 | } |
342 | | } |
343 | | |
344 | | // ===== impl Mock ===== |
345 | | |
346 | | impl Mock { |
347 | 0 | fn maybe_wakeup_reader(&mut self) { |
348 | 0 | match self.inner.action() { |
349 | | Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) | None => { |
350 | 0 | if let Some(waker) = self.inner.read_wait.take() { |
351 | 0 | waker.wake(); |
352 | 0 | } |
353 | | } |
354 | 0 | _ => {} |
355 | | } |
356 | 0 | } |
357 | | } |
358 | | |
359 | | impl AsyncRead for Mock { |
360 | 0 | fn poll_read( |
361 | 0 | mut self: Pin<&mut Self>, |
362 | 0 | cx: &mut task::Context<'_>, |
363 | 0 | buf: &mut ReadBuf<'_>, |
364 | 0 | ) -> Poll<io::Result<()>> { |
365 | | loop { |
366 | 0 | if let Some(ref mut sleep) = self.inner.sleep { |
367 | 0 | ready!(Pin::new(sleep).poll(cx)); |
368 | 0 | } |
369 | | |
370 | | // If a sleep is set, it has already fired |
371 | 0 | self.inner.sleep = None; |
372 | | |
373 | | // Capture 'filled' to monitor if it changed |
374 | 0 | let filled = buf.filled().len(); |
375 | | |
376 | 0 | match self.inner.read(buf) { |
377 | 0 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
378 | 0 | if let Some(rem) = self.inner.remaining_wait() { |
379 | 0 | let until = Instant::now() + rem; |
380 | 0 | self.inner.sleep = Some(Box::pin(time::sleep_until(until))); |
381 | 0 | } else { |
382 | 0 | self.inner.read_wait = Some(cx.waker().clone()); |
383 | 0 | return Poll::Pending; |
384 | | } |
385 | | } |
386 | | Ok(()) => { |
387 | 0 | if buf.filled().len() == filled { |
388 | 0 | match ready!(self.inner.poll_action(cx)) { |
389 | 0 | Some(action) => { |
390 | 0 | self.inner.actions.push_back(action); |
391 | 0 | continue; |
392 | | } |
393 | | None => { |
394 | 0 | return Poll::Ready(Ok(())); |
395 | | } |
396 | | } |
397 | | } else { |
398 | 0 | return Poll::Ready(Ok(())); |
399 | | } |
400 | | } |
401 | 0 | Err(e) => return Poll::Ready(Err(e)), |
402 | | } |
403 | | } |
404 | 0 | } |
405 | | } |
406 | | |
407 | | impl AsyncWrite for Mock { |
408 | 0 | fn poll_write( |
409 | 0 | mut self: Pin<&mut Self>, |
410 | 0 | cx: &mut task::Context<'_>, |
411 | 0 | buf: &[u8], |
412 | 0 | ) -> Poll<io::Result<usize>> { |
413 | | loop { |
414 | 0 | if let Some(ref mut sleep) = self.inner.sleep { |
415 | 0 | ready!(Pin::new(sleep).poll(cx)); |
416 | 0 | } |
417 | | |
418 | | // If a sleep is set, it has already fired |
419 | 0 | self.inner.sleep = None; |
420 | | |
421 | 0 | if self.inner.actions.is_empty() { |
422 | 0 | match self.inner.poll_action(cx) { |
423 | 0 | Poll::Pending => { |
424 | 0 | // do not propagate pending |
425 | 0 | } |
426 | 0 | Poll::Ready(Some(action)) => { |
427 | 0 | self.inner.actions.push_back(action); |
428 | 0 | } |
429 | | Poll::Ready(None) => { |
430 | 0 | panic!("unexpected write {}", self.pmsg()); |
431 | | } |
432 | | } |
433 | 0 | } |
434 | | |
435 | 0 | match self.inner.write(buf) { |
436 | 0 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
437 | 0 | if let Some(rem) = self.inner.remaining_wait() { |
438 | 0 | let until = Instant::now() + rem; |
439 | 0 | self.inner.sleep = Some(Box::pin(time::sleep_until(until))); |
440 | 0 | } else { |
441 | 0 | panic!("unexpected WouldBlock {}", self.pmsg()); |
442 | | } |
443 | | } |
444 | | Ok(0) => { |
445 | | // TODO: Is this correct? |
446 | 0 | if !self.inner.actions.is_empty() { |
447 | 0 | return Poll::Pending; |
448 | 0 | } |
449 | | |
450 | | // TODO: Extract |
451 | 0 | match ready!(self.inner.poll_action(cx)) { |
452 | 0 | Some(action) => { |
453 | 0 | self.inner.actions.push_back(action); |
454 | 0 | continue; |
455 | | } |
456 | | None => { |
457 | 0 | panic!("unexpected write {}", self.pmsg()); |
458 | | } |
459 | | } |
460 | | } |
461 | 0 | ret => { |
462 | 0 | self.maybe_wakeup_reader(); |
463 | 0 | return Poll::Ready(ret); |
464 | | } |
465 | | } |
466 | | } |
467 | 0 | } |
468 | | |
469 | 0 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { |
470 | 0 | Poll::Ready(Ok(())) |
471 | 0 | } |
472 | | |
473 | 0 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { |
474 | 0 | Poll::Ready(Ok(())) |
475 | 0 | } |
476 | | } |
477 | | |
478 | | /// Ensures that Mock isn't dropped with data "inside". |
479 | | impl Drop for Mock { |
480 | 0 | fn drop(&mut self) { |
481 | | // Avoid double panicking, since makes debugging much harder. |
482 | 0 | if std::thread::panicking() { |
483 | 0 | return; |
484 | 0 | } |
485 | | |
486 | 0 | self.inner.actions.iter().for_each(|a| match a { |
487 | 0 | Action::Read(data) => assert!( |
488 | 0 | data.is_empty(), |
489 | 0 | "There is still data left to read. {}", |
490 | 0 | self.pmsg() |
491 | | ), |
492 | 0 | Action::Write(data) => assert!( |
493 | 0 | data.is_empty(), |
494 | 0 | "There is still data left to write. {}", |
495 | 0 | self.pmsg() |
496 | | ), |
497 | 0 | _ => (), |
498 | 0 | }); |
499 | 0 | } |
500 | | } |
501 | | /* |
502 | | /// Returns `true` if called from the context of a futures-rs Task |
503 | | fn is_task_ctx() -> bool { |
504 | | use std::panic; |
505 | | |
506 | | // Save the existing panic hook |
507 | | let h = panic::take_hook(); |
508 | | |
509 | | // Install a new one that does nothing |
510 | | panic::set_hook(Box::new(|_| {})); |
511 | | |
512 | | // Attempt to call the fn |
513 | | let r = panic::catch_unwind(|| task::current()).is_ok(); |
514 | | |
515 | | // Re-install the old one |
516 | | panic::set_hook(h); |
517 | | |
518 | | // Return the result |
519 | | r |
520 | | } |
521 | | */ |
522 | | |
523 | | impl fmt::Debug for Inner { |
524 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
525 | 0 | if self.name.is_empty() { |
526 | 0 | write!(f, "Inner {{...}}") |
527 | | } else { |
528 | 0 | write!(f, "Inner {{name={}, ...}}", self.name) |
529 | | } |
530 | 0 | } |
531 | | } |
532 | | |
533 | | struct PanicMsgSnippet<'a>(&'a Inner); |
534 | | |
535 | | impl<'a> fmt::Display for PanicMsgSnippet<'a> { |
536 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
537 | 0 | if self.0.name.is_empty() { |
538 | 0 | write!(f, "({} actions remain)", self.0.actions.len()) |
539 | | } else { |
540 | 0 | write!( |
541 | 0 | f, |
542 | 0 | "(name {}, {} actions remain)", |
543 | | self.0.name, |
544 | 0 | self.0.actions.len() |
545 | | ) |
546 | | } |
547 | 0 | } |
548 | | } |
549 | | |
550 | | impl Mock { |
551 | 0 | fn pmsg(&self) -> PanicMsgSnippet<'_> { |
552 | 0 | PanicMsgSnippet(&self.inner) |
553 | 0 | } |
554 | | } |