Coverage Report

Created: 2025-11-24 06:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/pipe.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <new>
5
#include <stddef.h>
6
7
#include "macros.hpp"
8
#include "pipe.hpp"
9
#include "err.hpp"
10
11
#include "ypipe.hpp"
12
#include "ypipe_conflate.hpp"
13
14
int zmq::pipepair (object_t *parents_[2],
15
                   pipe_t *pipes_[2],
16
                   const int hwms_[2],
17
                   const bool conflate_[2])
18
0
{
19
    //   Creates two pipe objects. These objects are connected by two ypipes,
20
    //   each to pass messages in one direction.
21
22
0
    typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
23
0
    typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
24
25
0
    pipe_t::upipe_t *upipe1;
26
0
    if (conflate_[0])
27
0
        upipe1 = new (std::nothrow) upipe_conflate_t ();
28
0
    else
29
0
        upipe1 = new (std::nothrow) upipe_normal_t ();
30
0
    alloc_assert (upipe1);
31
32
0
    pipe_t::upipe_t *upipe2;
33
0
    if (conflate_[1])
34
0
        upipe2 = new (std::nothrow) upipe_conflate_t ();
35
0
    else
36
0
        upipe2 = new (std::nothrow) upipe_normal_t ();
37
0
    alloc_assert (upipe2);
38
39
0
    pipes_[0] = new (std::nothrow)
40
0
      pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]);
41
0
    alloc_assert (pipes_[0]);
42
0
    pipes_[1] = new (std::nothrow)
43
0
      pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]);
44
0
    alloc_assert (pipes_[1]);
45
46
0
    pipes_[0]->set_peer (pipes_[1]);
47
0
    pipes_[1]->set_peer (pipes_[0]);
48
49
0
    return 0;
50
0
}
51
52
void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_)
53
0
{
54
0
    zmq::msg_t id;
55
0
    const int rc = id.init_size (options_.routing_id_size);
56
0
    errno_assert (rc == 0);
57
0
    memcpy (id.data (), options_.routing_id, options_.routing_id_size);
58
0
    id.set_flags (zmq::msg_t::routing_id);
59
0
    const bool written = pipe_->write (&id);
60
0
    zmq_assert (written);
61
0
    pipe_->flush ();
62
0
}
63
64
void zmq::send_hello_msg (pipe_t *pipe_, const options_t &options_)
65
0
{
66
0
    zmq::msg_t hello;
67
0
    const int rc =
68
0
      hello.init_buffer (&options_.hello_msg[0], options_.hello_msg.size ());
69
0
    errno_assert (rc == 0);
70
0
    const bool written = pipe_->write (&hello);
71
0
    zmq_assert (written);
72
0
    pipe_->flush ();
73
0
}
74
75
zmq::pipe_t::pipe_t (object_t *parent_,
76
                     upipe_t *inpipe_,
77
                     upipe_t *outpipe_,
78
                     int inhwm_,
79
                     int outhwm_,
80
                     bool conflate_) :
81
0
    object_t (parent_),
82
0
    _in_pipe (inpipe_),
83
0
    _out_pipe (outpipe_),
84
0
    _in_active (true),
85
0
    _out_active (true),
86
0
    _hwm (outhwm_),
87
0
    _lwm (compute_lwm (inhwm_)),
88
0
    _in_hwm_boost (-1),
89
0
    _out_hwm_boost (-1),
90
0
    _msgs_read (0),
91
0
    _msgs_written (0),
92
0
    _peers_msgs_read (0),
93
0
    _peer (NULL),
94
0
    _sink (NULL),
95
0
    _state (active),
96
0
    _delay (true),
97
0
    _server_socket_routing_id (0),
98
0
    _conflate (conflate_)
99
0
{
100
0
    _disconnect_msg.init ();
101
0
}
102
103
zmq::pipe_t::~pipe_t ()
104
0
{
105
0
    _disconnect_msg.close ();
106
0
}
107
108
void zmq::pipe_t::set_peer (pipe_t *peer_)
109
0
{
110
    //  Peer can be set once only.
111
0
    zmq_assert (!_peer);
112
0
    _peer = peer_;
113
0
}
114
115
void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
116
0
{
117
    // Sink can be set once only.
118
0
    zmq_assert (!_sink);
119
0
    _sink = sink_;
120
0
}
121
122
void zmq::pipe_t::set_server_socket_routing_id (
123
  uint32_t server_socket_routing_id_)
