Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "compat.hpp" |
5 | | #include "macros.hpp" |
6 | | #include "msg.hpp" |
7 | | |
8 | | #include <string.h> |
9 | | #include <stdlib.h> |
10 | | #include <new> |
11 | | |
12 | | #include "stdint.hpp" |
13 | | #include "likely.hpp" |
14 | | #include "metadata.hpp" |
15 | | #include "err.hpp" |
16 | | |
17 | | // Check whether the sizes of public representation of the message (zmq_msg_t) |
18 | | // and private representation of the message (zmq::msg_t) match. |
19 | | |
20 | | typedef char |
21 | | zmq_msg_size_check[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) |
22 | | - 1]; |
23 | | |
24 | | bool zmq::msg_t::check () const |
25 | 0 | { |
26 | 0 | return _u.base.type >= type_min && _u.base.type <= type_max; |
27 | 0 | } |
28 | | |
29 | | int zmq::msg_t::init (void *data_, |
30 | | size_t size_, |
31 | | msg_free_fn *ffn_, |
32 | | void *hint_, |
33 | | content_t *content_) |
34 | 0 | { |
35 | 0 | if (size_ <= max_vsm_size) { |
36 | 0 | const int rc = init_size (size_); |
37 | |
|
38 | 0 | if (rc != -1) { |
39 | 0 | memcpy (data (), data_, size_); |
40 | 0 | return 0; |
41 | 0 | } |
42 | 0 | return -1; |
43 | 0 | } |
44 | 0 | if (content_) { |
45 | 0 | return init_external_storage (content_, data_, size_, ffn_, hint_); |
46 | 0 | } |
47 | 0 | return init_data (data_, size_, ffn_, hint_); |
48 | 0 | } |
49 | | |
50 | | int zmq::msg_t::init () |
51 | 0 | { |
52 | 0 | _u.vsm.metadata = NULL; |
53 | 0 | _u.vsm.type = type_vsm; |
54 | 0 | _u.vsm.flags = 0; |
55 | 0 | _u.vsm.size = 0; |
56 | 0 | _u.vsm.group.sgroup.group[0] = '\0'; |
57 | 0 | _u.vsm.group.type = group_type_short; |
58 | 0 | _u.vsm.routing_id = 0; |
59 | 0 | return 0; |
60 | 0 | } |
61 | | |
62 | | int zmq::msg_t::init_size (size_t size_) |
63 | 0 | { |
64 | 0 | if (size_ <= max_vsm_size) { |
65 | 0 | _u.vsm.metadata = NULL; |
66 | 0 | _u.vsm.type = type_vsm; |
67 | 0 | _u.vsm.flags = 0; |
68 | 0 | _u.vsm.size = static_cast<unsigned char> (size_); |
69 | 0 | _u.vsm.group.sgroup.group[0] = '\0'; |
70 | 0 | _u.vsm.group.type = group_type_short; |
71 | 0 | _u.vsm.routing_id = 0; |
72 | 0 | } else { |
73 | 0 | _u.lmsg.metadata = NULL; |
74 | 0 | _u.lmsg.type = type_lmsg; |
75 | 0 | _u.lmsg.flags = 0; |
76 | 0 | _u.lmsg.group.sgroup.group[0] = '\0'; |
77 | 0 | _u.lmsg.group.type = group_type_short; |
78 | 0 | _u.lmsg.routing_id = 0; |
79 | 0 | _u.lmsg.content = NULL; |
80 | 0 | if (sizeof (content_t) + size_ > size_) |
81 | 0 | _u.lmsg.content = |
82 | 0 | static_cast<content_t *> (malloc (sizeof (content_t) + size_)); |
83 | 0 | if (unlikely (!_u.lmsg.content)) { |
84 | 0 | errno = ENOMEM; |
85 | 0 | return -1; |
86 | 0 | } |
87 | | |
88 | 0 | _u.lmsg.content->data = _u.lmsg.content + 1; |
89 | 0 | _u.lmsg.content->size = size_; |
90 | 0 | _u.lmsg.content->ffn = NULL; |
91 | 0 | _u.lmsg.content->hint = NULL; |
92 | 0 | new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t (); |
93 | 0 | } |
94 | 0 | return 0; |
95 | 0 | } |
96 | | |
97 | | int zmq::msg_t::init_buffer (const void *buf_, size_t size_) |
98 | 0 | { |
99 | 0 | const int rc = init_size (size_); |
100 | 0 | if (unlikely (rc < 0)) { |
101 | 0 | return -1; |
102 | 0 | } |
103 | 0 | if (size_) { |
104 | | // NULL and zero size is allowed |
105 | 0 | assert (NULL != buf_); |
106 | 0 | memcpy (data (), buf_, size_); |
107 | 0 | } |
108 | 0 | return 0; |
109 | 0 | } |
110 | | |
111 | | int zmq::msg_t::init_external_storage (content_t *content_, |
112 | | void *data_, |
113 | | size_t size_, |
114 | | msg_free_fn *ffn_, |
115 | | void *hint_) |
116 | 0 | { |
117 | 0 | zmq_assert (NULL != data_); |
118 | 0 | zmq_assert (NULL != content_); |
119 | |
|
120 | 0 | _u.zclmsg.metadata = NULL; |
121 | 0 | _u.zclmsg.type = type_zclmsg; |
122 | 0 | _u.zclmsg.flags = 0; |
123 | 0 | _u.zclmsg.group.sgroup.group[0] = '\0'; |
124 | 0 | _u.zclmsg.group.type = group_type_short; |
125 | 0 | _u.zclmsg.routing_id = 0; |
126 | |
|
127 | 0 | _u.zclmsg.content = content_; |
128 | 0 | _u.zclmsg.content->data = data_; |
129 | 0 | _u.zclmsg.content->size = size_; |
130 | 0 | _u.zclmsg.content->ffn = ffn_; |
131 | 0 | _u.zclmsg.content->hint = hint_; |
132 | 0 | new (&_u.zclmsg.content->refcnt) zmq::atomic_counter_t (); |
133 | |
|
134 | 0 | return 0; |
135 | 0 | } |
136 | | |
137 | | int zmq::msg_t::init_data (void *data_, |
138 | | size_t size_, |
139 | | msg_free_fn *ffn_, |
140 | | void *hint_) |
141 | 0 | { |
142 | | // If data is NULL and size is not 0, a segfault |
143 | | // would occur once the data is accessed |
144 | 0 | zmq_assert (data_ != NULL || size_ == 0); |
145 | | |
146 | | // Initialize constant message if there's no need to deallocate |
147 | 0 | if (ffn_ == NULL) { |
148 | 0 | _u.cmsg.metadata = NULL; |
149 | 0 | _u.cmsg.type = type_cmsg; |
150 | 0 | _u.cmsg.flags = 0; |
151 | 0 | _u.cmsg.data = data_; |
152 | 0 | _u.cmsg.size = size_; |
153 | 0 | _u.cmsg.group.sgroup.group[0] = '\0'; |
154 | 0 | _u.cmsg.group.type = group_type_short; |
155 | 0 | _u.cmsg.routing_id = 0; |
156 | 0 | } else { |
157 | 0 | _u.lmsg.metadata = NULL; |
158 | 0 | _u.lmsg.type = type_lmsg; |
159 | 0 | _u.lmsg.flags = 0; |
160 | 0 | _u.lmsg.group.sgroup.group[0] = '\0'; |
161 | 0 | _u.lmsg.group.type = group_type_short; |
162 | 0 | _u.lmsg.routing_id = 0; |
163 | 0 | _u.lmsg.content = |
164 | 0 | static_cast<content_t *> (malloc (sizeof (content_t))); |
165 | 0 | if (!_u.lmsg.content) { |
166 | 0 | errno = ENOMEM; |
167 | 0 | return -1; |
168 | 0 | } |
169 | | |
170 | 0 | _u.lmsg.content->data = data_; |
171 | 0 | _u.lmsg.content->size = size_; |
172 | 0 | _u.lmsg.content->ffn = ffn_; |
173 | 0 | _u.lmsg.content->hint = hint_; |
174 | 0 | new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t (); |
175 | 0 | } |
176 | 0 | return 0; |
177 | 0 | } |
178 | | |
179 | | int zmq::msg_t::init_delimiter () |
180 | 0 | { |
181 | 0 | _u.delimiter.metadata = NULL; |
182 | 0 | _u.delimiter.type = type_delimiter; |
183 | 0 | _u.delimiter.flags = 0; |
184 | 0 | _u.delimiter.group.sgroup.group[0] = '\0'; |
185 | 0 | _u.delimiter.group.type = group_type_short; |
186 | 0 | _u.delimiter.routing_id = 0; |
187 | 0 | return 0; |
188 | 0 | } |
189 | | |
190 | | int zmq::msg_t::init_join () |
191 | 0 | { |
192 | 0 | _u.base.metadata = NULL; |
193 | 0 | _u.base.type = type_join; |
194 | 0 | _u.base.flags = 0; |
195 | 0 | _u.base.group.sgroup.group[0] = '\0'; |
196 | 0 | _u.base.group.type = group_type_short; |
197 | 0 | _u.base.routing_id = 0; |
198 | 0 | return 0; |
199 | 0 | } |
200 | | |
201 | | int zmq::msg_t::init_leave () |
202 | 0 | { |
203 | 0 | _u.base.metadata = NULL; |
204 | 0 | _u.base.type = type_leave; |
205 | 0 | _u.base.flags = 0; |
206 | 0 | _u.base.group.sgroup.group[0] = '\0'; |
207 | 0 | _u.base.group.type = group_type_short; |
208 | 0 | _u.base.routing_id = 0; |
209 | 0 | return 0; |
210 | 0 | } |
211 | | |
212 | | int zmq::msg_t::init_subscribe (const size_t size_, const unsigned char *topic_) |
213 | 0 | { |
214 | 0 | int rc = init_size (size_); |
215 | 0 | if (rc == 0) { |
216 | 0 | set_flags (zmq::msg_t::subscribe); |
217 | | |
218 | | // We explicitly allow a NULL subscription with size zero |
219 | 0 | if (size_) { |
220 | 0 | assert (topic_); |
221 | 0 | memcpy (data (), topic_, size_); |
222 | 0 | } |
223 | 0 | } |
224 | 0 | return rc; |
225 | 0 | } |
226 | | |
227 | | int zmq::msg_t::init_cancel (const size_t size_, const unsigned char *topic_) |
228 | 0 | { |
229 | 0 | int rc = init_size (size_); |
230 | 0 | if (rc == 0) { |
231 | 0 | set_flags (zmq::msg_t::cancel); |
232 | | |
233 | | // We explicitly allow a NULL subscription with size zero |
234 | 0 | if (size_) { |
235 | 0 | assert (topic_); |
236 | 0 | memcpy (data (), topic_, size_); |
237 | 0 | } |
238 | 0 | } |
239 | 0 | return rc; |
240 | 0 | } |
241 | | |
242 | | int zmq::msg_t::close () |
243 | 0 | { |
244 | | // Check the validity of the message. |
245 | 0 | if (unlikely (!check ())) { |
246 | 0 | errno = EFAULT; |
247 | 0 | return -1; |
248 | 0 | } |
249 | | |
250 | 0 | if (_u.base.type == type_lmsg) { |
251 | | // If the content is not shared, or if it is shared and the reference |
252 | | // count has dropped to zero, deallocate it. |
253 | 0 | if (!(_u.lmsg.flags & msg_t::shared) |
254 | 0 | || !_u.lmsg.content->refcnt.sub (1)) { |
255 | | // We used "placement new" operator to initialize the reference |
256 | | // counter so we call the destructor explicitly now. |
257 | 0 | _u.lmsg.content->refcnt.~atomic_counter_t (); |
258 | |
|
259 | 0 | if (_u.lmsg.content->ffn) |
260 | 0 | _u.lmsg.content->ffn (_u.lmsg.content->data, |
261 | 0 | _u.lmsg.content->hint); |
262 | 0 | free (_u.lmsg.content); |
263 | 0 | } |
264 | 0 | } |
265 | |
|
266 | 0 | if (is_zcmsg ()) { |
267 | 0 | zmq_assert (_u.zclmsg.content->ffn); |
268 | | |
269 | | // If the content is not shared, or if it is shared and the reference |
270 | | // count has dropped to zero, deallocate it. |
271 | 0 | if (!(_u.zclmsg.flags & msg_t::shared) |
272 | 0 | || !_u.zclmsg.content->refcnt.sub (1)) { |
273 | | // We used "placement new" operator to initialize the reference |
274 | | // counter so we call the destructor explicitly now. |
275 | 0 | _u.zclmsg.content->refcnt.~atomic_counter_t (); |
276 | |
|
277 | 0 | _u.zclmsg.content->ffn (_u.zclmsg.content->data, |
278 | 0 | _u.zclmsg.content->hint); |
279 | 0 | } |
280 | 0 | } |
281 | |
|
282 | 0 | if (_u.base.metadata != NULL) { |
283 | 0 | if (_u.base.metadata->drop_ref ()) { |
284 | 0 | LIBZMQ_DELETE (_u.base.metadata); |
285 | 0 | } |
286 | 0 | _u.base.metadata = NULL; |
287 | 0 | } |
288 | |
|
289 | 0 | if (_u.base.group.type == group_type_long) { |
290 | 0 | if (!_u.base.group.lgroup.content->refcnt.sub (1)) { |
291 | | // We used "placement new" operator to initialize the reference |
292 | | // counter so we call the destructor explicitly now. |
293 | 0 | _u.base.group.lgroup.content->refcnt.~atomic_counter_t (); |
294 | |
|
295 | 0 | free (_u.base.group.lgroup.content); |
296 | 0 | } |
297 | 0 | } |
298 | | |
299 | | // Make the message invalid. |
300 | 0 | _u.base.type = 0; |
301 | |
|
302 | 0 | return 0; |
303 | 0 | } |
304 | | |
305 | | int zmq::msg_t::move (msg_t &src_) |
306 | 0 | { |
307 | | // Check the validity of the source. |
308 | 0 | if (unlikely (!src_.check ())) { |
309 | 0 | errno = EFAULT; |
310 | 0 | return -1; |
311 | 0 | } |
312 | | |
313 | 0 | int rc = close (); |
314 | 0 | if (unlikely (rc < 0)) |
315 | 0 | return rc; |
316 | | |
317 | 0 | *this = src_; |
318 | |
|
319 | 0 | rc = src_.init (); |
320 | 0 | if (unlikely (rc < 0)) |
321 | 0 | return rc; |
322 | | |
323 | 0 | return 0; |
324 | 0 | } |
325 | | |
326 | | int zmq::msg_t::copy (msg_t &src_) |
327 | 0 | { |
328 | | // Check the validity of the source. |
329 | 0 | if (unlikely (!src_.check ())) { |
330 | 0 | errno = EFAULT; |
331 | 0 | return -1; |
332 | 0 | } |
333 | | |
334 | 0 | const int rc = close (); |
335 | 0 | if (unlikely (rc < 0)) |
336 | 0 | return rc; |
337 | | |
338 | | // The initial reference count, when a non-shared message is initially |
339 | | // shared (between the original and the copy we create here). |
340 | 0 | const atomic_counter_t::integer_t initial_shared_refcnt = 2; |
341 | |
|
342 | 0 | if (src_.is_lmsg () || src_.is_zcmsg ()) { |
343 | | // One reference is added to shared messages. Non-shared messages |
344 | | // are turned into shared messages. |
345 | 0 | if (src_.flags () & msg_t::shared) |
346 | 0 | src_.refcnt ()->add (1); |
347 | 0 | else { |
348 | 0 | src_.set_flags (msg_t::shared); |
349 | 0 | src_.refcnt ()->set (initial_shared_refcnt); |
350 | 0 | } |
351 | 0 | } |
352 | |
|
353 | 0 | if (src_._u.base.metadata != NULL) |
354 | 0 | src_._u.base.metadata->add_ref (); |
355 | |
|
356 | 0 | if (src_._u.base.group.type == group_type_long) |
357 | 0 | src_._u.base.group.lgroup.content->refcnt.add (1); |
358 | |
|
359 | 0 | *this = src_; |
360 | |
|
361 | 0 | return 0; |
362 | 0 | } |
363 | | |
364 | | void *zmq::msg_t::data () |
365 | 0 | { |
366 | | // Check the validity of the message. |
367 | 0 | zmq_assert (check ()); |
368 | |
|
369 | 0 | switch (_u.base.type) { |
370 | 0 | case type_vsm: |
371 | 0 | return _u.vsm.data; |
372 | 0 | case type_lmsg: |
373 | 0 | return _u.lmsg.content->data; |
374 | 0 | case type_cmsg: |
375 | 0 | return _u.cmsg.data; |
376 | 0 | case type_zclmsg: |
377 | 0 | return _u.zclmsg.content->data; |
378 | 0 | default: |
379 | 0 | zmq_assert (false); |
380 | 0 | return NULL; |
381 | 0 | } |
382 | 0 | } |
383 | | |
384 | | size_t zmq::msg_t::size () const |
385 | 0 | { |
386 | | // Check the validity of the message. |
387 | 0 | zmq_assert (check ()); |
388 | |
|
389 | 0 | switch (_u.base.type) { |
390 | 0 | case type_vsm: |
391 | 0 | return _u.vsm.size; |
392 | 0 | case type_lmsg: |
393 | 0 | return _u.lmsg.content->size; |
394 | 0 | case type_zclmsg: |
395 | 0 | return _u.zclmsg.content->size; |
396 | 0 | case type_cmsg: |
397 | 0 | return _u.cmsg.size; |
398 | 0 | default: |
399 | 0 | zmq_assert (false); |
400 | 0 | return 0; |
401 | 0 | } |
402 | 0 | } |
403 | | |
404 | | void zmq::msg_t::shrink (size_t new_size_) |
405 | 0 | { |
406 | | // Check the validity of the message. |
407 | 0 | zmq_assert (check ()); |
408 | 0 | zmq_assert (new_size_ <= size ()); |
409 | |
|
410 | 0 | switch (_u.base.type) { |
411 | 0 | case type_vsm: |
412 | 0 | _u.vsm.size = static_cast<unsigned char> (new_size_); |
413 | 0 | break; |
414 | 0 | case type_lmsg: |
415 | 0 | _u.lmsg.content->size = new_size_; |
416 | 0 | break; |
417 | 0 | case type_zclmsg: |
418 | 0 | _u.zclmsg.content->size = new_size_; |
419 | 0 | break; |
420 | 0 | case type_cmsg: |
421 | 0 | _u.cmsg.size = new_size_; |
422 | 0 | break; |
423 | 0 | default: |
424 | 0 | zmq_assert (false); |
425 | 0 | } |
426 | 0 | } |
427 | | |
428 | | unsigned char zmq::msg_t::flags () const |
429 | 0 | { |
430 | 0 | return _u.base.flags; |
431 | 0 | } |
432 | | |
433 | | void zmq::msg_t::set_flags (unsigned char flags_) |
434 | 0 | { |
435 | 0 | _u.base.flags |= flags_; |
436 | 0 | } |
437 | | |
438 | | void zmq::msg_t::reset_flags (unsigned char flags_) |
439 | 0 | { |
440 | 0 | _u.base.flags &= ~flags_; |
441 | 0 | } |
442 | | |
443 | | zmq::metadata_t *zmq::msg_t::metadata () const |
444 | 0 | { |
445 | 0 | return _u.base.metadata; |
446 | 0 | } |
447 | | |
448 | | void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_) |
449 | 0 | { |
450 | 0 | assert (metadata_ != NULL); |
451 | 0 | assert (_u.base.metadata == NULL); |
452 | 0 | metadata_->add_ref (); |
453 | 0 | _u.base.metadata = metadata_; |
454 | 0 | } |
455 | | |
456 | | void zmq::msg_t::reset_metadata () |
457 | 0 | { |
458 | 0 | if (_u.base.metadata) { |
459 | 0 | if (_u.base.metadata->drop_ref ()) { |
460 | 0 | LIBZMQ_DELETE (_u.base.metadata); |
461 | 0 | } |
462 | 0 | _u.base.metadata = NULL; |
463 | 0 | } |
464 | 0 | } |
465 | | |
466 | | bool zmq::msg_t::is_routing_id () const |
467 | 0 | { |
468 | 0 | return (_u.base.flags & routing_id) == routing_id; |
469 | 0 | } |
470 | | |
471 | | bool zmq::msg_t::is_credential () const |
472 | 0 | { |
473 | 0 | return (_u.base.flags & credential) == credential; |
474 | 0 | } |
475 | | |
476 | | bool zmq::msg_t::is_delimiter () const |
477 | 0 | { |
478 | 0 | return _u.base.type == type_delimiter; |
479 | 0 | } |
480 | | |
481 | | bool zmq::msg_t::is_vsm () const |
482 | 0 | { |
483 | 0 | return _u.base.type == type_vsm; |
484 | 0 | } |
485 | | |
486 | | bool zmq::msg_t::is_cmsg () const |
487 | 0 | { |
488 | 0 | return _u.base.type == type_cmsg; |
489 | 0 | } |
490 | | |
491 | | bool zmq::msg_t::is_lmsg () const |
492 | 0 | { |
493 | 0 | return _u.base.type == type_lmsg; |
494 | 0 | } |
495 | | |
496 | | bool zmq::msg_t::is_zcmsg () const |
497 | 0 | { |
498 | 0 | return _u.base.type == type_zclmsg; |
499 | 0 | } |
500 | | |
501 | | bool zmq::msg_t::is_join () const |
502 | 0 | { |
503 | 0 | return _u.base.type == type_join; |
504 | 0 | } |
505 | | |
506 | | bool zmq::msg_t::is_leave () const |
507 | 0 | { |
508 | 0 | return _u.base.type == type_leave; |
509 | 0 | } |
510 | | |
511 | | bool zmq::msg_t::is_ping () const |
512 | 0 | { |
513 | 0 | return (_u.base.flags & CMD_TYPE_MASK) == ping; |
514 | 0 | } |
515 | | |
516 | | bool zmq::msg_t::is_pong () const |
517 | 0 | { |
518 | 0 | return (_u.base.flags & CMD_TYPE_MASK) == pong; |
519 | 0 | } |
520 | | |
521 | | bool zmq::msg_t::is_close_cmd () const |
522 | 0 | { |
523 | 0 | return (_u.base.flags & CMD_TYPE_MASK) == close_cmd; |
524 | 0 | } |
525 | | |
526 | | size_t zmq::msg_t::command_body_size () const |
527 | 0 | { |
528 | 0 | if (this->is_ping () || this->is_pong ()) |
529 | 0 | return this->size () - ping_cmd_name_size; |
530 | 0 | else if (!(this->flags () & msg_t::command) |
531 | 0 | && (this->is_subscribe () || this->is_cancel ())) |
532 | 0 | return this->size (); |
533 | 0 | else if (this->is_subscribe ()) |
534 | 0 | return this->size () - sub_cmd_name_size; |
535 | 0 | else if (this->is_cancel ()) |
536 | 0 | return this->size () - cancel_cmd_name_size; |
537 | | |
538 | 0 | return 0; |
539 | 0 | } |
540 | | |
541 | | void *zmq::msg_t::command_body () |
542 | 0 | { |
543 | 0 | unsigned char *data = NULL; |
544 | |
|
545 | 0 | if (this->is_ping () || this->is_pong ()) |
546 | 0 | data = |
547 | 0 | static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size; |
548 | | // With inproc, command flag is not set for sub/cancel |
549 | 0 | else if (!(this->flags () & msg_t::command) |
550 | 0 | && (this->is_subscribe () || this->is_cancel ())) |
551 | 0 | data = static_cast<unsigned char *> (this->data ()); |
552 | 0 | else if (this->is_subscribe ()) |
553 | 0 | data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size; |
554 | 0 | else if (this->is_cancel ()) |
555 | 0 | data = |
556 | 0 | static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size; |
557 | |
|
558 | 0 | return data; |
559 | 0 | } |
560 | | |
561 | | void zmq::msg_t::add_refs (int refs_) |
562 | 0 | { |
563 | 0 | zmq_assert (refs_ >= 0); |
564 | | |
565 | | // Operation not supported for messages with metadata. |
566 | 0 | zmq_assert (_u.base.metadata == NULL); |
567 | | |
568 | | // No copies required. |
569 | 0 | if (!refs_) |
570 | 0 | return; |
571 | | |
572 | | // VSMs, CMSGS and delimiters can be copied straight away. The only |
573 | | // message type that needs special care are long messages. |
574 | 0 | if (_u.base.type == type_lmsg || is_zcmsg ()) { |
575 | 0 | if (_u.base.flags & msg_t::shared) |
576 | 0 | refcnt ()->add (refs_); |
577 | 0 | else { |
578 | 0 | refcnt ()->set (refs_ + 1); |
579 | 0 | _u.base.flags |= msg_t::shared; |
580 | 0 | } |
581 | 0 | } |
582 | 0 | } |
583 | | |
584 | | bool zmq::msg_t::rm_refs (int refs_) |
585 | 0 | { |
586 | 0 | zmq_assert (refs_ >= 0); |
587 | | |
588 | | // Operation not supported for messages with metadata. |
589 | 0 | zmq_assert (_u.base.metadata == NULL); |
590 | | |
591 | | // No copies required. |
592 | 0 | if (!refs_) |
593 | 0 | return true; |
594 | | |
595 | | // If there's only one reference close the message. |
596 | 0 | if ((_u.base.type != type_zclmsg && _u.base.type != type_lmsg) |
597 | 0 | || !(_u.base.flags & msg_t::shared)) { |
598 | 0 | close (); |
599 | 0 | return false; |
600 | 0 | } |
601 | | |
602 | | // The only message type that needs special care are long and zcopy messages. |
603 | 0 | if (_u.base.type == type_lmsg && !_u.lmsg.content->refcnt.sub (refs_)) { |
604 | | // We used "placement new" operator to initialize the reference |
605 | | // counter so we call the destructor explicitly now. |
606 | 0 | _u.lmsg.content->refcnt.~atomic_counter_t (); |
607 | |
|
608 | 0 | if (_u.lmsg.content->ffn) |
609 | 0 | _u.lmsg.content->ffn (_u.lmsg.content->data, _u.lmsg.content->hint); |
610 | 0 | free (_u.lmsg.content); |
611 | |
|
612 | 0 | return false; |
613 | 0 | } |
614 | | |
615 | 0 | if (is_zcmsg () && !_u.zclmsg.content->refcnt.sub (refs_)) { |
616 | | // storage for rfcnt is provided externally |
617 | 0 | if (_u.zclmsg.content->ffn) { |
618 | 0 | _u.zclmsg.content->ffn (_u.zclmsg.content->data, |
619 | 0 | _u.zclmsg.content->hint); |
620 | 0 | } |
621 | |
|
622 | 0 | return false; |
623 | 0 | } |
624 | | |
625 | 0 | return true; |
626 | 0 | } |
627 | | |
628 | | uint32_t zmq::msg_t::get_routing_id () const |
629 | 0 | { |
630 | 0 | return _u.base.routing_id; |
631 | 0 | } |
632 | | |
633 | | int zmq::msg_t::set_routing_id (uint32_t routing_id_) |
634 | 0 | { |
635 | 0 | if (routing_id_) { |
636 | 0 | _u.base.routing_id = routing_id_; |
637 | 0 | return 0; |
638 | 0 | } |
639 | 0 | errno = EINVAL; |
640 | 0 | return -1; |
641 | 0 | } |
642 | | |
643 | | int zmq::msg_t::reset_routing_id () |
644 | 0 | { |
645 | 0 | _u.base.routing_id = 0; |
646 | 0 | return 0; |
647 | 0 | } |
648 | | |
649 | | const char *zmq::msg_t::group () const |
650 | 0 | { |
651 | 0 | if (_u.base.group.type == group_type_long) |
652 | 0 | return _u.base.group.lgroup.content->group; |
653 | 0 | return _u.base.group.sgroup.group; |
654 | 0 | } |
655 | | |
656 | | int zmq::msg_t::set_group (const char *group_) |
657 | 0 | { |
658 | 0 | size_t length = strnlen (group_, ZMQ_GROUP_MAX_LENGTH); |
659 | |
|
660 | 0 | return set_group (group_, length); |
661 | 0 | } |
662 | | |
663 | | int zmq::msg_t::set_group (const char *group_, size_t length_) |
664 | 0 | { |
665 | 0 | if (length_ > ZMQ_GROUP_MAX_LENGTH) { |
666 | 0 | errno = EINVAL; |
667 | 0 | return -1; |
668 | 0 | } |
669 | | |
670 | 0 | if (length_ > 14) { |
671 | 0 | _u.base.group.lgroup.type = group_type_long; |
672 | 0 | _u.base.group.lgroup.content = |
673 | 0 | (long_group_t *) malloc (sizeof (long_group_t)); |
674 | 0 | assert (_u.base.group.lgroup.content); |
675 | 0 | new (&_u.base.group.lgroup.content->refcnt) zmq::atomic_counter_t (); |
676 | 0 | _u.base.group.lgroup.content->refcnt.set (1); |
677 | 0 | strncpy (_u.base.group.lgroup.content->group, group_, length_); |
678 | 0 | _u.base.group.lgroup.content->group[length_] = '\0'; |
679 | 0 | } else { |
680 | 0 | strncpy (_u.base.group.sgroup.group, group_, length_); |
681 | 0 | _u.base.group.sgroup.group[length_] = '\0'; |
682 | 0 | } |
683 | | |
684 | 0 | return 0; |
685 | 0 | } |
686 | | |
687 | | zmq::atomic_counter_t *zmq::msg_t::refcnt () |
688 | 0 | { |
689 | 0 | switch (_u.base.type) { |
690 | 0 | case type_lmsg: |
691 | 0 | return &_u.lmsg.content->refcnt; |
692 | 0 | case type_zclmsg: |
693 | 0 | return &_u.zclmsg.content->refcnt; |
694 | 0 | default: |
695 | 0 | zmq_assert (false); |
696 | | return NULL; |
697 | 0 | } |
698 | 0 | } |