Coverage Report

Created: 2026-04-14 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/tls/control.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use crate::config::RootCert;
16
use crate::identity::AuthSource;
17
use crate::tls::lib::provider;
18
use crate::tls::{ControlPlaneClientCertProvider, Error, WorkloadCertificate};
19
use hyper::Uri;
20
use hyper::body::Incoming;
21
use hyper_rustls::HttpsConnector;
22
use hyper_util::client::legacy::connect::HttpConnector;
23
use itertools::Itertools;
24
use notify::{Config, RecommendedWatcher};
25
use notify_debouncer_full::{DebounceEventResult, Debouncer, FileIdMap, new_debouncer_opt};
26
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
27
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
28
use rustls::{ClientConfig, DigitallySignedStruct, SignatureScheme};
29
use std::future::Future;
30
use std::io::Cursor;
31
use std::path::Path;
32
use std::pin::Pin;
33
use std::sync::atomic::{AtomicBool, Ordering};
34
use std::sync::{Arc, RwLock};
35
use std::task::{Context, Poll};
36
use std::time::Duration;
37
use tonic::body::Body;
38
use tracing::debug;
39
40
0
async fn root_to_store(root_cert: &RootCert) -> Result<rustls::RootCertStore, Error> {
41
0
    let mut roots = rustls::RootCertStore::empty();
42
0
    match root_cert {
43
0
        RootCert::File(f) => {
44
0
            let certfile = tokio::fs::read(f)
45
0
                .await
46
0
                .map_err(|e| Error::InvalidRootCert(e.to_string()))?;
47
0
            let mut reader = std::io::BufReader::new(Cursor::new(certfile));
48
0
            let certs = rustls_pemfile::certs(&mut reader)
49
0
                .collect::<Result<Vec<_>, _>>()
50
0
                .map_err(|e| Error::InvalidRootCert(e.to_string()))?;
51
0
            roots.add_parsable_certificates(certs);
52
        }
53
0
        RootCert::Static(b) => {
54
0
            let mut reader = std::io::BufReader::new(Cursor::new(b));
55
0
            let certs = rustls_pemfile::certs(&mut reader)
56
0
                .collect::<Result<Vec<_>, _>>()
57
0
                .map_err(|e| Error::InvalidRootCert(e.to_string()))?;
58
0
            roots.add_parsable_certificates(certs);
59
        }
60
        RootCert::Default => {
61
0
            let certs = {
62
0
                let rustls_native_certs::CertificateResult { certs, errors, .. } =
63
0
                    rustls_native_certs::load_native_certs();
64
0
                if !errors.is_empty() {
65
                    return Err(Error::InvalidRootCert(
66
0
                        errors.into_iter().map(|e| e.to_string()).join(","),
67
                    ));
68
0
                }
69
0
                certs
70
            };
71
0
            roots.add_parsable_certificates(certs);
72
        }
73
    };
74
0
    Ok(roots)
75
0
}
76
77
#[derive(Debug)]
78
pub enum ControlPlaneAuthentication {
79
    RootCert(RootCert),
80
    ClientBundle(WorkloadCertificate),
81
}
82
83
#[async_trait::async_trait]
84
impl ControlPlaneClientCertProvider for ControlPlaneAuthentication {
85
0
    async fn fetch_cert(&self, alt_hostname: Option<String>) -> Result<ClientConfig, Error> {
86
        match self {
87
            ControlPlaneAuthentication::RootCert(root_cert) => {
88
                control_plane_client_config(root_cert, alt_hostname).await
89
            }
90
            ControlPlaneAuthentication::ClientBundle(_bundle) => {
91
                // TODO: implement this. Its is not currently used so no need.
92
                unimplemented!();
93
            }
94
        }
95
0
    }
96
}
97
98
#[derive(Debug)]
99
struct AltHostnameVerifier {
100
    roots: Arc<rustls::RootCertStore>,
101
    alt_server_name: ServerName<'static>,
102
}
103
104
// A custom verifier that allows alternative server names to be accepted.
105
// Build our own verifier, inspired by https://github.com/rustls/rustls/blob/ccb79947a4811412ee7dcddcd0f51ea56bccf101/rustls/src/webpki/server_verifier.rs#L239.
106
impl ServerCertVerifier for AltHostnameVerifier {
107
    /// Will verify the certificate is valid in the following ways:
108
    /// - Signed by a  trusted `RootCertStore` CA
109
    /// - Not Expired
110
0
    fn verify_server_cert(
111
0
        &self,
112
0
        end_entity: &CertificateDer<'_>,
113
0
        intermediates: &[CertificateDer<'_>],
114
0
        sn: &ServerName,
115
0
        ocsp_response: &[u8],
116
0
        now: UnixTime,
117
0
    ) -> Result<ServerCertVerified, rustls::Error> {
118
0
        let cert = rustls::server::ParsedCertificate::try_from(end_entity)?;
119
120
0
        let algs = provider().signature_verification_algorithms;
121
0
        rustls::client::verify_server_cert_signed_by_trust_anchor(
122
0
            &cert,
123
0
            &self.roots,
124
0
            intermediates,
125
0
            now,
126
0
            algs.all,
127
0
        )?;
128
129
0
        if !ocsp_response.is_empty() {
130
0
            tracing::trace!("Unvalidated OCSP response: {ocsp_response:?}");
131
0
        }
132
133
        // First attempt to verify the original server name...
134
0
        if let Err(err) = rustls::client::verify_server_name(&cert, sn) {
135
0
            tracing::debug!(
136
0
                "failed to verify {sn:?} ({err}), attempting alt name {:?}",
137
                self.alt_server_name
138
            );
139
            // That failed, lets try the alternative one
140
0
            rustls::client::verify_server_name(&cert, &self.alt_server_name)?;
141
0
        }
142
143
0
        Ok(ServerCertVerified::assertion())
144
0
    }
145
146
    // Rest use the default implementations
147
148
0
    fn verify_tls12_signature(
149
0
        &self,
150
0
        message: &[u8],
151
0
        cert: &CertificateDer<'_>,
152
0
        dss: &DigitallySignedStruct,
153
0
    ) -> Result<HandshakeSignatureValid, rustls::Error> {
154
0
        rustls::crypto::verify_tls12_signature(
155
0
            message,
156
0
            cert,
157
0
            dss,
158
0
            &provider().signature_verification_algorithms,
159
        )
160
0
    }
161
162
0
    fn verify_tls13_signature(
163
0
        &self,
164
0
        message: &[u8],
165
0
        cert: &CertificateDer<'_>,
166
0
        dss: &DigitallySignedStruct,
167
0
    ) -> Result<HandshakeSignatureValid, rustls::Error> {
168
0
        rustls::crypto::verify_tls13_signature(
169
0
            message,
170
0
            cert,
171
0
            dss,
172
0
            &provider().signature_verification_algorithms,
173
        )
174
0
    }
175
176
0
    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
177
0
        provider()
178
0
            .signature_verification_algorithms
179
0
            .supported_schemes()
180
0
    }
181
}
182
183
0
pub(crate) async fn control_plane_client_config(
184
0
    root_cert: &RootCert,
185
0
    alt_hostname: Option<String>,
186
0
) -> Result<ClientConfig, Error> {
187
0
    let roots = root_to_store(root_cert).await?;
188
0
    let c = ClientConfig::builder_with_provider(provider())
189
0
        .with_protocol_versions(crate::tls::tls_versions())?;
190
0
    if let Some(alt_hostname) = alt_hostname {
191
0
        debug!("using alternate hostname {alt_hostname} for TLS verification");
192
0
        Ok(c.dangerous()
193
0
            .with_custom_certificate_verifier(Arc::new(AltHostnameVerifier {
194
0
                roots: Arc::new(roots),
195
0
                alt_server_name: ServerName::try_from(alt_hostname)?,
196
            }))
197
0
            .with_no_client_auth())
198
    } else {
199
0
        Ok(c.with_root_certificates(roots).with_no_client_auth())
200
    }
201
0
}
202
203
#[derive(Clone, Debug)]
204
pub struct TlsGrpcChannel {
205
    uri: Uri,
206
    client: hyper_util::client::legacy::Client<HttpsConnector<HttpConnector>, Body>,
207
    auth: Arc<AuthSource>,
208
}
209
210
/// grpc_connector provides a client TLS channel for gRPC requests.
211
0
pub fn grpc_connector(
212
0
    uri: String,
213
0
    auth: AuthSource,
214
0
    cc: ClientConfig,
215
0
) -> Result<TlsGrpcChannel, Error> {
216
0
    let uri = Uri::try_from(uri)?;
217
0
    let _is_localhost_call = uri.host() == Some("localhost");
218
0
    let mut http: HttpConnector = HttpConnector::new();
219
    // Set keepalives to match istio's Envoy bootstrap configuration:
220
    // https://github.com/istio/istio/blob/a29d5c9c27d80bff31f218936f5a96759d8911c8/tools/packaging/common/envoy_bootstrap.json#L322C14-L322C28
221
    //
222
    // keepalive_interval and keepalive_retries match the linux default per Envoy docs:
223
    // https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/core/v3/address.proto#config-core-v3-tcpkeepalive
224
0
    http.set_keepalive(Some(Duration::from_secs(300)));
225
0
    http.set_keepalive_interval(Some(Duration::from_secs(75)));
226
0
    http.set_keepalive_retries(Some(9));
227
0
    http.set_connect_timeout(Some(Duration::from_secs(5)));
228
0
    http.enforce_http(false);
229
0
    let https: HttpsConnector<HttpConnector> = hyper_rustls::HttpsConnectorBuilder::new()
230
0
        .with_tls_config(cc)
231
0
        .https_only()
232
0
        .enable_http2()
233
0
        .wrap_connector(http);
234
235
    // Configure hyper's client to be h2 only and build with the
236
    // correct https connector.
237
0
    let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
238
0
        .http2_only(true)
239
0
        .http2_keep_alive_interval(Duration::from_secs(30))
240
0
        .http2_keep_alive_timeout(Duration::from_secs(10))
241
0
        .timer(crate::hyper_util::TokioTimer)
242
0
        .build(https);
243
244
0
    Ok(TlsGrpcChannel {
245
0
        uri,
246
0
        auth: Arc::new(auth),
247
0
        client,
248
0
    })
249
0
}
250
251
impl tower::Service<http::Request<Body>> for TlsGrpcChannel {
252
    type Response = http::Response<Incoming>;
253
    type Error = anyhow::Error;
254
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
255
256
0
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
257
0
        Ok(()).into()
258
0
    }
