Coverage Report

Created: 2025-02-25 06:39

/rust/registry/src/index.crates.io-6f17d22bba15001f/parking_lot-0.12.3/src/condvar.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Amanieu d'Antras
2
//
3
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5
// http://opensource.org/licenses/MIT>, at your option. This file may not be
6
// copied, modified, or distributed except according to those terms.
7
8
use crate::mutex::MutexGuard;
9
use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10
use crate::{deadlock, util};
11
use core::{
12
    fmt, ptr,
13
    sync::atomic::{AtomicPtr, Ordering},
14
};
15
use lock_api::RawMutex as RawMutex_;
16
use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17
use std::ops::DerefMut;
18
use std::time::{Duration, Instant};
19
20
/// A type indicating whether a timed wait on a condition variable returned
21
/// due to a time out or not.
22
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
23
pub struct WaitTimeoutResult(bool);
24
25
impl WaitTimeoutResult {
26
    /// Returns whether the wait was known to have timed out.
27
    #[inline]
28
0
    pub fn timed_out(self) -> bool {
29
0
        self.0
30
0
    }
Unexecuted instantiation: <parking_lot::condvar::WaitTimeoutResult>::timed_out
Unexecuted instantiation: <parking_lot::condvar::WaitTimeoutResult>::timed_out
31
}
32
33
/// A Condition Variable
34
///
35
/// Condition variables represent the ability to block a thread such that it
36
/// consumes no CPU time while waiting for an event to occur. Condition
37
/// variables are typically associated with a boolean predicate (a condition)
38
/// and a mutex. The predicate is always verified inside of the mutex before
39
/// determining that thread must block.
40
///
41
/// Note that this module places one additional restriction over the system
42
/// condition variables: each condvar can be used with only one mutex at a
43
/// time. Any attempt to use multiple mutexes on the same condition variable
44
/// simultaneously will result in a runtime panic. However it is possible to
45
/// switch to a different mutex if there are no threads currently waiting on
46
/// the condition variable.
47
///
48
/// # Differences from the standard library `Condvar`
49
///
50
/// - No spurious wakeups: A wait will only return a non-timeout result if it
51
///   was woken up by `notify_one` or `notify_all`.
52
/// - `Condvar::notify_all` will only wake up a single thread, the rest are
53
///   requeued to wait for the `Mutex` to be unlocked by the thread that was
54
///   woken up.
55
/// - Only requires 1 word of space, whereas the standard library boxes the
56
///   `Condvar` due to platform limitations.
57
/// - Can be statically constructed.
58
/// - Does not require any drop glue when dropped.
59
/// - Inline fast path for the uncontended case.
60
///
61
/// # Examples
62
///
63
/// ```
64
/// use parking_lot::{Mutex, Condvar};
65
/// use std::sync::Arc;
66
/// use std::thread;
67
///
68
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
69
/// let pair2 = pair.clone();
70
///
71
/// // Inside of our lock, spawn a new thread, and then wait for it to start
72
/// thread::spawn(move|| {
73
///     let &(ref lock, ref cvar) = &*pair2;
74
///     let mut started = lock.lock();
75
///     *started = true;
76
///     cvar.notify_one();
77
/// });
78
///
79
/// // wait for the thread to start up
80
/// let &(ref lock, ref cvar) = &*pair;
81
/// let mut started = lock.lock();
82
/// if !*started {
83
///     cvar.wait(&mut started);
84
/// }
85
/// // Note that we used an if instead of a while loop above. This is only
86
/// // possible because parking_lot's Condvar will never spuriously wake up.
87
/// // This means that wait() will only return after notify_one or notify_all is
88
/// // called.
89
/// ```
90
pub struct Condvar {
91
    state: AtomicPtr<RawMutex>,
92
}
93
94
impl Condvar {
95
    /// Creates a new condition variable which is ready to be waited on and
96
    /// notified.
97
    #[inline]
98
0
    pub const fn new() -> Condvar {
99
0
        Condvar {
100
0
            state: AtomicPtr::new(ptr::null_mut()),
101
0
        }
102
0
    }
Unexecuted instantiation: <parking_lot::condvar::Condvar>::new
Unexecuted instantiation: <parking_lot::condvar::Condvar>::new
103
104
    /// Wakes up one blocked thread on this condvar.
105
    ///
106
    /// Returns whether a thread was woken up.
107
    ///
108
    /// If there is a blocked thread on this condition variable, then it will
109
    /// be woken up from its call to `wait` or `wait_timeout`. Calls to
110
    /// `notify_one` are not buffered in any way.
111
    ///
112
    /// To wake up all threads, see `notify_all()`.
113
    ///
114
    /// # Examples
115
    ///
116
    /// ```
117
    /// use parking_lot::Condvar;
118
    ///
119
    /// let condvar = Condvar::new();
120
    ///
121
    /// // do something with condvar, share it with other threads
122
    ///
123
    /// if !condvar.notify_one() {
124
    ///     println!("Nobody was listening for this.");
125
    /// }
126
    /// ```
127
    #[inline]
128
0
    pub fn notify_one(&self) -> bool {
129
0
        // Nothing to do if there are no waiting threads
130
0
        let state = self.state.load(Ordering::Relaxed);
131
0
        if state.is_null() {
132
0
            return false;
133
0
        }
134
0
135
0
        self.notify_one_slow(state)
136
0
    }
Unexecuted instantiation: <parking_lot::condvar::Condvar>::notify_one
Unexecuted instantiation: <parking_lot::condvar::Condvar>::notify_one
137
138
    #[cold]
139
0
    fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
140
0
        // Unpark one thread and requeue the rest onto the mutex
141
0
        let from = self as *const _ as usize;
142
0
        let to = mutex as usize;
143
0
        let validate = || {
144
0
            // Make sure that our atomic state still points to the same
145
0
            // mutex. If not then it means that all threads on the current
146
0
            // mutex were woken up and a new waiting thread switched to a
147
0
            // different mutex. In that case we can get away with doing
148
0
            // nothing.
149
0
            if self.state.load(Ordering::Relaxed) != mutex {
150
0
                return RequeueOp::Abort;
151
0
            }
152
0
153
0
            // Unpark one thread if the mutex is unlocked, otherwise just
154
0
            // requeue everything to the mutex. This is safe to do here
155
0
            // since unlocking the mutex when the parked bit is set requires
156
0
            // locking the queue. There is the possibility of a race if the
157
0
            // mutex gets locked after we check, but that doesn't matter in
158
0
            // this case.
159
0
            if unsafe { (*mutex).mark_parked_if_locked() } {
160
0
                RequeueOp::RequeueOne
161
            } else {
162
0
                RequeueOp::UnparkOne
163
            }
164
0
        };
165
0
        let callback = |_op, result: UnparkResult| {
166
0
            // Clear our state if there are no more waiting threads
167
0
            if !result.have_more_threads {
168
0
                self.state.store(ptr::null_mut(), Ordering::Relaxed);
169
0
            }
170
0
            TOKEN_NORMAL
171
0
        };
172
0
        let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
173
0
174
0
        res.unparked_threads + res.requeued_threads != 0
175
0
    }
