Coverage Report

Created: 2026-02-14 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-util-0.7.18/src/io/simplex.rs
Line
Count
Source
1
//! Unidirectional byte-oriented channel.
2
3
use crate::util::poll_proceed;
4
5
use bytes::Buf;
6
use bytes::BytesMut;
7
use futures_core::ready;
8
use std::io::Error as IoError;
9
use std::io::ErrorKind as IoErrorKind;
10
use std::io::IoSlice;
11
use std::pin::Pin;
12
use std::sync::{Arc, Mutex};
13
use std::task::{Context, Poll, Waker};
14
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15
16
type IoResult<T> = Result<T, IoError>;
17
18
const CLOSED_ERROR_MSG: &str = "simplex has been closed";
19
20
#[derive(Debug)]
21
struct Inner {
22
    /// `poll_write` will return [`Poll::Pending`] if the backpressure boundary is reached
23
    backpressure_boundary: usize,
24
25
    /// either [`Sender`] or [`Receiver`] is closed
26
    is_closed: bool,
27
28
    /// Waker used to wake the [`Receiver`]
29
    receiver_waker: Option<Waker>,
30
31
    /// Waker used to wake the [`Sender`]
32
    sender_waker: Option<Waker>,
33
34
    /// Buffer used to read and write data
35
    buf: BytesMut,
36
}
37
38
impl Inner {
39
0
    fn with_capacity(capacity: usize) -> Self {
40
0
        Self {
41
0
            backpressure_boundary: capacity,
42
0
            is_closed: false,
43
0
            receiver_waker: None,
44
0
            sender_waker: None,
45
0
            buf: BytesMut::with_capacity(capacity),
46
0
        }
47
0
    }
48
49
0
    fn register_receiver_waker(&mut self, waker: &Waker) -> Option<Waker> {
50
0
        match self.receiver_waker.as_mut() {
51
0
            Some(old) if old.will_wake(waker) => None,
52
0
            _ => self.receiver_waker.replace(waker.clone()),
53
        }
54
0
    }
55
56
0
    fn register_sender_waker(&mut self, waker: &Waker) -> Option<Waker> {
57
0
        match self.sender_waker.as_mut() {
58
0
            Some(old) if old.will_wake(waker) => None,
59
0
            _ => self.sender_waker.replace(waker.clone()),
60
        }
61
0
    }
62
63
0
    fn take_receiver_waker(&mut self) -> Option<Waker> {
64
0
        self.receiver_waker.take()
65
0
    }
66
67
0
    fn take_sender_waker(&mut self) -> Option<Waker> {
68
0
        self.sender_waker.take()
69
0
    }
70
71
0
    fn is_closed(&self) -> bool {
72
0
        self.is_closed
73
0
    }
74
75
0
    fn close_receiver(&mut self) -> Option<Waker> {
76
0
        self.is_closed = true;
77
0
        self.take_sender_waker()
78
0
    }
79
80
0
    fn close_sender(&mut self) -> Option<Waker> {
81
0
        self.is_closed = true;
82
0
        self.take_receiver_waker()
83
0
    }
84
}
85
86
/// Receiver of the simplex channel.
87
///
88
/// # Cancellation safety
89
///
90
/// The `Receiver` is cancel safe. If it is used as the event in a
91
/// [`tokio::select!`](macro@tokio::select) statement and some other branch
92
/// completes first, it is guaranteed that no bytes were received on this
93
/// channel.
94
///
95
/// You can still read the remaining data from the buffer
96
/// even if the write half has been dropped.
97
/// See [`Sender::poll_shutdown`] and [`Sender::drop`] for more details.
98
#[derive(Debug)]
99
pub struct Receiver {
100
    inner: Arc<Mutex<Inner>>,
101
}
102
103
impl Drop for Receiver {
104
    /// This also wakes up the [`Sender`].
105
0
    fn drop(&mut self) {
106
0
        let maybe_waker = {
107
0
            let mut inner = self.inner.lock().unwrap();
108
0
            inner.close_receiver()
109
        };
110
111
0
        if let Some(waker) = maybe_waker {
112
0
            waker.wake();
113
0
        }
114
0
    }
115
}
116
117
impl AsyncRead for Receiver {
118
0
    fn poll_read(
119
0
        self: Pin<&mut Self>,
120
0
        cx: &mut Context<'_>,
121
0
        buf: &mut ReadBuf<'_>,
122
0
    ) -> Poll<IoResult<()>> {
123
0
        let coop = ready!(poll_proceed(cx));
124
125
0
        let mut inner = self.inner.lock().unwrap();
126
127
0
        let to_read = buf.remaining().min(inner.buf.remaining());
128
0
        if to_read == 0 {
129
0
            if inner.is_closed() || buf.remaining() == 0 {
130
0
                return Poll::Ready(Ok(()));
131
0
            }
132
133
0
            let old_waker = inner.register_receiver_waker(cx.waker());
134
0
            let maybe_waker = inner.take_sender_waker();
135
136
            // unlock before waking up and dropping old waker
137
0
            drop(inner);
138
0
            drop(old_waker);
139
0
            if let Some(waker) = maybe_waker {
140
0
                waker.wake();
141
0
            }
142
0
            return Poll::Pending;
143
0
        }
144
145
        // this is to avoid starving other tasks
146
0
        coop.made_progress();
147
148
0
        buf.put_slice(&inner.buf[..to_read]);
149
0
        inner.buf.advance(to_read);
150
151
0
        let waker = inner.take_sender_waker();
152
0
        drop(inner); // unlock before waking up
153
0
        if let Some(waker) = waker {
154
0
            waker.wake();
155
0
        }
156
157
0
        Poll::Ready(Ok(()))
158
0
    }
159
}
160
161
/// Sender of the simplex channel.
162
///
163
/// # Cancellation safety
164
///
165
/// The `Sender` is cancel safe. If it is used as the event in a
166
/// [`tokio::select!`](macro@tokio::select) statement and some other branch
167
/// completes first, it is guaranteed that no bytes were sent on this
168
/// channel.
169
///
170
/// # Shutdown
171
///
172
/// See [`Sender::poll_shutdown`].
173
#[derive(Debug)]
174
pub struct Sender {
175
    inner: Arc<Mutex<Inner>>,
176
}
177
178
impl Drop for Sender {
179
    /// This also wakes up the [`Receiver`].
180
0
    fn drop(&mut self) {
181
0
        let maybe_waker = {
182
0
            let mut inner = self.inner.lock().unwrap();
183
0
            inner.close_sender()
184
        };
185
186
0
        if let Some(waker) = maybe_waker {
187
0
            waker.wake();
188
0
        }
189
0
    }
190
}
191
192
impl AsyncWrite for Sender {
193
    /// # Errors
194
    ///
195
    /// This method will return [`IoErrorKind::BrokenPipe`]
196
    /// if the channel has been closed.
197
0
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
198
0
        let coop = ready!(poll_proceed(cx));
199
200
0
        let mut inner = self.inner.lock().unwrap();
201
202
0
        if inner.is_closed() {
203
0
            return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG)));
