/src/libzmq/src/socket_base.hpp
Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ |
4 | | #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ |
5 | | |
6 | | #include <string> |
7 | | #include <map> |
8 | | #include <stdarg.h> |
9 | | |
10 | | #include "own.hpp" |
11 | | #include "array.hpp" |
12 | | #include "blob.hpp" |
13 | | #include "stdint.hpp" |
14 | | #include "poller.hpp" |
15 | | #include "i_poll_events.hpp" |
16 | | #include "i_mailbox.hpp" |
17 | | #include "clock.hpp" |
18 | | #include "pipe.hpp" |
19 | | #include "endpoint.hpp" |
20 | | |
// C-linkage declaration of zmq_free_event; its definition is not part of this header.
21 | | extern "C" { |
22 | | void zmq_free_event (void *data_, void *hint_); |
23 | | } |
24 | | |
25 | | namespace zmq |
26 | | { |
27 | | class ctx_t; |
28 | | class msg_t; |
29 | | class pipe_t; |
30 | | |
// Base class for sockets: declares the API-facing operations, the monitor event hooks and the x-prefixed virtual hooks that individual socket types define.
31 | | class socket_base_t : public own_t, |
32 | | public array_item_t<>, |
33 | | public i_poll_events, |
34 | | public i_pipe_events |
35 | | { |
36 | | friend class reaper_t; |
37 | | |
38 | | public: |
39 | | // Returns false if the object is not a socket. |
40 | | bool check_tag () const; |
41 | | |
42 | | // Returns whether the socket is thread-safe. |
43 | | bool is_thread_safe () const; |
44 | | |
45 | | // Creates a socket of the specified type. |
46 | | static socket_base_t * |
47 | | create (int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_); |
48 | | |
49 | | // Returns the mailbox associated with this socket. |
50 | | i_mailbox *get_mailbox () const; |
51 | | |
52 | | // Interrupts a blocking call if the socket is stuck in one. |
53 | | // This function can be called from a different thread! |
54 | | void stop (); |
55 | | |
56 | | // Interface for communication with the API layer. |
57 | | int setsockopt (int option_, const void *optval_, size_t optvallen_); |
58 | | int getsockopt (int option_, void *optval_, size_t *optvallen_); |
59 | | int bind (const char *endpoint_uri_); |
60 | | int connect (const char *endpoint_uri_); |
61 | | int term_endpoint (const char *endpoint_uri_); |
62 | | int send (zmq::msg_t *msg_, int flags_); |
63 | | int recv (zmq::msg_t *msg_, int flags_); |
64 | | void add_signaler (signaler_t *s_); |
65 | | void remove_signaler (signaler_t *s_); |
66 | | int close (); |
67 | | |
68 | | // These functions are used by the polling mechanism to determine |
69 | | // which events are to be reported from this socket. |
70 | | bool has_in (); |
71 | | bool has_out (); |
72 | | |
73 | | // Joining and leaving groups. |
74 | | int join (const char *group_); |
75 | | int leave (const char *group_); |
76 | | |
77 | | // The reaper thread uses this function to ask the socket to register |
78 | | // with the reaper's poller. |
79 | | void start_reaping (poller_t *poller_); |
80 | | |
81 | | // i_poll_events implementation. This interface is used when the socket |
82 | | // is handled by the poller in the reaper thread. |
83 | | void in_event () ZMQ_FINAL; |
84 | | void out_event () ZMQ_FINAL; |
85 | | void timer_event (int id_) ZMQ_FINAL; |
86 | | |
87 | | // i_pipe_events interface implementation. |
88 | | void read_activated (pipe_t *pipe_) ZMQ_FINAL; |
89 | | void write_activated (pipe_t *pipe_) ZMQ_FINAL; |
90 | | void hiccuped (pipe_t *pipe_) ZMQ_FINAL; |
91 | | void pipe_terminated (pipe_t *pipe_) ZMQ_FINAL; |
92 | | void lock (); |
93 | | void unlock (); |
94 | | |
95 | | int monitor (const char *endpoint_, |
96 | | uint64_t events_, |
97 | | int event_version_, |
98 | | int type_); |
99 | | |
100 | | void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_, |
101 | | zmq::fd_t fd_); |
102 | | void event_connect_delayed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
103 | | int err_); |
104 | | void event_connect_retried (const endpoint_uri_pair_t &endpoint_uri_pair_, |
105 | | int interval_); |
106 | | void event_listening (const endpoint_uri_pair_t &endpoint_uri_pair_, |
107 | | zmq::fd_t fd_); |
108 | | void event_bind_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
109 | | int err_); |
110 | | void event_accepted (const endpoint_uri_pair_t &endpoint_uri_pair_, |
111 | | zmq::fd_t fd_); |
112 | | void event_accept_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
113 | | int err_); |
114 | | void event_closed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
115 | | zmq::fd_t fd_); |
116 | | void event_close_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
117 | | int err_); |
118 | | void event_disconnected (const endpoint_uri_pair_t &endpoint_uri_pair_, |
119 | | zmq::fd_t fd_); |
120 | | void event_handshake_failed_no_detail ( |
121 | | const endpoint_uri_pair_t &endpoint_uri_pair_, int err_); |
122 | | void event_handshake_failed_protocol ( |
123 | | const endpoint_uri_pair_t &endpoint_uri_pair_, int err_); |
124 | | void |
125 | | event_handshake_failed_auth (const endpoint_uri_pair_t &endpoint_uri_pair_, |
126 | | int err_); |
127 | | void |
128 | | event_handshake_succeeded (const endpoint_uri_pair_t &endpoint_uri_pair_, |
129 | | int err_); |
130 | | |
131 | | // Query the state of a specific peer. The default implementation |
132 | | // always returns an ENOTSUP error. |
133 | | virtual int get_peer_state (const void *routing_id_, |
134 | | size_t routing_id_size_) const; |
135 | | |
136 | | // Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS |
137 | | // after gathering the data asynchronously. Requires event monitoring to |
138 | | // be enabled. |
139 | | int query_pipes_stats (); |
140 | | |
141 | | bool is_disconnected () const; |
142 | | |
143 | | protected: |
144 | | socket_base_t (zmq::ctx_t *parent_, |
145 | | uint32_t tid_, |
146 | | int sid_, |
147 | | bool thread_safe_ = false); |
148 | | ~socket_base_t () ZMQ_OVERRIDE; |
149 | | |
150 | | // Concrete algorithms for the x- methods are to be defined by |
151 | | // individual socket types. |
152 | | virtual void xattach_pipe (zmq::pipe_t *pipe_, |
153 | | bool subscribe_to_all_ = false, |
154 | | bool locally_initiated_ = false) = 0; |
155 | | |
156 | | // The default implementation assumes there are no specific socket |
157 | | // options for the particular socket type. If not so, override this |
158 | | // method. |
159 | | virtual int |
160 | | xsetsockopt (int option_, const void *optval_, size_t optvallen_); |
161 | | |
162 | | // The default implementation assumes there are no specific socket |
163 | | // options for the particular socket type. If not so, override this |
164 | | // method. |
165 | | virtual int xgetsockopt (int option_, void *optval_, size_t *optvallen_); |
166 | | |
167 | | // The default implementation assumes that send is not supported. |
168 | | virtual bool xhas_out (); |
169 | | virtual int xsend (zmq::msg_t *msg_); |
170 | | |
171 | | // The default implementation assumes that recv is not supported. |
172 | | virtual bool xhas_in (); |
173 | | virtual int xrecv (zmq::msg_t *msg_); |
174 | | |
175 | | // i_pipe_events will be forwarded to these functions. |
176 | | virtual void xread_activated (pipe_t *pipe_); |
177 | | virtual void xwrite_activated (pipe_t *pipe_); |
178 | | virtual void xhiccuped (pipe_t *pipe_); |
179 | | virtual void xpipe_terminated (pipe_t *pipe_) = 0; |
180 | | |
181 | | // The default implementation assumes that join and leave are not supported. |
182 | | virtual int xjoin (const char *group_); |
183 | | virtual int xleave (const char *group_); |
184 | | |
185 | | // Delay actual destruction of the socket. |
186 | | void process_destroy () ZMQ_FINAL; |
187 | | |
188 | | int connect_internal (const char *endpoint_uri_); |
189 | | |
190 | | // Mutex to synchronize access to the socket in thread-safe mode. |
191 | | mutex_t _sync; |
192 | | |
193 | | private: |
194 | | // Tests whether the event should be sent and then dispatches it. |
195 | | void event (const endpoint_uri_pair_t &endpoint_uri_pair_, |
196 | | uint64_t values_[], |
197 | | uint64_t values_count_, |
198 | | uint64_t type_); |
199 | | |
200 | | // Socket event data dispatch. |
201 | | void monitor_event (uint64_t event_, |
202 | | const uint64_t values_[], |
203 | | uint64_t values_count_, |
204 | | const endpoint_uri_pair_t &endpoint_uri_pair_) const; |
205 | | |
206 | | // Monitor socket cleanup. |
207 | | void stop_monitor (bool send_monitor_stopped_event_ = true); |
208 | | |
209 | | // Creates new endpoint ID and adds the endpoint to the map. |
210 | | void add_endpoint (const endpoint_uri_pair_t &endpoint_pair_, |
211 | | own_t *endpoint_, |
212 | | pipe_t *pipe_); |
213 | | |
214 | | // Map of open endpoints. |
215 | | typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t; |
216 | | typedef std::multimap<std::string, endpoint_pipe_t> endpoints_t; |
217 | | endpoints_t _endpoints; |
218 | | |
219 | | // Map of open inproc endpoints. |
220 | | class inprocs_t |
221 | | { |
222 | | public: |
223 | | void emplace (const char *endpoint_uri_, pipe_t *pipe_); |
224 | | int erase_pipes (const std::string &endpoint_uri_str_); |
225 | | void erase_pipe (const pipe_t *pipe_); |
226 | | |
227 | | private: |
228 | | typedef std::multimap<std::string, pipe_t *> map_t; |
229 | | map_t _inprocs; |
230 | | }; |
231 | | inprocs_t _inprocs; |
232 | | |
233 | | // To be called after processing commands or invoking any command |
234 | | // handlers explicitly. If required, it will deallocate the socket. |
235 | | void check_destroy (); |
236 | | |
237 | | // Moves the flags from the message to local variables, |
238 | | // to be later retrieved by getsockopt. |
239 | | void extract_flags (const msg_t *msg_); |
240 | | |
241 | | // Used to check whether the object is a socket. |
242 | | uint32_t _tag; |
243 | | |
244 | | // If true, associated context was already terminated. |
245 | | bool _ctx_terminated; |
246 | | |
247 | | // If true, object should have been already destroyed. However, |
248 | | // destruction is delayed while we unwind the stack to the point |
249 | | // where it doesn't intersect the object being destroyed. |
250 | | bool _destroyed; |
251 | | |
252 | | // Parse URI string. |
253 | | static int |
254 | | parse_uri (const char *uri_, std::string &protocol_, std::string &path_); |
255 | | |
256 | | // Check whether transport protocol, as specified in connect or |
257 | | // bind, is available and compatible with the socket type. |
258 | | int check_protocol (const std::string &protocol_) const; |
259 | | |
260 | | // Register the pipe with this socket. |
261 | | void attach_pipe (zmq::pipe_t *pipe_, |
262 | | bool subscribe_to_all_ = false, |
263 | | bool locally_initiated_ = false); |
264 | | |
265 | | // Processes commands sent to this socket (if any). If timeout is -1, |
266 | | // returns only after at least one command was processed. |
267 | | // If throttle argument is true, commands are processed at most once |
268 | | // in a predefined time period. |
269 | | int process_commands (int timeout_, bool throttle_); |
270 | | |
271 | | // Handlers for incoming commands. |
272 | | void process_stop () ZMQ_FINAL; |
273 | | void process_bind (zmq::pipe_t *pipe_) ZMQ_FINAL; |
274 | | void |
275 | | process_pipe_stats_publish (uint64_t outbound_queue_count_, |
276 | | uint64_t inbound_queue_count_, |
277 | | endpoint_uri_pair_t *endpoint_pair_) ZMQ_FINAL; |
278 | | void process_term (int linger_) ZMQ_FINAL; |
279 | | void process_term_endpoint (std::string *endpoint_) ZMQ_FINAL; |
280 | | |
281 | | void update_pipe_options (int option_); |
282 | | |
283 | | std::string resolve_tcp_addr (std::string endpoint_uri_, |
284 | | const char *tcp_address_); |
285 | | |
286 | | // Socket's mailbox object. |
287 | | i_mailbox *_mailbox; |
288 | | |
289 | | // List of attached pipes. |
290 | | typedef array_t<pipe_t, 3> pipes_t; |
291 | | pipes_t _pipes; |
292 | | |
293 | | // Reaper's poller and handle of this socket within it. |
294 | | poller_t *_poller; |
295 | | poller_t::handle_t _handle; |
296 | | |
297 | | // Timestamp of when commands were processed the last time. |
298 | | uint64_t _last_tsc; |
299 | | |
300 | | // Number of messages received since last command processing. |
301 | | int _ticks; |
302 | | |
303 | | // True if the last message received had MORE flag set. |
304 | | bool _rcvmore; |
305 | | |
306 | | // Improves efficiency of time measurement. |
307 | | clock_t _clock; |
308 | | |
309 | | // Monitor socket. |
310 | | void *_monitor_socket; |
311 | | |
312 | | // Bitmask of events being monitored. NOTE(review): signed int64_t here, while monitor () takes uint64_t events_ - confirm the signedness mismatch is intended. |
313 | | int64_t _monitor_events; |
314 | | |
315 | | // Last socket endpoint resolved URI. |
316 | | std::string _last_endpoint; |
317 | | |
318 | | // Indicates whether the socket is thread-safe. |
319 | | const bool _thread_safe; |
320 | | |
321 | | // Signaler to be used in the reaping stage. |
322 | | signaler_t *_reaper_signaler; |
323 | | |
324 | | // Mutex to synchronize access to the monitor Pair socket. |
325 | | mutex_t _monitor_sync; |
326 | | |
327 | | ZMQ_NON_COPYABLE_NOR_MOVABLE (socket_base_t) |
328 | | |
329 | | // Flag that marks a disconnect action (presumably reported by is_disconnected () - TODO confirm). NOTE(review): declared after ZMQ_NON_COPYABLE_NOR_MOVABLE; if that macro ends in an access specifier, this member's access level follows it - confirm it is still private. |
330 | | bool _disconnected; |
331 | | }; |
332 | | |
// Extension of socket_base_t that keeps outbound pipes indexed by peer routing ID, plus the routing ID to assign on the next connect.
333 | | class routing_socket_base_t : public socket_base_t |
334 | | { |
335 | | protected: |
336 | | routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_); |
337 | | ~routing_socket_base_t () ZMQ_OVERRIDE; |
338 | | |
339 | | // Virtual methods of socket_base_t that this class overrides. |
340 | | int xsetsockopt (int option_, |
341 | | const void *optval_, |
342 | | size_t optvallen_) ZMQ_OVERRIDE; |
343 | | void xwrite_activated (pipe_t *pipe_) ZMQ_FINAL; |
344 | | |
345 | | // Methods introduced by this class. |
346 | | std::string extract_connect_routing_id (); |
347 | | bool connect_routing_id_is_set () const; |
348 | | |
349 | | struct out_pipe_t |
350 | | { |
351 | | pipe_t *pipe; |
352 | | bool active; |
353 | | }; |
354 | | |
355 | | void add_out_pipe (blob_t routing_id_, pipe_t *pipe_); |
356 | | bool has_out_pipe (const blob_t &routing_id_) const; |
357 | | out_pipe_t *lookup_out_pipe (const blob_t &routing_id_); |
358 | | const out_pipe_t *lookup_out_pipe (const blob_t &routing_id_) const; |
359 | | void erase_out_pipe (const pipe_t *pipe_); |
360 | | out_pipe_t try_erase_out_pipe (const blob_t &routing_id_); |
// Calls func_ on each outbound pipe in map order and stops at the first call that yields true; returns whether any call did (false when the map is empty).
361 | | template <typename Func> bool any_of_out_pipes (Func func_) |
362 | 0 | { |
363 | 0 | bool res = false; |
364 | 0 | for (out_pipes_t::iterator it = _out_pipes.begin (), |
365 | 0 | end = _out_pipes.end (); |
366 | 0 | it != end && !res; ++it) { |
367 | 0 | res |= func_ (*it->second.pipe); |
368 | 0 | } |
369 |

370 | 0 | return res; |
371 | 0 | } |
372 | | |
373 | | private: |
374 | | // Outbound pipes indexed by the peer IDs. |
375 | | typedef std::map<blob_t, out_pipe_t> out_pipes_t; |
376 | | out_pipes_t _out_pipes; |
377 | | |
378 | | // Next assigned name on a zmq_connect() call; used by the ROUTER and STREAM socket types. |
379 | | std::string _connect_routing_id; |
380 | | }; |
381 | | } |
382 | | |
383 | | #endif |