Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/proxy/metrics.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::fmt::Write;
16
use std::net::SocketAddr;
17
use std::sync::atomic::{AtomicU64, Ordering};
18
use std::sync::{Arc, atomic};
19
use std::time::Instant;
20
21
use prometheus_client::encoding::{
22
    EncodeLabelSet, EncodeLabelValue, LabelSetEncoder, LabelValueEncoder,
23
};
24
use prometheus_client::metrics::counter::{Atomic, Counter};
25
use prometheus_client::metrics::family::Family;
26
use prometheus_client::metrics::gauge::Gauge;
27
use prometheus_client::registry::Registry;
28
29
use tracing::event;
30
use tracing_core::field::Value;
31
32
use crate::identity::Identity;
33
use crate::metrics::DefaultedUnknown;
34
use crate::proxy::{self, HboneAddress};
35
36
use crate::state::service::ServiceDescription;
37
use crate::state::workload::Workload;
38
use crate::strng::{RichStrng, Strng};
39
40
/// Prometheus metric families recorded by the ztunnel proxy.
///
/// Each field is a `Family` keyed by a label set. All families are created and registered
/// with the Prometheus registry in `Metrics::new`.
#[derive(Debug)]
pub struct Metrics {
    /// Registered as `tcp_connections_opened`; incremented when a `ConnectionResult` is built.
    pub connection_opens: Family<CommonTrafficLabels, Counter>,
    /// Registered as `tcp_connections_closed`; incremented once per recorded connection result.
    pub connection_close: Family<CommonTrafficLabels, Counter>,
    /// Registered as `tcp_connections_failed`; incremented when a connection is recorded with a
    /// failure `ResponseFlags` value.
    pub connection_failures: Family<CommonTrafficLabels, Counter>,
    /// Registered as `tcp_sockets_open`; a gauge raised by `record_socket_open` and lowered by
    /// `record_socket_close`.
    pub open_sockets: Family<SocketLabels, Gauge>,
    /// Registered as `tcp_received_bytes`.
    pub received_bytes: Family<CommonTrafficLabels, Counter>,
    /// Registered as `tcp_sent_bytes`.
    pub sent_bytes: Family<CommonTrafficLabels, Counter>,

