Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/proxy/inbound_passthrough.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::net::SocketAddr;
16
use std::sync::Arc;
17
use std::time::Instant;
18
19
use tokio::net::TcpStream;
20
use tokio::sync::watch;
21
22
use tracing::{Instrument, debug, error, info, trace};
23
24
use crate::drain::DrainWatcher;
25
use crate::drain::run_with_drain;
26
use crate::proxy::Error;
27
use crate::proxy::metrics::Reporter;
28
use crate::proxy::{ProxyInputs, metrics, util};
29
use crate::state::workload::NetworkAddress;
30
use crate::{assertions, copy, handle_connection, rbac, strng};
31
use crate::{proxy, socket};
32
33
pub(super) struct InboundPassthrough {
34
    listener: socket::Listener,
35
    pi: Arc<ProxyInputs>,
36
    drain: DrainWatcher,
37
    enable_orig_src: bool,
38
}
39
40
impl InboundPassthrough {
41
0
    pub(super) async fn new(
42
0
        pi: Arc<ProxyInputs>,
43
0
        drain: DrainWatcher,
44
0
    ) -> Result<InboundPassthrough, Error> {
45
0
        let mut listener = pi
46
0
            .socket_factory
47
0
            .tcp_bind(pi.cfg.inbound_plaintext_addr)
48
0
            .map_err(|e| Error::Bind(pi.cfg.inbound_plaintext_addr, e))?;
49
0
        listener.set_socket_options(Some(pi.cfg.socket_config));
50
51
0
        let enable_orig_src = super::maybe_set_transparent(&pi, &listener)?;
52
53
0
        info!(
54
0
            address=%listener.local_addr(),
55
            component="inbound plaintext",
56
            transparent=enable_orig_src,
57
0
            "listener established",
58
        );
59
0
        Ok(InboundPassthrough {
60
0
            listener,
61
0
            pi,
62
0
            drain,
63
0
            enable_orig_src,
64
0
        })
65
0
    }
66
67
0
    pub(super) async fn run(self) {
68
0
        let pi = self.pi.clone();
69
0
        let accept = async move |drain: DrainWatcher, force_shutdown: watch::Receiver<()>| {
70
            loop {
71
                // Asynchronously wait for an inbound socket.
72
0
                let socket = self.listener.accept().await;
73
0
                let start = Instant::now();
74
0
                let mut force_shutdown = force_shutdown.clone();
75
0
                let drain = drain.clone();
76
0
                let pi = self.pi.clone();
77
0
                match socket {
78
0
                    Ok((stream, remote)) => {
79
0
                        let socket_labels = metrics::SocketLabels {
80
0
                            reporter: Reporter::destination,
81
0
                        };
82
0
                        pi.metrics.record_socket_open(&socket_labels);
83
84
0
                        let metrics_for_socket_close = pi.metrics.clone();
85
0
                        let serve_client = async move {
86
0
                            let _socket_guard = metrics::SocketCloseGuard::new(
87
0
                                metrics_for_socket_close,
88
0
                                Reporter::destination,
89
                            );
90
0
                            debug!(component="inbound passthrough", "connection started");
91
                                // Since this task is spawned, make sure we are guaranteed to terminate
92
0
                            tokio::select! {
93
0
                                _ = force_shutdown.changed() => {
94
0
                                    debug!(component="inbound passthrough", "connection forcefully terminated");
95
                                }
96
0
                                _ = Self::proxy_inbound_plaintext(pi, socket::to_canonical(remote), stream, self.enable_orig_src) => {}
97
                            }
98
                            // Mark we are done with the connection, so drain can complete
99
0
                            drop(drain);
100
0
                            debug!(component="inbound passthrough", dur=?start.elapsed(), "connection completed");
101
0
                        }.in_current_span();
102
103
0
                        assertions::size_between_ref(1500, 3000, &serve_client);
104
0
                        tokio::spawn(serve_client);
105
                    }
106
0
                    Err(e) => {
107
0
                        if util::is_runtime_shutdown(&e) {
108
0
                            return;
109
0
                        }
110
0
                        error!("Failed TCP handshake {}", e);
111
                    }
112
                }
113
            }
114
0
        };
Unexecuted instantiation: <ztunnel::proxy::inbound_passthrough::InboundPassthrough>::run::{closure#0}::{closure#0}::{closure#0}::<_>
Unexecuted instantiation: <ztunnel::proxy::inbound_passthrough::InboundPassthrough>::run::{closure#0}::{closure#0}
115
116
0
        run_with_drain(
117
0
            "inbound passthrough".to_string(),
118
0
            self.drain,
119
0
            pi.cfg.self_termination_deadline,
120
0
            accept,
121
0
        )
122
0
        .await
123
0
    }
124
125
0
    async fn proxy_inbound_plaintext(
126
0
        pi: Arc<ProxyInputs>,
127
0
        source_addr: SocketAddr,
128
0
        inbound_stream: TcpStream,
129
0
        enable_orig_src: bool,
130
0
    ) {
131
0
        let start = Instant::now();
132
0
        let dest_addr = socket::orig_dst_addr_or_default(&inbound_stream);
133
        // Check if it is an illegal call to ourself, which could trampoline to illegal addresses or
134
        // lead to infinite loops
135
0
        let illegal_call = pi.cfg.illegal_ports.contains(&dest_addr.port());
136
0
        if illegal_call {
137
0
            metrics::log_early_deny(
138
0
                source_addr,
139
0
                dest_addr,
140
0
                Reporter::destination,
141
0
                Error::SelfCall,
142
            );
143
0
            return;
144
0
        }
145
0
        let upstream_workload = match pi.local_workload_information.get_workload().await {
146
0
            Ok(upstream_workload) => upstream_workload,
147
0
            Err(e) => {
148
0
                metrics::log_early_deny(source_addr, dest_addr, Reporter::destination, e);
149
0
                return;
150
            }
151
        };
152
0
        let upstream_services = pi.state.get_services_by_workload(&upstream_workload);
153
154
0
        let rbac_ctx = crate::state::ProxyRbacContext {
155
0
            conn: rbac::Connection {
156
0
                src_identity: None,
157
0
                src: source_addr,
158
0
                // inbound request must be on our network since this is passthrough
159
0
                // rather than HBONE, which can be tunneled across networks through gateways.
160
0
                // by definition, without the gateway our source must be on our network.
161
0
                dst_network: strng::new(&pi.cfg.network),
162
0
                dst: dest_addr,
163
0
            },
164
0
            dest_workload: upstream_workload.clone(),
165
0
        };
166
167
        // Find source info. We can lookup by XDS or from connection attributes
168
0
        let source_workload = {
169
0
            let network_addr_srcip = NetworkAddress {
170
0
                // inbound request must be on our network since this is passthrough
171
0
                // rather than HBONE, which can be tunneled across networks through gateways.
172
0
                // by definition, without the gateway our source must be on our network.
173
0
                network: pi.cfg.network.as_str().into(),
174
0
                address: source_addr.ip(),
175
0
            };
176
0
            pi.state
177
0
                .fetch_workload_by_address(&network_addr_srcip)
178
0
                .await
179
        };
180
0
        let derived_source = metrics::DerivedWorkload {
181
0
            identity: rbac_ctx.conn.src_identity.clone(),
182
0
            ..Default::default()
183
0
        };
184
0
        let ds = proxy::guess_inbound_service(
185
0
            &rbac_ctx.conn,
186
0
            &None,
187
0
            upstream_services,
188
0
            &upstream_workload,
189
        );
190
0
        let result_tracker = Box::new(
191
0
            metrics::ConnectionResultBuilder::new(
192
0
                source_addr,
193
0
                dest_addr,
194
0
                None,
195
0
                start,
196
0
                metrics::ConnectionOpen {
197
0
                    reporter: Reporter::destination,
198
0
                    source: source_workload,
199
0
                    derived_source: Some(derived_source),
200
0
                    destination: Some(upstream_workload),
201
0
                    connection_security_policy: metrics::SecurityPolicy::unknown,
202
0
                    destination_service: ds,
203
0
                },
204
0
                pi.metrics.clone(),
205
            )
206
0
            .build(),
207
        );
208
209
0
        let mut conn_guard = match pi
210
0
            .connection_manager
211
0
            .assert_rbac(&pi.state, &rbac_ctx, None)
212
0
            .await
213
        {
214
0
            Ok(cg) => cg,
215
0
            Err(e) => {
216
0
                result_tracker
217
0
                    .record_with_flag(Err(e), metrics::ResponseFlags::AuthorizationPolicyDenied);
218
0
                return;
219
            }
220
        };
221
222
0
        let orig_src = if enable_orig_src {
223
0
            Some(source_addr.ip())
224
        } else {
225
0
            None
226
        };
227
228
0
        let send = async {
229
0
            trace!(%source_addr, %dest_addr, component="inbound plaintext", "connecting...");
230
231
0
            let outbound = super::freebind_connect(orig_src, dest_addr, pi.socket_factory.as_ref())
232
0
                .await
233
0
                .map_err(Error::ConnectionFailed)?;
234
235
0
            trace!(%source_addr, destination=%dest_addr, component="inbound plaintext", "connected");
236
0
            copy::copy_bidirectional(
237
0
                copy::TcpStreamSplitter(inbound_stream),
238
0
                copy::TcpStreamSplitter(outbound),
239
0
                &result_tracker,
240
0
            )
241
0
            .await
242
0
        };
243
244
0
        let res = handle_connection!(conn_guard, send);
245
0
        result_tracker.record(res);
246
0
    }
247
}