Coverage Report

Created: 2024-05-20 06:38

/rust/registry/src/index.crates.io-6f17d22bba15001f/dbus-0.9.7/src/channel/ffichannel.rs
Line
Count
Source (jump to first uncovered line)
1
//! Contains some helper structs and traits common to all Connection types.
2
3
use crate::{Error, Message, to_c_str};
4
use std::{str, time::Duration, collections::HashMap};
5
use std::sync::{Mutex, atomic::AtomicU8, atomic::Ordering};
6
use std::ffi::CStr;
7
use std::os::raw::{c_void, c_int};
8
use super::{BusType, Watch, WatchFd};
9
10
0
#[derive(Debug)]
11
struct ConnHandle(*mut ffi::DBusConnection, bool);
12
13
unsafe impl Send for ConnHandle {}
14
unsafe impl Sync for ConnHandle {}
15
16
impl Drop for ConnHandle {
17
0
    fn drop(&mut self) {
18
0
        if self.1 { unsafe {
19
0
            ffi::dbus_connection_close(self.0);
20
0
            ffi::dbus_connection_unref(self.0);
21
0
        }}
22
0
    }
23
}
24
25
0
#[derive(Debug, Eq, PartialEq, Hash)]
26
struct WatchHandle(*mut ffi::DBusWatch);
27
28
unsafe impl Send for WatchHandle {}
29
unsafe impl Sync for WatchHandle {}
30
31
/// This struct must be boxed as it is called from D-Bus callbacks!
32
0
#[derive(Debug)]
33
struct WatchMap {
34
    conn: ConnHandle,
35
    list: Mutex<HashMap<WatchHandle, (Watch, bool)>>,
36
    current_rw: AtomicU8,
37
    current_fd: Option<WatchFd>,
38
}
39
40
0
fn calc_rw(list: &HashMap<WatchHandle, (Watch, bool)>) -> u8 {
41
0
    let mut r = 0;
42
0
    for (w, b) in list.values() {
43
0
        if *b && w.read { r |= 1; }
44
0
        if *b && w.write { r |= 2; }
45
    }
46
0
    r
47
0
}
48
49
impl WatchMap {
50
0
    fn new(conn: ConnHandle) -> Box<WatchMap> {
51
0
        extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 { unsafe {
52
0
            let wm: &WatchMap = &*(data as *mut _);
53
0
            wm.list.lock().unwrap().insert(WatchHandle(watch), Watch::from_raw_enabled(watch));
54
0
            1
55
0
        }}
56
0
        extern "C" fn remove_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { unsafe {
57
0
            let wm: &WatchMap = &*(data as *mut _);
58
0
            wm.list.lock().unwrap().remove(&WatchHandle(watch));
59
0
        }}
60
0
        extern "C" fn toggled_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { unsafe {
61
0
            let wm: &WatchMap = &*(data as *mut _);
62
0
            let mut list = wm.list.lock().unwrap();
63
0
            let (_, ref mut b) = list.get_mut(&WatchHandle(watch)).unwrap();
64
0
            *b = ffi::dbus_watch_get_enabled(watch) != 0;
65
0
            wm.current_rw.store(calc_rw(&list), Ordering::Release);
66
0
        }}
67
0
68
0
        let mut wm = Box::new(WatchMap {
69
0
            conn, list: Default::default(), current_rw: Default::default(), current_fd: None
70
0
        });
71
0
        let wptr: &WatchMap = &wm;
72
0
        if unsafe { ffi::dbus_connection_set_watch_functions(wm.conn.0,
73
0
            Some(add_watch_cb), Some(remove_watch_cb), Some(toggled_watch_cb), wptr as *const _ as *mut _, None) } == 0 {
74
0
                panic!("Cannot enable watch tracking (OOM?)")
75
0
        }
76
0
77
0
        {
78
0
            let list = wm.list.lock().unwrap();
79
0
            wm.current_rw.store(calc_rw(&list), Ordering::Release);
80
81
            // This will never panic in practice, see https://lists.freedesktop.org/archives/dbus/2019-July/017786.html
82
0
            for (w, _) in list.values() {
83
0
                if let Some(ref fd) = &wm.current_fd {
84
0
                    assert_eq!(*fd, w.fd);
85
0
                } else {
86
0
                    wm.current_fd = Some(w.fd);
87
0
                }
88
            }
89
        }
90
91
0
        wm
92
0
    }
93
}
94
95
impl Drop for WatchMap {
96
0
    fn drop(&mut self) {
97
0
        let wptr: &WatchMap = &self;
98
0
        if unsafe { ffi::dbus_connection_set_watch_functions(self.conn.0,
99
0
            None, None, None, wptr as *const _ as *mut _, None) } == 0 {
100
0
                panic!("Cannot disable watch tracking (OOM?)")
101
0
        }
102
0
    }
103
}
104
105
/// Low-level connection - handles read/write to the socket
106
///
107
/// You probably do not need to worry about this as you would typically
108
/// use the various blocking and non-blocking "Connection" structs instead.
109
///
110
/// This version avoids dbus_connection_dispatch, and thus avoids
111
/// callbacks from that function. Instead the same functionality
112
/// is implemented in the various blocking and non-blocking "Connection" components.
113
///
114
/// Blocking operations are clearly marked as such, although if you
115
/// try to access the connection from several threads at the same time,
116
/// blocking might occur due to an internal mutex inside the dbus library.
117
0
#[derive(Debug)]
118
pub struct Channel {
119
    handle: ConnHandle,
120
    watchmap: Option<Box<WatchMap>>,
121
}
122
123
impl Drop for Channel {
124
0
    fn drop(&mut self) {
125
0
        self.set_watch_enabled(false); // Make sure "watchmap" is destroyed before "handle" is
126
0
    }
127
}
128
129
impl Channel {
130
    #[inline(always)]
131
0
    pub (crate) fn conn(&self) -> *mut ffi::DBusConnection {
132
0
        self.handle.0
133
0
    }
134
135
0
    fn conn_from_ptr(ptr: *mut ffi::DBusConnection) -> Result<Channel, Error> {
136
0
        let handle = ConnHandle(ptr, true);
137
0
138
0
        /* No, we don't want our app to suddenly quit if dbus goes down */
139
0
        unsafe { ffi::dbus_connection_set_exit_on_disconnect(ptr, 0) };
140
0
141
0
        let c = Channel { handle, watchmap: None };
142
0
143
0
        Ok(c)
144
0
    }
145
146
147
    /// Creates a new D-Bus connection.
148
    ///
149
    /// Blocking: until the connection is up and running.
150
0
    pub fn get_private(bus: BusType) -> Result<Channel, Error> {
151
0
        let mut e = Error::empty();
152
0
        let b = match bus {
153
0
            BusType::Session => ffi::DBusBusType::Session,
154
0
            BusType::System => ffi::DBusBusType::System,
155
0
            BusType::Starter => ffi::DBusBusType::Starter,
156
        };
157
0
        let conn = unsafe { ffi::dbus_bus_get_private(b, e.get_mut()) };
158
0
        if conn.is_null() {
159
0
            return Err(e)
160
0
        }
161
0
        Self::conn_from_ptr(conn)
162
0
    }
163
164
    /// Creates a new D-Bus connection to a remote address.
165
    ///
166
    /// Note: for all common cases (System / Session bus) you probably want "get_private" instead.
167
    ///
168
    /// Blocking: until the connection is established.
169
0
    pub fn open_private(address: &str) -> Result<Channel, Error> {
170
0
        let mut e = Error::empty();
171
0
        let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) };
