Coverage Report

Created: 2024-10-29 06:13

/src/libzmq/src/zmq.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
// "Tell them I was a writer.
4
//  A maker of software.
5
//  A humanist. A father.
6
//  And many things.
7
//  But above all, a writer.
8
//  Thank You. :)"
9
//  - Pieter Hintjens
10
11
#include "precompiled.hpp"
12
#define ZMQ_TYPE_UNSAFE
13
14
#include "macros.hpp"
15
#include "poller.hpp"
16
#include "peer.hpp"
17
18
#if !defined ZMQ_HAVE_POLLER
19
//  On AIX platform, poll.h has to be included first to get consistent
20
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
21
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
22
//  names to AIX-specific names).
23
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
24
#include <poll.h>
25
#endif
26
27
#include "polling_util.hpp"
28
#endif
29
30
// TODO: determine if this is an issue, since zmq.h is being loaded from pch.
31
// zmq.h must be included *after* poll.h for AIX to build properly
32
//#include "../include/zmq.h"
33
34
#if !defined ZMQ_HAVE_WINDOWS
35
#include <unistd.h>
36
#ifdef ZMQ_HAVE_VXWORKS
37
#include <strings.h>
38
#endif
39
#endif
40
41
// XSI vector I/O
42
#if defined ZMQ_HAVE_UIO
43
#include <sys/uio.h>
44
#else
45
struct iovec
46
{
47
    void *iov_base;
48
    size_t iov_len;
49
};
50
#endif
51
52
#include <string.h>
53
#include <stdlib.h>
54
#include <new>
55
#include <climits>
56
57
#include "proxy.hpp"
58
#include "socket_base.hpp"
59
#include "stdint.hpp"
60
#include "config.hpp"
61
#include "likely.hpp"
62
#include "clock.hpp"
63
#include "ctx.hpp"
64
#include "err.hpp"
65
#include "msg.hpp"
66
#include "fd.hpp"
67
#include "metadata.hpp"
68
#include "socket_poller.hpp"
69
#include "timers.hpp"
70
#include "ip.hpp"
71
#include "address.hpp"
72
73
#ifdef ZMQ_HAVE_PPOLL
74
#include "polling_util.hpp"
75
#include <sys/select.h>
76
#endif
77
78
#if defined ZMQ_HAVE_OPENPGM
79
#define __PGM_WININT_H__
80
#include <pgm/pgm.h>
81
#endif
82
83
//  Compile time check whether msg_t fits into zmq_msg_t.
84
typedef char
85
  check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
86
87
88
void zmq_version (int *major_, int *minor_, int *patch_)
89
0
{
90
0
    *major_ = ZMQ_VERSION_MAJOR;
91
0
    *minor_ = ZMQ_VERSION_MINOR;
92
0
    *patch_ = ZMQ_VERSION_PATCH;
93
0
}
94
95
96
const char *zmq_strerror (int errnum_)
97
0
{
98
0
    return zmq::errno_to_string (errnum_);
99
0
}
100
101
int zmq_errno (void)
102
1.06k
{
103
1.06k
    return errno;
104
1.06k
}
105
106
107
//  New context API
108
109
void *zmq_ctx_new (void)
110
4.15k
{
111
    //  We do this before the ctx constructor since its embedded mailbox_t
112
    //  object needs the network to be up and running (at least on Windows).
113
4.15k
    if (!zmq::initialize_network ()) {
114
0
        return NULL;
115
0
    }
116
117
    //  Create 0MQ context.
118
4.15k
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
119
4.15k
    if (ctx) {
120
4.15k
        if (!ctx->valid ()) {
121
0
            delete ctx;
122
0
            return NULL;
123
0
        }
124
4.15k
    }
125
4.15k
    return ctx;
126
4.15k
}
127
128
int zmq_ctx_term (void *ctx_)
129
4.15k
{
130
4.15k
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
131
0
        errno = EFAULT;
132
0
        return -1;
133
0
    }
134
135
4.15k
    const int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
136
4.15k
    const int en = errno;
137
138
    //  Shut down only if termination was not interrupted by a signal.
139
4.15k
    if (!rc || en != EINTR) {
140
4.15k
        zmq::shutdown_network ();
141
4.15k
    }
142
143
4.15k
    errno = en;
144
4.15k
    return rc;
145
4.15k
}
146
147
int zmq_ctx_shutdown (void *ctx_)
148
0
{
149
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
150
0
        errno = EFAULT;
151
0
        return -1;
152
0
    }
153
0
    return (static_cast<zmq::ctx_t *> (ctx_))->shutdown ();
154
0
}
155
156
int zmq_ctx_set (void *ctx_, int option_, int optval_)
157
0
{
158
0
    return zmq_ctx_set_ext (ctx_, option_, &optval_, sizeof (int));
159
0
}
160
161
int zmq_ctx_set_ext (void *ctx_,
162
                     int option_,
163
                     const void *optval_,
164
                     size_t optvallen_)
165
0
{
166
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
167
0
        errno = EFAULT;
168
0
        return -1;
169
0
    }
170
0
    return (static_cast<zmq::ctx_t *> (ctx_))
171
0
      ->set (option_, optval_, optvallen_);
172
0
}
173
174
int zmq_ctx_get (void *ctx_, int option_)
175
0
{
176
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
177
0
        errno = EFAULT;
178
0
        return -1;
179
0
    }
180
0
    return (static_cast<zmq::ctx_t *> (ctx_))->get (option_);
181
0
}
182
183
int zmq_ctx_get_ext (void *ctx_, int option_, void *optval_, size_t *optvallen_)
184
0
{
185
0
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
186
0
        errno = EFAULT;
187
0
        return -1;
188
0
    }
189
0
    return (static_cast<zmq::ctx_t *> (ctx_))
190
0
      ->get (option_, optval_, optvallen_);
191
0
}
192
193
194
//  Stable/legacy context API
195
196
void *zmq_init (int io_threads_)
197
0
{
198
0
    if (io_threads_ >= 0) {
199
0
        void *ctx = zmq_ctx_new ();
200
0
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
201
0
        return ctx;
202
0
    }
203
0
    errno = EINVAL;
204
0
    return NULL;
205
0
}
206
207
int zmq_term (void *ctx_)
208
0
{
209
0
    return zmq_ctx_term (ctx_);
210
0
}
211
212
int zmq_ctx_destroy (void *ctx_)
213
0
{
214
0
    return zmq_ctx_term (ctx_);
215
0
}
216
217
218
// Sockets
219
220
static zmq::socket_base_t *as_socket_base_t (void *s_)
221
188k
{
222
188k
    zmq::socket_base_t *s = static_cast<zmq::socket_base_t *> (s_);
223
188k
    if (!s_ || !s->check_tag ()) {
224
0
        errno = ENOTSOCK;
225
0
        return NULL;
226
0
    }
227
188k
    return s;
228
188k
}
229
230
void *zmq_socket (void *ctx_, int type_)
231
4.15k
{
232
4.15k
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
233
0
        errno = EFAULT;
234
0
        return NULL;
235
0
    }
236
4.15k
    zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
237
4.15k
    zmq::socket_base_t *s = ctx->create_socket (type_);
238
4.15k
    return static_cast<void *> (s);
239
4.15k
}
240
241
int zmq_close (void *s_)
242
4.15k
{
243
4.15k
    zmq::socket_base_t *s = as_socket_base_t (s_);
244
4.15k
    if (!s)
245
0
        return -1;
246
4.15k
    s->close ();
247
4.15k
    return 0;
248
4.15k
}
249
250
int zmq_setsockopt (void *s_,
251
                    int option_,
252
                    const void *optval_,
253
                    size_t optvallen_)
