Coverage Report

Created: 2025-12-28 06:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/xds.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::HashMap;
16
use std::error::Error as StdErr;
17
use std::fmt;
18
use std::fmt::Formatter;
19
use std::str::FromStr;
20
use std::sync::{Arc, RwLock};
21
use tracing::Level;
22
23
use tokio::sync::mpsc;
24
#[cfg(any(test, feature = "testing"))]
25
use tracing::error;
26
use tracing::{debug, info, instrument, trace, warn};
27
28
pub use client::*;
29
pub use metrics::*;
30
pub use types::*;
31
use xds::istio::security::Authorization as XdsAuthorization;
32
use xds::istio::workload::Address as XdsAddress;
33
use xds::istio::workload::PortList;
34
use xds::istio::workload::Service as XdsService;
35
use xds::istio::workload::Workload as XdsWorkload;
36
use xds::istio::workload::address::Type as XdsType;
37
38
use crate::cert_fetcher::{CertFetcher, NoCertFetcher};
39
use crate::config::ConfigSource;
40
use crate::rbac::Authorization;
41
use crate::state::ProxyState;
42
use crate::state::service::{Endpoint, Service, ServiceStore};
43
use crate::state::workload::{NamespacedHostname, Workload, WorkloadStore};
44
use crate::strng::Strng;
45
use crate::{rbac, strng};
46
use crate::{tls, xds};
47
48
use self::service::discovery::v3::DeltaDiscoveryRequest;
49
50
mod client;
51
pub mod metrics;
52
mod types;
53
54
struct DisplayStatus<'a>(&'a tonic::Status);
55
56
impl fmt::Display for DisplayStatus<'_> {
57
0
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
58
0
        let s = &self.0;
59
0
        write!(f, "status: {:?}, message: {:?}", s.code(), s.message())?;
60
61
0
        if s.message().to_string().contains("authentication failure") {
62
0
            write!(
63
0
                f,
64
0
                " (hint: check the control plane logs for more information)"
65
0
            )?;
66
0
        }
67
0
        if !s.details().is_empty()
68
0
            && let Ok(st) = std::str::from_utf8(s.details())
69
        {
70
0
            write!(f, ", details: {st}")?;
71
0
        }
72
0
        if let Some(src) = s.source().and_then(|s| s.source()) {
73
0
            write!(f, ", source: {src}")?;
74
            // Error is not public to explicitly match on, so do a fuzzy match
75
0
            if format!("{src}").contains("Temporary failure in name resolution") {
76
0
                write!(f, " (hint: is the DNS server reachable?)")?;
77
0
            }
78
0
        }
79
0
        Ok(())
80
0
    }
