Coverage Report

Created: 2026-06-30 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/crosvm/cros_async/src/sync/mu.rs
Line
Count
Source
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::cell::UnsafeCell;
6
use std::hint;
7
use std::mem;
8
use std::ops::Deref;
9
use std::ops::DerefMut;
10
use std::sync::atomic::AtomicUsize;
11
use std::sync::atomic::Ordering;
12
use std::sync::Arc;
13
use std::thread::yield_now;
14
15
use super::super::sync::waiter::Kind as WaiterKind;
16
use super::super::sync::waiter::Waiter;
17
use super::super::sync::waiter::WaiterAdapter;
18
use super::super::sync::waiter::WaiterList;
19
use super::super::sync::waiter::WaitingFor;
20
21
// Set when the rwlock is exclusively locked.
22
const LOCKED: usize = 1 << 0;
23
// Set when there are one or more threads waiting to acquire the lock.
24
const HAS_WAITERS: usize = 1 << 1;
25
// Set when a thread has been woken up from the wait queue. Cleared when that thread either acquires
26
// the lock or adds itself back into the wait queue. Used to prevent unnecessary wake ups when a
27
// thread has been removed from the wait queue but has not gotten CPU time yet.
28
const DESIGNATED_WAKER: usize = 1 << 2;
29
// Used to provide exclusive access to the `waiters` field in `RwLock`. Should only be held while
30
// modifying the waiter list.
31
const SPINLOCK: usize = 1 << 3;
32
// Set when a thread that wants an exclusive lock adds itself to the wait queue. New threads
33
// attempting to acquire a shared lock will be prevented from getting it when this bit is set.
34
// However, this bit is ignored once a thread has gone through the wait queue at least once.
35
const WRITER_WAITING: usize = 1 << 4;
36
// Set when a thread has gone through the wait queue many times but has failed to acquire the lock
37
// every time it is woken up. When this bit is set, all other threads are prevented from acquiring
38
// the lock until the thread that set the `LONG_WAIT` bit has acquired the lock.
39
const LONG_WAIT: usize = 1 << 5;
40
// The bit that is added to the rwlock state in order to acquire a shared lock. Since more than one
41
// thread can acquire a shared lock, we cannot use a single bit. Instead we use all the remaining
42
// bits in the state to track the number of threads that have acquired a shared lock.
43
const READ_LOCK: usize = 1 << 8;
44
// Mask used for checking if any threads currently hold a shared lock.
45
const READ_MASK: usize = !0xff;
46
47
// The number of times the thread should just spin and attempt to re-acquire the lock.
48
const SPIN_THRESHOLD: usize = 7;
49
50
// The number of times the thread needs to go through the wait queue before it sets the `LONG_WAIT`
51
// bit and forces all other threads to wait for it to acquire the lock. This value is set relatively
52
// high so that we don't lose the benefit of having running threads unless it is absolutely
53
// necessary.
54
const LONG_WAIT_THRESHOLD: usize = 19;
55
56
// Common methods between shared and exclusive locks.
57
trait Kind {
58
    // The bits that must be zero for the thread to acquire this kind of lock. If any of these bits
59
    // are not zero then the thread will first spin and retry a few times before adding itself to
60
    // the wait queue.
61
    fn zero_to_acquire() -> usize;
62
63
    // The bit that must be added in order to acquire this kind of lock. This should either be
64
    // `LOCKED` or `READ_LOCK`.
65
    fn add_to_acquire() -> usize;
66
67
    // The bits that should be set when a thread adds itself to the wait queue while waiting to
68
    // acquire this kind of lock.
69
    fn set_when_waiting() -> usize;
70
71
    // The bits that should be cleared when a thread acquires this kind of lock.
72
    fn clear_on_acquire() -> usize;
73
74
    // The waiter that a thread should use when waiting to acquire this kind of lock.
75
    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter>;
76
}
77
78
// A lock type for shared read-only access to the data. More than one thread may hold this kind of
79
// lock simultaneously.
80
struct Shared;
81
82
impl Kind for Shared {
83
0
    fn zero_to_acquire() -> usize {
84
0
        LOCKED | WRITER_WAITING | LONG_WAIT
85
0
    }
86
87
0
    fn add_to_acquire() -> usize {
88
0
        READ_LOCK
89
0
    }
90
91
0
    fn set_when_waiting() -> usize {
92
0
        0
93
0
    }
94
95
0
    fn clear_on_acquire() -> usize {
96
0
        0
97
0
    }
98
99
0
    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
100
0
        Arc::new(Waiter::new(
101
0
            WaiterKind::Shared,
102
0
            cancel_waiter,
103
0
            raw as *const RawRwLock as usize,
104
0
            WaitingFor::Mutex,
105
        ))
106
0
    }
107
}
108
109
// A lock type for mutually exclusive read-write access to the data. Only one thread can hold this
110
// kind of lock at a time.
111
struct Exclusive;
112
113
impl Kind for Exclusive {
114
0
    fn zero_to_acquire() -> usize {
115
0
        LOCKED | READ_MASK | LONG_WAIT
116
0
    }
117
118
0
    fn add_to_acquire() -> usize {
119
0
        LOCKED
120
0
    }
121
122
0
    fn set_when_waiting() -> usize {
123
0
        WRITER_WAITING
124
0
    }
125
126
0
    fn clear_on_acquire() -> usize {
127
0
        WRITER_WAITING
128
0
    }
129
130
0
    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
131
0
        Arc::new(Waiter::new(
132
0
            WaiterKind::Exclusive,
133
0
            cancel_waiter,
134
0
            raw as *const RawRwLock as usize,
135
0
            WaitingFor::Mutex,
136
        ))
137
0
    }
138
}
139
140
// Scan `waiters` and return the ones that should be woken up. Also returns any bits that should be
141
// set in the rwlock state when the current thread releases the spin lock protecting the waiter
142
// list.
143
//
144
// If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
145
// waiting for a shared lock are also woken up. If any waiters waiting for an exclusive lock are
146
// found when iterating through the list, then the returned `usize` contains the `WRITER_WAITING`
147
// bit, which should be set when the thread releases the spin lock.
148
//
149
// If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
150
// no bits are set in the returned `usize`.
151
0
fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
152
0
    let mut to_wake = WaiterList::new(WaiterAdapter::new());
153
0
    let mut set_on_release = 0;
154
0
    let mut cursor = waiters.front_mut();
155
156
0
    let mut waking_readers = false;
157
0
    while let Some(w) = cursor.get() {
158
0
        match w.kind() {
159
0
            WaiterKind::Exclusive if !waking_readers => {
160
                // This is the first waiter and it's a writer. No need to check the other waiters.
161
0
                let waiter = cursor.remove().unwrap();
162
0
                waiter.set_waiting_for(WaitingFor::None);
163
0
                to_wake.push_back(waiter);
164
0
                break;
165
            }
166
167
0
            WaiterKind::Shared => {
168
0
                // This is a reader and the first waiter in the list was not a writer so wake up all
169
0
                // the readers in the wait list.
170
0
                let waiter = cursor.remove().unwrap();
171
0
                waiter.set_waiting_for(WaitingFor::None);
172
0
                to_wake.push_back(waiter);
173
0
                waking_readers = true;
174
0
            }
175
176
0
            WaiterKind::Exclusive => {
177
0
                // We found a writer while looking for more readers to wake up. Set the
178
0
                // `WRITER_WAITING` bit to prevent any new readers from acquiring the lock. All
179
0
                // readers currently in the wait list will ignore this bit since they already waited
180
0
                // once.
181
0
                set_on_release |= WRITER_WAITING;
182
0
                cursor.move_next();
183
0
            }
184
        }
185
    }
186
187
0
    (to_wake, set_on_release)
