Coverage Report

Created: 2026-04-14 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ztunnel/src/telemetry.rs
Line
Count
Source
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use itertools::Itertools;
16
use std::fmt::Debug;
17
use std::str::FromStr;
18
use std::time::Instant;
19
use std::{env, fmt, io};
20
21
use once_cell::sync::Lazy;
22
use once_cell::sync::OnceCell;
23
use serde::Serializer;
24
use serde::ser::SerializeMap;
25
26
use thiserror::Error;
27
use tracing::{Event, Subscriber, field, info, warn};
28
use tracing_appender::non_blocking::NonBlocking;
29
use tracing_core::Field;
30
use tracing_core::field::Visit;
31
use tracing_core::span::Record;
32
use tracing_log::NormalizeEvent;
33
34
use tracing_subscriber::fmt::format::{JsonVisitor, Writer};
35
36
use tracing_subscriber::field::RecordFields;
37
use tracing_subscriber::fmt::time::{FormatTime, SystemTime};
38
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields};
39
use tracing_subscriber::registry::LookupSpan;
40
use tracing_subscriber::{Layer, Registry, filter, prelude::*, reload};
41
42
pub static APPLICATION_START_TIME: Lazy<Instant> = Lazy::new(Instant::now);
43
static LOG_HANDLE: OnceCell<LogHandle> = OnceCell::new();
44
45
0
pub fn setup_logging() -> tracing_appender::non_blocking::WorkerGuard {
46
0
    Lazy::force(&APPLICATION_START_TIME);
47
0
    let (non_blocking, _guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
48
0
        .lossy(false)
49
0
        .buffered_lines_limit(1000) // Buffer up to 1000 lines to avoid blocking on logs
50
0
        .finish(std::io::stdout());
51
0
    tracing_subscriber::registry()
52
0
        .with(fmt_layer(non_blocking))
53
0
        .init();
54
0
    _guard
55
0
}
56
57
0
fn json_fmt(writer: NonBlocking) -> Box<dyn Layer<Registry> + Send + Sync + 'static> {
58
0
    let format = tracing_subscriber::fmt::layer()
59
0
        .with_writer(writer)
60
0
        .event_format(IstioJsonFormat())
61
0
        .fmt_fields(IstioJsonFormat());
62
0
    Box::new(format)
63
0
}
64
65
0
fn plain_fmt(writer: NonBlocking) -> Box<dyn Layer<Registry> + Send + Sync + 'static> {
66
0
    let format = tracing_subscriber::fmt::layer()
67
0
        .with_writer(writer)
68
0
        .event_format(IstioFormat())
69
0
        .fmt_fields(IstioFormat());
70
0
    Box::new(format)
71
0
}
72
73
0
fn fmt_layer(writer: NonBlocking) -> Box<dyn Layer<Registry> + Send + Sync + 'static> {
74
0
    let format = if env::var("LOG_FORMAT").unwrap_or("plain".to_string()) == "json" {
75
0
        json_fmt(writer)
76
    } else {
77
0
        plain_fmt(writer)
78
    };
79
0
    let filter = default_filter();
80
0
    let (layer, reload) = reload::Layer::new(format.with_filter(filter));
81
0
    LOG_HANDLE
82
0
        .set(reload)
83
0
        .map_or_else(|_| warn!("setup log handler failed"), |_| {});
84
0
    Box::new(layer)
