Coverage Report

Created: 2025-07-18 07:01

/src/libzmq/src/object.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <string.h>
5
#include <stdarg.h>
6
7
#include "object.hpp"
8
#include "ctx.hpp"
9
#include "err.hpp"
10
#include "pipe.hpp"
11
#include "io_thread.hpp"
12
#include "session_base.hpp"
13
#include "socket_base.hpp"
14
15
5.31k
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : _ctx (ctx_), _tid (tid_)
16
5.31k
{
17
5.31k
}
18
19
zmq::object_t::object_t (object_t *parent_) :
20
2.59k
    _ctx (parent_->_ctx), _tid (parent_->_tid)
21
2.59k
{
22
2.59k
}
23
24
zmq::object_t::~object_t ()
25
7.90k
{
26
7.90k
}
27
28
uint32_t zmq::object_t::get_tid () const
29
14.4k
{
30
14.4k
    return _tid;
31
14.4k
}
32
33
void zmq::object_t::set_tid (uint32_t id_)
34
63
{
35
63
    _tid = id_;
36
63
}
37
38
zmq::ctx_t *zmq::object_t::get_ctx () const
39
1.24k
{
40
1.24k
    return _ctx;
41
1.24k
}
42
43
void zmq::object_t::process_command (const command_t &cmd_)
44
15.1k
{
45
15.1k
    switch (cmd_.type) {
46
42
        case command_t::activate_read:
47
42
            process_activate_read ();
48
42
            break;
49
50
63
        case command_t::activate_write:
51
63
            process_activate_write (cmd_.args.activate_write.msgs_read);
52
63
            break;
53
54
4.24k
        case command_t::stop:
55
4.24k
            process_stop ();
56
4.24k
            break;
57
58
1.23k
        case command_t::plug:
59
1.23k
            process_plug ();
60
1.23k
            process_seqnum ();
61
1.23k
            break;
62
63
1.23k
        case command_t::own:
64
1.23k
            process_own (cmd_.args.own.object);
65
1.23k
            process_seqnum ();
66
1.23k
            break;
67
68
0
        case command_t::attach:
69
0
            process_attach (cmd_.args.attach.engine);
70
0
            process_seqnum ();
71
0
            break;
72
73
63
        case command_t::bind:
74
63
            process_bind (cmd_.args.bind.pipe);
75
63
            process_seqnum ();
76
63
            break;
77
78
0
        case command_t::hiccup:
79
0
            process_hiccup (cmd_.args.hiccup.pipe);
80
0
            break;
81
82
0
        case command_t::pipe_peer_stats:
83
0
            process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
84
0
                                     cmd_.args.pipe_peer_stats.socket_base,
85
0
                                     cmd_.args.pipe_peer_stats.endpoint_pair);
86
0
            break;
87
88
0
        case command_t::pipe_stats_publish:
89
0
            process_pipe_stats_publish (
90
0
              cmd_.args.pipe_stats_publish.outbound_queue_count,
91
0
              cmd_.args.pipe_stats_publish.inbound_queue_count,
92
0
              cmd_.args.pipe_stats_publish.endpoint_pair);
93
0
            break;
94
95
721
        case command_t::pipe_term:
96
721
            process_pipe_term ();
97
721
            break;
98
99
1.35k
        case command_t::pipe_term_ack:
100
1.35k
            process_pipe_term_ack ();
101
1.35k
            break;
102
103
0
        case command_t::pipe_hwm:
104
0
            process_pipe_hwm (cmd_.args.pipe_hwm.inhwm,
105
0
                              cmd_.args.pipe_hwm.outhwm);
106
0
            break;
107
108
0
        case command_t::term_req:
109
0
            process_term_req (cmd_.args.term_req.object);
110
0
            break;
111
112
1.23k
        case command_t::term:
113
1.23k
            process_term (cmd_.args.term.linger);
114
1.23k
            break;
115
116
1.23k
        case command_t::term_ack:
117
1.23k
            process_term_ack ();
118
1.23k
            break;
119
120
0
        case command_t::term_endpoint:
121
0
            process_term_endpoint (cmd_.args.term_endpoint.endpoint);
122
0
            break;
123
124
1.81k
        case command_t::reap:
125
1.81k
            process_reap (cmd_.args.reap.socket);
126
1.81k
            break;
127
128
1.81k
        case command_t::reaped:
129
1.81k
            process_reaped ();
130
1.81k
            break;
131
132
63
        case command_t::inproc_connected:
133
63
            process_seqnum ();
134
63
            break;
135
136
0
        case command_t::conn_failed:
137
0
            process_conn_failed ();
138
0
            break;
139
140
0
        case command_t::done:
141
0
        default:
142
0
            zmq_assert (false);
143
15.1k
    }
