Coverage Report

Created: 2024-04-26 06:25

/rust/registry/src/index.crates.io-6f17d22bba15001f/dbus-0.9.7/src/nonblock.rs
Line
Count
Source (jump to first uncovered line)
1
//! Async version of connection.
2
//!
3
//! This module requires the `futures` feature to be enabled.
4
//!
5
//! Current status:
6
//!  * Basic client functionality is up and running, i e, you can make method calls and
7
//!    receive incoming messages (e g signals).
8
//!  * As for server side code, you can use the `tree` module with this connection, but it does not
9
//!    support async method handlers.
10
//!
11
//! You're probably going to need a companion crate - dbus-tokio - for this connection to make sense.
12
//! (Although you can also just call read_write and process_all at regular intervals, and possibly
13
//! set a timeout handler.)
14
15
16
use crate::{Error, Message};
17
use crate::channel::{MatchingReceiver, Channel, Sender, Token};
18
use crate::strings::{BusName, Path, Interface, Member};
19
use crate::arg::{AppendAll, ReadAll, IterAppend};
20
use crate::message::{MatchRule, MessageType};
21
22
use std::sync::{Arc, Mutex};
23
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24
use std::{task, pin, mem};
25
use std::cell::RefCell;
26
use std::time::Duration;
27
use crate::filters::Filters;
28
use std::future::Future;
29
use std::time::Instant;
30
use std::collections::HashMap;
31
32
33
#[allow(missing_docs)]
34
mod generated_org_freedesktop_standard_interfaces;
35
mod generated_org_freedesktop_dbus;
36
37
38
/// This module contains some standard interfaces and an easy way to call them.
39
///
40
/// See the [D-Bus specification](https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces) for more information about these standard interfaces.
41
///
42
/// The code was created by dbus-codegen.
43
pub mod stdintf {
44
    #[allow(missing_docs)]
45
    pub mod org_freedesktop_dbus {
46
        pub use super::super::generated_org_freedesktop_standard_interfaces::*;
47
        #[allow(unused_imports)]
48
        pub(crate) use super::super::generated_org_freedesktop_dbus::*;
49
50
0
        #[derive(Debug, PartialEq, Eq, Copy, Clone)]
51
        pub enum RequestNameReply {
52
            PrimaryOwner = 1,
53
            InQueue = 2,
54
            Exists = 3,
55
            AlreadyOwner = 4,
56
        }
57
58
0
        #[derive(Debug, PartialEq, Eq, Copy, Clone)]
59
        pub enum ReleaseNameReply {
60
            Released = 1,
61
            NonExistent = 2,
62
            NotOwner = 3,
63
        }
64
65
    }
66
}
67
68
69
type Replies<F> = HashMap<Token, F>;
70
71
/// A connection to D-Bus, thread local + async version
72
pub struct LocalConnection {
73
    channel: Channel,
74
    filters: RefCell<Filters<LocalFilterCb>>,
75
    replies: RefCell<Replies<LocalRepliesCb>>,
76
    timeout_maker: Option<TimeoutMakerCb>,
77
    waker: Option<WakerCb>,
78
    all_signal_matches: AtomicBool,
79
}
80
81
/// A connection to D-Bus, async version, which is Send but not Sync.
82
pub struct Connection {
83
    channel: Channel,
84
    filters: RefCell<Filters<FilterCb>>,
85
    replies: RefCell<Replies<RepliesCb>>,
86
    timeout_maker: Option<TimeoutMakerCb>,
87
    waker: Option<WakerCb>,
88
    all_signal_matches: AtomicBool,
89
}
90
91
/// A connection to D-Bus, Send + Sync + async version
92
pub struct SyncConnection {
93
    channel: Channel,
94
    filters: Mutex<Filters<SyncFilterCb>>,
95
    replies: Mutex<Replies<SyncRepliesCb>>,
96
    timeout_maker: Option<TimeoutMakerCb>,
97
    waker: Option<WakerCb>,
98
    all_signal_matches: AtomicBool,
99
}
100
101
use stdintf::org_freedesktop_dbus::DBus;
102
103
macro_rules! connimpl {
104
     ($c: ident, $cb: ident, $rcb: ident $(, $ss:tt)*) =>  {
105
106
type
107
    $cb = Box<dyn FnMut(Message, &$c) -> bool $(+ $ss)* + 'static>;
108
type
109
    $rcb = Box<dyn FnOnce(Message, &$c) $(+ $ss)* + 'static>;
110
111
impl From<Channel> for $c {
    /// Wraps an existing channel in a connection that has no filters, no
    /// pending replies, no executor hooks, and the "all signal matches" mode
    /// switched off.
    fn from(x: Channel) -> Self {
        Self {
            channel: x,
            filters: Default::default(),
            replies: Default::default(),
            timeout_maker: None,
            waker: None,
            all_signal_matches: AtomicBool::new(false),
        }
    }
}
123
124
impl AsRef<Channel> for $c {
    /// Gives read access to the channel this connection wraps.
    fn as_ref(&self) -> &Channel {
        &self.channel
    }
}
127
128
impl Sender for $c {
    /// Hands `msg` to the channel and returns whatever the channel reports.
    ///
    /// If the channel still has messages queued afterwards (the non-blocking
    /// send did not complete), the waker callback - when one is installed - is
    /// invoked so the task that flushes the channel gets scheduled. A failing
    /// waker turns the result into `Err(())`.
    fn send(&self, msg: Message) -> Result<u32, ()> {
        let serial = self.channel.send(msg);
        if self.channel.has_messages_to_send() {
            // Non-blocking send failed
            // Wake up task that will send the message
            if let Some(wake) = self.waker.as_ref() {
                if wake().is_err() {
                    return Err(());
                }
            }
        }
        serial
    }
}
141
142
impl MatchingReceiver for $c {
    type F = $cb;

    /// Registers callback `f` for messages matching `m` and returns the token
    /// identifying the registration.
    fn start_receive(&self, m: MatchRule<'static>, f: Self::F) -> Token {
        let mut filters = self.filters_mut();
        filters.add(m, f)
    }

    /// Removes the registration identified by `id`, handing back its match
    /// rule and callback if it existed.
    fn stop_receive(&self, id: Token) -> Option<(MatchRule<'static>, Self::F)> {
        let mut filters = self.filters_mut();
        filters.remove(id)
    }
}
151
152
impl NonblockReply for $c {
    type F = $rcb;

    /// Sends `msg` and registers `f` to be called with the reply.
    ///
    /// Returns the token under which the callback was stored, or `Err(())` if
    /// the channel refused the message or the waker callback failed.
    fn send_with_reply(&self, msg: Message, f: Self::F) -> Result<Token, ()> {
        let sent = {
            // We must hold the mutex from moment we send the message
            // To moment we set a handler for the reply
            // So reply can't arrive before we set handler
            let mut pending = self.replies_mut();
            self.channel.send(msg).map(|serial| {
                let reply_token = Token(serial as usize);
                pending.insert(reply_token, f);
                reply_token
            })
        };
        if self.channel.has_messages_to_send() {
            // Non-blocking send failed
            // Wake up task that will send the message
            if let Some(wake) = self.waker.as_ref() {
                if wake().is_err() {
                    return Err(());
                }
            }
        }
        sent
    }

    /// Forgets the reply callback stored under `id`, returning it if present.
    fn cancel_reply(&self, id: Token) -> Option<Self::F> {
        let mut pending = self.replies_mut();
        pending.remove(&id)
    }

    /// Boxes `g` into this connection's reply callback type.
    fn make_f<G: FnOnce(Message, &Self) + Send + 'static>(g: G) -> Self::F {
        Box::new(g)
    }

    /// Returns the currently installed timeout maker, if any.
    fn timeout_maker(&self) -> Option<TimeoutMakerCb> {
        self.timeout_maker
    }

    /// Installs a new timeout maker and returns the previous one.
    fn set_timeout_maker(&mut self, f: Option<TimeoutMakerCb>) -> Option<TimeoutMakerCb> {
        // `TimeoutMakerCb` is a plain fn pointer, so the old value can simply be copied out.
        let previous = self.timeout_maker;
        self.timeout_maker = f;
        previous
    }

    /// Installs a new waker callback and returns the previous one.
    fn set_waker(&mut self, f: Option<WakerCb>) -> Option<WakerCb> {
        let previous = self.waker.take();
        self.waker = f;
        previous
    }
}
185
186
187
impl Process for $c {
    /// Dispatches a single incoming message.
    ///
    /// Dispatch order:
    ///
    ///  1. If the message carries a reply serial for which a reply callback is
    ///     registered, that callback consumes the message and nothing else runs.
    ///  2. If the "all signal matches" mode is on and the message is a signal,
    ///     every matching filter receives its own duplicate of the message.
    ///  3. Otherwise only the first matching filter receives the message; if no
    ///     filter matches, the default reply (if there is one) is sent back.
    ///
    /// A filter is re-registered after its callback returns `true` and dropped
    /// when it returns `false`.
    fn process_one(&self, msg: Message) {
        if let Some(serial) = msg.get_reply_serial() {
            // Take the callback out in its own statement so the guard returned by
            // `replies_mut()` is released before the callback runs. Writing this as
            // `if let Some(f) = self.replies_mut().remove(..)` would keep the guard
            // alive for the whole `if let` body, so a callback that sends another
            // method call on this connection would deadlock (or panic on the
            // `RefCell`), and a panicking callback would poison the mutex.
            let reply_cb = self.replies_mut().remove(&Token(serial as usize));
            if let Some(f) = reply_cb {
                f(msg, self);
                return;
            }
        }
        if self.all_signal_matches.load(Ordering::Acquire) && msg.msg_type() == MessageType::Signal {
            // If it's a signal and the mode is enabled, send a copy of the message to all
            // matching filters.
            let matching_filters = self.filters_mut().remove_all_matching(&msg);
            // `matching_filters` needs to be a separate variable and not inlined here, because if
            // it's inline then the `MutexGuard` will live too long and we'll get a deadlock on the
            // next call to `filters_mut()` below.
            for mut ff in matching_filters {
                let keep = match msg.duplicate() {
                    Ok(copy) => ff.2(copy, self),
                    // Silently drop the message, but add the filter back.
                    Err(_) => true,
                };
                if keep {
                    self.filters_mut().insert(ff);
                }
            }
        } else {
            // Otherwise, send the original message to only the first matching filter.
            let ff = self.filters_mut().remove_first_matching(&msg);
            if let Some(mut ff) = ff {
                if ff.2(msg, self) {
                    self.filters_mut().insert(ff);
                }
            } else if let Some(reply) = crate::channel::default_reply(&msg) {
                // Nobody handled the message: answer with the default reply,
                // ignoring a failure to send it.
                let _ = self.channel.send(reply);
            }
        }
    }
}
225
226
impl $c {
    /// Builds a proxy for the message bus itself (`org.freedesktop.DBus` at
    /// `/org/freedesktop/DBus`) with a fixed ten second call timeout.
    fn dbus_proxy(&self) -> Proxy<&Self> {
        let bus_call_timeout = Duration::from_secs(10);
        Proxy::new("org.freedesktop.DBus", "/org/freedesktop/DBus", bus_call_timeout, self)
    }

    /// Get the connection's unique name.
    ///
    /// It's usually something like ":1.54"
    ///
    /// # Panics
    ///
    /// Panics if the channel does not report a unique name.
    pub fn unique_name(&self) -> BusName {
        let name = self.channel.unique_name().unwrap();
        name.into()
    }

    /// Request a name on the D-Bus.
    ///
    /// For detailed information on the flags and return values, see the libdbus documentation.
    ///
    /// # Errors
    ///
    /// Fails if the method call fails, or if the bus answers with a code that
    /// is not one of the known `RequestNameReply` values.
    pub async fn request_name<'a, N: Into<BusName<'a>>>(&self, name: N, allow_replacement: bool, replace_existing: bool, do_not_queue: bool)
    -> Result<stdintf::org_freedesktop_dbus::RequestNameReply, Error> {
        use stdintf::org_freedesktop_dbus::RequestNameReply;

        // Each boolean contributes one bit of the flags word.
        let mut flags: u32 = 0;
        if allow_replacement { flags |= 1; }
        if replace_existing { flags |= 2; }
        if do_not_queue { flags |= 4; }

        let code = self.dbus_proxy().request_name(&name.into(), flags).await?;

        let known = [
            RequestNameReply::PrimaryOwner,
            RequestNameReply::InQueue,
            RequestNameReply::Exists,
            RequestNameReply::AlreadyOwner,
        ];
        for candidate in known.iter().copied() {
            if candidate as u32 == code {
                return Ok(candidate);
            }
        }
        Err(crate::Error::new_failed("Invalid reply from DBus server"))
    }

    /// Release a previously requested name on the D-Bus.
    ///
    /// # Errors
    ///
    /// Fails if the method call fails, or if the bus answers with a code that
    /// is not one of the known `ReleaseNameReply` values.
    pub async fn release_name<'a, N: Into<BusName<'a>>>(&self, name: N) -> Result<stdintf::org_freedesktop_dbus::ReleaseNameReply, Error> {
        use stdintf::org_freedesktop_dbus::ReleaseNameReply;

        let code = self.dbus_proxy().release_name(&name.into()).await?;

        let known = [
            ReleaseNameReply::Released,
            ReleaseNameReply::NonExistent,
            ReleaseNameReply::NotOwner,
        ];
        for candidate in known.iter().copied() {
            if candidate as u32 == code {
                return Ok(candidate);
            }
        }
        Err(crate::Error::new_failed("Invalid reply from DBus server"))
    }

    /// Adds a new match to the connection, and sets up a callback when this message arrives.
    ///
    /// If multiple [`MatchRule`]s match the same message, then by default only the first will get
    /// the callback. This behaviour can be changed for signal messages by calling
    /// [`set_signal_match_mode`](Self::set_signal_match_mode).
    ///
    /// The returned value can be used to remove the match.
    pub async fn add_match(&self, match_rule: MatchRule<'static>) -> Result<MsgMatch, Error> {
        // Tell the bus about the rule first; only then start filtering locally.
        let rule_str = match_rule.match_str();
        self.add_match_no_cb(&rule_str).await?;

        let inner = Arc::new(MatchInner {
            token: Default::default(),
            cb: Default::default(),
        });
        // The filter only holds a weak reference: once the returned `MsgMatch`
        // is gone the closure reports `false`, which unregisters the filter.
        let weak_inner = Arc::downgrade(&inner);
        let token = self.start_receive(match_rule, Box::new(move |msg, _| {
            match weak_inner.upgrade() {
                Some(inner) => inner.incoming(msg),
                None => false,
            }
        }));
        inner.token.store(token.0, Ordering::SeqCst);
        Ok(MsgMatch(inner))
    }


    /// Adds a new match to the connection, without setting up a callback when this message arrives.
    pub async fn add_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
        let bus = self.dbus_proxy();
        bus.add_match(match_str).await
    }

    /// Removes a match from the connection, without removing any callbacks.
    pub async fn remove_match_no_cb(&self, match_str: &str) -> Result<(), Error> {
        let bus = self.dbus_proxy();
        bus.remove_match(match_str).await
    }

    /// Removes a previously added match and callback from the connection.
    ///
    /// # Errors
    ///
    /// Fails if no match is registered under `id`, or if the bus rejects the
    /// removal of the rule.
    pub async fn remove_match(&self, id: Token) -> Result<(), Error> {
        let (rule, _) = match self.stop_receive(id) {
            Some(entry) => entry,
            None => return Err(Error::new_failed("No match with that id found")),
        };
        self.remove_match_no_cb(&rule.match_str()).await
    }

    /// If true, configures the connection to send signal messages to all matching [`MatchRule`]
    /// filters added with [`add_match`](Self::add_match) rather than just the first one. This comes
    /// with the following gotchas:
    ///
    ///  * The messages might be duplicated, so the message serial might be lost (this is
    ///    generally not a problem for signals).
    ///  * Panicking inside a match callback might mess with other callbacks, causing them
    ///    to be permanently dropped.
    ///  * Removing other matches from inside a match callback is not supported.
    ///
    /// This is false by default, for a newly-created connection.
    pub fn set_signal_match_mode(&self, match_all: bool) {
        self.all_signal_matches.store(match_all, Ordering::Release);
    }
}
317
318
319
    }
