Coverage Report

Created: 2025-07-11 07:25

/rust/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-utils-0.8.21/src/sync/parker.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::primitive::sync::atomic::{AtomicUsize, Ordering::SeqCst};
2
use crate::primitive::sync::{Arc, Condvar, Mutex};
3
use std::fmt;
4
use std::marker::PhantomData;
5
use std::time::{Duration, Instant};
6
7
/// A thread parking primitive.
8
///
9
/// Conceptually, each `Parker` has an associated token which is initially not present:
10
///
11
/// * The [`park`] method blocks the current thread unless or until the token is available, at
12
///   which point it automatically consumes the token.
13
///
14
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
15
///   a specified maximum time.
16
///
17
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
18
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
19
///   returning immediately.
20
///
21
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
22
/// [`park`] and [`unpark`].
23
///
24
/// # Examples
25
///
26
/// ```
27
/// use std::thread;
28
/// use std::time::Duration;
29
/// use crossbeam_utils::sync::Parker;
30
///
31
/// let p = Parker::new();
32
/// let u = p.unparker().clone();
33
///
34
/// // Make the token available.
35
/// u.unpark();
36
/// // Wakes up immediately and consumes the token.
37
/// p.park();
38
///
39
/// thread::spawn(move || {
40
///     thread::sleep(Duration::from_millis(500));
41
///     u.unpark();
42
/// });
43
///
44
/// // Wakes up when `u.unpark()` provides the token.
45
/// p.park();
46
/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads to finish: https://github.com/rust-lang/miri/issues/1371
47
/// ```
48
///
49
/// [`park`]: Parker::park
50
/// [`park_timeout`]: Parker::park_timeout
51
/// [`park_deadline`]: Parker::park_deadline
52
/// [`unpark`]: Unparker::unpark
53
pub struct Parker {
54
    unparker: Unparker,
55
    _marker: PhantomData<*const ()>,
56
}
57
58
unsafe impl Send for Parker {}
59
60
impl Default for Parker {
61
0
    fn default() -> Self {
62
0
        Self {
63
0
            unparker: Unparker {
64
0
                inner: Arc::new(Inner {
65
0
                    state: AtomicUsize::new(EMPTY),
66
0
                    lock: Mutex::new(()),
67
0
                    cvar: Condvar::new(),
68
0
                }),
69
0
            },
70
0
            _marker: PhantomData,
71
0
        }
72
0
    }
73
}
74
75
impl Parker {
76
    /// Creates a new `Parker`.
77
    ///
78
    /// # Examples
79
    ///
80
    /// ```
81
    /// use crossbeam_utils::sync::Parker;
82
    ///
83
    /// let p = Parker::new();
84
    /// ```
85
    ///
86
0
    pub fn new() -> Parker {
87
0
        Self::default()
88
0
    }
89
90
    /// Blocks the current thread until the token is made available.
91
    ///
92
    /// # Examples
93
    ///
94
    /// ```
95
    /// use crossbeam_utils::sync::Parker;
96
    ///
97
    /// let p = Parker::new();
98
    /// let u = p.unparker().clone();
99
    ///
100
    /// // Make the token available.
101
    /// u.unpark();
102
    ///
103
    /// // Wakes up immediately and consumes the token.
104
    /// p.park();
105
    /// ```
106
0
    pub fn park(&self) {
107
0
        self.unparker.inner.park(None);
108
0
    }
109
110
    /// Blocks the current thread until the token is made available, but only for a limited time.
111
    ///
112
    /// # Examples
113
    ///
114
    /// ```
115
    /// use std::time::Duration;
116
    /// use crossbeam_utils::sync::Parker;
117
    ///
118
    /// let p = Parker::new();
119
    ///
120
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
121
    /// p.park_timeout(Duration::from_millis(500));
122
    /// ```
123
0
    pub fn park_timeout(&self, timeout: Duration) {
124
0
        match Instant::now().checked_add(timeout) {
125
0
            Some(deadline) => self.park_deadline(deadline),
126
0
            None => self.park(),
127
        }
128
0
    }
129
130
    /// Blocks the current thread until the token is made available, or until a certain deadline.
131
    ///
132
    /// # Examples
133
    ///
134
    /// ```
135
    /// use std::time::{Duration, Instant};
136
    /// use crossbeam_utils::sync::Parker;
137
    ///
138
    /// let p = Parker::new();
139
    /// let deadline = Instant::now() + Duration::from_millis(500);
140
    ///
141
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
142
    /// p.park_deadline(deadline);
143
    /// ```
144
0
    pub fn park_deadline(&self, deadline: Instant) {
145
0
        self.unparker.inner.park(Some(deadline))
146
0
    }
147
148
    /// Returns a reference to an associated [`Unparker`].
149
    ///
150
    /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
151
    ///
152
    /// # Examples
153
    ///
154
    /// ```
155
    /// use crossbeam_utils::sync::Parker;
156
    ///
157
    /// let p = Parker::new();
158
    /// let u = p.unparker().clone();
159
    ///
160
    /// // Make the token available.
161
    /// u.unpark();
162
    /// // Wakes up immediately and consumes the token.
163
    /// p.park();
164
    /// ```
165
    ///
166
    /// [`park`]: Parker::park
167
    /// [`park_timeout`]: Parker::park_timeout
168
0
    pub fn unparker(&self) -> &Unparker {
169
0
        &self.unparker
170
0
    }
171
172
    /// Converts a `Parker` into a raw pointer.
173
    ///
174
    /// # Examples
175
    ///
176
    /// ```
177
    /// use crossbeam_utils::sync::Parker;
178
    ///
179
    /// let p = Parker::new();
180
    /// let raw = Parker::into_raw(p);
181
    /// # let _ = unsafe { Parker::from_raw(raw) };
182
    /// ```
183
0
    pub fn into_raw(this: Parker) -> *const () {
184
0
        Unparker::into_raw(this.unparker)
185
0
    }
186
187
    /// Converts a raw pointer into a `Parker`.
188
    ///
189
    /// # Safety
190
    ///
191
    /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
192
    ///
193
    /// # Examples
194
    ///
195
    /// ```
196
    /// use crossbeam_utils::sync::Parker;
197
    ///
198
    /// let p = Parker::new();
199
    /// let raw = Parker::into_raw(p);
200
    /// let p = unsafe { Parker::from_raw(raw) };
201
    /// ```
202
0
    pub unsafe fn from_raw(ptr: *const ()) -> Parker {
203
0
        Parker {
204
0
            unparker: Unparker::from_raw(ptr),
205
0
            _marker: PhantomData,
206
0
        }
207
0
    }
208
}
209
210
impl fmt::Debug for Parker {
211
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212
0
        f.pad("Parker { .. }")
213
0
    }
