Coverage Report

Created: 2025-10-28 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/xsub.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <string.h>
5
6
#include "macros.hpp"
7
#include "xsub.hpp"
8
#include "err.hpp"
9
10
//  Constructs an XSUB socket. All state flags start out cleared, the socket
//  is tagged as ZMQ_XSUB and the internal message buffer is initialised.
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    socket_base_t (parent_, tid_, sid_),
    _verbose_unsubs (false),
    _has_message (false),
    _more_send (false),
    _more_recv (false),
    _process_subscribe (false),
    _only_first_subscribe (false)
{
    //  Closing the socket must not block until pending subscription
    //  commands have reached the wire, so the linger period is zero.
    options.linger.store (0);
    options.type = ZMQ_XSUB;

    const int init_rc = _message.init ();
    errno_assert (init_rc == 0);
}
28
29
//  Destroys the socket, closing the internal message buffer.
zmq::xsub_t::~xsub_t ()
{
    const int close_rc = _message.close ();
    errno_assert (close_rc == 0);
}
34
35
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_,
36
                                bool subscribe_to_all_,
37
                                bool locally_initiated_)
38
0
{
39
0
    LIBZMQ_UNUSED (subscribe_to_all_);
40
0
    LIBZMQ_UNUSED (locally_initiated_);
41
42
0
    zmq_assert (pipe_);
43
0
    _fq.attach (pipe_);
44
0
    _dist.attach (pipe_);
45
46
    //  Send all the cached subscriptions to the new upstream peer.
47
0
    _subscriptions.apply (send_subscription, pipe_);
48
0
    pipe_->flush ();
49
0
}
50
51
//  Forwards the read-activation of a pipe to the inbound fair-queue.
void zmq::xsub_t::xread_activated (pipe_t *pipe_)
{
    _fq.activated (pipe_);
}
55
56
//  Forwards the write-activation of a pipe to the outbound distributor.
void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
{
    _dist.activated (pipe_);
}
60
61
//  Tells both the inbound fair-queue and the outbound distributor that the
//  pipe has been terminated.
void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_)
{
    _fq.pipe_terminated (pipe_);
    _dist.pipe_terminated (pipe_);
}
66
67
//  Called for a pipe that hiccuped: replays every cached subscription
//  onto it and flushes the pipe.
void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
{
    _subscriptions.apply (send_subscription, pipe_);
    pipe_->flush ();
}
73
74
int zmq::xsub_t::xsetsockopt (int option_,
75
                              const void *optval_,
76
                              size_t optvallen_)
77
0
{
78
0
    if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
79
0
        if (optvallen_ != sizeof (int)
80
0
            || *static_cast<const int *> (optval_) < 0) {
81
0
            errno = EINVAL;
82
0
            return -1;
83
0
        }
84
0
        _only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
85
0
        return 0;
86
0
    }
87
0
#ifdef ZMQ_BUILD_DRAFT_API
88
0
    else if (option_ == ZMQ_XSUB_VERBOSE_UNSUBSCRIBE) {
89
0
        _verbose_unsubs = (*static_cast<const int *> (optval_) != 0);
90
0
        return 0;
91
0
    }
92
0
#endif
93
0
    errno = EINVAL;
94
0
    return -1;