124
0
{
125
0
    _server_socket_routing_id = server_socket_routing_id_;
126
0
}
127
128
uint32_t zmq::pipe_t::get_server_socket_routing_id () const
129
0
{
130
0
    return _server_socket_routing_id;
131
0
}
132
133
void zmq::pipe_t::set_router_socket_routing_id (
134
  const blob_t &router_socket_routing_id_)
135
0
{
136
0
    _router_socket_routing_id.set_deep_copy (router_socket_routing_id_);
137
0
}
138
139
const zmq::blob_t &zmq::pipe_t::get_routing_id () const
140
0
{
141
0
    return _router_socket_routing_id;
142
0
}
143
144
bool zmq::pipe_t::check_read ()
145
0
{
146
0
    if (unlikely (!_in_active))
147
0
        return false;
148
0
    if (unlikely (_state != active && _state != waiting_for_delimiter))
149
0
        return false;
150
151
    //  Check if there's an item in the pipe.
152
0
    if (!_in_pipe->check_read ()) {
153
0
        _in_active = false;
154
0
        return false;
155
0
    }
156
157
    //  If the next item in the pipe is message delimiter,
158
    //  initiate termination process.
159
0
    if (_in_pipe->probe (is_delimiter)) {
160
0
        msg_t msg;
161
0
        const bool ok = _in_pipe->read (&msg);
162
0
        zmq_assert (ok);
163
0
        process_delimiter ();
164
0
        return false;
165
0
    }
166
167
0
    return true;
168
0
}
169
170
bool zmq::pipe_t::read (msg_t *msg_)
171
0
{
172
0
    if (unlikely (!_in_active))
173
0
        return false;
174
0
    if (unlikely (_state != active && _state != waiting_for_delimiter))
175
0
        return false;
176
177
0
    while (true) {
178
0
        if (!_in_pipe->read (msg_)) {
179
0
            _in_active = false;
180
0
            return false;
181
0
        }
182
183
        //  If this is a credential, ignore it and receive next message.
184
0
        if (unlikely (msg_->is_credential ())) {
185
0
            const int rc = msg_->close ();
186
0
            zmq_assert (rc == 0);
187
0
        } else {
188
0
            break;
189
0
        }
190
0
    }
191
192
    //  If delimiter was read, start termination process of the pipe.
193
0
    if (msg_->is_delimiter ()) {
194
0
        process_delimiter ();
195
0
        return false;
196
0
    }
197
198
0
    if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
199
0
        _msgs_read++;
200
201
0
    if (_lwm > 0 && _msgs_read % _lwm == 0)
202
0
        send_activate_write (_peer, _msgs_read);
203
204
0
    return true;
205
0
}
206
207
bool zmq::pipe_t::check_write ()
208
0
{
209
0
    if (unlikely (!_out_active || _state != active))
210
0
        return false;
211
212
0
    const bool full = !check_hwm ();
213
214
0
    if (unlikely (full)) {
215
0
        _out_active = false;
216
0
        return false;
217
0
    }
218
219
0
    return true;
220
0
}
221
222
bool zmq::pipe_t::write (const msg_t *msg_)
223
0
{
224
0
    if (unlikely (!check_write ()))
225
0
        return false;
226
227
0
    const bool more = (msg_->flags () & msg_t::more) != 0;
228
0
    const bool is_routing_id = msg_->is_routing_id ();
229
0
    _out_pipe->write (*msg_, more);
230
0
    if (!more && !is_routing_id)
231
0
        _msgs_written++;
232
233
0
    return true;
234
0
}
235
236
void zmq::pipe_t::rollback () const
237
0
{
238
    //  Remove incomplete message from the outbound pipe.
239
0
    msg_t msg;
240
0
    if (_out_pipe) {
241
0
        while (_out_pipe->unwrite (&msg)) {
242
0
            zmq_assert (msg.flags () & msg_t::more);
243
0
            const int rc = msg.close ();
244
0
            errno_assert (rc == 0);
245
0
        }
246
0
    }
247
0
}
248
249
void zmq::pipe_t::flush ()
250
0
{
251
    //  The peer does not exist anymore at this point.
252
0
    if (_state == term_ack_sent)
253
0
        return;
254
255
0
    if (_out_pipe && !_out_pipe->flush ())
256
0
        send_activate_read (_peer);
257
0
}
258
259
void zmq::pipe_t::process_activate_read ()
260
0
{
261
0
    if (!_in_active && (_state == active || _state == waiting_for_delimiter)) {
262
0
        _in_active = true;
263
0
        _sink->read_activated (this);
264
0
    }
265
0
}
266
267
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
268
0
{
269
    //  Remember the peer's message sequence number.
270
0
    _peers_msgs_read = msgs_read_;
271
272
0
    if (!_out_active && _state == active) {
273
0
        _out_active = true;
274
0
        _sink->write_activated (this);
275
0
    }
276
0
}
277
278
void zmq::pipe_t::process_hiccup (void *pipe_)
279
0
{
280
    //  Destroy old outpipe. Note that the read end of the pipe was already
281
    //  migrated to this thread.
282
0
    zmq_assert (_out_pipe);
283
0
    _out_pipe->flush ();
284
0
    msg_t msg;
285
0
    while (_out_pipe->read (&msg)) {
286
0
        if (!(msg.flags () & msg_t::more))
287
0
            _msgs_written--;
288
0
        const int rc = msg.close ();
289
0
        errno_assert (rc == 0);
290
0
    }
291
0
    LIBZMQ_DELETE (_out_pipe);
292
293
    //  Plug in the new outpipe.
294
0
    zmq_assert (pipe_);
295
0
    _out_pipe = static_cast<upipe_t *> (pipe_);
296
0
    _out_active = true;
297
298
    //  If appropriate, notify the user about the hiccup.
299
0
    if (_state == active)
300
0
        _sink->hiccuped (this);
301
0
}
302
303
void zmq::pipe_t::process_pipe_term ()
304
0
{
305
0
    zmq_assert (_state == active || _state == delimiter_received
306
0
                || _state == term_req_sent1);
307
308
    //  This is the simple case of peer-induced termination. If there are no
309
    //  more pending messages to read, or if the pipe was configured to drop
310
    //  pending messages, we can move directly to the term_ack_sent state.
311
    //  Otherwise we'll hang up in waiting_for_delimiter state till all
312
    //  pending messages are read.
313
0
    if (_state == active) {
314
0
        if (_delay)
315
0
            _state = waiting_for_delimiter;
316
0
        else {
317
0
            _state = term_ack_sent;
318
0
            _out_pipe = NULL;
319
0
            send_pipe_term_ack (_peer);
320
0
        }
321
0
    }
322
323
    //  Delimiter happened to arrive before the term command. Now we have the
324
    //  term command as well, so we can move straight to term_ack_sent state.
325
0
    else if (_state == delimiter_received) {
326
0
        _state = term_ack_sent;
327
0
        _out_pipe = NULL;
328
0
        send_pipe_term_ack (_peer);
329
0
    }
330
331
    //  This is the case where both ends of the pipe are closed in parallel.
332
    //  We simply reply to the request by ack and continue waiting for our
333
    //  own ack.
334
0
    else if (_state == term_req_sent1) {
335
0
        _state = term_req_sent2;
336
0
        _out_pipe = NULL;
337
0
        send_pipe_term_ack (_peer);
338
0
    }
339
0
}
340
341
void zmq::pipe_t::process_pipe_term_ack ()
342
0
{
343
    //  Notify the user that all the references to the pipe should be dropped.
344
0
    zmq_assert (_sink);
345
0
    _sink->pipe_terminated (this);
346
347
    //  In term_ack_sent and term_req_sent2 states there's nothing to do.
348
    //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
349
    //  the peer before deallocating this side of the pipe.
350
    //  All the other states are invalid.
351
0
    if (_state == term_req_sent1) {
352
0
        _out_pipe = NULL;
353
0
        send_pipe_term_ack (_peer);
354
0
    } else
355
0
        zmq_assert (_state == term_ack_sent || _state == term_req_sent2);
356
357
    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
358
    //  pipe (which is an inbound pipe from its point of view).
359
    //  First, delete all the unread messages in the pipe. We have to do it by
360
    //  hand because msg_t doesn't have automatic destructor. Then deallocate
361
    //  the ypipe itself.
362
363
0
    if (!_conflate) {
364
0
        msg_t msg;
365
0
        while (_in_pipe->read (&msg)) {
366
0
            const int rc = msg.close ();
367
0
            errno_assert (rc == 0);
368
0
        }
369
0
    }
370
371
0
    LIBZMQ_DELETE (_in_pipe);
372
373
    //  Deallocate the pipe object
374
0
    delete this;
375
0
}
376
377
void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_)
378
0
{
379
0
    set_hwms (inhwm_, outhwm_);
380
0
}
381
382
void zmq::pipe_t::set_nodelay ()
383
0
{
384
0
    this->_delay = false;
385
0
}
386
387
void zmq::pipe_t::terminate (bool delay_)
388
0
{
389
    //  Overload the value specified at pipe creation.
390
0
    _delay = delay_;
391
392
    //  If terminate was already called, we can ignore the duplicate invocation.
393
0
    if (_state == term_req_sent1 || _state == term_req_sent2) {
394
0
        return;
395
0
    }
396
    //  If the pipe is in the final phase of async termination, it's going to
397
    //  closed anyway. No need to do anything special here.
398
0
    if (_state == term_ack_sent) {
399
0
        return;
400
0
    }
401
    //  The simple sync termination case. Ask the peer to terminate and wait
402
    //  for the ack.
403
0
    if (_state == active) {
404
0
        send_pipe_term (_peer);
405
0
        _state = term_req_sent1;
406
0
    }
407
    //  There are still pending messages available, but the user calls
408
    //  'terminate'. We can act as if all the pending messages were read.
409
0
    else if (_state == waiting_for_delimiter && !_delay) {
410
        //  Drop any unfinished outbound messages.
411
0
        rollback ();
412
0
        _out_pipe = NULL;
413
0
        send_pipe_term_ack (_peer);
414
0
        _state = term_ack_sent;
415
0
    }
416
    //  If there are pending messages still available, do nothing.
417
0
    else if (_state == waiting_for_delimiter) {
418
0
    }
419
    //  We've already got delimiter, but not term command yet. We can ignore
420
    //  the delimiter and ack synchronously terminate as if we were in
421
    //  active state.
422
0
    else if (_state == delimiter_received) {
423
0
        send_pipe_term (_peer);
424
0
        _state = term_req_sent1;
425
0
    }
426
    //  There are no other states.
427
0
    else {
428
0
        zmq_assert (false);
429
0
    }
430
431
    //  Stop outbound flow of messages.
432
0
    _out_active = false;
433
434
0
    if (_out_pipe) {
435
        //  Drop any unfinished outbound messages.
436
0
        rollback ();
437
438
        //  Write the delimiter into the pipe. Note that watermarks are not
439
        //  checked; thus the delimiter can be written even when the pipe is full.
440
0
        msg_t msg;
441
0
        msg.init_delimiter ();
442
0
        _out_pipe->write (msg, false);
443
0
        flush ();
444
0
    }
445
0
}
446
447
bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
448
0
{
449
0
    return msg_.is_delimiter ();
450
0
}
451
452
int zmq::pipe_t::compute_lwm (int hwm_)
453
0
{
454
    //  Compute the low water mark. Following point should be taken
455
    //  into consideration:
456
    //
457
    //  1. LWM has to be less than HWM.
458
    //  2. LWM cannot be set to very low value (such as zero) as after filling
459
    //     the queue it would start to refill only after all the messages are
460
    //     read from it and thus unnecessarily hold the progress back.
461
    //  3. LWM cannot be set to very high value (such as HWM-1) as it would
462
    //     result in lock-step filling of the queue - if a single message is
463
    //     read from a full queue, writer thread is resumed to write exactly one
464
    //     message to the queue and go back to sleep immediately. This would
465
    //     result in low performance.
466
    //
467
    //  Given the 3. it would be good to keep HWM and LWM as far apart as
468
    //  possible to reduce the thread switching overhead to almost zero.
469
    //  Let's make LWM 1/2 of HWM.
470
0
    const int result = (hwm_ + 1) / 2;
471
472
0
    return result;
473
0
}
474
475
void zmq::pipe_t::process_delimiter ()
476
0
{
477
0
    zmq_assert (_state == active || _state == waiting_for_delimiter);
478
479
0
    if (_state == active)
480
0
        _state = delimiter_received;
481
0
    else {
482
0
        rollback ();
483
0
        _out_pipe = NULL;
484
0
        send_pipe_term_ack (_peer);
485
0
        _state = term_ack_sent;
486
0
    }
487
0
}
488
489
void zmq::pipe_t::hiccup ()
490
0
{
491
    //  If termination is already under way do nothing.
492
0
    if (_state != active)
493
0
        return;
494
495
    //  We'll drop the pointer to the inpipe. From now on, the peer is
496
    //  responsible for deallocating it.
497
498
    //  Create new inpipe.
499
0
    _in_pipe =
500
0
      _conflate
501
0
        ? static_cast<upipe_t *> (new (std::nothrow) ypipe_conflate_t<msg_t> ())
502
0
        : new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> ();
503
504
0
    alloc_assert (_in_pipe);
505
0
    _in_active = true;
506
507
    //  Notify the peer about the hiccup.
508
0
    send_hiccup (_peer, _in_pipe);
509
0
}
510
511
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
512
0
{
513
0
    int in = inhwm_ + std::max (_in_hwm_boost, 0);
514
0
    int out = outhwm_ + std::max (_out_hwm_boost, 0);
515
516
    // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
517
0
    if (inhwm_ <= 0 || _in_hwm_boost == 0)
518
0
        in = 0;
519
520
0
    if (outhwm_ <= 0 || _out_hwm_boost == 0)
521
0
        out = 0;
522
523
0
    _lwm = compute_lwm (in);
524
0
    _hwm = out;
525
0
}
526
527
void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_)
528
0
{
529
0
    _in_hwm_boost = inhwmboost_;
530
0
    _out_hwm_boost = outhwmboost_;
531
0
}
532
533
bool zmq::pipe_t::check_hwm () const
534
0
{
535
0
    const bool full =
536
0
      _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm);
