Coverage Report

Created: 2025-07-12 06:05

/src/libzmq/src/zmtp_engine.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
6
#include <limits.h>
7
#include <string.h>
8
9
#ifndef ZMQ_HAVE_WINDOWS
10
#include <unistd.h>
11
#endif
12
13
#include <new>
14
#include <sstream>
15
16
#include "zmtp_engine.hpp"
17
#include "io_thread.hpp"
18
#include "session_base.hpp"
19
#include "v1_encoder.hpp"
20
#include "v1_decoder.hpp"
21
#include "v2_encoder.hpp"
22
#include "v2_decoder.hpp"
23
#include "v3_1_encoder.hpp"
24
#include "null_mechanism.hpp"
25
#include "plain_client.hpp"
26
#include "plain_server.hpp"
27
#include "gssapi_client.hpp"
28
#include "gssapi_server.hpp"
29
#include "curve_client.hpp"
30
#include "curve_server.hpp"
31
#include "raw_decoder.hpp"
32
#include "raw_encoder.hpp"
33
#include "config.hpp"
34
#include "err.hpp"
35
#include "ip.hpp"
36
#include "likely.hpp"
37
#include "wire.hpp"
38
39
//  Constructs the engine for the given file descriptor. The engine starts
//  out with the routing id hooks installed, the ZMTP/2.0 greeting size and
//  a heartbeat timeout derived from the socket options.
zmq::zmtp_engine_t::zmtp_engine_t (
  fd_t fd_,
  const options_t &options_,
  const endpoint_uri_pair_t &endpoint_uri_pair_) :
    stream_engine_base_t (fd_, options_, endpoint_uri_pair_, true),
    _greeting_size (v2_greeting_size),
    _greeting_bytes_read (0),
    _subscription_required (false),
    _heartbeat_timeout (0)
{
    //  Signature shared by the message hooks stored in the base class.
    typedef int (stream_engine_base_t::*msg_hook_t) (msg_t *);

    _next_msg = static_cast<msg_hook_t> (&zmtp_engine_t::routing_id_msg);
    _process_msg =
      static_cast<msg_hook_t> (&zmtp_engine_t::process_routing_id_msg);

    const int pong_rc = _pong_msg.init ();
    errno_assert (pong_rc == 0);

    const int routing_id_rc = _routing_id_msg.init ();
    errno_assert (routing_id_rc == 0);

    //  The timeout stays at zero unless a positive heartbeat interval is
    //  configured; a configured timeout of -1 falls back to the interval.
    if (_options.heartbeat_interval > 0)
        _heartbeat_timeout = _options.heartbeat_timeout == -1
                               ? _options.heartbeat_interval
                               : _options.heartbeat_timeout;
}
66
67
//  Releases the routing id message owned by the engine.
zmq::zmtp_engine_t::~zmtp_engine_t ()
{
    const int close_rc = _routing_id_msg.close ();
    errno_assert (close_rc == 0);
}
72
73
void zmq::zmtp_engine_t::plug_internal ()
74
0
{
75
    // start optional timer, to prevent handshake hanging on no input
76
0
    set_handshake_timer ();
77
78
    //  Send the 'length' and 'flags' fields of the routing id message.
79
    //  The 'length' field is encoded in the long format.
80
0
    _outpos = _greeting_send;
81
0
    _outpos[_outsize++] = UCHAR_MAX;
82
0
    put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1);
83
0
    _outsize += 8;
84
0
    _outpos[_outsize++] = 0x7f;
85
86
0
    set_pollin ();
87
0
    set_pollout ();
88
    //  Flush all the data that may have been already received downstream.
89
0
    in_event ();
