Line | Count | Source |
1 | | // Copyright Istio Authors |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::collections::HashMap; |
16 | | use std::error::Error as StdErr; |
17 | | use std::fmt; |
18 | | use std::fmt::Formatter; |
19 | | use std::str::FromStr; |
20 | | use std::sync::{Arc, RwLock}; |
21 | | use tracing::Level; |
22 | | |
23 | | use tokio::sync::mpsc; |
24 | | #[cfg(any(test, feature = "testing"))] |
25 | | use tracing::error; |
26 | | use tracing::{debug, info, instrument, trace, warn}; |
27 | | |
28 | | pub use client::*; |
29 | | pub use metrics::*; |
30 | | pub use types::*; |
31 | | use xds::istio::security::Authorization as XdsAuthorization; |
32 | | use xds::istio::workload::Address as XdsAddress; |
33 | | use xds::istio::workload::PortList; |
34 | | use xds::istio::workload::Service as XdsService; |
35 | | use xds::istio::workload::Workload as XdsWorkload; |
36 | | use xds::istio::workload::address::Type as XdsType; |
37 | | |
38 | | use crate::cert_fetcher::{CertFetcher, NoCertFetcher}; |
39 | | use crate::config::ConfigSource; |
40 | | use crate::rbac::Authorization; |
41 | | use crate::state::ProxyState; |
42 | | use crate::state::service::{Endpoint, Service, ServiceStore}; |
43 | | use crate::state::workload::{NamespacedHostname, Workload, WorkloadStore}; |
44 | | use crate::strng::Strng; |
45 | | use crate::{rbac, strng}; |
46 | | use crate::{tls, xds}; |
47 | | |
48 | | use self::service::discovery::v3::DeltaDiscoveryRequest; |
49 | | |
50 | | mod client; |
51 | | pub mod metrics; |
52 | | mod types; |
53 | | |
54 | | struct DisplayStatus<'a>(&'a tonic::Status); |
55 | | |
56 | | impl fmt::Display for DisplayStatus<'_> { |
57 | 0 | fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { |
58 | 0 | let s = &self.0; |
59 | 0 | write!(f, "status: {:?}, message: {:?}", s.code(), s.message())?; |
60 | | |
61 | 0 | if s.message().to_string().contains("authentication failure") { |
62 | 0 | write!( |
63 | 0 | f, |
64 | 0 | " (hint: check the control plane logs for more information)" |
65 | 0 | )?; |
66 | 0 | } |
67 | 0 | if !s.details().is_empty() |
68 | 0 | && let Ok(st) = std::str::from_utf8(s.details()) |
69 | | { |
70 | 0 | write!(f, ", details: {st}")?; |
71 | 0 | } |
72 | 0 | if let Some(src) = s.source().and_then(|s| s.source()) { |
73 | 0 | write!(f, ", source: {src}")?; |
74 | | // Error is not public to explicitly match on, so do a fuzzy match |
75 | 0 | if format!("{src}").contains("Temporary failure in name resolution") { |
76 | 0 | write!(f, " (hint: is the DNS server reachable?)")?; |
77 | 0 | } |
78 | 0 | } |
79 | 0 | Ok(()) |
80 | 0 | } |
81 | | } |
82 | | |
83 | | #[derive(thiserror::Error, Debug)] |
84 | | pub enum Error { |
85 | | #[error("gRPC error {}", DisplayStatus(.0))] |
86 | | GrpcStatus(#[from] tonic::Status), |
87 | | #[error("gRPC connection error connecting to {}: {}", .0, DisplayStatus(.1))] |
88 | | Connection(String, #[source] tonic::Status), |
89 | | /// Attempted to send on a MPSC channel which has been canceled |
90 | | #[error(transparent)] |
91 | | RequestFailure(#[from] Box<mpsc::error::SendError<DeltaDiscoveryRequest>>), |
92 | | #[error("failed to send on demand resource")] |
93 | | OnDemandSend(), |
94 | | #[error("TLS Error: {0}")] |
95 | | TLSError(#[from] tls::Error), |
96 | | } |
97 | | |
98 | | /// Updates the [ProxyState] from XDS. |
99 | | /// All state updates code goes in ProxyStateUpdateMutator, that takes state as a parameter. |
100 | | /// this guarantees that the state is always locked when it is updated. |
101 | | #[derive(Clone)] |
102 | | pub struct ProxyStateUpdateMutator { |
103 | | cert_fetcher: Arc<dyn CertFetcher>, |
104 | | } |
105 | | |
106 | | #[derive(Clone)] |
107 | | pub struct ProxyStateUpdater { |
108 | | state: Arc<RwLock<ProxyState>>, |
109 | | updater: ProxyStateUpdateMutator, |
110 | | } |
111 | | |
112 | | impl ProxyStateUpdater { |
113 | | /// Creates a new updater for the given stores. Will prefetch certs when workloads are updated. |
114 | 0 | pub fn new(state: Arc<RwLock<ProxyState>>, cert_fetcher: Arc<dyn CertFetcher>) -> Self { |
115 | 0 | Self { |
116 | 0 | state, |
117 | 0 | updater: ProxyStateUpdateMutator { cert_fetcher }, |
118 | 0 | } |
119 | 0 | } |
120 | | /// Creates a new updater that does not prefetch workload certs. |
121 | 0 | pub fn new_no_fetch(state: Arc<RwLock<ProxyState>>) -> Self { |
122 | 0 | Self { |
123 | 0 | state, |
124 | 0 | updater: ProxyStateUpdateMutator::new_no_fetch(), |
125 | 0 | } |
126 | 0 | } |
127 | | } |
128 | | |
129 | | impl ProxyStateUpdateMutator { |
130 | | /// Creates a new updater that does not prefetch workload certs. |
131 | 0 | pub fn new_no_fetch() -> Self { |
132 | 0 | ProxyStateUpdateMutator { |
133 | 0 | cert_fetcher: Arc::new(NoCertFetcher()), |
134 | 0 | } |
135 | 0 | } |
136 | | |
137 | | #[instrument( |
138 | | level = Level::TRACE, |
139 | | name="insert_workload", |
140 | | skip_all, |
141 | | fields(uid=%w.uid), |
142 | | )] |
143 | | pub fn insert_workload(&self, state: &mut ProxyState, w: XdsWorkload) -> anyhow::Result<()> { |
144 | | debug!("handling insert"); |
145 | | |
146 | | // Clone services, so we can pass full ownership of the rest of XdsWorkload to build our Workload |
147 | | // object, which doesn't include Services. |
148 | | // In theory, I think we could avoid this if Workload::try_from returning the services. |
149 | | // let services = w.services.clone(); |
150 | | // Convert the workload. |
151 | | let (workload, services): (Workload, HashMap<String, PortList>) = w.try_into()?; |
152 | | let workload = Arc::new(workload); |
153 | | |
154 | | // First, remove the entry entirely to make sure things are cleaned up properly. |
155 | | self.remove_workload_for_insert(state, &workload.uid); |
156 | | |
157 | | // Prefetch the cert for the workload. |
158 | | self.cert_fetcher.prefetch_cert(&workload); |
159 | | |
160 | | // Lock and upstate the stores. |
161 | | state.workloads.insert(workload.clone()); |
162 | | insert_service_endpoints(&workload, &services, &mut state.services)?; |
163 | | |
164 | | Ok(()) |
165 | | } |
166 | | |
167 | 0 | pub fn remove(&self, state: &mut ProxyState, xds_name: &Strng) { |
168 | 0 | self.remove_internal(state, xds_name, false); |
169 | 0 | } |
170 | | |
171 | 0 | fn remove_workload_for_insert(&self, state: &mut ProxyState, xds_name: &Strng) { |
172 | 0 | self.remove_internal(state, xds_name, true); |
173 | 0 | } |
174 | | |
175 | | #[instrument( |
176 | | level = Level::TRACE, |
177 | | name="remove", |
178 | | skip_all, |
179 | | fields(name=%xds_name, for_workload_insert=%for_workload_insert), |
180 | | )] |
181 | | fn remove_internal(&self, state: &mut ProxyState, xds_name: &Strng, for_workload_insert: bool) { |
182 | | // remove workload by UID; if xds_name is a service then this will no-op |
183 | | if let Some(prev) = state.workloads.remove(&strng::new(xds_name)) { |
184 | | // Also remove service endpoints for the workload. |
185 | | state.services.remove_endpoint(&prev); |
186 | | |
187 | | // This is a real removal (not a removal before insertion), and nothing else references the cert |
188 | | // Clear it out |
189 | | if !for_workload_insert |
190 | | && state |
191 | | .workloads |
192 | | .was_last_identity_on_node(&prev.node, &prev.identity()) |
193 | | { |
194 | | self.cert_fetcher.clear_cert(&prev.identity()); |
195 | | } |
196 | | // We removed a workload, no reason to attempt to remove a service with the same name |
197 | | return; |
198 | | } |
199 | | if for_workload_insert { |
200 | | // This is a workload, don't attempt to remove as a service |
201 | | return; |
202 | | } |
203 | | |
204 | | let Ok(name) = NamespacedHostname::from_str(xds_name) else { |
205 | | // we don't have namespace/hostname xds primary key for service |
206 | | warn!( |
207 | | "tried to remove service but it did not have the expected namespace/hostname format" |
208 | | ); |
209 | | return; |
210 | | }; |
211 | | |
212 | | if name.hostname.contains('/') { |
213 | | // avoid trying to delete obvious workload UIDs as a service, |
214 | | // which can result in noisy logs when new workloads are added |
215 | | // (we remove then add workloads on initial update) |
216 | | // |
217 | | // we can make this assumption because namespaces and hostnames cannot have `/` in them |
218 | | trace!("not a service, not attempting to delete as such",); |
219 | | return; |
220 | | } |
221 | | if !state.services.remove(&name) { |
222 | | warn!("tried to remove service, but it was not found"); |
223 | | } |
224 | | } |
225 | | |
226 | 0 | pub fn insert_address(&self, state: &mut ProxyState, a: XdsAddress) -> anyhow::Result<()> { |
227 | 0 | match a.r#type { |
228 | 0 | Some(XdsType::Workload(w)) => self.insert_workload(state, w), |
229 | 0 | Some(XdsType::Service(s)) => self.insert_service(state, s), |
230 | 0 | _ => Err(anyhow::anyhow!("unknown address type")), |
231 | | } |
232 | 0 | } |
233 | | |
234 | | #[instrument( |
235 | | level = Level::TRACE, |
236 | | name="insert_service", |
237 | | skip_all, |
238 | | fields(name=%service.name), |
239 | | )] |
240 | | pub fn insert_service( |
241 | | &self, |
242 | | state: &mut ProxyState, |
243 | | service: XdsService, |
244 | | ) -> anyhow::Result<()> { |
245 | | debug!("handling insert"); |
246 | | let mut service = Service::try_from(&service)?; |
247 | | |
248 | | // If the service already exists, add existing endpoints into the new service. |
249 | | if let Some(prev) = state |
250 | | .services |
251 | | .get_by_namespaced_host(&service.namespaced_hostname()) |
252 | | { |
253 | | for ep in prev.endpoints.iter() { |
254 | | if service.should_include_endpoint(ep.status) { |
255 | | service |
256 | | .endpoints |
257 | | .insert(ep.workload_uid.clone(), ep.clone()); |
258 | | } |
259 | | } |
260 | | } |
261 | | |
262 | | state.services.insert(service); |
263 | | Ok(()) |
264 | | } |
265 | | |
266 | 0 | pub fn insert_authorization( |
267 | 0 | &self, |
268 | 0 | state: &mut ProxyState, |
269 | 0 | xds_name: Strng, |
270 | 0 | r: XdsAuthorization, |
271 | 0 | ) -> anyhow::Result<()> { |
272 | 0 | info!("handling RBAC update {}", r.name); |
273 | | |
274 | 0 | let rbac = rbac::Authorization::try_from(r)?; |
275 | 0 | trace!( |
276 | 0 | "insert policy {}, {}", |
277 | | xds_name, |
278 | 0 | serde_json::to_string(&rbac)? |
279 | | ); |
280 | 0 | state.policies.insert(xds_name, rbac); |
281 | 0 | Ok(()) |
282 | 0 | } |
283 | | |
284 | 0 | pub fn remove_authorization(&self, state: &mut ProxyState, xds_name: Strng) { |
285 | 0 | info!("handling RBAC delete {}", xds_name); |
286 | 0 | state.policies.remove(xds_name); |
287 | 0 | } |
288 | | } |
289 | | |
290 | | impl Handler<XdsWorkload> for ProxyStateUpdater { |
291 | 0 | fn handle( |
292 | 0 | &self, |
293 | 0 | updates: Box<&mut dyn Iterator<Item = XdsUpdate<XdsWorkload>>>, |
294 | 0 | ) -> Result<(), Vec<RejectedConfig>> { |
295 | | // use deepsize::DeepSizeOf; |
296 | 0 | let mut state = self.state.write().unwrap(); |
297 | 0 | let handle = |res: XdsUpdate<XdsWorkload>| { |
298 | 0 | match res { |
299 | 0 | XdsUpdate::Update(w) => self.updater.insert_workload(&mut state, w.resource)?, |
300 | 0 | XdsUpdate::Remove(name) => { |
301 | 0 | debug!("handling delete {}", name); |
302 | 0 | self.updater.remove(&mut state, &strng::new(name)) |
303 | | } |
304 | | } |
305 | 0 | Ok(()) |
306 | 0 | }; |
307 | 0 | handle_single_resource(updates, handle) |
308 | 0 | } |
309 | | } |
310 | | |
311 | | impl Handler<XdsAddress> for ProxyStateUpdater { |
312 | 0 | fn handle( |
313 | 0 | &self, |
314 | 0 | updates: Box<&mut dyn Iterator<Item = XdsUpdate<XdsAddress>>>, |
315 | 0 | ) -> Result<(), Vec<RejectedConfig>> { |
316 | 0 | let mut state = self.state.write().unwrap(); |
317 | 0 | let handle = |res: XdsUpdate<XdsAddress>| { |
318 | 0 | match res { |
319 | 0 | XdsUpdate::Update(w) => self.updater.insert_address(&mut state, w.resource)?, |
320 | 0 | XdsUpdate::Remove(name) => { |
321 | 0 | debug!("handling delete {}", name); |
322 | 0 | self.updater.remove(&mut state, &strng::new(name)) |
323 | | } |
324 | | } |
325 | 0 | Ok(()) |
326 | 0 | }; |
327 | 0 | handle_single_resource(updates, handle) |
328 | 0 | } |
329 | | } |
330 | | |
331 | 0 | fn insert_service_endpoints( |
332 | 0 | workload: &Workload, |
333 | 0 | services: &HashMap<String, PortList>, |
334 | 0 | services_state: &mut ServiceStore, |
335 | 0 | ) -> anyhow::Result<()> { |
336 | 0 | for (namespaced_host, ports) in services { |
337 | | // Parse the namespaced hostname for the service. |
338 | 0 | let namespaced_host = NamespacedHostname::from_str(namespaced_host)?; |
339 | 0 | services_state.insert_endpoint( |
340 | 0 | namespaced_host, |
341 | 0 | Endpoint { |
342 | 0 | workload_uid: workload.uid.clone(), |
343 | 0 | port: ports.into(), |
344 | 0 | status: workload.status, |
345 | 0 | }, |
346 | | ) |
347 | | } |
348 | 0 | Ok(()) |
349 | 0 | } |
350 | | |
351 | | impl Handler<XdsAuthorization> for ProxyStateUpdater { |
352 | 0 | fn no_on_demand(&self) -> bool { |
353 | 0 | true |
354 | 0 | } |
355 | | |
356 | 0 | fn handle( |
357 | 0 | &self, |
358 | 0 | updates: Box<&mut dyn Iterator<Item = XdsUpdate<XdsAuthorization>>>, |
359 | 0 | ) -> Result<(), Vec<RejectedConfig>> { |
360 | 0 | let mut state = self.state.write().unwrap(); |
361 | 0 | let handle = |res: XdsUpdate<XdsAuthorization>| { |
362 | 0 | match res { |
363 | 0 | XdsUpdate::Update(w) => self |
364 | 0 | .updater |
365 | 0 | .insert_authorization(&mut state, w.name, w.resource)?, |
366 | 0 | XdsUpdate::Remove(name) => self.updater.remove_authorization(&mut state, name), |
367 | | } |
368 | 0 | Ok(()) |
369 | 0 | }; |
370 | 0 | let mut len_updates = 0; |
371 | 0 | let updates = updates.inspect(|_| len_updates += 1); |
372 | 0 | match handle_single_resource(updates, handle) { |
373 | | Ok(()) => { |
374 | 0 | state.policies.send(); |
375 | 0 | Ok(()) |
376 | | } |
377 | 0 | Err(e) => { |
378 | 0 | if e.len() < len_updates { |
379 | 0 | // not all config was rejected, we have _some_ valide update |
380 | 0 | state.policies.send(); |
381 | 0 | } |
382 | 0 | Err(e) |
383 | | } |
384 | | } |
385 | 0 | } |
386 | | } |
387 | | |
388 | | /// LocalClient serves as a local file reader alternative for XDS. This is intended for testing. |
389 | | pub struct LocalClient { |
390 | | pub cfg: ConfigSource, |
391 | | pub state: Arc<RwLock<ProxyState>>, |
392 | | pub cert_fetcher: Arc<dyn CertFetcher>, |
393 | | pub local_node: Option<Strng>, |
394 | | } |
395 | | |
396 | | #[derive(Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)] |
397 | | #[serde(rename_all = "camelCase", deny_unknown_fields)] |
398 | | pub struct LocalWorkload { |
399 | | #[serde(flatten)] |
400 | | pub workload: Workload, |
401 | | pub services: HashMap<String, HashMap<u16, u16>>, |
402 | | } |
403 | | |
404 | | #[derive(Default, Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)] |
405 | | #[serde(rename_all = "camelCase", deny_unknown_fields)] |
406 | | pub struct LocalConfig { |
407 | | #[serde(default)] |
408 | | pub workloads: Vec<LocalWorkload>, |
409 | | #[serde(default)] |
410 | | pub policies: Vec<Authorization>, |
411 | | #[serde(default)] |
412 | | pub services: Vec<Service>, |
413 | | } |
414 | | |
415 | | impl LocalClient { |
416 | | #[instrument(skip_all, name = "local_client")] |
417 | | pub async fn run(self) -> Result<(), anyhow::Error> { |
418 | | // Load initial state |
419 | | match &self.cfg { |
420 | | #[cfg(any(test, feature = "testing"))] |
421 | | ConfigSource::Dynamic(rx) => { |
422 | | let mut rx = rx.lock().await; |
423 | | let r = rx |
424 | | .recv() |
425 | | .await |
426 | | .ok_or(anyhow::anyhow!("did not get initial config"))?; |
427 | | self.load_config(r)?; |
428 | | rx.ack().await?; |
429 | | } |
430 | | f => { |
431 | | let r: LocalConfig = serde_yaml::from_str(&f.read_to_string().await?)?; |
432 | | self.load_config(r)?; |
433 | | } |
434 | | }; |
435 | | #[cfg(any(test, feature = "testing"))] |
436 | | if let ConfigSource::Dynamic(ref rx) = self.cfg { |
437 | | let rx = rx.clone(); |
438 | | tokio::spawn(async move { |
439 | | // Mutex is just for borrow checker; we know we are the only user and can hold the lock forever. |
440 | | let mut rx = rx.lock().await; |
441 | | while let Some(req) = rx.recv().await { |
442 | | if let Err(e) = self.load_config(req) { |
443 | | error!("failed to load dynamic config update: {e:?}"); |
444 | | } |
445 | | if let Err(e) = rx.ack().await { |
446 | | error!("failed to ack: {}", e); |
447 | | } |
448 | | } |
449 | | }); |
450 | | }; |
451 | | Ok(()) |
452 | | } |
453 | | |
454 | 0 | fn load_config(&self, r: LocalConfig) -> anyhow::Result<()> { |
455 | 0 | debug!( |
456 | 0 | "load local config: {}", |
457 | 0 | serde_yaml::to_string(&r).unwrap_or_default() |
458 | | ); |
459 | 0 | let mut state = self.state.write().unwrap(); |
460 | | // Clear the state |
461 | 0 | state.workloads = WorkloadStore::new(self.local_node.clone()); |
462 | 0 | state.services = Default::default(); |
463 | | // Policies have some channels, so we don't want to reset it entirely |
464 | 0 | state.policies.clear_all_policies(); |
465 | 0 | let num_workloads = r.workloads.len(); |
466 | 0 | let num_policies = r.policies.len(); |
467 | 0 | for wl in r.workloads { |
468 | 0 | trace!("inserting local workload {}", &wl.workload.uid); |
469 | 0 | self.cert_fetcher.prefetch_cert(&wl.workload); |
470 | 0 | let w = Arc::new(wl.workload); |
471 | 0 | state.workloads.insert(w.clone()); |
472 | | |
473 | 0 | let services: HashMap<String, PortList> = wl |
474 | 0 | .services |
475 | 0 | .into_iter() |
476 | 0 | .map(|(k, v)| (k, PortList::from(v))) |
477 | 0 | .collect(); |
478 | | |
479 | 0 | insert_service_endpoints(&w, &services, &mut state.services)?; |
480 | | } |
481 | 0 | for rbac in r.policies { |
482 | 0 | let xds_name = rbac.to_key(); |
483 | 0 | state.policies.insert(xds_name, rbac); |
484 | 0 | } |
485 | 0 | for svc in r.services { |
486 | 0 | state.services.insert(svc); |
487 | 0 | } |
488 | 0 | info!(%num_workloads, %num_policies, "local config initialized"); |
489 | 0 | Ok(()) |
490 | 0 | } |
491 | | } |