85
0
}
86
87
0
fn default_filter() -> filter::Targets {
88
    // Read from env var, but prefix with setting DNS logs to warn as they are noisy; they can be explicitly overriden
89
0
    let var: String = env::var("RUST_LOG")
90
0
        .map_err(|_| ())
91
0
        .map(|v| "hickory_server::server=off,".to_string() + v.as_str())
92
0
        .unwrap_or("hickory_server::server=off,info".to_string());
93
0
    filter::Targets::from_str(&var).expect("static filter should build")
94
0
}
95
96
// a handle to get and set the log level
97
type BoxLayer = Box<dyn Layer<Registry> + Send + Sync + 'static>;
98
type FilteredLayer = filter::Filtered<BoxLayer, filter::Targets, Registry>;
99
type LogHandle = reload::Handle<FilteredLayer, Registry>;
100
101
/// set_level dynamically updates the logging level to *include* level. If `reset` is true, it will
102
/// reset the entire logging configuration first.
103
0
pub fn set_level(reset: bool, level: &str) -> Result<(), Error> {
104
0
    if let Some(handle) = LOG_HANDLE.get() {
105
        // new_directive will be current_directive + level
106
        //it can be duplicate, but the Target's parse() will properly handle it
107
0
        let new_directive = if let Ok(current) = handle.with_current(|f| f.filter().to_string()) {
108
0
            if reset {
109
0
                if level.is_empty() {
110
0
                    default_filter().to_string()
111
                } else {
112
0
                    format!("{},{}", default_filter(), level)
113
                }
114
            } else {
115
0
                format!("{current},{level}")
116
            }
117
        } else {
118
0
            level.to_string()
119
        };
120
121
        //create the new Targets based on the new directives
122
0
        let new_filter = filter::Targets::from_str(&new_directive)?;
123
0
        info!("new log filter is {new_filter}");
124
125
        //set the new filter
126
0
        Ok(handle.modify(|layer| {
127
0
            *layer.filter_mut() = new_filter;
128
0
        })?)
129
    } else {
130
0
        warn!("failed to get log handle");
131
0
        Err(Error::Uninitialized)
132
    }
133
0
}
134
135
0
pub fn get_current_loglevel() -> Result<String, Error> {
136
0
    if let Some(handle) = LOG_HANDLE.get() {
137
0
        Ok(handle.with_current(|f| f.filter().to_string())?)
138
    } else {
139
0
        Err(Error::Uninitialized)
140
    }
141
0
}
142
143
#[derive(Error, Debug)]
144
pub enum Error {
145
    #[error("parse failure: {0}")]
146
    InvalidFilter(#[from] filter::ParseError),
147
    #[error("reload failure: {0}")]
148
    Reload(#[from] reload::Error),
149
    #[error("logging is not initialized")]
150
    Uninitialized,
151
}
152
153
// IstioFormat encodes logs in the "standard" Istio JSON formatting used in the rest of the code
154
struct IstioJsonFormat();
155
156
// IstioFormat encodes logs in the "standard" Istio formatting used in the rest of the code
157
struct IstioFormat();
158
159
struct Visitor<'writer> {
160
    res: std::fmt::Result,
161
    is_empty: bool,
162
    writer: Writer<'writer>,
163
}
164
165
impl Visitor<'_> {
166
0
    fn write_padded(&mut self, value: &impl Debug) -> std::fmt::Result {
167
0
        let padding = if self.is_empty {
168
0
            self.is_empty = false;
169
0
            ""
170
        } else {
171
0
            " "
172
        };
173
0
        write!(self.writer, "{padding}{value:?}")
174
0
    }
175
}
176
177
impl field::Visit for Visitor<'_> {
178
0
    fn record_str(&mut self, field: &field::Field, value: &str) {
179
0
        if self.res.is_err() {
180
0
            return;
181
0
        }
182
183
0
        self.record_debug(field, &value)
184
0
    }
185
186
0
    fn record_debug(&mut self, field: &field::Field, val: &dyn std::fmt::Debug) {
187
0
        self.res = match field.name() {
188
            // Skip fields that are actually log metadata that have already been handled
189
0
            name if name.starts_with("log.") => Ok(()),
190
            // For the message, write out the message and a tab to separate the future fields
191
0
            "message" => write!(self.writer, "{val:?}\t"),
192
            // For the rest, k=v.
193
0
            _ => self.write_padded(&format_args!("{}={val:?}", field.name())),
194
        }
195
0
    }