254
92.3k
{
255
92.3k
    zmq::socket_base_t *s = as_socket_base_t (s_);
256
92.3k
    if (!s)
257
0
        return -1;
258
92.3k
    return s->setsockopt (option_, optval_, optvallen_);
259
92.3k
}
260
261
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
262
88.9k
{
263
88.9k
    zmq::socket_base_t *s = as_socket_base_t (s_);
264
88.9k
    if (!s)
265
0
        return -1;
266
88.9k
    return s->getsockopt (option_, optval_, optvallen_);
267
88.9k
}
268
269
int zmq_socket_monitor_versioned (
270
  void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
271
0
{
272
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
273
0
    if (!s)
274
0
        return -1;
275
0
    return s->monitor (addr_, events_, event_version_, type_);
276
0
}
277
278
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
279
0
{
280
0
    return zmq_socket_monitor_versioned (s_, addr_, events_, 1, ZMQ_PAIR);
281
0
}
282
283
int zmq_join (void *s_, const char *group_)
284
0
{
285
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
286
0
    if (!s)
287
0
        return -1;
288
0
    return s->join (group_);
289
0
}
290
291
int zmq_leave (void *s_, const char *group_)
292
0
{
293
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
294
0
    if (!s)
295
0
        return -1;
296
0
    return s->leave (group_);
297
0
}
298
299
int zmq_bind (void *s_, const char *addr_)
300
1.61k
{
301
1.61k
    zmq::socket_base_t *s = as_socket_base_t (s_);
302
1.61k
    if (!s)
303
0
        return -1;
304
1.61k
    return s->bind (addr_);
305
1.61k
}
306
307
int zmq_connect (void *s_, const char *addr_)
308
1.71k
{
309
1.71k
    zmq::socket_base_t *s = as_socket_base_t (s_);
310
1.71k
    if (!s)
311
0
        return -1;
312
1.71k
    return s->connect (addr_);
313
1.71k
}
314
315
uint32_t zmq_connect_peer (void *s_, const char *addr_)
316
0
{
317
0
    zmq::peer_t *s = static_cast<zmq::peer_t *> (s_);
318
0
    if (!s_ || !s->check_tag ()) {
319
0
        errno = ENOTSOCK;
320
0
        return 0;
321
0
    }
322
323
0
    int socket_type;
324
0
    size_t socket_type_size = sizeof (socket_type);
325
0
    if (s->getsockopt (ZMQ_TYPE, &socket_type, &socket_type_size) != 0)
326
0
        return 0;
327
328
0
    if (socket_type != ZMQ_PEER) {
329
0
        errno = ENOTSUP;
330
0
        return 0;
331
0
    }
332
333
0
    return s->connect_peer (addr_);
334
0
}
335
336
337
int zmq_unbind (void *s_, const char *addr_)
338
0
{
339
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
340
0
    if (!s)
341
0
        return -1;
342
0
    return s->term_endpoint (addr_);
343
0
}
344
345
int zmq_disconnect (void *s_, const char *addr_)
346
0
{
347
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
348
0
    if (!s)
349
0
        return -1;
350
0
    return s->term_endpoint (addr_);
351
0
}
352
353
// Sending functions.
354
355
static inline int
356
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
357
0
{
358
0
    size_t sz = zmq_msg_size (msg_);
359
0
    const int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
360
0
    if (unlikely (rc < 0))
361
0
        return -1;
362
363
    //  This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
364
    //  int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
365
0
    size_t max_msgsz = INT_MAX;
366
367
    //  Truncate returned size to INT_MAX to avoid overflow to negative values
368
0
    return static_cast<int> (sz < max_msgsz ? sz : max_msgsz);
369
0
}
370
371
/*  To be deprecated once zmq_msg_send() is stable                           */
372
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
373
0
{
374
0
    return zmq_msg_send (msg_, s_, flags_);
375
0
}
376
377
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
378
0
{
379
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
380
0
    if (!s)
381
0
        return -1;
382
0
    zmq_msg_t msg;
383
0
    int rc = zmq_msg_init_buffer (&msg, buf_, len_);
384
0
    if (unlikely (rc < 0))
385
0
        return -1;
386
387
0
    rc = s_sendmsg (s, &msg, flags_);
388
0
    if (unlikely (rc < 0)) {
389
0
        const int err = errno;
390
0
        const int rc2 = zmq_msg_close (&msg);
391
0
        errno_assert (rc2 == 0);
392
0
        errno = err;
393
0
        return -1;
394
0
    }
395
    //  Note the optimisation here. We don't close the msg object as it is
396
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
397
0
    return rc;
398
0
}
399
400
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
401
0
{
402
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
403
0
    if (!s)
404
0
        return -1;
405
0
    zmq_msg_t msg;
406
0
    int rc =
407
0
      zmq_msg_init_data (&msg, const_cast<void *> (buf_), len_, NULL, NULL);
408
0
    if (rc != 0)
409
0
        return -1;
410
411
0
    rc = s_sendmsg (s, &msg, flags_);
412
0
    if (unlikely (rc < 0)) {
413
0
        const int err = errno;
414
0
        const int rc2 = zmq_msg_close (&msg);
415
0
        errno_assert (rc2 == 0);
416
0
        errno = err;
417
0
        return -1;
418
0
    }
419
    //  Note the optimisation here. We don't close the msg object as it is
420
    //  empty anyway. This may change when implementation of zmq_msg_t changes.
421
0
    return rc;
422
0
}
423
424
425
// Send multiple messages.
426
// TODO: this function has no man page
427
//
428
// If flag bit ZMQ_SNDMORE is set the vector is treated as
429
// a single multi-part message, i.e. the last message has
430
// ZMQ_SNDMORE bit switched off.
431
//
432
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
433
0
{
434
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
435
0
    if (!s)
436
0
        return -1;
437
0
    if (unlikely (count_ <= 0 || !a_)) {
438
0
        errno = EINVAL;
439
0
        return -1;
440
0
    }
441
442
0
    int rc = 0;
443
0
    zmq_msg_t msg;
444
445
0
    for (size_t i = 0; i < count_; ++i) {
446
0
        rc = zmq_msg_init_size (&msg, a_[i].iov_len);
447
0
        if (rc != 0) {
448
0
            rc = -1;
449
0
            break;
450
0
        }
451
0
        memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
452
0
        if (i == count_ - 1)
453
0
            flags_ = flags_ & ~ZMQ_SNDMORE;
454
0
        rc = s_sendmsg (s, &msg, flags_);
455
0
        if (unlikely (rc < 0)) {
456
0
            const int err = errno;
457
0
            const int rc2 = zmq_msg_close (&msg);
458
0
            errno_assert (rc2 == 0);
459
0
            errno = err;
460
0
            rc = -1;
461
0
            break;
462
0
        }
463
0
    }
464
0
    return rc;
465
0
}
466
467
// Receiving functions.
468
469
static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
470
0
{
471
0
    const int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
472
0
    if (unlikely (rc < 0))
473
0
        return -1;
474
475
    //  Truncate returned size to INT_MAX to avoid overflow to negative values
476
0
    const size_t sz = zmq_msg_size (msg_);
477
0
    return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
478
0
}
479
480
/*  To be deprecated once zmq_msg_recv() is stable                           */
481
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
482
0
{
483
0
    return zmq_msg_recv (msg_, s_, flags_);
484
0
}
485
486
487
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
488
0
{
489
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
490
0
    if (!s)
491
0
        return -1;
492
0
    zmq_msg_t msg;
493
0
    int rc = zmq_msg_init (&msg);
494
0
    errno_assert (rc == 0);
495
496
0
    const int nbytes = s_recvmsg (s, &msg, flags_);
497
0
    if (unlikely (nbytes < 0)) {
498
0
        const int err = errno;
499
0
        rc = zmq_msg_close (&msg);
500
0
        errno_assert (rc == 0);
501
0
        errno = err;
502
0
        return -1;
503
0
    }
504
505
    //  An oversized message is silently truncated.
506
0
    const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
507
508
    //  We explicitly allow a null buffer argument if len is zero
509
0
    if (to_copy) {
510
0
        assert (buf_);
511
0
        memcpy (buf_, zmq_msg_data (&msg), to_copy);
512
0
    }
513
0
    rc = zmq_msg_close (&msg);
514
0
    errno_assert (rc == 0);
515
516
0
    return nbytes;
517
0
}
518
519
// Receive a multi-part message
520
//
521
// Receives up to *count_ parts of a multi-part message.
522
// Sets *count_ to the actual number of parts read.
523
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
524
// Returns number of message parts read, or -1 on error.
525
//
526
// Note: even if -1 is returned, some parts of the message
527
// may have been read. Therefore the client must consult
528
// *count_ to retrieve message parts successfully read,
529
// even if -1 is returned.
530
//
531
// The iov_base* buffers of each iovec *a_ filled in by this
532
// function may be freed using free().
533
// TODO: this function has no man page
534
//
535
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
536
0
{
537
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
538
0
    if (!s)
539
0
        return -1;
540
0
    if (unlikely (!count_ || *count_ <= 0 || !a_)) {
541
0
        errno = EINVAL;
542
0
        return -1;
543
0
    }
544
545
0
    const size_t count = *count_;
546
0
    int nread = 0;
547
0
    bool recvmore = true;
548
549
0
    *count_ = 0;
550
551
0
    for (size_t i = 0; recvmore && i < count; ++i) {
552
0
        zmq_msg_t msg;
553
0
        int rc = zmq_msg_init (&msg);
554
0
        errno_assert (rc == 0);
555
556
0
        const int nbytes = s_recvmsg (s, &msg, flags_);
557
0
        if (unlikely (nbytes < 0)) {
558
0
            const int err = errno;
559
0
            rc = zmq_msg_close (&msg);
560
0
            errno_assert (rc == 0);
561
0
            errno = err;
562
0
            nread = -1;
563
0
            break;
564
0
        }
565
566
0
        a_[i].iov_len = zmq_msg_size (&msg);
567
0
        a_[i].iov_base = static_cast<char *> (malloc (a_[i].iov_len));
568
0
        if (unlikely (!a_[i].iov_base)) {
569
0
            errno = ENOMEM;
570
0
            return -1;
571
0
        }
572
0
        memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
573
0
                a_[i].iov_len);
574
        // Assume zmq_socket ZMQ_RVCMORE is properly set.
575
0
        const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg);
576
0
        recvmore = p_msg->flags () & zmq::msg_t::more;
577
0
        rc = zmq_msg_close (&msg);
578
0
        errno_assert (rc == 0);
579
0
        ++*count_;
580
0
        ++nread;
581
0
    }
