Coverage Report

Created: 2025-07-12 06:05

/src/libzmq/src/socket_base.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <new>
5
#include <string>
6
#include <algorithm>
7
#include <limits>
8
9
#include "macros.hpp"
10
11
#if defined ZMQ_HAVE_WINDOWS
12
#if defined _MSC_VER
13
#if defined _WIN32_WCE
14
#include <cmnintrin.h>
15
#else
16
#include <intrin.h>
17
#endif
18
#endif
19
#else
20
#include <unistd.h>
21
#include <ctype.h>
22
#endif
23
24
#include "socket_base.hpp"
25
#include "tcp_listener.hpp"
26
#include "ws_listener.hpp"
27
#include "ipc_listener.hpp"
28
#include "tipc_listener.hpp"
29
#include "tcp_connecter.hpp"
30
#ifdef ZMQ_HAVE_WS
31
#include "ws_address.hpp"
32
#endif
33
#include "io_thread.hpp"
34
#include "session_base.hpp"
35
#include "config.hpp"
36
#include "pipe.hpp"
37
#include "err.hpp"
38
#include "ctx.hpp"
39
#include "likely.hpp"
40
#include "msg.hpp"
41
#include "address.hpp"
42
#include "ipc_address.hpp"
43
#include "tcp_address.hpp"
44
#include "udp_address.hpp"
45
#include "tipc_address.hpp"
46
#include "mailbox.hpp"
47
#include "mailbox_safe.hpp"
48
49
#ifdef ZMQ_HAVE_WSS
50
#include "wss_address.hpp"
51
#endif
52
#if defined ZMQ_HAVE_VMCI
53
#include "vmci_address.hpp"
54
#include "vmci_listener.hpp"
55
#endif
56
57
#ifdef ZMQ_HAVE_OPENPGM
58
#include "pgm_socket.hpp"
59
#endif
60
61
#include "pair.hpp"
62
#include "pub.hpp"
63
#include "sub.hpp"
64
#include "req.hpp"
65
#include "rep.hpp"
66
#include "pull.hpp"
67
#include "push.hpp"
68
#include "dealer.hpp"
69
#include "router.hpp"
70
#include "xpub.hpp"
71
#include "xsub.hpp"
72
#include "stream.hpp"
73
#include "server.hpp"
74
#include "client.hpp"
75
#include "radio.hpp"
76
#include "dish.hpp"
77
#include "gather.hpp"
78
#include "scatter.hpp"
79
#include "dgram.hpp"
80
#include "peer.hpp"
81
#include "channel.hpp"
82
83
void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
84
                                             pipe_t *pipe_)
85
58
{
86
58
    _inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
87
58
}
88
89
int zmq::socket_base_t::inprocs_t::erase_pipes (
90
  const std::string &endpoint_uri_str_)
91
0
{
92
0
    const std::pair<map_t::iterator, map_t::iterator> range =
93
0
      _inprocs.equal_range (endpoint_uri_str_);
94
0
    if (range.first == range.second) {
95
0
        errno = ENOENT;
96
0
        return -1;
97
0
    }
98
99
0
    for (map_t::iterator it = range.first; it != range.second; ++it) {
100
0
        it->second->send_disconnect_msg ();
101
0
        it->second->terminate (true);
102
0
    }
103
0
    _inprocs.erase (range.first, range.second);
104
0
    return 0;
105
0
}
106
107
void zmq::socket_base_t::inprocs_t::erase_pipe (const pipe_t *pipe_)
108
701
{
109
701
    for (map_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
110
701
         it != end; ++it)
111
58
        if (it->second == pipe_) {
112
58
            _inprocs.erase (it);
113
58
            break;
114
58
        }
115
701
}
116
117
bool zmq::socket_base_t::check_tag () const
118
209k
{
119
209k
    return _tag == 0xbaddecaf;
120
209k
}
121
122
bool zmq::socket_base_t::is_thread_safe () const
123
0
{
124
0
    return _thread_safe;
125
0
}
126
127
zmq::socket_base_t *zmq::socket_base_t::create (int type_,
128
                                                class ctx_t *parent_,
129
                                                uint32_t tid_,
130
                                                int sid_)
131
4.21k
{
132
4.21k
    socket_base_t *s = NULL;
133
4.21k
    switch (type_) {
134
58
        case ZMQ_PAIR:
135
58
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
136
58
            break;
137
3.23k
        case ZMQ_PUB:
138
3.23k
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
139
3.23k
            break;
140
0
        case ZMQ_SUB:
141
0
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
142
0
            break;
143
0
        case ZMQ_REQ:
144
0
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
145
0
            break;
146
0
        case ZMQ_REP:
147
0
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
148
0
            break;
149
0
        case ZMQ_DEALER:
150
0
            s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
151
0
            break;
152
0
        case ZMQ_ROUTER:
153
0
            s = new (std::nothrow) router_t (parent_, tid_, sid_);
154
0
            break;
155
0
        case ZMQ_PULL:
156
0
            s = new (std::nothrow) pull_t (parent_, tid_, sid_);
157
0
            break;
158
0
        case ZMQ_PUSH:
159
0
            s = new (std::nothrow) push_t (parent_, tid_, sid_);
160
0
            break;
161
923
        case ZMQ_XPUB:
162
923
            s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
163
923
            break;
164
0
        case ZMQ_XSUB:
165
0
            s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
166
0
            break;
167
0
        case ZMQ_STREAM:
168
0
            s = new (std::nothrow) stream_t (parent_, tid_, sid_);
169
0
            break;
170
0
        case ZMQ_SERVER:
171
0
            s = new (std::nothrow) server_t (parent_, tid_, sid_);
172
0
            break;
173
0
        case ZMQ_CLIENT:
174
0
            s = new (std::nothrow) client_t (parent_, tid_, sid_);
175
0
            break;
176
0
        case ZMQ_RADIO:
177
0
            s = new (std::nothrow) radio_t (parent_, tid_, sid_);
178
0
            break;
179
0
        case ZMQ_DISH:
180
0
            s = new (std::nothrow) dish_t (parent_, tid_, sid_);
181
0
            break;
182
0
        case ZMQ_GATHER:
183
0
            s = new (std::nothrow) gather_t (parent_, tid_, sid_);
184
0
            break;
185
0
        case ZMQ_SCATTER:
186
0
            s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
187
0
            break;
188
0
        case ZMQ_DGRAM:
189
0
            s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
190
0
            break;
191
0
        case ZMQ_PEER:
192
0
            s = new (std::nothrow) peer_t (parent_, tid_, sid_);
193
0
            break;
194
0
        case ZMQ_CHANNEL:
195
0
            s = new (std::nothrow) channel_t (parent_, tid_, sid_);
196
0
            break;
197
0
        default:
198
0
            errno = EINVAL;
199
0
            return NULL;
200
4.21k
    }
201
202
4.21k
    alloc_assert (s);
203
204
4.21k
    if (s->_mailbox == NULL) {
205
0
        s->_destroyed = true;
206
0
        LIBZMQ_DELETE (s);
207
0
        return NULL;
208
0
    }
209
210
4.21k
    return s;
211
4.21k
}
212
213
zmq::socket_base_t::socket_base_t (ctx_t *parent_,
214
                                   uint32_t tid_,
215
                                   int sid_,
216
                                   bool thread_safe_) :
217
4.21k
    own_t (parent_, tid_),
218
4.21k
    _sync (),
219
4.21k
    _tag (0xbaddecaf),
220
4.21k
    _ctx_terminated (false),
221
4.21k
    _destroyed (false),
222
    _poller (NULL),
223
4.21k
    _handle (static_cast<poller_t::handle_t> (NULL)),
224
4.21k
    _last_tsc (0),
225
4.21k
    _ticks (0),
226
4.21k
    _rcvmore (false),
227
    _monitor_socket (NULL),
228
4.21k
    _monitor_events (0),
229
4.21k
    _thread_safe (thread_safe_),
230
    _reaper_signaler (NULL),
231
4.21k
    _monitor_sync (),
232
4.21k
    _disconnected (false)
233
4.21k
{
234
4.21k
    options.socket_id = sid_;
235
4.21k
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
236
4.21k
    options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
237
4.21k
    options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
238
239
4.21k
    if (_thread_safe) {
240
0
        _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
241
0
        zmq_assert (_mailbox);
242
4.21k
    } else {
243
4.21k
        mailbox_t *m = new (std::nothrow) mailbox_t ();
244
4.21k
        zmq_assert (m);
245
246
4.21k
        if (m->get_fd () != retired_fd)
247
4.21k
            _mailbox = m;
248
0
        else {
249
0
            LIBZMQ_DELETE (m);
250
0
            _mailbox = NULL;
251
0
        }
252
4.21k
    }
253
4.21k
}
254
255
int zmq::socket_base_t::get_peer_state (const void *routing_id_,
256
                                        size_t routing_id_size_) const
