/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-timer-3.0.3/src/native/timer.rs
Line | Count | Source (jump to first uncovered line) |
1 | | use std::fmt; |
2 | | use std::pin::Pin; |
3 | | use std::sync::atomic::Ordering::SeqCst; |
4 | | use std::sync::atomic::{AtomicPtr, AtomicUsize}; |
5 | | use std::sync::{Arc, Mutex, Weak}; |
6 | | use std::task::{Context, Poll}; |
7 | | use std::time::Instant; |
8 | | |
9 | | use std::future::Future; |
10 | | |
11 | | use super::AtomicWaker; |
12 | | use super::{global, ArcList, Heap, HeapTimer, Node, Slot}; |
13 | | |
14 | | /// A "timer heap" used to power separately owned instances of `Delay`. |
15 | | /// |
16 | | /// This timer is implemented as a priority queued-based heap. Each `Timer` |
17 | | /// contains a few primary methods which which to drive it: |
18 | | /// |
19 | | /// * `next_wake` indicates how long the ambient system needs to sleep until it |
20 | | /// invokes further processing on a `Timer` |
21 | | /// * `advance_to` is what actually fires timers on the `Timer`, and should be |
22 | | /// called essentially every iteration of the event loop, or when the time |
23 | | /// specified by `next_wake` has elapsed. |
24 | | /// * The `Future` implementation for `Timer` is used to process incoming timer |
25 | | /// updates and requests. This is used to schedule new timeouts, update |
26 | | /// existing ones, or delete existing timeouts. The `Future` implementation |
27 | | /// will never resolve, but it'll schedule notifications of when to wake up |
28 | | /// and process more messages. |
29 | | /// |
30 | | /// Note that if you're using this crate you probably don't need to use a |
31 | | /// `Timer` as there is a global one already available for you run on a helper |
32 | | /// thread. If this isn't desirable, though, then the |
33 | | /// `TimerHandle::set_fallback` method can be used instead! |
34 | | pub struct Timer { |
35 | | inner: Arc<Inner>, |
36 | | timer_heap: Heap<HeapTimer>, |
37 | | } |
38 | | |
39 | | /// A handle to a `Timer` which is used to create instances of a `Delay`. |
40 | | #[derive(Clone)] |
41 | | pub struct TimerHandle { |
42 | | pub(crate) inner: Weak<Inner>, |
43 | | } |
44 | | |
45 | | pub(crate) struct Inner { |
46 | | /// List of updates the `Timer` needs to process |
47 | | pub(crate) list: ArcList<ScheduledTimer>, |
48 | | |
49 | | /// The blocked `Timer` task to receive notifications to the `list` above. |
50 | | pub(crate) waker: AtomicWaker, |
51 | | } |
52 | | |
53 | | /// Shared state between the `Timer` and a `Delay`. |
54 | | pub(crate) struct ScheduledTimer { |
55 | | pub(crate) waker: AtomicWaker, |
56 | | |
57 | | // The lowest bit here is whether the timer has fired or not, the second |
58 | | // lowest bit is whether the timer has been invalidated, and all the other |
59 | | // bits are the "generation" of the timer which is reset during the `reset` |
60 | | // function. Only timers for a matching generation are fired. |
61 | | pub(crate) state: AtomicUsize, |
62 | | |
63 | | pub(crate) inner: Weak<Inner>, |
64 | | pub(crate) at: Mutex<Option<Instant>>, |
65 | | |
66 | | // TODO: this is only accessed by the timer thread, should have a more |
67 | | // lightweight protection than a `Mutex` |
68 | | pub(crate) slot: Mutex<Option<Slot>>, |
69 | | } |
70 | | |
71 | | impl Timer { |
72 | | /// Creates a new timer heap ready to create new timers. |
73 | 0 | pub fn new() -> Timer { |
74 | 0 | Timer { |
75 | 0 | inner: Arc::new(Inner { |
76 | 0 | list: ArcList::new(), |
77 | 0 | waker: AtomicWaker::new(), |
78 | 0 | }), |
79 | 0 | timer_heap: Heap::new(), |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | | /// Returns a handle to this timer heap, used to create new timeouts. |
84 | 0 | pub fn handle(&self) -> TimerHandle { |
85 | 0 | TimerHandle { |
86 | 0 | inner: Arc::downgrade(&self.inner), |
87 | 0 | } |
88 | 0 | } |
89 | | |
90 | | /// Returns the time at which this timer next needs to be invoked with |
91 | | /// `advance_to`. |
92 | | /// |
93 | | /// Event loops or threads typically want to sleep until the specified |
94 | | /// instant. |
95 | 0 | pub fn next_event(&self) -> Option<Instant> { |
96 | 0 | self.timer_heap.peek().map(|t| t.at) |
97 | 0 | } |
98 | | |
99 | | /// Proces any timers which are supposed to fire at or before the current |
100 | | /// instant. |
101 | | /// |
102 | | /// This method is equivalent to `self.advance_to(Instant::now())`. |
103 | 0 | pub fn advance(&mut self) { |
104 | 0 | self.advance_to(Instant::now()) |
105 | 0 | } |
106 | | |
107 | | /// Proces any timers which are supposed to fire before `now` specified. |
108 | | /// |
109 | | /// This method should be called on `Timer` periodically to advance the |
110 | | /// internal state and process any pending timers which need to fire. |
111 | 0 | pub fn advance_to(&mut self, now: Instant) { |
112 | | loop { |
113 | 0 | match self.timer_heap.peek() { |
114 | 0 | Some(head) if head.at <= now => {} |
115 | 0 | Some(_) => break, |
116 | 0 | None => break, |
117 | | }; |
118 | | |
119 | | // Flag the timer as fired and then notify its task, if any, that's |
120 | | // blocked. |
121 | 0 | let heap_timer = self.timer_heap.pop().unwrap(); |
122 | 0 | *heap_timer.node.slot.lock().unwrap() = None; |
123 | 0 | let bits = heap_timer.gen << 2; |
124 | 0 | match heap_timer |
125 | 0 | .node |
126 | 0 | .state |
127 | 0 | .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst) |
128 | | { |
129 | 0 | Ok(_) => heap_timer.node.waker.wake(), |
130 | 0 | Err(_b) => {} |
131 | | } |
132 | | } |
133 | 0 | } |
134 | | |
135 | | /// Either updates the timer at slot `idx` to fire at `at`, or adds a new |
136 | | /// timer at `idx` and sets it to fire at `at`. |
137 | 0 | fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) { |
138 | 0 | // TODO: avoid remove + push and instead just do one sift of the heap? |
139 | 0 | // In theory we could update it in place and then do the percolation |
140 | 0 | // as necessary |
141 | 0 | let gen = node.state.load(SeqCst) >> 2; |
142 | 0 | let mut slot = node.slot.lock().unwrap(); |
143 | 0 | if let Some(heap_slot) = slot.take() { |
144 | 0 | self.timer_heap.remove(heap_slot); |
145 | 0 | } |
146 | 0 | *slot = Some(self.timer_heap.push(HeapTimer { |
147 | 0 | at, |
148 | 0 | gen, |
149 | 0 | node: node.clone(), |
150 | 0 | })); |
151 | 0 | } |
152 | | |
153 | 0 | fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) { |
154 | 0 | // If this `idx` is still around and it's still got a registered timer, |
155 | 0 | // then we jettison it form the timer heap. |
156 | 0 | let mut slot = node.slot.lock().unwrap(); |
157 | 0 | let heap_slot = match slot.take() { |
158 | 0 | Some(slot) => slot, |
159 | 0 | None => return, |
160 | | }; |
161 | 0 | self.timer_heap.remove(heap_slot); |
162 | 0 | } |
163 | | |
164 | 0 | fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) { |
165 | 0 | node.state.fetch_or(0b10, SeqCst); |
166 | 0 | node.waker.wake(); |
167 | 0 | } |
168 | | } |
169 | | |
170 | | impl Future for Timer { |
171 | | type Output = (); |
172 | | |
173 | 0 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
174 | 0 | Pin::new(&mut self.inner).waker.register(cx.waker()); |
175 | 0 | let mut list = self.inner.list.take(); |
176 | 0 | while let Some(node) = list.pop() { |
177 | 0 | let at = *node.at.lock().unwrap(); |
178 | 0 | match at { |
179 | 0 | Some(at) => self.update_or_add(at, node), |
180 | 0 | None => self.remove(node), |
181 | | } |
182 | | } |
183 | 0 | Poll::Pending |
184 | 0 | } |
185 | | } |
186 | | |
187 | | impl Drop for Timer { |
188 | 0 | fn drop(&mut self) { |
189 | 0 | // Seal off our list to prevent any more updates from getting pushed on. |
190 | 0 | // Any timer which sees an error from the push will immediately become |
191 | 0 | // inert. |
192 | 0 | let mut list = self.inner.list.take_and_seal(); |
193 | | |
194 | | // Now that we'll never receive another timer, drain the list of all |
195 | | // updates and also drain our heap of all active timers, invalidating |
196 | | // everything. |
197 | 0 | while let Some(t) = list.pop() { |
198 | 0 | self.invalidate(t); |
199 | 0 | } |
200 | 0 | while let Some(t) = self.timer_heap.pop() { |
201 | 0 | self.invalidate(t.node); |
202 | 0 | } |
203 | 0 | } |
204 | | } |
205 | | |
206 | | impl fmt::Debug for Timer { |
207 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { |
208 | 0 | f.debug_struct("Timer").field("heap", &"...").finish() |
209 | 0 | } |
210 | | } |
211 | | |
212 | | impl Default for Timer { |
213 | 0 | fn default() -> Self { |
214 | 0 | Self::new() |
215 | 0 | } |
216 | | } |
217 | | |
218 | | static HANDLE_FALLBACK: AtomicPtr<Inner> = AtomicPtr::new(EMPTY_HANDLE); |
219 | | const EMPTY_HANDLE: *mut Inner = std::ptr::null_mut(); |
220 | | |
/// Error returned from `TimerHandle::set_as_global_fallback` when a global
/// fallback handle has already been installed.
#[derive(Clone, Debug)]
struct SetDefaultError(());
224 | | |
225 | | impl TimerHandle { |
226 | | /// Configures this timer handle to be the one returned by |
227 | | /// `TimerHandle::default`. |
228 | | /// |
229 | | /// By default a global thread is initialized on the first call to |
230 | | /// `TimerHandle::default`. This first call can happen transitively through |
231 | | /// `Delay::new`. If, however, that hasn't happened yet then the global |
232 | | /// default timer handle can be configured through this method. |
233 | | /// |
234 | | /// This method can be used to prevent the global helper thread from |
235 | | /// spawning. If this method is successful then the global helper thread |
236 | | /// will never get spun up. |
237 | | /// |
238 | | /// On success this timer handle will have installed itself globally to be |
239 | | /// used as the return value for `TimerHandle::default` unless otherwise |
240 | | /// specified. |
241 | | /// |
242 | | /// # Errors |
243 | | /// |
244 | | /// If another thread has already called `set_as_global_fallback` or this |
245 | | /// thread otherwise loses a race to call this method then it will fail |
246 | | /// returning an error. Once a call to `set_as_global_fallback` is |
247 | | /// successful then no future calls may succeed. |
248 | 0 | fn set_as_global_fallback(self) -> Result<(), SetDefaultError> { |
249 | 0 | unsafe { |
250 | 0 | let val = self.into_raw(); |
251 | 0 | match HANDLE_FALLBACK.compare_exchange(EMPTY_HANDLE, val, SeqCst, SeqCst) { |
252 | 0 | Ok(_) => Ok(()), |
253 | | Err(_) => { |
254 | 0 | drop(TimerHandle::from_raw(val)); |
255 | 0 | Err(SetDefaultError(())) |
256 | | } |
257 | | } |
258 | | } |
259 | 0 | } |
260 | | |
261 | 0 | fn into_raw(self) -> *mut Inner { |
262 | 0 | self.inner.into_raw() as *mut Inner |
263 | 0 | } |
264 | | |
265 | 0 | unsafe fn from_raw(val: *mut Inner) -> TimerHandle { |
266 | 0 | let inner = Weak::from_raw(val); |
267 | 0 | TimerHandle { inner } |
268 | 0 | } |
269 | | } |
270 | | |
271 | | impl Default for TimerHandle { |
272 | 0 | fn default() -> TimerHandle { |
273 | 0 | let mut fallback = HANDLE_FALLBACK.load(SeqCst); |
274 | 0 |
|
275 | 0 | // If the fallback hasn't been previously initialized then let's spin |
276 | 0 | // up a helper thread and try to initialize with that. If we can't |
277 | 0 | // actually create a helper thread then we'll just return a "defunkt" |
278 | 0 | // handle which will return errors when timer objects are attempted to |
279 | 0 | // be associated. |
280 | 0 | if fallback == EMPTY_HANDLE { |
281 | 0 | let helper = match global::HelperThread::new() { |
282 | 0 | Ok(helper) => helper, |
283 | 0 | Err(_) => return TimerHandle { inner: Weak::new() }, |
284 | | }; |
285 | | |
286 | | // If we successfully set ourselves as the actual fallback then we |
287 | | // want to `forget` the helper thread to ensure that it persists |
288 | | // globally. If we fail to set ourselves as the fallback that means |
289 | | // that someone was racing with this call to |
290 | | // `TimerHandle::default`. They ended up winning so we'll destroy |
291 | | // our helper thread (which shuts down the thread) and reload the |
292 | | // fallback. |
293 | 0 | if helper.handle().set_as_global_fallback().is_ok() { |
294 | 0 | let ret = helper.handle(); |
295 | 0 | helper.forget(); |
296 | 0 | return ret; |
297 | 0 | } |
298 | 0 | fallback = HANDLE_FALLBACK.load(SeqCst); |
299 | 0 | } |
300 | | |
301 | | // At this point our fallback handle global was configured so we use |
302 | | // its value to reify a handle, clone it, and then forget our reified |
303 | | // handle as we don't actually have an owning reference to it. |
304 | 0 | assert!(fallback != EMPTY_HANDLE); |
305 | | unsafe { |
306 | 0 | let handle = TimerHandle::from_raw(fallback); |
307 | 0 | let ret = handle.clone(); |
308 | 0 | let _ = handle.into_raw(); |
309 | 0 | ret |
310 | | } |
311 | 0 | } |
312 | | } |
313 | | |
314 | | impl fmt::Debug for TimerHandle { |
315 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { |
316 | 0 | f.debug_struct("TimerHandle") |
317 | 0 | .field("inner", &"...") |
318 | 0 | .finish() |
319 | 0 | } |
320 | | } |