    // on-demand DNS is not a part of DNS proxy, but part of ztunnel proxy itself
    /// Registered as `on_demand_dns`.
    pub on_demand_dns: Family<OnDemandDnsLabels, Counter>,
}
52
53
/// Which side of a connection reports a metric or access log.
///
/// Access logs render `source` as `direction = "outbound"` and anything else as `"inbound"`.
/// Variant names are lowercase, presumably so the derived `EncodeLabelValue` emits them verbatim
/// as Prometheus label values (TODO confirm).
#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum Reporter {
    #[default]
    source,
    #[allow(dead_code)]
    destination,
}
60
61
/// Value of the `request_protocol` label.
///
/// `CommonTrafficLabels::from(ConnectionOpen)` always sets `tcp`.
#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum RequestProtocol {
    #[default]
    tcp,
    #[allow(dead_code)]
    http,
}
68
69
/// Reason a connection ended, exported as the `response_flags` label.
///
/// `None` (encoded as `-`) means no failure was recorded. `ConnectionResult::record_internal`
/// counts every other variant toward `tcp_connections_failed`.
#[derive(Default, Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum ResponseFlags {
    #[default]
    None,
    // connection denied due to policy
    AuthorizationPolicyDenied,
    // connection denied because we could not establish an upstream connection
    ConnectionFailure,
    // TLS handshake failure
    TlsFailure,
    // HTTP/2 handshake failure
    Http2HandshakeFailure,
    // Network policy blocking connection
    NetworkPolicyError,
    // Identity/certificate error
    IdentityError,
}
86
87
impl EncodeLabelValue for ResponseFlags {
88
0
    fn encode(&self, writer: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> {
89
0
        match self {
90
0
            ResponseFlags::None => writer.write_str("-"),
91
0
            ResponseFlags::AuthorizationPolicyDenied => writer.write_str("DENY"),
92
0
            ResponseFlags::ConnectionFailure => writer.write_str("CONNECT"),
93
0
            ResponseFlags::TlsFailure => writer.write_str("TLS_FAILURE"),
94
0
            ResponseFlags::Http2HandshakeFailure => writer.write_str("H2_HANDSHAKE_FAILURE"),
95
0
            ResponseFlags::NetworkPolicyError => writer.write_str("NETWORK_POLICY"),
96
0
            ResponseFlags::IdentityError => writer.write_str("IDENTITY_ERROR"),
97
        }
98
0
    }
99
}
100
101
/// Value of the `connection_security_policy` label.
///
/// Access logs only include the src/dst identity fields when this is `mutual_tls`.
#[derive(Default, Copy, Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum SecurityPolicy {
    #[default]
    unknown,
    mutual_tls,
}
107
108
/// Workload metadata for a peer, used by `with_derived_source` / `with_derived_destination`.
///
/// Every field is optional. When a derived workload is applied, each `None` field is converted
/// into the corresponding label as-is (presumably yielding the "unknown" label value).
#[derive(Clone, Debug, Default)]
pub struct DerivedWorkload {
    // Used for `*_workload` and `*_app` labels and as the access-log workload name.
    pub workload_name: Option<Strng>,
    // Used for the `*_canonical_service` label.
    pub app: Option<Strng>,
    // Used for the `*_canonical_revision` and `*_version` labels.
    pub revision: Option<Strng>,
    pub namespace: Option<Strng>,
    // Identity from the TLS handshake; used for the `*_principal` label.
    pub identity: Option<Identity>,
    pub cluster_id: Option<Strng>,
    pub region: Option<Strng>,
    pub zone: Option<Strng>,
}
119
120
/// Everything known about a connection when it is opened.
///
/// Converted into `CommonTrafficLabels` via `From`, and consumed by
/// `ConnectionResultBuilder::new`.
#[derive(Clone)]
pub struct ConnectionOpen {
    pub reporter: Reporter,
    // Source workload as known to this proxy, if any.
    pub source: Option<Arc<Workload>>,
    // Source details derived from the peer; also provides the source principal.
    pub derived_source: Option<DerivedWorkload>,
    pub destination: Option<Arc<Workload>>,
    pub destination_service: Option<ServiceDescription>,
    pub connection_security_policy: SecurityPolicy,
}
129
130
impl CommonTrafficLabels {
131
0
    fn new() -> Self {
132
0
        Default::default()
133
0
    }
134
135
0
    fn with_source(mut self, w: Option<&Workload>) -> Self {
136
0
        let Some(w) = w else { return self };
137
0
        self.source_workload = w.workload_name.clone().into();
138
0
        self.source_canonical_service = w.canonical_name.clone().into();
139
0
        self.source_canonical_revision = w.canonical_revision.clone().into();
140
0
        self.source_workload_namespace = w.namespace.clone().into();
141
        // We explicitly do not set source_principal here. This is set only with with_derived_source
142
        // based on the real mTLS identity.
143
0
        self.source_app = w.canonical_name.clone().into();
144
0
        self.source_version = w.canonical_revision.clone().into();
145
0
        self.source_cluster = w.cluster_id.to_string().into();
146
147
0
        let mut local = self.locality.0.unwrap_or_default();
148
0
        local.source_region = w.locality.region.clone().into();
149
0
        local.source_zone = w.locality.zone.clone().into();
150
0
        self.locality = OptionallyEncode(Some(local));
151
152
0
        self
153
0
    }
154
155
0
    fn with_derived_source(mut self, w: Option<&DerivedWorkload>) -> Self {
156
0
        let Some(w) = w else { return self };
157
0
        self.source_workload = w.workload_name.clone().into();
158
0
        self.source_canonical_service = w.app.clone().into();
159
0
        self.source_canonical_revision = w.revision.clone().into();
160
0
        self.source_workload_namespace = w.namespace.clone().into();
161
0
        self.source_app = w.workload_name.clone().into();
162
0
        self.source_version = w.revision.clone().into();
163
0
        self.source_cluster = w.cluster_id.clone().into();
164
        // This is the identity from the TLS handshake; this is the most trustworthy source so use it
165
0
        self.source_principal = w.identity.clone().into();
166
167
0
        let mut local = self.locality.0.unwrap_or_default();
168
0
        local.source_region = w.region.clone().into();
169
0
        local.source_zone = w.zone.clone().into();
170
0
        self.locality = OptionallyEncode(Some(local));
171
172
0
        self
173
0
    }
174
175
0
    fn with_destination(mut self, w: Option<&Workload>) -> Self {
176
0
        let Some(w) = w else { return self };
177
0
        self.destination_workload = w.workload_name.clone().into();
178
0
        self.destination_canonical_service = w.canonical_name.clone().into();
179
0
        self.destination_canonical_revision = w.canonical_revision.clone().into();
180
0
        self.destination_workload_namespace = w.namespace.clone().into();
181
0
        self.destination_principal = w.identity().into();
182
0
        self.destination_app = w.canonical_name.clone().into();
183
0
        self.destination_version = w.canonical_revision.clone().into();
184
0
        self.destination_cluster = w.cluster_id.to_string().into();
185
186
0
        let mut local = self.locality.0.unwrap_or_default();
187
0
        local.destination_region = w.locality.region.clone().into();
188
0
        local.destination_zone = w.locality.zone.clone().into();
189
0
        self.locality = OptionallyEncode(Some(local));
190
191
0
        self
192
0
    }
193
194
0
    fn with_destination_service(mut self, w: Option<&ServiceDescription>) -> Self {
195
0
        let Some(w) = w else { return self };
196
0
        self.destination_service = w.hostname.clone().into();
197
0
        self.destination_service_name = w.name.clone().into();
198
0
        self.destination_service_namespace = w.namespace.clone().into();
199
0
        self
200
0
    }
201
202
0
    fn with_derived_destination(mut self, w: Option<&DerivedWorkload>) -> Self {
203
0
        let Some(w) = w else { return self };
204
0
        self.destination_workload = w.workload_name.clone().into();
205
0
        self.destination_canonical_service = w.app.clone().into();
206
0
        self.destination_canonical_revision = w.revision.clone().into();
207
0
        self.destination_workload_namespace = w.namespace.clone().into();
208
0
        self.destination_app = w.workload_name.clone().into();
209
0
        self.destination_version = w.revision.clone().into();
210
0
        self.destination_cluster = w.cluster_id.clone().into();
211
        // This is the identity from the TLS handshake; this is the most trustworthy source so use it
212
0
        self.destination_principal = w.identity.clone().into();
213
214
0
        let mut local = self.locality.0.unwrap_or_default();
215
0
        local.destination_region = w.region.clone().into();
216
0
        local.destination_zone = w.zone.clone().into();
217
0
        self.locality = OptionallyEncode(Some(local));
218
219
0
        self
220
0
    }
221
}
222
223
impl From<ConnectionOpen> for CommonTrafficLabels {
224
0
    fn from(c: ConnectionOpen) -> Self {
225
0
        CommonTrafficLabels {
226
0
            reporter: c.reporter,
227
0
            request_protocol: RequestProtocol::tcp,
228
0
            response_flags: ResponseFlags::None,
229
0
            connection_security_policy: c.connection_security_policy,
230
0
            ..CommonTrafficLabels::new()
231
0
                // Intentionally before with_source; source is more reliable
232
0
                .with_derived_source(c.derived_source.as_ref())
233
0
                .with_source(c.source.as_deref())
234
0
                .with_destination(c.destination.as_deref())
235
0
                .with_destination_service(c.destination_service.as_ref())
236
0
        }
237
0
    }
238
}
239
240
/// Minimal label set for the `tcp_sockets_open` gauge.
///
/// Only the reporter is tracked (no direction or workload labels), which keeps the per-socket
/// guard (`SocketCloseGuard`) small.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct SocketLabels {
    pub reporter: Reporter,
}
245
246
/// Label set shared by the per-connection TCP metrics (opened/closed/failed/bytes).
///
/// The label names appear to mirror Istio's standard TCP metric labels. Field order here is the
/// order used by the derived `EncodeLabelSet`, so do not reorder casually.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct CommonTrafficLabels {
    reporter: Reporter,

    source_workload: DefaultedUnknown<RichStrng>,
    source_canonical_service: DefaultedUnknown<RichStrng>,
    source_canonical_revision: DefaultedUnknown<RichStrng>,
    source_workload_namespace: DefaultedUnknown<RichStrng>,
    // Only set from the derived (mTLS) source identity.
    source_principal: DefaultedUnknown<Identity>,
    source_app: DefaultedUnknown<RichStrng>,
    source_version: DefaultedUnknown<RichStrng>,
    source_cluster: DefaultedUnknown<RichStrng>,

    destination_service: DefaultedUnknown<RichStrng>,
    destination_service_namespace: DefaultedUnknown<RichStrng>,
    destination_service_name: DefaultedUnknown<RichStrng>,

    destination_workload: DefaultedUnknown<RichStrng>,
    destination_canonical_service: DefaultedUnknown<RichStrng>,
    destination_canonical_revision: DefaultedUnknown<RichStrng>,
    destination_workload_namespace: DefaultedUnknown<RichStrng>,
    destination_principal: DefaultedUnknown<Identity>,
    destination_app: DefaultedUnknown<RichStrng>,
    destination_version: DefaultedUnknown<RichStrng>,
    destination_cluster: DefaultedUnknown<RichStrng>,

    request_protocol: RequestProtocol,
    response_flags: ResponseFlags,
    connection_security_policy: SecurityPolicy,

    // Flattened into this label set; when `None`, the locality labels are omitted entirely.
    #[prometheus(flatten)]
    locality: OptionallyEncode<LocalityLabels>,
}
279
280
/// OptionallyEncode is a wrapper that will optionally encode the entire label set.
281
/// This differs from something like DefaultedUnknown which handles only the value - this makes the
282
/// entire label not show up.
283
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq)]
284
struct OptionallyEncode<T>(Option<T>);
285
impl<T: EncodeLabelSet> EncodeLabelSet for OptionallyEncode<T> {
286
0
    fn encode(&self, encoder: &mut LabelSetEncoder) -> Result<(), std::fmt::Error> {
287
0
        match &self.0 {
288
0
            None => Ok(()),
289
0
            Some(ll) => ll.encode(encoder),
290
        }
291
0
    }
292
}
293
/// Region/zone labels for both ends of a connection.
///
/// Stored in `CommonTrafficLabels` behind `OptionallyEncode`. The whole set is omitted until a
/// `with_*` builder populates either half.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
struct LocalityLabels {
    source_region: DefaultedUnknown<RichStrng>,
    source_zone: DefaultedUnknown<RichStrng>,
    destination_region: DefaultedUnknown<RichStrng>,
    destination_zone: DefaultedUnknown<RichStrng>,
}
300
301
/// Label set for the `on_demand_dns` counter: the requesting workload plus the hostname that
/// was resolved on demand.
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq, EncodeLabelSet)]
pub struct OnDemandDnsLabels {
    // on-demand DNS client information is just nice-to-have
    source_workload: DefaultedUnknown<RichStrng>,
    source_canonical_service: DefaultedUnknown<RichStrng>,
    source_canonical_revision: DefaultedUnknown<RichStrng>,
    source_workload_namespace: DefaultedUnknown<RichStrng>,
    source_principal: DefaultedUnknown<Identity>,
    source_app: DefaultedUnknown<RichStrng>,
    source_version: DefaultedUnknown<RichStrng>,
    source_cluster: DefaultedUnknown<RichStrng>,