204
0
        }
205
206
0
        let free = inner
207
0
            .backpressure_boundary
208
0
            .checked_sub(inner.buf.len())
209
0
            .expect("backpressure boundary overflow");
210
0
        let to_write = buf.len().min(free);
211
0
        if to_write == 0 {
212
0
            if buf.is_empty() {
213
0
                return Poll::Ready(Ok(0));
214
0
            }
215
216
0
            let old_waker = inner.register_sender_waker(cx.waker());
217
0
            let waker = inner.take_receiver_waker();
218
219
            // unlock before waking up and dropping old waker
220
0
            drop(inner);
221
0
            drop(old_waker);
222
0
            if let Some(waker) = waker {
223
0
                waker.wake();
224
0
            }
225
226
0
            return Poll::Pending;
227
0
        }
228
229
        // this is to avoid starving other tasks
230
0
        coop.made_progress();
231
232
0
        inner.buf.extend_from_slice(&buf[..to_write]);
233
234
0
        let waker = inner.take_receiver_waker();
235
0
        drop(inner); // unlock before waking up
236
0
        if let Some(waker) = waker {
237
0
            waker.wake();
238
0
        }
239
240
0
        Poll::Ready(Ok(to_write))
241
0
    }
242
243
    /// # Errors
244
    ///