582
0
    return nread;
583
0
}
584
585
// Message manipulators.
586
587
int zmq_msg_init (zmq_msg_t *msg_)
588
0
{
589
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->init ();
590
0
}
591
592
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
593
0
{
594
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->init_size (size_);
595
0
}
596
597
int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_)
598
0
{
599
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->init_buffer (buf_, size_);
600
0
}
601
602
int zmq_msg_init_data (
603
  zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
604
0
{
605
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))
606
0
      ->init_data (data_, size_, ffn_, hint_);
607
0
}
608
609
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
610
0
{
611
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
612
0
    if (!s)
613
0
        return -1;
614
0
    return s_sendmsg (s, msg_, flags_);
615
0
}
616
617
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
618
0
{
619
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
620
0
    if (!s)
621
0
        return -1;
622
0
    return s_recvmsg (s, msg_, flags_);
623
0
}
624
625
int zmq_msg_close (zmq_msg_t *msg_)
626
0
{
627
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->close ();
628
0
}
629
630
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
631
0
{
632
0
    return (reinterpret_cast<zmq::msg_t *> (dest_))
633
0
      ->move (*reinterpret_cast<zmq::msg_t *> (src_));
634
0
}
635
636
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
637
0
{
638
0
    return (reinterpret_cast<zmq::msg_t *> (dest_))
639
0
      ->copy (*reinterpret_cast<zmq::msg_t *> (src_));
640
0
}
641
642
void *zmq_msg_data (zmq_msg_t *msg_)
643
0
{
644
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->data ();
645
0
}
646
647
size_t zmq_msg_size (const zmq_msg_t *msg_)
648
0
{
649
0
    return ((zmq::msg_t *) msg_)->size ();
650
0
}
651
652
int zmq_msg_more (const zmq_msg_t *msg_)
653
0
{
654
0
    return zmq_msg_get (msg_, ZMQ_MORE);
655
0
}
656
657
int zmq_msg_get (const zmq_msg_t *msg_, int property_)
658
0
{
659
0
    const char *fd_string;
660
661
0
    switch (property_) {
662
0
        case ZMQ_MORE:
663
0
            return (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
664
0
        case ZMQ_SRCFD:
665
0
            fd_string = zmq_msg_gets (msg_, "__fd");
666
0
            if (fd_string == NULL)
667
0
                return -1;
668
669
0
            return atoi (fd_string);
670
0
        case ZMQ_SHARED:
671
0
            return (((zmq::msg_t *) msg_)->is_cmsg ())
672
0
                       || (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::shared)
673
0
                     ? 1
674
0
                     : 0;
675
0
        default:
676
0
            errno = EINVAL;
677
0
            return -1;
678
0
    }
679
0
}
680
681
int zmq_msg_set (zmq_msg_t *, int, int)
682
0
{
683
    //  No properties supported at present
684
0
    errno = EINVAL;
685
0
    return -1;
686
0
}
687
688
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
689
0
{
690
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))
691
0
      ->set_routing_id (routing_id_);