    // on-demand DNS is resolved per hostname, so this is the most interesting part
    hostname: DefaultedUnknown<RichStrng>,
}
316
317
impl OnDemandDnsLabels {
318
0
    pub fn new() -> Self {
319
0
        Default::default()
320
0
    }
321
322
0
    pub fn with_source(mut self, w: &Workload) -> Self {
323
0
        self.source_workload = w.workload_name.clone().into();
324
0
        self.source_canonical_service = w.canonical_name.clone().into();
325
0
        self.source_canonical_revision = w.canonical_revision.clone().into();
326
0
        self.source_workload_namespace = w.namespace.clone().into();
327
0
        self.source_principal = w.identity().into();
328
0
        self.source_app = w.canonical_name.clone().into();
329
0
        self.source_version = w.canonical_revision.clone().into();
330
0
        self.source_cluster = w.cluster_id.to_string().into();
331
0
        self
332
0
    }
333
334
0
    pub fn with_destination(mut self, w: &Workload) -> Self {
335
0
        self.hostname = w.hostname.clone().into();
336
0
        self
337
0
    }
338
}
339
340
impl Metrics {
341
0
    pub fn new(registry: &mut Registry) -> Self {
342
0
        let connection_opens = Family::default();
343
0
        registry.register(
344
            "tcp_connections_opened",
345
            "The total number of TCP connections opened",
346
0
            connection_opens.clone(),
347
        );
348
0
        let connection_close = Family::default();
349
0
        registry.register(
350
            "tcp_connections_closed",
351
            "The total number of TCP connections closed",
352
0
            connection_close.clone(),
353
        );
354
355
0
        let received_bytes = Family::default();
356
0
        registry.register(
357
            "tcp_received_bytes",
358
            "The size of total bytes received during request in case of a TCP connection",
359
0
            received_bytes.clone(),
360
        );
361
0
        let sent_bytes = Family::default();
362
0
        registry.register(
363
            "tcp_sent_bytes",
364
            "The size of total bytes sent during response in case of a TCP connection",
365
0
            sent_bytes.clone(),
366
        );
367
0
        let on_demand_dns = Family::default();
368
0
        registry.register(
369
            "on_demand_dns",
370
            "The total number of requests that used on-demand DNS (unstable)",
371
0
            on_demand_dns.clone(),
372
        );
373
374
0
        let connection_failures = Family::default();
375
0
        registry.register(
376
            "tcp_connections_failed",
377
            "The total number of TCP connections that failed to establish (unstable)",
378
0
            connection_failures.clone(),
379
        );
380
381
0
        let open_sockets = Family::default();
382
0
        registry.register(
383
            "tcp_sockets_open",
384
            "The current number of open TCP sockets (unstable)",
385
0
            open_sockets.clone(),
386
        );
387
388
0
        Self {
389
0
            connection_opens,
390
0
            connection_close,
391
0
            received_bytes,
392
0
            sent_bytes,
393
0
            on_demand_dns,
394
0
            connection_failures,
395
0
            open_sockets,
396
0
        }
397
0
    }
398
399
0
    pub fn record_socket_open(&self, labels: &SocketLabels) {
400
0
        self.open_sockets.get_or_create(labels).inc();
401
0
    }
402
403
0
    pub fn record_socket_close(&self, labels: &SocketLabels) {
404
0
        self.open_sockets.get_or_create(labels).dec();
405
0
    }
406
}
407
408
/// Guard to ensure socket close is recorded even if task is cancelled
409
/// This should be created at the start of an async block that handles a socket
410
/// Stores only the minimal information needed to reconstruct labels, avoiding
411
/// cloning the large CommonTrafficLabels struct
412
pub struct SocketCloseGuard {
413
    metrics: Arc<Metrics>,
414
    reporter: Reporter,
415
}
416
417
impl Drop for SocketCloseGuard {
418
0
    fn drop(&mut self) {
419
0
        let labels = SocketLabels {
420
0
            reporter: self.reporter,
421
0
        };
422
0
        self.metrics.record_socket_close(&labels);
423
0
    }
424
}
425
426
impl SocketCloseGuard {
427
    /// Create a new socket close guard
428
0
    pub fn new(metrics: Arc<Metrics>, reporter: Reporter) -> Self {
429
0
        Self { metrics, reporter }
430
0
    }
431
}
432
433
#[derive(Debug)]
/// ConnectionResult abstracts recording a metric and emitting an access log upon a connection completion.
///
/// Created by `ConnectionResultBuilder::build`. If it is dropped without `record` /
/// `record_with_flag` having been called, `Drop` records the result as
/// `proxy::Error::ClosedFromDrain`.
pub struct ConnectionResult {
    // Src address and name
    pub(crate) src: (SocketAddr, Option<RichStrng>),
    // Dst address and name
    pub(crate) dst: (SocketAddr, Option<RichStrng>),
    // Inner HBONE target address, when the connection is HBONE
    pub(crate) hbone_target: Option<HboneAddress>,
    // Used to compute the `duration` access-log field
    pub(crate) start: Instant,

