Coverage Report

Created: 2025-07-01 06:07

/src/libzmq/src/router.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "router.hpp"
6
#include "pipe.hpp"
7
#include "wire.hpp"
8
#include "random.hpp"
9
#include "likely.hpp"
10
#include "err.hpp"
11
12
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13
0
    routing_socket_base_t (parent_, tid_, sid_),
14
0
    _prefetched (false),
15
0
    _routing_id_sent (false),
16
    _current_in (NULL),
17
0
    _terminate_current_in (false),
18
0
    _more_in (false),
19
    _current_out (NULL),
20
0
    _more_out (false),
21
0
    _next_integral_routing_id (generate_random ()),
22
0
    _mandatory (false),
23
    //  raw_socket functionality in ROUTER is deprecated
24
0
    _raw_socket (false),
25
0
    _probe_router (false),
26
0
    _handover (false)
27
0
{
28
0
    options.type = ZMQ_ROUTER;
29
0
    options.recv_routing_id = true;
30
0
    options.raw_socket = false;
31
0
    options.can_send_hello_msg = true;
32
0
    options.can_recv_disconnect_msg = true;
33
34
0
    _prefetched_id.init ();
35
0
    _prefetched_msg.init ();
36
0
}
37
38
zmq::router_t::~router_t ()
39
0
{
40
0
    zmq_assert (_anonymous_pipes.empty ());
41
0
    _prefetched_id.close ();
42
0
    _prefetched_msg.close ();
43
0
}
44
45
void zmq::router_t::xattach_pipe (pipe_t *pipe_,
46
                                  bool subscribe_to_all_,
47
                                  bool locally_initiated_)
48
0
{
49
0
    LIBZMQ_UNUSED (subscribe_to_all_);
50
51
0
    zmq_assert (pipe_);
52
53
0
    if (_probe_router) {
54
0
        msg_t probe_msg;
55
0
        int rc = probe_msg.init ();
56
0
        errno_assert (rc == 0);
57
58
0
        rc = pipe_->write (&probe_msg);
59
        // zmq_assert (rc) is not applicable here, since it is not a bug.
60
0
        LIBZMQ_UNUSED (rc);
61
62
0
        pipe_->flush ();
63
64
0
        rc = probe_msg.close ();
65
0
        errno_assert (rc == 0);
66
0
    }
67
68
0
    const bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
69
0
    if (routing_id_ok)
70
0
        _fq.attach (pipe_);
71
0
    else
72
0
        _anonymous_pipes.insert (pipe_);
73
0
}
74
75
int zmq::router_t::xsetsockopt (int option_,
76
                                const void *optval_,
77
                                size_t optvallen_)
