Coverage Report

Created: 2025-10-13 07:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/socket_base.hpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
4
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
5
6
#include <string>
7
#include <map>
8
#include <stdarg.h>
9
10
#include "own.hpp"
11
#include "array.hpp"
12
#include "blob.hpp"
13
#include "stdint.hpp"
14
#include "poller.hpp"
15
#include "i_poll_events.hpp"
16
#include "i_mailbox.hpp"
17
#include "clock.hpp"
18
#include "pipe.hpp"
19
#include "endpoint.hpp"
20
21
extern "C" {
22
void zmq_free_event (void *data_, void *hint_);
23
}
24
25
namespace zmq
26
{
27
class ctx_t;
28
class msg_t;
29
class pipe_t;
30
31
class socket_base_t : public own_t,
32
                      public array_item_t<>,
33
                      public i_poll_events,
34
                      public i_pipe_events
35
{
36
    friend class reaper_t;
37
38
  public:
39
    //  Returns false if object is not a socket.
40
    bool check_tag () const;
41
42
    //  Returns whether the socket is thread-safe.
43
    bool is_thread_safe () const;
44
45
    //  Create a socket of a specified type.
46
    static socket_base_t *
47
    create (int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_);
48
49
    //  Returns the mailbox associated with this socket.
50
    i_mailbox *get_mailbox () const;
51
52
    //  Interrupt blocking call if the socket is stuck in one.
53
    //  This function can be called from a different thread!
54
    void stop ();
55
56
    //  Interface for communication with the API layer.
57
    int setsockopt (int option_, const void *optval_, size_t optvallen_);
58
    int getsockopt (int option_, void *optval_, size_t *optvallen_);
59
    int bind (const char *endpoint_uri_);
60
    int connect (const char *endpoint_uri_);
61
    int term_endpoint (const char *endpoint_uri_);
62
    int send (zmq::msg_t *msg_, int flags_);
63
    int recv (zmq::msg_t *msg_, int flags_);
64
    void add_signaler (signaler_t *s_);
65
    void remove_signaler (signaler_t *s_);
66
    int close ();
67
68
    //  These functions are used by the polling mechanism to determine
69
    //  which events are to be reported from this socket.
70
    bool has_in ();
71
    bool has_out ();
72
73
    //  Joining and leaving groups
74
    int join (const char *group_);
75
    int leave (const char *group_);
76
77
    //  Using this function the reaper thread asks the socket to register with
78
    //  its poller.
79
    void start_reaping (poller_t *poller_);
80
81
    //  i_poll_events implementation. This interface is used when socket
82
    //  is handled by the poller in the reaper thread.
83
    void in_event () ZMQ_FINAL;
84
    void out_event () ZMQ_FINAL;
85
    void timer_event (int id_) ZMQ_FINAL;
86
87
    //  i_pipe_events interface implementation.
88
    void read_activated (pipe_t *pipe_) ZMQ_FINAL;
89
    void write_activated (pipe_t *pipe_) ZMQ_FINAL;
90
    void hiccuped (pipe_t *pipe_) ZMQ_FINAL;
91
    void pipe_terminated (pipe_t *pipe_) ZMQ_FINAL;
92
    void lock ();
93
    void unlock ();
94
95
    int monitor (const char *endpoint_,
96
                 uint64_t events_,
97
                 int event_version_,
98
                 int type_);
99
100
    void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_,
101
                          zmq::fd_t fd_);
102
    void event_connect_delayed (const endpoint_uri_pair_t &endpoint_uri_pair_,
103
                                int err_);
104
    void event_connect_retried (const endpoint_uri_pair_t &endpoint_uri_pair_,
105
                                int interval_);
106
    void event_listening (const endpoint_uri_pair_t &endpoint_uri_pair_,
107
                          zmq::fd_t fd_);
108
    void event_bind_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
109
                            int err_);
110
    void event_accepted (const endpoint_uri_pair_t &endpoint_uri_pair_,
111
                         zmq::fd_t fd_);
112
    void event_accept_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
113
                              int err_);
114
    void event_closed (const endpoint_uri_pair_t &endpoint_uri_pair_,
115
                       zmq::fd_t fd_);
116
    void event_close_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
117
                             int err_);
118
    void event_disconnected (const endpoint_uri_pair_t &endpoint_uri_pair_,
119
                             zmq::fd_t fd_);
