Coverage Report

Created: 2026-01-25 06:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/zmq.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
// "Tell them I was a writer.
4
//  A maker of software.
5
//  A humanist. A father.
6
//  And many things.
7
//  But above all, a writer.
8
//  Thank You. :)"
9
//  - Pieter Hintjens
10
11
#include "precompiled.hpp"
12
#define ZMQ_TYPE_UNSAFE
13
14
#include "macros.hpp"
15
#include "poller.hpp"
16
#include "peer.hpp"
17
18
#if !defined ZMQ_HAVE_POLLER
19
//  On AIX platform, poll.h has to be included first to get consistent
20
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
21
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
22
//  names to AIX-specific names).
23
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
24
#include <poll.h>
25
#endif
26
27
#include "polling_util.hpp"
28
#endif
29
30
// TODO: determine if this is an issue, since zmq.h is being loaded from pch.
31
// zmq.h must be included *after* poll.h for AIX to build properly
32
//#include "../include/zmq.h"
33
34
#if !defined ZMQ_HAVE_WINDOWS
35
#include <unistd.h>
36
#ifdef ZMQ_HAVE_VXWORKS
37
#include <strings.h>
38
#endif
39
#endif
40
41
// XSI vector I/O
42
#if defined ZMQ_HAVE_UIO
43
#include <sys/uio.h>
44
#else
45
struct iovec
46
{
47
    void *iov_base;
48
    size_t iov_len;
49
};
50
#endif
51
52
#include <string.h>
53
#include <stdlib.h>
54
#include <new>
55
#include <climits>
56
57
#include "proxy.hpp"
58
#include "socket_base.hpp"
59
#include "stdint.hpp"
60
#include "config.hpp"
61
#include "likely.hpp"
62
#include "clock.hpp"
63
#include "ctx.hpp"
64
#include "err.hpp"
65
#include "msg.hpp"
66
#include "fd.hpp"
67
#include "metadata.hpp"
68
#include "socket_poller.hpp"
69
#include "timers.hpp"
70
#include "ip.hpp"
71
#include "address.hpp"
72
73
#ifdef ZMQ_HAVE_PPOLL
74
#include "polling_util.hpp"
75
#include <sys/select.h>
76
#endif
77
78
#if defined ZMQ_HAVE_OPENPGM
79
#define __PGM_WININT_H__
80
#include <pgm/pgm.h>
81
#endif
82
83
//  Compile time check whether msg_t fits into zmq_msg_t.
84
typedef char
85
  check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
86
87
88
void zmq_version (int *major_, int *minor_, int *patch_)
89
0
{
90
0
    *major_ = ZMQ_VERSION_MAJOR;
91
0
    *minor_ = ZMQ_VERSION_MINOR;
92
0
    *patch_ = ZMQ_VERSION_PATCH;
93
0
}
94
95
96
const char *zmq_strerror (int errnum_)
97
0
{
98
0
    return zmq::errno_to_string (errnum_);
99
0
}
100
101
int zmq_errno (void)
102
0
{
103
0
    return errno;
104
0
}
105
106
107
//  New context API
108
109
void *zmq_ctx_new (void)
110
0
{
111
    //  We do this before the ctx constructor since its embedded mailbox_t
112
    //  object needs the network to be up and running (at least on Windows).
113
0
    if (!zmq::initialize_network ()) {
114
0
        return NULL;
115
0
    }
116
117
    //  Create 0MQ context.
118
0
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
119
0
    if (ctx) {
120
0
        if (!ctx->valid ()) {
121
0
            delete ctx;
122
0
            return NULL;
123
0
        }
124
0
    }
125
0
    return ctx;
126
0
}
127
128
int zmq_ctx_term (void *ctx_)
129
0
{
130
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
131
0
        errno = EFAULT;
132
0
        return -1;
133
0
    }
134
135
0
    const int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
136
0
    const int en = errno;
137
138
    //  Shut down only if termination was not interrupted by a signal.
139
0
    if (!rc || en != EINTR) {
140
0
        zmq::shutdown_network ();
141
0
    }
142
143
0
    errno = en;
144
0
    return rc;
145
0
}
146
147
int zmq_ctx_shutdown (void *ctx_)
148
0
{
149
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
150
0
        errno = EFAULT;
151
0
        return -1;
152
0
    }
153
0
    return (static_cast<zmq::ctx_t *> (ctx_))->shutdown ();
154
0
}
155
156
int zmq_ctx_set (void *ctx_, int option_, int optval_)
157
0
{
158
0
    return zmq_ctx_set_ext (ctx_, option_, &optval_, sizeof (int));
159
0
}
160
161
int zmq_ctx_set_ext (void *ctx_,
162
                     int option_,
163
                     const void *optval_,
164
                     size_t optvallen_)
165
0
{
166
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
167
0
        errno = EFAULT;
168
0
        return -1;
169
0
    }
170
0
    return (static_cast<zmq::ctx_t *> (ctx_))
171
0
      ->set (option_, optval_, optvallen_);
172
0
}
173
174
int zmq_ctx_get (void *ctx_, int option_)
175
0
{
176
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
177
0
        errno = EFAULT;
178
0
        return -1;
179
0
    }
180
0
    return (static_cast<zmq::ctx_t *> (ctx_))->get (option_);
181
0
}
182
183
int zmq_ctx_get_ext (void *ctx_, int option_, void *optval_, size_t *optvallen_)
184
0
{
185
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
186
0
        errno = EFAULT;
187
0
        return -1;
188
0
    }
189
0
    return (static_cast<zmq::ctx_t *> (ctx_))
190
0
      ->get (option_, optval_, optvallen_);
191
0
}
192
193
194
//  Stable/legacy context API
195
196
void *zmq_init (int io_threads_)
197
0
{
198
0
    if (io_threads_ >= 0) {
199
0
        void *ctx = zmq_ctx_new ();
200
0
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
201
0
        return ctx;
202
0
    }
203
0
    errno = EINVAL;
204
0
    return NULL;
205
0
}
206
207
int zmq_term (void *ctx_)
208
0
{
209
0
    return zmq_ctx_term (ctx_);
210
0
}
211
212
int zmq_ctx_destroy (void *ctx_)
213
0
{
214
0
    return zmq_ctx_term (ctx_);
215
0
}
216
217
218
// Sockets
219
220
static zmq::socket_base_t *as_socket_base_t (void *s_)
221
0
{
222
0
    zmq::socket_base_t *s = static_cast<zmq::socket_base_t *> (s_);
223
0
    if (!s_ || !s->check_tag ()) {
224
0
        errno = ENOTSOCK;
225
0
        return NULL;
226
0
    }
227
0
    return s;
228
0
}
229
230
void *zmq_socket (void *ctx_, int type_)
231
0
{
232
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
233
0
        errno = EFAULT;
234
0
        return NULL;
235
0
    }
236
0
    zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
237
0
    zmq::socket_base_t *s = ctx->create_socket (type_);
238
0
    return static_cast<void *> (s);
239
0
}
240
241
int zmq_close (void *s_)
242
0
{
243
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
244
0
    if (!s)
245
0
        return -1;
246
0
    s->close ();
247
0
    return 0;
248
0
}
249
250
int zmq_setsockopt (void *s_,
251
                    int option_,
252
                    const void *optval_,
253
                    size_t optvallen_)
