Coverage Report

Created: 2025-10-28 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/dish.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <string.h>
5
6
#include "macros.hpp"
7
#include "dish.hpp"
8
#include "err.hpp"
9
10
//  Builds a DISH socket on top of socket_base_t, with no pre-fetched
//  message pending.
zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_, true),
    _has_message (false)
{
    //  Initialise the buffer that xhas_in () pre-fetches messages into.
    const int init_rc = _message.init ();
    errno_assert (init_rc == 0);

    options.type = ZMQ_DISH;

    //  Closing the socket must not block waiting for pending subscription
    //  commands to reach the wire, hence a zero linger.
    options.linger.store (0);
}
22
23
//  Releases the pre-fetch buffer message; failure to close it is fatal.
zmq::dish_t::~dish_t ()
{
    const int close_rc = _message.close ();
    errno_assert (close_rc == 0);
}
28
29
void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
30
                                bool subscribe_to_all_,
31
                                bool locally_initiated_)
32
0
{
33
0
    LIBZMQ_UNUSED (subscribe_to_all_);
34
0
    LIBZMQ_UNUSED (locally_initiated_);
35
36
0
    zmq_assert (pipe_);
37
0
    _fq.attach (pipe_);
38
0
    _dist.attach (pipe_);
39
40
    //  Send all the cached subscriptions to the new upstream peer.
41
0
    send_subscriptions (pipe_);
42
0
}
43
44
//  Forwards the read-activation notification for pipe_ to the inbound
//  fair-queue.
void zmq::dish_t::xread_activated (pipe_t *pipe_)
{
    _fq.activated (pipe_);
}
48
49
//  Forwards the write-activation notification for pipe_ to the outbound
//  distributor.
void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
{
    _dist.activated (pipe_);
}
53
54
//  Tells both the fair-queue and the distributor that pipe_ has been
//  terminated (fair-queue first, then distributor).
void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
{
    _fq.pipe_terminated (pipe_);
    _dist.pipe_terminated (pipe_);
}
59
60
//  Called when pipe_ has hiccuped: replay all cached subscriptions over it.
void zmq::dish_t::xhiccuped (pipe_t *pipe_)
{
    send_subscriptions (pipe_);
}
65
66
int zmq::dish_t::xjoin (const char *group_)
67
0
{
68
0
    const std::string group = std::string (group_);
69
70
0
    if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
71
0
        errno = EINVAL;
72
0
        return -1;
73
0
    }
74
75
    //  User cannot join same group twice
76
0
    if (!_subscriptions.insert (group).second) {
77
0
        errno = EINVAL;
78
0
        return -1;
79
0
    }
80
81
0
    msg_t msg;
82
0
    int rc = msg.init_join ();
83
0
    errno_assert (rc == 0);
84
85
0
    rc = msg.set_group (group_);
86
0
    errno_assert (rc == 0);
87
88
0
    int err = 0;
89
0
    rc = _dist.send_to_all (&msg);
90
0
    if (rc != 0)
91
0
        err = errno;
92
0
    const int rc2 = msg.close ();
93
0
    errno_assert (rc2 == 0);
94
0
    if (rc != 0)
95
0
        errno = err;
96
0
    return rc;
97
0
}
98
99
int zmq::dish_t::xleave (const char *group_)
100
0
{
101
0
    const std::string group = std::string (group_);
102
103
0
    if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
104
0
        errno = EINVAL;
105
0
        return -1;
106
0
    }
107
108
0
    if (0 == _subscriptions.erase (group)) {
109
0
        errno = EINVAL;
110
0
        return -1;
111
0
    }
112
113
0
    msg_t msg;
114
0
    int rc = msg.init_leave ();
115
0
    errno_assert (rc == 0);
116
117
0
    rc = msg.set_group (group_);
118
0
    errno_assert (rc == 0);
119
120
0
    int err = 0;
121
0
    rc = _dist.send_to_all (&msg);
122
0
    if (rc != 0)
123
0
        err = errno;
124
0
    const int rc2 = msg.close ();
125
0
    errno_assert (rc2 == 0);
126
0
    if (rc != 0)
127
0
        errno = err;
128
0
    return rc;
