Coverage Report

Created: 2026-06-10 06:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/ctx.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#ifndef ZMQ_HAVE_WINDOWS
6
#include <unistd.h>
7
#endif
8
9
#include <limits>
10
#include <climits>
11
#include <new>
12
#include <sstream>
13
#include <string.h>
14
15
#include "ctx.hpp"
16
#include "socket_base.hpp"
17
#include "io_thread.hpp"
18
#include "reaper.hpp"
19
#include "pipe.hpp"
20
#include "err.hpp"
21
#include "msg.hpp"
22
#include "random.hpp"
23
24
#ifdef ZMQ_HAVE_VMCI
25
#include <vmci_sockets.h>
26
#endif
27
28
#ifdef ZMQ_HAVE_VSOCK
29
#include <sys/socket.h>
30
#endif
31
32
#ifdef ZMQ_USE_NSS
33
#include <nss.h>
34
#endif
35
36
#ifdef ZMQ_USE_GNUTLS
37
#include <gnutls/gnutls.h>
38
#endif
39
40
0
//  Value held in ctx_t::_tag from construction until the destructor runs;
//  check_tag () compares against it.
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
//  Value the destructor writes into ctx_t::_tag to mark the object as dead.
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
42
43
static int clipped_maxsocket (int max_requested_)
44
0
{
45
0
    if (max_requested_ >= zmq::poller_t::max_fds ()
46
0
        && zmq::poller_t::max_fds () != -1)
47
        // -1 because we need room for the reaper mailbox.
48
0
        max_requested_ = zmq::poller_t::max_fds () - 1;
49
50
0
    return max_requested_;
51
0
}
52
53
//  Builds a context that has not been started yet: the tag is marked good,
//  no reaper exists, and every option carries its default value. The
//  random/crypto backends selected at build time are opened here and
//  closed again by the destructor.
zmq::ctx_t::ctx_t () :
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
{
#ifdef HAVE_FORK
    //  Remember the creating process; terminate () compares against it.
    _pid = getpid ();
#endif
#ifdef ZMQ_HAVE_VMCI
    //  No VMCI address family has been acquired yet.
    _vmci_family = -1;
    _vmci_fd = -1;
#endif

    //  Bring up the crypto library, if one is needed.
    zmq::random_open ();

#ifdef ZMQ_USE_NSS
    NSS_NoDB_Init (NULL);
#endif

#ifdef ZMQ_USE_GNUTLS
    gnutls_global_init ();
#endif
}
84
85
bool zmq::ctx_t::check_tag () const
86
0
{
87
0
    return _tag == ZMQ_CTX_TAG_VALUE_GOOD;
88
0
}
89
90
//  Tears the context down: stops and deletes all I/O threads, deletes the
//  reaper, shuts down the crypto backends and finally marks the tag bad.
//  All sockets must already be gone when this runs.
zmq::ctx_t::~ctx_t ()
{
    //  No socket may outlive the context.
    zmq_assert (_sockets.empty ());

    const io_threads_t::size_type thread_count = _io_threads.size ();

    //  First pass: send the stop signal to every I/O thread. Without it a
    //  subsequent invocation of the destructor would hang.
    for (io_threads_t::size_type n = 0; n < thread_count; ++n)
        _io_threads[n]->stop ();

    //  Second pass: delete the thread objects, which waits for the threads
    //  to actually terminate.
    for (io_threads_t::size_type n = 0; n < thread_count; ++n)
        LIBZMQ_DELETE (_io_threads[n]);

    //  Release the reaper thread object.
    LIBZMQ_DELETE (_reaper);

    //  Nothing to do for the mailboxes referenced from _slots: they were
    //  deallocated together with their io_thread/socket objects.

    //  Shut down the crypto library, if one was used.
    zmq::random_close ();

#ifdef ZMQ_USE_NSS
    NSS_Shutdown ();
#endif

#ifdef ZMQ_USE_GNUTLS
    gnutls_global_deinit ();
#endif

    //  From here on check_tag () reports the object as dead.
    _tag = ZMQ_CTX_TAG_VALUE_BAD;
}
127
128
bool zmq::ctx_t::valid () const
129
0
{
130
0
    return _term_mailbox.valid ();
131
0
}
132
133
int zmq::ctx_t::terminate ()
134
0
{
135
0
    _slot_sync.lock ();
136
137
0
    const bool save_terminating = _terminating;
138
0
    _terminating = false;
139
140
    // Connect up any pending inproc connections, otherwise we will hang
141
0
    pending_connections_t copy = _pending_connections;
142
0
    for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
143
0
         p != end; ++p) {
144
0
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
145
        // create_socket might fail eg: out of memory/sockets limit reached
146
0
        zmq_assert (s);
147
0
        s->bind (p->first.c_str ());
148
0
        s->close ();
149
0
    }