537
0
    return !full;
538
0
}
539
540
void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_)
541
0
{
542
0
    if (_state == active)
543
0
        send_pipe_hwm (_peer, inhwm_, outhwm_);
544
0
}
545
546
void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_)
547
0
{
548
0
    _endpoint_pair = ZMQ_MOVE (endpoint_pair_);
549
0
}
550
551
const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
552
0
{
553
0
    return _endpoint_pair;
554
0
}
555
556
void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
557
0
{
558
0
    if (_state == active) {
559
0
        endpoint_uri_pair_t *ep =
560
0
          new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
561
0
        send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read,
562
0
                              socket_base_, ep);
563
0
    }
564
0
}
565
566
void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
567
                                           own_t *socket_base_,
568
                                           endpoint_uri_pair_t *endpoint_pair_)
569
0
{
570
0
    send_pipe_stats_publish (socket_base_, queue_count_,
571
0
                             _msgs_written - _peers_msgs_read, endpoint_pair_);
572
0
}
573
574
void zmq::pipe_t::send_disconnect_msg ()
575
0
{
576
0
    if (_disconnect_msg.size () > 0 && _out_pipe) {
577
        // Rollback any incomplete message in the pipe, and push the disconnect message.
578
0
        rollback ();
579
580
0
        _out_pipe->write (_disconnect_msg, false);
581
0
        flush ();
582
0
        _disconnect_msg.init ();
583
0
    }
584
0
}
585
586
void zmq::pipe_t::set_disconnect_msg (
587
  const std::vector<unsigned char> &disconnect_)
588
0
{
589
0
    _disconnect_msg.close ();
590
0
    const int rc =
591
0
      _disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
592
0
    errno_assert (rc == 0);
593
0
}
594
595
void zmq::pipe_t::send_hiccup_msg (const std::vector<unsigned char> &hiccup_)
596
0
{
597
0
    if (!hiccup_.empty () && _out_pipe) {
598
0
        msg_t msg;
599
0
        const int rc = msg.init_buffer (&hiccup_[0], hiccup_.size ());
600
0
        errno_assert (rc == 0);
601
602
0
        _out_pipe->write (msg, false);
603
0
        flush ();
604
0
    }
605
0
}