81
}
82
83
#[derive(thiserror::Error, Debug)]
84
pub enum Error {
85
    #[error("gRPC error {}", DisplayStatus(.0))]
86
    GrpcStatus(#[from] tonic::Status),
87
    #[error("gRPC connection error connecting to {}: {}", .0, DisplayStatus(.1))]
88
    Connection(String, #[source] tonic::Status),
89
    /// Attempted to send on a MPSC channel which has been canceled
90
    #[error(transparent)]
91
    RequestFailure(#[from] Box<mpsc::error::SendError<DeltaDiscoveryRequest>>),
92
    #[error("failed to send on demand resource")]
93
    OnDemandSend(),
94
    #[error("TLS Error: {0}")]
95
    TLSError(#[from] tls::Error),
96
}
97
98
/// Updates the [ProxyState] from XDS.
99
/// All state updates code goes in ProxyStateUpdateMutator, that takes state as a parameter.
100
/// this guarantees that the state is always locked when it is updated.
101
#[derive(Clone)]
102
pub struct ProxyStateUpdateMutator {
103
    cert_fetcher: Arc<dyn CertFetcher>,
104
}
105
106
#[derive(Clone)]
107
pub struct ProxyStateUpdater {
108
    state: Arc<RwLock<ProxyState>>,
109
    updater: ProxyStateUpdateMutator,
110
}
111
112
impl ProxyStateUpdater {
113
    /// Creates a new updater for the given stores. Will prefetch certs when workloads are updated.
114
0
    pub fn new(state: Arc<RwLock<ProxyState>>, cert_fetcher: Arc<dyn CertFetcher>) -> Self {
115
0
        Self {
116
0
            state,
117
0
            updater: ProxyStateUpdateMutator { cert_fetcher },
118
0
        }
119
0
    }
120
    /// Creates a new updater that does not prefetch workload certs.
121
0
    pub fn new_no_fetch(state: Arc<RwLock<ProxyState>>) -> Self {
122
0
        Self {
123
0
            state,
124
0
            updater: ProxyStateUpdateMutator::new_no_fetch(),
125
0
        }
126
0
    }
127
}
128
129
impl ProxyStateUpdateMutator {
130
    /// Creates a new updater that does not prefetch workload certs.
131
0
    pub fn new_no_fetch() -> Self {
132
0
        ProxyStateUpdateMutator {
133
0
            cert_fetcher: Arc::new(NoCertFetcher()),
134
0
        }
135
0
    }
136
137
    #[instrument(
138
        level = Level::TRACE,
139
        name="insert_workload",
140
        skip_all,
141
        fields(uid=%w.uid),
142
    )]
143
    pub fn insert_workload(&self, state: &mut ProxyState, w: XdsWorkload) -> anyhow::Result<()> {
144
        debug!("handling insert");
145
146
        // Clone services, so we can pass full ownership of the rest of XdsWorkload to build our Workload
147
        // object, which doesn't include Services.
148
        // In theory, I think we could avoid this if Workload::try_from returning the services.
149
        // let services = w.services.clone();
150
        // Convert the workload.
151
        let (workload, services): (Workload, HashMap<String, PortList>) = w.try_into()?;
152
        let workload = Arc::new(workload);
153
154
        // First, remove the entry entirely to make sure things are cleaned up properly.
155
        self.remove_workload_for_insert(state, &workload.uid);
156
157
        // Prefetch the cert for the workload.
158
        self.cert_fetcher.prefetch_cert(&workload);
159
160
        // Lock and upstate the stores.
161
        state.workloads.insert(workload.clone());
162
        insert_service_endpoints(&workload, &services, &mut state.services)?;
163
164
        Ok(())
165
    }
166
167
0
    pub fn remove(&self, state: &mut ProxyState, xds_name: &Strng) {
168
0
        self.remove_internal(state, xds_name, false);
169
0
    }
170
171
0
    fn remove_workload_for_insert(&self, state: &mut ProxyState, xds_name: &Strng) {
172
0
        self.remove_internal(state, xds_name, true);
173
0
    }
174
175
    #[instrument(
176
        level = Level::TRACE,
177
        name="remove",
178
        skip_all,
179
        fields(name=%xds_name, for_workload_insert=%for_workload_insert),
180
    )]
181
    fn remove_internal(&self, state: &mut ProxyState, xds_name: &Strng, for_workload_insert: bool) {
182
        // remove workload by UID; if xds_name is a service then this will no-op
183
        if let Some(prev) = state.workloads.remove(&strng::new(xds_name)) {
184
            // Also remove service endpoints for the workload.
185
            state.services.remove_endpoint(&prev);
186
187
            // This is a real removal (not a removal before insertion), and nothing else references the cert
188
            // Clear it out
189
            if !for_workload_insert
190
                && state
191
                    .workloads
192
                    .was_last_identity_on_node(&prev.node, &prev.identity())
193
            {
194
                self.cert_fetcher.clear_cert(&prev.identity());
195
            }
196
            // We removed a workload, no reason to attempt to remove a service with the same name
197
            return;
198
        }
199
        if for_workload_insert {
200
            // This is a workload, don't attempt to remove as a service
201
            return;
202
        }
203
204
        let Ok(name) = NamespacedHostname::from_str(xds_name) else {
205
            // we don't have namespace/hostname xds primary key for service
206
            warn!(
207
                "tried to remove service but it did not have the expected namespace/hostname format"
208
            );
209
            return;
210
        };
211
212
        if name.hostname.contains('/') {
213
            // avoid trying to delete obvious workload UIDs as a service,
214
            // which can result in noisy logs when new workloads are added
215
            // (we remove then add workloads on initial update)
216
            //
217
            // we can make this assumption because namespaces and hostnames cannot have `/` in them
218
            trace!("not a service, not attempting to delete as such",);
219
            return;
220
        }
221
        if !state.services.remove(&name) {
222
            warn!("tried to remove service, but it was not found");
223
        }
224
    }
225
226
0
    pub fn insert_address(&self, state: &mut ProxyState, a: XdsAddress) -> anyhow::Result<()> {
227
0
        match a.r#type {
228
0
            Some(XdsType::Workload(w)) => self.insert_workload(state, w),
229
0
            Some(XdsType::Service(s)) => self.insert_service(state, s),
230
0
            _ => Err(anyhow::anyhow!("unknown address type")),
231
        }
232
0
    }
233
234
    #[instrument(
235
        level = Level::TRACE,
236
        name="insert_service",
237
        skip_all,
238
        fields(name=%service.name),
239
    )]
