Coverage Report

Created: 2025-07-18 07:01

/src/libzmq/tests/testutil_monitoring.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
#include "testutil_monitoring.hpp"
3
#include "testutil_unity.hpp"
4
5
#include <stdlib.h>
6
#include <string.h>
7
8
static int
9
receive_monitor_address (void *monitor_, char **address_, bool expect_more_)
10
0
{
11
0
    zmq_msg_t msg;
12
13
0
    zmq_msg_init (&msg);
14
0
    if (zmq_msg_recv (&msg, monitor_, 0) == -1)
15
0
        return -1; //  Interrupted, presumably
16
0
    TEST_ASSERT_EQUAL (expect_more_, zmq_msg_more (&msg));
17
18
0
    if (address_) {
19
0
        const uint8_t *const data =
20
0
          static_cast<const uint8_t *> (zmq_msg_data (&msg));
21
0
        const size_t size = zmq_msg_size (&msg);
22
0
        *address_ = static_cast<char *> (malloc (size + 1));
23
0
        memcpy (*address_, data, size);
24
0
        (*address_)[size] = 0;
25
0
    }
26
0
    zmq_msg_close (&msg);
27
28
0
    return 0;
29
0
}
30
31
//  Read one event off the monitor socket; return value and address
32
//  by reference, if not null, and event number by value. Returns -1
33
//  in case of error.
34
static int get_monitor_event_internal (void *monitor_,
35
                                       int *value_,
36
                                       char **address_,
37
                                       int recv_flag_)
38
0
{
39
    //  First frame in message contains event number and value
40
0
    zmq_msg_t msg;
41
0
    zmq_msg_init (&msg);
42
0
    if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
43
0
        TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
44
0
        return -1; //  timed out or no message available
45
0
    }
46
0
    TEST_ASSERT_TRUE (zmq_msg_more (&msg));
47
48
0
    uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
49
0
    uint16_t event = *reinterpret_cast<uint16_t *> (data);
50
0
    if (value_)
51
0
        memcpy (value_, data + 2, sizeof (uint32_t));
52
53
    //  Second frame in message contains event address
54
0
    TEST_ASSERT_SUCCESS_ERRNO (
55
0
      receive_monitor_address (monitor_, address_, false));
56
57
0
    return event;
58
0
}
59
60
int get_monitor_event_with_timeout (void *monitor_,
61
                                    int *value_,
62
                                    char **address_,
63
                                    int timeout_)
64
0
{
65
0
    int res;
66
0
    if (timeout_ == -1) {
67
        // process infinite timeout in small steps to allow the user
68
        // to see some information on the console
69
70
0
        int timeout_step = 250;
71
0
        int wait_time = 0;
72
0
        zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
73
0
                        sizeof (timeout_step));
74
0
        while (
75
0
          (res = get_monitor_event_internal (monitor_, value_, address_, 0))
76
0
          == -1) {
77
0
            wait_time += timeout_step;
78
0
            fprintf (stderr, "Still waiting for monitor event after %i ms\n",
79
0
                     wait_time);
80
0
        }
81
0
    } else {
82
0
        zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
83
0
        res = get_monitor_event_internal (monitor_, value_, address_, 0);
84
0
    }
85
0
    int timeout_infinite = -1;
86
0
    zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
87
0
                    sizeof (timeout_infinite));
88
0
    return res;
89
0
}
90
91
int get_monitor_event (void *monitor_, int *value_, char **address_)
92
0
{
93
0
    return get_monitor_event_with_timeout (monitor_, value_, address_, -1);
94
0
}
95
96
void expect_monitor_event (void *monitor_, int expected_event_)
97
0
{
98
0
    TEST_ASSERT_EQUAL_HEX (expected_event_,
99
0
                           get_monitor_event (monitor_, NULL, NULL));
100
0
}
101
102
//  Formats a description of an unexpected monitor event into buf_
//  (at most buf_size_ bytes, including the terminating NUL).
//  event_/err_ are the received event number and value,
//  expected_event_/expected_err_ the ones that were expected; each value
//  is printed both in decimal and in hexadecimal.
static void print_unexpected_event (char *buf_,
                                    size_t buf_size_,
                                    int event_,
                                    int err_,
                                    int expected_event_,
                                    int expected_err_)
{
    (void) snprintf (
      buf_, buf_size_,
      "Unexpected event: 0x%x, value = %i/0x%x "
      "(expected: 0x%x, value = %i/0x%x)\n",
      event_, err_, err_, expected_event_, expected_err_, expected_err_);
}
115
116
void print_unexpected_event_stderr (int event_,
117
                                    int err_,
118
                                    int expected_event_,
119
                                    int expected_err_)
120
0
{
121
0
    char buf[256];
122
0
    print_unexpected_event (buf, sizeof buf, event_, err_, expected_event_,
123
0
                            expected_err_);
124
0
    fputs (buf, stderr);
125
0
}
126
127
int expect_monitor_event_multiple (void *server_mon_,
128
                                   int expected_event_,
129
                                   int expected_err_,
130
                                   bool optional_)
