Coverage Report

Created: 2024-09-08 06:35

/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.21/src/lock/bilock.rs
Line
Count
Source (jump to first uncovered line)
1
//! Futures-powered synchronization primitives.
2
3
use alloc::boxed::Box;
4
use alloc::sync::Arc;
5
use core::cell::UnsafeCell;
6
use core::fmt;
7
use core::ops::{Deref, DerefMut};
8
use core::pin::Pin;
9
use core::sync::atomic::AtomicUsize;
10
use core::sync::atomic::Ordering::SeqCst;
11
#[cfg(feature = "bilock")]
12
use futures_core::future::Future;
13
use futures_core::task::{Context, Poll, Waker};
14
15
/// A type of futures-powered synchronization primitive which is a mutex between
16
/// two possible owners.
17
///
18
/// This primitive is not as generic as a full-blown mutex but is sufficient for
19
/// many use cases where there are only two possible owners of a resource. The
20
/// implementation of `BiLock` can be more optimized for just the two possible
21
/// owners.
22
///
23
/// Note that it's possible to use this lock through a poll-style interface with
24
/// the `poll_lock` method but you can also use it as a future with the `lock`
25
/// method that borrows a `BiLock` and returns a future that will resolve when
26
/// it's locked.
27
///
28
/// A `BiLock` is typically used for "split" operations where data which serves
29
/// two purposes wants to be split into two to be worked with separately. For
30
/// example a TCP stream could be both a reader and a writer or a framing layer
31
/// could be both a stream and a sink for messages. A `BiLock` enables splitting
32
/// these two and then using each independently in a futures-powered fashion.
33
///
34
/// This type is only available when the `bilock` feature of this
35
/// library is activated.
36
#[derive(Debug)]
37
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
38
pub struct BiLock<T> {
39
    // Shared lock state and value; `BiLock::new` creates the two handles that
    // point at the same `Inner`.
    arc: Arc<Inner<T>>,
40
}
41
42
/// State shared by the two `BiLock` handles of one lock.
#[derive(Debug)]
43
struct Inner<T> {
44
    /// Lock word. `0` means unlocked, `1` means locked with no parked task,
    /// and any other value is the address of a `Box<Waker>` (cast to `usize`)
    /// that `poll_lock` stored for the task waiting on the lock.
    state: AtomicUsize,
45
    /// The protected value. It is wrapped in `Option` so that `into_value`
    /// can `take()` it out of `self`; every other access unwraps it.
    value: Option<UnsafeCell<T>>,
46
}
47
48
unsafe impl<T: Send> Send for Inner<T> {}
49
unsafe impl<T: Send> Sync for Inner<T> {}
50
51
impl<T> BiLock<T> {
52
    /// Creates a new `BiLock` protecting the provided data.
53
    ///
54
    /// Two handles to the lock are returned, and these are the only two handles
55
    /// that will ever be available to the lock. These can then be sent to separate
56
    /// tasks to be managed there.
57
    ///
58
    /// The data behind the bilock is considered to be pinned, which allows `Pin`
59
    /// references to locked data. However, this means that the locked value
60
    /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`.
61
    /// Similarly, reuniting the lock and extracting the inner value is only
62
    /// possible when `T` is `Unpin`.
63
0
    pub fn new(t: T) -> (Self, Self) {
64
0
        let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) });
65
0
66
0
        (Self { arc: arc.clone() }, Self { arc })
67
0
    }
68
69
    /// Attempt to acquire this lock, returning `Pending` if it can't be
70
    /// acquired.
71
    ///
72
    /// This function will acquire the lock in a nonblocking fashion, returning
73
    /// immediately if the lock is already held. If the lock is successfully
74
    /// acquired then `Poll::Ready` is returned with a value that represents
75
    /// the locked value (and can be used to access the protected data). The
76
    /// lock is unlocked when the returned `BiLockGuard` is dropped.
77
    ///
78
    /// If the lock is already held then this function will return
79
    /// `Poll::Pending`. In this case the current task will also be scheduled
80
    /// to receive a notification when the lock would otherwise become
81
    /// available.
82
    ///
83
    /// # Panics
84
    ///
85
    /// This function will panic if called outside the context of a future's
86
    /// task.
87
0
    pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
88
0
        let mut waker = None;
89
0
        loop {
90
0
            match self.arc.state.swap(1, SeqCst) {
91
                // Woohoo, we grabbed the lock!
92
0
                0 => return Poll::Ready(BiLockGuard { bilock: self }),
93
94
                // Oops, someone else has locked the lock
95
0
                1 => {}
96
97
                // A task was previously blocked on this lock, likely our task,
98
                // so we need to update that task.
99
0
                n => unsafe {
100
0
                    let mut prev = Box::from_raw(n as *mut Waker);
101
0
                    *prev = cx.waker().clone();
102
0
                    waker = Some(prev);
103
0
                },
104
            }
105
106
            // type ascription for safety's sake!
107
0
            let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
108
0
            let me = Box::into_raw(me) as usize;
109
0
110
0
            match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) {
111
                // The lock is still locked, but we've now parked ourselves, so
112
                // just report that we're scheduled to receive a notification.
113
0
                Ok(_) => return Poll::Pending,
114
115
                // Oops, looks like the lock was unlocked after our swap above
116
                // and before the compare_exchange. Deallocate what we just
117
                // allocated and go through the loop again.
118
0
                Err(0) => unsafe {
119
0
                    waker = Some(Box::from_raw(me as *mut Waker));
120
0
                },
121
122
                // The top of this loop set the previous state to 1, so if we
123
                // failed the CAS above then it's because the previous value was
124
                // *not* zero or one. This indicates that a task was blocked,
125
                // but we're trying to acquire the lock and there's only one
126
                // other reference of the lock, so it should be impossible for
127
                // that task to ever block itself.
128
0
                Err(n) => panic!("invalid state: {}", n),
129
            }
130
        }
131
0
    }
132
133
    /// Perform a "blocking lock" of this lock, borrowing this lock handle and
134
    /// returning a future to the acquired lock.
135
    ///
136
    /// This function borrows the `BiLock<T>` and returns a sentinel future,
137
    /// `BiLockAcquire<'_, T>`. Polling the returned future forwards to
138
    /// `poll_lock`, so it resolves to a `BiLockGuard<'_, T>` which represents
139
    /// the locked lock; the lock is released when that guard is dropped.
140
    ///
141
    /// Note that the returned future will never resolve to an error.
142
    #[cfg(feature = "bilock")]
143
    #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
144
    pub fn lock(&self) -> BiLockAcquire<'_, T> {
145
        BiLockAcquire { bilock: self }
146
    }
