Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/proxy/inbound.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use futures_util::TryFutureExt;
16
use http::{Method, Response, StatusCode};
17
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
18
use std::sync::Arc;
19
use std::time::Instant;
20
use tls_listener::AsyncTls;
21
use tokio::sync::watch;
22
23
use tracing::{Instrument, debug, error, info, info_span, trace_span};
24
25
use super::{
26
    ConnectionResult, ConnectionResultBuilder, Error, HboneAddress, LocalWorkloadInformation,
27
    ResponseFlags, util,
28
};
29
use crate::baggage::{baggage_header_val, parse_baggage_header};
30
use crate::identity::Identity;
31
32
use crate::config::Config;
33
use crate::drain::DrainWatcher;
34
use crate::proxy::h2::server::{H2Request, RequestParts};
35
use crate::proxy::metrics::{ConnectionOpen, Reporter};
36
use crate::proxy::{
37
    BAGGAGE_HEADER, ProxyInputs, TRACEPARENT_HEADER, TraceParent, X_FORWARDED_NETWORK_HEADER,
38
    metrics,
39
};
40
use crate::rbac::Connection;
41
use crate::socket::to_canonical;
42
use crate::state::service::Service;
43
use crate::{assertions, copy, handle_connection, proxy, socket, strng, tls};
44
45
use crate::drain::run_with_drain;
46
use crate::proxy::h2;
47
use crate::state::workload::address::Address;
48
use crate::state::workload::application_tunnel::Protocol;
49
use crate::state::workload::{self, NetworkAddress, Workload};
50
use crate::state::{DemandProxyState, ProxyRbacContext};
51
use crate::strng::Strng;
52
use crate::tls::TlsError;
53
54
pub struct Inbound {
55
    listener: socket::Listener,
56
    drain: DrainWatcher,
57
    pi: Arc<ProxyInputs>,
58
    enable_orig_src: bool,
59
}
60
61
impl Inbound {
62
0
    pub(crate) async fn new(pi: Arc<ProxyInputs>, drain: DrainWatcher) -> Result<Inbound, Error> {
63
0
        let listener = pi
64
0
            .socket_factory
65
0
            .tcp_bind(pi.cfg.inbound_addr)
66
0
            .map_err(|e| Error::Bind(pi.cfg.inbound_addr, e))?;
67
0
        let enable_orig_src = super::maybe_set_transparent(&pi, &listener)?;
68
69
0
        info!(
70
0
            address=%listener.local_addr(),
71
            component="inbound",
72
            transparent=enable_orig_src,
73
0
            "listener established",
74
        );
75
0
        Ok(Inbound {
76
0
            listener,
77
0
            drain,
78
0
            pi,
79
0
            enable_orig_src,
80
0
        })
81
0
    }
82
83
    /// Returns the socket address this proxy is listening on.
84
0
    pub fn address(&self) -> SocketAddr {
85
0
        self.listener.local_addr()
86
0
    }
87
88
0
    pub async fn run(self) {
89
0
        let pi = self.pi.clone();
90
0
        let acceptor = InboundCertProvider {
91
0
            local_workload: self.pi.local_workload_information.clone(),
92
0
            crl_manager: self.pi.crl_manager.clone(),
93
0
        };
94
95
        // Safety: we set nodelay directly in tls_server, so it is safe to convert to a normal listener.
96
        // Although, that is *after* the TLS handshake; in theory we may get some benefits to setting it earlier.
97
98
0
        let accept = async move |drain: DrainWatcher, force_shutdown: watch::Receiver<()>| {
99
            loop {
100
0
                let (raw_socket, src) = match self.listener.accept().await {
101
0
                    Ok(raw_socket) => raw_socket,
102
0
                    Err(e) => {
103
0
                        if util::is_runtime_shutdown(&e) {
104
0
                            return;
105
0
                        }
106
0
                        error!("Failed TCP handshake {}", e);
107
0
                        continue;
108
                    }
109
                };
110
0
                let src = to_canonical(src);
111
0
                let start = Instant::now();
112
0
                let drain = drain.clone();
113
0
                let force_shutdown = force_shutdown.clone();
114
0
                let pi = self.pi.clone();
115
0
                let dst = to_canonical(raw_socket.local_addr().expect("local_addr available"));
116
0
                let network = pi.cfg.network.clone();
117
0
                let acceptor = crate::tls::InboundAcceptor::new(acceptor.clone());
118
119
0
                let socket_labels = metrics::SocketLabels {
120
0
                    reporter: Reporter::destination,
121
0
                };
122
0
                pi.metrics.record_socket_open(&socket_labels);
123
0
                let metrics_for_socket_close = pi.metrics.clone();
124
125
0
                let serve_client = async move {
126
0
                    let _socket_guard = metrics::SocketCloseGuard::new(
127
0
                        metrics_for_socket_close,
128
0
                        Reporter::destination,
129
                    );
130
0
                    let tls = match acceptor.accept(raw_socket).await {
131
0
                        Ok(tls) => tls,
132
0
                        Err(e) => {
133
0
                            metrics::log_early_deny(src, dst, Reporter::destination, e);
134
135
0
                            return Err::<(), _>(proxy::Error::SelfCall);
136
                        }
137
                    };
138
0
                    debug!(latency=?start.elapsed(), "accepted TLS connection");
139
0
                    let (_, ssl) = tls.get_ref();
140
0
                    let src_identity: Option<Identity> = tls::identity_from_connection(ssl);
141
0
                    let conn = Connection {
142
0
                        src_identity,
143
0
                        src,
144
0
                        dst_network: network.clone(), // inbound request must be on our network
145
0
                        dst,
146
0
                    };
147
0
                    debug!(%conn, "accepted connection");
148
0
                    let cfg = pi.cfg.clone();
149
0
                    let request_handler = move |req| {
150
0
                        let id = Self::extract_traceparent(&req);
151
0
                        let peer = conn.src;
152
0
                        let req_handler = Self::serve_connect(
153
0
                            pi.clone(),
154
0
                            conn.clone(),
155
0
                            self.enable_orig_src,
156
0
                            req,
157
                        )
158
0
                        .instrument(info_span!("inbound", %id, %peer));
159
                        // This is for each user connection, so most important to keep small
160
0
                        assertions::size_between_ref(1500, 2500, &req_handler);
161
0
                        req_handler
162
0
                    };
163
164
0
                    let serve_conn = h2::server::serve_connection(
165
0
                        cfg,
166
0
                        tls,
167
0
                        drain,
168
0
                        force_shutdown,
169
0
                        request_handler,
170
                    );
171
                    // This is per HBONE connection, so while would be nice to be small, at least it
172
                    // is pooled so typically fewer of these.
173
0
                    let serve = Box::pin(assertions::size_between(6000, 8000, serve_conn));
174
0
                    serve.await
175
0
                };
176
                // This is small since it only handles the TLS layer -- the HTTP2 layer is boxed
177
                // and measured above.
178
0
                assertions::size_between_ref(1000, 1600, &serve_client);
179
0
                tokio::task::spawn(serve_client.in_current_span());
180
            }
181
0
        };
Unexecuted instantiation: <ztunnel::proxy::inbound::Inbound>::run::{closure#0}::{closure#0}
Unexecuted instantiation: <ztunnel::proxy::inbound::Inbound>::run::{closure#0}::{closure#0}::{closure#0}::<_>
182
183
0
        run_with_drain(
184
0
            "inbound".to_string(),
185
0
            self.drain,
186
0
            pi.cfg.self_termination_deadline,
187
0
            accept,
188
0
        )
189
0
        .await
190
0
    }
191
192
0
    fn extract_traceparent(req: &H2Request) -> TraceParent {
193
0
        req.headers()
194
0
            .get(TRACEPARENT_HEADER)
195
0
            .and_then(|b| b.to_str().ok())
196
0
            .and_then(|b| TraceParent::try_from(b).ok())
197
0
            .unwrap_or_else(TraceParent::new)
198
0
    }
199
200
    /// serve_connect handles a single connection from a client.
201
    #[allow(clippy::too_many_arguments)]
202
0
    async fn serve_connect(
203
0
        pi: Arc<ProxyInputs>,
204
0
        conn: Connection,
205
0
        enable_original_source: bool,
206
0
        req: H2Request,
207
0
    ) {
208
0
        let src = conn.src;
209
0
        let dst = conn.dst;
210
211
0
        debug!(%conn, ?req, "received request");
212
213
        // In order to ensure we properly handle all errors, we split up serving inbound request into a few
214
        // phases.
215
216
        // Initial phase, build up context about the request.
217
0
        let ri = match Self::build_inbound_request(&pi, conn, req.get_request()).await {
218
0
            Ok(i) => i,
219
0
            Err(InboundError(e, code)) => {
220
                // At this point in processing, we never built up full context to log a complete access log.
221
                // Instead, just log a minimal error line.
222
0
                metrics::log_early_deny(src, dst, Reporter::destination, e);
223
0
                if let Err(err) =
224
0
                    req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage))
225
                {
226
0
                    tracing::warn!("failed to send HTTP response: {err}");
227
0
                }
228
0
                return;
229
            }
230
        };
231
232
        // Now we have enough context to properly report logs and metrics. Group everything else that
233
        // can fail before we send the OK response here.
234
0
        let rx = async {
235
            // Define a connection guard to ensure rbac conditions are maintained for the duration of the connection
236
0
            let conn_guard = pi
237
0
                .connection_manager
238
0
                .assert_rbac(&pi.state, &ri.rbac_ctx, ri.for_host)
239
0
                .await
240
0
                .map_err(InboundFlagError::build(
241
                    StatusCode::UNAUTHORIZED,
242
0
                    ResponseFlags::AuthorizationPolicyDenied,
243
0
                ))?;
244
245
            // app tunnels should only bind to localhost to prevent
246
            // being accessed without going through ztunnel
247
0
            let localhost_tunnel = pi.cfg.localhost_app_tunnel
248
0
                && ri
249
0
                    .tunnel_request
250
0
                    .as_ref()
251
0
                    .map(|tr| tr.protocol.supports_localhost_send())
252
0
                    .unwrap_or(false);
253
0
            let (src, dst) = if localhost_tunnel {
254
                // guess the family based on the destination address
255
0
                let loopback = match ri.upstream_addr {
256
0
                    SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST),
257
0
                    SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST),
258
                };
259
260
                // we must bind the src to be localhost when sending to localhost,
261
                // or various components could break traffic (RPF, iptables, ip route)
262
                // the original source is preserved within PROXY protocol
263
0
                (
264
0
                    Some(loopback),
265
0
                    SocketAddr::new(loopback, ri.upstream_addr.port()),
266
0
                )
267
            } else {
268
                // When ztunnel is proxying to its own internal endpoints (metrics server after HBONE termination),
269
                // we must not attempt to use the original external client's IP as the source for this internal connection.
270
                // Setting `disable_inbound_freebind` to true for such self-proxy scenarios ensures `upstream_src_ip` is `None`,
271
                // causing `freebind_connect` to use a local IP for the connection to ztunnel's own service.
272
                // For regular inbound traffic to other workloads, `disable_inbound_freebind` is false, and original source
273
                // preservation depends on `enable_original_source`.
274
0
                let upstream_src_ip = if pi.disable_inbound_freebind {
275
0
                    None
276
                } else {
277
0
                    enable_original_source.then_some(ri.rbac_ctx.conn.src.ip())
278
                };
279
0
                (upstream_src_ip, ri.upstream_addr)
280
            };
281
282
            // Establish upstream connection between original source and destination
283
            // We are allowing a bind to the original source address locally even if the ip address isn't on this node.
284
0
            let stream = super::freebind_connect(src, dst, pi.socket_factory.as_ref())
285
0
                .await
286
0
                .map_err(Error::ConnectionFailed)
287
0
                .map_err(InboundFlagError::build(
288
                    StatusCode::SERVICE_UNAVAILABLE,
289
0
                    ResponseFlags::ConnectionFailure,
290
0
                ))?;
291
0
            debug!("connected to: {}", ri.upstream_addr);
292
0
            Ok((conn_guard, stream))
293
0
        };