172
0
        if conn.is_null() {
173
0
            return Err(e)
174
0
        }
175
0
        Self::conn_from_ptr(conn)
176
0
    }
177
178
    /// Registers a new D-Bus connection with the bus.
179
    ///
180
    /// Note: `get_private` does this automatically, useful with `open_private`
181
    ///
182
    /// Blocking: until a "Hello" response is received from the server.
183
0
    pub fn register(&mut self) -> Result<(), Error> {
184
0
        // This function needs to take &mut self, because it changes unique_name and unique_name takes a &self
185
0
        let mut e = Error::empty();
186
0
        if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } {
187
0
            Err(e)
188
        } else {
189
0
            Ok(())
190
        }
191
0
    }
192
193
    /// Gets whether the connection is currently open.
194
0
    pub fn is_connected(&self) -> bool {
195
0
        unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 }
196
0
    }
197
198
    /// Get the connection's unique name.
199
    ///
200
    /// It's usually something like ":1.54"
201
0
    pub fn unique_name(&self) -> Option<&str> {
202
0
        let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) };
203
0
        if c.is_null() { return None; }
204
0
        let s = unsafe { CStr::from_ptr(c) };
205
0
        str::from_utf8(s.to_bytes()).ok()
206
0
    }
207
208
209
    /// Puts a message into libdbus out queue, and tries to send it.
