Coverage Report

Created: 2025-07-23 07:04

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.31/src/lock/mutex.rs
Line
Count
Source (jump to first uncovered line)
1
use std::cell::UnsafeCell;
2
use std::marker::PhantomData;
3
use std::ops::{Deref, DerefMut};
4
use std::pin::Pin;
5
use std::sync::atomic::{AtomicUsize, Ordering};
6
use std::sync::{Arc, Mutex as StdMutex};
7
use std::{fmt, mem};
8
9
use slab::Slab;
10
11
use futures_core::future::{FusedFuture, Future};
12
use futures_core::task::{Context, Poll, Waker};
13
14
/// A futures-aware mutex.
15
///
16
/// # Fairness
17
///
18
/// This mutex provides no fairness guarantees. Tasks may not acquire the mutex
19
/// in the order that they requested the lock, and it's possible for a single task
20
/// which repeatedly takes the lock to starve other tasks, which may be left waiting
21
/// indefinitely.
22
pub struct Mutex<T: ?Sized> {
23
    state: AtomicUsize,
24
    waiters: StdMutex<Slab<Waiter>>,
25
    value: UnsafeCell<T>,
26
}
27
28
impl<T: ?Sized> fmt::Debug for Mutex<T> {
29
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30
0
        let state = self.state.load(Ordering::SeqCst);
31
0
        f.debug_struct("Mutex")
32
0
            .field("is_locked", &((state & IS_LOCKED) != 0))
33
0
            .field("has_waiters", &((state & HAS_WAITERS) != 0))
34
0
            .finish()
35
0
    }
36
}
37
38
impl<T> From<T> for Mutex<T> {
39
0
    fn from(t: T) -> Self {
40
0
        Self::new(t)
41
0
    }
42
}
43
44
impl<T: Default> Default for Mutex<T> {
45
0
    fn default() -> Self {
46
0
        Self::new(Default::default())
47
0
    }
48
}
49
50
enum Waiter {
51
    Waiting(Waker),
52
    Woken,
53
}
54
55
impl Waiter {
56
0
    fn register(&mut self, waker: &Waker) {
57
0
        match self {
58
0
            Self::Waiting(w) if waker.will_wake(w) => {}
59
0
            _ => *self = Self::Waiting(waker.clone()),
60
        }
61
0
    }
62
63
0
    fn wake(&mut self) {
64
0
        match mem::replace(self, Self::Woken) {
65
0
            Self::Waiting(waker) => waker.wake(),
66
0
            Self::Woken => {}
67
        }
68
0
    }
69
}
70
71
const IS_LOCKED: usize = 1 << 0;
72
const HAS_WAITERS: usize = 1 << 1;
73
74
impl<T> Mutex<T> {
75
    /// Creates a new futures-aware mutex.
76
0
    pub fn new(t: T) -> Self {
77
0
        Self {
78
0
            state: AtomicUsize::new(0),
79
0
            waiters: StdMutex::new(Slab::new()),
80
0
            value: UnsafeCell::new(t),
81
0
        }
82
0
    }
83
84
    /// Consumes this mutex, returning the underlying data.
85
    ///
86
    /// # Examples
87
    ///
88
    /// ```
89
    /// use futures::lock::Mutex;
90
    ///
91
    /// let mutex = Mutex::new(0);
92
    /// assert_eq!(mutex.into_inner(), 0);
93
    /// ```
94
0
    pub fn into_inner(self) -> T {
95
0
        self.value.into_inner()
96
0
    }
97
}
98
99
impl<T: ?Sized> Mutex<T> {
100
    /// Attempt to acquire the lock immediately.
101
    ///
102
    /// If the lock is currently held, this will return `None`.
103
0
    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
104
0
        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
105
0
        if (old_state & IS_LOCKED) == 0 {
106
0
            Some(MutexGuard { mutex: self })
107
        } else {
108
0
            None
109
        }
110
0
    }
111
112
    /// Attempt to acquire the lock immediately.
113
    ///
114
    /// If the lock is currently held, this will return `None`.
115
0
    pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
116
0
        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
117
0
        if (old_state & IS_LOCKED) == 0 {
118
0
            Some(OwnedMutexGuard { mutex: self.clone() })
119
        } else {
120
0
            None
121
        }
122
0
    }
123
124
    /// Acquire the lock asynchronously.
125
    ///
126
    /// This method returns a future that will resolve once the lock has been
127
    /// successfully acquired.
128
0
    pub fn lock(&self) -> MutexLockFuture<'_, T> {
129
0
        MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
130
0
    }
131
132
    /// Acquire the lock asynchronously.
133
    ///
134
    /// This method returns a future that will resolve once the lock has been
135
    /// successfully acquired.