90
0
}
91
92
//  Offsets, within the received greeting, of the octet holding the peer's
//  protocol revision and of the octet holding its minor version.
const size_t revision_pos = 10;
const size_t minor_pos = revision_pos + 1;
95
96
bool zmq::zmtp_engine_t::handshake ()
97
0
{
98
0
    zmq_assert (_greeting_bytes_read < _greeting_size);
99
    //  Receive the greeting.
100
0
    const int rc = receive_greeting ();
101
0
    if (rc == -1)
102
0
        return false;
103
0
    const bool unversioned = rc != 0;
104
105
0
    if (!(this
106
0
            ->*select_handshake_fun (unversioned, _greeting_recv[revision_pos],
107
0
                                     _greeting_recv[minor_pos])) ())
108
0
        return false;
109
110
    // Start polling for output if necessary.
111
0
    if (_outsize == 0)
112
0
        set_pollout ();
113
114
0
    return true;
115
0
}
116
117
int zmq::zmtp_engine_t::receive_greeting ()
118
0
{
119
0
    bool unversioned = false;
120
0
    while (_greeting_bytes_read < _greeting_size) {
121
0
        const int n = read (_greeting_recv + _greeting_bytes_read,
122
0
                            _greeting_size - _greeting_bytes_read);
123
0
        if (n == -1) {
124
0
            if (errno != EAGAIN)
125
0
                error (connection_error);
126
0
            return -1;
127
0
        }
128
129
0
        _greeting_bytes_read += n;
130
131
        //  We have received at least one byte from the peer.
132
        //  If the first byte is not 0xff, we know that the
133
        //  peer is using unversioned protocol.
134
0
        if (_greeting_recv[0] != 0xff) {
135
0
            unversioned = true;
136
0
            break;
137
0
        }
138
139
0
        if (_greeting_bytes_read < signature_size)
140
0
            continue;
141
142
        //  Inspect the right-most bit of the 10th byte (which coincides
143
        //  with the 'flags' field if a regular message was sent).
144
        //  Zero indicates this is a header of a routing id message
145
        //  (i.e. the peer is using the unversioned protocol).
146
0
        if (!(_greeting_recv[9] & 0x01)) {
147
0
            unversioned = true;
148
0
            break;
149
0
        }
150
151
        //  The peer is using versioned protocol.
152
0
        receive_greeting_versioned ();
153
0
    }
154
0
    return unversioned ? 1 : 0;
155
0
}
156
157
void zmq::zmtp_engine_t::receive_greeting_versioned ()
158
0
{
159
    //  Send the major version number.
160
0
    if (_outpos + _outsize == _greeting_send + signature_size) {
161
0
        if (_outsize == 0)
162
0
            set_pollout ();
163
0
        _outpos[_outsize++] = 3; //  Major version number
164
0
    }
165
166
0
    if (_greeting_bytes_read > signature_size) {
167
0
        if (_outpos + _outsize == _greeting_send + signature_size + 1) {
168
0
            if (_outsize == 0)
169
0
                set_pollout ();
170
171
            //  Use ZMTP/2.0 to talk to older peers.
172
0
            if (_greeting_recv[revision_pos] == ZMTP_1_0
173
0
                || _greeting_recv[revision_pos] == ZMTP_2_0)
174
0
                _outpos[_outsize++] = _options.type;
175
0
            else {
176
0
                _outpos[_outsize++] = 1; //  Minor version number
177
0
                memset (_outpos + _outsize, 0, 20);
178
179
0
                zmq_assert (_options.mechanism == ZMQ_NULL
180
0
                            || _options.mechanism == ZMQ_PLAIN
181
0
                            || _options.mechanism == ZMQ_CURVE
182
0
                            || _options.mechanism == ZMQ_GSSAPI);
183
184
0
                if (_options.mechanism == ZMQ_NULL)
185
0
                    memcpy (_outpos + _outsize, "NULL", 4);
186
0
                else if (_options.mechanism == ZMQ_PLAIN)
187
0
                    memcpy (_outpos + _outsize, "PLAIN", 5);
188
0
                else if (_options.mechanism == ZMQ_GSSAPI)
189
0
                    memcpy (_outpos + _outsize, "GSSAPI", 6);
190
0
                else if (_options.mechanism == ZMQ_CURVE)
191
0
                    memcpy (_outpos + _outsize, "CURVE", 5);
192
0
                _outsize += 20;
193
0
                memset (_outpos + _outsize, 0, 32);
194
0
                _outsize += 32;
195
0
                _greeting_size = v3_greeting_size;
196
0
            }
197
0
        }
198
0
    }
199
0
}
200
201
//  Maps what is known about the peer's protocol version to the member
//  function that completes the handshake. Unknown revisions, and any
//  non-zero minor version of ZMTP/3.x, select the ZMTP/3.1 handshake.
zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::select_handshake_fun (
  bool unversioned_, unsigned char revision_, unsigned char minor_)
{
    //  Is the peer using ZMTP/1.0 with no revision number?
    if (unversioned_)
        return &zmtp_engine_t::handshake_v1_0_unversioned;

    if (revision_ == ZMTP_1_0)
        return &zmtp_engine_t::handshake_v1_0;

    if (revision_ == ZMTP_2_0)
        return &zmtp_engine_t::handshake_v2_0;

    if (revision_ == ZMTP_3_x && minor_ == 0)
        return &zmtp_engine_t::handshake_v3_0;

    return &zmtp_engine_t::handshake_v3_1;
}
224
225
bool zmq::zmtp_engine_t::handshake_v1_0_unversioned ()
226
0
{
227
    //  We send and receive rest of routing id message
228
0
    if (session ()->zap_enabled ()) {
229
        // reject ZMTP 1.0 connections if ZAP is enabled
230
0
        error (protocol_error);
231
0
        return false;
232
0
    }
233
234
0
    _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size);
