Coverage Report

Created: 2026-01-10 06:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/stream.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "stream.hpp"
6
#include "pipe.hpp"
7
#include "wire.hpp"
8
#include "random.hpp"
9
#include "likely.hpp"
10
#include "err.hpp"
11
12
zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
13
0
    routing_socket_base_t (parent_, tid_, sid_),
14
0
    _prefetched (false),
15
0
    _routing_id_sent (false),
16
0
    _current_out (NULL),
17
0
    _more_out (false),
18
0
    _next_integral_routing_id (generate_random ())
19
0
{
20
0
    options.type = ZMQ_STREAM;
21
0
    options.raw_socket = true;
22
23
0
    _prefetched_routing_id.init ();
24
0
    _prefetched_msg.init ();
25
0
}
26
27
zmq::stream_t::~stream_t ()
28
0
{
29
0
    _prefetched_routing_id.close ();
30
0
    _prefetched_msg.close ();
31
0
}
32
33
void zmq::stream_t::xattach_pipe (pipe_t *pipe_,
34
                                  bool subscribe_to_all_,
35
                                  bool locally_initiated_)
36
0
{
37
0
    LIBZMQ_UNUSED (subscribe_to_all_);
38
39
0
    zmq_assert (pipe_);
40
41
0
    identify_peer (pipe_, locally_initiated_);
42
0
    _fq.attach (pipe_);
43
0
}
44
45
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
46
0
{
47
0
    erase_out_pipe (pipe_);
48
0
    _fq.pipe_terminated (pipe_);
49
    // TODO router_t calls pipe_->rollback() here; should this be done here as
50
    // well? then xpipe_terminated could be pulled up to routing_socket_base_t
51
0
    if (pipe_ == _current_out)
52
0
        _current_out = NULL;
53
0
}
54
55
void zmq::stream_t::xread_activated (pipe_t *pipe_)
56
0
{
57
0
    _fq.activated (pipe_);
58
0
}
59
60
int zmq::stream_t::xsend (msg_t *msg_)
61
0
{
62
    //  If this is the first part of the message it's the ID of the
63
    //  peer to send the message to.
64
0
    if (!_more_out) {
65
0
        zmq_assert (!_current_out);
66
67
        //  If we have malformed message (prefix with no subsequent message)
68
        //  then just silently ignore it.
69
        //  TODO: The connections should be killed instead.
70
0
        if (msg_->flags () & msg_t::more) {
71
            //  Find the pipe associated with the routing id stored in the prefix.
72
            //  If there's no such pipe return an error
73
74
0
            out_pipe_t *out_pipe = lookup_out_pipe (
75
0
              blob_t (static_cast<unsigned char *> (msg_->data ()),
76
0
                      msg_->size (), reference_tag_t ()));
77
78
0
            if (out_pipe) {
79
0
                _current_out = out_pipe->pipe;
80
0
                if (!_current_out->check_write ()) {
81
0
                    out_pipe->active = false;
82
0
                    _current_out = NULL;
83
0
                    errno = EAGAIN;
84
0
                    return -1;
85
0
                }
86
0
            } else {
87
0
                errno = EHOSTUNREACH;
88
0
                return -1;
89
0
            }
90
0
        }
91
92
        //  Expect one more message frame.
93
0
        _more_out = true;
94
95
0
        int rc = msg_->close ();
96
0
        errno_assert (rc == 0);
97
0
        rc = msg_->init ();
98
0
        errno_assert (rc == 0);
99
0
        return 0;
100
0
    }
101
102
    //  Ignore the MORE flag
103
0
    msg_->reset_flags (msg_t::more);
104
105
    //  This is the last part of the message.
106
0
    _more_out = false;
107
108
    //  Push the message into the pipe. If there's no out pipe, just drop it.
109
0
    if (_current_out) {
110
        // Close the remote connection if user has asked to do so
111
        // by sending zero length message.
112
        // Pending messages in the pipe will be dropped (on receiving term- ack)
113
0
        if (msg_->size () == 0) {
114
0
            _current_out->terminate (false);
115
0
            int rc = msg_->close ();
116
0
            errno_assert (rc == 0);
117
0
            rc = msg_->init ();
118
0
            errno_assert (rc == 0);
119
0
            _current_out = NULL;
120
0
            return 0;
121
0
        }
122
0
        const bool ok = _current_out->write (msg_);
123
0
        if (likely (ok))
124
0
            _current_out->flush ();
125
0
        _current_out = NULL;
126
0
    } else {
127
0
        const int rc = msg_->close ();
128
0
        errno_assert (rc == 0);
129
0
    }
130
131
    //  Detach the message from the data buffer.
132
0
    const int rc = msg_->init ();
133
0
    errno_assert (rc == 0);
134
135
0
    return 0;
136
0
}
137
138
int zmq::stream_t::xsetsockopt (int option_,
139
                                const void *optval_,
140
                                size_t optvallen_)