692
0
}
693
694
uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
695
0
{
696
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->get_routing_id ();
697
0
}
698
699
int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
700
0
{
701
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->set_group (group_);
702
0
}
703
704
const char *zmq_msg_group (zmq_msg_t *msg_)
705
0
{
706
0
    return (reinterpret_cast<zmq::msg_t *> (msg_))->group ();
707
0
}
708
709
//  Get message metadata string
710
711
const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
712
0
{
713
0
    const zmq::metadata_t *metadata =
714
0
      reinterpret_cast<const zmq::msg_t *> (msg_)->metadata ();
715
0
    const char *value = NULL;
716
0
    if (metadata)
717
0
        value = metadata->get (std::string (property_));
718
0
    if (value)
719
0
        return value;
720
721
0
    errno = EINVAL;
722
0
    return NULL;
723
0
}
724
725
// Polling.
726
727
#if defined ZMQ_HAVE_POLLER
728
static int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
729
0
{
730
    // implement zmq_poll on top of zmq_poller
731
0
    int rc;
732
0
    zmq_poller_event_t *events;
733
0
    zmq::socket_poller_t poller;
734
0
    events = new (std::nothrow) zmq_poller_event_t[nitems_];
735
0
    alloc_assert (events);
736
737
0
    bool repeat_items = false;
738
    //  Register sockets with poller
739
0
    for (int i = 0; i < nitems_; i++) {
740
0
        items_[i].revents = 0;
741
742
0
        bool modify = false;
743
0
        short e = items_[i].events;
744
0
        if (items_[i].socket) {
745
            //  Poll item is a 0MQ socket.
746
0
            for (int j = 0; j < i; ++j) {
747
                // Check for repeat entries
748
0
                if (items_[j].socket == items_[i].socket) {
749
0
                    repeat_items = true;
750
0
                    modify = true;
751
0
                    e |= items_[j].events;
752
0
                }
753
0
            }
754
0
            if (modify) {
755
0
                rc = zmq_poller_modify (&poller, items_[i].socket, e);
756
0
            } else {
757
0
                rc = zmq_poller_add (&poller, items_[i].socket, NULL, e);
758
0
            }
759
0
            if (rc < 0) {
760
0
                delete[] events;
761
0
                return rc;
762
0
            }
763
0
        } else {
764
            //  Poll item is a raw file descriptor.
765
0
            for (int j = 0; j < i; ++j) {
766
                // Check for repeat entries
767
0
                if (!items_[j].socket && items_[j].fd == items_[i].fd) {
768
0
                    repeat_items = true;
769
0
                    modify = true;
770
0
                    e |= items_[j].events;
771
0
                }
772
0
            }
773
0
            if (modify) {
774
0
                rc = zmq_poller_modify_fd (&poller, items_[i].fd, e);
775
0
            } else {
776
0
                rc = zmq_poller_add_fd (&poller, items_[i].fd, NULL, e);
777
0
            }
778
0
            if (rc < 0) {
779
0
                delete[] events;
780
0
                return rc;
781
0
            }
782
0
        }
783
0
    }
784
785
    //  Wait for events
786
0
    rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
787
0
    if (rc < 0) {
788
0
        delete[] events;
789
0
        if (zmq_errno () == EAGAIN) {
790
0
            return 0;
791
0
        }
792
0
        return rc;
793
0
    }
794
795
    //  Transform poller events into zmq_pollitem events.
796
    //  items_ contains all items, while events only contains fired events.
797
    //  If no sockets are repeated (likely), the two are still co-ordered, so step through the items
798
    //  checking for matches only on the first event.
799
    //  If there are repeat items, they cannot be assumed to be co-ordered,
800
    //  so each pollitem must check fired events from the beginning.
801
0
    int j_start = 0, found_events = rc;
802
0
    for (int i = 0; i < nitems_; i++) {
803
0
        for (int j = j_start; j < found_events; ++j) {
804
0
            if ((items_[i].socket && items_[i].socket == events[j].socket)
805
0
                || (!(items_[i].socket || events[j].socket)
806
0
                    && items_[i].fd == events[j].fd)) {
807
0
                items_[i].revents = events[j].events & items_[i].events;
808
0
                if (!repeat_items) {
809
                    // no repeats, we can ignore events we've already seen
810
0
                    j_start++;
811
0
                }
812
0
                break;
813
0
            }
814
0
            if (!repeat_items) {
815
                // no repeats, never have to look at j > j_start
816
0
                break;
817
0
            }
818
0
        }
819
0
    }
820
821
    //  Cleanup
822
0
    delete[] events;
823
0
    return rc;
824
0
}
825
#endif // ZMQ_HAVE_POLLER
826
827
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
828
0
{
829
0
#if defined ZMQ_HAVE_POLLER
830
    // if poller is present, use that if there is at least 1 thread-safe socket,
831
    // otherwise fall back to the previous implementation as it's faster.
832
0
    for (int i = 0; i != nitems_; i++) {
833
0
        if (items_[i].socket) {
834
0
            zmq::socket_base_t *s = as_socket_base_t (items_[i].socket);
835
0
            if (s) {
836
0
                if (s->is_thread_safe ())
837
0
                    return zmq_poller_poll (items_, nitems_, timeout_);
838
0
            } else {
839
                //as_socket_base_t returned NULL : socket is invalid
840
0
                return -1;
841
0
            }
842
0
        }
843
0
    }
844
0
#endif // ZMQ_HAVE_POLLER
845
0
#if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
846
0
    if (unlikely (nitems_ < 0)) {
847
0
        errno = EINVAL;
848
0
        return -1;
849
0
    }
850
0
    if (unlikely (nitems_ == 0)) {
851
0
        if (timeout_ == 0)
852
0
            return 0;
853
#if defined ZMQ_HAVE_WINDOWS
854
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
855
        return 0;
856
#elif defined ZMQ_HAVE_VXWORKS
857
        struct timespec ns_;
858
        ns_.tv_sec = timeout_ / 1000;
859
        ns_.tv_nsec = timeout_ % 1000 * 1000000;
860
        return nanosleep (&ns_, 0);
861
#else
862
0
        return usleep (timeout_ * 1000);
863
0
#endif
864
0
    }
865
0
    if (!items_) {
866
0
        errno = EFAULT;
867
0
        return -1;
868
0
    }
869
870
0
    zmq::clock_t clock;
871
0
    uint64_t now = 0;
872
0
    uint64_t end = 0;
873
0
#if defined ZMQ_POLL_BASED_ON_POLL
874
0
    zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_);
875
876
    //  Build pollset for poll () system call.
877
0
    for (int i = 0; i != nitems_; i++) {
878
        //  If the poll item is a 0MQ socket, we poll on the file descriptor
879
        //  retrieved by the ZMQ_FD socket option.
880
0
        if (items_[i].socket) {
881
0
            size_t zmq_fd_size = sizeof (zmq::fd_t);
882
0
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
883
0
                                &zmq_fd_size)
884
0
                == -1) {
885
0
                return -1;
886
0
            }
887
0
            pollfds[i].events = items_[i].events ? POLLIN : 0;
888
0
        }
889
        //  Else, the poll item is a raw file descriptor. Just convert the
890
        //  events to normal POLLIN/POLLOUT for poll ().
891
0
        else {
892
0
            pollfds[i].fd = items_[i].fd;
893
0
            pollfds[i].events =
894
0
              (items_[i].events & ZMQ_POLLIN ? POLLIN : 0)
895
0
              | (items_[i].events & ZMQ_POLLOUT ? POLLOUT : 0)
896
0
              | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
897
0
        }
898
0
    }
899
#else
900
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
901
    //  file descriptors.
902
    //  TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
903
    zmq_assert (nitems_ <= FD_SETSIZE);
904
905
    zmq::optimized_fd_set_t pollset_in (nitems_);
906
    FD_ZERO (pollset_in.get ());
907
    zmq::optimized_fd_set_t pollset_out (nitems_);
908
    FD_ZERO (pollset_out.get ());
909
    zmq::optimized_fd_set_t pollset_err (nitems_);
910
    FD_ZERO (pollset_err.get ());
911
912
    zmq::fd_t maxfd = 0;
913
914
    //  Build the fd_sets for passing to select ().
915
    for (int i = 0; i != nitems_; i++) {
916
        //  If the poll item is a 0MQ socket we are interested in input on the
917
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
918
        if (items_[i].socket) {
919
            size_t zmq_fd_size = sizeof (zmq::fd_t);
920
            zmq::fd_t notify_fd;
921
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
922
                                &zmq_fd_size)
923
                == -1)
924
                return -1;
925
            if (items_[i].events) {
926
                FD_SET (notify_fd, pollset_in.get ());
927
                if (maxfd < notify_fd)
928
                    maxfd = notify_fd;
929
            }
930
        }
931
        //  Else, the poll item is a raw file descriptor. Convert the poll item
932
        //  events to the appropriate fd_sets.
933
        else {
934
            if (items_[i].events & ZMQ_POLLIN)
935
                FD_SET (items_[i].fd, pollset_in.get ());
936
            if (items_[i].events & ZMQ_POLLOUT)
937
                FD_SET (items_[i].fd, pollset_out.get ());
938
            if (items_[i].events & ZMQ_POLLERR)
939
                FD_SET (items_[i].fd, pollset_err.get ());
940
            if (maxfd < items_[i].fd)
941
                maxfd = items_[i].fd;
942
        }
943
    }
944
945
    zmq::optimized_fd_set_t inset (nitems_);
946
    zmq::optimized_fd_set_t outset (nitems_);
947
    zmq::optimized_fd_set_t errset (nitems_);
948
#endif
949
950
0
    bool first_pass = true;
951
0
    int nevents = 0;
952
953
0
    while (true) {
954
0
#if defined ZMQ_POLL_BASED_ON_POLL
955
956
        //  Compute the timeout for the subsequent poll.
957
0
        const zmq::timeout_t timeout =
958
0
          zmq::compute_timeout (first_pass, timeout_, now, end);
959
960
        //  Wait for events.
961
0
        {
962
0
            const int rc = poll (&pollfds[0], nitems_, timeout);
963
0
            if (rc == -1 && errno == EINTR) {
964
0
                return -1;
965
0
            }
966
0
            errno_assert (rc >= 0);
967
0
        }
968
        //  Check for the events.
969
0
        for (int i = 0; i != nitems_; i++) {
970
0
            items_[i].revents = 0;
971
972
            //  The poll item is a 0MQ socket. Retrieve pending events
973
            //  using the ZMQ_EVENTS socket option.
974
0
            if (items_[i].socket) {
975
0
                size_t zmq_events_size = sizeof (uint32_t);
976
0
                uint32_t zmq_events;
977
0
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
978
0
                                    &zmq_events_size)
979
0
                    == -1) {
980
0
                    return -1;
981
0
                }
982
0
                if ((items_[i].events & ZMQ_POLLOUT)
983
0
                    && (zmq_events & ZMQ_POLLOUT))
984
0
                    items_[i].revents |= ZMQ_POLLOUT;
985
0
                if ((items_[i].events & ZMQ_POLLIN)
986
0
                    && (zmq_events & ZMQ_POLLIN))
987
0
                    items_[i].revents |= ZMQ_POLLIN;
988
0
            }