150
0
    _terminating = save_terminating;
151
152
0
    if (!_starting) {
153
0
#ifdef HAVE_FORK
154
0
        if (_pid != getpid ()) {
155
            // we are a forked child process. Close all file descriptors
156
            // inherited from the parent.
157
0
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
158
0
                 i++) {
159
0
                _sockets[i]->get_mailbox ()->forked ();
160
0
            }
161
0
            _term_mailbox.forked ();
162
0
        }
163
0
#endif
164
165
        //  Check whether termination was already underway, but interrupted and now
166
        //  restarted.
167
0
        const bool restarted = _terminating;
168
0
        _terminating = true;
169
170
        //  First attempt to terminate the context.
171
0
        if (!restarted) {
172
            //  First send stop command to sockets so that any blocking calls
173
            //  can be interrupted. If there are no sockets we can ask reaper
174
            //  thread to stop.
175
0
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
176
0
                 i++) {
177
0
                _sockets[i]->stop ();
178
0
            }
179
0
            if (_sockets.empty ())
180
0
                _reaper->stop ();
181
0
        }
182
0
        _slot_sync.unlock ();
183
184
        //  Wait till reaper thread closes all the sockets.
185
0
        command_t cmd;
186
0
        const int rc = _term_mailbox.recv (&cmd, -1);
187
0
        if (rc == -1 && errno == EINTR)
188
0
            return -1;
189
0
        errno_assert (rc == 0);
190
0
        zmq_assert (cmd.type == command_t::done);
191
0
        _slot_sync.lock ();
192
0
        zmq_assert (_sockets.empty ());
193
0
    }
194
0
    _slot_sync.unlock ();
195
196
#ifdef ZMQ_HAVE_VMCI
197
    _vmci_sync.lock ();
198
199
    VMCISock_ReleaseAFValueFd (_vmci_fd);
200
    _vmci_family = -1;
201
    _vmci_fd = -1;
202
203
    _vmci_sync.unlock ();
204
#endif
205
206
    //  Deallocate the resources.
207
0
    delete this;
208
209
0
    return 0;
210
0
}
211
212
int zmq::ctx_t::shutdown ()
213
0
{
214
0
    scoped_lock_t locker (_slot_sync);
215
216
0
    if (!_terminating) {
217
0
        _terminating = true;
218
219
0
        if (!_starting) {
220
            //  Send stop command to sockets so that any blocking calls
221
            //  can be interrupted. If there are no sockets we can ask reaper
222
            //  thread to stop.
223
0
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
224
0
                 i++) {
225
0
                _sockets[i]->stop ();
226
0
            }
227
0
            if (_sockets.empty ())
228
0
                _reaper->stop ();
229
0
        }
230
0
    }
231
232
0
    return 0;