259
260
0
    fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
261
0
        let mut uri = Uri::builder();
262
0
        if let Some(scheme) = self.uri.scheme() {
263
0
            uri = uri.scheme(scheme.to_owned());
264
0
        }
265
0
        if let Some(authority) = self.uri.authority() {
266
0
            uri = uri.authority(authority.to_owned());
267
0
        }
268
0
        if let Some(path_and_query) = req.uri().path_and_query() {
269
0
            uri = uri.path_and_query(path_and_query.to_owned());
270
0
        }
271
0
        let uri = uri.build().expect("uri must be valid");
272
0
        *req.uri_mut() = uri;
273
274
0
        let client = self.client.clone();
275
0
        let auth = self.auth.clone();
276
0
        Box::pin(async move {
277
0
            auth.insert_headers(req.headers_mut()).await?;
278
0
            Ok(client.request(req).await?)
279
0
        })
280
0
    }
281
}
282
283
/// Internal state of [`RootCertManager`]
284
///
285
/// Separated into its own struct to protect it behind a `RwLock`
286
/// and be mutated after the outer `Arc` is created
287
struct RootCertManagerInner {
288
    /// Original cert source. Retained to be able to read it again on reload
289
    root_cert: RootCert,
290
291
    /// OS watcher
292
    // WARNING: must use FileIdMap, NOT NoCache. Kubernetes secret/configmap volume updates
293
    // use atomic symlink swaps — FileIdMap tracks inode identity across renames so these
294
    // are detected correctly. NoCache silently misses them, breaking CRL hot-reload entirely.
295
    _debouncer: Option<Debouncer<RecommendedWatcher, FileIdMap>>,
296
}
297
298
/// Watches the CA root certificate file for changes and signals it with a flag.
299
///
300
/// Note: watcher is set on CA root cet's parent directory due to Kubernetes handle of ConfigMaps
301
///       Whenever a ConfigMap changes Kubernetes creates a new timestamped folder with the content
302
///       and atomically hot-swap a symlink to point to the new folder.
303
///       If we setup a watcher on the file itself we will miss this event
304
pub(crate) struct RootCertManager {
305
    inner: RwLock<RootCertManagerInner>,
306
307
    /// Set to `true` by OS watcher when cert directory changes => ConfigMap changed.
308
    /// Reset by CaClient after a successful channel rebuild
309
    dirty: AtomicBool,
310
}
311
312
impl RootCertManager {
313
    /// Creates a new manager for `root_cert`
314
    ///
315
    /// If `root_cert` is a `RootCert::File` it starts a folder watcher on the cert's parent dir
316
    ///
317
    /// Returns an error if cert's parent dir cannot be watched (e.g. it does not exist).
318
    /// A missing cert file is not detect here, it will be detected at channel rebuild by [`CaClient`]
319
0
    pub(crate) fn new(root_cert: RootCert) -> Result<Arc<Self>, Error> {
320
0
        let manager = Arc::new(Self {
321
0
            inner: RwLock::new(RootCertManagerInner {
322
0
                root_cert: root_cert.clone(),
323
0
                _debouncer: None,
324
0
            }),
325
0
            dirty: AtomicBool::new(false),
326
0
        });
327
328
0
        if let RootCert::File(path) = &root_cert {
329
0
            let debouncer = manager.start_watcher(path)?;
330
331
0
            manager.inner.write().unwrap()._debouncer = Some(debouncer);
332
0
        }
333
334
0
        Ok(manager)
335
0
    }
336
337
    /// Atomically reads and clears the dirty flag
338
0
    pub(crate) fn take_dirty(&self) -> bool {
339
0
        self.dirty.swap(false, Ordering::AcqRel)
340
0
    }
341
342
    /// Re-arms dirty flag
343
0
    pub(crate) fn mark_dirty(&self) {
344
0
        self.dirty.store(true, Ordering::Release);
345
0
    }
346
347
    /// Returns the original cert source
348
0
    pub(crate) fn root_cert(&self) -> RootCert {
349
0
        self.inner.read().unwrap().root_cert.clone()
350
0
    }
351
352
    /// Register an OS dir watcher on `path`'s parent dir.
353
    ///
354
    /// Any filesystem event in that dir will set the `dirty` flag after a (2 seconds) debounce
355
0
    fn start_watcher(
356
0
        self: &Arc<Self>,
357
0
        path: &Path,
358
0
    ) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
359
0
        let watch_dir = path.parent().ok_or_else(|| {
360
0
            Error::InvalidRootCert("root cert path must have a parent directory".to_string())
361
0
        })?;
362
363
0
        let manager = Arc::clone(self);
364
365
0
        let mut debouncer = new_debouncer_opt(
366
0
            Duration::from_secs(2),
367
0
            None,
368
0
            move |result: DebounceEventResult| match result {
369
0
                Ok(events) => {
370
0
                    debug!(
371
0
                        event_count = events.len(),
372
0
                        "root cert directory changed; scheduling TLS channel rebuild"
373
                    );
374
0
                    if !events.is_empty() {
375
0
                        manager.dirty.store(true, Ordering::Release);
376
0
                    }
377
                }
378
0
                Err(errors) => {
379
0
                    for e in errors {
380
0
                        debug!(error = ?e, "root cert watcher error");
381
                    }
382
                }
383
0
            },
384
0
            FileIdMap::new(),
385
0
            Config::default(),
386
        )
387
0
        .map_err(|e| Error::InvalidRootCert(format!("failed to create root cert watcher: {e}")))?;
388
389
0
        debouncer
390
0
            .watch(watch_dir, notify::RecursiveMode::NonRecursive)
391
0
            .map_err(|e| {
392
0
                Error::InvalidRootCert(format!(
393
0
                    "failed to watch root cert directory {watch_dir:?}: {e}"
394
0
                ))
395
0
            })?;
396
397
0
        debug!(path = ?watch_dir, "root cert file watcher started");
398
0
        Ok(debouncer)
399
0
    }
