Coverage Report

Created: 2025-07-12 06:05

/src/libzmq/src/proxy.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
5
#include <stddef.h>
6
#include "poller.hpp"
7
#include "proxy.hpp"
8
#include "likely.hpp"
9
#include "msg.hpp"
10
11
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS                \
12
  && !defined ZMQ_HAVE_AIX
13
#include <poll.h>
14
#endif
15
16
// These headers end up pulling in zmq.h somewhere in their include
17
// dependency chain
18
#include "socket_base.hpp"
19
#include "err.hpp"
20
21
//  Plain proxy entry point: identical to proxy_steerable() invoked without
//  a control socket. The return value of proxy_steerable() is passed through
//  unchanged.
int zmq::proxy (class socket_base_t *frontend_,
                class socket_base_t *backend_,
                class socket_base_t *capture_)
{
    //  No control socket means the proxy cannot be steered.
    class socket_base_t *const no_control = NULL;
    return zmq::proxy_steerable (frontend_, backend_, capture_, no_control);
}
27
28
#ifdef ZMQ_HAVE_POLLER
29
30
#include "socket_poller.hpp"
31
32
//  Macros for repetitive code.
33
34
//  Deletes all seven pollers used by proxy_steerable(). The macro expands in
//  the caller's scope, so it must only be used once every 'poller_*' pointer
//  has been declared and initialized (a NULL pointer is fine for 'delete').
#define PROXY_CLEANUP()                                                        \
    do {                                                                       \
        delete poller_all;                                                     \
        delete poller_in;                                                      \
        delete poller_receive_blocked;                                         \
        delete poller_send_blocked;                                            \
        delete poller_both_blocked;                                            \
        delete poller_frontend_only;                                           \
        delete poller_backend_only;                                            \
    } while (0)
45
46
47
//  Bails out of proxy_steerable() when the local 'rc' is negative: the pollers
//  are released via PROXY_CLEANUP() and the function returns the result of
//  close_and_return (&msg, -1). Relies on 'rc' and 'msg' being in scope.
#define CHECK_RC_EXIT_ON_FAILURE()                                             \
    do {                                                                       \
        if (rc < 0) {                                                          \
            PROXY_CLEANUP ();                                                  \
            return close_and_return (&msg, -1);                                \
        }                                                                      \
    } while (0)
54
55
#endif //  ZMQ_HAVE_POLLER
56
57
static int
58
capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0)
59
0
{
60
    //  Copy message to capture socket if any
61
0
    if (capture_) {
62
0
        zmq::msg_t ctrl;
63
0
        int rc = ctrl.init ();
64
0
        if (unlikely (rc < 0))
65
0
            return -1;
66
0
        rc = ctrl.copy (*msg_);
67
0
        if (unlikely (rc < 0))
68
0
            return -1;
69
0
        rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0);
70
0
        if (unlikely (rc < 0))
71
0
            return -1;
72
0
    }
73
0
    return 0;