235
0
    alloc_assert (_encoder);
236
237
0
    _decoder = new (std::nothrow)
238
0
      v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
239
0
    alloc_assert (_decoder);
240
241
    //  We have already sent the message header.
242
    //  Since there is no way to tell the encoder to
243
    //  skip the message header, we simply throw that
244
    //  header data away.
245
0
    const size_t header_size =
246
0
      _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
247
0
    unsigned char tmp[10], *bufferp = tmp;
248
249
    //  Prepare the routing id message and load it into encoder.
250
    //  Then consume bytes we have already sent to the peer.
251
0
    int rc = _routing_id_msg.close ();
252
0
    zmq_assert (rc == 0);
253
0
    rc = _routing_id_msg.init_size (_options.routing_id_size);
254
0
    zmq_assert (rc == 0);
255
0
    memcpy (_routing_id_msg.data (), _options.routing_id,
256
0
            _options.routing_id_size);
257
0
    _encoder->load_msg (&_routing_id_msg);
258
0
    const size_t buffer_size = _encoder->encode (&bufferp, header_size);
259
0
    zmq_assert (buffer_size == header_size);
260
261
    //  Make sure the decoder sees the data we have already received.
262
0
    _inpos = _greeting_recv;
263
0
    _insize = _greeting_bytes_read;
264
265
    //  To allow for interoperability with peers that do not forward
266
    //  their subscriptions, we inject a phantom subscription message
267
    //  message into the incoming message stream.
268
0
    if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
269
0
        _subscription_required = true;
270
271
    //  We are sending our routing id now and the next message
272
    //  will come from the socket.
273
0
    _next_msg = &zmtp_engine_t::pull_msg_from_session;
274
275
    //  We are expecting routing id message.
276
0
    _process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
277
0
      &zmtp_engine_t::process_routing_id_msg);
278
279
0
    return true;
280
0
}
281
282
bool zmq::zmtp_engine_t::handshake_v1_0 ()
283
0
{
284
0
    if (session ()->zap_enabled ()) {
285
        // reject ZMTP 1.0 connections if ZAP is enabled
286
0
        error (protocol_error);
287
0
        return false;
288
0
    }
289
290
0
    _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size);
291
0
    alloc_assert (_encoder);
292
293
0
    _decoder = new (std::nothrow)
