/src/libzmq/tests/testutil_monitoring.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | #include "testutil_monitoring.hpp" |
3 | | #include "testutil_unity.hpp" |
4 | | |
5 | | #include <stdlib.h> |
6 | | #include <string.h> |
7 | | |
8 | | static int |
9 | | receive_monitor_address (void *monitor_, char **address_, bool expect_more_) |
10 | 0 | { |
11 | 0 | zmq_msg_t msg; |
12 | |
|
13 | 0 | zmq_msg_init (&msg); |
14 | 0 | if (zmq_msg_recv (&msg, monitor_, 0) == -1) |
15 | 0 | return -1; // Interrupted, presumably |
16 | 0 | TEST_ASSERT_EQUAL (expect_more_, zmq_msg_more (&msg)); |
17 | |
|
18 | 0 | if (address_) { |
19 | 0 | const uint8_t *const data = |
20 | 0 | static_cast<const uint8_t *> (zmq_msg_data (&msg)); |
21 | 0 | const size_t size = zmq_msg_size (&msg); |
22 | 0 | *address_ = static_cast<char *> (malloc (size + 1)); |
23 | 0 | memcpy (*address_, data, size); |
24 | 0 | (*address_)[size] = 0; |
25 | 0 | } |
26 | 0 | zmq_msg_close (&msg); |
27 | |
|
28 | 0 | return 0; |
29 | 0 | } |
30 | | |
31 | | // Read one event off the monitor socket; return value and address |
32 | | // by reference, if not null, and event number by value. Returns -1 |
33 | | // in case of error. |
34 | | static int get_monitor_event_internal (void *monitor_, |
35 | | int *value_, |
36 | | char **address_, |
37 | | int recv_flag_) |
38 | 0 | { |
39 | | // First frame in message contains event number and value |
40 | 0 | zmq_msg_t msg; |
41 | 0 | zmq_msg_init (&msg); |
42 | 0 | if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) { |
43 | 0 | TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1); |
44 | 0 | return -1; // timed out or no message available |
45 | 0 | } |
46 | 0 | TEST_ASSERT_TRUE (zmq_msg_more (&msg)); |
47 | |
|
48 | 0 | uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg)); |
49 | 0 | uint16_t event = *reinterpret_cast<uint16_t *> (data); |
50 | 0 | if (value_) |
51 | 0 | memcpy (value_, data + 2, sizeof (uint32_t)); |
52 | | |
53 | | // Second frame in message contains event address |
54 | 0 | TEST_ASSERT_SUCCESS_ERRNO ( |
55 | 0 | receive_monitor_address (monitor_, address_, false)); |
56 | |
|
57 | 0 | return event; |
58 | 0 | } |
59 | | |
60 | | int get_monitor_event_with_timeout (void *monitor_, |
61 | | int *value_, |
62 | | char **address_, |
63 | | int timeout_) |
64 | 0 | { |
65 | 0 | int res; |
66 | 0 | if (timeout_ == -1) { |
67 | | // process infinite timeout in small steps to allow the user |
68 | | // to see some information on the console |
69 | |
|
70 | 0 | int timeout_step = 250; |
71 | 0 | int wait_time = 0; |
72 | 0 | zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step, |
73 | 0 | sizeof (timeout_step)); |
74 | 0 | while ( |
75 | 0 | (res = get_monitor_event_internal (monitor_, value_, address_, 0)) |
76 | 0 | == -1) { |
77 | 0 | wait_time += timeout_step; |
78 | 0 | fprintf (stderr, "Still waiting for monitor event after %i ms\n", |
79 | 0 | wait_time); |
80 | 0 | } |
81 | 0 | } else { |
82 | 0 | zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_)); |
83 | 0 | res = get_monitor_event_internal (monitor_, value_, address_, 0); |
84 | 0 | } |
85 | 0 | int timeout_infinite = -1; |
86 | 0 | zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite, |
87 | 0 | sizeof (timeout_infinite)); |
88 | 0 | return res; |
89 | 0 | } |
90 | | |
91 | | int get_monitor_event (void *monitor_, int *value_, char **address_) |
92 | 0 | { |
93 | 0 | return get_monitor_event_with_timeout (monitor_, value_, address_, -1); |
94 | 0 | } |
95 | | |
96 | | void expect_monitor_event (void *monitor_, int expected_event_) |
97 | 0 | { |
98 | 0 | TEST_ASSERT_EQUAL_HEX (expected_event_, |
99 | 0 | get_monitor_event (monitor_, NULL, NULL)); |
100 | 0 | } |
101 | | |
// Format a one-line description of a mismatching monitor event into buf_.
//
// buf_, buf_size_   destination buffer and its capacity; output is truncated
//                   to fit.
// event_, err_      the event number and value that were received.
// expected_event_,
// expected_err_     the event number and value that were expected.
static void print_unexpected_event (char *buf_,
                                    size_t buf_size_,
                                    int event_,
                                    int err_,
                                    int expected_event_,
                                    int expected_err_)
{
    // Each value is printed twice, once in decimal and once in hex.
    snprintf (buf_, buf_size_,
              "Unexpected event: 0x%x, value = %i/0x%x "
              "(expected: 0x%x, value = %i/0x%x)\n",
              event_, err_, err_,
              expected_event_, expected_err_, expected_err_);
}
115 | | |
116 | | void print_unexpected_event_stderr (int event_, |
117 | | int err_, |
118 | | int expected_event_, |
119 | | int expected_err_) |
120 | 0 | { |
121 | 0 | char buf[256]; |
122 | 0 | print_unexpected_event (buf, sizeof buf, event_, err_, expected_event_, |
123 | 0 | expected_err_); |
124 | 0 | fputs (buf, stderr); |
125 | 0 | } |
126 | | |
127 | | int expect_monitor_event_multiple (void *server_mon_, |
128 | | int expected_event_, |
129 | | int expected_err_, |
130 | | bool optional_) |
131 | 0 | { |
132 | 0 | int count_of_expected_events = 0; |
133 | 0 | int client_closed_connection = 0; |
134 | 0 | int timeout = 250; |
135 | 0 | int wait_time = 0; |
136 | |
|
137 | 0 | int event; |
138 | 0 | int err; |
139 | 0 | while ((event = |
140 | 0 | get_monitor_event_with_timeout (server_mon_, &err, NULL, timeout)) |
141 | 0 | != -1 |
142 | 0 | || !count_of_expected_events) { |
143 | 0 | if (event == -1) { |
144 | 0 | if (optional_) |
145 | 0 | break; |
146 | 0 | wait_time += timeout; |
147 | 0 | fprintf (stderr, |
148 | 0 | "Still waiting for first event after %ims (expected event " |
149 | 0 | "%x (value %i/0x%x))\n", |
150 | 0 | wait_time, expected_event_, expected_err_, expected_err_); |
151 | 0 | continue; |
152 | 0 | } |
153 | | // ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen |
154 | | // ECONNRESET can happen on very slow machines, when the engine writes |
155 | | // to the peer and then tries to read the socket before the peer reads |
156 | | // ECONNABORTED happens when a client aborts a connection via RST/timeout |
157 | 0 | if (event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL |
158 | 0 | && ((err == EPIPE && expected_err_ != EPIPE) || err == ECONNRESET |
159 | 0 | || err == ECONNABORTED)) { |
160 | 0 | fprintf (stderr, |
161 | 0 | "Ignored event (skipping any further events): %x (err = " |
162 | 0 | "%i == %s)\n", |
163 | 0 | event, err, zmq_strerror (err)); |
164 | 0 | client_closed_connection = 1; |
165 | 0 | break; |
166 | 0 | } |
167 | 0 | if (event != expected_event_ |
168 | 0 | || (-1 != expected_err_ && err != expected_err_)) { |
169 | 0 | char buf[256]; |
170 | 0 | print_unexpected_event (buf, sizeof buf, event, err, |
171 | 0 | expected_event_, expected_err_); |
172 | 0 | TEST_FAIL_MESSAGE (buf); |
173 | 0 | } |
174 | 0 | ++count_of_expected_events; |
175 | 0 | } |
176 | 0 | TEST_ASSERT_TRUE (optional_ || count_of_expected_events > 0 |
177 | 0 | || client_closed_connection); |
178 | |
|
179 | 0 | return count_of_expected_events; |
180 | 0 | } |
181 | | |
182 | | static int64_t get_monitor_event_internal_v2 (void *monitor_, |
183 | | uint64_t **value_, |
184 | | char **local_address_, |
185 | | char **remote_address_, |
186 | | int recv_flag_) |
187 | 0 | { |
188 | | // First frame in message contains event number |
189 | 0 | zmq_msg_t msg; |
190 | 0 | zmq_msg_init (&msg); |
191 | 0 | if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) { |
192 | 0 | TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1); |
193 | 0 | return -1; // timed out or no message available |
194 | 0 | } |
195 | 0 | TEST_ASSERT_TRUE (zmq_msg_more (&msg)); |
196 | 0 | TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg)); |
197 | |
|
198 | 0 | uint64_t event; |
199 | 0 | memcpy (&event, zmq_msg_data (&msg), sizeof (event)); |
200 | 0 | zmq_msg_close (&msg); |
201 | | |
202 | | // Second frame in message contains the number of values |
203 | 0 | zmq_msg_init (&msg); |
204 | 0 | if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) { |
205 | 0 | TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1); |
206 | 0 | return -1; // timed out or no message available |
207 | 0 | } |
208 | 0 | TEST_ASSERT_TRUE (zmq_msg_more (&msg)); |
209 | 0 | TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg)); |
210 | |
|
211 | 0 | uint64_t value_count; |
212 | 0 | memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count)); |
213 | 0 | zmq_msg_close (&msg); |
214 | |
|
215 | 0 | if (value_) { |
216 | 0 | *value_ = |
217 | 0 | (uint64_t *) malloc ((size_t) value_count * sizeof (uint64_t)); |
218 | 0 | TEST_ASSERT_NOT_NULL (*value_); |
219 | 0 | } |
220 | |
|
221 | 0 | for (uint64_t i = 0; i < value_count; ++i) { |
222 | | // Subsequent frames in message contain event values |
223 | 0 | zmq_msg_init (&msg); |
224 | 0 | if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) { |
225 | 0 | TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1); |
226 | 0 | return -1; // timed out or no message available |
227 | 0 | } |
228 | 0 | TEST_ASSERT_TRUE (zmq_msg_more (&msg)); |
229 | 0 | TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg)); |
230 | |
|
231 | 0 | if (value_ && *value_) |
232 | 0 | memcpy (&(*value_)[i], zmq_msg_data (&msg), sizeof (uint64_t)); |
233 | 0 | zmq_msg_close (&msg); |
234 | 0 | } |
235 | | |
236 | | // Second-to-last frame in message contains local address |
237 | 0 | TEST_ASSERT_SUCCESS_ERRNO ( |
238 | 0 | receive_monitor_address (monitor_, local_address_, true)); |
239 | | |
240 | | // Last frame in message contains remote address |
241 | 0 | TEST_ASSERT_SUCCESS_ERRNO ( |
242 | 0 | receive_monitor_address (monitor_, remote_address_, false)); |
243 | |
|
244 | 0 | return event; |
245 | 0 | } |
246 | | |
247 | | static int64_t get_monitor_event_with_timeout_v2 (void *monitor_, |
248 | | uint64_t **value_, |
249 | | char **local_address_, |
250 | | char **remote_address_, |
251 | | int timeout_) |
252 | 0 | { |
253 | 0 | int64_t res; |
254 | 0 | if (timeout_ == -1) { |
255 | | // process infinite timeout in small steps to allow the user |
256 | | // to see some information on the console |
257 | |
|
258 | 0 | int timeout_step = 250; |
259 | 0 | int wait_time = 0; |
260 | 0 | zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step, |
261 | 0 | sizeof (timeout_step)); |
262 | 0 | while ((res = get_monitor_event_internal_v2 ( |
263 | 0 | monitor_, value_, local_address_, remote_address_, 0)) |
264 | 0 | == -1) { |
265 | 0 | wait_time += timeout_step; |
266 | 0 | fprintf (stderr, "Still waiting for monitor event after %i ms\n", |
267 | 0 | wait_time); |
268 | 0 | } |
269 | 0 | } else { |
270 | 0 | zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_)); |
271 | 0 | res = get_monitor_event_internal_v2 (monitor_, value_, local_address_, |
272 | 0 | remote_address_, 0); |
273 | 0 | } |
274 | 0 | int timeout_infinite = -1; |
275 | 0 | zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite, |
276 | 0 | sizeof (timeout_infinite)); |
277 | 0 | return res; |
278 | 0 | } |
279 | | |
280 | | int64_t get_monitor_event_v2 (void *monitor_, |
281 | | uint64_t **value_, |
282 | | char **local_address_, |
283 | | char **remote_address_) |
284 | 0 | { |
285 | 0 | return get_monitor_event_with_timeout_v2 (monitor_, value_, local_address_, |
286 | 0 | remote_address_, -1); |
287 | 0 | } |
288 | | |
289 | | void expect_monitor_event_v2 (void *monitor_, |
290 | | int64_t expected_event_, |
291 | | const char *expected_local_address_, |
292 | | const char *expected_remote_address_) |
293 | 0 | { |
294 | 0 | char *local_address = NULL; |
295 | 0 | char *remote_address = NULL; |
296 | 0 | int64_t event = get_monitor_event_v2 ( |
297 | 0 | monitor_, NULL, expected_local_address_ ? &local_address : NULL, |
298 | 0 | expected_remote_address_ ? &remote_address : NULL); |
299 | 0 | bool failed = false; |
300 | 0 | char buf[256]; |
301 | 0 | char *pos = buf; |
302 | 0 | if (event != expected_event_) { |
303 | 0 | pos += snprintf (pos, sizeof buf - (pos - buf), |
304 | 0 | "Expected monitor event %llx, but received %llx\n", |
305 | 0 | static_cast<long long> (expected_event_), |
306 | 0 | static_cast<long long> (event)); |
307 | 0 | failed = true; |
308 | 0 | } |
309 | 0 | if (expected_local_address_ |
310 | 0 | && 0 != strcmp (local_address, expected_local_address_)) { |
311 | 0 | pos += snprintf (pos, sizeof buf - (pos - buf), |
312 | 0 | "Expected local address %s, but received %s\n", |
313 | 0 | expected_local_address_, local_address); |
314 | 0 | } |
315 | 0 | if (expected_remote_address_ |
316 | 0 | && 0 != strcmp (remote_address, expected_remote_address_)) { |
317 | 0 | snprintf (pos, sizeof buf - (pos - buf), |
318 | 0 | "Expected remote address %s, but received %s\n", |
319 | 0 | expected_remote_address_, remote_address); |
320 | 0 | } |
321 | 0 | free (local_address); |
322 | 0 | free (remote_address); |
323 | 0 | TEST_ASSERT_FALSE_MESSAGE (failed, buf); |
324 | 0 | } |
325 | | |
326 | | |
327 | | const char *get_zmqEventName (uint64_t event) |
328 | 0 | { |
329 | 0 | switch (event) { |
330 | 0 | case ZMQ_EVENT_CONNECTED: |
331 | 0 | return "CONNECTED"; |
332 | 0 | case ZMQ_EVENT_CONNECT_DELAYED: |
333 | 0 | return "CONNECT_DELAYED"; |
334 | 0 | case ZMQ_EVENT_CONNECT_RETRIED: |
335 | 0 | return "CONNECT_RETRIED"; |
336 | 0 | case ZMQ_EVENT_LISTENING: |
337 | 0 | return "LISTENING"; |
338 | 0 | case ZMQ_EVENT_BIND_FAILED: |
339 | 0 | return "BIND_FAILED"; |
340 | 0 | case ZMQ_EVENT_ACCEPTED: |
341 | 0 | return "ACCEPTED"; |
342 | 0 | case ZMQ_EVENT_ACCEPT_FAILED: |
343 | 0 | return "ACCEPT_FAILED"; |
344 | 0 | case ZMQ_EVENT_CLOSED: |
345 | 0 | return "CLOSED"; |
346 | 0 | case ZMQ_EVENT_CLOSE_FAILED: |
347 | 0 | return "CLOSE_FAILED"; |
348 | 0 | case ZMQ_EVENT_DISCONNECTED: |
349 | 0 | return "DISCONNECTED"; |
350 | 0 | case ZMQ_EVENT_MONITOR_STOPPED: |
351 | 0 | return "MONITOR_STOPPED"; |
352 | 0 | case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: |
353 | 0 | return "HANDSHAKE_FAILED_NO_DETAIL"; |
354 | 0 | case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: |
355 | 0 | return "HANDSHAKE_SUCCEEDED"; |
356 | 0 | case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: |
357 | 0 | return "HANDSHAKE_FAILED_PROTOCOL"; |
358 | 0 | case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: |
359 | 0 | return "HANDSHAKE_FAILED_AUTH"; |
360 | 0 | default: |
361 | 0 | return "UNKNOWN"; |
362 | 0 | } |
363 | 0 | } |
364 | | |
365 | | void print_events (void *socket, int timeout, int limit) |
366 | 0 | { |
367 | | // print events received |
368 | 0 | int value; |
369 | 0 | char *event_address; |
370 | 0 | int event = |
371 | 0 | get_monitor_event_with_timeout (socket, &value, &event_address, timeout); |
372 | 0 | int i = 0; |
373 | 0 | ; |
374 | 0 | while ((event != -1) && (++i < limit)) { |
375 | 0 | const char *eventName = get_zmqEventName (event); |
376 | 0 | printf ("Got event: %s\n", eventName); |
377 | 0 | event = get_monitor_event_with_timeout (socket, &value, &event_address, |
378 | 0 | timeout); |
379 | 0 | } |
380 | 0 | } |