210
    ///
211
    /// Returns a serial number than can be used to match against a reply.
212
    ///
213
    /// Note: usually the message is sent when this call happens, but in
214
    /// case internal D-Bus buffers are full, it will be left in the out queue.
215
    /// Call "flush" or "read_write" to retry flushing the out queue.
216
0
    pub fn send(&self, msg: Message) -> Result<u32, ()> {
217
0
        let mut serial = 0u32;
218
0
        let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) };
219
0
        if r == 0 { return Err(()); }
220
0
        Ok(serial)
221
0
    }
222
223
    /// Sends a message over the D-Bus and waits for a reply. This is used for method calls.
224
    ///
225
    /// Blocking: until a reply is received or the timeout expires.
226
    ///
227
    /// Note: In case of an error reply, this is returned as an Err(), not as a Ok(Message) with the error type.
228
    ///
229
    /// Note: In case pop_message and send_with_reply_and_block is called in parallel from different threads,
230
    /// they might race to retreive the reply message from the internal queue.
231
0
    pub fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
232
0
        let mut e = Error::empty();
233
0
        let response = unsafe {
234
0
            ffi::dbus_connection_send_with_reply_and_block(self.conn(), msg.ptr(),
235
0
                timeout.as_millis() as c_int, e.get_mut())
236
0
        };
237
0
        if response.is_null() {
238
0
            return Err(e);
239
0
        }
240
0
        Ok(Message::from_ptr(response, false))
241
0
    }
242
243
    /// Flush the queue of outgoing messages.
244
    ///
245
    /// Blocking: until the outgoing queue is empty.
246
0
    pub fn flush(&self) { unsafe { ffi::dbus_connection_flush(self.conn()) } }
247
248
    /// Read and write to the connection.
249
    ///
250
    /// Incoming messages are put in the internal queue, outgoing messages are written.
251
    ///
252
    /// Blocking: If there are no messages, for up to timeout, or forever if timeout is None.
253
    /// For non-blocking behaviour, set timeout to Some(0).
254
0
    pub fn read_write(&self, timeout: Option<Duration>) -> Result<(), ()> {
255
0
        let t = timeout.map_or(-1, |t| t.as_millis() as c_int);
256
0
        if unsafe { ffi::dbus_connection_read_write(self.conn(), t) == 0 } {
257
0
            Err(())
258
        } else {
259
0
            Ok(())
260
        }
261
0
    }
262
263
    /// Gets whether the output message buffer is non-empty
264
0
    pub fn has_messages_to_send(&self) -> bool {
265
0
        unsafe { ffi::dbus_connection_has_messages_to_send(self.conn()) == 1 }
266
0
    }
267
268
    /// Removes a message from the incoming queue, or returns None if the queue is empty.
269
    ///
270
    /// Use "read_write" first, so that messages are put into the incoming queue.
271
    /// For unhandled messages, please call MessageDispatcher::default_dispatch to return
272
    /// default replies for method calls.
273
0
    pub fn pop_message(&self) -> Option<Message> {
274
0
        let mptr = unsafe { ffi::dbus_connection_pop_message(self.conn()) };
275
0
        if mptr.is_null() {
276
0
            None
277
        } else {
278
0
            let msg = Message::from_ptr(mptr, false);
279
0
            // println!("Incoming: {:?}", msg);
280
0
            Some(msg)
281
        }
282
0
    }