233
0
}
234
235
int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
236
0
{
237
0
    const bool is_int = (optvallen_ == sizeof (int));
238
0
    int value = 0;
239
0
    if (is_int)
240
0
        memcpy (&value, optval_, sizeof (int));
241
242
0
    switch (option_) {
243
0
        case ZMQ_MAX_SOCKETS:
244
0
            if (is_int && value >= 1 && value == clipped_maxsocket (value)) {
245
0
                scoped_lock_t locker (_opt_sync);
246
0
                _max_sockets = value;
247
0
                return 0;
248
0
            }
249
0
            break;
250
251
0
        case ZMQ_IO_THREADS:
252
0
            if (is_int && value >= 0) {
253
0
                scoped_lock_t locker (_opt_sync);
254
0
                _io_thread_count = value;
255
0
                return 0;
256
0
            }
257
0
            break;
258
259
0
        case ZMQ_IPV6:
260
0
            if (is_int && value >= 0) {
261
0
                scoped_lock_t locker (_opt_sync);
262
0
                _ipv6 = (value != 0);
263
0
                return 0;
264
0
            }
265
0
            break;
266
267
0
        case ZMQ_BLOCKY:
268
0
            if (is_int && value >= 0) {
269
0
                scoped_lock_t locker (_opt_sync);
270
0
                _blocky = (value != 0);
271
0
                return 0;
272
0
            }
273
0
            break;
274
275
0
        case ZMQ_MAX_MSGSZ:
276
0
            if (is_int && value >= 0) {
277
0
                scoped_lock_t locker (_opt_sync);
278
0
                _max_msgsz = value < INT_MAX ? value : INT_MAX;
279
0
                return 0;
280
0
            }
281
0
            break;
282
283
0
        case ZMQ_ZERO_COPY_RECV:
284
0
            if (is_int && value >= 0) {
285
0
                scoped_lock_t locker (_opt_sync);
286
0
                _zero_copy = (value != 0);
287
0
                return 0;
288
0
            }
289
0
            break;
290
291
0
        default: {
292
0
            return thread_ctx_t::set (option_, optval_, optvallen_);
293
0
        }
294
0
    }
295
296
0
    errno = EINVAL;
297
0
    return -1;
298
0
}
299
300
int zmq::ctx_t::get (int option_, void *optval_, const size_t *optvallen_)
301
0
{
302
0
    const bool is_int = (*optvallen_ == sizeof (int));
303
0
    int *value = static_cast<int *> (optval_);
304
305
0
    switch (option_) {
306
0
        case ZMQ_MAX_SOCKETS:
307
0
            if (is_int) {
308
0
                scoped_lock_t locker (_opt_sync);
309
0
                *value = _max_sockets;
310
0
                return 0;
311
0
            }
312
0
            break;
313
314
0
        case ZMQ_SOCKET_LIMIT:
315
0
            if (is_int) {
316
0
                *value = clipped_maxsocket (65535);
317
0
                return 0;
318
0
            }
319
0
            break;
320
321
0
        case ZMQ_IO_THREADS:
322
0
            if (is_int) {
323
0
                scoped_lock_t locker (_opt_sync);
324
0
                *value = _io_thread_count;
325
0
                return 0;
326
0
            }
327
0
            break;
328
329
0
        case ZMQ_IPV6:
330
0
            if (is_int) {
331
0
                scoped_lock_t locker (_opt_sync);
332
0
                *value = _ipv6;
333
0
                return 0;
334
0
            }
335
0
            break;
336
337
0
        case ZMQ_BLOCKY:
338
0
            if (is_int) {
339
0
                scoped_lock_t locker (_opt_sync);
340
0
                *value = _blocky;
341
0
                return 0;
342
0
            }
343
0
            break;
344
345
0
        case ZMQ_MAX_MSGSZ:
346
0
            if (is_int) {
347
0
                scoped_lock_t locker (_opt_sync);
348
0
                *value = _max_msgsz;
349
0
                return 0;
350
0
            }
351
0
            break;
352
353
0
        case ZMQ_MSG_T_SIZE:
354
0
            if (is_int) {
355
0
                scoped_lock_t locker (_opt_sync);
356
0
                *value = sizeof (zmq_msg_t);
357
0
                return 0;
358
0
            }
359
0
            break;
360
361
0
        case ZMQ_ZERO_COPY_RECV:
362
0
            if (is_int) {
363
0
                scoped_lock_t locker (_opt_sync);
364
0
                *value = _zero_copy;
365
0
                return 0;
366
0
            }
367
0
            break;
368
369
0
        default: {
370
0
            return thread_ctx_t::get (option_, optval_, optvallen_);
371
0
        }
372
0
    }
373
374
0
    errno = EINVAL;
375
0
    return -1;
376
0
}
377
378
int zmq::ctx_t::get (int option_)
379
0
{
380
0
    int optval = 0;
381
0
    size_t optvallen = sizeof (int);
382
383
0
    if (get (option_, &optval, &optvallen) == 0)
384
0
        return optval;
385
386
0
    errno = EINVAL;
387
0
    return -1;
388
0
}
389
390
bool zmq::ctx_t::start ()
391
0
{
392
    //  Initialise the array of mailboxes. Additional two slots are for
393
    //  zmq_ctx_term thread and reaper thread.
394
0
    _opt_sync.lock ();
395
0
    const int term_and_reaper_threads_count = 2;
396
0
    const int mazmq = _max_sockets;
397
0
    const int ios = _io_thread_count;
398
0
    _opt_sync.unlock ();
399
0
    const int slot_count = mazmq + ios + term_and_reaper_threads_count;
400
0
    try {
401
0
        _slots.reserve (slot_count);
402
0
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
403
0
    }
404
0
    catch (const std::bad_alloc &) {
405
0
        errno = ENOMEM;
406
0
        return false;
407
0
    }
408
0
    _slots.resize (term_and_reaper_threads_count);
409
410
    //  Initialise the infrastructure for zmq_ctx_term thread.
411
0
    _slots[term_tid] = &_term_mailbox;
412
413
    //  Create the reaper thread.
414
0
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
415
0
    if (!_reaper) {
416
0
        errno = ENOMEM;
417
0
        goto fail_cleanup_slots;
418
0
    }
419
0
    if (!_reaper->get_mailbox ()->valid ())
420
0
        goto fail_cleanup_reaper;
421
0
    _slots[reaper_tid] = _reaper->get_mailbox ();
422
0
    _reaper->start ();
423
424
    //  Create I/O thread objects and launch them.
425
0
    _slots.resize (slot_count, NULL);
426
427
0
    for (int i = term_and_reaper_threads_count;
428
0
         i != ios + term_and_reaper_threads_count; i++) {
429
0
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
430
0
        if (!io_thread) {
431
0
            errno = ENOMEM;
432
0
            goto fail_cleanup_reaper;
433
0
        }
434
0
        if (!io_thread->get_mailbox ()->valid ()) {
435
0
            delete io_thread;
436
0
            goto fail_cleanup_reaper;
437
0
        }
438
0
        _io_threads.push_back (io_thread);
439
0
        _slots[i] = io_thread->get_mailbox ();
440
0
        io_thread->start ();
441
0
    }
442
443
    //  In the unused part of the slot array, create a list of empty slots.
444
0
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
445
0
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
446
0
        _empty_slots.push_back (i);
447
0
    }