188
0
}
189
190
#[inline]
191
0
fn cpu_relax(iterations: usize) {
192
0
    for _ in 0..iterations {
193
0
        hint::spin_loop();
194
0
    }
195
0
}
Unexecuted instantiation: cros_async::sync::mu::cpu_relax
Unexecuted instantiation: cros_async::sync::mu::cpu_relax
196
197
pub(crate) struct RawRwLock {
198
    state: AtomicUsize,
199
    waiters: UnsafeCell<WaiterList>,
200
}
201
202
impl RawRwLock {
203
2.02k
    pub fn new() -> RawRwLock {
204
2.02k
        RawRwLock {
205
2.02k
            state: AtomicUsize::new(0),
206
2.02k
            waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
207
2.02k
        }
208
2.02k
    }
209
210
    #[inline]
211
1.01k
    pub async fn lock(&self) {
212
1.01k
        match self
213
1.01k
            .state
214
1.01k
            .compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
215
        {
216
1.01k
            Ok(_) => {}
217
0
            Err(oldstate) => {
218
                // If any bits that should be zero are not zero or if we fail to acquire the lock
219
                // with a single compare_exchange then go through the slow path.
220
0
                if (oldstate & Exclusive::zero_to_acquire()) != 0
221
0
                    || self
222
0
                        .state
223
0
                        .compare_exchange_weak(
224
0
                            oldstate,
225
0
                            (oldstate + Exclusive::add_to_acquire())
226
0
                                & !Exclusive::clear_on_acquire(),
227
0
                            Ordering::Acquire,
228
0
                            Ordering::Relaxed,
229
0
                        )
230
0
                        .is_err()
231
                {
232
0
                    self.lock_slow::<Exclusive>(0, 0).await;
233
0
                }
234
            }
235
        }
236
1.01k
    }
237
238
    #[inline]
239
694k
    pub async fn read_lock(&self) {
240
694k
        match self
241
694k
            .state
242
694k
            .compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
243
        {
244
694k
            Ok(_) => {}
245
0
            Err(oldstate) => {
246
0
                if (oldstate & Shared::zero_to_acquire()) != 0
247
0
                    || self
248
0
                        .state
249
0
                        .compare_exchange_weak(
250
0
                            oldstate,
251
0
                            (oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
252
0
                            Ordering::Acquire,
253
0
                            Ordering::Relaxed,
254
0
                        )
255
0
                        .is_err()
256
                {
257
0
                    self.lock_slow::<Shared>(0, 0).await;
258
0
                }
259
            }
260
        }
261
694k
    }
262
263
    // Slow path for acquiring the lock. `clear` should contain any bits that need to be cleared
264
    // when the lock is acquired. Any bits set in `zero_mask` are cleared from the bits returned by
265
    // `K::zero_to_acquire()`.
266
    #[cold]
267
0
    async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
Unexecuted instantiation: <cros_async::sync::mu::RawRwLock>::lock_slow::<cros_async::sync::mu::Shared>
Unexecuted instantiation: <cros_async::sync::mu::RawRwLock>::lock_slow::<cros_async::sync::mu::Exclusive>
268
0
        let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;
269
270
0
        let mut spin_count = 0;
271
0
        let mut wait_count = 0;
272
0
        let mut waiter = None;
273
        loop {
274
0
            let oldstate = self.state.load(Ordering::Relaxed);
275
            //  If all the bits in `zero_to_acquire` are actually zero then try to acquire the lock
276
            //  directly.
277
0
            if (oldstate & zero_to_acquire) == 0 {
278
0
                if self
279
0
                    .state
280
0
                    .compare_exchange_weak(
281
0
                        oldstate,
282
0
                        (oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
283
0
                        Ordering::Acquire,
284
0
                        Ordering::Relaxed,
285
0
                    )
286
0
                    .is_ok()
287
                {
288
0
                    return;
289
0
                }
290
0
            } else if (oldstate & SPINLOCK) == 0 {
291
                // The rwlock is locked and the spin lock is available.  Try to add this thread to
292
                // the waiter queue.
293
0
                let w = waiter.get_or_insert_with(|| K::new_waiter(self));
Unexecuted instantiation: <cros_async::sync::mu::RawRwLock>::lock_slow::<cros_async::sync::mu::Shared>::{closure#0}::{closure#0}
Unexecuted instantiation: <cros_async::sync::mu::RawRwLock>::lock_slow::<cros_async::sync::mu::Exclusive>::{closure#0}::{closure#0}
294
0
                w.reset(WaitingFor::Mutex);
295
296
0
                if self
297
0
                    .state
298
0
                    .compare_exchange_weak(
299
0
                        oldstate,
300
0
                        (oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
301
0
                        Ordering::Acquire,
302
0
                        Ordering::Relaxed,
303
0
                    )
304
0
                    .is_ok()
305
                {
306
0
                    let mut set_on_release = 0;
307
308
0
                    if wait_count < LONG_WAIT_THRESHOLD {
309
0
                        // Add the waiter to the back of the queue.
310
0
                        // SAFETY:
311
0
                        // Safe because we have acquired the spin lock and it provides exclusive
312
0
                        // access to the waiter queue.
313
0
                        unsafe { (*self.waiters.get()).push_back(w.clone()) };
314
0
                    } else {
315
0
                        // This waiter has gone through the queue too many times. Put it in the
316
0
                        // front of the queue and block all other threads from acquiring the lock
317
0
                        // until this one has acquired it at least once.
318
0
                        // SAFETY:
319
0
                        // Safe because we have acquired the spin lock and it provides exclusive
320
0
                        // access to the waiter queue.
321
0
                        unsafe { (*self.waiters.get()).push_front(w.clone()) };
322
0
323
0
                        // Set the LONG_WAIT bit to prevent all other threads from acquiring the
324
0
                        // lock.
325
0
                        set_on_release |= LONG_WAIT;
326
0
327
0
                        // Make sure we clear the LONG_WAIT bit when we do finally get the lock.
328
0
                        clear |= LONG_WAIT;
329
0
330
0
                        // Since we set the LONG_WAIT bit we shouldn't allow that bit to prevent us
331
0
                        // from acquiring the lock.
332
0
                        zero_to_acquire &= !LONG_WAIT;
333
0
                    }
334
335
                    // Release the spin lock.
336
0
                    let mut state = oldstate;
337
                    loop {
338
0
                        match self.state.compare_exchange_weak(
339
0
                            state,
340
0
                            (state | set_on_release) & !SPINLOCK,
341
0
                            Ordering::Release,
342
0
                            Ordering::Relaxed,
343
0
                        ) {
344
0
                            Ok(_) => break,
345
0
                            Err(w) => state = w,
346
                        }
347
                    }
348
349
                    // Now wait until we are woken.
350
0
                    w.wait().await;
351
352
                    // The `DESIGNATED_WAKER` bit gets set when this thread is woken up by the
353
                    // thread that originally held the lock. While this bit is set, no other waiters
354
                    // will be woken up so it's important to clear it the next time we try to
355
                    // acquire the main lock or the spin lock.
356
0
                    clear |= DESIGNATED_WAKER;
357
358
                    // Now that the thread has waited once, we no longer care if there is a writer
359
                    // waiting. Only the limits of mutual exclusion can prevent us from acquiring
360
                    // the lock.
361
0
                    zero_to_acquire &= !WRITER_WAITING;
362
363
                    // Reset the spin count since we just went through the wait queue.
364
0
                    spin_count = 0;
365
366
                    // Increment the wait count since we went through the wait queue.
367
0
                    wait_count += 1;
368
369
                    // Skip the `cpu_relax` below.
370
0
                    continue;
371
0
                }
372
0
            }
373
374
            // Both the lock and the spin lock are held by one or more other threads. First, we'll
375
            // spin a few times in case we can acquire the lock or the spin lock. If that fails then
376
            // we yield because we might be preventing the threads that do hold the 2 locks from
377
            // getting cpu time.
378
0
            if spin_count < SPIN_THRESHOLD {
379
0
                cpu_relax(1 << spin_count);
380
0
                spin_count += 1;
381
0
            } else {
382
0
                yield_now();
383
0
            }
384
        }
385
0
    }
Unexecuted instantiation: <cros_async::sync::mu::RawRwLock>::lock_slow::<cros_async::sync::mu::Shared>::{closure#0}
Unexecuted instantiation: <cros_async::sync::mu::RawRwLock>::lock_slow::<cros_async::sync::mu::Exclusive>::{closure#0}
386
387
    #[inline]
388
1.01k
    pub fn unlock(&self) {
389
        // Fast path, if possible. We can directly clear the locked bit since we have exclusive
390
        // access to the rwlock.
391
1.01k
        let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);
392
393
        // Panic if we just tried to unlock a rwlock that wasn't held by this thread. This shouldn't
394
        // really be possible since `unlock` is not a public method.
395
1.01k
        debug_assert_eq!(
396
0
            oldstate & READ_MASK,
397
            0,
398
0
            "`unlock` called on rwlock held in read-mode"
399
        );
400
1.01k
        debug_assert_ne!(
401
0
            oldstate & LOCKED,
402
            0,
403
0
            "`unlock` called on rwlock not held in write-mode"
404
        );
405
406
1.01k
        if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
407
0
            // The oldstate has waiters but no designated waker has been chosen yet.
408
0
            self.unlock_slow();
409
1.01k
        }
410
1.01k
    }
411
412
    #[inline]
413
694k
    pub fn read_unlock(&self) {
414
        // Fast path, if possible. We can directly subtract the READ_LOCK bit since we had
415
        // previously added it.
416
694k
        let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);
417
418
694k
        debug_assert_eq!(
419
0
            oldstate & LOCKED,
420
            0,
421
0
            "`read_unlock` called on rwlock held in write-mode"
422
        );
423
694k
        debug_assert_ne!(
424
0
            oldstate & READ_MASK,
425
            0,
426
0
            "`read_unlock` called on rwlock not held in read-mode"
427
        );
428
429
694k
        if (oldstate & HAS_WAITERS) != 0
430
0
            && (oldstate & DESIGNATED_WAKER) == 0
431
0
            && (oldstate & READ_MASK) == READ_LOCK
432
0
        {
433
0
            // There are waiters, no designated waker has been chosen yet, and the last reader is
434
0
            // unlocking so we have to take the slow path.
435
0
            self.unlock_slow();
436
694k
        }
437
694k
    }
438
439
    #[cold]
440
0
    fn unlock_slow(&self) {
441
0
        let mut spin_count = 0;
442
443
        loop {
444
0
            let oldstate = self.state.load(Ordering::Relaxed);
445
0
            if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
446
                // No more waiters or a designated waker has been chosen. Nothing left for us to do.
447
0
                return;
448
0
            } else if (oldstate & SPINLOCK) == 0 {
449
                // The spin lock is not held by another thread. Try to acquire it. Also set the
450
                // `DESIGNATED_WAKER` bit since we are likely going to wake up one or more threads.
451
0
                if self
452
0
                    .state
453
0
                    .compare_exchange_weak(
454
0
                        oldstate,
455
0
                        oldstate | SPINLOCK | DESIGNATED_WAKER,
456
0
                        Ordering::Acquire,
457
0
                        Ordering::Relaxed,
458
0
                    )
459
0
                    .is_ok()
460
                {
461
                    // Acquired the spinlock. Try to wake a waiter. We may also end up wanting to
462
                    // clear the HAS_WAITERS and DESIGNATED_WAKER bits so start collecting the bits
463
                    // to be cleared.
464
0
                    let mut clear = SPINLOCK;
465
466
                    // SAFETY:
467
                    // Safe because the spinlock guarantees exclusive access to the waiter list and
468
                    // the reference does not escape this function.
469
0
                    let waiters = unsafe { &mut *self.waiters.get() };
470
0
                    let (wake_list, set_on_release) = get_wake_list(waiters);
471
472
                    // If the waiter list is now empty, clear the HAS_WAITERS bit.
473
0
                    if waiters.is_empty() {
474
0
                        clear |= HAS_WAITERS;
475
0
                    }
476
477
0
                    if wake_list.is_empty() {
478
0
                        // Since we are not going to wake any waiters clear the DESIGNATED_WAKER bit
479
0
                        // that we set when we acquired the spin lock.
480
0
                        clear |= DESIGNATED_WAKER;
481
0
                    }
482
483
                    // Release the spin lock and clear any other bits as necessary. Also, set any
484
                    // bits returned by `get_wake_list`. For now, this is just the `WRITER_WAITING`
485
                    // bit, which needs to be set when we are waking up a bunch of readers and there
486
                    // are still writers in the wait queue. This will prevent any readers that
487
                    // aren't in `wake_list` from acquiring the read lock.
488
0
                    let mut state = oldstate;
489
                    loop {
490
0
                        match self.state.compare_exchange_weak(
491
0
                            state,
492
0
                            (state | set_on_release) & !clear,
493
0
                            Ordering::Release,
494
0
                            Ordering::Relaxed,
495
0
                        ) {
496
0
                            Ok(_) => break,
497
0
                            Err(w) => state = w,
498
                        }
499
                    }
500
501
                    // Now wake the waiters, if any.
502
0
                    for w in wake_list {
503
0
                        w.wake();
504
0
                    }
505
506
                    // We're done.
507
0
                    return;
508
0
                }
509
0
            }
510
511
            // Spin and try again.  It's ok to block here as we have already released the lock.
512
0
            if spin_count < SPIN_THRESHOLD {
513
0
                cpu_relax(1 << spin_count);
514
0
                spin_count += 1;
515
0
            } else {
516
0
                yield_now();
517
0
            }
518
        }
519
0
    }
520
521
0
    fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
522
0
        let mut oldstate = self.state.load(Ordering::Relaxed);
523
0
        while oldstate & SPINLOCK != 0
524
0
            || self
525
0
                .state
526
0
                .compare_exchange_weak(
527
0
                    oldstate,
528
0
                    oldstate | SPINLOCK,
529
0
                    Ordering::Acquire,
530
0
                    Ordering::Relaxed,
531
0
                )
532
0
                .is_err()
533
0
        {
534
0
            hint::spin_loop();
535
0
            oldstate = self.state.load(Ordering::Relaxed);
536
0
        }
537
538
        // SAFETY:
539
        // Safe because the spin lock provides exclusive access and the reference does not escape
540
        // this function.
541
0
        let waiters = unsafe { &mut *self.waiters.get() };
542
543
0
        let mut clear = SPINLOCK;
544
545
        // If we are about to remove the first waiter in the wait list, then clear the LONG_WAIT
546
        // bit. Also clear the bit if we are going to be waking some other waiters. In this case the
547
        // waiter that set the bit may have already been removed from the waiter list (and could be
548
        // the one that is currently being dropped). If it is still in the waiter list then clearing
549
        // this bit may starve it for one more iteration through the lock_slow() loop, whereas not
550
        // clearing this bit could cause a deadlock if the waiter that set it is the one that is
551
        // being dropped.
552
0
        if wake_next
553
0
            || waiters
554
0
                .front()
555
0
                .get()
556
0
                .map(|front| std::ptr::eq(front, waiter))
557
0
                .unwrap_or(false)
558
0
        {
559
0
            clear |= LONG_WAIT;
560
0
        }
561
562
0
        let waiting_for = waiter.is_waiting_for();
563
564
        // Don't drop the old waiter while holding the spin lock.
565
0
        let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Mutex {
566
            // SAFETY:
567
            // We know that the waiter is still linked and is waiting for the rwlock, which
568
            // guarantees that it is still linked into `self.waiters`.
569
0
            let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
570
0
            cursor.remove()
571
        } else {
572
0
            None
573
        };
574
575
0
        let (wake_list, set_on_release) = if wake_next || waiting_for == WaitingFor::None {
576
            // Either the waiter was already woken or it's been removed from the rwlock's waiter
577
            // list and is going to be woken. Either way, we need to wake up another thread.
578
0
            get_wake_list(waiters)
579
        } else {
580
0
            (WaiterList::new(WaiterAdapter::new()), 0)
581
        };
582
583
0
        if waiters.is_empty() {
584
0
            clear |= HAS_WAITERS;
585
0
        }
586
587
0
        if wake_list.is_empty() {
588
0
            // We're not waking any other threads so clear the DESIGNATED_WAKER bit. In the worst
589
0
            // case this leads to an additional thread being woken up but we risk a deadlock if we
590
0
            // don't clear it.
591
0
            clear |= DESIGNATED_WAKER;
592
0
        }
593
594
0
        if let WaiterKind::Exclusive = waiter.kind() {
595
0
            // The waiter being dropped is a writer so clear the writer waiting bit for now. If we
596
0
            // found more writers in the list while fetching waiters to wake up then this bit will
597
0
            // be set again via `set_on_release`.
598
0
            clear |= WRITER_WAITING;
599
0
        }
600
601
0
        while self
602
0
            .state
603
0
            .compare_exchange_weak(
604
0
                oldstate,
605
0
                (oldstate & !clear) | set_on_release,
606
0
                Ordering::Release,
607
0
                Ordering::Relaxed,
608
0
            )
609
0
            .is_err()
610
0
        {
611
0
            hint::spin_loop();
612
0
            oldstate = self.state.load(Ordering::Relaxed);
613
0
        }
614
615
0
        for w in wake_list {
616
0
            w.wake();
617
0
        }
618
619
0
        mem::drop(old_waiter);
620
0
    }
621
}
622
623
// TODO(b/315998194): Add safety comment
624
#[allow(clippy::undocumented_unsafe_blocks)]
625
unsafe impl Send for RawRwLock {}
626
// TODO(b/315998194): Add safety comment
627
#[allow(clippy::undocumented_unsafe_blocks)]
628
unsafe impl Sync for RawRwLock {}
629
630
0
fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) {
631
0
    let raw_rwlock = raw as *const RawRwLock;
632
633
    // SAFETY:
634
    // Safe because the thread that owns the waiter that is being canceled must also own a reference
635
    // to the rwlock, which ensures that this pointer is valid.
636
0
    unsafe { (*raw_rwlock).cancel_waiter(waiter, wake_next) }
637
0
}
638
639
/// A high-level primitive that provides safe, mutable access to a shared resource.
640
///
641
/// `RwLock` safely provides both shared, immutable access (via `read_lock()`) as well as exclusive,
642
/// mutable access (via `lock()`) to an underlying resource asynchronously while ensuring fairness
643
/// with no loss of performance. If you don't need `read_lock()` nor fairness, try upstream
644
/// `futures::lock::Mutex` instead.
645
///
646
/// # Poisoning
647
///
648
/// `RwLock` does not support lock poisoning so if a thread panics while holding the lock, the
649
/// poisoned data will be accessible by other threads in your program. If you need to guarantee that
650
/// other threads cannot access poisoned data then you may wish to wrap this `RwLock` inside another
651
/// type that provides the poisoning feature. See the implementation of `std::sync::Mutex` for an
652
/// example of this. Note `futures::lock::Mutex` does not support poisoning either.
653
///
654
///
655
/// # Fairness
656
///
657
/// This `RwLock` implementation does not guarantee that threads will acquire the lock in the same
658
/// order that they call `lock()` or `read_lock()`. However it will attempt to prevent long-term
659
/// starvation: if a thread repeatedly fails to acquire the lock beyond a threshold then all other
660
/// threads will fail to acquire the lock until the starved thread has acquired it. Note, on the
661
/// other hand, `futures::lock::Mutex` does not guarantee fairness.
662
///
663
/// Similarly, this `RwLock` will attempt to balance reader and writer threads: once there is a
664
/// writer thread waiting to acquire the lock no new reader threads will be allowed to acquire it.
665
/// However, any reader threads that were already waiting will still be allowed to acquire it.
666
///
667
/// # Examples
668
///
669
/// ```edition2018
670
/// use std::sync::Arc;
671
/// use std::thread;
672
/// use std::sync::mpsc::channel;
673
///
674
/// use cros_async::{block_on, sync::RwLock};
675
///
676
/// const N: usize = 10;
677
///
678
/// // Spawn a few threads to increment a shared variable (non-atomically), and
679
/// // let the main thread know once all increments are done.
680
/// //
681
/// // Here we're using an Arc to share memory among threads, and the data inside
682
/// // the Arc is protected with a rwlock.
683
/// let data = Arc::new(RwLock::new(0));
684
///
685
/// let (tx, rx) = channel();
686
/// for _ in 0..N {
687
///     let (data, tx) = (Arc::clone(&data), tx.clone());
688
///     thread::spawn(move || {
689
///         // The shared state can only be accessed once the lock is held.
690
///         // Our non-atomic increment is safe because we're the only thread
691
///         // which can access the shared state when the lock is held.
692
///         let mut data = block_on(data.lock());
693
///         *data += 1;
694
///         if *data == N {
695
///             tx.send(()).unwrap();
696
///         }
697
///         // the lock is unlocked here when `data` goes out of scope.
698
///     });
699
/// }
700
///
701
/// rx.recv().unwrap();
702
/// ```
703
// NOTE(review): the 128-byte alignment is presumably there to keep the lock state on its own
// cache line(s) and avoid false sharing -- confirm the intent with the original author.
#[repr(align(128))]
704
pub struct RwLock<T: ?Sized> {
705
    // Lock state; the locking and unlocking methods of `RwLock` delegate to it.
    raw: RawRwLock,
706
    // The protected value. It is only reached through guards created after `raw` has been
    // acquired, or through `&mut self` / by-value access to the `RwLock` itself.
    value: UnsafeCell<T>,
707
}
708
709
impl<T> RwLock<T> {
710
    /// Create a new, unlocked `RwLock` ready for use.
711
2.02k
    pub fn new(v: T) -> RwLock<T> {
712
2.02k
        RwLock {
713
2.02k
            raw: RawRwLock::new(),
714
2.02k
            value: UnsafeCell::new(v),
715
2.02k
        }
716
2.02k
    }
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::WorkerSharedState>>::new
Line
Count
Source
711
1.01k
    pub fn new(v: T) -> RwLock<T> {
712
1.01k
        RwLock {
713
1.01k
            raw: RawRwLock::new(),
714
1.01k
            value: UnsafeCell::new(v),
715
1.01k
        }
716
1.01k
    }
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::DiskState>>::new
Line
Count
Source
711
1.01k
    pub fn new(v: T) -> RwLock<T> {
712
1.01k
        RwLock {
713
1.01k
            raw: RawRwLock::new(),
714
1.01k
            value: UnsafeCell::new(v),
715
1.01k
        }
716
1.01k
    }
717
718
    /// Consume the `RwLock` and return the contained value. This method does not perform any
719
    /// locking as the compiler will guarantee that there are no other references to `self` and the
720
    /// caller owns the `RwLock`.
721
    pub fn into_inner(self) -> T {
722
        // Don't need to acquire the lock because the compiler guarantees that there are
723
        // no references to `self`.
724
        self.value.into_inner()
725
    }
726
}
727
728
impl<T: ?Sized> RwLock<T> {
729
    /// Acquires exclusive, mutable access to the resource protected by the `RwLock`, blocking the
730
    /// current thread until it is able to do so. Upon returning, the current thread will be the
731
    /// only thread with access to the resource. The `RwLock` will be released when the returned
732
    /// `RwLockWriteGuard` is dropped.
733
    ///
734
    /// Calling `lock()` while holding a `RwLockWriteGuard` or a `RwLockReadGuard` will cause a
735
    /// deadlock.
736
    ///
737
    /// Callers that are not in an async context may wish to use the `block_on` method to block the
738
    /// thread until the `RwLock` is acquired.
739
    #[inline]
740
1.01k
    pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
Unexecuted instantiation: <cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::WorkerSharedState>>::lock
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::DiskState>>::lock
Line
Count
Source
740
1.01k
    pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
741
1.01k
        self.raw.lock().await;
742
743
1.01k
        RwLockWriteGuard {
744
1.01k
            mu: self,
745
1.01k
            // SAFETY:
746
1.01k
            // Safe because we have exclusive access to `self.value`.
747
1.01k
            value: unsafe { &mut *self.value.get() },
748
1.01k
        }
749
1.01k
    }
