/rust/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.21/src/lock/bilock.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Futures-powered synchronization primitives. |
2 | | |
3 | | use alloc::boxed::Box; |
4 | | use alloc::sync::Arc; |
5 | | use core::cell::UnsafeCell; |
6 | | use core::fmt; |
7 | | use core::ops::{Deref, DerefMut}; |
8 | | use core::pin::Pin; |
9 | | use core::sync::atomic::AtomicUsize; |
10 | | use core::sync::atomic::Ordering::SeqCst; |
11 | | #[cfg(feature = "bilock")] |
12 | | use futures_core::future::Future; |
13 | | use futures_core::task::{Context, Poll, Waker}; |
14 | | |
15 | | /// A type of futures-powered synchronization primitive which is a mutex between |
16 | | /// two possible owners. |
17 | | /// |
18 | | /// This primitive is not as generic as a full-blown mutex but is sufficient for |
19 | | /// many use cases where there are only two possible owners of a resource. The |
20 | | /// implementation of `BiLock` can be more optimized for just the two possible |
21 | | /// owners. |
22 | | /// |
23 | | /// Note that it's possible to use this lock through a poll-style interface with |
24 | | /// the `poll_lock` method but you can also use it as a future with the `lock` |
25 | | /// method that consumes a `BiLock` and returns a future that will resolve when |
26 | | /// it's locked. |
27 | | /// |
28 | | /// A `BiLock` is typically used for "split" operations where data which serves |
29 | | /// two purposes wants to be split into two to be worked with separately. For |
30 | | /// example a TCP stream could be both a reader and a writer or a framing layer |
31 | | /// could be both a stream and a sink for messages. A `BiLock` enables splitting |
32 | | /// these two and then using each independently in a futures-powered fashion. |
33 | | /// |
34 | | /// This type is only available when the `bilock` feature of this |
35 | | /// library is activated. |
36 | | #[derive(Debug)] |
37 | | #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] |
38 | | pub struct BiLock<T> { |
39 | | arc: Arc<Inner<T>>, |
40 | | } |
41 | | |
/// State shared by the two `BiLock` handles of a pair.
#[derive(Debug)]
struct Inner<T> {
    /// The lock word. As used by `poll_lock` and `unlock` in this file:
    /// `0` means unlocked, `1` means locked with no parked waiter, and any
    /// other value is the address of a leaked `Box<Waker>` belonging to the
    /// task waiting for the lock.
    state: AtomicUsize,
    /// The protected value. It is `Some` from construction until
    /// `into_value` takes it out.
    value: Option<UnsafeCell<T>>,
}

