/src/ztunnel/src/proxy/inbound_passthrough.rs
Line | Count | Source |
1 | | // Copyright Istio Authors |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::net::SocketAddr; |
16 | | use std::sync::Arc; |
17 | | use std::time::Instant; |
18 | | |
19 | | use tokio::net::TcpStream; |
20 | | use tokio::sync::watch; |
21 | | |
22 | | use tracing::{Instrument, debug, error, info, trace}; |
23 | | |
24 | | use crate::drain::DrainWatcher; |
25 | | use crate::drain::run_with_drain; |
26 | | use crate::proxy::Error; |
27 | | use crate::proxy::metrics::Reporter; |
28 | | use crate::proxy::{ProxyInputs, metrics, util}; |
29 | | use crate::state::workload::NetworkAddress; |
30 | | use crate::{assertions, copy, handle_connection, rbac, strng}; |
31 | | use crate::{proxy, socket}; |
32 | | |
33 | | pub(super) struct InboundPassthrough { |
34 | | listener: socket::Listener, |
35 | | pi: Arc<ProxyInputs>, |
36 | | drain: DrainWatcher, |
37 | | enable_orig_src: bool, |
38 | | } |
39 | | |
40 | | impl InboundPassthrough { |
41 | 0 | pub(super) async fn new( |
42 | 0 | pi: Arc<ProxyInputs>, |
43 | 0 | drain: DrainWatcher, |
44 | 0 | ) -> Result<InboundPassthrough, Error> { |
45 | 0 | let mut listener = pi |
46 | 0 | .socket_factory |
47 | 0 | .tcp_bind(pi.cfg.inbound_plaintext_addr) |
48 | 0 | .map_err(|e| Error::Bind(pi.cfg.inbound_plaintext_addr, e))?; |
49 | 0 | listener.set_socket_options(Some(pi.cfg.socket_config)); |
50 | | |
51 | 0 | let enable_orig_src = super::maybe_set_transparent(&pi, &listener)?; |
52 | | |
53 | 0 | info!( |
54 | 0 | address=%listener.local_addr(), |
55 | | component="inbound plaintext", |
56 | | transparent=enable_orig_src, |
57 | 0 | "listener established", |
58 | | ); |
59 | 0 | Ok(InboundPassthrough { |
60 | 0 | listener, |
61 | 0 | pi, |
62 | 0 | drain, |
63 | 0 | enable_orig_src, |
64 | 0 | }) |
65 | 0 | } |
66 | | |
67 | 0 | pub(super) async fn run(self) { |
68 | 0 | let pi = self.pi.clone(); |
69 | 0 | let accept = async move |drain: DrainWatcher, force_shutdown: watch::Receiver<()>| { |
70 | | loop { |
71 | | // Asynchronously wait for an inbound socket. |
72 | 0 | let socket = self.listener.accept().await; |
73 | 0 | let start = Instant::now(); |
74 | 0 | let mut force_shutdown = force_shutdown.clone(); |
75 | 0 | let drain = drain.clone(); |
76 | 0 | let pi = self.pi.clone(); |
77 | 0 | match socket { |
78 | 0 | Ok((stream, remote)) => { |
79 | 0 | let socket_labels = metrics::SocketLabels { |
80 | 0 | reporter: Reporter::destination, |
81 | 0 | }; |
82 | 0 | pi.metrics.record_socket_open(&socket_labels); |
83 | | |
84 | 0 | let metrics_for_socket_close = pi.metrics.clone(); |
85 | 0 | let serve_client = async move { |
86 | 0 | let _socket_guard = metrics::SocketCloseGuard::new( |
87 | 0 | metrics_for_socket_close, |
88 | 0 | Reporter::destination, |
89 | | ); |
90 | 0 | debug!(component="inbound passthrough", "connection started"); |
91 | | // Since this task is spawned, make sure we are guaranteed to terminate |
92 | 0 | tokio::select! { |
93 | 0 | _ = force_shutdown.changed() => { |
94 | 0 | debug!(component="inbound passthrough", "connection forcefully terminated"); |
95 | | } |
96 | 0 | _ = Self::proxy_inbound_plaintext(pi, socket::to_canonical(remote), stream, self.enable_orig_src) => {} |
97 | | } |
98 | | // Mark we are done with the connection, so drain can complete |
99 | 0 | drop(drain); |
100 | 0 | debug!(component="inbound passthrough", dur=?start.elapsed(), "connection completed"); |
101 | 0 | }.in_current_span(); |
102 | | |
103 | 0 | assertions::size_between_ref(1500, 3000, &serve_client); |
104 | 0 | tokio::spawn(serve_client); |
105 | | } |
106 | 0 | Err(e) => { |
107 | 0 | if util::is_runtime_shutdown(&e) { |
108 | 0 | return; |
109 | 0 | } |
110 | 0 | error!("Failed TCP handshake {}", e); |
111 | | } |
112 | | } |
113 | | } |
114 | 0 | }; Unexecuted instantiation: <ztunnel::proxy::inbound_passthrough::InboundPassthrough>::run::{closure#0}::{closure#0}::{closure#0}::<_>Unexecuted instantiation: <ztunnel::proxy::inbound_passthrough::InboundPassthrough>::run::{closure#0}::{closure#0} |
115 | | |
116 | 0 | run_with_drain( |
117 | 0 | "inbound passthrough".to_string(), |
118 | 0 | self.drain, |
119 | 0 | pi.cfg.self_termination_deadline, |
120 | 0 | accept, |
121 | 0 | ) |
122 | 0 | .await |
123 | 0 | } |
124 | | |
125 | 0 | async fn proxy_inbound_plaintext( |
126 | 0 | pi: Arc<ProxyInputs>, |
127 | 0 | source_addr: SocketAddr, |
128 | 0 | inbound_stream: TcpStream, |
129 | 0 | enable_orig_src: bool, |
130 | 0 | ) { |
131 | 0 | let start = Instant::now(); |
132 | 0 | let dest_addr = socket::orig_dst_addr_or_default(&inbound_stream); |
133 | | // Check if it is an illegal call to ourself, which could trampoline to illegal addresses or |
134 | | // lead to infinite loops |
135 | 0 | let illegal_call = pi.cfg.illegal_ports.contains(&dest_addr.port()); |
136 | 0 | if illegal_call { |
137 | 0 | metrics::log_early_deny( |
138 | 0 | source_addr, |
139 | 0 | dest_addr, |
140 | 0 | Reporter::destination, |
141 | 0 | Error::SelfCall, |
142 | | ); |
143 | 0 | return; |
144 | 0 | } |
145 | 0 | let upstream_workload = match pi.local_workload_information.get_workload().await { |
146 | 0 | Ok(upstream_workload) => upstream_workload, |
147 | 0 | Err(e) => { |
148 | 0 | metrics::log_early_deny(source_addr, dest_addr, Reporter::destination, e); |
149 | 0 | return; |
150 | | } |
151 | | }; |
152 | 0 | let upstream_services = pi.state.get_services_by_workload(&upstream_workload); |
153 | | |
154 | 0 | let rbac_ctx = crate::state::ProxyRbacContext { |
155 | 0 | conn: rbac::Connection { |
156 | 0 | src_identity: None, |
157 | 0 | src: source_addr, |
158 | 0 | // inbound request must be on our network since this is passthrough |
159 | 0 | // rather than HBONE, which can be tunneled across networks through gateways. |
160 | 0 | // by definition, without the gateway our source must be on our network. |
161 | 0 | dst_network: strng::new(&pi.cfg.network), |
162 | 0 | dst: dest_addr, |
163 | 0 | }, |
164 | 0 | dest_workload: upstream_workload.clone(), |
165 | 0 | }; |
166 | | |
167 | | // Find source info. We can lookup by XDS or from connection attributes |
168 | 0 | let source_workload = { |
169 | 0 | let network_addr_srcip = NetworkAddress { |
170 | 0 | // inbound request must be on our network since this is passthrough |
171 | 0 | // rather than HBONE, which can be tunneled across networks through gateways. |
172 | 0 | // by definition, without the gateway our source must be on our network. |
173 | 0 | network: pi.cfg.network.as_str().into(), |
174 | 0 | address: source_addr.ip(), |
175 | 0 | }; |
176 | 0 | pi.state |
177 | 0 | .fetch_workload_by_address(&network_addr_srcip) |
178 | 0 | .await |
179 | | }; |
180 | 0 | let derived_source = metrics::DerivedWorkload { |
181 | 0 | identity: rbac_ctx.conn.src_identity.clone(), |
182 | 0 | ..Default::default() |
183 | 0 | }; |
184 | 0 | let ds = proxy::guess_inbound_service( |
185 | 0 | &rbac_ctx.conn, |
186 | 0 | &None, |
187 | 0 | upstream_services, |
188 | 0 | &upstream_workload, |
189 | | ); |
190 | 0 | let result_tracker = Box::new( |
191 | 0 | metrics::ConnectionResultBuilder::new( |
192 | 0 | source_addr, |
193 | 0 | dest_addr, |
194 | 0 | None, |
195 | 0 | start, |
196 | 0 | metrics::ConnectionOpen { |
197 | 0 | reporter: Reporter::destination, |
198 | 0 | source: source_workload, |
199 | 0 | derived_source: Some(derived_source), |
200 | 0 | destination: Some(upstream_workload), |
201 | 0 | connection_security_policy: metrics::SecurityPolicy::unknown, |
202 | 0 | destination_service: ds, |
203 | 0 | }, |
204 | 0 | pi.metrics.clone(), |
205 | | ) |
206 | 0 | .build(), |
207 | | ); |
208 | | |
209 | 0 | let mut conn_guard = match pi |
210 | 0 | .connection_manager |
211 | 0 | .assert_rbac(&pi.state, &rbac_ctx, None) |
212 | 0 | .await |
213 | | { |
214 | 0 | Ok(cg) => cg, |
215 | 0 | Err(e) => { |
216 | 0 | result_tracker |
217 | 0 | .record_with_flag(Err(e), metrics::ResponseFlags::AuthorizationPolicyDenied); |
218 | 0 | return; |
219 | | } |
220 | | }; |
221 | | |
222 | 0 | let orig_src = if enable_orig_src { |
223 | 0 | Some(source_addr.ip()) |
224 | | } else { |
225 | 0 | None |
226 | | }; |
227 | | |
228 | 0 | let send = async { |
229 | 0 | trace!(%source_addr, %dest_addr, component="inbound plaintext", "connecting..."); |
230 | | |
231 | 0 | let outbound = super::freebind_connect(orig_src, dest_addr, pi.socket_factory.as_ref()) |
232 | 0 | .await |
233 | 0 | .map_err(Error::ConnectionFailed)?; |
234 | | |
235 | 0 | trace!(%source_addr, destination=%dest_addr, component="inbound plaintext", "connected"); |
236 | 0 | copy::copy_bidirectional( |
237 | 0 | copy::TcpStreamSplitter(inbound_stream), |
238 | 0 | copy::TcpStreamSplitter(outbound), |
239 | 0 | &result_tracker, |
240 | 0 | ) |
241 | 0 | .await |
242 | 0 | }; |
243 | | |
244 | 0 | let res = handle_connection!(conn_guard, send); |
245 | 0 | result_tracker.record(res); |
246 | 0 | } |
247 | | } |