320
}
321
322
0
connimpl!(Connection, FilterCb, RepliesCb, Send);
Unexecuted instantiation: <dbus::nonblock::Connection as dbus::channel::Sender>::send::{closure#0}
Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#1}
Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#0}
Unexecuted instantiation: <dbus::nonblock::Connection>::request_name::<_>::{closure#0}::{closure#1}
Unexecuted instantiation: <dbus::nonblock::Connection>::add_match::{closure#0}::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::Connection>::request_name::<_>::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::Connection>::release_name::<_>::{closure#0}::{closure#1}
Unexecuted instantiation: <dbus::nonblock::Connection>::release_name::<_>::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::Connection>::remove_match::{closure#0}::{closure#0}
323
0
connimpl!(LocalConnection, LocalFilterCb, LocalRepliesCb);
Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#1}
Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#0}
Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::channel::Sender>::send::{closure#0}
Unexecuted instantiation: <dbus::nonblock::LocalConnection>::release_name::<_>::{closure#0}::{closure#1}
Unexecuted instantiation: <dbus::nonblock::LocalConnection>::release_name::<_>::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::LocalConnection>::add_match::{closure#0}::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::LocalConnection>::request_name::<_>::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::LocalConnection>::remove_match::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::LocalConnection>::request_name::<_>::{closure#0}::{closure#1}
324
0
connimpl!(SyncConnection, SyncFilterCb, SyncRepliesCb, Send);
Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#0}
Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::channel::Sender>::send::{closure#0}
Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#1}
Unexecuted instantiation: <dbus::nonblock::SyncConnection>::remove_match::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::SyncConnection>::request_name::<_>::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::SyncConnection>::release_name::<_>::{closure#0}::{closure#1}
Unexecuted instantiation: <dbus::nonblock::SyncConnection>::add_match::{closure#0}::{closure#0}::{closure#0}
Unexecuted instantiation: <dbus::nonblock::SyncConnection>::request_name::<_>::{closure#0}::{closure#1}
Unexecuted instantiation: <dbus::nonblock::SyncConnection>::release_name::<_>::{closure#0}::{closure#0}
325
326
impl Connection {
327
0
    fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb>> { self.filters.borrow_mut() }
328
0
    fn replies_mut(&self) -> std::cell::RefMut<Replies<RepliesCb>> { self.replies.borrow_mut() }
329
}
330
331
impl LocalConnection {
332
0
    fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb>> { self.filters.borrow_mut() }
333
0
    fn replies_mut(&self) -> std::cell::RefMut<Replies<LocalRepliesCb>> { self.replies.borrow_mut() }
334
}
335
336
impl SyncConnection {
337
0
    fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb>> { self.filters.lock().unwrap() }
338
0
    fn replies_mut(&self) -> std::sync::MutexGuard<Replies<SyncRepliesCb>> { self.replies.lock().unwrap() }
339
}
340
341
/// Internal callback for the executor when a timeout needs to be made.
342
pub type TimeoutMakerCb = fn(timeout: Instant) -> pin::Pin<Box<dyn Future<Output=()> + Send + Sync + 'static>>;
343
344
/// Internal callback for the executor when we need to wake up a task
345
pub type WakerCb = Box<dyn Fn() -> Result<(), ()> + Send + Sync +'static>;
346
347
/// Internal helper trait for async method replies.
348
pub trait NonblockReply {
349
    /// Callback type
350
    type F;
351
    /// Sends a message and calls the callback when a reply is received.
352
    fn send_with_reply(&self, msg: Message, f: Self::F) -> Result<Token, ()>;
353
    /// Cancels a pending reply.
354
    fn cancel_reply(&self, id: Token) -> Option<Self::F>;
355
    /// Internal helper function that creates a callback.
356
    fn make_f<G: FnOnce(Message, &Self) + Send + 'static>(g: G) -> Self::F where Self: Sized;
357
    /// Set the internal timeout maker
358
    fn set_timeout_maker(&mut self, f: Option<TimeoutMakerCb>) -> Option<TimeoutMakerCb>;
359
    /// Get the internal timeout maker
360
    fn timeout_maker(&self) -> Option<TimeoutMakerCb>;
361
    /// Set the wakeup call
362
    fn set_waker(&mut self, f: Option<WakerCb>) -> Option<WakerCb>;
363
}
364
365
366
/// Internal helper trait, implemented for connections that process incoming messages.
367
pub trait Process: Sender + AsRef<Channel> {
368
    /// Dispatches all pending messages, without blocking.
369
    ///
370
    /// This is usually called from the reactor only, after read_write.
371
    /// Despite this taking &self and not "&mut self", it is a logic error to call this
372
    /// recursively or from more than one thread at a time.
373
0
    fn process_all(&self) {
374
0
        let c: &Channel = self.as_ref();
375
0
        while let Some(msg) = c.pop_message() {
376
0
            self.process_one(msg);
377
0
        }
378
0
    }
379
380
    /// Dispatches a message.
381
    fn process_one(&self, msg: Message);
382
}
383
384
/// A struct used to handle incoming matches
385
///
386
/// Note: Due to the lack of async destructors, please call Connection.remove_match()
387
/// in order to properly stop matching (instead of just dropping this struct).
388
pub struct MsgMatch(Arc<MatchInner>);
389
390
struct MatchInner {
391
    token: AtomicUsize,
392
    cb: Mutex<Option<Box<dyn FnMut(Message) -> bool + Send>>>,
393
}
394
395
impl MatchInner {
396
0
    fn incoming(&self, msg: Message) -> bool {
397
0
        if let Some(ref mut cb) = self.cb.lock().unwrap().as_mut() {
398
0
            cb(msg)
399
        }
400
0
        else { true }
401
0
    }
402
}
403
404
impl MsgMatch {
405
    /// Configures the match to receive a synchronous callback with only a message parameter.
406
0
    pub fn msg_cb<F: FnMut(Message) -> bool + Send + 'static>(self, f: F) -> Self {
407
0
        {
408
0
            let mut cb = self.0.cb.lock().unwrap();
409
0
            *cb = Some(Box::new(f));
410
0
        }
411
0
        self
412
0
    }
413
414
    /// Configures the match to receive a synchronous callback with a message parameter and typed
415
    /// message arguments.
416
    ///
417
    /// # Example
418
    ///
419
    /// ```ignore
420
    /// let mr = MatchRule::new_signal("com.example.dbustest", "HelloHappened");
421
    /// let incoming_signal = connection.add_match(mr).await?.cb(|_, (source,): (String,)| {
422
    ///    println!("Hello from {} happened on the bus!", source);
423
    ///    true
424
    /// });
425
    /// ```
426
0
    pub fn cb<R: ReadAll, F: FnMut(Message, R) -> bool + Send + 'static>(self, mut f: F) -> Self {
427
0
        self.msg_cb(move |msg| {
428
0
            if let Ok(r) = R::read(&mut msg.iter_init()) {
429
0
                f(msg, r)
430
0
            } else { true }
431
0
        })
432
0
    }
433
434
    /// Configures the match to receive a stream of messages.
435
    ///
436
    /// Note: If the receiving end is disconnected and a message is received,
437
    /// the message matching will end but not in a clean fashion. Call remove_match() to
438
    /// stop matching cleanly.
439
0
    pub fn msg_stream(self) -> (Self, futures_channel::mpsc::UnboundedReceiver<Message>) {
440
0
        let (sender, receiver) = futures_channel::mpsc::unbounded();
441
0
        (self.msg_cb(move |msg| {
442
0
            sender.unbounded_send(msg).is_ok()
443
0
        }), receiver)
444
0
    }
445
446
    /// Configures the match to receive a stream of messages, parsed and ready.
447
    ///
448
    /// Note: If the receiving end is disconnected and a message is received,
449
    /// the message matching will end but not in a clean fashion. Call remove_match() to
450
    /// stop matching cleanly.
451
    ///
452
    /// # Example
453
    ///
454
    /// ```ignore
455
    /// let mr = MatchRule::new_signal("com.example.dbustest", "HelloHappened");
456
    /// let (incoming_signal, stream) = conn.add_match(mr).await?.stream();
457
    /// let stream = stream.for_each(|(_, (source,)): (_, (String,))| {
458
    ///    println!("Hello from {} happened on the bus!", source);
459
    ///    async {}
460
    /// });
461
    /// ```
462
0
    pub fn stream<R: ReadAll + Send + 'static>(self) -> (Self, futures_channel::mpsc::UnboundedReceiver<(Message, R)>) {
463
0
        let (sender, receiver) = futures_channel::mpsc::unbounded();
464
0
        (self.cb(move |msg, r| {
465
0
            sender.unbounded_send((msg, r)).is_ok()
466
0
        }), receiver)
467
0
    }
468
469
    /// The token retreived can be used in a call to remove_match to stop matching on the data.
470
0
    pub fn token(&self) -> Token { Token(self.0.token.load(Ordering::SeqCst)) }
471
}
472
473
/// A struct that wraps a connection, destination and path.
474
///
475
/// A D-Bus "Proxy" is a client-side object that corresponds to a remote object on the server side.
476
/// Calling methods on the proxy object calls methods on the remote object.
477
/// Read more in the [D-Bus tutorial](https://dbus.freedesktop.org/doc/dbus-tutorial.html#proxies)
478
0
#[derive(Clone, Debug)]
479
pub struct Proxy<'a, C> {
480
    /// Destination, i e what D-Bus service you're communicating with
481
    pub destination: BusName<'a>,
482
    /// Object path on the destination
483
    pub path: Path<'a>,
484
    /// Some way to send and/or receive messages, non-blocking.
485
    pub connection: C,
486
    /// Timeout for method calls
487
    pub timeout: Duration,
488
}
489
490
impl<'a, C> Proxy<'a, C> {
491
    /// Creates a new proxy struct.
492
0
    pub fn new<D: Into<BusName<'a>>, P: Into<Path<'a>>>(dest: D, path: P, timeout: Duration, connection: C) -> Self {
493
0
        Proxy { destination: dest.into(), path: path.into(), timeout, connection }
494
0
    }
Unexecuted instantiation: <dbus::nonblock::Proxy<&dbus::nonblock::LocalConnection>>::new::<&str, &str>
Unexecuted instantiation: <dbus::nonblock::Proxy<&dbus::nonblock::Connection>>::new::<&str, &str>
Unexecuted instantiation: <dbus::nonblock::Proxy<&dbus::nonblock::SyncConnection>>::new::<&str, &str>
495
}
496
497
struct MRAwait {
498
    mrouter: MROuter,
499
    token: Result<Token, ()>,
500
    timeout: Instant,
501
    timeoutfn: Option<TimeoutMakerCb>
502
}
503
504
0
async fn method_call_await(mra: MRAwait) -> Result<Message, Error> {
505
0
    use futures_util::future;
506
0
    let MRAwait { mrouter, token, timeout, timeoutfn } = mra;
507
0
    if token.is_err() { return Err(Error::new_failed("Failed to send message")) };
508
0
    let timeout = if let Some(tfn) = timeoutfn { tfn(timeout) } else { Box::pin(future::pending()) };
509
0
    match future::select(mrouter, timeout).await {
510
0
        future::Either::Left((r, _)) => r,
511
0
        future::Either::Right(_) => Err(Error::new_custom("org.freedesktop.DBus.Error.Timeout", "Timeout waiting for reply")),
512
    }
513
0
}
514
515
impl<'a, T, C> Proxy<'a, C>
516
where
517
    T: NonblockReply,
518
    C: std::ops::Deref<Target=T>
519
{
520
521
0
    fn method_call_setup(&self, msg: Message) -> MRAwait {
522
0
        let mr = Arc::new(Mutex::new(MRInner::Neither));
523
0
        let mrouter = MROuter(mr.clone());
524
0
        let f = T::make_f(move |msg: Message, _: &T| {
525
0
            let mut inner = mr.lock().unwrap();
526
0
            let old = mem::replace(&mut *inner, MRInner::Ready(Ok(msg)));
527
0
            if let MRInner::Pending(waker) = old { waker.wake() }
528
0
        });
529
0
530
0
        let timeout = Instant::now() + self.timeout;
531
0
        let token = self.connection.send_with_reply(msg, f);
532
0
        let timeoutfn = self.connection.timeout_maker();
533
0
        MRAwait { mrouter, token, timeout, timeoutfn }
534
0
    }
535
536
    /// Make a method call using typed input argument, returns a future that resolves to the typed output arguments.
537
0
    pub fn method_call<'i, 'm, R: ReadAll + 'static, A: AppendAll, I: Into<Interface<'i>>, M: Into<Member<'m>>>(&self, i: I, m: M, args: A)
538
0
    -> MethodReply<R> {
539
0
        let mut msg = Message::method_call(&self.destination, &self.path, &i.into(), &m.into());
540
0
        args.append(&mut IterAppend::new(&mut msg));
541
0
        let mra = self.method_call_setup(msg);
542
0
        let r = method_call_await(mra);
543
0
        let r = futures_util::FutureExt::map(r, |r| -> Result<R, Error> { r.and_then(|rmsg| rmsg.read_all()) } );
544
0
        MethodReply::new(r)
545
0
    }
546
}
547
548
enum MRInner {
549
    Ready(Result<Message, Error>),
550
    Pending(task::Waker),
551
    Neither,
552
}
553
554
// Future side of a pending method call; resolves once the shared state is `Ready`.
struct MROuter(Arc<Mutex<MRInner>>);
555
556
impl Future for MROuter {
557
    type Output = Result<Message, Error>;
558
0
    fn poll(self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
559
0
        let mut inner = self.0.lock().unwrap();
560
0
        let r = mem::replace(&mut *inner, MRInner::Neither);
561
0
        if let MRInner::Ready(r) = r { task::Poll::Ready(r) }
562
        else {
563
0
            *inner = MRInner::Pending(ctx.waker().clone());
564
0
            return task::Poll::Pending
565
        }
566
0
    }
567
}
568
569
/// Future method reply, used while waiting for a method call reply from the server.
570
pub struct MethodReply<T>(pin::Pin<Box<dyn Future<Output=Result<T, Error>> + Send + 'static>>);
571
572
impl<T> MethodReply<T> {
573
    /// Creates a new method reply from a future.
574
0
    fn new<Fut: Future<Output=Result<T, Error>> + Send + 'static>(fut: Fut) -> Self {
575
0
        MethodReply(Box::pin(fut))
576
0
    }
577
}
578
579
impl<T> Future for MethodReply<T> {
580
    type Output = Result<T, Error>;
581
0
    fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Result<T, Error>> {
582
0
        self.0.as_mut().poll(ctx)
583
0
    }
584
}
585
586
impl<T: 'static> MethodReply<T> {
587
    /// Convenience combinator in case you want to post-process the result after reading it
588
0
    pub fn and_then<T2>(self, f: impl FnOnce(T) -> Result<T2, Error> + Send + Sync + 'static) -> MethodReply<T2> {
589
0
        MethodReply(Box::pin(async move {
590
0
            let x = self.0.await?;
591
0
            f(x)
592
0
        }))
593
0
    }
594
}
595
596
/// Compile-time check of the auto traits the public types promise.
#[test]
fn test_conn_send_sync() {
    fn assert_send<T: Send>() {}
    fn assert_sync<T: Sync>() {}

    // `Connection` is Send (but makes no promise about Sync).
    assert_send::<Connection>();
    // `SyncConnection` is both Send and Sync.
    assert_send::<SyncConnection>();
    assert_sync::<SyncConnection>();
    // Match handles can be moved between threads.
    assert_send::<MsgMatch>();
}