141
0
{
142
0
    switch (option_) {
143
0
        case ZMQ_STREAM_NOTIFY:
144
0
            return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
145
0
                                                     &options.raw_notify);
146
147
0
        default:
148
0
            return routing_socket_base_t::xsetsockopt (option_, optval_,
149
0
                                                       optvallen_);
150
0
    }
151
0
}
152
153
int zmq::stream_t::xrecv (msg_t *msg_)
154
0
{
155
0
    if (_prefetched) {
156
0
        if (!_routing_id_sent) {
157
0
            const int rc = msg_->move (_prefetched_routing_id);
158
0
            errno_assert (rc == 0);
159
0
            _routing_id_sent = true;
160
0
        } else {
161
0
            const int rc = msg_->move (_prefetched_msg);
162
0
            errno_assert (rc == 0);
163
0
            _prefetched = false;
164
0
        }
165
0
        return 0;
166
0
    }
167
168
0
    pipe_t *pipe = NULL;
169
0
    int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
170
0
    if (rc != 0)
171
0
        return -1;
172
173
0
    zmq_assert (pipe != NULL);
174
0
    zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
175
176
    //  We have received a frame with TCP data.
177
    //  Rather than sending this frame, we keep it in prefetched
178
    //  buffer and send a frame with peer's ID.
179
0
    const blob_t &routing_id = pipe->get_routing_id ();
180
0
    rc = msg_->close ();
181
0
    errno_assert (rc == 0);
182
0
    rc = msg_->init_size (routing_id.size ());
183
0
    errno_assert (rc == 0);
184
185
    // forward metadata (if any)
186
0
    metadata_t *metadata = _prefetched_msg.metadata ();
187
0
    if (metadata)
188
0
        msg_->set_metadata (metadata);
189
190
0
    memcpy (msg_->data (), routing_id.data (), routing_id.size ());
191
0
    msg_->set_flags (msg_t::more);
192
193
0
    _prefetched = true;
194
0
    _routing_id_sent = true;
195
196
0
    return 0;
197
0
}
198
199
bool zmq::stream_t::xhas_in ()
200
0
{
201
    //  We may already have a message pre-fetched.
202
0
    if (_prefetched)
203
0
        return true;
204
205
    //  Try to read the next message.
206
    //  The message, if read, is kept in the pre-fetch buffer.
207
0
    pipe_t *pipe = NULL;
208
0
    int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
209
0
    if (rc != 0)
210
0
        return false;
211
212
0
    zmq_assert (pipe != NULL);
213
0
    zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
214
215
0
    const blob_t &routing_id = pipe->get_routing_id ();
216
0
    rc = _prefetched_routing_id.init_size (routing_id.size ());
217
0
    errno_assert (rc == 0);
218
219
    // forward metadata (if any)
220
0
    metadata_t *metadata = _prefetched_msg.metadata ();
221
0
    if (metadata)
222
0
        _prefetched_routing_id.set_metadata (metadata);
223
224
0
    memcpy (_prefetched_routing_id.data (), routing_id.data (),
225
0
            routing_id.size ());
226
0
    _prefetched_routing_id.set_flags (msg_t::more);
227
228
0
    _prefetched = true;
229
0
    _routing_id_sent = false;
230
231
0
    return true;
232
0
}
233
234
bool zmq::stream_t::xhas_out ()
235
0
{
236
    //  In theory, STREAM socket is always ready for writing. Whether actual
237
    //  attempt to write succeeds depends on which pipe the message is going
238
    //  to be routed to.
239
0
    return true;
240
0
}
241
242
void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
243
0
{
244
    //  Always assign routing id for raw-socket
245
0
    unsigned char buffer[5];
246
0
    buffer[0] = 0;
247
0
    blob_t routing_id;
248
0
    if (locally_initiated_ && connect_routing_id_is_set ()) {
249
0
        const std::string connect_routing_id = extract_connect_routing_id ();
250
0
        routing_id.set (
251
0
          reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
252
0
          connect_routing_id.length ());
253
        //  Not allowed to duplicate an existing rid
254
0
        zmq_assert (!has_out_pipe (routing_id));
255
0
    } else {
256
0
        put_uint32 (buffer + 1, _next_integral_routing_id++);
257
0
        routing_id.set (buffer, sizeof buffer);
258
0
        memcpy (options.routing_id, routing_id.data (), routing_id.size ());
259
0
        options.routing_id_size =
260
0
          static_cast<unsigned char> (routing_id.size ());
261
0
    }
262
0
    pipe_->set_router_socket_routing_id (routing_id);
263
0
    add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
264
0
}