Coverage Report

Created: 2025-07-11 06:23

/src/libzmq/src/radio.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <string.h>
5
6
#include "radio.hpp"
7
#include "macros.hpp"
8
#include "pipe.hpp"
9
#include "err.hpp"
10
#include "msg.hpp"
11
12
zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13
0
    socket_base_t (parent_, tid_, sid_, true), _lossy (true)
14
0
{
15
0
    options.type = ZMQ_RADIO;
16
0
}
17
18
zmq::radio_t::~radio_t ()
19
0
{
20
0
}
21
22
void zmq::radio_t::xattach_pipe (pipe_t *pipe_,
23
                                 bool subscribe_to_all_,
24
                                 bool locally_initiated_)
25
0
{
26
0
    LIBZMQ_UNUSED (subscribe_to_all_);
27
0
    LIBZMQ_UNUSED (locally_initiated_);
28
29
0
    zmq_assert (pipe_);
30
31
    //  Don't delay pipe termination as there is no one
32
    //  to receive the delimiter.
33
0
    pipe_->set_nodelay ();
34
35
0
    _dist.attach (pipe_);
36
37
0
    if (subscribe_to_all_)
38
0
        _udp_pipes.push_back (pipe_);
39
    //  The pipe is active when attached. Let's read the subscriptions from
40
    //  it, if any.
41
0
    else
42
0
        xread_activated (pipe_);
43
0
}
44
45
void zmq::radio_t::xread_activated (pipe_t *pipe_)
46
0
{
47
    //  There are some subscriptions waiting. Let's process them.
48
0
    msg_t msg;
49
0
    while (pipe_->read (&msg)) {
50
        //  Apply the subscription to the trie
51
0
        if (msg.is_join () || msg.is_leave ()) {
52
0
            std::string group = std::string (msg.group ());
53
54
0
            if (msg.is_join ())
55
0
                _subscriptions.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (group),
56
0
                                                          pipe_);
57
0
            else {
58
0
                std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
59
0
                  range = _subscriptions.equal_range (group);
60
61
0
                for (subscriptions_t::iterator it = range.first;
62
0
                     it != range.second; ++it) {
63
0
                    if (it->second == pipe_) {
64
0
                        _subscriptions.erase (it);
65
0
                        break;
66
0
                    }
67
0
                }
68
0
            }
69
0
        }
70
0
        msg.close ();
71
0
    }
72
0
}
73
74
void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
75
0
{
76
0
    _dist.activated (pipe_);
77
0
}
78
int zmq::radio_t::xsetsockopt (int option_,
79
                               const void *optval_,
80
                               size_t optvallen_)
81
0
{
82
0
    if (optvallen_ != sizeof (int) || *static_cast<const int *> (optval_) < 0) {
83
0
        errno = EINVAL;
84
0
        return -1;
85
0
    }
86
0
    if (option_ == ZMQ_XPUB_NODROP)
87
0
        _lossy = (*static_cast<const int *> (optval_) == 0);
88
0
    else {
89
0
        errno = EINVAL;
90
0
        return -1;
91
0
    }
92
0
    return 0;
93
0
}
94
95
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
96
0
{
97
0
    for (subscriptions_t::iterator it = _subscriptions.begin (),
98
0
                                   end = _subscriptions.end ();
99
0
         it != end;) {
100
0
        if (it->second == pipe_) {
101
0
#if __cplusplus >= 201103L || (defined _MSC_VER && _MSC_VER >= 1700)
102
0
            it = _subscriptions.erase (it);
103
#else
104
            _subscriptions.erase (it++);
105
#endif
106
0
        } else {
107
0
            ++it;
108
0
        }
109
0
    }
110
111
0
    {
112
0
        const udp_pipes_t::iterator end = _udp_pipes.end ();
113
0
        const udp_pipes_t::iterator it =
114
0
          std::find (_udp_pipes.begin (), end, pipe_);
115
0
        if (it != end)
116
0
            _udp_pipes.erase (it);
117
0
    }
118
119
0
    _dist.pipe_terminated (pipe_);
120
0
}
121
122
int zmq::radio_t::xsend (msg_t *msg_)
123
0
{
124
    //  Radio sockets do not allow multipart data (ZMQ_SNDMORE)
125
0
    if (msg_->flags () & msg_t::more) {
126
0
        errno = EINVAL;
127
0
        return -1;
128
0
    }
129
130
0
    _dist.unmatch ();
131
132
0
    const std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
133
0
      range = _subscriptions.equal_range (std::string (msg_->group ()));
134
135
0
    for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
136
0
        _dist.match (it->second);
137
138
0
    for (udp_pipes_t::iterator it = _udp_pipes.begin (),
139
0
                               end = _udp_pipes.end ();
140
0
         it != end; ++it)
141
0
        _dist.match (*it);
142
143
0
    int rc = -1;
144
0
    if (_lossy || _dist.check_hwm ()) {
145
0
        if (_dist.send_to_matching (msg_) == 0) {
146
0
            rc = 0; //  Yay, sent successfully
147
0
        }
148
0
    } else
149
0
        errno = EAGAIN;
150
151
0
    return rc;
152
0
}
153
154
bool zmq::radio_t::xhas_out ()
155
0
{
156
0
    return _dist.has_out ();
157
0
}
158
159
int zmq::radio_t::xrecv (msg_t *msg_)
160
0
{
161
    //  Messages cannot be received from PUB socket.
162
0
    LIBZMQ_UNUSED (msg_);
163
0
    errno = ENOTSUP;
164
0
    return -1;
165
0
}
166
167
bool zmq::radio_t::xhas_in ()
168
0
{
169
0
    return false;
170
0
}
171
172
zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_,
173
                                       bool connect_,