254
0
{
255
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
256
0
    if (!s)
257
0
        return -1;
258
0
    return s->setsockopt (option_, optval_, optvallen_);
259
0
}
260
261
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
262
0
{
263
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
264
0
    if (!s)
265
0
        return -1;
266
0
    return s->getsockopt (option_, optval_, optvallen_);
267
0
}
268
269
int zmq_socket_monitor_versioned (
270
  void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
271
0
{
272
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
273
0
    if (!s)
274
0
        return -1;
275
0
    return s->monitor (addr_, events_, event_version_, type_);
276
0
}
277
278
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
279
0
{
280
0
    return zmq_socket_monitor_versioned (s_, addr_, events_, 1, ZMQ_PAIR);
281
0
}
282
283
int zmq_join (void *s_, const char *group_)
284
0
{
285
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
286
0
    if (!s)
287
0
        return -1;
288
0
    return s->join (group_);
289
0
}
290
291
int zmq_leave (void *s_, const char *group_)
292
0
{
293
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
294
0
    if (!s)
295
0
        return -1;
296
0
    return s->leave (group_);
297
0
}
298
299
int zmq_bind (void *s_, const char *addr_)
300
0
{
301
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
302
0
    if (!s)
303
0
        return -1;
304
0
    return s->bind (addr_);
305
0
}
306
307
int zmq_connect (void *s_, const char *addr_)
308
0
{
309
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
310
0
    if (!s)
311
0
        return -1;
312
0
    return s->connect (addr_);
313
0
}
314
315
uint32_t zmq_connect_peer (void *s_, const char *addr_)
316
0
{
317
0
    zmq::peer_t *s = static_cast<zmq::peer_t *> (s_);
318
0
    if (!s_ || !s->check_tag ()) {
319
0
        errno = ENOTSOCK;
320
0
        return 0;
321
0
    }
322
323
0
    int socket_type;
324
0
    size_t socket_type_size = sizeof (socket_type);
325
0
    if (s->getsockopt (ZMQ_TYPE, &socket_type, &socket_type_size) != 0)
326
0
        return 0;
327
328
0
    if (socket_type != ZMQ_PEER) {
329
0
        errno = ENOTSUP;
330
0
        return 0;
331
0
    }
332
333
0
    return s->connect_peer (addr_);
334
0
}
335
336
int zmq_disconnect_peer (void *s_, uint32_t routing_id_)
337
0
{
338
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
339
0
    if (!s)
340
0
        return -1;
341
0
    return s->disconnect_peer (routing_id_);
342
0
}
343
344
345
int zmq_unbind (void *s_, const char *addr_)
346
0
{
347
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
348
0
    if (!s)
349
0
        return -1;
350
0
    return s->term_endpoint (addr_);
351
0
}
352
353
int zmq_disconnect (void *s_, const char *addr_)
354
0
{
355
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
356
0
    if (!s)
357
0
        return -1;
358
0
    return s->term_endpoint (addr_);
359
0
}
360
361
// Sending functions.
362
363
static inline int
364
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
365
0
{
366
0
    size_t sz = zmq_msg_size (msg_);
367
0
    const int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
368
0
    if (unlikely (rc < 0))
369
0
        return -1;
370
371
    //  This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
372
    //  int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
373
0
    size_t max_msgsz = INT_MAX;
374
375
    //  Truncate returned size to INT_MAX to avoid overflow to negative values
376
0
    return static_cast<int> (sz < max_msgsz ? sz : max_msgsz);
377
0
}
378
379
/*  To be deprecated once zmq_msg_send() is stable                           */
380
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
381
0
{
382
0
    return zmq_msg_send (msg_, s_, flags_);
383
0
}
384
385
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
386
0
{
387
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
388
0
    if (!s)
389
0
        return -1;
390
0
    zmq_msg_t msg;
391
0
    int rc = zmq_msg_init_buffer (&msg, buf_, len_);
392
0
    if (unlikely (rc < 0))
393
0
        return -1;
394
395
0
    rc = s_sendmsg (s, &msg, flags_);
396
0
    if (unlikely (rc < 0)) {
397
0
        const int err = errno;
398
0
        const int rc2 = zmq_msg_close (&msg);
399
0
        errno_assert (rc2 == 0);
400
0
        errno = err;
401
0
        return -1;
402
0
    }
403
    //  Note the optimisation here. We don't close the msg object as it is
404
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
405
0
    return rc;
406
0
}
407
408
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
409
0
{
410
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
411
0
    if (!s)
412
0
        return -1;
413
0
    zmq_msg_t msg;
414
0
    int rc =
415
0
      zmq_msg_init_data (&msg, const_cast<void *> (buf_), len_, NULL, NULL);
416
0
    if (rc != 0)
417
0
        return -1;
418
419
0
    rc = s_sendmsg (s, &msg, flags_);
420
0
    if (unlikely (rc < 0)) {
421
0
        const int err = errno;
422
0
        const int rc2 = zmq_msg_close (&msg);
423
0
        errno_assert (rc2 == 0);
424
0
        errno = err;
425
0
        return -1;
426
0
    }
427
    //  Note the optimisation here. We don't close the msg object as it is
428
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
429
0
    return rc;
430
0
}
431
432
433
// Send multiple messages.
434
// TODO: this function has no man page
435
//
436
// If flag bit ZMQ_SNDMORE is set the vector is treated as
437
// a single multi-part message, i.e. the last message has
438
// ZMQ_SNDMORE bit switched off.
439
//
440
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
441
0
{
442
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
443
0
    if (!s)
444
0
        return -1;
445
0
    if (unlikely (count_ <= 0 || !a_)) {
446
0
        errno = EINVAL;
447
0
        return -1;
448
0
    }
449
450
0
    int rc = 0;
451
0
    zmq_msg_t msg;
452
453
0
    for (size_t i = 0; i < count_; ++i) {
454
0
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
455
0
        if (rc != 0) {
456
0
            rc = -1;
457
0
            break;
458
0
        }
459
0
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
460
0
        if (i == count_ - 1)
461
0
            flags_ = flags_ & ~ZMQ_SNDMORE;
462
0
        rc = s_sendmsg (s, &msg, flags_);
463
0
        if (unlikely (rc < 0)) {
464
0
            const int err = errno;
465
0
            const int rc2 = zmq_msg_close (&msg);
466
0
            errno_assert (rc2 == 0);
467
0
            errno = err;
468
0
            rc = -1;
469
0
            break;
470
0
        }
471
0
    }
472
0
    return rc;
473
0
}
474
475
// Receiving functions.
476
477
static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
478
0
{
479
0
    const int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
480
0
    if (unlikely (rc < 0))
481
0
        return -1;
482
483
    //  Truncate returned size to INT_MAX to avoid overflow to negative values
484
0
    const size_t sz = zmq_msg_size (msg_);
485
0
    return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
486
0
}
487
488
/*  To be deprecated once zmq_msg_recv() is stable                           */
489
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
490
0
{
491
0
    return zmq_msg_recv (msg_, s_, flags_);
492
0
}
493
494
495
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
496
0
{
497
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
498
0
    if (!s)
499
0
        return -1;
500
0
    zmq_msg_t msg;
501
0
    int rc = zmq_msg_init (&msg);
502
0
    errno_assert (rc == 0);
503
504
0
    const int nbytes = s_recvmsg (s, &msg, flags_);
505
0
    if (unlikely (nbytes < 0)) {
506
0
        const int err = errno;
507
0
        rc = zmq_msg_close (&msg);
508
0
        errno_assert (rc == 0);
509
0
        errno = err;
510
0
        return -1;
511
0
    }
512
513
    //  An oversized message is silently truncated.
514
0
    const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
515
516
    //  We explicitly allow a null buffer argument if len is zero
517
0
    if (to_copy) {
518
0
        assert (buf_);
519
0
        memcpy (buf_, zmq_msg_data (&msg), to_copy);
520
0
    }
521
0
    rc = zmq_msg_close (&msg);
522
0
    errno_assert (rc == 0);
523
524
0
    return nbytes;
525
0
}
526
527
// Receive a multi-part message
528
//
529
// Receives up to *count_ parts of a multi-part message.
530
// Sets *count_ to the actual number of parts read.
531
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
532
// Returns number of message parts read, or -1 on error.
533
//
534
// Note: even if -1 is returned, some parts of the message
535
// may have been read. Therefore the client must consult
536
// *count_ to retrieve message parts successfully read,
537
// even if -1 is returned.
538
//
539
// The iov_base* buffers of each iovec *a_ filled in by this
540
// function may be freed using free().
541
// TODO: this function has no man page
542
//
543
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
544
0
{
545
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
546
0
    if (!s)
547
0
        return -1;
548
0
    if (unlikely (!count_ || *count_ <= 0 || !a_)) {
549
0
        errno = EINVAL;
550
0
        return -1;
551
0
    }
552
553
0
    const size_t count = *count_;
554
0
    int nread = 0;
555
0
    bool recvmore = true;
556
557
0
    *count_ = 0;
558
559
0
    for (size_t i = 0; recvmore && i < count; ++i) {
560
0
        zmq_msg_t msg;
561
0
        int rc = zmq_msg_init (&msg);
562
0
        errno_assert (rc == 0);
563
564
0
        const int nbytes = s_recvmsg (s, &msg, flags_);
565
0
        if (unlikely (nbytes < 0)) {
566
0
            const int err = errno;
567
0
            rc = zmq_msg_close (&msg);
568
0
            errno_assert (rc == 0);
569
0
            errno = err;
570
0
            nread = -1;
571
0
            break;
572
0
        }
573
574
0
        a_[i].iov_len = zmq_msg_size (&msg);
575
0
        a_[i].iov_base = static_cast<char *> (malloc (a_[i].iov_len));
576
0
        if (unlikely (!a_[i].iov_base)) {
577
0
            errno = ENOMEM;
578
0
            return -1;
579
0
        }
580
0
        memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
581
0
                a_[i].iov_len);
582
        // Assume zmq_socket ZMQ_RVCMORE is properly set.
583
0
        const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg);
584
0
        recvmore = p_msg->flags () & zmq::msg_t::more;
585
0
        rc = zmq_msg_close (&msg);
586
0
        errno_assert (rc == 0);
587
0
        ++*count_;
588
0
        ++nread;
589
0
    }
