Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/identity/caclient.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::collections::BTreeMap;
16
17
use async_trait::async_trait;
18
use prost_types::Struct;
19
use prost_types::value::Kind;
20
use tonic::IntoRequest;
21
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue};
22
use tracing::{debug, error, instrument, warn};
23
24
use crate::identity::Error;
25
use crate::identity::auth::AuthSource;
26
use crate::identity::manager::Identity;
27
use crate::tls::{self, TlsGrpcChannel};
28
use crate::xds::istio::ca::IstioCertificateRequest;
29
use crate::xds::istio::ca::istio_certificate_service_client::IstioCertificateServiceClient;
30
31
pub struct CaClient {
32
    pub client: IstioCertificateServiceClient<TlsGrpcChannel>,
33
    pub enable_impersonated_identity: bool,
34
    pub secret_ttl: i64,
35
    ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>,
36
}
37
38
impl CaClient {
39
0
    pub async fn new(
40
0
        address: String,
41
0
        alt_hostname: Option<String>,
42
0
        cert_provider: Box<dyn tls::ControlPlaneClientCertProvider>,
43
0
        auth: AuthSource,
44
0
        enable_impersonated_identity: bool,
45
0
        secret_ttl: i64,
46
0
        ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>,
47
0
    ) -> Result<CaClient, Error> {
48
0
        let svc =
49
0
            tls::grpc_connector(address, auth, cert_provider.fetch_cert(alt_hostname).await?)?;
50
0
        let client = IstioCertificateServiceClient::new(svc);
51
0
        Ok(CaClient {
52
0
            client,
53
0
            enable_impersonated_identity,
54
0
            secret_ttl,
55
0
            ca_headers,
56
0
        })
57
0
    }
58
}
59
60
impl CaClient {
61
    #[instrument(skip_all)]
62
    async fn fetch_certificate(&self, id: &Identity) -> Result<tls::WorkloadCertificate, Error> {
63
        let cs = tls::csr::CsrOptions {
64
            san: id.to_string(),
65
        }
66
        .generate()?;
67
        let csr = cs.csr;
68
        let private_key = cs.private_key;
69
70
        let mut req = tonic::Request::new(IstioCertificateRequest {
71
            csr,
72
            validity_duration: self.secret_ttl,
73
            metadata: {
74
                if self.enable_impersonated_identity {
75
                    Some(Struct {
76
                        fields: BTreeMap::from([(
77
                            "ImpersonatedIdentity".into(),
78
                            prost_types::Value {
79
                                kind: Some(Kind::StringValue(id.to_string())),
80
                            },
81
                        )]),
82
                    })
83
                } else {
84
                    None
85
                }
86
            },
87
        });
88
0
        self.ca_headers.iter().for_each(|(k, v)| {
89
0
            req.metadata_mut().insert(k.clone(), v.clone());
90
91
0
            if let Ok(v_str) = v.to_str() {
92
0
                debug!("CA header added: {}={}", k, v_str);
93
0
            }
94
0
        });
95
96
        let resp = self
97
            .client
98
            .clone()
99
            .create_certificate(req.into_request())
100
            .await
101
            .map_err(Box::new)?
102
            .into_inner();
103
104
        let leaf = resp
105
            .cert_chain
106
            .first()
107
0
            .ok_or_else(|| Error::EmptyResponse(id.to_owned()))?
108
            .as_bytes();
109
        let chain = if resp.cert_chain.len() > 1 {
110
0
            resp.cert_chain[1..].iter().map(|s| s.as_bytes()).collect()
111
        } else {
112
            warn!("no chain certs for: {}", id);
113
            vec![]
114
        };
115
        let certs = tls::WorkloadCertificate::new(&private_key, leaf, chain)?;
116
        // Make the certificate actually matches the identity we requested.
117
        if self.enable_impersonated_identity && certs.identity().as_ref() != Some(id) {
118
            error!("expected identity {:?}, got {:?}", id, certs.identity());
119
            return Err(Error::SanError(id.to_owned()));
120
        }
121
        Ok(certs)
122
    }
123
}
124
125
#[async_trait]
126
impl crate::identity::CaClientTrait for CaClient {
127
0
    async fn fetch_certificate(&self, id: &Identity) -> Result<tls::WorkloadCertificate, Error> {
128
0
        self.fetch_certificate(id).await
129
0
    }
Unexecuted instantiation: <ztunnel::identity::caclient::CaClient as ztunnel::identity::manager::CaClientTrait>::fetch_certificate
Unexecuted instantiation: <ztunnel::identity::caclient::CaClient as ztunnel::identity::manager::CaClientTrait>::fetch_certificate::{closure#0}
130
}
131
132
#[cfg(any(test, feature = "testing"))]
133
pub mod mock {
134
    use std::sync::Arc;
135
    use std::time::Duration;
136
137
    use tokio::sync::RwLock;
138
    use tokio::time::Instant;
139
140
    use crate::identity::Identity;
141
142
    use super::*;
143
144
    #[derive(Default)]
145
    struct ClientState {
146
        fetches: Vec<Identity>,
147
        error: bool,
148
        cert_gen: tls::mock::CertGenerator,
149
    }
150
151
    #[derive(Clone)]
152
    pub struct ClientConfig {
153
        pub cert_lifetime: Duration,
154
        pub time_conv: crate::time::Converter,
155
        // If non-zero, causes fetch_certificate calls to sleep for the specified duration before
156
        // returning. This is helpful to let tests that pause tokio time get more control over code
157
        // execution.
158
        pub fetch_latency: Duration,
159
    }
160
161
    impl Default for ClientConfig {
162
        fn default() -> Self {
163
            Self {
164
                fetch_latency: Duration::ZERO,
165
                cert_lifetime: Duration::from_secs(10),
166
                time_conv: crate::time::Converter::new(),
167
            }
168
        }
169
    }
170
171
    #[derive(Clone)]
172
    pub struct CaClient {
173
        cfg: ClientConfig,
174
        state: Arc<RwLock<ClientState>>,
175
    }
176
177
    impl CaClient {
178
        pub fn new(cfg: ClientConfig) -> CaClient {
179
            CaClient {
180
                cfg,
181
                state: Default::default(),
182
            }
183
        }
184
185
        pub fn cert_lifetime(&self) -> Duration {
186
            self.cfg.cert_lifetime
187
        }
188
189
        // Returns a list of fetch_certificate calls, in the order they happened. Calls are added
190
        // just before the function returns (ie. after the potential sleep controlled by the
191
        // fetch_latency config option).
192
        pub async fn fetches(&self) -> Vec<Identity> {
193
            self.state.read().await.fetches.clone()
194
        }
195
196
        pub async fn clear_fetches(&self) {
197
            self.state.write().await.fetches.clear();
198
        }
199
200
        async fn fetch_certificate(
201
            &self,
202
            id: &Identity,
203
        ) -> Result<tls::WorkloadCertificate, Error> {
204
            let Identity::Spiffe {
205
                trust_domain: td,
206
                namespace: ns,
207
                ..
208
            } = id;
209
            if td == "error" {
210
                return Err(match ns.as_str() {
211
                    "forgotten" => Error::Forgotten,
212
                    _ => panic!("cannot parse injected error: {ns}"),
213
                });
214
            }
215
216
            if self.cfg.fetch_latency != Duration::ZERO {
217
                tokio::time::sleep(self.cfg.fetch_latency).await;
218
            }
219
220
            // Get SystemTime::now() via Instant::now() to allow mocking in tests.
221
            let not_before = self
222
                .cfg
223
                .time_conv
224
                .instant_to_system_time(Instant::now().into())
225
                .expect("SystemTime cannot represent current time. Was the process started in extreme future?");
226
            let not_after = not_before + self.cfg.cert_lifetime;
227
228
            let mut state = self.state.write().await;
229
            state.fetches.push(id.to_owned());
230
            if state.error {
231
                return Err(Error::Spiffe("injected test error".into()));
232
            }
233
            let certs = state
234
                .cert_gen
235
                .new_certs(&id.to_owned().into(), not_before, not_after);
236
            Ok(certs)
237
        }
238
239
        pub async fn set_error(&mut self, error: bool) {
240
            let mut state = self.state.write().await;
241
            state.error = error;
242
        }
243
    }
244
245
    #[async_trait]
246
    impl crate::identity::CaClientTrait for CaClient {
247
        async fn fetch_certificate(
248
            &self,
249
            id: &Identity,
250
        ) -> Result<tls::WorkloadCertificate, Error> {
251
            self.fetch_certificate(id).await
252
        }
253
    }
254
}
255
256
#[cfg(test)]
257
mod tests {
258
259
    use std::time::Duration;
260
261
    use matches::assert_matches;
262
263
    use crate::{
264
        identity::{Error, Identity},
265
        test_helpers, tls,
266
        xds::istio::ca::IstioCertificateResponse,
267
    };
268
269
    async fn test_ca_client_with_response(
270
        res: IstioCertificateResponse,
271
    ) -> Result<tls::WorkloadCertificate, Error> {
272
        let (mock, ca_client) = test_helpers::ca::CaServer::spawn().await;
273
        mock.send(Ok(res)).unwrap();
274
        ca_client.fetch_certificate(&Identity::default()).await
275
    }
276
277
    #[tokio::test]
278
    async fn empty_chain() {
279
        let res =
280
            test_ca_client_with_response(IstioCertificateResponse { cert_chain: vec![] }).await;
281
        assert_matches!(res, Err(Error::EmptyResponse(_)));
282
    }
283
284
    #[tokio::test]
285
    async fn wrong_identity() {
286
        let id = Identity::Spiffe {
287
            service_account: "wrong-sa".into(),
288
            namespace: "foo".into(),
289
            trust_domain: "cluster.local".into(),
290
        };
291
        let certs = tls::mock::generate_test_certs(
292
            &id.into(),
293
            Duration::from_secs(0),
294
            Duration::from_secs(0),
295
        );
296
297
        let res = test_ca_client_with_response(IstioCertificateResponse {
298
            cert_chain: certs.full_chain_and_roots(),
299
        })
300
        .await;
301
        assert_matches!(res, Err(Error::SanError(_)));
302
    }
303
304
    #[tokio::test]
305
    async fn fetch_certificate() {
306
        let certs = tls::mock::generate_test_certs(
307
            &Identity::default().into(),
308
            Duration::from_secs(0),
309
            Duration::from_secs(0),
310
        );
311
312
        let res = test_ca_client_with_response(IstioCertificateResponse {
313
            cert_chain: certs.full_chain_and_roots(),
314
        })
315
        .await;
316
        assert_matches!(res, Ok(_));
317
    }
318
}