74
0
}
75
76
//  Traffic counters for one direction of one socket.
struct stats_socket
{
    uint64_t count; //  number of message parts
    uint64_t bytes; //  accumulated size of those parts
};
80
struct stats_endpoint
81
{
82
    stats_socket send, recv;
83
};
84
struct stats_proxy
85
{
86
    stats_endpoint frontend, backend;
87
};
88
89
//  Forwards a burst of at most zmq::proxy_burst_size messages from 'from_'
//  to 'to_', copying every part to 'capture_' (if any) on the way.
//
//  from_     socket to read from (non-blocking).
//  to_       socket to write to.
//  capture_  optional capture socket, may be NULL.
//  msg_      scratch message reused for every part.
//  recving   counters updated for every part received from 'from_'.
//  sending   counters updated for every part sent to 'to_'.
//
//  Returns 0 when the burst is complete or when 'from_' reports EAGAIN after
//  at least one message of the burst has been forwarded; -1 on any other
//  failure, including EAGAIN on the very first message.
static int forward (class zmq::socket_base_t *from_,
                    class zmq::socket_base_t *to_,
                    class zmq::socket_base_t *capture_,
                    zmq::msg_t *msg_,
                    stats_socket &recving,
                    stats_socket &sending)
{
    for (unsigned int burst = 0; burst < zmq::proxy_burst_size; ++burst) {
        int more = 0;

        //  Relay the parts of one message until the last part is seen.
        do {
            if (from_->recv (msg_, ZMQ_DONTWAIT) < 0) {
                //  Running dry after the first message just ends the burst.
                if (likely (errno == EAGAIN && burst > 0))
                    return 0;

                return -1;
            }

            const size_t part_size = msg_->size ();
            recving.count += 1;
            recving.bytes += part_size;

            //  Ask the source whether more parts of this message follow.
            size_t more_len = sizeof more;
            if (unlikely (from_->getsockopt (ZMQ_RCVMORE, &more, &more_len)
                          < 0))
                return -1;

            //  Copy message to capture socket if any
            if (unlikely (capture (capture_, msg_, more) < 0))
                return -1;

            if (unlikely (to_->send (msg_, more ? ZMQ_SNDMORE : 0) < 0))
                return -1;
            sending.count += 1;
            sending.bytes += part_size;
        } while (more != 0);
    }

    return 0;
}
138
139
//  Life-cycle states of a steerable proxy. The explicit values match the
//  implicit numbering the enumerators had before.
enum proxy_state_t
{
    active = 0,    //  forwarding messages
    paused = 1,    //  not forwarding, still listening for control commands
    terminated = 2 //  main loop exits
};
145
146
// Handle control request [5]PAUSE, [6]RESUME, [9]TERMINATE,
147
// [10]STATISTICS.  Only STATISTICS results in a send.
148
static int handle_control (class zmq::socket_base_t *control_,
149
                           proxy_state_t &state,
150
                           const stats_proxy &stats)
151
0
{
152
0
    zmq::msg_t cmsg;
153
0
    int rc = cmsg.init ();
154
0
    if (rc != 0) {
155
0
        return -1;
156
0
    }
157
0
    rc = control_->recv (&cmsg, ZMQ_DONTWAIT);
158
0
    if (rc < 0) {
159
0
        return -1;
160
0
    }
161
0
    uint8_t *const command = static_cast<uint8_t *> (cmsg.data ());
162
0
    const size_t msiz = cmsg.size ();
163
164
0
    if (msiz == 10 && 0 == memcmp (command, "STATISTICS", 10)) {
165
        // The stats are a cross product:
166
        //
167
        // (Front,Back) X (Recv,Sent) X (Number,Bytes).
168
        //
169
        // that is flattened into sequence of 8 message parts according to the
170
        // zmq_proxy_steerable(3) documentation as:
171
        //
172
        // (frn, frb, fsn, fsb, brn, brb, bsn, bsb)
173
        //
174
        // f=front/b=back, r=recv/s=send, n=number/b=bytes.
175
0
        const uint64_t stat_vals[8] = {
176
0
          stats.frontend.recv.count, stats.frontend.recv.bytes,
177
0
          stats.frontend.send.count, stats.frontend.send.bytes,
178
0
          stats.backend.recv.count,  stats.backend.recv.bytes,
179
0
          stats.backend.send.count,  stats.backend.send.bytes};
180
181
0
        for (size_t ind = 0; ind < 8; ++ind) {
182
0
            cmsg.init_size (sizeof (uint64_t));
183
0
            memcpy (cmsg.data (), stat_vals + ind, sizeof (uint64_t));
184
0
            rc = control_->send (&cmsg, ind < 7 ? ZMQ_SNDMORE : 0);
185
0
            if (unlikely (rc < 0)) {
186
0
                return -1;
187
0
            }
188
0
        }
189
0
        return 0;
190
0
    }
191
192
0
    if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) {
193
0
        state = paused;
194
0
    } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