294
        // Wait on establishing the upstream connection and connection guard before sending the 200 response to the client
295
0
        let (mut conn_guard, mut stream) = match rx.await {
296
0
            Ok(res) => res,
297
0
            Err(InboundFlagError(err, flag, code)) => {
298
0
                ri.result_tracker.record_with_flag(Err(err), flag);
299
0
                if let Err(err) =
300
0
                    req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage))
301
                {
302
0
                    tracing::warn!("failed to send HTTP response: {err}");
303
0
                }
304
0
                return;
305
            }
306
        };
307
308
        // At this point, we established the upstream connection and need to send a 200 back to the client.
309
        // we may still have failures at this point during the proxying, but we don't need to send these
310
        // at the HTTP layer.
311
        // Send a 200 back to the client and start forwarding traffic.
312
        //
313
        // If requested, we may start the stream with a PROXY protocol header. This ensures
314
        // that the server has all of the necessary information about the connection regardless of the protocol
315
        // See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt for more information about the
316
        // proxy protocol.
317
0
        let send = req
318
0
            .send_response(build_response(
319
                StatusCode::OK,
320
0
                Some(ri.destination_workload.as_ref()),
321
0
                pi.cfg.enable_enhanced_baggage,
322
            ))
323
0
            .and_then(|h2_stream| async {
324
                if let Some(TunnelRequest {
325
                    protocol: Protocol::PROXY,
326
0
                    tunnel_target,
327
0
                }) = ri.tunnel_request
328
                {
329
                    let Connection {
330
0
                        src, src_identity, ..
331
                    } = ri.rbac_ctx.conn;
332
0
                    super::write_proxy_protocol(&mut stream, (src, tunnel_target), src_identity)
333
0
                        .instrument(trace_span!("proxy protocol"))
334
0
                        .await?;
335
0
                }
336
0
                copy::copy_bidirectional(
337
0
                    h2_stream,
338
0
                    copy::TcpStreamSplitter(stream),
339
0
                    &ri.result_tracker,
340
                )
341
0
                .instrument(trace_span!("hbone server"))
342
0
                .await
343
0
            });
