/src/ztunnel/src/proxy/metrics.rs
Line | Count | Source |
1 | | // Copyright Istio Authors |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::fmt::Write; |
16 | | use std::net::SocketAddr; |
17 | | use std::sync::atomic::{AtomicU64, Ordering}; |
18 | | use std::sync::{Arc, atomic}; |
19 | | use std::time::Instant; |
20 | | |
21 | | use prometheus_client::encoding::{ |
22 | | EncodeLabelSet, EncodeLabelValue, LabelSetEncoder, LabelValueEncoder, |
23 | | }; |
24 | | use prometheus_client::metrics::counter::{Atomic, Counter}; |
25 | | use prometheus_client::metrics::family::Family; |
26 | | use prometheus_client::metrics::gauge::Gauge; |
27 | | use prometheus_client::registry::Registry; |
28 | | |
29 | | use tracing::event; |
30 | | use tracing_core::field::Value; |
31 | | |
32 | | use crate::identity::Identity; |
33 | | use crate::metrics::DefaultedUnknown; |
34 | | use crate::proxy::{self, HboneAddress}; |
35 | | |
36 | | use crate::state::service::ServiceDescription; |
37 | | use crate::state::workload::Workload; |
38 | | use crate::strng::{RichStrng, Strng}; |
39 | | |
/// Prometheus metric families recorded for proxied connections.
///
/// `Metrics::new` creates every family and registers it on the supplied registry.
#[derive(Debug)]
pub struct Metrics {
    /// Counter of opened connections; registered as `tcp_connections_opened`.
    pub connection_opens: Family<CommonTrafficLabels, Counter>,
    /// Counter of closed connections; registered as `tcp_connections_closed`.
    pub connection_close: Family<CommonTrafficLabels, Counter>,
    /// Counter of connections that failed; registered as `tcp_connections_failed`.
    pub connection_failures: Family<CommonTrafficLabels, Counter>,
    /// Gauge of currently open sockets; registered as `tcp_sockets_open`.
    pub open_sockets: Family<SocketLabels, Gauge>,
    /// Byte counter registered as `tcp_received_bytes`.
    pub received_bytes: Family<CommonTrafficLabels, Counter>,
    /// Byte counter registered as `tcp_sent_bytes`.
    pub sent_bytes: Family<CommonTrafficLabels, Counter>,