131
0
{
132
0
    int count_of_expected_events = 0;
133
0
    int client_closed_connection = 0;
134
0
    int timeout = 250;
135
0
    int wait_time = 0;
136
137
0
    int event;
138
0
    int err;
139
0
    while ((event =
140
0
              get_monitor_event_with_timeout (server_mon_, &err, NULL, timeout))
141
0
             != -1
142
0
           || !count_of_expected_events) {
143
0
        if (event == -1) {
144
0
            if (optional_)
145
0
                break;
146
0
            wait_time += timeout;
147
0
            fprintf (stderr,
148
0
                     "Still waiting for first event after %ims (expected event "
149
0
                     "%x (value %i/0x%x))\n",
150
0
                     wait_time, expected_event_, expected_err_, expected_err_);
151
0
            continue;
152
0
        }
153
        // ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen
154
        // ECONNRESET can happen on very slow machines, when the engine writes
155
        // to the peer and then tries to read the socket before the peer reads
156
        // ECONNABORTED happens when a client aborts a connection via RST/timeout
157
0
        if (event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
158
0
            && ((err == EPIPE && expected_err_ != EPIPE) || err == ECONNRESET
159
0
                || err == ECONNABORTED)) {
160
0
            fprintf (stderr,
161
0
                     "Ignored event (skipping any further events): %x (err = "
162
0
                     "%i == %s)\n",
163
0
                     event, err, zmq_strerror (err));
164
0
            client_closed_connection = 1;
165
0
            break;
166
0
        }
167
0
        if (event != expected_event_
168
0
            || (-1 != expected_err_ && err != expected_err_)) {
169
0
            char buf[256];
170
0
            print_unexpected_event (buf, sizeof buf, event, err,
171
0
                                    expected_event_, expected_err_);
172
0
            TEST_FAIL_MESSAGE (buf);
173
0
        }
174
0
        ++count_of_expected_events;
175
0
    }
176
0
    TEST_ASSERT_TRUE (optional_ || count_of_expected_events > 0
177
0
                      || client_closed_connection);
178
179
0
    return count_of_expected_events;
180
0
}
181
182
static int64_t get_monitor_event_internal_v2 (void *monitor_,
183
                                              uint64_t **value_,
184
                                              char **local_address_,
185
                                              char **remote_address_,
186
                                              int recv_flag_)
187
0
{
188
    //  First frame in message contains event number
189
0
    zmq_msg_t msg;
190
0
    zmq_msg_init (&msg);
191
0
    if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
192
0
        TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
193
0
        return -1; //  timed out or no message available
194
0
    }
195
0
    TEST_ASSERT_TRUE (zmq_msg_more (&msg));
196
0
    TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
197
198
0
    uint64_t event;
199
0
    memcpy (&event, zmq_msg_data (&msg), sizeof (event));
200
0
    zmq_msg_close (&msg);
201
202
    //  Second frame in message contains the number of values
203
0
    zmq_msg_init (&msg);
204
0
    if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
205
0
        TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
206
0
        return -1; //  timed out or no message available
207
0
    }
208
0
    TEST_ASSERT_TRUE (zmq_msg_more (&msg));
209
0
    TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
210
211
0
    uint64_t value_count;
212
0
    memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
213
0
    zmq_msg_close (&msg);
214
215
0
    if (value_) {
216
0
        *value_ =
217
0
          (uint64_t *) malloc ((size_t) value_count * sizeof (uint64_t));
218
0
        TEST_ASSERT_NOT_NULL (*value_);
219
0
    }
220
221
0
    for (uint64_t i = 0; i < value_count; ++i) {
222
        //  Subsequent frames in message contain event values
223
0
        zmq_msg_init (&msg);
224
0
        if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
225
0
            TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
226
0
            return -1; //  timed out or no message available
227
0
        }
228
0
        TEST_ASSERT_TRUE (zmq_msg_more (&msg));
229
0
        TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
230
231
0
        if (value_ && *value_)
232
0
            memcpy (&(*value_)[i], zmq_msg_data (&msg), sizeof (uint64_t));
233
0
        zmq_msg_close (&msg);
234
0
    }
235
236
    //  Second-to-last frame in message contains local address
237
0
    TEST_ASSERT_SUCCESS_ERRNO (
238
0
      receive_monitor_address (monitor_, local_address_, true));
239
240
    //  Last frame in message contains remote address
241
0
    TEST_ASSERT_SUCCESS_ERRNO (
242
0
      receive_monitor_address (monitor_, remote_address_, false));
243
244
0
    return event;
245
0
}
246
247
static int64_t get_monitor_event_with_timeout_v2 (void *monitor_,
248
                                                  uint64_t **value_,
249
                                                  char **local_address_,
250
                                                  char **remote_address_,
251
                                                  int timeout_)