196
}
197
198
impl<'writer> FormatFields<'writer> for IstioFormat {
199
0
    fn format_fields<R: tracing_subscriber::field::RecordFields>(
200
0
        &self,
201
0
        writer: Writer<'writer>,
202
0
        fields: R,
203
0
    ) -> std::fmt::Result {
204
0
        let mut visitor = Visitor {
205
0
            writer,
206
0
            res: Ok(()),
207
0
            is_empty: true,
208
0
        };
209
0
        fields.record(&mut visitor);
210
0
        visitor.res
211
0
    }
Unexecuted instantiation: <ztunnel::telemetry::IstioFormat as tracing_subscriber::fmt::format::FormatFields>::format_fields::<&tracing_core::span::Attributes>
Unexecuted instantiation: <ztunnel::telemetry::IstioFormat as tracing_subscriber::fmt::format::FormatFields>::format_fields::<&tracing_core::span::Record>
Unexecuted instantiation: <ztunnel::telemetry::IstioFormat as tracing_subscriber::fmt::format::FormatFields>::format_fields::<&tracing_core::event::Event>
212
}
213
214
impl<S, N> FormatEvent<S, N> for IstioFormat
215
where
216
    S: Subscriber + for<'a> LookupSpan<'a>,
217
    N: for<'a> FormatFields<'a> + 'static,
218
{
219
0
    fn format_event(
220
0
        &self,
221
0
        ctx: &FmtContext<'_, S, N>,
222
0
        mut writer: Writer<'_>,
223
0
        event: &Event<'_>,
224
0
    ) -> std::fmt::Result {
225
0
        let normalized_meta = event.normalized_metadata();
226
0
        SystemTime.format_time(&mut writer)?;
227
0
        let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
228
0
        write!(
229
0
            writer,
230
0
            "\t{}\t",
231
0
            meta.level().to_string().to_ascii_lowercase()
232
0
        )?;
233
234
0
        let target = meta.target();
235
        // No need to prefix everything
236
0
        let target = target.strip_prefix("ztunnel::").unwrap_or(target);
237
0
        write!(writer, "{target}")?;
238
239
        // Write out span fields. Istio logging outside of Rust doesn't really have this concept
240
0
        if let Some(scope) = ctx.event_scope() {
241
0
            for span in scope.from_root() {
242
0
                write!(writer, ":{}", span.metadata().name())?;
243
0
                let ext = span.extensions();
244
0
                if let Some(fields) = &ext.get::<FormattedFields<N>>()
245
0
                    && !fields.is_empty()
246
                {
247
0
                    write!(writer, "{{{fields}}}")?;
248
0
                }
249
            }
250
0
        };
251
        // Insert tab only if there is fields
252
0
        if event.fields().any(|_| true) {
253
0
            write!(writer, "\t")?;
254
0
        }
255
256
0
        ctx.format_fields(writer.by_ref(), event)?;
257
258
0
        writeln!(writer)
259
0
    }
260
}
261
262
struct JsonVisitory<S: SerializeMap> {
263
    serializer: S,
264
    state: Result<(), S::Error>,
265
}
266
267
impl<S: SerializeMap> JsonVisitory<S> {
268
0
    pub(crate) fn done(self) -> Result<S, S::Error> {
269
0
        let JsonVisitory { serializer, state } = self;
270
0
        state?;
271
0
        Ok(serializer)
272
0
    }
273
}
274
275
impl<S: SerializeMap> Visit for JsonVisitory<S> {
276
0
    fn record_bool(&mut self, field: &Field, value: bool) {
277
        // If previous fields serialized successfully, continue serializing,
278
        // otherwise, short-circuit and do nothing.
279
0
        if self.state.is_ok() {
280
0
            self.state = self.serializer.serialize_entry(field.name(), &value)
281
0
        }
282
0
    }
283
284
0
    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
285
0
        if self.state.is_ok() {
286
0
            self.state = self
287
0
                .serializer
288
0
                .serialize_entry(field.name(), &format_args!("{value:?}"))
289
0
        }
290
0
    }
