Coverage Report

Created: 2026-02-14 07:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/crossbeam-channel-0.5.15/src/waker.rs
Line
Count
Source
1
//! Waking mechanism for threads blocked on channel operations.
2
3
use std::ptr;
4
use std::sync::atomic::{AtomicBool, Ordering};
5
use std::sync::Mutex;
6
use std::thread::{self, ThreadId};
7
use std::vec::Vec;
8
9
use crate::context::Context;
10
use crate::select::{Operation, Selected};
11
12
/// Represents a thread blocked on a specific channel operation.
13
pub(crate) struct Entry {
14
    /// The operation.
15
    pub(crate) oper: Operation,
16
17
    /// Optional packet.
18
    pub(crate) packet: *mut (),
19
20
    /// Context associated with the thread owning this operation.
21
    pub(crate) cx: Context,
22
}
23
24
/// A queue of threads blocked on channel operations.
25
///
26
/// This data structure is used by threads to register blocking operations and get woken up once
27
/// an operation becomes ready.
28
pub(crate) struct Waker {
29
    /// A list of select operations.
30
    selectors: Vec<Entry>,
31
32
    /// A list of operations waiting to be ready.
33
    observers: Vec<Entry>,
34
}
35
36
impl Waker {
37
    /// Creates a new `Waker`.
38
    #[inline]
39
63.0k
    pub(crate) fn new() -> Self {
40
63.0k
        Waker {
41
63.0k
            selectors: Vec::new(),
42
63.0k
            observers: Vec::new(),
43
63.0k
        }
44
63.0k
    }
45
46
    /// Registers a select operation.
47
    #[inline]
48
0
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
49
0
        self.register_with_packet(oper, ptr::null_mut(), cx);
50
0
    }
51
52
    /// Registers a select operation and a packet.
53
    #[inline]
54
0
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
55
0
        self.selectors.push(Entry {
56
0
            oper,
57
0
            packet,
58
0
            cx: cx.clone(),
59
0
        });
60
0
    }
61
62
    /// Unregisters a select operation.
63
    #[inline]
64
0
    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
65
0
        if let Some((i, _)) = self
66
0
            .selectors
67
0
            .iter()
68
0
            .enumerate()
69
0
            .find(|&(_, entry)| entry.oper == oper)
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister::{closure#0}
70
        {
71
0
            let entry = self.selectors.remove(i);
72
0
            Some(entry)
73
        } else {
74
0
            None
75
        }
76
0
    }
77
78
    /// Attempts to find another thread's entry, select the operation, and wake it up.
79
    #[inline]
80
0
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
81
0
        if self.selectors.is_empty() {
82
0
            None
83
        } else {
84
0
            let thread_id = current_thread_id();
85
86
0
            self.selectors
87
0
                .iter()
88
0
                .position(|selector| {
89
                    // Does the entry belong to a different thread?
90
0
                    selector.cx.thread_id() != thread_id
91
0
                        && selector // Try selecting this operation.
92
0
                            .cx
93
0
                            .try_select(Selected::Operation(selector.oper))
94
0
                            .is_ok()
95
                        && {
96
                            // Provide the packet.
97
0
                            selector.cx.store_packet(selector.packet);
98
                            // Wake the thread up.
99
0
                            selector.cx.unpark();
100
0
                            true
101
                        }
102
0
                })
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#0}
103
                // Remove the entry from the queue to keep it clean and improve
104
                // performance.
105
0
                .map(|pos| self.selectors.remove(pos))
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1}
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select::{closure#1}
106
        }
107
0
    }
108
109
    /// Returns `true` if there is an entry which can be selected by the current thread.
110
    #[inline]
111
0
    pub(crate) fn can_select(&self) -> bool {
112
0
        if self.selectors.is_empty() {
113
0
            false
114
        } else {
115
0
            let thread_id = current_thread_id();
116
117
0
            self.selectors.iter().any(|entry| {
118
0
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
0
            })
120
        }
121
0
    }
122
123
    /// Registers an operation waiting to be ready.
124
    #[inline]
125
0
    pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
126
0
        self.observers.push(Entry {
127
0
            oper,
128
0
            packet: ptr::null_mut(),
129
0
            cx: cx.clone(),
130
0
        });
131
0
    }
132
133
    /// Unregisters an operation waiting to be ready.
134
    #[inline]
135
0
    pub(crate) fn unwatch(&mut self, oper: Operation) {
136
0
        self.observers.retain(|e| e.oper != oper);
137
0
    }
138
139
    /// Notifies all operations waiting to be ready.
140
    #[inline]
141
63.0k
    pub(crate) fn notify(&mut self) {
142
63.0k
        for entry in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
63.0k
    }
148
149
    /// Notifies all registered operations that the channel is disconnected.
150
    #[inline]
