Coverage Report

Created: 2024-12-17 06:15

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-timer-3.0.3/src/native/timer.rs
Line
Count
Source (jump to first uncovered line)
1
use std::fmt;
2
use std::pin::Pin;
3
use std::sync::atomic::Ordering::SeqCst;
4
use std::sync::atomic::{AtomicPtr, AtomicUsize};
5
use std::sync::{Arc, Mutex, Weak};
6
use std::task::{Context, Poll};
7
use std::time::Instant;
8
9
use std::future::Future;
10
11
use super::AtomicWaker;
12
use super::{global, ArcList, Heap, HeapTimer, Node, Slot};
13
14
/// A "timer heap" used to power separately owned instances of `Delay`.
15
///
16
/// This timer is implemented as a priority queue-based heap. Each `Timer`
17
/// contains a few primary methods with which to drive it:
18
///
19
/// * `next_event` indicates how long the ambient system needs to sleep until it
20
///   invokes further processing on a `Timer`
21
/// * `advance_to` is what actually fires timers on the `Timer`, and should be
22
///   called essentially every iteration of the event loop, or when the time
23
///   specified by `next_event` has elapsed.
24
/// * The `Future` implementation for `Timer` is used to process incoming timer
25
///   updates and requests. This is used to schedule new timeouts, update
26
///   existing ones, or delete existing timeouts. The `Future` implementation
27
///   will never resolve, but it'll schedule notifications of when to wake up
28
///   and process more messages.
29
///
30
/// Note that if you're using this crate you probably don't need to use a
31
/// `Timer` as there is a global one already available for you, run on a helper
32
/// thread. If this isn't desirable, though, then the
33
/// `TimerHandle::set_as_global_fallback` method can be used instead!
34
pub struct Timer {
35
    inner: Arc<Inner>,
36
    timer_heap: Heap<HeapTimer>,
37
}
38
39
/// A handle to a `Timer` which is used to create instances of a `Delay`.
40
#[derive(Clone)]
41
pub struct TimerHandle {
42
    pub(crate) inner: Weak<Inner>,
43
}
44
45
pub(crate) struct Inner {
46
    /// List of updates the `Timer` needs to process
47
    pub(crate) list: ArcList<ScheduledTimer>,
48
49
    /// The blocked `Timer` task to receive notifications to the `list` above.
50
    pub(crate) waker: AtomicWaker,
51
}
52
53
/// Shared state between the `Timer` and a `Delay`.
54
pub(crate) struct ScheduledTimer {
55
    pub(crate) waker: AtomicWaker,
56
57
    // The lowest bit here is whether the timer has fired or not, the second
58
    // lowest bit is whether the timer has been invalidated, and all the other
59
    // bits are the "generation" of the timer which is reset during the `reset`
60
    // function. Only timers for a matching generation are fired.
61
    pub(crate) state: AtomicUsize,
62
63
    pub(crate) inner: Weak<Inner>,
64
    pub(crate) at: Mutex<Option<Instant>>,
65
66
    // TODO: this is only accessed by the timer thread, should have a more
67
    // lightweight protection than a `Mutex`
68
    pub(crate) slot: Mutex<Option<Slot>>,
69
}
70
71
impl Timer {
72
    /// Creates a new timer heap ready to create new timers.
73
0
    pub fn new() -> Timer {
74
0
        Timer {
75
0
            inner: Arc::new(Inner {
76
0
                list: ArcList::new(),
77
0
                waker: AtomicWaker::new(),
78
0
            }),
79
0
            timer_heap: Heap::new(),
80
0
        }
81
0
    }
82
83
    /// Returns a handle to this timer heap, used to create new timeouts.
84
0
    pub fn handle(&self) -> TimerHandle {
85
0
        TimerHandle {
86
0
            inner: Arc::downgrade(&self.inner),
87
0
        }
88
0
    }
89
90
    /// Returns the time at which this timer next needs to be invoked with
91
    /// `advance_to`.
92
    ///
93
    /// Event loops or threads typically want to sleep until the specified
94
    /// instant.
95
0
    pub fn next_event(&self) -> Option<Instant> {
96
0
        self.timer_heap.peek().map(|t| t.at)
97
0
    }
98
99
    /// Process any timers which are supposed to fire at or before the current
100
    /// instant.
101
    ///
102
    /// This method is equivalent to `self.advance_to(Instant::now())`.
103
0
    pub fn advance(&mut self) {
104
0
        self.advance_to(Instant::now())
105
0
    }
106
107
    /// Process any timers which are supposed to fire at or before the specified `now`.
108
    ///
109
    /// This method should be called on `Timer` periodically to advance the
110
    /// internal state and process any pending timers which need to fire.
111
0
    pub fn advance_to(&mut self, now: Instant) {
112
        loop {
113
0
            match self.timer_heap.peek() {
114
0
                Some(head) if head.at <= now => {}
115
0
                Some(_) => break,
116
0
                None => break,
117
            };
118
119
            // Flag the timer as fired and then notify its task, if any, that's
120
            // blocked.
121
0
            let heap_timer = self.timer_heap.pop().unwrap();
122
0
            *heap_timer.node.slot.lock().unwrap() = None;
123
0
            let bits = heap_timer.gen << 2;
124
0
            match heap_timer
125
0
                .node
126
0
                .state
127
0
                .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
128
            {
129
0
                Ok(_) => heap_timer.node.waker.wake(),
130
0
                Err(_b) => {}
131
            }
132
        }
133
0
    }
134
135
    /// Either updates the timer at slot `idx` to fire at `at`, or adds a new
136
    /// timer at `idx` and sets it to fire at `at`.
137
0
    fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
138
0
        // TODO: avoid remove + push and instead just do one sift of the heap?
139
0
        // In theory we could update it in place and then do the percolation
140
0
        // as necessary
141
0
        let gen = node.state.load(SeqCst) >> 2;
142
0
        let mut slot = node.slot.lock().unwrap();
143
0
        if let Some(heap_slot) = slot.take() {
144
0
            self.timer_heap.remove(heap_slot);
145
0
        }
146
0
        *slot = Some(self.timer_heap.push(HeapTimer {
147
0
            at,
148
0
            gen,
149
0
            node: node.clone(),
150
0
        }));
151
0
    }