294
0
      v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
295
0
    alloc_assert (_decoder);
296
297
0
    return true;
298
0
}
299
300
bool zmq::zmtp_engine_t::handshake_v2_0 ()
301
0
{
302
0
    if (session ()->zap_enabled ()) {
303
        // reject ZMTP 2.0 connections if ZAP is enabled
304
0
        error (protocol_error);
305
0
        return false;
306
0
    }
307
308
0
    _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
309
0
    alloc_assert (_encoder);
310
311
0
    _decoder = new (std::nothrow) v2_decoder_t (
312
0
      _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
313
0
    alloc_assert (_decoder);
314
315
0
    return true;
316
0
}
317
318
bool zmq::zmtp_engine_t::handshake_v3_x (const bool downgrade_sub_)
319
0
{
320
0
    if (_options.mechanism == ZMQ_NULL
321
0
        && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
322
0
                   20)
323
0
             == 0) {
324
0
        _mechanism = new (std::nothrow)
325
0
          null_mechanism_t (session (), _peer_address, _options);
326
0
        alloc_assert (_mechanism);
327
0
    } else if (_options.mechanism == ZMQ_PLAIN
328
0
               && memcmp (_greeting_recv + 12,
329
0
                          "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
330
0
                    == 0) {
331
0
        if (_options.as_server)
332
0
            _mechanism = new (std::nothrow)
333
0
              plain_server_t (session (), _peer_address, _options);
334
0
        else
335
0
            _mechanism =
336
0
              new (std::nothrow) plain_client_t (session (), _options);
337
0
        alloc_assert (_mechanism);
338
0
    }
339
0
#ifdef ZMQ_HAVE_CURVE
340
0
    else if (_options.mechanism == ZMQ_CURVE
341
0
             && memcmp (_greeting_recv + 12,
342
0
                        "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
343
0
                  == 0) {
344
0
        if (_options.as_server)
345
0
            _mechanism = new (std::nothrow) curve_server_t (
346
0
              session (), _peer_address, _options, downgrade_sub_);
347
0
        else
348
0
            _mechanism = new (std::nothrow)
349
0
              curve_client_t (session (), _options, downgrade_sub_);
350
0
        alloc_assert (_mechanism);
351
0
    }
352
0
#endif
353
#ifdef HAVE_LIBGSSAPI_KRB5
354
    else if (_options.mechanism == ZMQ_GSSAPI
355
             && memcmp (_greeting_recv + 12,
356
                        "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
357
                  == 0) {
358
        if (_options.as_server)
359
            _mechanism = new (std::nothrow)
360
              gssapi_server_t (session (), _peer_address, _options);
361
        else
362
            _mechanism =
363
              new (std::nothrow) gssapi_client_t (session (), _options);
364
        alloc_assert (_mechanism);
365
    }
366
#endif
367
0
    else {
368
0
        socket ()->event_handshake_failed_protocol (
369
0
          session ()->get_endpoint (),
370
0
          ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
371
0
        error (protocol_error);
372
0
        return false;
373
0
    }
374
#ifndef ZMQ_HAVE_CURVE
375
    LIBZMQ_UNUSED (downgrade_sub_);
376
#endif
377
0
    _next_msg = &zmtp_engine_t::next_handshake_command;
378
0
    _process_msg = &zmtp_engine_t::process_handshake_command;
379
380
0
    return true;
381
0
}
382
383
bool zmq::zmtp_engine_t::handshake_v3_0 ()
384
0
{
385
0
    _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
386
0
    alloc_assert (_encoder);
387
388
0
    _decoder = new (std::nothrow) v2_decoder_t (
389
0
      _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
390
0
    alloc_assert (_decoder);
391
392
0
    return zmq::zmtp_engine_t::handshake_v3_x (true);
393
0
}
394
395
bool zmq::zmtp_engine_t::handshake_v3_1 ()
396
0
{
397
0
    _encoder = new (std::nothrow) v3_1_encoder_t (_options.out_batch_size);
398
0
    alloc_assert (_encoder);
399
400
0
    _decoder = new (std::nothrow) v2_decoder_t (
401
0
      _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
402
0
    alloc_assert (_decoder);
403
404
0
    return zmq::zmtp_engine_t::handshake_v3_x (false);
405
0
}
406
407
//  Fills msg_ with our routing id and makes subsequent outgoing messages
//  come from the session. Always returns 0.
int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_)
{
    const size_t routing_id_size = _options.routing_id_size;

    const int rc = msg_->init_size (routing_id_size);
    errno_assert (rc == 0);

    //  An empty routing id leaves nothing to copy.
    if (routing_id_size > 0)
        memcpy (msg_->data (), _options.routing_id, routing_id_size);

    _next_msg = &zmtp_engine_t::pull_msg_from_session;
    return 0;
}
416
417
//  Handles the routing id message received from the peer: forwards it to
//  the session when the socket wants routing ids, otherwise discards it.
//  When a subscription is required, a one-byte subscription message is
//  pushed to the session as well. Subsequent incoming messages go
//  straight to the session. Always returns 0.
int zmq::zmtp_engine_t::process_routing_id_msg (msg_t *msg_)
{
    if (!_options.recv_routing_id) {
        //  Drop the routing id and leave msg_ freshly initialised.
        const int close_rc = msg_->close ();
        errno_assert (close_rc == 0);
        const int init_rc = msg_->init ();
        errno_assert (init_rc == 0);
    } else {
        msg_->set_flags (msg_t::routing_id);
        const int push_rc = session ()->push_msg (msg_);
        errno_assert (push_rc == 0);
    }

    if (_subscription_required) {
        //  Inject the subscription message, so that also
        //  ZMQ 2.x peers receive published messages.
        msg_t subscription;
        const int init_rc = subscription.init_size (1);
        errno_assert (init_rc == 0);
        *static_cast<unsigned char *> (subscription.data ()) = 1;
        const int push_rc = session ()->push_msg (&subscription);
        errno_assert (push_rc == 0);
    }

    _process_msg = &zmtp_engine_t::push_msg_to_session;

    return 0;
}
446
447
//  Builds a PING command carrying our heartbeat TTL in msg_, encodes it
//  with the current mechanism and arms the heartbeat timeout timer if it
//  is not already running. Returns the result of the mechanism's encode.
int zmq::zmtp_engine_t::produce_ping_message (msg_t *msg_)
{
    // 16-bit TTL + \4PING == 7
    const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
    zmq_assert (_mechanism != NULL);

    const int init_rc = msg_->init_size (ping_ttl_len);
    errno_assert (init_rc == 0);
    msg_->set_flags (msg_t::command);

    //  Command name first, then the TTL in network byte order.
    uint8_t *const ping_data = static_cast<uint8_t *> (msg_->data ());
    memcpy (ping_data, "\4PING", msg_t::ping_cmd_name_size);

    uint16_t ttl_val = htons (_options.heartbeat_ttl);
    memcpy (ping_data + msg_t::ping_cmd_name_size, &ttl_val,
            sizeof (ttl_val));

    const int encode_rc = _mechanism->encode (msg_);
    _next_msg = &zmtp_engine_t::pull_and_encode;

    if (!_has_timeout_timer && _heartbeat_timeout > 0) {
        add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
        _has_timeout_timer = true;
    }
    return encode_rc;
}
471
472
//  Moves the stored PONG message into msg_ and encodes it with the
//  current mechanism. Returns the result of the mechanism's encode.
int zmq::zmtp_engine_t::produce_pong_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int move_rc = msg_->move (_pong_msg);
    errno_assert (move_rc == 0);

    const int encode_rc = _mechanism->encode (msg_);
    _next_msg = &zmtp_engine_t::pull_and_encode;
    return encode_rc;
}
483
484
//  Handles a heartbeat command. For a PING the TTL announced by the peer
//  arms the TTL timer (if not already running) and a PONG echoing the
//  PING context is prepared and sent via out_event (). Other messages
//  are ignored.
//  Returns 0 on success and -1 for a PING that is too short to contain
//  the mandatory 2-byte TTL.
int zmq::zmtp_engine_t::process_heartbeat_message (msg_t *msg_)
{
    if (msg_->is_ping ()) {
        // 16-bit TTL + \4PING == 7
        const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
        const size_t ping_max_ctx_len = 16;

        //  Malformed PING: reading the TTL, or computing the context
        //  length below, would run past the end of the message.
        if (unlikely (msg_->size () < ping_ttl_len))
            return -1;

        const uint8_t *const ping_data =
          static_cast<const uint8_t *> (msg_->data ());

        // Get the remote heartbeat TTL to setup the timer
        uint16_t remote_heartbeat_ttl;
        memcpy (&remote_heartbeat_ttl, ping_data + msg_t::ping_cmd_name_size,
                sizeof (remote_heartbeat_ttl));
        remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl);

        // The remote heartbeat is in 10ths of a second
        // so we multiply it by 100 to get the timer interval in ms.
        // The product is kept in an int: it does not fit into 16 bits
        // for TTLs above 655, and a wrapped value could even be 0.
        const int ttl_timer_ms = static_cast<int> (remote_heartbeat_ttl) * 100;

        if (!_has_ttl_timer && ttl_timer_ms > 0) {
            add_timer (ttl_timer_ms, heartbeat_ttl_timer_id);
            _has_ttl_timer = true;
        }

        //  As per ZMTP 3.1 the PING command might contain an up to 16 bytes
        //  context which needs to be PONGed back, so build the pong message
        //  here and store it. Truncate it if it's too long.
        //  Given the engine goes straight to out_event, sequential PINGs will
        //  not be a problem.
        const size_t context_len =
          std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len);
        const int rc =
          _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
        errno_assert (rc == 0);
        _pong_msg.set_flags (msg_t::command);
        memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size);
        if (context_len > 0)
            memcpy (static_cast<uint8_t *> (_pong_msg.data ())
                      + msg_t::ping_cmd_name_size,
                    ping_data + ping_ttl_len, context_len);

        _next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
          &zmtp_engine_t::produce_pong_message);
        out_event ();
    }

    return 0;
}
532
533
//  Classifies an incoming command message by its name and flags it as
//  PING, PONG, SUBSCRIBE or CANCEL. Heartbeat commands are handed over to
//  process_heartbeat_message (), whose result is returned.
//  Returns -1 for a malformed command (empty, or shorter than its
//  announced name), 0 otherwise.
int zmq::zmtp_engine_t::process_command_message (msg_t *msg_)
{
    //  Malformed command: there is no name-size octet to read.
    if (unlikely (msg_->size () == 0))
        return -1;

    const uint8_t cmd_name_size =
      *(static_cast<const uint8_t *> (msg_->data ()));
    const size_t ping_name_size = msg_t::ping_cmd_name_size - 1;
    const size_t sub_name_size = msg_t::sub_cmd_name_size - 1;
    const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1;

    //  Malformed command: the announced name does not fit in the message.
    if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size)))
        return -1;

    //  The name follows the size octet; at most one of the names below
    //  can match it.
    const uint8_t *const cmd_name =
      static_cast<const uint8_t *> (msg_->data ()) + 1;
    if (cmd_name_size == ping_name_size
        && memcmp (cmd_name, "PING", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::ping);
    else if (cmd_name_size == ping_name_size
             && memcmp (cmd_name, "PONG", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::pong);
    else if (cmd_name_size == sub_name_size
             && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::subscribe);
    else if (cmd_name_size == cancel_name_size
             && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::cancel);

    if (msg_->is_ping () || msg_->is_pong ())
        return process_heartbeat_message (msg_);

    return 0;
}