/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/io/split.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Split a single value implementing `AsyncRead + AsyncWrite` into separate |
2 | | //! `AsyncRead` and `AsyncWrite` handles. |
3 | | //! |
4 | | //! To restore this read/write object from its `split::ReadHalf` and |
5 | | //! `split::WriteHalf` use `unsplit`. |
6 | | |
7 | | use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; |
8 | | |
9 | | use std::fmt; |
10 | | use std::io; |
11 | | use std::pin::Pin; |
12 | | use std::sync::Arc; |
13 | | use std::sync::Mutex; |
14 | | use std::task::{Context, Poll}; |
15 | | |
16 | | cfg_io_util! { |
17 | | /// The readable half of a value returned from [`split`](split()). |
18 | | pub struct ReadHalf<T> { |
19 | | inner: Arc<Inner<T>>, |
20 | | } |
21 | | |
22 | | /// The writable half of a value returned from [`split`](split()). |
23 | | pub struct WriteHalf<T> { |
24 | | inner: Arc<Inner<T>>, |
25 | | } |
26 | | |
27 | | /// Splits a single value implementing `AsyncRead + AsyncWrite` into separate |
28 | | /// `AsyncRead` and `AsyncWrite` handles. |
29 | | /// |
30 | | /// To restore this read/write object from its `ReadHalf` and |
31 | | /// `WriteHalf` use [`unsplit`](ReadHalf::unsplit()). |
32 | 0 | pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>) |
33 | 0 | where |
34 | 0 | T: AsyncRead + AsyncWrite, |
35 | 0 | { |
36 | 0 | let is_write_vectored = stream.is_write_vectored(); |
37 | 0 |
|
38 | 0 | let inner = Arc::new(Inner { |
39 | 0 | stream: Mutex::new(stream), |
40 | 0 | is_write_vectored, |
41 | 0 | }); |
42 | 0 |
|
43 | 0 | let rd = ReadHalf { |
44 | 0 | inner: inner.clone(), |
45 | 0 | }; |
46 | 0 |
|
47 | 0 | let wr = WriteHalf { inner }; |
48 | 0 |
|
49 | 0 | (rd, wr) |
50 | 0 | } |
51 | | } |
52 | | |
/// State shared between a `ReadHalf` and its `WriteHalf`.
struct Inner<T> {
    /// The original stream, guarded so only one half touches it at a time.
    stream: Mutex<T>,
    /// Value of `is_write_vectored()` captured when the stream was split.
    is_write_vectored: bool,
}

impl<T> Inner<T> {
    /// Locks the stream, hands a pinned mutable reference to `f`, and returns
    /// whatever `f` returns. The lock is held until `f` has finished.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn with_lock<R>(&self, f: impl FnOnce(Pin<&mut T>) -> R) -> R {
        let mut locked = self.stream.lock().unwrap();

        // SAFETY: we do not move the stream. It stays inside the mutex for
        // the whole call; only a reference is handed to `f`.
        f(unsafe { Pin::new_unchecked(&mut *locked) })
    }
}
68 | | |
69 | | impl<T> ReadHalf<T> { |
70 | | /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same |
71 | | /// stream. |
72 | 0 | pub fn is_pair_of(&self, other: &WriteHalf<T>) -> bool { |
73 | 0 | other.is_pair_of(self) |
74 | 0 | } |
75 | | |
76 | | /// Reunites with a previously split `WriteHalf`. |
77 | | /// |
78 | | /// # Panics |
79 | | /// |
80 | | /// If this `ReadHalf` and the given `WriteHalf` do not originate from the |
81 | | /// same `split` operation this method will panic. |
82 | | /// This can be checked ahead of time by calling [`is_pair_of()`](Self::is_pair_of). |
83 | | #[track_caller] |
84 | 0 | pub fn unsplit(self, wr: WriteHalf<T>) -> T |
85 | 0 | where |
86 | 0 | T: Unpin, |
87 | 0 | { |
88 | 0 | if self.is_pair_of(&wr) { |
89 | 0 | drop(wr); |
90 | 0 |
|
91 | 0 | let inner = Arc::try_unwrap(self.inner) |
92 | 0 | .ok() |
93 | 0 | .expect("`Arc::try_unwrap` failed"); |
94 | 0 |
|
95 | 0 | inner.stream.into_inner().unwrap() |
96 | | } else { |
97 | 0 | panic!("Unrelated `split::Write` passed to `split::Read::unsplit`.") |
98 | | } |
99 | 0 | } |
100 | | } |
101 | | |
102 | | impl<T> WriteHalf<T> { |
103 | | /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same |
104 | | /// stream. |
105 | 0 | pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool { |
106 | 0 | Arc::ptr_eq(&self.inner, &other.inner) |
107 | 0 | } |
108 | | } |
109 | | |
110 | | impl<T: AsyncRead> AsyncRead for ReadHalf<T> { |
111 | 0 | fn poll_read( |
112 | 0 | self: Pin<&mut Self>, |
113 | 0 | cx: &mut Context<'_>, |
114 | 0 | buf: &mut ReadBuf<'_>, |
115 | 0 | ) -> Poll<io::Result<()>> { |
116 | 0 | self.inner.with_lock(|stream| stream.poll_read(cx, buf)) |
117 | 0 | } |
118 | | } |
119 | | |
120 | | impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> { |
121 | 0 | fn poll_write( |
122 | 0 | self: Pin<&mut Self>, |
123 | 0 | cx: &mut Context<'_>, |
124 | 0 | buf: &[u8], |
125 | 0 | ) -> Poll<Result<usize, io::Error>> { |
126 | 0 | self.inner.with_lock(|stream| stream.poll_write(cx, buf)) |
127 | 0 | } |
128 | | |
129 | 0 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
130 | 0 | self.inner.with_lock(|stream| stream.poll_flush(cx)) |
131 | 0 | } |
132 | | |
133 | 0 | fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
134 | 0 | self.inner.with_lock(|stream| stream.poll_shutdown(cx)) |
135 | 0 | } |
136 | | |
137 | 0 | fn poll_write_vectored( |
138 | 0 | self: Pin<&mut Self>, |
139 | 0 | cx: &mut Context<'_>, |
140 | 0 | bufs: &[io::IoSlice<'_>], |
141 | 0 | ) -> Poll<Result<usize, io::Error>> { |
142 | 0 | self.inner |
143 | 0 | .with_lock(|stream| stream.poll_write_vectored(cx, bufs)) |
144 | 0 | } |
145 | | |
146 | 0 | fn is_write_vectored(&self) -> bool { |
147 | 0 | self.inner.is_write_vectored |
148 | 0 | } |
149 | | } |
150 | | |
151 | | unsafe impl<T: Send> Send for ReadHalf<T> {} |
152 | | unsafe impl<T: Send> Send for WriteHalf<T> {} |
153 | | unsafe impl<T: Sync> Sync for ReadHalf<T> {} |
154 | | unsafe impl<T: Sync> Sync for WriteHalf<T> {} |
155 | | |
156 | | impl<T: fmt::Debug> fmt::Debug for ReadHalf<T> { |
157 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
158 | 0 | fmt.debug_struct("split::ReadHalf").finish() |
159 | 0 | } |
160 | | } |
161 | | |
162 | | impl<T: fmt::Debug> fmt::Debug for WriteHalf<T> { |
163 | 0 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
164 | 0 | fmt.debug_struct("split::WriteHalf").finish() |
165 | 0 | } |
166 | | } |