147
148
    /// Attempts to put the two "halves" of a `BiLock<T>` back together and
149
    /// recover the original value. Succeeds only if the two `BiLock<T>`s
150
    /// originated from the same call to `BiLock::new`.
151
0
    pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>>
152
0
    where
153
0
        T: Unpin,
154
0
    {
155
0
        if Arc::ptr_eq(&self.arc, &other.arc) {
156
0
            drop(other);
157
0
            let inner = Arc::try_unwrap(self.arc)
158
0
                .ok()
159
0
                .expect("futures: try_unwrap failed in BiLock<T>::reunite");
160
0
            Ok(unsafe { inner.into_value() })
161
        } else {
162
0
            Err(ReuniteError(self, other))
163
        }
164
0
    }
165
166
0
    fn unlock(&self) {
167
0
        match self.arc.state.swap(0, SeqCst) {
168
            // we've locked the lock, shouldn't be possible for us to see an
169
            // unlocked lock.
170
0
            0 => panic!("invalid unlocked state"),
171
172
            // Ok, no one else tried to get the lock, we're done.
173
0
            1 => {}
174
175
            // Another task has parked themselves on this lock, let's wake them
176
            // up as its now their turn.
177
0
            n => unsafe {
178
0
                Box::from_raw(n as *mut Waker).wake();
179
0
            },
180
        }
181
0
    }