Unexecuted instantiation: <cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::WorkerSharedState>>::lock::{closure#0}
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::DiskState>>::lock::{closure#0}
Line
Count
Source
740
1.01k
    pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
741
1.01k
        self.raw.lock().await;
742
743
1.01k
        RwLockWriteGuard {
744
1.01k
            mu: self,
745
1.01k
            // SAFETY:
746
1.01k
            // Safe because we have exclusive access to `self.value`.
747
1.01k
            value: unsafe { &mut *self.value.get() },
748
1.01k
        }
749
1.01k
    }
750
751
    /// Acquires shared, immutable access to the resource protected by the `RwLock`, blocking the
752
    /// current thread until it is able to do so. Upon returning there may be other threads that
753
    /// also have immutable access to the resource but there will not be any threads that have
754
    /// mutable access to the resource. When the returned `RwLockReadGuard` is dropped the thread
755
    /// releases its access to the resource.
756
    ///
757
    /// Calling `read_lock()` while holding a `RwLockReadGuard` may deadlock. Calling `read_lock()`
758
    /// while holding a `RwLockWriteGuard` will deadlock.
759
    ///
760
    /// Callers that are not in an async context may wish to use the `block_on` method to block the
761
    /// thread until the `RwLock` is acquired.
762
    #[inline]
763
694k
    pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::WorkerSharedState>>::read_lock
Line
Count
Source
763
347k
    pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::DiskState>>::read_lock
Line
Count
Source
763
347k
    pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
764
694k
        self.raw.read_lock().await;
765
766
694k
        RwLockReadGuard {
767
694k
            mu: self,
768
694k
            // SAFETY:
769
694k
            // Safe because we have shared read-only access to `self.value`.
770
694k
            value: unsafe { &*self.value.get() },
771
694k
        }
772
694k
    }
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::WorkerSharedState>>::read_lock::{closure#0}
Line
Count
Source
763
347k
    pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
764
347k
        self.raw.read_lock().await;
765
766
347k
        RwLockReadGuard {
767
347k
            mu: self,
768
347k
            // SAFETY:
769
347k
            // Safe because we have shared read-only access to `self.value`.
770
347k
            value: unsafe { &*self.value.get() },
771
347k
        }
772
347k
    }
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::DiskState>>::read_lock::{closure#0}
Line
Count
Source
763
347k
    pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
764
347k
        self.raw.read_lock().await;
765
766
347k
        RwLockReadGuard {
767
347k
            mu: self,
768
347k
            // SAFETY:
769
347k
            // Safe because we have shared read-only access to `self.value`.
770
347k
            value: unsafe { &*self.value.get() },
771
347k
        }
772
347k
    }
