Coverage Report

Created: 2025-07-11 06:23

/src/libzmq/src/ctx.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#ifndef ZMQ_HAVE_WINDOWS
6
#include <unistd.h>
7
#endif
8
9
#include <limits>
10
#include <climits>
11
#include <new>
12
#include <sstream>
13
#include <string.h>
14
15
#include "ctx.hpp"
16
#include "socket_base.hpp"
17
#include "io_thread.hpp"
18
#include "reaper.hpp"
19
#include "pipe.hpp"
20
#include "err.hpp"
21
#include "msg.hpp"
22
#include "random.hpp"
23
24
#ifdef ZMQ_HAVE_VMCI
25
#include <vmci_sockets.h>
26
#endif
27
28
#ifdef ZMQ_USE_NSS
29
#include <nss.h>
30
#endif
31
32
#ifdef ZMQ_USE_GNUTLS
33
#include <gnutls/gnutls.h>
34
#endif
35
36
0
//  Tag value held in ctx_t::_tag while the context object is alive; this is
//  the value check_tag () compares against.
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
37
0
//  Tag value written to ctx_t::_tag by the destructor so that the object is
//  considered dead afterwards.
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
38
39
static int clipped_maxsocket (int max_requested_)
40
0
{
41
0
    if (max_requested_ >= zmq::poller_t::max_fds ()
42
0
        && zmq::poller_t::max_fds () != -1)
43
        // -1 because we need room for the reaper mailbox.
44
0
        max_requested_ = zmq::poller_t::max_fds () - 1;
45
46
0
    return max_requested_;
47
0
}
48
49
//  Builds a context in its initial state: tagged as alive, marked as
//  "starting" (start () has not run yet), not terminating, without a reaper,
//  and with every option set to its default. No threads are created here.
zmq::ctx_t::ctx_t () :
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
{
#ifdef ZMQ_HAVE_VMCI
    //  No VMCI address family / descriptor has been obtained yet.
    _vmci_family = -1;
    _vmci_fd = -1;
#endif

#ifdef HAVE_FORK
    //  Remember the creating process; terminate () compares the current pid
    //  against this value.
    _pid = getpid ();
#endif

    //  Initialise crypto library, if needed.
    zmq::random_open ();

#if defined ZMQ_USE_NSS
    NSS_NoDB_Init (NULL);
#endif

#if defined ZMQ_USE_GNUTLS
    gnutls_global_init ();
#endif
}
80
81
bool zmq::ctx_t::check_tag () const
82
0
{
83
0
    return _tag == ZMQ_CTX_TAG_VALUE_GOOD;
84
0
}
85
86
//  Tears the context down: stops and destroys the I/O threads, destroys the
//  reaper, shuts the crypto back-ends down and finally overwrites the tag so
//  the object is recognised as dead.
zmq::ctx_t::~ctx_t ()
{
    //  No socket may be left registered at this point.
    zmq_assert (_sockets.empty ());

    const io_threads_t::size_type thread_count = _io_threads.size ();

    //  First pass: ask every I/O thread to terminate. If the stop signal
    //  wasn't sent, the destructor invoked in the second pass would hang.
    for (io_threads_t::size_type n = 0; n < thread_count; ++n) {
        _io_threads[n]->stop ();
    }

    //  Second pass: destroy the thread objects, which waits until the
    //  threads have actually terminated.
    for (io_threads_t::size_type n = 0; n < thread_count; ++n) {
        LIBZMQ_DELETE (_io_threads[n]);
    }

    //  Deallocate the reaper thread object.
    LIBZMQ_DELETE (_reaper);

    //  Nothing to do for the mailboxes referenced from _slots: they were
    //  deallocated together with their io_thread/socket objects.

    //  De-initialise crypto library, if needed.
    zmq::random_close ();

#if defined ZMQ_USE_NSS
    NSS_Shutdown ();
#endif

#if defined ZMQ_USE_GNUTLS
    gnutls_global_deinit ();
#endif

    //  Mark the object as dead for any later check_tag () call.
    _tag = ZMQ_CTX_TAG_VALUE_BAD;
}
123
124
bool zmq::ctx_t::valid () const
125
0
{
126
0
    return _term_mailbox.valid ();
127
0
}
128
129
int zmq::ctx_t::terminate ()
130
0
{
131
0
    _slot_sync.lock ();
132
133
0
    const bool save_terminating = _terminating;
134
0
    _terminating = false;
135
136
    // Connect up any pending inproc connections, otherwise we will hang
137
0
    pending_connections_t copy = _pending_connections;
138
0
    for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
139
0
         p != end; ++p) {
140
0
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
141
        // create_socket might fail eg: out of memory/sockets limit reached
142
0
        zmq_assert (s);
143
0
        s->bind (p->first.c_str ());
144
0
        s->close ();
145
0
    }