    // TODO: storing CommonTrafficLabels adds ~600 bytes retained throughout a connection life time.
    // We can pre-fetch the metrics we need at initialization instead of storing this, then keep a more
    // efficient representation for the fields we need to log. Ideally, this would even be optional
    // in case logs were disabled.
    pub(crate) tl: CommonTrafficLabels,
    pub(crate) metrics: Arc<Metrics>,

    // sent records the number of bytes sent on this connection
    pub(crate) sent: AtomicU64,
    // sent_metric records the number of bytes sent on this connection to the aggregated metric counter
    pub(crate) sent_metric: Counter,
    // recv records the number of bytes received on this connection
    pub(crate) recv: AtomicU64,
    // recv_metric records the number of bytes received on this connection to the aggregated metric counter
    pub(crate) recv_metric: Counter,
    // Have we recorded yet? Prevents double-recording (e.g. explicit record followed by Drop).
    pub(crate) recorded: bool,
}
461
462
// log_early_deny allows logging a connection is denied before we have enough information to emit proper
463
// access logs/metrics
464
0
pub fn log_early_deny<E: std::error::Error>(
465
0
    src: SocketAddr,
466
0
    dst: SocketAddr,
467
0
    reporter: Reporter,
468
0
    err: E,
469
0
) {
470
0
    event!(
471
            target: "access",
472
            parent: None,
473
0
            tracing::Level::WARN,
474
475
            src.addr = %src,
476
            dst.addr = %dst,
477
478
0
            direction = if reporter == Reporter::source {
479
0
                "outbound"
480
            } else {
481
0
                "inbound"
482
            },
483
484
0
            error = format!("{err}"),
485
486
0
            "connection failed"
487
    );
488
0
}
Unexecuted instantiation: ztunnel::proxy::metrics::log_early_deny::<ztunnel::proxy::Error>
Unexecuted instantiation: ztunnel::proxy::metrics::log_early_deny::<ztunnel::tls::lib::TlsError>
489
490
// Emits the final "connection complete" event on the `access` target, at a level picked from
// the result: `Ok(_)` logs at INFO, `Err(_)` logs at ERROR with an extra `error` field holding
// the error's `to_string()`.
//
// `$($fields)*` is forwarded verbatim as `event!` fields, followed directly by the message
// literal, so the caller's field list must end with a trailing comma.
// `$res` is expanded twice (once via `as_ref()`, once in the `match`), so pass a plain binding
// rather than an expression with side effects.
// A macro is used (rather than a function) so the level can vary per result.
macro_rules! access_log {
    ($res:expr, $($fields:tt)*) => {
        let err = $res.as_ref().err().map(|e| e.to_string());
        match $res {
            Ok(_) => {
                event!(
                    target: "access",
                    parent: None,
                    tracing::Level::INFO,
                    $($fields)*
                    "connection complete"
                );
            }
            Err(_) => {
                event!(
                    target: "access",
                    parent: None,
                    tracing::Level::ERROR,
                    $($fields)*
                    error = err,
                    "connection complete"
                );
            }
        }
    };
}
516
517
/// Collects connection details before a `ConnectionResult` is built.
///
/// Derived source/destination information can still be applied before `build`, which logs
/// "connection opened" and increments `tcp_connections_opened`.
pub struct ConnectionResultBuilder {
    // Source address and display name for access logs
    src: (SocketAddr, Option<RichStrng>),
    // Destination address and display name for access logs
    dst: (SocketAddr, Option<RichStrng>),
    // Inner HBONE target address, if any
    hbone_target: Option<HboneAddress>,
    start: Instant,
    tl: CommonTrafficLabels,
    metrics: Arc<Metrics>,
}
525
526
impl ConnectionResultBuilder {
527
0
    pub fn new(
528
0
        src: SocketAddr,
529
0
        dst: SocketAddr,
530
0
        // If using hbone, the inner HBONE address
531
0
        // That is, dst is the L4 address, while is the :authority.
532
0
        hbone_target: Option<HboneAddress>,
533
0
        start: Instant,
534
0
        conn: ConnectionOpen,
535
0
        metrics: Arc<Metrics>,
536
0
    ) -> Self {
537
        // for src and dest, try to get pod name but fall back to "canonical service"
538
0
        let mut src = (src, conn.source.as_ref().map(|wl| wl.name.clone().into()));
539
0
        let mut dst = (
540
0
            dst,
541
0
            conn.destination.as_ref().map(|wl| wl.name.clone().into()),
542
        );
543
0
        let tl = CommonTrafficLabels::from(conn);
544
545
0
        src.1 = src.1.or(tl.source_canonical_service.clone().inner());
546
0
        dst.1 = dst.1.or(tl.destination_canonical_service.clone().inner());
547
0
        Self {
548
0
            src,
549
0
            dst,
550
0
            hbone_target,
551
0
            start,
552
0
            tl,
553
0
            metrics,
554
0
        }
555
0
    }
556
557
0
    pub fn with_derived_source(mut self, w: &DerivedWorkload) -> Self {
558
0
        self.tl = self.tl.with_derived_source(Some(w));
559
0
        self.src.1 = w.workload_name.clone().map(RichStrng::from);
560
0
        self
561
0
    }
562
563
0
    pub fn with_derived_destination(mut self, w: &DerivedWorkload) -> Self {
564
0
        self.tl = self.tl.with_derived_destination(Some(w));
565
0
        self.dst.1 = w.workload_name.clone().map(RichStrng::from);
566
0
        self
567
0
    }
568
569
0
    pub fn build(self) -> ConnectionResult {
570
        // Grab the metrics with our labels now, so we don't need to fetch them each time.
571
        // The inner metric is an Arc so clone is fine/cheap.
572
        // With the raw Counter, we increment is a simple atomic add operation (~1ns).
573
        // Fetching the metric itself is ~300ns; fast, but we call it on each read/write so it would
574
        // add up.
575
0
        let sent_metric = self.metrics.sent_bytes.get_or_create(&self.tl).clone();
576
0
        let recv_metric = self.metrics.received_bytes.get_or_create(&self.tl).clone();
577
0
        let sent = atomic::AtomicU64::new(0);
578
0
        let recv = atomic::AtomicU64::new(0);
579
580
0
        let mtls = self.tl.connection_security_policy == SecurityPolicy::mutual_tls;
581
0
        event!(
582
            target: "access",
583
            parent: None,
584
0
            tracing::Level::DEBUG,
585
586
            src.addr = %self.src.0,
587
0
            src.workload = self.src.1.as_deref().map(to_value),
588
0
            src.namespace = self.tl.source_workload_namespace.to_value(),
589
0
            src.identity = self.tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned),
590
591
            dst.addr = %self.dst.0,
592
0
            dst.hbone_addr = self.hbone_target.as_ref().map(display),
593
0
            dst.service = self.tl.destination_service.to_value(),
594
0
            dst.workload = self.dst.1.as_deref().map(to_value),
595
0
            dst.namespace = self.tl.destination_workload_namespace.to_value(),
596
0
            dst.identity = self.tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned),
597
598
0
            direction = if self.tl.reporter == Reporter::source {
599
0
                "outbound"
600
            } else {
601
0
                "inbound"
602
            },
603
604
0
            "connection opened"
605
        );
