Coverage Report

Created: 2025-02-25 06:39

/rust/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.43.0/src/io/split.rs
Line
Count
Source (jump to first uncovered line)
1
//! Split a single value implementing `AsyncRead + AsyncWrite` into separate
2
//! `AsyncRead` and `AsyncWrite` handles.
3
//!
4
//! To restore this read/write object from its `split::ReadHalf` and
5
//! `split::WriteHalf` use `unsplit`.
6
7
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
8
9
use std::fmt;
10
use std::io;
11
use std::pin::Pin;
12
use std::sync::Arc;
13
use std::sync::Mutex;
14
use std::task::{Context, Poll};
15
16
cfg_io_util! {
17
    /// The readable half of a value returned from [`split`](split()).
18
    pub struct ReadHalf<T> {
19
        inner: Arc<Inner<T>>,
20
    }
21
22
    /// The writable half of a value returned from [`split`](split()).
23
    pub struct WriteHalf<T> {
24
        inner: Arc<Inner<T>>,
25
    }
26
27
    /// Splits a single value implementing `AsyncRead + AsyncWrite` into separate
28
    /// `AsyncRead` and `AsyncWrite` handles.
29
    ///
30
    /// To restore this read/write object from its `ReadHalf` and
31
    /// `WriteHalf` use [`unsplit`](ReadHalf::unsplit()).
32
0
    pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
33
0
    where
34
0
        T: AsyncRead + AsyncWrite,
35
0
    {
36
0
        let is_write_vectored = stream.is_write_vectored();
37
0
38
0
        let inner = Arc::new(Inner {
39
0
            stream: Mutex::new(stream),
40
0
            is_write_vectored,
41
0
        });
42
0
43
0
        let rd = ReadHalf {
44
0
            inner: inner.clone(),
45
0
        };
46
0
47
0
        let wr = WriteHalf { inner };
48
0
49
0
        (rd, wr)
50
0
    }
51
}
52
53
struct Inner<T> {
54
    stream: Mutex<T>,
55
    is_write_vectored: bool,
56
}
57
58
impl<T> Inner<T> {
59
0
    fn with_lock<R>(&self, f: impl FnOnce(Pin<&mut T>) -> R) -> R {
60
0
        let mut guard = self.stream.lock().unwrap();
61
0
62
0
        // safety: we do not move the stream.
63
0
        let stream = unsafe { Pin::new_unchecked(&mut *guard) };
64
0
65
0
        f(stream)
66
0
    }
67
}
68
69
impl<T> ReadHalf<T> {
70
    /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same
71
    /// stream.
72
0
    pub fn is_pair_of(&self, other: &WriteHalf<T>) -> bool {
73
0
        other.is_pair_of(self)
74
0
    }
75
76
    /// Reunites with a previously split `WriteHalf`.
77
    ///
78
    /// # Panics
79
    ///
80
    /// If this `ReadHalf` and the given `WriteHalf` do not originate from the
81
    /// same `split` operation this method will panic.
82
    /// This can be checked ahead of time by calling [`is_pair_of()`](Self::is_pair_of).
83
    #[track_caller]
84
0
    pub fn unsplit(self, wr: WriteHalf<T>) -> T
85
0
    where
86
0
        T: Unpin,
87
0
    {
88
0
        if self.is_pair_of(&wr) {
89
0
            drop(wr);
90
0
91
0
            let inner = Arc::try_unwrap(self.inner)
92
0
                .ok()
93
0
                .expect("`Arc::try_unwrap` failed");
94
0
95
0
            inner.stream.into_inner().unwrap()
96
        } else {
97
0
            panic!("Unrelated `split::Write` passed to `split::Read::unsplit`.")
98
        }
99
0
    }
100
}
101
102
impl<T> WriteHalf<T> {
103
    /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same
104
    /// stream.
105
0
    pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool {
106
0
        Arc::ptr_eq(&self.inner, &other.inner)
107
0
    }
108
}
109
110
impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
111
0
    fn poll_read(
112
0
        self: Pin<&mut Self>,
113
0
        cx: &mut Context<'_>,
114
0
        buf: &mut ReadBuf<'_>,
115
0
    ) -> Poll<io::Result<()>> {
116
0
        self.inner.with_lock(|stream| stream.poll_read(cx, buf))
117
0
    }
118
}
119
120
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
121
0
    fn poll_write(
122
0
        self: Pin<&mut Self>,
123
0
        cx: &mut Context<'_>,
124
0
        buf: &[u8],
125
0
    ) -> Poll<Result<usize, io::Error>> {
126
0
        self.inner.with_lock(|stream| stream.poll_write(cx, buf))
127
0
    }
128
129
0
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
130
0
        self.inner.with_lock(|stream| stream.poll_flush(cx))
131
0
    }
132
133
0
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
134
0
        self.inner.with_lock(|stream| stream.poll_shutdown(cx))
135
0
    }
136
137
0
    fn poll_write_vectored(
138
0
        self: Pin<&mut Self>,
139
0
        cx: &mut Context<'_>,
140
0
        bufs: &[io::IoSlice<'_>],
141
0
    ) -> Poll<Result<usize, io::Error>> {
142
0
        self.inner
143
0
            .with_lock(|stream| stream.poll_write_vectored(cx, bufs))
144
0
    }
145
146
0
    fn is_write_vectored(&self) -> bool {
147
0
        self.inner.is_write_vectored
148
0
    }
149
}
150
151
unsafe impl<T: Send> Send for ReadHalf<T> {}
152
unsafe impl<T: Send> Send for WriteHalf<T> {}
153
unsafe impl<T: Sync> Sync for ReadHalf<T> {}
154
unsafe impl<T: Sync> Sync for WriteHalf<T> {}
155
156
impl<T: fmt::Debug> fmt::Debug for ReadHalf<T> {
157
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
158
0
        fmt.debug_struct("split::ReadHalf").finish()
159
0
    }
160
}
161
162
impl<T: fmt::Debug> fmt::Debug for WriteHalf<T> {
163
0
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
164
0
        fmt.debug_struct("split::WriteHalf").finish()
165
0
    }
166
}