146
0
    _terminating = save_terminating;
147
148
0
    if (!_starting) {
149
0
#ifdef HAVE_FORK
150
0
        if (_pid != getpid ()) {
151
            // we are a forked child process. Close all file descriptors
152
            // inherited from the parent.
153
0
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
154
0
                 i++) {
155
0
                _sockets[i]->get_mailbox ()->forked ();
156
0
            }
157
0
            _term_mailbox.forked ();
158
0
        }
159
0
#endif
160
161
        //  Check whether termination was already underway, but interrupted and now
162
        //  restarted.
163
0
        const bool restarted = _terminating;
164
0
        _terminating = true;
165
166
        //  First attempt to terminate the context.
167
0
        if (!restarted) {
168
            //  First send stop command to sockets so that any blocking calls
169
            //  can be interrupted. If there are no sockets we can ask reaper
170
            //  thread to stop.
171
0
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
172
0
                 i++) {
173
0
                _sockets[i]->stop ();
174
0
            }
175
0
            if (_sockets.empty ())
176
0
                _reaper->stop ();
177
0
        }
178
0
        _slot_sync.unlock ();
179
180
        //  Wait till reaper thread closes all the sockets.
181
0
        command_t cmd;
182
0
        const int rc = _term_mailbox.recv (&cmd, -1);
183
0
        if (rc == -1 && errno == EINTR)
184
0
            return -1;
185
0
        errno_assert (rc == 0);
186
0
        zmq_assert (cmd.type == command_t::done);
187
0
        _slot_sync.lock ();
188
0
        zmq_assert (_sockets.empty ());
189
0
    }
190
0
    _slot_sync.unlock ();
191
192
#ifdef ZMQ_HAVE_VMCI
193
    _vmci_sync.lock ();
194
195
    VMCISock_ReleaseAFValueFd (_vmci_fd);
196
    _vmci_family = -1;
197
    _vmci_fd = -1;
198
199
    _vmci_sync.unlock ();
200
#endif
201
202
    //  Deallocate the resources.
203
0
    delete this;
204
205
0
    return 0;
206
0
}
207
208
int zmq::ctx_t::shutdown ()
209
0
{
210
0
    scoped_lock_t locker (_slot_sync);
211
212
0
    if (!_terminating) {
213
0
        _terminating = true;
214
215
0
        if (!_starting) {
216
            //  Send stop command to sockets so that any blocking calls
217
            //  can be interrupted. If there are no sockets we can ask reaper
218
            //  thread to stop.
219
0
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
220
0
                 i++) {
221
0
                _sockets[i]->stop ();
222
0
            }
223
0
            if (_sockets.empty ())
224
0
                _reaper->stop ();
225
0
        }
226
0
    }
227
228
0
    return 0;