176
177
    /// Wakes up all blocked threads on this condvar.
178
    ///
179
    /// Returns the number of threads woken up.
180
    ///
181
    /// This method will ensure that any current waiters on the condition
182
    /// variable are awoken. Calls to `notify_all()` are not buffered in any
183
    /// way.
184
    ///
185
    /// To wake up only one thread, see `notify_one()`.
186
    #[inline]
187
0
    pub fn notify_all(&self) -> usize {
188
0
        // Nothing to do if there are no waiting threads
189
0
        let state = self.state.load(Ordering::Relaxed);
190
0
        if state.is_null() {
191
0
            return 0;
192
0
        }
193
0
194
0
        self.notify_all_slow(state)
195
0
    }
Unexecuted instantiation: <parking_lot::condvar::Condvar>::notify_all
Unexecuted instantiation: <parking_lot::condvar::Condvar>::notify_all
196
197
    #[cold]
198
0
    fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
199
0
        // Unpark one thread and requeue the rest onto the mutex
200
0
        let from = self as *const _ as usize;
201
0
        let to = mutex as usize;
202
0
        let validate = || {
203
0
            // Make sure that our atomic state still points to the same
204
0
            // mutex. If not then it means that all threads on the current
205
0
            // mutex were woken up and a new waiting thread switched to a
206
0
            // different mutex. In that case we can get away with doing
207
0
            // nothing.
208
0
            if self.state.load(Ordering::Relaxed) != mutex {
209
0
                return RequeueOp::Abort;
210
0
            }
211
0
212
0
            // Clear our state since we are going to unpark or requeue all
213
0
            // threads.
214
0
            self.state.store(ptr::null_mut(), Ordering::Relaxed);
215
0
216
0
            // Unpark one thread if the mutex is unlocked, otherwise just
217
0
            // requeue everything to the mutex. This is safe to do here
218
0
            // since unlocking the mutex when the parked bit is set requires
219
0
            // locking the queue. There is the possibility of a race if the
220
0
            // mutex gets locked after we check, but that doesn't matter in
221
0
            // this case.
222
0
            if unsafe { (*mutex).mark_parked_if_locked() } {
223
0
                RequeueOp::RequeueAll
224
            } else {
225
0
                RequeueOp::UnparkOneRequeueRest
226
            }
227
0
        };
228
0
        let callback = |op, result: UnparkResult| {
229
0
            // If we requeued threads to the mutex, mark it as having
230
0
            // parked threads. The RequeueAll case is already handled above.
231
0
            if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
232
0
                unsafe { (*mutex).mark_parked() };
233
0
            }
234
0
            TOKEN_NORMAL
235
0
        };
236
0
        let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
237
0
238
0
        res.unparked_threads + res.requeued_threads
239
0
    }
240
241
    /// Blocks the current thread until this condition variable receives a
242
    /// notification.
243
    ///
244
    /// This function will atomically unlock the mutex specified (represented by
245
    /// `mutex_guard`) and block the current thread. This means that any calls
246
    /// to `notify_*()` which happen logically after the mutex is unlocked are
247
    /// candidates to wake this thread up. When this function call returns, the
248
    /// lock specified will have been re-acquired.
249
    ///
250
    /// # Panics
251
    ///
252
    /// This function will panic if another thread is waiting on the `Condvar`
253
    /// with a different `Mutex` object.
254
    #[inline]
255
0
    pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
256
0
        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
257
0
    }
Unexecuted instantiation: <parking_lot::condvar::Condvar>::wait::<()>
Unexecuted instantiation: <parking_lot::condvar::Condvar>::wait::<_>
258
259
    /// Waits on this condition variable for a notification, timing out after
260
    /// the specified time instant.
261
    ///
262
    /// The semantics of this function are equivalent to `wait()` except that
263
    /// the thread will be blocked roughly until `timeout` is reached. This
264
    /// method should not be used for precise timing due to anomalies such as
265
    /// preemption or platform differences that may not cause the maximum
266
    /// amount of time waited to be precisely `timeout`.
267
    ///
268
    /// Note that the best effort is made to ensure that the time waited is
269
    /// measured with a monotonic clock, and not affected by the changes made to
270
    /// the system time.
271
    ///
272
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
273
    /// known to have elapsed.
274
    ///
