Line | Count | Source |
1 | | // Copyright Istio Authors |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::future::Future; |
16 | | |
17 | | use crate::proxyfactory::ProxyFactory; |
18 | | |
19 | | use crate::drain; |
20 | | use anyhow::Context; |
21 | | use prometheus_client::registry::Registry; |
22 | | use std::net::SocketAddr; |
23 | | use std::pin::Pin; |
24 | | use std::sync::atomic::{AtomicUsize, Ordering}; |
25 | | use std::sync::{Arc, mpsc}; |
26 | | use std::thread; |
27 | | use tokio::task::JoinSet; |
28 | | use tracing::{Instrument, warn}; |
29 | | |
30 | | use crate::identity::SecretManager; |
31 | | use crate::state::ProxyStateManager; |
32 | | use crate::{admin, config, metrics, proxy, readiness, signal}; |
33 | | use crate::{dns, xds}; |
34 | | |
/// Construct a ztunnel instance with every server bound, using the supplied
/// certificate manager for workload identity.
///
/// The readiness, admin, and metrics servers are created here; data plane
/// work (HBONE proxy, DNS proxy, or the in-pod manager) is submitted to a
/// dedicated worker pool but each task first awaits a notification on the
/// XDS watch channel, so no traffic is served before the state manager has
/// signaled an initial update.
///
/// # Errors
/// Fails if any server cannot start, the XDS state manager or proxy factory
/// cannot be created, or a task cannot be sent to the data plane pool.
pub async fn build_with_cert(
    config: Arc<config::Config>,
    cert_manager: Arc<SecretManager>,
) -> anyhow::Result<Bound> {
    // Start the data plane worker pool (a separate tokio runtime on its own
    // OS thread; see new_data_plane_pool).
    let data_plane_pool = new_data_plane_pool(config.num_worker_threads);

    let shutdown = signal::Shutdown::new();
    // Setup a drain channel. drain_tx is used to trigger a drain, which will complete
    // once all drain_rx handlers are dropped.
    // Any component which wants time to gracefully exit should take in a drain_rx clone,
    // await drain_rx.signaled(), then cleanup.
    // Note: there is still a hard timeout if the draining takes too long
    let (drain_tx, drain_rx) = drain::new();

    // Register readiness tasks. NOTE(review): dropping a registration guard
    // appears to mark that task as ready — confirm against readiness::Ready.
    let ready = readiness::Ready::new();
    let state_mgr_task = ready.register_task("state manager");
    let proxy_task = if config.proxy {
        Some(ready.register_task("proxy"))
    } else {
        None
    };
    let dns_task = if config.dns_proxy {
        Some(ready.register_task("dns proxy"))
    } else {
        None
    };

    // Create and start the readiness server.
    let readiness_server = readiness::Server::new(config.clone(), drain_rx.clone(), ready.clone())
        .await
        .context("readiness server starts")?;
    let readiness_address = readiness_server.address();
    // Run the readiness server in the data plane worker pool.
    // block_shutdown is false: shutdown does not wait on the readiness server.
    data_plane_pool.send(DataPlaneTask {
        block_shutdown: false,
        fut: Box::pin(async move {
            readiness_server.spawn();
            Ok(())
        }),
    })?;

    // Register metrics. The meta metrics value is discarded; constructing it
    // registers its collectors against the registry as a side effect.
    let mut registry = Registry::default();
    register_process_metrics(&mut registry);
    let istio_registry = metrics::sub_registry(&mut registry);
    let _ = metrics::meta::Metrics::new(istio_registry);
    let xds_metrics = xds::Metrics::new(istio_registry);
    let proxy_metrics = Arc::new(proxy::Metrics::new(istio_registry));
    let dns_metrics = if config.dns_proxy {
        Some(dns::Metrics::new(istio_registry))
    } else {
        None
    };

    // xds_tx is handed to the state manager; data plane tasks below await the
    // first change on xds_rx before serving (presumably fired after the
    // initial XDS sync — see ProxyStateManager).
    let (xds_tx, xds_rx) = tokio::sync::watch::channel(());
    // Create the manager that updates proxy state from XDS.
    let state_mgr = ProxyStateManager::new(
        config.clone(),
        xds_metrics,
        proxy_metrics.clone(),
        xds_tx,
        cert_manager.clone(),
    )
    .await?;
    // Mark the "state manager" readiness task complete once the first XDS
    // update is observed.
    let mut xds_rx_for_task = xds_rx.clone();
    tokio::spawn(async move {
        let _ = xds_rx_for_task.changed().await;
        std::mem::drop(state_mgr_task);
    });
    let state = state_mgr.state();

    // Run the XDS state manager in the current tokio worker pool.
    tokio::spawn(state_mgr.run());

    // Create and start the admin server.
    let mut admin_server = admin::Service::new(
        config.clone(),
        state.clone(),
        shutdown.trigger(),
        drain_rx.clone(),
        cert_manager.clone(),
    )
    .await
    .context("admin server starts")?;
    let admin_address = admin_server.address();

    // Optionally create the HBONE proxy.
    let mut proxy_addresses = None;
    let mut tcp_dns_proxy_address: Option<SocketAddr> = None;
    let mut udp_dns_proxy_address: Option<SocketAddr> = None;

    let proxy_gen = ProxyFactory::new(
        config.clone(),
        state.clone(),
        cert_manager.clone(),
        proxy_metrics,
        dns_metrics,
        drain_rx.clone(),
    )
    .map_err(|e| anyhow::anyhow!("failed to start proxy factory {:?}", e))?;

    if config.proxy_mode == config::ProxyMode::Shared {
        tracing::info!("shared proxy mode - in-pod mode enabled");

        // Create ztunnel inbound listener only if its specific identity and workload info are configured.
        if let Some(inbound) = proxy_gen.create_ztunnel_self_proxy_listener().await? {
            // Run the inbound listener in the data plane worker pool,
            // gated on the first XDS update.
            let mut xds_rx_for_inbound = xds_rx.clone();
            data_plane_pool.send(DataPlaneTask {
                block_shutdown: true,
                fut: Box::pin(async move {
                    tracing::info!("Starting ztunnel inbound listener task");
                    let _ = xds_rx_for_inbound.changed().await;
                    tokio::task::spawn(async move {
                        inbound.run().in_current_span().await;
                    })
                    .await?;
                    Ok(())
                }),
            })?;
        }

        // Build the in-pod workload proxy manager; the returned future drives
        // it and is run on the data plane pool below.
        let run_future = init_inpod_proxy_mgr(
            &mut registry,
            &mut admin_server,
            &config,
            proxy_gen,
            ready.clone(),
            drain_rx.clone(),
        )?;

        let mut xds_rx_for_proxy = xds_rx.clone();
        data_plane_pool.send(DataPlaneTask {
            block_shutdown: true,
            fut: Box::pin(async move {
                let _ = xds_rx_for_proxy.changed().await;
                run_future.in_current_span().await;
                Ok(())
            }),
        })?;
    } else {
        tracing::info!("proxy mode enabled");
        let wli = config
            .proxy_workload_information
            .clone()
            .expect("proxy_workload_information is required for dedicated mode");
        let proxies = proxy_gen.new_proxies_for_dedicated(wli).await?;
        match proxies.proxy {
            Some(proxy) => {
                proxy_addresses = Some(proxy.addresses());

                // Run the HBONE proxy in the data plane worker pool.
                let mut xds_rx_for_proxy = xds_rx.clone();
                data_plane_pool.send(DataPlaneTask {
                    block_shutdown: true,
                    fut: Box::pin(async move {
                        let _ = xds_rx_for_proxy.changed().await;
                        proxy.run().in_current_span().await;
                        Ok(())
                    }),
                })?;

                // Release the "proxy" readiness registration now that the
                // proxy task has been submitted.
                drop(proxy_task);
            }
            None => {
                tracing::info!("no proxy created");
            }
        }

        match proxies.dns_proxy {
            Some(dns_proxy) => {
                // Optional
                tcp_dns_proxy_address = Some(dns_proxy.tcp_address());
                udp_dns_proxy_address = Some(dns_proxy.udp_address());

                // Run the DNS proxy in the data plane worker pool.
                let mut xds_rx_for_dns_proxy = xds_rx.clone();
                data_plane_pool.send(DataPlaneTask {
                    block_shutdown: true,
                    fut: Box::pin(async move {
                        let _ = xds_rx_for_dns_proxy.changed().await;
                        dns_proxy.run().in_current_span().await;
                        Ok(())
                    }),
                })?;

                // Release the "dns proxy" readiness registration.
                drop(dns_task);
            }
            None => {
                tracing::info!("no dns proxy created");
            }
        }
    }

    // Run the admin server in the current tokio worker pool.
    admin_server.spawn();

    // Create and start the metrics server.
    let metrics_server = metrics::Server::new(config.clone(), drain_rx.clone(), registry)
        .await
        .context("stats server starts")?;
    let metrics_address = metrics_server.address();
    // Run the metrics server in the current tokio worker pool.
    metrics_server.spawn();

    Ok(Bound {
        drain_tx,
        shutdown,
        readiness_address,
        admin_address,
        metrics_address,
        proxy_addresses,
        tcp_dns_proxy_address,
        udp_dns_proxy_address,
    })
}
253 | | |
254 | 0 | fn register_process_metrics(registry: &mut Registry) { |
255 | | #[cfg(unix)] |
256 | 0 | registry.register_collector(Box::new(metrics::process::ProcessMetrics::new())); |
257 | 0 | } |
258 | | |
/// A unit of work submitted to the dedicated data plane runtime created by
/// `new_data_plane_pool`.
struct DataPlaneTask {
    // When true, the pool joins this task before its runtime exits; when
    // false the task is spawned fire-and-forget.
    block_shutdown: bool,
    // The future to execute on the data plane runtime.
    fut: Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>,
}
263 | | |
264 | 0 | fn new_data_plane_pool(num_worker_threads: usize) -> mpsc::Sender<DataPlaneTask> { |
265 | 0 | let (tx, rx) = mpsc::channel(); |
266 | | |
267 | 0 | let span = tracing::span::Span::current(); |
268 | 0 | thread::spawn(move || { |
269 | 0 | let _span = span.enter(); |
270 | 0 | let runtime = tokio::runtime::Builder::new_multi_thread() |
271 | 0 | .worker_threads(num_worker_threads) |
272 | 0 | .thread_name_fn(|| { |
273 | | static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); |
274 | 0 | let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); |
275 | | // Thread name can only be 16 chars so keep it short |
276 | 0 | format!("ztunnel-{id}") |
277 | 0 | }) |
278 | 0 | .enable_all() |
279 | 0 | .build() |
280 | 0 | .unwrap(); |
281 | 0 | runtime.block_on( |
282 | 0 | async move { |
283 | 0 | let mut join_set = JoinSet::new(); |
284 | | |
285 | | // Spawn tasks as they're received, until all tasks are spawned. |
286 | 0 | let task_iter: mpsc::Iter<DataPlaneTask> = rx.iter(); |
287 | 0 | for task in task_iter { |
288 | 0 | if task.block_shutdown { |
289 | 0 | // We'll block shutdown on this task. |
290 | 0 | join_set.spawn(task.fut); |
291 | 0 | } else { |
292 | 0 | // We won't block shutdown of this task. Just spawn and forget. |
293 | 0 | tokio::spawn(task.fut); |
294 | 0 | } |
295 | | } |
296 | | |
297 | 0 | while let Some(join_result) = join_set.join_next().await { |
298 | 0 | match join_result { |
299 | 0 | Ok(result) => { |
300 | 0 | if let Err(e) = result { |
301 | 0 | warn!("data plane task failed: {e}"); |
302 | 0 | } |
303 | | } |
304 | 0 | Err(e) => warn!("failed joining data plane task: {e}"), |
305 | | } |
306 | | } |
307 | 0 | } |
308 | 0 | .in_current_span(), |
309 | | ); |
310 | 0 | }); |
311 | | |
312 | 0 | tx |
313 | 0 | } |
314 | | |
315 | 0 | pub async fn build(config: Arc<config::Config>) -> anyhow::Result<Bound> { |
316 | 0 | let cert_manager = if config.fake_ca { |
317 | 0 | mock_secret_manager() |
318 | | } else { |
319 | 0 | Arc::new(SecretManager::new(config.clone()).await?) |
320 | | }; |
321 | 0 | build_with_cert(config, cert_manager).await |
322 | 0 | } |
323 | | |
/// Test-only secret manager backed by the mock CA.
#[cfg(feature = "testing")]
fn mock_secret_manager() -> Arc<SecretManager> {
    // Mock certificates are valid for one day (86400 seconds).
    const CERT_TTL: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);
    crate::identity::mock::new_secret_manager(CERT_TTL)
}
328 | | |
/// Stub used when the `testing` feature is disabled; reaching this means
/// `fake_ca` was configured in a build without mock-CA support, which panics.
#[cfg(not(feature = "testing"))]
fn mock_secret_manager() -> Arc<SecretManager> {
    unimplemented!("fake_ca requires --features testing")
}
333 | | |
/// Stub for non-Linux targets: in-pod (shared) proxy mode depends on
/// Linux-only facilities, so this always returns an error.
#[cfg(not(target_os = "linux"))]
fn init_inpod_proxy_mgr(
    _registry: &mut Registry,
    _admin_server: &mut crate::admin::Service,
    _config: &config::Config,
    _proxy_gen: ProxyFactory,
    _ready: readiness::Ready,
    _drain_rx: drain::DrainWatcher,
) -> anyhow::Result<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>> {
    Err(anyhow::anyhow!(
        "in-pod mode is not supported on non-linux platforms"
    ))
}
345 | | |
346 | | #[cfg(target_os = "linux")] |
347 | 0 | fn init_inpod_proxy_mgr( |
348 | 0 | registry: &mut Registry, |
349 | 0 | admin_server: &mut crate::admin::Service, |
350 | 0 | config: &config::Config, |
351 | 0 | proxy_gen: ProxyFactory, |
352 | 0 | ready: readiness::Ready, |
353 | 0 | drain_rx: drain::DrainWatcher, |
354 | 0 | ) -> anyhow::Result<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>> { |
355 | 0 | let metrics = Arc::new(crate::inpod::metrics::Metrics::new( |
356 | 0 | registry.sub_registry_with_prefix("workload_manager"), |
357 | | )); |
358 | 0 | let proxy_mgr = crate::inpod::init_and_new(metrics, admin_server, config, proxy_gen, ready) |
359 | 0 | .map_err(|e| anyhow::anyhow!("failed to start workload proxy manager {:?}", e))?; |
360 | | |
361 | 0 | Ok(Box::pin(async move { |
362 | 0 | match proxy_mgr.run(drain_rx).await { |
363 | 0 | Ok(()) => (), |
364 | 0 | Err(e) => { |
365 | 0 | tracing::error!("WorkloadProxyManager run error: {:?}", e); |
366 | 0 | std::process::exit(1); |
367 | | } |
368 | | } |
369 | 0 | })) |
370 | 0 | } |
371 | | |
/// A fully-constructed ztunnel instance: all listeners are bound and their
/// addresses recorded. Drive it to completion with [`Bound::wait_termination`].
pub struct Bound {
    /// Bound address of the admin server.
    pub admin_address: SocketAddr,
    /// Bound address of the metrics server.
    pub metrics_address: SocketAddr,
    /// Bound address of the readiness server.
    pub readiness_address: SocketAddr,

    /// HBONE proxy addresses, if a proxy was created.
    pub proxy_addresses: Option<proxy::Addresses>,
    /// DNS proxy TCP address, if a DNS proxy was created.
    pub tcp_dns_proxy_address: Option<SocketAddr>,
    /// DNS proxy UDP address, if a DNS proxy was created.
    pub udp_dns_proxy_address: Option<SocketAddr>,

    /// Shutdown signal handle; waited on in `wait_termination`.
    pub shutdown: signal::Shutdown,
    // Held privately so that the drain is triggered exactly once, during
    // `wait_termination`.
    drain_tx: drain::DrainTrigger,
}
384 | | |
385 | | impl Bound { |
    /// Blocks until shutdown is requested (via the admin endpoint or an OS
    /// signal), then performs a graceful drain before returning.
    pub async fn wait_termination(self) -> anyhow::Result<()> {
        // Wait for a signal to shutdown from explicit admin shutdown or signal
        self.shutdown.wait().await;

        // Start a drain; this will attempt to end all connections
        // or itself be interrupted by a stronger TERM signal, whichever comes first.
        self.drain_tx
            .start_drain_and_wait(drain::DrainMode::Graceful)
            .await;

        Ok(())
    }
398 | | } |