/src/libzmq/src/dgram.cpp
Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | #include "dgram.hpp" |
6 | | #include "pipe.hpp" |
7 | | #include "wire.hpp" |
8 | | #include "random.hpp" |
9 | | #include "likely.hpp" |
10 | | #include "err.hpp" |
11 | | |
12 | | zmq::dgram_t::dgram_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
13 | 0 | socket_base_t (parent_, tid_, sid_), _pipe (NULL), _more_out (false) |
14 | 0 | { |
15 | 0 | options.type = ZMQ_DGRAM; |
16 | 0 | options.raw_socket = true; |
17 | 0 | } |
18 | | |
19 | | zmq::dgram_t::~dgram_t () |
20 | 0 | { |
21 | 0 | zmq_assert (!_pipe); |
22 | 0 | } |
23 | | |
24 | | void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, |
25 | | bool subscribe_to_all_, |
26 | | bool locally_initiated_) |
27 | 0 | { |
28 | 0 | LIBZMQ_UNUSED (subscribe_to_all_); |
29 | 0 | LIBZMQ_UNUSED (locally_initiated_); |
30 | |
|
31 | 0 | zmq_assert (pipe_); |
32 | | |
33 | | // ZMQ_DGRAM socket can only be connected to a single peer. |
34 | | // The socket rejects any further connection requests. |
35 | 0 | if (_pipe == NULL) |
36 | 0 | _pipe = pipe_; |
37 | 0 | else |
38 | 0 | pipe_->terminate (false); |
39 | 0 | } |
40 | | |
41 | | void zmq::dgram_t::xpipe_terminated (pipe_t *pipe_) |
42 | 0 | { |
43 | 0 | if (pipe_ == _pipe) { |
44 | 0 | _pipe = NULL; |
45 | 0 | } |
46 | 0 | } |
47 | | |
48 | | void zmq::dgram_t::xread_activated (pipe_t *) |
49 | 0 | { |
50 | | // There's just one pipe. No lists of active and inactive pipes. |
51 | | // There's nothing to do here. |
52 | 0 | } |
53 | | |
54 | | void zmq::dgram_t::xwrite_activated (pipe_t *) |
55 | 0 | { |
56 | | // There's just one pipe. No lists of active and inactive pipes. |
57 | | // There's nothing to do here. |
58 | 0 | } |
59 | | |
60 | | int zmq::dgram_t::xsend (msg_t *msg_) |
61 | 0 | { |
62 | | // If there's no out pipe, just drop it. |
63 | 0 | if (!_pipe) { |
64 | 0 | const int rc = msg_->close (); |
65 | 0 | errno_assert (rc == 0); |
66 | 0 | return -1; |
67 | 0 | } |
68 | | |
69 | | // If this is the first part of the message it's the ID of the |
70 | | // peer to send the message to. |
71 | 0 | if (!_more_out) { |
72 | 0 | if (!(msg_->flags () & msg_t::more)) { |
73 | 0 | errno = EINVAL; |
74 | 0 | return -1; |
75 | 0 | } |
76 | 0 | } else { |
77 | | // dgram messages are two part only, reject part if more is set |
78 | 0 | if (msg_->flags () & msg_t::more) { |
79 | 0 | errno = EINVAL; |
80 | 0 | return -1; |
81 | 0 | } |
82 | 0 | } |
83 | | |
84 | | // Push the message into the pipe. |
85 | 0 | if (!_pipe->write (msg_)) { |
86 | 0 | errno = EAGAIN; |
87 | 0 | return -1; |
88 | 0 | } |
89 | | |
90 | 0 | if (!(msg_->flags () & msg_t::more)) |
91 | 0 | _pipe->flush (); |
92 | | |
93 | | // flip the more flag |
94 | 0 | _more_out = !_more_out; |
95 | | |
96 | | // Detach the message from the data buffer. |
97 | 0 | const int rc = msg_->init (); |
98 | 0 | errno_assert (rc == 0); |
99 | |
|
100 | 0 | return 0; |
101 | 0 | } |
102 | | |
103 | | int zmq::dgram_t::xrecv (msg_t *msg_) |
104 | 0 | { |
105 | | // Deallocate old content of the message. |
106 | 0 | int rc = msg_->close (); |
107 | 0 | errno_assert (rc == 0); |
108 | |
|
109 | 0 | if (!_pipe || !_pipe->read (msg_)) { |
110 | | // Initialise the output parameter to be a 0-byte message. |
111 | 0 | rc = msg_->init (); |
112 | 0 | errno_assert (rc == 0); |
113 | |
|
114 | 0 | errno = EAGAIN; |
115 | 0 | return -1; |
116 | 0 | } |
117 | | |
118 | 0 | return 0; |
119 | 0 | } |
120 | | |
121 | | bool zmq::dgram_t::xhas_in () |
122 | 0 | { |
123 | 0 | if (!_pipe) |
124 | 0 | return false; |
125 | | |
126 | 0 | return _pipe->check_read (); |
127 | 0 | } |
128 | | |
129 | | bool zmq::dgram_t::xhas_out () |
130 | 0 | { |
131 | 0 | if (!_pipe) |
132 | 0 | return false; |
133 | | |
134 | 0 | return _pipe->check_write (); |
135 | 0 | } |