Coverage Report

Created: 2026-03-08 07:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/thrift/lib/rs/src/transport/socket.rs
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements. See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership. The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License. You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied. See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
use std::convert::From;
19
use std::io;
20
use std::io::{ErrorKind, Read, Write};
21
use std::net::{Shutdown, TcpStream, ToSocketAddrs};
22
23
#[cfg(unix)]
24
use std::os::unix::net::UnixStream;
25
26
use super::{ReadHalf, TIoChannel, WriteHalf};
27
use crate::{new_transport_error, TransportErrorKind};
28
29
/// Bidirectional TCP/IP channel.
30
///
31
/// # Examples
32
///
33
/// Create a `TTcpChannel`.
34
///
35
/// ```no_run
36
/// use std::io::{Read, Write};
37
/// use thrift::transport::TTcpChannel;
38
///
39
/// let mut c = TTcpChannel::new();
40
/// c.open("localhost:9090").unwrap();
41
///
42
/// let mut buf = vec![0u8; 4];
43
/// c.read(&mut buf).unwrap();
44
/// c.write(&vec![0, 1, 2]).unwrap();
45
/// ```
46
///
47
/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
48
///
49
/// ```no_run
50
/// use std::io::{Read, Write};
51
/// use std::net::TcpStream;
52
/// use thrift::transport::TTcpChannel;
53
///
54
/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
55
/// stream.set_nodelay(true).unwrap();
56
///
57
/// // no need to call c.open() since we've already connected above
58
/// let mut c = TTcpChannel::with_stream(stream);
59
///
60
/// let mut buf = vec![0u8; 4];
61
/// c.read(&mut buf).unwrap();
62
/// c.write(&vec![0, 1, 2]).unwrap();
63
/// ```
64
#[derive(Debug, Default)]
65
pub struct TTcpChannel {
66
    stream: Option<TcpStream>,
67
}
68
69
impl TTcpChannel {
70
    /// Create an uninitialized `TTcpChannel`.
71
    ///
72
    /// The returned instance must be opened using `TTcpChannel::open(...)`
73
    /// before it can be used.
74
    pub fn new() -> TTcpChannel {
75
        TTcpChannel { stream: None }
76
    }
77
78
    /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
79
    ///
80
    /// The passed-in stream is assumed to have been opened before being wrapped
81
    /// by the created `TTcpChannel` instance.
82
    pub fn with_stream(stream: TcpStream) -> TTcpChannel {
83
        TTcpChannel {
84
            stream: Some(stream),
85
        }
86
    }
87
88
    /// Connect to `remote_address`, which should implement `ToSocketAddrs` trait.
89
    pub fn open<A: ToSocketAddrs>(&mut self, remote_address: A) -> crate::Result<()> {
90
        if self.stream.is_some() {
91
            Err(new_transport_error(
92
                TransportErrorKind::AlreadyOpen,
93
                "tcp connection previously opened",
94
            ))
95
        } else {
96
            match TcpStream::connect(&remote_address) {
97
                Ok(s) => {
98
                    s.set_nodelay(true)?;
99
                    self.stream = Some(s);
100
                    Ok(())
101
                }
102
                Err(e) => Err(From::from(e)),
103
            }
104
        }
105
    }
106
107
    /// Shut down this channel.
108
    ///
109
    /// Both send and receive halves are closed, and this instance can no
110
    /// longer be used to communicate with another endpoint.
111
    pub fn close(&mut self) -> crate::Result<()> {
112
0
        self.if_set(|s| s.shutdown(Shutdown::Both))
113
            .map_err(From::from)
114
    }
115
116
0
    fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
117
0
    where
118
0
        F: FnMut(&mut TcpStream) -> io::Result<T>,
119
    {
120
0
        if let Some(ref mut s) = self.stream {
121
0
            stream_operation(s)
122
        } else {
123
0
            Err(io::Error::new(
124
0
                ErrorKind::NotConnected,
125
0
                "tcp endpoint not connected",
126
0
            ))
127
        }
128
0
    }
Unexecuted instantiation: <thrift::transport::socket::TTcpChannel>::if_set::<<thrift::transport::socket::TTcpChannel>::close::{closure#0}, ()>
Unexecuted instantiation: <thrift::transport::socket::TTcpChannel>::if_set::<<thrift::transport::socket::TTcpChannel as std::io::Read>::read::{closure#0}, usize>
Unexecuted instantiation: <thrift::transport::socket::TTcpChannel>::if_set::<<thrift::transport::socket::TTcpChannel as std::io::Write>::flush::{closure#0}, ()>
Unexecuted instantiation: <thrift::transport::socket::TTcpChannel>::if_set::<<thrift::transport::socket::TTcpChannel as std::io::Write>::write::{closure#0}, usize>
129
}
130
131
impl TIoChannel for TTcpChannel {
132
    fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
133
    where
134
        Self: Sized,
135
    {
136
        let mut s = self;
137
138
        s.stream
139
            .as_mut()
140
0
            .and_then(|s| s.try_clone().ok())
141
0
            .map(|cloned| {
142
0
                let read_half = ReadHalf::new(TTcpChannel {
143
0
                    stream: s.stream.take(),
144
0
                });
145
0
                let write_half = WriteHalf::new(TTcpChannel {
146
0
                    stream: Some(cloned),
147
0
                });
148
0
                (read_half, write_half)
149
0
            })
150
0
            .ok_or_else(|| {
151
0
                new_transport_error(
152
0
                    TransportErrorKind::Unknown,
153
                    "cannot clone underlying tcp stream",
154
                )
155
0
            })
156
    }
157
}
158
159
impl Read for TTcpChannel {
160
    fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
161
0
        self.if_set(|s| s.read(b))
162
    }
163
}
164
165
impl Write for TTcpChannel {
166
    fn write(&mut self, b: &[u8]) -> io::Result<usize> {
167
0
        self.if_set(|s| s.write(b))
168
    }
169
170
    fn flush(&mut self) -> io::Result<()> {
171
0
        self.if_set(|s| s.flush())
172
    }
173
}
174
175
#[cfg(unix)]
176
impl TIoChannel for UnixStream {
177
    fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
178
    where
179
        Self: Sized,
180
    {
181
        let socket_rx = self.try_clone().unwrap();
182
183
        Ok((ReadHalf::new(self), WriteHalf::new(socket_rx)))
184
    }
185
}