/src/libzmq/src/server.cpp
Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | #include "server.hpp" |
6 | | #include "pipe.hpp" |
7 | | #include "wire.hpp" |
8 | | #include "random.hpp" |
9 | | #include "likely.hpp" |
10 | | #include "err.hpp" |
11 | | |
12 | | zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
13 | 0 | socket_base_t (parent_, tid_, sid_, true), |
14 | 0 | _next_routing_id (generate_random ()) |
15 | 0 | { |
16 | 0 | options.type = ZMQ_SERVER; |
17 | 0 | options.can_send_hello_msg = true; |
18 | 0 | options.can_recv_disconnect_msg = true; |
19 | 0 | } |
20 | | |
21 | | zmq::server_t::~server_t () |
22 | 0 | { |
23 | 0 | zmq_assert (_out_pipes.empty ()); |
24 | 0 | } |
25 | | |
26 | | void zmq::server_t::xattach_pipe (pipe_t *pipe_, |
27 | | bool subscribe_to_all_, |
28 | | bool locally_initiated_) |
29 | 0 | { |
30 | 0 | LIBZMQ_UNUSED (subscribe_to_all_); |
31 | 0 | LIBZMQ_UNUSED (locally_initiated_); |
32 | |
|
33 | 0 | zmq_assert (pipe_); |
34 | |
|
35 | 0 | uint32_t routing_id = _next_routing_id++; |
36 | 0 | if (!routing_id) |
37 | 0 | routing_id = _next_routing_id++; // Never use Routing ID zero |
38 | |
|
39 | 0 | pipe_->set_server_socket_routing_id (routing_id); |
40 | | // Add the record into output pipes lookup table |
41 | 0 | outpipe_t outpipe = {pipe_, true}; |
42 | 0 | const bool ok = |
43 | 0 | _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second; |
44 | 0 | zmq_assert (ok); |
45 | |
|
46 | 0 | _fq.attach (pipe_); |
47 | 0 | } |
48 | | |
49 | | void zmq::server_t::xpipe_terminated (pipe_t *pipe_) |
50 | 0 | { |
51 | 0 | const out_pipes_t::iterator it = |
52 | 0 | _out_pipes.find (pipe_->get_server_socket_routing_id ()); |
53 | 0 | zmq_assert (it != _out_pipes.end ()); |
54 | 0 | _out_pipes.erase (it); |
55 | 0 | _fq.pipe_terminated (pipe_); |
56 | 0 | } |
57 | | |
58 | | void zmq::server_t::xread_activated (pipe_t *pipe_) |
59 | 0 | { |
60 | 0 | _fq.activated (pipe_); |
61 | 0 | } |
62 | | |
63 | | void zmq::server_t::xwrite_activated (pipe_t *pipe_) |
64 | 0 | { |
65 | 0 | const out_pipes_t::iterator end = _out_pipes.end (); |
66 | 0 | out_pipes_t::iterator it; |
67 | 0 | for (it = _out_pipes.begin (); it != end; ++it) |
68 | 0 | if (it->second.pipe == pipe_) |
69 | 0 | break; |
70 | |
|
71 | 0 | zmq_assert (it != _out_pipes.end ()); |
72 | 0 | zmq_assert (!it->second.active); |
73 | 0 | it->second.active = true; |
74 | 0 | } |
75 | | |
76 | | int zmq::server_t::xsend (msg_t *msg_) |
77 | 0 | { |
78 | | // SERVER sockets do not allow multipart data (ZMQ_SNDMORE) |
79 | 0 | if (msg_->flags () & msg_t::more) { |
80 | 0 | errno = EINVAL; |
81 | 0 | return -1; |
82 | 0 | } |
83 | | // Find the pipe associated with the routing stored in the message. |
84 | 0 | const uint32_t routing_id = msg_->get_routing_id (); |
85 | 0 | out_pipes_t::iterator it = _out_pipes.find (routing_id); |
86 | |
|
87 | 0 | if (it != _out_pipes.end ()) { |
88 | 0 | if (!it->second.pipe->check_write ()) { |
89 | 0 | it->second.active = false; |
90 | 0 | errno = EAGAIN; |
91 | 0 | return -1; |
92 | 0 | } |
93 | 0 | } else { |
94 | 0 | errno = EHOSTUNREACH; |
95 | 0 | return -1; |
96 | 0 | } |
97 | | |
98 | | // Message might be delivered over inproc, so we reset routing id |
99 | 0 | int rc = msg_->reset_routing_id (); |
100 | 0 | errno_assert (rc == 0); |
101 | |
|
102 | 0 | const bool ok = it->second.pipe->write (msg_); |
103 | 0 | if (unlikely (!ok)) { |
104 | | // Message failed to send - we must close it ourselves. |
105 | 0 | rc = msg_->close (); |
106 | 0 | errno_assert (rc == 0); |
107 | 0 | } else |
108 | 0 | it->second.pipe->flush (); |
109 | | |
110 | | // Detach the message from the data buffer. |
111 | 0 | rc = msg_->init (); |
112 | 0 | errno_assert (rc == 0); |
113 | |
|
114 | 0 | return 0; |
115 | 0 | } |
116 | | |
117 | | int zmq::server_t::xrecv (msg_t *msg_) |
118 | 0 | { |
119 | 0 | pipe_t *pipe = NULL; |
120 | 0 | int rc = _fq.recvpipe (msg_, &pipe); |
121 | | |
122 | | // Drop any messages with more flag |
123 | 0 | while (rc == 0 && msg_->flags () & msg_t::more) { |
124 | | // drop all frames of the current multi-frame message |
125 | 0 | rc = _fq.recvpipe (msg_, NULL); |
126 | |
|
127 | 0 | while (rc == 0 && msg_->flags () & msg_t::more) |
128 | 0 | rc = _fq.recvpipe (msg_, NULL); |
129 | | |
130 | | // get the new message |
131 | 0 | if (rc == 0) |
132 | 0 | rc = _fq.recvpipe (msg_, &pipe); |
133 | 0 | } |
134 | |
|
135 | 0 | if (rc != 0) |
136 | 0 | return rc; |
137 | | |
138 | 0 | zmq_assert (pipe != NULL); |
139 | |
|
140 | 0 | const uint32_t routing_id = pipe->get_server_socket_routing_id (); |
141 | 0 | msg_->set_routing_id (routing_id); |
142 | |
|
143 | 0 | return 0; |
144 | 0 | } |
145 | | |
146 | | bool zmq::server_t::xhas_in () |
147 | 0 | { |
148 | 0 | return _fq.has_in (); |
149 | 0 | } |
150 | | |
151 | | bool zmq::server_t::xhas_out () |
152 | 0 | { |
153 | | // In theory, SERVER socket is always ready for writing. Whether actual |
154 | | // attempt to write succeeds depends on which pipe the message is going |
155 | | // to be routed to. |
156 | 0 | return true; |
157 | 0 | } |