275
    /// Like `wait`, the lock specified will be re-acquired when this function
276
    /// returns, regardless of whether the timeout elapsed or not.
277
    ///
278
    /// # Panics
279
    ///
280
    /// This function will panic if another thread is waiting on the `Condvar`
281
    /// with a different `Mutex` object.
282
    #[inline]
283
0
    pub fn wait_until<T: ?Sized>(
284
0
        &self,
285
0
        mutex_guard: &mut MutexGuard<'_, T>,
286
0
        timeout: Instant,
287
0
    ) -> WaitTimeoutResult {
288
0
        self.wait_until_internal(
289
0
            unsafe { MutexGuard::mutex(mutex_guard).raw() },
290
0
            Some(timeout),
291
0
        )
292
0
    }
293
294
    // This is a non-generic function to reduce the monomorphization cost of
295
    // using `wait_until`.
296
0
    fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
297
0
        let result;
298
0
        let mut bad_mutex = false;
299
0
        let mut requeued = false;
300
0
        {
301
0
            let addr = self as *const _ as usize;
302
0
            let lock_addr = mutex as *const _ as *mut _;
303
0
            let validate = || {
304
0
                // Ensure we don't use two different mutexes with the same
305
0
                // Condvar at the same time. This is done while locked to
306
0
                // avoid races with notify_one
307
0
                let state = self.state.load(Ordering::Relaxed);
308
0
                if state.is_null() {
309
0
                    self.state.store(lock_addr, Ordering::Relaxed);
310
0
                } else if state != lock_addr {
311
0
                    bad_mutex = true;
312
0
                    return false;
313
0
                }
314
0
                true
315
0
            };
316
0
            let before_sleep = || {
317
0
                // Unlock the mutex before sleeping...
318
0
                unsafe { mutex.unlock() };
319
0
            };
320
0
            let timed_out = |k, was_last_thread| {
321
0
                // If we were requeued to a mutex, then we did not time out.
322
0
                // We'll just park ourselves on the mutex again when we try
323
0
                // to lock it later.
324
0
                requeued = k != addr;
325
0
326
0
                // If we were the last thread on the queue then we need to
327
0
                // clear our state. This is normally done by the
328
0
                // notify_{one,all} functions when not timing out.
329
0
                if !requeued && was_last_thread {
330
0
                    self.state.store(ptr::null_mut(), Ordering::Relaxed);
331
0
                }
332
0
            };
333
0
            result = unsafe {
334
0
                parking_lot_core::park(
335
0
                    addr,
336
0
                    validate,
337
0
                    before_sleep,
338
0
                    timed_out,
339
0
                    DEFAULT_PARK_TOKEN,
340
0
                    timeout,
341
0
                )
342
0
            };
343
0
        }
344
0
345
0
        // Panic if we tried to use multiple mutexes with a Condvar. Note
346
0
        // that at this point the MutexGuard is still locked. It will be
347
0
        // unlocked by the unwinding logic.
348
0
        if bad_mutex {
349
0
            panic!("attempted to use a condition variable with more than one mutex");
350
0
        }
351
0
352
0
        // ... and re-lock it once we are done sleeping
353
0
        if result == ParkResult::Unparked(TOKEN_HANDOFF) {
354
0
            unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
355
0
        } else {
356
0
            mutex.lock();
357
0
        }
358
359
0
        WaitTimeoutResult(!(result.is_unparked() || requeued))
360
0
    }
361
362
    /// Waits on this condition variable for a notification, timing out after a
363
    /// specified duration.
364
    ///
365
    /// The semantics of this function are equivalent to `wait()` except that
366
    /// the thread will be blocked for roughly no longer than `timeout`. This
367
    /// method should not be used for precise timing due to anomalies such as
368
    /// preemption or platform differences that may not cause the maximum
369
    /// amount of time waited to be precisely `timeout`.
370
    ///
371
    /// Note that the best effort is made to ensure that the time waited is
372
    /// measured with a monotonic clock, and not affected by the changes made to
373
    /// the system time.
374
    ///
375
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
376
    /// known to have elapsed.
377
    ///
378
    /// Like `wait`, the lock specified will be re-acquired when this function
379
    /// returns, regardless of whether the timeout elapsed or not.
380
    #[inline]
381
0
    pub fn wait_for<T: ?Sized>(
382
0
        &self,
383
0
        mutex_guard: &mut MutexGuard<'_, T>,
384
0
        timeout: Duration,
385
0
    ) -> WaitTimeoutResult {
386
0
        let deadline = util::to_deadline(timeout);
387
0
        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
388
0
    }
Unexecuted instantiation: <parking_lot::condvar::Condvar>::wait_for::<tokio::runtime::blocking::pool::Shared>
Unexecuted instantiation: <parking_lot::condvar::Condvar>::wait_for::<()>
Unexecuted instantiation: <parking_lot::condvar::Condvar>::wait_for::<_>
389
390
    #[inline]
391
0
    fn wait_while_until_internal<T, F>(
392
0
        &self,
393
0
        mutex_guard: &mut MutexGuard<'_, T>,
394
0
        mut condition: F,
395
0
        timeout: Option<Instant>,
396
0
    ) -> WaitTimeoutResult
397
0
    where
398
0
        T: ?Sized,
399
0
        F: FnMut(&mut T) -> bool,
400
0
    {
401
0
        let mut result = WaitTimeoutResult(false);
402
403
0
        while !result.timed_out() && condition(mutex_guard.deref_mut()) {
404
0
            result =
405
0
                self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
406
0
        }
407
408
0
        result
409
0
    }
410
    /// Blocks the current thread until this condition variable receives a
411
    /// notification. If the provided condition evaluates to `false`, then the
412
    /// thread is no longer blocked and the operation is completed. If the