606
607
0
        self.metrics.connection_opens.get_or_create(&self.tl).inc();
608
0
        ConnectionResult {
609
0
            src: self.src,
610
0
            dst: self.dst,
611
0
            hbone_target: self.hbone_target,
612
0
            start: self.start,
613
0
            tl: self.tl,
614
0
            metrics: self.metrics,
615
0
            sent,
616
0
            sent_metric,
617
0
            recv,
618
0
            recv_metric,
619
0
            recorded: false,
620
0
        }
621
0
    }
622
}
623
624
impl ConnectionResult {
625
0
    pub fn increment_send(&self, res: u64) {
626
0
        self.sent.inc_by(res);
627
0
        self.sent_metric.inc_by(res);
628
0
    }
629
630
0
    pub fn increment_recv(&self, res: u64) {
631
0
        self.recv.inc_by(res);
632
0
        self.recv_metric.inc_by(res);
633
0
    }
634
635
    // Record our final result, with more details as a response flag.
636
0
    pub fn record_with_flag<E: std::error::Error + 'static>(
637
0
        mut self,
638
0
        res: Result<(), E>,
639
0
        flag: ResponseFlags,
640
0
    ) {
641
0
        self.tl.response_flags = flag;
642
0
        self.record(res)
643
0
    }
644
645
    // Record our final result.
