/src/libzmq/src/router.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | #include "router.hpp" |
6 | | #include "pipe.hpp" |
7 | | #include "wire.hpp" |
8 | | #include "random.hpp" |
9 | | #include "likely.hpp" |
10 | | #include "err.hpp" |
11 | | |
12 | | zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
13 | 0 | routing_socket_base_t (parent_, tid_, sid_), |
14 | 0 | _prefetched (false), |
15 | 0 | _routing_id_sent (false), |
16 | | _current_in (NULL), |
17 | 0 | _terminate_current_in (false), |
18 | 0 | _more_in (false), |
19 | | _current_out (NULL), |
20 | 0 | _more_out (false), |
21 | 0 | _next_integral_routing_id (generate_random ()), |
22 | 0 | _mandatory (false), |
23 | | // raw_socket functionality in ROUTER is deprecated |
24 | 0 | _raw_socket (false), |
25 | 0 | _probe_router (false), |
26 | 0 | _handover (false) |
27 | 0 | { |
28 | 0 | options.type = ZMQ_ROUTER; |
29 | 0 | options.recv_routing_id = true; |
30 | 0 | options.raw_socket = false; |
31 | 0 | options.can_send_hello_msg = true; |
32 | 0 | options.can_recv_disconnect_msg = true; |
33 | |
|
34 | 0 | _prefetched_id.init (); |
35 | 0 | _prefetched_msg.init (); |
36 | 0 | } |
37 | | |
38 | | zmq::router_t::~router_t () |
39 | 0 | { |
40 | 0 | zmq_assert (_anonymous_pipes.empty ()); |
41 | 0 | _prefetched_id.close (); |
42 | 0 | _prefetched_msg.close (); |
43 | 0 | } |
44 | | |
45 | | void zmq::router_t::xattach_pipe (pipe_t *pipe_, |
46 | | bool subscribe_to_all_, |
47 | | bool locally_initiated_) |
48 | 0 | { |
49 | 0 | LIBZMQ_UNUSED (subscribe_to_all_); |
50 | |
|
51 | 0 | zmq_assert (pipe_); |
52 | |
|
53 | 0 | if (_probe_router) { |
54 | 0 | msg_t probe_msg; |
55 | 0 | int rc = probe_msg.init (); |
56 | 0 | errno_assert (rc == 0); |
57 | |
|
58 | 0 | rc = pipe_->write (&probe_msg); |
59 | | // zmq_assert (rc) is not applicable here, since it is not a bug. |
60 | 0 | LIBZMQ_UNUSED (rc); |
61 | |
|
62 | 0 | pipe_->flush (); |
63 | |
|
64 | 0 | rc = probe_msg.close (); |
65 | 0 | errno_assert (rc == 0); |
66 | 0 | } |
67 | |
|
68 | 0 | const bool routing_id_ok = identify_peer (pipe_, locally_initiated_); |
69 | 0 | if (routing_id_ok) |
70 | 0 | _fq.attach (pipe_); |
71 | 0 | else |
72 | 0 | _anonymous_pipes.insert (pipe_); |
73 | 0 | } |
74 | | |
75 | | int zmq::router_t::xsetsockopt (int option_, |
76 | | const void *optval_, |
77 | | size_t optvallen_) |
78 | 0 | { |
79 | 0 | const bool is_int = (optvallen_ == sizeof (int)); |
80 | 0 | int value = 0; |
81 | 0 | if (is_int) |
82 | 0 | memcpy (&value, optval_, sizeof (int)); |
83 | |
|
84 | 0 | switch (option_) { |
85 | 0 | case ZMQ_ROUTER_RAW: |
86 | 0 | if (is_int && value >= 0) { |
87 | 0 | _raw_socket = (value != 0); |
88 | 0 | if (_raw_socket) { |
89 | 0 | options.recv_routing_id = false; |
90 | 0 | options.raw_socket = true; |
91 | 0 | } |
92 | 0 | return 0; |
93 | 0 | } |
94 | 0 | break; |
95 | | |
96 | 0 | case ZMQ_ROUTER_MANDATORY: |
97 | 0 | if (is_int && value >= 0) { |
98 | 0 | _mandatory = (value != 0); |
99 | 0 | return 0; |
100 | 0 | } |
101 | 0 | break; |
102 | | |
103 | 0 | case ZMQ_PROBE_ROUTER: |
104 | 0 | if (is_int && value >= 0) { |
105 | 0 | _probe_router = (value != 0); |
106 | 0 | return 0; |
107 | 0 | } |
108 | 0 | break; |
109 | | |
110 | 0 | case ZMQ_ROUTER_HANDOVER: |
111 | 0 | if (is_int && value >= 0) { |
112 | 0 | _handover = (value != 0); |
113 | 0 | return 0; |
114 | 0 | } |
115 | 0 | break; |
116 | | |
117 | 0 | #ifdef ZMQ_BUILD_DRAFT_API |
118 | 0 | case ZMQ_ROUTER_NOTIFY: |
119 | 0 | if (is_int && value >= 0 |
120 | 0 | && value <= (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT)) { |
121 | 0 | options.router_notify = value; |
122 | 0 | return 0; |
123 | 0 | } |
124 | 0 | break; |
125 | 0 | #endif |
126 | | |
127 | 0 | default: |
128 | 0 | return routing_socket_base_t::xsetsockopt (option_, optval_, |
129 | 0 | optvallen_); |
130 | 0 | } |
131 | 0 | errno = EINVAL; |
132 | 0 | return -1; |
133 | 0 | } |
134 | | |
135 | | |
136 | | void zmq::router_t::xpipe_terminated (pipe_t *pipe_) |
137 | 0 | { |
138 | 0 | if (0 == _anonymous_pipes.erase (pipe_)) { |
139 | 0 | erase_out_pipe (pipe_); |
140 | 0 | _fq.pipe_terminated (pipe_); |
141 | 0 | pipe_->rollback (); |
142 | 0 | if (pipe_ == _current_out) |
143 | 0 | _current_out = NULL; |
144 | 0 | } |
145 | 0 | } |
146 | | |
147 | | void zmq::router_t::xread_activated (pipe_t *pipe_) |
148 | 0 | { |
149 | 0 | const std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_); |
150 | 0 | if (it == _anonymous_pipes.end ()) |
151 | 0 | _fq.activated (pipe_); |
152 | 0 | else { |
153 | 0 | const bool routing_id_ok = identify_peer (pipe_, false); |
154 | 0 | if (routing_id_ok) { |
155 | 0 | _anonymous_pipes.erase (it); |
156 | 0 | _fq.attach (pipe_); |
157 | 0 | } |
158 | 0 | } |
159 | 0 | } |
160 | | |
161 | | int zmq::router_t::xsend (msg_t *msg_) |
162 | 0 | { |
163 | | // If this is the first part of the message it's the ID of the |
164 | | // peer to send the message to. |
165 | 0 | if (!_more_out) { |
166 | 0 | zmq_assert (!_current_out); |
167 | | |
168 | | // If we have malformed message (prefix with no subsequent message) |
169 | | // then just silently ignore it. |
170 | | // TODO: The connections should be killed instead. |
171 | 0 | if (msg_->flags () & msg_t::more) { |
172 | 0 | _more_out = true; |
173 | | |
174 | | // Find the pipe associated with the routing id stored in the prefix. |
175 | | // If there's no such pipe just silently ignore the message, unless |
176 | | // router_mandatory is set. |
177 | 0 | out_pipe_t *out_pipe = lookup_out_pipe ( |
178 | 0 | blob_t (static_cast<unsigned char *> (msg_->data ()), |
179 | 0 | msg_->size (), zmq::reference_tag_t ())); |
180 | |
|
181 | 0 | if (out_pipe) { |
182 | 0 | _current_out = out_pipe->pipe; |
183 | | |
184 | | // Check whether pipe is closed or not |
185 | 0 | if (!_current_out->check_write ()) { |
186 | | // Check whether pipe is full or not |
187 | 0 | const bool pipe_full = !_current_out->check_hwm (); |
188 | 0 | out_pipe->active = false; |
189 | 0 | _current_out = NULL; |
190 | |
|
191 | 0 | if (_mandatory) { |
192 | 0 | _more_out = false; |
193 | 0 | if (pipe_full) |
194 | 0 | errno = EAGAIN; |
195 | 0 | else |
196 | 0 | errno = EHOSTUNREACH; |
197 | 0 | return -1; |
198 | 0 | } |
199 | 0 | } |
200 | 0 | } else if (_mandatory) { |
201 | 0 | _more_out = false; |
202 | 0 | errno = EHOSTUNREACH; |
203 | 0 | return -1; |
204 | 0 | } |
205 | 0 | } |
206 | | |
207 | 0 | int rc = msg_->close (); |
208 | 0 | errno_assert (rc == 0); |
209 | 0 | rc = msg_->init (); |
210 | 0 | errno_assert (rc == 0); |
211 | 0 | return 0; |
212 | 0 | } |
213 | | |
214 | | // Ignore the MORE flag for raw-sock or assert? |
215 | 0 | if (options.raw_socket) |
216 | 0 | msg_->reset_flags (msg_t::more); |
217 | | |
218 | | // Check whether this is the last part of the message. |
219 | 0 | _more_out = (msg_->flags () & msg_t::more) != 0; |
220 | | |
221 | | // Push the message into the pipe. If there's no out pipe, just drop it. |
222 | 0 | if (_current_out) { |
223 | | // Close the remote connection if user has asked to do so |
224 | | // by sending zero length message. |
225 | | // Pending messages in the pipe will be dropped (on receiving term- ack) |
226 | 0 | if (_raw_socket && msg_->size () == 0) { |
227 | 0 | _current_out->terminate (false); |
228 | 0 | int rc = msg_->close (); |
229 | 0 | errno_assert (rc == 0); |
230 | 0 | rc = msg_->init (); |
231 | 0 | errno_assert (rc == 0); |
232 | 0 | _current_out = NULL; |
233 | 0 | return 0; |
234 | 0 | } |
235 | | |
236 | 0 | const bool ok = _current_out->write (msg_); |
237 | 0 | if (unlikely (!ok)) { |
238 | | // Message failed to send - we must close it ourselves. |
239 | 0 | const int rc = msg_->close (); |
240 | 0 | errno_assert (rc == 0); |
241 | | // HWM was checked before, so the pipe must be gone. Roll back |
242 | | // messages that were piped, for example REP labels. |
243 | 0 | _current_out->rollback (); |
244 | 0 | _current_out = NULL; |
245 | 0 | } else { |
246 | 0 | if (!_more_out) { |
247 | 0 | _current_out->flush (); |
248 | 0 | _current_out = NULL; |
249 | 0 | } |
250 | 0 | } |
251 | 0 | } else { |
252 | 0 | const int rc = msg_->close (); |
253 | 0 | errno_assert (rc == 0); |
254 | 0 | } |
255 | | |
256 | | // Detach the message from the data buffer. |
257 | 0 | const int rc = msg_->init (); |
258 | 0 | errno_assert (rc == 0); |
259 | |
|
260 | 0 | return 0; |
261 | 0 | } |
262 | | |
263 | | int zmq::router_t::xrecv (msg_t *msg_) |
264 | 0 | { |
265 | 0 | if (_prefetched) { |
266 | 0 | if (!_routing_id_sent) { |
267 | 0 | const int rc = msg_->move (_prefetched_id); |
268 | 0 | errno_assert (rc == 0); |
269 | 0 | _routing_id_sent = true; |
270 | 0 | } else { |
271 | 0 | const int rc = msg_->move (_prefetched_msg); |
272 | 0 | errno_assert (rc == 0); |
273 | 0 | _prefetched = false; |
274 | 0 | } |
275 | 0 | _more_in = (msg_->flags () & msg_t::more) != 0; |
276 | |
|
277 | 0 | if (!_more_in) { |
278 | 0 | if (_terminate_current_in) { |
279 | 0 | _current_in->terminate (true); |
280 | 0 | _terminate_current_in = false; |
281 | 0 | } |
282 | 0 | _current_in = NULL; |
283 | 0 | } |
284 | 0 | return 0; |
285 | 0 | } |
286 | | |
287 | 0 | pipe_t *pipe = NULL; |
288 | 0 | int rc = _fq.recvpipe (msg_, &pipe); |
289 | | |
290 | | // It's possible that we receive peer's routing id. That happens |
291 | | // after reconnection. The current implementation assumes that |
292 | | // the peer always uses the same routing id. |
293 | 0 | while (rc == 0 && msg_->is_routing_id ()) |
294 | 0 | rc = _fq.recvpipe (msg_, &pipe); |
295 | |
|
296 | 0 | if (rc != 0) |
297 | 0 | return -1; |
298 | | |
299 | 0 | zmq_assert (pipe != NULL); |
300 | | |
301 | | // If we are in the middle of reading a message, just return the next part. |
302 | 0 | if (_more_in) { |
303 | 0 | _more_in = (msg_->flags () & msg_t::more) != 0; |
304 | |
|
305 | 0 | if (!_more_in) { |
306 | 0 | if (_terminate_current_in) { |
307 | 0 | _current_in->terminate (true); |
308 | 0 | _terminate_current_in = false; |
309 | 0 | } |
310 | 0 | _current_in = NULL; |
311 | 0 | } |
312 | 0 | } else { |
313 | | // We are at the beginning of a message. |
314 | | // Keep the message part we have in the prefetch buffer |
315 | | // and return the ID of the peer instead. |
316 | 0 | rc = _prefetched_msg.move (*msg_); |
317 | 0 | errno_assert (rc == 0); |
318 | 0 | _prefetched = true; |
319 | 0 | _current_in = pipe; |
320 | |
|
321 | 0 | const blob_t &routing_id = pipe->get_routing_id (); |
322 | 0 | rc = msg_->init_size (routing_id.size ()); |
323 | 0 | errno_assert (rc == 0); |
324 | 0 | memcpy (msg_->data (), routing_id.data (), routing_id.size ()); |
325 | 0 | msg_->set_flags (msg_t::more); |
326 | 0 | if (_prefetched_msg.metadata ()) |
327 | 0 | msg_->set_metadata (_prefetched_msg.metadata ()); |
328 | 0 | _routing_id_sent = true; |
329 | 0 | } |
330 | |
|
331 | 0 | return 0; |
332 | 0 | } |
333 | | |
334 | | int zmq::router_t::rollback () |
335 | 0 | { |
336 | 0 | if (_current_out) { |
337 | 0 | _current_out->rollback (); |
338 | 0 | _current_out = NULL; |
339 | 0 | _more_out = false; |
340 | 0 | } |
341 | 0 | return 0; |
342 | 0 | } |
343 | | |
344 | | bool zmq::router_t::xhas_in () |
345 | 0 | { |
346 | | // If we are in the middle of reading the messages, there are |
347 | | // definitely more parts available. |
348 | 0 | if (_more_in) |
349 | 0 | return true; |
350 | | |
351 | | // We may already have a message pre-fetched. |
352 | 0 | if (_prefetched) |
353 | 0 | return true; |
354 | | |
355 | | // Try to read the next message. |
356 | | // The message, if read, is kept in the pre-fetch buffer. |
357 | 0 | pipe_t *pipe = NULL; |
358 | 0 | int rc = _fq.recvpipe (&_prefetched_msg, &pipe); |
359 | | |
360 | | // It's possible that we receive peer's routing id. That happens |
361 | | // after reconnection. The current implementation assumes that |
362 | | // the peer always uses the same routing id. |
363 | | // TODO: handle the situation when the peer changes its routing id. |
364 | 0 | while (rc == 0 && _prefetched_msg.is_routing_id ()) |
365 | 0 | rc = _fq.recvpipe (&_prefetched_msg, &pipe); |
366 | |
|
367 | 0 | if (rc != 0) |
368 | 0 | return false; |
369 | | |
370 | 0 | zmq_assert (pipe != NULL); |
371 | |
|
372 | 0 | const blob_t &routing_id = pipe->get_routing_id (); |
373 | 0 | rc = _prefetched_id.init_size (routing_id.size ()); |
374 | 0 | errno_assert (rc == 0); |
375 | 0 | memcpy (_prefetched_id.data (), routing_id.data (), routing_id.size ()); |
376 | 0 | _prefetched_id.set_flags (msg_t::more); |
377 | 0 | if (_prefetched_msg.metadata ()) |
378 | 0 | _prefetched_id.set_metadata (_prefetched_msg.metadata ()); |
379 | |
|
380 | 0 | _prefetched = true; |
381 | 0 | _routing_id_sent = false; |
382 | 0 | _current_in = pipe; |
383 | |
|
384 | 0 | return true; |
385 | 0 | } |
386 | | |
387 | | static bool check_pipe_hwm (const zmq::pipe_t &pipe_) |
388 | 0 | { |
389 | 0 | return pipe_.check_hwm (); |
390 | 0 | } |
391 | | |
392 | | bool zmq::router_t::xhas_out () |
393 | 0 | { |
394 | | // In theory, ROUTER socket is always ready for writing (except when |
395 | | // MANDATORY is set). Whether actual attempt to write succeeds depends |
396 | | // on which pipe the message is going to be routed to. |
397 | |
|
398 | 0 | if (!_mandatory) |
399 | 0 | return true; |
400 | | |
401 | 0 | return any_of_out_pipes (check_pipe_hwm); |
402 | 0 | } |
403 | | |
404 | | int zmq::router_t::get_peer_state (const void *routing_id_, |
405 | | size_t routing_id_size_) const |
406 | 0 | { |
407 | 0 | int res = 0; |
408 | | |
409 | | // TODO remove the const_cast, see comment in lookup_out_pipe |
410 | 0 | const blob_t routing_id_blob ( |
411 | 0 | static_cast<unsigned char *> (const_cast<void *> (routing_id_)), |
412 | 0 | routing_id_size_, reference_tag_t ()); |
413 | 0 | const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob); |
414 | 0 | if (!out_pipe) { |
415 | 0 | errno = EHOSTUNREACH; |
416 | 0 | return -1; |
417 | 0 | } |
418 | | |
419 | 0 | if (out_pipe->pipe->check_hwm ()) |
420 | 0 | res |= ZMQ_POLLOUT; |
421 | | |
422 | | /** \todo does it make any sense to check the inpipe as well? */ |
423 | |
|
424 | 0 | return res; |
425 | 0 | } |
426 | | |
427 | | bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) |
428 | 0 | { |
429 | 0 | msg_t msg; |
430 | 0 | blob_t routing_id; |
431 | |
|
432 | 0 | if (locally_initiated_ && connect_routing_id_is_set ()) { |
433 | 0 | const std::string connect_routing_id = extract_connect_routing_id (); |
434 | 0 | routing_id.set ( |
435 | 0 | reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()), |
436 | 0 | connect_routing_id.length ()); |
437 | | // Not allowed to duplicate an existing rid |
438 | 0 | zmq_assert (!has_out_pipe (routing_id)); |
439 | 0 | } else if ( |
440 | 0 | options |
441 | 0 | .raw_socket) { // Always assign an integral routing id for raw-socket |
442 | 0 | unsigned char buf[5]; |
443 | 0 | buf[0] = 0; |
444 | 0 | put_uint32 (buf + 1, _next_integral_routing_id++); |
445 | 0 | routing_id.set (buf, sizeof buf); |
446 | 0 | } else if (!options.raw_socket) { |
447 | | // Pick up handshake cases and also case where next integral routing id is set |
448 | 0 | msg.init (); |
449 | 0 | const bool ok = pipe_->read (&msg); |
450 | 0 | if (!ok) |
451 | 0 | return false; |
452 | | |
453 | 0 | if (msg.size () == 0) { |
454 | | // Fall back on the auto-generation |
455 | 0 | unsigned char buf[5]; |
456 | 0 | buf[0] = 0; |
457 | 0 | put_uint32 (buf + 1, _next_integral_routing_id++); |
458 | 0 | routing_id.set (buf, sizeof buf); |
459 | 0 | msg.close (); |
460 | 0 | } else { |
461 | 0 | routing_id.set (static_cast<unsigned char *> (msg.data ()), |
462 | 0 | msg.size ()); |
463 | 0 | msg.close (); |
464 | | |
465 | | // Try to remove an existing routing id entry to allow the new |
466 | | // connection to take the routing id. |
467 | 0 | const out_pipe_t *const existing_outpipe = |
468 | 0 | lookup_out_pipe (routing_id); |
469 | |
|
470 | 0 | if (existing_outpipe) { |
471 | 0 | if (!_handover) |
472 | | // Ignore peers with duplicate ID |
473 | 0 | return false; |
474 | | |
475 | | // We will allow the new connection to take over this |
476 | | // routing id. Temporarily assign a new routing id to the |
477 | | // existing pipe so we can terminate it asynchronously. |
478 | 0 | unsigned char buf[5]; |
479 | 0 | buf[0] = 0; |
480 | 0 | put_uint32 (buf + 1, _next_integral_routing_id++); |
481 | 0 | blob_t new_routing_id (buf, sizeof buf); |
482 | |
|
483 | 0 | pipe_t *const old_pipe = existing_outpipe->pipe; |
484 | |
|
485 | 0 | erase_out_pipe (old_pipe); |
486 | 0 | old_pipe->set_router_socket_routing_id (new_routing_id); |
487 | 0 | add_out_pipe (ZMQ_MOVE (new_routing_id), old_pipe); |
488 | |
|
489 | 0 | if (old_pipe == _current_in) |
490 | 0 | _terminate_current_in = true; |
491 | 0 | else |
492 | 0 | old_pipe->terminate (true); |
493 | 0 | } |
494 | 0 | } |
495 | 0 | } |
496 | | |
497 | 0 | pipe_->set_router_socket_routing_id (routing_id); |
498 | 0 | add_out_pipe (ZMQ_MOVE (routing_id), pipe_); |
499 | |
|
500 | 0 | return true; |
501 | 0 | } |