400
}
401
402
#[cfg(test)]
mod tests {

    use std::io::Write;

    use bytes::Bytes;
    use tempfile::NamedTempFile;

    use crate::tls::mock::TEST_ROOT;

    use super::*;

    /// Writes the test root cert to a fresh temp file and builds a manager for it.
    ///
    /// The temp file handle is returned alongside the manager so the file is not
    /// deleted while the test is still running.
    fn file_backed_manager() -> (NamedTempFile, Arc<RootCertManager>) {
        let mut cert_file = NamedTempFile::new().unwrap();
        cert_file.write_all(TEST_ROOT).unwrap();

        let manager = RootCertManager::new(RootCert::File(cert_file.path().to_path_buf()))
            .expect("file cert manager must not fail");

        (cert_file, manager)
    }

    #[test]
    fn static_cert_is_never_dirty() {
        let source = RootCert::Static(Bytes::from_static(TEST_ROOT));
        let manager = RootCertManager::new(source).expect("static cert manager must not fail");

        assert!(!manager.take_dirty(), "static cert must never be dirty");

        let has_debouncer = manager.inner.read().unwrap()._debouncer.is_some();
        assert!(!has_debouncer, "no debouncer for static certs");
    }

    #[test]
    fn file_cert_starts_clean() {
        let (_cert_file, manager) = file_backed_manager();

        assert!(!manager.take_dirty(), "new manager must not start dirty");

        let has_debouncer = manager.inner.read().unwrap()._debouncer.is_some();
        assert!(has_debouncer, "file cert must have a debouncer");
    }

    #[test]
    fn take_dirty_and_make_dirty_work() {
        let (_cert_file, manager) = file_backed_manager();

        assert!(!manager.take_dirty(), "new manager must not start dirty");

        // Simulate the watcher firing by setting the flag directly.
        manager.dirty.store(true, Ordering::Release);
        assert!(
            manager.take_dirty(),
            "take_dirty should return true when dirty"
        );
        assert!(
            !manager.take_dirty(),
            "take_dirty should return false after clearing"
        );

        manager.mark_dirty();
        assert!(
            manager.take_dirty(),
            "take_dirty should return true after rearm"
        );
    }
}