448
449
0
    _starting = false;
450
0
    return true;
451
452
0
fail_cleanup_reaper:
453
0
    _reaper->stop ();
454
0
    delete _reaper;
455
0
    _reaper = NULL;
456
457
0
fail_cleanup_slots:
458
0
    _slots.clear ();
459
0
    return false;
460
0
}
461
462
//  Creates a socket of the given type and registers its mailbox in a free
//  slot. Starts the context on first use. Returns NULL on failure; errno
//  is set here to ETERM when the context is terminating and to EMFILE when
//  no slot is free.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    scoped_lock_t locker (_slot_sync);

    //  No new sockets once zmq_ctx_term() or zmq_ctx_shutdown() was called.
    if (_terminating) {
        errno = ETERM;
        return NULL;
    }

    //  First socket ever: bring the context infrastructure up.
    if (unlikely (_starting) && !start ())
        return NULL;

    //  The max_sockets limit is reached when no free slot remains.
    if (_empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
    }

    //  Take a slot off the free list.
    const uint32_t slot = _empty_slots.back ();
    _empty_slots.pop_back ();

    //  Draw a fresh socket ID from the global counter.
    const int sid = static_cast<int> (max_socket_id.add (1)) + 1;

    socket_base_t *created = socket_base_t::create (type_, this, slot, sid);
    if (created) {
        //  Track the socket and make its mailbox reachable via the slot.
        _sockets.push_back (created);
        _slots[slot] = created->get_mailbox ();
        return created;
    }

    //  Creation failed: hand the slot back.
    _empty_slots.push_back (slot);
    return NULL;
}
502
503
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
504
0
{
505
0
    scoped_lock_t locker (_slot_sync);
506
507
    //  Free the associated thread slot.
508
0
    const uint32_t tid = socket_->get_tid ();
509
0
    _empty_slots.push_back (tid);
510
0
    _slots[tid] = NULL;
511
512
    //  Remove the socket from the list of sockets.
513
0
    _sockets.erase (socket_);
514
515
    //  If zmq_ctx_term() was already called and there are no more socket
516
    //  we can ask reaper thread to terminate.
517
0
    if (_terminating && _sockets.empty ())
518
0
        _reaper->stop ();
519
0
}
520
521
//  Returns the reaper thread object (NULL until start () has created it).
zmq::object_t *zmq::ctx_t::get_reaper () const
{
    zmq::object_t *const reaper = _reaper;
    return reaper;
}
525
526
//  Initialises the thread scheduling options with their defaults.
zmq::thread_ctx_t::thread_ctx_t () :
    _thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    _thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
    //  Nothing else to set up here.
}
531
532
void zmq::thread_ctx_t::start_thread (thread_t &thread_,
533
                                      thread_fn *tfn_,
534
                                      void *arg_,
535
                                      const char *name_) const