195
0
        state = active;
196
0
    } else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) {
197
0
        state = terminated;
198
0
    }
199
200
0
    int type;
201
0
    size_t sz = sizeof (type);
202
0
    zmq_getsockopt (control_, ZMQ_TYPE, &type, &sz);
203
0
    if (type == ZMQ_REP) {
204
        // satisfy REP duty and reply no matter what.
205
0
        cmsg.init_size (0);
206
0
        rc = control_->send (&cmsg, 0);
207
0
        if (unlikely (rc < 0)) {
208
0
            return -1;
209
0
        }
210
0
    }
211
0
    return 0;
212
0
}
213
214
#ifdef ZMQ_HAVE_POLLER
215
int zmq::proxy_steerable (class socket_base_t *frontend_,
216
                          class socket_base_t *backend_,
217
                          class socket_base_t *capture_,
218
                          class socket_base_t *control_)
219
0
{
220
0
    msg_t msg;
221
0
    int rc = msg.init ();
222
0
    if (rc != 0)
223
0
        return -1;
224
225
    //  The algorithm below assumes ratio of requests and replies processed
226
    //  under full load to be 1:1.
227
228
    //  Proxy can be in these three states
229
0
    proxy_state_t state = active;
230
231
0
    bool frontend_equal_to_backend;
232
0
    bool frontend_in = false;
233
0
    bool frontend_out = false;
234
0
    bool backend_in = false;
235
0
    bool backend_out = false;
236
0
    zmq::socket_poller_t::event_t events[4];
237
0
    int nevents = 3; // increase to 4 if we have control_
238
239
0
    stats_proxy stats = {{{0, 0}, {0, 0}}, {{0, 0}, {0, 0}}};
240
241
    //  Don't allocate these pollers from stack because they will take more than 900 kB of stack!
242
    //  On Windows this blows up default stack of 1 MB and aborts the program.
243
    //  I wanted to use std::shared_ptr here as the best solution but that requires C++11...
244
0
    zmq::socket_poller_t *poller_all =
245
0
      new (std::nothrow) zmq::socket_poller_t; //  Poll for everything.
246
0
    zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::
247
0
      socket_poller_t; //  Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
248
0
    zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow)
249
0
      zmq::socket_poller_t; //  All except 'ZMQ_POLLIN' on 'frontend_'.
250
251
    //  If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored.
252
    //  In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'.
253
    //  We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it.
254
    //  We save some RAM and time for initialization.
255
0
    zmq::socket_poller_t *poller_send_blocked =
256
0
      NULL; //  All except 'ZMQ_POLLIN' on 'backend_'.
257
0
    zmq::socket_poller_t *poller_both_blocked =
258
0
      NULL; //  All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
259
0
    zmq::socket_poller_t *poller_frontend_only =
260
0
      NULL; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
261
0
    zmq::socket_poller_t *poller_backend_only =
262
0
      NULL; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
263
264
0
    if (frontend_ != backend_) {
265
0
        poller_send_blocked = new (std::nothrow)
266
0
          zmq::socket_poller_t; //  All except 'ZMQ_POLLIN' on 'backend_'.
267
0
        poller_both_blocked = new (std::nothrow) zmq::
268
0
          socket_poller_t; //  All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
269
0
        poller_frontend_only = new (std::nothrow) zmq::
270
0
          socket_poller_t; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
271
0
        poller_backend_only = new (std::nothrow) zmq::
272
0
          socket_poller_t; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
273
0
        frontend_equal_to_backend = false;
274
0
    } else
275
0
        frontend_equal_to_backend = true;
276
277
0
    if (poller_all == NULL || poller_in == NULL
278
0
        || poller_receive_blocked == NULL
279
0
        || ((poller_send_blocked == NULL || poller_both_blocked == NULL)
280
0
            && !frontend_equal_to_backend)) {
281
0
        PROXY_CLEANUP ();
282
0
        return close_and_return (&msg, -1);
283
0
    }