257
0
{
258
0
    LIBZMQ_UNUSED (routing_id_);
259
0
    LIBZMQ_UNUSED (routing_id_size_);
260
261
    //  Only ROUTER sockets support this
262
0
    errno = ENOTSUP;
263
0
    return -1;
264
0
}
265
266
zmq::socket_base_t::~socket_base_t ()
267
4.21k
{
268
4.21k
    if (_mailbox)
269
4.21k
        LIBZMQ_DELETE (_mailbox);
270
271
4.21k
    if (_reaper_signaler)
272
0
        LIBZMQ_DELETE (_reaper_signaler);
273
274
4.21k
    scoped_lock_t lock (_monitor_sync);
275
4.21k
    stop_monitor ();
276
277
4.21k
    zmq_assert (_destroyed);
278
4.21k
}
279
280
zmq::i_mailbox *zmq::socket_base_t::get_mailbox () const
281
4.21k
{
282
4.21k
    return _mailbox;
283
4.21k
}
284
285
void zmq::socket_base_t::stop ()
286
4.21k
{
287
    //  Called by ctx when it is terminated (zmq_ctx_term).
288
    //  'stop' command is sent from the threads that called zmq_ctx_term to
289
    //  the thread owning the socket. This way, blocking call in the
290
    //  owner thread can be interrupted.
291
4.21k
    send_stop ();
292
4.21k
}
293
294
// TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
295
// terminology, but this requires extensive changes to be consistent
296
int zmq::socket_base_t::parse_uri (const char *uri_,
297
                                   std::string &protocol_,
298
                                   std::string &path_)
299
3.29k
{
300
3.29k
    zmq_assert (uri_ != NULL);
301
302
3.29k
    const std::string uri (uri_);
303
3.29k
    const std::string::size_type pos = uri.find ("://");
304
3.29k
    if (pos == std::string::npos) {
305
130
        errno = EINVAL;
306
130
        return -1;
307
130
    }
308
3.16k
    protocol_ = uri.substr (0, pos);
309
3.16k
    path_ = uri.substr (pos + 3);
310
311
3.16k
    if (protocol_.empty () || path_.empty ()) {
312
284
        errno = EINVAL;
313
284
        return -1;
314
284
    }
315
2.88k
    return 0;
316
3.16k
}
317
318
int zmq::socket_base_t::check_protocol (const std::string &protocol_) const
319
2.88k
{
320
    //  First check out whether the protocol is something we are aware of.
321
2.88k
    if (protocol_ != protocol_name::inproc
322
2.88k
#if defined ZMQ_HAVE_IPC
323
2.88k
        && protocol_ != protocol_name::ipc
324
2.88k
#endif
325
2.88k
        && protocol_ != protocol_name::tcp
326
2.88k
#ifdef ZMQ_HAVE_WS
327
2.88k
        && protocol_ != protocol_name::ws
328
2.88k
#endif
329
#ifdef ZMQ_HAVE_WSS
330
        && protocol_ != protocol_name::wss
331
#endif
332
#if defined ZMQ_HAVE_OPENPGM
333
        //  pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
334
        && protocol_ != protocol_name::pgm
335
        && protocol_ != protocol_name::epgm
336
#endif
337
2.88k
#if defined ZMQ_HAVE_TIPC
338
        // TIPC transport is only available on Linux.
339
2.88k
        && protocol_ != protocol_name::tipc
340
2.88k
#endif
341
#if defined ZMQ_HAVE_NORM
342
        && protocol_ != protocol_name::norm
343
#endif
344
#if defined ZMQ_HAVE_VMCI
345
        && protocol_ != protocol_name::vmci
346
#endif
347
2.88k
        && protocol_ != protocol_name::udp) {
348
415
        errno = EPROTONOSUPPORT;
349
415
        return -1;
350
415
    }
351
352
    //  Check whether socket type and transport protocol match.
353
    //  Specifically, multicast protocols can't be combined with
354
    //  bi-directional messaging patterns (socket types).
355
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
356
#if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
357
    if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm
358
         || protocol_ == protocol_name::norm)
359
#elif defined ZMQ_HAVE_OPENPGM
360
    if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm)
361
#else // defined ZMQ_HAVE_NORM
362
    if (protocol_ == protocol_name::norm
363
#endif
364
        && options.type != ZMQ_PUB && options.type != ZMQ_SUB
365
        && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
366
        errno = ENOCOMPATPROTO;
367
        return -1;
368
    }
369
#endif
370
371
2.46k
    if (protocol_ == protocol_name::udp
372
2.46k
        && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
373
2
            && options.type != ZMQ_DGRAM)) {
374
2
        errno = ENOCOMPATPROTO;
375
2
        return -1;
376
2
    }
377
378
    //  Protocol is available.
379
2.46k
    return 0;
380
2.46k
}
381
382
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
383
                                      bool subscribe_to_all_,
384
                                      bool locally_initiated_)
385
701
{
386
    //  First, register the pipe so that we can terminate it later on.
387
701
    pipe_->set_event_sink (this);
388
701
    _pipes.push_back (pipe_);
389
390
    //  Let the derived socket type know about new pipe.
391
701
    xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
392
393
    //  If the socket is already being closed, ask any new pipes to terminate
394
    //  straight away.
395
701
    if (is_terminating ()) {
396
0
        register_term_acks (1);
397
0
        pipe_->terminate (false);
398
0
    }
399
701
}
400
401
int zmq::socket_base_t::setsockopt (int option_,
402
                                    const void *optval_,
403
                                    size_t optvallen_)
404
102k
{
405
102k
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
406
407
102k
    if (unlikely (_ctx_terminated)) {
408
0
        errno = ETERM;
409
0
        return -1;
410
0
    }
411
412
    //  First, check whether specific socket type overloads the option.
413
102k
    int rc = xsetsockopt (option_, optval_, optvallen_);
414
102k
    if (rc == 0 || errno != EINVAL) {
415
1.48k
        return rc;
416
1.48k
    }
417
418
    //  If the socket type doesn't support the option, pass it to
419
    //  the generic option parser.
420
101k
    rc = options.setsockopt (option_, optval_, optvallen_);
421
101k
    update_pipe_options (option_);
422
423
101k
    return rc;
424
102k
}
425
426
int zmq::socket_base_t::getsockopt (int option_,
427
                                    void *optval_,
428
                                    size_t *optvallen_)
429
99.6k
{
430
99.6k
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
431
432
99.6k
    if (unlikely (_ctx_terminated)) {
433
0
        errno = ETERM;
434
0
        return -1;
435
0
    }
436
437
    //  First, check whether specific socket type overloads the option.
438
99.6k
    int rc = xgetsockopt (option_, optval_, optvallen_);
439
99.6k
    if (rc == 0 || errno != EINVAL) {
440
0
        return rc;
441
0
    }
442
443
99.6k
    if (option_ == ZMQ_RCVMORE) {
444
923
        return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
445
923
    }
446
447
98.7k
    if (option_ == ZMQ_FD) {
448
923
        if (_thread_safe) {
449
            // thread safe socket doesn't provide file descriptor
450
0
            errno = EINVAL;
451
0
            return -1;
452
0
        }
453
454
923
        return do_getsockopt<fd_t> (
455
923
          optval_, optvallen_,
456
923
          (static_cast<mailbox_t *> (_mailbox))->get_fd ());
457
923
    }
458
459
97.8k
    if (option_ == ZMQ_EVENTS) {
460
923
        const int rc = process_commands (0, false);
461
923
        if (rc != 0 && (errno == EINTR || errno == ETERM)) {
462
0
            return -1;
463
0
        }
464
923
        errno_assert (rc == 0);
465
466
923
        return do_getsockopt<int> (optval_, optvallen_,
467
923
                                   (has_out () ? ZMQ_POLLOUT : 0)
468
923
                                     | (has_in () ? ZMQ_POLLIN : 0));
469
923
    }
470
471
96.9k
    if (option_ == ZMQ_LAST_ENDPOINT) {
472
923
        return do_getsockopt (optval_, optvallen_, _last_endpoint);
473
923
    }
474
475
95.9k
    if (option_ == ZMQ_THREAD_SAFE) {
476
923
        return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
477
923
    }
478
479
95.0k
    return options.getsockopt (option_, optval_, optvallen_);
480
95.9k
}
481
482
int zmq::socket_base_t::join (const char *group_)
483
0
{
484
0
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
485
486
0
    return xjoin (group_);
487
0
}
488
489
int zmq::socket_base_t::leave (const char *group_)
490
0
{
491
0
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
492
493
0
    return xleave (group_);
494
0
}
495
496
void zmq::socket_base_t::add_signaler (signaler_t *s_)
497
0
{
498
0
    zmq_assert (_thread_safe);
499
500
0
    scoped_lock_t sync_lock (_sync);
501
0
    (static_cast<mailbox_safe_t *> (_mailbox))->add_signaler (s_);
502
0
}
503
504
void zmq::socket_base_t::remove_signaler (signaler_t *s_)
505
0
{
506
0
    zmq_assert (_thread_safe);
507
508
0
    scoped_lock_t sync_lock (_sync);
509
0
    (static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
510
0
}
511
512
int zmq::socket_base_t::bind (const char *endpoint_uri_)
513
1.64k
{
514
1.64k
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
515
516
1.64k
    if (unlikely (_ctx_terminated)) {
517
0
        errno = ETERM;
518
0
        return -1;
519
0
    }
520
521
    //  Process pending commands, if any.
522
1.64k
    int rc = process_commands (0, false);
523
1.64k
    if (unlikely (rc != 0)) {
524
0
        return -1;
525
0
    }
526
527
    //  Parse endpoint_uri_ string.
528
1.64k
    std::string protocol;
529
1.64k
    std::string address;
530
1.64k
    if (parse_uri (endpoint_uri_, protocol, address)
531
1.64k
        || check_protocol (protocol)) {
532
431
        return -1;
533
431
    }
534
535
1.21k
    if (protocol == protocol_name::inproc) {
536
114
        const endpoint_t endpoint = {this, options};
537
114
        rc = register_endpoint (endpoint_uri_, endpoint);
538
114
        if (rc == 0) {
539
114
            connect_pending (endpoint_uri_, this);
540
114
            _last_endpoint.assign (endpoint_uri_);
541
114
            options.connected = true;
542
114
        }
543
114
        return rc;
544
114
    }
545
546
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
547
#if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
548
    if (protocol == protocol_name::pgm || protocol == protocol_name::epgm
549
        || protocol == protocol_name::norm) {
550
#elif defined ZMQ_HAVE_OPENPGM
551
    if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
552
#else // defined ZMQ_HAVE_NORM
553
    if (protocol == protocol_name::norm) {
554
#endif
555
        //  For convenience's sake, bind can be used interchangeable with
556
        //  connect for PGM, EPGM, NORM transports.
557
        rc = connect (endpoint_uri_);
558
        if (rc != -1)
559
            options.connected = true;
560
        return rc;
561
    }
562
#endif
563
564
1.09k
    if (protocol == protocol_name::udp) {
565
0
        if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
566
0
            errno = ENOCOMPATPROTO;
567
0
            return -1;
568
0
        }
569
570
        //  Choose the I/O thread to run the session in.
571
0
        io_thread_t *io_thread = choose_io_thread (options.affinity);
572
0
        if (!io_thread) {
573
0
            errno = EMTHREAD;
574
0
            return -1;
575
0
        }
576
577
0
        address_t *paddr =
578
0
          new (std::nothrow) address_t (protocol, address, this->get_ctx ());
579
0
        alloc_assert (paddr);
580
581
0
        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
582
0
        alloc_assert (paddr->resolved.udp_addr);
583
0
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
584
0
                                                options.ipv6);
585
0
        if (rc != 0) {
586
0
            LIBZMQ_DELETE (paddr);
587
0
            return -1;
588
0
        }
589
590
0
        session_base_t *session =
591
0
          session_base_t::create (io_thread, true, this, options, paddr);
592
0
        errno_assert (session);
593
594
        //  Create a bi-directional pipe.
595
0
        object_t *parents[2] = {this, session};
596
0
        pipe_t *new_pipes[2] = {NULL, NULL};
597
598
0
        int hwms[2] = {options.sndhwm, options.rcvhwm};
599
0
        bool conflates[2] = {false, false};
600
0
        rc = pipepair (parents, new_pipes, hwms, conflates);
601
0
        errno_assert (rc == 0);
602
603
        //  Attach local end of the pipe to the socket object.
604
0
        attach_pipe (new_pipes[0], true, true);
605
0
        pipe_t *const newpipe = new_pipes[0];
606
607
        //  Attach remote end of the pipe to the session object later on.
608
0
        session->attach_pipe (new_pipes[1]);
609
610
        //  Save last endpoint URI
611
0
        paddr->to_string (_last_endpoint);
612
613
        //  TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases
614
0
        add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (),
615
0
                                           endpoint_type_none),
616
0
                      static_cast<own_t *> (session), newpipe);