536
0
{
537
0
    thread_.setSchedulingParameters (_thread_priority, _thread_sched_policy,
538
0
                                     _thread_affinity_cpus);
539
540
0
    char namebuf[16] = "";
541
0
    snprintf (namebuf, sizeof (namebuf), "%s%sZMQbg%s%s",
542
0
              _thread_name_prefix.empty () ? "" : _thread_name_prefix.c_str (),
543
0
              _thread_name_prefix.empty () ? "" : "/", name_ ? "/" : "",
544
0
              name_ ? name_ : "");
545
0
    thread_.start (tfn_, arg_, namebuf);
546
0
}
547
548
int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
549
0
{
550
0
    const bool is_int = (optvallen_ == sizeof (int));
551
0
    int value = 0;
552
0
    if (is_int)
553
0
        memcpy (&value, optval_, sizeof (int));
554
555
0
    switch (option_) {
556
0
        case ZMQ_THREAD_SCHED_POLICY:
557
0
            if (is_int && value >= 0) {
558
0
                scoped_lock_t locker (_opt_sync);
559
0
                _thread_sched_policy = value;
560
0
                return 0;
561
0
            }
562
0
            break;
563
564
0
        case ZMQ_THREAD_AFFINITY_CPU_ADD:
565
0
            if (is_int && value >= 0) {
566
0
                scoped_lock_t locker (_opt_sync);
567
0
                _thread_affinity_cpus.insert (value);
568
0
                return 0;
569
0
            }
570
0
            break;
571
572
0
        case ZMQ_THREAD_AFFINITY_CPU_REMOVE:
573
0
            if (is_int && value >= 0) {
574
0
                scoped_lock_t locker (_opt_sync);
575
0
                if (0 == _thread_affinity_cpus.erase (value)) {
576
0
                    errno = EINVAL;
577
0
                    return -1;
578
0
                }
579
0
                return 0;
580
0
            }
581
0
            break;
582
583
0
        case ZMQ_THREAD_PRIORITY:
584
0
            if (is_int && value >= 0) {
585
0
                scoped_lock_t locker (_opt_sync);
586
0
                _thread_priority = value;
587
0
                return 0;
588
0
            }
589
0
            break;
590
591
0
        case ZMQ_THREAD_NAME_PREFIX:
592
            // start_thread() allows max 16 chars for thread name
593
0
            if (is_int) {
594
0
                std::ostringstream s;
595
0
                s << value;
596
0
                scoped_lock_t locker (_opt_sync);
597
0
                _thread_name_prefix = s.str ();
598
0
                return 0;
599
0
            } else if (optvallen_ > 0 && optvallen_ <= 16) {
600
0
                scoped_lock_t locker (_opt_sync);
601
0
                _thread_name_prefix.assign (static_cast<const char *> (optval_),
602
0
                                            optvallen_);
603
0
                return 0;
604
0
            }
605
0
            break;
606
0
    }
607
608
0
    errno = EINVAL;
609
0
    return -1;
610
0
}
611
612
int zmq::thread_ctx_t::get (int option_,
613
                            void *optval_,
614
                            const size_t *optvallen_)