284
285
0
    zmq::socket_poller_t *poller_wait =
286
0
      poller_in; //  Poller for blocking wait, initially all 'ZMQ_POLLIN'.
287
288
    //  Register 'frontend_' and 'backend_' with pollers.
289
0
    rc = poller_all->add (frontend_, NULL,
290
0
                          ZMQ_POLLIN | ZMQ_POLLOUT); //  Everything.
291
0
    CHECK_RC_EXIT_ON_FAILURE ();
292
0
    rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); //  All 'ZMQ_POLLIN's.
293
0
    CHECK_RC_EXIT_ON_FAILURE ();
294
295
0
    if (frontend_equal_to_backend) {
296
        //  If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same,
297
        //  so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'.
298
        //  We also don't need 'poller_both_blocked', no need to initialize it.
299
0
        rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT);
300
0
        CHECK_RC_EXIT_ON_FAILURE ();
301
0
    } else {
302
0
        rc = poller_all->add (backend_, NULL,
303
0
                              ZMQ_POLLIN | ZMQ_POLLOUT); //  Everything.
304
0
        CHECK_RC_EXIT_ON_FAILURE ();
305
0
        rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); //  All 'ZMQ_POLLIN's.
306
0
        CHECK_RC_EXIT_ON_FAILURE ();
307
0
        rc = poller_both_blocked->add (
308
0
          frontend_, NULL, ZMQ_POLLOUT); //  Waiting only for 'ZMQ_POLLOUT'.
309
0
        CHECK_RC_EXIT_ON_FAILURE ();
310
0
        rc = poller_both_blocked->add (
311
0
          backend_, NULL, ZMQ_POLLOUT); //  Waiting only for 'ZMQ_POLLOUT'.
312
0
        CHECK_RC_EXIT_ON_FAILURE ();
313
0
        rc = poller_send_blocked->add (
314
0
          backend_, NULL,
315
0
          ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'backend_'.
316
0
        CHECK_RC_EXIT_ON_FAILURE ();
317
0
        rc = poller_send_blocked->add (
318
0
          frontend_, NULL,
319
0
          ZMQ_POLLIN | ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'backend_'.
320
0
        CHECK_RC_EXIT_ON_FAILURE ();
321
0
        rc = poller_receive_blocked->add (
322
0
          frontend_, NULL,
323
0
          ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'frontend_'.
324
0
        CHECK_RC_EXIT_ON_FAILURE ();
325
0
        rc = poller_receive_blocked->add (
326
0
          backend_, NULL,
327
0
          ZMQ_POLLIN | ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'frontend_'.
328
0
        CHECK_RC_EXIT_ON_FAILURE ();
329
0
        rc =
330
0
          poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
331
0
        CHECK_RC_EXIT_ON_FAILURE ();
332
0
        rc =
333
0
          poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
334
0
        CHECK_RC_EXIT_ON_FAILURE ();
335
0
    }
336
337
0
    if (control_) {
338
0
        ++nevents;
339
340
        // wherever you go, there you are.
341
342
0
        rc = poller_all->add (control_, NULL, ZMQ_POLLIN);
343
0
        CHECK_RC_EXIT_ON_FAILURE ();
344
345
0
        rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
346
0
        CHECK_RC_EXIT_ON_FAILURE ();
347
348
0
        rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
349
0
        CHECK_RC_EXIT_ON_FAILURE ();
350
351
0
        rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN);
352
0
        CHECK_RC_EXIT_ON_FAILURE ();
353
354
0
        rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
355
0
        CHECK_RC_EXIT_ON_FAILURE ();
356
357
0
        rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN);
358
0
        CHECK_RC_EXIT_ON_FAILURE ();
359
360
0
        rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN);
361
0
        CHECK_RC_EXIT_ON_FAILURE ();
362
0
    }
363
364
0
    bool request_processed = false, reply_processed = false;