151
63.0k
    pub(crate) fn disconnect(&mut self) {
152
63.0k
        for entry in self.selectors.iter() {
153
0
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
0
                // Wake the thread up.
155
0
                //
156
0
                // Here we don't remove the entry from the queue. Registered threads must
157
0
                // unregister from the waker by themselves. They might also want to recover the
158
0
                // packet value and destroy it, if necessary.
159
0
                entry.cx.unpark();
160
0
            }
161
        }
162
163
63.0k
        self.notify();
164
63.0k
    }
165
}
166
167
impl Drop for Waker {
168
    #[inline]
169
63.0k
    fn drop(&mut self) {
170
63.0k
        debug_assert_eq!(self.selectors.len(), 0);
171
63.0k
        debug_assert_eq!(self.observers.len(), 0);
172
63.0k
    }
173
}
174
175
/// A waker that can be shared among threads without locking.
176
///
177
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
178
pub(crate) struct SyncWaker {
179
    /// The inner `Waker`.
180
    inner: Mutex<Waker>,
181
182
    /// `true` if the waker is empty.
183
    is_empty: AtomicBool,
184
}
185
186
impl SyncWaker {
187
    /// Creates a new `SyncWaker`.
188
    #[inline]
189
63.0k
    pub(crate) fn new() -> Self {
190
63.0k
        SyncWaker {
191
63.0k
            inner: Mutex::new(Waker::new()),
192
63.0k
            is_empty: AtomicBool::new(true),
193
63.0k
        }
194
63.0k
    }
195
196
    /// Registers the current thread with an operation.
197
    #[inline]
198
0
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
0
        let mut inner = self.inner.lock().unwrap();
200
0
        inner.register(oper, cx);
201
0
        self.is_empty.store(
202
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
203
0
            Ordering::SeqCst,
204
        );
205
0
    }
206
207
    /// Unregisters an operation previously registered by the current thread.
208
    #[inline]
209
0
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
0
        let mut inner = self.inner.lock().unwrap();
211
0
        let entry = inner.unregister(oper);
212
0
        self.is_empty.store(
213
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
214
0
            Ordering::SeqCst,
215
        );
216
0
        entry
217
0
    }
218
219
    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
220
    #[inline]
221
58.5k
    pub(crate) fn notify(&self) {
222
58.5k
        if !self.is_empty.load(Ordering::SeqCst) {
223
0
            let mut inner = self.inner.lock().unwrap();
224
0
            if !self.is_empty.load(Ordering::SeqCst) {
225
0
                inner.try_select();
226
0
                inner.notify();
227
0
                self.is_empty.store(
228
0
                    inner.selectors.is_empty() && inner.observers.is_empty(),
229
0
                    Ordering::SeqCst,
230
                );
231
0
            }
232
58.5k
        }
233
58.5k
    }
234
235
    /// Registers an operation waiting to be ready.
236
    #[inline]
237
0
    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238
0
        let mut inner = self.inner.lock().unwrap();
239
0
        inner.watch(oper, cx);
240
0
        self.is_empty.store(
241
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
242
0
            Ordering::SeqCst,
243
        );
244
0
    }
245
246
    /// Unregisters an operation waiting to be ready.
247
    #[inline]
248
0
    pub(crate) fn unwatch(&self, oper: Operation) {
249
0
        let mut inner = self.inner.lock().unwrap();
250
0
        inner.unwatch(oper);
251
0
        self.is_empty.store(
252
0
            inner.selectors.is_empty() && inner.observers.is_empty(),
253
0
            Ordering::SeqCst,
254
        );
255
0
    }
256
257
    /// Notifies all threads that the channel is disconnected.
258
    #[inline]
259
63.0k
    pub(crate) fn disconnect(&self) {
260
63.0k
        let mut inner = self.inner.lock().unwrap();
261
63.0k
        inner.disconnect();
262
63.0k
        self.is_empty.store(
263
63.0k
            inner.selectors.is_empty() && inner.observers.is_empty(),
264
63.0k
            Ordering::SeqCst,
265
        );
266
63.0k
    }
267
}
268
269
impl Drop for SyncWaker {
270
    #[inline]
271
63.0k
    fn drop(&mut self) {
272
63.0k
        debug_assert!(self.is_empty.load(Ordering::SeqCst));
273
63.0k
    }
274
}
275
276
/// Returns the id of the current thread.
277
#[inline]
278
0
fn current_thread_id() -> ThreadId {
279
    std::thread_local! {
280
        /// Cached thread-local id.
281
        static THREAD_ID: ThreadId = thread::current().id();
282
    }
283
284
0
    THREAD_ID
285
0
        .try_with(|id| *id)
286
0
        .unwrap_or_else(|_| thread::current().id())
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id::{closure#1}
287
0
}