646
0
    pub fn record<E: std::error::Error + 'static>(mut self, res: Result<(), E>) {
647
        // If no specific flag was set and we have an error, try to infer the failure reason
648
0
        if self.tl.response_flags == ResponseFlags::None
649
0
            && let Err(ref err) = res
650
0
        {
651
0
            self.tl.response_flags = Self::extract_failure_reason(err);
652
0
        }
653
0
        self.record_internal(res)
654
0
    }
655
656
    // Extract failure reason from error type using downcasting
657
0
    fn extract_failure_reason<E: std::error::Error + 'static>(err: &E) -> ResponseFlags {
658
        use std::any::Any;
659
660
        // Try to downcast the error itself to proxy::Error
661
0
        if let Some(proxy_err) = (err as &dyn Any).downcast_ref::<proxy::Error>() {
662
0
            return match proxy_err {
663
0
                proxy::Error::Tls(_) => ResponseFlags::TlsFailure,
664
                proxy::Error::Http2Handshake(_) | proxy::Error::H2(_) => {
665
0
                    ResponseFlags::Http2HandshakeFailure
666
                }
667
0
                proxy::Error::MaybeHBONENetworkPolicyError(_) => ResponseFlags::NetworkPolicyError,
668
0
                proxy::Error::Identity(_) => ResponseFlags::IdentityError,
669
                proxy::Error::AuthorizationPolicyRejection(_)
670
                | proxy::Error::AuthorizationPolicyLateRejection => {
671
0
                    ResponseFlags::AuthorizationPolicyDenied
672
                }
673
0
                proxy::Error::ConnectionFailed(_) => ResponseFlags::ConnectionFailure,
674
0
                _ => ResponseFlags::ConnectionFailure,
675
            };
676
0
        }