214
}
215
216
/// Unparks a thread parked by the associated [`Parker`].
217
pub struct Unparker {
218
    inner: Arc<Inner>,
219
}
220
221
unsafe impl Send for Unparker {}
222
unsafe impl Sync for Unparker {}
223
224
impl Unparker {
225
    /// Atomically makes the token available if it is not already.
226
    ///
227
    /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
228
    /// any.
229
    ///
230
    /// # Examples
231
    ///
232
    /// ```
233
    /// use std::thread;
234
    /// use std::time::Duration;
235
    /// use crossbeam_utils::sync::Parker;
236
    ///
237
    /// let p = Parker::new();
238
    /// let u = p.unparker().clone();
239
    ///
240
    /// thread::spawn(move || {
241
    ///     thread::sleep(Duration::from_millis(500));
242
    ///     u.unpark();
243
    /// });
244
    ///
245
    /// // Wakes up when `u.unpark()` provides the token.
246
    /// p.park();
247
    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads to finish: https://github.com/rust-lang/miri/issues/1371
248
    /// ```
249
    ///
250
    /// [`park`]: Parker::park
251
    /// [`park_timeout`]: Parker::park_timeout
252
0
    pub fn unpark(&self) {
253
0
        self.inner.unpark()
254
0
    }
255
256
    /// Converts an `Unparker` into a raw pointer.
257
    ///
258
    /// # Examples
259
    ///
260
    /// ```
261
    /// use crossbeam_utils::sync::{Parker, Unparker};
262
    ///
263
    /// let p = Parker::new();
264
    /// let u = p.unparker().clone();
265
    /// let raw = Unparker::into_raw(u);
266
    /// # let _ = unsafe { Unparker::from_raw(raw) };
267
    /// ```
268
0
    pub fn into_raw(this: Unparker) -> *const () {
269
0
        Arc::into_raw(this.inner).cast::<()>()
270
0
    }
271
272
    /// Converts a raw pointer into an `Unparker`.
273
    ///
274
    /// # Safety
275
    ///
276
    /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
277
    ///
278
    /// # Examples
279
    ///
280
    /// ```
281
    /// use crossbeam_utils::sync::{Parker, Unparker};
282
    ///
283
    /// let p = Parker::new();
284
    /// let u = p.unparker().clone();
285
    ///
286
    /// let raw = Unparker::into_raw(u);
287
    /// let u = unsafe { Unparker::from_raw(raw) };
288
    /// ```
289
0
    pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
290
0
        Unparker {
291
0
            inner: Arc::from_raw(ptr.cast::<Inner>()),
292
0
        }
293
0
    }