413
    /// condition evaluates to `true`, then the thread is blocked again and
414
    /// waits for another notification before repeating this process.
415
    ///
416
    /// This function will atomically unlock the mutex specified (represented by
417
    /// `mutex_guard`) and block the current thread. This means that any calls
418
    /// to `notify_*()` which happen logically after the mutex is unlocked are
419
    /// candidates to wake this thread up. When this function call returns, the
420
    /// lock specified will have been re-acquired.
421
    ///
422
    /// # Panics
423
    ///
424
    /// This function will panic if another thread is waiting on the `Condvar`
425
    /// with a different `Mutex` object.
426
    #[inline]
427
0
    pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F)
428
0
    where
429
0
        T: ?Sized,
430
0
        F: FnMut(&mut T) -> bool,
431
0
    {
432
0
        self.wait_while_until_internal(mutex_guard, condition, None);
433
0
    }
434
435
    /// Waits on this condition variable for a notification, timing out after
436
    /// the specified time instant. If the provided condition evaluates to
437
    /// `false`, then the thread is no longer blocked and the operation is
438
    /// completed. If the condition evaluates to `true`, then the thread is
439
    /// blocked again and waits for another notification before repeating
440
    /// this process.
441
    ///
442
    /// The semantics of this function are equivalent to `wait()` except that
443
    /// the thread will be blocked roughly until `timeout` is reached. This
444
    /// method should not be used for precise timing due to anomalies such as
445
    /// preemption or platform differences that may not cause the maximum
446
    /// amount of time waited to be precisely `timeout`.
447
    ///
448
    /// Note that the best effort is made to ensure that the time waited is
449
    /// measured with a monotonic clock, and not affected by the changes made to
450
    /// the system time.
451
    ///
452
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
453
    /// known to have elapsed.
454
    ///
455
    /// Like `wait`, the lock specified will be re-acquired when this function
456
    /// returns, regardless of whether the timeout elapsed or not.
457
    ///
458
    /// # Panics
459
    ///
460
    /// This function will panic if another thread is waiting on the `Condvar`
461
    /// with a different `Mutex` object.
462
    #[inline]
463
0
    pub fn wait_while_until<T, F>(
464
0
        &self,
465
0
        mutex_guard: &mut MutexGuard<'_, T>,
466
0
        condition: F,
467
0
        timeout: Instant,
468
0
    ) -> WaitTimeoutResult
469
0
    where
470
0
        T: ?Sized,
471
0
        F: FnMut(&mut T) -> bool,
472
0
    {
473
0
        self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
474
0
    }
475
476
    /// Waits on this condition variable for a notification, timing out after a
477
    /// specified duration. If the provided condition evaluates to `false`,
478
    /// then the thread is no longer blocked and the operation is completed.
479
    /// If the condition evaluates to `true`, then the thread is blocked again
480
    /// and waits for another notification before repeating this process.
481
    ///
482
    /// The semantics of this function are equivalent to `wait()` except that
483
    /// the thread will be blocked for roughly no longer than `timeout`. This
484
    /// method should not be used for precise timing due to anomalies such as
485
    /// preemption or platform differences that may not cause the maximum
486
    /// amount of time waited to be precisely `timeout`.
487
    ///
488
    /// Note that the best effort is made to ensure that the time waited is
489
    /// measured with a monotonic clock, and not affected by the changes made to
490
    /// the system time.
491
    ///
492
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
493
    /// known to have elapsed.
494
    ///
495
    /// Like `wait`, the lock specified will be re-acquired when this function
496
    /// returns, regardless of whether the timeout elapsed or not.
497
    #[inline]
498
0
    pub fn wait_while_for<T: ?Sized, F>(
499
0
        &self,
500
0
        mutex_guard: &mut MutexGuard<'_, T>,
501
0
        condition: F,
502
0
        timeout: Duration,
503
0
    ) -> WaitTimeoutResult
504
0
    where
505
0
        F: FnMut(&mut T) -> bool,
506
0
    {
507
0
        let deadline = util::to_deadline(timeout);
508
0
        self.wait_while_until_internal(mutex_guard, condition, deadline)
509
0
    }
510
}
511
512
impl Default for Condvar {
513
    #[inline]
514
0
    fn default() -> Condvar {
515
0
        Condvar::new()
516
0
    }
517
}
518
519
impl fmt::Debug for Condvar {
520
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521
0
        f.pad("Condvar { .. }")
522
0
    }