240
    pub fn insert_service(
241
        &self,
242
        state: &mut ProxyState,
243
        service: XdsService,
244
    ) -> anyhow::Result<()> {
245
        debug!("handling insert");
246
        let mut service = Service::try_from(&service)?;
247
248
        // If the service already exists, add existing endpoints into the new service.
249
        if let Some(prev) = state
250
            .services
251
            .get_by_namespaced_host(&service.namespaced_hostname())
252
        {
253
            for ep in prev.endpoints.iter() {
254
                if service.should_include_endpoint(ep.status) {
255
                    service
256
                        .endpoints
257
                        .insert(ep.workload_uid.clone(), ep.clone());
258
                }
259
            }
260
        }
261
262
        state.services.insert(service);
263
        Ok(())
264
    }
265
266
0
    pub fn insert_authorization(
267
0
        &self,
268
0
        state: &mut ProxyState,
269
0
        xds_name: Strng,
270
0
        r: XdsAuthorization,
271
0
    ) -> anyhow::Result<()> {
272
0
        info!("handling RBAC update {}", r.name);
273
274
0
        let rbac = rbac::Authorization::try_from(r)?;
275
0
        trace!(
276
0
            "insert policy {}, {}",
277
            xds_name,
278
0
            serde_json::to_string(&rbac)?
279
        );
280
0
        state.policies.insert(xds_name, rbac);
281
0
        Ok(())
282
0
    }
283
284
0
    pub fn remove_authorization(&self, state: &mut ProxyState, xds_name: Strng) {
285
0
        info!("handling RBAC delete {}", xds_name);
286
0
        state.policies.remove(xds_name);
287
0
    }