617
618
0
        return 0;
619
0
    }
620
621
    //  Remaining transports require to be run in an I/O thread, so at this
622
    //  point we'll choose one.
623
1.09k
    io_thread_t *io_thread = choose_io_thread (options.affinity);
624
1.09k
    if (!io_thread) {
625
0
        errno = EMTHREAD;
626
0
        return -1;
627
0
    }
628
629
1.09k
    if (protocol == protocol_name::tcp) {
630
16
        tcp_listener_t *listener =
631
16
          new (std::nothrow) tcp_listener_t (io_thread, this, options);
632
16
        alloc_assert (listener);
633
16
        rc = listener->set_local_address (address.c_str ());
634
16
        if (rc != 0) {
635
14
            LIBZMQ_DELETE (listener);
636
14
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
637
14
                               zmq_errno ());
638
14
            return -1;
639
14
        }
640
641
        // Save last endpoint URI
642
2
        listener->get_local_address (_last_endpoint);
643
644
2
        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
645
2
                      static_cast<own_t *> (listener), NULL);
646
2
        options.connected = true;
647
2
        return 0;
648
16
    }
649
650
1.08k
#ifdef ZMQ_HAVE_WS
651
#ifdef ZMQ_HAVE_WSS
652
    if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
653
        ws_listener_t *listener = new (std::nothrow) ws_listener_t (
654
          io_thread, this, options, protocol == protocol_name::wss);
655
#else
656
1.08k
    if (protocol == protocol_name::ws) {
657
789
        ws_listener_t *listener =
658
789
          new (std::nothrow) ws_listener_t (io_thread, this, options, false);
659
789
#endif
660
789
        alloc_assert (listener);
661
789
        rc = listener->set_local_address (address.c_str ());
662
789
        if (rc != 0) {
663
701
            LIBZMQ_DELETE (listener);
664
701
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
665
701
                               zmq_errno ());
666
701
            return -1;
667
701
        }
668
669
        // Save last endpoint URI
670
88
        listener->get_local_address (_last_endpoint);
671
672
88
        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
673
88
                      static_cast<own_t *> (listener), NULL);
674
88
        options.connected = true;
675
88
        return 0;
676
789
    }
677
294
#endif
678
679
294
#if defined ZMQ_HAVE_IPC
680
294
    if (protocol == protocol_name::ipc) {
681
114
        ipc_listener_t *listener =
682
114
          new (std::nothrow) ipc_listener_t (io_thread, this, options);
683
114
        alloc_assert (listener);
684
114
        int rc = listener->set_local_address (address.c_str ());
685
114
        if (rc != 0) {
686
73
            LIBZMQ_DELETE (listener);
687
73
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
688
73
                               zmq_errno ());
689
73
            return -1;
690
73
        }
691
692
        // Save last endpoint URI
693
41
        listener->get_local_address (_last_endpoint);
694
695
41
        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
696
41
                      static_cast<own_t *> (listener), NULL);
697
41
        options.connected = true;
698
41
        return 0;
699
114
    }
700
180
#endif
701
180
#if defined ZMQ_HAVE_TIPC
702
180
    if (protocol == protocol_name::tipc) {
703
180
        tipc_listener_t *listener =
704
180
          new (std::nothrow) tipc_listener_t (io_thread, this, options);
705
180
        alloc_assert (listener);
706
180
        int rc = listener->set_local_address (address.c_str ());
707
180
        if (rc != 0) {
708
180
            LIBZMQ_DELETE (listener);
709
180
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
710
180
                               zmq_errno ());
711
180
            return -1;
712
180
        }
713
714
        // Save last endpoint URI
715
0
        listener->get_local_address (_last_endpoint);
716
717
        // TODO shouldn't this use _last_endpoint as in the other cases?
718
0
        add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_),
719
0
                      static_cast<own_t *> (listener), NULL);
720
0
        options.connected = true;
721
0
        return 0;
722
180
    }
723
0
#endif
724
#if defined ZMQ_HAVE_VMCI
725
    if (protocol == protocol_name::vmci) {
726
        vmci_listener_t *listener =
727
          new (std::nothrow) vmci_listener_t (io_thread, this, options);
728
        alloc_assert (listener);
729
        int rc = listener->set_local_address (address.c_str ());
730
        if (rc != 0) {
731
            LIBZMQ_DELETE (listener);
732
            event_bind_failed (make_unconnected_bind_endpoint_pair (address),
733
                               zmq_errno ());
734
            return -1;
735
        }
736
737
        listener->get_local_address (_last_endpoint);
738
739
        add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
740
                      static_cast<own_t *> (listener), NULL);
741
        options.connected = true;
742
        return 0;
743
    }
744
#endif
745
746
0
    zmq_assert (false);
747
0
    return -1;
