Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #ifndef __ZMQ_MSG_HPP_INCLUDE__ |
4 | | #define __ZMQ_MSG_HPP_INCLUDE__ |
5 | | |
6 | | #include <stddef.h> |
7 | | #include <stdio.h> |
8 | | |
9 | | #include "config.hpp" |
10 | | #include "err.hpp" |
11 | | #include "fd.hpp" |
12 | | #include "atomic_counter.hpp" |
13 | | #include "metadata.hpp" |
14 | | |
15 | | // bits 2-5 |
16 | 0 | #define CMD_TYPE_MASK 0x1c |
17 | | |
18 | | // Signature for free function to deallocate the message content. |
19 | | // Note that it has to be declared as "C" so that it is the same as |
20 | | // zmq_free_fn defined in zmq.h. |
21 | | extern "C" { |
22 | | typedef void (msg_free_fn) (void *data_, void *hint_); |
23 | | } |
24 | | |
25 | | namespace zmq |
26 | | { |
27 | | // Note that this structure needs to be explicitly constructed |
28 | | // (init functions) and destructed (close function). |
29 | | |
30 | | static const char cancel_cmd_name[] = "\6CANCEL"; |
31 | | static const char sub_cmd_name[] = "\x9SUBSCRIBE"; |
32 | | |
33 | | class msg_t |
34 | | { |
35 | | public: |
36 | | // Shared message buffer. Message data are either allocated in one |
37 | | // continuous block along with this structure - thus avoiding one |
38 | | // malloc/free pair or they are stored in user-supplied memory. |
39 | | // In the latter case, ffn member stores pointer to the function to be |
40 | | // used to deallocate the data. If the buffer is actually shared (there |
41 | | // are at least 2 references to it) refcount member contains number of |
42 | | // references. |
43 | | struct content_t |
44 | | { |
45 | | void *data; |
46 | | size_t size; |
47 | | msg_free_fn *ffn; |
48 | | void *hint; |
49 | | zmq::atomic_counter_t refcnt; |
50 | | }; |
51 | | |
52 | | // Message flags. |
53 | | enum |
54 | | { |
55 | | more = 1, // Followed by more parts |
56 | | command = 2, // Command frame (see ZMTP spec) |
57 | | // Command types, use only bits 2-5 and compare with ==, not bitwise, |
58 | | // a command can never be of more that one type at the same time |
59 | | ping = 4, |
60 | | pong = 8, |
61 | | subscribe = 12, |
62 | | cancel = 16, |
63 | | close_cmd = 20, |
64 | | credential = 32, |
65 | | routing_id = 64, |
66 | | shared = 128 |
67 | | }; |
68 | | |
69 | | bool check () const; |
70 | | int init (); |
71 | | |
72 | | int init (void *data_, |
73 | | size_t size_, |
74 | | msg_free_fn *ffn_, |
75 | | void *hint_, |
76 | | content_t *content_ = NULL); |
77 | | |
78 | | int init_size (size_t size_); |
79 | | int init_buffer (const void *buf_, size_t size_); |
80 | | int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); |
81 | | int init_external_storage (content_t *content_, |
82 | | void *data_, |
83 | | size_t size_, |
84 | | msg_free_fn *ffn_, |
85 | | void *hint_); |
86 | | int init_delimiter (); |
87 | | int init_join (); |
88 | | int init_leave (); |
89 | | int init_subscribe (const size_t size_, const unsigned char *topic); |
90 | | int init_cancel (const size_t size_, const unsigned char *topic); |
91 | | int close (); |
92 | | int move (msg_t &src_); |
93 | | int copy (msg_t &src_); |
94 | | void *data (); |
95 | | size_t size () const; |
96 | | unsigned char flags () const; |
97 | | void set_flags (unsigned char flags_); |
98 | | void reset_flags (unsigned char flags_); |
99 | | metadata_t *metadata () const; |
100 | | void set_metadata (metadata_t *metadata_); |
101 | | void reset_metadata (); |
102 | | bool is_routing_id () const; |
103 | | bool is_credential () const; |
104 | | bool is_delimiter () const; |
105 | | bool is_join () const; |
106 | | bool is_leave () const; |
107 | | bool is_ping () const; |
108 | | bool is_pong () const; |
109 | | bool is_close_cmd () const; |
110 | | |
111 | | // These are called on each message received by the session_base class, |
112 | | // so get them inlined to avoid the overhead of 2 function calls per msg |
113 | | bool is_subscribe () const |
114 | 0 | { |
115 | 0 | return (_u.base.flags & CMD_TYPE_MASK) == subscribe; |
116 | 0 | } |
117 | | |
118 | | bool is_cancel () const |
119 | 0 | { |
120 | 0 | return (_u.base.flags & CMD_TYPE_MASK) == cancel; |
121 | 0 | } |
122 | | |
123 | | size_t command_body_size () const; |
124 | | void *command_body (); |
125 | | bool is_vsm () const; |
126 | | bool is_cmsg () const; |
127 | | bool is_lmsg () const; |
128 | | bool is_zcmsg () const; |
129 | | uint32_t get_routing_id () const; |
130 | | int set_routing_id (uint32_t routing_id_); |
131 | | int reset_routing_id (); |
132 | | const char *group () const; |
133 | | int set_group (const char *group_); |
134 | | int set_group (const char *, size_t length_); |
135 | | |
136 | | // After calling this function you can copy the message in POD-style |
137 | | // refs_ times. No need to call copy. |
138 | | void add_refs (int refs_); |
139 | | |
140 | | // Removes references previously added by add_refs. If the number of |
141 | | // references drops to 0, the message is closed and false is returned. |
142 | | bool rm_refs (int refs_); |
143 | | |
144 | | void shrink (size_t new_size_); |
145 | | |
146 | | // Size in bytes of the largest message that is still copied around |
147 | | // rather than being reference-counted. |
148 | | enum |
149 | | { |
150 | | msg_t_size = 64 |
151 | | }; |
152 | | enum |
153 | | { |
154 | | max_vsm_size = |
155 | | msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t)) |
156 | | }; |
157 | | enum |
158 | | { |
159 | | ping_cmd_name_size = 5, // 4PING |
160 | | cancel_cmd_name_size = 7, // 6CANCEL |
161 | | sub_cmd_name_size = 10 // 9SUBSCRIBE |
162 | | }; |
163 | | |
164 | | private: |
165 | | zmq::atomic_counter_t *refcnt (); |
166 | | |
167 | | // Different message types. |
168 | | enum type_t |
169 | | { |
170 | | type_min = 101, |
171 | | // VSM messages store the content in the message itself |
172 | | type_vsm = 101, |
173 | | // LMSG messages store the content in malloc-ed memory |
174 | | type_lmsg = 102, |
175 | | // Delimiter messages are used in envelopes |
176 | | type_delimiter = 103, |
177 | | // CMSG messages point to constant data |
178 | | type_cmsg = 104, |
179 | | |
180 | | // zero-copy LMSG message for v2_decoder |
181 | | type_zclmsg = 105, |
182 | | |
183 | | // Join message for radio_dish |
184 | | type_join = 106, |
185 | | |
186 | | // Leave message for radio_dish |
187 | | type_leave = 107, |
188 | | |
189 | | type_max = 107 |
190 | | }; |
191 | | |
192 | | enum group_type_t |
193 | | { |
194 | | group_type_short, |
195 | | group_type_long |
196 | | }; |
197 | | |
198 | | struct long_group_t |
199 | | { |
200 | | char group[ZMQ_GROUP_MAX_LENGTH + 1]; |
201 | | atomic_counter_t refcnt; |
202 | | }; |
203 | | |
204 | | union group_t |
205 | | { |
206 | | unsigned char type; |
207 | | struct |
208 | | { |
209 | | unsigned char type; |
210 | | char group[15]; |
211 | | } sgroup; |
212 | | struct |
213 | | { |
214 | | unsigned char type; |
215 | | long_group_t *content; |
216 | | } lgroup; |
217 | | }; |
218 | | |
219 | | // Note that fields shared between different message types are not |
220 | | // moved to the parent class (msg_t). This way we get tighter packing |
221 | | // of the data. Shared fields can be accessed via 'base' member of |
222 | | // the union. |
223 | | union |
224 | | { |
225 | | struct |
226 | | { |
227 | | metadata_t *metadata; |
228 | | unsigned char unused[msg_t_size |
229 | | - (sizeof (metadata_t *) + 2 |
230 | | + sizeof (uint32_t) + sizeof (group_t))]; |
231 | | unsigned char type; |
232 | | unsigned char flags; |
233 | | uint32_t routing_id; |
234 | | group_t group; |
235 | | } base; |
236 | | struct |
237 | | { |
238 | | metadata_t *metadata; |
239 | | unsigned char data[max_vsm_size]; |
240 | | unsigned char size; |
241 | | unsigned char type; |
242 | | unsigned char flags; |
243 | | uint32_t routing_id; |
244 | | group_t group; |
245 | | } vsm; |
246 | | struct |
247 | | { |
248 | | metadata_t *metadata; |
249 | | content_t *content; |
250 | | unsigned char |
251 | | unused[msg_t_size |
252 | | - (sizeof (metadata_t *) + sizeof (content_t *) + 2 |
253 | | + sizeof (uint32_t) + sizeof (group_t))]; |
254 | | unsigned char type; |
255 | | unsigned char flags; |
256 | | uint32_t routing_id; |
257 | | group_t group; |
258 | | } lmsg; |
259 | | struct |
260 | | { |
261 | | metadata_t *metadata; |
262 | | content_t *content; |
263 | | unsigned char |
264 | | unused[msg_t_size |
265 | | - (sizeof (metadata_t *) + sizeof (content_t *) + 2 |
266 | | + sizeof (uint32_t) + sizeof (group_t))]; |
267 | | unsigned char type; |
268 | | unsigned char flags; |
269 | | uint32_t routing_id; |
270 | | group_t group; |
271 | | } zclmsg; |
272 | | struct |
273 | | { |
274 | | metadata_t *metadata; |
275 | | void *data; |
276 | | size_t size; |
277 | | unsigned char unused[msg_t_size |
278 | | - (sizeof (metadata_t *) + sizeof (void *) |
279 | | + sizeof (size_t) + 2 + sizeof (uint32_t) |
280 | | + sizeof (group_t))]; |
281 | | unsigned char type; |
282 | | unsigned char flags; |
283 | | uint32_t routing_id; |
284 | | group_t group; |
285 | | } cmsg; |
286 | | struct |
287 | | { |
288 | | metadata_t *metadata; |
289 | | unsigned char unused[msg_t_size |
290 | | - (sizeof (metadata_t *) + 2 |
291 | | + sizeof (uint32_t) + sizeof (group_t))]; |
292 | | unsigned char type; |
293 | | unsigned char flags; |
294 | | uint32_t routing_id; |
295 | | group_t group; |
296 | | } delimiter; |
297 | | } _u; |
298 | | }; |
299 | | |
300 | | inline int close_and_return (zmq::msg_t *msg_, int echo_) |
301 | 0 | { |
302 | | // Since we abort on close failure we preserve errno for success case. |
303 | 0 | const int err = errno; |
304 | 0 | const int rc = msg_->close (); |
305 | 0 | errno_assert (rc == 0); |
306 | 0 | errno = err; |
307 | 0 | return echo_; |
308 | 0 | } |
309 | | |
310 | | inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_) |
311 | 0 | { |
312 | 0 | for (int i = 0; i < count_; i++) |
313 | 0 | close_and_return (&msg_[i], 0); |
314 | 0 | return echo_; |
315 | 0 | } |
316 | | } |
317 | | |
318 | | #endif |