144
15.1k
}
145
146
int zmq::object_t::register_endpoint (const char *addr_,
147
                                      const endpoint_t &endpoint_)
148
63
{
149
63
    return _ctx->register_endpoint (addr_, endpoint_);
150
63
}
151
152
int zmq::object_t::unregister_endpoint (const std::string &addr_,
153
                                        socket_base_t *socket_)
154
0
{
155
0
    return _ctx->unregister_endpoint (addr_, socket_);
156
0
}
157
158
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
159
1.81k
{
160
1.81k
    return _ctx->unregister_endpoints (socket_);
161
1.81k
}
162
163
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) const
164
63
{
165
63
    return _ctx->find_endpoint (addr_);
166
63
}
167
168
void zmq::object_t::pend_connection (const std::string &addr_,
169
                                     const endpoint_t &endpoint_,
170
                                     pipe_t **pipes_)
171
63
{
172
63
    _ctx->pend_connection (addr_, endpoint_, pipes_);
173
63
}
174
175
void zmq::object_t::connect_pending (const char *addr_,
176
                                     zmq::socket_base_t *bind_socket_)
177
63
{
178
63
    return _ctx->connect_pending (addr_, bind_socket_);
179
63
}
180
181
void zmq::object_t::destroy_socket (socket_base_t *socket_)
182
1.81k
{
183
1.81k
    _ctx->destroy_socket (socket_);
184
1.81k
}
185
186
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) const
187
1.86k
{
188
1.86k
    return _ctx->choose_io_thread (affinity_);
189
1.86k
}
190
191
void zmq::object_t::send_stop ()
192
5.31k
{
193
    //  'stop' command goes always from administrative thread to
194
    //  the current object.
195
5.31k
    command_t cmd;
196
5.31k
    cmd.destination = this;
197
5.31k
    cmd.type = command_t::stop;
198
5.31k
    _ctx->send_command (_tid, cmd);
199
5.31k
}
200
201
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
202
1.23k
{
203
1.23k
    if (inc_seqnum_)
204
1.23k
        destination_->inc_seqnum ();
205
206
1.23k
    command_t cmd;
207
1.23k
    cmd.destination = destination_;
208
1.23k
    cmd.type = command_t::plug;
209
1.23k
    send_command (cmd);
210
1.23k
}
211
212
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
213
1.23k
{
214
1.23k
    destination_->inc_seqnum ();
215
1.23k
    command_t cmd;
216
1.23k
    cmd.destination = destination_;
217
1.23k
    cmd.type = command_t::own;
218
1.23k
    cmd.args.own.object = object_;
219
1.23k
    send_command (cmd);
220
1.23k
}
221
222
void zmq::object_t::send_attach (session_base_t *destination_,
223
                                 i_engine *engine_,
224
                                 bool inc_seqnum_)
225
0
{
226
0
    if (inc_seqnum_)
227
0
        destination_->inc_seqnum ();
228
229
0
    command_t cmd;
230
0
    cmd.destination = destination_;
231
0
    cmd.type = command_t::attach;
232
0
    cmd.args.attach.engine = engine_;
233
0
    send_command (cmd);
234
0
}
235
236
void zmq::object_t::send_conn_failed (session_base_t *destination_)
237
0
{
238
0
    command_t cmd;
239
0
    cmd.destination = destination_;
240
0
    cmd.type = command_t::conn_failed;
241
0
    send_command (cmd);
242
0
}
243
244
void zmq::object_t::send_bind (own_t *destination_,
245
                               pipe_t *pipe_,
246
                               bool inc_seqnum_)
247
0
{
248
0
    if (inc_seqnum_)
249
0
        destination_->inc_seqnum ();
250
251
0
    command_t cmd;
252
0
    cmd.destination = destination_;
253
0
    cmd.type = command_t::bind;
254
0
    cmd.args.bind.pipe = pipe_;
255
0
    send_command (cmd);
256
0
}
257
258
void zmq::object_t::send_activate_read (pipe_t *destination_)
259
42
{
260
42
    command_t cmd;
261
42
    cmd.destination = destination_;
262
42
    cmd.type = command_t::activate_read;
263
42
    send_command (cmd);
264
42
}
265
266
void zmq::object_t::send_activate_write (pipe_t *destination_,
267
                                         uint64_t msgs_read_)