229
0
}
230
231
int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
232
0
{
233
0
    const bool is_int = (optvallen_ == sizeof (int));
234
0
    int value = 0;
235
0
    if (is_int)
236
0
        memcpy (&value, optval_, sizeof (int));
237
238
0
    switch (option_) {
239
0
        case ZMQ_MAX_SOCKETS:
240
0
            if (is_int && value >= 1 && value == clipped_maxsocket (value)) {
241
0
                scoped_lock_t locker (_opt_sync);
242
0
                _max_sockets = value;
243
0
                return 0;
244
0
            }
245
0
            break;
246
247
0
        case ZMQ_IO_THREADS:
248
0
            if (is_int && value >= 0) {
249
0
                scoped_lock_t locker (_opt_sync);
250
0
                _io_thread_count = value;
251
0
                return 0;
252
0
            }
253
0
            break;
254
255
0
        case ZMQ_IPV6:
256
0
            if (is_int && value >= 0) {
257
0
                scoped_lock_t locker (_opt_sync);
258
0
                _ipv6 = (value != 0);
259
0
                return 0;
260
0
            }
261
0
            break;
262
263
0
        case ZMQ_BLOCKY:
264
0
            if (is_int && value >= 0) {
265
0
                scoped_lock_t locker (_opt_sync);
266
0
                _blocky = (value != 0);
267
0
                return 0;
268
0
            }
269
0
            break;
270
271
0
        case ZMQ_MAX_MSGSZ:
272
0
            if (is_int && value >= 0) {
273
0
                scoped_lock_t locker (_opt_sync);
274
0
                _max_msgsz = value < INT_MAX ? value : INT_MAX;
275
0
                return 0;
276
0
            }
277
0
            break;
278
279
0
        case ZMQ_ZERO_COPY_RECV:
280
0
            if (is_int && value >= 0) {
281
0
                scoped_lock_t locker (_opt_sync);
282
0
                _zero_copy = (value != 0);
283
0
                return 0;
284
0
            }
285
0
            break;
286
287
0
        default: {
288
0
            return thread_ctx_t::set (option_, optval_, optvallen_);
289
0
        }
290
0
    }
291
292
0
    errno = EINVAL;
293
0
    return -1;
294
0
}
295
296
int zmq::ctx_t::get (int option_, void *optval_, const size_t *optvallen_)
297
0
{
298
0
    const bool is_int = (*optvallen_ == sizeof (int));
299
0
    int *value = static_cast<int *> (optval_);
300
301
0
    switch (option_) {
302
0
        case ZMQ_MAX_SOCKETS:
303
0
            if (is_int) {
304
0
                scoped_lock_t locker (_opt_sync);
305
0
                *value = _max_sockets;
306
0
                return 0;
307
0
            }
308
0
            break;
309
310
0
        case ZMQ_SOCKET_LIMIT:
311
0
            if (is_int) {
312
0
                *value = clipped_maxsocket (65535);
313
0
                return 0;
314
0
            }
315
0
            break;
316
317
0
        case ZMQ_IO_THREADS:
318
0
            if (is_int) {
319
0
                scoped_lock_t locker (_opt_sync);
320
0
                *value = _io_thread_count;
321
0
                return 0;
322
0
            }
323
0
            break;
324
325
0
        case ZMQ_IPV6:
326
0
            if (is_int) {
327
0
                scoped_lock_t locker (_opt_sync);
328
0
                *value = _ipv6;
329
0
                return 0;
330
0
            }
331
0
            break;
332
333
0
        case ZMQ_BLOCKY:
334
0
            if (is_int) {
335
0
                scoped_lock_t locker (_opt_sync);
336
0
                *value = _blocky;
337
0
                return 0;
338
0
            }
339
0
            break;
340
341
0
        case ZMQ_MAX_MSGSZ:
342
0
            if (is_int) {
343
0
                scoped_lock_t locker (_opt_sync);
344
0
                *value = _max_msgsz;
345
0
                return 0;
346
0
            }
347
0
            break;
348
349
0
        case ZMQ_MSG_T_SIZE:
350
0
            if (is_int) {
351
0
                scoped_lock_t locker (_opt_sync);
352
0
                *value = sizeof (zmq_msg_t);
353
0
                return 0;
354
0
            }
355
0
            break;
356
357
0
        case ZMQ_ZERO_COPY_RECV:
358
0
            if (is_int) {
359
0
                scoped_lock_t locker (_opt_sync);
360
0
                *value = _zero_copy;
361
0
                return 0;
362
0
            }
363
0
            break;
364
365
0
        default: {
366
0
            return thread_ctx_t::get (option_, optval_, optvallen_);
367
0
        }
368
0
    }
369
370
0
    errno = EINVAL;
371
0
    return -1;
372
0
}
373
374
int zmq::ctx_t::get (int option_)
375
0
{
376
0
    int optval = 0;
377
0
    size_t optvallen = sizeof (int);
378
379
0
    if (get (option_, &optval, &optvallen) == 0)
380
0
        return optval;
381
382
0
    errno = EINVAL;
383
0
    return -1;
384
0
}
385
386
bool zmq::ctx_t::start ()
387
0
{
388
    //  Initialise the array of mailboxes. Additional two slots are for
389
    //  zmq_ctx_term thread and reaper thread.
390
0
    _opt_sync.lock ();
391
0
    const int term_and_reaper_threads_count = 2;
392
0
    const int mazmq = _max_sockets;
393
0
    const int ios = _io_thread_count;
394
0
    _opt_sync.unlock ();
395
0
    const int slot_count = mazmq + ios + term_and_reaper_threads_count;
396
0
    try {
397
0
        _slots.reserve (slot_count);
398
0
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
399
0
    }
400
0
    catch (const std::bad_alloc &) {
401
0
        errno = ENOMEM;
402
0
        return false;
403
0
    }
404
0
    _slots.resize (term_and_reaper_threads_count);
405
406
    //  Initialise the infrastructure for zmq_ctx_term thread.
407
0
    _slots[term_tid] = &_term_mailbox;
408
409
    //  Create the reaper thread.
410
0
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
411
0
    if (!_reaper) {
412
0
        errno = ENOMEM;
413
0
        goto fail_cleanup_slots;
414
0
    }
415
0
    if (!_reaper->get_mailbox ()->valid ())
416
0
        goto fail_cleanup_reaper;
417
0
    _slots[reaper_tid] = _reaper->get_mailbox ();
418
0
    _reaper->start ();
419
420
    //  Create I/O thread objects and launch them.
421
0
    _slots.resize (slot_count, NULL);
422
423
0
    for (int i = term_and_reaper_threads_count;
424
0
         i != ios + term_and_reaper_threads_count; i++) {
425
0
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
426
0
        if (!io_thread) {
427
0
            errno = ENOMEM;
428
0
            goto fail_cleanup_reaper;
429
0
        }
430
0
        if (!io_thread->get_mailbox ()->valid ()) {
431
0
            delete io_thread;
432
0
            goto fail_cleanup_reaper;
433
0
        }
434
0
        _io_threads.push_back (io_thread);
435
0
        _slots[i] = io_thread->get_mailbox ();
436
0
        io_thread->start ();
437
0
    }
438
439
    //  In the unused part of the slot array, create a list of empty slots.
440
0
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
441
0
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
442
0
        _empty_slots.push_back (i);
443
0
    }
