Coverage Report

Created: 2025-07-18 07:00

/src/libzmq/src/dist.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "dist.hpp"
5
#include "pipe.hpp"
6
#include "err.hpp"
7
#include "msg.hpp"
8
#include "likely.hpp"
9
10
//  Builds a distributor with every partition counter at zero and no
//  multipart message in flight.
zmq::dist_t::dist_t () :
    _matching (0),
    _active (0),
    _eligible (0),
    _more (false)
{
}
14
15
//  Destroys the distributor. Asserts that the pipe array is already empty
//  at this point.
zmq::dist_t::~dist_t ()
{
    zmq_assert (_pipes.empty ());
}
19
20
//  Registers a new pipe with the distributor.
//
//  The pipe is appended to the array and then swapped into place: while a
//  multipart message is being sent it only joins the eligible pipes,
//  otherwise it joins the active pipes (and therefore the eligible ones too).
void zmq::dist_t::attach (pipe_t *pipe_)
{
    //  Both cases start by appending the pipe at the tail of the array.
    _pipes.push_back (pipe_);
    const pipes_t::size_type tail = _pipes.size () - 1;

    if (_more) {
        //  Mid-message: extend only the eligible range.
        _pipes.swap (_eligible, tail);
        ++_eligible;
        return;
    }

    //  No message in progress: extend both the active and eligible ranges.
    _pipes.swap (_active, tail);
    ++_active;
    ++_eligible;
}
36
37
//  Returns true if the given pipe is stored in this distributor.
//
//  The index the pipe claims is validated first: an index outside the
//  array's current bounds cannot belong to this distributor, and an
//  in-bounds index only counts if that slot really holds this pipe.
bool zmq::dist_t::has_pipe (pipe_t *pipe_)
{
    const std::size_t slot = _pipes.index (pipe_);
    return slot < _pipes.size () && _pipes[slot] == pipe_;
}
48
49
//  Marks the pipe as matching for the message about to be sent.
//
//  Nothing happens if the pipe is already in the matching range or lies
//  outside the eligible range.
void zmq::dist_t::match (pipe_t *pipe_)
{
    const pipes_t::size_type pos = _pipes.index (pipe_);

    //  Already matching, or not eligible at all: leave the array untouched.
    if (pos < _matching || pos >= _eligible)
        return;

    //  Move the pipe to the end of the matching range and grow that range.
    _pipes.swap (pos, _matching);
    ++_matching;
}
63
64
void zmq::dist_t::reverse_match ()
65
0
{
66
0
    const pipes_t::size_type prev_matching = _matching;
67
68
    // Reset matching to 0
69
0
    unmatch ();
70
71
    // Mark all matching pipes as not matching and vice-versa.
72
    // To do this, push all pipes that are eligible but not
73
    // matched - i.e. between "matching" and "eligible" -
74
    // to the beginning of the queue.
75
0
    for (pipes_t::size_type i = prev_matching; i < _eligible; ++i) {
76
0
        _pipes.swap (i, _matching++);
77
0
    }
78
0
}
79
80
void zmq::dist_t::unmatch ()
81
0
{
82
0
    _matching = 0;
83
0
}
84
85
//  Removes a terminated pipe from the distributor.
//
//  For each range (matching, active, eligible) that currently contains the
//  pipe, the range is shrunk by one and the pipe is swapped into the slot
//  just past its new end. The pipe's index is re-read before every step
//  because each swap may move it. Finally the pipe is erased from the array.
void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
{
    if (_pipes.index (pipe_) < _matching) {
        --_matching;
        _pipes.swap (_pipes.index (pipe_), _matching);
    }

    if (_pipes.index (pipe_) < _active) {
        --_active;
        _pipes.swap (_pipes.index (pipe_), _active);
    }

    if (_pipes.index (pipe_) < _eligible) {
        --_eligible;
        _pipes.swap (_pipes.index (pipe_), _eligible);
    }

    _pipes.erase (pipe_);
}
104
105
//  Handles a pipe becoming writable again.
//
//  The pipe is moved from the passive tail of the array into the eligible
//  range. If no multipart message is currently being sent, it is promoted
//  further into the active range.
void zmq::dist_t::activated (pipe_t *pipe_)
{
    const pipes_t::size_type total = _pipes.size ();

    //  Passive -> eligible, provided the eligible range can still grow.
    if (_eligible < total) {
        _pipes.swap (_pipes.index (pipe_), _eligible);
        ++_eligible;
    }

    //  While a message is in progress (or when the active range already
    //  spans the whole array) the pipe stays merely eligible.
    if (_more || _active >= total)
        return;

    //  Eligible -> active: the pipe sits in the last eligible slot.
    _pipes.swap (_eligible - 1, _active);
    ++_active;
}
120
121
//  Sends the message to every active pipe: the matching range is widened to
//  cover the whole active range before delegating to send_to_matching ().
//  Returns whatever send_to_matching () returns.
int zmq::dist_t::send_to_all (msg_t *msg_)
{
    _matching = _active;
    return send_to_matching (msg_);
}
126
127
//  Sends the message to all currently matching pipes and updates the
//  multipart bookkeeping. Always returns 0.
int zmq::dist_t::send_to_matching (msg_t *msg_)
{
    //  The 'more' flag has to be sampled up front, because distribute ()
    //  re-initialises the message.
    const bool last_part = (msg_->flags () & msg_t::more) == 0;

    //  Hand the message over to the matching pipes.
    distribute (msg_);

    //  Once the final part is out, every eligible pipe becomes active again.
    if (last_part)
        _active = _eligible;

    _more = !last_part;

    return 0;
}
143
144
//  Pushes the message to every pipe in the matching range and leaves msg_
//  re-initialised as an empty message.
//
//  Three cases are handled:
//    * no matching pipes - the message is closed (dropped);
//    * the message reports is_vsm () - it is written to each pipe with no
//      reference-count adjustment;
//    * otherwise - matching-1 extra references are added up front and the
//      ones belonging to failed writes are removed afterwards.
void zmq::dist_t::distribute (msg_t *msg_)
{
    //  Nobody to deliver to: drop the message and hand back an empty one.
    if (_matching == 0) {
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
        return;
    }

    if (msg_->is_vsm ()) {
        pipes_t::size_type i = 0;
        while (i < _matching) {
            //  A failed write removes the pipe from the matching range, so
            //  the index only advances after a successful write.
            if (write (_pipes[i], msg_))
                ++i;
        }
        int rc = msg_->init ();
        errno_assert (rc == 0);
        return;
    }

    //  One reference is already held by the caller's message, hence only
    //  matching-1 additional references are needed.
    msg_->add_refs (static_cast<int> (_matching) - 1);

    //  Write the message to each matching pipe, counting the failures.
    int failed = 0;
    pipes_t::size_type i = 0;
    while (i < _matching) {
        if (write (_pipes[i], msg_)) {
            ++i;
        } else {
            //  Stay on the same index: the failed pipe has been swapped out
            //  of the matching range.
            ++failed;
        }
    }

    //  Give back the references reserved for pipes that refused the message.
    if (unlikely (failed))
        msg_->rm_refs (failed);

    //  Detach the caller's message from the data buffer. It is not closed,
    //  because all references have already been used.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
}
190
191
//  Reports whether a message can be sent. The distributor unconditionally
//  answers true.
bool zmq::dist_t::has_out ()
{
    return true;
}
195
196
//  Writes the message to a single pipe.
//
//  Returns true on success; the pipe is flushed when the message does not
//  carry the 'more' flag. Returns false if the pipe rejected the message, in
//  which case the pipe is moved out of the matching, active and eligible
//  ranges.
bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
{
    if (pipe_->write (msg_)) {
        //  Flush only when this part does not announce further parts.
        if ((msg_->flags () & msg_t::more) == 0)
            pipe_->flush ();
        return true;
    }

    //  The write failed: shrink each range by one and swap the pipe just
    //  past its new end. The index is re-read because swaps move the pipe.
    --_matching;
    _pipes.swap (_pipes.index (pipe_), _matching);

    --_active;
    _pipes.swap (_pipes.index (pipe_), _active);

    //  The previous swap placed the pipe at index _active.
    --_eligible;
    _pipes.swap (_active, _eligible);

    return false;
}
211
212
bool zmq::dist_t::check_hwm ()
213
0
{
214
0
    for (pipes_t::size_type i = 0; i < _matching; ++i)
215
0
        if (!_pipes[i]->check_hwm ())
216
0
            return false;
217
218
0
    return true;
219
0
}