129
0
}
130
131
//  Sending is not supported on this socket type: always fails with
//  ENOTSUP and leaves msg_ untouched.
int zmq::dish_t::xsend (msg_t *msg_)
{
    errno = ENOTSUP;
    LIBZMQ_UNUSED (msg_);
    return -1;
}
137
138
//  Always reports the socket as writable, because groups can be joined
//  or left at any time.
bool zmq::dish_t::xhas_out ()
{
    return true;
}
143
144
//  Receives the next matching message into msg_.
//  If xhas_in () has already pre-fetched a message, that one is handed
//  over; otherwise the pipes are read directly via xxrecv ().
int zmq::dish_t::xrecv (msg_t *msg_)
{
    //  Nothing buffered: go and fetch from the pipes.
    if (!_has_message)
        return xxrecv (msg_);

    //  Hand over the message prepared by a previous call to zmq_poll.
    const int move_rc = msg_->move (_message);
    errno_assert (move_rc == 0);
    _has_message = false;
    return 0;
}
157
158
//  Reads messages from the fair-queue until one arrives whose group is in
//  the subscription set. Returns 0 with that message in msg_, or -1 as
//  soon as the fair-queue reports that nothing could be received.
int zmq::dish_t::xxrecv (msg_t *msg_)
{
    for (;;) {
        //  No message available, or an error occurred: give up right away.
        if (_fq.recv (msg_) != 0)
            return -1;

        //  Deliver only messages for a joined group; drop the rest.
        const std::string msg_group (msg_->group ());
        if (_subscriptions.count (msg_group) != 0)
            return 0;
    }
}
175
176
bool zmq::dish_t::xhas_in ()
177
0
{
178
    //  If there's already a message prepared by a previous call to zmq_poll,
179
    //  return straight ahead.
180
0
    if (_has_message)
181
0
        return true;
182
183
0
    const int rc = xxrecv (&_message);
184
0
    if (rc != 0) {
185
0
        errno_assert (errno == EAGAIN);
186
0
        return false;
187
0
    }
188
189
    //  Matching message found
190
0
    _has_message = true;
191
0
    return true;
192
0
}
193
194
//  Writes one JOIN message per cached subscription to pipe_ and flushes
//  the pipe afterwards. A message the pipe refuses to take is dropped.
void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
{
    for (subscriptions_t::iterator it = _subscriptions.begin (),
                                   end = _subscriptions.end ();
         it != end; ++it) {
        msg_t msg;
        int rc = msg.init_join ();
        errno_assert (rc == 0);

        rc = msg.set_group (it->c_str ());
        errno_assert (rc == 0);

        //  Send it to the pipe. If the pipe does not accept the message we
        //  still own it, so close it here instead of letting it go out of
        //  scope with its resources unreleased.
        const bool sent = pipe_->write (&msg);
        if (!sent) {
            rc = msg.close ();
            errno_assert (rc == 0);
        }
    }

    pipe_->flush ();
}
212
213
//  Forwards every argument to session_base_t and starts the inbound state
//  machine in the `group` state, i.e. expecting a group frame first.
zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
                                     bool connect_,
                                     socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
    _state (group)
{
    //  Nothing else to set up.
}
222
223
//  No explicit cleanup is performed here.
zmq::dish_session_t::~dish_session_t ()
{
    //  Intentionally empty.
}
226
227
//  Handles one inbound frame, alternating between two states.
//
//  group state: the frame is the group name. It must carry the `more` flag
//  and be at most ZMQ_GROUP_MAX_LENGTH bytes; it is stashed in _group_msg,
//  msg_ is re-initialised and 0 is returned without pushing anything.
//
//  body state: the frame is the body. The stashed group is attached to it
//  (unless it already carries one), it must not have the `more` flag, and
//  it is pushed via session_base_t::push_msg (). Only a successful push
//  switches back to the group state.
//
//  Protocol violations yield -1 with errno set to EFAULT.
int zmq::dish_session_t::push_msg (msg_t *msg_)
{
    if (_state == group) {
        //  A group frame has to be followed by a body frame.
        const bool followed_by_body =
          (msg_->flags () & msg_t::more) == msg_t::more;
        if (!followed_by_body) {
            errno = EFAULT;
            return -1;
        }

        if (msg_->size () > ZMQ_GROUP_MAX_LENGTH) {
            errno = EFAULT;
            return -1;
        }

        //  Keep the group frame and hand back a fresh, empty message.
        _group_msg = *msg_;
        _state = body;

        const int init_rc = msg_->init ();
        errno_assert (init_rc == 0);
        return 0;
    }

    int rc;

    //  Attach the stashed group only when the body has no group yet.
    if (msg_->group ()[0] == 0) {
        //  Set the message group
        rc = msg_->set_group (static_cast<char *> (_group_msg.data ()),
                              _group_msg.size ());
        errno_assert (rc == 0);

        //  We set the group, so we don't need the group_msg anymore
        rc = _group_msg.close ();
        errno_assert (rc == 0);
    }

    //  Thread safe socket doesn't support multipart messages
    if ((msg_->flags () & msg_t::more) == msg_t::more) {
        errno = EFAULT;
        return -1;
    }

    //  Push message to dish socket
    rc = session_base_t::push_msg (msg_);

    //  Expect a new group frame only once the body has been accepted.
    if (rc == 0)
        _state = group;

    return rc;
}
275
276
//  Pulls the next outbound message via session_base_t::pull_msg ().
//  Messages that are neither join nor leave are returned unchanged. A join
//  or leave message is replaced by a command message consisting of the
//  bytes "\4JOIN" or "\5LEAVE" followed by the group name (without a
//  terminating NUL).
int zmq::dish_session_t::pull_msg (msg_t *msg_)
{
    int rc = session_base_t::pull_msg (msg_);
    if (rc != 0)
        return rc;

    const bool joining = msg_->is_join ();
    if (!joining && !msg_->is_leave ())
        return rc;

    //  Pick the command header: a length byte followed by the command name.
    const char *const header = joining ? "\4JOIN" : "\5LEAVE";
    const int header_size = joining ? 5 : 6;

    const int group_length = static_cast<int> (strlen (msg_->group ()));

    msg_t command;
    rc = command.init_size (group_length + header_size);
    errno_assert (rc == 0);
    memcpy (command.data (), header, header_size);

    command.set_flags (msg_t::command);

    //  Append the group name right after the header.
    char *const payload = static_cast<char *> (command.data ());
    memcpy (payload + header_size, msg_->group (), group_length);

    //  The join/leave message is no longer needed once its group is copied.
    rc = msg_->close ();
    errno_assert (rc == 0);

    *msg_ = command;

    return 0;
}
317
318
void zmq::dish_session_t::reset ()
319
0
{
320
0
    session_base_t::reset ();
321
0
    _state = group;
322
0
}