615
0
{
616
0
    const bool is_int = (*optvallen_ == sizeof (int));
617
0
    int *value = static_cast<int *> (optval_);
618
619
0
    switch (option_) {
620
0
        case ZMQ_THREAD_SCHED_POLICY:
621
0
            if (is_int) {
622
0
                scoped_lock_t locker (_opt_sync);
623
0
                *value = _thread_sched_policy;
624
0
                return 0;
625
0
            }
626
0
            break;
627
628
0
        case ZMQ_THREAD_NAME_PREFIX:
629
0
            if (is_int) {
630
0
                scoped_lock_t locker (_opt_sync);
631
0
                *value = atoi (_thread_name_prefix.c_str ());
632
0
                return 0;
633
0
            } else if (*optvallen_ >= _thread_name_prefix.size ()) {
634
0
                scoped_lock_t locker (_opt_sync);
635
0
                memcpy (optval_, _thread_name_prefix.data (),
636
0
                        _thread_name_prefix.size ());
637
0
                return 0;
638
0
            }
639
0
            break;
640
0
    }
641
642
0
    errno = EINVAL;
643
0
    return -1;
644
0
}
645
646
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
647
0
{
648
0
    _slots[tid_]->send (command_);
649
0
}
650
651
//  Picks the least loaded I/O thread among those allowed by affinity_.
//  affinity_ - bit mask in which bit i allows the I/O thread at index i;
//              0 allows every thread. The mask has 64 bits, so threads at
//              index 64 and above can only be chosen when affinity_ is 0.
//  Returns NULL when there are no I/O threads or none matches the mask.
//  On equal load the thread with the lowest index wins.
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
    if (_io_threads.empty ())
        return NULL;

    //  Number of threads the affinity mask can address. Shifting a 64-bit
    //  value by 64 or more is undefined behaviour, so higher indices must
    //  never reach the shift below.
    const io_threads_t::size_type mask_bits = 64;

    //  Find the I/O thread with minimum load.
    int min_load = -1;
    io_thread_t *selected_io_thread = NULL;
    for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
         i++) {
        const bool allowed =
          !affinity_
          || (i < mask_bits && (affinity_ & (uint64_t (1) << i)) != 0);
        if (!allowed)
            continue;

        const int load = _io_threads[i]->get_load ();
        if (selected_io_thread == NULL || load < min_load) {
            min_load = load;
            selected_io_thread = _io_threads[i];
        }
    }
    return selected_io_thread;
}
671
672
int zmq::ctx_t::register_endpoint (const char *addr_,
673
                                   const endpoint_t &endpoint_)
674
0
{
675
0
    scoped_lock_t locker (_endpoints_sync);
676
677
0
    const bool inserted =
678
0
      _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), endpoint_)
679
0
        .second;
680
0
    if (!inserted) {
681
0
        errno = EADDRINUSE;
682
0
        return -1;
683
0
    }