773
774
    // Called from `Condvar::wait` when the thread wants to reacquire the lock.
775
    #[inline]
776
    pub(crate) async fn lock_from_cv(&self) -> RwLockWriteGuard<'_, T> {
777
        self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
778
779
        RwLockWriteGuard {
780
            mu: self,
781
            // SAFETY:
782
            // Safe because we have exclusive access to `self.value`.
783
            value: unsafe { &mut *self.value.get() },
784
        }
785
    }
786
787
    // Like `lock_from_cv` but for acquiring a shared lock.
788
    #[inline]
789
    pub(crate) async fn read_lock_from_cv(&self) -> RwLockReadGuard<'_, T> {
790
        // Threads that have waited in the Condvar's waiter list don't have to care if there is a
791
        // writer waiting since they have already waited once.
792
        self.raw
793
            .lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
794
            .await;
795
796
        RwLockReadGuard {
797
            mu: self,
798
            // SAFETY:
799
            // Safe because we have exclusive access to `self.value`.
800
            value: unsafe { &*self.value.get() },
801
        }
802
    }
803
804
    #[inline]
805
1.01k
    fn unlock(&self) {
806
1.01k
        self.raw.unlock();
807
1.01k
    }
Unexecuted instantiation: <cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::WorkerSharedState>>::unlock
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::DiskState>>::unlock
Line
Count
Source
805
1.01k
    fn unlock(&self) {
806
1.01k
        self.raw.unlock();
807
1.01k
    }
808
809
    #[inline]
810
694k
    fn read_unlock(&self) {
811
694k
        self.raw.read_unlock();
812
694k
    }
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::WorkerSharedState>>::read_unlock
Line
Count
Source
810
347k
    fn read_unlock(&self) {
811
347k
        self.raw.read_unlock();
812
347k
    }
<cros_async::sync::mu::RwLock<devices::virtio::block::asynchronous::DiskState>>::read_unlock
Line
Count
Source
810
347k
    fn read_unlock(&self) {
811
347k
        self.raw.read_unlock();
812
347k
    }
813
814
    pub fn get_mut(&mut self) -> &mut T {
815
        // SAFETY:
816
        // Safe because the compiler statically guarantees that are no other references to `self`.
817
        // This is also why we don't need to acquire the lock first.
818
        unsafe { &mut *self.value.get() }
819
    }
820
}
821
822
// TODO(b/315998194): Add safety comment
// NOTE(review): candidate safety argument -- the `RwLock` stores its `T` inline in `value`, so
// sending the lock to another thread sends the `T` along with it, which `T: Send` permits. This
// additionally relies on `RawRwLock` being safe to move between threads; confirm against its
// definition before replacing the TODO.
823
#[allow(clippy::undocumented_unsafe_blocks)]
824
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
825
// TODO(b/315998194): Add safety comment
// NOTE(review): `read_lock()` lets several holders have a `&T` at the same time, so sharing a
// `&RwLock<T>` between threads can expose `&T` on more than one thread concurrently. That
// normally requires `T: Sync` in addition to `T: Send` (`std::sync::RwLock` bounds its `Sync`
// impl on `T: Send + Sync`). Confirm whether the `T: Send`-only bound here is intentional before
// writing the safety comment; tightening it would be a breaking change for callers.
826
#[allow(clippy::undocumented_unsafe_blocks)]
827
unsafe impl<T: ?Sized + Send> Sync for RwLock<T> {}
828
829
impl<T: Default> Default for RwLock<T> {
830
    fn default() -> Self {
831
        Self::new(Default::default())
832
    }
833
}
834
835
impl<T> From<T> for RwLock<T> {
836
    fn from(source: T) -> Self {
837
        Self::new(source)
838
    }
839
}
840
841
/// An RAII implementation of a "scoped exclusive lock" for a `RwLock`. When this structure is
842
/// dropped, the lock will be released. The resource protected by the `RwLock` can be accessed via
843
/// the `Deref` and `DerefMut` implementations of this structure.
844
pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
845
    // The lock this guard was created from; the guard's `Drop` impl calls `unlock()` on it.
    mu: &'a RwLock<T>,
846
    // Exclusive reference to the protected value, exposed through `Deref` and `DerefMut`.
    value: &'a mut T,
847
}
848
849
impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
850
    pub(crate) fn into_inner(self) -> &'a RwLock<T> {
851
        self.mu
852
    }
853
854
    pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
855
        &self.mu.raw
856
    }
857
}
858
859
impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
860
    type Target = T;
861
862
1.01k
    fn deref(&self) -> &Self::Target {
863
1.01k
        self.value
864
1.01k
    }
Unexecuted instantiation: <cros_async::sync::mu::RwLockWriteGuard<devices::virtio::block::asynchronous::WorkerSharedState> as core::ops::deref::Deref>::deref
<cros_async::sync::mu::RwLockWriteGuard<devices::virtio::block::asynchronous::DiskState> as core::ops::deref::Deref>::deref
Line
Count
Source
862
1.01k
    fn deref(&self) -> &Self::Target {
863
1.01k
        self.value
864
1.01k
    }
865
}
866
867
impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
868
    fn deref_mut(&mut self) -> &mut Self::Target {
869
        self.value
870
    }
871
}
872
873
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
874
1.01k
    fn drop(&mut self) {
875
1.01k
        self.mu.unlock()
876
1.01k
    }
Unexecuted instantiation: <cros_async::sync::mu::RwLockWriteGuard<devices::virtio::block::asynchronous::WorkerSharedState> as core::ops::drop::Drop>::drop
<cros_async::sync::mu::RwLockWriteGuard<devices::virtio::block::asynchronous::DiskState> as core::ops::drop::Drop>::drop
Line
Count
Source
874
1.01k
    fn drop(&mut self) {
875
1.01k
        self.mu.unlock()
876
1.01k
    }
877
}
878
879
/// An RAII implementation of a "scoped shared lock" for a `RwLock`. When this structure is dropped,
880
/// the lock will be released. The resource protected by the `RwLock` can be accessed via the
881
/// `Deref` implementation of this structure.
882
pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
883
    // The lock this guard was created from; the guard's `Drop` impl calls `read_unlock()` on it.
    mu: &'a RwLock<T>,
884
    // Shared reference to the protected value, exposed through `Deref`.
    value: &'a T,
885
}
886
887
impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
888
    pub(crate) fn into_inner(self) -> &'a RwLock<T> {
889
        self.mu
890
    }
891
892
    pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
893
        &self.mu.raw
894
    }
895
}
896
897
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
898
    type Target = T;
899
900
976k
    fn deref(&self) -> &Self::Target {
901
976k
        self.value
902
976k
    }
<cros_async::sync::mu::RwLockReadGuard<devices::virtio::block::asynchronous::WorkerSharedState> as core::ops::deref::Deref>::deref
Line
Count
Source
900
303k
    fn deref(&self) -> &Self::Target {
901
303k
        self.value
902
303k
    }
<cros_async::sync::mu::RwLockReadGuard<devices::virtio::block::asynchronous::DiskState> as core::ops::deref::Deref>::deref
Line
Count
Source
900
673k
    fn deref(&self) -> &Self::Target {
901
673k
        self.value
902
673k
    }
903
}
904
905
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
906
694k
    fn drop(&mut self) {
907
694k
        self.mu.read_unlock()
908
694k
    }