590
0
    return nread;
591
0
}
592
593
// Message manipulators.
594
595
int zmq_msg_init (zmq_msg_t *msg_)
596
0
{
597
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->init ();
598
0
}
599
600
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
601
0
{
602
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->init_size (size_);
603
0
}
604
605
int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_)
606
0
{
607
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->init_buffer (buf_, size_);
608
0
}
609
610
int zmq_msg_init_data (
611
  zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
612
0
{
613
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))
614
0
      ->init_data (data_, size_, ffn_, hint_);
615
0
}
616
617
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
618
0
{
619
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
620
0
    if (!s)
621
0
        return -1;
622
0
    return s_sendmsg (s, msg_, flags_);
623
0
}
624
625
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
626
0
{
627
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
628
0
    if (!s)
629
0
        return -1;
630
0
    return s_recvmsg (s, msg_, flags_);
631
0
}
632
633
int zmq_msg_close (zmq_msg_t *msg_)
634
0
{
635
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->close ();
636
0
}
637
638
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
639
0
{
640
0
    return (reinterpret_cast<zmq::msg_t *> (dest_))
641
0
      ->move (*reinterpret_cast<zmq::msg_t *> (src_));
642
0
}
643
644
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
645
0
{
646
0
    return (reinterpret_cast<zmq::msg_t *> (dest_))
647
0
      ->copy (*reinterpret_cast<zmq::msg_t *> (src_));
648
0
}
649
650
void *zmq_msg_data (zmq_msg_t *msg_)
651
0
{
652
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->data ();
653
0
}
654
655
size_t zmq_msg_size (const zmq_msg_t *msg_)
656
0
{
657
0
    return ((zmq::msg_t *) msg_)->size ();
658
0
}
659
660
int zmq_msg_more (const zmq_msg_t *msg_)
661
0
{
662
0
    return zmq_msg_get (msg_, ZMQ_MORE);
663
0
}
664
665
int zmq_msg_get (const zmq_msg_t *msg_, int property_)
666
0
{
667
0
    const char *fd_string;
668
669
0
    switch (property_) {
670
0
        case ZMQ_MORE:
671
0
            return (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
672
0
        case ZMQ_SRCFD:
673
0
            fd_string = zmq_msg_gets (msg_, "__fd");
674
0
            if (fd_string == NULL)
675
0
                return -1;
676
677
0
            return atoi (fd_string);
678
0
        case ZMQ_SHARED:
679
0
            return (((zmq::msg_t *) msg_)->is_cmsg ())
680
0
                       || (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::shared)
681
0
                     ? 1
682
0
                     : 0;
683
0
        default:
684
0
            errno = EINVAL;
685
0
            return -1;
686
0
    }
687
0
}
688
689
int zmq_msg_set (zmq_msg_t *, int, int)
690
0
{
691
    //  No properties supported at present
692
0
    errno = EINVAL;
693
0
    return -1;
694
0
}
695
696
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
697
0
{
698
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))
699
0
      ->set_routing_id (routing_id_);