523
}
524
525
#[cfg(test)]
526
mod tests {
527
    use crate::{Condvar, Mutex, MutexGuard};
528
    use std::sync::mpsc::channel;
529
    use std::sync::Arc;
530
    use std::thread;
531
    use std::thread::sleep;
532
    use std::thread::JoinHandle;
533
    use std::time::Duration;
534
    use std::time::Instant;
535
536
    #[test]
537
    fn smoke() {
538
        let c = Condvar::new();
539
        c.notify_one();
540
        c.notify_all();
541
    }
542
543
    #[test]
544
    fn notify_one() {
545
        let m = Arc::new(Mutex::new(()));
546
        let m2 = m.clone();
547
        let c = Arc::new(Condvar::new());
548
        let c2 = c.clone();
549
550
        let mut g = m.lock();
551
        let _t = thread::spawn(move || {
552
            let _g = m2.lock();
553
            c2.notify_one();
554
        });
555
        c.wait(&mut g);
556
    }
557
558
    #[test]
559
    fn notify_all() {
560
        const N: usize = 10;
561
562
        let data = Arc::new((Mutex::new(0), Condvar::new()));
563
        let (tx, rx) = channel();
564
        for _ in 0..N {
565
            let data = data.clone();
566
            let tx = tx.clone();
567
            thread::spawn(move || {
568
                let (lock, cond) = &*data;
569
                let mut cnt = lock.lock();
570
                *cnt += 1;
571
                if *cnt == N {
572
                    tx.send(()).unwrap();
573
                }
574
                while *cnt != 0 {
575
                    cond.wait(&mut cnt);
576
                }
577
                tx.send(()).unwrap();
578
            });
579
        }
580
        drop(tx);
581
582
        let (lock, cond) = &*data;
583
        rx.recv().unwrap();
584
        let mut cnt = lock.lock();
585
        *cnt = 0;
586
        cond.notify_all();
587
        drop(cnt);
588
589
        for _ in 0..N {
590
            rx.recv().unwrap();
591
        }
592
    }
593
594
    #[test]
595
    fn notify_one_return_true() {
596
        let m = Arc::new(Mutex::new(()));
597
        let m2 = m.clone();
598
        let c = Arc::new(Condvar::new());
599
        let c2 = c.clone();
600
601
        let mut g = m.lock();
602
        let _t = thread::spawn(move || {
603
            let _g = m2.lock();
604
            assert!(c2.notify_one());
605
        });
606
        c.wait(&mut g);
607
    }
608
609
    #[test]
610
    fn notify_one_return_false() {
611
        let m = Arc::new(Mutex::new(()));
612
        let c = Arc::new(Condvar::new());
613
614
        let _t = thread::spawn(move || {
615
            let _g = m.lock();
616
            assert!(!c.notify_one());
617
        });
618
    }
619
620
    #[test]
621
    fn notify_all_return() {
622
        const N: usize = 10;
623
624
        let data = Arc::new((Mutex::new(0), Condvar::new()));
625
        let (tx, rx) = channel();
626
        for _ in 0..N {
627
            let data = data.clone();
628
            let tx = tx.clone();
629
            thread::spawn(move || {
630
                let (lock, cond) = &*data;
631
                let mut cnt = lock.lock();
632
                *cnt += 1;
633
                if *cnt == N {
634
                    tx.send(()).unwrap();
635
                }
636
                while *cnt != 0 {
637
                    cond.wait(&mut cnt);
638
                }
639
                tx.send(()).unwrap();
640
            });
641
        }
642
        drop(tx);
643
644
        let (lock, cond) = &*data;
645
        rx.recv().unwrap();
646
        let mut cnt = lock.lock();
647
        *cnt = 0;
648
        assert_eq!(cond.notify_all(), N);
649
        drop(cnt);
650
651
        for _ in 0..N {
652
            rx.recv().unwrap();
653
        }
654
655
        assert_eq!(cond.notify_all(), 0);
656
    }
657
658
    #[test]
659
    fn wait_for() {
660
        let m = Arc::new(Mutex::new(()));
661
        let m2 = m.clone();
662
        let c = Arc::new(Condvar::new());
663
        let c2 = c.clone();
664
665
        let mut g = m.lock();
666
        let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
667
        assert!(no_timeout.timed_out());
668
669
        let _t = thread::spawn(move || {
670
            let _g = m2.lock();
671
            c2.notify_one();
672
        });
673
        let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
674
        assert!(!timeout_res.timed_out());
675
676
        drop(g);
677
    }
678
679
    #[test]
680
    fn wait_until() {
681
        let m = Arc::new(Mutex::new(()));
682
        let m2 = m.clone();
683
        let c = Arc::new(Condvar::new());
684
        let c2 = c.clone();
685
686
        let mut g = m.lock();
687
        let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
688
        assert!(no_timeout.timed_out());
689
        let _t = thread::spawn(move || {
690
            let _g = m2.lock();
691
            c2.notify_one();
692
        });
693
        let timeout_res = c.wait_until(
694
            &mut g,
695
            Instant::now() + Duration::from_millis(u32::max_value() as u64),
696
        );
697
        assert!(!timeout_res.timed_out());
698
        drop(g);
699
    }
700
701
    fn spawn_wait_while_notifier(
702
        mutex: Arc<Mutex<u32>>,
703
        cv: Arc<Condvar>,
704
        num_iters: u32,
705
        timeout: Option<Instant>,
706
    ) -> JoinHandle<()> {
707
        thread::spawn(move || {
708
            for epoch in 1..=num_iters {
709
                // spin to wait for main test thread to block
710
                // before notifying it to wake back up and check
711
                // its condition.
712
                let mut sleep_backoff = Duration::from_millis(1);
713
                let _mutex_guard = loop {
714
                    let mutex_guard = mutex.lock();
715
716
                    if let Some(timeout) = timeout {
717
                        if Instant::now() >= timeout {
718
                            return;
719
                        }
720
                    }
721
722
                    if *mutex_guard == epoch {
723
                        break mutex_guard;
724
                    }
725
726
                    drop(mutex_guard);
727
728
                    // give main test thread a good chance to
729
                    // acquire the lock before this thread does.
730
                    sleep(sleep_backoff);
731
                    sleep_backoff *= 2;
732
                };
733
734
                cv.notify_one();
735
            }
736
        })
737
    }
738
739
    #[test]
740
    fn wait_while_until_internal_does_not_wait_if_initially_false() {
741
        let mutex = Arc::new(Mutex::new(0));
742
        let cv = Arc::new(Condvar::new());
743
744
        let condition = |counter: &mut u32| {
745
            *counter += 1;
746
            false
747
        };
748
749
        let mut mutex_guard = mutex.lock();
750
        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
751
752
        assert!(!timeout_result.timed_out());
753
        assert!(*mutex_guard == 1);
754
    }
755
756
    #[test]
757
    fn wait_while_until_internal_times_out_before_false() {
758
        let mutex = Arc::new(Mutex::new(0));
759
        let cv = Arc::new(Condvar::new());
760
761
        let num_iters = 3;
762
        let condition = |counter: &mut u32| {
763
            *counter += 1;
764
            true
765
        };
766
767
        let mut mutex_guard = mutex.lock();
768
        let timeout = Some(Instant::now() + Duration::from_millis(500));
769
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);
770
771
        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);