120
    void event_handshake_failed_no_detail (
121
      const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
122
    void event_handshake_failed_protocol (
123
      const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
124
    void
125
    event_handshake_failed_auth (const endpoint_uri_pair_t &endpoint_uri_pair_,
126
                                 int err_);
127
    void
128
    event_handshake_succeeded (const endpoint_uri_pair_t &endpoint_uri_pair_,
129
                               int err_);
130
131
    //  Query the state of a specific peer. The default implementation
132
    //  always returns an ENOTSUP error.
133
    virtual int get_peer_state (const void *routing_id_,
134
                                size_t routing_id_size_) const;
135
136
    //  Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS
137
    //  after gathering the data asynchronously. Requires event monitoring to
138
    //  be enabled.
139
    int query_pipes_stats ();
140
141
    bool is_disconnected () const;
142
143
  protected:
144
    socket_base_t (zmq::ctx_t *parent_,
145
                   uint32_t tid_,
146
                   int sid_,
147
                   bool thread_safe_ = false);
148
    ~socket_base_t () ZMQ_OVERRIDE;
149
150
    //  Concrete algorithms for the x- methods are to be defined by
151
    //  individual socket types.
152
    virtual void xattach_pipe (zmq::pipe_t *pipe_,
153
                               bool subscribe_to_all_ = false,
154
                               bool locally_initiated_ = false) = 0;
155
156
    //  The default implementation assumes there are no specific socket
157
    //  options for the particular socket type. If not so, override this
158
    //  method.
159
    virtual int
160
    xsetsockopt (int option_, const void *optval_, size_t optvallen_);
161
162
    //  The default implementation assumes there are no specific socket
163
    //  options for the particular socket type. If not so, override this
164
    //  method.
165
    virtual int xgetsockopt (int option_, void *optval_, size_t *optvallen_);
166
167
    //  The default implementation assumes that send is not supported.
168
    virtual bool xhas_out ();
169
    virtual int xsend (zmq::msg_t *msg_);
170
171
    //  The default implementation assumes that recv is not supported.
172
    virtual bool xhas_in ();
173
    virtual int xrecv (zmq::msg_t *msg_);
174
175
    //  i_pipe_events will be forwarded to these functions.
176
    virtual void xread_activated (pipe_t *pipe_);
177
    virtual void xwrite_activated (pipe_t *pipe_);
178
    virtual void xhiccuped (pipe_t *pipe_);
179
    virtual void xpipe_terminated (pipe_t *pipe_) = 0;
180
181
    //  The default implementation assumes that join and leave are not supported.
182
    virtual int xjoin (const char *group_);
183
    virtual int xleave (const char *group_);
184
185
    //  Delay actual destruction of the socket.
186
    void process_destroy () ZMQ_FINAL;
187
188
    int connect_internal (const char *endpoint_uri_);
189
190
    // Mutex to synchronize access to the socket in thread-safe mode
191
    mutex_t _sync;
192
193
  private:
194
    // test if event should be sent and then dispatch it
195
    void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
196
                uint64_t values_[],
197
                uint64_t values_count_,
198
                uint64_t type_);
199
200
    // Socket event data dispatch
201
    void monitor_event (uint64_t event_,
202
                        const uint64_t values_[],
203
                        uint64_t values_count_,
204
                        const endpoint_uri_pair_t &endpoint_uri_pair_) const;
205
206
    // Monitor socket cleanup
207
    void stop_monitor (bool send_monitor_stopped_event_ = true);
208
209
    //  Creates new endpoint ID and adds the endpoint to the map.
210
    void add_endpoint (const endpoint_uri_pair_t &endpoint_pair_,
211
                       own_t *endpoint_,
212
                       pipe_t *pipe_);
213
214
    //  Map of open endpoints.
215
    typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t;
216
    typedef std::multimap<std::string, endpoint_pipe_t> endpoints_t;
217
    endpoints_t _endpoints;
218
219
    //  Map of open inproc endpoints.
220
    class inprocs_t
221
    {
222
      public:
223
        void emplace (const char *endpoint_uri_, pipe_t *pipe_);
224
        int erase_pipes (const std::string &endpoint_uri_str_);
225
        void erase_pipe (const pipe_t *pipe_);
226
227
      private:
228
        typedef std::multimap<std::string, pipe_t *> map_t;
229
        map_t _inprocs;
230
    };
231
    inprocs_t _inprocs;
232
233
    //  To be called after processing commands or invoking any command
234
    //  handlers explicitly. If required, it will deallocate the socket.
235
    void check_destroy ();
236
237
    //  Moves the flags from the message to local variables,
238
    //  to be later retrieved by getsockopt.
239
    void extract_flags (const msg_t *msg_);
240
241
    //  Used to check whether the object is a socket.
242
    uint32_t _tag;
243
244
    //  If true, associated context was already terminated.
245
    bool _ctx_terminated;
246
247
    //  If true, object should have been already destroyed. However,
248
    //  destruction is delayed while we unwind the stack to the point
249
    //  where it doesn't intersect the object being destroyed.
250
    bool _destroyed;
251
252
    //  Parse URI string.
253
    static int
254
    parse_uri (const char *uri_, std::string &protocol_, std::string &path_);
255
256
    //  Check whether transport protocol, as specified in connect or
257
    //  bind, is available and compatible with the socket type.
258
    int check_protocol (const std::string &protocol_) const;
259
260
    //  Register the pipe with this socket.
261
    void attach_pipe (zmq::pipe_t *pipe_,
262
                      bool subscribe_to_all_ = false,
263
                      bool locally_initiated_ = false);
264
265
    //  Processes commands sent to this socket (if any). If timeout is -1,
266
    //  returns only after at least one command was processed.
267
    //  If throttle argument is true, commands are processed at most once
268
    //  in a predefined time period.
269
    int process_commands (int timeout_, bool throttle_);
270
271
    //  Handlers for incoming commands.
272
    void process_stop () ZMQ_FINAL;
273
    void process_bind (zmq::pipe_t *pipe_) ZMQ_FINAL;
274
    void
275
    process_pipe_stats_publish (uint64_t outbound_queue_count_,
276
                                uint64_t inbound_queue_count_,
277
                                endpoint_uri_pair_t *endpoint_pair_) ZMQ_FINAL;
278
    void process_term (int linger_) ZMQ_FINAL;
279
    void process_term_endpoint (std::string *endpoint_) ZMQ_FINAL;
280
281
    void update_pipe_options (int option_);
282
283
    std::string resolve_tcp_addr (std::string endpoint_uri_,
284
                                  const char *tcp_address_);
285
286
    //  Socket's mailbox object.
287
    i_mailbox *_mailbox;
288
289
    //  List of attached pipes.
290
    typedef array_t<pipe_t, 3> pipes_t;
291
    pipes_t _pipes;
292
293
    //  Reaper's poller and handle of this socket within it.
294
    poller_t *_poller;
295
    poller_t::handle_t _handle;
296
297
    //  Timestamp of when commands were processed the last time.
298
    uint64_t _last_tsc;
299
300
    //  Number of messages received since last command processing.
301
    int _ticks;
302
303
    //  True if the last message received had MORE flag set.
304
    bool _rcvmore;
305
306
    //  Improves efficiency of time measurement.
307
    clock_t _clock;
308
309
    // Monitor socket
310
    void *_monitor_socket;
311
312
    // Bitmask of events being monitored
313
    int64_t _monitor_events;
314
315
    // Last socket endpoint resolved URI
316
    std::string _last_endpoint;
317
318
    // Indicate if the socket is thread safe
319
    const bool _thread_safe;
320
321
    // Signaler to be used in the reaping stage
322
    signaler_t *_reaper_signaler;
323
324
    // Mutex to synchronize access to the monitor Pair socket
325
    mutex_t _monitor_sync;
326
327
    ZMQ_NON_COPYABLE_NOR_MOVABLE (socket_base_t)
328
329
    // Flag marking a disconnect action
330
    bool _disconnected;
331
};
332
333
class routing_socket_base_t : public socket_base_t
334
{
335
  protected:
336
    routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_);