291
292
0
    fn record_u64(&mut self, field: &Field, value: u64) {
293
0
        if self.state.is_ok() {
294
0
            self.state = self.serializer.serialize_entry(field.name(), &value)
295
0
        }
296
0
    }
297
298
0
    fn record_i64(&mut self, field: &Field, value: i64) {
299
0
        if self.state.is_ok() {
300
0
            self.state = self.serializer.serialize_entry(field.name(), &value)
301
0
        }
302
0
    }
303
304
0
    fn record_f64(&mut self, field: &Field, value: f64) {
305
0
        if self.state.is_ok() {
306
0
            self.state = self.serializer.serialize_entry(field.name(), &value)
307
0
        }
308
0
    }
309
310
0
    fn record_str(&mut self, field: &Field, value: &str) {
311
0
        if self.state.is_ok() {
312
0
            self.state = self.serializer.serialize_entry(field.name(), &value)
313
0
        }
314
0
    }
315
}
316
pub struct WriteAdaptor<'a> {
317
    fmt_write: &'a mut dyn fmt::Write,
318
}
319
impl<'a> WriteAdaptor<'a> {
320
0
    pub fn new(fmt_write: &'a mut dyn fmt::Write) -> Self {
321
0
        Self { fmt_write }
322
0
    }
323
}
324
impl io::Write for WriteAdaptor<'_> {
325
0
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
326
0
        let s =
327
0
            std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
328
329
0
        self.fmt_write.write_str(s).map_err(io::Error::other)?;
330
331
0
        Ok(s.len())
332
0
    }
333
334
0
    fn flush(&mut self) -> io::Result<()> {
335
0
        Ok(())
336
0
    }
337
}
338
impl<S, N> FormatEvent<S, N> for IstioJsonFormat
339
where
340
    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
341
    N: for<'writer> FormatFields<'writer> + 'static,