772
773
        assert!(timeout_result.timed_out());
774
        assert!(*mutex_guard == num_iters + 1);
775
776
        // prevent deadlock with notifier
777
        drop(mutex_guard);
778
        handle.join().unwrap();
779
    }
780
781
    #[test]
782
    fn wait_while_until_internal() {
783
        let mutex = Arc::new(Mutex::new(0));
784
        let cv = Arc::new(Condvar::new());
785
786
        let num_iters = 4;
787
788
        let condition = |counter: &mut u32| {
789
            *counter += 1;
790
            *counter <= num_iters
791
        };
792
793
        let mut mutex_guard = mutex.lock();
794
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);
795
796
        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
797
798
        assert!(!timeout_result.timed_out());
799
        assert!(*mutex_guard == num_iters + 1);
800
801
        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
802
        handle.join().unwrap();
803
804
        assert!(!timeout_result.timed_out());
805
        assert!(*mutex_guard == num_iters + 2);
806
    }
807
808
    #[test]
809
    #[should_panic]
810
    fn two_mutexes() {
811
        let m = Arc::new(Mutex::new(()));
812
        let m2 = m.clone();
813
        let m3 = Arc::new(Mutex::new(()));
814
        let c = Arc::new(Condvar::new());
815
        let c2 = c.clone();
816
817
        // Make sure we don't leave the child thread dangling
818
        struct PanicGuard<'a>(&'a Condvar);
819
        impl<'a> Drop for PanicGuard<'a> {
820
            fn drop(&mut self) {
821
                self.0.notify_one();
822
            }
823
        }
824
825
        let (tx, rx) = channel();
826
        let g = m.lock();
827
        let _t = thread::spawn(move || {
828
            let mut g = m2.lock();
829
            tx.send(()).unwrap();
830
            c2.wait(&mut g);
831
        });
832
        drop(g);
833
        rx.recv().unwrap();
834
        let _g = m.lock();
835
        let _guard = PanicGuard(&c);
836
        c.wait(&mut m3.lock());
837
    }
838
839
    #[test]
840
    fn two_mutexes_disjoint() {
841
        let m = Arc::new(Mutex::new(()));
842
        let m2 = m.clone();
843
        let m3 = Arc::new(Mutex::new(()));
844
        let c = Arc::new(Condvar::new());
845
        let c2 = c.clone();
846
847
        let mut g = m.lock();
848
        let _t = thread::spawn(move || {
849
            let _g = m2.lock();
850
            c2.notify_one();
851
        });
852
        c.wait(&mut g);
853
        drop(g);
854
855
        let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
856
    }
857
858
    #[test]
859
    fn test_debug_condvar() {
860
        let c = Condvar::new();
861
        assert_eq!(format!("{:?}", c), "Condvar { .. }");
862
    }
863
864
    #[test]
865
    fn test_condvar_requeue() {
866
        let m = Arc::new(Mutex::new(()));
867
        let m2 = m.clone();
868
        let c = Arc::new(Condvar::new());
869
        let c2 = c.clone();
870
        let t = thread::spawn(move || {
871
            let mut g = m2.lock();
872
            c2.wait(&mut g);
873
        });
874
875
        let mut g = m.lock();
876
        while !c.notify_one() {
877
            // Wait for the thread to get into wait()
878
            MutexGuard::bump(&mut g);
879
            // Yield, so the other thread gets a chance to do something.
880
            // (At least Miri needs this, because it doesn't preempt threads.)
881
            thread::yield_now();
882
        }
883
        // The thread should have been requeued to the mutex, which we wake up now.
884
        drop(g);
885
        t.join().unwrap();
886
    }
887
888
    #[test]
889
    fn test_issue_129() {
890
        let locks = Arc::new((Mutex::new(()), Condvar::new()));
891
892
        let (tx, rx) = channel();
893
        for _ in 0..4 {
894
            let locks = locks.clone();
895
            let tx = tx.clone();
896
            thread::spawn(move || {
897
                let mut guard = locks.0.lock();
898
                locks.1.wait(&mut guard);
899
                locks.1.wait_for(&mut guard, Duration::from_millis(1));
900
                locks.1.notify_one();
901
                tx.send(()).unwrap();
902
            });
903
        }
904
905
        thread::sleep(Duration::from_millis(100));
906
        locks.1.notify_one();
907
908
        for _ in 0..4 {
909
            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
910
        }
911
    }
912
}
913
914
/// This module contains an integration test that is heavily inspired from WebKit's own integration
915
/// tests for it's own Condvar.
916
#[cfg(test)]
917
mod webkit_queue_test {
918
    use crate::{Condvar, Mutex, MutexGuard};
919
    use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
920
921
    #[derive(Clone, Copy)]
922
    enum Timeout {
923
        Bounded(Duration),
924
        Forever,
925
    }
926
927
    #[derive(Clone, Copy)]
928
    enum NotifyStyle {
929
        One,
930
        All,
931
    }
932
933
    struct Queue {
934
        items: VecDeque<usize>,
935
        should_continue: bool,
936
    }
937
938
    impl Queue {
939
        fn new() -> Self {
940
            Self {
941
                items: VecDeque::new(),
942
                should_continue: true,
943
            }
944
        }
945
    }
946
947
    fn wait<T: ?Sized>(
948
        condition: &Condvar,
949
        lock: &mut MutexGuard<'_, T>,
950
        predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
951
        timeout: &Timeout,
952
    ) {
953
        while !predicate(lock) {
954
            match timeout {
955
                Timeout::Forever => condition.wait(lock),
956
                Timeout::Bounded(bound) => {
957
                    condition.wait_for(lock, *bound);
958
                }
959
            }
960
        }
961
    }
962
963
    fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
964
        match style {
965
            NotifyStyle::One => {
966
                condition.notify_one();
967
            }
968
            NotifyStyle::All => {
969
                if should_notify {
970
                    condition.notify_all();
971
                }
972
            }
973
        }
974
    }