78
0
{
79
0
    const bool is_int = (optvallen_ == sizeof (int));
80
0
    int value = 0;
81
0
    if (is_int)
82
0
        memcpy (&value, optval_, sizeof (int));
83
84
0
    switch (option_) {
85
0
        case ZMQ_ROUTER_RAW:
86
0
            if (is_int && value >= 0) {
87
0
                _raw_socket = (value != 0);
88
0
                if (_raw_socket) {
89
0
                    options.recv_routing_id = false;
90
0
                    options.raw_socket = true;
91
0
                }
92
0
                return 0;
93
0
            }
94
0
            break;
95
96
0
        case ZMQ_ROUTER_MANDATORY:
97
0
            if (is_int && value >= 0) {
98
0
                _mandatory = (value != 0);
99
0
                return 0;
100
0
            }
101
0
            break;
102
103
0
        case ZMQ_PROBE_ROUTER:
104
0
            if (is_int && value >= 0) {
105
0
                _probe_router = (value != 0);
106
0
                return 0;
107
0
            }
108
0
            break;
109
110
0
        case ZMQ_ROUTER_HANDOVER:
111
0
            if (is_int && value >= 0) {
112
0
                _handover = (value != 0);
113
0
                return 0;
114
0
            }
115
0
            break;
116
117
0
#ifdef ZMQ_BUILD_DRAFT_API
118
0
        case ZMQ_ROUTER_NOTIFY:
119
0
            if (is_int && value >= 0
120
0
                && value <= (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT)) {
121
0
                options.router_notify = value;
122
0
                return 0;
123
0
            }
124
0
            break;
125
0
#endif
126
127
0
        default:
128
0
            return routing_socket_base_t::xsetsockopt (option_, optval_,
129
0
                                                       optvallen_);
130
0
    }
131
0
    errno = EINVAL;
132
0
    return -1;
133
0
}
134
135
136
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
137
0
{
138
0
    if (0 == _anonymous_pipes.erase (pipe_)) {
139
0
        erase_out_pipe (pipe_);
140
0
        _fq.pipe_terminated (pipe_);
141
0
        pipe_->rollback ();
142
0
        if (pipe_ == _current_out)
143
0
            _current_out = NULL;
144
0
    }
145
0
}
146
147
void zmq::router_t::xread_activated (pipe_t *pipe_)
148
0
{
149
0
    const std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
150
0
    if (it == _anonymous_pipes.end ())
151
0
        _fq.activated (pipe_);
152
0
    else {
153
0
        const bool routing_id_ok = identify_peer (pipe_, false);
154
0
        if (routing_id_ok) {
155
0
            _anonymous_pipes.erase (it);
156
0
            _fq.attach (pipe_);
157
0
        }
158
0
    }
159
0
}
160
161
int zmq::router_t::xsend (msg_t *msg_)
162
0
{
163
    //  If this is the first part of the message it's the ID of the
164
    //  peer to send the message to.
165
0
    if (!_more_out) {
166
0
        zmq_assert (!_current_out);
167
168
        //  If we have malformed message (prefix with no subsequent message)
169
        //  then just silently ignore it.
170
        //  TODO: The connections should be killed instead.
171
0
        if (msg_->flags () & msg_t::more) {
172
0
            _more_out = true;
173
174
            //  Find the pipe associated with the routing id stored in the prefix.
175
            //  If there's no such pipe just silently ignore the message, unless
176
            //  router_mandatory is set.
177
0
            out_pipe_t *out_pipe = lookup_out_pipe (
178
0
              blob_t (static_cast<unsigned char *> (msg_->data ()),
179
0
                      msg_->size (), zmq::reference_tag_t ()));
180
181
0
            if (out_pipe) {
182
0
                _current_out = out_pipe->pipe;
183
184
                // Check whether pipe is closed or not
185
0
                if (!_current_out->check_write ()) {
186
                    // Check whether pipe is full or not
187
0
                    const bool pipe_full = !_current_out->check_hwm ();
188
0
                    out_pipe->active = false;
189
0
                    _current_out = NULL;
190
191
0
                    if (_mandatory) {
192
0
                        _more_out = false;
193
0
                        if (pipe_full)
194
0
                            errno = EAGAIN;
195
0
                        else
196
0
                            errno = EHOSTUNREACH;
197
0
                        return -1;
198
0
                    }
199
0
                }
200
0
            } else if (_mandatory) {
201
0
                _more_out = false;
202
0
                errno = EHOSTUNREACH;
203
0
                return -1;
204
0
            }
205
0
        }
206
207
0
        int rc = msg_->close ();
208
0
        errno_assert (rc == 0);
209
0
        rc = msg_->init ();
210
0
        errno_assert (rc == 0);
211
0
        return 0;
212
0
    }
213
214
    //  Ignore the MORE flag for raw-sock or assert?
215
0
    if (options.raw_socket)
216
0
        msg_->reset_flags (msg_t::more);
217
218
    //  Check whether this is the last part of the message.
219
0
    _more_out = (msg_->flags () & msg_t::more) != 0;
220
221
    //  Push the message into the pipe. If there's no out pipe, just drop it.
222
0
    if (_current_out) {
223
        // Close the remote connection if user has asked to do so
224
        // by sending zero length message.
225
        // Pending messages in the pipe will be dropped (on receiving term- ack)
226
0
        if (_raw_socket && msg_->size () == 0) {
227
0
            _current_out->terminate (false);
228
0
            int rc = msg_->close ();
229
0
            errno_assert (rc == 0);
230
0
            rc = msg_->init ();
231
0
            errno_assert (rc == 0);
232
0
            _current_out = NULL;
233
0
            return 0;
234
0
        }
235
236
0
        const bool ok = _current_out->write (msg_);
237
0
        if (unlikely (!ok)) {
238
            // Message failed to send - we must close it ourselves.
239
0
            const int rc = msg_->close ();
240
0
            errno_assert (rc == 0);
241
            // HWM was checked before, so the pipe must be gone. Roll back
242
            // messages that were piped, for example REP labels.
243
0
            _current_out->rollback ();
244
0
            _current_out = NULL;
245
0
        } else {
246
0
            if (!_more_out) {
247
0
                _current_out->flush ();
248
0
                _current_out = NULL;
249
0
            }
250
0
        }
251
0
    } else {
252
0
        const int rc = msg_->close ();
253
0
        errno_assert (rc == 0);
254
0
    }
255
256
    //  Detach the message from the data buffer.
257
0
    const int rc = msg_->init ();
258
0
    errno_assert (rc == 0);
259
260
0
    return 0;
261
0
}
262
263
int zmq::router_t::xrecv (msg_t *msg_)
264
0
{
265
0
    if (_prefetched) {
266
0
        if (!_routing_id_sent) {
267
0
            const int rc = msg_->move (_prefetched_id);
268
0
            errno_assert (rc == 0);
269
0
            _routing_id_sent = true;
270
0
        } else {
271
0
            const int rc = msg_->move (_prefetched_msg);
272
0
            errno_assert (rc == 0);
273
0
            _prefetched = false;
274
0
        }
275
0
        _more_in = (msg_->flags () & msg_t::more) != 0;
276
277
0
        if (!_more_in) {
278
0
            if (_terminate_current_in) {
279
0
                _current_in->terminate (true);
280
0
                _terminate_current_in = false;
281
0
            }
282
0
            _current_in = NULL;
283
0
        }
284
0
        return 0;
285
0
    }
286
287
0
    pipe_t *pipe = NULL;
288
0
    int rc = _fq.recvpipe (msg_, &pipe);
289
290
    //  It's possible that we receive peer's routing id. That happens
291
    //  after reconnection. The current implementation assumes that
292
    //  the peer always uses the same routing id.
293
0
    while (rc == 0 && msg_->is_routing_id ())
294
0
        rc = _fq.recvpipe (msg_, &pipe);
295
296
0
    if (rc != 0)
297
0
        return -1;
298
299
0
    zmq_assert (pipe != NULL);
300
301
    //  If we are in the middle of reading a message, just return the next part.
302
0
    if (_more_in) {
303
0
        _more_in = (msg_->flags () & msg_t::more) != 0;
304
305
0
        if (!_more_in) {
306
0
            if (_terminate_current_in) {
307
0
                _current_in->terminate (true);
308
0
                _terminate_current_in = false;
309
0
            }
310
0
            _current_in = NULL;
311
0
        }
312
0
    } else {
313
        //  We are at the beginning of a message.
314
        //  Keep the message part we have in the prefetch buffer
315
        //  and return the ID of the peer instead.
316
0
        rc = _prefetched_msg.move (*msg_);
317
0
        errno_assert (rc == 0);
318
0
        _prefetched = true;
319
0
        _current_in = pipe;
320
321
0
        const blob_t &routing_id = pipe->get_routing_id ();
322
0
        rc = msg_->init_size (routing_id.size ());
323
0
        errno_assert (rc == 0);
324
0
        memcpy (msg_->data (), routing_id.data (), routing_id.size ());
325
0
        msg_->set_flags (msg_t::more);
326
0
        if (_prefetched_msg.metadata ())
327
0
            msg_->set_metadata (_prefetched_msg.metadata ());
328
0
        _routing_id_sent = true;
329
0
    }
330
331
0
    return 0;
332
0
}
333
334
int zmq::router_t::rollback ()
335
0
{
336
0
    if (_current_out) {
337
0
        _current_out->rollback ();
338
0
        _current_out = NULL;
339
0
        _more_out = false;
340
0
    }
341
0
    return 0;
342
0
}
343
344
bool zmq::router_t::xhas_in ()
345
0
{
346
    //  If we are in the middle of reading the messages, there are
347
    //  definitely more parts available.
348
0
    if (_more_in)
349
0
        return true;
350
351
    //  We may already have a message pre-fetched.
352
0
    if (_prefetched)
353
0
        return true;
354
355
    //  Try to read the next message.
356
    //  The message, if read, is kept in the pre-fetch buffer.
357
0
    pipe_t *pipe = NULL;
358
0
    int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
359
360
    //  It's possible that we receive peer's routing id. That happens
361
    //  after reconnection. The current implementation assumes that
362
    //  the peer always uses the same routing id.
363
    //  TODO: handle the situation when the peer changes its routing id.
364
0
    while (rc == 0 && _prefetched_msg.is_routing_id ())
365
0
        rc = _fq.recvpipe (&_prefetched_msg, &pipe);
366
367
0
    if (rc != 0)
368
0
        return false;
369
370
0
    zmq_assert (pipe != NULL);
371
372
0
    const blob_t &routing_id = pipe->get_routing_id ();
373
0
    rc = _prefetched_id.init_size (routing_id.size ());
374
0
    errno_assert (rc == 0);
375
0
    memcpy (_prefetched_id.data (), routing_id.data (), routing_id.size ());
376
0
    _prefetched_id.set_flags (msg_t::more);
377
0
    if (_prefetched_msg.metadata ())
378
0
        _prefetched_id.set_metadata (_prefetched_msg.metadata ());
379
380
0
    _prefetched = true;
381
0
    _routing_id_sent = false;
382
0
    _current_in = pipe;
383
384
0
    return true;
385
0
}
386
387
static bool check_pipe_hwm (const zmq::pipe_t &pipe_)
388
0
{
389
0
    return pipe_.check_hwm ();
390
0
}
391
392
bool zmq::router_t::xhas_out ()
393
0
{
394
    //  In theory, ROUTER socket is always ready for writing (except when
395
    //  MANDATORY is set). Whether actual attempt to write succeeds depends
396
    //  on which pipe the message is going to be routed to.
397
398
0
    if (!_mandatory)
399
0
        return true;
400
401
0
    return any_of_out_pipes (check_pipe_hwm);
402
0
}
403
404
int zmq::router_t::get_peer_state (const void *routing_id_,
405
                                   size_t routing_id_size_) const