700
0
}
701
702
uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
703
0
{
704
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->get_routing_id ();
705
0
}
706
707
int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
708
0
{
709
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->set_group (group_);
710
0
}
711
712
const char *zmq_msg_group (zmq_msg_t *msg_)
713
0
{
714
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->group ();
715
0
}
716
717
//  Get message metadata string
718
719
const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
720
0
{
721
0
    const zmq::metadata_t *metadata =
722
0
      reinterpret_cast<const zmq::msg_t *> (msg_)->metadata ();
723
0
    const char *value = NULL;
724
0
    if (metadata)
725
0
        value = metadata->get (std::string (property_));
726
0
    if (value)
727
0
        return value;
728
729
0
    errno = EINVAL;
730
0
    return NULL;
731
0
}
732
733
// Polling.
734
735
#if defined ZMQ_HAVE_POLLER
736
static int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
737
0
{
738
    // implement zmq_poll on top of zmq_poller
739
0
    int rc;
740
0
    zmq_poller_event_t *events;
741
0
    zmq::socket_poller_t poller;
742
0
    events = new (std::nothrow) zmq_poller_event_t[nitems_];
743
0
    alloc_assert (events);
744
745
0
    bool repeat_items = false;
746
    //  Register sockets with poller
747
0
    for (int i = 0; i < nitems_; i++) {
748
0
        items_[i].revents = 0;
749
750
0
        bool modify = false;
751
0
        short e = items_[i].events;
752
0
        if (items_[i].socket) {
753
            //  Poll item is a 0MQ socket.
754
0
            for (int j = 0; j < i; ++j) {
755
                // Check for repeat entries
756
0
                if (items_[j].socket == items_[i].socket) {
757
0
                    repeat_items = true;
758
0
                    modify = true;
759
0
                    e |= items_[j].events;
760
0
                }
761
0
            }
762
0
            if (modify) {
763
0
                rc = zmq_poller_modify (&poller, items_[i].socket, e);
764
0
            } else {
765
0
                rc = zmq_poller_add (&poller, items_[i].socket, NULL, e);
766
0
            }
767
0
            if (rc < 0) {
768
0
                delete[] events;
769
0
                return rc;
770
0
            }
771
0
        } else {
772
            //  Poll item is a raw file descriptor.
773
0
            for (int j = 0; j < i; ++j) {
774
                // Check for repeat entries
775
0
                if (!items_[j].socket && items_[j].fd == items_[i].fd) {
776
0
                    repeat_items = true;
777
0
                    modify = true;
778
0
                    e |= items_[j].events;
779
0
                }
780
0
            }
781
0
            if (modify) {
782
0
                rc = zmq_poller_modify_fd (&poller, items_[i].fd, e);
783
0
            } else {
784
0
                rc = zmq_poller_add_fd (&poller, items_[i].fd, NULL, e);
785
0
            }
786
0
            if (rc < 0) {
787
0
                delete[] events;
788
0
                return rc;
789
0
            }
790
0
        }
791
0
    }
792
793
    //  Wait for events
794
0
    rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
795
0
    if (rc < 0) {
796
0
        delete[] events;
797
0
        if (zmq_errno () == EAGAIN) {
798
0
            return 0;
799
0
        }
800
0
        return rc;
801
0
    }
802
803
    //  Transform poller events into zmq_pollitem events.
804
    //  items_ contains all items, while events only contains fired events.
805
    //  If no sockets are repeated (likely), the two are still co-ordered, so step through the items
806
    //  checking for matches only on the first event.
807
    //  If there are repeat items, they cannot be assumed to be co-ordered,
808
    //  so each pollitem must check fired events from the beginning.
809
0
    int j_start = 0, found_events = rc;
810
0
    for (int i = 0; i < nitems_; i++) {
811
0
        for (int j = j_start; j < found_events; ++j) {
812
0
            if ((items_[i].socket && items_[i].socket == events[j].socket)
813
0
                || (!(items_[i].socket || events[j].socket)
814
0
                    && items_[i].fd == events[j].fd)) {
815
0
                items_[i].revents = events[j].events & items_[i].events;
816
0
                if (!repeat_items) {
817
                    // no repeats, we can ignore events we've already seen
818
0
                    j_start++;
819
0
                }
820
0
                break;
821
0
            }
822
0
            if (!repeat_items) {
823
                // no repeats, never have to look at j > j_start
824
0
                break;
825
0
            }
826
0
        }
827
0
    }
828
829
    //  Cleanup
830
0
    delete[] events;
831
0
    return rc;
832
0
}
833
#endif // ZMQ_HAVE_POLLER
834
835
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
836
0
{
837
0
#if defined ZMQ_HAVE_POLLER
838
    // if poller is present, use that if there is at least 1 thread-safe socket,
839
    // otherwise fall back to the previous implementation as it's faster.
840
0
    for (int i = 0; i != nitems_; i++) {
841
0
        if (items_[i].socket) {
842
0
            zmq::socket_base_t *s = as_socket_base_t (items_[i].socket);
843
0
            if (s) {
844
0
                if (s->is_thread_safe ())
845
0
                    return zmq_poller_poll (items_, nitems_, timeout_);
846
0
            } else {
847
                //as_socket_base_t returned NULL : socket is invalid
848
0
                return -1;
849
0
            }
850
0
        }
851
0
    }
852
0
#endif // ZMQ_HAVE_POLLER
853
0
#if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
854
0
    if (unlikely (nitems_ < 0)) {
855
0
        errno = EINVAL;
856
0
        return -1;
857
0
    }
858
0
    if (unlikely (nitems_ == 0)) {
859
0
        if (timeout_ == 0)
860
0
            return 0;
861
#if defined ZMQ_HAVE_WINDOWS
862
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
863
        return 0;
864
#elif defined ZMQ_HAVE_VXWORKS
865
        struct timespec ns_;
866
        ns_.tv_sec = timeout_ / 1000;
867
        ns_.tv_nsec = timeout_ % 1000 * 1000000;
868
        return nanosleep (&ns_, 0);
869
#else
870
0
        return usleep (timeout_ * 1000);
871
0
#endif
872
0
    }
873
0
    if (!items_) {
874
0
        errno = EFAULT;
875
0
        return -1;
876
0
    }
877
878
0
    zmq::clock_t clock;
879
0
    uint64_t now = 0;
880
0
    uint64_t end = 0;
881
0
#if defined ZMQ_POLL_BASED_ON_POLL
882
0
    zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_);
883
884
    //  Build pollset for poll () system call.
885
0
    for (int i = 0; i != nitems_; i++) {
886
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
887
        //  retrieved by the ZMQ_FD socket option.
888
0
        if (items_[i].socket) {
889
0
            size_t zmq_fd_size = sizeof (zmq::fd_t);
890
0
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
891
0
                                &zmq_fd_size)
892
0
                == -1) {
893
0
                return -1;
894
0
            }
895
0
            pollfds[i].events = items_[i].events ? POLLIN : 0;
896
0
        }
897
        //  Else, the poll item is a raw file descriptor. Just convert the
898
        //  events to normal POLLIN/POLLOUT for poll ().
899
0
        else {
900
0
            pollfds[i].fd = items_[i].fd;
901
0
            pollfds[i].events =
902
0
              (items_[i].events & ZMQ_POLLIN ? POLLIN : 0)
903
0
              | (items_[i].events & ZMQ_POLLOUT ? POLLOUT : 0)
904
0
              | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
905
0
        }
906
0
    }
907
#else
908
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
909
    //  file descriptors.
910
    //  TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
911
    zmq_assert (nitems_ <= FD_SETSIZE);
912
913
    zmq::optimized_fd_set_t pollset_in (nitems_);
914
    FD_ZERO (pollset_in.get ());
915
    zmq::optimized_fd_set_t pollset_out (nitems_);
916
    FD_ZERO (pollset_out.get ());
917
    zmq::optimized_fd_set_t pollset_err (nitems_);
918
    FD_ZERO (pollset_err.get ());
919
920
    zmq::fd_t maxfd = 0;
921
922
    //  Build the fd_sets for passing to select ().
923
    for (int i = 0; i != nitems_; i++) {
924
        //  If the poll item is a 0MQ socket we are interested in input on the
925
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
926
        if (items_[i].socket) {
927
            size_t zmq_fd_size = sizeof (zmq::fd_t);
928
            zmq::fd_t notify_fd;
929
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
930
                                &zmq_fd_size)
931
                == -1)
932
                return -1;
933
            if (items_[i].events) {
934
                FD_SET (notify_fd, pollset_in.get ());
935
                if (maxfd < notify_fd)
936
                    maxfd = notify_fd;
937
            }
938
        }
939
        //  Else, the poll item is a raw file descriptor. Convert the poll item
940
        //  events to the appropriate fd_sets.
941
        else {
942
            if (items_[i].events & ZMQ_POLLIN)
943
                FD_SET (items_[i].fd, pollset_in.get ());
944
            if (items_[i].events & ZMQ_POLLOUT)
945
                FD_SET (items_[i].fd, pollset_out.get ());
946
            if (items_[i].events & ZMQ_POLLERR)
947
                FD_SET (items_[i].fd, pollset_err.get ());
948
            if (maxfd < items_[i].fd)
949
                maxfd = items_[i].fd;
950
        }
951
    }
952
953
    zmq::optimized_fd_set_t inset (nitems_);
954
    zmq::optimized_fd_set_t outset (nitems_);
955
    zmq::optimized_fd_set_t errset (nitems_);
956
#endif
957
958
0
    bool first_pass = true;
959
0
    int nevents = 0;
960
961
0
    while (true) {
962
0
#if defined ZMQ_POLL_BASED_ON_POLL
963
964
        //  Compute the timeout for the subsequent poll.
965
0
        const zmq::timeout_t timeout =
966
0
          zmq::compute_timeout (first_pass, timeout_, now, end);
967
968
        //  Wait for events.
969
0
        {
970
0
            const int rc = poll (&pollfds[0], nitems_, timeout);
971
0
            if (rc == -1 && errno == EINTR) {
972
0
                return -1;
973
0
            }
974
0
            errno_assert (rc >= 0);
975
0
        }
976
        //  Check for the events.
977
0
        for (int i = 0; i != nitems_; i++) {
978
0
            items_[i].revents = 0;
979
980
            //  The poll item is a 0MQ socket. Retrieve pending events
981
            //  using the ZMQ_EVENTS socket option.
982
0
            if (items_[i].socket) {
983
0
                size_t zmq_events_size = sizeof (uint32_t);
984
0
                uint32_t zmq_events;
985
0
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
986
0
                                    &zmq_events_size)
987
0
                    == -1) {
988
0
                    return -1;
989
0
                }
990
0
                if ((items_[i].events & ZMQ_POLLOUT)
991
0
                    && (zmq_events & ZMQ_POLLOUT))
992
0
                    items_[i].revents |= ZMQ_POLLOUT;
993
0
                if ((items_[i].events & ZMQ_POLLIN)
994
0
                    && (zmq_events & ZMQ_POLLIN))
995
0
                    items_[i].revents |= ZMQ_POLLIN;
996
0
            }
