/rust/registry/src/index.crates.io-1949cf8c6b5b557f/tracing-appender-0.2.3/src/worker.rs
Line | Count | Source |
1 | | use crate::Msg; |
2 | | use crossbeam_channel::{Receiver, RecvError, TryRecvError}; |
3 | | use std::fmt::Debug; |
4 | | use std::io::Write; |
5 | | use std::{io, thread}; |
6 | | |
7 | | pub(crate) struct Worker<T: Write + Send + 'static> { |
8 | | writer: T, |
9 | | receiver: Receiver<Msg>, |
10 | | shutdown: Receiver<()>, |
11 | | } |
12 | | |
13 | | #[derive(Debug, Clone, Copy, Eq, PartialEq)] |
14 | | pub(crate) enum WorkerState { |
15 | | Empty, |
16 | | Disconnected, |
17 | | Continue, |
18 | | Shutdown, |
19 | | } |
20 | | |
21 | | impl<T: Write + Send + 'static> Worker<T> { |
22 | 0 | pub(crate) fn new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T> { |
23 | 0 | Self { |
24 | 0 | writer, |
25 | 0 | receiver, |
26 | 0 | shutdown, |
27 | 0 | } |
28 | 0 | } Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::new Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::new |
29 | | |
30 | 0 | fn handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState> { |
31 | 0 | match result { |
32 | 0 | Ok(Msg::Line(msg)) => { |
33 | 0 | self.writer.write_all(msg)?; |
34 | 0 | Ok(WorkerState::Continue) |
35 | | } |
36 | 0 | Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown), |
37 | 0 | Err(_) => Ok(WorkerState::Disconnected), |
38 | | } |
39 | 0 | } Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::handle_recv Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::handle_recv |
40 | | |
41 | 0 | fn handle_try_recv(&mut self, result: &Result<Msg, TryRecvError>) -> io::Result<WorkerState> { |
42 | 0 | match result { |
43 | 0 | Ok(Msg::Line(msg)) => { |
44 | 0 | self.writer.write_all(msg)?; |
45 | 0 | Ok(WorkerState::Continue) |
46 | | } |
47 | 0 | Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown), |
48 | 0 | Err(TryRecvError::Empty) => Ok(WorkerState::Empty), |
49 | 0 | Err(TryRecvError::Disconnected) => Ok(WorkerState::Disconnected), |
50 | | } |
51 | 0 | } Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::handle_try_recv Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::handle_try_recv |
52 | | |
53 | | /// Blocks on the first recv of each batch of logs, unless the |
54 | | /// channel is disconnected. Afterwards, grabs as many logs as |
55 | | /// it can off the channel, buffers them and attempts a flush. |
56 | 0 | pub(crate) fn work(&mut self) -> io::Result<WorkerState> { |
57 | | // Worker thread yields here if receive buffer is empty |
58 | 0 | let mut worker_state = self.handle_recv(&self.receiver.recv())?; |
59 | | |
60 | 0 | while worker_state == WorkerState::Continue { |
61 | 0 | let try_recv_result = self.receiver.try_recv(); |
62 | 0 | let handle_result = self.handle_try_recv(&try_recv_result); |
63 | 0 | worker_state = handle_result?; |
64 | | } |
65 | 0 | self.writer.flush()?; |
66 | 0 | Ok(worker_state) |
67 | 0 | } Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::work Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::work |
68 | | |
69 | | /// Creates a worker thread that processes a channel until it's disconnected |
70 | 0 | pub(crate) fn worker_thread(mut self, name: String) -> std::thread::JoinHandle<()> { |
71 | 0 | thread::Builder::new() |
72 | 0 | .name(name) |
73 | 0 | .spawn(move || { |
74 | | loop { |
75 | 0 | match self.work() { |
76 | 0 | Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {} |
77 | | Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => { |
78 | 0 | let _ = self.shutdown.recv(); |
79 | 0 | break; |
80 | | } |
81 | 0 | Err(_) => { |
82 | 0 | // TODO: Expose a metric for IO Errors, or print to stderr |
83 | 0 | } |
84 | | } |
85 | | } |
86 | 0 | if let Err(e) = self.writer.flush() { |
87 | 0 | eprintln!("Failed to flush. Error: {}", e); |
88 | 0 | } |
89 | 0 | }) Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::worker_thread::{closure#0}Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::worker_thread::{closure#0} |
90 | 0 | .expect("failed to spawn `tracing-appender` non-blocking worker thread") |
91 | 0 | } Unexecuted instantiation: <tracing_appender::worker::Worker<std::io::stdio::Stdout>>::worker_thread Unexecuted instantiation: <tracing_appender::worker::Worker<_>>::worker_thread |
92 | | } |