/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-util-0.7.18/src/io/simplex.rs
Line | Count | Source |
1 | | //! Unidirectional byte-oriented channel. |
2 | | |
3 | | use crate::util::poll_proceed; |
4 | | |
5 | | use bytes::Buf; |
6 | | use bytes::BytesMut; |
7 | | use futures_core::ready; |
8 | | use std::io::Error as IoError; |
9 | | use std::io::ErrorKind as IoErrorKind; |
10 | | use std::io::IoSlice; |
11 | | use std::pin::Pin; |
12 | | use std::sync::{Arc, Mutex}; |
13 | | use std::task::{Context, Poll, Waker}; |
14 | | use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; |
15 | | |
16 | | type IoResult<T> = Result<T, IoError>; |
17 | | |
18 | | const CLOSED_ERROR_MSG: &str = "simplex has been closed"; |
19 | | |
20 | | #[derive(Debug)] |
21 | | struct Inner { |
22 | | /// `poll_write` will return [`Poll::Pending`] if the backpressure boundary is reached |
23 | | backpressure_boundary: usize, |
24 | | |
25 | | /// either [`Sender`] or [`Receiver`] is closed |
26 | | is_closed: bool, |
27 | | |
28 | | /// Waker used to wake the [`Receiver`] |
29 | | receiver_waker: Option<Waker>, |
30 | | |
31 | | /// Waker used to wake the [`Sender`] |
32 | | sender_waker: Option<Waker>, |
33 | | |
34 | | /// Buffer used to read and write data |
35 | | buf: BytesMut, |
36 | | } |
37 | | |
38 | | impl Inner { |
39 | 0 | fn with_capacity(capacity: usize) -> Self { |
40 | 0 | Self { |
41 | 0 | backpressure_boundary: capacity, |
42 | 0 | is_closed: false, |
43 | 0 | receiver_waker: None, |
44 | 0 | sender_waker: None, |
45 | 0 | buf: BytesMut::with_capacity(capacity), |
46 | 0 | } |
47 | 0 | } |
48 | | |
49 | 0 | fn register_receiver_waker(&mut self, waker: &Waker) -> Option<Waker> { |
50 | 0 | match self.receiver_waker.as_mut() { |
51 | 0 | Some(old) if old.will_wake(waker) => None, |
52 | 0 | _ => self.receiver_waker.replace(waker.clone()), |
53 | | } |
54 | 0 | } |
55 | | |
56 | 0 | fn register_sender_waker(&mut self, waker: &Waker) -> Option<Waker> { |
57 | 0 | match self.sender_waker.as_mut() { |
58 | 0 | Some(old) if old.will_wake(waker) => None, |
59 | 0 | _ => self.sender_waker.replace(waker.clone()), |
60 | | } |
61 | 0 | } |
62 | | |
63 | 0 | fn take_receiver_waker(&mut self) -> Option<Waker> { |
64 | 0 | self.receiver_waker.take() |
65 | 0 | } |
66 | | |
67 | 0 | fn take_sender_waker(&mut self) -> Option<Waker> { |
68 | 0 | self.sender_waker.take() |
69 | 0 | } |
70 | | |
71 | 0 | fn is_closed(&self) -> bool { |
72 | 0 | self.is_closed |
73 | 0 | } |
74 | | |
75 | 0 | fn close_receiver(&mut self) -> Option<Waker> { |
76 | 0 | self.is_closed = true; |
77 | 0 | self.take_sender_waker() |
78 | 0 | } |
79 | | |
80 | 0 | fn close_sender(&mut self) -> Option<Waker> { |
81 | 0 | self.is_closed = true; |
82 | 0 | self.take_receiver_waker() |
83 | 0 | } |
84 | | } |
85 | | |
86 | | /// Receiver of the simplex channel. |
87 | | /// |
88 | | /// # Cancellation safety |
89 | | /// |
90 | | /// The `Receiver` is cancel safe. If it is used as the event in a |
91 | | /// [`tokio::select!`](macro@tokio::select) statement and some other branch |
92 | | /// completes first, it is guaranteed that no bytes were received on this |
93 | | /// channel. |
94 | | /// |
95 | | /// You can still read the remaining data from the buffer |
96 | | /// even if the write half has been dropped. |
97 | | /// See [`Sender::poll_shutdown`] and [`Sender::drop`] for more details. |
98 | | #[derive(Debug)] |
99 | | pub struct Receiver { |
100 | | inner: Arc<Mutex<Inner>>, |
101 | | } |
102 | | |
103 | | impl Drop for Receiver { |
104 | | /// This also wakes up the [`Sender`]. |
105 | 0 | fn drop(&mut self) { |
106 | 0 | let maybe_waker = { |
107 | 0 | let mut inner = self.inner.lock().unwrap(); |
108 | 0 | inner.close_receiver() |
109 | | }; |
110 | | |
111 | 0 | if let Some(waker) = maybe_waker { |
112 | 0 | waker.wake(); |
113 | 0 | } |
114 | 0 | } |
115 | | } |
116 | | |
117 | | impl AsyncRead for Receiver { |
118 | 0 | fn poll_read( |
119 | 0 | self: Pin<&mut Self>, |
120 | 0 | cx: &mut Context<'_>, |
121 | 0 | buf: &mut ReadBuf<'_>, |
122 | 0 | ) -> Poll<IoResult<()>> { |
123 | 0 | let coop = ready!(poll_proceed(cx)); |
124 | | |
125 | 0 | let mut inner = self.inner.lock().unwrap(); |
126 | | |
127 | 0 | let to_read = buf.remaining().min(inner.buf.remaining()); |
128 | 0 | if to_read == 0 { |
129 | 0 | if inner.is_closed() || buf.remaining() == 0 { |
130 | 0 | return Poll::Ready(Ok(())); |
131 | 0 | } |
132 | | |
133 | 0 | let old_waker = inner.register_receiver_waker(cx.waker()); |
134 | 0 | let maybe_waker = inner.take_sender_waker(); |
135 | | |
136 | | // unlock before waking up and dropping old waker |
137 | 0 | drop(inner); |
138 | 0 | drop(old_waker); |
139 | 0 | if let Some(waker) = maybe_waker { |
140 | 0 | waker.wake(); |
141 | 0 | } |
142 | 0 | return Poll::Pending; |
143 | 0 | } |
144 | | |
145 | | // this is to avoid starving other tasks |
146 | 0 | coop.made_progress(); |
147 | | |
148 | 0 | buf.put_slice(&inner.buf[..to_read]); |
149 | 0 | inner.buf.advance(to_read); |
150 | | |
151 | 0 | let waker = inner.take_sender_waker(); |
152 | 0 | drop(inner); // unlock before waking up |
153 | 0 | if let Some(waker) = waker { |
154 | 0 | waker.wake(); |
155 | 0 | } |
156 | | |
157 | 0 | Poll::Ready(Ok(())) |
158 | 0 | } |
159 | | } |
160 | | |
161 | | /// Sender of the simplex channel. |
162 | | /// |
163 | | /// # Cancellation safety |
164 | | /// |
165 | | /// The `Sender` is cancel safe. If it is used as the event in a |
166 | | /// [`tokio::select!`](macro@tokio::select) statement and some other branch |
167 | | /// completes first, it is guaranteed that no bytes were sent on this |
168 | | /// channel. |
169 | | /// |
170 | | /// # Shutdown |
171 | | /// |
172 | | /// See [`Sender::poll_shutdown`]. |
173 | | #[derive(Debug)] |
174 | | pub struct Sender { |
175 | | inner: Arc<Mutex<Inner>>, |
176 | | } |
177 | | |
178 | | impl Drop for Sender { |
179 | | /// This also wakes up the [`Receiver`]. |
180 | 0 | fn drop(&mut self) { |
181 | 0 | let maybe_waker = { |
182 | 0 | let mut inner = self.inner.lock().unwrap(); |
183 | 0 | inner.close_sender() |
184 | | }; |
185 | | |
186 | 0 | if let Some(waker) = maybe_waker { |
187 | 0 | waker.wake(); |
188 | 0 | } |
189 | 0 | } |
190 | | } |
191 | | |
192 | | impl AsyncWrite for Sender { |
193 | | /// # Errors |
194 | | /// |
195 | | /// This method will return [`IoErrorKind::BrokenPipe`] |
196 | | /// if the channel has been closed. |
197 | 0 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> { |
198 | 0 | let coop = ready!(poll_proceed(cx)); |
199 | | |
200 | 0 | let mut inner = self.inner.lock().unwrap(); |
201 | | |
202 | 0 | if inner.is_closed() { |
203 | 0 | return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG))); |
204 | 0 | } |
205 | | |
206 | 0 | let free = inner |
207 | 0 | .backpressure_boundary |
208 | 0 | .checked_sub(inner.buf.len()) |
209 | 0 | .expect("backpressure boundary overflow"); |
210 | 0 | let to_write = buf.len().min(free); |
211 | 0 | if to_write == 0 { |
212 | 0 | if buf.is_empty() { |
213 | 0 | return Poll::Ready(Ok(0)); |
214 | 0 | } |
215 | | |
216 | 0 | let old_waker = inner.register_sender_waker(cx.waker()); |
217 | 0 | let waker = inner.take_receiver_waker(); |
218 | | |
219 | | // unlock before waking up and dropping old waker |
220 | 0 | drop(inner); |
221 | 0 | drop(old_waker); |
222 | 0 | if let Some(waker) = waker { |
223 | 0 | waker.wake(); |
224 | 0 | } |
225 | | |
226 | 0 | return Poll::Pending; |
227 | 0 | } |
228 | | |
229 | | // this is to avoid starving other tasks |
230 | 0 | coop.made_progress(); |
231 | | |
232 | 0 | inner.buf.extend_from_slice(&buf[..to_write]); |
233 | | |
234 | 0 | let waker = inner.take_receiver_waker(); |
235 | 0 | drop(inner); // unlock before waking up |
236 | 0 | if let Some(waker) = waker { |
237 | 0 | waker.wake(); |
238 | 0 | } |
239 | | |
240 | 0 | Poll::Ready(Ok(to_write)) |
241 | 0 | } |
242 | | |
243 | | /// # Errors |
244 | | /// |
245 | | /// This method will return [`IoErrorKind::BrokenPipe`] |
246 | | /// if the channel has been closed. |
247 | 0 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> { |
248 | 0 | let inner = self.inner.lock().unwrap(); |
249 | 0 | if inner.is_closed() { |
250 | 0 | Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG))) |
251 | | } else { |
252 | 0 | Poll::Ready(Ok(())) |
253 | | } |
254 | 0 | } |
255 | | |
256 | | /// After returns [`Poll::Ready`], all the following call to |
257 | | /// [`Sender::poll_write`] and [`Sender::poll_flush`] |
258 | | /// will return error. |
259 | | /// |
260 | | /// The [`Receiver`] can still be used to read remaining data |
261 | | /// until all bytes have been consumed. |
262 | 0 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> { |
263 | 0 | let maybe_waker = { |
264 | 0 | let mut inner = self.inner.lock().unwrap(); |
265 | 0 | inner.close_sender() |
266 | | }; |
267 | | |
268 | 0 | if let Some(waker) = maybe_waker { |
269 | 0 | waker.wake(); |
270 | 0 | } |
271 | | |
272 | 0 | Poll::Ready(Ok(())) |
273 | 0 | } |
274 | | |
275 | 0 | fn is_write_vectored(&self) -> bool { |
276 | 0 | true |
277 | 0 | } |
278 | | |
279 | 0 | fn poll_write_vectored( |
280 | 0 | self: Pin<&mut Self>, |
281 | 0 | cx: &mut Context<'_>, |
282 | 0 | bufs: &[IoSlice<'_>], |
283 | 0 | ) -> Poll<Result<usize, IoError>> { |
284 | 0 | let coop = ready!(poll_proceed(cx)); |
285 | | |
286 | 0 | let mut inner = self.inner.lock().unwrap(); |
287 | 0 | if inner.is_closed() { |
288 | 0 | return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG))); |
289 | 0 | } |
290 | | |
291 | 0 | let free = inner |
292 | 0 | .backpressure_boundary |
293 | 0 | .checked_sub(inner.buf.len()) |
294 | 0 | .expect("backpressure boundary overflow"); |
295 | 0 | if free == 0 { |
296 | 0 | let old_waker = inner.register_sender_waker(cx.waker()); |
297 | 0 | let maybe_waker = inner.take_receiver_waker(); |
298 | | |
299 | | // unlock before waking up and dropping old waker |
300 | 0 | drop(inner); |
301 | 0 | drop(old_waker); |
302 | 0 | if let Some(waker) = maybe_waker { |
303 | 0 | waker.wake(); |
304 | 0 | } |
305 | | |
306 | 0 | return Poll::Pending; |
307 | 0 | } |
308 | | |
309 | | // this is to avoid starving other tasks |
310 | 0 | coop.made_progress(); |
311 | | |
312 | 0 | let mut rem = free; |
313 | 0 | for buf in bufs { |
314 | 0 | if rem == 0 { |
315 | 0 | break; |
316 | 0 | } |
317 | | |
318 | 0 | let to_write = buf.len().min(rem); |
319 | 0 | if to_write == 0 { |
320 | 0 | assert_ne!(rem, 0); |
321 | 0 | assert_eq!(buf.len(), 0); |
322 | 0 | continue; |
323 | 0 | } |
324 | | |
325 | 0 | inner.buf.extend_from_slice(&buf[..to_write]); |
326 | 0 | rem -= to_write; |
327 | | } |
328 | | |
329 | 0 | let waker = inner.take_receiver_waker(); |
330 | 0 | drop(inner); // unlock before waking up |
331 | 0 | if let Some(waker) = waker { |
332 | 0 | waker.wake(); |
333 | 0 | } |
334 | | |
335 | 0 | Poll::Ready(Ok(free - rem)) |
336 | 0 | } |
337 | | } |
338 | | |
339 | | /// Create a simplex channel. |
340 | | /// |
341 | | /// The `capacity` parameter specifies the maximum number of bytes that can be |
342 | | /// stored in the channel without making the [`Sender::poll_write`] |
343 | | /// return [`Poll::Pending`]. |
344 | | /// |
345 | | /// # Panics |
346 | | /// |
347 | | /// This function will panic if `capacity` is zero. |
348 | 0 | pub fn new(capacity: usize) -> (Sender, Receiver) { |
349 | 0 | assert_ne!(capacity, 0, "capacity must be greater than zero"); |
350 | | |
351 | 0 | let inner = Arc::new(Mutex::new(Inner::with_capacity(capacity))); |
352 | 0 | let tx = Sender { |
353 | 0 | inner: Arc::clone(&inner), |
354 | 0 | }; |
355 | 0 | let rx = Receiver { inner }; |
356 | 0 | (tx, rx) |
357 | 0 | } |