/src/libzmq/src/radio.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include <string.h> |
5 | | |
6 | | #include "radio.hpp" |
7 | | #include "macros.hpp" |
8 | | #include "pipe.hpp" |
9 | | #include "err.hpp" |
10 | | #include "msg.hpp" |
11 | | |
12 | | zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
13 | 0 | socket_base_t (parent_, tid_, sid_, true), _lossy (true) |
14 | 0 | { |
15 | 0 | options.type = ZMQ_RADIO; |
16 | 0 | } |
17 | | |
18 | | zmq::radio_t::~radio_t () |
19 | 0 | { |
20 | 0 | } |
21 | | |
22 | | void zmq::radio_t::xattach_pipe (pipe_t *pipe_, |
23 | | bool subscribe_to_all_, |
24 | | bool locally_initiated_) |
25 | 0 | { |
26 | 0 | LIBZMQ_UNUSED (subscribe_to_all_); |
27 | 0 | LIBZMQ_UNUSED (locally_initiated_); |
28 | |
|
29 | 0 | zmq_assert (pipe_); |
30 | | |
31 | | // Don't delay pipe termination as there is no one |
32 | | // to receive the delimiter. |
33 | 0 | pipe_->set_nodelay (); |
34 | |
|
35 | 0 | _dist.attach (pipe_); |
36 | |
|
37 | 0 | if (subscribe_to_all_) |
38 | 0 | _udp_pipes.push_back (pipe_); |
39 | | // The pipe is active when attached. Let's read the subscriptions from |
40 | | // it, if any. |
41 | 0 | else |
42 | 0 | xread_activated (pipe_); |
43 | 0 | } |
44 | | |
45 | | void zmq::radio_t::xread_activated (pipe_t *pipe_) |
46 | 0 | { |
47 | | // There are some subscriptions waiting. Let's process them. |
48 | 0 | msg_t msg; |
49 | 0 | while (pipe_->read (&msg)) { |
50 | | // Apply the subscription to the trie |
51 | 0 | if (msg.is_join () || msg.is_leave ()) { |
52 | 0 | std::string group = std::string (msg.group ()); |
53 | |
|
54 | 0 | if (msg.is_join ()) |
55 | 0 | _subscriptions.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (group), |
56 | 0 | pipe_); |
57 | 0 | else { |
58 | 0 | std::pair<subscriptions_t::iterator, subscriptions_t::iterator> |
59 | 0 | range = _subscriptions.equal_range (group); |
60 | |
|
61 | 0 | for (subscriptions_t::iterator it = range.first; |
62 | 0 | it != range.second; ++it) { |
63 | 0 | if (it->second == pipe_) { |
64 | 0 | _subscriptions.erase (it); |
65 | 0 | break; |
66 | 0 | } |
67 | 0 | } |
68 | 0 | } |
69 | 0 | } |
70 | 0 | msg.close (); |
71 | 0 | } |
72 | 0 | } |
73 | | |
74 | | void zmq::radio_t::xwrite_activated (pipe_t *pipe_) |
75 | 0 | { |
76 | 0 | _dist.activated (pipe_); |
77 | 0 | } |
78 | | int zmq::radio_t::xsetsockopt (int option_, |
79 | | const void *optval_, |
80 | | size_t optvallen_) |
81 | 0 | { |
82 | 0 | if (optvallen_ != sizeof (int) || *static_cast<const int *> (optval_) < 0) { |
83 | 0 | errno = EINVAL; |
84 | 0 | return -1; |
85 | 0 | } |
86 | 0 | if (option_ == ZMQ_XPUB_NODROP) |
87 | 0 | _lossy = (*static_cast<const int *> (optval_) == 0); |
88 | 0 | else { |
89 | 0 | errno = EINVAL; |
90 | 0 | return -1; |
91 | 0 | } |
92 | 0 | return 0; |
93 | 0 | } |
94 | | |
95 | | void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) |
96 | 0 | { |
97 | 0 | for (subscriptions_t::iterator it = _subscriptions.begin (), |
98 | 0 | end = _subscriptions.end (); |
99 | 0 | it != end;) { |
100 | 0 | if (it->second == pipe_) { |
101 | 0 | #if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700) |
102 | 0 | it = _subscriptions.erase (it); |
103 | | #else |
104 | | _subscriptions.erase (it++); |
105 | | #endif |
106 | 0 | } else { |
107 | 0 | ++it; |
108 | 0 | } |
109 | 0 | } |
110 | |
|
111 | 0 | { |
112 | 0 | const udp_pipes_t::iterator end = _udp_pipes.end (); |
113 | 0 | const udp_pipes_t::iterator it = |
114 | 0 | std::find (_udp_pipes.begin (), end, pipe_); |
115 | 0 | if (it != end) |
116 | 0 | _udp_pipes.erase (it); |
117 | 0 | } |
118 | |
|
119 | 0 | _dist.pipe_terminated (pipe_); |
120 | 0 | } |
121 | | |
122 | | int zmq::radio_t::xsend (msg_t *msg_) |
123 | 0 | { |
124 | | // Radio sockets do not allow multipart data (ZMQ_SNDMORE) |
125 | 0 | if (msg_->flags () & msg_t::more) { |
126 | 0 | errno = EINVAL; |
127 | 0 | return -1; |
128 | 0 | } |
129 | | |
130 | 0 | _dist.unmatch (); |
131 | |
|
132 | 0 | const std::pair<subscriptions_t::iterator, subscriptions_t::iterator> |
133 | 0 | range = _subscriptions.equal_range (std::string (msg_->group ())); |
134 | |
|
135 | 0 | for (subscriptions_t::iterator it = range.first; it != range.second; ++it) |
136 | 0 | _dist.match (it->second); |
137 | |
|
138 | 0 | for (udp_pipes_t::iterator it = _udp_pipes.begin (), |
139 | 0 | end = _udp_pipes.end (); |
140 | 0 | it != end; ++it) |
141 | 0 | _dist.match (*it); |
142 | |
|
143 | 0 | int rc = -1; |
144 | 0 | if (_lossy || _dist.check_hwm ()) { |
145 | 0 | if (_dist.send_to_matching (msg_) == 0) { |
146 | 0 | rc = 0; // Yay, sent successfully |
147 | 0 | } |
148 | 0 | } else |
149 | 0 | errno = EAGAIN; |
150 | |
|
151 | 0 | return rc; |
152 | 0 | } |
153 | | |
154 | | bool zmq::radio_t::xhas_out () |
155 | 0 | { |
156 | 0 | return _dist.has_out (); |
157 | 0 | } |
158 | | |
159 | | int zmq::radio_t::xrecv (msg_t *msg_) |
160 | 0 | { |
161 | | // Messages cannot be received from PUB socket. |
162 | 0 | LIBZMQ_UNUSED (msg_); |
163 | 0 | errno = ENOTSUP; |
164 | 0 | return -1; |
165 | 0 | } |
166 | | |
167 | | bool zmq::radio_t::xhas_in () |
168 | 0 | { |
169 | 0 | return false; |
170 | 0 | } |
171 | | |
172 | | zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_, |
173 | | bool connect_, |
174 | | socket_base_t *socket_, |
175 | | const options_t &options_, |
176 | | address_t *addr_) : |
177 | 0 | session_base_t (io_thread_, connect_, socket_, options_, addr_), |
178 | 0 | _state (group) |
179 | 0 | { |
180 | 0 | } |
181 | | |
182 | | zmq::radio_session_t::~radio_session_t () |
183 | 0 | { |
184 | 0 | } |
185 | | |
186 | | int zmq::radio_session_t::push_msg (msg_t *msg_) |
187 | 0 | { |
188 | 0 | if (msg_->flags () & msg_t::command) { |
189 | 0 | char *command_data = static_cast<char *> (msg_->data ()); |
190 | 0 | const size_t data_size = msg_->size (); |
191 | |
|
192 | 0 | int group_length; |
193 | 0 | const char *group; |
194 | |
|
195 | 0 | msg_t join_leave_msg; |
196 | 0 | int rc; |
197 | | |
198 | | // Set the msg type to either JOIN or LEAVE |
199 | 0 | if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) { |
200 | 0 | group_length = static_cast<int> (data_size) - 5; |
201 | 0 | group = command_data + 5; |
202 | 0 | rc = join_leave_msg.init_join (); |
203 | 0 | } else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) { |
204 | 0 | group_length = static_cast<int> (data_size) - 6; |
205 | 0 | group = command_data + 6; |
206 | 0 | rc = join_leave_msg.init_leave (); |
207 | 0 | } |
208 | | // If it is not a JOIN or LEAVE just push the message |
209 | 0 | else |
210 | 0 | return session_base_t::push_msg (msg_); |
211 | | |
212 | 0 | errno_assert (rc == 0); |
213 | | |
214 | | // Set the group |
215 | 0 | rc = join_leave_msg.set_group (group, group_length); |
216 | 0 | errno_assert (rc == 0); |
217 | | |
218 | | // Close the current command |
219 | 0 | rc = msg_->close (); |
220 | 0 | errno_assert (rc == 0); |
221 | | |
222 | | // Push the join or leave command |
223 | 0 | *msg_ = join_leave_msg; |
224 | 0 | return session_base_t::push_msg (msg_); |
225 | 0 | } |
226 | 0 | return session_base_t::push_msg (msg_); |
227 | 0 | } |
228 | | |
229 | | int zmq::radio_session_t::pull_msg (msg_t *msg_) |
230 | 0 | { |
231 | 0 | if (_state == group) { |
232 | 0 | int rc = session_base_t::pull_msg (&_pending_msg); |
233 | 0 | if (rc != 0) |
234 | 0 | return rc; |
235 | | |
236 | 0 | const char *group = _pending_msg.group (); |
237 | 0 | const int length = static_cast<int> (strlen (group)); |
238 | | |
239 | | // First frame is the group |
240 | 0 | rc = msg_->init_size (length); |
241 | 0 | errno_assert (rc == 0); |
242 | 0 | msg_->set_flags (msg_t::more); |
243 | 0 | memcpy (msg_->data (), group, length); |
244 | | |
245 | | // Next status is the body |
246 | 0 | _state = body; |
247 | 0 | return 0; |
248 | 0 | } |
249 | 0 | *msg_ = _pending_msg; |
250 | 0 | _state = group; |
251 | 0 | return 0; |
252 | 0 | } |
253 | | |
254 | | void zmq::radio_session_t::reset () |
255 | 0 | { |
256 | 0 | session_base_t::reset (); |
257 | 0 | _state = group; |
258 | 0 | } |