989
            //  Else, the poll item is a raw file descriptor, simply convert
990
            //  the events to zmq_pollitem_t-style format.
991
0
            else {
992
0
                if (pollfds[i].revents & POLLIN)
993
0
                    items_[i].revents |= ZMQ_POLLIN;
994
0
                if (pollfds[i].revents & POLLOUT)
995
0
                    items_[i].revents |= ZMQ_POLLOUT;
996
0
                if (pollfds[i].revents & POLLPRI)
997
0
                    items_[i].revents |= ZMQ_POLLPRI;
998
0
                if (pollfds[i].revents & ~(POLLIN | POLLOUT | POLLPRI))
999
0
                    items_[i].revents |= ZMQ_POLLERR;
1000
0
            }
1001
1002
0
            if (items_[i].revents)
1003
0
                nevents++;
1004
0
        }
1005
1006
#else
1007
1008
        //  Compute the timeout for the subsequent poll.
1009
        timeval timeout;
1010
        timeval *ptimeout;
1011
        if (first_pass) {
1012
            timeout.tv_sec = 0;
1013
            timeout.tv_usec = 0;
1014
            ptimeout = &timeout;
1015
        } else if (timeout_ < 0)
1016
            ptimeout = NULL;
1017
        else {
1018
            timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1019
            timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
1020
            ptimeout = &timeout;
1021
        }
1022
1023
        //  Wait for events. Ignore interrupts if there's infinite timeout.
1024
        while (true) {
1025
            memcpy (inset.get (), pollset_in.get (),
1026
                    zmq::valid_pollset_bytes (*pollset_in.get ()));
1027
            memcpy (outset.get (), pollset_out.get (),
1028
                    zmq::valid_pollset_bytes (*pollset_out.get ()));
1029
            memcpy (errset.get (), pollset_err.get (),
1030
                    zmq::valid_pollset_bytes (*pollset_err.get ()));
1031
#if defined ZMQ_HAVE_WINDOWS
1032
            int rc =
1033
              select (0, inset.get (), outset.get (), errset.get (), ptimeout);
1034
            if (unlikely (rc == SOCKET_ERROR)) {
1035
                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1036
                wsa_assert (errno == ENOTSOCK);
1037
                return -1;
1038
            }
1039
#else
1040
            int rc = select (maxfd + 1, inset.get (), outset.get (),
1041
                             errset.get (), ptimeout);
1042
            if (unlikely (rc == -1)) {
1043
                errno_assert (errno == EINTR || errno == EBADF);
1044
                return -1;
1045
            }
1046
#endif
1047
            break;
1048
        }
1049
1050
        //  Check for the events.
1051
        for (int i = 0; i != nitems_; i++) {
1052
            items_[i].revents = 0;
1053
1054
            //  The poll item is a 0MQ socket. Retrieve pending events
1055
            //  using the ZMQ_EVENTS socket option.
1056
            if (items_[i].socket) {
1057
                size_t zmq_events_size = sizeof (uint32_t);
1058
                uint32_t zmq_events;
1059
                if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
1060
                                    &zmq_events_size)
1061
                    == -1)
1062
                    return -1;
1063
                if ((items_[i].events & ZMQ_POLLOUT)
1064
                    && (zmq_events & ZMQ_POLLOUT))
1065
                    items_[i].revents |= ZMQ_POLLOUT;
1066
                if ((items_[i].events & ZMQ_POLLIN)
1067
                    && (zmq_events & ZMQ_POLLIN))
1068
                    items_[i].revents |= ZMQ_POLLIN;
1069
            }
1070
            //  Else, the poll item is a raw file descriptor, simply convert
1071
            //  the events to zmq_pollitem_t-style format.
1072
            else {
1073
                if (FD_ISSET (items_[i].fd, inset.get ()))
1074
                    items_[i].revents |= ZMQ_POLLIN;
1075
                if (FD_ISSET (items_[i].fd, outset.get ()))
1076
                    items_[i].revents |= ZMQ_POLLOUT;
1077
                if (FD_ISSET (items_[i].fd, errset.get ()))
1078
                    items_[i].revents |= ZMQ_POLLERR;
1079
            }
1080
1081
            if (items_[i].revents)
1082
                nevents++;
1083
        }
1084
#endif
1085
1086
        //  If timeout is zero, exit immediately whether there are events or not.
1087
0
        if (timeout_ == 0)
1088
0
            break;
1089
1090
        //  If there are events to return, we can exit immediately.
1091
0
        if (nevents)
1092
0
            break;
1093
1094
        //  At this point we are meant to wait for events but there are none.
1095
        //  If timeout is infinite we can just loop until we get some events.
1096
0
        if (timeout_ < 0) {
1097
0
            if (first_pass)
1098
0
                first_pass = false;
1099
0
            continue;
1100
0
        }
1101
1102
        //  The timeout is finite and there are no events. In the first pass
1103
        //  we get a timestamp of when the polling have begun. (We assume that
1104
        //  first pass have taken negligible time). We also compute the time
1105
        //  when the polling should time out.
1106
0
        if (first_pass) {
1107
0
            now = clock.now_ms ();
1108
0
            end = now + timeout_;
1109
0
            if (now == end)
1110
0
                break;
1111
0
            first_pass = false;
1112
0
            continue;
1113
0
        }
1114
1115
        //  Find out whether timeout have expired.
1116
0
        now = clock.now_ms ();
1117
0
        if (now >= end)
1118
0
            break;
1119
0
    }
1120
1121
0
    return nevents;
1122
#else
1123
    //  Exotic platforms that support neither poll() nor select().
1124
    errno = ENOTSUP;
1125
    return -1;