748
180
}
749
750
int zmq::socket_base_t::connect (const char *endpoint_uri_)
751
1.65k
{
752
1.65k
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
753
1.65k
    return connect_internal (endpoint_uri_);
754
1.65k
}
755
756
int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
757
1.65k
{
758
1.65k
    if (unlikely (_ctx_terminated)) {
759
0
        errno = ETERM;
760
0
        return -1;
761
0
    }
762
763
    //  Process pending commands, if any.
764
1.65k
    int rc = process_commands (0, false);
765
1.65k
    if (unlikely (rc != 0)) {
766
0
        return -1;
767
0
    }
768
769
    //  Parse endpoint_uri_ string.
770
1.65k
    std::string protocol;
771
1.65k
    std::string address;
772
1.65k
    if (parse_uri (endpoint_uri_, protocol, address)
773
1.65k
        || check_protocol (protocol)) {
774
400
        return -1;
775
400
    }
776
777
1.25k
    if (protocol == protocol_name::inproc) {
778
        //  TODO: inproc connect is specific with respect to creating pipes
779
        //  as there's no 'reconnect' functionality implemented. Once that
780
        //  is in place we should follow generic pipe creation algorithm.
781
782
        //  Find the peer endpoint.
783
58
        const endpoint_t peer = find_endpoint (endpoint_uri_);
784
785
        // The total HWM for an inproc connection should be the sum of
786
        // the binder's HWM and the connector's HWM.
787
58
        const int sndhwm = peer.socket == NULL ? options.sndhwm
788
58
                           : options.sndhwm != 0 && peer.options.rcvhwm != 0
789
0
                             ? options.sndhwm + peer.options.rcvhwm
790
0
                             : 0;
791
58
        const int rcvhwm = peer.socket == NULL ? options.rcvhwm
792
58
                           : options.rcvhwm != 0 && peer.options.sndhwm != 0
793
0
                             ? options.rcvhwm + peer.options.sndhwm
794
0
                             : 0;
795
796
        //  Create a bi-directional pipe to connect the peers.
797
58
        object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
798
58
        pipe_t *new_pipes[2] = {NULL, NULL};
799
800
58
        const bool conflate = get_effective_conflate_option (options);
801
802
58
        int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
803
58
        bool conflates[2] = {conflate, conflate};
804
58
        rc = pipepair (parents, new_pipes, hwms, conflates);
805
58
        if (!conflate) {
806
58
            new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
807
58
                                          peer.options.rcvhwm);
808
58
            new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
809
58
        }
810
811
58
        errno_assert (rc == 0);
812
813
58
        if (!peer.socket) {
814
            //  The peer doesn't exist yet so we don't know whether
815
            //  to send the routing id message or not. To resolve this,
816
            //  we always send our routing id and drop it later if
817
            //  the peer doesn't expect it.
818
58
            send_routing_id (new_pipes[0], options);
819
820
58
#ifdef ZMQ_BUILD_DRAFT_API
821
            //  If set, send the hello msg of the local socket to the peer.
822
58
            if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
823
0
                send_hello_msg (new_pipes[0], options);
824
0
            }
825
58
#endif
826
827
58
            const endpoint_t endpoint = {this, options};
828
58
            pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
829
58
        } else {
830
            //  If required, send the routing id of the local socket to the peer.
831
0
            if (peer.options.recv_routing_id) {
832
0
                send_routing_id (new_pipes[0], options);
833
0
            }
834
835
            //  If required, send the routing id of the peer to the local socket.
836
0
            if (options.recv_routing_id) {
837
0
                send_routing_id (new_pipes[1], peer.options);
838
0
            }
839
840
0
#ifdef ZMQ_BUILD_DRAFT_API
841
            //  If set, send the hello msg of the local socket to the peer.
842
0
            if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
843
0
                send_hello_msg (new_pipes[0], options);
844
0
            }
845
846
            //  If set, send the hello msg of the peer to the local socket.
847
0
            if (peer.options.can_send_hello_msg
848
0
                && peer.options.hello_msg.size () > 0) {
849
0
                send_hello_msg (new_pipes[1], peer.options);
850
0
            }
851
852
0
            if (peer.options.can_recv_disconnect_msg
853
0
                && peer.options.disconnect_msg.size () > 0)
854
0
                new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg);
855
0
#endif
856
857
            //  Attach remote end of the pipe to the peer socket. Note that peer's
858
            //  seqnum was incremented in find_endpoint function. We don't need it
859
            //  increased here.
860
0
            send_bind (peer.socket, new_pipes[1], false);
861
0
        }
862
863
        //  Attach local end of the pipe to this socket object.
864
58
        attach_pipe (new_pipes[0], false, true);
865
866
        // Save last endpoint URI
867
58
        _last_endpoint.assign (endpoint_uri_);
868
869
        // remember inproc connections for disconnect
870
58
        _inprocs.emplace (endpoint_uri_, new_pipes[0]);
871
872
58
        options.connected = true;
873
58
        return 0;
874
58
    }
875
1.19k
    const bool is_single_connect =
876
1.19k
      (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
877
1.19k
       || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
878
1.19k
    if (unlikely (is_single_connect)) {
879
1.19k
        if (0 != _endpoints.count (endpoint_uri_)) {
880
            // There is no valid use for multiple connects for SUB-PUB nor
881
            // DEALER-ROUTER nor REQ-REP. Multiple connects produces
882
            // nonsensical results.
883
0
            return 0;
884
0
        }
885
1.19k
    }
886
887
    //  Choose the I/O thread to run the session in.
888
1.19k
    io_thread_t *io_thread = choose_io_thread (options.affinity);
889
1.19k
    if (!io_thread) {
890
0
        errno = EMTHREAD;
891
0
        return -1;
892
0
    }
893
894
1.19k
    address_t *paddr =
895
1.19k
      new (std::nothrow) address_t (protocol, address, this->get_ctx ());
896
1.19k
    alloc_assert (paddr);
897
898
    //  Resolve address (if needed by the protocol)
899
1.19k
    if (protocol == protocol_name::tcp) {
900
        //  Do some basic sanity checks on tcp:// address syntax
901
        //  - hostname starts with digit or letter, with embedded '-' or '.'
902
        //  - IPv6 address may contain hex chars and colons.
903
        //  - IPv6 link local address may contain % followed by interface name / zone_id
904
        //    (Reference: https://tools.ietf.org/html/rfc4007)
905
        //  - IPv4 address may contain decimal digits and dots.
906
        //  - Address must end in ":port" where port is *, or numeric
907
        //  - Address may contain two parts separated by ':'
908
        //  Following code is quick and dirty check to catch obvious errors,
909
        //  without trying to be fully accurate.
910
154
        const char *check = address.c_str ();
911
154
        if (isalnum (*check) || isxdigit (*check) || *check == '['
912
154
            || *check == ':') {
913
142
            check++;
914
6.59k
            while (isalnum (*check) || isxdigit (*check) || *check == '.'
915
6.59k
                   || *check == '-' || *check == ':' || *check == '%'
916
6.59k
                   || *check == ';' || *check == '[' || *check == ']'
917
6.59k
                   || *check == '_' || *check == '*') {
918
6.44k
                check++;
919
6.44k
            }
920
142
        }
921
        //  Assume the worst, now look for success
922
154
        rc = -1;
923
        //  Did we reach the end of the address safely?
924
154
        if (*check == 0) {
925
            //  Do we have a valid port string? (cannot be '*' in connect
926
120
            check = strrchr (address.c_str (), ':');
927
120
            if (check) {
928
55
                check++;
929
55
                if (*check && (isdigit (*check)))
930
37
                    rc = 0; //  Valid
931
55
            }
932
120
        }
933
154
        if (rc == -1) {
934
117
            errno = EINVAL;
935
117
            LIBZMQ_DELETE (paddr);
936
117
            return -1;
937
117
        }
938
        //  Defer resolution until a socket is opened
939
37
        paddr->resolved.tcp_addr = NULL;
940
37
    }
941
1.03k
#ifdef ZMQ_HAVE_WS
942
#ifdef ZMQ_HAVE_WSS
943
    else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
944
        if (protocol == protocol_name::wss) {
945
            paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
946
            alloc_assert (paddr->resolved.wss_addr);
947
            rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
948
                                                    options.ipv6);
949
        } else
950
#else
951
1.03k
    else if (protocol == protocol_name::ws) {
952
803
#endif
953
803
        {
954
803
            paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
955
803
            alloc_assert (paddr->resolved.ws_addr);
956
803
            rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
957
803
                                                   options.ipv6);
958
803
        }
959
960
803
        if (rc != 0) {
961
403
            LIBZMQ_DELETE (paddr);
962
403
            return -1;
963
403
        }
964
803
    }
965
235
#endif
966
967
235
#if defined ZMQ_HAVE_IPC
968
235
    else if (protocol == protocol_name::ipc) {
969
50
        paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
970
50
        alloc_assert (paddr->resolved.ipc_addr);
971
50
        int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
972
50
        if (rc != 0) {
973
15
            LIBZMQ_DELETE (paddr);
974
15
            return -1;
975
15
        }
976
50
    }
977
657
#endif
978
979
657
    if (protocol == protocol_name::udp) {
980
0
        if (options.type != ZMQ_RADIO) {
981
0
            errno = ENOCOMPATPROTO;
982
0
            LIBZMQ_DELETE (paddr);
983
0
            return -1;
984
0
        }
985
986
0
        paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
987
0
        alloc_assert (paddr->resolved.udp_addr);
988
0
        rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
989
0
                                                options.ipv6);
990
0
        if (rc != 0) {
991
0
            LIBZMQ_DELETE (paddr);
992
0
            return -1;
993
0
        }
994
0
    }
995
996
    // TBD - Should we check address for ZMQ_HAVE_NORM???
997
998
#ifdef ZMQ_HAVE_OPENPGM
999
    if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
1000
        struct pgm_addrinfo_t *res = NULL;
1001
        uint16_t port_number = 0;
1002
        int rc =
1003
          pgm_socket_t::init_address (address.c_str (), &res, &port_number);
1004
        if (res != NULL)
1005
            pgm_freeaddrinfo (res);
1006
        if (rc != 0 || port_number == 0) {
1007
            return -1;
1008
        }
1009
    }
1010
#endif
1011
657
#if defined ZMQ_HAVE_TIPC
1012
657
    else if (protocol == protocol_name::tipc) {
1013
185
        paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
1014
185
        alloc_assert (paddr->resolved.tipc_addr);
1015
185
        int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
1016
185
        if (rc != 0) {
1017
71
            LIBZMQ_DELETE (paddr);
1018
71
            return -1;
1019
71
        }
1020
114
        const sockaddr_tipc *const saddr =
1021
114
          reinterpret_cast<const sockaddr_tipc *> (
1022
114
            paddr->resolved.tipc_addr->addr ());
1023
        // Cannot connect to random Port Identity
1024
114
        if (saddr->addrtype == TIPC_ADDR_ID
1025
114
            && paddr->resolved.tipc_addr->is_random ()) {
1026
1
            LIBZMQ_DELETE (paddr);
1027
1
            errno = EINVAL;
1028
1
            return -1;
1029
1
        }
1030
114
    }
