Coverage Report

Created: 2023-04-25 07:07

/rust/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.4/src/waker.rs
Line
Count
Source (jump to first uncovered line)
1
//! Waking mechanism for threads blocked on channel operations.
2
3
use std::ptr;
4
use std::sync::atomic::{AtomicBool, Ordering};
5
use std::thread::{self, ThreadId};
6
7
use crate::context::Context;
8
use crate::select::{Operation, Selected};
9
use crate::utils::Spinlock;
10
11
/// Represents a thread blocked on a specific channel operation.
12
pub(crate) struct Entry {
13
    /// The operation.
14
    pub(crate) oper: Operation,
15
16
    /// Optional packet.
17
    pub(crate) packet: *mut (),
18
19
    /// Context associated with the thread owning this operation.
20
    pub(crate) cx: Context,
21
}
22
23
/// A queue of threads blocked on channel operations.
24
///
25
/// This data structure is used by threads to register blocking operations and get woken up once
26
/// an operation becomes ready.
27
pub(crate) struct Waker {
28
    /// A list of select operations.
29
    selectors: Vec<Entry>,
30
31
    /// A list of operations waiting to be ready.
32
    observers: Vec<Entry>,
33
}
34
35
impl Waker {
36
    /// Creates a new `Waker`.
37
    #[inline]
38
0
    pub(crate) fn new() -> Self {
39
0
        Waker {
40
0
            selectors: Vec::new(),
41
0
            observers: Vec::new(),
42
0
        }
43
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::new
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::new
44
45
    /// Registers a select operation.
46
    #[inline]
47
0
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
48
0
        self.register_with_packet(oper, ptr::null_mut(), cx);
49
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register
50
51
    /// Registers a select operation and a packet.
52
    #[inline]
53
0
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
54
0
        self.selectors.push(Entry {
55
0
            oper,
56
0
            packet,
57
0
            cx: cx.clone(),
58
0
        });
59
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register_with_packet
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::register_with_packet
60
61
    /// Unregisters a select operation.
62
    #[inline]
63
    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
64
0
        if let Some((i, _)) = self
65
0
            .selectors
66
0
            .iter()
67
0
            .enumerate()
68
0
            .find(|&(_, entry)| entry.oper == oper)
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0}
69
        {
70
0
            let entry = self.selectors.remove(i);
71
0
            Some(entry)
72
        } else {
73
0
            None
74
        }
75
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
76
77
    /// Attempts to find another thread's entry, select the operation, and wake it up.
78
    #[inline]
79
0
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
80
0
        if self.selectors.is_empty() {
81
0
            None
82
        } else {
83
0
            let thread_id = current_thread_id();
84
0
85
0
            self.selectors
86
0
                .iter()
87
0
                .position(|selector| {
88
0
                    // Does the entry belong to a different thread?
89
0
                    selector.cx.thread_id() != thread_id
90
0
                        && selector // Try selecting this operation.
91
0
                            .cx
92
0
                            .try_select(Selected::Operation(selector.oper))
93
0
                            .is_ok()
94
                        && {
95
                            // Provide the packet.
96
0
                            selector.cx.store_packet(selector.packet);
97
0
                            // Wake the thread up.
98
0
                            selector.cx.unpark();
99
0
                            true
100
                        }
101
0
                })
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0}
102
0
                // Remove the entry from the queue to keep it clean and improve
103
0
                // performance.
104
0
                .map(|pos| self.selectors.remove(pos))
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1}
105
        }
106
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select
107
108
    /// Returns `true` if there is an entry which can be selected by the current thread.
109
    #[inline]
110
0
    pub(crate) fn can_select(&self) -> bool {
111
0
        if self.selectors.is_empty() {
112
0
            false
113
        } else {
114
0
            let thread_id = current_thread_id();
115
0
116
0
            self.selectors.iter().any(|entry| {
117
0
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
118
0
            })
119
        }
120
0
    }
121
122
    /// Registers an operation waiting to be ready.
123
    #[inline]
124
0
    pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
125
0
        self.observers.push(Entry {
126
0
            oper,
127
0
            packet: ptr::null_mut(),
128
0
            cx: cx.clone(),
129
0
        });
130
0
    }
131
132
    /// Unregisters an operation waiting to be ready.
133
    #[inline]
134
0
    pub(crate) fn unwatch(&mut self, oper: Operation) {
135
0
        self.observers.retain(|e| e.oper != oper);
136
0
    }
137
138
    /// Notifies all operations waiting to be ready.
139
    #[inline]
140
0
    pub(crate) fn notify(&mut self) {
141
0
        for entry in self.observers.drain(..) {
142
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
143
0
                entry.cx.unpark();
144
0
            }
145
        }