344
0
        let res = handle_connection!(conn_guard, send);
345
0
        ri.result_tracker.record(res);
346
0
    }
347
348
    // build_inbound_request builds up the context for an inbound request.
349
0
    async fn build_inbound_request<T: RequestParts>(
350
0
        pi: &Arc<ProxyInputs>,
351
0
        conn: Connection,
352
0
        req: &T,
353
0
    ) -> Result<InboundRequest, InboundError> {
354
0
        if req.method() != Method::CONNECT {
355
0
            let e = Error::NonConnectMethod(req.method().to_string());
356
0
            return Err(InboundError(e, StatusCode::BAD_REQUEST));
357
0
        }
358
359
0
        let start = Instant::now();
360
361
        // Extract the host or IP from the authority pseudo-header of the URI
362
0
        let hbone_addr: HboneAddress = req
363
0
            .uri()
364
0
            .try_into()
365
0
            .map_err(InboundError::build(StatusCode::BAD_REQUEST))?;
366
367
        // Get the destination workload information of the destination pods (wds) workload (not destination ztunnel)
368
0
        let destination_workload = pi
369
0
            .local_workload_information
370
0
            .get_workload()
371
0
            .await
372
            // At this point we already fetched the local workload for TLS, so it should be infallible.
373
0
            .map_err(InboundError::build(StatusCode::SERVICE_UNAVAILABLE))?;
374
375
        // Check the request is allowed by verifying the destination
376
0
        Self::validate_destination(&pi.state, &conn, &destination_workload, &hbone_addr)
377
0
            .await
378
0
            .map_err(InboundError::build(StatusCode::BAD_REQUEST))?;
379
380
        // Determine the next hop.
381
0
        let (upstream_addr, tunnel_request, upstream_service) = Self::find_inbound_upstream(
382
0
            &pi.cfg,
383
0
            &pi.state,
384
0
            &conn,
385
0
            &destination_workload,
386
0
            &hbone_addr,
387
        )
388
0
        .map_err(InboundError::build(StatusCode::SERVICE_UNAVAILABLE))?;
389
390
0
        let original_dst = conn.dst;
391
        // Connection has 15008, swap with the real port
392
0
        let conn = Connection {
393
0
            dst: upstream_addr,
394
0
            ..conn
395
0
        };
396
397
0
        let rbac_ctx = ProxyRbacContext {
398
0
            conn,
399
0
            dest_workload: destination_workload.clone(),
400
0
        };
401
402
0
        let for_host = parse_forwarded_host(req);
403
0
        let baggage = if pi.cfg.enable_enhanced_baggage {
404
0
            parse_baggage_header(req.headers().get_all(BAGGAGE_HEADER)).unwrap_or_default()
405
        } else {
406
0
            Default::default()
407
        };
408
409
        // We assume it is from gateway if it's a hostname request.
410
        // We may need a more explicit indicator in the future.
411
        // Note: previously this attempted to check that the src identity was equal to the Gateway;
412
        // this check is broken as the gateway only forwards an HBONE request, it doesn't initiate it itself.
413
0
        let from_gateway = req
414
0
            .headers()
415
0
            .get(X_FORWARDED_NETWORK_HEADER)
416
0
            .and_then(|h| h.to_str().ok())
417
0
            .map(|s| !s.eq_ignore_ascii_case(&pi.cfg.network)) // If the network is different, it's from a gateway
418
0
            .unwrap_or(false);
419
420
0
        if from_gateway {
421
0
            debug!("request from gateway");
422
0
        }
423
0
        let source = match from_gateway {
424
            // we cannot lookup source workload since we don't know the network, see https://github.com/istio/ztunnel/issues/515.
425
            // Instead, we will use baggage
426
0
            true => None,
427
            false => {
428
0
                let src_network_addr = NetworkAddress {
429
0
                    // we can assume source network is our network because we did not traverse a gateway
430
0
                    network: rbac_ctx.conn.dst_network.clone(),
431
0
                    address: rbac_ctx.conn.src.ip(),
432
0
                };
433
                // Find source info. We can lookup by XDS or from connection attributes
434
0
                pi.state.fetch_workload_by_address(&src_network_addr).await
435
            }
436
        };
437
438
0
        let derived_source = if pi.cfg.enable_enhanced_baggage {
439
0
            metrics::DerivedWorkload {
440
0
                identity: rbac_ctx.conn.src_identity.clone(),
441
0
                cluster_id: baggage.cluster_id,
442
0
                region: baggage.region,
443
0
                zone: baggage.zone,
444
0
                namespace: baggage.namespace,
445
0
                app: baggage.service_name,
446
0
                workload_name: baggage.workload_name,
447
0
                revision: baggage.revision,
448
0
            }
449
        } else {
450
0
            metrics::DerivedWorkload {
451
0
                identity: rbac_ctx.conn.src_identity.clone(),
452
0
                ..Default::default()
453
0
            }
454
        };
455
0
        let ds = proxy::guess_inbound_service(
456
0
            &rbac_ctx.conn,
457
0
            &for_host,
458
0
            upstream_service,
459
0
            &destination_workload,
460
        );
461
0
        let connection_result_builder = ConnectionResultBuilder::new(
462
0
            rbac_ctx.conn.src,
463
            // For consistency with outbound logs, report the original destination (with 15008 port)
464
            // as dst.addr, and the target address as dst.hbone_addr
465
0
            original_dst,
466
0
            Some(hbone_addr.clone()),
467
0
            start,
468
0
            ConnectionOpen {
469
0
                reporter: Reporter::destination,
470
0
                source,
471
0
                derived_source: Some(derived_source),
472
0
                destination: Some(destination_workload.clone()),
473
0
                connection_security_policy: metrics::SecurityPolicy::mutual_tls,
474
0
                destination_service: ds,
475
0
            },
476
0
            pi.metrics.clone(),
477
        );
478
479
0
        let result_tracker = Box::new(connection_result_builder.build());
480
0
        Ok(InboundRequest {
481
0
            for_host,
482
0
            rbac_ctx,
483
0
            result_tracker,
484
0
            upstream_addr,
485
0
            tunnel_request,
486
0
            destination_workload,
487
0
        })
488
0
    }