997
            //  Else, the poll item is a raw file descriptor, simply convert
998
            //  the events to zmq_pollitem_t-style format.
999
0
            else {
1000
0
                if (pollfds[i].revents & POLLIN)
1001
0
                    items_[i].revents |= ZMQ_POLLIN;
1002
0
                if (pollfds[i].revents & POLLOUT)
1003
0
                    items_[i].revents |= ZMQ_POLLOUT;
1004
0
                if (pollfds[i].revents & POLLPRI)
1005
0
                    items_[i].revents |= ZMQ_POLLPRI;
1006
0
                if (pollfds[i].revents & ~(POLLIN | POLLOUT | POLLPRI))
1007
0
                    items_[i].revents |= ZMQ_POLLERR;
1008
0
            }
1009
1010
0
            if (items_[i].revents)
1011
0
                nevents++;
1012
0
        }
1013
1014
#else
1015
1016
        //  Compute the timeout for the subsequent poll.
1017
        timeval timeout;
1018
        timeval *ptimeout;
1019
        if (first_pass) {
1020
            timeout.tv_sec = 0;
1021
            timeout.tv_usec = 0;
1022
            ptimeout = &timeout;
1023
        } else if (timeout_ < 0)
1024
            ptimeout = NULL;
1025
        else {
1026
            timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1027
            timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
1028
            ptimeout = &timeout;
1029
        }
1030
1031
        //  Wait for events. Ignore interrupts if there's infinite timeout.
1032
        while (true) {
1033
            memcpy (inset.get (), pollset_in.get (),
1034
                    zmq::valid_pollset_bytes (*pollset_in.get ()));
1035
            memcpy (outset.get (), pollset_out.get (),
1036
                    zmq::valid_pollset_bytes (*pollset_out.get ()));
1037
            memcpy (errset.get (), pollset_err.get (),
1038
                    zmq::valid_pollset_bytes (*pollset_err.get ()));
1039
#if defined ZMQ_HAVE_WINDOWS
1040
            int rc =
1041
              select (0, inset.get (), outset.get (), errset.get (), ptimeout);
1042
            if (unlikely (rc == SOCKET_ERROR)) {
1043
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1044
                wsa_assert (errno == ENOTSOCK);
1045
                return -1;
1046
            }
1047
#else
1048
            int rc = select (maxfd + 1, inset.get (), outset.get (),
1049
                             errset.get (), ptimeout);
1050
            if (unlikely (rc == -1)) {
1051
                errno_assert (errno == EINTR || errno == EBADF);
1052
                return -1;
1053
            }
1054
#endif
1055
            break;
1056
        }
1057
1058
        //  Check for the events.
1059
        for (int i = 0; i != nitems_; i++) {
1060
            items_[i].revents = 0;
1061
1062
            //  The poll item is a 0MQ socket. Retrieve pending events
1063
            //  using the ZMQ_EVENTS socket option.
1064
            if (items_[i].socket) {
1065
                size_t zmq_events_size = sizeof (uint32_t);
1066
                uint32_t zmq_events;
1067
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
1068
                                    &zmq_events_size)
1069
                    == -1)
1070
                    return -1;
1071
                if ((items_[i].events & ZMQ_POLLOUT)
1072
                    && (zmq_events & ZMQ_POLLOUT))
1073
                    items_[i].revents |= ZMQ_POLLOUT;
1074
                if ((items_[i].events & ZMQ_POLLIN)
1075
                    && (zmq_events & ZMQ_POLLIN))
1076
                    items_[i].revents |= ZMQ_POLLIN;
1077
            }
1078
            //  Else, the poll item is a raw file descriptor, simply convert
1079
            //  the events to zmq_pollitem_t-style format.
1080
            else {
1081
                if (FD_ISSET (items_[i].fd, inset.get ()))
1082
                    items_[i].revents |= ZMQ_POLLIN;
1083
                if (FD_ISSET (items_[i].fd, outset.get ()))
1084
                    items_[i].revents |= ZMQ_POLLOUT;
1085
                if (FD_ISSET (items_[i].fd, errset.get ()))
1086
                    items_[i].revents |= ZMQ_POLLERR;
1087
            }
1088
1089
            if (items_[i].revents)
1090
                nevents++;
1091
        }
1092
#endif
1093
1094
        //  If timeout is zero, exit immediately whether there are events or not.
1095
0
        if (timeout_ == 0)
1096
0
            break;
1097
1098
        //  If there are events to return, we can exit immediately.
1099
0
        if (nevents)
1100
0
            break;
1101
1102
        //  At this point we are meant to wait for events but there are none.
1103
        //  If timeout is infinite we can just loop until we get some events.
1104
0
        if (timeout_ < 0) {
1105
0
            if (first_pass)
1106
0
                first_pass = false;
1107
0
            continue;
1108
0
        }
1109
1110
        //  The timeout is finite and there are no events. In the first pass
1111
        //  we get a timestamp of when the polling have begun. (We assume that
1112
        //  first pass have taken negligible time). We also compute the time
1113
        //  when the polling should time out.
1114
0
        if (first_pass) {
1115
0
            now = clock.now_ms ();
1116
0
            end = now + timeout_;
1117
0
            if (now == end)
1118
0
                break;
1119
0
            first_pass = false;
1120
0
            continue;
1121
0
        }
1122
1123
        //  Find out whether timeout have expired.
1124
0
        now = clock.now_ms ();
1125
0
        if (now >= end)
1126
0
            break;
1127
0
    }
1128
1129
0
    return nevents;
1130
#else
1131
    //  Exotic platforms that support neither poll() nor select().
1132
    errno = ENOTSUP;
1133
    return -1;