// SAFETY (as far as this file shows): the contents of `value` are only
// reached through a `BiLockGuard`, and a guard is only created by the caller
// that swapped `state` from `0`, so the value is handed from one owner to the
// other rather than shared. That is why `T: Send` is required for both impls
// and `T: Sync` is not.
unsafe impl<T> Send for Inner<T> where T: Send {}
unsafe impl<T> Sync for Inner<T> where T: Send {}
50 | | |
51 | | impl<T> BiLock<T> { |
52 | | /// Creates a new `BiLock` protecting the provided data. |
53 | | /// |
54 | | /// Two handles to the lock are returned, and these are the only two handles |
55 | | /// that will ever be available to the lock. These can then be sent to separate |
56 | | /// tasks to be managed there. |
57 | | /// |
58 | | /// The data behind the bilock is considered to be pinned, which allows `Pin` |
59 | | /// references to locked data. However, this means that the locked value |
60 | | /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`. |
61 | | /// Similarly, reuniting the lock and extracting the inner value is only |
62 | | /// possible when `T` is `Unpin`. |
63 | 0 | pub fn new(t: T) -> (Self, Self) { |
64 | 0 | let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); |
65 | 0 |
|
66 | 0 | (Self { arc: arc.clone() }, Self { arc }) |
67 | 0 | } |
68 | | |
69 | | /// Attempt to acquire this lock, returning `Pending` if it can't be |
70 | | /// acquired. |
71 | | /// |
72 | | /// This function will acquire the lock in a nonblocking fashion, returning |
73 | | /// immediately if the lock is already held. If the lock is successfully |
74 | | /// acquired then `Poll::Ready` is returned with a value that represents |
75 | | /// the locked value (and can be used to access the protected data). The |
76 | | /// lock is unlocked when the returned `BiLockGuard` is dropped. |
77 | | /// |
78 | | /// If the lock is already held then this function will return |
79 | | /// `Poll::Pending`. In this case the current task will also be scheduled |
80 | | /// to receive a notification when the lock would otherwise become |
81 | | /// available. |
82 | | /// |
83 | | /// # Panics |
84 | | /// |
85 | | /// This function will panic if called outside the context of a future's |
86 | | /// task. |
87 | 0 | pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> { |
88 | 0 | let mut waker = None; |
89 | 0 | loop { |
90 | 0 | match self.arc.state.swap(1, SeqCst) { |
91 | | // Woohoo, we grabbed the lock! |
92 | 0 | 0 => return Poll::Ready(BiLockGuard { bilock: self }), |
93 | | |
94 | | // Oops, someone else has locked the lock |
95 | 0 | 1 => {} |
96 | | |
97 | | // A task was previously blocked on this lock, likely our task, |
98 | | // so we need to update that task. |
99 | 0 | n => unsafe { |
100 | 0 | let mut prev = Box::from_raw(n as *mut Waker); |
101 | 0 | *prev = cx.waker().clone(); |
102 | 0 | waker = Some(prev); |
103 | 0 | }, |
104 | | } |
105 | | |
106 | | // type ascription for safety's sake! |
107 | 0 | let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); |
108 | 0 | let me = Box::into_raw(me) as usize; |
109 | 0 |
|
110 | 0 | match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { |
111 | | // The lock is still locked, but we've now parked ourselves, so |
112 | | // just report that we're scheduled to receive a notification. |
113 | 0 | Ok(_) => return Poll::Pending, |
114 | | |
115 | | // Oops, looks like the lock was unlocked after our swap above |
116 | | // and before the compare_exchange. Deallocate what we just |
117 | | // allocated and go through the loop again. |
118 | 0 | Err(0) => unsafe { |
119 | 0 | waker = Some(Box::from_raw(me as *mut Waker)); |
120 | 0 | }, |
121 | | |
122 | | // The top of this loop set the previous state to 1, so if we |
123 | | // failed the CAS above then it's because the previous value was |
124 | | // *not* zero or one. This indicates that a task was blocked, |
125 | | // but we're trying to acquire the lock and there's only one |
126 | | // other reference of the lock, so it should be impossible for |
127 | | // that task to ever block itself. |
128 | 0 | Err(n) => panic!("invalid state: {}", n), |
129 | | } |
130 | | } |
131 | 0 | } |
132 | | |
133 | | /// Perform a "blocking lock" of this lock, consuming this lock handle and |
134 | | /// returning a future to the acquired lock. |
135 | | /// |
136 | | /// This function consumes the `BiLock<T>` and returns a sentinel future, |
137 | | /// `BiLockAcquire<T>`. The returned future will resolve to |
138 | | /// `BiLockAcquired<T>` which represents a locked lock similarly to |
139 | | /// `BiLockGuard<T>`. |
140 | | /// |
141 | | /// Note that the returned future will never resolve to an error. |
142 | | #[cfg(feature = "bilock")] |
143 | | #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] |
144 | | pub fn lock(&self) -> BiLockAcquire<'_, T> { |
145 | | BiLockAcquire { bilock: self } |
146 | | } |
147 | | |
148 | | /// Attempts to put the two "halves" of a `BiLock<T>` back together and |
149 | | /// recover the original value. Succeeds only if the two `BiLock<T>`s |
150 | | /// originated from the same call to `BiLock::new`. |
151 | 0 | pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>> |
152 | 0 | where |
153 | 0 | T: Unpin, |
154 | 0 | { |
155 | 0 | if Arc::ptr_eq(&self.arc, &other.arc) { |
156 | 0 | drop(other); |
157 | 0 | let inner = Arc::try_unwrap(self.arc) |
158 | 0 | .ok() |
159 | 0 | .expect("futures: try_unwrap failed in BiLock<T>::reunite"); |
160 | 0 | Ok(unsafe { inner.into_value() }) |
161 | | } else { |
162 | 0 | Err(ReuniteError(self, other)) |
163 | | } |
164 | 0 | } |
165 | | |
166 | 0 | fn unlock(&self) { |
167 | 0 | match self.arc.state.swap(0, SeqCst) { |
168 | | // we've locked the lock, shouldn't be possible for us to see an |
169 | | // unlocked lock. |
170 | 0 | 0 => panic!("invalid unlocked state"), |
171 | | |
172 | | // Ok, no one else tried to get the lock, we're done. |
173 | 0 | 1 => {} |
174 | | |
175 | | // Another task has parked themselves on this lock, let's wake them |
176 | | // up as its now their turn. |
177 | 0 | n => unsafe { |
178 | 0 | Box::from_raw(n as *mut Waker).wake(); |
179 | 0 | }, |
180 | | } |
181 | 0 | } |
182 | | } |
183 | | |
184 | | impl<T: Unpin> Inner<T> { |
185 | 0 | unsafe fn into_value(mut self) -> T { |
186 | 0 | self.value.take().unwrap().into_inner() |
187 | 0 | } |
188 | | } |
189 | | |
190 | | impl<T> Drop for Inner<T> { |
191 | 0 | fn drop(&mut self) { |
192 | 0 | assert_eq!(self.state.load(SeqCst), 0); |
193 | 0 | } |
194 | | } |
195 | | |
196 | | /// Error indicating two `BiLock<T>`s were not two halves of a whole, and |
197 | | /// thus could not be `reunite`d. |
198 | | #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] |
199 | | pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>); |
200 | | |
201 | | impl<T> fmt::Debug for ReuniteError<T> { |
202 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
203 | 0 | f.debug_tuple("ReuniteError").field(&"...").finish() |
204 | 0 | } |
205 | | } |
206 | | |
207 | | impl<T> fmt::Display for ReuniteError<T> { |
208 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
209 | 0 | write!(f, "tried to reunite two BiLocks that don't form a pair") |
210 | 0 | } |
211 | | } |
212 | | |
// `ReuniteError` is a standard error type when `std` is available. The impl
// adds nothing beyond the `Debug` and `Display` impls.
#[cfg(feature = "std")]
impl<T> std::error::Error for ReuniteError<T> where T: core::any::Any {}
215 | | |
216 | | /// Returned RAII guard from the `poll_lock` method. |
217 | | /// |
218 | | /// This structure acts as a sentinel to the data in the `BiLock<T>` itself, |
219 | | /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be |
220 | | /// unlocked. |
221 | | #[derive(Debug)] |
222 | | #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] |
223 | | pub struct BiLockGuard<'a, T> { |
224 | | bilock: &'a BiLock<T>, |
225 | | } |
226 | | |
227 | | impl<T> Deref for BiLockGuard<'_, T> { |
228 | | type Target = T; |
229 | 0 | fn deref(&self) -> &T { |
230 | 0 | unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() } |
231 | 0 | } |
232 | | } |
233 | | |
234 | | impl<T: Unpin> DerefMut for BiLockGuard<'_, T> { |
235 | 0 | fn deref_mut(&mut self) -> &mut T { |
236 | 0 | unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() } |
237 | 0 | } |
238 | | } |
239 | | |
240 | | impl<T> BiLockGuard<'_, T> { |
241 | | /// Get a mutable pinned reference to the locked value. |
242 | 0 | pub fn as_pin_mut(&mut self) -> Pin<&mut T> { |
243 | 0 | // Safety: we never allow moving a !Unpin value out of a bilock, nor |
244 | 0 | // allow mutable access to it |
245 | 0 | unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) } |
246 | 0 | } |
247 | | } |
248 | | |
249 | | impl<T> Drop for BiLockGuard<'_, T> { |
250 | 0 | fn drop(&mut self) { |
251 | 0 | self.bilock.unlock(); |
252 | 0 | } |
253 | | } |
254 | | |
/// Future returned by `BiLock::lock`.
///
/// It resolves to a `BiLockGuard` once the lock has been acquired.
#[cfg(feature = "bilock")]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct BiLockAcquire<'a, T> {
    /// The handle whose lock is being acquired.
    bilock: &'a BiLock<T>,
}
264 | | |
// The future never projects pinning onto its field, so it can be `Unpin`
// regardless of `T`.
#[cfg(feature = "bilock")]
impl<'a, T> Unpin for BiLockAcquire<'a, T> {}
268 | | |
#[cfg(feature = "bilock")]
impl<'a, T> Future for BiLockAcquire<'a, T> {
    type Output = BiLockGuard<'a, T>;

    /// Delegates to `BiLock::poll_lock` on the borrowed handle: ready with a
    /// guard when the lock is acquired, pending otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let handle: &'a BiLock<T> = self.bilock;
        handle.poll_lock(cx)
    }
}