489
490
    // Selects a service by hostname without the explicit knowledge of the namespace
491
    // There is no explicit mapping from hostname to namespace (e.g. foo.com)
492
0
    fn find_service_by_hostname(
493
0
        state: &DemandProxyState,
494
0
        local_workload: &Workload,
495
0
        hbone_host: &Strng,
496
0
    ) -> Result<Arc<Service>, Error> {
497
0
        state
498
0
            .read()
499
0
            .services
500
0
            .get_best_by_host(hbone_host, Some(&local_workload.namespace))
501
0
            .ok_or_else(|| Error::NoHostname(hbone_host.to_string()))
502
0
    }
503
504
    /// validate_destination ensures the destination is an allowed request.
505
0
    async fn validate_destination(
506
0
        state: &DemandProxyState,
507
0
        conn: &Connection,
508
0
        local_workload: &Workload,
509
0
        hbone_addr: &HboneAddress,
510
0
    ) -> Result<(), Error> {
511
0
        let HboneAddress::SocketAddr(hbone_addr) = hbone_addr else {
512
            // This is a hostname - it is valid. We may not find the hostname, at which point we will fail later
513
0
            return Ok(());
514
        };
515
0
        if conn.dst.ip() == hbone_addr.ip() {
516
            // Normal case: both are aligned. This is allowed (we really only need the HBONE address for the port.)
517
0
            return Ok(());
518
0
        }
519
0
        if local_workload.application_tunnel.is_some() {
520
            // In the case they have their own tunnel, they will get the HBONE target address in the PROXY
521
            // header, and their application can decide what to do with it; we don't validate this.
522
            // This is the case, for instance, with a waypoint using PROXY.
523
0
            return Ok(());
524
0
        }
525
        // There still may be the case where we are doing a "waypoint sandwich" but not using any tunnel.
526
        // Presumably, the waypoint is only matching on L7 attributes.
527
        // We want to make sure in this case we don't deny the requests just because the HBONE destination
528
        // mismatches (though we will, essentially, ignore the address).
529
        // To do this, we do a lookup to see if the HBONE target has us as its waypoint.
530
0
        let hbone_dst = &NetworkAddress {
531
0
            network: conn.dst_network.clone(),
532
0
            address: hbone_addr.ip(),
533
0
        };
534
535
        // None means we need to do on-demand lookup
536
0
        let lookup_is_destination_this_waypoint = || -> Option<bool> {
537
0
            let state = state.read();
538
539
            // TODO Allow HBONE address to be a hostname. We have to respect rules about
540
            // hostname scoping. Can we use the client's namespace here to do that?
541
0
            let hbone_target = state.find_address(hbone_dst)?;
542
543
            // HBONE target can point to some service or workload. In either case, get the waypoint
544
0
            let Some(target_waypoint) = (match hbone_target {
545
0
                Address::Service(ref svc) => &svc.waypoint,
546
0
                Address::Workload(ref wl) => &wl.waypoint,
547
            }) else {
548
                // Target has no waypoint
549
0
                return Some(false);
550
            };
551
552
            // Resolve the reference from our HBONE target
553
0
            let Some(target_waypoint) = state.find_destination(&target_waypoint.destination) else {
554
0
                return Some(false);
555
            };
556
557
            // Validate that the HBONE target references the Waypoint we're connecting to
558
0
            Some(match target_waypoint {
559
0
                Address::Service(svc) => svc.contains_endpoint(local_workload),
560
0
                Address::Workload(wl) => wl.workload_ips.contains(&conn.dst.ip()),
561
            })
562
0
        };
563
564
0
        let res = match lookup_is_destination_this_waypoint() {
565
0
            Some(r) => Some(r),
566
            None => {
567
0
                if !state.supports_on_demand() {
568
0
                    None
569
                } else {
570
0
                    state
571
0
                        .fetch_on_demand(strng::new(hbone_dst.to_string()))
572
0
                        .await;
573
0
                    lookup_is_destination_this_waypoint()
574
                }
575
            }
576
        };
577
578
0
        if res.is_none() || res == Some(false) {
579
0
            return Err(Error::IPMismatch(conn.dst.ip(), hbone_addr.ip()));
580
0
        }
581
0
        Ok(())
582
0
    }
583
584
    /// find_inbound_upstream determines the next hop for an inbound request.
585
    #[expect(clippy::type_complexity)]
