/src/ztunnel/src/proxy/inbound.rs
Line | Count | Source |
1 | | // Copyright Istio Authors |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use futures_util::TryFutureExt; |
16 | | use http::{Method, Response, StatusCode}; |
17 | | use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; |
18 | | use std::sync::Arc; |
19 | | use std::time::Instant; |
20 | | use tls_listener::AsyncTls; |
21 | | use tokio::sync::watch; |
22 | | |
23 | | use tracing::{Instrument, debug, error, info, info_span, trace_span}; |
24 | | |
25 | | use super::{ |
26 | | ConnectionResult, ConnectionResultBuilder, Error, HboneAddress, LocalWorkloadInformation, |
27 | | ResponseFlags, util, |
28 | | }; |
29 | | use crate::baggage::{baggage_header_val, parse_baggage_header}; |
30 | | use crate::identity::Identity; |
31 | | |
32 | | use crate::config::Config; |
33 | | use crate::drain::DrainWatcher; |
34 | | use crate::proxy::h2::server::{H2Request, RequestParts}; |
35 | | use crate::proxy::metrics::{ConnectionOpen, Reporter}; |
36 | | use crate::proxy::{ |
37 | | BAGGAGE_HEADER, ProxyInputs, TRACEPARENT_HEADER, TraceParent, X_FORWARDED_NETWORK_HEADER, |
38 | | metrics, |
39 | | }; |
40 | | use crate::rbac::Connection; |
41 | | use crate::socket::to_canonical; |
42 | | use crate::state::service::Service; |
43 | | use crate::{assertions, copy, handle_connection, proxy, socket, strng, tls}; |
44 | | |
45 | | use crate::drain::run_with_drain; |
46 | | use crate::proxy::h2; |
47 | | use crate::state::workload::address::Address; |
48 | | use crate::state::workload::application_tunnel::Protocol; |
49 | | use crate::state::workload::{self, NetworkAddress, Workload}; |
50 | | use crate::state::{DemandProxyState, ProxyRbacContext}; |
51 | | use crate::strng::Strng; |
52 | | use crate::tls::TlsError; |
53 | | |
54 | | pub struct Inbound { |
55 | | listener: socket::Listener, |
56 | | drain: DrainWatcher, |
57 | | pi: Arc<ProxyInputs>, |
58 | | enable_orig_src: bool, |
59 | | } |
60 | | |
61 | | impl Inbound { |
62 | 0 | pub(crate) async fn new(pi: Arc<ProxyInputs>, drain: DrainWatcher) -> Result<Inbound, Error> { |
63 | 0 | let listener = pi |
64 | 0 | .socket_factory |
65 | 0 | .tcp_bind(pi.cfg.inbound_addr) |
66 | 0 | .map_err(|e| Error::Bind(pi.cfg.inbound_addr, e))?; |
67 | 0 | let enable_orig_src = super::maybe_set_transparent(&pi, &listener)?; |
68 | | |
69 | 0 | info!( |
70 | 0 | address=%listener.local_addr(), |
71 | | component="inbound", |
72 | | transparent=enable_orig_src, |
73 | 0 | "listener established", |
74 | | ); |
75 | 0 | Ok(Inbound { |
76 | 0 | listener, |
77 | 0 | drain, |
78 | 0 | pi, |
79 | 0 | enable_orig_src, |
80 | 0 | }) |
81 | 0 | } |
82 | | |
83 | | /// Returns the socket address this proxy is listening on. |
84 | 0 | pub fn address(&self) -> SocketAddr { |
85 | 0 | self.listener.local_addr() |
86 | 0 | } |
87 | | |
88 | 0 | pub async fn run(self) { |
89 | 0 | let pi = self.pi.clone(); |
90 | 0 | let acceptor = InboundCertProvider { |
91 | 0 | local_workload: self.pi.local_workload_information.clone(), |
92 | 0 | crl_manager: self.pi.crl_manager.clone(), |
93 | 0 | }; |
94 | | |
95 | | // Safety: we set nodelay directly in tls_server, so it is safe to convert to a normal listener. |
96 | | // Although, that is *after* the TLS handshake; in theory we may get some benefits to setting it earlier. |
97 | | |
98 | 0 | let accept = async move |drain: DrainWatcher, force_shutdown: watch::Receiver<()>| { |
99 | | loop { |
100 | 0 | let (raw_socket, src) = match self.listener.accept().await { |
101 | 0 | Ok(raw_socket) => raw_socket, |
102 | 0 | Err(e) => { |
103 | 0 | if util::is_runtime_shutdown(&e) { |
104 | 0 | return; |
105 | 0 | } |
106 | 0 | error!("Failed TCP handshake {}", e); |
107 | 0 | continue; |
108 | | } |
109 | | }; |
110 | 0 | let src = to_canonical(src); |
111 | 0 | let start = Instant::now(); |
112 | 0 | let drain = drain.clone(); |
113 | 0 | let force_shutdown = force_shutdown.clone(); |
114 | 0 | let pi = self.pi.clone(); |
115 | 0 | let dst = to_canonical(raw_socket.local_addr().expect("local_addr available")); |
116 | 0 | let network = pi.cfg.network.clone(); |
117 | 0 | let acceptor = crate::tls::InboundAcceptor::new(acceptor.clone()); |
118 | | |
119 | 0 | let socket_labels = metrics::SocketLabels { |
120 | 0 | reporter: Reporter::destination, |
121 | 0 | }; |
122 | 0 | pi.metrics.record_socket_open(&socket_labels); |
123 | 0 | let metrics_for_socket_close = pi.metrics.clone(); |
124 | | |
125 | 0 | let serve_client = async move { |
126 | 0 | let _socket_guard = metrics::SocketCloseGuard::new( |
127 | 0 | metrics_for_socket_close, |
128 | 0 | Reporter::destination, |
129 | | ); |
130 | 0 | let tls = match acceptor.accept(raw_socket).await { |
131 | 0 | Ok(tls) => tls, |
132 | 0 | Err(e) => { |
133 | 0 | metrics::log_early_deny(src, dst, Reporter::destination, e); |
134 | | |
135 | 0 | return Err::<(), _>(proxy::Error::SelfCall); |
136 | | } |
137 | | }; |
138 | 0 | debug!(latency=?start.elapsed(), "accepted TLS connection"); |
139 | 0 | let (_, ssl) = tls.get_ref(); |
140 | 0 | let src_identity: Option<Identity> = tls::identity_from_connection(ssl); |
141 | 0 | let conn = Connection { |
142 | 0 | src_identity, |
143 | 0 | src, |
144 | 0 | dst_network: network.clone(), // inbound request must be on our network |
145 | 0 | dst, |
146 | 0 | }; |
147 | 0 | debug!(%conn, "accepted connection"); |
148 | 0 | let cfg = pi.cfg.clone(); |
149 | 0 | let request_handler = move |req| { |
150 | 0 | let id = Self::extract_traceparent(&req); |
151 | 0 | let peer = conn.src; |
152 | 0 | let req_handler = Self::serve_connect( |
153 | 0 | pi.clone(), |
154 | 0 | conn.clone(), |
155 | 0 | self.enable_orig_src, |
156 | 0 | req, |
157 | | ) |
158 | 0 | .instrument(info_span!("inbound", %id, %peer)); |
159 | | // This is for each user connection, so most important to keep small |
160 | 0 | assertions::size_between_ref(1500, 2500, &req_handler); |
161 | 0 | req_handler |
162 | 0 | }; |
163 | | |
164 | 0 | let serve_conn = h2::server::serve_connection( |
165 | 0 | cfg, |
166 | 0 | tls, |
167 | 0 | drain, |
168 | 0 | force_shutdown, |
169 | 0 | request_handler, |
170 | | ); |
171 | | // This is per HBONE connection, so while would be nice to be small, at least it |
172 | | // is pooled so typically fewer of these. |
173 | 0 | let serve = Box::pin(assertions::size_between(6000, 8000, serve_conn)); |
174 | 0 | serve.await |
175 | 0 | }; |
176 | | // This is small since it only handles the TLS layer -- the HTTP2 layer is boxed |
177 | | // and measured above. |
178 | 0 | assertions::size_between_ref(1000, 1600, &serve_client); |
179 | 0 | tokio::task::spawn(serve_client.in_current_span()); |
180 | | } |
181 | 0 | }; Unexecuted instantiation: <ztunnel::proxy::inbound::Inbound>::run::{closure#0}::{closure#0}Unexecuted instantiation: <ztunnel::proxy::inbound::Inbound>::run::{closure#0}::{closure#0}::{closure#0}::<_> |
182 | | |
183 | 0 | run_with_drain( |
184 | 0 | "inbound".to_string(), |
185 | 0 | self.drain, |
186 | 0 | pi.cfg.self_termination_deadline, |
187 | 0 | accept, |
188 | 0 | ) |
189 | 0 | .await |
190 | 0 | } |
191 | | |
192 | 0 | fn extract_traceparent(req: &H2Request) -> TraceParent { |
193 | 0 | req.headers() |
194 | 0 | .get(TRACEPARENT_HEADER) |
195 | 0 | .and_then(|b| b.to_str().ok()) |
196 | 0 | .and_then(|b| TraceParent::try_from(b).ok()) |
197 | 0 | .unwrap_or_else(TraceParent::new) |
198 | 0 | } |
199 | | |
200 | | /// serve_connect handles a single connection from a client. |
201 | | #[allow(clippy::too_many_arguments)] |
202 | 0 | async fn serve_connect( |
203 | 0 | pi: Arc<ProxyInputs>, |
204 | 0 | conn: Connection, |
205 | 0 | enable_original_source: bool, |
206 | 0 | req: H2Request, |
207 | 0 | ) { |
208 | 0 | let src = conn.src; |
209 | 0 | let dst = conn.dst; |
210 | | |
211 | 0 | debug!(%conn, ?req, "received request"); |
212 | | |
213 | | // In order to ensure we properly handle all errors, we split up serving inbound request into a few |
214 | | // phases. |
215 | | |
216 | | // Initial phase, build up context about the request. |
217 | 0 | let ri = match Self::build_inbound_request(&pi, conn, req.get_request()).await { |
218 | 0 | Ok(i) => i, |
219 | 0 | Err(InboundError(e, code)) => { |
220 | | // At this point in processing, we never built up full context to log a complete access log. |
221 | | // Instead, just log a minimal error line. |
222 | 0 | metrics::log_early_deny(src, dst, Reporter::destination, e); |
223 | 0 | if let Err(err) = |
224 | 0 | req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) |
225 | | { |
226 | 0 | tracing::warn!("failed to send HTTP response: {err}"); |
227 | 0 | } |
228 | 0 | return; |
229 | | } |
230 | | }; |
231 | | |
232 | | // Now we have enough context to properly report logs and metrics. Group everything else that |
233 | | // can fail before we send the OK response here. |
234 | 0 | let rx = async { |
235 | | // Define a connection guard to ensure rbac conditions are maintained for the duration of the connection |
236 | 0 | let conn_guard = pi |
237 | 0 | .connection_manager |
238 | 0 | .assert_rbac(&pi.state, &ri.rbac_ctx, ri.for_host) |
239 | 0 | .await |
240 | 0 | .map_err(InboundFlagError::build( |
241 | | StatusCode::UNAUTHORIZED, |
242 | 0 | ResponseFlags::AuthorizationPolicyDenied, |
243 | 0 | ))?; |
244 | | |
245 | | // app tunnels should only bind to localhost to prevent |
246 | | // being accessed without going through ztunnel |
247 | 0 | let localhost_tunnel = pi.cfg.localhost_app_tunnel |
248 | 0 | && ri |
249 | 0 | .tunnel_request |
250 | 0 | .as_ref() |
251 | 0 | .map(|tr| tr.protocol.supports_localhost_send()) |
252 | 0 | .unwrap_or(false); |
253 | 0 | let (src, dst) = if localhost_tunnel { |
254 | | // guess the family based on the destination address |
255 | 0 | let loopback = match ri.upstream_addr { |
256 | 0 | SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), |
257 | 0 | SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), |
258 | | }; |
259 | | |
260 | | // we must bind the src to be localhost when sending to localhost, |
261 | | // or various components could break traffic (RPF, iptables, ip route) |
262 | | // the original source is preserved within PROXY protocol |
263 | 0 | ( |
264 | 0 | Some(loopback), |
265 | 0 | SocketAddr::new(loopback, ri.upstream_addr.port()), |
266 | 0 | ) |
267 | | } else { |
268 | | // When ztunnel is proxying to its own internal endpoints (metrics server after HBONE termination), |
269 | | // we must not attempt to use the original external client's IP as the source for this internal connection. |
270 | | // Setting `disable_inbound_freebind` to true for such self-proxy scenarios ensures `upstream_src_ip` is `None`, |
271 | | // causing `freebind_connect` to use a local IP for the connection to ztunnel's own service. |
272 | | // For regular inbound traffic to other workloads, `disable_inbound_freebind` is false, and original source |
273 | | // preservation depends on `enable_original_source`. |
274 | 0 | let upstream_src_ip = if pi.disable_inbound_freebind { |
275 | 0 | None |
276 | | } else { |
277 | 0 | enable_original_source.then_some(ri.rbac_ctx.conn.src.ip()) |
278 | | }; |
279 | 0 | (upstream_src_ip, ri.upstream_addr) |
280 | | }; |
281 | | |
282 | | // Establish upstream connection between original source and destination |
283 | | // We are allowing a bind to the original source address locally even if the ip address isn't on this node. |
284 | 0 | let stream = super::freebind_connect(src, dst, pi.socket_factory.as_ref()) |
285 | 0 | .await |
286 | 0 | .map_err(Error::ConnectionFailed) |
287 | 0 | .map_err(InboundFlagError::build( |
288 | | StatusCode::SERVICE_UNAVAILABLE, |
289 | 0 | ResponseFlags::ConnectionFailure, |
290 | 0 | ))?; |
291 | 0 | debug!("connected to: {}", ri.upstream_addr); |
292 | 0 | Ok((conn_guard, stream)) |
293 | 0 | }; |
294 | | // Wait on establishing the upstream connection and connection guard before sending the 200 response to the client |
295 | 0 | let (mut conn_guard, mut stream) = match rx.await { |
296 | 0 | Ok(res) => res, |
297 | 0 | Err(InboundFlagError(err, flag, code)) => { |
298 | 0 | ri.result_tracker.record_with_flag(Err(err), flag); |
299 | 0 | if let Err(err) = |
300 | 0 | req.send_error(build_response(code, None, pi.cfg.enable_enhanced_baggage)) |
301 | | { |
302 | 0 | tracing::warn!("failed to send HTTP response: {err}"); |
303 | 0 | } |
304 | 0 | return; |
305 | | } |
306 | | }; |
307 | | |
308 | | // At this point, we established the upstream connection and need to send a 200 back to the client. |
309 | | // we may still have failures at this point during the proxying, but we don't need to send these |
310 | | // at the HTTP layer. |
311 | | // Send a 200 back to the client and start forwarding traffic. |
312 | | // |
313 | | // If requested, we may start the stream with a PROXY protocol header. This ensures |
314 | | // that the server has all of the necessary information about the connection regardless of the protocol |
315 | | // See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt for more information about the |
316 | | // proxy protocol. |
317 | 0 | let send = req |
318 | 0 | .send_response(build_response( |
319 | | StatusCode::OK, |
320 | 0 | Some(ri.destination_workload.as_ref()), |
321 | 0 | pi.cfg.enable_enhanced_baggage, |
322 | | )) |
323 | 0 | .and_then(|h2_stream| async { |
324 | | if let Some(TunnelRequest { |
325 | | protocol: Protocol::PROXY, |
326 | 0 | tunnel_target, |
327 | 0 | }) = ri.tunnel_request |
328 | | { |
329 | | let Connection { |
330 | 0 | src, src_identity, .. |
331 | | } = ri.rbac_ctx.conn; |
332 | 0 | super::write_proxy_protocol(&mut stream, (src, tunnel_target), src_identity) |
333 | 0 | .instrument(trace_span!("proxy protocol")) |
334 | 0 | .await?; |
335 | 0 | } |
336 | 0 | copy::copy_bidirectional( |
337 | 0 | h2_stream, |
338 | 0 | copy::TcpStreamSplitter(stream), |
339 | 0 | &ri.result_tracker, |
340 | | ) |
341 | 0 | .instrument(trace_span!("hbone server")) |
342 | 0 | .await |
343 | 0 | }); |
344 | 0 | let res = handle_connection!(conn_guard, send); |
345 | 0 | ri.result_tracker.record(res); |
346 | 0 | } |
347 | | |
348 | | // build_inbound_request builds up the context for an inbound request. |
349 | 0 | async fn build_inbound_request<T: RequestParts>( |
350 | 0 | pi: &Arc<ProxyInputs>, |
351 | 0 | conn: Connection, |
352 | 0 | req: &T, |
353 | 0 | ) -> Result<InboundRequest, InboundError> { |
354 | 0 | if req.method() != Method::CONNECT { |
355 | 0 | let e = Error::NonConnectMethod(req.method().to_string()); |
356 | 0 | return Err(InboundError(e, StatusCode::BAD_REQUEST)); |
357 | 0 | } |
358 | | |
359 | 0 | let start = Instant::now(); |
360 | | |
361 | | // Extract the host or IP from the authority pseudo-header of the URI |
362 | 0 | let hbone_addr: HboneAddress = req |
363 | 0 | .uri() |
364 | 0 | .try_into() |
365 | 0 | .map_err(InboundError::build(StatusCode::BAD_REQUEST))?; |
366 | | |
367 | | // Get the destination workload information of the destination pods (wds) workload (not destination ztunnel) |
368 | 0 | let destination_workload = pi |
369 | 0 | .local_workload_information |
370 | 0 | .get_workload() |
371 | 0 | .await |
372 | | // At this point we already fetched the local workload for TLS, so it should be infallible. |
373 | 0 | .map_err(InboundError::build(StatusCode::SERVICE_UNAVAILABLE))?; |
374 | | |
375 | | // Check the request is allowed by verifying the destination |
376 | 0 | Self::validate_destination(&pi.state, &conn, &destination_workload, &hbone_addr) |
377 | 0 | .await |
378 | 0 | .map_err(InboundError::build(StatusCode::BAD_REQUEST))?; |
379 | | |
380 | | // Determine the next hop. |
381 | 0 | let (upstream_addr, tunnel_request, upstream_service) = Self::find_inbound_upstream( |
382 | 0 | &pi.cfg, |
383 | 0 | &pi.state, |
384 | 0 | &conn, |
385 | 0 | &destination_workload, |
386 | 0 | &hbone_addr, |
387 | | ) |
388 | 0 | .map_err(InboundError::build(StatusCode::SERVICE_UNAVAILABLE))?; |
389 | | |
390 | 0 | let original_dst = conn.dst; |
391 | | // Connection has 15008, swap with the real port |
392 | 0 | let conn = Connection { |
393 | 0 | dst: upstream_addr, |
394 | 0 | ..conn |
395 | 0 | }; |
396 | | |
397 | 0 | let rbac_ctx = ProxyRbacContext { |
398 | 0 | conn, |
399 | 0 | dest_workload: destination_workload.clone(), |
400 | 0 | }; |
401 | | |
402 | 0 | let for_host = parse_forwarded_host(req); |
403 | 0 | let baggage = if pi.cfg.enable_enhanced_baggage { |
404 | 0 | parse_baggage_header(req.headers().get_all(BAGGAGE_HEADER)).unwrap_or_default() |
405 | | } else { |
406 | 0 | Default::default() |
407 | | }; |
408 | | |
409 | | // We assume it is from gateway if it's a hostname request. |
410 | | // We may need a more explicit indicator in the future. |
411 | | // Note: previously this attempted to check that the src identity was equal to the Gateway; |
412 | | // this check is broken as the gateway only forwards an HBONE request, it doesn't initiate it itself. |
413 | 0 | let from_gateway = req |
414 | 0 | .headers() |
415 | 0 | .get(X_FORWARDED_NETWORK_HEADER) |
416 | 0 | .and_then(|h| h.to_str().ok()) |
417 | 0 | .map(|s| !s.eq_ignore_ascii_case(&pi.cfg.network)) // If the network is different, it's from a gateway |
418 | 0 | .unwrap_or(false); |
419 | | |
420 | 0 | if from_gateway { |
421 | 0 | debug!("request from gateway"); |
422 | 0 | } |
423 | 0 | let source = match from_gateway { |
424 | | // we cannot lookup source workload since we don't know the network, see https://github.com/istio/ztunnel/issues/515. |
425 | | // Instead, we will use baggage |
426 | 0 | true => None, |
427 | | false => { |
428 | 0 | let src_network_addr = NetworkAddress { |
429 | 0 | // we can assume source network is our network because we did not traverse a gateway |
430 | 0 | network: rbac_ctx.conn.dst_network.clone(), |
431 | 0 | address: rbac_ctx.conn.src.ip(), |
432 | 0 | }; |
433 | | // Find source info. We can lookup by XDS or from connection attributes |
434 | 0 | pi.state.fetch_workload_by_address(&src_network_addr).await |
435 | | } |
436 | | }; |
437 | | |
438 | 0 | let derived_source = if pi.cfg.enable_enhanced_baggage { |
439 | 0 | metrics::DerivedWorkload { |
440 | 0 | identity: rbac_ctx.conn.src_identity.clone(), |
441 | 0 | cluster_id: baggage.cluster_id, |
442 | 0 | region: baggage.region, |
443 | 0 | zone: baggage.zone, |
444 | 0 | namespace: baggage.namespace, |
445 | 0 | app: baggage.service_name, |
446 | 0 | workload_name: baggage.workload_name, |
447 | 0 | revision: baggage.revision, |
448 | 0 | } |
449 | | } else { |
450 | 0 | metrics::DerivedWorkload { |
451 | 0 | identity: rbac_ctx.conn.src_identity.clone(), |
452 | 0 | ..Default::default() |
453 | 0 | } |
454 | | }; |
455 | 0 | let ds = proxy::guess_inbound_service( |
456 | 0 | &rbac_ctx.conn, |
457 | 0 | &for_host, |
458 | 0 | upstream_service, |
459 | 0 | &destination_workload, |
460 | | ); |
461 | 0 | let connection_result_builder = ConnectionResultBuilder::new( |
462 | 0 | rbac_ctx.conn.src, |
463 | | // For consistency with outbound logs, report the original destination (with 15008 port) |
464 | | // as dst.addr, and the target address as dst.hbone_addr |
465 | 0 | original_dst, |
466 | 0 | Some(hbone_addr.clone()), |
467 | 0 | start, |
468 | 0 | ConnectionOpen { |
469 | 0 | reporter: Reporter::destination, |
470 | 0 | source, |
471 | 0 | derived_source: Some(derived_source), |
472 | 0 | destination: Some(destination_workload.clone()), |
473 | 0 | connection_security_policy: metrics::SecurityPolicy::mutual_tls, |
474 | 0 | destination_service: ds, |
475 | 0 | }, |
476 | 0 | pi.metrics.clone(), |
477 | | ); |
478 | | |
479 | 0 | let result_tracker = Box::new(connection_result_builder.build()); |
480 | 0 | Ok(InboundRequest { |
481 | 0 | for_host, |
482 | 0 | rbac_ctx, |
483 | 0 | result_tracker, |
484 | 0 | upstream_addr, |
485 | 0 | tunnel_request, |
486 | 0 | destination_workload, |
487 | 0 | }) |
488 | 0 | } |
489 | | |
490 | | // Selects a service by hostname without the explicit knowledge of the namespace |
491 | | // There is no explicit mapping from hostname to namespace (e.g. foo.com) |
492 | 0 | fn find_service_by_hostname( |
493 | 0 | state: &DemandProxyState, |
494 | 0 | local_workload: &Workload, |
495 | 0 | hbone_host: &Strng, |
496 | 0 | ) -> Result<Arc<Service>, Error> { |
497 | 0 | state |
498 | 0 | .read() |
499 | 0 | .services |
500 | 0 | .get_best_by_host(hbone_host, Some(&local_workload.namespace)) |
501 | 0 | .ok_or_else(|| Error::NoHostname(hbone_host.to_string())) |
502 | 0 | } |
503 | | |
504 | | /// validate_destination ensures the destination is an allowed request. |
505 | 0 | async fn validate_destination( |
506 | 0 | state: &DemandProxyState, |
507 | 0 | conn: &Connection, |
508 | 0 | local_workload: &Workload, |
509 | 0 | hbone_addr: &HboneAddress, |
510 | 0 | ) -> Result<(), Error> { |
511 | 0 | let HboneAddress::SocketAddr(hbone_addr) = hbone_addr else { |
512 | | // This is a hostname - it is valid. We may not find the hostname, at which point we will fail later |
513 | 0 | return Ok(()); |
514 | | }; |
515 | 0 | if conn.dst.ip() == hbone_addr.ip() { |
516 | | // Normal case: both are aligned. This is allowed (we really only need the HBONE address for the port.) |
517 | 0 | return Ok(()); |
518 | 0 | } |
519 | 0 | if local_workload.application_tunnel.is_some() { |
520 | | // In the case they have their own tunnel, they will get the HBONE target address in the PROXY |
521 | | // header, and their application can decide what to do with it; we don't validate this. |
522 | | // This is the case, for instance, with a waypoint using PROXY. |
523 | 0 | return Ok(()); |
524 | 0 | } |
525 | | // There still may be the case where we are doing a "waypoint sandwich" but not using any tunnel. |
526 | | // Presumably, the waypoint is only matching on L7 attributes. |
527 | | // We want to make sure in this case we don't deny the requests just because the HBONE destination |
528 | | // mismatches (though we will, essentially, ignore the address). |
529 | | // To do this, we do a lookup to see if the HBONE target has us as its waypoint. |
530 | 0 | let hbone_dst = &NetworkAddress { |
531 | 0 | network: conn.dst_network.clone(), |
532 | 0 | address: hbone_addr.ip(), |
533 | 0 | }; |
534 | | |
535 | | // None means we need to do on-demand lookup |
536 | 0 | let lookup_is_destination_this_waypoint = || -> Option<bool> { |
537 | 0 | let state = state.read(); |
538 | | |
539 | | // TODO Allow HBONE address to be a hostname. We have to respect rules about |
540 | | // hostname scoping. Can we use the client's namespace here to do that? |
541 | 0 | let hbone_target = state.find_address(hbone_dst)?; |
542 | | |
543 | | // HBONE target can point to some service or workload. In either case, get the waypoint |
544 | 0 | let Some(target_waypoint) = (match hbone_target { |
545 | 0 | Address::Service(ref svc) => &svc.waypoint, |
546 | 0 | Address::Workload(ref wl) => &wl.waypoint, |
547 | | }) else { |
548 | | // Target has no waypoint |
549 | 0 | return Some(false); |
550 | | }; |
551 | | |
552 | | // Resolve the reference from our HBONE target |
553 | 0 | let Some(target_waypoint) = state.find_destination(&target_waypoint.destination) else { |
554 | 0 | return Some(false); |
555 | | }; |
556 | | |
557 | | // Validate that the HBONE target references the Waypoint we're connecting to |
558 | 0 | Some(match target_waypoint { |
559 | 0 | Address::Service(svc) => svc.contains_endpoint(local_workload), |
560 | 0 | Address::Workload(wl) => wl.workload_ips.contains(&conn.dst.ip()), |
561 | | }) |
562 | 0 | }; |
563 | | |
564 | 0 | let res = match lookup_is_destination_this_waypoint() { |
565 | 0 | Some(r) => Some(r), |
566 | | None => { |
567 | 0 | if !state.supports_on_demand() { |
568 | 0 | None |
569 | | } else { |
570 | 0 | state |
571 | 0 | .fetch_on_demand(strng::new(hbone_dst.to_string())) |
572 | 0 | .await; |
573 | 0 | lookup_is_destination_this_waypoint() |
574 | | } |
575 | | } |
576 | | }; |
577 | | |
578 | 0 | if res.is_none() || res == Some(false) { |
579 | 0 | return Err(Error::IPMismatch(conn.dst.ip(), hbone_addr.ip())); |
580 | 0 | } |
581 | 0 | Ok(()) |
582 | 0 | } |
583 | | |
584 | | /// find_inbound_upstream determines the next hop for an inbound request. |
585 | | #[expect(clippy::type_complexity)] |
586 | 0 | pub(super) fn find_inbound_upstream( |
587 | 0 | cfg: &Config, |
588 | 0 | state: &DemandProxyState, |
589 | 0 | conn: &Connection, |
590 | 0 | local_workload: &Workload, |
591 | 0 | hbone_addr: &HboneAddress, |
592 | 0 | ) -> Result<(SocketAddr, Option<TunnelRequest>, Vec<Arc<Service>>), Error> { |
593 | | // We always target the local workload IP as the destination. But we need to determine the port to send to. |
594 | 0 | let target_ip = conn.dst.ip(); |
595 | | |
596 | | // First, fetch the actual target SocketAddr as well as all possible services this could be for. |
597 | | // Given they may request the pod directly, there may be multiple possible services; we will |
598 | | // select a final one (if any) later. |
599 | 0 | let (dest, services) = match hbone_addr { |
600 | 0 | HboneAddress::SvcHostname(hostname, service_port) => { |
601 | | // Request is to a hostname. This must be a service. |
602 | | // We know the destination IP already (since this is inbound, we just need to forward it), |
603 | | // but will need to resolve the port from service port to target port. |
604 | 0 | let svc = Self::find_service_by_hostname(state, local_workload, hostname)?; |
605 | | |
606 | 0 | let endpoint_port = svc |
607 | 0 | .endpoints |
608 | 0 | .get(&local_workload.uid) |
609 | 0 | .and_then(|ep| ep.port.get(service_port)); |
610 | | // If we can get the port from the endpoint, that is ideal. But we may not, which is fine |
611 | | // if the service has a number target port (rather than named). |
612 | 0 | let port = if let Some(&ep_port) = endpoint_port { |
613 | 0 | ep_port |
614 | | } else { |
615 | 0 | let service_target_port = |
616 | 0 | svc.ports.get(service_port).copied().unwrap_or_default(); |
617 | 0 | if service_target_port == 0 { |
618 | 0 | return Err(Error::NoPortForServices( |
619 | 0 | hostname.to_string(), |
620 | 0 | *service_port, |
621 | 0 | )); |
622 | 0 | } |
623 | 0 | service_target_port |
624 | | }; |
625 | 0 | (SocketAddr::new(target_ip, port), vec![svc]) |
626 | | } |
627 | 0 | HboneAddress::SocketAddr(hbone_addr) => ( |
628 | 0 | SocketAddr::new(target_ip, hbone_addr.port()), |
629 | 0 | state.get_services_by_workload(local_workload), |
630 | 0 | ), |
631 | | }; |
632 | | |
633 | | // Check for illegal calls now that we have resolved to the final destination. |
634 | | // We need to do this here, rather than `validate_destination`, since the former doesn't |
635 | | // have access to the resolved service port. |
636 | 0 | if cfg.illegal_ports.contains(&dest.port()) { |
637 | 0 | return Err(Error::SelfCall); |
638 | 0 | } |
639 | | |
640 | | // Application tunnel may override the port. |
641 | 0 | let (target, tunnel) = match local_workload.application_tunnel.clone() { |
642 | 0 | Some(workload::ApplicationTunnel { port, protocol }) => { |
643 | | // We may need to override the target port. For instance, we may send all PROXY |
644 | | // traffic over a dedicated port like 15088. |
645 | 0 | let new_target = SocketAddr::new(dest.ip(), port.unwrap_or(dest.port())); |
646 | | // Note: the logic to decide which destination address to set inside the PROXY headers |
647 | | // is handled outside of this call. This just determines that location we actually send the |
648 | | // connection to. |
649 | | |
650 | | // Which address we will send in the tunnel |
651 | 0 | let tunnel_target = match hbone_addr { |
652 | 0 | HboneAddress::SvcHostname(h, port) => { |
653 | | // PROXY cannot currently send to hostnames, so we will need to select an IP to |
654 | | // use instead |
655 | | // We ensure a service is set above. |
656 | 0 | let vip = services |
657 | 0 | .first() |
658 | 0 | .expect("service must exist") |
659 | 0 | .vips |
660 | 0 | .iter() |
661 | 0 | .max_by_key(|a| match a.network == conn.dst_network { |
662 | | true => { |
663 | | // Defer to IPv4 if present |
664 | 0 | match a.address.is_ipv4() { |
665 | 0 | true => 2, |
666 | 0 | false => 1, |
667 | | } |
668 | | } |
669 | 0 | false => 0, |
670 | 0 | }) |
671 | 0 | .ok_or_else(|| Error::NoIPForService(h.to_string()))?; |
672 | 0 | SocketAddr::new(vip.address, *port) |
673 | | } |
674 | 0 | HboneAddress::SocketAddr(s) => *s, |
675 | | }; |
676 | 0 | ( |
677 | 0 | new_target, |
678 | 0 | Some(TunnelRequest { |
679 | 0 | tunnel_target, |
680 | 0 | protocol, |
681 | 0 | }), |
682 | 0 | ) |
683 | | } |
684 | 0 | None => (dest, None), |
685 | | }; |
686 | 0 | Ok((target, tunnel, services)) |
687 | 0 | } |
688 | | } |
689 | | |
690 | | #[derive(Debug)] |
691 | | pub(super) struct TunnelRequest { |
692 | | tunnel_target: SocketAddr, |
693 | | protocol: Protocol, |
694 | | } |
695 | | |
696 | | #[derive(Debug)] |
697 | | struct InboundRequest { |
698 | | for_host: Option<String>, |
699 | | rbac_ctx: ProxyRbacContext, |
700 | | result_tracker: Box<ConnectionResult>, |
701 | | upstream_addr: SocketAddr, |
702 | | tunnel_request: Option<TunnelRequest>, |
703 | | destination_workload: Arc<Workload>, |
704 | | } |
705 | | |
706 | | /// InboundError represents an error with an associated status code. |
707 | | #[derive(Debug)] |
708 | | struct InboundError(Error, StatusCode); |
709 | | impl InboundError { |
710 | 0 | pub fn build(code: StatusCode) -> impl Fn(Error) -> Self { |
711 | 0 | move |err| InboundError(err, code) |
712 | 0 | } |
713 | | } |
714 | | |
715 | | struct InboundFlagError(Error, ResponseFlags, StatusCode); |
716 | | impl InboundFlagError { |
717 | 0 | pub fn build(code: StatusCode, flag: ResponseFlags) -> impl Fn(Error) -> Self { |
718 | 0 | move |err| InboundFlagError(err, flag, code) |
719 | 0 | } |
720 | | } |
721 | | |
722 | | #[derive(Clone)] |
723 | | struct InboundCertProvider { |
724 | | local_workload: Arc<LocalWorkloadInformation>, |
725 | | crl_manager: Option<Arc<tls::crl::CrlManager>>, |
726 | | } |
727 | | |
728 | | #[async_trait::async_trait] |
729 | | impl crate::tls::ServerCertProvider for InboundCertProvider { |
730 | 0 | async fn fetch_cert(&mut self) -> Result<Arc<rustls::ServerConfig>, TlsError> { |
731 | 0 | debug!( |
732 | 0 | identity=%self.local_workload.workload_info(), |
733 | 0 | "fetching cert" |
734 | | ); |
735 | 0 | let cert = self.local_workload.fetch_certificate().await?; |
736 | 0 | Ok(Arc::new(cert.server_config(self.crl_manager.clone())?)) |
737 | 0 | } Unexecuted instantiation: <ztunnel::proxy::inbound::InboundCertProvider as ztunnel::tls::lib::ServerCertProvider>::fetch_cert::{closure#0}Unexecuted instantiation: <ztunnel::proxy::inbound::InboundCertProvider as ztunnel::tls::lib::ServerCertProvider>::fetch_cert |
738 | | } |
739 | | |
740 | 0 | pub fn parse_forwarded_host<T: RequestParts>(req: &T) -> Option<String> { |
741 | 0 | req.headers() |
742 | 0 | .get(http::header::FORWARDED) |
743 | 0 | .and_then(|rh| rh.to_str().ok()) |
744 | 0 | .and_then(proxy::parse_forwarded_host) |
745 | 0 | } |
746 | | |
747 | | // Second argument is local workload and cluster name |
748 | 0 | fn build_response( |
749 | 0 | status: StatusCode, |
750 | 0 | local_wl: Option<&Workload>, |
751 | 0 | enable_response_baggage: bool, |
752 | 0 | ) -> Response<()> { |
753 | 0 | let mut builder = Response::builder().status(status); |
754 | | |
755 | 0 | if let Some(local_wl) = local_wl |
756 | 0 | && enable_response_baggage |
757 | | { |
758 | 0 | builder = builder.header( |
759 | 0 | BAGGAGE_HEADER, |
760 | 0 | baggage_header_val(&local_wl.baggage(), &local_wl.workload_type), |
761 | 0 | ) |
762 | 0 | } |
763 | | |
764 | 0 | builder |
765 | 0 | .body(()) |
766 | 0 | .expect("builder with known status code should not fail") |
767 | 0 | } |
768 | | |
769 | | #[cfg(test)] |
770 | | #[allow(clippy::too_many_arguments)] |
771 | | mod tests { |
772 | | use super::{Inbound, ProxyInputs}; |
773 | | use crate::{ |
774 | | config, |
775 | | identity::manager::mock::new_secret_manager, |
776 | | proxy::{ |
777 | | ConnectionManager, DefaultSocketFactory, LocalWorkloadInformation, |
778 | | h2::server::RequestParts, inbound::HboneAddress, |
779 | | }, |
780 | | rbac::Connection, |
781 | | state::{ |
782 | | self, DemandProxyState, WorkloadInfo, |
783 | | service::{Endpoint, EndpointSet, Service}, |
784 | | workload::{ |
785 | | ApplicationTunnel, GatewayAddress, HealthStatus, InboundProtocol, NetworkAddress, |
786 | | NetworkMode, Workload, application_tunnel::Protocol as AppProtocol, |
787 | | gatewayaddress::Destination, |
788 | | }, |
789 | | }, |
790 | | strng, test_helpers, |
791 | | }; |
792 | | use hickory_resolver::config::{ResolverConfig, ResolverOpts}; |
793 | | use http::{Method, Uri}; |
794 | | use prometheus_client::registry::Registry; |
795 | | use std::{ |
796 | | net::SocketAddr, |
797 | | sync::{Arc, RwLock}, |
798 | | time::Duration, |
799 | | }; |
800 | | use test_case::test_case; |
801 | | |
802 | | const CLIENT_POD_IP: &str = "10.0.0.1"; |
803 | | |
804 | | const SERVER_POD_IP: &str = "10.0.0.2"; |
805 | | const SERVER_SVC_IP: &str = "10.10.0.1"; |
806 | | |
807 | | const SERVER_POD_HOSTNAME: &str = "server.default.svc.cluster.local"; |
808 | | |
809 | | const WAYPOINT_POD_IP: &str = "10.0.0.3"; |
810 | | const WAYPOINT_SVC_IP: &str = "10.10.0.2"; |
811 | | |
812 | | const SERVER_PORT: u16 = 80; |
813 | | const TARGET_PORT: u16 = 8080; |
814 | | const PROXY_PORT: u16 = 15088; |
815 | | |
816 | | const APP_TUNNEL_PROXY: Option<ApplicationTunnel> = Some(ApplicationTunnel { |
817 | | port: Some(PROXY_PORT), |
818 | | protocol: AppProtocol::PROXY, |
819 | | }); |
820 | | |
821 | | struct MockParts { |
822 | | method: Method, |
823 | | uri: Uri, |
824 | | headers: http::HeaderMap<http::HeaderValue>, |
825 | | } |
826 | | |
827 | | impl RequestParts for MockParts { |
828 | | fn uri(&self) -> &http::Uri { |
829 | | &self.uri |
830 | | } |
831 | | |
832 | | fn method(&self) -> &http::Method { |
833 | | &self.method |
834 | | } |
835 | | |
836 | | fn headers(&self) -> &http::HeaderMap<http::HeaderValue> { |
837 | | &self.headers |
838 | | } |
839 | | } |
840 | | |
841 | | // Regular zTunnel workload traffic inbound |
842 | | #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_IP, Some((SERVER_POD_IP, TARGET_PORT)); "to workload no waypoint")] |
843 | | // Svc hostname |
844 | | #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_HOSTNAME, Some((SERVER_POD_IP, TARGET_PORT)); "svc hostname to workload no waypoint")] |
845 | | // Sandwiched Waypoint Cases |
846 | | // to workload traffic |
847 | | #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to workload with waypoint referenced by pod")] |
848 | | #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to workload with waypoint referenced by vip")] |
849 | | #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_POD_IP , Some((WAYPOINT_POD_IP, PROXY_PORT)); "to workload with app tunnel")] |
850 | | // to service traffic |
851 | | #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to service with waypoint referenced by pod")] |
852 | | #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP , Some((WAYPOINT_POD_IP, TARGET_PORT)); "to service with waypint referenced by vip")] |
853 | | #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_SVC_IP , Some((WAYPOINT_POD_IP, PROXY_PORT)); "to service with app tunnel")] |
854 | | // Override port via app_protocol |
855 | | // Error cases |
856 | | #[test_case(Waypoint::None, SERVER_POD_IP, CLIENT_POD_IP, None; "to server ip mismatch" )] |
857 | | #[test_case(Waypoint::None, WAYPOINT_POD_IP, CLIENT_POD_IP, None; "to waypoint without attachment" )] |
858 | | #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP , None; "to workload via waypoint with wrong attachment")] |
859 | | #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP , None; "to service via waypoint with wrong attachment")] |
860 | | #[tokio::test] |
861 | | async fn test_find_inbound_upstream( |
862 | | target_waypoint: Waypoint<'_>, |
863 | | connection_dst: &str, |
864 | | hbone_dst: &str, |
865 | | want: Option<(&str, u16)>, |
866 | | ) { |
867 | | let state = test_state(target_waypoint).expect("state setup"); |
868 | | let cfg = config::parse_config().unwrap(); |
869 | | let conn = Connection { |
870 | | src_identity: None, |
871 | | src: format!("{CLIENT_POD_IP}:1234").parse().unwrap(), |
872 | | dst_network: "".into(), |
873 | | dst: format!("{connection_dst}:15008").parse().unwrap(), |
874 | | }; |
875 | | let local_wl = state |
876 | | .fetch_workload_by_address(&NetworkAddress { |
877 | | network: "".into(), |
878 | | address: conn.dst.ip(), |
879 | | }) |
880 | | .await |
881 | | .unwrap(); |
882 | | let hbone_addr = |
883 | | if let Ok(addr) = format!("{hbone_dst}:{TARGET_PORT}").parse::<SocketAddr>() { |
884 | | HboneAddress::SocketAddr(addr) |
885 | | } else { |
886 | | HboneAddress::SvcHostname(hbone_dst.into(), SERVER_PORT) |
887 | | }; |
888 | | |
889 | | let validate_destination = |
890 | | Inbound::validate_destination(&state, &conn, &local_wl, &hbone_addr).await; |
891 | | let res = Inbound::find_inbound_upstream(&cfg, &state, &conn, &local_wl, &hbone_addr); |
892 | | |
893 | | match want { |
894 | | Some((ip, port)) => { |
895 | | let got_addr = res.expect("no error").0; |
896 | | assert_eq!(got_addr, SocketAddr::new(ip.parse().unwrap(), port)); |
897 | | } |
898 | | None => { |
899 | | validate_destination.expect_err("did not find upstream"); |
900 | | } |
901 | | } |
902 | | } |
903 | | |
904 | | // Regular zTunnel workload traffic inbound |
905 | | #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((SERVER_POD_IP, TARGET_PORT, None)); "to workload no waypoint")] |
906 | | // Svc hostname |
907 | | #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_HOSTNAME, SERVER_PORT, Some((SERVER_POD_IP, TARGET_PORT, None)); "svc hostname to workload no waypoint")] |
908 | | // Sandwiched Waypoint Cases |
909 | | // to workload traffic |
910 | | #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to workload with waypoint referenced by pod")] |
911 | | #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to workload with waypoint referenced by vip")] |
912 | | #[test_case(Waypoint::Workload(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, PROXY_PORT, Some(SERVER_POD_IP))); "to workload with app tunnel")] |
913 | | // to service traffic |
914 | | #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to service with waypoint referenced by pod")] |
915 | | #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, TARGET_PORT, None)); "to service with waypint referenced by vip")] |
916 | | #[test_case(Waypoint::Service(WAYPOINT_SVC_IP, APP_TUNNEL_PROXY), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, Some((WAYPOINT_POD_IP, PROXY_PORT, Some(SERVER_SVC_IP))); "to service with app tunnel")] |
917 | | // Override port via app_protocol |
918 | | // Error cases |
919 | | #[test_case(Waypoint::None, SERVER_POD_IP, CLIENT_POD_IP, TARGET_PORT, None; "to server ip mismatch" )] |
920 | | #[test_case(Waypoint::None, WAYPOINT_POD_IP, CLIENT_POD_IP, TARGET_PORT, None; "to waypoint without attachment" )] |
921 | | #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP, TARGET_PORT, None; "to workload via waypoint with wrong attachment")] |
922 | | #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP, TARGET_PORT, None; "to service via waypoint with wrong attachment")] |
923 | | #[tokio::test] |
924 | | async fn test_build_inbound_request( |
925 | | target_waypoint: Waypoint<'_>, |
926 | | connection_dst: &str, |
927 | | hbone_dst: &str, |
928 | | hbobe_dst_port: u16, |
929 | | want: Option<(&str, u16, Option<&str>)>, |
930 | | ) { |
931 | | let state = test_state(target_waypoint).expect("state setup"); |
932 | | let cfg = config::parse_config().unwrap(); |
933 | | let conn = Connection { |
934 | | src_identity: None, |
935 | | src: format!("{CLIENT_POD_IP}:1234").parse().unwrap(), |
936 | | dst_network: "".into(), |
937 | | dst: format!("{connection_dst}:15008").parse().unwrap(), |
938 | | }; |
939 | | let request_parts = MockParts { |
940 | | method: Method::CONNECT, |
941 | | uri: format!("{hbone_dst}:{hbobe_dst_port}").parse().unwrap(), |
942 | | headers: http::HeaderMap::new(), |
943 | | }; |
944 | | let cm = ConnectionManager::default(); |
945 | | let metrics = Arc::new(crate::proxy::Metrics::new(&mut Registry::default())); |
946 | | let sf = Arc::new(DefaultSocketFactory::default()); |
947 | | let wl = state |
948 | | .fetch_workload_by_address(&NetworkAddress { |
949 | | network: "".into(), |
950 | | address: conn.dst.ip(), |
951 | | }) |
952 | | .await |
953 | | .unwrap(); |
954 | | let local_workload = Arc::new(LocalWorkloadInformation::new( |
955 | | Arc::new(WorkloadInfo { |
956 | | name: wl.name.to_string(), |
957 | | namespace: wl.namespace.to_string(), |
958 | | service_account: wl.service_account.to_string(), |
959 | | }), |
960 | | state.clone(), |
961 | | new_secret_manager(Duration::from_secs(10)), |
962 | | )); |
963 | | let pi = Arc::new(ProxyInputs::new( |
964 | | Arc::new(cfg), |
965 | | cm, |
966 | | state.clone(), |
967 | | metrics.clone(), |
968 | | sf, |
969 | | None, |
970 | | local_workload, |
971 | | false, |
972 | | None, |
973 | | )); |
974 | | let inbound_request = Inbound::build_inbound_request(&pi, conn, &request_parts).await; |
975 | | match want { |
976 | | Some((ip, port, protocol_addr)) => { |
977 | | let ir = inbound_request.unwrap(); |
978 | | assert_eq!(ir.upstream_addr, SocketAddr::new(ip.parse().unwrap(), port)); |
979 | | match ir.tunnel_request { |
980 | | Some(addr) => assert_eq!( |
981 | | addr.tunnel_target, |
982 | | SocketAddr::new(protocol_addr.unwrap().parse().unwrap(), hbobe_dst_port) |
983 | | ), |
984 | | None => assert_eq!(protocol_addr, None), |
985 | | }; |
986 | | } |
987 | | None => { |
988 | | inbound_request.expect_err("could not build inbound request"); |
989 | | } |
990 | | } |
991 | | } |
992 | | |
993 | | // Creates a test state for the `DemandProxyState` with predefined services and workloads. |
994 | | // server_waypoint specifies the waypoint configuration for the server. |
995 | | fn test_state(server_waypoint: Waypoint) -> anyhow::Result<state::DemandProxyState> { |
996 | | let mut state = state::ProxyState::new(None); |
997 | | |
998 | | let services = vec![ |
999 | | ("waypoint", WAYPOINT_SVC_IP, Waypoint::None), |
1000 | | ("server", SERVER_SVC_IP, server_waypoint.clone()), |
1001 | | ] |
1002 | | .into_iter() |
1003 | | .map(|(name, vip, waypoint)| Service { |
1004 | | name: name.into(), |
1005 | | namespace: "default".into(), |
1006 | | hostname: strng::format!("{name}.default.svc.cluster.local"), |
1007 | | vips: vec![NetworkAddress { |
1008 | | address: vip.parse().unwrap(), |
1009 | | network: "".into(), |
1010 | | }], |
1011 | | ports: std::collections::HashMap::from([(80u16, 8080u16)]), |
1012 | | endpoints: EndpointSet::from_list([Endpoint { |
1013 | | workload_uid: strng::format!("cluster1//v1/Pod/default/{name}"), |
1014 | | port: std::collections::HashMap::new(), |
1015 | | status: HealthStatus::Healthy, |
1016 | | }]), |
1017 | | subject_alt_names: vec![strng::format!("{name}.default.svc.cluster.local")], |
1018 | | waypoint: waypoint.service_attached(), |
1019 | | load_balancer: None, |
1020 | | ip_families: None, |
1021 | | canonical: true, |
1022 | | }); |
1023 | | |
1024 | | let workloads = vec![ |
1025 | | ( |
1026 | | "waypoint", |
1027 | | WAYPOINT_POD_IP, |
1028 | | Waypoint::None, |
1029 | | server_waypoint.app_tunnel(), |
1030 | | ), |
1031 | | ("client", CLIENT_POD_IP, Waypoint::None, None), |
1032 | | ("server", SERVER_POD_IP, server_waypoint, None), |
1033 | | ] |
1034 | | .into_iter() |
1035 | | .map(|(name, ip, waypoint, app_tunnel)| Workload { |
1036 | | workload_ips: vec![ip.parse().unwrap()], |
1037 | | waypoint: waypoint.workload_attached(), |
1038 | | protocol: InboundProtocol::HBONE, |
1039 | | uid: strng::format!("cluster1//v1/Pod/default/{name}"), |
1040 | | name: strng::format!("workload-{name}"), |
1041 | | namespace: "default".into(), |
1042 | | service_account: strng::format!("service-account-{name}"), |
1043 | | application_tunnel: app_tunnel, |
1044 | | network_mode: NetworkMode::Standard, |
1045 | | ..test_helpers::test_default_workload() |
1046 | | }); |
1047 | | |
1048 | | for svc in services { |
1049 | | state.services.insert(svc); |
1050 | | } |
1051 | | for wl in workloads { |
1052 | | state.workloads.insert(Arc::new(wl)); |
1053 | | } |
1054 | | |
1055 | | let mut registry = Registry::default(); |
1056 | | let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry)); |
1057 | | Ok(DemandProxyState::new( |
1058 | | Arc::new(RwLock::new(state)), |
1059 | | None, |
1060 | | ResolverConfig::default(), |
1061 | | ResolverOpts::default(), |
1062 | | metrics, |
1063 | | )) |
1064 | | } |
1065 | | |
1066 | | // tells the test if we're using workload-attached or svc-attached waypoints |
1067 | | #[derive(Clone)] |
1068 | | enum Waypoint<'a> { |
1069 | | None, |
1070 | | Service(&'a str, Option<ApplicationTunnel>), |
1071 | | Workload(&'a str, Option<ApplicationTunnel>), |
1072 | | } |
1073 | | |
1074 | | impl Waypoint<'_> { |
1075 | | fn app_tunnel(&self) -> Option<ApplicationTunnel> { |
1076 | | match self.clone() { |
1077 | | Waypoint::Service(_, v) => v, |
1078 | | Waypoint::Workload(_, v) => v, |
1079 | | _ => None, |
1080 | | } |
1081 | | } |
1082 | | fn service_attached(&self) -> Option<GatewayAddress> { |
1083 | | let Waypoint::Service(s, _) = self else { |
1084 | | return None; |
1085 | | }; |
1086 | | Some(GatewayAddress { |
1087 | | destination: Destination::Address(NetworkAddress { |
1088 | | network: strng::EMPTY, |
1089 | | address: s.parse().expect("a valid waypoint IP"), |
1090 | | }), |
1091 | | hbone_mtls_port: 15008, |
1092 | | }) |
1093 | | } |
1094 | | |
1095 | | fn workload_attached(&self) -> Option<GatewayAddress> { |
1096 | | let Waypoint::Workload(w, _) = self else { |
1097 | | return None; |
1098 | | }; |
1099 | | Some(GatewayAddress { |
1100 | | destination: Destination::Address(NetworkAddress { |
1101 | | network: strng::EMPTY, |
1102 | | address: w.parse().expect("a valid waypoint IP"), |
1103 | | }), |
1104 | | hbone_mtls_port: 15008, |
1105 | | }) |
1106 | | } |
1107 | | } |
1108 | | |
1109 | | #[test] |
1110 | | fn test_build_response_baggage_feature_gate() { |
1111 | | use super::build_response; |
1112 | | use crate::proxy::BAGGAGE_HEADER; |
1113 | | use crate::test_helpers; |
1114 | | use http::StatusCode; |
1115 | | |
1116 | | // Create a test workload |
1117 | | let workload = test_helpers::test_default_workload(); |
1118 | | |
1119 | | // Test with baggage enabled |
1120 | | let mut config_enabled = test_helpers::test_config(); |
1121 | | config_enabled.enable_enhanced_baggage = true; |
1122 | | |
1123 | | let response_enabled = build_response( |
1124 | | StatusCode::OK, |
1125 | | Some(&workload), |
1126 | | config_enabled.enable_enhanced_baggage, |
1127 | | ); |
1128 | | assert!(response_enabled.headers().contains_key(BAGGAGE_HEADER)); |
1129 | | |
1130 | | let baggage_header = response_enabled.headers().get(BAGGAGE_HEADER).unwrap(); |
1131 | | let baggage_value = baggage_header.to_str().unwrap(); |
1132 | | // Check that baggage header contains cluster_id from the test workload |
1133 | | assert!(baggage_value.contains("k8s.cluster.name=Kubernetes")); |
1134 | | |
1135 | | // Test with baggage disabled |
1136 | | let mut config_disabled = test_helpers::test_config(); |
1137 | | config_disabled.enable_enhanced_baggage = false; |
1138 | | |
1139 | | let response_disabled = build_response( |
1140 | | StatusCode::OK, |
1141 | | Some(&workload), |
1142 | | config_disabled.enable_enhanced_baggage, |
1143 | | ); |
1144 | | assert!(!response_disabled.headers().contains_key(BAGGAGE_HEADER)); |
1145 | | |
1146 | | // Test with None workload (should not have baggage regardless of config) |
1147 | | let response_no_workload = |
1148 | | build_response(StatusCode::OK, None, config_enabled.enable_enhanced_baggage); |
1149 | | assert!(!response_no_workload.headers().contains_key(BAGGAGE_HEADER)); |
1150 | | } |
1151 | | |
1152 | | #[test] |
1153 | | fn test_incoming_baggage_parsing_feature_gate() { |
1154 | | use crate::baggage::{Baggage, parse_baggage_header}; |
1155 | | use crate::proxy::BAGGAGE_HEADER; |
1156 | | use crate::test_helpers; |
1157 | | use http::{HeaderMap, HeaderValue}; |
1158 | | |
1159 | | // Create mock baggage header |
1160 | | let mut headers = HeaderMap::new(); |
1161 | | headers.insert(BAGGAGE_HEADER, HeaderValue::from_str("k8s.cluster.name=test-cluster,k8s.namespace.name=test-ns,k8s.deployment.name=test-app").unwrap()); |
1162 | | |
1163 | | // Test with baggage enabled |
1164 | | let config_enabled = test_helpers::test_config(); |
1165 | | assert!(config_enabled.enable_enhanced_baggage); // Default should be true |
1166 | | |
1167 | | let baggage_enabled = if config_enabled.enable_enhanced_baggage { |
1168 | | parse_baggage_header(headers.get_all(BAGGAGE_HEADER)).unwrap_or_default() |
1169 | | } else { |
1170 | | Baggage::default() |
1171 | | }; |
1172 | | |
1173 | | assert_eq!(baggage_enabled.cluster_id, Some("test-cluster".into())); |
1174 | | assert_eq!(baggage_enabled.namespace, Some("test-ns".into())); |
1175 | | assert_eq!(baggage_enabled.workload_name, Some("test-app".into())); |
1176 | | |
1177 | | // Test with baggage disabled |
1178 | | let mut config_disabled = test_helpers::test_config(); |
1179 | | config_disabled.enable_enhanced_baggage = false; |
1180 | | |
1181 | | let baggage_disabled = if config_disabled.enable_enhanced_baggage { |
1182 | | parse_baggage_header(headers.get_all(BAGGAGE_HEADER)).unwrap_or_default() |
1183 | | } else { |
1184 | | Baggage::default() |
1185 | | }; |
1186 | | |
1187 | | assert_eq!(baggage_disabled.cluster_id, None); |
1188 | | assert_eq!(baggage_disabled.namespace, None); |
1189 | | assert_eq!(baggage_disabled.workload_name, None); |
1190 | | } |
1191 | | } |