136
0
    pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
137
0
        OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
138
0
    }
139
140
    /// Returns a mutable reference to the underlying data.
141
    ///
142
    /// Since this call borrows the `Mutex` mutably, no actual locking needs to
143
    /// take place -- the mutable borrow statically guarantees no locks exist.
144
    ///
145
    /// # Examples
146
    ///
147
    /// ```
148
    /// # futures::executor::block_on(async {
149
    /// use futures::lock::Mutex;
150
    ///
151
    /// let mut mutex = Mutex::new(0);
152
    /// *mutex.get_mut() = 10;
153
    /// assert_eq!(*mutex.lock().await, 10);
154
    /// # });
155
    /// ```
156
0
    pub fn get_mut(&mut self) -> &mut T {
157
0
        // We know statically that there are no other references to `self`, so
158
0
        // there's no need to lock the inner mutex.
159
0
        unsafe { &mut *self.value.get() }
160
0
    }
161
162
0
    fn remove_waker(&self, wait_key: usize, wake_another: bool) {
163
0
        if wait_key != WAIT_KEY_NONE {
164
0
            let mut waiters = self.waiters.lock().unwrap();
165
0
            match waiters.remove(wait_key) {
166
0
                Waiter::Waiting(_) => {}
167
                Waiter::Woken => {
168
                    // We were awoken, but then dropped before we could
169
                    // wake up to acquire the lock. Wake up another
170
                    // waiter.
171
0
                    if wake_another {
172
0
                        if let Some((_i, waiter)) = waiters.iter_mut().next() {
173
0
                            waiter.wake();
174
0
                        }
175
0
                    }
176
                }
177
            }
178
0
            if waiters.is_empty() {
179
0
                self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
180
0
            }
181
0
        }
182
0
    }
183
184
    // Unlocks the mutex. Called by MutexGuard and MappedMutexGuard when they are
185
    // dropped.
186
0
    fn unlock(&self) {
187
0
        let old_state = self.state.fetch_and(!IS_LOCKED, Ordering::AcqRel);
188
0
        if (old_state & HAS_WAITERS) != 0 {
189
0
            let mut waiters = self.waiters.lock().unwrap();
190
0
            if let Some((_i, waiter)) = waiters.iter_mut().next() {
191
0
                waiter.wake();
192
0
            }
193
0
        }
194
0
    }
195
}
196
197
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
198
const WAIT_KEY_NONE: usize = usize::MAX;
199
200
/// A future which resolves when the target mutex has been successfully acquired, owned version.
201
pub struct OwnedMutexLockFuture<T: ?Sized> {
202
    // `None` indicates that the mutex was successfully acquired.
203
    mutex: Option<Arc<Mutex<T>>>,
204
    wait_key: usize,
205
}
206
207
impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
208
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209
0
        f.debug_struct("OwnedMutexLockFuture")
210
0
            .field("was_acquired", &self.mutex.is_none())
211
0
            .field("mutex", &self.mutex)
212
0
            .field(
213
0
                "wait_key",
214
0
                &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
215
            )
216
0
            .finish()
217
0
    }
218
}
219
220
impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
221
0
    fn is_terminated(&self) -> bool {
222
0
        self.mutex.is_none()
223
0
    }
224
}
225
226
impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
227
    type Output = OwnedMutexGuard<T>;
228
229
0
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230
0
        let this = self.get_mut();
231
0
232
0
        let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
233
234
0
        if let Some(lock) = mutex.try_lock_owned() {
235
0
            mutex.remove_waker(this.wait_key, false);
236
0
            this.mutex = None;
237
0
            return Poll::Ready(lock);
238
0
        }
239
0
240
0
        {
241
0
            let mut waiters = mutex.waiters.lock().unwrap();
242
0
            if this.wait_key == WAIT_KEY_NONE {
243
0
                this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
244
0
                if waiters.len() == 1 {
245
0
                    mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
246
0
                }
247
0
            } else {
248
0
                waiters[this.wait_key].register(cx.waker());
249
0
            }
250
        }
251
252
        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
253
        // attempting to acquire the lock again.
254
0
        if let Some(lock) = mutex.try_lock_owned() {
255
0
            mutex.remove_waker(this.wait_key, false);
256
0
            this.mutex = None;
257
0
            return Poll::Ready(lock);
258
0
        }
259
0
260
0
        Poll::Pending
261
0
    }
262
}
263
264
impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
265
0
    fn drop(&mut self) {
266
0
        if let Some(mutex) = self.mutex.as_ref() {
267
0
            // This future was dropped before it acquired the mutex.
268
0
            //
269
0
            // Remove ourselves from the map, waking up another waiter if we
270
0
            // had been awoken to acquire the lock.
271
0
            mutex.remove_waker(self.wait_key, true);
272
0
        }
273
0
    }