444
445
0
    _starting = false;
446
0
    return true;
447
448
0
fail_cleanup_reaper:
449
0
    _reaper->stop ();
450
0
    delete _reaper;
451
0
    _reaper = NULL;
452
453
0
fail_cleanup_slots:
454
0
    _slots.clear ();
455
0
    return false;
456
0
}
457
458
//  Creates a socket of the given type inside this context.
//  Returns the new socket, or NULL on failure. errno is set to ETERM when
//  the context is terminating and to EMFILE when no free slot is left;
//  failures of start () or socket_base_t::create () are passed through as a
//  plain NULL.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    scoped_lock_t locker (_slot_sync);

    //  Once zmq_ctx_term() or zmq_ctx_shutdown() was called, we can't create
    //  new sockets.
    if (_terminating) {
        errno = ETERM;
        return NULL;
    }

    //  The first socket triggers the lazy start-up of the context.
    if (unlikely (_starting) && !start ())
        return NULL;

    //  If max_sockets limit was reached, return error.
    if (_empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
    }

    //  Take a slot for the socket off the back of the free list.
    const uint32_t free_slot = _empty_slots.back ();
    _empty_slots.pop_back ();

    //  Generate new unique socket ID.
    const int sid = (static_cast<int> (max_socket_id.add (1))) + 1;

    //  Create the socket; on failure give the slot back.
    socket_base_t *sock = socket_base_t::create (type_, this, free_slot, sid);
    if (!sock) {
        _empty_slots.push_back (free_slot);
        return NULL;
    }

    //  Register the socket and its mailbox.
    _sockets.push_back (sock);
    _slots[free_slot] = sock->get_mailbox ();

    return sock;
}
498
499
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
500
0
{
501
0
    scoped_lock_t locker (_slot_sync);
502
503
    //  Free the associated thread slot.
504
0
    const uint32_t tid = socket_->get_tid ();
505
0
    _empty_slots.push_back (tid);
506
0
    _slots[tid] = NULL;
507
508
    //  Remove the socket from the list of sockets.
509
0
    _sockets.erase (socket_);
510
511
    //  If zmq_ctx_term() was already called and there are no more socket
512
    //  we can ask reaper thread to terminate.
513
0
    if (_terminating && _sockets.empty ())
514
0
        _reaper->stop ();
515
0
}
516
517
//  Returns the reaper as an object_t pointer; NULL until start () has
//  created it.
zmq::object_t *zmq::ctx_t::get_reaper () const
{
    zmq::object_t *const reaper_object = _reaper;
    return reaper_object;
}
521
522
//  Starts out with the default thread priority and scheduling policy; the
//  remaining members are default-constructed.
zmq::thread_ctx_t::thread_ctx_t () :
    _thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    _thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
    //  Nothing else to set up.
}
527
528
void zmq::thread_ctx_t::start_thread (thread_t &thread_,
529
                                      thread_fn *tfn_,
530
                                      void *arg_,
531
                                      const char *name_) const
