Coverage Report

Created: 2025-10-10 07:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/channel.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "channel.hpp"
6
#include "err.hpp"
7
#include "pipe.hpp"
8
#include "msg.hpp"
9
10
//  Constructs a CHANNEL socket. The context, thread id and socket id are
//  forwarded to the socket_base_t constructor and the socket starts out
//  with no peer pipe attached.
zmq::channel_t::channel_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
    //  NOTE(review): the trailing 'true' is presumably the base class'
    //  thread-safe flag - confirm against socket_base_t.
    socket_base_t (parent_, tid_, sid_, true),
    _pipe (NULL)
{
    //  Advertise the socket type through the socket options.
    options.type = ZMQ_CHANNEL;
}
15
16
//  Destroys the socket. By the time the destructor runs the peer pipe must
//  already have been detached (see xpipe_terminated, which resets _pipe).
zmq::channel_t::~channel_t ()
{
    zmq_assert (!_pipe);
}
20
21
void zmq::channel_t::xattach_pipe (pipe_t *pipe_,
22
                                   bool subscribe_to_all_,
23
                                   bool locally_initiated_)
24
0
{
25
0
    LIBZMQ_UNUSED (subscribe_to_all_);
26
0
    LIBZMQ_UNUSED (locally_initiated_);
27
28
0
    zmq_assert (pipe_ != NULL);
29
30
    //  ZMQ_PAIR socket can only be connected to a single peer.
31
    //  The socket rejects any further connection requests.
32
0
    if (_pipe == NULL)
33
0
        _pipe = pipe_;
34
0
    else
35
0
        pipe_->terminate (false);
36
0
}
37
38
//  Notification that pipe_ has terminated. If it is the pipe currently
//  attached to this socket, the socket forgets it; notifications for any
//  other pipe are ignored.
void zmq::channel_t::xpipe_terminated (pipe_t *pipe_)
{
    if (pipe_ != _pipe)
        return;

    _pipe = NULL;
}
43
44
//  Notification that a pipe became readable. Deliberately a no-op: the
//  socket holds at most one pipe, so there are no active/inactive pipe
//  lists to maintain.
void zmq::channel_t::xread_activated (pipe_t *)
{
}
49
50
//  Notification that a pipe became writable. Deliberately a no-op: the
//  socket holds at most one pipe, so there are no active/inactive pipe
//  lists to maintain.
void zmq::channel_t::xwrite_activated (pipe_t *)
{
}
55
56
//  Sends a single-frame message to the attached peer.
//
//  Returns 0 on success, in which case msg_ has been re-initialised.
//  Returns -1 with errno set to:
//    EINVAL - msg_ carries the 'more' flag (multipart is not allowed);
//    EAGAIN - no pipe is attached or the pipe refused the write.
int zmq::channel_t::xsend (msg_t *msg_)
{
    //  CHANNEL sockets do not allow multipart data (ZMQ_SNDMORE).
    const bool is_multipart = (msg_->flags () & msg_t::more) != 0;
    if (is_multipart) {
        errno = EINVAL;
        return -1;
    }

    //  The write is only attempted when a pipe is attached.
    const bool written = _pipe != NULL && _pipe->write (msg_);
    if (!written) {
        errno = EAGAIN;
        return -1;
    }

    _pipe->flush ();

    //  Detach the original message from the data buffer.
    const int rc = msg_->init ();
    errno_assert (rc == 0);

    return 0;
}
77
78
//  Receives the next single-frame message from the attached peer.
//
//  Multi-frame messages are not supported by CHANNEL sockets: every frame
//  of such a message is silently discarded and the next message is tried.
//
//  Returns 0 on success with the received message stored in msg_.
//  Returns -1 with errno set to EAGAIN when no pipe is attached or no
//  message is available; msg_ is then initialised to an empty message.
int zmq::channel_t::xrecv (msg_t *msg_)
{
    //  Deallocate old content of the message.
    int rc = msg_->close ();
    errno_assert (rc == 0);

    if (!_pipe) {
        //  Initialise the output parameter to be a 0-byte message.
        rc = msg_->init ();
        errno_assert (rc == 0);

        errno = EAGAIN;
        return -1;
    }

    //  Drop any messages with more flag.
    bool read = _pipe->read (msg_);
    while (read && msg_->flags () & msg_t::more) {
        //  Drop all frames of the current multi-frame message. Each
        //  discarded frame is closed before the pipe reads over it, just
        //  like the caller's message is closed before the first read above;
        //  otherwise the content of the dropped frame is never released.
        do {
            rc = msg_->close ();
            errno_assert (rc == 0);
            read = _pipe->read (msg_);
        } while (read && msg_->flags () & msg_t::more);

        //  msg_ now holds the final frame of the dropped message: release
        //  it too and get the new message.
        if (read) {
            rc = msg_->close ();
            errno_assert (rc == 0);
            read = _pipe->read (msg_);
        }
    }

    if (!read) {
        //  Initialise the output parameter to be a 0-byte message. This
        //  also covers the case where msg_ was closed above and the
        //  subsequent read failed.
        rc = msg_->init ();
        errno_assert (rc == 0);

        errno = EAGAIN;
        return -1;
    }

    return 0;
}
117
118
bool zmq::channel_t::xhas_in ()
119
0
{
120
0
    if (!_pipe)
121
0
        return false;
122
123
0
    return _pipe->check_read ();
124
0
}
125
126
bool zmq::channel_t::xhas_out ()
127
0
{
128
0
    if (!_pipe)
129
0
        return false;
130
131
0
    return _pipe->check_write ();
132
0
}