<cros_async::sync::mu::RwLockReadGuard<devices::virtio::block::asynchronous::WorkerSharedState> as core::ops::drop::Drop>::drop
Line
Count
Source
906
347k
    fn drop(&mut self) {
907
347k
        self.mu.read_unlock()
908
347k
    }
<cros_async::sync::mu::RwLockReadGuard<devices::virtio::block::asynchronous::DiskState> as core::ops::drop::Drop>::drop
Line
Count
Source
906
347k
    fn drop(&mut self) {
907
347k
        self.mu.read_unlock()
908
347k
    }
909
}
910
911
// TODO(b/194338842): Fix tests for windows
912
#[cfg(any(target_os = "android", target_os = "linux"))]
913
#[cfg(test)]
914
mod test {
915
    use std::future::Future;
916
    use std::mem;
917
    use std::pin::Pin;
918
    use std::rc::Rc;
919
    use std::sync::atomic::AtomicUsize;
920
    use std::sync::atomic::Ordering;
921
    use std::sync::mpsc::channel;
922
    use std::sync::mpsc::Sender;
923
    use std::sync::Arc;
924
    use std::task::Context;
925
    use std::task::Poll;
926
    use std::task::Waker;
927
    use std::thread;
928
    use std::time::Duration;
929
930
    use futures::channel::oneshot;
931
    use futures::pending;
932
    use futures::select;
933
    use futures::task::waker_ref;
934
    use futures::task::ArcWake;
935
    use futures::FutureExt;
936
    use futures_executor::LocalPool;
937
    use futures_executor::ThreadPool;
938
    use futures_util::task::LocalSpawnExt;
939
940
    use super::super::super::block_on;
941
    use super::super::super::sync::Condvar;
942
    use super::super::super::sync::SpinLock;
943
    use super::*;
944
945
    // Test payload that derives only `Debug`, `Eq` and `PartialEq` -- no `Clone` or `Copy`, as
    // the name indicates.
    #[derive(Debug, Eq, PartialEq)]
946
    struct NonCopy(u32);
947
948
    // Dummy waker used when we want to manually drive futures.
949
    struct TestWaker;
950
    impl ArcWake for TestWaker {
951
        // Waking is a no-op: tests using this waker poll their futures by hand.
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
952
    }
953
954
    // An exclusive lock on a fresh `RwLock` exposes the value it was created with.
    #[test]
    fn it_works() {
        let mu = RwLock::new(NonCopy(13));
        let guard = block_on(mu.lock());

        assert_eq!(*guard, NonCopy(13));
    }
960
961
    // The lock can be taken exclusively twice in a row as long as each guard is released first.
    #[test]
    fn smoke() {
        let mu = RwLock::new(NonCopy(7));

        for _ in 0..2 {
            let guard = block_on(mu.lock());
            mem::drop(guard);
        }
    }
968
969
    // Alternates exclusive and shared acquisitions, including two shared guards held at once.
    #[test]
    fn rw_smoke() {
        let mu = RwLock::new(NonCopy(7));

        let exclusive = block_on(mu.lock());
        mem::drop(exclusive);

        let shared = block_on(mu.read_lock());
        mem::drop(shared);

        // Both shared guards are alive at the same time before being released in order.
        let first = block_on(mu.read_lock());
        let second = block_on(mu.read_lock());
        mem::drop(first);
        mem::drop(second);

        let exclusive = block_on(mu.lock());
        mem::drop(exclusive);
    }
978
979
    // Runs a mix of exclusive and shared acquisitions to completion on a single-threaded executor.
    #[test]
    fn async_smoke() {
        async fn write_once(mu: Rc<RwLock<NonCopy>>) {
            mem::drop(mu.lock().await);
        }

        async fn read_once(mu: Rc<RwLock<NonCopy>>) {
            mem::drop(mu.read_lock().await);
        }

        async fn read_twice(mu: Rc<RwLock<NonCopy>>) {
            let first = mu.read_lock().await;
            mem::drop(mu.read_lock().await);

            // Make sure first lives past the second read lock.
            first.as_raw_rwlock();
        }

        let mu = Rc::new(RwLock::new(NonCopy(7)));

        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        // Spawn order: writer, reader, double reader, writer.
        let tasks: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
            Box::pin(write_once(Rc::clone(&mu))),
            Box::pin(read_once(Rc::clone(&mu))),
            Box::pin(read_twice(Rc::clone(&mu))),
            Box::pin(write_once(Rc::clone(&mu))),
        ];
        for task in tasks {
            spawner.spawn_local(task).expect("Failed to spawn future");
        }

        ex.run();
    }
1017
1018
    // A `RwLock` can be moved into another thread and locked there.
    #[test]
    fn send() {
        let mu = RwLock::new(NonCopy(19));

        let worker = thread::spawn(move || {
            let value = block_on(mu.lock());
            assert_eq!(*value, NonCopy(19));
        });

        worker.join().unwrap();
    }
1029
1030
    // Tests nested rwlocks and access to underlying data.
    #[test]
    fn arc_nested() {
        let inner = RwLock::new(1);
        let outer = Arc::new(RwLock::new(inner));

        let worker = thread::spawn(move || {
            let outer_guard = block_on(outer.lock());
            let inner_guard = block_on(outer_guard.lock());
            assert_eq!(*inner_guard, 1);
        });

        worker.join().unwrap();
    }
1043
1044
    // The lock can still be taken from a destructor that runs while a thread unwinds.
    #[test]
    fn arc_access_in_unwind() {
        // Increments the shared counter when dropped.
        struct Unwinder {
            counter: Arc<RwLock<i32>>,
        }

        impl Drop for Unwinder {
            fn drop(&mut self) {
                let mut guard = block_on(self.counter.lock());
                *guard += 1;
            }
        }

        let arc = Arc::new(RwLock::new(1));
        let for_thread = Arc::clone(&arc);

        let worker = thread::spawn(move || {
            let _u = Unwinder {
                counter: for_thread,
            };
            panic!();
        });
        worker.join().expect_err("thread did not panic");

        let lock = block_on(arc.lock());
        assert_eq!(*lock, 2);
    }
1065
1066
    // A `RwLock` around an unsized slice can be locked and mutated through its guard.
    #[test]
    fn unsized_value() {
        let rwlock: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);

        {
            let mut guard = block_on(rwlock.lock());
            guard[0] = 4;
            guard[2] = 5;
        }

        let guard = block_on(rwlock.lock());
        let expected: &[i32] = &[4, 2, 5];
        assert_eq!(&*guard, expected);
    }