586
0
    pub(super) fn find_inbound_upstream(
587
0
        cfg: &Config,
588
0
        state: &DemandProxyState,
589
0
        conn: &Connection,
590
0
        local_workload: &Workload,
591
0
        hbone_addr: &HboneAddress,
592
0
    ) -> Result<(SocketAddr, Option<TunnelRequest>, Vec<Arc<Service>>), Error> {
593
        // We always target the local workload IP as the destination. But we need to determine the port to send to.
594
0
        let target_ip = conn.dst.ip();
595
596
        // First, fetch the actual target SocketAddr as well as all possible services this could be for.
597
        // Given they may request the pod directly, there may be multiple possible services; we will
598
        // select a final one (if any) later.
599
0
        let (dest, services) = match hbone_addr {
600
0
            HboneAddress::SvcHostname(hostname, service_port) => {
601
                // Request is to a hostname. This must be a service.
602
                // We know the destination IP already (since this is inbound, we just need to forward it),
603
                // but will need to resolve the port from service port to target port.
604
0
                let svc = Self::find_service_by_hostname(state, local_workload, hostname)?;
605
606
0
                let endpoint_port = svc
607
0
                    .endpoints
608
0
                    .get(&local_workload.uid)
609
0
                    .and_then(|ep| ep.port.get(service_port));
610
                // If we can get the port from the endpoint, that is ideal. But we may not, which is fine
611
                // if the service has a number target port (rather than named).
612
0
                let port = if let Some(&ep_port) = endpoint_port {
613
0
                    ep_port
614
                } else {
615
0
                    let service_target_port =
616
0
                        svc.ports.get(service_port).copied().unwrap_or_default();
617
0
                    if service_target_port == 0 {
618
0
                        return Err(Error::NoPortForServices(
619
0
                            hostname.to_string(),
620
0
                            *service_port,
621
0
                        ));
622
0
                    }
623
0
                    service_target_port
624
                };
625
0
                (SocketAddr::new(target_ip, port), vec![svc])
626
            }
627
0
            HboneAddress::SocketAddr(hbone_addr) => (
628
0
                SocketAddr::new(target_ip, hbone_addr.port()),
629
0
                state.get_services_by_workload(local_workload),
630
0
            ),
631
        };
632
633
        // Check for illegal calls now that we have resolved to the final destination.
634
        // We need to do this here, rather than `validate_destination`, since the former doesn't
635
        // have access to the resolved service port.
636
0
        if cfg.illegal_ports.contains(&dest.port()) {
637
0
            return Err(Error::SelfCall);
638
0
        }
639
640
        // Application tunnel may override the port.
641
0
        let (target, tunnel) = match local_workload.application_tunnel.clone() {
642
0
            Some(workload::ApplicationTunnel { port, protocol }) => {
643
                // We may need to override the target port. For instance, we may send all PROXY
644
                // traffic over a dedicated port like 15088.
645
0
                let new_target = SocketAddr::new(dest.ip(), port.unwrap_or(dest.port()));
646
                // Note: the logic to decide which destination address to set inside the PROXY headers
647
                // is handled outside of this call. This just determines that location we actually send the
648
                // connection to.
649
650
                // Which address we will send in the tunnel
651
0
                let tunnel_target = match hbone_addr {
652
0
                    HboneAddress::SvcHostname(h, port) => {
653
                        // PROXY cannot currently send to hostnames, so we will need to select an IP to
654
                        // use instead
655
                        // We ensure a service is set above.
656
0
                        let vip = services
657
0
                            .first()
658
0
                            .expect("service must exist")
659
0
                            .vips
660
0
                            .iter()
661
0
                            .max_by_key(|a| match a.network == conn.dst_network {
662
                                true => {
663
                                    // Defer to IPv4 if present
664
0
                                    match a.address.is_ipv4() {
665
0
                                        true => 2,
666
0
                                        false => 1,
667
                                    }
668
                                }
669
0
                                false => 0,
670
0
                            })
671
0
                            .ok_or_else(|| Error::NoIPForService(h.to_string()))?;
672
0
                        SocketAddr::new(vip.address, *port)
673
                    }
674
0
                    HboneAddress::SocketAddr(s) => *s,
675
                };
676
0
                (
677
0
                    new_target,
678
0
                    Some(TunnelRequest {
679
0
                        tunnel_target,
680
0
                        protocol,
681
0
                    }),
682
0
                )
683
            }
684
0
            None => (dest, None),
685
        };
686
0
        Ok((target, tunnel, services))
687
0
    }