294
}
295
296
impl fmt::Debug for Unparker {
297
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298
0
        f.pad("Unparker { .. }")
299
0
    }
300
}
301
302
impl Clone for Unparker {
303
0
    fn clone(&self) -> Unparker {
304
0
        Unparker {
305
0
            inner: self.inner.clone(),
306
0
        }
307
0
    }
308
}
309
310
const EMPTY: usize = 0;
311
const PARKED: usize = 1;
312
const NOTIFIED: usize = 2;
313
314
struct Inner {
315
    state: AtomicUsize,
316
    lock: Mutex<()>,
317
    cvar: Condvar,
318
}
319
320
impl Inner {
321
0
    fn park(&self, deadline: Option<Instant>) {
322
0
        // If we were previously notified then we consume this notification and return quickly.
323
0
        if self
324
0
            .state
325
0
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
326
0
            .is_ok()
327
        {
328
0
            return;
329
0
        }
330
331
        // If the deadline has already passed, then there is no need to actually block.
332
0
        if let Some(deadline) = deadline {
333
0
            if deadline <= Instant::now() {
334
0
                return;
335
0
            }
336
0
        }
337
338
        // Otherwise we need to coordinate going to sleep.
339
0
        let mut m = self.lock.lock().unwrap();
340
0
341
0
        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
342
0
            Ok(_) => {}
343
            // Consume this notification to avoid spurious wakeups in the next park.
344
            Err(NOTIFIED) => {
345
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
346
                // because `unpark` may have been called again since we read `NOTIFIED` in the
347
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
348
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
349
                // do that we must read from the write it made to `state`.
350
0
                let old = self.state.swap(EMPTY, SeqCst);
351
0
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
352
0
                return;
353
            }
354
0
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
355
        }
356
357
        loop {
358
            // Block the current thread on the condition variable.
359
0
            m = match deadline {
360
0
                None => self.cvar.wait(m).unwrap(),
361
0
                Some(deadline) => {
362
0
                    let now = Instant::now();
363
0
                    if now < deadline {
364
                        // We could check for a timeout here, in the return value of wait_timeout,
365
                        // but in the case that a timeout and an unpark arrive simultaneously, we
366
                        // prefer to report the latter (the `NOTIFIED` check below runs first).
367
0
                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
368
                    } else {
369
                        // We've timed out; swap out the state back to empty on our way out
370
0
                        match self.state.swap(EMPTY, SeqCst) {
371
0
                            NOTIFIED | PARKED => return,
372
0
                            n => panic!("inconsistent park_timeout state: {}", n),
373
                        };
374
                    }
375
                }
376
            };
377
378
0
            if self
379
0
                .state
380
0
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
381
0
                .is_ok()
382
            {
383
                // got a notification
384
0
                return;
385
0
            }
386
387
            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
388
            // in the branch above, when we discover the deadline is in the past
389
        }
390
0
    }
391
392
0
    pub(crate) fn unpark(&self) {
393
0
        // To ensure the unparked thread will observe any writes we made before this call, we must
394
0
        // perform a release operation that `park` can synchronize with. To do that we must write
395
0
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
396
0
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
397
0
        match self.state.swap(NOTIFIED, SeqCst) {
398
0
            EMPTY => return,    // no one was waiting
399
0
            NOTIFIED => return, // already unparked
400
0
            PARKED => {}        // gotta go wake someone up
401
0
            _ => panic!("inconsistent state in unpark"),
402
        }
403
404
        // There is a period between when the parked thread sets `state` to `PARKED` (or last
405
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
406
        // If we were to notify during this period it would be ignored and then when the parked
407
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
408
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
409
        //
410
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
411
        // it doesn't get woken only to have to wait for us to release `lock`.
412
0
        drop(self.lock.lock().unwrap());
413
0
        self.cvar.notify_one();
414
0
    }
415
}