Coverage Report

Created: 2025-07-01 06:07

/src/libzmq/src/lb.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "lb.hpp"
5
#include "pipe.hpp"
6
#include "err.hpp"
7
#include "msg.hpp"
8
9
0
//  Constructs an empty load balancer: no active pipes, the round-robin
//  cursor at position zero, and neither a multipart message in progress
//  nor a message being dropped.
zmq::lb_t::lb_t () :
    _active (0),
    _current (0),
    _more (false),
    _dropping (false)
{
}
12
13
//  Destroys the load balancer. Every pipe must have been removed from
//  the list beforehand; this is enforced with an assertion.
zmq::lb_t::~lb_t ()
{
    zmq_assert (_pipes.empty ());
}
17
18
//  Registers a new outbound pipe. The pipe is appended to the list and
//  then immediately moved into the active portion of it.
void zmq::lb_t::attach (pipe_t *pipe_)
{
    //  Append first so that activated () can look the pipe up by index.
    _pipes.push_back (pipe_);
    activated (pipe_);
}
23
24
//  Removes a terminated pipe from the list, keeping the active pipes
//  packed at the front ([0, _active)) and the round-robin cursor in
//  range.
void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
{
    const pipes_t::size_type slot = _pipes.index (pipe_);

    //  Losing the pipe that is currently receiving a multipart message
    //  means the remaining parts of that message must be discarded.
    if (_more && slot == _current)
        _dropping = true;

    //  An active pipe is first swapped to the end of the active range,
    //  which is then shrunk by one so the pipe falls outside of it.
    //  NOTE(review): if _current was the last active slot and a
    //  different pipe terminates, the current pipe is moved to 'slot'
    //  while _current is reset to 0; when _more is set this appears to
    //  redirect the rest of the message to another pipe - confirm.
    if (slot < _active) {
        --_active;
        _pipes.swap (slot, _active);
        if (_active == _current)
            _current = 0;
    }

    _pipes.erase (pipe_);
}
43
44
//  Marks a pipe as writable again by swapping it into the first slot
//  past the active range and then growing that range by one.
void zmq::lb_t::activated (pipe_t *pipe_)
{
    const pipes_t::size_type slot = _pipes.index (pipe_);
    _pipes.swap (slot, _active);
    ++_active;
}
50
51
//  Sends a message part to the next pipe in round-robin order without
//  reporting which pipe was chosen. Return value and errno are exactly
//  those of sendpipe ().
int zmq::lb_t::send (msg_t *msg_)
{
    //  NULL: the caller is not interested in the selected pipe.
    return sendpipe (msg_, NULL);
}
55
56
//  Sends one message part to the current pipe, deactivating pipes that
//  refuse the write until one accepts it or none is left.
//
//  msg_   the part to send; it is re-initialised on success and when
//         the part is dropped.
//  pipe_  if non-null, receives the pipe the part was written to.
//
//  Returns 0 when the part was written or silently dropped, -1 with
//  errno set to EAGAIN when no active pipe is available, and -2 with
//  errno set to EAGAIN when a write fails in the middle of a multipart
//  message.
int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
{
    //  In dropping mode the part is discarded. Dropping stops once the
    //  final part of the message (no 'more' flag) has been consumed.
    if (_dropping) {
        const bool has_more = (msg_->flags () & msg_t::more) != 0;
        _more = has_more;
        _dropping = has_more;

        const int close_rc = msg_->close ();
        errno_assert (close_rc == 0);
        const int init_rc = msg_->init ();
        errno_assert (init_rc == 0);
        return 0;
    }

    while (_active > 0) {
        pipe_t *const candidate = _pipes[_current];

        if (candidate->write (msg_)) {
            if (pipe_)
                *pipe_ = candidate;
            break;
        }

        //  A failed write in the middle of a multipart message: roll
        //  back what was written so far and give up on this message.
        //  Per the original author's note the pipe is presumably
        //  already going away, so the rollback may have no effect and
        //  the earlier frames cannot be delivered later. Any remaining
        //  frames are dropped (dropping mode returns 0, so the caller
        //  is not told about those) to keep multipart messages atomic.
        //  The -2 return value combined with EAGAIN is meant to stop
        //  the socket_base caller from re-entering right away or after
        //  a short sleep in blocking mode.
        if (_more) {
            candidate->rollback ();
            _dropping = (msg_->flags () & msg_t::more) != 0;
            _more = false;
            errno = EAGAIN;
            return -2;
        }

        //  The pipe is full: shrink the active range. If the cursor
        //  still lies inside it, swap the full pipe out; otherwise
        //  restart from the first slot.
        --_active;
        if (_current >= _active)
            _current = 0;
        else
            _pipes.swap (_current, _active);
    }

    //  Without any active pipe the message cannot be sent.
    if (_active == 0) {
        errno = EAGAIN;
        return -1;
    }

    //  After the last part of a message, flush the pipe and advance the
    //  round-robin cursor, wrapping around at the end of the active
    //  range.
    const bool has_more = (msg_->flags () & msg_t::more) != 0;
    _more = has_more;
    if (!has_more) {
        _pipes[_current]->flush ();

        ++_current;
        if (_current >= _active)
            _current = 0;
    }

    //  Detach the message from the data buffer.
    const int init_rc = msg_->init ();
    errno_assert (init_rc == 0);

    return 0;
}
132
133
bool zmq::lb_t::has_out ()
134
0
{
135
    //  If one part of the message was already written we can definitely
136
    //  write the rest of the message.
137
0
    if (_more)
138
0
        return true;
139
140
0
    while (_active > 0) {
141
        //  Check whether a pipe has room for another message.
142
0
        if (_pipes[_current]->check_write ())
143
0
            return true;
144
145
        //  Deactivate the pipe.
146
0
        _active--;
147
0
        _pipes.swap (_current, _active);
148
0
        if (_current == _active)
149
0
            _current = 0;
150
0
    }
151
152
0
    return false;
153
0
}