688
}
689
690
#[derive(Debug)]
691
pub(super) struct TunnelRequest {
692
    tunnel_target: SocketAddr,
693
    protocol: Protocol,
694
}
695
696
/// Everything resolved for a single inbound request before it is proxied upstream.
#[derive(Debug)]
697
struct InboundRequest {
698
    // NOTE(review): presumably the host extracted from the `Forwarded` header — confirm.
    for_host: Option<String>,
699
    // NOTE(review): presumably the context used for RBAC evaluation of this connection — confirm.
    rbac_ctx: ProxyRbacContext,
700
    /// Boxed connection result tracker for this request.
    result_tracker: Box<ConnectionResult>,
701
    /// Socket address of the upstream this request targets.
    upstream_addr: SocketAddr,
702
    /// Application tunnel details; `None` when no tunnel is requested.
    tunnel_request: Option<TunnelRequest>,
703
    /// Shared handle to the destination workload.
    destination_workload: Arc<Workload>,
704
}
705
706
/// InboundError represents an error with an associated status code.
707
#[derive(Debug)]
708
struct InboundError(Error, StatusCode);
709
impl InboundError {
710
0
    pub fn build(code: StatusCode) -> impl Fn(Error) -> Self {
711
0
        move |err| InboundError(err, code)
712
0
    }
713
}
714
715
struct InboundFlagError(Error, ResponseFlags, StatusCode);
716
impl InboundFlagError {
717
0
    pub fn build(code: StatusCode, flag: ResponseFlags) -> impl Fn(Error) -> Self {
718
0
        move |err| InboundFlagError(err, flag, code)
719
0
    }
720
}
721
722
#[derive(Clone)]
723
struct InboundCertProvider {
724
    local_workload: Arc<LocalWorkloadInformation>,
725
    crl_manager: Option<Arc<tls::crl::CrlManager>>,
726
}
727
728
#[async_trait::async_trait]
729
impl crate::tls::ServerCertProvider for InboundCertProvider {
730
0
    async fn fetch_cert(&mut self) -> Result<Arc<rustls::ServerConfig>, TlsError> {
731
0
        debug!(
732
0
            identity=%self.local_workload.workload_info(),
733
0
            "fetching cert"
734
        );
735
0
        let cert = self.local_workload.fetch_certificate().await?;
736
0
        Ok(Arc::new(cert.server_config(self.crl_manager.clone())?))
737
0
    }
Unexecuted instantiation: <ztunnel::proxy::inbound::InboundCertProvider as ztunnel::tls::lib::ServerCertProvider>::fetch_cert::{closure#0}
Unexecuted instantiation: <ztunnel::proxy::inbound::InboundCertProvider as ztunnel::tls::lib::ServerCertProvider>::fetch_cert
738
}
739
740
0
pub fn parse_forwarded_host<T: RequestParts>(req: &T) -> Option<String> {
741
0
    req.headers()
742
0
        .get(http::header::FORWARDED)
743
0
        .and_then(|rh| rh.to_str().ok())
744
0
        .and_then(proxy::parse_forwarded_host)
745
0
}
746
747
// Second argument is local workload and cluster name
748
0
fn build_response(
749
0
    status: StatusCode,
750
0
    local_wl: Option<&Workload>,
751
0
    enable_response_baggage: bool,
752
0
) -> Response<()> {
753
0
    let mut builder = Response::builder().status(status);
754
755
0
    if let Some(local_wl) = local_wl
756
0
        && enable_response_baggage
757
    {
758
0
        builder = builder.header(
759
0
            BAGGAGE_HEADER,
760
0
            baggage_header_val(&local_wl.baggage(), &local_wl.workload_type),
761
0
        )
762
0
    }
763
764
0
    builder
765
0
        .body(())
766
0
        .expect("builder with known status code should not fail")
767
0
}
768
769
#[cfg(test)]
770
#[allow(clippy::too_many_arguments)]
771
mod tests {
772
    use super::{Inbound, ProxyInputs};
773
    use crate::{
774
        config,
775
        identity::manager::mock::new_secret_manager,
776
        proxy::{
777
            ConnectionManager, DefaultSocketFactory, LocalWorkloadInformation,
778
            h2::server::RequestParts, inbound::HboneAddress,
779
        },
780
        rbac::Connection,
781
        state::{
782
            self, DemandProxyState, WorkloadInfo,
783
            service::{Endpoint, EndpointSet, Service},
784
            workload::{
785
                ApplicationTunnel, GatewayAddress, HealthStatus, InboundProtocol, NetworkAddress,
786
                NetworkMode, Workload, application_tunnel::Protocol as AppProtocol,
787
                gatewayaddress::Destination,
788
            },
789
        },
790
        strng, test_helpers,
791
    };
792
    use hickory_resolver::config::{ResolverConfig, ResolverOpts};
793
    use http::{Method, Uri};
794
    use prometheus_client::registry::Registry;
795
    use std::{
796
        net::SocketAddr,
797
        sync::{Arc, RwLock},
798
        time::Duration,
799
    };
800
    use test_case::test_case;
801
802
    // IP of the "client" workload; also used as the source address of test connections.
    const CLIENT_POD_IP: &str = "10.0.0.1";
803
804
    // IP of the "server" workload.
    const SERVER_POD_IP: &str = "10.0.0.2";
805
    // VIP of the "server" service.
    const SERVER_SVC_IP: &str = "10.10.0.1";
806
807
    // Matches the hostname `test_state` gives the "server" service
    // (`{name}.default.svc.cluster.local`), despite the `POD` in the constant's name.
    const SERVER_POD_HOSTNAME: &str = "server.default.svc.cluster.local";
808
809
    // IP of the "waypoint" workload.
    const WAYPOINT_POD_IP: &str = "10.0.0.3";
    // VIP of the "waypoint" service.
810
    const WAYPOINT_SVC_IP: &str = "10.10.0.2";
811
812
    // Same value as the service port key (80) in the `ports` map built by `test_state`.
    const SERVER_PORT: u16 = 80;
813
    // Same value as the target port (8080) that `test_state` maps port 80 to.
    const TARGET_PORT: u16 = 8080;
814
    // Port used by the application tunnel fixture below.
    const PROXY_PORT: u16 = 15088;
815
816
    // Application tunnel fixture: PROXY protocol on `PROXY_PORT`.
    const APP_TUNNEL_PROXY: Option<ApplicationTunnel> = Some(ApplicationTunnel {
817
        port: Some(PROXY_PORT),
818
        protocol: AppProtocol::PROXY,
819
    });
820
821
    struct MockParts {
822
        method: Method,
823
        uri: Uri,
824
        headers: http::HeaderMap<http::HeaderValue>,
825
    }
826
827
    impl RequestParts for MockParts {
828
        fn uri(&self) -> &http::Uri {
829
            &self.uri
830
        }
831
832
        fn method(&self) -> &http::Method {
833
            &self.method
834
        }
835
836
        fn headers(&self) -> &http::HeaderMap<http::HeaderValue> {
837
            &self.headers
838
        }
839
    }
840
841
    // Regular zTunnel workload traffic inbound
842
    #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_IP, Some((SERVER_POD_IP, TARGET_PORT)); "to workload no waypoint")]
843
    // Svc hostname
844
    #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_HOSTNAME, Some((SERVER_POD_IP, TARGET_PORT)); "svc hostname to workload no waypoint")]
845
    // Sandwiched Waypoint Cases
846
    // to workload traffic
847
    #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to workload with waypoint referenced by pod")]
848
    #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to workload with waypoint referenced by vip")]
849
    #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_POD_IP , Some((WAYPOINT_POD_IP, PROXY_PORT)); "to workload with app tunnel")]
850
    // to service traffic
851
    #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to service with waypoint referenced by pod")]
852
    #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to service with waypint referenced by vip")]
853
    #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_SVC_IP , Some((WAYPOINT_POD_IP, PROXY_PORT)); "to service with app tunnel")]
854
    // Override port via app_protocol
855
    // Error cases
856
    #[test_case(Waypoint::None, SERVER_POD_IP, CLIENT_POD_IP, None; "to server ip mismatch" )]
857
    #[test_case(Waypoint::None, WAYPOINT_POD_IP, CLIENT_POD_IP, None; "to waypoint without attachment" )]
858
    #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP , None; "to workload via waypoint with wrong attachment")]
859
    #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP , None; "to service via waypoint with wrong attachment")]
860
    #[tokio::test]
861
    async fn test_find_inbound_upstream(
862
        target_waypoint: Waypoint<'_>,
863
        connection_dst: &str,
864
        hbone_dst: &str,
865
        want: Option<(&str, u16)>,
866
    ) {
867
        let state = test_state(target_waypoint).expect("state setup");
868
        let cfg = config::parse_config().unwrap();
869
        let conn = Connection {
870
            src_identity: None,
871
            src: format!("{CLIENT_POD_IP}:1234").parse().unwrap(),
872
            dst_network: "".into(),
873
            dst: format!("{connection_dst}:15008").parse().unwrap(),
874
        };
875
        let local_wl = state
876
            .fetch_workload_by_address(&NetworkAddress {
877
                network: "".into(),
878
                address: conn.dst.ip(),
879
            })
880
            .await
881
            .unwrap();
882
        let hbone_addr =
883
            if let Ok(addr) = format!("{hbone_dst}:{TARGET_PORT}").parse::<SocketAddr>() {
884
                HboneAddress::SocketAddr(addr)
885
            } else {
886
                HboneAddress::SvcHostname(hbone_dst.into(), SERVER_PORT)
887
            };
888
889
        let validate_destination =
890
            Inbound::validate_destination(&state, &conn, &local_wl, &hbone_addr).await;
891
        let res = Inbound::find_inbound_upstream(&cfg, &state, &conn, &local_wl, &hbone_addr);
892
893
        match want {
894
            Some((ip, port)) => {
895
                let got_addr = res.expect("no error").0;
896
                assert_eq!(got_addr, SocketAddr::new(ip.parse().unwrap(), port));
897
            }
898
            None => {
899
                validate_destination.expect_err("did not find upstream");
900
            }
901
        }
902
    }
903
904
    // Regular zTunnel workload traffic inbound
905
    #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((SERVER_POD_IP, TARGET_PORT, None)); "to workload no waypoint")]
906
    // Svc hostname
907
    #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_HOSTNAME, SERVER_PORT, Some((SERVER_POD_IP, TARGET_PORT, None)); "svc hostname to workload no waypoint")]
908
    // Sandwiched Waypoint Cases
909
    // to workload traffic
910
    #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to workload with waypoint referenced by pod")]
911
    #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to workload with waypoint referenced by vip")]
912
    #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, PROXY_PORT, Some(SERVER_POD_IP))); "to workload with app tunnel")]
913
    // to service traffic
914
    #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to service with waypoint referenced by pod")]
915
    #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to service with waypint referenced by vip")]
916
    #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, PROXY_PORT, Some(SERVER_SVC_IP))); "to service with app tunnel")]
917
    // Override port via app_protocol
918
    // Error cases
919
    #[test_case(Waypoint::None, SERVER_POD_IP, CLIENT_POD_IP, TARGET_PORT, None; "to server ip mismatch" )]
920
    #[test_case(Waypoint::None, WAYPOINT_POD_IP, CLIENT_POD_IP, TARGET_PORT, None; "to waypoint without attachment" )]
921
    #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, None; "to workload via waypoint with wrong attachment")]
922
    #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, None; "to service via waypoint with wrong attachment")]
923
    #[tokio::test]
924
    async fn test_build_inbound_request(
925
        target_waypoint: Waypoint<'_>,
926
        connection_dst: &str,
927
        hbone_dst: &str,
928
        hbobe_dst_port: u16,
929
        want: Option<(&str, u16, Option<&str>)>,
930
    ) {
931
        let state = test_state(target_waypoint).expect("state setup");
932
        let cfg = config::parse_config().unwrap();
933
        let conn = Connection {
934
            src_identity: None,
935
            src: format!("{CLIENT_POD_IP}:1234").parse().unwrap(),
936
            dst_network: "".into(),
937
            dst: format!("{connection_dst}:15008").parse().unwrap(),
938
        };
939
        let request_parts = MockParts {
940
            method: Method::CONNECT,
941
            uri: format!("{hbone_dst}:{hbobe_dst_port}").parse().unwrap(),
942
            headers: http::HeaderMap::new(),
943
        };
944
        let cm = ConnectionManager::default();
945
        let metrics = Arc::new(crate::proxy::Metrics::new(&mut Registry::default()));
946
        let sf = Arc::new(DefaultSocketFactory::default());
947
        let wl = state
948
            .fetch_workload_by_address(&NetworkAddress {
949
                network: "".into(),
950
                address: conn.dst.ip(),
951
            })
952
            .await
953
            .unwrap();
954
        let local_workload = Arc::new(LocalWorkloadInformation::new(
955
            Arc::new(WorkloadInfo {
956
                name: wl.name.to_string(),
957
                namespace: wl.namespace.to_string(),
958
                service_account: wl.service_account.to_string(),
959
            }),
960
            state.clone(),
961
            new_secret_manager(Duration::from_secs(10)),
962
        ));
963
        let pi = Arc::new(ProxyInputs::new(
964
            Arc::new(cfg),
965
            cm,
966
            state.clone(),
967
            metrics.clone(),
968
            sf,
969
            None,
970
            local_workload,
971
            false,
972
            None,
973
        ));
974
        let inbound_request = Inbound::build_inbound_request(&pi, conn, &request_parts).await;
975
        match want {
976
            Some((ip, port, protocol_addr)) => {
977
                let ir = inbound_request.unwrap();
978
                assert_eq!(ir.upstream_addr, SocketAddr::new(ip.parse().unwrap(), port));
979
                match ir.tunnel_request {
980
                    Some(addr) => assert_eq!(
981
                        addr.tunnel_target,
982
                        SocketAddr::new(protocol_addr.unwrap().parse().unwrap(), hbobe_dst_port)
983
                    ),
984
                    None => assert_eq!(protocol_addr, None),
985
                };
986
            }
987
            None => {
988
                inbound_request.expect_err("could not build inbound request");
989
            }
990
        }
991
    }
992
993
    // Creates a test state for the `DemandProxyState` with predefined services and workloads.
994
    // server_waypoint specifies the waypoint configuration for the server.
995
    fn test_state(server_waypoint: Waypoint) -> anyhow::Result<state::DemandProxyState> {
996
        let mut state = state::ProxyState::new(None);
997
998
        let services = vec![
999
            ("waypoint", WAYPOINT_SVC_IP, Waypoint::None),
1000
            ("server", SERVER_SVC_IP, server_waypoint.clone()),
1001
        ]
1002
        .into_iter()
1003
        .map(|(name, vip, waypoint)| Service {
1004
            name: name.into(),
1005
            namespace: "default".into(),
1006
            hostname: strng::format!("{name}.default.svc.cluster.local"),
1007
            vips: vec![NetworkAddress {
1008
                address: vip.parse().unwrap(),
1009
                network: "".into(),
1010
            }],
1011
            ports: std::collections::HashMap::from([(80u16, 8080u16)]),
1012
            endpoints: EndpointSet::from_list([Endpoint {
1013
                workload_uid: strng::format!("cluster1//v1/Pod/default/{name}"),
1014
                port: std::collections::HashMap::new(),
1015
                status: HealthStatus::Healthy,
1016
            }]),
1017
            subject_alt_names: vec![strng::format!("{name}.default.svc.cluster.local")],
1018
            waypoint: waypoint.service_attached(),
1019
            load_balancer: None,
1020
            ip_families: None,
1021
            canonical: true,
1022
        });
1023
1024
        let workloads = vec![
1025
            (
1026
                "waypoint",
1027
                WAYPOINT_POD_IP,
1028
                Waypoint::None,
1029
                server_waypoint.app_tunnel(),
1030
            ),
1031
            ("client", CLIENT_POD_IP, Waypoint::None, None),
1032
            ("server", SERVER_POD_IP, server_waypoint, None),
1033
        ]
1034
        .into_iter()
1035
        .map(|(name, ip, waypoint, app_tunnel)| Workload {
1036
            workload_ips: vec![ip.parse().unwrap()],
1037
            waypoint: waypoint.workload_attached(),
1038
            protocol: InboundProtocol::HBONE,
1039
            uid: strng::format!("cluster1//v1/Pod/default/{name}"),
1040
            name: strng::format!("workload-{name}"),
1041
            namespace: "default".into(),
1042
            service_account: strng::format!("service-account-{name}"),
1043
            application_tunnel: app_tunnel,
1044
            network_mode: NetworkMode::Standard,
1045
            ..test_helpers::test_default_workload()
1046
        });
1047
1048
        for svc in services {
1049
            state.services.insert(svc);
1050
        }
1051
        for wl in workloads {
1052
            state.workloads.insert(Arc::new(wl));
1053
        }
1054
1055
        let mut registry = Registry::default();
1056
        let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
1057
        Ok(DemandProxyState::new(
1058
            Arc::new(RwLock::new(state)),
1059
            None,
1060
            ResolverConfig::default(),
1061
            ResolverOpts::default(),
1062
            metrics,
1063
        ))
1064
    }
1065
1066
    // Tells the test whether we're using workload-attached or svc-attached waypoints.
    // The string in each variant is the waypoint address (it is parsed as an IP when the
    // gateway address is built); the optional `ApplicationTunnel` is what `app_tunnel` returns.
1067
    #[derive(Clone)]
1068
    enum Waypoint<'a> {
1069
        // No waypoint attached.
        None,
1070
        // Waypoint attached to the service.
        Service(&'a str, Option<ApplicationTunnel>),
1071
        // Waypoint attached to the workload.
        Workload(&'a str, Option<ApplicationTunnel>),
1072
    }
1073
1074
    impl Waypoint<'_> {
1075
        fn app_tunnel(&self) -> Option<ApplicationTunnel> {
1076
            match self.clone() {
1077
                Waypoint::Service(_, v) => v,
1078
                Waypoint::Workload(_, v) => v,
1079
                _ => None,
1080
            }
1081
        }
1082
        fn service_attached(&self) -> Option<GatewayAddress> {
1083
            let Waypoint::Service(s, _) = self else {
1084
                return None;
1085
            };
1086
            Some(GatewayAddress {
1087
                destination: Destination::Address(NetworkAddress {
1088
                    network: strng::EMPTY,
1089
                    address: s.parse().expect("a valid waypoint IP"),
1090
                }),
1091
                hbone_mtls_port: 15008,
1092
            })
1093
        }
1094
1095
        fn workload_attached(&self) -> Option<GatewayAddress> {
1096
            let Waypoint::Workload(w, _) = self else {
1097
                return None;
1098
            };
1099
            Some(GatewayAddress {
1100
                destination: Destination::Address(NetworkAddress {
1101
                    network: strng::EMPTY,
1102
                    address: w.parse().expect("a valid waypoint IP"),
1103
                }),
1104
                hbone_mtls_port: 15008,
1105
            })
1106
        }
1107
    }
1108
1109
    // `build_response` must attach the baggage header only when a workload is supplied AND
    // the enhanced-baggage flag is on.
    #[test]
    fn test_build_response_baggage_feature_gate() {
        use super::build_response;
        use crate::proxy::BAGGAGE_HEADER;
        use crate::test_helpers;
        use http::StatusCode;

        let wl = test_helpers::test_default_workload();

        // Flag on + workload present: header is emitted.
        let mut cfg_on = test_helpers::test_config();
        cfg_on.enable_enhanced_baggage = true;

        let with_flag = build_response(StatusCode::OK, Some(&wl), cfg_on.enable_enhanced_baggage);
        assert!(with_flag.headers().contains_key(BAGGAGE_HEADER));

        // The header value should carry the cluster id of the default test workload.
        let header_text = with_flag
            .headers()
            .get(BAGGAGE_HEADER)
            .unwrap()
            .to_str()
            .unwrap();
        assert!(header_text.contains("k8s.cluster.name=Kubernetes"));

        // Flag off + workload present: no header.
        let mut cfg_off = test_helpers::test_config();
        cfg_off.enable_enhanced_baggage = false;

        let without_flag =
            build_response(StatusCode::OK, Some(&wl), cfg_off.enable_enhanced_baggage);
        assert!(!without_flag.headers().contains_key(BAGGAGE_HEADER));

        // Flag on but no workload: no header either.
        let without_workload =
            build_response(StatusCode::OK, None, cfg_on.enable_enhanced_baggage);
        assert!(!without_workload.headers().contains_key(BAGGAGE_HEADER));
    }
1151
1152
    // Incoming baggage is parsed only when the enhanced-baggage flag is on; with the flag
    // off the default (empty) baggage is used.
    #[test]
    fn test_incoming_baggage_parsing_feature_gate() {
        use crate::baggage::{Baggage, parse_baggage_header};
        use crate::proxy::BAGGAGE_HEADER;
        use crate::test_helpers;
        use http::{HeaderMap, HeaderValue};

        // Request headers carrying a single baggage header.
        let mut headers = HeaderMap::new();
        headers.insert(BAGGAGE_HEADER, HeaderValue::from_str("k8s.cluster.name=test-cluster,k8s.namespace.name=test-ns,k8s.deployment.name=test-app").unwrap());

        // Mirrors the feature gate: parse when enabled, otherwise fall back to the default.
        let baggage_for = |enabled: bool| match enabled {
            true => parse_baggage_header(headers.get_all(BAGGAGE_HEADER)).unwrap_or_default(),
            false => Baggage::default(),
        };

        // Enabled (the test config's default).
        let cfg_on = test_helpers::test_config();
        assert!(cfg_on.enable_enhanced_baggage); // Default should be true

        let parsed = baggage_for(cfg_on.enable_enhanced_baggage);
        assert_eq!(parsed.cluster_id, Some("test-cluster".into()));
        assert_eq!(parsed.namespace, Some("test-ns".into()));
        assert_eq!(parsed.workload_name, Some("test-app".into()));

        // Disabled.
        let mut cfg_off = test_helpers::test_config();
        cfg_off.enable_enhanced_baggage = false;

        let skipped = baggage_for(cfg_off.enable_enhanced_baggage);
        assert_eq!(skipped.cluster_id, None);
        assert_eq!(skipped.namespace, None);
        assert_eq!(skipped.workload_name, None);
    }
1191
}