975
976
    fn run_queue_test(
977
        num_producers: usize,
978
        num_consumers: usize,
979
        max_queue_size: usize,
980
        messages_per_producer: usize,
981
        notify_style: NotifyStyle,
982
        timeout: Timeout,
983
        delay: Duration,
984
    ) {
985
        let input_queue = Arc::new(Mutex::new(Queue::new()));
986
        let empty_condition = Arc::new(Condvar::new());
987
        let full_condition = Arc::new(Condvar::new());
988
989
        let output_vec = Arc::new(Mutex::new(vec![]));
990
991
        let consumers = (0..num_consumers)
992
            .map(|_| {
993
                consumer_thread(
994
                    input_queue.clone(),
995
                    empty_condition.clone(),
996
                    full_condition.clone(),
997
                    timeout,
998
                    notify_style,
999
                    output_vec.clone(),
1000
                    max_queue_size,
1001
                )
1002
            })
1003
            .collect::<Vec<_>>();
1004
        let producers = (0..num_producers)
1005
            .map(|_| {
1006
                producer_thread(
1007
                    messages_per_producer,
1008
                    input_queue.clone(),
1009
                    empty_condition.clone(),
1010
                    full_condition.clone(),
1011
                    timeout,
1012
                    notify_style,
1013
                    max_queue_size,
1014
                )
1015
            })
1016
            .collect::<Vec<_>>();
1017
1018
        thread::sleep(delay);
1019
1020
        for producer in producers.into_iter() {
1021
            producer.join().expect("Producer thread panicked");
1022
        }
1023
1024
        {
1025
            let mut input_queue = input_queue.lock();
1026
            input_queue.should_continue = false;
1027
        }
1028
        empty_condition.notify_all();
1029
1030
        for consumer in consumers.into_iter() {
1031
            consumer.join().expect("Consumer thread panicked");
1032
        }
1033
1034
        let mut output_vec = output_vec.lock();
1035
        assert_eq!(output_vec.len(), num_producers * messages_per_producer);
1036
        output_vec.sort();
1037
        for msg_idx in 0..messages_per_producer {
1038
            for producer_idx in 0..num_producers {
1039
                assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
1040
            }
1041
        }
1042
    }
1043
1044
    fn consumer_thread(
1045
        input_queue: Arc<Mutex<Queue>>,
1046
        empty_condition: Arc<Condvar>,
1047
        full_condition: Arc<Condvar>,
1048
        timeout: Timeout,
1049
        notify_style: NotifyStyle,
1050
        output_queue: Arc<Mutex<Vec<usize>>>,
1051
        max_queue_size: usize,
1052
    ) -> thread::JoinHandle<()> {
1053
        thread::spawn(move || loop {
1054
            let (should_notify, result) = {
1055
                let mut queue = input_queue.lock();
1056
                wait(
1057
                    &empty_condition,
1058
                    &mut queue,
1059
                    |state| -> bool { !state.items.is_empty() || !state.should_continue },
1060
                    &timeout,
1061
                );
1062
                if queue.items.is_empty() && !queue.should_continue {
1063
                    return;
1064
                }
1065
                let should_notify = queue.items.len() == max_queue_size;
1066
                let result = queue.items.pop_front();
1067
                std::mem::drop(queue);
1068
                (should_notify, result)
1069
            };
1070
            notify(notify_style, &full_condition, should_notify);
1071
1072
            if let Some(result) = result {
1073
                output_queue.lock().push(result);
1074
            }
1075
        })
1076
    }
1077
1078
    fn producer_thread(
1079
        num_messages: usize,
1080
        queue: Arc<Mutex<Queue>>,
1081
        empty_condition: Arc<Condvar>,
1082
        full_condition: Arc<Condvar>,
1083
        timeout: Timeout,
1084
        notify_style: NotifyStyle,
1085
        max_queue_size: usize,
1086
    ) -> thread::JoinHandle<()> {
1087
        thread::spawn(move || {
1088
            for message in 0..num_messages {
1089
                let should_notify = {
1090
                    let mut queue = queue.lock();
1091
                    wait(
1092
                        &full_condition,
1093
                        &mut queue,
1094
                        |state| state.items.len() < max_queue_size,
1095
                        &timeout,
1096
                    );
1097
                    let should_notify = queue.items.is_empty();
1098
                    queue.items.push_back(message);
1099
                    std::mem::drop(queue);
1100
                    should_notify
1101
                };
1102
                notify(notify_style, &empty_condition, should_notify);
1103
            }
1104
        })
1105
    }
1106
1107
    macro_rules! run_queue_tests {
1108
        ( $( $name:ident(
1109
            num_producers: $num_producers:expr,
1110
            num_consumers: $num_consumers:expr,
1111
            max_queue_size: $max_queue_size:expr,
1112
            messages_per_producer: $messages_per_producer:expr,
1113
            notification_style: $notification_style:expr,
1114
            timeout: $timeout:expr,
1115
            delay_seconds: $delay_seconds:expr);
1116
        )* ) => {
1117
            $(#[test]
1118
            fn $name() {
1119
                let delay = Duration::from_secs($delay_seconds);
1120
                run_queue_test(
1121
                    $num_producers,
1122
                    $num_consumers,
1123
                    $max_queue_size,
1124
                    $messages_per_producer,
1125
                    $notification_style,
1126
                    $timeout,
1127
                    delay,
1128
                    );
1129
            })*
1130
        };
1131
    }