677
678
        // Default to generic connection failure if we can't identify the error type
679
0
        ResponseFlags::ConnectionFailure
680
0
    }
681
682
    // Internal-only function that takes `&mut` to facilitate Drop. Public consumers must use consuming functions.
683
0
    fn record_internal<E: std::error::Error + 'static>(&mut self, res: Result<(), E>) {
684
0
        debug_assert!(!self.recorded, "record called multiple times");
685
0
        if self.recorded {
686
0
            return;
687
0
        }
688
0
        self.recorded = true;
689
0
        let tl = &self.tl;
690
691
        // Unconditionally record the connection was closed
692
0
        self.metrics.connection_close.get_or_create(tl).inc();
693
694
0
        if matches!(
695
0
            tl.response_flags,
696
            ResponseFlags::ConnectionFailure
697
                | ResponseFlags::AuthorizationPolicyDenied
698
                | ResponseFlags::TlsFailure
699
                | ResponseFlags::Http2HandshakeFailure
700
                | ResponseFlags::NetworkPolicyError
701
                | ResponseFlags::IdentityError
702
0
        ) {
703
0
            self.metrics.connection_failures.get_or_create(tl).inc();
704
0
        }
705
706
        // Unconditionally write out an access log
707
0
        let mtls = tl.connection_security_policy == SecurityPolicy::mutual_tls;
