Coverage Report

Created: 2025-10-28 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/req.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "req.hpp"
6
#include "err.hpp"
7
#include "msg.hpp"
8
#include "wire.hpp"
9
#include "random.hpp"
10
#include "likely.hpp"
11
12
//  Builds a REQ socket on top of the DEALER implementation.
//
//  Initial state: no reply is awaited, the next frame sent starts a new
//  request, no reply pipe is remembered, request-id frames are disabled,
//  strict request/reply alternation is enabled, and the request id is
//  seeded from generate_random ().
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    dealer_t (parent_, tid_, sid_),
    _receiving_reply (false),
    _message_begins (true),
    _reply_pipe (NULL),
    _request_id_frames_enabled (false),
    _request_id (generate_random ()),
    _strict (true)
{
    //  Advertise the socket type through the shared options structure.
    options.type = ZMQ_REQ;
}
23
24
//  Nothing to release here: the body is intentionally empty.
zmq::req_t::~req_t ()
{
}
27
28
//  Sends one frame of a request.
//
//  When a new request begins, the envelope is emitted first: an optional
//  request-id frame (if request-id frames are enabled) followed by an empty
//  delimiter frame, both flagged 'more'. The pipe chosen for the envelope is
//  remembered in _reply_pipe so that the reply can be matched against it.
//
//  Returns 0 on success. Returns -1 with errno set to EFSM when a reply is
//  still pending and strict mode is on; otherwise propagates the failure of
//  the underlying dealer send.
int zmq::req_t::xsend (msg_t *msg_)
{
    //  If we've sent a request and we still haven't got the reply,
    //  we can't send another request unless the strict option is disabled.
    if (_receiving_reply) {
        if (_strict) {
            errno = EFSM;
            return -1;
        }

        //  Relaxed mode: abandon the pending reply and start a new request.
        _receiving_reply = false;
        _message_begins = true;
    }

    //  First part of the request is the request routing id.
    if (_message_begins) {
        _reply_pipe = NULL;

        if (_request_id_frames_enabled) {
            _request_id++;

            msg_t id;
            int rc = id.init_size (sizeof (uint32_t));
            //  Check the initialisation result before writing into the
            //  message buffer; the buffer must not be touched on failure.
            errno_assert (rc == 0);
            memcpy (id.data (), &_request_id, sizeof (uint32_t));
            id.set_flags (msg_t::more);

            rc = dealer_t::sendpipe (&id, &_reply_pipe);
            if (rc != 0) {
                return -1;
            }
        }

        //  Empty delimiter frame separating the envelope from the body.
        msg_t bottom;
        int rc = bottom.init ();
        errno_assert (rc == 0);
        bottom.set_flags (msg_t::more);

        rc = dealer_t::sendpipe (&bottom, &_reply_pipe);
        if (rc != 0)
            return -1;
        zmq_assert (_reply_pipe);

        _message_begins = false;

        // Eat all currently available messages before the request is fully
        // sent. This is done to avoid:
        //   REQ sends request to A, A replies, B replies too.
        //   A's reply was first and matches, that is used.
        //   An hour later REQ sends a request to B. B's old reply is used.
        msg_t drop;
        while (true) {
            rc = drop.init ();
            errno_assert (rc == 0);
            rc = dealer_t::xrecv (&drop);
            if (rc != 0)
                break;
            drop.close ();
        }
    }

    //  Remember whether more frames follow before handing the message over.
    bool more = (msg_->flags () & msg_t::more) != 0;

    int rc = dealer_t::xsend (msg_);
    if (rc != 0)
        return rc;

    //  If the request was fully sent, flip the FSM into reply-receiving state.
    if (!more) {
        _receiving_reply = true;
        _message_begins = true;
    }

    return 0;
}
103
104
//  Receives one frame of the reply to the outstanding request.
//
//  At the start of a reply the envelope is validated: if request-id frames
//  are enabled the first frame must carry the current request id, and the
//  next frame must be an empty delimiter flagged 'more'. Messages whose
//  envelope does not match are skipped entirely and the next one is tried.
//
//  Returns 0 on success. Returns -1 with errno set to EFSM when no request
//  has been sent; otherwise propagates the failure of recv_reply_pipe ().
int zmq::req_t::xrecv (msg_t *msg_)
{
    //  If request wasn't sent, we can't wait for reply.
    if (!_receiving_reply) {
        errno = EFSM;
        return -1;
    }

    //  Skip messages until one with the right first frames is found.
    while (_message_begins) {
        //  If enabled, the first frame must have the correct request_id.
        if (_request_id_frames_enabled) {
            int rc = recv_reply_pipe (msg_);
            if (rc != 0)
                return rc;

            //  Copy the id out of the frame with memcpy instead of
            //  dereferencing a casted pointer: the frame payload is a raw
            //  byte buffer, and this mirrors how xsend () writes the id.
            bool id_matches = false;
            if ((msg_->flags () & msg_t::more)
                && msg_->size () == sizeof (_request_id)) {
                uint32_t received_id = 0;
                memcpy (&received_id, msg_->data (), sizeof (received_id));
                id_matches = (received_id == _request_id);
            }

            if (unlikely (!id_matches)) {
                //  Skip the remaining frames and try the next message
                while (msg_->flags () & msg_t::more) {
                    rc = recv_reply_pipe (msg_);
                    errno_assert (rc == 0);
                }
                continue;
            }
        }

        //  The next frame must be an empty delimiter followed by more frames.
        // TODO: Failing this check should also close the connection with the peer!
        int rc = recv_reply_pipe (msg_);
        if (rc != 0)
            return rc;

        if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
            //  Skip the remaining frames and try the next message
            while (msg_->flags () & msg_t::more) {
                rc = recv_reply_pipe (msg_);
                errno_assert (rc == 0);
            }
            continue;
        }

        _message_begins = false;
    }

    //  Envelope accepted: deliver the next body frame to the caller.
    const int rc = recv_reply_pipe (msg_);
    if (rc != 0)
        return rc;

    //  If the reply is fully received, flip the FSM into request-sending state.
    if (!(msg_->flags () & msg_t::more)) {
        _receiving_reply = false;
        _message_begins = true;
    }

    return 0;
}
163
164
bool zmq::req_t::xhas_in ()
165
0
{
166
    //  TODO: Duplicates should be removed here.
167
168
0
    if (!_receiving_reply)
169
0
        return false;
170
171
0
    return dealer_t::xhas_in ();
172
0
}
173
174
bool zmq::req_t::xhas_out ()
175
0
{
176
0
    if (_receiving_reply && _strict)
177
0
        return false;
178
179
0
    return dealer_t::xhas_out ();
180
0
}
181
182
int zmq::req_t::xsetsockopt (int option_,
183
                             const void *optval_,
184
                             size_t optvallen_)