1134
#endif
1135
0
}
1136
1137
#ifdef ZMQ_HAVE_PPOLL
1138
// return values of 0 or -1 should be returned from zmq_poll; return value 1 means items passed checks
1139
int zmq_poll_check_items_ (zmq_pollitem_t *items_, int nitems_, long timeout_)
1140
0
{
1141
0
    if (unlikely (nitems_ < 0)) {
1142
0
        errno = EINVAL;
1143
0
        return -1;
1144
0
    }
1145
0
    if (unlikely (nitems_ == 0)) {
1146
0
        if (timeout_ == 0)
1147
0
            return 0;
1148
#if defined ZMQ_HAVE_WINDOWS
1149
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
1150
        return 0;
1151
#elif defined ZMQ_HAVE_VXWORKS
1152
        struct timespec ns_;
1153
        ns_.tv_sec = timeout_ / 1000;
1154
        ns_.tv_nsec = timeout_ % 1000 * 1000000;
1155
        return nanosleep (&ns_, 0);
1156
#else
1157
0
        return usleep (timeout_ * 1000);
1158
0
#endif
1159
0
    }
1160
0
    if (!items_) {
1161
0
        errno = EFAULT;
1162
0
        return -1;
1163
0
    }
1164
0
    return 1;
1165
0
}
1166
1167
struct zmq_poll_select_fds_t_
1168
{
1169
    explicit zmq_poll_select_fds_t_ (int nitems_) :
1170
0
        pollset_in (nitems_),
1171
0
        pollset_out (nitems_),
1172
0
        pollset_err (nitems_),
1173
0
        inset (nitems_),
1174
0
        outset (nitems_),
1175
0
        errset (nitems_),
1176
0
        maxfd (0)
1177
0
    {
1178
0
        FD_ZERO (pollset_in.get ());
1179
0
        FD_ZERO (pollset_out.get ());
1180
0
        FD_ZERO (pollset_err.get ());
1181
0
    }
1182
1183
    zmq::optimized_fd_set_t pollset_in;
1184
    zmq::optimized_fd_set_t pollset_out;
1185
    zmq::optimized_fd_set_t pollset_err;
1186
    zmq::optimized_fd_set_t inset;
1187
    zmq::optimized_fd_set_t outset;
1188
    zmq::optimized_fd_set_t errset;
1189
    zmq::fd_t maxfd;
1190
};
1191
1192
zmq_poll_select_fds_t_
1193
zmq_poll_build_select_fds_ (zmq_pollitem_t *items_, int nitems_, int &rc)
1194
0
{
1195
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
1196
    //  file descriptors.
1197
    //  TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
1198
0
    zmq_assert (nitems_ <= FD_SETSIZE);
1199
1200
0
    zmq_poll_select_fds_t_ fds (nitems_);
1201
1202
    //  Build the fd_sets for passing to select ().
1203
0
    for (int i = 0; i != nitems_; i++) {
1204
        //  If the poll item is a 0MQ socket we are interested in input on the
1205
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
1206
0
        if (items_[i].socket) {
1207
0
            size_t zmq_fd_size = sizeof (zmq::fd_t);
1208
0
            zmq::fd_t notify_fd;
1209
0
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
1210
0
                                &zmq_fd_size)
1211
0
                == -1) {
1212
0
                rc = -1;
1213
0
                return fds;
1214
0
            }
1215
0
            if (items_[i].events) {
1216
0
                FD_SET (notify_fd, fds.pollset_in.get ());
1217
0
                if (fds.maxfd < notify_fd)
1218
0
                    fds.maxfd = notify_fd;
1219
0
            }
1220
0
        }
1221
        //  Else, the poll item is a raw file descriptor. Convert the poll item
1222
        //  events to the appropriate fd_sets.
1223
0
        else {
1224
0
            if (items_[i].events & ZMQ_POLLIN)
1225
0
                FD_SET (items_[i].fd, fds.pollset_in.get ());
1226
0
            if (items_[i].events & ZMQ_POLLOUT)
1227
0
                FD_SET (items_[i].fd, fds.pollset_out.get ());
1228
0
            if (items_[i].events & ZMQ_POLLERR)
1229
0
                FD_SET (items_[i].fd, fds.pollset_err.get ());
1230
0
            if (fds.maxfd < items_[i].fd)
1231
0
                fds.maxfd = items_[i].fd;
1232
0
        }
1233
0
    }
1234
1235
0
    rc = 0;
1236
0
    return fds;
1237
0
}
1238
1239
timeval *zmq_poll_select_set_timeout_ (
1240
  long timeout_, bool first_pass, uint64_t now, uint64_t end, timeval &timeout)
1241
0
{
1242
0
    timeval *ptimeout;
1243
0
    if (first_pass) {
1244
0
        timeout.tv_sec = 0;
1245
0
        timeout.tv_usec = 0;
1246
0
        ptimeout = &timeout;
1247
0
    } else if (timeout_ < 0)
1248
0
        ptimeout = NULL;
1249
0
    else {
1250
0
        timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1251
0
        timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
1252
0
        ptimeout = &timeout;
1253
0
    }
1254
0
    return ptimeout;
1255
0
}
1256
1257
timespec *zmq_poll_select_set_timeout_ (
1258
  long timeout_, bool first_pass, uint64_t now, uint64_t end, timespec &timeout)
1259
0
{
1260
0
    timespec *ptimeout;
1261
0
    if (first_pass) {
1262
0
        timeout.tv_sec = 0;
1263
0
        timeout.tv_nsec = 0;
1264
0
        ptimeout = &timeout;
1265
0
    } else if (timeout_ < 0)
1266
0
        ptimeout = NULL;
1267
0
    else {
1268
0
        timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1269
0
        timeout.tv_nsec = static_cast<long> ((end - now) % 1000 * 1000000);
1270
0
        ptimeout = &timeout;
1271
0
    }
1272
0
    return ptimeout;
1273
0
}
1274
1275
int zmq_poll_select_check_events_ (zmq_pollitem_t *items_,
1276
                                   int nitems_,
1277
                                   zmq_poll_select_fds_t_ &fds,
1278
                                   int &nevents)
1279
0
{
1280
    //  Check for the events.
1281
0
    for (int i = 0; i != nitems_; i++) {
1282
0
        items_[i].revents = 0;
1283
1284
        //  The poll item is a 0MQ socket. Retrieve pending events
1285
        //  using the ZMQ_EVENTS socket option.
1286
0
        if (items_[i].socket) {
1287
0
            size_t zmq_events_size = sizeof (uint32_t);
1288
0
            uint32_t zmq_events;
1289
0
            if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
1290
0
                                &zmq_events_size)
1291
0
                == -1)
1292
0
                return -1;
1293
0
            if ((items_[i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT))
1294
0
                items_[i].revents |= ZMQ_POLLOUT;
1295
0
            if ((items_[i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN))
1296
0
                items_[i].revents |= ZMQ_POLLIN;
1297
0
        }
1298
        //  Else, the poll item is a raw file descriptor, simply convert
1299
        //  the events to zmq_pollitem_t-style format.
1300
0
        else {
1301
0
            if (FD_ISSET (items_[i].fd, fds.inset.get ()))
1302
0
                items_[i].revents |= ZMQ_POLLIN;
1303
0
            if (FD_ISSET (items_[i].fd, fds.outset.get ()))
1304
0
                items_[i].revents |= ZMQ_POLLOUT;
1305
0
            if (FD_ISSET (items_[i].fd, fds.errset.get ()))
1306
0
                items_[i].revents |= ZMQ_POLLERR;
1307
0
        }
1308
1309
0
        if (items_[i].revents)
1310
0
            nevents++;
1311
0
    }
1312
1313
0
    return 0;
1314
0
}
1315
1316
bool zmq_poll_must_break_loop_ (long timeout_,
1317
                                int nevents,
1318
                                bool &first_pass,
1319
                                zmq::clock_t &clock,
1320
                                uint64_t &now,
1321
                                uint64_t &end)
1322
0
{
1323
    //  If timeout is zero, exit immediately whether there are events or not.
1324
0
    if (timeout_ == 0)
1325
0
        return true;
1326
1327
    //  If there are events to return, we can exit immediately.
1328
0
    if (nevents)
1329
0
        return true;
1330
1331
    //  At this point we are meant to wait for events but there are none.
1332
    //  If timeout is infinite we can just loop until we get some events.
1333
0
    if (timeout_ < 0) {
1334
0
        if (first_pass)
1335
0
            first_pass = false;
1336
0
        return false;
1337
0
    }
1338
1339
    //  The timeout is finite and there are no events. In the first pass
1340
    //  we get a timestamp of when the polling have begun. (We assume that
1341
    //  first pass have taken negligible time). We also compute the time
1342
    //  when the polling should time out.
1343
0
    if (first_pass) {
1344
0
        now = clock.now_ms ();
1345
0
        end = now + timeout_;
1346
0
        if (now == end)
1347
0
            return true;
1348
0
        first_pass = false;
1349
0
        return false;
1350
0
    }
1351
1352
    //  Find out whether timeout have expired.
1353
0
    now = clock.now_ms ();
1354
0
    if (now >= end)
1355
0
        return true;
1356
1357
    // finally, in all other cases, we just continue
1358
0
    return false;
1359
0
}
1360
#endif // ZMQ_HAVE_PPOLL
1361
1362
#if !defined _WIN32
1363
int zmq_ppoll (zmq_pollitem_t *items_,
1364
               int nitems_,
1365
               long timeout_,
1366
               const sigset_t *sigmask_)
