Coverage Report

Created: 2025-07-18 07:00

/src/libzmq/src/msg.hpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#ifndef __ZMQ_MSG_HPP_INCLUDE__
4
#define __ZMQ_MSG_HPP_INCLUDE__
5
6
#include <stddef.h>
7
#include <stdio.h>
8
9
#include "config.hpp"
10
#include "err.hpp"
11
#include "fd.hpp"
12
#include "atomic_counter.hpp"
13
#include "metadata.hpp"
14
15
//  Command-type field of the flags byte: bits 2-4 (0x04 | 0x08 | 0x10)
16
0
#define CMD_TYPE_MASK 0x1c
17
18
//  Signature for free function to deallocate the message content.
19
//  Note that it has to be declared as "C" so that it is the same as
20
//  zmq_free_fn defined in zmq.h.
21
extern "C" {
22
//  This names a function type, not a pointer: hold callbacks as
//  'msg_free_fn *'.
typedef void msg_free_fn (void *data_, void *hint_);
23
}
24
25
namespace zmq
26
{
27
//  Note that the msg_t structure declared below needs to be explicitly constructed
28
//  (init functions) and destructed (close function).
29
30
//  Length-prefixed command name: one byte holding 6, then "CANCEL".
//  The prefix is written as its own literal so the escape stands apart
//  from the text that follows it.
static const char cancel_cmd_name[] = "\006" "CANCEL";
31
//  Length-prefixed command name: one byte holding 9, then "SUBSCRIBE".
//  The prefix is written as its own literal so the hex escape cannot
//  absorb any character of the text that follows it.
static const char sub_cmd_name[] = "\x09" "SUBSCRIBE";
32
33
class msg_t
34
{
35
  public:
36
    //  Shared message buffer. Message data are either allocated in one
37
    //  continuous block along with this structure - thus avoiding one
38
    //  malloc/free pair or they are stored in user-supplied memory.
39
    //  In the latter case, ffn member stores pointer to the function to be
40
    //  used to deallocate the data. If the buffer is actually shared (there
41
    //  are at least 2 references to it) refcount member contains number of
42
    //  references.
43
    struct content_t
44
    {
45
        void *data;
46
        size_t size;
47
        msg_free_fn *ffn;
48
        void *hint;
49
        zmq::atomic_counter_t refcnt;
50
    };
51
52
    //  Message flags.
53
    enum
54
    {
55
        more = 1,    //  Followed by more parts
56
        command = 2, //  Command frame (see ZMTP spec)
57
        //  Command types, use only bits 2-5 and compare with ==, not bitwise,
58
        //  a command can never be of more that one type at the same time
59
        ping = 4,
60
        pong = 8,
61
        subscribe = 12,
62
        cancel = 16,
63
        close_cmd = 20,
64
        credential = 32,
65
        routing_id = 64,
66
        shared = 128
67
    };
68
69
    bool check () const;
70
    int init ();
71
72
    int init (void *data_,
73
              size_t size_,
74
              msg_free_fn *ffn_,
75
              void *hint_,
76
              content_t *content_ = NULL);
77
78
    int init_size (size_t size_);
79
    int init_buffer (const void *buf_, size_t size_);
80
    int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_);
81
    int init_external_storage (content_t *content_,
82
                               void *data_,
83
                               size_t size_,
84
                               msg_free_fn *ffn_,
85
                               void *hint_);
86
    int init_delimiter ();
87
    int init_join ();
88
    int init_leave ();
89
    int init_subscribe (const size_t size_, const unsigned char *topic);
90
    int init_cancel (const size_t size_, const unsigned char *topic);
91
    int close ();
92
    int move (msg_t &src_);
93
    int copy (msg_t &src_);
94
    void *data ();
95
    size_t size () const;
96
    unsigned char flags () const;
97
    void set_flags (unsigned char flags_);
98
    void reset_flags (unsigned char flags_);
99
    metadata_t *metadata () const;
100
    void set_metadata (metadata_t *metadata_);
101
    void reset_metadata ();
102
    bool is_routing_id () const;
103
    bool is_credential () const;
104
    bool is_delimiter () const;
105
    bool is_join () const;
106
    bool is_leave () const;
107
    bool is_ping () const;
108
    bool is_pong () const;
109
    bool is_close_cmd () const;
110
111
    //  These are called on each message received by the session_base class,
112
    //  so get them inlined to avoid the overhead of 2 function calls per msg
113
    bool is_subscribe () const
114
0
    {
115
0
        return (_u.base.flags & CMD_TYPE_MASK) == subscribe;
116
0
    }
117
118
    bool is_cancel () const
119
0
    {
120
0
        return (_u.base.flags & CMD_TYPE_MASK) == cancel;
121
0
    }
122
123
    size_t command_body_size () const;
124
    void *command_body ();
125
    bool is_vsm () const;
126
    bool is_cmsg () const;
127
    bool is_lmsg () const;
128
    bool is_zcmsg () const;
129
    uint32_t get_routing_id () const;
130
    int set_routing_id (uint32_t routing_id_);
131
    int reset_routing_id ();
132
    const char *group () const;
133
    int set_group (const char *group_);
134
    int set_group (const char *, size_t length_);
135
136
    //  After calling this function you can copy the message in POD-style
137
    //  refs_ times. No need to call copy.
138
    void add_refs (int refs_);
139
140
    //  Removes references previously added by add_refs. If the number of
141
    //  references drops to 0, the message is closed and false is returned.
142
    bool rm_refs (int refs_);
143
144
    void shrink (size_t new_size_);
145
146
    //  Size in bytes of the largest message that is still copied around
147
    //  rather than being reference-counted.
148
    enum
149
    {
150
        msg_t_size = 64
151
    };
152
    enum
153
    {
154
        max_vsm_size =
155
          msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t))