337
    ~routing_socket_base_t () ZMQ_OVERRIDE;
338
339
    // methods from socket_base_t
340
    int xsetsockopt (int option_,
341
                     const void *optval_,
342
                     size_t optvallen_) ZMQ_OVERRIDE;
343
    void xwrite_activated (pipe_t *pipe_) ZMQ_FINAL;
344
345
    // own methods
346
    std::string extract_connect_routing_id ();
347
    bool connect_routing_id_is_set () const;
348
349
    struct out_pipe_t
350
    {
351
        pipe_t *pipe;
352
        bool active;
353
    };
354
355
    void add_out_pipe (blob_t routing_id_, pipe_t *pipe_);
356
    bool has_out_pipe (const blob_t &routing_id_) const;
357
    out_pipe_t *lookup_out_pipe (const blob_t &routing_id_);
358
    const out_pipe_t *lookup_out_pipe (const blob_t &routing_id_) const;
359
    void erase_out_pipe (const pipe_t *pipe_);
360
    out_pipe_t try_erase_out_pipe (const blob_t &routing_id_);
361
    template <typename Func> bool any_of_out_pipes (Func func_)
362
0
    {
363
0
        bool res = false;
364
0
        for (out_pipes_t::iterator it = _out_pipes.begin (),
365
0
                                   end = _out_pipes.end ();
366
0
             it != end && !res; ++it) {
367
0
            res |= func_ (*it->second.pipe);
368
0
        }
369
370
0
        return res;
371
0
    }
372
373
  private:
374
    //  Outbound pipes indexed by the peer IDs.
375
    typedef std::map<blob_t, out_pipe_t> out_pipes_t;
376
    out_pipes_t _out_pipes;
377
378
    // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
379
    std::string _connect_routing_id;
380
};
381
}
382
383
#endif