1367
#else
1368
// Windows has no sigset_t
1369
int zmq_ppoll (zmq_pollitem_t *items_,
1370
               int nitems_,
1371
               long timeout_,
1372
               const void *sigmask_)
1373
#endif
1374
0
{
1375
0
#ifdef ZMQ_HAVE_PPOLL
1376
0
    int rc = zmq_poll_check_items_ (items_, nitems_, timeout_);
1377
0
    if (rc <= 0) {
1378
0
        return rc;
1379
0
    }
1380
1381
0
    zmq::clock_t clock;
1382
0
    uint64_t now = 0;
1383
0
    uint64_t end = 0;
1384
0
    zmq_poll_select_fds_t_ fds =
1385
0
      zmq_poll_build_select_fds_ (items_, nitems_, rc);
1386
0
    if (rc == -1) {
1387
0
        return -1;
1388
0
    }
1389
1390
0
    bool first_pass = true;
1391
0
    int nevents = 0;
1392
1393
0
    while (true) {
1394
        //  Compute the timeout for the subsequent poll.
1395
0
        timespec timeout;
1396
0
        timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass,
1397
0
                                                           now, end, timeout);
1398
1399
        //  Wait for events. Ignore interrupts if there's infinite timeout.
1400
0
        while (true) {
1401
0
            memcpy (fds.inset.get (), fds.pollset_in.get (),
1402
0
                    zmq::valid_pollset_bytes (*fds.pollset_in.get ()));
1403
0
            memcpy (fds.outset.get (), fds.pollset_out.get (),
1404
0
                    zmq::valid_pollset_bytes (*fds.pollset_out.get ()));
1405
0
            memcpy (fds.errset.get (), fds.pollset_err.get (),
1406
0
                    zmq::valid_pollset_bytes (*fds.pollset_err.get ()));
1407
0
            int rc =
1408
0
              pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (),
1409
0
                       fds.errset.get (), ptimeout, sigmask_);
1410
0
            if (unlikely (rc == -1)) {
1411
0
                errno_assert (errno == EINTR || errno == EBADF);
1412
0
                return -1;
1413
0
            }
1414
0
            break;
1415
0
        }
1416
1417
0
        rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents);
1418
0
        if (rc < 0) {
1419
0
            return rc;
1420
0
        }
1421
1422
0
        if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock,
1423
0
                                       now, end)) {
1424
0
            break;
1425
0
        }
1426
0
    }
1427
1428
0
    return nevents;
1429
#else
1430
    errno = ENOTSUP;
1431
    return -1;
1432
#endif // ZMQ_HAVE_PPOLL
1433
0
}
1434
1435
//  The poller functionality
1436
1437
void *zmq_poller_new (void)
1438
0
{
1439
0
    zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1440
0
    if (!poller) {
1441
0
        errno = ENOMEM;
1442
0
    }
1443
0
    return poller;
1444
0
}
1445
1446
int zmq_poller_destroy (void **poller_p_)
1447
0
{
1448
0
    if (poller_p_) {
1449
0
        const zmq::socket_poller_t *const poller =
1450
0
          static_cast<const zmq::socket_poller_t *> (*poller_p_);
1451
0
        if (poller && poller->check_tag ()) {
1452
0
            delete poller;
1453
0
            *poller_p_ = NULL;
1454
0
            return 0;
1455
0
        }
1456
0
    }
1457
0
    errno = EFAULT;
1458
0
    return -1;
1459
0
}
1460
1461
1462
static int check_poller (void *const poller_)
1463
0
{
1464
0
    if (!poller_
1465
0
        || !(static_cast<zmq::socket_poller_t *> (poller_))->check_tag ()) {
1466
0
        errno = EFAULT;
1467
0
        return -1;
1468
0
    }
1469
1470
0
    return 0;
1471
0
}
1472
1473
static int check_events (const short events_)
1474
0
{
1475
0
    if (events_ & ~(ZMQ_POLLIN | ZMQ_POLLOUT | ZMQ_POLLERR | ZMQ_POLLPRI)) {
1476
0
        errno = EINVAL;
1477
0
        return -1;
1478
0
    }
1479
0
    return 0;
1480
0
}
1481
1482
static int check_poller_registration_args (void *const poller_, void *const s_)
1483
0
{
1484
0
    if (-1 == check_poller (poller_))
1485
0
        return -1;
1486
1487
0
    if (!s_ || !(static_cast<zmq::socket_base_t *> (s_))->check_tag ()) {
1488
0
        errno = ENOTSOCK;
1489
0
        return -1;
1490
0
    }
1491
1492
0
    return 0;
1493
0
}
1494
1495
static int check_poller_fd_registration_args (void *const poller_,
1496
                                              const zmq::fd_t fd_)
1497
0
{
1498
0
    if (-1 == check_poller (poller_))
1499
0
        return -1;
1500
1501
0
    if (fd_ == zmq::retired_fd) {
1502
0
        errno = EBADF;
1503
0
        return -1;
1504
0
    }
1505
1506
0
    return 0;
1507
0
}
1508
1509
int zmq_poller_size (void *poller_)
1510
0
{
1511
0
    if (-1 == check_poller (poller_))
1512
0
        return -1;
1513
1514
0
    return (static_cast<zmq::socket_poller_t *> (poller_))->size ();
1515
0
}
1516
1517
int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
1518
0
{
1519
0
    if (-1 == check_poller_registration_args (poller_, s_)
1520
0
        || -1 == check_events (events_))
1521
0
        return -1;
1522
1523
0
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1524
1525
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1526
0
      ->add (socket, user_data_, events_);
1527
0
}
1528
1529
int zmq_poller_add_fd (void *poller_,
1530
                       zmq::fd_t fd_,
1531
                       void *user_data_,
1532
                       short events_)
1533
0
{
1534
0
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
1535
0
        || -1 == check_events (events_))
1536
0
        return -1;
1537
1538
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1539
0
      ->add_fd (fd_, user_data_, events_);
1540
0
}
1541
1542
1543
int zmq_poller_modify (void *poller_, void *s_, short events_)
1544
0
{
1545
0
    if (-1 == check_poller_registration_args (poller_, s_)
1546
0
        || -1 == check_events (events_))
1547
0
        return -1;
1548
1549
0
    const zmq::socket_base_t *const socket =
1550
0
      static_cast<const zmq::socket_base_t *> (s_);
1551
1552
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1553
0
      ->modify (socket, events_);
1554
0
}
1555
1556
int zmq_poller_modify_fd (void *poller_, zmq::fd_t fd_, short events_)
1557
0
{
1558
0
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
1559
0
        || -1 == check_events (events_))
1560
0
        return -1;
1561
1562
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1563
0
      ->modify_fd (fd_, events_);
1564
0
}
1565
1566
int zmq_poller_remove (void *poller_, void *s_)
1567
0
{
1568
0
    if (-1 == check_poller_registration_args (poller_, s_))
1569
0
        return -1;
1570
1571
0
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1572
1573
0
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove (socket);
1574
0
}
1575
1576
int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_)
1577
0
{
1578
0
    if (-1 == check_poller_fd_registration_args (poller_, fd_))
1579
0
        return -1;
1580
1581
0
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove_fd (fd_);
1582
0
}
1583
1584
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
1585
0
{
1586
0
    const int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
1587
1588
0
    if (rc < 0 && event_) {
1589
0
        event_->socket = NULL;
1590
0
        event_->fd = zmq::retired_fd;
1591
0
        event_->user_data = NULL;
1592
0
        event_->events = 0;
1593
0
    }
1594
    // wait_all returns number of events, but we return 0 for any success
1595
0
    return rc >= 0 ? 0 : rc;
1596
0
}
1597
1598
int zmq_poller_wait_all (void *poller_,
1599
                         zmq_poller_event_t *events_,
1600
                         int n_events_,
1601
                         long timeout_)