268
63
{
269
63
    command_t cmd;
270
63
    cmd.destination = destination_;
271
63
    cmd.type = command_t::activate_write;
272
63
    cmd.args.activate_write.msgs_read = msgs_read_;
273
63
    send_command (cmd);
274
63
}
275
276
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
277
0
{
278
0
    command_t cmd;
279
0
    cmd.destination = destination_;
280
0
    cmd.type = command_t::hiccup;
281
0
    cmd.args.hiccup.pipe = pipe_;
282
0
    send_command (cmd);
283
0
}
284
285
void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
286
                                          uint64_t queue_count_,
287
                                          own_t *socket_base_,
288
                                          endpoint_uri_pair_t *endpoint_pair_)
289
0
{
290
0
    command_t cmd;
291
0
    cmd.destination = destination_;
292
0
    cmd.type = command_t::pipe_peer_stats;
293
0
    cmd.args.pipe_peer_stats.queue_count = queue_count_;
294
0
    cmd.args.pipe_peer_stats.socket_base = socket_base_;
295
0
    cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
296
0
    send_command (cmd);
297
0
}
298
299
void zmq::object_t::send_pipe_stats_publish (
300
  own_t *destination_,
301
  uint64_t outbound_queue_count_,
302
  uint64_t inbound_queue_count_,
303
  endpoint_uri_pair_t *endpoint_pair_)
304
0
{
305
0
    command_t cmd;
306
0
    cmd.destination = destination_;
307
0
    cmd.type = command_t::pipe_stats_publish;
308
0
    cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
309
0
    cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
310
0
    cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
311
0
    send_command (cmd);
312
0
}
313
314
void zmq::object_t::send_pipe_term (pipe_t *destination_)
315
721
{
316
721
    command_t cmd;
317
721
    cmd.destination = destination_;
318
721
    cmd.type = command_t::pipe_term;
319
721
    send_command (cmd);
320
721
}
321
322
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
323
1.35k
{
324
1.35k
    command_t cmd;
325
1.35k
    cmd.destination = destination_;
326
1.35k
    cmd.type = command_t::pipe_term_ack;
327
1.35k
    send_command (cmd);
328
1.35k
}
329
330
void zmq::object_t::send_pipe_hwm (pipe_t *destination_,
331
                                   int inhwm_,
332
                                   int outhwm_)
333
0
{
334
0
    command_t cmd;
335
0
    cmd.destination = destination_;
336
0
    cmd.type = command_t::pipe_hwm;
337
0
    cmd.args.pipe_hwm.inhwm = inhwm_;
338
0
    cmd.args.pipe_hwm.outhwm = outhwm_;
339
0
    send_command (cmd);
340
0
}
341
342
void zmq::object_t::send_term_req (own_t *destination_, own_t *object_)
343
0
{
344
0
    command_t cmd;
345
0
    cmd.destination = destination_;
346
0
    cmd.type = command_t::term_req;
347
0
    cmd.args.term_req.object = object_;
348
0
    send_command (cmd);
349
0
}
350
351
void zmq::object_t::send_term (own_t *destination_, int linger_)
352
1.23k
{
353
1.23k
    command_t cmd;
354
1.23k
    cmd.destination = destination_;
355
1.23k
    cmd.type = command_t::term;
356
1.23k
    cmd.args.term.linger = linger_;
357
1.23k
    send_command (cmd);
358
1.23k
}
359
360
void zmq::object_t::send_term_ack (own_t *destination_)
361
1.23k
{
362
1.23k
    command_t cmd;
363
1.23k
    cmd.destination = destination_;
364
1.23k
    cmd.type = command_t::term_ack;
365
1.23k
    send_command (cmd);
366
1.23k
}
367
368
void zmq::object_t::send_term_endpoint (own_t *destination_,
369
                                        std::string *endpoint_)