182
}
183
184
impl<T: Unpin> Inner<T> {
185
0
    unsafe fn into_value(mut self) -> T {
186
0
        self.value.take().unwrap().into_inner()
187
0
    }
188
}
189
190
impl<T> Drop for Inner<T> {
191
0
    fn drop(&mut self) {
192
0
        assert_eq!(self.state.load(SeqCst), 0);
193
0
    }
194
}
195
196
/// Error indicating two `BiLock<T>`s were not two halves of a whole, and
197
/// thus could not be `reunite`d.
///
/// `BiLock::reunite` puts the handle it was called on in field `0` and the
/// handle passed as `other` in field `1`, so neither handle is lost.
198
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
199
pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
200
201
impl<T> fmt::Debug for ReuniteError<T> {
202
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203
0
        f.debug_tuple("ReuniteError").field(&"...").finish()
204
0
    }
205
}
206
207
impl<T> fmt::Display for ReuniteError<T> {
208
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209
0
        write!(f, "tried to reunite two BiLocks that don't form a pair")
210
0
    }
211
}
212
213
// `ReuniteError` is a standard error type whenever `std` is available.
#[cfg(feature = "std")]
impl<T> std::error::Error for ReuniteError<T> where T: core::any::Any {}
215
216
/// Returned RAII guard from the `poll_lock` method.
217
///
218
/// This structure acts as a sentinel to the data in the `BiLock<T>` itself,
219
/// implementing `Deref` to `T`, and `DerefMut` to `T` when `T: Unpin` (use
/// `as_pin_mut` for mutable access otherwise). When dropped, the lock will be
220
/// unlocked.
221
#[derive(Debug)]
222
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
223
pub struct BiLockGuard<'a, T> {
224
    // The handle this guard was obtained from; `Drop` calls `unlock` on it.
    bilock: &'a BiLock<T>,
225
}
226
227
impl<T> Deref for BiLockGuard<'_, T> {
228
    type Target = T;
229
0
    fn deref(&self) -> &T {
230
0
        unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() }
231
0
    }
232
}
233
234
impl<T: Unpin> DerefMut for BiLockGuard<'_, T> {
235
0
    fn deref_mut(&mut self) -> &mut T {
236
0
        unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() }
237
0
    }
238
}
239
240
impl<T> BiLockGuard<'_, T> {
241
    /// Get a mutable pinned reference to the locked value.
242
0
    pub fn as_pin_mut(&mut self) -> Pin<&mut T> {
243
0
        // Safety: we never allow moving a !Unpin value out of a bilock, nor
244
0
        // allow mutable access to it
245
0
        unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) }
246
0
    }
247
}
248
249
impl<T> Drop for BiLockGuard<'_, T> {
250
0
    fn drop(&mut self) {
251
0
        self.bilock.unlock();
252
0
    }
253
}
254
255
/// Future returned by `BiLock::lock` which will resolve when the lock is
256
/// acquired.
///
/// It resolves to a `BiLockGuard<'a, T>`; every poll forwards to
/// `BiLock::poll_lock` on the borrowed handle.
257
#[cfg(feature = "bilock")]
258
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
259
#[must_use = "futures do nothing unless you `.await` or poll them"]
260
#[derive(Debug)]
261
pub struct BiLockAcquire<'a, T> {
262
    // The handle whose lock this future is trying to acquire.
    bilock: &'a BiLock<T>,
263
}
264
265
// The future never pin-projects to its field, so it may be `Unpin`
// regardless of `T`.
#[cfg(feature = "bilock")]
impl<'a, T> Unpin for BiLockAcquire<'a, T> {}
268
269
#[cfg(feature = "bilock")]
impl<'a, T> Future for BiLockAcquire<'a, T> {
    type Output = BiLockGuard<'a, T>;

    /// Forwards to `BiLock::poll_lock` on the borrowed handle.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let lock: &'a BiLock<T> = self.bilock;
        lock.poll_lock(cx)
    }
}
276
}