/src/libzmq/src/zmtp_engine.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | |
6 | | #include <limits.h> |
7 | | #include <string.h> |
8 | | |
9 | | #ifndef ZMQ_HAVE_WINDOWS |
10 | | #include <unistd.h> |
11 | | #endif |
12 | | |
13 | | #include <new> |
14 | | #include <sstream> |
15 | | |
16 | | #include "zmtp_engine.hpp" |
17 | | #include "io_thread.hpp" |
18 | | #include "session_base.hpp" |
19 | | #include "v1_encoder.hpp" |
20 | | #include "v1_decoder.hpp" |
21 | | #include "v2_encoder.hpp" |
22 | | #include "v2_decoder.hpp" |
23 | | #include "v3_1_encoder.hpp" |
24 | | #include "null_mechanism.hpp" |
25 | | #include "plain_client.hpp" |
26 | | #include "plain_server.hpp" |
27 | | #include "gssapi_client.hpp" |
28 | | #include "gssapi_server.hpp" |
29 | | #include "curve_client.hpp" |
30 | | #include "curve_server.hpp" |
31 | | #include "raw_decoder.hpp" |
32 | | #include "raw_encoder.hpp" |
33 | | #include "config.hpp" |
34 | | #include "err.hpp" |
35 | | #include "ip.hpp" |
36 | | #include "likely.hpp" |
37 | | #include "wire.hpp" |
38 | | |
39 | | zmq::zmtp_engine_t::zmtp_engine_t ( |
40 | | fd_t fd_, |
41 | | const options_t &options_, |
42 | | const endpoint_uri_pair_t &endpoint_uri_pair_) : |
43 | 0 | stream_engine_base_t (fd_, options_, endpoint_uri_pair_, true), |
44 | 0 | _greeting_size (v2_greeting_size), |
45 | 0 | _greeting_bytes_read (0), |
46 | 0 | _subscription_required (false), |
47 | 0 | _heartbeat_timeout (0) |
48 | 0 | { |
49 | 0 | _next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> ( |
50 | 0 | &zmtp_engine_t::routing_id_msg); |
51 | 0 | _process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> ( |
52 | 0 | &zmtp_engine_t::process_routing_id_msg); |
53 | |
|
54 | 0 | int rc = _pong_msg.init (); |
55 | 0 | errno_assert (rc == 0); |
56 | |
|
57 | 0 | rc = _routing_id_msg.init (); |
58 | 0 | errno_assert (rc == 0); |
59 | |
|
60 | 0 | if (_options.heartbeat_interval > 0) { |
61 | 0 | _heartbeat_timeout = _options.heartbeat_timeout; |
62 | 0 | if (_heartbeat_timeout == -1) |
63 | 0 | _heartbeat_timeout = _options.heartbeat_interval; |
64 | 0 | } |
65 | 0 | } |
66 | | |
67 | | zmq::zmtp_engine_t::~zmtp_engine_t () |
68 | 0 | { |
69 | 0 | const int rc = _routing_id_msg.close (); |
70 | 0 | errno_assert (rc == 0); |
71 | 0 | } |
72 | | |
73 | | void zmq::zmtp_engine_t::plug_internal () |
74 | 0 | { |
75 | | // start optional timer, to prevent handshake hanging on no input |
76 | 0 | set_handshake_timer (); |
77 | | |
78 | | // Send the 'length' and 'flags' fields of the routing id message. |
79 | | // The 'length' field is encoded in the long format. |
80 | 0 | _outpos = _greeting_send; |
81 | 0 | _outpos[_outsize++] = UCHAR_MAX; |
82 | 0 | put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1); |
83 | 0 | _outsize += 8; |
84 | 0 | _outpos[_outsize++] = 0x7f; |
85 | |
|
86 | 0 | set_pollin (); |
87 | 0 | set_pollout (); |
88 | | // Flush all the data that may have been already received downstream. |
89 | 0 | in_event (); |
90 | 0 | } |
91 | | |
//  Byte offsets, within the received greeting buffer, of the protocol
//  revision field and of the minor version field.
const size_t revision_pos = 10;
const size_t minor_pos = 11;
95 | | |
96 | | bool zmq::zmtp_engine_t::handshake () |
97 | 0 | { |
98 | 0 | zmq_assert (_greeting_bytes_read < _greeting_size); |
99 | | // Receive the greeting. |
100 | 0 | const int rc = receive_greeting (); |
101 | 0 | if (rc == -1) |
102 | 0 | return false; |
103 | 0 | const bool unversioned = rc != 0; |
104 | |
|
105 | 0 | if (!(this |
106 | 0 | ->*select_handshake_fun (unversioned, _greeting_recv[revision_pos], |
107 | 0 | _greeting_recv[minor_pos])) ()) |
108 | 0 | return false; |
109 | | |
110 | | // Start polling for output if necessary. |
111 | 0 | if (_outsize == 0) |
112 | 0 | set_pollout (); |
113 | |
|
114 | 0 | return true; |
115 | 0 | } |
116 | | |
117 | | int zmq::zmtp_engine_t::receive_greeting () |
118 | 0 | { |
119 | 0 | bool unversioned = false; |
120 | 0 | while (_greeting_bytes_read < _greeting_size) { |
121 | 0 | const int n = read (_greeting_recv + _greeting_bytes_read, |
122 | 0 | _greeting_size - _greeting_bytes_read); |
123 | 0 | if (n == -1) { |
124 | 0 | if (errno != EAGAIN) |
125 | 0 | error (connection_error); |
126 | 0 | return -1; |
127 | 0 | } |
128 | | |
129 | 0 | _greeting_bytes_read += n; |
130 | | |
131 | | // We have received at least one byte from the peer. |
132 | | // If the first byte is not 0xff, we know that the |
133 | | // peer is using unversioned protocol. |
134 | 0 | if (_greeting_recv[0] != 0xff) { |
135 | 0 | unversioned = true; |
136 | 0 | break; |
137 | 0 | } |
138 | | |
139 | 0 | if (_greeting_bytes_read < signature_size) |
140 | 0 | continue; |
141 | | |
142 | | // Inspect the right-most bit of the 10th byte (which coincides |
143 | | // with the 'flags' field if a regular message was sent). |
144 | | // Zero indicates this is a header of a routing id message |
145 | | // (i.e. the peer is using the unversioned protocol). |
146 | 0 | if (!(_greeting_recv[9] & 0x01)) { |
147 | 0 | unversioned = true; |
148 | 0 | break; |
149 | 0 | } |
150 | | |
151 | | // The peer is using versioned protocol. |
152 | 0 | receive_greeting_versioned (); |
153 | 0 | } |
154 | 0 | return unversioned ? 1 : 0; |
155 | 0 | } |
156 | | |
157 | | void zmq::zmtp_engine_t::receive_greeting_versioned () |
158 | 0 | { |
159 | | // Send the major version number. |
160 | 0 | if (_outpos + _outsize == _greeting_send + signature_size) { |
161 | 0 | if (_outsize == 0) |
162 | 0 | set_pollout (); |
163 | 0 | _outpos[_outsize++] = 3; // Major version number |
164 | 0 | } |
165 | |
|
166 | 0 | if (_greeting_bytes_read > signature_size) { |
167 | 0 | if (_outpos + _outsize == _greeting_send + signature_size + 1) { |
168 | 0 | if (_outsize == 0) |
169 | 0 | set_pollout (); |
170 | | |
171 | | // Use ZMTP/2.0 to talk to older peers. |
172 | 0 | if (_greeting_recv[revision_pos] == ZMTP_1_0 |
173 | 0 | || _greeting_recv[revision_pos] == ZMTP_2_0) |
174 | 0 | _outpos[_outsize++] = _options.type; |
175 | 0 | else { |
176 | 0 | _outpos[_outsize++] = 1; // Minor version number |
177 | 0 | memset (_outpos + _outsize, 0, 20); |
178 | |
|
179 | 0 | zmq_assert (_options.mechanism == ZMQ_NULL |
180 | 0 | || _options.mechanism == ZMQ_PLAIN |
181 | 0 | || _options.mechanism == ZMQ_CURVE |
182 | 0 | || _options.mechanism == ZMQ_GSSAPI); |
183 | |
|
184 | 0 | if (_options.mechanism == ZMQ_NULL) |
185 | 0 | memcpy (_outpos + _outsize, "NULL", 4); |
186 | 0 | else if (_options.mechanism == ZMQ_PLAIN) |
187 | 0 | memcpy (_outpos + _outsize, "PLAIN", 5); |
188 | 0 | else if (_options.mechanism == ZMQ_GSSAPI) |
189 | 0 | memcpy (_outpos + _outsize, "GSSAPI", 6); |
190 | 0 | else if (_options.mechanism == ZMQ_CURVE) |
191 | 0 | memcpy (_outpos + _outsize, "CURVE", 5); |
192 | 0 | _outsize += 20; |
193 | 0 | memset (_outpos + _outsize, 0, 32); |
194 | 0 | _outsize += 32; |
195 | 0 | _greeting_size = v3_greeting_size; |
196 | 0 | } |
197 | 0 | } |
198 | 0 | } |
199 | 0 | } |
200 | | |
201 | | zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::select_handshake_fun ( |
202 | | bool unversioned_, unsigned char revision_, unsigned char minor_) |
203 | 0 | { |
204 | | // Is the peer using ZMTP/1.0 with no revision number? |
205 | 0 | if (unversioned_) { |
206 | 0 | return &zmtp_engine_t::handshake_v1_0_unversioned; |
207 | 0 | } |
208 | 0 | switch (revision_) { |
209 | 0 | case ZMTP_1_0: |
210 | 0 | return &zmtp_engine_t::handshake_v1_0; |
211 | 0 | case ZMTP_2_0: |
212 | 0 | return &zmtp_engine_t::handshake_v2_0; |
213 | 0 | case ZMTP_3_x: |
214 | 0 | switch (minor_) { |
215 | 0 | case 0: |
216 | 0 | return &zmtp_engine_t::handshake_v3_0; |
217 | 0 | default: |
218 | 0 | return &zmtp_engine_t::handshake_v3_1; |
219 | 0 | } |
220 | 0 | default: |
221 | 0 | return &zmtp_engine_t::handshake_v3_1; |
222 | 0 | } |
223 | 0 | } |
224 | | |
225 | | bool zmq::zmtp_engine_t::handshake_v1_0_unversioned () |
226 | 0 | { |
227 | | // We send and receive rest of routing id message |
228 | 0 | if (session ()->zap_enabled ()) { |
229 | | // reject ZMTP 1.0 connections if ZAP is enabled |
230 | 0 | error (protocol_error); |
231 | 0 | return false; |
232 | 0 | } |
233 | | |
234 | 0 | _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); |
235 | 0 | alloc_assert (_encoder); |
236 | |
|
237 | 0 | _decoder = new (std::nothrow) |
238 | 0 | v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); |
239 | 0 | alloc_assert (_decoder); |
240 | | |
241 | | // We have already sent the message header. |
242 | | // Since there is no way to tell the encoder to |
243 | | // skip the message header, we simply throw that |
244 | | // header data away. |
245 | 0 | const size_t header_size = |
246 | 0 | _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2; |
247 | 0 | unsigned char tmp[10], *bufferp = tmp; |
248 | | |
249 | | // Prepare the routing id message and load it into encoder. |
250 | | // Then consume bytes we have already sent to the peer. |
251 | 0 | int rc = _routing_id_msg.close (); |
252 | 0 | zmq_assert (rc == 0); |
253 | 0 | rc = _routing_id_msg.init_size (_options.routing_id_size); |
254 | 0 | zmq_assert (rc == 0); |
255 | 0 | memcpy (_routing_id_msg.data (), _options.routing_id, |
256 | 0 | _options.routing_id_size); |
257 | 0 | _encoder->load_msg (&_routing_id_msg); |
258 | 0 | const size_t buffer_size = _encoder->encode (&bufferp, header_size); |
259 | 0 | zmq_assert (buffer_size == header_size); |
260 | | |
261 | | // Make sure the decoder sees the data we have already received. |
262 | 0 | _inpos = _greeting_recv; |
263 | 0 | _insize = _greeting_bytes_read; |
264 | | |
265 | | // To allow for interoperability with peers that do not forward |
266 | | // their subscriptions, we inject a phantom subscription message |
267 | | // message into the incoming message stream. |
268 | 0 | if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB) |
269 | 0 | _subscription_required = true; |
270 | | |
271 | | // We are sending our routing id now and the next message |
272 | | // will come from the socket. |
273 | 0 | _next_msg = &zmtp_engine_t::pull_msg_from_session; |
274 | | |
275 | | // We are expecting routing id message. |
276 | 0 | _process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> ( |
277 | 0 | &zmtp_engine_t::process_routing_id_msg); |
278 | |
|
279 | 0 | return true; |
280 | 0 | } |
281 | | |
282 | | bool zmq::zmtp_engine_t::handshake_v1_0 () |
283 | 0 | { |
284 | 0 | if (session ()->zap_enabled ()) { |
285 | | // reject ZMTP 1.0 connections if ZAP is enabled |
286 | 0 | error (protocol_error); |
287 | 0 | return false; |
288 | 0 | } |
289 | | |
290 | 0 | _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); |
291 | 0 | alloc_assert (_encoder); |
292 | |
|
293 | 0 | _decoder = new (std::nothrow) |
294 | 0 | v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); |
295 | 0 | alloc_assert (_decoder); |
296 | |
|
297 | 0 | return true; |
298 | 0 | } |
299 | | |
300 | | bool zmq::zmtp_engine_t::handshake_v2_0 () |
301 | 0 | { |
302 | 0 | if (session ()->zap_enabled ()) { |
303 | | // reject ZMTP 2.0 connections if ZAP is enabled |
304 | 0 | error (protocol_error); |
305 | 0 | return false; |
306 | 0 | } |
307 | | |
308 | 0 | _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); |
309 | 0 | alloc_assert (_encoder); |
310 | |
|
311 | 0 | _decoder = new (std::nothrow) v2_decoder_t ( |
312 | 0 | _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); |
313 | 0 | alloc_assert (_decoder); |
314 | |
|
315 | 0 | return true; |
316 | 0 | } |
317 | | |
318 | | bool zmq::zmtp_engine_t::handshake_v3_x (const bool downgrade_sub_) |
319 | 0 | { |
320 | 0 | if (_options.mechanism == ZMQ_NULL |
321 | 0 | && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", |
322 | 0 | 20) |
323 | 0 | == 0) { |
324 | 0 | _mechanism = new (std::nothrow) |
325 | 0 | null_mechanism_t (session (), _peer_address, _options); |
326 | 0 | alloc_assert (_mechanism); |
327 | 0 | } else if (_options.mechanism == ZMQ_PLAIN |
328 | 0 | && memcmp (_greeting_recv + 12, |
329 | 0 | "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) |
330 | 0 | == 0) { |
331 | 0 | if (_options.as_server) |
332 | 0 | _mechanism = new (std::nothrow) |
333 | 0 | plain_server_t (session (), _peer_address, _options); |
334 | 0 | else |
335 | 0 | _mechanism = |
336 | 0 | new (std::nothrow) plain_client_t (session (), _options); |
337 | 0 | alloc_assert (_mechanism); |
338 | 0 | } |
339 | 0 | #ifdef ZMQ_HAVE_CURVE |
340 | 0 | else if (_options.mechanism == ZMQ_CURVE |
341 | 0 | && memcmp (_greeting_recv + 12, |
342 | 0 | "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) |
343 | 0 | == 0) { |
344 | 0 | if (_options.as_server) |
345 | 0 | _mechanism = new (std::nothrow) curve_server_t ( |
346 | 0 | session (), _peer_address, _options, downgrade_sub_); |
347 | 0 | else |
348 | 0 | _mechanism = new (std::nothrow) |
349 | 0 | curve_client_t (session (), _options, downgrade_sub_); |
350 | 0 | alloc_assert (_mechanism); |
351 | 0 | } |
352 | 0 | #endif |
353 | | #ifdef HAVE_LIBGSSAPI_KRB5 |
354 | | else if (_options.mechanism == ZMQ_GSSAPI |
355 | | && memcmp (_greeting_recv + 12, |
356 | | "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) |
357 | | == 0) { |
358 | | if (_options.as_server) |
359 | | _mechanism = new (std::nothrow) |
360 | | gssapi_server_t (session (), _peer_address, _options); |
361 | | else |
362 | | _mechanism = |
363 | | new (std::nothrow) gssapi_client_t (session (), _options); |
364 | | alloc_assert (_mechanism); |
365 | | } |
366 | | #endif |
367 | 0 | else { |
368 | 0 | socket ()->event_handshake_failed_protocol ( |
369 | 0 | session ()->get_endpoint (), |
370 | 0 | ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH); |
371 | 0 | error (protocol_error); |
372 | 0 | return false; |
373 | 0 | } |
374 | | #ifndef ZMQ_HAVE_CURVE |
375 | | LIBZMQ_UNUSED (downgrade_sub_); |
376 | | #endif |
377 | 0 | _next_msg = &zmtp_engine_t::next_handshake_command; |
378 | 0 | _process_msg = &zmtp_engine_t::process_handshake_command; |
379 | |
|
380 | 0 | return true; |
381 | 0 | } |
382 | | |
383 | | bool zmq::zmtp_engine_t::handshake_v3_0 () |
384 | 0 | { |
385 | 0 | _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); |
386 | 0 | alloc_assert (_encoder); |
387 | |
|
388 | 0 | _decoder = new (std::nothrow) v2_decoder_t ( |
389 | 0 | _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); |
390 | 0 | alloc_assert (_decoder); |
391 | |
|
392 | 0 | return zmq::zmtp_engine_t::handshake_v3_x (true); |
393 | 0 | } |
394 | | |
395 | | bool zmq::zmtp_engine_t::handshake_v3_1 () |
396 | 0 | { |
397 | 0 | _encoder = new (std::nothrow) v3_1_encoder_t (_options.out_batch_size); |
398 | 0 | alloc_assert (_encoder); |
399 | |
|
400 | 0 | _decoder = new (std::nothrow) v2_decoder_t ( |
401 | 0 | _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); |
402 | 0 | alloc_assert (_decoder); |
403 | |
|
404 | 0 | return zmq::zmtp_engine_t::handshake_v3_x (false); |
405 | 0 | } |
406 | | |
407 | | int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_) |
408 | 0 | { |
409 | 0 | const int rc = msg_->init_size (_options.routing_id_size); |
410 | 0 | errno_assert (rc == 0); |
411 | 0 | if (_options.routing_id_size > 0) |
412 | 0 | memcpy (msg_->data (), _options.routing_id, _options.routing_id_size); |
413 | 0 | _next_msg = &zmtp_engine_t::pull_msg_from_session; |
414 | 0 | return 0; |
415 | 0 | } |
416 | | |
417 | | int zmq::zmtp_engine_t::process_routing_id_msg (msg_t *msg_) |
418 | 0 | { |
419 | 0 | if (_options.recv_routing_id) { |
420 | 0 | msg_->set_flags (msg_t::routing_id); |
421 | 0 | const int rc = session ()->push_msg (msg_); |
422 | 0 | errno_assert (rc == 0); |
423 | 0 | } else { |
424 | 0 | int rc = msg_->close (); |
425 | 0 | errno_assert (rc == 0); |
426 | 0 | rc = msg_->init (); |
427 | 0 | errno_assert (rc == 0); |
428 | 0 | } |
429 | |
|
430 | 0 | if (_subscription_required) { |
431 | 0 | msg_t subscription; |
432 | | |
433 | | // Inject the subscription message, so that also |
434 | | // ZMQ 2.x peers receive published messages. |
435 | 0 | int rc = subscription.init_size (1); |
436 | 0 | errno_assert (rc == 0); |
437 | 0 | *static_cast<unsigned char *> (subscription.data ()) = 1; |
438 | 0 | rc = session ()->push_msg (&subscription); |
439 | 0 | errno_assert (rc == 0); |
440 | 0 | } |
441 | |
|
442 | 0 | _process_msg = &zmtp_engine_t::push_msg_to_session; |
443 | |
|
444 | 0 | return 0; |
445 | 0 | } |
446 | | |
447 | | int zmq::zmtp_engine_t::produce_ping_message (msg_t *msg_) |
448 | 0 | { |
449 | | // 16-bit TTL + \4PING == 7 |
450 | 0 | const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; |
451 | 0 | zmq_assert (_mechanism != NULL); |
452 | |
|
453 | 0 | int rc = msg_->init_size (ping_ttl_len); |
454 | 0 | errno_assert (rc == 0); |
455 | 0 | msg_->set_flags (msg_t::command); |
456 | | // Copy in the command message |
457 | 0 | memcpy (msg_->data (), "\4PING", msg_t::ping_cmd_name_size); |
458 | |
|
459 | 0 | uint16_t ttl_val = htons (_options.heartbeat_ttl); |
460 | 0 | memcpy (static_cast<uint8_t *> (msg_->data ()) + msg_t::ping_cmd_name_size, |
461 | 0 | &ttl_val, sizeof (ttl_val)); |
462 | |
|
463 | 0 | rc = _mechanism->encode (msg_); |
464 | 0 | _next_msg = &zmtp_engine_t::pull_and_encode; |
465 | 0 | if (!_has_timeout_timer && _heartbeat_timeout > 0) { |
466 | 0 | add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id); |
467 | 0 | _has_timeout_timer = true; |
468 | 0 | } |
469 | 0 | return rc; |
470 | 0 | } |
471 | | |
472 | | int zmq::zmtp_engine_t::produce_pong_message (msg_t *msg_) |
473 | 0 | { |
474 | 0 | zmq_assert (_mechanism != NULL); |
475 | |
|
476 | 0 | int rc = msg_->move (_pong_msg); |
477 | 0 | errno_assert (rc == 0); |
478 | |
|
479 | 0 | rc = _mechanism->encode (msg_); |
480 | 0 | _next_msg = &zmtp_engine_t::pull_and_encode; |
481 | 0 | return rc; |
482 | 0 | } |
483 | | |
484 | | int zmq::zmtp_engine_t::process_heartbeat_message (msg_t *msg_) |
485 | 0 | { |
486 | 0 | if (msg_->is_ping ()) { |
487 | | // 16-bit TTL + \4PING == 7 |
488 | 0 | const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; |
489 | 0 | const size_t ping_max_ctx_len = 16; |
490 | 0 | uint16_t remote_heartbeat_ttl; |
491 | | |
492 | | // Get the remote heartbeat TTL to setup the timer |
493 | 0 | memcpy (&remote_heartbeat_ttl, |
494 | 0 | static_cast<uint8_t *> (msg_->data ()) |
495 | 0 | + msg_t::ping_cmd_name_size, |
496 | 0 | ping_ttl_len - msg_t::ping_cmd_name_size); |
497 | 0 | remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl); |
498 | | // The remote heartbeat is in 10ths of a second |
499 | | // so we multiply it by 100 to get the timer interval in ms. |
500 | 0 | remote_heartbeat_ttl *= 100; |
501 | |
|
502 | 0 | if (!_has_ttl_timer && remote_heartbeat_ttl > 0) { |
503 | 0 | add_timer (remote_heartbeat_ttl, heartbeat_ttl_timer_id); |
504 | 0 | _has_ttl_timer = true; |
505 | 0 | } |
506 | | |
507 | | // As per ZMTP 3.1 the PING command might contain an up to 16 bytes |
508 | | // context which needs to be PONGed back, so build the pong message |
509 | | // here and store it. Truncate it if it's too long. |
510 | | // Given the engine goes straight to out_event, sequential PINGs will |
511 | | // not be a problem. |
512 | 0 | const size_t context_len = |
513 | 0 | std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len); |
514 | 0 | const int rc = |
515 | 0 | _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len); |
516 | 0 | errno_assert (rc == 0); |
517 | 0 | _pong_msg.set_flags (msg_t::command); |
518 | 0 | memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size); |
519 | 0 | if (context_len > 0) |
520 | 0 | memcpy (static_cast<uint8_t *> (_pong_msg.data ()) |
521 | 0 | + msg_t::ping_cmd_name_size, |
522 | 0 | static_cast<uint8_t *> (msg_->data ()) + ping_ttl_len, |
523 | 0 | context_len); |
524 | |
|
525 | 0 | _next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> ( |
526 | 0 | &zmtp_engine_t::produce_pong_message); |
527 | 0 | out_event (); |
528 | 0 | } |
529 | |
|
530 | 0 | return 0; |
531 | 0 | } |
532 | | |
533 | | int zmq::zmtp_engine_t::process_command_message (msg_t *msg_) |
534 | 0 | { |
535 | 0 | const uint8_t cmd_name_size = |
536 | 0 | *(static_cast<const uint8_t *> (msg_->data ())); |
537 | 0 | const size_t ping_name_size = msg_t::ping_cmd_name_size - 1; |
538 | 0 | const size_t sub_name_size = msg_t::sub_cmd_name_size - 1; |
539 | 0 | const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1; |
540 | | // Malformed command |
541 | 0 | if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size))) |
542 | 0 | return -1; |
543 | | |
544 | 0 | const uint8_t *const cmd_name = |
545 | 0 | static_cast<const uint8_t *> (msg_->data ()) + 1; |
546 | 0 | if (cmd_name_size == ping_name_size |
547 | 0 | && memcmp (cmd_name, "PING", cmd_name_size) == 0) |
548 | 0 | msg_->set_flags (zmq::msg_t::ping); |
549 | 0 | if (cmd_name_size == ping_name_size |
550 | 0 | && memcmp (cmd_name, "PONG", cmd_name_size) == 0) |
551 | 0 | msg_->set_flags (zmq::msg_t::pong); |
552 | 0 | if (cmd_name_size == sub_name_size |
553 | 0 | && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0) |
554 | 0 | msg_->set_flags (zmq::msg_t::subscribe); |
555 | 0 | if (cmd_name_size == cancel_name_size |
556 | 0 | && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0) |
557 | 0 | msg_->set_flags (zmq::msg_t::cancel); |
558 | |
|
559 | 0 | if (msg_->is_ping () || msg_->is_pong ()) |
560 | 0 | return process_heartbeat_message (msg_); |
561 | | |
562 | 0 | return 0; |
563 | 0 | } |