146
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::notify
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::notify
147
148
    /// Notifies all registered operations that the channel is disconnected.
149
    #[inline]
150
0
    pub(crate) fn disconnect(&mut self) {
151
0
        for entry in self.selectors.iter() {
152
0
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
153
0
                // Wake the thread up.
154
0
                //
155
0
                // Here we don't remove the entry from the queue. Registered threads must
156
0
                // unregister from the waker by themselves. They might also want to recover the
157
0
                // packet value and destroy it, if necessary.
158
0
                entry.cx.unpark();
159
0
            }
160
        }
161
162
0
        self.notify();
163
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::disconnect
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::disconnect
164
}
165
166
impl Drop for Waker {
167
    #[inline]
168
0
    fn drop(&mut self) {
169
0
        debug_assert_eq!(self.selectors.len(), 0);
170
0
        debug_assert_eq!(self.observers.len(), 0);
171
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Unexecuted instantiation: <crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
172
}
173
174
/// A waker that can be shared among threads without locking.
175
///
176
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
177
pub(crate) struct SyncWaker {
178
    /// The inner `Waker`.
179
    inner: Spinlock<Waker>,
180
181
    /// `true` if the waker is empty.
182
    is_empty: AtomicBool,
183
}
184
185
impl SyncWaker {
186
    /// Creates a new `SyncWaker`.
187
    #[inline]
188
0
    pub(crate) fn new() -> Self {
189
0
        SyncWaker {
190
0
            inner: Spinlock::new(Waker::new()),
191
0
            is_empty: AtomicBool::new(true),
192
0
        }
193
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::new
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::new
194
195
    /// Registers the current thread with an operation.
196
    #[inline]
197
0
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
198
0
        let mut inner = self.inner.lock();
199
0
        inner.register(oper, cx);
200
0
        self.is_empty.store(
201
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
202
0
            Ordering::SeqCst,
203
0
        );
204
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register
205
206
    /// Unregisters an operation previously registered by the current thread.
207
    #[inline]
208
0
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
209
0
        let mut inner = self.inner.lock();
210
0
        let entry = inner.unregister(oper);
211
0
        self.is_empty.store(
212
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
213
0
            Ordering::SeqCst,
214
0
        );
215
0
        entry
216
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
217
218
    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
219
    #[inline]
220
0
    pub(crate) fn notify(&self) {
221
0
        if !self.is_empty.load(Ordering::SeqCst) {
222
0
            let mut inner = self.inner.lock();
223
0
            if !self.is_empty.load(Ordering::SeqCst) {
224
0
                inner.try_select();
225
0
                inner.notify();
226
0
                self.is_empty.store(
227
0
                    inner.selectors.is_empty() && inner.observers.is_empty(),
228
0
                    Ordering::SeqCst,
229
                );
230
0
            }
231
0
        }
232
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::notify
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::notify
233
234
    /// Registers an operation waiting to be ready.
235
    #[inline]
236
0
    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
237
0
        let mut inner = self.inner.lock();
238
0
        inner.watch(oper, cx);
239
0
        self.is_empty.store(
240
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
241
0
            Ordering::SeqCst,
242
0
        );
243
0
    }
244
245
    /// Unregisters an operation waiting to be ready.
246
    #[inline]
247
0
    pub(crate) fn unwatch(&self, oper: Operation) {
248
0
        let mut inner = self.inner.lock();
249
0
        inner.unwatch(oper);
250
0
        self.is_empty.store(
251
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
252
0
            Ordering::SeqCst,
253
0
        );
254
0
    }
255
256
    /// Notifies all threads that the channel is disconnected.
257
    #[inline]
258
0
    pub(crate) fn disconnect(&self) {
259
0
        let mut inner = self.inner.lock();
260
0
        inner.disconnect();
261
0
        self.is_empty.store(
262
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
263
0
            Ordering::SeqCst,
264
0
        );
265
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::disconnect
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::disconnect
266
}
267
268
impl Drop for SyncWaker {
269
    #[inline]
270
0
    fn drop(&mut self) {
271
0
        debug_assert!(self.is_empty.load(Ordering::SeqCst));
272
0
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
273
}
274
275
/// Returns the id of the current thread.
276
#[inline]
277
0
fn current_thread_id() -> ThreadId {
278
0
    thread_local! {
279
0
        /// Cached thread-local id.
280
0
        static THREAD_ID: ThreadId = thread::current().id();
281
0
    }
282
0
283
0
    THREAD_ID
284
0
        .try_with(|id| *id)
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#0}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#0}
285
0
        .unwrap_or_else(|_| thread::current().id())
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1}
286
0
}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id