1602
0
{
1603
0
    if (-1 == check_poller (poller_))
1604
0
        return -1;
1605
1606
0
    if (!events_) {
1607
0
        errno = EFAULT;
1608
0
        return -1;
1609
0
    }
1610
0
    if (n_events_ < 0) {
1611
0
        errno = EINVAL;
1612
0
        return -1;
1613
0
    }
1614
1615
0
    const int rc =
1616
0
      (static_cast<zmq::socket_poller_t *> (poller_))
1617
0
        ->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_),
1618
0
                n_events_, timeout_);
1619
1620
0
    return rc;
1621
0
}
1622
1623
int zmq_poller_fd (void *poller_, zmq_fd_t *fd_)
1624
0
{
1625
0
    if (!poller_
1626
0
        || !(static_cast<zmq::socket_poller_t *> (poller_)->check_tag ())) {
1627
0
        errno = EFAULT;
1628
0
        return -1;
1629
0
    }
1630
0
    return static_cast<zmq::socket_poller_t *> (poller_)->signaler_fd (fd_);
1631
0
}
1632
1633
//  Peer-specific state
1634
1635
int zmq_socket_get_peer_state (void *s_,
1636
                               const void *routing_id_,
1637
                               size_t routing_id_size_)
1638
0
{
1639
0
    const zmq::socket_base_t *const s = as_socket_base_t (s_);
1640
0
    if (!s)
1641
0
        return -1;
1642
1643
0
    return s->get_peer_state (routing_id_, routing_id_size_);
1644
0
}
1645
1646
//  Timers
1647
1648
void *zmq_timers_new (void)
1649
0
{
1650
0
    zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
1651
0
    alloc_assert (timers);
1652
0
    return timers;
1653
0
}
1654
1655
int zmq_timers_destroy (void **timers_p_)
1656
0
{
1657
0
    void *timers = *timers_p_;
1658
0
    if (!timers || !(static_cast<zmq::timers_t *> (timers))->check_tag ()) {
1659
0
        errno = EFAULT;
1660
0
        return -1;
1661
0
    }
1662
0
    delete (static_cast<zmq::timers_t *> (timers));
1663
0
    *timers_p_ = NULL;
1664
0
    return 0;
1665
0
}
1666
1667
int zmq_timers_add (void *timers_,
1668
                    size_t interval_,
1669
                    zmq_timer_fn handler_,
1670
                    void *arg_)
1671
0
{
1672
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1673
0
        errno = EFAULT;
1674
0
        return -1;
1675
0
    }
1676
1677
0
    return (static_cast<zmq::timers_t *> (timers_))
1678
0
      ->add (interval_, handler_, arg_);
1679
0
}
1680
1681
int zmq_timers_cancel (void *timers_, int timer_id_)
1682
0
{
1683
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1684
0
        errno = EFAULT;
1685
0
        return -1;
1686
0
    }
1687
1688
0
    return (static_cast<zmq::timers_t *> (timers_))->cancel (timer_id_);
1689
0
}
1690
1691
int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
1692
0
{
1693
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1694
0
        errno = EFAULT;
1695
0
        return -1;
1696
0
    }
1697
1698
0
    return (static_cast<zmq::timers_t *> (timers_))
1699
0
      ->set_interval (timer_id_, interval_);
1700
0
}
1701
1702
int zmq_timers_reset (void *timers_, int timer_id_)
1703
0
{
1704
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1705
0
        errno = EFAULT;
1706
0
        return -1;
1707
0
    }
1708
1709
0
    return (static_cast<zmq::timers_t *> (timers_))->reset (timer_id_);
1710
0
}
1711
1712
long zmq_timers_timeout (void *timers_)
1713
0
{
1714
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1715
0
        errno = EFAULT;
1716
0
        return -1;
1717
0
    }
1718
1719
0
    return (static_cast<zmq::timers_t *> (timers_))->timeout ();
1720
0
}
1721
1722
int zmq_timers_execute (void *timers_)
1723
0
{
1724
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1725
0
        errno = EFAULT;
1726
0
        return -1;
1727
0
    }
1728
1729
0
    return (static_cast<zmq::timers_t *> (timers_))->execute ();
1730
0
}
1731
1732
//  The proxy functionality
1733
1734
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
1735
0
{
1736
0
    if (!frontend_ || !backend_) {
1737
0
        errno = EFAULT;
1738
0
        return -1;
1739
0
    }
1740
    // Runs zmq::proxy_steerable with a NULL control_.
1741
0
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1742
0
                       static_cast<zmq::socket_base_t *> (backend_),
1743
0
                       static_cast<zmq::socket_base_t *> (capture_));
1744
0
}
1745
1746
int zmq_proxy_steerable (void *frontend_,
1747
                         void *backend_,
1748
                         void *capture_,
1749
                         void *control_)
1750
0
{
1751
0
    if (!frontend_ || !backend_) {
1752
0
        errno = EFAULT;
1753
0
        return -1;
1754
0
    }
1755
0
    return zmq::proxy_steerable (static_cast<zmq::socket_base_t *> (frontend_),
1756
0
                                 static_cast<zmq::socket_base_t *> (backend_),
1757
0
                                 static_cast<zmq::socket_base_t *> (capture_),
1758
0
                                 static_cast<zmq::socket_base_t *> (control_));
1759
0
}
1760
1761
//  The deprecated device functionality
1762
1763
int zmq_device (int /* type */, void *frontend_, void *backend_)
1764
0
{
1765
0
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1766
0
                       static_cast<zmq::socket_base_t *> (backend_), NULL);
1767
0
}
1768
1769
//  Probe library capabilities; for now, reports on transport and security
1770
1771
int zmq_has (const char *capability_)
1772
0
{
1773
0
#if defined(ZMQ_HAVE_IPC)
1774
0
    if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
1775
0
        return true;
1776
0
#endif
1777
#if defined(ZMQ_HAVE_OPENPGM)
1778
    if (strcmp (capability_, zmq::protocol_name::pgm) == 0)
1779
        return true;
1780
#endif
1781
0
#if defined(ZMQ_HAVE_TIPC)
1782
0
    if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
1783
0
        return true;
1784
0
#endif
1785
#if defined(ZMQ_HAVE_NORM)
1786
    if (strcmp (capability_, zmq::protocol_name::norm) == 0)
1787
        return true;
1788
#endif
1789
0
#if defined(ZMQ_HAVE_CURVE)
1790
0
    if (strcmp (capability_, "curve") == 0)
1791
0
        return true;
1792
0
#endif
1793
#if defined(HAVE_LIBGSSAPI_KRB5)
1794
    if (strcmp (capability_, "gssapi") == 0)
1795
        return true;
1796
#endif
1797
#if defined(ZMQ_HAVE_VMCI)
1798
    if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
1799
        return true;
1800
#endif
1801
0
#if defined(ZMQ_BUILD_DRAFT_API)
1802
0
    if (strcmp (capability_, "draft") == 0)
1803
0
        return true;
1804
0
#endif
1805
0
#if defined(ZMQ_HAVE_WS)
1806
0
    if (strcmp (capability_, "WS") == 0)
1807
0
        return true;
1808
0
#endif
1809
#if defined(ZMQ_HAVE_WSS)
1810
    if (strcmp (capability_, "WSS") == 0)
1811
        return true;
1812
#endif
1813
    //  Whatever the application asked for, we don't have
1814
0
    return false;
1815
0
}
1816
1817
int zmq_socket_monitor_pipes_stats (void *s_)
1818
0
{
1819
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
1820
0
    if (!s)
1821
0
        return -1;
1822
0
    return s->query_pipes_stats ();
1823
0
}