1031
585
#endif
1032
#if defined ZMQ_HAVE_VMCI
1033
    else if (protocol == protocol_name::vmci) {
1034
        paddr->resolved.vmci_addr =
1035
          new (std::nothrow) vmci_address_t (this->get_ctx ());
1036
        alloc_assert (paddr->resolved.vmci_addr);
1037
        int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
1038
        if (rc != 0) {
1039
            LIBZMQ_DELETE (paddr);
1040
            return -1;
1041
        }
1042
    }
1043
#endif
1044
1045
    //  Create session.
1046
585
    session_base_t *session =
1047
585
      session_base_t::create (io_thread, true, this, options, paddr);
1048
585
    errno_assert (session);
1049
1050
    //  PGM does not support subscription forwarding; ask for all data to be
1051
    //  sent to this pipe. (same for NORM, currently?)
1052
#if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
1053
    const bool subscribe_to_all =
1054
      protocol == protocol_name::pgm || protocol == protocol_name::epgm
1055
      || protocol == protocol_name::norm || protocol == protocol_name::udp;
1056
#elif defined ZMQ_HAVE_OPENPGM
1057
    const bool subscribe_to_all = protocol == protocol_name::pgm
1058
                                  || protocol == protocol_name::epgm
1059
                                  || protocol == protocol_name::udp;
1060
#elif defined ZMQ_HAVE_NORM
1061
    const bool subscribe_to_all =
1062
      protocol == protocol_name::norm || protocol == protocol_name::udp;
1063
#else
1064
585
    const bool subscribe_to_all = protocol == protocol_name::udp;
1065
585
#endif
1066
585
    pipe_t *newpipe = NULL;
1067
1068
585
    if (options.immediate != 1 || subscribe_to_all) {
1069
        //  Create a bi-directional pipe.
1070
585
        object_t *parents[2] = {this, session};
1071
585
        pipe_t *new_pipes[2] = {NULL, NULL};
1072
1073
585
        const bool conflate = get_effective_conflate_option (options);
1074
1075
585
        int hwms[2] = {conflate ? -1 : options.sndhwm,
1076
585
                       conflate ? -1 : options.rcvhwm};
1077
585
        bool conflates[2] = {conflate, conflate};
1078
585
        rc = pipepair (parents, new_pipes, hwms, conflates);
1079
585
        errno_assert (rc == 0);
1080
1081
        //  Attach local end of the pipe to the socket object.
1082
585
        attach_pipe (new_pipes[0], subscribe_to_all, true);
1083
585
        newpipe = new_pipes[0];
1084
1085
        //  Attach remote end of the pipe to the session object later on.
1086
585
        session->attach_pipe (new_pipes[1]);
1087
585
    }
1088
1089
    //  Save last endpoint URI
1090
585
    paddr->to_string (_last_endpoint);
1091
1092
585
    add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
1093
585
                  static_cast<own_t *> (session), newpipe);
1094
585
    return 0;
1095
657
}
1096
1097
std::string
1098
zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_pair_,
1099
                                      const char *tcp_address_)
1100
0
{
1101
    // The resolved last_endpoint is used as a key in the endpoints map.
1102
    // The address passed by the user might not match in the TCP case due to
1103
    // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
1104
    // resolve before giving up. Given at this stage we don't know whether a
1105
    // socket is connected or bound, try with both.
1106
0
    if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1107
0
        tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
1108
0
        alloc_assert (tcp_addr);
1109
0
        int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
1110
1111
0
        if (rc == 0) {
1112
0
            tcp_addr->to_string (endpoint_uri_pair_);
1113
0
            if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1114
0
                rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
1115
0
                if (rc == 0) {
1116
0
                    tcp_addr->to_string (endpoint_uri_pair_);
1117
0
                }
1118
0
            }
1119
0
        }
1120
0
        LIBZMQ_DELETE (tcp_addr);
1121
0
    }
1122
0
    return endpoint_uri_pair_;