274
}
275
276
/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
277
/// When this structure is dropped (falls out of scope), the lock will be
278
/// unlocked.
279
pub struct OwnedMutexGuard<T: ?Sized> {
280
    mutex: Arc<Mutex<T>>,
281
}
282
283
impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
284
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285
0
        f.debug_struct("OwnedMutexGuard")
286
0
            .field("value", &&**self)
287
0
            .field("mutex", &self.mutex)
288
0
            .finish()
289
0
    }
290
}
291
292
impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
293
0
    fn drop(&mut self) {
294
0
        self.mutex.unlock()
295
0
    }
296
}
297
298
impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
299
    type Target = T;
300
0
    fn deref(&self) -> &T {
301
0
        unsafe { &*self.mutex.value.get() }
302
0
    }
303
}
304
305
impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
306
0
    fn deref_mut(&mut self) -> &mut T {
307
0
        unsafe { &mut *self.mutex.value.get() }
308
0
    }
309
}
310
311
/// A future which resolves when the target mutex has been successfully acquired.
312
pub struct MutexLockFuture<'a, T: ?Sized> {
313
    // `None` indicates that the mutex was successfully acquired.
314
    mutex: Option<&'a Mutex<T>>,
315
    wait_key: usize,
316
}
317
318
impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> {
319
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320
0
        f.debug_struct("MutexLockFuture")
321
0
            .field("was_acquired", &self.mutex.is_none())
322
0
            .field("mutex", &self.mutex)
323
0
            .field(
324
0
                "wait_key",
325
0
                &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
326
            )
327
0
            .finish()
328
0
    }
329
}
330
331
impl<T: ?Sized> FusedFuture for MutexLockFuture<'_, T> {
332
0
    fn is_terminated(&self) -> bool {
333
0
        self.mutex.is_none()
334
0
    }
335
}
336
337
impl<'a, T: ?Sized> Future for MutexLockFuture<'a, T> {
338
    type Output = MutexGuard<'a, T>;
339
340
0
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341
0
        let mutex = self.mutex.expect("polled MutexLockFuture after completion");
342
343
0
        if let Some(lock) = mutex.try_lock() {
344
0
            mutex.remove_waker(self.wait_key, false);
345
0
            self.mutex = None;
346
0
            return Poll::Ready(lock);
347
0
        }
348
0
349
0
        {
350
0
            let mut waiters = mutex.waiters.lock().unwrap();
351
0
            if self.wait_key == WAIT_KEY_NONE {
352
0
                self.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
353
0
                if waiters.len() == 1 {
354
0
                    mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
355
0
                }
356
0
            } else {
357
0
                waiters[self.wait_key].register(cx.waker());
358
0
            }
359
        }
360
361
        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
362
        // attempting to acquire the lock again.
363
0
        if let Some(lock) = mutex.try_lock() {
364
0
            mutex.remove_waker(self.wait_key, false);
365
0
            self.mutex = None;
366
0
            return Poll::Ready(lock);
367
0
        }
368
0
369
0
        Poll::Pending
370
0
    }
371
}
372
373
impl<T: ?Sized> Drop for MutexLockFuture<'_, T> {
374
0
    fn drop(&mut self) {
375
0
        if let Some(mutex) = self.mutex {
376
0
            // This future was dropped before it acquired the mutex.
377
0
            //
378
0
            // Remove ourselves from the map, waking up another waiter if we
379
0
            // had been awoken to acquire the lock.
380
0
            mutex.remove_waker(self.wait_key, true);
381
0
        }
382
0
    }
383
}
384
385
/// An RAII guard returned by the `lock` and `try_lock` methods.
386
/// When this structure is dropped (falls out of scope), the lock will be
387
/// unlocked.
388
pub struct MutexGuard<'a, T: ?Sized> {
389
    mutex: &'a Mutex<T>,
390
}
391
392
impl<'a, T: ?Sized> MutexGuard<'a, T> {
393
    /// Returns a locked view over a portion of the locked data.
394
    ///
395
    /// # Example
396
    ///
397
    /// ```
398
    /// # futures::executor::block_on(async {
399
    /// use futures::lock::{Mutex, MutexGuard};
400
    ///
401
    /// let data = Mutex::new(Some("value".to_string()));
402
    /// {
403
    ///     let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
404
    ///     assert_eq!(&*locked_str, "value");
405
    /// }
406
    /// # });
407
    /// ```
408
    #[inline]
409
0
    pub fn map<U: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, U>
410
0
    where
411
0
        F: FnOnce(&mut T) -> &mut U,
412
0
    {
413
0
        let mutex = this.mutex;
414
0
        let value = f(unsafe { &mut *this.mutex.value.get() });
415
0
        // Don't run the `drop` method for MutexGuard. The ownership of the underlying
416
0
        // locked state is being moved to the returned MappedMutexGuard.
417
0
        mem::forget(this);
418
0
        MappedMutexGuard { mutex, value, _marker: PhantomData }
419
0
    }
420
}
421
422
impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
423
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424
0
        f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish()