185
0
{
186
0
    const bool is_int = (optvallen_ == sizeof (int));
187
0
    int value = 0;
188
0
    if (is_int)
189
0
        memcpy (&value, optval_, sizeof (int));
190
191
0
    switch (option_) {
192
0
        case ZMQ_REQ_CORRELATE:
193
0
            if (is_int && value >= 0) {
194
0
                _request_id_frames_enabled = (value != 0);
195
0
                return 0;
196
0
            }
197
0
            break;
198
199
0
        case ZMQ_REQ_RELAXED:
200
0
            if (is_int && value >= 0) {
201
0
                _strict = (value == 0);
202
0
                return 0;
203
0
            }
204
0
            break;
205
206
0
        default:
207
0
            break;
208
0
    }
209
210
0
    return dealer_t::xsetsockopt (option_, optval_, optvallen_);
211
0
}
212
213
//  Called when a pipe goes away. Forgets the remembered reply pipe if it is
//  the one being terminated, then lets the dealer do its own bookkeeping.
void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
{
    const bool was_reply_pipe = (pipe_ == _reply_pipe);
    if (was_reply_pipe) {
        _reply_pipe = NULL;
    }

    dealer_t::xpipe_terminated (pipe_);
}
219
220
//  Receives the next frame that is acceptable as part of the reply.
//
//  Frames are pulled from the dealer until one arrives on the remembered
//  reply pipe; frames from any other pipe are discarded. When no reply pipe
//  is remembered, the first frame received is accepted.
//
//  Returns 0 once a frame has been accepted, or the non-zero result of
//  dealer_t::recvpipe () as soon as it fails.
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
    int rc = 0;
    pipe_t *source = NULL;

    do {
        source = NULL;
        rc = dealer_t::recvpipe (msg_, &source);
        //  Keep going only if a frame was received, a reply pipe is being
        //  tracked, and the frame came from some other pipe.
    } while (rc == 0 && _reply_pipe && source != _reply_pipe);

    return rc;
}
231
232
//  Creates a REQ session. All arguments are forwarded unchanged to
//  session_base_t; the envelope state machine starts in the 'bottom' state.
zmq::req_session_t::req_session_t (io_thread_t *io_thread_,
                                   bool connect_,
                                   socket_base_t *socket_,
                                   const options_t &options_,
                                   address_t *addr_) :
    session_base_t (io_thread_, connect_, socket_, options_, addr_),
    _state (bottom)
{
}
241
242
//  Nothing to release here: the body is intentionally empty.
zmq::req_session_t::~req_session_t ()
{
}
245
246
//  Validates the framing of an incoming reply before forwarding it.
//
//  The session tracks where it is inside a reply:
//    bottom     - expecting the start of the envelope,
//    request_id - a request-id frame was seen, expecting the empty delimiter,
//    body       - inside the message body.
//
//  A frame that fits the current state is forwarded to
//  session_base_t::push_msg () and its result is returned. Command frames
//  are ignored (0 is returned). Any other frame is rejected with -1 and
//  errno set to EFAULT.
int zmq::req_session_t::push_msg (msg_t *msg_)
{
    //  Ignore commands, they are processed by the engine and should not
    //  affect the state machine.
    if (unlikely (msg_->flags () & msg_t::command))
        return 0;

    //  True when the flags are exactly 'more' and nothing else.
    const bool only_more_flag = (msg_->flags () == msg_t::more);
    bool accepted = false;

    switch (_state) {
        case bottom:
            if (only_more_flag) {
                //  In case option ZMQ_REQ_CORRELATE is on, allow request_id
                //  to be transferred as first frame (would be too cumbersome
                //  to check whether the option is actually on or not).
                if (msg_->size () == sizeof (uint32_t)) {
                    _state = request_id;
                    accepted = true;
                } else if (msg_->size () == 0) {
                    _state = body;
                    accepted = true;
                }
            }
            break;

        case request_id:
            //  Only the empty delimiter may follow the request id.
            if (only_more_flag && msg_->size () == 0) {
                _state = body;
                accepted = true;
            }
            break;

        case body:
            if (only_more_flag) {
                //  Intermediate body frame: stay in the body state.
                accepted = true;
            } else if (msg_->flags () == 0) {
                //  Final body frame: the next frame starts a new envelope.
                _state = bottom;
                accepted = true;
            }
            break;
    }

    if (!accepted) {
        errno = EFAULT;
        return -1;
    }

    return session_base_t::push_msg (msg_);
}
287
288
void zmq::req_session_t::reset ()
289
0
{
290
0
    session_base_t::reset ();
291
0
    _state = bottom;
292
0
}