532
0
{
533
0
    thread_.setSchedulingParameters (_thread_priority, _thread_sched_policy,
534
0
                                     _thread_affinity_cpus);
535
536
0
    char namebuf[16] = "";
537
0
    snprintf (namebuf, sizeof (namebuf), "%s%sZMQbg%s%s",
538
0
              _thread_name_prefix.empty () ? "" : _thread_name_prefix.c_str (),
539
0
              _thread_name_prefix.empty () ? "" : "/", name_ ? "/" : "",
540
0
              name_ ? name_ : "");
541
0
    thread_.start (tfn_, arg_, namebuf);
542
0
}
543
544
int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
545
0
{
546
0
    const bool is_int = (optvallen_ == sizeof (int));
547
0
    int value = 0;
548
0
    if (is_int)
549
0
        memcpy (&value, optval_, sizeof (int));
550
551
0
    switch (option_) {
552
0
        case ZMQ_THREAD_SCHED_POLICY:
553
0
            if (is_int && value >= 0) {
554
0
                scoped_lock_t locker (_opt_sync);
555
0
                _thread_sched_policy = value;
556
0
                return 0;
557
0
            }
558
0
            break;
559
560
0
        case ZMQ_THREAD_AFFINITY_CPU_ADD:
561
0
            if (is_int && value >= 0) {
562
0
                scoped_lock_t locker (_opt_sync);
563
0
                _thread_affinity_cpus.insert (value);
564
0
                return 0;
565
0
            }
566
0
            break;
567
568
0
        case ZMQ_THREAD_AFFINITY_CPU_REMOVE:
569
0
            if (is_int && value >= 0) {
570
0
                scoped_lock_t locker (_opt_sync);
571
0
                if (0 == _thread_affinity_cpus.erase (value)) {
572
0
                    errno = EINVAL;
573
0
                    return -1;
574
0
                }
575
0
                return 0;
576
0
            }
577
0
            break;
578
579
0
        case ZMQ_THREAD_PRIORITY:
580
0
            if (is_int && value >= 0) {
581
0
                scoped_lock_t locker (_opt_sync);
582
0
                _thread_priority = value;
583
0
                return 0;
584
0
            }
585
0
            break;
586
587
0
        case ZMQ_THREAD_NAME_PREFIX:
588
            // start_thread() allows max 16 chars for thread name
589
0
            if (is_int) {
590
0
                std::ostringstream s;
591
0
                s << value;
592
0
                scoped_lock_t locker (_opt_sync);
593
0
                _thread_name_prefix = s.str ();
594
0
                return 0;
595
0
            } else if (optvallen_ > 0 && optvallen_ <= 16) {
596
0
                scoped_lock_t locker (_opt_sync);
597
0
                _thread_name_prefix.assign (static_cast<const char *> (optval_),
598
0
                                            optvallen_);
599
0
                return 0;
600
0
            }
601
0
            break;
602
0
    }
603
604
0
    errno = EINVAL;
605
0
    return -1;
606
0
}
607
608
int zmq::thread_ctx_t::get (int option_,
609
                            void *optval_,
610
                            const size_t *optvallen_)