1077
    // Many threads hammer the exclusive lock; no increment may be lost and the lock state must
    // return to zero afterwards.
    #[test]
    fn high_contention() {
        const THREADS: usize = 17;
        const ITERATIONS: usize = 103;

        let mu = Arc::new(RwLock::new(0usize));

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let shared = Arc::clone(&mu);
                thread::spawn(move || {
                    for _ in 0..ITERATIONS {
                        *block_on(shared.lock()) += 1;
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        let total = *block_on(mu.read_lock());
        assert_eq!(total, THREADS * ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1101
1102
    // Each iteration races two lock futures against each other; the one that loses the `select!`
    // is dropped. Every iteration must still increment exactly one counter and both locks must
    // end up with a clean state.
    #[test]
    fn high_contention_with_cancel() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        async fn increment(mu: Arc<RwLock<usize>>, alt_mu: Arc<RwLock<usize>>, tx: Sender<()>) {
            let mut remaining = ITERATIONS;
            while remaining > 0 {
                select! {
                    mut count = mu.lock().fuse() => *count += 1,
                    mut count = alt_mu.lock().fuse() => *count += 1,
                }
                remaining -= 1;
            }
            tx.send(()).expect("Failed to send completion signal");
        }

        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let mu = Arc::new(RwLock::new(0usize));
        let alt_mu = Arc::new(RwLock::new(0usize));

        let (tx, rx) = channel();
        for _ in 0..TASKS {
            let task = increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone());
            ex.spawn_ok(task);
        }

        for _ in 0..TASKS {
            rx.recv_timeout(Duration::from_secs(10))
                .unwrap_or_else(|e| panic!("Error while waiting for threads to complete: {e}"));
        }

        let primary = *block_on(mu.read_lock());
        let alternate = *block_on(alt_mu.read_lock());
        assert_eq!(primary + alternate, TASKS * ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
        assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
    }
1140
1141
    // Several tasks on a single-threaded executor increment one counter under the exclusive lock.
    #[test]
    fn single_thread_async() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        // Async closures are unstable.
        async fn increment(mu: Rc<RwLock<usize>>) {
            for _ in 0..ITERATIONS {
                let mut count = mu.lock().await;
                *count += 1;
            }
        }

        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        let mu = Rc::new(RwLock::new(0usize));
        for _ in 0..TASKS {
            let task = increment(Rc::clone(&mu));
            spawner.spawn_local(task).expect("Failed to spawn task");
        }

        ex.run();

        let total = *block_on(mu.read_lock());
        assert_eq!(total, TASKS * ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1168
1169
    // Several tasks on a thread pool increment one counter under the exclusive lock.
    #[test]
    fn multi_thread_async() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        // Async closures are unstable.
        async fn increment(mu: Arc<RwLock<usize>>, tx: Sender<()>) {
            for _ in 0..ITERATIONS {
                let mut count = mu.lock().await;
                *count += 1;
            }
            tx.send(()).expect("Failed to send completion signal");
        }

        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let mu = Arc::new(RwLock::new(0usize));
        let (tx, rx) = channel();
        for _ in 0..TASKS {
            let task = increment(Arc::clone(&mu), tx.clone());
            ex.spawn_ok(task);
        }

        // Every task reports completion exactly once.
        for _ in 0..TASKS {
            rx.recv_timeout(Duration::from_secs(5))
                .expect("Failed to receive completion signal");
        }

        let total = *block_on(mu.read_lock());
        assert_eq!(total, TASKS * ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1197
1198
    // A write through `get_mut` is visible when the lock is later consumed.
    #[test]
    fn get_mut() {
        let mut mu = RwLock::new(NonCopy(13));

        let slot = mu.get_mut();
        *slot = NonCopy(17);

        let inner = mu.into_inner();
        assert_eq!(inner, NonCopy(17));
    }
1205
1206
    // `into_inner` returns the value the lock was created with.
    #[test]
    fn into_inner() {
        let mu = RwLock::new(NonCopy(29));
        let inner = mu.into_inner();

        assert_eq!(inner, NonCopy(29));
    }
1211
1212
    // The protected value is dropped exactly once: not by `into_inner` itself, only when the
    // returned value goes away.
    #[test]
    fn into_inner_drop() {
        // Bumps the shared counter every time an instance is dropped.
        struct NeedsDrop(Arc<AtomicUsize>);

        impl Drop for NeedsDrop {
            fn drop(&mut self) {
                self.0.fetch_add(1, Ordering::AcqRel);
            }
        }

        let value = Arc::new(AtomicUsize::new(0));
        let needs_drop = RwLock::new(NeedsDrop(Arc::clone(&value)));
        assert_eq!(value.load(Ordering::Acquire), 0);

        let inner = needs_drop.into_inner();
        assert_eq!(inner.0.load(Ordering::Acquire), 0);
        mem::drop(inner);

        assert_eq!(value.load(Ordering::Acquire), 1);
    }
1232
1233
    // A writer thread temporarily stores a negative value while holding the exclusive lock;
    // reader threads must never observe it.
    #[test]
    fn rw_arc() {
        const THREADS: isize = 7;
        const ITERATIONS: isize = 13;

        let mu = Arc::new(RwLock::new(0isize));
        let writer_mu = Arc::clone(&mu);

        let (tx, rx) = channel();
        thread::spawn(move || {
            let mut guard = block_on(writer_mu.lock());
            for _ in 0..ITERATIONS {
                let previous = *guard;
                *guard = -1;
                thread::yield_now();
                *guard = previous + 1;
            }
            // The completion signal is sent while the guard is still held.
            tx.send(()).unwrap();
        });

        let readers: Vec<_> = (0..THREADS)
            .map(|_| {
                let reader_mu = Arc::clone(&mu);
                thread::spawn(move || {
                    let guard = block_on(reader_mu.read_lock());
                    assert!(*guard >= 0);
                })
            })
            .collect();

        // Wait for the readers to finish their checks.
        for reader in readers {
            reader
                .join()
                .expect("One or more readers saw a negative value");
        }

        // Wait for the writer to finish.
        rx.recv_timeout(Duration::from_secs(5)).unwrap();

        let total = *block_on(mu.read_lock());
        assert_eq!(total, ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1274
1275
    // Single-threaded variant of the reader/writer check: the writer suspends while the value is
    // negative and still holds the exclusive lock, so readers must never observe that value.
    #[test]
    fn rw_single_thread_async() {
        const TASKS: isize = 7;
        const ITERATIONS: isize = 13;

        // A Future that returns `Poll::pending` the first time it is polled and `Poll::Ready` every
        // time after that.
        struct TestFuture {
            polled: bool,
            waker: Arc<SpinLock<Option<Waker>>>,
        }

        impl Future for TestFuture {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                if self.polled {
                    return Poll::Ready(());
                }

                // First poll: remember that it happened and publish the waker for `wake_future`.
                self.polled = true;
                *self.waker.lock() = Some(cx.waker().clone());
                Poll::Pending
            }
        }

        // Polls the shared slot until a waker shows up, then wakes it and returns.
        fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
            loop {
                if let Some(w) = waker.lock().take() {
                    w.wake();
                    return;
                }

                // This sleep cannot be moved into an else branch because we would end up holding
                // the lock while sleeping due to rust's drop ordering rules.
                thread::sleep(Duration::from_millis(10));
            }
        }

        async fn writer(mu: Rc<RwLock<isize>>) {
            let mut guard = mu.lock().await;
            for _ in 0..ITERATIONS {
                let previous = *guard;
                *guard = -1;

                // Suspend once while the value is negative; a helper thread wakes us up again.
                let waker = Arc::new(SpinLock::new(None));
                let for_thread = Arc::clone(&waker);
                thread::spawn(move || wake_future(for_thread));
                let fut = TestFuture {
                    polled: false,
                    waker,
                };
                fut.await;

                *guard = previous + 1;
            }
        }

        async fn reader(mu: Rc<RwLock<isize>>) {
            let guard = mu.read_lock().await;
            assert!(*guard >= 0);
        }

        let mu = Rc::new(RwLock::new(0isize));
        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        let writer_task = writer(Rc::clone(&mu));
        spawner
            .spawn_local(writer_task)
            .expect("Failed to spawn writer");

        for _ in 0..TASKS {
            let reader_task = reader(Rc::clone(&mu));
            spawner
                .spawn_local(reader_task)
                .expect("Failed to spawn reader");
        }

        ex.run();

        let total = *block_on(mu.read_lock());
        assert_eq!(total, ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1355
1356
    // Thread-pool variant of the reader/writer check: readers must never observe the negative
    // value the writer stores temporarily while holding the exclusive lock.
    #[test]
    fn rw_multi_thread_async() {
        const TASKS: isize = 7;
        const ITERATIONS: isize = 13;

        async fn writer(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
            let mut guard = mu.lock().await;
            for _ in 0..ITERATIONS {
                let previous = *guard;
                *guard = -1;
                thread::yield_now();
                *guard = previous + 1;
            }

            // Release the lock before reporting completion.
            mem::drop(guard);
            tx.send(()).unwrap();
        }

        async fn reader(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
            let guard = mu.read_lock().await;
            assert!(*guard >= 0);

            // Release the lock before reporting completion.
            mem::drop(guard);
            tx.send(()).expect("Failed to send completion message");
        }

        let mu = Arc::new(RwLock::new(0isize));
        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let (txw, rxw) = channel();
        let writer_task = writer(Arc::clone(&mu), txw);
        ex.spawn_ok(writer_task);

        let (txr, rxr) = channel();
        for _ in 0..TASKS {
            let reader_task = reader(Arc::clone(&mu), txr.clone());
            ex.spawn_ok(reader_task);
        }

        // Wait for the readers to finish their checks.
        for _ in 0..TASKS {
            rxr.recv_timeout(Duration::from_secs(5))
                .expect("Failed to receive completion message from reader");
        }

        // Wait for the writer to finish.
        rxw.recv_timeout(Duration::from_secs(5))
            .expect("Failed to receive completion message from writer");

        let total = *block_on(mu.read_lock());
        assert_eq!(total, ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1406
1407
    // Queues four readers and one writer behind an exclusive lock, then checks that releasing the
    // lock lets every queued reader in even though a writer is still waiting.
    #[test]
    fn wake_all_readers() {
        const NUM_READERS: usize = 4;

        async fn read(mu: Arc<RwLock<()>>) {
            let g = mu.read_lock().await;
            // Yield once while holding the shared lock.
            pending!();
            mem::drop(g);
        }

        async fn write(mu: Arc<RwLock<()>>) {
            mem::drop(mu.lock().await);
        }

        let mu = Arc::new(RwLock::new(()));
        let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
            Box::pin(read(Arc::clone(&mu))),
            Box::pin(read(Arc::clone(&mu))),
            Box::pin(read(Arc::clone(&mu))),
            Box::pin(write(Arc::clone(&mu))),
            Box::pin(read(Arc::clone(&mu))),
        ];

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Acquire the lock so that the futures cannot get it.
        let held = block_on(mu.lock());

        for fut in futures.iter_mut() {
            assert!(
                fut.as_mut().poll(&mut cx).is_pending(),
                "future unexpectedly ready"
            );
        }

        let state = mu.raw.state.load(Ordering::Relaxed);
        assert_eq!(state & HAS_WAITERS, HAS_WAITERS);

        let state = mu.raw.state.load(Ordering::Relaxed);
        assert_eq!(state & WRITER_WAITING, WRITER_WAITING);

        // Drop the lock. This should allow all readers to make progress. Since they already waited
        // once they should ignore the WRITER_WAITING bit that is currently set.
        mem::drop(held);
        for fut in futures.iter_mut() {
            assert!(
                fut.as_mut().poll(&mut cx).is_pending(),
                "future unexpectedly ready"
            );
        }

        // Check that all readers were able to acquire the lock.
        let state = mu.raw.state.load(Ordering::Relaxed);
        assert_eq!(state & READ_MASK, READ_LOCK * NUM_READERS);

        let state = mu.raw.state.load(Ordering::Relaxed);
        assert_eq!(state & WRITER_WAITING, WRITER_WAITING);

        // All the readers can now finish but the writer needs to be polled again.
        let mut needs_poll = None;
        for (idx, fut) in futures.iter_mut().enumerate() {
            if fut.as_mut().poll(&mut cx).is_ready() {
                continue;
            }

            if needs_poll.is_some() {
                panic!("More than one future unable to complete");
            }
            needs_poll = Some(idx);
        }

        let writer_idx = needs_poll.expect("Writer unexpectedly able to complete");
        if futures[writer_idx].as_mut().poll(&mut cx).is_pending() {
            panic!("Writer unable to complete");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1496
1497
    // Checks that a waiter which keeps failing to get the lock ends up setting the LONG_WAIT bit,
    // after which the competing future can no longer re-acquire the lock ahead of it.
    #[test]
    fn long_wait() {
        // Takes the lock over and over until the protected flag is true. The guard stays alive
        // across the yield, so the lock is held whenever this future is suspended in `pending!`.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let guard = mu.lock().await;
                if *guard {
                    return;
                }
                pending!();
            }
        }

        // Sets the protected flag once the exclusive lock has been acquired.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            let mut flag = mu.lock().await;
            *flag = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let mut spinner = Box::pin(tight_loop(mu.clone()));
        let mut marker = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Alternate between the two futures. Neither is allowed to finish during this phase.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            assert!(
                spinner.as_mut().poll(&mut cx).is_pending(),
                "tight_loop unexpectedly ready"
            );
            assert!(
                marker.as_mut().poll(&mut cx).is_pending(),
                "mark_ready unexpectedly ready"
            );
        }

        assert_eq!(state(), LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT);

        // With LONG_WAIT set, the tight loop fails to acquire the lock this time.
        assert!(
            spinner.as_mut().poll(&mut cx).is_pending(),
            "tight_loop unexpectedly ready"
        );

        // That finally lets mark_ready get the lock and run to completion.
        assert!(
            marker.as_mut().poll(&mut cx).is_ready(),
            "mark_ready not able to make progress"
        );

        // The flag is set now, so the tight loop can finish.
        assert!(
            spinner.as_mut().poll(&mut cx).is_ready(),
            "tight_loop not able to finish"
        );

        assert!(*block_on(mu.lock()));
        assert_eq!(state(), 0);
    }
1554
1555
    // Checks that dropping a long-waiting future while it is still queued (before it has been
    // woken) clears the waiter-related bits, including LONG_WAIT.
    #[test]
    fn cancel_long_wait_before_wake() {
        // Keeps re-acquiring the lock until the protected flag is true, yielding once per
        // iteration with the guard still alive.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if !*ready {
                    pending!();
                    continue;
                }
                break;
            }
        }

        // Sets the protected flag under the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            let mut flag = mu.lock().await;
            *flag = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Alternate between the two futures; neither may complete here.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            match tl.as_mut().poll(&mut cx) {
                Poll::Pending => {}
                Poll::Ready(()) => panic!("tight_loop unexpectedly ready"),
            }

            match mark.as_mut().poll(&mut cx) {
                Poll::Pending => {}
                Poll::Ready(()) => panic!("mark_ready unexpectedly ready"),
            }
        }

        assert_eq!(state(), LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT);

        // Dropping the mark_ready future should clear the LONG_WAIT bit; only LOCKED remains
        // because the tight loop is still holding its guard.
        mem::drop(mark);
        assert_eq!(state(), LOCKED);

        // Dropping the tight loop releases that guard as well.
        mem::drop(tl);
        assert_eq!(state(), 0);
    }
1601
1602
    // Checks that dropping a long-waiting future after it has been woken, but before it was polled
    // again, clears the LONG_WAIT bit and leaves the lock usable by others.
    #[test]
    fn cancel_long_wait_after_wake() {
        // Keeps re-acquiring the lock until the protected flag is true, yielding once per
        // iteration with the guard still alive.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let guard = mu.lock().await;
                if *guard {
                    return;
                }
                pending!();
            }
        }

        // Sets the protected flag under the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            let mut flag = mu.lock().await;
            *flag = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let mut spinner = Box::pin(tight_loop(mu.clone()));
        let mut marker = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Alternate between the two futures; neither may complete here.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            if spinner.as_mut().poll(&mut cx).is_ready() {
                panic!("tight_loop unexpectedly ready");
            }

            if marker.as_mut().poll(&mut cx).is_ready() {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(state(), LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT);

        // With LONG_WAIT set, the tight loop fails to acquire the lock this time.
        if spinner.as_mut().poll(&mut cx).is_ready() {
            panic!("tight_loop unexpectedly ready");
        }

        // Drop the mark_ready future without polling it; this should clear the LONG_WAIT bit.
        mem::drop(marker);
        assert_eq!(state() & LONG_WAIT, 0);

        // Nobody holds the lock now, so a fresh future can set the ready flag.
        block_on(mark_ready(mu.clone()));

        // The flag is set, so the tight loop can finish.
        if spinner.as_mut().poll(&mut cx).is_pending() {
            panic!("tight_loop not able to finish");
        }

        assert_eq!(state(), 0);
    }
1660
1661
    // Checks the DESIGNATED_WAKER bit: it is set when a waiter is woken by an unlock and stays set
    // while that waiter has not run yet, even if another future takes the lock in the meantime.
    #[test]
    fn designated_waker() {
        // Increments the protected counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            let mut value = mu.lock().await;
            *value += 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let mut futures = [
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
        ];
        let [first, second, third] = &mut futures;

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let count = block_on(mu.lock());

        // Poll two of the futures. The lock is taken, so both should end up in the waiter list.
        match first.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("future unexpectedly ready"),
        }
        match second.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("future unexpectedly ready"),
        }

        assert_eq!(state(), LOCKED | HAS_WAITERS | WRITER_WAITING);

        // Release the lock. This should set the DESIGNATED_WAKER bit and wake the first future in
        // the wait list.
        mem::drop(count);

        assert_eq!(state(), DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING);

        // The third future has never waited; it should get the lock immediately.
        match third.as_mut().poll(&mut cx) {
            Poll::Ready(()) => {}
            Poll::Pending => panic!("future unable to complete"),
        }
        assert_eq!(*block_on(mu.lock()), 1);

        // One waiter should remain in the list and the DESIGNATED_WAKER bit should still be set.
        assert_eq!(state() & DESIGNATED_WAKER, DESIGNATED_WAKER);
        assert_eq!(state() & HAS_WAITERS, HAS_WAITERS);

        // Let the future that was woken up run.
        match first.as_mut().poll(&mut cx) {
            Poll::Ready(()) => {}
            Poll::Pending => panic!("future unable to complete"),
        }
        assert_eq!(*block_on(mu.lock()), 2);

        match second.as_mut().poll(&mut cx) {
            Poll::Ready(()) => {}
            Poll::Pending => panic!("future unable to complete"),
        }
        assert_eq!(*block_on(mu.lock()), 3);

        assert_eq!(state(), 0);
    }
1734
1735
    // Checks that dropping a future that has been woken but never polled again leaves no stale
    // bits behind in the rwlock state.
    #[test]
    fn cancel_designated_waker() {
        // Increments the protected counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            let mut value = mu.lock().await;
            *value += 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let mut waiter = Box::pin(inc(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let held = block_on(mu.lock());

        assert!(
            waiter.as_mut().poll(&mut cx).is_pending(),
            "Future unexpectedly ready when lock is held"
        );

        // Releasing the lock wakes up the queued future.
        mem::drop(held);

        // Drop the future without polling it again. This should clear all state in the rwlock.
        mem::drop(waiter);

        let remaining = mu.raw.state.load(Ordering::Relaxed);
        assert_eq!(remaining, 0);
    }
1763
1764
    // Checks that dropping a queued writer before it is woken removes it from the waiter list
    // without waking the next waiter, and that the remaining waiter still gets the lock later.
    #[test]
    fn cancel_before_wake() {
        // Increments the protected counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            let mut value = mu.lock().await;
            *value += 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let mut cancelled = Box::pin(inc(mu.clone()));
        let mut survivor = Box::pin(inc(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Take the lock first.
        let count = block_on(mu.lock());

        // Poll both futures. With the lock taken they both get queued in the waiter list.
        if let Poll::Ready(()) = cancelled.as_mut().poll(&mut cx) {
            panic!("Future is unexpectedly ready");
        }
        if let Poll::Ready(()) = survivor.as_mut().poll(&mut cx) {
            panic!("Future is unexpectedly ready");
        }

        assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

        // Drop the first future. It should leave the waiter list without waking the second one.
        mem::drop(cancelled);

        // There should be no designated waker.
        assert_eq!(state() & DESIGNATED_WAKER, 0);

        // The dropped waiter was a writer, so the WRITER_WAITING bit should have been cleared.
        assert_eq!(state() & WRITER_WAITING, 0);

        // The lock is still held, so the surviving future cannot finish yet.
        if let Poll::Ready(()) = survivor.as_mut().poll(&mut cx) {
            panic!("Future is unexpectedly ready");
        }

        // Releasing the lock should let the surviving future make progress.
        mem::drop(count);

        if let Poll::Pending = survivor.as_mut().poll(&mut cx) {
            panic!("Future is not ready to make progress");
        }

        // Only the surviving future may have incremented the count.
        assert_eq!(*block_on(mu.lock()), 1);
        assert_eq!(state(), 0);
    }
1826
1827
    // Checks that dropping a writer after it was woken (but before it ran) hands the wake-up on to
    // the next waiter, which can then acquire the lock.
    #[test]
    fn cancel_after_wake() {
        // Increments the protected counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            let mut value = mu.lock().await;
            *value += 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let mut cancelled = Box::pin(inc(mu.clone()));
        let mut survivor = Box::pin(inc(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Take the lock first.
        let count = block_on(mu.lock());

        // Poll both futures. With the lock taken they both get queued in the waiter list.
        assert!(
            cancelled.as_mut().poll(&mut cx).is_pending(),
            "Future is unexpectedly ready"
        );
        assert!(
            survivor.as_mut().poll(&mut cx).is_pending(),
            "Future is unexpectedly ready"
        );

        assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

        // Releasing the lock should mark the first future as ready to make progress.
        mem::drop(count);

        // Dropping the first future should in turn make the second one ready to make progress.
        mem::drop(cancelled);

        // Another waiter was still in the list, so the DESIGNATED_WAKER bit must not have been
        // cleared.
        assert_eq!(state() & DESIGNATED_WAKER, DESIGNATED_WAKER);

        // The dropped waiter was a writer, so the WRITER_WAITING bit should have been cleared.
        assert_eq!(state() & WRITER_WAITING, 0);

        assert!(
            survivor.as_mut().poll(&mut cx).is_ready(),
            "Future is not ready to make progress"
        );

        // Only the surviving future may have incremented the count.
        assert_eq!(*block_on(mu.lock()), 1);
        assert_eq!(state(), 0);
    }
1888
1889
    // Checks that a lock attempt abandoned because another branch of a `select!` completed first
    // removes itself from the waiter list.
    #[test]
    fn timeout() {
        // Races a lock attempt against `timer`. The test keeps the lock held for the whole race,
        // so the lock branch winning is treated as a failure.
        async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<RwLock<()>>) {
            select! {
                res = timer.fuse() => {
                    match res {
                        Ok(()) => {},
                        Err(e) => panic!("Timer unexpectedly canceled: {e}"),
                    }
                }
                _ = mu.lock().fuse() => panic!("Successfully acquired lock"),
            }
        }

        let mu = Arc::new(RwLock::new(()));
        let (tx, rx) = oneshot::channel();

        let mut timeout = Box::pin(timed_lock(rx, mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Acquire the lock so the lock branch of `timed_lock` has to wait.
        let g = block_on(mu.lock());

        // Poll the future. Neither branch can complete yet.
        if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
            panic!("timed_lock unexpectedly ready");
        }

        // The pending lock attempt should have registered itself as a waiter.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        // Signal the channel, which should cancel the lock.
        tx.send(()).expect("Failed to send wakeup");

        // Now the future should have completed without acquiring the lock.
        if timeout.as_mut().poll(&mut cx).is_pending() {
            panic!("timed_lock not ready after timeout");
        }

        // The rwlock state should not show any waiters.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

        mem::drop(g);

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1940
1941
    // Checks that the WRITER_WAITING bit keeps a newly arriving reader from joining readers that
    // already hold the lock, so the queued writer runs before that reader.
    #[test]
    fn writer_waiting() {
        // Takes a shared lock, yields once while holding it and then expects to see 0.
        async fn read_zero(mu: Arc<RwLock<usize>>) {
            let guard = mu.read_lock().await;
            pending!();

            assert_eq!(*guard, 0);
        }

        // Increments the protected counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            let mut value = mu.lock().await;
            *value += 1;
        }

        // Takes a shared lock and expects to see the incremented value.
        async fn read_one(mu: Arc<RwLock<usize>>) {
            let guard = mu.read_lock().await;

            assert_eq!(*guard, 1);
        }

        let mu = Arc::new(RwLock::new(0));
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let mut early_a = Box::pin(read_zero(mu.clone()));
        let mut early_b = Box::pin(read_zero(mu.clone()));

        let mut writer = Box::pin(inc(mu.clone()));
        let mut late = Box::pin(read_one(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Both early readers take the read lock and then park in `pending!`.
        assert!(
            early_a.as_mut().poll(&mut cx).is_pending(),
            "read_zero unexpectedly ready"
        );
        assert!(
            early_b.as_mut().poll(&mut cx).is_pending(),
            "read_zero unexpectedly ready"
        );
        assert_eq!(state() & READ_MASK, 2 * READ_LOCK);

        // The writer cannot get in while the read locks are held, so it queues up.
        assert!(
            writer.as_mut().poll(&mut cx).is_pending(),
            "inc unexpectedly ready"
        );
        assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

        // The WRITER_WAITING bit should prevent the next reader from acquiring the lock.
        assert!(
            late.as_mut().poll(&mut cx).is_pending(),
            "read_one unexpectedly ready"
        );
        assert_eq!(state() & READ_MASK, 2 * READ_LOCK);

        // Drain everything in order: early readers, then the writer, then the late reader.
        assert!(
            early_a.as_mut().poll(&mut cx).is_ready(),
            "read_zero unable to complete"
        );
        assert!(
            early_b.as_mut().poll(&mut cx).is_ready(),
            "read_zero unable to complete"
        );
        assert!(
            writer.as_mut().poll(&mut cx).is_ready(),
            "inc unable to complete"
        );
        assert!(
            late.as_mut().poll(&mut cx).is_ready(),
            "read_one unable to complete"
        );

        assert_eq!(state(), 0);
    }
2015
2016
    // Exercises `Condvar::notify_one` with a mix of readers and one writer waiting on the condvar
    // while the notifier holds the exclusive lock.
    #[test]
    fn notify_one() {
        // Waits on the condvar under a shared lock until the counter is non-zero.
        async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut guard = mu.read_lock().await;
            while *guard == 0 {
                guard = cv.wait_read(guard).await;
            }
        }

        // Waits on the condvar under the exclusive lock until the counter is non-zero, then
        // decrements it.
        async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut guard = mu.lock().await;
            while *guard == 0 {
                guard = cv.wait(guard).await;
            }

            *guard -= 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut readers = [
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
        ];
        let mut writer = Box::pin(write(mu.clone(), cv.clone()));

        // The counter is still 0, so every future ends up waiting on the condvar.
        for reader in readers.iter_mut() {
            if let Poll::Ready(()) = reader.as_mut().poll(&mut cx) {
                panic!("reader unexpectedly ready");
            }
        }
        if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
            panic!("writer unexpectedly ready");
        }

        let mut count = block_on(mu.lock());
        *count = 1;

        // This should wake all readers + one writer.
        cv.notify_one();

        // Poll the readers and the writer so they add themselves to the rwlock's waiter list.
        for reader in readers.iter_mut() {
            if let Poll::Ready(()) = reader.as_mut().poll(&mut cx) {
                panic!("reader unexpectedly ready");
            }
        }

        if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
            panic!("writer unexpectedly ready");
        }

        assert_eq!(state() & HAS_WAITERS, HAS_WAITERS);
        assert_eq!(state() & WRITER_WAITING, WRITER_WAITING);

        mem::drop(count);

        // Nothing has been polled since the unlock, so both waiter bits are still set.
        let waiter_bits = HAS_WAITERS | WRITER_WAITING;
        assert_eq!(state() & waiter_bits, waiter_bits);

        for reader in readers.iter_mut() {
            if let Poll::Pending = reader.as_mut().poll(&mut cx) {
                panic!("reader unable to complete");
            }
        }

        if let Poll::Pending = writer.as_mut().poll(&mut cx) {
            panic!("writer unable to complete");
        }

        // The writer decremented the counter back to 0.
        assert_eq!(*block_on(mu.read_lock()), 0);
    }
2103
2104
    // Checks that notifying a condvar while the rwlock is not held does not put the woken waiters
    // into the rwlock's waiter list, and that each of them can then finish one after another.
    #[test]
    fn notify_when_unlocked() {
        // Waits on the condvar under the exclusive lock until the counter is non-zero, then
        // decrements it.
        async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.lock().await;

            while *count == 0 {
                count = cv.wait(count).await;
            }

            *count -= 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut futures = [
            Box::pin(dec(mu.clone(), cv.clone())),
            Box::pin(dec(mu.clone(), cv.clone())),
            Box::pin(dec(mu.clone(), cv.clone())),
            Box::pin(dec(mu.clone(), cv.clone())),
        ];

        // The counter is still 0, so every future ends up waiting on the condvar.
        for f in &mut futures {
            if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        // Give every future one unit to consume. The temporary guard is released at the end of
        // this statement, so the notification below happens with the lock unheld.
        *block_on(mu.lock()) = futures.len();
        cv.notify_all();

        // Since we haven't polled `futures` yet, the rwlock should not have any waiters.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

        // Every future must run to completion now; one that is still pending is the failure.
        for f in &mut futures {
            if f.as_mut().poll(&mut cx).is_pending() {
                panic!("future unable to complete");
            }
        }
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
2149
2150
    // Exercises `Condvar::notify_all` with readers and a writer waiting on the condvar while a
    // separate future is queued directly on the rwlock.
    #[test]
    fn notify_reader_writer() {
        const NUM_READERS: usize = 4;

        // Waits on the condvar under a shared lock until the counter is non-zero.
        async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut guard = mu.read_lock().await;
            while *guard == 0 {
                guard = cv.wait_read(guard).await;
            }

            // Yield once while holding the read lock, which should prevent the writer from waking
            // up.
            pending!();
        }

        // Waits on the condvar under the exclusive lock until the counter is non-zero, then
        // decrements it.
        async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut guard = mu.lock().await;
            while *guard == 0 {
                guard = cv.wait(guard).await;
            }

            *guard -= 1;
        }

        // Takes the exclusive lock and releases it right away.
        async fn lock(mu: Arc<RwLock<usize>>) {
            let guard = mu.lock().await;
            mem::drop(guard);
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Three readers, one writer and a trailing reader, polled in exactly this order.
        let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(write(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
        ];

        let mut locker = Box::pin(lock(mu.clone()));

        // The counter is still 0, so every future ends up waiting on the condvar.
        for fut in futures.iter_mut() {
            assert!(
                fut.as_mut().poll(&mut cx).is_pending(),
                "future unexpectedly ready"
            );
        }

        assert_eq!(state(), 0);

        let mut count = block_on(mu.lock());
        *count = 1;

        // Poll the lock function. The lock is held by us, so it gets queued on the waiter list.
        assert!(
            locker.as_mut().poll(&mut cx).is_pending(),
            "lock() unexpectedly ready"
        );

        let waiter_bits = HAS_WAITERS | WRITER_WAITING;
        assert_eq!(state() & waiter_bits, waiter_bits);

        // Wake up waiters while holding the lock.
        cv.notify_all();

        // Release the lock. This should wake up the lock function.
        mem::drop(count);

        assert!(
            locker.as_mut().poll(&mut cx).is_ready(),
            "lock() unable to complete"
        );

        // Since we haven't polled `futures` yet, the rwlock state should now be empty.
        assert_eq!(state(), 0);

        // Poll everything again. The readers should make progress (without completing) while the
        // writer should be blocked.
        for fut in futures.iter_mut() {
            assert!(
                fut.as_mut().poll(&mut cx).is_pending(),
                "future unexpectedly ready"
            );
        }

        assert_eq!(state() & READ_MASK, READ_LOCK * NUM_READERS);

        // The readers can finish on this pass; exactly one future (the writer) has to stay
        // pending and be polled once more afterwards.
        let mut blocked: Option<usize> = None;
        for (idx, fut) in futures.iter_mut().enumerate() {
            if fut.as_mut().poll(&mut cx).is_pending() {
                assert!(blocked.is_none(), "More than one future unable to complete");
                blocked = Some(idx);
            }
        }

        let writer_idx = blocked.expect("Writer unexpectedly able to complete");
        assert!(
            futures[writer_idx].as_mut().poll(&mut cx).is_ready(),
            "Writer unable to complete"
        );

        assert_eq!(*block_on(mu.lock()), 0);
        assert_eq!(state(), 0);
    }
2267
2268
    // Checks that readers woken from a condvar while the rwlock is held in shared mode can take
    // their read locks straight away, without passing through the rwlock's waiter list.
    #[test]
    fn notify_readers_with_read_lock() {
        // Waits on the condvar under a shared lock until the counter is non-zero.
        async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut guard = mu.read_lock().await;
            while *guard == 0 {
                guard = cv.wait_read(guard).await;
            }

            // Yield once while holding the read lock.
            pending!();
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());
        let state = || mu.raw.state.load(Ordering::Relaxed);

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut futures = [
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
        ];

        // The counter is still 0, so every future ends up waiting on the condvar.
        for fut in futures.iter_mut() {
            match fut.as_mut().poll(&mut cx) {
                Poll::Pending => {}
                Poll::Ready(()) => panic!("future unexpectedly ready"),
            }
        }

        // Increment the count and then grab a read lock.
        *block_on(mu.lock()) = 1;

        let shared = block_on(mu.read_lock());

        // Notify the condvar while holding the read lock. This should wake up all the waiters.
        cv.notify_all();

        // Since the lock is held in shared mode, all the readers should immediately be able to
        // acquire the read lock. They then park in `pending!`, so none of them completes yet.
        for fut in futures.iter_mut() {
            match fut.as_mut().poll(&mut cx) {
                Poll::Pending => {}
                Poll::Ready(()) => panic!("future unexpectedly ready"),
            }
        }
        assert_eq!(state() & HAS_WAITERS, 0);

        // One read lock per future plus the one held by this test.
        let expected_readers = futures.len() + 1;
        assert_eq!(state() & READ_MASK, READ_LOCK * expected_readers);

        mem::drop(shared);

        for fut in futures.iter_mut() {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(()) => {}
                Poll::Pending => panic!("future unable to complete"),
            }
        }

        assert_eq!(state(), 0);
    }
2331
}