Coverage Report

Created: 2026-02-14 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tracing-appender-0.2.3/src/worker.rs
Line
Count
Source
1
use crate::Msg;
2
use crossbeam_channel::{Receiver, RecvError, TryRecvError};
3
use std::fmt::Debug;
4
use std::io::Write;
5
use std::{io, thread};
6
7
pub(crate) struct Worker<T: Write + Send + 'static> {
8
    writer: T,
9
    receiver: Receiver<Msg>,
10
    shutdown: Receiver<()>,
11
}
12
13
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
14
pub(crate) enum WorkerState {
15
    Empty,
16
    Disconnected,
17
    Continue,
18
    Shutdown,
19
}
20
21
impl<T: Write + Send + 'static> Worker<T> {
22
0
    pub(crate) fn new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T> {
23
0
        Self {
24
0
            writer,
25
0
            receiver,
26
0
            shutdown,
27
0
        }
28
0
    }
Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::new
Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::new
29
30
0
    fn handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState> {
31
0
        match result {
32
0
            Ok(Msg::Line(msg)) => {
33
0
                self.writer.write_all(msg)?;
34
0
                Ok(WorkerState::Continue)
35
            }
36
0
            Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown),
37
0
            Err(_) => Ok(WorkerState::Disconnected),
38
        }
39
0
    }
Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::handle_recv
Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::handle_recv
40
41
0
    fn handle_try_recv(&mut self, result: &Result<Msg, TryRecvError>) -> io::Result<WorkerState> {
42
0
        match result {
43
0
            Ok(Msg::Line(msg)) => {
44
0
                self.writer.write_all(msg)?;
45
0
                Ok(WorkerState::Continue)
46
            }
47
0
            Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown),
48
0
            Err(TryRecvError::Empty) => Ok(WorkerState::Empty),
49
0
            Err(TryRecvError::Disconnected) => Ok(WorkerState::Disconnected),
50
        }
51
0
    }
Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::handle_try_recv
Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::handle_try_recv
52
53
    /// Blocks on the first recv of each batch of logs, unless the
54
    /// channel is disconnected. Afterwards, grabs as many logs as
55
    /// it can off the channel, buffers them and attempts a flush.
56
0
    pub(crate) fn work(&mut self) -> io::Result<WorkerState> {
57
        // Worker thread yields here if receive buffer is empty
58
0
        let mut worker_state = self.handle_recv(&self.receiver.recv())?;
59
60
0
        while worker_state == WorkerState::Continue {
61
0
            let try_recv_result = self.receiver.try_recv();
62
0
            let handle_result = self.handle_try_recv(&try_recv_result);
63
0
            worker_state = handle_result?;
64
        }
65
0
        self.writer.flush()?;
66
0
        Ok(worker_state)
67
0
    }
Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::work
Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::work
68
69
    /// Creates a worker thread that processes a channel until it's disconnected
70
0
    pub(crate) fn worker_thread(mut self, name: String) -> std::thread::JoinHandle<()> {
71
0
        thread::Builder::new()
72
0
            .name(name)
73
0
            .spawn(move || {
74
                loop {
75
0
                    match self.work() {
76
0
                        Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
77
                        Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
78
0
                            let _ = self.shutdown.recv();
79
0
                            break;
80
                        }
81
0
                        Err(_) => {
82
0
                            // TODO: Expose a metric for IO Errors, or print to stderr
83
0
                        }
84
                    }
85
                }
86
0
                if let Err(e) = self.writer.flush() {
87
0
                    eprintln!("Failed to flush. Error: {}", e);
88
0
                }
89
0
            })
Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::worker_thread::{closure#0}
Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::worker_thread::{closure#0}
90
0
            .expect("failed to spawn `tracing-appender` non-blocking worker thread")
91
0
    }
Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::worker_thread
Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::worker_thread
92
}