Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include <string.h> |
5 | | |
6 | | #include "xpub.hpp" |
7 | | #include "pipe.hpp" |
8 | | #include "err.hpp" |
9 | | #include "msg.hpp" |
10 | | #include "macros.hpp" |
11 | | #include "generic_mtrie_impl.hpp" |
12 | | |
13 | | zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
14 | 0 | socket_base_t (parent_, tid_, sid_), |
15 | 0 | _verbose_subs (false), |
16 | 0 | _verbose_unsubs (false), |
17 | 0 | _more_send (false), |
18 | 0 | _more_recv (false), |
19 | 0 | _process_subscribe (false), |
20 | 0 | _only_first_subscribe (false), |
21 | 0 | _lossy (true), |
22 | 0 | _manual (false), |
23 | 0 | _send_last_pipe (false), |
24 | 0 | _pending_pipes (), |
25 | 0 | _welcome_msg () |
26 | 0 | { |
27 | 0 | _last_pipe = NULL; |
28 | 0 | options.type = ZMQ_XPUB; |
29 | 0 | _welcome_msg.init (); |
30 | 0 | } |
31 | | |
32 | | zmq::xpub_t::~xpub_t () |
33 | 0 | { |
34 | 0 | _welcome_msg.close (); |
35 | 0 | for (std::deque<metadata_t *>::iterator it = _pending_metadata.begin (), |
36 | 0 | end = _pending_metadata.end (); |
37 | 0 | it != end; ++it) |
38 | 0 | if (*it && (*it)->drop_ref ()) |
39 | 0 | LIBZMQ_DELETE (*it); |
40 | 0 | } |
41 | | |
42 | | void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, |
43 | | bool subscribe_to_all_, |
44 | | bool locally_initiated_) |
45 | 0 | { |
46 | 0 | LIBZMQ_UNUSED (locally_initiated_); |
47 | |
|
48 | 0 | zmq_assert (pipe_); |
49 | 0 | _dist.attach (pipe_); |
50 | | |
51 | | // If subscribe_to_all_ is specified, the caller would like to subscribe |
52 | | // to all data on this pipe, implicitly. |
53 | 0 | if (subscribe_to_all_) |
54 | 0 | _subscriptions.add (NULL, 0, pipe_); |
55 | | |
56 | | // if welcome message exists, send a copy of it |
57 | 0 | if (_welcome_msg.size () > 0) { |
58 | 0 | msg_t copy; |
59 | 0 | copy.init (); |
60 | 0 | const int rc = copy.copy (_welcome_msg); |
61 | 0 | errno_assert (rc == 0); |
62 | 0 | const bool ok = pipe_->write (©); |
63 | 0 | zmq_assert (ok); |
64 | 0 | pipe_->flush (); |
65 | 0 | } |
66 | | |
67 | | // The pipe is active when attached. Let's read the subscriptions from |
68 | | // it, if any. |
69 | 0 | xread_activated (pipe_); |
70 | 0 | } |
71 | | |
72 | | void zmq::xpub_t::xread_activated (pipe_t *pipe_) |
73 | 0 | { |
74 | | // There are some subscriptions waiting. Let's process them. |
75 | 0 | msg_t msg; |
76 | 0 | while (pipe_->read (&msg)) { |
77 | 0 | metadata_t *metadata = msg.metadata (); |
78 | 0 | unsigned char *msg_data = static_cast<unsigned char *> (msg.data ()), |
79 | 0 | *data = NULL; |
80 | 0 | size_t size = 0; |
81 | 0 | bool subscribe = false; |
82 | 0 | bool is_subscribe_or_cancel = false; |
83 | 0 | bool notify = false; |
84 | |
|
85 | 0 | const bool first_part = !_more_recv; |
86 | 0 | _more_recv = (msg.flags () & msg_t::more) != 0; |
87 | |
|
88 | 0 | if (first_part || _process_subscribe) { |
89 | | // Apply the subscription to the trie |
90 | 0 | if (msg.is_subscribe () || msg.is_cancel ()) { |
91 | 0 | data = static_cast<unsigned char *> (msg.command_body ()); |
92 | 0 | size = msg.command_body_size (); |
93 | 0 | subscribe = msg.is_subscribe (); |
94 | 0 | is_subscribe_or_cancel = true; |
95 | 0 | } else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) { |
96 | 0 | data = msg_data + 1; |
97 | 0 | size = msg.size () - 1; |
98 | 0 | subscribe = *msg_data == 1; |
99 | 0 | is_subscribe_or_cancel = true; |
100 | 0 | } |
101 | 0 | } |
102 | |
|
103 | 0 | if (first_part) |
104 | 0 | _process_subscribe = |
105 | 0 | !_only_first_subscribe || is_subscribe_or_cancel; |
106 | |
|
107 | 0 | if (is_subscribe_or_cancel) { |
108 | 0 | if (_manual) { |
109 | | // Store manual subscription to use on termination |
110 | 0 | if (!subscribe) |
111 | 0 | _manual_subscriptions.rm (data, size, pipe_); |
112 | 0 | else |
113 | 0 | _manual_subscriptions.add (data, size, pipe_); |
114 | |
|
115 | 0 | _pending_pipes.push_back (pipe_); |
116 | 0 | } else { |
117 | 0 | if (!subscribe) { |
118 | 0 | const mtrie_t::rm_result rm_result = |
119 | 0 | _subscriptions.rm (data, size, pipe_); |
120 | | // TODO reconsider what to do if rm_result == mtrie_t::not_found |
121 | 0 | notify = |
122 | 0 | rm_result != mtrie_t::values_remain || _verbose_unsubs; |
123 | 0 | } else { |
124 | 0 | const bool first_added = |
125 | 0 | _subscriptions.add (data, size, pipe_); |
126 | 0 | notify = first_added || _verbose_subs; |
127 | 0 | } |
128 | 0 | } |
129 | | |
130 | | // If the request was a new subscription, or the subscription |
131 | | // was removed, or verbose mode or manual mode are enabled, store it |
132 | | // so that it can be passed to the user on next recv call. |
133 | 0 | if (_manual || (options.type == ZMQ_XPUB && notify)) { |
134 | | // ZMTP 3.1 hack: we need to support sub/cancel commands, but |
135 | | // we can't give them back to userspace as it would be an API |
136 | | // breakage since the payload of the message is completely |
137 | | // different. Manually craft an old-style message instead. |
138 | | // Although with other transports it would be possible to simply |
139 | | // reuse the same buffer and prefix a 0/1 byte to the topic, with |
140 | | // inproc the subscribe/cancel command string is not present in |
141 | | // the message, so this optimization is not possible. |
142 | | // The pushback makes a copy of the data array anyway, so the |
143 | | // number of buffer copies does not change. |
144 | 0 | blob_t notification (size + 1); |
145 | 0 | if (subscribe) |
146 | 0 | *notification.data () = 1; |
147 | 0 | else |
148 | 0 | *notification.data () = 0; |
149 | 0 | memcpy (notification.data () + 1, data, size); |
150 | |
|
151 | 0 | _pending_data.push_back (ZMQ_MOVE (notification)); |
152 | 0 | if (metadata) |
153 | 0 | metadata->add_ref (); |
154 | 0 | _pending_metadata.push_back (metadata); |
155 | 0 | _pending_flags.push_back (0); |
156 | 0 | } |
157 | 0 | } else if (options.type != ZMQ_PUB) { |
158 | | // Process user message coming upstream from xsub socket, |
159 | | // but not if the type is PUB, which never processes user |
160 | | // messages |
161 | 0 | _pending_data.push_back (blob_t (msg_data, msg.size ())); |
162 | 0 | if (metadata) |
163 | 0 | metadata->add_ref (); |
164 | 0 | _pending_metadata.push_back (metadata); |
165 | 0 | _pending_flags.push_back (msg.flags ()); |
166 | 0 | } |
167 | |
|
168 | 0 | msg.close (); |
169 | 0 | } |
170 | 0 | } |
171 | | |
172 | | void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) |
173 | 0 | { |
174 | 0 | _dist.activated (pipe_); |
175 | 0 | } |
176 | | |
177 | | int zmq::xpub_t::xsetsockopt (int option_, |
178 | | const void *optval_, |
179 | | size_t optvallen_) |
180 | 0 | { |
181 | 0 | if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER |
182 | 0 | || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP |
183 | 0 | || option_ == ZMQ_XPUB_MANUAL || option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) { |
184 | 0 | if (optvallen_ != sizeof (int) |
185 | 0 | || *static_cast<const int *> (optval_) < 0) { |
186 | 0 | errno = EINVAL; |
187 | 0 | return -1; |
188 | 0 | } |
189 | 0 | if (option_ == ZMQ_XPUB_VERBOSE) { |
190 | 0 | _verbose_subs = (*static_cast<const int *> (optval_) != 0); |
191 | 0 | _verbose_unsubs = false; |
192 | 0 | } else if (option_ == ZMQ_XPUB_VERBOSER) { |
193 | 0 | _verbose_subs = (*static_cast<const int *> (optval_) != 0); |
194 | 0 | _verbose_unsubs = _verbose_subs; |
195 | 0 | } else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) { |
196 | 0 | _manual = (*static_cast<const int *> (optval_) != 0); |
197 | 0 | _send_last_pipe = _manual; |
198 | 0 | } else if (option_ == ZMQ_XPUB_NODROP) |
199 | 0 | _lossy = (*static_cast<const int *> (optval_) == 0); |
200 | 0 | else if (option_ == ZMQ_XPUB_MANUAL) |
201 | 0 | _manual = (*static_cast<const int *> (optval_) != 0); |
202 | 0 | else if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) |
203 | 0 | _only_first_subscribe = (*static_cast<const int *> (optval_) != 0); |
204 | 0 | } else if (option_ == ZMQ_SUBSCRIBE && _manual) { |
205 | 0 | if (_last_pipe != NULL) |
206 | 0 | _subscriptions.add ((unsigned char *) optval_, optvallen_, |
207 | 0 | _last_pipe); |
208 | 0 | } else if (option_ == ZMQ_UNSUBSCRIBE && _manual) { |
209 | 0 | if (_last_pipe != NULL) |
210 | 0 | _subscriptions.rm ((unsigned char *) optval_, optvallen_, |
211 | 0 | _last_pipe); |
212 | 0 | } else if (option_ == ZMQ_XPUB_WELCOME_MSG) { |
213 | 0 | _welcome_msg.close (); |
214 | |
|
215 | 0 | if (optvallen_ > 0) { |
216 | 0 | const int rc = _welcome_msg.init_size (optvallen_); |
217 | 0 | errno_assert (rc == 0); |
218 | |
|
219 | 0 | unsigned char *data = |
220 | 0 | static_cast<unsigned char *> (_welcome_msg.data ()); |
221 | 0 | memcpy (data, optval_, optvallen_); |
222 | 0 | } else |
223 | 0 | _welcome_msg.init (); |
224 | 0 | } else { |
225 | 0 | errno = EINVAL; |
226 | 0 | return -1; |
227 | 0 | } |
228 | 0 | return 0; |
229 | 0 | } |
230 | | |
231 | | int zmq::xpub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_) |
232 | 0 | { |
233 | 0 | if (option_ == ZMQ_TOPICS_COUNT) { |
234 | | // make sure to use a multi-thread safe function to avoid race conditions with I/O threads |
235 | | // where subscriptions are processed: |
236 | 0 | return do_getsockopt<int> (optval_, optvallen_, |
237 | 0 | (int) _subscriptions.num_prefixes ()); |
238 | 0 | } |
239 | | |
240 | | // room for future options here |
241 | | |
242 | 0 | errno = EINVAL; |
243 | 0 | return -1; |
244 | 0 | } |
245 | | |
246 | | static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_) |
247 | 0 | { |
248 | 0 | LIBZMQ_UNUSED (data_); |
249 | 0 | LIBZMQ_UNUSED (size_); |
250 | 0 | LIBZMQ_UNUSED (arg_); |
251 | 0 | } |
252 | | |
253 | | void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) |
254 | 0 | { |
255 | 0 | if (_manual) { |
256 | | // Remove the pipe from the trie and send corresponding manual |
257 | | // unsubscriptions upstream. |
258 | 0 | _manual_subscriptions.rm (pipe_, send_unsubscription, this, false); |
259 | | // Remove pipe without actually sending the message as it was taken |
260 | | // care of by the manual call above. subscriptions is the real mtrie, |
261 | | // so the pipe must be removed from there or it will be left over. |
262 | 0 | _subscriptions.rm (pipe_, stub, static_cast<void *> (NULL), false); |
263 | | |
264 | | // In case the pipe is currently set as last we must clear it to prevent |
265 | | // subscriptions from being re-added. |
266 | 0 | if (pipe_ == _last_pipe) { |
267 | 0 | _last_pipe = NULL; |
268 | 0 | } |
269 | 0 | } else { |
270 | | // Remove the pipe from the trie. If there are topics that nobody |
271 | | // is interested in anymore, send corresponding unsubscriptions |
272 | | // upstream. |
273 | 0 | _subscriptions.rm (pipe_, send_unsubscription, this, !_verbose_unsubs); |
274 | 0 | } |
275 | |
|
276 | 0 | _dist.pipe_terminated (pipe_); |
277 | 0 | } |
278 | | |
279 | | void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_) |
280 | 0 | { |
281 | 0 | self_->_dist.match (pipe_); |
282 | 0 | } |
283 | | |
284 | | void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_) |
285 | 0 | { |
286 | 0 | if (self_->_last_pipe == pipe_) |
287 | 0 | self_->_dist.match (pipe_); |
288 | 0 | } |
289 | | |
290 | | int zmq::xpub_t::xsend (msg_t *msg_) |
291 | 0 | { |
292 | 0 | const bool msg_more = (msg_->flags () & msg_t::more) != 0; |
293 | | |
294 | | // For the first part of multi-part message, find the matching pipes. |
295 | 0 | if (!_more_send) { |
296 | | // Ensure nothing from previous failed attempt to send is left matched |
297 | 0 | _dist.unmatch (); |
298 | |
|
299 | 0 | if (unlikely (_manual && _last_pipe && _send_last_pipe)) { |
300 | 0 | _subscriptions.match (static_cast<unsigned char *> (msg_->data ()), |
301 | 0 | msg_->size (), mark_last_pipe_as_matching, |
302 | 0 | this); |
303 | 0 | _last_pipe = NULL; |
304 | 0 | } else |
305 | 0 | _subscriptions.match (static_cast<unsigned char *> (msg_->data ()), |
306 | 0 | msg_->size (), mark_as_matching, this); |
307 | | // If inverted matching is used, reverse the selection now |
308 | 0 | if (options.invert_matching) { |
309 | 0 | _dist.reverse_match (); |
310 | 0 | } |
311 | 0 | } |
312 | |
|
313 | 0 | int rc = -1; // Assume we fail |
314 | 0 | if (_lossy || _dist.check_hwm ()) { |
315 | 0 | if (_dist.send_to_matching (msg_) == 0) { |
316 | | // If we are at the end of multi-part message we can mark |
317 | | // all the pipes as non-matching. |
318 | 0 | if (!msg_more) |
319 | 0 | _dist.unmatch (); |
320 | 0 | _more_send = msg_more; |
321 | 0 | rc = 0; // Yay, sent successfully |
322 | 0 | } |
323 | 0 | } else |
324 | 0 | errno = EAGAIN; |
325 | 0 | return rc; |
326 | 0 | } |
327 | | |
328 | | bool zmq::xpub_t::xhas_out () |
329 | 0 | { |
330 | 0 | return _dist.has_out (); |
331 | 0 | } |
332 | | |
333 | | int zmq::xpub_t::xrecv (msg_t *msg_) |
334 | 0 | { |
335 | | // If there is at least one |
336 | 0 | if (_pending_data.empty ()) { |
337 | 0 | errno = EAGAIN; |
338 | 0 | return -1; |
339 | 0 | } |
340 | | |
341 | | // User is reading a message, set last_pipe and remove it from the deque |
342 | 0 | if (_manual && !_pending_pipes.empty ()) { |
343 | 0 | _last_pipe = _pending_pipes.front (); |
344 | 0 | _pending_pipes.pop_front (); |
345 | | |
346 | | // If the distributor doesn't know about this pipe it must have already |
347 | | // been terminated and thus we can't allow manual subscriptions. |
348 | 0 | if (_last_pipe != NULL && !_dist.has_pipe (_last_pipe)) { |
349 | 0 | _last_pipe = NULL; |
350 | 0 | } |
351 | 0 | } |
352 | |
|
353 | 0 | int rc = msg_->close (); |
354 | 0 | errno_assert (rc == 0); |
355 | 0 | rc = msg_->init_size (_pending_data.front ().size ()); |
356 | 0 | errno_assert (rc == 0); |
357 | 0 | memcpy (msg_->data (), _pending_data.front ().data (), |
358 | 0 | _pending_data.front ().size ()); |
359 | | |
360 | | // set metadata only if there is some |
361 | 0 | if (metadata_t *metadata = _pending_metadata.front ()) { |
362 | 0 | msg_->set_metadata (metadata); |
363 | | // Remove ref corresponding to vector placement |
364 | 0 | metadata->drop_ref (); |
365 | 0 | } |
366 | |
|
367 | 0 | msg_->set_flags (_pending_flags.front ()); |
368 | 0 | _pending_data.pop_front (); |
369 | 0 | _pending_metadata.pop_front (); |
370 | 0 | _pending_flags.pop_front (); |
371 | 0 | return 0; |
372 | 0 | } |
373 | | |
374 | | bool zmq::xpub_t::xhas_in () |
375 | 0 | { |
376 | 0 | return !_pending_data.empty (); |
377 | 0 | } |
378 | | |
379 | | void zmq::xpub_t::send_unsubscription (zmq::mtrie_t::prefix_t data_, |
380 | | size_t size_, |
381 | | xpub_t *self_) |
382 | 0 | { |
383 | 0 | if (self_->options.type != ZMQ_PUB) { |
384 | | // Place the unsubscription to the queue of pending (un)subscriptions |
385 | | // to be retrieved by the user later on. |
386 | 0 | blob_t unsub (size_ + 1); |
387 | 0 | *unsub.data () = 0; |
388 | 0 | if (size_ > 0) |
389 | 0 | memcpy (unsub.data () + 1, data_, size_); |
390 | 0 | self_->_pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub)); |
391 | 0 | self_->_pending_metadata.push_back (NULL); |
392 | 0 | self_->_pending_flags.push_back (0); |
393 | |
|
394 | 0 | if (self_->_manual) { |
395 | 0 | self_->_last_pipe = NULL; |
396 | 0 | self_->_pending_pipes.push_back (NULL); |
397 | 0 | } |
398 | 0 | } |
399 | 0 | } |