/src/libzmq/src/proxy.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | |
5 | | #include <stddef.h> |
6 | | #include "poller.hpp" |
7 | | #include "proxy.hpp" |
8 | | #include "likely.hpp" |
9 | | #include "msg.hpp" |
10 | | |
11 | | #if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \ |
12 | | && !defined ZMQ_HAVE_AIX |
13 | | #include <poll.h> |
14 | | #endif |
15 | | |
16 | | // These headers end up pulling in zmq.h somewhere in their include |
17 | | // dependency chain |
18 | | #include "socket_base.hpp" |
19 | | #include "err.hpp" |
20 | | |
21 | | int zmq::proxy (class socket_base_t *frontend_, |
22 | | class socket_base_t *backend_, |
23 | | class socket_base_t *capture_) |
24 | 0 | { |
25 | 0 | return zmq::proxy_steerable (frontend_, backend_, capture_, NULL); |
26 | 0 | } |
27 | | |
28 | | #ifdef ZMQ_HAVE_POLLER |
29 | | |
30 | | #include "socket_poller.hpp" |
31 | | |
32 | | // Macros for repetitive code. |
33 | | |
//  PROXY_CLEANUP() must not be used before these variables are initialized.
//
//  Deletes all seven socket pollers that the poller-based proxy_steerable()
//  keeps in local pointer variables. The macro expands in the caller's scope
//  and therefore depends on those exact local names being declared there.
//  Pointers that were left NULL are harmless, since 'delete' on a null
//  pointer does nothing.
#define PROXY_CLEANUP()                                                        \
    do {                                                                       \
        delete poller_all;                                                     \
        delete poller_in;                                                      \
        delete poller_receive_blocked;                                         \
        delete poller_send_blocked;                                            \
        delete poller_both_blocked;                                            \
        delete poller_frontend_only;                                           \
        delete poller_backend_only;                                            \
    } while (false)
45 | | |
46 | | |
//  Bails out of the enclosing function when the caller's local 'rc' is
//  negative: the pollers are deleted via PROXY_CLEANUP(), the caller's local
//  'msg' is closed through close_and_return() and -1 is returned.
//  Expands in the caller's scope, so both 'rc' and 'msg' must be visible at
//  the point of use.
#define CHECK_RC_EXIT_ON_FAILURE()                                             \
    do {                                                                       \
        if (rc < 0) {                                                          \
            PROXY_CLEANUP ();                                                  \
            return close_and_return (&msg, -1);                                \
        }                                                                      \
    } while (false)
54 | | |
55 | | #endif // ZMQ_HAVE_POLLER |
56 | | |
57 | | static int |
58 | | capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0) |
59 | 0 | { |
60 | | // Copy message to capture socket if any |
61 | 0 | if (capture_) { |
62 | 0 | zmq::msg_t ctrl; |
63 | 0 | int rc = ctrl.init (); |
64 | 0 | if (unlikely (rc < 0)) |
65 | 0 | return -1; |
66 | 0 | rc = ctrl.copy (*msg_); |
67 | 0 | if (unlikely (rc < 0)) |
68 | 0 | return -1; |
69 | 0 | rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0); |
70 | 0 | if (unlikely (rc < 0)) |
71 | 0 | return -1; |
72 | 0 | } |
73 | 0 | return 0; |
74 | 0 | } |
75 | | |
//  Traffic counters for one direction of one socket.
struct stats_socket
{
    uint64_t count; //  number of message parts
    uint64_t bytes; //  accumulated size of those parts
};

//  Counters for both directions of a single proxy endpoint.
struct stats_endpoint
{
    stats_socket send;
    stats_socket recv;
};