365
366
0
    while (state != terminated) {
367
        //  Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
368
        //  If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
369
        //  'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
370
0
        rc = poller_wait->wait (events, nevents, -1);
371
0
        if (rc < 0 && errno == EAGAIN)
372
0
            rc = 0;
373
0
        CHECK_RC_EXIT_ON_FAILURE ();
374
375
        //  Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking.
376
0
        rc = poller_all->wait (events, nevents, 0);
377
0
        if (rc < 0 && errno == EAGAIN)
378
0
            rc = 0;
379
0
        CHECK_RC_EXIT_ON_FAILURE ();
380
381
        //  Process events.
382
0
        for (int i = 0; i < rc; i++) {
383
0
            if (control_ && events[i].socket == control_) {
384
0
                rc = handle_control (control_, state, stats);
385
0
                CHECK_RC_EXIT_ON_FAILURE ();
386
0
                continue;
387
0
            }
388
389
0
            if (events[i].socket == frontend_) {
390
0
                frontend_in = (events[i].events & ZMQ_POLLIN) != 0;
391
0
                frontend_out = (events[i].events & ZMQ_POLLOUT) != 0;
392
0
            } else
393
                //  This 'if' needs to be after check for 'frontend_' in order never
394
                //  to be reached in case frontend_==backend_, so we ensure backend_in=false in that case.
395
0
                if (events[i].socket == backend_) {
396
0
                    backend_in = (events[i].events & ZMQ_POLLIN) != 0;
397
0
                    backend_out = (events[i].events & ZMQ_POLLOUT) != 0;
398
0
                }
399
0
        }
400
401
0
        if (state == active) {
402
            //  Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
403
            //  In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
404
0
            if (frontend_in && (backend_out || frontend_equal_to_backend)) {
405
0
                rc = forward (frontend_, backend_, capture_, &msg,
406
0
                              stats.frontend.recv, stats.backend.send);
407
0
                CHECK_RC_EXIT_ON_FAILURE ();
408
0
                request_processed = true;
409
0
                frontend_in = backend_out = false;
410
0
            } else
411
0
                request_processed = false;
412
413
            //  Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'.
414
            //  If 'frontend_' and 'backend_' are the same this is not needed because previous processing
415
            //  covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
416
            //  design in 'for' event processing loop.
417
0
            if (backend_in && frontend_out) {
418
0
                rc = forward (backend_, frontend_, capture_, &msg,
419
0
                              stats.backend.recv, stats.frontend.send);
420
0
                CHECK_RC_EXIT_ON_FAILURE ();
421
0
                reply_processed = true;
422
0
                backend_in = frontend_out = false;
423
0
            } else
424
0
                reply_processed = false;
425
426
0
            if (request_processed || reply_processed) {
427
                //  If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event.
428
                //  Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled.
429
0
                if (poller_wait != poller_in) {
430
0
                    if (request_processed) { //  'frontend_' -> 'backend_'
431
0
                        if (poller_wait == poller_both_blocked)
432
0
                            poller_wait = poller_send_blocked;
433
0
                        else if (poller_wait == poller_receive_blocked
434
0
                                 || poller_wait == poller_frontend_only)
435
0
                            poller_wait = poller_in;
436
0
                    }
437
0
                    if (reply_processed) { //  'backend_' -> 'frontend_'
438
0
                        if (poller_wait == poller_both_blocked)
439
0
                            poller_wait = poller_receive_blocked;
440
0
                        else if (poller_wait == poller_send_blocked
441
0
                                 || poller_wait == poller_backend_only)
442
0
                            poller_wait = poller_in;
443
0
                    }
444
0
                }
445
0
            } else {
446
                //  No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events.
447
                //  That means that out queue(s) is/are full or one out queue is full and second one has no messages to process.
448
                //  Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT',
449
                //  or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'.
450
0
                if (frontend_in) {
451
0
                    if (frontend_out)
452
                        // If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false.
453
                        // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'.
454
                        // We'll never get here in case of frontend_==backend_ because then frontend_out will always be false.
455
0
                        poller_wait = poller_backend_only;
456
0
                    else {
457
0
                        if (poller_wait == poller_send_blocked)
458
0
                            poller_wait = poller_both_blocked;
459
0
                        else if (poller_wait == poller_in)
460
0
                            poller_wait = poller_receive_blocked;
461
0
                    }
462
0
                }
463
0
                if (backend_in) {
464
                    //  Will never be reached if frontend_==backend_, 'backend_in' will
465
                    //  always be false due to design in 'for' event processing loop.
466
0
                    if (backend_out)
467
                        // If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false.
468
                        // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'.
469
0
                        poller_wait = poller_frontend_only;
470
0
                    else {
471
0
                        if (poller_wait == poller_receive_blocked)
472
0
                            poller_wait = poller_both_blocked;
473
0
                        else if (poller_wait == poller_in)
474
0
                            poller_wait = poller_send_blocked;
475
0
                    }
476
0
                }
477
0
            }
478
0
        }
479
0
    }