406
0
{
407
0
    int res = 0;
408
409
    // TODO remove the const_cast, see comment in lookup_out_pipe
410
0
    const blob_t routing_id_blob (
411
0
      static_cast<unsigned char *> (const_cast<void *> (routing_id_)),
412
0
      routing_id_size_, reference_tag_t ());
413
0
    const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob);
414
0
    if (!out_pipe) {
415
0
        errno = EHOSTUNREACH;
416
0
        return -1;
417
0
    }
418
419
0
    if (out_pipe->pipe->check_hwm ())
420
0
        res |= ZMQ_POLLOUT;
421
422
    /** \todo does it make any sense to check the inpipe as well? */
423
424
0
    return res;
425
0
}
426
427
bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
428
0
{
429
0
    msg_t msg;
430
0
    blob_t routing_id;
431
432
0
    if (locally_initiated_ && connect_routing_id_is_set ()) {
433
0
        const std::string connect_routing_id = extract_connect_routing_id ();
434
0
        routing_id.set (
435
0
          reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
436
0
          connect_routing_id.length ());
437
        //  Not allowed to duplicate an existing rid
438
0
        zmq_assert (!has_out_pipe (routing_id));
439
0
    } else if (
440
0
      options
441
0
        .raw_socket) { //  Always assign an integral routing id for raw-socket
442
0
        unsigned char buf[5];
443
0
        buf[0] = 0;
444
0
        put_uint32 (buf + 1, _next_integral_routing_id++);
445
0
        routing_id.set (buf, sizeof buf);
446
0
    } else if (!options.raw_socket) {
447
        //  Pick up handshake cases and also case where next integral routing id is set
448
0
        msg.init ();
449
0
        const bool ok = pipe_->read (&msg);
450
0
        if (!ok)
451
0
            return false;
452
453
0
        if (msg.size () == 0) {
454
            //  Fall back on the auto-generation
455
0
            unsigned char buf[5];
456
0
            buf[0] = 0;
457
0
            put_uint32 (buf + 1, _next_integral_routing_id++);
458
0
            routing_id.set (buf, sizeof buf);
459
0
            msg.close ();
460
0
        } else {
461
0
            routing_id.set (static_cast<unsigned char *> (msg.data ()),
462
0
                            msg.size ());
463
0
            msg.close ();
464
465
            //  Try to remove an existing routing id entry to allow the new
466
            //  connection to take the routing id.
467
0
            const out_pipe_t *const existing_outpipe =
468
0
              lookup_out_pipe (routing_id);
469
470
0
            if (existing_outpipe) {
471
0
                if (!_handover)
472
                    //  Ignore peers with duplicate ID
473
0
                    return false;
474
475
                //  We will allow the new connection to take over this
476
                //  routing id. Temporarily assign a new routing id to the
477
                //  existing pipe so we can terminate it asynchronously.
478
0
                unsigned char buf[5];
479
0
                buf[0] = 0;
480
0
                put_uint32 (buf + 1, _next_integral_routing_id++);
481
0
                blob_t new_routing_id (buf, sizeof buf);
482
483
0
                pipe_t *const old_pipe = existing_outpipe->pipe;
484
485
0
                erase_out_pipe (old_pipe);
486
0
                old_pipe->set_router_socket_routing_id (new_routing_id);
487
0
                add_out_pipe (ZMQ_MOVE (new_routing_id), old_pipe);
488
489
0
                if (old_pipe == _current_in)
490
0
                    _terminate_current_in = true;
491
0
                else
492
0
                    old_pipe->terminate (true);
493
0
            }
494
0
        }
495
0
    }
496
497
0
    pipe_->set_router_socket_routing_id (routing_id);
498
0
    add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
499
500
0
    return true;
501
0
}