//  Counters for the two endpoints of the proxy.
struct stats_proxy
{
    stats_endpoint frontend;
    stats_endpoint backend;
};
88 | | |
89 | | static int forward (class zmq::socket_base_t *from_, |
90 | | class zmq::socket_base_t *to_, |
91 | | class zmq::socket_base_t *capture_, |
92 | | zmq::msg_t *msg_, |
93 | | stats_socket &recving, |
94 | | stats_socket &sending) |
95 | 0 | { |
96 | | // Forward a burst of messages |
97 | 0 | for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) { |
98 | 0 | int more; |
99 | 0 | size_t moresz; |
100 | | |
101 | | // Forward all the parts of one message |
102 | 0 | while (true) { |
103 | 0 | int rc = from_->recv (msg_, ZMQ_DONTWAIT); |
104 | 0 | if (rc < 0) { |
105 | 0 | if (likely (errno == EAGAIN && i > 0)) |
106 | 0 | return 0; // End of burst |
107 | | |
108 | 0 | return -1; |
109 | 0 | } |
110 | | |
111 | 0 | size_t nbytes = msg_->size (); |
112 | 0 | recving.count += 1; |
113 | 0 | recving.bytes += nbytes; |
114 | |
|
115 | 0 | moresz = sizeof more; |
116 | 0 | rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); |
117 | 0 | if (unlikely (rc < 0)) |
118 | 0 | return -1; |
119 | | |
120 | | // Copy message to capture socket if any |
121 | 0 | rc = capture (capture_, msg_, more); |
122 | 0 | if (unlikely (rc < 0)) |
123 | 0 | return -1; |
124 | | |
125 | 0 | rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0); |
126 | 0 | if (unlikely (rc < 0)) |
127 | 0 | return -1; |
128 | 0 | sending.count += 1; |
129 | 0 | sending.bytes += nbytes; |
130 | |
|
131 | 0 | if (more == 0) |
132 | 0 | break; |
133 | 0 | } |
134 | 0 | } |
135 | | |
136 | 0 | return 0; |
137 | 0 | } |
138 | | |
//  Operating states of the proxy loop, switched by control commands.
enum proxy_state_t
{
    active = 0,    //  initial state; also selected by RESUME
    paused = 1,    //  selected by PAUSE
    terminated = 2 //  selected by TERMINATE; ends the proxy loop
};
145 | | |
146 | | // Handle control request [5]PAUSE, [6]RESUME, [9]TERMINATE, |
147 | | // [10]STATISTICS. Only STATISTICS results in a send. |
148 | | static int handle_control (class zmq::socket_base_t *control_, |
149 | | proxy_state_t &state, |
150 | | const stats_proxy &stats) |
151 | 0 | { |
152 | 0 | zmq::msg_t cmsg; |
153 | 0 | int rc = cmsg.init (); |
154 | 0 | if (rc != 0) { |
155 | 0 | return -1; |
156 | 0 | } |
157 | 0 | rc = control_->recv (&cmsg, ZMQ_DONTWAIT); |
158 | 0 | if (rc < 0) { |
159 | 0 | return -1; |
160 | 0 | } |
161 | 0 | uint8_t *const command = static_cast<uint8_t *> (cmsg.data ()); |
162 | 0 | const size_t msiz = cmsg.size (); |
163 | |
|
164 | 0 | if (msiz == 10 && 0 == memcmp (command, "STATISTICS", 10)) { |
165 | | // The stats are a cross product: |
166 | | // |
167 | | // (Front,Back) X (Recv,Sent) X (Number,Bytes). |
168 | | // |
169 | | // that is flattened into sequence of 8 message parts according to the |
170 | | // zmq_proxy_steerable(3) documentation as: |
171 | | // |
172 | | // (frn, frb, fsn, fsb, brn, brb, bsn, bsb) |
173 | | // |
174 | | // f=front/b=back, r=recv/s=send, n=number/b=bytes. |
175 | 0 | const uint64_t stat_vals[8] = { |
176 | 0 | stats.frontend.recv.count, stats.frontend.recv.bytes, |
177 | 0 | stats.frontend.send.count, stats.frontend.send.bytes, |
178 | 0 | stats.backend.recv.count, stats.backend.recv.bytes, |
179 | 0 | stats.backend.send.count, stats.backend.send.bytes}; |
180 | |
|
181 | 0 | for (size_t ind = 0; ind < 8; ++ind) { |
182 | 0 | cmsg.init_size (sizeof (uint64_t)); |
183 | 0 | memcpy (cmsg.data (), stat_vals + ind, sizeof (uint64_t)); |
184 | 0 | rc = control_->send (&cmsg, ind < 7 ? ZMQ_SNDMORE : 0); |
185 | 0 | if (unlikely (rc < 0)) { |
186 | 0 | return -1; |
187 | 0 | } |
188 | 0 | } |
189 | 0 | return 0; |
190 | 0 | } |
191 | | |
192 | 0 | if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) { |
193 | 0 | state = paused; |
194 | 0 | } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) { |
195 | 0 | state = active; |
196 | 0 | } else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) { |
197 | 0 | state = terminated; |
198 | 0 | } |
199 | |
|
200 | 0 | int type; |
201 | 0 | size_t sz = sizeof (type); |
202 | 0 | zmq_getsockopt (control_, ZMQ_TYPE, &type, &sz); |
203 | 0 | if (type == ZMQ_REP) { |
204 | | // satisfy REP duty and reply no matter what. |
205 | 0 | cmsg.init_size (0); |
206 | 0 | rc = control_->send (&cmsg, 0); |
207 | 0 | if (unlikely (rc < 0)) { |
208 | 0 | return -1; |
209 | 0 | } |
210 | 0 | } |
211 | 0 | return 0; |
212 | 0 | } |
213 | | |
214 | | #ifdef ZMQ_HAVE_POLLER |
215 | | int zmq::proxy_steerable (class socket_base_t *frontend_, |
216 | | class socket_base_t *backend_, |
217 | | class socket_base_t *capture_, |
218 | | class socket_base_t *control_) |
219 | 0 | { |
220 | 0 | msg_t msg; |
221 | 0 | int rc = msg.init (); |
222 | 0 | if (rc != 0) |
223 | 0 | return -1; |
224 | | |
225 | | // The algorithm below assumes ratio of requests and replies processed |
226 | | // under full load to be 1:1. |
227 | | |
228 | | // Proxy can be in these three states |
229 | 0 | proxy_state_t state = active; |
230 | |
|
231 | 0 | bool frontend_equal_to_backend; |
232 | 0 | bool frontend_in = false; |
233 | 0 | bool frontend_out = false; |
234 | 0 | bool backend_in = false; |
235 | 0 | bool backend_out = false; |
236 | 0 | zmq::socket_poller_t::event_t events[4]; |
237 | 0 | int nevents = 3; // increase to 4 if we have control_ |
238 | |
|
239 | 0 | stats_proxy stats = {{{0, 0}, {0, 0}}, {{0, 0}, {0, 0}}}; |
240 | | |
241 | | // Don't allocate these pollers from stack because they will take more than 900 kB of stack! |
242 | | // On Windows this blows up default stack of 1 MB and aborts the program. |
243 | | // I wanted to use std::shared_ptr here as the best solution but that requires C++11... |
244 | 0 | zmq::socket_poller_t *poller_all = |
245 | 0 | new (std::nothrow) zmq::socket_poller_t; // Poll for everything. |
246 | 0 | zmq::socket_poller_t *poller_in = new (std::nothrow) zmq:: |
247 | 0 | socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop. |
248 | 0 | zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) |
249 | 0 | zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'. |
250 | | |
251 | | // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored. |
252 | | // In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'. |
253 | | // We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it. |
254 | | // We save some RAM and time for initialization. |
255 | 0 | zmq::socket_poller_t *poller_send_blocked = |
256 | 0 | NULL; // All except 'ZMQ_POLLIN' on 'backend_'. |
257 | 0 | zmq::socket_poller_t *poller_both_blocked = |
258 | 0 | NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. |
259 | 0 | zmq::socket_poller_t *poller_frontend_only = |
260 | 0 | NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'. |
261 | 0 | zmq::socket_poller_t *poller_backend_only = |
262 | 0 | NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'. |
263 | |
|
264 | 0 | if (frontend_ != backend_) { |
265 | 0 | poller_send_blocked = new (std::nothrow) |
266 | 0 | zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'. |
267 | 0 | poller_both_blocked = new (std::nothrow) zmq:: |
268 | 0 | socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. |
269 | 0 | poller_frontend_only = new (std::nothrow) zmq:: |
270 | 0 | socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'. |
271 | 0 | poller_backend_only = new (std::nothrow) zmq:: |
272 | 0 | socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'. |
273 | 0 | frontend_equal_to_backend = false; |
274 | 0 | } else |
275 | 0 | frontend_equal_to_backend = true; |
276 | |
|
277 | 0 | if (poller_all == NULL || poller_in == NULL |
278 | 0 | || poller_receive_blocked == NULL |
279 | 0 | || ((poller_send_blocked == NULL || poller_both_blocked == NULL) |
280 | 0 | && !frontend_equal_to_backend)) { |
281 | 0 | PROXY_CLEANUP (); |
282 | 0 | return close_and_return (&msg, -1); |
283 | 0 | } |
284 | | |
285 | 0 | zmq::socket_poller_t *poller_wait = |
286 | 0 | poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'. |
287 | | |
288 | | // Register 'frontend_' and 'backend_' with pollers. |
289 | 0 | rc = poller_all->add (frontend_, NULL, |
290 | 0 | ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. |
291 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
292 | 0 | rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. |
293 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
294 | | |
295 | 0 | if (frontend_equal_to_backend) { |
296 | | // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, |
297 | | // so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'. |
298 | | // We also don't need 'poller_both_blocked', no need to initialize it. |
299 | 0 | rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); |
300 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
301 | 0 | } else { |
302 | 0 | rc = poller_all->add (backend_, NULL, |
303 | 0 | ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. |
304 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
305 | 0 | rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. |
306 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
307 | 0 | rc = poller_both_blocked->add ( |
308 | 0 | frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. |
309 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
310 | 0 | rc = poller_both_blocked->add ( |
311 | 0 | backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. |
312 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
313 | 0 | rc = poller_send_blocked->add ( |
314 | 0 | backend_, NULL, |
315 | 0 | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. |
316 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
317 | 0 | rc = poller_send_blocked->add ( |
318 | 0 | frontend_, NULL, |
319 | 0 | ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. |
320 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
321 | 0 | rc = poller_receive_blocked->add ( |
322 | 0 | frontend_, NULL, |
323 | 0 | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. |
324 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
325 | 0 | rc = poller_receive_blocked->add ( |
326 | 0 | backend_, NULL, |
327 | 0 | ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. |
328 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
329 | 0 | rc = |
330 | 0 | poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); |
331 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
332 | 0 | rc = |
333 | 0 | poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); |
334 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
335 | 0 | } |
336 | | |
337 | 0 | if (control_) { |
338 | 0 | ++nevents; |
339 | | |
340 | | // wherever you go, there you are. |
341 | |
|
342 | 0 | rc = poller_all->add (control_, NULL, ZMQ_POLLIN); |
343 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
344 | | |
345 | 0 | rc = poller_in->add (control_, NULL, ZMQ_POLLIN); |
346 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
347 | | |
348 | 0 | rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN); |
349 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
350 | | |
351 | 0 | rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN); |
352 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
353 | | |
354 | 0 | rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN); |
355 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
356 | | |
357 | 0 | rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN); |
358 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
359 | | |
360 | 0 | rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN); |
361 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
362 | 0 | } |
363 | | |
364 | 0 | bool request_processed = false, reply_processed = false; |
365 | |
|
366 | 0 | while (state != terminated) { |
367 | | // Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'. |
368 | | // If one of receiving end's queue is full ('ZMQ_POLLOUT' not available), |
369 | | // 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'. |
370 | 0 | rc = poller_wait->wait (events, nevents, -1); |
371 | 0 | if (rc < 0 && errno == EAGAIN) |
372 | 0 | rc = 0; |
373 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
374 | | |
375 | | // Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking. |
376 | 0 | rc = poller_all->wait (events, nevents, 0); |
377 | 0 | if (rc < 0 && errno == EAGAIN) |
378 | 0 | rc = 0; |
379 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
380 | | |
381 | | // Process events. |
382 | 0 | for (int i = 0; i < rc; i++) { |
383 | 0 | if (control_ && events[i].socket == control_) { |
384 | 0 | rc = handle_control (control_, state, stats); |
385 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
386 | 0 | continue; |
387 | 0 | } |
388 | | |
389 | 0 | if (events[i].socket == frontend_) { |
390 | 0 | frontend_in = (events[i].events & ZMQ_POLLIN) != 0; |
391 | 0 | frontend_out = (events[i].events & ZMQ_POLLOUT) != 0; |
392 | 0 | } else |
393 | | // This 'if' needs to be after check for 'frontend_' in order never |
394 | | // to be reached in case frontend_==backend_, so we ensure backend_in=false in that case. |
395 | 0 | if (events[i].socket == backend_) { |
396 | 0 | backend_in = (events[i].events & ZMQ_POLLIN) != 0; |
397 | 0 | backend_out = (events[i].events & ZMQ_POLLOUT) != 0; |
398 | 0 | } |
399 | 0 | } |
400 | | |
401 | 0 | if (state == active) { |
402 | | // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. |
403 | | // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. |
404 | 0 | if (frontend_in && (backend_out || frontend_equal_to_backend)) { |
405 | 0 | rc = forward (frontend_, backend_, capture_, &msg, |
406 | 0 | stats.frontend.recv, stats.backend.send); |
407 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
408 | 0 | request_processed = true; |
409 | 0 | frontend_in = backend_out = false; |
410 | 0 | } else |
411 | 0 | request_processed = false; |
412 | | |
413 | | // Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'. |
414 | | // If 'frontend_' and 'backend_' are the same this is not needed because previous processing |
415 | | // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to |
416 | | // design in 'for' event processing loop. |
417 | 0 | if (backend_in && frontend_out) { |
418 | 0 | rc = forward (backend_, frontend_, capture_, &msg, |
419 | 0 | stats.backend.recv, stats.frontend.send); |
420 | 0 | CHECK_RC_EXIT_ON_FAILURE (); |
421 | 0 | reply_processed = true; |
422 | 0 | backend_in = frontend_out = false; |
423 | 0 | } else |
424 | 0 | reply_processed = false; |
425 | | |
426 | 0 | if (request_processed || reply_processed) { |
427 | | // If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event. |
428 | | // Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled. |
429 | 0 | if (poller_wait != poller_in) { |
430 | 0 | if (request_processed) { // 'frontend_' -> 'backend_' |
431 | 0 | if (poller_wait == poller_both_blocked) |
432 | 0 | poller_wait = poller_send_blocked; |
433 | 0 | else if (poller_wait == poller_receive_blocked |
434 | 0 | || poller_wait == poller_frontend_only) |
435 | 0 | poller_wait = poller_in; |
436 | 0 | } |
437 | 0 | if (reply_processed) { // 'backend_' -> 'frontend_' |
438 | 0 | if (poller_wait == poller_both_blocked) |
439 | 0 | poller_wait = poller_receive_blocked; |
440 | 0 | else if (poller_wait == poller_send_blocked |
441 | 0 | || poller_wait == poller_backend_only) |
442 | 0 | poller_wait = poller_in; |
443 | 0 | } |
444 | 0 | } |
445 | 0 | } else { |
446 | | // No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events. |
447 | | // That means that out queue(s) is/are full or one out queue is full and second one has no messages to process. |
448 | | // Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT', |
449 | | // or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'. |
450 | 0 | if (frontend_in) { |
451 | 0 | if (frontend_out) |
452 | | // If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false. |
453 | | // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'. |
454 | | // We'll never get here in case of frontend_==backend_ because then frontend_out will always be false. |
455 | 0 | poller_wait = poller_backend_only; |
456 | 0 | else { |
457 | 0 | if (poller_wait == poller_send_blocked) |
458 | 0 | poller_wait = poller_both_blocked; |
459 | 0 | else if (poller_wait == poller_in) |
460 | 0 | poller_wait = poller_receive_blocked; |
461 | 0 | } |
462 | 0 | } |
463 | 0 | if (backend_in) { |
464 | | // Will never be reached if frontend_==backend_, 'backend_in' will |
465 | | // always be false due to design in 'for' event processing loop. |
466 | 0 | if (backend_out) |
467 | | // If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false. |
468 | | // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'. |
469 | 0 | poller_wait = poller_frontend_only; |
470 | 0 | else { |
471 | 0 | if (poller_wait == poller_receive_blocked) |
472 | 0 | poller_wait = poller_both_blocked; |
473 | 0 | else if (poller_wait == poller_in) |
474 | 0 | poller_wait = poller_send_blocked; |
475 | 0 | } |
476 | 0 | } |
477 | 0 | } |
478 | 0 | } |
479 | 0 | } |
480 | 0 | PROXY_CLEANUP (); |
481 | 0 | return close_and_return (&msg, 0); |
482 | 0 | } |
483 | | |
484 | | #else // ZMQ_HAVE_POLLER |
485 | | |
486 | | int zmq::proxy_steerable (class socket_base_t *frontend_, |
487 | | class socket_base_t *backend_, |
488 | | class socket_base_t *capture_, |
489 | | class socket_base_t *control_) |
490 | | { |
491 | | msg_t msg; |
492 | | int rc = msg.init (); |
493 | | if (rc != 0) |
494 | | return -1; |
495 | | |
496 | | // The algorithm below assumes ratio of requests and replies processed |
497 | | // under full load to be 1:1. |
498 | | |
499 | | zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0}, |
500 | | {backend_, 0, ZMQ_POLLIN, 0}, |
501 | | {control_, 0, ZMQ_POLLIN, 0}}; |
502 | | const int qt_poll_items = control_ ? 3 : 2; |
503 | | |
504 | | zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0}, |
505 | | {backend_, 0, ZMQ_POLLOUT, 0}}; |
506 | | |
507 | | stats_proxy stats = {0}; |
508 | | |
509 | | // Proxy can be in these three states |
510 | | proxy_state_t state = active; |
511 | | |
512 | | while (state != terminated) { |
513 | | // Wait while there are either requests or replies to process. |
514 | | rc = zmq_poll (&items[0], qt_poll_items, -1); |
515 | | if (unlikely (rc < 0)) |
516 | | return close_and_return (&msg, -1); |
517 | | |
518 | | if (control_ && items[2].revents & ZMQ_POLLIN) { |
519 | | rc = handle_control (control_, state, stats); |
520 | | if (unlikely (rc < 0)) |
521 | | return close_and_return (&msg, -1); |
522 | | } |
523 | | |
524 | | // Get the pollout separately because when combining this with pollin it maxes the CPU |
525 | | // because pollout shall most of the time return directly. |
526 | | // POLLOUT is only checked when frontend and backend sockets are not the same. |
527 | | if (frontend_ != backend_) { |
528 | | rc = zmq_poll (&itemsout[0], 2, 0); |
529 | | if (unlikely (rc < 0)) { |
530 | | return close_and_return (&msg, -1); |
531 | | } |
532 | | } |
533 | | |
534 | | if (state == active && items[0].revents & ZMQ_POLLIN |
535 | | && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) { |
536 | | rc = forward (frontend_, backend_, capture_, &msg, |
537 | | stats.frontend.recv, stats.backend.send); |
538 | | if (unlikely (rc < 0)) |
539 | | return close_and_return (&msg, -1); |
540 | | } |
541 | | // Process a reply |
542 | | if (state == active && frontend_ != backend_ |
543 | | && items[1].revents & ZMQ_POLLIN |
544 | | && itemsout[0].revents & ZMQ_POLLOUT) { |
545 | | rc = forward (backend_, frontend_, capture_, &msg, |
546 | | stats.backend.recv, stats.frontend.send); |
547 | | if (unlikely (rc < 0)) |
548 | | return close_and_return (&msg, -1); |
549 | | } |
550 | | } |
551 | | |
552 | | return close_and_return (&msg, 0); |
553 | | } |
554 | | |
555 | | #endif // ZMQ_HAVE_POLLER |