684
0
    return 0;
685
0
}
686
687
int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
688
                                     const socket_base_t *const socket_)
689
0
{
690
0
    scoped_lock_t locker (_endpoints_sync);
691
692
0
    const endpoints_t::iterator it = _endpoints.find (addr_);
693
0
    if (it == _endpoints.end () || it->second.socket != socket_) {
694
0
        errno = ENOENT;
695
0
        return -1;
696
0
    }
697
698
    //  Remove endpoint.
699
0
    _endpoints.erase (it);
700
701
0
    return 0;
702
0
}
703
704
void zmq::ctx_t::unregister_endpoints (const socket_base_t *const socket_)
705
0
{
706
0
    scoped_lock_t locker (_endpoints_sync);
707
708
0
    for (endpoints_t::iterator it = _endpoints.begin (),
709
0
                               end = _endpoints.end ();
710
0
         it != end;) {
711
0
        if (it->second.socket == socket_)
712
0
#if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700)
713
0
            it = _endpoints.erase (it);
714
#else
715
            _endpoints.erase (it++);
716
#endif
717
0
        else
718
0
            ++it;
719
0
    }
720
0
}
721
722
//  Looks up the endpoint registered under addr_.
//  On success a copy of the endpoint is returned and the sequence number
//  of its socket has been incremented. When the address is unknown, errno
//  is set to ECONNREFUSED and an endpoint with a NULL socket is returned.
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{
    scoped_lock_t locker (_endpoints_sync);

    endpoints_t::iterator pos = _endpoints.find (addr_);
    if (pos != _endpoints.end ()) {
        endpoint_t found = pos->second;

        //  Bump the command sequence number of the peer so that it is not
        //  deallocated before the caller issues its "bind" command. That
        //  later 'bind' has to be sent with inc_seqnum set to false,
        //  otherwise the seqnum would be incremented twice.
        found.socket->inc_seqnum ();

        return found;
    }

    errno = ECONNREFUSED;
    endpoint_t empty = {NULL, options_t ()};
    return empty;
}
742
743
void zmq::ctx_t::pend_connection (const std::string &addr_,
744
                                  const endpoint_t &endpoint_,
745
                                  pipe_t **pipes_)
746
0
{
747
0
    scoped_lock_t locker (_endpoints_sync);
748
749
0
    const pending_connection_t pending_connection = {endpoint_, pipes_[0],
750
0
                                                     pipes_[1]};
751
752
0
    const endpoints_t::iterator it = _endpoints.find (addr_);
753
0
    if (it == _endpoints.end ()) {
754
        //  Still no bind.
755
0
        endpoint_.socket->inc_seqnum ();
756
0
        _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
757
0
                                                        pending_connection);
758
0
    } else {
759
        //  Bind has happened in the mean time, connect directly
760
0
        connect_inproc_sockets (it->second.socket, it->second.options,
761
0
                                pending_connection, connect_side);
762
0
    }
763
0
}
764
765
void zmq::ctx_t::connect_pending (const char *addr_,
766
                                  zmq::socket_base_t *bind_socket_)
767
0
{
768
0
    scoped_lock_t locker (_endpoints_sync);
769
770
0
    const std::pair<pending_connections_t::iterator,
771
0
                    pending_connections_t::iterator>
772
0
      pending = _pending_connections.equal_range (addr_);
773
0
    for (pending_connections_t::iterator p = pending.first; p != pending.second;
774
0
         ++p)
775
0
        connect_inproc_sockets (bind_socket_, _endpoints[addr_].options,
776
0
                                p->second, bind_side);
777
778
0
    _pending_connections.erase (pending.first, pending.second);
779
0
}
780
781
void zmq::ctx_t::connect_inproc_sockets (
782
  zmq::socket_base_t *bind_socket_,
783
  const options_t &bind_options_,
784
  const pending_connection_t &pending_connection_,
785
  side side_)