342
{
343
0
    fn format_event(
344
0
        &self,
345
0
        ctx: &FmtContext<'_, S, N>,
346
0
        mut writer: Writer<'_>,
347
0
        event: &Event<'_>,
348
0
    ) -> fmt::Result
349
0
    where
350
0
        S: Subscriber + for<'a> LookupSpan<'a>,
351
    {
352
0
        let meta = event.normalized_metadata();
353
0
        let meta = meta.as_ref().unwrap_or_else(|| event.metadata());
354
0
        let mut write = || {
355
0
            let mut timestamp = String::with_capacity(28);
356
0
            let mut w = Writer::new(&mut timestamp);
357
0
            SystemTime.format_time(&mut w)?;
358
0
            let mut sx = serde_json::Serializer::new(WriteAdaptor::new(&mut writer));
359
0
            let mut serializer = sx.serialize_map(event.fields().try_len().ok())?;
360
0
            serializer.serialize_entry("level", &meta.level().as_str().to_ascii_lowercase())?;
361
0
            serializer.serialize_entry("time", &timestamp)?;
362
0
            serializer.serialize_entry("scope", meta.target())?;
363
0
            let mut v = JsonVisitory {
364
0
                serializer,
365
0
                state: Ok(()),
366
0
            };
367
0
            event.record(&mut v);
368
369
0
            let mut serializer = v.done()?;
370
0
            if let Some(scope) = ctx.event_scope() {
371
0
                for span in scope.from_root() {
372
0
                    let ext = span.extensions();
373
0
                    if let Some(fields) = &ext.get::<FormattedFields<N>>() {
374
0
                        let json = serde_json::from_str::<serde_json::Value>(fields)?;
375
0
                        serializer.serialize_entry(span.metadata().name(), &json)?;
376
0
                    }
377
                }
378
0
            };
379
0
            SerializeMap::end(serializer)?;
380
0
            Ok::<(), anyhow::Error>(())
381
0
        };
382
0
        write().map_err(|_| fmt::Error)?;
383
0
        writeln!(writer)
384
0
    }
385
}
386
387
// Copied from tracing_subscriber json
388
impl<'a> FormatFields<'a> for IstioJsonFormat {
389
    /// Format the provided `fields` to the provided `writer`, returning a result.
390
0
    fn format_fields<R: RecordFields>(&self, mut writer: Writer<'_>, fields: R) -> fmt::Result {
391
        use tracing_subscriber::field::VisitOutput;
392
0
        let mut v = JsonVisitor::new(&mut writer);
393
0
        fields.record(&mut v);
394
0
        v.finish()
395
0
    }
Unexecuted instantiation: <ztunnel::telemetry::IstioJsonFormat as tracing_subscriber::fmt::format::FormatFields>::format_fields::<&tracing_core::span::Attributes>
Unexecuted instantiation: <ztunnel::telemetry::IstioJsonFormat as tracing_subscriber::fmt::format::FormatFields>::format_fields::<&tracing_core::span::Record>
396
397
0
    fn add_fields(
398
0
        &self,
399
0
        _current: &'a mut FormattedFields<Self>,
400
0
        _fields: &Record<'_>,
401
0
    ) -> fmt::Result {
402
        // We could implement this but tracing doesn't give us an easy or efficient way to do so.
403
        // for not just disallow it.
404
0
        debug_assert!(false, "add_fields is inefficient and should not be used");
405
0
        Ok(())
406
0
    }
407
}
408
409
/// Mod testing gives access to a test logger, which stores logs in memory for querying.
410
/// Inspired by https://github.com/dbrgn/tracing-test
411
#[cfg(any(test, feature = "testing"))]
412
pub mod testing {
413
    use crate::telemetry::{APPLICATION_START_TIME, IstioJsonFormat, fmt_layer};
414
    use itertools::Itertools;
415
    use once_cell::sync::Lazy;
416
    use serde_json::Value;
417
    use std::collections::HashMap;
418
    use std::fmt::{Display, Formatter};
419
    use std::io;
420
    use std::sync::{Mutex, MutexGuard, OnceLock};
421
422
    use tracing_subscriber::fmt;
423
424
    use tracing_subscriber::layer::SubscriberExt;
425
    use tracing_subscriber::util::SubscriberInitExt;
426
427
    #[derive(Debug)]
428
    pub enum LogError {
429
        // Wanted to equal the value, its missing
430
        Missing(String),
431
        // Want to be absent but it is present
432
        Present(String),
433
        // Mismatch: want, got
434
        Mismatch(String, String),
435
    }
436
437
    impl Display for LogError {
438
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439
            match self {
440
                LogError::Missing(_v) => {
441
                    write!(f, "missing")
442
                }
443
                LogError::Present(v) => {
444
                    write!(f, "{v:?} found unexpectedly")
445
                }
446
                LogError::Mismatch(want, got) => {
447
                    write!(f, "{want:?} != {got:?}")
448
                }
449
            }
450
        }
451
    }
452
453
    /// assert_contains asserts the logs contain a line with the matching keys.
454
    /// Common keys to match one are "target" and "message"; most of the rest are custom.
455
    #[track_caller]