425
0
    }
426
}
427
428
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
429
0
    fn drop(&mut self) {
430
0
        self.mutex.unlock()
431
0
    }
432
}
433
434
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
435
    type Target = T;
436
0
    fn deref(&self) -> &T {
437
0
        unsafe { &*self.mutex.value.get() }
438
0
    }
439
}
440
441
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
442
0
    fn deref_mut(&mut self) -> &mut T {
443
0
        unsafe { &mut *self.mutex.value.get() }
444
0
    }
445
}
446
447
/// An RAII guard returned by the `MutexGuard::map` and `MappedMutexGuard::map` methods.
448
/// When this structure is dropped (falls out of scope), the lock will be unlocked.
449
pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> {
450
    mutex: &'a Mutex<T>,
451
    value: *mut U,
452
    _marker: PhantomData<&'a mut U>,
453
}
454
455
impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> {
456
    /// Returns a locked view over a portion of the locked data.
457
    ///
458
    /// # Example
459
    ///
460
    /// ```
461
    /// # futures::executor::block_on(async {
462
    /// use futures::lock::{MappedMutexGuard, Mutex, MutexGuard};
463
    ///
464
    /// let data = Mutex::new(Some("value".to_string()));
465
    /// {
466
    ///     let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
467
    ///     let locked_char = MappedMutexGuard::map(locked_str, |s| s.get_mut(0..1).unwrap());
468
    ///     assert_eq!(&*locked_char, "v");
469
    /// }
470
    /// # });
471
    /// ```
472
    #[inline]
473
0
    pub fn map<V: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, V>
474
0
    where
475
0
        F: FnOnce(&mut U) -> &mut V,
476
0
    {
477
0
        let mutex = this.mutex;
478
0
        let value = f(unsafe { &mut *this.value });
479
0
        // Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying
480
0
        // locked state is being moved to the returned MappedMutexGuard.
481
0
        mem::forget(this);
482
0
        MappedMutexGuard { mutex, value, _marker: PhantomData }
483
0
    }
484
}
485
486
impl<T: ?Sized, U: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'_, T, U> {
487
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488
0
        f.debug_struct("MappedMutexGuard")
489
0
            .field("value", &&**self)
490
0
            .field("mutex", &self.mutex)
491
0
            .finish()
492
0
    }
493
}
494
495
impl<T: ?Sized, U: ?Sized> Drop for MappedMutexGuard<'_, T, U> {
496
0
    fn drop(&mut self) {
497
0
        self.mutex.unlock()
498
0
    }
499
}
500
501
impl<T: ?Sized, U: ?Sized> Deref for MappedMutexGuard<'_, T, U> {
502
    type Target = U;
503
0
    fn deref(&self) -> &U {
504
0
        unsafe { &*self.value }
505
0
    }
506
}
507
508
impl<T: ?Sized, U: ?Sized> DerefMut for MappedMutexGuard<'_, T, U> {
509
0
    fn deref_mut(&mut self) -> &mut U {
510
0
        unsafe { &mut *self.value }
511
0
    }
512
}
513
514
// Mutexes can be moved freely between threads and acquired on any thread so long
515
// as the inner value can be safely sent between threads.
516
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
517
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
518
519
// It's safe to switch which thread the acquire is being attempted on so long as
520
// `T` can be accessed on that thread.
521
unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
522
523
// doesn't have any interesting `&self` methods (only Debug)
524
unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
525
526
// It's safe to switch which thread the acquire is being attempted on so long as
527
// `T` can be accessed on that thread.
528
unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
529
530
// doesn't have any interesting `&self` methods (only Debug)
531
unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
532
533
// Safe to send since we don't track any thread-specific details-- the inner
534
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
535
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
536
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
537
538
unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
539
unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
540
541
unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
542
unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
543
544
#[cfg(test)]
545
mod tests {
546
    use super::*;
547
    use std::format;
548
549
    #[test]
550
    fn test_mutex_guard_debug_not_recurse() {
551
        let mutex = Mutex::new(42);
552
        let guard = mutex.try_lock().unwrap();
553
        let _ = format!("{:?}", guard);
554
        let guard = MutexGuard::map(guard, |n| n);
555
        let _ = format!("{:?}", guard);
556
    }
557
}