/src/ztunnel/src/identity/caclient.rs
Line | Count | Source |
1 | | // Copyright Istio Authors |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::BTreeMap; |
16 | | |
17 | | use async_trait::async_trait; |
18 | | use prost_types::Struct; |
19 | | use prost_types::value::Kind; |
20 | | use tonic::IntoRequest; |
21 | | use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue}; |
22 | | use tracing::{debug, error, instrument, warn}; |
23 | | |
24 | | use crate::identity::Error; |
25 | | use crate::identity::auth::AuthSource; |
26 | | use crate::identity::manager::Identity; |
27 | | use crate::tls::{self, TlsGrpcChannel}; |
28 | | use crate::xds::istio::ca::IstioCertificateRequest; |
29 | | use crate::xds::istio::ca::istio_certificate_service_client::IstioCertificateServiceClient; |
30 | | |
31 | | pub struct CaClient { |
32 | | pub client: IstioCertificateServiceClient<TlsGrpcChannel>, |
33 | | pub enable_impersonated_identity: bool, |
34 | | pub secret_ttl: i64, |
35 | | ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>, |
36 | | } |
37 | | |
38 | | impl CaClient { |
39 | 0 | pub async fn new( |
40 | 0 | address: String, |
41 | 0 | alt_hostname: Option<String>, |
42 | 0 | cert_provider: Box<dyn tls::ControlPlaneClientCertProvider>, |
43 | 0 | auth: AuthSource, |
44 | 0 | enable_impersonated_identity: bool, |
45 | 0 | secret_ttl: i64, |
46 | 0 | ca_headers: Vec<(AsciiMetadataKey, AsciiMetadataValue)>, |
47 | 0 | ) -> Result<CaClient, Error> { |
48 | 0 | let svc = |
49 | 0 | tls::grpc_connector(address, auth, cert_provider.fetch_cert(alt_hostname).await?)?; |
50 | 0 | let client = IstioCertificateServiceClient::new(svc); |
51 | 0 | Ok(CaClient { |
52 | 0 | client, |
53 | 0 | enable_impersonated_identity, |
54 | 0 | secret_ttl, |
55 | 0 | ca_headers, |
56 | 0 | }) |
57 | 0 | } |
58 | | } |
59 | | |
60 | | impl CaClient { |
61 | | #[instrument(skip_all)] |
62 | | async fn fetch_certificate(&self, id: &Identity) -> Result<tls::WorkloadCertificate, Error> { |
63 | | let cs = tls::csr::CsrOptions { |
64 | | san: id.to_string(), |
65 | | } |
66 | | .generate()?; |
67 | | let csr = cs.csr; |
68 | | let private_key = cs.private_key; |
69 | | |
70 | | let mut req = tonic::Request::new(IstioCertificateRequest { |
71 | | csr, |
72 | | validity_duration: self.secret_ttl, |
73 | | metadata: { |
74 | | if self.enable_impersonated_identity { |
75 | | Some(Struct { |
76 | | fields: BTreeMap::from([( |
77 | | "ImpersonatedIdentity".into(), |
78 | | prost_types::Value { |
79 | | kind: Some(Kind::StringValue(id.to_string())), |
80 | | }, |
81 | | )]), |
82 | | }) |
83 | | } else { |
84 | | None |
85 | | } |
86 | | }, |
87 | | }); |
88 | 0 | self.ca_headers.iter().for_each(|(k, v)| { |
89 | 0 | req.metadata_mut().insert(k.clone(), v.clone()); |
90 | | |
91 | 0 | if let Ok(v_str) = v.to_str() { |
92 | 0 | debug!("CA header added: {}={}", k, v_str); |
93 | 0 | } |
94 | 0 | }); |
95 | | |
96 | | let resp = self |
97 | | .client |
98 | | .clone() |
99 | | .create_certificate(req.into_request()) |
100 | | .await |
101 | | .map_err(Box::new)? |
102 | | .into_inner(); |
103 | | |
104 | | let leaf = resp |
105 | | .cert_chain |
106 | | .first() |
107 | 0 | .ok_or_else(|| Error::EmptyResponse(id.to_owned()))? |
108 | | .as_bytes(); |
109 | | let chain = if resp.cert_chain.len() > 1 { |
110 | 0 | resp.cert_chain[1..].iter().map(|s| s.as_bytes()).collect() |
111 | | } else { |
112 | | warn!("no chain certs for: {}", id); |
113 | | vec![] |
114 | | }; |
115 | | let certs = tls::WorkloadCertificate::new(&private_key, leaf, chain)?; |
116 | | // Make the certificate actually matches the identity we requested. |
117 | | if self.enable_impersonated_identity && certs.identity().as_ref() != Some(id) { |
118 | | error!("expected identity {:?}, got {:?}", id, certs.identity()); |
119 | | return Err(Error::SanError(id.to_owned())); |
120 | | } |
121 | | Ok(certs) |
122 | | } |
123 | | } |
124 | | |
125 | | #[async_trait] |
126 | | impl crate::identity::CaClientTrait for CaClient { |
127 | 0 | async fn fetch_certificate(&self, id: &Identity) -> Result<tls::WorkloadCertificate, Error> { |
128 | 0 | self.fetch_certificate(id).await |
129 | 0 | } Unexecuted instantiation: <ztunnel::identity::caclient::CaClient as ztunnel::identity::manager::CaClientTrait>::fetch_certificate Unexecuted instantiation: <ztunnel::identity::caclient::CaClient as ztunnel::identity::manager::CaClientTrait>::fetch_certificate::{closure#0} |
130 | | } |
131 | | |
132 | | #[cfg(any(test, feature = "testing"))] |
133 | | pub mod mock { |
134 | | use std::sync::Arc; |
135 | | use std::time::Duration; |
136 | | |
137 | | use tokio::sync::RwLock; |
138 | | use tokio::time::Instant; |
139 | | |
140 | | use crate::identity::Identity; |
141 | | |
142 | | use super::*; |
143 | | |
144 | | #[derive(Default)] |
145 | | struct ClientState { |
146 | | fetches: Vec<Identity>, |
147 | | error: bool, |
148 | | cert_gen: tls::mock::CertGenerator, |
149 | | } |
150 | | |
151 | | #[derive(Clone)] |
152 | | pub struct ClientConfig { |
153 | | pub cert_lifetime: Duration, |
154 | | pub time_conv: crate::time::Converter, |
155 | | // If non-zero, causes fetch_certificate calls to sleep for the specified duration before |
156 | | // returning. This is helpful to let tests that pause tokio time get more control over code |
157 | | // execution. |
158 | | pub fetch_latency: Duration, |
159 | | } |
160 | | |
161 | | impl Default for ClientConfig { |
162 | | fn default() -> Self { |
163 | | Self { |
164 | | fetch_latency: Duration::ZERO, |
165 | | cert_lifetime: Duration::from_secs(10), |
166 | | time_conv: crate::time::Converter::new(), |
167 | | } |
168 | | } |
169 | | } |
170 | | |
171 | | #[derive(Clone)] |
172 | | pub struct CaClient { |
173 | | cfg: ClientConfig, |
174 | | state: Arc<RwLock<ClientState>>, |
175 | | } |
176 | | |
177 | | impl CaClient { |
178 | | pub fn new(cfg: ClientConfig) -> CaClient { |
179 | | CaClient { |
180 | | cfg, |
181 | | state: Default::default(), |
182 | | } |
183 | | } |
184 | | |
185 | | pub fn cert_lifetime(&self) -> Duration { |
186 | | self.cfg.cert_lifetime |
187 | | } |
188 | | |
189 | | // Returns a list of fetch_certificate calls, in the order they happened. Calls are added |
190 | | // just before the function returns (ie. after the potential sleep controlled by the |
191 | | // fetch_latency config option). |
192 | | pub async fn fetches(&self) -> Vec<Identity> { |
193 | | self.state.read().await.fetches.clone() |
194 | | } |
195 | | |
196 | | pub async fn clear_fetches(&self) { |
197 | | self.state.write().await.fetches.clear(); |
198 | | } |
199 | | |
200 | | async fn fetch_certificate( |
201 | | &self, |
202 | | id: &Identity, |
203 | | ) -> Result<tls::WorkloadCertificate, Error> { |
204 | | let Identity::Spiffe { |
205 | | trust_domain: td, |
206 | | namespace: ns, |
207 | | .. |
208 | | } = id; |
209 | | if td == "error" { |
210 | | return Err(match ns.as_str() { |
211 | | "forgotten" => Error::Forgotten, |
212 | | _ => panic!("cannot parse injected error: {ns}"), |
213 | | }); |
214 | | } |
215 | | |
216 | | if self.cfg.fetch_latency != Duration::ZERO { |
217 | | tokio::time::sleep(self.cfg.fetch_latency).await; |
218 | | } |
219 | | |
220 | | // Get SystemTime::now() via Instant::now() to allow mocking in tests. |
221 | | let not_before = self |
222 | | .cfg |
223 | | .time_conv |
224 | | .instant_to_system_time(Instant::now().into()) |
225 | | .expect("SystemTime cannot represent current time. Was the process started in extreme future?"); |
226 | | let not_after = not_before + self.cfg.cert_lifetime; |
227 | | |
228 | | let mut state = self.state.write().await; |
229 | | state.fetches.push(id.to_owned()); |
230 | | if state.error { |
231 | | return Err(Error::Spiffe("injected test error".into())); |
232 | | } |
233 | | let certs = state |
234 | | .cert_gen |
235 | | .new_certs(&id.to_owned().into(), not_before, not_after); |
236 | | Ok(certs) |
237 | | } |
238 | | |
239 | | pub async fn set_error(&mut self, error: bool) { |
240 | | let mut state = self.state.write().await; |
241 | | state.error = error; |
242 | | } |
243 | | } |
244 | | |
245 | | #[async_trait] |
246 | | impl crate::identity::CaClientTrait for CaClient { |
247 | | async fn fetch_certificate( |
248 | | &self, |
249 | | id: &Identity, |
250 | | ) -> Result<tls::WorkloadCertificate, Error> { |
251 | | self.fetch_certificate(id).await |
252 | | } |
253 | | } |
254 | | } |
255 | | |
256 | | #[cfg(test)] |
257 | | mod tests { |
258 | | |
259 | | use std::time::Duration; |
260 | | |
261 | | use matches::assert_matches; |
262 | | |
263 | | use crate::{ |
264 | | identity::{Error, Identity}, |
265 | | test_helpers, tls, |
266 | | xds::istio::ca::IstioCertificateResponse, |
267 | | }; |
268 | | |
269 | | async fn test_ca_client_with_response( |
270 | | res: IstioCertificateResponse, |
271 | | ) -> Result<tls::WorkloadCertificate, Error> { |
272 | | let (mock, ca_client) = test_helpers::ca::CaServer::spawn().await; |
273 | | mock.send(Ok(res)).unwrap(); |
274 | | ca_client.fetch_certificate(&Identity::default()).await |
275 | | } |
276 | | |
277 | | #[tokio::test] |
278 | | async fn empty_chain() { |
279 | | let res = |
280 | | test_ca_client_with_response(IstioCertificateResponse { cert_chain: vec![] }).await; |
281 | | assert_matches!(res, Err(Error::EmptyResponse(_))); |
282 | | } |
283 | | |
284 | | #[tokio::test] |
285 | | async fn wrong_identity() { |
286 | | let id = Identity::Spiffe { |
287 | | service_account: "wrong-sa".into(), |
288 | | namespace: "foo".into(), |
289 | | trust_domain: "cluster.local".into(), |
290 | | }; |
291 | | let certs = tls::mock::generate_test_certs( |
292 | | &id.into(), |
293 | | Duration::from_secs(0), |
294 | | Duration::from_secs(0), |
295 | | ); |
296 | | |
297 | | let res = test_ca_client_with_response(IstioCertificateResponse { |
298 | | cert_chain: certs.full_chain_and_roots(), |
299 | | }) |
300 | | .await; |
301 | | assert_matches!(res, Err(Error::SanError(_))); |
302 | | } |
303 | | |
304 | | #[tokio::test] |
305 | | async fn fetch_certificate() { |
306 | | let certs = tls::mock::generate_test_certs( |
307 | | &Identity::default().into(), |
308 | | Duration::from_secs(0), |
309 | | Duration::from_secs(0), |
310 | | ); |
311 | | |
312 | | let res = test_ca_client_with_response(IstioCertificateResponse { |
313 | | cert_chain: certs.full_chain_and_roots(), |
314 | | }) |
315 | | .await; |
316 | | assert_matches!(res, Ok(_)); |
317 | | } |
318 | | } |