480
0
    PROXY_CLEANUP ();
481
0
    return close_and_return (&msg, 0);
482
0
}
483
484
#else //  ZMQ_HAVE_POLLER
485
486
int zmq::proxy_steerable (class socket_base_t *frontend_,
487
                          class socket_base_t *backend_,
488
                          class socket_base_t *capture_,
489
                          class socket_base_t *control_)
490
{
491
    msg_t msg;
492
    int rc = msg.init ();
493
    if (rc != 0)
494
        return -1;
495
496
    //  The algorithm below assumes ratio of requests and replies processed
497
    //  under full load to be 1:1.
498
499
    zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0},
500
                              {backend_, 0, ZMQ_POLLIN, 0},
501
                              {control_, 0, ZMQ_POLLIN, 0}};
502
    const int qt_poll_items = control_ ? 3 : 2;
503
504
    zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0},
505
                                 {backend_, 0, ZMQ_POLLOUT, 0}};
506
507
    stats_proxy stats = {0};
508
509
    //  Proxy can be in these three states
510
    proxy_state_t state = active;
511
512
    while (state != terminated) {
513
        //  Wait while there are either requests or replies to process.
514
        rc = zmq_poll (&items[0], qt_poll_items, -1);
515
        if (unlikely (rc < 0))
516
            return close_and_return (&msg, -1);
517
518
        if (control_ && items[2].revents & ZMQ_POLLIN) {
519
            rc = handle_control (control_, state, stats);
520
            if (unlikely (rc < 0))
521
                return close_and_return (&msg, -1);
522
        }
523
524
        //  Get the pollout separately because when combining this with pollin it maxes the CPU
525
        //  because pollout shall most of the time return directly.
526
        //  POLLOUT is only checked when frontend and backend sockets are not the same.
527
        if (frontend_ != backend_) {
528
            rc = zmq_poll (&itemsout[0], 2, 0);
529
            if (unlikely (rc < 0)) {
530
                return close_and_return (&msg, -1);
531
            }
532
        }
533
534
        if (state == active && items[0].revents & ZMQ_POLLIN
535
            && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) {
536
            rc = forward (frontend_, backend_, capture_, &msg,
537
                          stats.frontend.recv, stats.backend.send);
538
            if (unlikely (rc < 0))
539
                return close_and_return (&msg, -1);
540
        }
541
        //  Process a reply
542
        if (state == active && frontend_ != backend_
543
            && items[1].revents & ZMQ_POLLIN
544
            && itemsout[0].revents & ZMQ_POLLOUT) {
545
            rc = forward (backend_, frontend_, capture_, &msg,
546
                          stats.backend.recv, stats.frontend.send);
547
            if (unlikely (rc < 0))
548
                return close_and_return (&msg, -1);
549
        }
550
    }
551
552
    return close_and_return (&msg, 0);
553
}
554
555
#endif //  ZMQ_HAVE_POLLER