95
0
}
96
97
int zmq::xsub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_)
98
0
{
99
0
    if (option_ == ZMQ_TOPICS_COUNT) {
100
        // make sure to use a multi-thread safe function to avoid race conditions with I/O threads
101
        // where subscriptions are processed:
102
0
#ifdef ZMQ_USE_RADIX_TREE
103
0
        uint64_t num_subscriptions = _subscriptions.size ();
104
#else
105
        uint64_t num_subscriptions = _subscriptions.num_prefixes ();
106
#endif
107
108
0
        return do_getsockopt<int> (optval_, optvallen_,
109
0
                                   (int) num_subscriptions);
110
0
    }
111
112
    // room for future options here
113
114
0
    errno = EINVAL;
115
0
    return -1;
116
0
}
117
118
//  Sends one message part upstream.
//
//  A part is treated as a subscribe command when the message is flagged as
//  such or its first byte is 1, and as an unsubscribe command when it is
//  flagged as a cancel or its first byte is 0. Commands update the local
//  subscription cache; everything else is a user message and is passed to
//  all pipes untouched. Non-initial parts of a multipart message are only
//  inspected for commands while _process_subscribe is set.
//
//  Returns the result of the distributor's send_to_all (), or 0 when an
//  unsubscribe was swallowed locally (msg_ is then reset to an empty
//  message).
int zmq::xsub_t::xsend (msg_t *msg_)
{
    size_t payload_size = msg_->size ();
    unsigned char *payload = static_cast<unsigned char *> (msg_->data ());

    const bool is_first_part = !_more_send;
    _more_send = (msg_->flags () & msg_t::more) != 0;

    if (is_first_part)
        _process_subscribe = !_only_first_subscribe;
    else if (!_process_subscribe)
        //  User message sent upstream to XPUB socket
        return _dist.send_to_all (msg_);

    const bool flagged_subscribe = msg_->is_subscribe ();
    if (flagged_subscribe || (payload_size > 0 && *payload == 1)) {
        //  Subscribe command. Duplicates are deliberately not filtered
        //  out here: the XPUB side already does that, and filtering here
        //  as well breaks ZMQ_XPUB_VERBOSE when forwarding devices are
        //  involved.
        if (!flagged_subscribe) {
            //  Skip the leading command byte.
            ++payload;
            --payload_size;
        }
        _subscriptions.add (payload, payload_size);
        _process_subscribe = true;
        return _dist.send_to_all (msg_);
    }

    const bool flagged_cancel = msg_->is_cancel ();
    if (!flagged_cancel && (payload_size == 0 || *payload != 0))
        //  User message sent upstream to XPUB socket
        return _dist.send_to_all (msg_);

    //  Unsubscribe command.
    if (!flagged_cancel) {
        //  Skip the leading command byte.
        ++payload;
        --payload_size;
    }
    _process_subscribe = true;
    const bool removed = _subscriptions.rm (payload, payload_size);
    if (removed || _verbose_unsubs)
        return _dist.send_to_all (msg_);

    //  Nothing is forwarded; hand an empty message back to the caller.
    int rc = msg_->close ();
    errno_assert (rc == 0);
    rc = msg_->init ();
    errno_assert (rc == 0);

    return 0;
}
168
169
//  Always reports the socket as writable, because a subscription can be
//  added or removed at any time.
bool zmq::xsub_t::xhas_out ()
{
    return true;
}
174
175
//  Receives the next message part that passes the subscription filter.
//
//  Returns 0 with the part stored in msg_, or -1 when the fair-queue has
//  nothing to deliver or reports an error.
int zmq::xsub_t::xrecv (msg_t *msg_)
{
    //  A message may already have been fetched by an earlier call to
    //  zmq_poll; if so, hand it over right away.
    if (_has_message) {
        const int move_rc = msg_->move (_message);
        errno_assert (move_rc == 0);
        _has_message = false;
        _more_recv = (msg_->flags () & msg_t::more) != 0;
        return 0;
    }

    //  TODO: This can result in infinite loop in the case of continuous
    //  stream of non-matching messages which breaks the non-blocking recv
    //  semantics.
    for (;;) {
        //  Fetch a message using the fair queueing algorithm. Bail out
        //  immediately when none is available or an error occurred.
        if (_fq.recv (msg_) != 0)
            return -1;

        //  Non-initial parts are always passed through; initial parts
        //  have to match at least one subscription unless filtering is
        //  switched off.
        const bool accepted = _more_recv || !options.filter || match (msg_);
        if (accepted) {
            _more_recv = (msg_->flags () & msg_t::more) != 0;
            return 0;
        }

        //  No match: drain the remaining parts of this message from the
        //  pipe.
        while (msg_->flags () & msg_t::more) {
            const int drain_rc = _fq.recv (msg_);
            errno_assert (drain_rc == 0);
        }
    }
}
214
215
bool zmq::xsub_t::xhas_in ()
216
0
{
217
    //  There are subsequent parts of the partly-read message available.
218
0
    if (_more_recv)
219
0
        return true;
220
221
    //  If there's already a message prepared by a previous call to zmq_poll,
222
    //  return straight ahead.
223
0
    if (_has_message)
224
0
        return true;
225
226
    //  TODO: This can result in infinite loop in the case of continuous
227
    //  stream of non-matching messages.
228
0
    while (true) {
229
        //  Get a message using fair queueing algorithm.
230
0
        int rc = _fq.recv (&_message);
231
232
        //  If there's no message available, return immediately.
233
        //  The same when error occurs.
234
0
        if (rc != 0) {
235
0
            errno_assert (errno == EAGAIN);
236
0
            return false;
237
0
        }
238
239
        //  Check whether the message matches at least one subscription.
240
0
        if (!options.filter || match (&_message)) {
241
0
            _has_message = true;
242
0
            return true;
243
0
        }
244
245
        //  Message doesn't match. Pop any remaining parts of the message
246
        //  from the pipe.
247
0
        while (_message.flags () & msg_t::more) {
248
0
            rc = _fq.recv (&_message);
249
0
            errno_assert (rc == 0);
250
0
        }
251
0
    }
252
0
}
253
254
//  Checks the message body against the cached subscriptions. The outcome
//  is flipped when options.invert_matching is set.
bool zmq::xsub_t::match (msg_t *msg_)
{
    unsigned char *const payload =
      static_cast<unsigned char *> (msg_->data ());
    const bool subscribed = _subscriptions.check (payload, msg_->size ());

    return subscribed ^ options.invert_matching;
}
261
262
void zmq::xsub_t::send_subscription (unsigned char *data_,
263
                                     size_t size_,
264
                                     void *arg_)
265
0
{
266
0
    pipe_t *pipe = static_cast<pipe_t *> (arg_);
267
268
    //  Create the subscription message.
269
0
    msg_t msg;
270
0
    const int rc = msg.init_subscribe (size_, data_);
271
0
    errno_assert (rc == 0);
272
273
    //  Send it to the pipe.
274
0
    const bool sent = pipe->write (&msg);
275
    //  If we reached the SNDHWM, and thus cannot send the subscription, drop
276
    //  the subscription message instead. This matches the behaviour of
277
    //  zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
278
    //  when the SNDHWM is reached.
279
0
    if (!sent)
280
0
        msg.close ();
281
0
}