1132
1133
    run_queue_tests! {
1134
        sanity_check_queue(
1135
            num_producers: 1,
1136
            num_consumers: 1,
1137
            max_queue_size: 1,
1138
            messages_per_producer: 100_000,
1139
            notification_style: NotifyStyle::All,
1140
            timeout: Timeout::Bounded(Duration::from_secs(1)),
1141
            delay_seconds: 0
1142
        );
1143
        sanity_check_queue_timeout(
1144
            num_producers: 1,
1145
            num_consumers: 1,
1146
            max_queue_size: 1,
1147
            messages_per_producer: 100_000,
1148
            notification_style: NotifyStyle::All,
1149
            timeout: Timeout::Forever,
1150
            delay_seconds: 0
1151
        );
1152
        new_test_without_timeout_5(
1153
            num_producers: 1,
1154
            num_consumers: 5,
1155
            max_queue_size: 1,
1156
            messages_per_producer: 100_000,
1157
            notification_style: NotifyStyle::All,
1158
            timeout: Timeout::Forever,
1159
            delay_seconds: 0
1160
        );
1161
        one_producer_one_consumer_one_slot(
1162
            num_producers: 1,
1163
            num_consumers: 1,
1164
            max_queue_size: 1,
1165
            messages_per_producer: 100_000,
1166
            notification_style: NotifyStyle::All,
1167
            timeout: Timeout::Forever,
1168
            delay_seconds: 0
1169
        );
1170
        one_producer_one_consumer_one_slot_timeout(
1171
            num_producers: 1,
1172
            num_consumers: 1,
1173
            max_queue_size: 1,
1174
            messages_per_producer: 100_000,
1175
            notification_style: NotifyStyle::All,
1176
            timeout: Timeout::Forever,
1177
            delay_seconds: 1
1178
        );
1179
        one_producer_one_consumer_hundred_slots(
1180
            num_producers: 1,
1181
            num_consumers: 1,
1182
            max_queue_size: 100,
1183
            messages_per_producer: 1_000_000,
1184
            notification_style: NotifyStyle::All,
1185
            timeout: Timeout::Forever,
1186
            delay_seconds: 0
1187
        );
1188
        ten_producers_one_consumer_one_slot(
1189
            num_producers: 10,
1190
            num_consumers: 1,
1191
            max_queue_size: 1,
1192
            messages_per_producer: 10000,
1193
            notification_style: NotifyStyle::All,
1194
            timeout: Timeout::Forever,
1195
            delay_seconds: 0
1196
        );
1197
        ten_producers_one_consumer_hundred_slots_notify_all(
1198
            num_producers: 10,
1199
            num_consumers: 1,
1200
            max_queue_size: 100,
1201
            messages_per_producer: 10000,
1202
            notification_style: NotifyStyle::All,
1203
            timeout: Timeout::Forever,
1204
            delay_seconds: 0
1205
        );
1206
        ten_producers_one_consumer_hundred_slots_notify_one(
1207
            num_producers: 10,
1208
            num_consumers: 1,
1209
            max_queue_size: 100,
1210
            messages_per_producer: 10000,
1211
            notification_style: NotifyStyle::One,
1212
            timeout: Timeout::Forever,
1213
            delay_seconds: 0
1214
        );
1215
        one_producer_ten_consumers_one_slot(
1216
            num_producers: 1,
1217
            num_consumers: 10,
1218
            max_queue_size: 1,
1219
            messages_per_producer: 10000,
1220
            notification_style: NotifyStyle::All,
1221
            timeout: Timeout::Forever,
1222
            delay_seconds: 0
1223
        );
1224
        one_producer_ten_consumers_hundred_slots_notify_all(
1225
            num_producers: 1,
1226
            num_consumers: 10,
1227
            max_queue_size: 100,
1228
            messages_per_producer: 100_000,
1229
            notification_style: NotifyStyle::All,
1230
            timeout: Timeout::Forever,
1231
            delay_seconds: 0
1232
        );
1233
        one_producer_ten_consumers_hundred_slots_notify_one(
1234
            num_producers: 1,
1235
            num_consumers: 10,
1236
            max_queue_size: 100,
1237
            messages_per_producer: 100_000,
1238
            notification_style: NotifyStyle::One,
1239
            timeout: Timeout::Forever,
1240
            delay_seconds: 0
1241
        );
1242
        ten_producers_ten_consumers_one_slot(
1243
            num_producers: 10,
1244
            num_consumers: 10,
1245
            max_queue_size: 1,
1246
            messages_per_producer: 50000,
1247
            notification_style: NotifyStyle::All,
1248
            timeout: Timeout::Forever,
1249
            delay_seconds: 0
1250
        );
1251
        ten_producers_ten_consumers_hundred_slots_notify_all(
1252
            num_producers: 10,
1253
            num_consumers: 10,
1254
            max_queue_size: 100,
1255
            messages_per_producer: 50000,
1256
            notification_style: NotifyStyle::All,
1257
            timeout: Timeout::Forever,
1258
            delay_seconds: 0
1259
        );
1260
        ten_producers_ten_consumers_hundred_slots_notify_one(
1261
            num_producers: 10,
1262
            num_consumers: 10,
1263
            max_queue_size: 100,
1264
            messages_per_producer: 50000,
1265
            notification_style: NotifyStyle::One,
1266
            timeout: Timeout::Forever,
1267
            delay_seconds: 0
1268
        );
1269
    }
1270
}