Coverage Report

Created: 2024-08-22 06:13

/rust/registry/src/index.crates.io-6f17d22bba15001f/prodash-29.0.0/src/throughput.rs
Line
Count
Source (jump to first uncovered line)
1
use std::{
2
    collections::VecDeque,
3
    sync::atomic::Ordering,
4
    time::{Duration, SystemTime},
5
};
6
7
use crate::{progress, unit};
8
9
/// Amount of observed time that has to be exceeded since the last recomputation before
/// `State::update` recomputes the cached throughput value.
const THROTTLE_INTERVAL: Duration = Duration::from_secs(1);
10
/// The timespan that reported throughput values refer to. It is also the duration that the
/// window of retained samples is trimmed towards when throughput is computed.
const ONCE_A_SECOND: Duration = Duration::from_secs(1);
11
12
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
13
struct State {
14
    observed: Duration,
15
    last_value: progress::Step,
16
    elapsed_values: VecDeque<(Duration, progress::Step)>,
17
18
    last_update_duration: Duration,
19
    precomputed_throughput: Option<progress::Step>,
20
}
21
22
impl State {
23
0
    fn new(value: progress::Step, elapsed: Duration) -> Self {
24
0
        State {
25
0
            observed: elapsed,
26
0
            last_value: value,
27
0
            elapsed_values: {
28
0
                let mut v = VecDeque::with_capacity(6); // default frames per second
29
0
                v.push_back((elapsed, value));
30
0
                v
31
0
            },
32
0
33
0
            last_update_duration: elapsed,
34
0
            precomputed_throughput: None,
35
0
        }
36
0
    }
37
38
0
    fn compute_throughput(&mut self) -> progress::Step {
39
0
        let mut observed: Duration = self.elapsed_values.iter().map(|e| e.0).sum();
40
0
        while !self.elapsed_values.is_empty() && observed > ONCE_A_SECOND {
41
0
            let candidate = self
42
0
                .elapsed_values
43
0
                .front()
44
0
                .map(|e| e.0)
45
0
                .expect("at least one item as we are in the checked loop");
46
0
            if observed.checked_sub(candidate).unwrap_or_default() <= ONCE_A_SECOND {
47
0
                break;
48
0
            }
49
0
            observed -= candidate;
50
0
            self.elapsed_values.pop_front();
51
        }
52
0
        let observed_value: progress::Step = self.elapsed_values.iter().map(|e| e.1).sum();
53
0
        ((observed_value as f64 / observed.as_secs_f64()) * ONCE_A_SECOND.as_secs_f64()) as progress::Step
54
0
    }
55
56
0
    fn update(&mut self, value: progress::Step, elapsed: Duration) -> Option<unit::display::Throughput> {
57
0
        self.observed += elapsed;
58
0
        self.elapsed_values
59
0
            .push_back((elapsed, value.saturating_sub(self.last_value)));
60
0
        self.last_value = value;
61
0
        if self.observed - self.last_update_duration > THROTTLE_INTERVAL {
62
0
            self.precomputed_throughput = Some(self.compute_throughput());
63
0
            self.last_update_duration = self.observed;
64
0
        }
65
0
        self.throughput()
66
0
    }
67
68
0
    fn throughput(&self) -> Option<unit::display::Throughput> {
69
0
        self.precomputed_throughput.map(|tp| unit::display::Throughput {
70
0
            value_change_in_timespan: tp,
71
0
            timespan: ONCE_A_SECOND,
72
0
        })
73
0
    }
74
}
75
76
/// A utility to compute throughput of a set of progress values usually available to a renderer.
77
#[derive(Default)]
78
pub struct Throughput {
79
    sorted_by_key: Vec<(progress::Key, State)>,
80
    updated_at: Option<SystemTime>,
81
    elapsed: Option<Duration>,
82
}
83
84
impl Throughput {
85
    /// Called at the beginning of the drawing of a renderer to remember at which time progress values are
86
    /// going to be updated with [`update_and_get(…)`][Throughput::update_and_get()].
87
0
    pub fn update_elapsed(&mut self) {
88
0
        let now = SystemTime::now();
89
0
        self.elapsed = self.updated_at.and_then(|then| now.duration_since(then).ok());
90
0
        self.updated_at = Some(now);
91
0
    }
92
93
    /// Lookup or create the progress value at `key` and set its current `progress`, returning its computed
94
    /// throughput.
95
0
    pub fn update_and_get(
96
0
        &mut self,
97
0
        key: &progress::Key,
98
0
        progress: Option<&progress::Value>,
99
0
    ) -> Option<unit::display::Throughput> {
100
0
        progress.and_then(|progress| {
101
0
            self.elapsed
102
0
                .and_then(|elapsed| match self.sorted_by_key.binary_search_by_key(key, |t| t.0) {
103
0
                    Ok(index) => self.sorted_by_key[index]
104
0
                        .1
105
0
                        .update(progress.step.load(Ordering::SeqCst), elapsed),
106
0
                    Err(index) => {
107
0
                        let state = State::new(progress.step.load(Ordering::SeqCst), elapsed);
108
0
                        let tp = state.throughput();
109
0
                        self.sorted_by_key.insert(index, (*key, state));
110
0
                        tp
111
                    }
112
0
                })
113
0
        })
114
0
    }
115
116
    /// Compare the keys in `sorted_values` with our internal state and remove all missing tasks from it.
117
    ///
118
    /// This should be called after [`update_and_get(…)`][Throughput::update_and_get()] to pick up removed/finished
119
    /// progress.
120
0
    pub fn reconcile(&mut self, sorted_values: &[(progress::Key, progress::Task)]) {
121
0
        self.sorted_by_key
122
0
            .retain(|(key, _)| sorted_values.binary_search_by_key(key, |e| e.0).is_ok());
123
0
    }
124
}