152
153
0
    fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
154
0
        // If this `idx` is still around and it's still got a registered timer,
155
0
        // then we jettison it from the timer heap.
156
0
        let mut slot = node.slot.lock().unwrap();
157
0
        let heap_slot = match slot.take() {
158
0
            Some(slot) => slot,
159
0
            None => return,
160
        };
161
0
        self.timer_heap.remove(heap_slot);
162
0
    }
163
164
0
    fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
165
0
        node.state.fetch_or(0b10, SeqCst);
166
0
        node.waker.wake();
167
0
    }
168
}
169
170
impl Future for Timer {
171
    type Output = ();
172
173
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
174
0
        Pin::new(&mut self.inner).waker.register(cx.waker());
175
0
        let mut list = self.inner.list.take();
176
0
        while let Some(node) = list.pop() {
177
0
            let at = *node.at.lock().unwrap();
178
0
            match at {
179
0
                Some(at) => self.update_or_add(at, node),
180
0
                None => self.remove(node),
181
            }
182
        }
183
0
        Poll::Pending
184
0
    }
185
}
186
187
impl Drop for Timer {
188
0
    fn drop(&mut self) {
189
0
        // Seal off our list to prevent any more updates from getting pushed on.
190
0
        // Any timer which sees an error from the push will immediately become
191
0
        // inert.
192
0
        let mut list = self.inner.list.take_and_seal();
193
194
        // Now that we'll never receive another timer, drain the list of all
195
        // updates and also drain our heap of all active timers, invalidating
196
        // everything.
197
0
        while let Some(t) = list.pop() {
198
0
            self.invalidate(t);
199
0
        }
200
0
        while let Some(t) = self.timer_heap.pop() {
201
0
            self.invalidate(t.node);
202
0
        }
203
0
    }
204
}
205
206
impl fmt::Debug for Timer {
207
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
208
0
        f.debug_struct("Timer").field("heap", &"...").finish()
209
0
    }