156
    };
157
    enum
158
    {
159
        ping_cmd_name_size = 5,   // 4PING
160
        cancel_cmd_name_size = 7, // 6CANCEL
161
        sub_cmd_name_size = 10    // 9SUBSCRIBE
162
    };
163
164
  private:
165
    zmq::atomic_counter_t *refcnt ();
166
167
    //  Different message types.
168
    enum type_t
169
    {
170
        type_min = 101,
171
        //  VSM messages store the content in the message itself
172
        type_vsm = 101,
173
        //  LMSG messages store the content in malloc-ed memory
174
        type_lmsg = 102,
175
        //  Delimiter messages are used in envelopes
176
        type_delimiter = 103,
177
        //  CMSG messages point to constant data
178
        type_cmsg = 104,
179
180
        // zero-copy LMSG message for v2_decoder
181
        type_zclmsg = 105,
182
183
        //  Join message for radio_dish
184
        type_join = 106,
185
186
        //  Leave message for radio_dish
187
        type_leave = 107,
188
189
        type_max = 107
190
    };
191
192
    enum group_type_t
193
    {
194
        group_type_short,
195
        group_type_long
196
    };
197
198
    struct long_group_t
199
    {
200
        char group[ZMQ_GROUP_MAX_LENGTH + 1];
201
        atomic_counter_t refcnt;
202
    };
203
204
    union group_t
205
    {
206
        unsigned char type;
207
        struct
208
        {
209
            unsigned char type;
210
            char group[15];
211
        } sgroup;
212
        struct
213
        {
214
            unsigned char type;
215
            long_group_t *content;
216
        } lgroup;
217
    };
218
219
    //  Note that fields shared between different message types are not
220
    //  moved to the parent class (msg_t). This way we get tighter packing
221
    //  of the data. Shared fields can be accessed via 'base' member of
222
    //  the union.
223
    union
224
    {
225
        struct
226
        {
227
            metadata_t *metadata;
228
            unsigned char unused[msg_t_size
229
                                 - (sizeof (metadata_t *) + 2
230
                                    + sizeof (uint32_t) + sizeof (group_t))];
231
            unsigned char type;
232
            unsigned char flags;
233
            uint32_t routing_id;
234
            group_t group;
235
        } base;
236
        struct
237
        {
238
            metadata_t *metadata;
239
            unsigned char data[max_vsm_size];
240
            unsigned char size;
241
            unsigned char type;
242
            unsigned char flags;
243
            uint32_t routing_id;
244
            group_t group;
245
        } vsm;
246
        struct
247
        {
248
            metadata_t *metadata;
249
            content_t *content;
250
            unsigned char
251
              unused[msg_t_size
252
                     - (sizeof (metadata_t *) + sizeof (content_t *) + 2
253
                        + sizeof (uint32_t) + sizeof (group_t))];
254
            unsigned char type;
255
            unsigned char flags;
256
            uint32_t routing_id;
257
            group_t group;
258
        } lmsg;
259
        struct
260
        {
261
            metadata_t *metadata;
262
            content_t *content;
263
            unsigned char
264
              unused[msg_t_size
265
                     - (sizeof (metadata_t *) + sizeof (content_t *) + 2
266
                        + sizeof (uint32_t) + sizeof (group_t))];
267
            unsigned char type;
268
            unsigned char flags;
269
            uint32_t routing_id;
270
            group_t group;
271
        } zclmsg;
272
        struct
273
        {
274
            metadata_t *metadata;
275
            void *data;
276
            size_t size;
277
            unsigned char unused[msg_t_size
278
                                 - (sizeof (metadata_t *) + sizeof (void *)
279
                                    + sizeof (size_t) + 2 + sizeof (uint32_t)
280
                                    + sizeof (group_t))];
281
            unsigned char type;
282
            unsigned char flags;
283
            uint32_t routing_id;
284
            group_t group;
285
        } cmsg;
286
        struct
287
        {
288
            metadata_t *metadata;
289
            unsigned char unused[msg_t_size
290
                                 - (sizeof (metadata_t *) + 2
291
                                    + sizeof (uint32_t) + sizeof (group_t))];
292
            unsigned char type;
293
            unsigned char flags;
294
            uint32_t routing_id;
295
            group_t group;
296
        } delimiter;
297
    } _u;
298
};
299
300
//  Closes *msg_ and hands back echo_ untouched. errno is restored to the
//  value it had on entry; a non-zero result from close () is caught by
//  errno_assert instead of being reported to the caller.
inline int close_and_return (zmq::msg_t *msg_, int echo_)
{
    //  Snapshot errno first so a successful close leaves it unchanged.
    const int saved_errno = errno;

    const int rc = msg_->close ();
    errno_assert (rc == 0);

    errno = saved_errno;
    return echo_;
}
309
310
//  Array form: closes the first count_ elements of msg_ in ascending
//  order via the single-message overload, then returns echo_. Nothing is
//  closed when count_ is zero or negative.
inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_)
{
    for (int idx = 0; idx < count_; ++idx) {
        zmq::msg_t *const part = msg_ + idx;
        //  The per-message return value is just the 0 passed in; drop it.
        close_and_return (part, 0);
    }
    return echo_;
}
316
}
317
318
#endif