174
                                       socket_base_t *socket_,
175
                                       const options_t &options_,
176
                                       address_t *addr_) :
177
0
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
178
0
    _state (group)
179
0
{
180
0
}
181
182
zmq::radio_session_t::~radio_session_t ()
183
0
{
184
0
}
185
186
int zmq::radio_session_t::push_msg (msg_t *msg_)
187
0
{
188
0
    if (msg_->flags () & msg_t::command) {
189
0
        char *command_data = static_cast<char *> (msg_->data ());
190
0
        const size_t data_size = msg_->size ();
191
192
0
        int group_length;
193
0
        const char *group;
194
195
0
        msg_t join_leave_msg;
196
0
        int rc;
197
198
        //  Set the msg type to either JOIN or LEAVE
199
0
        if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
200
0
            group_length = static_cast<int> (data_size) - 5;
201
0
            group = command_data + 5;
202
0
            rc = join_leave_msg.init_join ();
203
0
        } else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
204
0
            group_length = static_cast<int> (data_size) - 6;
205
0
            group = command_data + 6;
206
0
            rc = join_leave_msg.init_leave ();
207
0
        }
208
        //  If it is not a JOIN or LEAVE just push the message
209
0
        else
210
0
            return session_base_t::push_msg (msg_);
211
212
0
        errno_assert (rc == 0);
213
214
        //  Set the group
215
0
        rc = join_leave_msg.set_group (group, group_length);
216
0
        errno_assert (rc == 0);
217
218
        //  Close the current command
219
0
        rc = msg_->close ();
220
0
        errno_assert (rc == 0);
221
222
        //  Push the join or leave command
223
0
        *msg_ = join_leave_msg;
224
0
        return session_base_t::push_msg (msg_);
225
0
    }
226
0
    return session_base_t::push_msg (msg_);
227
0
}
228
229
int zmq::radio_session_t::pull_msg (msg_t *msg_)
230
0
{
231
0
    if (_state == group) {
232
0
        int rc = session_base_t::pull_msg (&_pending_msg);
233
0
        if (rc != 0)
234
0
            return rc;
235
236
0
        const char *group = _pending_msg.group ();
237
0
        const int length = static_cast<int> (strlen (group));
238
239
        //  First frame is the group
240
0
        rc = msg_->init_size (length);
241
0
        errno_assert (rc == 0);
242
0
        msg_->set_flags (msg_t::more);
243
0
        memcpy (msg_->data (), group, length);
244
245
        //  Next status is the body
246
0
        _state = body;
247
0
        return 0;
248
0
    }
249
0
    *msg_ = _pending_msg;
250
0
    _state = group;
251
0
    return 0;
252
0
}
253
254
void zmq::radio_session_t::reset ()
255
0
{
256
0
    session_base_t::reset ();
257
0
    _state = group;
258
0
}