210
}
211
212
impl Default for Timer {
213
0
    fn default() -> Self {
214
0
        Self::new()
215
0
    }
216
}
217
218
static HANDLE_FALLBACK: AtomicPtr<Inner> = AtomicPtr::new(EMPTY_HANDLE);
219
const EMPTY_HANDLE: *mut Inner = std::ptr::null_mut();
220
221
/// Error returned from `TimerHandle::set_as_global_fallback`.
222
#[derive(Clone, Debug)]
223
struct SetDefaultError(());
224
225
impl TimerHandle {
226
    /// Configures this timer handle to be the one returned by
227
    /// `TimerHandle::default`.
228
    ///
229
    /// By default a global thread is initialized on the first call to
230
    /// `TimerHandle::default`. This first call can happen transitively through
231
    /// `Delay::new`. If, however, that hasn't happened yet then the global
232
    /// default timer handle can be configured through this method.
233
    ///
234
    /// This method can be used to prevent the global helper thread from
235
    /// spawning. If this method is successful then the global helper thread
236
    /// will never get spun up.
237
    ///
238
    /// On success this timer handle will have installed itself globally to be
239
    /// used as the return value for `TimerHandle::default` unless otherwise
240
    /// specified.
241
    ///
242
    /// # Errors
243
    ///
244
    /// If another thread has already called `set_as_global_fallback` or this
245
    /// thread otherwise loses a race to call this method then it will fail
246
    /// returning an error. Once a call to `set_as_global_fallback` is
247
    /// successful then no future calls may succeed.
248
0
    fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
249
0
        unsafe {
250
0
            let val = self.into_raw();
251
0
            match HANDLE_FALLBACK.compare_exchange(EMPTY_HANDLE, val, SeqCst, SeqCst) {
252
0
                Ok(_) => Ok(()),
253
                Err(_) => {
254
0
                    drop(TimerHandle::from_raw(val));
255
0
                    Err(SetDefaultError(()))
256
                }
257
            }
258
        }
259
0
    }
260
261
0
    fn into_raw(self) -> *mut Inner {
262
0
        self.inner.into_raw() as *mut Inner
263
0
    }
264
265
0
    unsafe fn from_raw(val: *mut Inner) -> TimerHandle {
266
0
        let inner = Weak::from_raw(val);
267
0
        TimerHandle { inner }
268
0
    }
269
}
270
271
impl Default for TimerHandle {
272
0
    fn default() -> TimerHandle {
273
0
        let mut fallback = HANDLE_FALLBACK.load(SeqCst);
274
0
275
0
        // If the fallback hasn't been previously initialized then let's spin
276
0
        // up a helper thread and try to initialize with that. If we can't
277
0
        // actually create a helper thread then we'll just return a "defunkt"
278
0
        // handle which will return errors when timer objects are attempted to
279
0
        // be associated.
280
0
        if fallback == EMPTY_HANDLE {
281
0
            let helper = match global::HelperThread::new() {
282
0
                Ok(helper) => helper,
283
0
                Err(_) => return TimerHandle { inner: Weak::new() },
284
            };
285
286
            // If we successfully set ourselves as the actual fallback then we
287
            // want to `forget` the helper thread to ensure that it persists
288
            // globally. If we fail to set ourselves as the fallback that means
289
            // that someone was racing with this call to
290
            // `TimerHandle::default`.  They ended up winning so we'll destroy
291
            // our helper thread (which shuts down the thread) and reload the
292
            // fallback.
293
0
            if helper.handle().set_as_global_fallback().is_ok() {
294
0
                let ret = helper.handle();
295
0
                helper.forget();
296
0
                return ret;
297
0
            }
298
0
            fallback = HANDLE_FALLBACK.load(SeqCst);
299
0
        }
300
301
        // At this point our fallback handle global was configured so we use
302
        // its value to reify a handle, clone it, and then forget our reified
303
        // handle as we don't actually have an owning reference to it.
304
0
        assert!(fallback != EMPTY_HANDLE);
305
        unsafe {
306
0
            let handle = TimerHandle::from_raw(fallback);
307
0
            let ret = handle.clone();
308
0
            let _ = handle.into_raw();
309
0
            ret
310
        }
311
0
    }
312
}
313
314
impl fmt::Debug for TimerHandle {
315
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
316
0
        f.debug_struct("TimerHandle")
317
0
            .field("inner", &"...")
318
0
            .finish()
319
0
    }
320
}