/rust/registry/src/index.crates.io-6f17d22bba15001f/dbus-0.9.7/src/nonblock.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Async version of connection. |
2 | | //! |
3 | | //! This module requires the `futures` feature to be enabled. |
4 | | //! |
5 | | //! Current status: |
6 | | //! * Basic client functionality is up and running, i.e., you can make method calls and |
7 | | //! receive incoming messages (e.g. signals). |
8 | | //! * As for server side code, you can use the `tree` module with this connection, but it does not |
9 | | //! support async method handlers. |
10 | | //! |
11 | | //! You're probably going to need a companion crate - dbus-tokio - for this connection to make sense. |
12 | | //! (Although you can also just call read_write and process_all at regular intervals, and possibly |
13 | | //! set a timeout handler.) |
14 | | |
15 | | |
16 | | use crate::{Error, Message}; |
17 | | use crate::channel::{MatchingReceiver, Channel, Sender, Token}; |
18 | | use crate::strings::{BusName, Path, Interface, Member}; |
19 | | use crate::arg::{AppendAll, ReadAll, IterAppend}; |
20 | | use crate::message::{MatchRule, MessageType}; |
21 | | |
22 | | use std::sync::{Arc, Mutex}; |
23 | | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
24 | | use std::{task, pin, mem}; |
25 | | use std::cell::RefCell; |
26 | | use std::time::Duration; |
27 | | use crate::filters::Filters; |
28 | | use std::future::Future; |
29 | | use std::time::Instant; |
30 | | use std::collections::HashMap; |
31 | | |
32 | | |
33 | | #[allow(missing_docs)] |
34 | | mod generated_org_freedesktop_standard_interfaces; |
35 | | mod generated_org_freedesktop_dbus; |
36 | | |
37 | | |
38 | | /// This module contains some standard interfaces and an easy way to call them. |
39 | | /// |
40 | | /// See the [D-Bus specification](https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces) for more information about these standard interfaces. |
41 | | /// |
42 | | /// The code was created by dbus-codegen. |
43 | | pub mod stdintf { |
44 | | #[allow(missing_docs)] |
45 | | pub mod org_freedesktop_dbus { |
46 | | pub use super::super::generated_org_freedesktop_standard_interfaces::*; |
47 | | #[allow(unused_imports)] |
48 | | pub(crate) use super::super::generated_org_freedesktop_dbus::*; |
49 | | |
50 | 0 | #[derive(Debug, PartialEq, Eq, Copy, Clone)] |
51 | | pub enum RequestNameReply { |
52 | | PrimaryOwner = 1, |
53 | | InQueue = 2, |
54 | | Exists = 3, |
55 | | AlreadyOwner = 4, |
56 | | } |
57 | | |
58 | 0 | #[derive(Debug, PartialEq, Eq, Copy, Clone)] |
59 | | pub enum ReleaseNameReply { |
60 | | Released = 1, |
61 | | NonExistent = 2, |
62 | | NotOwner = 3, |
63 | | } |
64 | | |
65 | | } |
66 | | } |
67 | | |
68 | | |
69 | | type Replies<F> = HashMap<Token, F>; |
70 | | |
71 | | /// A connection to D-Bus, thread local + async version |
72 | | pub struct LocalConnection { |
73 | | channel: Channel, |
74 | | filters: RefCell<Filters<LocalFilterCb>>, |
75 | | replies: RefCell<Replies<LocalRepliesCb>>, |
76 | | timeout_maker: Option<TimeoutMakerCb>, |
77 | | waker: Option<WakerCb>, |
78 | | all_signal_matches: AtomicBool, |
79 | | } |
80 | | |
81 | | /// A connection to D-Bus, async version, which is Send but not Sync. |
82 | | pub struct Connection { |
83 | | channel: Channel, |
84 | | filters: RefCell<Filters<FilterCb>>, |
85 | | replies: RefCell<Replies<RepliesCb>>, |
86 | | timeout_maker: Option<TimeoutMakerCb>, |
87 | | waker: Option<WakerCb>, |
88 | | all_signal_matches: AtomicBool, |
89 | | } |
90 | | |
91 | | /// A connection to D-Bus, Send + Sync + async version |
92 | | pub struct SyncConnection { |
93 | | channel: Channel, |
94 | | filters: Mutex<Filters<SyncFilterCb>>, |
95 | | replies: Mutex<Replies<SyncRepliesCb>>, |
96 | | timeout_maker: Option<TimeoutMakerCb>, |
97 | | waker: Option<WakerCb>, |
98 | | all_signal_matches: AtomicBool, |
99 | | } |
100 | | |
101 | | use stdintf::org_freedesktop_dbus::DBus; |
102 | | |
103 | | macro_rules! connimpl { |
104 | | ($c: ident, $cb: ident, $rcb: ident $(, $ss:tt)*) => { |
105 | | |
106 | | type |
107 | | $cb = Box<dyn FnMut(Message, &$c) -> bool $(+ $ss)* + 'static>; |
108 | | type |
109 | | $rcb = Box<dyn FnOnce(Message, &$c) $(+ $ss)* + 'static>; |
110 | | |
111 | | impl From<Channel> for $c { |
112 | 0 | fn from(x: Channel) -> Self { |
113 | 0 | $c { |
114 | 0 | channel: x, |
115 | 0 | replies: Default::default(), |
116 | 0 | filters: Default::default(), |
117 | 0 | timeout_maker: None, |
118 | 0 | waker: None, |
119 | 0 | all_signal_matches: AtomicBool::new(false), |
120 | 0 | } |
121 | 0 | } Unexecuted instantiation: <dbus::nonblock::Connection as core::convert::From<dbus::channel::ffichannel::Channel>>::from Unexecuted instantiation: <dbus::nonblock::LocalConnection as core::convert::From<dbus::channel::ffichannel::Channel>>::from Unexecuted instantiation: <dbus::nonblock::SyncConnection as core::convert::From<dbus::channel::ffichannel::Channel>>::from |
122 | | } |
123 | | |
124 | | impl AsRef<Channel> for $c { |
125 | 0 | fn as_ref(&self) -> &Channel { &self.channel } Unexecuted instantiation: <dbus::nonblock::LocalConnection as core::convert::AsRef<dbus::channel::ffichannel::Channel>>::as_ref Unexecuted instantiation: <dbus::nonblock::SyncConnection as core::convert::AsRef<dbus::channel::ffichannel::Channel>>::as_ref Unexecuted instantiation: <dbus::nonblock::Connection as core::convert::AsRef<dbus::channel::ffichannel::Channel>>::as_ref |
126 | | } |
127 | | |
128 | | impl Sender for $c { |
129 | 0 | fn send(&self, msg: Message) -> Result<u32, ()> { |
130 | 0 | let token = self.channel.send(msg); |
131 | 0 | if self.channel.has_messages_to_send() { |
132 | | // Non-blocking send failed |
133 | | // Wake up task that will send the message |
134 | 0 | if self.waker.as_ref().map(|wake| wake().is_err() ).unwrap_or(false) { |
135 | 0 | return Err(()); |
136 | 0 | } |
137 | 0 | } |
138 | 0 | token |
139 | 0 | } Unexecuted instantiation: <dbus::nonblock::Connection as dbus::channel::Sender>::send Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::channel::Sender>::send Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::channel::Sender>::send |
140 | | } |
141 | | |
142 | | impl MatchingReceiver for $c { |
143 | | type F = $cb; |
144 | 0 | fn start_receive(&self, m: MatchRule<'static>, f: Self::F) -> Token { |
145 | 0 | self.filters_mut().add(m, f) |
146 | 0 | } Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::channel::MatchingReceiver>::start_receive Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::channel::MatchingReceiver>::start_receive Unexecuted instantiation: <dbus::nonblock::Connection as dbus::channel::MatchingReceiver>::start_receive |
147 | 0 | fn stop_receive(&self, id: Token) -> Option<(MatchRule<'static>, Self::F)> { |
148 | 0 | self.filters_mut().remove(id) |
149 | 0 | } Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::channel::MatchingReceiver>::stop_receive Unexecuted instantiation: <dbus::nonblock::Connection as dbus::channel::MatchingReceiver>::stop_receive Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::channel::MatchingReceiver>::stop_receive |
150 | | } |
151 | | |
152 | | impl NonblockReply for $c { |
153 | | type F = $rcb; |
154 | 0 | fn send_with_reply(&self, msg: Message, f: Self::F) -> Result<Token, ()> { |
155 | 0 | let token = { |
156 | 0 | // We must hold the `replies` guard from the moment we send the message |
157 | 0 | // until the moment we set a handler for the reply, |
158 | 0 | // so that the reply can't arrive before the handler is set. |
159 | 0 | let mut replies = self.replies_mut(); |
160 | 0 | self.channel.send(msg).map(|x| { |
161 | | let t = Token(x as usize); |
162 | | replies.insert(t, f); |
163 | | t |
164 | 0 | }) |
165 | 0 | }; |
166 | 0 | if self.channel.has_messages_to_send() { |
167 | | // Non-blocking send failed |
168 | | // Wake up task that will send the message |
169 | 0 | if self.waker.as_ref().map(|wake| wake().is_err() ).unwrap_or(false) { |
170 | 0 | return Err(()); |
171 | 0 | } |
172 | 0 | } |
173 | 0 | token |
174 | 0 | } Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::send_with_reply Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::send_with_reply Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::send_with_reply |
175 | 0 | fn cancel_reply(&self, id: Token) -> Option<Self::F> { self.replies_mut().remove(&id) } Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::cancel_reply Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::cancel_reply Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::cancel_reply |
176 | 0 | fn make_f<G: FnOnce(Message, &Self) + Send + 'static>(g: G) -> Self::F { Box::new(g) } Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::make_f::<_> Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::make_f::<_> Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::make_f::<_> |
177 | 0 | fn timeout_maker(&self) -> Option<TimeoutMakerCb> { self.timeout_maker } Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::timeout_maker Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::timeout_maker Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::timeout_maker |
178 | 0 | fn set_timeout_maker(&mut self, f: Option<TimeoutMakerCb>) -> Option<TimeoutMakerCb> { |
179 | 0 | mem::replace(&mut self.timeout_maker, f) |
180 | 0 | } Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::set_timeout_maker Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::set_timeout_maker Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::set_timeout_maker |
181 | 0 | fn set_waker(&mut self, f: Option<WakerCb>) -> Option<WakerCb> { |
182 | 0 | mem::replace(&mut self.waker, f) |
183 | 0 | } Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::set_waker Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::set_waker Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::set_waker |
184 | | } |
185 | | |
186 | | |
187 | | impl Process for $c { |
188 | 0 | fn process_one(&self, msg: Message) { |
189 | 0 | if let Some(serial) = msg.get_reply_serial() { |
190 | 0 | if let Some(f) = self.replies_mut().remove(&Token(serial as usize)) { |
191 | 0 | f(msg, self); |
192 | 0 | return; |
193 | 0 | } |
194 | 0 | } |
195 | 0 | if self.all_signal_matches.load(Ordering::Acquire) && msg.msg_type() == MessageType::Signal { |
196 | | // If it's a signal and the mode is enabled, send a copy of the message to all |
197 | | // matching filters. |
198 | 0 | let matching_filters = self.filters_mut().remove_all_matching(&msg); |
199 | | // `matching_filters` needs to be a separate variable and not inlined here, because if |
200 | | // it's inlined then the `MutexGuard` will live too long and we'll get a deadlock on the |
201 | | // next call to `filters_mut()` below. |
202 | 0 | for mut ff in matching_filters { |
203 | 0 | if let Ok(copy) = msg.duplicate() { |
204 | 0 | if ff.2(copy, self) { |
205 | 0 | self.filters_mut().insert(ff); |
206 | 0 | } |
207 | 0 | } else { |
208 | 0 | // Silently drop the message, but add the filter back. |
209 | 0 | self.filters_mut().insert(ff); |
210 | 0 | } |
211 | | } |
212 | | } else { |
213 | | // Otherwise, send the original message to only the first matching filter. |
214 | 0 | let ff = self.filters_mut().remove_first_matching(&msg); |
215 | 0 | if let Some(mut ff) = ff { |
216 | 0 | if ff.2(msg, self) { |
217 | 0 | self.filters_mut().insert(ff); |
218 | 0 | } |
219 | 0 | } else if let Some(reply) = crate::channel::default_reply(&msg) { |
220 | 0 | let _ = self.channel.send(reply); |
221 | 0 | } |
222 | | } |
223 | 0 | } Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::Process>::process_one Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::Process>::process_one Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::Process>::process_one |
224 | | } |
225 | | |
226 | | impl $c { |
227 | 0 | fn dbus_proxy(&self) -> Proxy<&Self> { |
228 | 0 | Proxy::new("org.freedesktop.DBus", "/org/freedesktop/DBus", Duration::from_secs(10), self) |
229 | 0 | } Unexecuted instantiation: <dbus::nonblock::Connection>::dbus_proxy Unexecuted instantiation: <dbus::nonblock::LocalConnection>::dbus_proxy Unexecuted instantiation: <dbus::nonblock::SyncConnection>::dbus_proxy |
230 | | |
231 | | /// Get the connection's unique name. |
232 | | /// |
233 | | /// It's usually something like ":1.54" |
234 | 0 | pub fn unique_name(&self) -> BusName { self.channel.unique_name().unwrap().into() } Unexecuted instantiation: <dbus::nonblock::Connection>::unique_name Unexecuted instantiation: <dbus::nonblock::LocalConnection>::unique_name Unexecuted instantiation: <dbus::nonblock::SyncConnection>::unique_name |
235 | | |
236 | | /// Request a name on the D-Bus. |
237 | | /// |
238 | | /// For detailed information on the flags and return values, see the libdbus documentation. |
239 | 0 | pub async fn request_name<'a, N: Into<BusName<'a>>>(&self, name: N, allow_replacement: bool, replace_existing: bool, do_not_queue: bool) |
240 | 0 | -> Result<stdintf::org_freedesktop_dbus::RequestNameReply, Error> { Unexecuted instantiation: <dbus::nonblock::LocalConnection>::request_name::<_> Unexecuted instantiation: <dbus::nonblock::SyncConnection>::request_name::<_> Unexecuted instantiation: <dbus::nonblock::Connection>::request_name::<_> |
241 | | let flags: u32 = |
242 | | if allow_replacement { 1 } else { 0 } + |
243 | | if replace_existing { 2 } else { 0 } + |
244 | | if do_not_queue { 4 } else { 0 }; |
245 | | let r = self.dbus_proxy().request_name(&name.into(), flags).await?; |
246 | | use stdintf::org_freedesktop_dbus::RequestNameReply::*; |
247 | | let all = [PrimaryOwner, InQueue, Exists, AlreadyOwner]; |
248 | | all.iter().find(|x| **x as u32 == r).copied().ok_or_else(|| |
249 | | crate::Error::new_failed("Invalid reply from DBus server") |
250 | | ) |
251 | | } |
252 | | |
253 | | /// Release a previously requested name on the D-Bus. |
254 | 0 | pub async fn release_name<'a, N: Into<BusName<'a>>>(&self, name: N) -> Result<stdintf::org_freedesktop_dbus::ReleaseNameReply, Error> { Unexecuted instantiation: <dbus::nonblock::SyncConnection>::release_name::<_> Unexecuted instantiation: <dbus::nonblock::Connection>::release_name::<_> Unexecuted instantiation: <dbus::nonblock::LocalConnection>::release_name::<_> |
255 | | let r = self.dbus_proxy().release_name(&name.into()).await?; |
256 | | use stdintf::org_freedesktop_dbus::ReleaseNameReply::*; |
257 | | let all = [Released, NonExistent, NotOwner]; |
258 | | all.iter().find(|x| **x as u32 == r).copied().ok_or_else(|| |
259 | | crate::Error::new_failed("Invalid reply from DBus server") |
260 | | ) |
261 | | } |
262 | | |
263 | | /// Adds a new match to the connection, and sets up a callback when this message arrives. |
264 | | /// |
265 | | /// If multiple [`MatchRule`]s match the same message, then by default only the first will get |
266 | | /// the callback. This behaviour can be changed for signal messages by calling |
267 | | /// [`set_signal_match_mode`](Self::set_signal_match_mode). |
268 | | /// |
269 | | /// The returned value can be used to remove the match. |
270 | 0 | pub async fn add_match(&self, match_rule: MatchRule<'static>) -> Result<MsgMatch, Error> { Unexecuted instantiation: <dbus::nonblock::SyncConnection>::add_match Unexecuted instantiation: <dbus::nonblock::Connection>::add_match Unexecuted instantiation: <dbus::nonblock::LocalConnection>::add_match |
271 | | let m = match_rule.match_str(); |
272 | | self.add_match_no_cb(&m).await?; |
273 | | let mi = Arc::new(MatchInner { |
274 | | token: Default::default(), |
275 | | cb: Default::default(), |
276 | | }); |
277 | | let mi_weak = Arc::downgrade(&mi); |
278 | | let token = self.start_receive(match_rule, Box::new(move |msg, _| { |
279 | | mi_weak.upgrade().map(|mi| mi.incoming(msg)).unwrap_or(false) |
280 | | })); |
281 | | mi.token.store(token.0, Ordering::SeqCst); |
282 | | Ok(MsgMatch(mi)) |
283 | | } |
284 | | |
285 | | |
286 | | /// Adds a new match to the connection, without setting up a callback when this message arrives. |
287 | 0 | pub async fn add_match_no_cb(&self, match_str: &str) -> Result<(), Error> { Unexecuted instantiation: <dbus::nonblock::Connection>::add_match_no_cb Unexecuted instantiation: <dbus::nonblock::LocalConnection>::add_match_no_cb Unexecuted instantiation: <dbus::nonblock::SyncConnection>::add_match_no_cb |
288 | | self.dbus_proxy().add_match(match_str).await |
289 | | } |
290 | | |
291 | | /// Removes a match from the connection, without removing any callbacks. |
292 | 0 | pub async fn remove_match_no_cb(&self, match_str: &str) -> Result<(), Error> { Unexecuted instantiation: <dbus::nonblock::LocalConnection>::remove_match_no_cb Unexecuted instantiation: <dbus::nonblock::SyncConnection>::remove_match_no_cb Unexecuted instantiation: <dbus::nonblock::Connection>::remove_match_no_cb |
293 | | self.dbus_proxy().remove_match(match_str).await |
294 | | } |
295 | | |
296 | | /// Removes a previously added match and callback from the connection. |
297 | 0 | pub async fn remove_match(&self, id: Token) -> Result<(), Error> { Unexecuted instantiation: <dbus::nonblock::Connection>::remove_match Unexecuted instantiation: <dbus::nonblock::LocalConnection>::remove_match Unexecuted instantiation: <dbus::nonblock::SyncConnection>::remove_match |
298 | | let (mr, _) = self.stop_receive(id).ok_or_else(|| Error::new_failed("No match with that id found"))?; |
299 | | self.remove_match_no_cb(&mr.match_str()).await |
300 | | } |
301 | | |
302 | | /// If true, configures the connection to send signal messages to all matching [`MatchRule`] |
303 | | /// filters added with [`add_match`](Self::add_match) rather than just the first one. This comes |
304 | | /// with the following gotchas: |
305 | | /// |
306 | | /// * The messages might be duplicated, so the message serial might be lost (this is |
307 | | /// generally not a problem for signals). |
308 | | /// * Panicking inside a match callback might mess with other callbacks, causing them |
309 | | /// to be permanently dropped. |
310 | | /// * Removing other matches from inside a match callback is not supported. |
311 | | /// |
312 | | /// This is false by default, for a newly-created connection. |
313 | 0 | pub fn set_signal_match_mode(&self, match_all: bool) { |
314 | 0 | self.all_signal_matches.store(match_all, Ordering::Release); |
315 | 0 | } Unexecuted instantiation: <dbus::nonblock::Connection>::set_signal_match_mode Unexecuted instantiation: <dbus::nonblock::LocalConnection>::set_signal_match_mode Unexecuted instantiation: <dbus::nonblock::SyncConnection>::set_signal_match_mode |
316 | | } |
317 | | |
318 | | |
319 | | } |
320 | | } |
321 | | |
322 | 0 | connimpl!(Connection, FilterCb, RepliesCb, Send); Unexecuted instantiation: <dbus::nonblock::Connection as dbus::channel::Sender>::send::{closure#0} Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#1} Unexecuted instantiation: <dbus::nonblock::Connection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#0} Unexecuted instantiation: <dbus::nonblock::Connection>::request_name::<_>::{closure#0}::{closure#1} Unexecuted instantiation: <dbus::nonblock::Connection>::add_match::{closure#0}::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::Connection>::request_name::<_>::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::Connection>::release_name::<_>::{closure#0}::{closure#1} Unexecuted instantiation: <dbus::nonblock::Connection>::release_name::<_>::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::Connection>::remove_match::{closure#0}::{closure#0} |
323 | 0 | connimpl!(LocalConnection, LocalFilterCb, LocalRepliesCb); Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#1} Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#0} Unexecuted instantiation: <dbus::nonblock::LocalConnection as dbus::channel::Sender>::send::{closure#0} Unexecuted instantiation: <dbus::nonblock::LocalConnection>::release_name::<_>::{closure#0}::{closure#1} Unexecuted instantiation: <dbus::nonblock::LocalConnection>::release_name::<_>::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::LocalConnection>::add_match::{closure#0}::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::LocalConnection>::request_name::<_>::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::LocalConnection>::remove_match::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::LocalConnection>::request_name::<_>::{closure#0}::{closure#1} |
324 | 0 | connimpl!(SyncConnection, SyncFilterCb, SyncRepliesCb, Send); Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#0} Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::channel::Sender>::send::{closure#0} Unexecuted instantiation: <dbus::nonblock::SyncConnection as dbus::nonblock::NonblockReply>::send_with_reply::{closure#1} Unexecuted instantiation: <dbus::nonblock::SyncConnection>::remove_match::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::SyncConnection>::request_name::<_>::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::SyncConnection>::release_name::<_>::{closure#0}::{closure#1} Unexecuted instantiation: <dbus::nonblock::SyncConnection>::add_match::{closure#0}::{closure#0}::{closure#0} Unexecuted instantiation: <dbus::nonblock::SyncConnection>::request_name::<_>::{closure#0}::{closure#1} Unexecuted instantiation: <dbus::nonblock::SyncConnection>::release_name::<_>::{closure#0}::{closure#0} |
325 | | |
326 | | impl Connection { |
327 | 0 | fn filters_mut(&self) -> std::cell::RefMut<Filters<FilterCb>> { self.filters.borrow_mut() } |
328 | 0 | fn replies_mut(&self) -> std::cell::RefMut<Replies<RepliesCb>> { self.replies.borrow_mut() } |
329 | | } |
330 | | |
331 | | impl LocalConnection { |
332 | 0 | fn filters_mut(&self) -> std::cell::RefMut<Filters<LocalFilterCb>> { self.filters.borrow_mut() } |
333 | 0 | fn replies_mut(&self) -> std::cell::RefMut<Replies<LocalRepliesCb>> { self.replies.borrow_mut() } |
334 | | } |
335 | | |
336 | | impl SyncConnection { |
337 | 0 | fn filters_mut(&self) -> std::sync::MutexGuard<Filters<SyncFilterCb>> { self.filters.lock().unwrap() } |
338 | 0 | fn replies_mut(&self) -> std::sync::MutexGuard<Replies<SyncRepliesCb>> { self.replies.lock().unwrap() } |
339 | | } |
340 | | |
341 | | /// Internal callback for the executor when a timeout needs to be made. |
342 | | pub type TimeoutMakerCb = fn(timeout: Instant) -> pin::Pin<Box<dyn Future<Output=()> + Send + Sync + 'static>>; |
343 | | |
344 | | /// Internal callback for the executor when we need wakeup a task |
345 | | pub type WakerCb = Box<dyn Fn() -> Result<(), ()> + Send + Sync +'static>; |
346 | | |
347 | | /// Internal helper trait for async method replies. |
348 | | pub trait NonblockReply { |
349 | | /// Callback type |
350 | | type F; |
351 | | /// Sends a message and calls the callback when a reply is received. |
352 | | fn send_with_reply(&self, msg: Message, f: Self::F) -> Result<Token, ()>; |
353 | | /// Cancels a pending reply. |
354 | | fn cancel_reply(&self, id: Token) -> Option<Self::F>; |
355 | | /// Internal helper function that creates a callback. |
356 | | fn make_f<G: FnOnce(Message, &Self) + Send + 'static>(g: G) -> Self::F where Self: Sized; |
357 | | /// Set the internal timeout maker |
358 | | fn set_timeout_maker(&mut self, f: Option<TimeoutMakerCb>) -> Option<TimeoutMakerCb>; |
359 | | /// Get the internal timeout maker |
360 | | fn timeout_maker(&self) -> Option<TimeoutMakerCb>; |
361 | | /// Set the wakeup call |
362 | | fn set_waker(&mut self, f: Option<WakerCb>) -> Option<WakerCb>; |
363 | | } |
364 | | |
365 | | |
366 | | /// Internal helper trait, implemented for connections that process incoming messages. |
367 | | pub trait Process: Sender + AsRef<Channel> { |
368 | | /// Dispatches all pending messages, without blocking. |
369 | | /// |
370 | | /// This is usually called from the reactor only, after read_write. |
371 | | /// Despite this taking &self and not "&mut self", it is a logic error to call this |
372 | | /// recursively or from more than one thread at a time. |
373 | 0 | fn process_all(&self) { |
374 | 0 | let c: &Channel = self.as_ref(); |
375 | 0 | while let Some(msg) = c.pop_message() { |
376 | 0 | self.process_one(msg); |
377 | 0 | } |
378 | 0 | } |
379 | | |
380 | | /// Dispatches a message. |
381 | | fn process_one(&self, msg: Message); |
382 | | } |
383 | | |
384 | | /// A struct used to handle incoming matches |
385 | | /// |
386 | | /// Note: Due to the lack of async destructors, please call Connection.remove_match() |
387 | | /// in order to properly stop matching (instead of just dropping this struct). |
388 | | pub struct MsgMatch(Arc<MatchInner>); |
389 | | |
390 | | struct MatchInner { |
391 | | token: AtomicUsize, |
392 | | cb: Mutex<Option<Box<dyn FnMut(Message) -> bool + Send>>>, |
393 | | } |
394 | | |
395 | | impl MatchInner { |
396 | 0 | fn incoming(&self, msg: Message) -> bool { |
397 | 0 | if let Some(ref mut cb) = self.cb.lock().unwrap().as_mut() { |
398 | 0 | cb(msg) |
399 | | } |
400 | 0 | else { true } |
401 | 0 | } |
402 | | } |
403 | | |
404 | | impl MsgMatch { |
405 | | /// Configures the match to receive a synchronous callback with only a message parameter. |
406 | 0 | pub fn msg_cb<F: FnMut(Message) -> bool + Send + 'static>(self, f: F) -> Self { |
407 | 0 | { |
408 | 0 | let mut cb = self.0.cb.lock().unwrap(); |
409 | 0 | *cb = Some(Box::new(f)); |
410 | 0 | } |
411 | 0 | self |
412 | 0 | } |
413 | | |
414 | | /// Configures the match to receive a synchronous callback with a message parameter and typed |
415 | | /// message arguments. |
416 | | /// |
417 | | /// # Example |
418 | | /// |
419 | | /// ```ignore |
420 | | /// let mr = MatchRule::new_signal("com.example.dbustest", "HelloHappened"); |
421 | | /// let incoming_signal = connection.add_match(mr).await?.cb(|_, (source,): (String,)| { |
422 | | /// println!("Hello from {} happened on the bus!", source); |
423 | | /// true |
424 | | /// }); |
425 | | /// ``` |
426 | 0 | pub fn cb<R: ReadAll, F: FnMut(Message, R) -> bool + Send + 'static>(self, mut f: F) -> Self { |
427 | 0 | self.msg_cb(move |msg| { |
428 | 0 | if let Ok(r) = R::read(&mut msg.iter_init()) { |
429 | 0 | f(msg, r) |
430 | 0 | } else { true } |
431 | 0 | }) |
432 | 0 | } |
433 | | |
434 | | /// Configures the match to receive a stream of messages. |
435 | | /// |
436 | | /// Note: If the receiving end is disconnected and a message is received, |
437 | | /// the message matching will end but not in a clean fashion. Call remove_match() to |
438 | | /// stop matching cleanly. |
439 | 0 | pub fn msg_stream(self) -> (Self, futures_channel::mpsc::UnboundedReceiver<Message>) { |
440 | 0 | let (sender, receiver) = futures_channel::mpsc::unbounded(); |
441 | 0 | (self.msg_cb(move |msg| { |
442 | 0 | sender.unbounded_send(msg).is_ok() |
443 | 0 | }), receiver) |
444 | 0 | } |
445 | | |
446 | | /// Configures the match to receive a stream of messages, parsed and ready. |
447 | | /// |
448 | | /// Note: If the receiving end is disconnected and a message is received, |
449 | | /// the message matching will end but not in a clean fashion. Call remove_match() to |
450 | | /// stop matching cleanly. |
451 | | /// |
452 | | /// # Example |
453 | | /// |
454 | | /// ```ignore |
455 | | /// let mr = MatchRule::new_signal("com.example.dbustest", "HelloHappened"); |
456 | | /// let (incoming_signal, stream) = conn.add_match(mr).await?.stream(); |
457 | | /// let stream = stream.for_each(|(_, (source,)): (_, (String,))| { |
458 | | /// println!("Hello from {} happened on the bus!", source); |
459 | | /// async {} |
460 | | /// }); |
461 | | /// ``` |
462 | 0 | pub fn stream<R: ReadAll + Send + 'static>(self) -> (Self, futures_channel::mpsc::UnboundedReceiver<(Message, R)>) { |
463 | 0 | let (sender, receiver) = futures_channel::mpsc::unbounded(); |
464 | 0 | (self.cb(move |msg, r| { |
465 | 0 | sender.unbounded_send((msg, r)).is_ok() |
466 | 0 | }), receiver) |
467 | 0 | } |
468 | | |
469 | | /// The token retrieved can be used in a call to remove_match to stop matching on the data. |
470 | 0 | pub fn token(&self) -> Token { Token(self.0.token.load(Ordering::SeqCst)) } |
471 | | } |
472 | | |
473 | | /// A struct that wraps a connection, destination and path. |
474 | | /// |
475 | | /// A D-Bus "Proxy" is a client-side object that corresponds to a remote object on the server side. |
476 | | /// Calling methods on the proxy object calls methods on the remote object. |
477 | | /// Read more in the [D-Bus tutorial](https://dbus.freedesktop.org/doc/dbus-tutorial.html#proxies) |
478 | 0 | #[derive(Clone, Debug)] |
479 | | pub struct Proxy<'a, C> { |
480 | | /// Destination, i.e. what D-Bus service you're communicating with |
481 | | pub destination: BusName<'a>, |
482 | | /// Object path on the destination |
483 | | pub path: Path<'a>, |
484 | | /// Some way to send and/or receive messages, non-blocking. |
485 | | pub connection: C, |
486 | | /// Timeout for method calls |
487 | | pub timeout: Duration, |
488 | | } |
489 | | |
490 | | impl<'a, C> Proxy<'a, C> { |
491 | | /// Creates a new proxy struct. |
492 | 0 | pub fn new<D: Into<BusName<'a>>, P: Into<Path<'a>>>(dest: D, path: P, timeout: Duration, connection: C) -> Self { |
493 | 0 | Proxy { destination: dest.into(), path: path.into(), timeout, connection } |
494 | 0 | } Unexecuted instantiation: <dbus::nonblock::Proxy<&dbus::nonblock::LocalConnection>>::new::<&str, &str> Unexecuted instantiation: <dbus::nonblock::Proxy<&dbus::nonblock::Connection>>::new::<&str, &str> Unexecuted instantiation: <dbus::nonblock::Proxy<&dbus::nonblock::SyncConnection>>::new::<&str, &str> |
495 | | } |
496 | | |
497 | | struct MRAwait { |
498 | | mrouter: MROuter, |
499 | | token: Result<Token, ()>, |
500 | | timeout: Instant, |
501 | | timeoutfn: Option<TimeoutMakerCb> |
502 | | } |
503 | | |
504 | 0 | async fn method_call_await(mra: MRAwait) -> Result<Message, Error> { |
505 | 0 | use futures_util::future; |
506 | 0 | let MRAwait { mrouter, token, timeout, timeoutfn } = mra; |
507 | 0 | if token.is_err() { return Err(Error::new_failed("Failed to send message")) }; |
508 | 0 | let timeout = if let Some(tfn) = timeoutfn { tfn(timeout) } else { Box::pin(future::pending()) }; |
509 | 0 | match future::select(mrouter, timeout).await { |
510 | 0 | future::Either::Left((r, _)) => r, |
511 | 0 | future::Either::Right(_) => Err(Error::new_custom("org.freedesktop.DBus.Error.Timeout", "Timeout waiting for reply")), |
512 | | } |
513 | 0 | } |
514 | | |
515 | | impl<'a, T, C> Proxy<'a, C> |
516 | | where |
517 | | T: NonblockReply, |
518 | | C: std::ops::Deref<Target=T> |
519 | | { |
520 | | |
521 | 0 | fn method_call_setup(&self, msg: Message) -> MRAwait { |
522 | 0 | let mr = Arc::new(Mutex::new(MRInner::Neither)); |
523 | 0 | let mrouter = MROuter(mr.clone()); |
524 | 0 | let f = T::make_f(move |msg: Message, _: &T| { |
525 | 0 | let mut inner = mr.lock().unwrap(); |
526 | 0 | let old = mem::replace(&mut *inner, MRInner::Ready(Ok(msg))); |
527 | 0 | if let MRInner::Pending(waker) = old { waker.wake() } |
528 | 0 | }); |
529 | 0 | |
530 | 0 | let timeout = Instant::now() + self.timeout; |
531 | 0 | let token = self.connection.send_with_reply(msg, f); |
532 | 0 | let timeoutfn = self.connection.timeout_maker(); |
533 | 0 | MRAwait { mrouter, token, timeout, timeoutfn } |
534 | 0 | } |
535 | | |
536 | | /// Make a method call using typed input arguments; returns a future that resolves to the typed output arguments. |
537 | 0 | pub fn method_call<'i, 'm, R: ReadAll + 'static, A: AppendAll, I: Into<Interface<'i>>, M: Into<Member<'m>>>(&self, i: I, m: M, args: A) |
538 | 0 | -> MethodReply<R> { |
539 | 0 | let mut msg = Message::method_call(&self.destination, &self.path, &i.into(), &m.into()); |
540 | 0 | args.append(&mut IterAppend::new(&mut msg)); |
541 | 0 | let mra = self.method_call_setup(msg); |
542 | 0 | let r = method_call_await(mra); |
543 | 0 | let r = futures_util::FutureExt::map(r, |r| -> Result<R, Error> { r.and_then(|rmsg| rmsg.read_all()) } ); |
544 | 0 | MethodReply::new(r) |
545 | 0 | } |
546 | | } |
547 | | |
548 | | enum MRInner { |
549 | | Ready(Result<Message, Error>), |
550 | | Pending(task::Waker), |
551 | | Neither, |
552 | | } |
553 | | |
554 | | struct MROuter(Arc<Mutex<MRInner>>); |
555 | | |
556 | | impl Future for MROuter { |
557 | | type Output = Result<Message, Error>; |
558 | 0 | fn poll(self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> { |
559 | 0 | let mut inner = self.0.lock().unwrap(); |
560 | 0 | let r = mem::replace(&mut *inner, MRInner::Neither); |
561 | 0 | if let MRInner::Ready(r) = r { task::Poll::Ready(r) } |
562 | | else { |
563 | 0 | *inner = MRInner::Pending(ctx.waker().clone()); |
564 | 0 | return task::Poll::Pending |
565 | | } |
566 | 0 | } |
567 | | } |
568 | | |
569 | | /// Future method reply, used while waiting for a method call reply from the server. |
570 | | pub struct MethodReply<T>(pin::Pin<Box<dyn Future<Output=Result<T, Error>> + Send + 'static>>); |
571 | | |
572 | | impl<T> MethodReply<T> { |
573 | | /// Creates a new method reply from a future. |
574 | 0 | fn new<Fut: Future<Output=Result<T, Error>> + Send + 'static>(fut: Fut) -> Self { |
575 | 0 | MethodReply(Box::pin(fut)) |
576 | 0 | } |
577 | | } |
578 | | |
579 | | impl<T> Future for MethodReply<T> { |
580 | | type Output = Result<T, Error>; |
581 | 0 | fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Result<T, Error>> { |
582 | 0 | self.0.as_mut().poll(ctx) |
583 | 0 | } |
584 | | } |
585 | | |
586 | | impl<T: 'static> MethodReply<T> { |
587 | | /// Convenience combinator in case you want to post-process the result after reading it |
588 | 0 | pub fn and_then<T2>(self, f: impl FnOnce(T) -> Result<T2, Error> + Send + Sync + 'static) -> MethodReply<T2> { |
589 | 0 | MethodReply(Box::pin(async move { |
590 | 0 | let x = self.0.await?; |
591 | 0 | f(x) |
592 | 0 | })) |
593 | 0 | } |
594 | | } |
595 | | |
596 | | #[test] |
597 | | fn test_conn_send_sync() { |
598 | | fn is_send<T: Send>() {} |
599 | | fn is_sync<T: Sync>() {} |
600 | | is_send::<Connection>(); |
601 | | is_send::<SyncConnection>(); |
602 | | is_sync::<SyncConnection>(); |
603 | | is_send::<MsgMatch>(); |
604 | | } |