    // on-demand DNS is not a part of DNS proxy, but part of ztunnel proxy itself
    /// Counter registered as `on_demand_dns`.
    pub on_demand_dns: Family<OnDemandDnsLabels, Counter>,
}
52 | | |
/// Value of the `reporter` label: which side of the connection produced the telemetry.
///
/// Access logs in this file print the direction as `outbound` for `source` and `inbound`
/// otherwise.
#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum Reporter {
    /// The default reporter.
    #[default]
    source,
    #[allow(dead_code)]
    destination,
}
60 | | |
/// Value of the `request_protocol` label.
///
/// `From<ConnectionOpen> for CommonTrafficLabels` always sets this to `tcp`.
#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum RequestProtocol {
    #[default]
    tcp,
    #[allow(dead_code)]
    http,
}
68 | | |
/// Value of the `response_flags` label describing why a connection ended.
///
/// Any value other than `None` also counts the connection as a failure when the result is
/// recorded.
#[derive(Default, Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum ResponseFlags {
    /// No specific flag; the default.
    #[default]
    None,
    /// Connection denied due to policy.
    AuthorizationPolicyDenied,
    /// Connection denied because we could not establish an upstream connection.
    ConnectionFailure,
    /// TLS handshake failure.
    TlsFailure,
    /// HTTP/2 handshake failure.
    Http2HandshakeFailure,
    /// Network policy blocking connection.
    NetworkPolicyError,
    /// Identity/certificate error.
    IdentityError,
}
86 | | |
87 | | impl EncodeLabelValue for ResponseFlags { |
88 | 0 | fn encode(&self, writer: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> { |
89 | 0 | match self { |
90 | 0 | ResponseFlags::None => writer.write_str("-"), |
91 | 0 | ResponseFlags::AuthorizationPolicyDenied => writer.write_str("DENY"), |
92 | 0 | ResponseFlags::ConnectionFailure => writer.write_str("CONNECT"), |
93 | 0 | ResponseFlags::TlsFailure => writer.write_str("TLS_FAILURE"), |
94 | 0 | ResponseFlags::Http2HandshakeFailure => writer.write_str("H2_HANDSHAKE_FAILURE"), |
95 | 0 | ResponseFlags::NetworkPolicyError => writer.write_str("NETWORK_POLICY"), |
96 | 0 | ResponseFlags::IdentityError => writer.write_str("IDENTITY_ERROR"), |
97 | | } |
98 | 0 | } |
99 | | } |
100 | | |
/// Value of the `connection_security_policy` label.
///
/// Access logs in this file only include the source/destination identities when the policy
/// is `mutual_tls`.
#[derive(Default, Copy, Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum SecurityPolicy {
    #[default]
    unknown,
    mutual_tls,
}
107 | | |
/// Workload attributes supplied directly rather than through a `Workload` object.
///
/// Consumed by `CommonTrafficLabels::with_derived_source` and
/// `CommonTrafficLabels::with_derived_destination`; every field is optional.
#[derive(Clone, Debug, Default)]
pub struct DerivedWorkload {
    /// Copied into the `*_workload` label (and, currently, the `*_app` label).
    pub workload_name: Option<Strng>,
    /// Copied into the `*_canonical_service` label.
    pub app: Option<Strng>,
    /// Copied into the `*_canonical_revision` and `*_version` labels.
    pub revision: Option<Strng>,
    /// Copied into the `*_workload_namespace` label.
    pub namespace: Option<Strng>,
    /// Copied into the `*_principal` label.
    pub identity: Option<Identity>,
    /// Copied into the `*_cluster` label.
    pub cluster_id: Option<Strng>,
    /// Copied into the `*_region` locality label.
    pub region: Option<Strng>,
    /// Copied into the `*_zone` locality label.
    pub zone: Option<Strng>,
}
119 | | |
/// Everything known about a connection at the time it is opened.
///
/// Converted into `CommonTrafficLabels` through the `From` implementation in this file.
#[derive(Clone)]
pub struct ConnectionOpen {
    /// Which side is reporting.
    pub reporter: Reporter,
    /// Source workload, when known.
    pub source: Option<Arc<Workload>>,
    /// Source attributes applied before `source`, so `source` overrides overlapping labels.
    pub derived_source: Option<DerivedWorkload>,
    /// Destination workload, when known.
    pub destination: Option<Arc<Workload>>,
    /// Destination service, when known.
    pub destination_service: Option<ServiceDescription>,
    /// Copied verbatim into the `connection_security_policy` label.
    pub connection_security_policy: SecurityPolicy,
}
129 | | |
130 | | impl CommonTrafficLabels { |
131 | 0 | fn new() -> Self { |
132 | 0 | Default::default() |
133 | 0 | } |
134 | | |
135 | 0 | fn with_source(mut self, w: Option<&Workload>) -> Self { |
136 | 0 | let Some(w) = w else { return self }; |
137 | 0 | self.source_workload = w.workload_name.clone().into(); |
138 | 0 | self.source_canonical_service = w.canonical_name.clone().into(); |
139 | 0 | self.source_canonical_revision = w.canonical_revision.clone().into(); |
140 | 0 | self.source_workload_namespace = w.namespace.clone().into(); |
141 | | // We explicitly do not set source_principal here. This is set only with with_derived_source |
142 | | // based on the real mTLS identity. |
143 | 0 | self.source_app = w.canonical_name.clone().into(); |
144 | 0 | self.source_version = w.canonical_revision.clone().into(); |
145 | 0 | self.source_cluster = w.cluster_id.to_string().into(); |
146 | | |
147 | 0 | let mut local = self.locality.0.unwrap_or_default(); |
148 | 0 | local.source_region = w.locality.region.clone().into(); |
149 | 0 | local.source_zone = w.locality.zone.clone().into(); |
150 | 0 | self.locality = OptionallyEncode(Some(local)); |
151 | | |
152 | 0 | self |
153 | 0 | } |
154 | | |
155 | 0 | fn with_derived_source(mut self, w: Option<&DerivedWorkload>) -> Self { |
156 | 0 | let Some(w) = w else { return self }; |
157 | 0 | self.source_workload = w.workload_name.clone().into(); |
158 | 0 | self.source_canonical_service = w.app.clone().into(); |
159 | 0 | self.source_canonical_revision = w.revision.clone().into(); |
160 | 0 | self.source_workload_namespace = w.namespace.clone().into(); |
161 | 0 | self.source_app = w.workload_name.clone().into(); |
162 | 0 | self.source_version = w.revision.clone().into(); |
163 | 0 | self.source_cluster = w.cluster_id.clone().into(); |
164 | | // This is the identity from the TLS handshake; this is the most trustworthy source so use it |
165 | 0 | self.source_principal = w.identity.clone().into(); |
166 | | |
167 | 0 | let mut local = self.locality.0.unwrap_or_default(); |
168 | 0 | local.source_region = w.region.clone().into(); |
169 | 0 | local.source_zone = w.zone.clone().into(); |
170 | 0 | self.locality = OptionallyEncode(Some(local)); |
171 | | |
172 | 0 | self |
173 | 0 | } |
174 | | |
175 | 0 | fn with_destination(mut self, w: Option<&Workload>) -> Self { |
176 | 0 | let Some(w) = w else { return self }; |
177 | 0 | self.destination_workload = w.workload_name.clone().into(); |
178 | 0 | self.destination_canonical_service = w.canonical_name.clone().into(); |
179 | 0 | self.destination_canonical_revision = w.canonical_revision.clone().into(); |
180 | 0 | self.destination_workload_namespace = w.namespace.clone().into(); |
181 | 0 | self.destination_principal = w.identity().into(); |
182 | 0 | self.destination_app = w.canonical_name.clone().into(); |
183 | 0 | self.destination_version = w.canonical_revision.clone().into(); |
184 | 0 | self.destination_cluster = w.cluster_id.to_string().into(); |
185 | | |
186 | 0 | let mut local = self.locality.0.unwrap_or_default(); |
187 | 0 | local.destination_region = w.locality.region.clone().into(); |
188 | 0 | local.destination_zone = w.locality.zone.clone().into(); |
189 | 0 | self.locality = OptionallyEncode(Some(local)); |
190 | | |
191 | 0 | self |
192 | 0 | } |
193 | | |
194 | 0 | fn with_destination_service(mut self, w: Option<&ServiceDescription>) -> Self { |
195 | 0 | let Some(w) = w else { return self }; |
196 | 0 | self.destination_service = w.hostname.clone().into(); |
197 | 0 | self.destination_service_name = w.name.clone().into(); |
198 | 0 | self.destination_service_namespace = w.namespace.clone().into(); |
199 | 0 | self |
200 | 0 | } |
201 | | |
202 | 0 | fn with_derived_destination(mut self, w: Option<&DerivedWorkload>) -> Self { |
203 | 0 | let Some(w) = w else { return self }; |
204 | 0 | self.destination_workload = w.workload_name.clone().into(); |
205 | 0 | self.destination_canonical_service = w.app.clone().into(); |
206 | 0 | self.destination_canonical_revision = w.revision.clone().into(); |
207 | 0 | self.destination_workload_namespace = w.namespace.clone().into(); |
208 | 0 | self.destination_app = w.workload_name.clone().into(); |
209 | 0 | self.destination_version = w.revision.clone().into(); |
210 | 0 | self.destination_cluster = w.cluster_id.clone().into(); |
211 | | // This is the identity from the TLS handshake; this is the most trustworthy source so use it |
212 | 0 | self.destination_principal = w.identity.clone().into(); |
213 | | |
214 | 0 | let mut local = self.locality.0.unwrap_or_default(); |
215 | 0 | local.destination_region = w.region.clone().into(); |
216 | 0 | local.destination_zone = w.zone.clone().into(); |
217 | 0 | self.locality = OptionallyEncode(Some(local)); |
218 | | |
219 | 0 | self |
220 | 0 | } |
221 | | } |
222 | | |
223 | | impl From<ConnectionOpen> for CommonTrafficLabels { |
224 | 0 | fn from(c: ConnectionOpen) -> Self { |
225 | 0 | CommonTrafficLabels { |
226 | 0 | reporter: c.reporter, |
227 | 0 | request_protocol: RequestProtocol::tcp, |
228 | 0 | response_flags: ResponseFlags::None, |
229 | 0 | connection_security_policy: c.connection_security_policy, |
230 | 0 | ..CommonTrafficLabels::new() |
231 | 0 | // Intentionally before with_source; source is more reliable |
232 | 0 | .with_derived_source(c.derived_source.as_ref()) |
233 | 0 | .with_source(c.source.as_deref()) |
234 | 0 | .with_destination(c.destination.as_deref()) |
235 | 0 | .with_destination_service(c.destination_service.as_ref()) |
236 | 0 | } |
237 | 0 | } |
238 | | } |
239 | | |
/// Minimal labels for socket metrics (without direction).
///
/// Used as the label set of the `open_sockets` gauge family.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct SocketLabels {
    /// Which side is reporting the socket.
    pub reporter: Reporter,
}
245 | | |
/// Label set shared by the connection and byte-count metric families.
///
/// Fields wrapped in `DefaultedUnknown` are populated by the `with_*` builder methods on this
/// type; the remaining fields are set when converting from `ConnectionOpen`.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct CommonTrafficLabels {
    reporter: Reporter,

    // Source workload labels.
    source_workload: DefaultedUnknown<RichStrng>,
    source_canonical_service: DefaultedUnknown<RichStrng>,
    source_canonical_revision: DefaultedUnknown<RichStrng>,
    source_workload_namespace: DefaultedUnknown<RichStrng>,
    // Only set by with_derived_source, never by with_source.
    source_principal: DefaultedUnknown<Identity>,
    source_app: DefaultedUnknown<RichStrng>,
    source_version: DefaultedUnknown<RichStrng>,
    source_cluster: DefaultedUnknown<RichStrng>,

    // Destination service labels, set by with_destination_service.
    destination_service: DefaultedUnknown<RichStrng>,
    destination_service_namespace: DefaultedUnknown<RichStrng>,
    destination_service_name: DefaultedUnknown<RichStrng>,

    // Destination workload labels.
    destination_workload: DefaultedUnknown<RichStrng>,
    destination_canonical_service: DefaultedUnknown<RichStrng>,
    destination_canonical_revision: DefaultedUnknown<RichStrng>,
    destination_workload_namespace: DefaultedUnknown<RichStrng>,
    destination_principal: DefaultedUnknown<Identity>,
    destination_app: DefaultedUnknown<RichStrng>,
    destination_version: DefaultedUnknown<RichStrng>,
    destination_cluster: DefaultedUnknown<RichStrng>,

    request_protocol: RequestProtocol,
    // May be overwritten when the connection result is recorded.
    response_flags: ResponseFlags,
    connection_security_policy: SecurityPolicy,

    // Stays `None` (and therefore encodes nothing) until a `with_*` method populates it.
    #[prometheus(flatten)]
    locality: OptionallyEncode<LocalityLabels>,
}
279 | | |
/// OptionallyEncode is a wrapper that will optionally encode the entire label set.
/// This differs from something like DefaultedUnknown, which handles only the value: here the
/// labels do not show up at all when the inner value is `None`.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq)]
struct OptionallyEncode<T>(Option<T>);
285 | | impl<T: EncodeLabelSet> EncodeLabelSet for OptionallyEncode<T> { |
286 | 0 | fn encode(&self, encoder: &mut LabelSetEncoder) -> Result<(), std::fmt::Error> { |
287 | 0 | match &self.0 { |
288 | 0 | None => Ok(()), |
289 | 0 | Some(ll) => ll.encode(encoder), |
290 | | } |
291 | 0 | } |
292 | | } |
/// Region/zone labels for both ends of a connection.
///
/// Embedded in `CommonTrafficLabels` through `OptionallyEncode`, so the four labels are
/// emitted together or not at all.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
struct LocalityLabels {
    source_region: DefaultedUnknown<RichStrng>,
    source_zone: DefaultedUnknown<RichStrng>,
    destination_region: DefaultedUnknown<RichStrng>,
    destination_zone: DefaultedUnknown<RichStrng>,
}
300 | | |
/// Label set of the `on_demand_dns` counter family.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct OnDemandDnsLabels {
    // on-demand DNS client information is just nice-to-have;
    // these fields are populated by `with_source`.
    source_workload: DefaultedUnknown<RichStrng>,
    source_canonical_service: DefaultedUnknown<RichStrng>,
    source_canonical_revision: DefaultedUnknown<RichStrng>,
    source_workload_namespace: DefaultedUnknown<RichStrng>,
    source_principal: DefaultedUnknown<Identity>,
    source_app: DefaultedUnknown<RichStrng>,
    source_version: DefaultedUnknown<RichStrng>,
    source_cluster: DefaultedUnknown<RichStrng>,

    // on-demand DNS is resolved per hostname, so this is the most interesting part;
    // populated by `with_destination`.
    hostname: DefaultedUnknown<RichStrng>,
}
316 | | |
317 | | impl OnDemandDnsLabels { |
318 | 0 | pub fn new() -> Self { |
319 | 0 | Default::default() |
320 | 0 | } |
321 | | |
322 | 0 | pub fn with_source(mut self, w: &Workload) -> Self { |
323 | 0 | self.source_workload = w.workload_name.clone().into(); |
324 | 0 | self.source_canonical_service = w.canonical_name.clone().into(); |
325 | 0 | self.source_canonical_revision = w.canonical_revision.clone().into(); |
326 | 0 | self.source_workload_namespace = w.namespace.clone().into(); |
327 | 0 | self.source_principal = w.identity().into(); |
328 | 0 | self.source_app = w.canonical_name.clone().into(); |
329 | 0 | self.source_version = w.canonical_revision.clone().into(); |
330 | 0 | self.source_cluster = w.cluster_id.to_string().into(); |
331 | 0 | self |
332 | 0 | } |
333 | | |
334 | 0 | pub fn with_destination(mut self, w: &Workload) -> Self { |
335 | 0 | self.hostname = w.hostname.clone().into(); |
336 | 0 | self |
337 | 0 | } |
338 | | } |
339 | | |
340 | | impl Metrics { |
341 | 0 | pub fn new(registry: &mut Registry) -> Self { |
342 | 0 | let connection_opens = Family::default(); |
343 | 0 | registry.register( |
344 | | "tcp_connections_opened", |
345 | | "The total number of TCP connections opened", |
346 | 0 | connection_opens.clone(), |
347 | | ); |
348 | 0 | let connection_close = Family::default(); |
349 | 0 | registry.register( |
350 | | "tcp_connections_closed", |
351 | | "The total number of TCP connections closed", |
352 | 0 | connection_close.clone(), |
353 | | ); |
354 | | |
355 | 0 | let received_bytes = Family::default(); |
356 | 0 | registry.register( |
357 | | "tcp_received_bytes", |
358 | | "The size of total bytes received during request in case of a TCP connection", |
359 | 0 | received_bytes.clone(), |
360 | | ); |
361 | 0 | let sent_bytes = Family::default(); |
362 | 0 | registry.register( |
363 | | "tcp_sent_bytes", |
364 | | "The size of total bytes sent during response in case of a TCP connection", |
365 | 0 | sent_bytes.clone(), |
366 | | ); |
367 | 0 | let on_demand_dns = Family::default(); |
368 | 0 | registry.register( |
369 | | "on_demand_dns", |
370 | | "The total number of requests that used on-demand DNS (unstable)", |
371 | 0 | on_demand_dns.clone(), |
372 | | ); |
373 | | |
374 | 0 | let connection_failures = Family::default(); |
375 | 0 | registry.register( |
376 | | "tcp_connections_failed", |
377 | | "The total number of TCP connections that failed to establish (unstable)", |
378 | 0 | connection_failures.clone(), |
379 | | ); |
380 | | |
381 | 0 | let open_sockets = Family::default(); |
382 | 0 | registry.register( |
383 | | "tcp_sockets_open", |
384 | | "The current number of open TCP sockets (unstable)", |
385 | 0 | open_sockets.clone(), |
386 | | ); |
387 | | |
388 | 0 | Self { |
389 | 0 | connection_opens, |
390 | 0 | connection_close, |
391 | 0 | received_bytes, |
392 | 0 | sent_bytes, |
393 | 0 | on_demand_dns, |
394 | 0 | connection_failures, |
395 | 0 | open_sockets, |
396 | 0 | } |
397 | 0 | } |
398 | | |
399 | 0 | pub fn record_socket_open(&self, labels: &SocketLabels) { |
400 | 0 | self.open_sockets.get_or_create(labels).inc(); |
401 | 0 | } |
402 | | |
403 | 0 | pub fn record_socket_close(&self, labels: &SocketLabels) { |
404 | 0 | self.open_sockets.get_or_create(labels).dec(); |
405 | 0 | } |
406 | | } |
407 | | |
/// Guard to ensure socket close is recorded even if the task is cancelled.
///
/// This should be created at the start of an async block that handles a socket.
/// It stores only the minimal information needed to reconstruct the labels, avoiding
/// cloning the large `CommonTrafficLabels` struct.
///
/// Creating the guard does not record a socket open; only dropping it records the close.
pub struct SocketCloseGuard {
    metrics: Arc<Metrics>,
    reporter: Reporter,
}
416 | | |
417 | | impl Drop for SocketCloseGuard { |
418 | 0 | fn drop(&mut self) { |
419 | 0 | let labels = SocketLabels { |
420 | 0 | reporter: self.reporter, |
421 | 0 | }; |
422 | 0 | self.metrics.record_socket_close(&labels); |
423 | 0 | } |
424 | | } |
425 | | |
426 | | impl SocketCloseGuard { |
427 | | /// Create a new socket close guard |
428 | 0 | pub fn new(metrics: Arc<Metrics>, reporter: Reporter) -> Self { |
429 | 0 | Self { metrics, reporter } |
430 | 0 | } |
431 | | } |
432 | | |
#[derive(Debug)]
/// ConnectionResult abstracts recording a metric and emitting an access log upon a connection completion.
///
/// If the value is dropped without being recorded, its `Drop` implementation records it with
/// a `ClosedFromDrain` error.
pub struct ConnectionResult {
    // Src address and name
    pub(crate) src: (SocketAddr, Option<RichStrng>),
    // Dst address and name
    pub(crate) dst: (SocketAddr, Option<RichStrng>),
    // Logged as `dst.hbone_addr` when present
    pub(crate) hbone_target: Option<HboneAddress>,
    // Used to compute the logged connection duration
    pub(crate) start: Instant,

