Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include <string.h> |
5 | | |
6 | | #include "macros.hpp" |
7 | | #include "xsub.hpp" |
8 | | #include "err.hpp" |
9 | | |
10 | | zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
11 | 0 | socket_base_t (parent_, tid_, sid_), |
12 | 0 | _verbose_unsubs (false), |
13 | 0 | _has_message (false), |
14 | 0 | _more_send (false), |
15 | 0 | _more_recv (false), |
16 | 0 | _process_subscribe (false), |
17 | 0 | _only_first_subscribe (false) |
18 | 0 | { |
19 | 0 | options.type = ZMQ_XSUB; |
20 | | |
21 | | // When socket is being closed down we don't want to wait till pending |
22 | | // subscription commands are sent to the wire. |
23 | 0 | options.linger.store (0); |
24 | |
|
25 | 0 | const int rc = _message.init (); |
26 | 0 | errno_assert (rc == 0); |
27 | 0 | } |
28 | | |
29 | | zmq::xsub_t::~xsub_t () |
30 | 0 | { |
31 | 0 | const int rc = _message.close (); |
32 | 0 | errno_assert (rc == 0); |
33 | 0 | } |
34 | | |
35 | | void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, |
36 | | bool subscribe_to_all_, |
37 | | bool locally_initiated_) |
38 | 0 | { |
39 | 0 | LIBZMQ_UNUSED (subscribe_to_all_); |
40 | 0 | LIBZMQ_UNUSED (locally_initiated_); |
41 | |
|
42 | 0 | zmq_assert (pipe_); |
43 | 0 | _fq.attach (pipe_); |
44 | 0 | _dist.attach (pipe_); |
45 | | |
46 | | // Send all the cached subscriptions to the new upstream peer. |
47 | 0 | _subscriptions.apply (send_subscription, pipe_); |
48 | 0 | pipe_->flush (); |
49 | 0 | } |
50 | | |
51 | | void zmq::xsub_t::xread_activated (pipe_t *pipe_) |
52 | 0 | { |
53 | 0 | _fq.activated (pipe_); |
54 | 0 | } |
55 | | |
56 | | void zmq::xsub_t::xwrite_activated (pipe_t *pipe_) |
57 | 0 | { |
58 | 0 | _dist.activated (pipe_); |
59 | 0 | } |
60 | | |
61 | | void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_) |
62 | 0 | { |
63 | 0 | _fq.pipe_terminated (pipe_); |
64 | 0 | _dist.pipe_terminated (pipe_); |
65 | 0 | } |
66 | | |
67 | | void zmq::xsub_t::xhiccuped (pipe_t *pipe_) |
68 | 0 | { |
69 | | // Send all the cached subscriptions to the hiccuped pipe. |
70 | 0 | _subscriptions.apply (send_subscription, pipe_); |
71 | 0 | pipe_->flush (); |
72 | 0 | } |
73 | | |
74 | | int zmq::xsub_t::xsetsockopt (int option_, |
75 | | const void *optval_, |
76 | | size_t optvallen_) |
77 | 0 | { |
78 | 0 | if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) { |
79 | 0 | if (optvallen_ != sizeof (int) |
80 | 0 | || *static_cast<const int *> (optval_) < 0) { |
81 | 0 | errno = EINVAL; |
82 | 0 | return -1; |
83 | 0 | } |
84 | 0 | _only_first_subscribe = (*static_cast<const int *> (optval_) != 0); |
85 | 0 | return 0; |
86 | 0 | } |
87 | 0 | #ifdef ZMQ_BUILD_DRAFT_API |
88 | 0 | else if (option_ == ZMQ_XSUB_VERBOSE_UNSUBSCRIBE) { |
89 | 0 | _verbose_unsubs = (*static_cast<const int *> (optval_) != 0); |
90 | 0 | return 0; |
91 | 0 | } |
92 | 0 | #endif |
93 | 0 | errno = EINVAL; |
94 | 0 | return -1; |
95 | 0 | } |
96 | | |
97 | | int zmq::xsub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_) |
98 | 0 | { |
99 | 0 | if (option_ == ZMQ_TOPICS_COUNT) { |
100 | | // make sure to use a multi-thread safe function to avoid race conditions with I/O threads |
101 | | // where subscriptions are processed: |
102 | 0 | #ifdef ZMQ_USE_RADIX_TREE |
103 | 0 | uint64_t num_subscriptions = _subscriptions.size (); |
104 | | #else |
105 | | uint64_t num_subscriptions = _subscriptions.num_prefixes (); |
106 | | #endif |
107 | |
|
108 | 0 | return do_getsockopt<int> (optval_, optvallen_, |
109 | 0 | (int) num_subscriptions); |
110 | 0 | } |
111 | | |
112 | | // room for future options here |
113 | | |
114 | 0 | errno = EINVAL; |
115 | 0 | return -1; |
116 | 0 | } |
117 | | |
118 | | int zmq::xsub_t::xsend (msg_t *msg_) |
119 | 0 | { |
120 | 0 | size_t size = msg_->size (); |
121 | 0 | unsigned char *data = static_cast<unsigned char *> (msg_->data ()); |
122 | |
|
123 | 0 | const bool first_part = !_more_send; |
124 | 0 | _more_send = (msg_->flags () & msg_t::more) != 0; |
125 | |
|
126 | 0 | if (first_part) { |
127 | 0 | _process_subscribe = !_only_first_subscribe; |
128 | 0 | } else if (!_process_subscribe) { |
129 | | // User message sent upstream to XPUB socket |
130 | 0 | return _dist.send_to_all (msg_); |
131 | 0 | } |
132 | | |
133 | 0 | if (msg_->is_subscribe () || (size > 0 && *data == 1)) { |
134 | | // Process subscribe message |
135 | | // This used to filter out duplicate subscriptions, |
136 | | // however this is already done on the XPUB side and |
137 | | // doing it here as well breaks ZMQ_XPUB_VERBOSE |
138 | | // when there are forwarding devices involved. |
139 | 0 | if (!msg_->is_subscribe ()) { |
140 | 0 | data = data + 1; |
141 | 0 | size = size - 1; |
142 | 0 | } |
143 | 0 | _subscriptions.add (data, size); |
144 | 0 | _process_subscribe = true; |
145 | 0 | return _dist.send_to_all (msg_); |
146 | 0 | } |
147 | 0 | if (msg_->is_cancel () || (size > 0 && *data == 0)) { |
148 | | // Process unsubscribe message |
149 | 0 | if (!msg_->is_cancel ()) { |
150 | 0 | data = data + 1; |
151 | 0 | size = size - 1; |
152 | 0 | } |
153 | 0 | _process_subscribe = true; |
154 | 0 | const bool rm_result = _subscriptions.rm (data, size); |
155 | 0 | if (rm_result || _verbose_unsubs) |
156 | 0 | return _dist.send_to_all (msg_); |
157 | 0 | } else |
158 | | // User message sent upstream to XPUB socket |
159 | 0 | return _dist.send_to_all (msg_); |
160 | | |
161 | 0 | int rc = msg_->close (); |
162 | 0 | errno_assert (rc == 0); |
163 | 0 | rc = msg_->init (); |
164 | 0 | errno_assert (rc == 0); |
165 | |
|
166 | 0 | return 0; |
167 | 0 | } |
168 | | |
169 | | bool zmq::xsub_t::xhas_out () |
170 | 0 | { |
171 | | // Subscription can be added/removed anytime. |
172 | 0 | return true; |
173 | 0 | } |
174 | | |
175 | | int zmq::xsub_t::xrecv (msg_t *msg_) |
176 | 0 | { |
177 | | // If there's already a message prepared by a previous call to zmq_poll, |
178 | | // return it straight ahead. |
179 | 0 | if (_has_message) { |
180 | 0 | const int rc = msg_->move (_message); |
181 | 0 | errno_assert (rc == 0); |
182 | 0 | _has_message = false; |
183 | 0 | _more_recv = (msg_->flags () & msg_t::more) != 0; |
184 | 0 | return 0; |
185 | 0 | } |
186 | | |
187 | | // TODO: This can result in infinite loop in the case of continuous |
188 | | // stream of non-matching messages which breaks the non-blocking recv |
189 | | // semantics. |
190 | 0 | while (true) { |
191 | | // Get a message using fair queueing algorithm. |
192 | 0 | int rc = _fq.recv (msg_); |
193 | | |
194 | | // If there's no message available, return immediately. |
195 | | // The same when error occurs. |
196 | 0 | if (rc != 0) |
197 | 0 | return -1; |
198 | | |
199 | | // Check whether the message matches at least one subscription. |
200 | | // Non-initial parts of the message are passed |
201 | 0 | if (_more_recv || !options.filter || match (msg_)) { |
202 | 0 | _more_recv = (msg_->flags () & msg_t::more) != 0; |
203 | 0 | return 0; |
204 | 0 | } |
205 | | |
206 | | // Message doesn't match. Pop any remaining parts of the message |
207 | | // from the pipe. |
208 | 0 | while (msg_->flags () & msg_t::more) { |
209 | 0 | rc = _fq.recv (msg_); |
210 | 0 | errno_assert (rc == 0); |
211 | 0 | } |
212 | 0 | } |
213 | 0 | } |
214 | | |
215 | | bool zmq::xsub_t::xhas_in () |
216 | 0 | { |
217 | | // There are subsequent parts of the partly-read message available. |
218 | 0 | if (_more_recv) |
219 | 0 | return true; |
220 | | |
221 | | // If there's already a message prepared by a previous call to zmq_poll, |
222 | | // return straight ahead. |
223 | 0 | if (_has_message) |
224 | 0 | return true; |
225 | | |
226 | | // TODO: This can result in infinite loop in the case of continuous |
227 | | // stream of non-matching messages. |
228 | 0 | while (true) { |
229 | | // Get a message using fair queueing algorithm. |
230 | 0 | int rc = _fq.recv (&_message); |
231 | | |
232 | | // If there's no message available, return immediately. |
233 | | // The same when error occurs. |
234 | 0 | if (rc != 0) { |
235 | 0 | errno_assert (errno == EAGAIN); |
236 | 0 | return false; |
237 | 0 | } |
238 | | |
239 | | // Check whether the message matches at least one subscription. |
240 | 0 | if (!options.filter || match (&_message)) { |
241 | 0 | _has_message = true; |
242 | 0 | return true; |
243 | 0 | } |
244 | | |
245 | | // Message doesn't match. Pop any remaining parts of the message |
246 | | // from the pipe. |
247 | 0 | while (_message.flags () & msg_t::more) { |
248 | 0 | rc = _fq.recv (&_message); |
249 | 0 | errno_assert (rc == 0); |
250 | 0 | } |
251 | 0 | } |
252 | 0 | } |
253 | | |
254 | | bool zmq::xsub_t::match (msg_t *msg_) |
255 | 0 | { |
256 | 0 | const bool matching = _subscriptions.check ( |
257 | 0 | static_cast<unsigned char *> (msg_->data ()), msg_->size ()); |
258 | |
|
259 | 0 | return matching ^ options.invert_matching; |
260 | 0 | } |
261 | | |
262 | | void zmq::xsub_t::send_subscription (unsigned char *data_, |
263 | | size_t size_, |
264 | | void *arg_) |
265 | 0 | { |
266 | 0 | pipe_t *pipe = static_cast<pipe_t *> (arg_); |
267 | | |
268 | | // Create the subscription message. |
269 | 0 | msg_t msg; |
270 | 0 | const int rc = msg.init_subscribe (size_, data_); |
271 | 0 | errno_assert (rc == 0); |
272 | | |
273 | | // Send it to the pipe. |
274 | 0 | const bool sent = pipe->write (&msg); |
275 | | // If we reached the SNDHWM, and thus cannot send the subscription, drop |
276 | | // the subscription message instead. This matches the behaviour of |
277 | | // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions |
278 | | // when the SNDHWM is reached. |
279 | 0 | if (!sent) |
280 | 0 | msg.close (); |
281 | 0 | } |