611
0
{
612
0
    const bool is_int = (*optvallen_ == sizeof (int));
613
0
    int *value = static_cast<int *> (optval_);
614
615
0
    switch (option_) {
616
0
        case ZMQ_THREAD_SCHED_POLICY:
617
0
            if (is_int) {
618
0
                scoped_lock_t locker (_opt_sync);
619
0
                *value = _thread_sched_policy;
620
0
                return 0;
621
0
            }
622
0
            break;
623
624
0
        case ZMQ_THREAD_NAME_PREFIX:
625
0
            if (is_int) {
626
0
                scoped_lock_t locker (_opt_sync);
627
0
                *value = atoi (_thread_name_prefix.c_str ());
628
0
                return 0;
629
0
            } else if (*optvallen_ >= _thread_name_prefix.size ()) {
630
0
                scoped_lock_t locker (_opt_sync);
631
0
                memcpy (optval_, _thread_name_prefix.data (),
632
0
                        _thread_name_prefix.size ());
633
0
                return 0;
634
0
            }
635
0
            break;
636
0
    }
637
638
0
    errno = EINVAL;
639
0
    return -1;
640
0
}
641
642
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
643
0
{
644
0
    _slots[tid_]->send (command_);
645
0
}
646
647
//  Picks the least loaded I/O thread among those allowed by affinity_.
//  affinity_ - bit mask in which bit N selects I/O thread N; 0 means that
//              every I/O thread is eligible.
//  Returns the eligible thread with the smallest get_load (); on a tie the
//  one with the lowest index wins. Returns NULL if there are no I/O threads
//  or none matches the mask.
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
    if (_io_threads.empty ())
        return NULL;

    //  Only the first 64 I/O threads have a bit in the affinity mask.
    const io_threads_t::size_type mask_width = 64;

    //  Find the I/O thread with minimum load.
    int min_load = -1;
    io_thread_t *selected_io_thread = NULL;
    for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
         i++) {
        //  Shifting a 64-bit value by 64 or more bits is undefined
        //  behaviour, so the mask is only consulted for indices it can
        //  represent. Threads beyond that are eligible solely when no
        //  affinity was requested.
        const bool eligible =
          !affinity_
          || (i < mask_width && (affinity_ & (uint64_t (1) << i)) != 0);
        if (!eligible)
            continue;

        const int load = _io_threads[i]->get_load ();
        if (selected_io_thread == NULL || load < min_load) {
            min_load = load;
            selected_io_thread = _io_threads[i];
        }
    }
    return selected_io_thread;
}
667
668
int zmq::ctx_t::register_endpoint (const char *addr_,
669
                                   const endpoint_t &endpoint_)
670
0
{
671
0
    scoped_lock_t locker (_endpoints_sync);
672
673
0
    const bool inserted =
674
0
      _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), endpoint_)