1126
#endif
1127
0
}
1128
1129
#ifdef ZMQ_HAVE_PPOLL
1130
// return values of 0 or -1 should be returned from zmq_poll; return value 1 means items passed checks
1131
int zmq_poll_check_items_ (zmq_pollitem_t *items_, int nitems_, long timeout_)
1132
0
{
1133
0
    if (unlikely (nitems_ < 0)) {
1134
0
        errno = EINVAL;
1135
0
        return -1;
1136
0
    }
1137
0
    if (unlikely (nitems_ == 0)) {
1138
0
        if (timeout_ == 0)
1139
0
            return 0;
1140
#if defined ZMQ_HAVE_WINDOWS
1141
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
1142
        return 0;
1143
#elif defined ZMQ_HAVE_VXWORKS
1144
        struct timespec ns_;
1145
        ns_.tv_sec = timeout_ / 1000;
1146
        ns_.tv_nsec = timeout_ % 1000 * 1000000;
1147
        return nanosleep (&ns_, 0);
1148
#else
1149
0
        return usleep (timeout_ * 1000);
1150
0
#endif
1151
0
    }
1152
0
    if (!items_) {
1153
0
        errno = EFAULT;
1154
0
        return -1;
1155
0
    }
1156
0
    return 1;
1157
0
}
1158
1159
struct zmq_poll_select_fds_t_
1160
{
1161
    explicit zmq_poll_select_fds_t_ (int nitems_) :
1162
0
        pollset_in (nitems_),
1163
0
        pollset_out (nitems_),
1164
0
        pollset_err (nitems_),
1165
0
        inset (nitems_),
1166
0
        outset (nitems_),
1167
0
        errset (nitems_),
1168
0
        maxfd (0)
1169
0
    {
1170
0
        FD_ZERO (pollset_in.get ());
1171
0
        FD_ZERO (pollset_out.get ());
1172
0
        FD_ZERO (pollset_err.get ());
1173
0
    }
1174
1175
    zmq::optimized_fd_set_t pollset_in;
1176
    zmq::optimized_fd_set_t pollset_out;
1177
    zmq::optimized_fd_set_t pollset_err;
1178
    zmq::optimized_fd_set_t inset;
1179
    zmq::optimized_fd_set_t outset;
1180
    zmq::optimized_fd_set_t errset;
1181
    zmq::fd_t maxfd;
1182
};
1183
1184
zmq_poll_select_fds_t_
1185
zmq_poll_build_select_fds_ (zmq_pollitem_t *items_, int nitems_, int &rc)
1186
0
{
1187
    //  Ensure we do not attempt to select () on more than FD_SETSIZE
1188
    //  file descriptors.
1189
    //  TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
1190
0
    zmq_assert (nitems_ <= FD_SETSIZE);
1191
1192
0
    zmq_poll_select_fds_t_ fds (nitems_);
1193
1194
    //  Build the fd_sets for passing to select ().
1195
0
    for (int i = 0; i != nitems_; i++) {
1196
        //  If the poll item is a 0MQ socket we are interested in input on the
1197
        //  notification file descriptor retrieved by the ZMQ_FD socket option.
1198
0
        if (items_[i].socket) {
1199
0
            size_t zmq_fd_size = sizeof (zmq::fd_t);
1200
0
            zmq::fd_t notify_fd;
1201
0
            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
1202
0
                                &zmq_fd_size)
1203
0
                == -1) {
1204
0
                rc = -1;
1205
0
                return fds;
1206
0
            }
1207
0
            if (items_[i].events) {
1208
0
                FD_SET (notify_fd, fds.pollset_in.get ());
1209
0
                if (fds.maxfd < notify_fd)
1210
0
                    fds.maxfd = notify_fd;
1211
0
            }
1212
0
        }
1213
        //  Else, the poll item is a raw file descriptor. Convert the poll item
1214
        //  events to the appropriate fd_sets.
1215
0
        else {
1216
0
            if (items_[i].events & ZMQ_POLLIN)
1217
0
                FD_SET (items_[i].fd, fds.pollset_in.get ());
1218
0
            if (items_[i].events & ZMQ_POLLOUT)
1219
0
                FD_SET (items_[i].fd, fds.pollset_out.get ());
1220
0
            if (items_[i].events & ZMQ_POLLERR)
1221
0
                FD_SET (items_[i].fd, fds.pollset_err.get ());
1222
0
            if (fds.maxfd < items_[i].fd)
1223
0
                fds.maxfd = items_[i].fd;
1224
0
        }
1225
0
    }
1226
1227
0
    rc = 0;
1228
0
    return fds;
1229
0
}
1230
1231
timeval *zmq_poll_select_set_timeout_ (
1232
  long timeout_, bool first_pass, uint64_t now, uint64_t end, timeval &timeout)
1233
0
{
1234
0
    timeval *ptimeout;
1235
0
    if (first_pass) {
1236
0
        timeout.tv_sec = 0;
1237
0
        timeout.tv_usec = 0;
1238
0
        ptimeout = &timeout;
1239
0
    } else if (timeout_ < 0)
1240
0
        ptimeout = NULL;
1241
0
    else {
1242
0
        timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1243
0
        timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
1244
0
        ptimeout = &timeout;
1245
0
    }
1246
0
    return ptimeout;
1247
0
}
1248
1249
timespec *zmq_poll_select_set_timeout_ (
1250
  long timeout_, bool first_pass, uint64_t now, uint64_t end, timespec &timeout)
1251
0
{
1252
0
    timespec *ptimeout;
1253
0
    if (first_pass) {
1254
0
        timeout.tv_sec = 0;
1255
0
        timeout.tv_nsec = 0;
1256
0
        ptimeout = &timeout;
1257
0
    } else if (timeout_ < 0)
1258
0
        ptimeout = NULL;
1259
0
    else {
1260
0
        timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1261
0
        timeout.tv_nsec = static_cast<long> ((end - now) % 1000 * 1000000);
1262
0
        ptimeout = &timeout;
1263
0
    }
1264
0
    return ptimeout;
1265
0
}
1266
1267
int zmq_poll_select_check_events_ (zmq_pollitem_t *items_,
1268
                                   int nitems_,
1269
                                   zmq_poll_select_fds_t_ &fds,
1270
                                   int &nevents)
1271
0
{
1272
    //  Check for the events.
1273
0
    for (int i = 0; i != nitems_; i++) {
1274
0
        items_[i].revents = 0;
1275
1276
        //  The poll item is a 0MQ socket. Retrieve pending events
1277
        //  using the ZMQ_EVENTS socket option.
1278
0
        if (items_[i].socket) {
1279
0
            size_t zmq_events_size = sizeof (uint32_t);
1280
0
            uint32_t zmq_events;
1281
0
            if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
1282
0
                                &zmq_events_size)
1283
0
                == -1)
1284
0
                return -1;
1285
0
            if ((items_[i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT))
1286
0
                items_[i].revents |= ZMQ_POLLOUT;
1287
0
            if ((items_[i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN))
1288
0
                items_[i].revents |= ZMQ_POLLIN;
1289
0
        }
1290
        //  Else, the poll item is a raw file descriptor, simply convert
1291
        //  the events to zmq_pollitem_t-style format.
1292
0
        else {
1293
0
            if (FD_ISSET (items_[i].fd, fds.inset.get ()))
1294
0
                items_[i].revents |= ZMQ_POLLIN;
1295
0
            if (FD_ISSET (items_[i].fd, fds.outset.get ()))
1296
0
                items_[i].revents |= ZMQ_POLLOUT;
1297
0
            if (FD_ISSET (items_[i].fd, fds.errset.get ()))
1298
0
                items_[i].revents |= ZMQ_POLLERR;
1299
0
        }
1300
1301
0
        if (items_[i].revents)
1302
0
            nevents++;
1303
0
    }
1304
1305
0
    return 0;
1306
0
}
1307
1308
bool zmq_poll_must_break_loop_ (long timeout_,
1309
                                int nevents,
1310
                                bool &first_pass,
1311
                                zmq::clock_t &clock,
1312
                                uint64_t &now,
1313
                                uint64_t &end)
1314
0
{
1315
    //  If timeout is zero, exit immediately whether there are events or not.
1316
0
    if (timeout_ == 0)
1317
0
        return true;
1318
1319
    //  If there are events to return, we can exit immediately.
1320
0
    if (nevents)
1321
0
        return true;
1322
1323
    //  At this point we are meant to wait for events but there are none.
1324
    //  If timeout is infinite we can just loop until we get some events.
1325
0
    if (timeout_ < 0) {
1326
0
        if (first_pass)
1327
0
            first_pass = false;
1328
0
        return false;
1329
0
    }
1330
1331
    //  The timeout is finite and there are no events. In the first pass
1332
    //  we get a timestamp of when the polling have begun. (We assume that
1333
    //  first pass have taken negligible time). We also compute the time
1334
    //  when the polling should time out.
1335
0
    if (first_pass) {
1336
0
        now = clock.now_ms ();
1337
0
        end = now + timeout_;
1338
0
        if (now == end)
1339
0
            return true;
1340
0
        first_pass = false;
1341
0
        return false;
1342
0
    }
1343
1344
    //  Find out whether timeout have expired.
1345
0
    now = clock.now_ms ();
1346
0
    if (now >= end)
1347
0
        return true;
1348
1349
    // finally, in all other cases, we just continue
1350
0
    return false;
1351
0
}
1352
#endif // ZMQ_HAVE_PPOLL
1353
1354
#if !defined _WIN32
1355
int zmq_ppoll (zmq_pollitem_t *items_,
1356
               int nitems_,
1357
               long timeout_,
1358
               const sigset_t *sigmask_)