288
}
289
290
impl Handler<XdsWorkload> for ProxyStateUpdater {
291
0
    fn handle(
292
0
        &self,
293
0
        updates: Box<&mut dyn Iterator<Item = XdsUpdate<XdsWorkload>>>,
294
0
    ) -> Result<(), Vec<RejectedConfig>> {
295
        // use deepsize::DeepSizeOf;
296
0
        let mut state = self.state.write().unwrap();
297
0
        let handle = |res: XdsUpdate<XdsWorkload>| {
298
0
            match res {
299
0
                XdsUpdate::Update(w) => self.updater.insert_workload(&mut state, w.resource)?,
300
0
                XdsUpdate::Remove(name) => {
301
0
                    debug!("handling delete {}", name);
302
0
                    self.updater.remove(&mut state, &strng::new(name))
303
                }
304
            }
305
0
            Ok(())
306
0
        };
307
0
        handle_single_resource(updates, handle)
308
0
    }
309
}
310
311
impl Handler<XdsAddress> for ProxyStateUpdater {
312
0
    fn handle(
313
0
        &self,
314
0
        updates: Box<&mut dyn Iterator<Item = XdsUpdate<XdsAddress>>>,
315
0
    ) -> Result<(), Vec<RejectedConfig>> {
316
0
        let mut state = self.state.write().unwrap();
317
0
        let handle = |res: XdsUpdate<XdsAddress>| {
318
0
            match res {
319
0
                XdsUpdate::Update(w) => self.updater.insert_address(&mut state, w.resource)?,
320
0
                XdsUpdate::Remove(name) => {
321
0
                    debug!("handling delete {}", name);
322
0
                    self.updater.remove(&mut state, &strng::new(name))
323
                }
324
            }
325
0
            Ok(())
326
0
        };
327
0
        handle_single_resource(updates, handle)
328
0
    }
329
}
330
331
0
fn insert_service_endpoints(
332
0
    workload: &Workload,
333
0
    services: &HashMap<String, PortList>,
334
0
    services_state: &mut ServiceStore,
335
0
) -> anyhow::Result<()> {
336
0
    for (namespaced_host, ports) in services {
337
        // Parse the namespaced hostname for the service.
338
0
        let namespaced_host = NamespacedHostname::from_str(namespaced_host)?;
339
0
        services_state.insert_endpoint(
340
0
            namespaced_host,
341
0
            Endpoint {
342
0
                workload_uid: workload.uid.clone(),
343
0
                port: ports.into(),
344
0
                status: workload.status,
345
0
            },
346
        )
347
    }
348
0
    Ok(())
349
0
}
350
351
impl Handler<XdsAuthorization> for ProxyStateUpdater {
352
0
    fn no_on_demand(&self) -> bool {
353
0
        true
354
0
    }
355
356
0
    fn handle(
357
0
        &self,
358
0
        updates: Box<&mut dyn Iterator<Item = XdsUpdate<XdsAuthorization>>>,
359
0
    ) -> Result<(), Vec<RejectedConfig>> {
360
0
        let mut state = self.state.write().unwrap();
361
0
        let handle = |res: XdsUpdate<XdsAuthorization>| {
362
0
            match res {
363
0
                XdsUpdate::Update(w) => self
364
0
                    .updater
365
0
                    .insert_authorization(&mut state, w.name, w.resource)?,
366
0
                XdsUpdate::Remove(name) => self.updater.remove_authorization(&mut state, name),
367
            }
368
0
            Ok(())
369
0
        };
370
0
        let mut len_updates = 0;
371
0
        let updates = updates.inspect(|_| len_updates += 1);
372
0
        match handle_single_resource(updates, handle) {
373
            Ok(()) => {
374
0
                state.policies.send();
375
0
                Ok(())
376
            }
377
0
            Err(e) => {
378
0
                if e.len() < len_updates {
379
0
                    // not all config was rejected, we have _some_ valide update
380
0
                    state.policies.send();
381
0
                }
382
0
                Err(e)
383
            }
384
        }
385
0
    }
386
}
387
388
/// LocalClient serves as a local file reader alternative for XDS. This is intended for testing.
389
pub struct LocalClient {
390
    pub cfg: ConfigSource,
391
    pub state: Arc<RwLock<ProxyState>>,
392
    pub cert_fetcher: Arc<dyn CertFetcher>,
393
    pub local_node: Option<Strng>,
394
}
395
396
#[derive(Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
397
#[serde(rename_all = "camelCase", deny_unknown_fields)]
398
pub struct LocalWorkload {
399
    #[serde(flatten)]
400
    pub workload: Workload,
401
    pub services: HashMap<String, HashMap<u16, u16>>,
402
}
403
404
#[derive(Default, Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
405
#[serde(rename_all = "camelCase", deny_unknown_fields)]
406
pub struct LocalConfig {
407
    #[serde(default)]
408
    pub workloads: Vec<LocalWorkload>,
409
    #[serde(default)]
410
    pub policies: Vec<Authorization>,
411
    #[serde(default)]
412
    pub services: Vec<Service>,
413
}
414
415
impl LocalClient {
416
    #[instrument(skip_all, name = "local_client")]
417
    pub async fn run(self) -> Result<(), anyhow::Error> {
418
        // Load initial state
419
        match &self.cfg {
420
            #[cfg(any(test, feature = "testing"))]
421
            ConfigSource::Dynamic(rx) => {
422
                let mut rx = rx.lock().await;
423
                let r = rx
424
                    .recv()
425
                    .await
426
                    .ok_or(anyhow::anyhow!("did not get initial config"))?;
427
                self.load_config(r)?;
428
                rx.ack().await?;
429
            }
430
            f => {
431
                let r: LocalConfig = serde_yaml::from_str(&f.read_to_string().await?)?;
432
                self.load_config(r)?;
433
            }
434
        };
435
        #[cfg(any(test, feature = "testing"))]
436
        if let ConfigSource::Dynamic(ref rx) = self.cfg {
437
            let rx = rx.clone();
438
            tokio::spawn(async move {
439
                // Mutex is just for borrow checker; we know we are the only user and can hold the lock forever.
440
                let mut rx = rx.lock().await;
441
                while let Some(req) = rx.recv().await {
442
                    if let Err(e) = self.load_config(req) {
443
                        error!("failed to load dynamic config update: {e:?}");
444
                    }
445
                    if let Err(e) = rx.ack().await {
446
                        error!("failed to ack: {}", e);
447
                    }
448
                }
449
            });
450
        };
451
        Ok(())
452
    }
453
454
0
    fn load_config(&self, r: LocalConfig) -> anyhow::Result<()> {
455
0
        debug!(
456
0
            "load local config: {}",
457
0
            serde_yaml::to_string(&r).unwrap_or_default()
458
        );
459
0
        let mut state = self.state.write().unwrap();
460
        // Clear the state
461
0
        state.workloads = WorkloadStore::new(self.local_node.clone());
462
0
        state.services = Default::default();
463
        // Policies have some channels, so we don't want to reset it entirely
464
0
        state.policies.clear_all_policies();
465
0
        let num_workloads = r.workloads.len();
466
0
        let num_policies = r.policies.len();
467
0
        for wl in r.workloads {
468
0
            trace!("inserting local workload {}", &wl.workload.uid);
469
0
            self.cert_fetcher.prefetch_cert(&wl.workload);
470
0
            let w = Arc::new(wl.workload);
471
0
            state.workloads.insert(w.clone());
472
473
0
            let services: HashMap<String, PortList> = wl
474
0
                .services
475
0
                .into_iter()
476
0
                .map(|(k, v)| (k, PortList::from(v)))
477
0
                .collect();
478
479
0
            insert_service_endpoints(&w, &services, &mut state.services)?;
480
        }
481
0
        for rbac in r.policies {
482
0
            let xds_name = rbac.to_key();
483
0
            state.policies.insert(xds_name, rbac);
484
0
        }
485
0
        for svc in r.services {
486
0
            state.services.insert(svc);
487
0
        }
488
0
        info!(%num_workloads, %num_policies, "local config initialized");
489
0
        Ok(())
490
0
    }
491
}