786
0
{
787
0
    bind_socket_->inc_seqnum ();
788
0
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
789
790
0
    if (!bind_options_.recv_routing_id) {
791
0
        msg_t msg;
792
0
        const bool ok = pending_connection_.bind_pipe->read (&msg);
793
0
        zmq_assert (ok);
794
0
        const int rc = msg.close ();
795
0
        errno_assert (rc == 0);
796
0
    }
797
798
0
    if (!get_effective_conflate_option (pending_connection_.endpoint.options)) {
799
0
        pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm,
800
0
                                                          bind_options_.rcvhwm);
801
0
        pending_connection_.bind_pipe->set_hwms_boost (
802
0
          pending_connection_.endpoint.options.sndhwm,
803
0
          pending_connection_.endpoint.options.rcvhwm);
804
805
0
        pending_connection_.connect_pipe->set_hwms (
806
0
          pending_connection_.endpoint.options.rcvhwm,
807
0
          pending_connection_.endpoint.options.sndhwm);
808
0
        pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm,
809
0
                                                 bind_options_.sndhwm);
810
0
    } else {
811
0
        pending_connection_.connect_pipe->set_hwms (-1, -1);
812
0
        pending_connection_.bind_pipe->set_hwms (-1, -1);
813
0
    }
814
815
0
#ifdef ZMQ_BUILD_DRAFT_API
816
0
    if (bind_options_.can_recv_disconnect_msg
817
0
        && !bind_options_.disconnect_msg.empty ())
818
0
        pending_connection_.connect_pipe->set_disconnect_msg (
819
0
          bind_options_.disconnect_msg);
820
0
#endif
821
822
0
    if (side_ == bind_side) {
823
0
        command_t cmd;
824
0
        cmd.type = command_t::bind;
825
0
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
826
0
        bind_socket_->process_command (cmd);
827
0
        bind_socket_->send_inproc_connected (
828
0
          pending_connection_.endpoint.socket);
829
0
    } else
830
0
        pending_connection_.connect_pipe->send_bind (
831
0
          bind_socket_, pending_connection_.bind_pipe, false);
832
833
    // When a ctx is terminated all pending inproc connection will be
834
    // connected, but the socket will already be closed and the pipe will be
835
    // in waiting_for_delimiter state, which means no more writes can be done
836
    // and the routing id write fails and causes an assert. Check if the socket
837
    // is open before sending.
838
0
    if (pending_connection_.endpoint.options.recv_routing_id
839
0
        && pending_connection_.endpoint.socket->check_tag ()) {
840
0
        send_routing_id (pending_connection_.bind_pipe, bind_options_);
841
0
    }
842
843
0
#ifdef ZMQ_BUILD_DRAFT_API
844
    //  If set, send the hello msg of the bind socket to the pending connection.
845
0
    if (bind_options_.can_send_hello_msg
846
0
        && bind_options_.hello_msg.size () > 0) {
847
0
        send_hello_msg (pending_connection_.bind_pipe, bind_options_);
848
0
    }
849
0
#endif
850
0
}
851
852
#ifdef ZMQ_HAVE_VMCI
853
854
int zmq::ctx_t::get_vmci_socket_family ()
855
{
856
    zmq::scoped_lock_t locker (_vmci_sync);
857
858
    if (_vmci_fd == -1) {
859
        _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd);
860
861
        if (_vmci_fd != -1) {
862
#ifdef FD_CLOEXEC
863
            int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC);
864
            errno_assert (rc != -1);
865
#endif
866
        }
867
    }
868
869
    return _vmci_family;
870
}
871
872
#endif
873
874
#ifdef ZMQ_HAVE_VSOCK
875
876
//  Returns the address family used for VSOCK sockets, which is the
//  constant AF_VSOCK.
int zmq::ctx_t::get_vsock_socket_family ()
{
    const int family = AF_VSOCK;
    return family;
}
880
881
#endif
882
883
//  The last used socket ID, or 0 if no socket was used so far. Note that this
884
//  is a global variable. Thus, even sockets created in different contexts have
885
//  unique IDs.
886
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;