1123
0
}
1124
1125
void zmq::socket_base_t::add_endpoint (
1126
  const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
1127
716
{
1128
    //  Activate the session. Make it a child of this socket.
1129
716
    launch_child (endpoint_);
1130
716
    _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
1131
716
                                          endpoint_pipe_t (endpoint_, pipe_));
1132
1133
716
    if (pipe_ != NULL)
1134
585
        pipe_->set_endpoint_pair (endpoint_pair_);
1135
716
}
1136
1137
//  Disconnects/unbinds the given endpoint (implementation behind
//  zmq_disconnect / zmq_unbind). Returns 0 on success, or -1 with errno set
//  to ETERM (context shut down), EINVAL (bad URI), or ENOENT (endpoint not
//  known to this socket).
int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
{
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    //  Check whether the context hasn't been shut down yet.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Check whether endpoint address passed to the function is valid.
    if (unlikely (!endpoint_uri_)) {
        errno = EINVAL;
        return -1;
    }

    //  Process pending commands, if any, since there could be pending unprocessed process_own()'s
    //  (from launch_child() for example) we're asked to terminate now.
    const int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        return -1;
    }

    //  Parse endpoint_uri_ string.
    std::string uri_protocol;
    std::string uri_path;
    if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
        || check_protocol (uri_protocol)) {
        return -1;
    }

    const std::string endpoint_uri_str = std::string (endpoint_uri_);

    // Disconnect an inproc socket
    if (uri_protocol == protocol_name::inproc) {
        //  An inproc endpoint is either still pending in the context's
        //  registry (bound side) or recorded in _inprocs (connected side).
        return unregister_endpoint (endpoint_uri_str, this) == 0
                 ? 0
                 : _inprocs.erase_pipes (endpoint_uri_str);
    }

    //  TCP endpoints are stored under their resolved form; normalize the
    //  user-supplied URI before looking it up (see resolve_tcp_addr).
    const std::string resolved_endpoint_uri =
      uri_protocol == protocol_name::tcp
        ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
        : endpoint_uri_str;

    //  Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string.
    const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
      _endpoints.equal_range (resolved_endpoint_uri);
    if (range.first == range.second) {
        errno = ENOENT;
        return -1;
    }

    for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
        //  If we have an associated pipe, terminate it.
        if (it->second.second != NULL)
            it->second.second->terminate (false);
        //  Ask the owned session/listener to shut down as well.
        term_child (it->second.first);
    }
    _endpoints.erase (range.first, range.second);

    //  ZMQ_RECONNECT_STOP_AFTER_DISCONNECT: remember the explicit disconnect
    //  so sessions do not try to reconnect afterwards.
    if (options.reconnect_stop & ZMQ_RECONNECT_STOP_AFTER_DISCONNECT) {
        _disconnected = true;
    }

    return 0;
}
1204
1205
//  Sends a message on the socket (implementation behind zmq_msg_send).
//  flags_ may contain ZMQ_SNDMORE and/or ZMQ_DONTWAIT. Returns 0 on success
//  (the message is owned by the socket afterwards), or -1 with errno set
//  (ETERM, EFAULT, EAGAIN, EINTR, ENOTSUP, ...).
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    //  Check whether the context hasn't been shut down yet.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Check whether message passed to the function is valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Process pending commands, if any. Throttled via the TSC so the
    //  mailbox is not polled on every single send.
    int rc = process_commands (0, true);
    if (unlikely (rc != 0)) {
        return -1;
    }

    //  Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

    //  At this point we impose the flags on the message.
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  Try to send the message using method in each socket class
    rc = xsend (msg_);
    if (rc == 0) {
        return 0;
    }
    //  Special case for ZMQ_PUSH: -2 means pipe is dead while a
    //  multi-part send is in progress and can't be recovered, so drop
    //  silently when in blocking mode to keep backward compatibility.
    if (unlikely (rc == -2)) {
        if (!((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)) {
            //  Release the caller's message and hand back a fresh empty one,
            //  reporting success as if the message had been sent.
            rc = msg_->close ();
            errno_assert (rc == 0);
            rc = msg_->init ();
            errno_assert (rc == 0);
            return 0;
        }
    }
    if (unlikely (errno != EAGAIN)) {
        return -1;
    }

    //  In case of non-blocking send we'll simply propagate
    //  the error - including EAGAIN - up the stack.
    if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
        return -1;
    }

    //  Compute the time when the timeout should occur.
    //  If the timeout is infinite, don't care.
    int timeout = options.sndtimeo;
    const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);

    //  Oops, we couldn't send the message. Wait for the next
    //  command, process it and try to send the message again.
    //  If timeout is reached in the meantime, return EAGAIN.
    while (true) {
        if (unlikely (process_commands (timeout, false) != 0)) {
            return -1;
        }
        rc = xsend (msg_);
        if (rc == 0)
            break;
        if (unlikely (errno != EAGAIN)) {
            return -1;
        }
        //  timeout < 0 means wait forever; otherwise shrink the remaining
        //  budget and bail out with EAGAIN once it is exhausted.
        if (timeout > 0) {
            timeout = static_cast<int> (end - _clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }

    return 0;
}
1292
1293
//  Receives a message from the socket (implementation behind zmq_msg_recv).
//  flags_ may contain ZMQ_DONTWAIT. Returns 0 on success with the message
//  stored in *msg_, or -1 with errno set (ETERM, EFAULT, EAGAIN, EINTR, ...).
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

    //  Check whether the context hasn't been shut down yet.
    if (unlikely (_ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Check whether message passed to the function is valid.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Once every inbound_poll_rate messages check for signals and process
    //  incoming commands. This happens only if we are not polling altogether
    //  because there are messages available all the time. If poll occurs,
    //  ticks is set to zero and thus we avoid this code.
    //
    //  Note that 'recv' uses different command throttling algorithm (the one
    //  described above) from the one used by 'send'. This is because counting
    //  ticks is more efficient than doing RDTSC all the time.
    if (++_ticks == inbound_poll_rate) {
        if (unlikely (process_commands (0, false) != 0)) {
            return -1;
        }
        _ticks = 0;
    }

    //  Get the message.
    int rc = xrecv (msg_);
    if (unlikely (rc != 0 && errno != EAGAIN)) {
        return -1;
    }

    //  If we have the message, return immediately.
    if (rc == 0) {
        extract_flags (msg_);
        return 0;
    }

    //  If the message cannot be fetched immediately, there are two scenarios.
    //  For non-blocking recv, commands are processed in case there's an
    //  activate_reader command already waiting in a command pipe.
    //  If it's not, return EAGAIN.
    if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
        if (unlikely (process_commands (0, false) != 0)) {
            return -1;
        }
        _ticks = 0;

        rc = xrecv (msg_);
        if (rc < 0) {
            return rc;
        }
        extract_flags (msg_);

        return 0;
    }

    //  Compute the time when the timeout should occur.
    //  If the timeout is infinite, don't care.
    int timeout = options.rcvtimeo;
    const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);

    //  In blocking scenario, commands are processed over and over again until
    //  we are able to fetch a message.
    //  The first iteration only blocks if the tick counter shows commands
    //  may already be pending; afterwards we always block on the mailbox.
    bool block = (_ticks != 0);
    while (true) {
        if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
            return -1;
        }
        rc = xrecv (msg_);
        if (rc == 0) {
            _ticks = 0;
            break;
        }
        if (unlikely (errno != EAGAIN)) {
            return -1;
        }
        block = true;
        //  timeout < 0 means wait forever; otherwise shrink the remaining
        //  budget and bail out with EAGAIN once it is exhausted.
        if (timeout > 0) {
            timeout = static_cast<int> (end - _clock.now_ms ());
            if (timeout <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }

    extract_flags (msg_);
    return 0;
}
1388
1389
int zmq::socket_base_t::close ()
1390
4.21k
{
1391
4.21k
    scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1392
1393
    //  Remove all existing signalers for thread safe sockets
1394
4.21k
    if (_thread_safe)
1395
0
        (static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();
1396
1397
    //  Mark the socket as dead
1398
4.21k
    _tag = 0xdeadbeef;
1399
1400
1401
    //  Transfer the ownership of the socket from this application thread
1402
    //  to the reaper thread which will take care of the rest of shutdown
1403
    //  process.
1404
4.21k
    send_reap (this);
1405
1406
4.21k
    return 0;
1407
4.21k
}
1408
1409
//  True when at least one message can be read without blocking; delegates
//  to the socket-type-specific check.
bool zmq::socket_base_t::has_in ()
{
    return xhas_in ();
}
1413
1414
//  True when at least one message can be written without blocking; delegates
//  to the socket-type-specific check.
bool zmq::socket_base_t::has_out ()
{
    return xhas_out ();
}
1418
1419
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
    //  Plug the socket into the reaper thread's poller.
    _poller = poller_;

    //  Pick the file descriptor the reaper should watch for commands.
    fd_t fd;

    if (_thread_safe) {
        scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);

        //  Thread safe sockets have no mailbox fd; create a dedicated
        //  signaler for the reaper instead.
        _reaper_signaler = new (std::nothrow) signaler_t ();
        zmq_assert (_reaper_signaler);

        //  Add signaler to the safe mailbox
        fd = _reaper_signaler->get_fd ();
        (static_cast<mailbox_safe_t *> (_mailbox))
          ->add_signaler (_reaper_signaler);

        //  Send a signal to make sure reaper handle existing commands
        _reaper_signaler->send ();
    } else
        fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();

    _handle = _poller->add_fd (fd, this);
    _poller->set_pollin (_handle);

    //  Initialise the termination and check whether it can be deallocated
    //  immediately.
    terminate ();
    check_destroy ();
}
1451
1452
//  Drains the socket's mailbox and dispatches every pending command.
//  timeout_ == 0 means "don't wait" (optionally throttled via the TSC when
//  throttle_ is set); a positive timeout_ blocks on the first recv only.
//  Returns 0 on success, or -1 with errno set to EINTR (interrupted) or
//  ETERM (context terminated while processing).
int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
    if (timeout_ == 0) {
        //  If we are asked not to wait, check whether we haven't processed
        //  commands recently, so that we can throttle the new commands.

        //  Get the CPU's tick counter. If 0, the counter is not available.
        const uint64_t tsc = zmq::clock_t::rdtsc ();

        //  Optimised version of command processing - it doesn't have to check
        //  for incoming commands each time. It does so only if certain time
        //  elapsed since last command processing. Command delay varies
        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
        //  etc. The optimisation makes sense only on platforms where getting
        //  a timestamp is a very cheap operation (tens of nanoseconds).
        if (tsc && throttle_) {
            //  Check whether TSC haven't jumped backwards (in case of migration
            //  between CPU cores) and whether certain time have elapsed since
            //  last command processing. If it didn't do nothing.
            if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
                return 0;
            _last_tsc = tsc;
        }
    }

    //  Check whether there are any commands pending for this thread.
    command_t cmd;
    int rc = _mailbox->recv (&cmd, timeout_);

    //  An interrupted (possibly blocking) first recv is reported to the
    //  caller; EINTR inside the drain loop below is retried instead.
    if (rc != 0 && errno == EINTR)
        return -1;

    //  Process all available commands.
    while (rc == 0 || errno == EINTR) {
        if (rc == 0) {
            cmd.destination->process_command (cmd);
        }
        rc = _mailbox->recv (&cmd, 0);
    }

    //  The loop above only exits on a non-EINTR failure, which must be the
    //  mailbox running empty.
    zmq_assert (errno == EAGAIN);

    //  A processed command may have been process_stop; report termination.
    if (_ctx_terminated) {
        errno = ETERM;
        return -1;
    }

    return 0;
}
1501
1502
void zmq::socket_base_t::process_stop ()
1503
832
{
1504
    //  Here, someone have called zmq_ctx_term while the socket was still alive.
1505
    //  We'll remember the fact so that any blocking call is interrupted and any
1506
    //  further attempt to use the socket will return ETERM. The user is still
1507
    //  responsible for calling zmq_close on the socket though!
1508
832
    scoped_lock_t lock (_monitor_sync);
1509
832
    stop_monitor ();
1510
1511
832
    _ctx_terminated = true;
1512
832
}
1513
1514
//  Handles a 'bind' command: attach the pipe created by a connecting peer.
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
    attach_pipe (pipe_);
}
1518
1519
void zmq::socket_base_t::process_term (int linger_)
1520
4.21k
{
1521
    //  Unregister all inproc endpoints associated with this socket.
1522
    //  Doing this we make sure that no new pipes from other sockets (inproc)
1523
    //  will be initiated.
1524
4.21k
    unregister_endpoints (this);
1525
1526
    //  Ask all attached pipes to terminate.
1527
4.91k
    for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1528
        //  Only inprocs might have a disconnect message set
1529
701
        _pipes[i]->send_disconnect_msg ();
1530
701
        _pipes[i]->terminate (false);
1531
701
    }
1532
4.21k
    register_term_acks (static_cast<int> (_pipes.size ()));
1533
1534
    //  Continue the termination process immediately.
1535
4.21k
    own_t::process_term (linger_);
1536
4.21k
}
1537
1538
//  Handles a 'term_endpoint' command sent from another thread: perform the
//  disconnect/unbind, then free the heap-allocated URI the sender attached.
void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
{
    term_endpoint (endpoint_->c_str ());
    delete endpoint_;
}
1543
1544
void zmq::socket_base_t::process_pipe_stats_publish (
1545
  uint64_t outbound_queue_count_,
1546
  uint64_t inbound_queue_count_,
1547
  endpoint_uri_pair_t *endpoint_pair_)
1548
0
{
1549
0
    uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
1550
0
    event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
1551
0
    delete endpoint_pair_;
1552
0
}
1553
1554
/*
1555
 * There are 2 pipes per connection, and the inbound one _must_ be queried from
1556
 * the I/O thread. So ask the outbound pipe, in the application thread, to send
1557
 * a message (pipe_peer_stats) to its peer. The message will carry the outbound
1558
 * pipe stats and endpoint, and the reference to the socket object.
1559
 * The inbound pipe on the I/O thread will then add its own stats and endpoint,
1560
 * and write back a message to the socket object (pipe_stats_publish) which
1561
 * will raise an event with the data.
1562
 */
1563
int zmq::socket_base_t::query_pipes_stats ()
1564
0
{
1565
0
    {
1566
0
        scoped_lock_t lock (_monitor_sync);
1567
0
        if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
1568
0
            errno = EINVAL;
1569
0
            return -1;
1570
0
        }
1571
0
    }
1572
0
    if (_pipes.size () == 0) {
1573
0
        errno = EAGAIN;
1574
0
        return -1;
1575
0
    }
1576
0
    for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1577
0
        _pipes[i]->send_stats_to_peer (this);
1578
0
    }
1579
1580
0
    return 0;
1581
0
}
1582
1583
void zmq::socket_base_t::update_pipe_options (int option_)
1584
101k
{
1585
101k
    if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
1586
1.84k
        for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
1587
0
            _pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
1588
0
            _pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
1589
0
        }
1590
1.84k
    }
