Coverage Report

Created: 2025-10-29 07:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/app.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::future::Future;
16
17
use crate::proxyfactory::ProxyFactory;
18
19
use crate::drain;
20
use anyhow::Context;
21
use prometheus_client::registry::Registry;
22
use std::net::SocketAddr;
23
use std::pin::Pin;
24
use std::sync::atomic::{AtomicUsize, Ordering};
25
use std::sync::{Arc, mpsc};
26
use std::thread;
27
use tokio::task::JoinSet;
28
use tracing::{Instrument, warn};
29
30
use crate::identity::SecretManager;
31
use crate::state::ProxyStateManager;
32
use crate::{admin, config, metrics, proxy, readiness, signal};
33
use crate::{dns, xds};
34
35
0
pub async fn build_with_cert(
36
0
    config: Arc<config::Config>,
37
0
    cert_manager: Arc<SecretManager>,
38
0
) -> anyhow::Result<Bound> {
39
    // Start the data plane worker pool.
40
0
    let data_plane_pool = new_data_plane_pool(config.num_worker_threads);
41
42
0
    let shutdown = signal::Shutdown::new();
43
    // Setup a drain channel. drain_tx is used to trigger a drain, which will complete
44
    // once all drain_rx handlers are dropped.
45
    // Any component which wants time to gracefully exit should take in a drain_rx clone,
46
    // await drain_rx.signaled(), then cleanup.
47
    // Note: there is still a hard timeout if the draining takes too long
48
0
    let (drain_tx, drain_rx) = drain::new();
49
50
    // Register readiness tasks.
51
0
    let ready = readiness::Ready::new();
52
0
    let state_mgr_task = ready.register_task("state manager");
53
0
    let proxy_task = if config.proxy {
54
0
        Some(ready.register_task("proxy"))
55
    } else {
56
0
        None
57
    };
58
0
    let dns_task = if config.dns_proxy {
59
0
        Some(ready.register_task("dns proxy"))
60
    } else {
61
0
        None
62
    };
63
64
    // Create and start the readiness server.
65
0
    let readiness_server = readiness::Server::new(config.clone(), drain_rx.clone(), ready.clone())
66
0
        .await
67
0
        .context("readiness server starts")?;
68
0
    let readiness_address = readiness_server.address();
69
    // Run the readiness server in the data plane worker pool.
70
0
    data_plane_pool.send(DataPlaneTask {
71
        block_shutdown: false,
72
0
        fut: Box::pin(async move {
73
0
            readiness_server.spawn();
74
0
            Ok(())
75
0
        }),
76
0
    })?;
77
78
    // Register metrics.
79
0
    let mut registry = Registry::default();
80
0
    register_process_metrics(&mut registry);
81
0
    let istio_registry = metrics::sub_registry(&mut registry);
82
0
    let _ = metrics::meta::Metrics::new(istio_registry);
83
0
    let xds_metrics = xds::Metrics::new(istio_registry);
84
0
    let proxy_metrics = Arc::new(proxy::Metrics::new(istio_registry));
85
0
    let dns_metrics = if config.dns_proxy {
86
0
        Some(dns::Metrics::new(istio_registry))
87
    } else {
88
0
        None
89
    };
90
91
0
    let (xds_tx, xds_rx) = tokio::sync::watch::channel(());
92
    // Create the manager that updates proxy state from XDS.
93
0
    let state_mgr = ProxyStateManager::new(
94
0
        config.clone(),
95
0
        xds_metrics,
96
0
        proxy_metrics.clone(),
97
0
        xds_tx,
98
0
        cert_manager.clone(),
99
0
    )
100
0
    .await?;
101
0
    let mut xds_rx_for_task = xds_rx.clone();
102
0
    tokio::spawn(async move {
103
0
        let _ = xds_rx_for_task.changed().await;
104
0
        std::mem::drop(state_mgr_task);
105
0
    });
106
0
    let state = state_mgr.state();
107
108
    // Run the XDS state manager in the current tokio worker pool.
109
0
    tokio::spawn(state_mgr.run());
110
111
    // Create and start the admin server.
112
0
    let mut admin_server = admin::Service::new(
113
0
        config.clone(),
114
0
        state.clone(),
115
0
        shutdown.trigger(),
116
0
        drain_rx.clone(),
117
0
        cert_manager.clone(),
118
0
    )
119
0
    .await
120
0
    .context("admin server starts")?;
121
0
    let admin_address = admin_server.address();
122
123
    // Optionally create the HBONE proxy.
124
0
    let mut proxy_addresses = None;
125
0
    let mut tcp_dns_proxy_address: Option<SocketAddr> = None;
126
0
    let mut udp_dns_proxy_address: Option<SocketAddr> = None;
127
128
0
    let proxy_gen = ProxyFactory::new(
129
0
        config.clone(),
130
0
        state.clone(),
131
0
        cert_manager.clone(),
132
0
        proxy_metrics,
133
0
        dns_metrics,
134
0
        drain_rx.clone(),
135
    )
136
0
    .map_err(|e| anyhow::anyhow!("failed to start proxy factory {:?}", e))?;
137
138
0
    if config.proxy_mode == config::ProxyMode::Shared {
139
0
        tracing::info!("shared proxy mode - in-pod mode enabled");
140
141
        // Create ztunnel inbound listener only if its specific identity and workload info are configured.
142
0
        if let Some(inbound) = proxy_gen.create_ztunnel_self_proxy_listener().await? {
143
            // Run the inbound listener in the data plane worker pool
144
0
            let mut xds_rx_for_inbound = xds_rx.clone();
145
0
            data_plane_pool.send(DataPlaneTask {
146
                block_shutdown: true,
147
0
                fut: Box::pin(async move {
148
0
                    tracing::info!("Starting ztunnel inbound listener task");
149
0
                    let _ = xds_rx_for_inbound.changed().await;
150
0
                    tokio::task::spawn(async move {
151
0
                        inbound.run().in_current_span().await;
152
0
                    })
153
0
                    .await?;
154
0
                    Ok(())
155
0
                }),
156
0
            })?;
157
0
        }
158
159
0
        let run_future = init_inpod_proxy_mgr(
160
0
            &mut registry,
161
0
            &mut admin_server,
162
0
            &config,
163
0
            proxy_gen,
164
0
            ready.clone(),
165
0
            drain_rx.clone(),
166
0
        )?;
167
168
0
        let mut xds_rx_for_proxy = xds_rx.clone();
169
0
        data_plane_pool.send(DataPlaneTask {
170
            block_shutdown: true,
171
0
            fut: Box::pin(async move {
172
0
                let _ = xds_rx_for_proxy.changed().await;
173
0
                run_future.in_current_span().await;
174
0
                Ok(())
175
0
            }),
176
0
        })?;
177
    } else {
178
0
        tracing::info!("proxy mode enabled");
179
0
        let wli = config
180
0
            .proxy_workload_information
181
0
            .clone()
182
0
            .expect("proxy_workload_information is required for dedicated mode");
183
0
        let proxies = proxy_gen.new_proxies_for_dedicated(wli).await?;
184
0
        match proxies.proxy {
185
0
            Some(proxy) => {
186
0
                proxy_addresses = Some(proxy.addresses());
187
188
                // Run the HBONE proxy in the data plane worker pool.
189
0
                let mut xds_rx_for_proxy = xds_rx.clone();
190
0
                data_plane_pool.send(DataPlaneTask {
191
                    block_shutdown: true,
192
0
                    fut: Box::pin(async move {
193
0
                        let _ = xds_rx_for_proxy.changed().await;
194
0
                        proxy.run().in_current_span().await;
195
0
                        Ok(())
196
0
                    }),
197
0
                })?;
198
199
0
                drop(proxy_task);
200
            }
201
            None => {
202
0
                tracing::info!("no proxy created");
203
            }
204
        }
205
206
0
        match proxies.dns_proxy {
207
0
            Some(dns_proxy) => {
208
                // Optional
209
0
                tcp_dns_proxy_address = Some(dns_proxy.tcp_address());
210
0
                udp_dns_proxy_address = Some(dns_proxy.udp_address());
211
212
                // Run the DNS proxy in the data plane worker pool.
213
0
                let mut xds_rx_for_dns_proxy = xds_rx.clone();
214
0
                data_plane_pool.send(DataPlaneTask {
215
                    block_shutdown: true,
216
0
                    fut: Box::pin(async move {
217
0
                        let _ = xds_rx_for_dns_proxy.changed().await;
218
0
                        dns_proxy.run().in_current_span().await;
219
0
                        Ok(())
220
0
                    }),
221
0
                })?;
222
223
0
                drop(dns_task);
224
            }
225
            None => {
226
0
                tracing::info!("no dns proxy created");
227
            }
228
        }
229
    }
230
231
    // Run the admin server in the current tokio worker pool.
232
0
    admin_server.spawn();
233
234
    // Create and start the metrics server.
235
0
    let metrics_server = metrics::Server::new(config.clone(), drain_rx.clone(), registry)
236
0
        .await
237
0
        .context("stats server starts")?;
238
0
    let metrics_address = metrics_server.address();
239
    // Run the metrics sever in the current tokio worker pool.
240
0
    metrics_server.spawn();
241
242
0
    Ok(Bound {
243
0
        drain_tx,
244
0
        shutdown,
245
0
        readiness_address,
246
0
        admin_address,
247
0
        metrics_address,
248
0
        proxy_addresses,
249
0
        tcp_dns_proxy_address,
250
0
        udp_dns_proxy_address,
251
0
    })
252
0
}
253
254
0
fn register_process_metrics(registry: &mut Registry) {
255
    #[cfg(unix)]
256
0
    registry.register_collector(Box::new(metrics::process::ProcessMetrics::new()));
257
0
}
258
259
struct DataPlaneTask {
260
    block_shutdown: bool,
261
    fut: Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>,
262
}
263
264
0
fn new_data_plane_pool(num_worker_threads: usize) -> mpsc::Sender<DataPlaneTask> {
265
0
    let (tx, rx) = mpsc::channel();
266
267
0
    let span = tracing::span::Span::current();
268
0
    thread::spawn(move || {
269
0
        let _span = span.enter();
270
0
        let runtime = tokio::runtime::Builder::new_multi_thread()
271
0
            .worker_threads(num_worker_threads)
272
0
            .thread_name_fn(|| {
273
                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
274
0
                let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
275
                // Thread name can only be 16 chars so keep it short
276
0
                format!("ztunnel-{id}")
277
0
            })
278
0
            .enable_all()
279
0
            .build()
280
0
            .unwrap();
281
0
        runtime.block_on(
282
0
            async move {
283
0
                let mut join_set = JoinSet::new();
284
285
                // Spawn tasks as they're received, until all tasks are spawned.
286
0
                let task_iter: mpsc::Iter<DataPlaneTask> = rx.iter();
287
0
                for task in task_iter {
288
0
                    if task.block_shutdown {
289
0
                        // We'll block shutdown on this task.
290
0
                        join_set.spawn(task.fut);
291
0
                    } else {
292
0
                        // We won't block shutdown of this task. Just spawn and forget.
293
0
                        tokio::spawn(task.fut);
294
0
                    }
295
                }
296
297
0
                while let Some(join_result) = join_set.join_next().await {
298
0
                    match join_result {
299
0
                        Ok(result) => {
300
0
                            if let Err(e) = result {
301
0
                                warn!("data plane task failed: {e}");
302
0
                            }
303
                        }
304
0
                        Err(e) => warn!("failed joining data plane task: {e}"),
305
                    }
306
                }
307
0
            }
308
0
            .in_current_span(),
309
        );
310
0
    });
311
312
0
    tx
313
0
}
314
315
0
pub async fn build(config: Arc<config::Config>) -> anyhow::Result<Bound> {
316
0
    let cert_manager = if config.fake_ca {
317
0
        mock_secret_manager()
318
    } else {
319
0
        Arc::new(SecretManager::new(config.clone()).await?)
320
    };
321
0
    build_with_cert(config, cert_manager).await
322
0
}
323
324
#[cfg(feature = "testing")]
/// Returns the mock secret manager from `crate::identity::mock`, used when `fake_ca` is set.
fn mock_secret_manager() -> Arc<SecretManager> {
    // 86400 seconds, i.e. one day. Presumably the mock certificate lifetime; confirm in
    // `identity::mock`.
    const MOCK_DURATION: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60);
    crate::identity::mock::new_secret_manager(MOCK_DURATION)
}
328
329
#[cfg(not(feature = "testing"))]
330
0
fn mock_secret_manager() -> Arc<SecretManager> {
331
0
    unimplemented!("fake_ca requires --features testing")
332
}
333
334
#[cfg(not(target_os = "linux"))]
/// Stub for non-linux targets. In-pod (shared proxy) mode is linux-only, so this always
/// returns an error and ignores all of its arguments.
fn init_inpod_proxy_mgr(
    _registry: &mut Registry,
    _admin_server: &mut crate::admin::Service,
    _config: &config::Config,
    _proxy_gen: ProxyFactory,
    _ready: readiness::Ready,
    _drain_rx: drain::DrainWatcher,
) -> anyhow::Result<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>> {
    Err(anyhow::anyhow!(
        "in-pod mode is not supported on non-linux platforms"
    ))
}
345
346
#[cfg(target_os = "linux")]
347
0
fn init_inpod_proxy_mgr(
348
0
    registry: &mut Registry,
349
0
    admin_server: &mut crate::admin::Service,
350
0
    config: &config::Config,
351
0
    proxy_gen: ProxyFactory,
352
0
    ready: readiness::Ready,
353
0
    drain_rx: drain::DrainWatcher,
354
0
) -> anyhow::Result<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>> {
355
0
    let metrics = Arc::new(crate::inpod::metrics::Metrics::new(
356
0
        registry.sub_registry_with_prefix("workload_manager"),
357
    ));
358
0
    let proxy_mgr = crate::inpod::init_and_new(metrics, admin_server, config, proxy_gen, ready)
359
0
        .map_err(|e| anyhow::anyhow!("failed to start workload proxy manager {:?}", e))?;
360
361
0
    Ok(Box::pin(async move {
362
0
        match proxy_mgr.run(drain_rx).await {
363
0
            Ok(()) => (),
364
0
            Err(e) => {
365
0
                tracing::error!("WorkloadProxyManager run error: {:?}", e);
366
0
                std::process::exit(1);
367
            }
368
        }
369
0
    }))
370
0
}
371
372
pub struct Bound {
373
    pub admin_address: SocketAddr,
374
    pub metrics_address: SocketAddr,
375
    pub readiness_address: SocketAddr,
376
377
    pub proxy_addresses: Option<proxy::Addresses>,
378
    pub tcp_dns_proxy_address: Option<SocketAddr>,
379
    pub udp_dns_proxy_address: Option<SocketAddr>,
380
381
    pub shutdown: signal::Shutdown,
382
    drain_tx: drain::DrainTrigger,
383
}
384
385
impl Bound {
386
0
    pub async fn wait_termination(self) -> anyhow::Result<()> {
387
        // Wait for a signal to shutdown from explicit admin shutdown or signal
388
0
        self.shutdown.wait().await;
389
390
        // Start a drain; this will attempt to end all connections
391
        // or itself be interrupted by a stronger TERM signal, whichever comes first.
392
0
        self.drain_tx
393
0
            .start_drain_and_wait(drain::DrainMode::Graceful)
394
0
            .await;
395
396
0
        Ok(())
397
0
    }
398
}