708
0
        let bytes = (
709
0
            self.recv.load(Ordering::SeqCst),
710
0
            self.sent.load(Ordering::SeqCst),
711
0
        );
712
0
        let dur = format!("{}ms", self.start.elapsed().as_millis());
713
714
        // We use our own macro to allow setting the level dynamically
715
0
        access_log!(
716
0
            res,
717
718
            src.addr = %self.src.0,
719
0
            src.workload = self.src.1.as_deref().map(to_value),
720
0
            src.namespace = tl.source_workload_namespace.to_value(),
721
0
            src.identity = tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned),
722
723
            dst.addr = %self.dst.0,
724
0
            dst.hbone_addr = self.hbone_target.as_ref().map(display),
725
0
            dst.service = tl.destination_service.to_value(),
726
0
            dst.workload = self.dst.1.as_deref().map(to_value),
727
0
            dst.namespace = tl.destination_workload_namespace.to_value(),
728
0
            dst.identity = tl.destination_principal.as_ref().filter(|_| mtls).map(to_value_owned),
729
730
0
            direction = if tl.reporter == Reporter::source {
731
0
                "outbound"
732
            } else {
733
0
                "inbound"
734
            },
735
736
            // Istio flips the metric for source: https://github.com/istio/istio/issues/32399
737
            // Unflip for logs
738
0
            bytes_sent = if tl.reporter == Reporter::source {bytes.0} else {bytes.1},
739
0
            bytes_recv = if tl.reporter == Reporter::source {bytes.1} else {bytes.0},
740
            duration = dur,
741
        );
742
0
    }
743
}
744
745
impl Drop for ConnectionResult {
746
0
    fn drop(&mut self) {
747
0
        if !self.recorded {
748
0
            self.record_internal(Err(proxy::Error::ClosedFromDrain))
749
0
        }
750
0
    }
751
}
752
753
0
fn to_value_owned<T: ToString>(t: T) -> impl Value {
754
0
    t.to_string()
755
0
}
756
757
0
fn to_value<T: AsRef<str>>(t: &T) -> impl Value + '_ {
758
0
    let v: &str = t.as_ref();
759
0
    v
760
0
}