1591
101k
}
1592
1593
//  Handles the 'destroy' command: mark the socket so that the next
//  check_destroy () call performs the final deallocation.
void zmq::socket_base_t::process_destroy ()
{
    _destroyed = true;
}
1597
1598
//  Default implementation: the socket type defines no custom options, so
//  any option falling through to here is invalid.
int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
{
    errno = EINVAL;
    return -1;
}
1603
1604
//  Default implementation: the socket type exposes no custom options, so
//  any option falling through to here is invalid.
int zmq::socket_base_t::xgetsockopt (int, void *, size_t *)
{
    errno = EINVAL;
    return -1;
}
1609
1610
//  Default implementation: the socket type cannot send, so it is never
//  writable.
bool zmq::socket_base_t::xhas_out ()
{
    return false;
}
1614
1615
//  Default implementation: sending is not supported by this socket type.
int zmq::socket_base_t::xsend (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}
1620
1621
//  Default implementation: the socket type cannot receive, so it is never
//  readable.
bool zmq::socket_base_t::xhas_in ()
{
    return false;
}
1625
1626
//  Default implementation: group join (zmq_join) is not supported by this
//  socket type.
int zmq::socket_base_t::xjoin (const char *group_)
{
    LIBZMQ_UNUSED (group_);
    errno = ENOTSUP;
    return -1;
}
1632
1633
//  Default implementation: group leave (zmq_leave) is not supported by this
//  socket type.
int zmq::socket_base_t::xleave (const char *group_)
{
    LIBZMQ_UNUSED (group_);
    errno = ENOTSUP;
    return -1;
}
1639
1640
//  Default implementation: receiving is not supported by this socket type.
int zmq::socket_base_t::xrecv (msg_t *)
{
    errno = ENOTSUP;
    return -1;
}
1645
1646
//  Default implementation: socket types that attach pipes must override
//  this; reaching it indicates a logic error.
void zmq::socket_base_t::xread_activated (pipe_t *)
{
    zmq_assert (false);
}
1650
//  Default implementation: socket types that attach pipes must override
//  this; reaching it indicates a logic error.
void zmq::socket_base_t::xwrite_activated (pipe_t *)
{
    zmq_assert (false);
}
1654
1655
//  Default implementation: socket types that can hiccup must override this;
//  reaching it indicates a logic error.
void zmq::socket_base_t::xhiccuped (pipe_t *)
{
    zmq_assert (false);
}
1659
1660
void zmq::socket_base_t::in_event ()
1661
2.21k
{
1662
    //  This function is invoked only once the socket is running in the context
1663
    //  of the reaper thread. Process any commands from other threads/sockets
1664
    //  that may be available at the moment. Ultimately, the socket will
1665
    //  be destroyed.
1666
2.21k
    {
1667
2.21k
        scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1668
1669
        //  If the socket is thread safe we need to unsignal the reaper signaler
1670
2.21k
        if (_thread_safe)
1671
0
            _reaper_signaler->recv ();
1672
1673
2.21k
        process_commands (0, false);
1674
2.21k
    }
1675
2.21k
    check_destroy ();
1676
2.21k
}
1677
1678
//  The reaper poller only watches for input; an out event must never fire.
void zmq::socket_base_t::out_event ()
{
    zmq_assert (false);
}
1682
1683
//  No timers are registered for sockets; a timer event must never fire.
void zmq::socket_base_t::timer_event (int)
{
    zmq_assert (false);
}
1687
1688
void zmq::socket_base_t::check_destroy ()
1689
6.43k
{
1690
    //  If the object was already marked as destroyed, finish the deallocation.
1691
6.43k
    if (_destroyed) {
1692
        //  Remove the socket from the reaper's poller.
1693
4.21k
        _poller->rm_fd (_handle);
1694
1695
        //  Remove the socket from the context.
1696
4.21k
        destroy_socket (this);
1697
1698
        //  Notify the reaper about the fact.
1699
4.21k
        send_reaped ();
1700
1701
        //  Deallocate.
1702
4.21k
        own_t::process_destroy ();
1703
4.21k
    }
1704
6.43k
}
1705
1706
//  Pipe callback: a pipe became readable; forward to the socket type.
void zmq::socket_base_t::read_activated (pipe_t *pipe_)
{
    xread_activated (pipe_);
}
1710
1711
//  Pipe callback: a pipe became writable again; forward to the socket type.
void zmq::socket_base_t::write_activated (pipe_t *pipe_)
{
    xwrite_activated (pipe_);
}
1715
1716
//  Pipe callback: the peer end of the pipe hiccuped. With ZMQ_IMMEDIATE
//  set (options.immediate == 1) the pipe is dropped outright; otherwise
//  the concrete socket type gets a chance to react.
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
    if (options.immediate != 1)
        // Notify derived sockets of the hiccup
        xhiccuped (pipe_);
    else
        pipe_->terminate (false);
}
1724
1725
//  Pipe callback: the pipe has fully terminated. Unhook it from every
//  data structure that still references it and, when the socket itself
//  is shutting down, account for the completed termination.
void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
{
    //  Notify the specific socket type about the pipe termination.
    xpipe_terminated (pipe_);

    // Remove pipe from inproc pipes
    _inprocs.erase_pipe (pipe_);

    //  Remove the pipe from the list of attached pipes and confirm its
    //  termination if we are already shutting down.
    _pipes.erase (pipe_);

    // Remove the pipe from _endpoints (set it to NULL).
    const std::string &endpoint_id = pipe_->get_endpoint_pair ().identifier ();
    if (!endpoint_id.empty ()) {
        const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
          _endpoints.equal_range (endpoint_id);

        for (endpoints_t::iterator entry = range.first; entry != range.second;
             ++entry) {
            if (entry->second.second == pipe_) {
                entry->second.second = NULL;
                break;
            }
        }
    }

    if (is_terminating ())
        unregister_term_ack ();
}
1754
1755
void zmq::socket_base_t::extract_flags (const msg_t *msg_)
1756
0
{
1757
    //  Test whether routing_id flag is valid for this socket type.
1758
0
    if (unlikely (msg_->flags () & msg_t::routing_id))
1759
0
        zmq_assert (options.recv_routing_id);
1760
1761
    //  Remove MORE flag.
1762
0
    _rcvmore = (msg_->flags () & msg_t::more) != 0;
1763
0
}
1764
1765
int zmq::socket_base_t::monitor (const char *endpoint_,
1766
                                 uint64_t events_,
1767
                                 int event_version_,
1768
                                 int type_)
1769
0
{
1770
0
    scoped_lock_t lock (_monitor_sync);
1771
1772
0
    if (unlikely (_ctx_terminated)) {
1773
0
        errno = ETERM;
1774
0
        return -1;
1775
0
    }
1776
1777
    //  Event version 1 supports only first 16 events.
1778
0
    if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
1779
0
        errno = EINVAL;
1780
0
        return -1;
1781
0
    }
1782
1783
    //  Support deregistering monitoring endpoints as well
1784
0
    if (endpoint_ == NULL) {
1785
0
        stop_monitor ();
1786
0
        return 0;
1787
0
    }
1788
    //  Parse endpoint_uri_ string.
1789
0
    std::string protocol;
1790
0
    std::string address;
1791
0
    if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
1792
0
        return -1;
1793
1794
    //  Event notification only supported over inproc://
1795
0
    if (protocol != protocol_name::inproc) {
1796
0
        errno = EPROTONOSUPPORT;
1797
0
        return -1;
1798
0
    }
1799
1800
    // already monitoring. Stop previous monitor before starting new one.
1801
0
    if (_monitor_socket != NULL) {
1802
0
        stop_monitor (true);
1803
0
    }
1804
1805
    // Check if the specified socket type is supported. It must be a
1806
    // one-way socket types that support the SNDMORE flag.
1807
0
    switch (type_) {
1808
0
        case ZMQ_PAIR:
1809
0
            break;
1810
0
        case ZMQ_PUB:
1811
0
            break;
1812
0
        case ZMQ_PUSH:
1813
0
            break;
1814
0
        default:
1815
0
            errno = EINVAL;
1816
0
            return -1;
1817
0
    }
1818
1819
    //  Register events to monitor
1820
0
    _monitor_events = events_;
1821
0
    options.monitor_event_version = event_version_;
1822
    //  Create a monitor socket of the specified type.
1823
0
    _monitor_socket = zmq_socket (get_ctx (), type_);
1824
0
    if (_monitor_socket == NULL)
1825
0
        return -1;
1826
1827
    //  Never block context termination on pending event messages
1828
0
    int linger = 0;
1829
0
    int rc =
1830
0
      zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1831
0
    if (rc == -1)
1832
0
        stop_monitor (false);
1833
1834
    //  Spawn the monitor socket endpoint
1835
0
    rc = zmq_bind (_monitor_socket, endpoint_);
1836
0
    if (rc == -1)
1837
0
        stop_monitor (false);
1838
0
    return rc;