1359
#else
1360
// Windows has no sigset_t
1361
int zmq_ppoll (zmq_pollitem_t *items_,
1362
               int nitems_,
1363
               long timeout_,
1364
               const void *sigmask_)
1365
#endif
1366
0
{
1367
0
#ifdef ZMQ_HAVE_PPOLL
1368
0
    int rc = zmq_poll_check_items_ (items_, nitems_, timeout_);
1369
0
    if (rc <= 0) {
1370
0
        return rc;
1371
0
    }
1372
1373
0
    zmq::clock_t clock;
1374
0
    uint64_t now = 0;
1375
0
    uint64_t end = 0;
1376
0
    zmq_poll_select_fds_t_ fds =
1377
0
      zmq_poll_build_select_fds_ (items_, nitems_, rc);
1378
0
    if (rc == -1) {
1379
0
        return -1;
1380
0
    }
1381
1382
0
    bool first_pass = true;
1383
0
    int nevents = 0;
1384
1385
0
    while (true) {
1386
        //  Compute the timeout for the subsequent poll.
1387
0
        timespec timeout;
1388
0
        timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass,
1389
0
                                                           now, end, timeout);
1390
1391
        //  Wait for events. Ignore interrupts if there's infinite timeout.
1392
0
        while (true) {
1393
0
            memcpy (fds.inset.get (), fds.pollset_in.get (),
1394
0
                    zmq::valid_pollset_bytes (*fds.pollset_in.get ()));
1395
0
            memcpy (fds.outset.get (), fds.pollset_out.get (),
1396
0
                    zmq::valid_pollset_bytes (*fds.pollset_out.get ()));
1397
0
            memcpy (fds.errset.get (), fds.pollset_err.get (),
1398
0
                    zmq::valid_pollset_bytes (*fds.pollset_err.get ()));
1399
0
            int rc =
1400
0
              pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (),
1401
0
                       fds.errset.get (), ptimeout, sigmask_);
1402
0
            if (unlikely (rc == -1)) {
1403
0
                errno_assert (errno == EINTR || errno == EBADF);
1404
0
                return -1;
1405
0
            }
1406
0
            break;
1407
0
        }
1408
1409
0
        rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents);
1410
0
        if (rc < 0) {
1411
0
            return rc;
1412
0
        }
1413
1414
0
        if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock,
1415
0
                                       now, end)) {
1416
0
            break;
1417
0
        }
1418
0
    }
1419
1420
0
    return nevents;
1421
#else
1422
    errno = ENOTSUP;
1423
    return -1;
1424
#endif // ZMQ_HAVE_PPOLL
1425
0
}
1426
1427
//  The poller functionality
1428
1429
void *zmq_poller_new (void)
1430
0
{
1431
0
    zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1432
0
    if (!poller) {
1433
0
        errno = ENOMEM;
1434
0
    }
1435
0
    return poller;
1436
0
}
1437
1438
int zmq_poller_destroy (void **poller_p_)
1439
0
{
1440
0
    if (poller_p_) {
1441
0
        const zmq::socket_poller_t *const poller =
1442
0
          static_cast<const zmq::socket_poller_t *> (*poller_p_);
1443
0
        if (poller && poller->check_tag ()) {
1444
0
            delete poller;
1445
0
            *poller_p_ = NULL;
1446
0
            return 0;
1447
0
        }
1448
0
    }
1449
0
    errno = EFAULT;
1450
0
    return -1;
1451
0
}
1452
1453
1454
static int check_poller (void *const poller_)
1455
0
{
1456
0
    if (!poller_
1457
0
        || !(static_cast<zmq::socket_poller_t *> (poller_))->check_tag ()) {
1458
0
        errno = EFAULT;
1459
0
        return -1;
1460
0
    }
1461
1462
0
    return 0;
1463
0
}
1464
1465
static int check_events (const short events_)
1466
0
{
1467
0
    if (events_ & ~(ZMQ_POLLIN | ZMQ_POLLOUT | ZMQ_POLLERR | ZMQ_POLLPRI)) {
1468
0
        errno = EINVAL;
1469
0
        return -1;
1470
0
    }
1471
0
    return 0;
1472
0
}
1473
1474
static int check_poller_registration_args (void *const poller_, void *const s_)
1475
0
{
1476
0
    if (-1 == check_poller (poller_))
1477
0
        return -1;
1478
1479
0
    if (!s_ || !(static_cast<zmq::socket_base_t *> (s_))->check_tag ()) {
1480
0
        errno = ENOTSOCK;
1481
0
        return -1;
1482
0
    }
1483
1484
0
    return 0;
1485
0
}
1486
1487
static int check_poller_fd_registration_args (void *const poller_,
1488
                                              const zmq::fd_t fd_)
1489
0
{
1490
0
    if (-1 == check_poller (poller_))
1491
0
        return -1;
1492
1493
0
    if (fd_ == zmq::retired_fd) {
1494
0
        errno = EBADF;
1495
0
        return -1;
1496
0
    }
1497
1498
0
    return 0;
1499
0
}
1500
1501
int zmq_poller_size (void *poller_)
1502
0
{
1503
0
    if (-1 == check_poller (poller_))
1504
0
        return -1;
1505
1506
0
    return (static_cast<zmq::socket_poller_t *> (poller_))->size ();
1507
0
}
1508
1509
int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
1510
0
{
1511
0
    if (-1 == check_poller_registration_args (poller_, s_)
1512
0
        || -1 == check_events (events_))
1513
0
        return -1;
1514
1515
0
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1516
1517
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1518
0
      ->add (socket, user_data_, events_);
1519
0
}
1520
1521
int zmq_poller_add_fd (void *poller_,
1522
                       zmq::fd_t fd_,
1523
                       void *user_data_,
1524
                       short events_)
1525
0
{
1526
0
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
1527
0
        || -1 == check_events (events_))
1528
0
        return -1;
1529
1530
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1531
0
      ->add_fd (fd_, user_data_, events_);
1532
0
}
1533
1534
1535
int zmq_poller_modify (void *poller_, void *s_, short events_)
1536
0
{
1537
0
    if (-1 == check_poller_registration_args (poller_, s_)
1538
0
        || -1 == check_events (events_))
1539
0
        return -1;
1540
1541
0
    const zmq::socket_base_t *const socket =
1542
0
      static_cast<const zmq::socket_base_t *> (s_);
1543
1544
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1545
0
      ->modify (socket, events_);
1546
0
}
1547
1548
int zmq_poller_modify_fd (void *poller_, zmq::fd_t fd_, short events_)
1549
0
{
1550
0
    if (-1 == check_poller_fd_registration_args (poller_, fd_)
1551
0
        || -1 == check_events (events_))
1552
0
        return -1;
1553
1554
0
    return (static_cast<zmq::socket_poller_t *> (poller_))
1555
0
      ->modify_fd (fd_, events_);
1556
0
}
1557
1558
int zmq_poller_remove (void *poller_, void *s_)
1559
0
{
1560
0
    if (-1 == check_poller_registration_args (poller_, s_))
1561
0
        return -1;
1562
1563
0
    zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1564
1565
0
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove (socket);
1566
0
}
1567
1568
int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_)
1569
0
{
1570
0
    if (-1 == check_poller_fd_registration_args (poller_, fd_))
1571
0
        return -1;
1572
1573
0
    return (static_cast<zmq::socket_poller_t *> (poller_))->remove_fd (fd_);
1574
0
}
1575
1576
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
1577
0
{
1578
0
    const int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
1579
1580
0
    if (rc < 0 && event_) {
1581
0
        event_->socket = NULL;
1582
0
        event_->fd = zmq::retired_fd;
1583
0
        event_->user_data = NULL;
1584
0
        event_->events = 0;
1585
0
    }
1586
    // wait_all returns number of events, but we return 0 for any success
1587
0
    return rc >= 0 ? 0 : rc;
1588
0
}
1589
1590
int zmq_poller_wait_all (void *poller_,
1591
                         zmq_poller_event_t *events_,
1592
                         int n_events_,
1593
                         long timeout_)