675
0
        .second;
676
0
    if (!inserted) {
677
0
        errno = EADDRINUSE;
678
0
        return -1;
679
0
    }
680
0
    return 0;
681
0
}
682
683
int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
684
                                     const socket_base_t *const socket_)
685
0
{
686
0
    scoped_lock_t locker (_endpoints_sync);
687
688
0
    const endpoints_t::iterator it = _endpoints.find (addr_);
689
0
    if (it == _endpoints.end () || it->second.socket != socket_) {
690
0
        errno = ENOENT;
691
0
        return -1;
692
0
    }
693
694
    //  Remove endpoint.
695
0
    _endpoints.erase (it);
696
697
0
    return 0;
698
0
}
699
700
void zmq::ctx_t::unregister_endpoints (const socket_base_t *const socket_)
701
0
{
702
0
    scoped_lock_t locker (_endpoints_sync);
703
704
0
    for (endpoints_t::iterator it = _endpoints.begin (),
705
0
                               end = _endpoints.end ();
706
0
         it != end;) {
707
0
        if (it->second.socket == socket_)
708
0
#if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700)
709
0
            it = _endpoints.erase (it);
710
#else
711
            _endpoints.erase (it++);
712
#endif
713
0
        else
714
0
            ++it;
715
0
    }
716
0
}
717
718
//  Looks up the endpoint registered under addr_.
//  On success returns a copy of the endpoint after incrementing the command
//  sequence number of its socket. If the address is unknown, returns an
//  endpoint with a NULL socket and sets errno to ECONNREFUSED.
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{
    scoped_lock_t locker (_endpoints_sync);

    const endpoints_t::iterator found = _endpoints.find (addr_);
    if (found != _endpoints.end ()) {
        endpoint_t endpoint = found->second;

        //  Increment the command sequence number of the peer so that it
        //  won't get deallocated until "bind" command is issued by the
        //  caller. The subsequent 'bind' has to be called with inc_seqnum
        //  parameter set to false, so that the seqnum isn't incremented
        //  twice.
        endpoint.socket->inc_seqnum ();

        return endpoint;
    }

    //  Nobody is bound to this address.
    errno = ECONNREFUSED;
    endpoint_t empty = {NULL, options_t ()};
    return empty;
}
738
739
void zmq::ctx_t::pend_connection (const std::string &addr_,
740
                                  const endpoint_t &endpoint_,
741
                                  pipe_t **pipes_)
742
0
{
743
0
    scoped_lock_t locker (_endpoints_sync);
744
745
0
    const pending_connection_t pending_connection = {endpoint_, pipes_[0],
746
0
                                                     pipes_[1]};
747
748
0
    const endpoints_t::iterator it = _endpoints.find (addr_);
749
0
    if (it == _endpoints.end ()) {
750
        //  Still no bind.
751
0
        endpoint_.socket->inc_seqnum ();
752
0
        _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
753
0
                                                        pending_connection);
754
0
    } else {
755
        //  Bind has happened in the mean time, connect directly
756
0
        connect_inproc_sockets (it->second.socket, it->second.options,
757
0
                                pending_connection, connect_side);
758
0
    }
759
0
}
760
761
void zmq::ctx_t::connect_pending (const char *addr_,
762
                                  zmq::socket_base_t *bind_socket_)
763
0
{
764
0
    scoped_lock_t locker (_endpoints_sync);
765
766
0
    const std::pair<pending_connections_t::iterator,
767
0
                    pending_connections_t::iterator>
768
0
      pending = _pending_connections.equal_range (addr_);
769
0
    for (pending_connections_t::iterator p = pending.first; p != pending.second;
770
0
         ++p)
771
0
        connect_inproc_sockets (bind_socket_, _endpoints[addr_].options,
772
0
                                p->second, bind_side);
773
774
0
    _pending_connections.erase (pending.first, pending.second);
775
0
}
776
777
void zmq::ctx_t::connect_inproc_sockets (
778
  zmq::socket_base_t *bind_socket_,
779
  const options_t &bind_options_,
780
  const pending_connection_t &pending_connection_,
781
  side side_)