456
    pub fn assert_contains(want: HashMap<&str, &str>) {
457
        let logs = {
458
            let buf = global_buf().lock().unwrap();
459
            std::str::from_utf8(&buf)
460
                .expect("Logs contain invalid UTF8")
461
                .to_string()
462
        };
463
        let errors: Vec<HashMap<_, _>> = logs
464
            .lines()
465
            .map(|line| {
466
                serde_json::from_str::<serde_json::Value>(line).expect("log must be valid json")
467
            })
468
            .map(|log| {
469
                let mut errors = HashMap::new();
470
                for (k, v) in &want {
471
                    let Some(have) = log.get(k) else {
472
                        if !v.is_empty() {
473
                            errors.insert(k.to_string(), LogError::Missing(v.to_string()));
474
                        }
475
                        continue;
476
                    };
477
                    let have = match have {
478
                        Value::Number(n) => format!("{n}"),
479
                        Value::String(v) => v.clone(),
480
                        _ => panic!("assert_contains currently only supports string/number values"),
481
                    };
482
                    if v.is_empty() {
483
                        errors.insert(k.to_string(), LogError::Present(have));
484
                        continue;
485
                    }
486
                    // TODO fuzzy match
487
                    if *v != have {
488
                        errors.insert(k.to_string(), LogError::Mismatch(v.to_string(), have));
489
                    }
490
                }
491
                errors
492
            })
493
            .sorted_by_key(|h| h.len())
494
            .collect();
495
496
        let found_exact_match = errors.first().map(|h| h.is_empty()).unwrap_or(false);
497
        if found_exact_match {
498
            return;
499
        }
500
501
        let total = errors.len();
502
        let help = errors
503
            .iter()
504
            .take(10)
505
            .map(|h| {
506
                h.iter()
507
                    .sorted_by_key(|(k, _)| *k)
508
                    .map(|(k, err)| format!("{k}:{err}"))
509
                    .join("\n")
510
            })
511
            .join("\n\n");
512
        panic!(
513
            "Analyzed {total} logs but none matched our criteria. Closest 10 matches:\n\n{help}"
514
        );
515
    }
516
517
    /// MockWriter will store written logs
518
    #[derive(Debug)]
519
    pub struct MockWriter<'a> {
520
        buf: &'a Mutex<Vec<u8>>,
521
    }
522
523
    impl<'a> MockWriter<'a> {
524
        pub fn new(buf: &'a Mutex<Vec<u8>>) -> Self {
525
            Self { buf }
526
        }
527
528
        fn buf(&self) -> io::Result<MutexGuard<'a, Vec<u8>>> {
529
            self.buf
530
                .lock()
531
                .map_err(|_| io::Error::from(io::ErrorKind::Other))
532
        }
533
    }
534
535
    impl io::Write for MockWriter<'_> {
536
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
537
            let mut target = self.buf()?;
538
            target.write(buf)
539
        }
540
541
        fn flush(&mut self) -> io::Result<()> {
542
            self.buf()?.flush()
543
        }
544
    }
545
546
    impl fmt::MakeWriter<'_> for MockWriter<'_> {
547
        type Writer = Self;
548
549
        fn make_writer(&self) -> Self::Writer {
550
            MockWriter::new(self.buf)
551
        }
552
    }
553
554
    // Global buffer to store logs in
555
    fn global_buf() -> &'static Mutex<Vec<u8>> {
556
        static GLOBAL_BUF: OnceLock<Mutex<Vec<u8>>> = OnceLock::new();
557
        GLOBAL_BUF.get_or_init(|| Mutex::new(vec![]))
558
    }
559
560
    pub fn setup_test_logging() {
561
        Lazy::force(&APPLICATION_START_TIME);
562
        let mock_writer = MockWriter::new(global_buf());
563
        let (non_blocking, _guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
564
            .lossy(false)
565
            .buffered_lines_limit(1)
566
            .finish(std::io::stdout());
567
        // Ensure we do not close until the program ends
568
        Box::leak(Box::new(_guard));
569
        let layer: fmt::Layer<_, _, _, _> = fmt::layer()
570
            .event_format(IstioJsonFormat())
571
            .fmt_fields(IstioJsonFormat())
572
            .with_writer(mock_writer);
573
        tracing_subscriber::registry()
574
            .with(fmt_layer(non_blocking))
575
            .with(layer)
576
            .init();
577
    }
578
}