370
0
{
371
0
    command_t cmd;
372
0
    cmd.destination = destination_;
373
0
    cmd.type = command_t::term_endpoint;
374
0
    cmd.args.term_endpoint.endpoint = endpoint_;
375
0
    send_command (cmd);
376
0
}
377
378
void zmq::object_t::send_reap (class socket_base_t *socket_)
379
1.81k
{
380
1.81k
    command_t cmd;
381
1.81k
    cmd.destination = _ctx->get_reaper ();
382
1.81k
    cmd.type = command_t::reap;
383
1.81k
    cmd.args.reap.socket = socket_;
384
1.81k
    send_command (cmd);
385
1.81k
}
386
387
void zmq::object_t::send_reaped ()
388
1.81k
{
389
1.81k
    command_t cmd;
390
1.81k
    cmd.destination = _ctx->get_reaper ();
391
1.81k
    cmd.type = command_t::reaped;
392
1.81k
    send_command (cmd);
393
1.81k
}
394
395
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
396
63
{
397
63
    command_t cmd;
398
63
    cmd.destination = socket_;
399
63
    cmd.type = command_t::inproc_connected;
400
63
    send_command (cmd);
401
63
}
402
403
void zmq::object_t::send_done ()
404
1.74k
{
405
1.74k
    command_t cmd;
406
1.74k
    cmd.destination = NULL;
407
1.74k
    cmd.type = command_t::done;
408
1.74k
    _ctx->send_command (ctx_t::term_tid, cmd);
409
1.74k
}
410
411
void zmq::object_t::process_stop ()
412
0
{
413
0
    zmq_assert (false);
414
0
}
415
416
void zmq::object_t::process_plug ()
417
0
{
418
0
    zmq_assert (false);
419
0
}
420
421
void zmq::object_t::process_own (own_t *)
422
0
{
423
0
    zmq_assert (false);
424
0
}
425
426
void zmq::object_t::process_attach (i_engine *)
427
0
{
428
0
    zmq_assert (false);
429
0
}
430
431
void zmq::object_t::process_bind (pipe_t *)
432
0
{
433
0
    zmq_assert (false);
434
0
}
435
436
void zmq::object_t::process_activate_read ()
437
0
{
438
0
    zmq_assert (false);
439
0
}
440
441
void zmq::object_t::process_activate_write (uint64_t)
442
0
{
443
0
    zmq_assert (false);
444
0
}
445
446
void zmq::object_t::process_hiccup (void *)
447
0
{
448
0
    zmq_assert (false);
449
0
}
450
451
void zmq::object_t::process_pipe_peer_stats (uint64_t,
452
                                             own_t *,
453
                                             endpoint_uri_pair_t *)
454
0
{
455
0
    zmq_assert (false);
456
0
}
457
458
void zmq::object_t::process_pipe_stats_publish (uint64_t,
459
                                                uint64_t,
460
                                                endpoint_uri_pair_t *)
461
0
{
462
0
    zmq_assert (false);
463
0
}
464
465
void zmq::object_t::process_pipe_term ()
466
0
{
467
0
    zmq_assert (false);
468
0
}
469
470
void zmq::object_t::process_pipe_term_ack ()
471
0
{
472
0
    zmq_assert (false);
473
0
}
474
475
void zmq::object_t::process_pipe_hwm (int, int)
476
0
{
477
0
    zmq_assert (false);
478
0
}
479
480
void zmq::object_t::process_term_req (own_t *)
481
0
{
482
0
    zmq_assert (false);
483
0
}
484
485
void zmq::object_t::process_term (int)
486
0
{
487
0
    zmq_assert (false);
488
0
}
489
490
void zmq::object_t::process_term_ack ()
491
0
{
492
0
    zmq_assert (false);
493
0
}
494
495
void zmq::object_t::process_term_endpoint (std::string *)
496
0
{
497
0
    zmq_assert (false);
498
0
}
499
500
void zmq::object_t::process_reap (class socket_base_t *)
501
0
{
502
0
    zmq_assert (false);
503
0
}
504
505
void zmq::object_t::process_reaped ()
506
0
{
507
0
    zmq_assert (false);
508
0
}
509
510
void zmq::object_t::process_seqnum ()
511
0
{
512
0
    zmq_assert (false);
513
0
}
514
515
void zmq::object_t::process_conn_failed ()
516
0
{
517
0
    zmq_assert (false);
518
0
}
519
520
void zmq::object_t::send_command (const command_t &cmd_)
521
10.7k
{
522
10.7k
    _ctx->send_command (cmd_.destination->get_tid (), cmd_);
523
10.7k
}