1839
0
}
1840
1841
void zmq::socket_base_t::event_connected (
1842
  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1843
0
{
1844
0
    uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1845
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
1846
0
}
1847
1848
void zmq::socket_base_t::event_connect_delayed (
1849
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1850
101
{
1851
101
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1852
101
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
1853
101
}
1854
1855
void zmq::socket_base_t::event_connect_retried (
1856
  const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
1857
528
{
1858
528
    uint64_t values[1] = {static_cast<uint64_t> (interval_)};
1859
528
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
1860
528
}
1861
1862
void zmq::socket_base_t::event_listening (
1863
  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1864
131
{
1865
131
    uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1866
131
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
1867
131
}
1868
1869
void zmq::socket_base_t::event_bind_failed (
1870
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1871
968
{
1872
968
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1873
968
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
1874
968
}
1875
1876
void zmq::socket_base_t::event_accepted (
1877
  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1878
0
{
1879
0
    uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1880
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
1881
0
}
1882
1883
void zmq::socket_base_t::event_accept_failed (
1884
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1885
0
{
1886
0
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1887
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
1888
0
}
1889
1890
void zmq::socket_base_t::event_closed (
1891
  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1892
340
{
1893
340
    uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1894
340
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
1895
340
}
1896
1897
void zmq::socket_base_t::event_close_failed (
1898
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1899
0
{
1900
0
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1901
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
1902
0
}
1903
1904
void zmq::socket_base_t::event_disconnected (
1905
  const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1906
0
{
1907
0
    uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1908
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
1909
0
}
1910
1911
void zmq::socket_base_t::event_handshake_failed_no_detail (
1912
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1913
0
{
1914
0
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1915
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
1916
0
}
1917
1918
void zmq::socket_base_t::event_handshake_failed_protocol (
1919
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1920
0
{
1921
0
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1922
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1923
0
}
1924
1925
void zmq::socket_base_t::event_handshake_failed_auth (
1926
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1927
0
{
1928
0
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1929
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1930
0
}
1931
1932
void zmq::socket_base_t::event_handshake_succeeded (
1933
  const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1934
0
{
1935
0
    uint64_t values[1] = {static_cast<uint64_t> (err_)};
1936
0
    event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
1937
0
}
1938
1939
void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
1940
                                uint64_t values_[],
1941
                                uint64_t values_count_,
1942
                                uint64_t type_)
1943
2.06k
{
1944
2.06k
    scoped_lock_t lock (_monitor_sync);
1945
2.06k
    if (_monitor_events & type_) {
1946
0
        monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
1947
0
    }
1948
2.06k
}
1949
1950
//  Send a monitor event
//  Serializes the event onto _monitor_socket using the wire format selected
//  by options.monitor_event_version:
//    v1: two frames — [16-bit event | 32-bit value] then [endpoint URI].
//    v2: event (64-bit), value count (64-bit), each value (64-bit),
//        local endpoint URI, remote endpoint URI — all as separate frames.
//  Frame order and sizes are the public monitor protocol; do not reorder.
void zmq::socket_base_t::monitor_event (
  uint64_t event_,
  const uint64_t values_[],
  uint64_t values_count_,
  const endpoint_uri_pair_t &endpoint_uri_pair_) const
{
    // this is a private method which is only called from
    // contexts where the _monitor_sync mutex has been locked before

    if (_monitor_socket) {
        zmq_msg_t msg;

        switch (options.monitor_event_version) {
            case 1: {
                //  The API should not allow to activate unsupported events
                zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
                //  v1 only allows one value
                zmq_assert (values_count_ == 1);
                zmq_assert (values_[0]
                            <= std::numeric_limits<uint32_t>::max ());

                //  Send event and value in first frame
                const uint16_t event = static_cast<uint16_t> (event_);
                const uint32_t value = static_cast<uint32_t> (values_[0]);
                zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
                uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
                //  Avoid dereferencing uint32_t on unaligned address
                memcpy (data + 0, &event, sizeof (event));
                memcpy (data + sizeof (event), &value, sizeof (value));
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                const std::string &endpoint_uri =
                  endpoint_uri_pair_.identifier ();

                //  Send address in second frame
                zmq_msg_init_size (&msg, endpoint_uri.size ());
                memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (),
                        endpoint_uri.size ());
                zmq_msg_send (&msg, _monitor_socket, 0);
            } break;
            case 2: {
                //  Send event in first frame (64bit unsigned)
                zmq_msg_init_size (&msg, sizeof (event_));
                memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                //  Send number of values that will follow in second frame
                zmq_msg_init_size (&msg, sizeof (values_count_));
                memcpy (zmq_msg_data (&msg), &values_count_,
                        sizeof (values_count_));
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                //  Send values in third-Nth frames (64bit unsigned)
                for (uint64_t i = 0; i < values_count_; ++i) {
                    zmq_msg_init_size (&msg, sizeof (values_[i]));
                    memcpy (zmq_msg_data (&msg), &values_[i],
                            sizeof (values_[i]));
                    zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
                }

                //  Send local endpoint URI in second-to-last frame (string)
                zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
                memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
                        endpoint_uri_pair_.local.size ());
                zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);

                //  Send remote endpoint URI in last frame (string)
                zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
                memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
                        endpoint_uri_pair_.remote.size ());
                zmq_msg_send (&msg, _monitor_socket, 0);
            } break;
        }
    }
}
2026
2027
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
2028
5.04k
{
2029
    // this is a private method which is only called from
2030
    // contexts where the _monitor_sync mutex has been locked before
2031
2032
5.04k
    if (_monitor_socket) {
2033
0
        if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
2034
0
            && send_monitor_stopped_event_) {
2035
0
            uint64_t values[1] = {0};
2036
0
            monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
2037
0
                           endpoint_uri_pair_t ());
2038
0
        }
2039
0
        zmq_close (_monitor_socket);
2040
0
        _monitor_socket = NULL;
2041
0
        _monitor_events = 0;
2042
0
    }
2043
5.04k
}
2044
2045
//  Accessor for the _disconnected flag.
bool zmq::socket_base_t::is_disconnected () const
{
    return _disconnected;
}
2049
2050
//  Base class for socket types that route outbound messages by routing id
//  (kept in _out_pipes); construction just forwards to socket_base_t.
zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
                                                   uint32_t tid_,
                                                   int sid_) :
    socket_base_t (parent_, tid_, sid_)
{
}
2056
2057
//  All outbound pipes must have been removed before destruction.
zmq::routing_socket_base_t::~routing_socket_base_t ()
{
    zmq_assert (_out_pipes.empty ());
}
2061
2062
int zmq::routing_socket_base_t::xsetsockopt (int option_,
2063
                                             const void *optval_,
2064
                                             size_t optvallen_)
2065
0
{
2066
0
    switch (option_) {
2067
0
        case ZMQ_CONNECT_ROUTING_ID:
2068
            // TODO why isn't it possible to set an empty connect_routing_id
2069
            //   (which is the default value)
2070
0
            if (optval_ && optvallen_) {
2071
0
                _connect_routing_id.assign (static_cast<const char *> (optval_),
2072
0
                                            optvallen_);
2073
0
                return 0;
2074
0
            }
2075
0
            break;
2076
0
    }
2077
0
    errno = EINVAL;
2078
0
    return -1;
2079
0
}
2080
2081
//  A previously blocked outbound pipe became writable again: locate its
//  entry in _out_pipes and flip it back to active. The pipe must be known
//  and must currently be marked inactive.
void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
{
    const out_pipes_t::iterator end = _out_pipes.end ();
    out_pipes_t::iterator it = _out_pipes.begin ();
    while (it != end && it->second.pipe != pipe_)
        ++it;

    zmq_assert (it != end);
    zmq_assert (!it->second.active);
    it->second.active = true;
}
2093
2094
std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
2095
0
{
2096
0
    std::string res = ZMQ_MOVE (_connect_routing_id);
2097
0
    _connect_routing_id.clear ();
2098
0
    return res;
2099
0
}
2100
2101
//  True when a routing id has been set (via ZMQ_CONNECT_ROUTING_ID)
//  and not yet consumed by extract_connect_routing_id ().
bool zmq::routing_socket_base_t::connect_routing_id_is_set () const
{
    return !_connect_routing_id.empty ();
}
2105
2106
void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
2107
                                               pipe_t *pipe_)
2108
0
{
2109
    //  Add the record into output pipes lookup table
2110
0
    const out_pipe_t outpipe = {pipe_, true};
2111
0
    const bool ok =
2112
0
      _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe)
2113
0
        .second;
2114
0
    zmq_assert (ok);
2115
0
}
2116
2117
bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
2118
0
{
2119
0
    return 0 != _out_pipes.count (routing_id_);
2120
0
}
2121
2122
//  Look up the outbound-pipe entry for routing_id_; NULL when absent.
zmq::routing_socket_base_t::out_pipe_t *
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_)
{
    // TODO we could probably avoid constructor a temporary blob_t to call this function
    const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
    if (it == _out_pipes.end ())
        return NULL;
    return &it->second;
}
2129
2130
//  Const overload: look up the outbound-pipe entry for routing_id_;
//  NULL when absent.
const zmq::routing_socket_base_t::out_pipe_t *
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
{
    // TODO we could probably avoid constructor a temporary blob_t to call this function
    const out_pipes_t::const_iterator it = _out_pipes.find (routing_id_);
    if (it == _out_pipes.end ())
        return NULL;
    return &it->second;
}
2137
2138
void zmq::routing_socket_base_t::erase_out_pipe (const pipe_t *pipe_)
2139
0
{
2140
0
    const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
2141
0
    zmq_assert (erased);
2142
0
}
2143
2144
//  Remove and return the entry registered under routing_id_.
//  Returns {NULL, false} when no such entry exists.
zmq::routing_socket_base_t::out_pipe_t
zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id_)
{
    out_pipe_t res = {NULL, false};
    const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
    if (it != _out_pipes.end ()) {
        res = it->second;
        _out_pipes.erase (it);
    }
    return res;
}