/src/libzmq/src/mailbox_safe.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "mailbox_safe.hpp" |
5 | | #include "clock.hpp" |
6 | | #include "err.hpp" |
7 | | |
8 | | #include <algorithm> |
9 | | |
10 | 0 | zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_) |
11 | 0 | { |
12 | | // Get the pipe into passive state. That way, if the users starts by |
13 | | // polling on the associated file descriptor it will get woken up when |
14 | | // new command is posted. |
15 | 0 | const bool ok = _cpipe.check_read (); |
16 | 0 | zmq_assert (!ok); |
17 | 0 | } |
18 | | |
19 | | zmq::mailbox_safe_t::~mailbox_safe_t () |
20 | 0 | { |
21 | | // TODO: Retrieve and deallocate commands inside the cpipe. |
22 | | |
23 | | // Work around problem that other threads might still be in our |
24 | | // send() method, by waiting on the mutex before disappearing. |
25 | 0 | _sync->lock (); |
26 | 0 | _sync->unlock (); |
27 | 0 | } |
28 | | |
29 | | void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_) |
30 | 0 | { |
31 | 0 | _signalers.push_back (signaler_); |
32 | 0 | } |
33 | | |
34 | | void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_) |
35 | 0 | { |
36 | | // TODO: make a copy of array and signal outside the lock |
37 | 0 | const std::vector<zmq::signaler_t *>::iterator end = _signalers.end (); |
38 | 0 | const std::vector<signaler_t *>::iterator it = |
39 | 0 | std::find (_signalers.begin (), end, signaler_); |
40 | |
|
41 | 0 | if (it != end) |
42 | 0 | _signalers.erase (it); |
43 | 0 | } |
44 | | |
45 | | void zmq::mailbox_safe_t::clear_signalers () |
46 | 0 | { |
47 | 0 | _signalers.clear (); |
48 | 0 | } |
49 | | |
50 | | void zmq::mailbox_safe_t::send (const command_t &cmd_) |
51 | 0 | { |
52 | 0 | _sync->lock (); |
53 | 0 | _cpipe.write (cmd_, false); |
54 | 0 | const bool ok = _cpipe.flush (); |
55 | |
|
56 | 0 | if (!ok) { |
57 | 0 | _cond_var.broadcast (); |
58 | |
|
59 | 0 | for (std::vector<signaler_t *>::iterator it = _signalers.begin (), |
60 | 0 | end = _signalers.end (); |
61 | 0 | it != end; ++it) { |
62 | 0 | (*it)->send (); |
63 | 0 | } |
64 | 0 | } |
65 | |
|
66 | 0 | _sync->unlock (); |
67 | 0 | } |
68 | | |
69 | | int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_) |
70 | 0 | { |
71 | | // Try to get the command straight away. |
72 | 0 | if (_cpipe.read (cmd_)) |
73 | 0 | return 0; |
74 | | |
75 | | // If the timeout is zero, it will be quicker to release the lock, giving other a chance to send a command |
76 | | // and immediately relock it. |
77 | 0 | if (timeout_ == 0) { |
78 | 0 | _sync->unlock (); |
79 | 0 | _sync->lock (); |
80 | 0 | } else { |
81 | | // Wait for signal from the command sender. |
82 | 0 | const int rc = _cond_var.wait (_sync, timeout_); |
83 | 0 | if (rc == -1) { |
84 | 0 | errno_assert (errno == EAGAIN || errno == EINTR); |
85 | 0 | return -1; |
86 | 0 | } |
87 | 0 | } |
88 | | |
89 | | // Another thread may already fetch the command |
90 | 0 | const bool ok = _cpipe.read (cmd_); |
91 | |
|
92 | 0 | if (!ok) { |
93 | 0 | errno = EAGAIN; |
94 | 0 | return -1; |
95 | 0 | } |
96 | | |
97 | 0 | return 0; |
98 | 0 | } |