283
284
    /// Removes a message from the incoming queue, or waits until timeout if the queue is empty.
285
    ///
286
0
    pub fn blocking_pop_message(&self, timeout: Duration) -> Result<Option<Message>, Error> {
287
0
        if let Some(msg) = self.pop_message() { return Ok(Some(msg)) }
288
0
        self.read_write(Some(timeout)).map_err(|_|
289
0
            Error::new_failed("Failed to read/write data, disconnected from D-Bus?")
290
0
        )?;
291
0
        Ok(self.pop_message())
292
0
    }
293
294
    /// Enables watch tracking, a prequisite for calling watch.
295
    ///
296
    /// (In theory, this could panic in case libdbus ever changes to listen to
297
    /// something else than one file descriptor,
298
    /// but this should be extremely unlikely to ever happen.)
299
0
    pub fn set_watch_enabled(&mut self, enable: bool) {
300
0
        if enable == self.watchmap.is_some() { return }
301
0
        if enable {
302
0
            self.watchmap = Some(WatchMap::new(ConnHandle(self.conn(), false)));
303
0
        } else {
304
0
            self.watchmap = None;
305
0
        }
306
0
    }
307
308
    /// Gets the file descriptor to listen for read/write.
309
    ///
310
    /// Panics: if set_watch_enabled is false.
311
    ///
312
    /// (In theory, this could panic in case libdbus ever changes to listen to
313
    /// something else than one file descriptor,
314
    /// but this should be extremely unlikely to ever happen.)
315
0
    pub fn watch(&self) -> Watch {
316
0
        let wm = self.watchmap.as_ref().unwrap();
317
0
        let rw = wm.current_rw.load(Ordering::Acquire);
318
0
        Watch {
319
0
            fd: wm.current_fd.unwrap(),
320
0
            read: (rw & 1) != 0,
321
0
            write: (rw & 2) != 0,
322
0
        }
323
0
    }
324
325
    /// Get an up-to-date list of file descriptors to watch.
326
    ///
327
    /// Obsolete - in practice, you can use watch and set_watch_enabled instead.
328
    #[deprecated]
329
0
    pub fn watch_fds(&mut self) -> Result<Vec<Watch>, ()> {
330
0
        let en = self.watchmap.is_some();
331
0
        self.set_watch_enabled(true);
332
0
        let mut wlist: Vec<Watch> = self.watchmap.as_ref().unwrap().list.lock().unwrap().values()
333
0
            .map(|&(w, b)| Watch { fd: w.fd, read: b && w.read, write: b && w.write })
334
0
            .collect();
335
0
        self.set_watch_enabled(en);
336
0
337
0
        if wlist.len() == 2 && wlist[0].fd == wlist[1].fd {
338
0
            // This is always true in practice, see https://lists.freedesktop.org/archives/dbus/2019-July/017786.html
339
0
            wlist = vec!(Watch {
340
0
                fd: wlist[0].fd,
341
0
                read: wlist[0].read || wlist[1].read,
342
0
                write: wlist[0].write || wlist[1].write
343
            });
344
0
        }
345
346
0
        Ok(wlist)
347
0
    }
348
}
349
350
351
impl Watch {
352
0
    unsafe fn from_raw_enabled(watch: *mut ffi::DBusWatch) -> (Self, bool) {
353
0
        #[cfg(unix)]
354
0
        let mut w = Watch {fd: ffi::dbus_watch_get_unix_fd(watch), read: false, write: false};
355
0
        #[cfg(windows)]
356
0
        let mut w = Watch {fd: ffi::dbus_watch_get_socket(watch) as WatchFd, read: false, write: false};
357
0
        let enabled = ffi::dbus_watch_get_enabled(watch) != 0;
358
0
        let flags = ffi::dbus_watch_get_flags(watch);
359
0
        use std::os::raw::c_uint;
360
0
        w.read = (flags & ffi::DBUS_WATCH_READABLE as c_uint) != 0;
361
0
        w.write = (flags & ffi::DBUS_WATCH_WRITABLE as c_uint) != 0;
362
0
        (w, enabled)
363
0
    }
364
}