245
    /// This method will return [`IoErrorKind::BrokenPipe`]
246
    /// if the channel has been closed.
247
0
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
248
0
        let inner = self.inner.lock().unwrap();
249
0
        if inner.is_closed() {
250
0
            Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG)))
251
        } else {
252
0
            Poll::Ready(Ok(()))
253
        }
254
0
    }
255
256
    /// After returns [`Poll::Ready`], all the following call to
257
    /// [`Sender::poll_write`] and [`Sender::poll_flush`]
258
    /// will return error.
259
    ///
260
    /// The [`Receiver`] can still be used to read remaining data
261
    /// until all bytes have been consumed.
262
0
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
263
0
        let maybe_waker = {
264
0
            let mut inner = self.inner.lock().unwrap();
265
0
            inner.close_sender()
266
        };
267
268
0
        if let Some(waker) = maybe_waker {
269
0
            waker.wake();
270
0
        }
271
272
0
        Poll::Ready(Ok(()))
273
0
    }
274
275
0
    fn is_write_vectored(&self) -> bool {
276
0
        true
277
0
    }
278
279
0
    fn poll_write_vectored(
280
0
        self: Pin<&mut Self>,
281
0
        cx: &mut Context<'_>,
282
0
        bufs: &[IoSlice<'_>],
283
0
    ) -> Poll<Result<usize, IoError>> {
284
0
        let coop = ready!(poll_proceed(cx));
285
286
0
        let mut inner = self.inner.lock().unwrap();
287
0
        if inner.is_closed() {
288
0
            return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG)));
289
0
        }
290
291
0
        let free = inner
292
0
            .backpressure_boundary
293
0
            .checked_sub(inner.buf.len())
294
0
            .expect("backpressure boundary overflow");
295
0
        if free == 0 {
296
0
            let old_waker = inner.register_sender_waker(cx.waker());
297
0
            let maybe_waker = inner.take_receiver_waker();
298
299
            // unlock before waking up and dropping old waker
300
0
            drop(inner);
301
0
            drop(old_waker);
302
0
            if let Some(waker) = maybe_waker {
303
0
                waker.wake();
304
0
            }
305
306
0
            return Poll::Pending;
307
0
        }
308
309
        // this is to avoid starving other tasks
310
0
        coop.made_progress();
311
312
0
        let mut rem = free;
313
0
        for buf in bufs {
314
0
            if rem == 0 {
315
0
                break;
316
0
            }
317
318
0
            let to_write = buf.len().min(rem);
319
0
            if to_write == 0 {
320
0
                assert_ne!(rem, 0);
321
0
                assert_eq!(buf.len(), 0);
322
0
                continue;
323
0
            }
324
325
0
            inner.buf.extend_from_slice(&buf[..to_write]);
326
0
            rem -= to_write;
327
        }
328
329
0
        let waker = inner.take_receiver_waker();
330
0
        drop(inner); // unlock before waking up
331
0
        if let Some(waker) = waker {
332
0
            waker.wake();
333
0
        }
334
335
0
        Poll::Ready(Ok(free - rem))
336
0
    }
337
}
338
339
/// Create a simplex channel.
340
///
341
/// The `capacity` parameter specifies the maximum number of bytes that can be
342
/// stored in the channel without making the [`Sender::poll_write`]
343
/// return [`Poll::Pending`].
344
///
345
/// # Panics
346
///
347
/// This function will panic if `capacity` is zero.
348
0
pub fn new(capacity: usize) -> (Sender, Receiver) {
349
0
    assert_ne!(capacity, 0, "capacity must be greater than zero");
350
351
0
    let inner = Arc::new(Mutex::new(Inner::with_capacity(capacity)));
352
0
    let tx = Sender {
353
0
        inner: Arc::clone(&inner),
354
0
    };
355
0
    let rx = Receiver { inner };
356
0
    (tx, rx)
357
0
}