    // TODO: storing CommonTrafficLabels adds ~600 bytes retained throughout a connection life time.
    // We can pre-fetch the metrics we need at initialization instead of storing this, then keep a more
    // efficient representation for the fields we need to log. Ideally, this would even be optional
    // in case logs were disabled.
    pub(crate) tl: CommonTrafficLabels,
    pub(crate) metrics: Arc<Metrics>,

    // sent records the number of bytes sent on this connection
    pub(crate) sent: AtomicU64,
    // sent_metric records the number of bytes sent on this connection to the aggregated metric counter
    pub(crate) sent_metric: Counter,
    // recv records the number of bytes received on this connection
    pub(crate) recv: AtomicU64,
    // recv_metric records the number of bytes received on this connection to the aggregated metric counter
    pub(crate) recv_metric: Counter,
    // Have we recorded yet? Set by record_internal so the result is recorded at most once.
    pub(crate) recorded: bool,
}
461 | | |
462 | | // log_early_deny allows logging a connection is denied before we have enough information to emit proper |
463 | | // access logs/metrics |
464 | 0 | pub fn log_early_deny<E: std::error::Error>( |
465 | 0 | src: SocketAddr, |
466 | 0 | dst: SocketAddr, |
467 | 0 | reporter: Reporter, |
468 | 0 | err: E, |
469 | 0 | ) { |
470 | 0 | event!( |
471 | | target: "access", |
472 | | parent: None, |
473 | 0 | tracing::Level::WARN, |
474 | | |
475 | | src.addr = %src, |
476 | | dst.addr = %dst, |
477 | | |
478 | 0 | direction = if reporter == Reporter::source { |
479 | 0 | "outbound" |
480 | | } else { |
481 | 0 | "inbound" |
482 | | }, |
483 | | |
484 | 0 | error = format!("{err}"), |
485 | | |
486 | 0 | "connection failed" |
487 | | ); |
488 | 0 | } Unexecuted instantiation: ztunnel::proxy::metrics::log_early_deny::<ztunnel::proxy::Error> Unexecuted instantiation: ztunnel::proxy::metrics::log_early_deny::<ztunnel::tls::lib::TlsError> |
489 | | |
/// Emits the final "connection complete" access-log event.
///
/// `$res` is the connection's `Result`: `Ok` logs at INFO, `Err` logs at ERROR with an extra
/// `error` field holding the error's `to_string()`. `$fields` are the `tracing` fields shared
/// by both events. The macro exists so that the level can be chosen from the result at
/// runtime, which is why the `event!` call is expanded once per level.
macro_rules! access_log {
    ($res:expr, $($fields:tt)*) => {
        // Bind the argument once so that the expression passed as `$res` is evaluated exactly
        // one time, however complex it is.
        let res = $res;
        let err = res.as_ref().err().map(|e| e.to_string());
        match res {
            Ok(_) => {
                event!(
                    target: "access",
                    parent: None,
                    tracing::Level::INFO,
                    $($fields)*
                    "connection complete"
                );
            }
            Err(_) => {
                event!(
                    target: "access",
                    parent: None,
                    tracing::Level::ERROR,
                    $($fields)*
                    error = err,
                    "connection complete"
                );
            }
        }
    };
}
516 | | |
/// Builder for `ConnectionResult`.
///
/// Holds the same address, timing, label and metrics fields as `ConnectionResult`; `build`
/// adds the byte counters and the `recorded` flag.
pub struct ConnectionResultBuilder {
    // Src address and name
    src: (SocketAddr, Option<RichStrng>),
    // Dst address and name
    dst: (SocketAddr, Option<RichStrng>),
    hbone_target: Option<HboneAddress>,
    start: Instant,
    tl: CommonTrafficLabels,
    metrics: Arc<Metrics>,
}
525 | | |
526 | | impl ConnectionResultBuilder { |
527 | 0 | pub fn new( |
528 | 0 | src: SocketAddr, |
529 | 0 | dst: SocketAddr, |
530 | 0 | // If using hbone, the inner HBONE address |
531 | 0 | // That is, dst is the L4 address, while is the :authority. |
532 | 0 | hbone_target: Option<HboneAddress>, |
533 | 0 | start: Instant, |
534 | 0 | conn: ConnectionOpen, |
535 | 0 | metrics: Arc<Metrics>, |
536 | 0 | ) -> Self { |
537 | | // for src and dest, try to get pod name but fall back to "canonical service" |
538 | 0 | let mut src = (src, conn.source.as_ref().map(|wl| wl.name.clone().into())); |
539 | 0 | let mut dst = ( |
540 | 0 | dst, |
541 | 0 | conn.destination.as_ref().map(|wl| wl.name.clone().into()), |
542 | | ); |
543 | 0 | let tl = CommonTrafficLabels::from(conn); |
544 | | |
545 | 0 | src.1 = src.1.or(tl.source_canonical_service.clone().inner()); |
546 | 0 | dst.1 = dst.1.or(tl.destination_canonical_service.clone().inner()); |
547 | 0 | Self { |
548 | 0 | src, |
549 | 0 | dst, |
550 | 0 | hbone_target, |
551 | 0 | start, |
552 | 0 | tl, |
553 | 0 | metrics, |
554 | 0 | } |
555 | 0 | } |
556 | | |
557 | 0 | pub fn with_derived_source(mut self, w: &DerivedWorkload) -> Self { |
558 | 0 | self.tl = self.tl.with_derived_source(Some(w)); |
559 | 0 | self.src.1 = w.workload_name.clone().map(RichStrng::from); |
560 | 0 | self |
561 | 0 | } |
562 | | |
563 | 0 | pub fn with_derived_destination(mut self, w: &DerivedWorkload) -> Self { |
564 | 0 | self.tl = self.tl.with_derived_destination(Some(w)); |
565 | 0 | self.dst.1 = w.workload_name.clone().map(RichStrng::from); |
566 | 0 | self |
567 | 0 | } |
568 | | |
569 | 0 | pub fn build(self) -> ConnectionResult { |
570 | | // Grab the metrics with our labels now, so we don't need to fetch them each time. |
571 | | // The inner metric is an Arc so clone is fine/cheap. |
572 | | // With the raw Counter, we increment is a simple atomic add operation (~1ns). |
573 | | // Fetching the metric itself is ~300ns; fast, but we call it on each read/write so it would |
574 | | // add up. |
575 | 0 | let sent_metric = self.metrics.sent_bytes.get_or_create(&self.tl).clone(); |
576 | 0 | let recv_metric = self.metrics.received_bytes.get_or_create(&self.tl).clone(); |
577 | 0 | let sent = atomic::AtomicU64::new(0); |
578 | 0 | let recv = atomic::AtomicU64::new(0); |
579 | | |
580 | 0 | let mtls = self.tl.connection_security_policy == SecurityPolicy::mutual_tls; |
581 | 0 | event!( |
582 | | target: "access", |
583 | | parent: None, |
584 | 0 | tracing::Level::DEBUG, |
585 | | |
586 | | src.addr = %self.src.0, |
587 | 0 | src.workload = self.src.1.as_deref().map(to_value), |
588 | 0 | src.namespace = self.tl.source_workload_namespace.to_value(), |
589 | 0 | src.identity = self.tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), |
590 | | |
591 | | dst.addr = %self.dst.0, |
592 | 0 | dst.hbone_addr = self.hbone_target.as_ref().map(display), |
593 | 0 | dst.service = self.tl.destination_service.to_value(), |
594 | 0 | dst.workload = self.dst.1.as_deref().map(to_value), |
595 | 0 | dst.namespace = self.tl.destination_workload_namespace.to_value(), |
596 | 0 | dst.identity = self.tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned), |
597 | | |
598 | 0 | direction = if self.tl.reporter == Reporter::source { |
599 | 0 | "outbound" |
600 | | } else { |
601 | 0 | "inbound" |
602 | | }, |
603 | | |
604 | 0 | "connection opened" |
605 | | ); |
606 | | |
607 | 0 | self.metrics.connection_opens.get_or_create(&self.tl).inc(); |
608 | 0 | ConnectionResult { |
609 | 0 | src: self.src, |
610 | 0 | dst: self.dst, |
611 | 0 | hbone_target: self.hbone_target, |
612 | 0 | start: self.start, |
613 | 0 | tl: self.tl, |
614 | 0 | metrics: self.metrics, |
615 | 0 | sent, |
616 | 0 | sent_metric, |
617 | 0 | recv, |
618 | 0 | recv_metric, |
619 | 0 | recorded: false, |
620 | 0 | } |
621 | 0 | } |
622 | | } |
623 | | |
624 | | impl ConnectionResult { |
625 | 0 | pub fn increment_send(&self, res: u64) { |
626 | 0 | self.sent.inc_by(res); |
627 | 0 | self.sent_metric.inc_by(res); |
628 | 0 | } |
629 | | |
630 | 0 | pub fn increment_recv(&self, res: u64) { |
631 | 0 | self.recv.inc_by(res); |
632 | 0 | self.recv_metric.inc_by(res); |
633 | 0 | } |
634 | | |
635 | | // Record our final result, with more details as a response flag. |
636 | 0 | pub fn record_with_flag<E: std::error::Error + 'static>( |
637 | 0 | mut self, |
638 | 0 | res: Result<(), E>, |
639 | 0 | flag: ResponseFlags, |
640 | 0 | ) { |
641 | 0 | self.tl.response_flags = flag; |
642 | 0 | self.record(res) |
643 | 0 | } |
644 | | |
645 | | // Record our final result. |
646 | 0 | pub fn record<E: std::error::Error + 'static>(mut self, res: Result<(), E>) { |
647 | | // If no specific flag was set and we have an error, try to infer the failure reason |
648 | 0 | if self.tl.response_flags == ResponseFlags::None |
649 | 0 | && let Err(ref err) = res |
650 | 0 | { |
651 | 0 | self.tl.response_flags = Self::extract_failure_reason(err); |
652 | 0 | } |
653 | 0 | self.record_internal(res) |
654 | 0 | } |
655 | | |
656 | | // Extract failure reason from error type using downcasting |
657 | 0 | fn extract_failure_reason<E: std::error::Error + 'static>(err: &E) -> ResponseFlags { |
658 | | use std::any::Any; |
659 | | |
660 | | // Try to downcast the error itself to proxy::Error |
661 | 0 | if let Some(proxy_err) = (err as &dyn Any).downcast_ref::<proxy::Error>() { |
662 | 0 | return match proxy_err { |
663 | 0 | proxy::Error::Tls(_) => ResponseFlags::TlsFailure, |
664 | | proxy::Error::Http2Handshake(_) | proxy::Error::H2(_) => { |
665 | 0 | ResponseFlags::Http2HandshakeFailure |
666 | | } |
667 | 0 | proxy::Error::MaybeHBONENetworkPolicyError(_) => ResponseFlags::NetworkPolicyError, |
668 | 0 | proxy::Error::Identity(_) => ResponseFlags::IdentityError, |
669 | | proxy::Error::AuthorizationPolicyRejection(_) |
670 | | | proxy::Error::AuthorizationPolicyLateRejection => { |
671 | 0 | ResponseFlags::AuthorizationPolicyDenied |
672 | | } |
673 | 0 | proxy::Error::ConnectionFailed(_) => ResponseFlags::ConnectionFailure, |
674 | 0 | _ => ResponseFlags::ConnectionFailure, |
675 | | }; |
676 | 0 | } |
677 | | |
678 | | // Default to generic connection failure if we can't identify the error type |
679 | 0 | ResponseFlags::ConnectionFailure |
680 | 0 | } |
681 | | |
682 | | // Internal-only function that takes `&mut` to facilitate Drop. Public consumers must use consuming functions. |
683 | 0 | fn record_internal<E: std::error::Error + 'static>(&mut self, res: Result<(), E>) { |
684 | 0 | debug_assert!(!self.recorded, "record called multiple times"); |
685 | 0 | if self.recorded { |
686 | 0 | return; |
687 | 0 | } |
688 | 0 | self.recorded = true; |
689 | 0 | let tl = &self.tl; |
690 | | |
691 | | // Unconditionally record the connection was closed |
692 | 0 | self.metrics.connection_close.get_or_create(tl).inc(); |
693 | | |
694 | 0 | if matches!( |
695 | 0 | tl.response_flags, |
696 | | ResponseFlags::ConnectionFailure |
697 | | | ResponseFlags::AuthorizationPolicyDenied |
698 | | | ResponseFlags::TlsFailure |
699 | | | ResponseFlags::Http2HandshakeFailure |
700 | | | ResponseFlags::NetworkPolicyError |
701 | | | ResponseFlags::IdentityError |
702 | 0 | ) { |
703 | 0 | self.metrics.connection_failures.get_or_create(tl).inc(); |
704 | 0 | } |
705 | | |
706 | | // Unconditionally write out an access log |
707 | 0 | let mtls = tl.connection_security_policy == SecurityPolicy::mutual_tls; |
708 | 0 | let bytes = ( |
709 | 0 | self.recv.load(Ordering::SeqCst), |
710 | 0 | self.sent.load(Ordering::SeqCst), |
711 | 0 | ); |
712 | 0 | let dur = format!("{}ms", self.start.elapsed().as_millis()); |
713 | | |
714 | | // We use our own macro to allow setting the level dynamically |
715 | 0 | access_log!( |
716 | 0 | res, |
717 | | |
718 | | src.addr = %self.src.0, |
719 | 0 | src.workload = self.src.1.as_deref().map(to_value), |
720 | 0 | src.namespace = tl.source_workload_namespace.to_value(), |
721 | 0 | src.identity = tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), |
722 | | |
723 | | dst.addr = %self.dst.0, |
724 | 0 | dst.hbone_addr = self.hbone_target.as_ref().map(display), |
725 | 0 | dst.service = tl.destination_service.to_value(), |
726 | 0 | dst.workload = self.dst.1.as_deref().map(to_value), |
727 | 0 | dst.namespace = tl.destination_workload_namespace.to_value(), |
728 | 0 | dst.identity = tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned), |
729 | | |
730 | 0 | direction = if tl.reporter == Reporter::source { |
731 | 0 | "outbound" |
732 | | } else { |
733 | 0 | "inbound" |
734 | | }, |
735 | | |
736 | | // Istio flips the metric for source: https://github.com/istio/istio/issues/32399 |
737 | | // Unflip for logs |
738 | 0 | bytes_sent = if tl.reporter == Reporter::source {bytes.0} else {bytes.1}, |
739 | 0 | bytes_recv = if tl.reporter == Reporter::source {bytes.1} else {bytes.0}, |
740 | | duration = dur, |
741 | | ); |
742 | 0 | } |
743 | | } |
744 | | |
745 | | impl Drop for ConnectionResult { |
746 | 0 | fn drop(&mut self) { |
747 | 0 | if !self.recorded { |
748 | 0 | self.record_internal(Err(proxy::Error::ClosedFromDrain)) |
749 | 0 | } |
750 | 0 | } |
751 | | } |
752 | | |
753 | 0 | fn to_value_owned<T: ToString>(t: T) -> impl Value { |
754 | 0 | t.to_string() |
755 | 0 | } |
756 | | |
757 | 0 | fn to_value<T: AsRef<str>>(t: &T) -> impl Value + '_ { |
758 | 0 | let v: &str = t.as_ref(); |
759 | 0 | v |
760 | 0 | } |