1594
0
{
1595
0
    if (-1 == check_poller (poller_))
1596
0
        return -1;
1597
1598
0
    if (!events_) {
1599
0
        errno = EFAULT;
1600
0
        return -1;
1601
0
    }
1602
0
    if (n_events_ < 0) {
1603
0
        errno = EINVAL;
1604
0
        return -1;
1605
0
    }
1606
1607
0
    const int rc =
1608
0
      (static_cast<zmq::socket_poller_t *> (poller_))
1609
0
        ->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_),
1610
0
                n_events_, timeout_);
1611
1612
0
    return rc;
1613
0
}
1614
1615
int zmq_poller_fd (void *poller_, zmq_fd_t *fd_)
1616
0
{
1617
0
    if (!poller_
1618
0
        || !(static_cast<zmq::socket_poller_t *> (poller_)->check_tag ())) {
1619
0
        errno = EFAULT;
1620
0
        return -1;
1621
0
    }
1622
0
    return static_cast<zmq::socket_poller_t *> (poller_)->signaler_fd (fd_);
1623
0
}
1624
1625
//  Peer-specific state
1626
1627
int zmq_socket_get_peer_state (void *s_,
1628
                               const void *routing_id_,
1629
                               size_t routing_id_size_)
1630
0
{
1631
0
    const zmq::socket_base_t *const s = as_socket_base_t (s_);
1632
0
    if (!s)
1633
0
        return -1;
1634
1635
0
    return s->get_peer_state (routing_id_, routing_id_size_);
1636
0
}
1637
1638
//  Timers
1639
1640
void *zmq_timers_new (void)
1641
0
{
1642
0
    zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
1643
0
    alloc_assert (timers);
1644
0
    return timers;
1645
0
}
1646
1647
int zmq_timers_destroy (void **timers_p_)
1648
0
{
1649
0
    void *timers = *timers_p_;
1650
0
    if (!timers || !(static_cast<zmq::timers_t *> (timers))->check_tag ()) {
1651
0
        errno = EFAULT;
1652
0
        return -1;
1653
0
    }
1654
0
    delete (static_cast<zmq::timers_t *> (timers));
1655
0
    *timers_p_ = NULL;
1656
0
    return 0;
1657
0
}
1658
1659
int zmq_timers_add (void *timers_,
1660
                    size_t interval_,
1661
                    zmq_timer_fn handler_,
1662
                    void *arg_)
1663
0
{
1664
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1665
0
        errno = EFAULT;
1666
0
        return -1;
1667
0
    }
1668
1669
0
    return (static_cast<zmq::timers_t *> (timers_))
1670
0
      ->add (interval_, handler_, arg_);
1671
0
}
1672
1673
int zmq_timers_cancel (void *timers_, int timer_id_)
1674
0
{
1675
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1676
0
        errno = EFAULT;
1677
0
        return -1;
1678
0
    }
1679
1680
0
    return (static_cast<zmq::timers_t *> (timers_))->cancel (timer_id_);
1681
0
}
1682
1683
int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
1684
0
{
1685
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1686
0
        errno = EFAULT;
1687
0
        return -1;
1688
0
    }
1689
1690
0
    return (static_cast<zmq::timers_t *> (timers_))
1691
0
      ->set_interval (timer_id_, interval_);
1692
0
}
1693
1694
int zmq_timers_reset (void *timers_, int timer_id_)
1695
0
{
1696
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1697
0
        errno = EFAULT;
1698
0
        return -1;
1699
0
    }
1700
1701
0
    return (static_cast<zmq::timers_t *> (timers_))->reset (timer_id_);
1702
0
}
1703
1704
long zmq_timers_timeout (void *timers_)
1705
0
{
1706
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1707
0
        errno = EFAULT;
1708
0
        return -1;
1709
0
    }
1710
1711
0
    return (static_cast<zmq::timers_t *> (timers_))->timeout ();
1712
0
}
1713
1714
int zmq_timers_execute (void *timers_)
1715
0
{
1716
0
    if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1717
0
        errno = EFAULT;
1718
0
        return -1;
1719
0
    }
1720
1721
0
    return (static_cast<zmq::timers_t *> (timers_))->execute ();
1722
0
}
1723
1724
//  The proxy functionality
1725
1726
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
1727
0
{
1728
0
    if (!frontend_ || !backend_) {
1729
0
        errno = EFAULT;
1730
0
        return -1;
1731
0
    }
1732
    // Runs zmq::proxy_steerable with a NULL control_.
1733
0
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1734
0
                       static_cast<zmq::socket_base_t *> (backend_),
1735
0
                       static_cast<zmq::socket_base_t *> (capture_));
1736
0
}
1737
1738
int zmq_proxy_steerable (void *frontend_,
1739
                         void *backend_,
1740
                         void *capture_,
1741
                         void *control_)
1742
0
{
1743
0
    if (!frontend_ || !backend_) {
1744
0
        errno = EFAULT;
1745
0
        return -1;
1746
0
    }
1747
0
    return zmq::proxy_steerable (static_cast<zmq::socket_base_t *> (frontend_),
1748
0
                                 static_cast<zmq::socket_base_t *> (backend_),
1749
0
                                 static_cast<zmq::socket_base_t *> (capture_),
1750
0
                                 static_cast<zmq::socket_base_t *> (control_));
1751
0
}
1752
1753
//  The deprecated device functionality
1754
1755
int zmq_device (int /* type */, void *frontend_, void *backend_)
1756
0
{
1757
0
    return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1758
0
                       static_cast<zmq::socket_base_t *> (backend_), NULL);
1759
0
}
1760
1761
//  Probe library capabilities; for now, reports on transport and security
1762
1763
int zmq_has (const char *capability_)
1764
0
{
1765
0
#if defined(ZMQ_HAVE_IPC)
1766
0
    if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
1767
0
        return true;
1768
0
#endif
1769
#if defined(ZMQ_HAVE_OPENPGM)
1770
    if (strcmp (capability_, zmq::protocol_name::pgm) == 0)
1771
        return true;
1772
#endif
1773
0
#if defined(ZMQ_HAVE_TIPC)
1774
0
    if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
1775
0
        return true;
1776
0
#endif
1777
#if defined(ZMQ_HAVE_NORM)
1778
    if (strcmp (capability_, zmq::protocol_name::norm) == 0)
1779
        return true;
1780
#endif
1781
0
#if defined(ZMQ_HAVE_CURVE)
1782
0
    if (strcmp (capability_, "curve") == 0)
1783
0
        return true;
1784
0
#endif
1785
#if defined(HAVE_LIBGSSAPI_KRB5)
1786
    if (strcmp (capability_, "gssapi") == 0)
1787
        return true;
1788
#endif
1789
#if defined(ZMQ_HAVE_VMCI)
1790
    if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
1791
        return true;
1792
#endif
1793
0
#if defined(ZMQ_BUILD_DRAFT_API)
1794
0
    if (strcmp (capability_, "draft") == 0)
1795
0
        return true;
1796
0
#endif
1797
0
#if defined(ZMQ_HAVE_WS)
1798
0
    if (strcmp (capability_, "WS") == 0)
1799
0
        return true;
1800
0
#endif
1801
#if defined(ZMQ_HAVE_WSS)
1802
    if (strcmp (capability_, "WSS") == 0)
1803
        return true;
1804
#endif
1805
    //  Whatever the application asked for, we don't have
1806
0
    return false;
1807
0
}
1808
1809
int zmq_socket_monitor_pipes_stats (void *s_)
1810
0
{
1811
0
    zmq::socket_base_t *s = as_socket_base_t (s_);
1812
0
    if (!s)
1813
0
        return -1;
1814
0
    return s->query_pipes_stats ();
1815
0
}