782
0
{
783
0
    bind_socket_->inc_seqnum ();
784
0
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
785
786
0
    if (!bind_options_.recv_routing_id) {
787
0
        msg_t msg;
788
0
        const bool ok = pending_connection_.bind_pipe->read (&msg);
789
0
        zmq_assert (ok);
790
0
        const int rc = msg.close ();
791
0
        errno_assert (rc == 0);
792
0
    }
793
794
0
    if (!get_effective_conflate_option (pending_connection_.endpoint.options)) {
795
0
        pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm,
796
0
                                                          bind_options_.rcvhwm);
797
0
        pending_connection_.bind_pipe->set_hwms_boost (
798
0
          pending_connection_.endpoint.options.sndhwm,
799
0
          pending_connection_.endpoint.options.rcvhwm);
800
801
0
        pending_connection_.connect_pipe->set_hwms (
802
0
          pending_connection_.endpoint.options.rcvhwm,
803
0
          pending_connection_.endpoint.options.sndhwm);
804
0
        pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm,
805
0
                                                 bind_options_.sndhwm);
806
0
    } else {
807
0
        pending_connection_.connect_pipe->set_hwms (-1, -1);
808
0
        pending_connection_.bind_pipe->set_hwms (-1, -1);
809
0
    }
810
811
0
#ifdef ZMQ_BUILD_DRAFT_API
812
0
    if (bind_options_.can_recv_disconnect_msg
813
0
        && !bind_options_.disconnect_msg.empty ())
814
0
        pending_connection_.connect_pipe->set_disconnect_msg (
815
0
          bind_options_.disconnect_msg);
816
0
#endif
817
818
0
    if (side_ == bind_side) {
819
0
        command_t cmd;
820
0
        cmd.type = command_t::bind;
821
0
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
822
0
        bind_socket_->process_command (cmd);
823
0
        bind_socket_->send_inproc_connected (
824
0
          pending_connection_.endpoint.socket);
825
0
    } else
826
0
        pending_connection_.connect_pipe->send_bind (
827
0
          bind_socket_, pending_connection_.bind_pipe, false);
828
829
    // When a ctx is terminated all pending inproc connection will be
830
    // connected, but the socket will already be closed and the pipe will be
831
    // in waiting_for_delimiter state, which means no more writes can be done
832
    // and the routing id write fails and causes an assert. Check if the socket
833
    // is open before sending.
834
0
    if (pending_connection_.endpoint.options.recv_routing_id
835
0
        && pending_connection_.endpoint.socket->check_tag ()) {
836
0
        send_routing_id (pending_connection_.bind_pipe, bind_options_);
837
0
    }
838
839
0
#ifdef ZMQ_BUILD_DRAFT_API
840
    //  If set, send the hello msg of the bind socket to the pending connection.
841
0
    if (bind_options_.can_send_hello_msg
842
0
        && bind_options_.hello_msg.size () > 0) {
843
0
        send_hello_msg (pending_connection_.bind_pipe, bind_options_);
844
0
    }
845
0
#endif
846
0
}
847
848
#ifdef ZMQ_HAVE_VMCI
849
850
int zmq::ctx_t::get_vmci_socket_family ()
851
{
852
    zmq::scoped_lock_t locker (_vmci_sync);
853
854
    if (_vmci_fd == -1) {
855
        _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd);
856
857
        if (_vmci_fd != -1) {
858
#ifdef FD_CLOEXEC
859
            int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC);
860
            errno_assert (rc != -1);
861
#endif
862
        }
863
    }
864
865
    return _vmci_family;
866
}
867
868
#endif
869
870
//  The last used socket ID, or 0 if no socket was used so far. Note that this
871
//  is a global variable. Thus, even sockets created in different contexts have
872
//  unique IDs.
873
//  Out-of-class definition of the static counter; create_socket () advances
//  it with add (1) each time it generates a socket ID.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;