252
0
{
253
0
    int64_t res;
254
0
    if (timeout_ == -1) {
255
        // process infinite timeout in small steps to allow the user
256
        // to see some information on the console
257
258
0
        int timeout_step = 250;
259
0
        int wait_time = 0;
260
0
        zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
261
0
                        sizeof (timeout_step));
262
0
        while ((res = get_monitor_event_internal_v2 (
263
0
                  monitor_, value_, local_address_, remote_address_, 0))
264
0
               == -1) {
265
0
            wait_time += timeout_step;
266
0
            fprintf (stderr, "Still waiting for monitor event after %i ms\n",
267
0
                     wait_time);
268
0
        }
269
0
    } else {
270
0
        zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
271
0
        res = get_monitor_event_internal_v2 (monitor_, value_, local_address_,
272
0
                                             remote_address_, 0);
273
0
    }
274
0
    int timeout_infinite = -1;
275
0
    zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
276
0
                    sizeof (timeout_infinite));
277
0
    return res;
278
0
}
279
280
int64_t get_monitor_event_v2 (void *monitor_,
281
                              uint64_t **value_,
282
                              char **local_address_,
283
                              char **remote_address_)
284
0
{
285
0
    return get_monitor_event_with_timeout_v2 (monitor_, value_, local_address_,
286
0
                                              remote_address_, -1);
287
0
}
288
289
void expect_monitor_event_v2 (void *monitor_,
290
                              int64_t expected_event_,
291
                              const char *expected_local_address_,
292
                              const char *expected_remote_address_)
293
0
{
294
0
    char *local_address = NULL;
295
0
    char *remote_address = NULL;
296
0
    int64_t event = get_monitor_event_v2 (
297
0
      monitor_, NULL, expected_local_address_ ? &local_address : NULL,
298
0
      expected_remote_address_ ? &remote_address : NULL);
299
0
    bool failed = false;
300
0
    char buf[256];
301
0
    char *pos = buf;
302
0
    if (event != expected_event_) {
303
0
        pos += snprintf (pos, sizeof buf - (pos - buf),
304
0
                         "Expected monitor event %llx, but received %llx\n",
305
0
                         static_cast<long long> (expected_event_),
306
0
                         static_cast<long long> (event));
307
0
        failed = true;
308
0
    }
309
0
    if (expected_local_address_
310
0
        && 0 != strcmp (local_address, expected_local_address_)) {
311
0
        pos += snprintf (pos, sizeof buf - (pos - buf),
312
0
                         "Expected local address %s, but received %s\n",
313
0
                         expected_local_address_, local_address);
314
0
    }
315
0
    if (expected_remote_address_
316
0
        && 0 != strcmp (remote_address, expected_remote_address_)) {
317
0
        snprintf (pos, sizeof buf - (pos - buf),
318
0
                  "Expected remote address %s, but received %s\n",
319
0
                  expected_remote_address_, remote_address);
320
0
    }
321
0
    free (local_address);
322
0
    free (remote_address);
323
0
    TEST_ASSERT_FALSE_MESSAGE (failed, buf);
324
0
}
325
326
327
const char *get_zmqEventName (uint64_t event)
328
0
{
329
0
    switch (event) {
330
0
        case ZMQ_EVENT_CONNECTED:
331
0
            return "CONNECTED";
332
0
        case ZMQ_EVENT_CONNECT_DELAYED:
333
0
            return "CONNECT_DELAYED";
334
0
        case ZMQ_EVENT_CONNECT_RETRIED:
335
0
            return "CONNECT_RETRIED";
336
0
        case ZMQ_EVENT_LISTENING:
337
0
            return "LISTENING";
338
0
        case ZMQ_EVENT_BIND_FAILED:
339
0
            return "BIND_FAILED";
340
0
        case ZMQ_EVENT_ACCEPTED:
341
0
            return "ACCEPTED";
342
0
        case ZMQ_EVENT_ACCEPT_FAILED:
343
0
            return "ACCEPT_FAILED";
344
0
        case ZMQ_EVENT_CLOSED:
345
0
            return "CLOSED";
346
0
        case ZMQ_EVENT_CLOSE_FAILED:
347
0
            return "CLOSE_FAILED";
348
0
        case ZMQ_EVENT_DISCONNECTED:
349
0
            return "DISCONNECTED";
350
0
        case ZMQ_EVENT_MONITOR_STOPPED:
351
0
            return "MONITOR_STOPPED";
352
0
        case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
353
0
            return "HANDSHAKE_FAILED_NO_DETAIL";
354
0
        case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
355
0
            return "HANDSHAKE_SUCCEEDED";
356
0
        case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
357
0
            return "HANDSHAKE_FAILED_PROTOCOL";
358
0
        case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
359
0
            return "HANDSHAKE_FAILED_AUTH";
360
0
        default:
361
0
            return "UNKNOWN";
362
0
    }
363
0
}
364
365
void print_events (void *socket, int timeout, int limit)
366
0
{
367
    // print events received
368
0
    int value;
369
0
    char *event_address;
370
0
    int event =
371
0
      get_monitor_event_with_timeout (socket, &value, &event_address, timeout);
372
0
    int i = 0;
373
0
    ;
374
0
    while ((event != -1) && (++i < limit)) {
375
0
        const char *eventName = get_zmqEventName (event);
376
0
        printf ("Got event: %s\n", eventName);
377
0
        event = get_monitor_event_with_timeout (socket, &value, &event_address,
378
0
                                                timeout);
379
0
    }
380
0
}