Coverage Report

Created: 2025-08-26 06:06

/src/libzmq/src/object.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <string.h>
5
#include <stdarg.h>
6
7
#include "object.hpp"
8
#include "ctx.hpp"
9
#include "err.hpp"
10
#include "pipe.hpp"
11
#include "io_thread.hpp"
12
#include "session_base.hpp"
13
#include "socket_base.hpp"
14
15
0
//  Builds an object attached to the context 'ctx_' and tagged with the
//  thread ID 'tid_'. Both arguments are stored as given.
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
    _ctx (ctx_),
    _tid (tid_)
{
}
18
19
//  Builds an object that copies the context pointer and the thread ID
//  from 'parent_'. 'parent_' is dereferenced unconditionally, so it must
//  not be null.
zmq::object_t::object_t (object_t *parent_) :
    _ctx (parent_->_ctx),
    _tid (parent_->_tid)
{
}
23
24
//  The destructor body is intentionally empty; no cleanup is done here.
zmq::object_t::~object_t ()
{
}
27
28
uint32_t zmq::object_t::get_tid () const
29
0
{
30
0
    return _tid;
31
0
}
32
33
void zmq::object_t::set_tid (uint32_t id_)
34
0
{
35
0
    _tid = id_;
36
0
}
37
38
//  Returns the context pointer this object was constructed with.
zmq::ctx_t *zmq::object_t::get_ctx () const
{
    return this->_ctx;
}
42
43
void zmq::object_t::process_command (const command_t &cmd_)
44
0
{
45
0
    switch (cmd_.type) {
46
0
        case command_t::activate_read:
47
0
            process_activate_read ();
48
0
            break;
49
50
0
        case command_t::activate_write:
51
0
            process_activate_write (cmd_.args.activate_write.msgs_read);
52
0
            break;
53
54
0
        case command_t::stop:
55
0
            process_stop ();
56
0
            break;
57
58
0
        case command_t::plug:
59
0
            process_plug ();
60
0
            process_seqnum ();
61
0
            break;
62
63
0
        case command_t::own:
64
0
            process_own (cmd_.args.own.object);
65
0
            process_seqnum ();
66
0
            break;
67
68
0
        case command_t::attach:
69
0
            process_attach (cmd_.args.attach.engine);
70
0
            process_seqnum ();
71
0
            break;
72
73
0
        case command_t::bind:
74
0
            process_bind (cmd_.args.bind.pipe);
75
0
            process_seqnum ();
76
0
            break;
77
78
0
        case command_t::hiccup:
79
0
            process_hiccup (cmd_.args.hiccup.pipe);
80
0
            break;
81
82
0
        case command_t::pipe_peer_stats:
83
0
            process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
84
0
                                     cmd_.args.pipe_peer_stats.socket_base,
85
0
                                     cmd_.args.pipe_peer_stats.endpoint_pair);
86
0
            break;
87
88
0
        case command_t::pipe_stats_publish:
89
0
            process_pipe_stats_publish (
90
0
              cmd_.args.pipe_stats_publish.outbound_queue_count,
91
0
              cmd_.args.pipe_stats_publish.inbound_queue_count,
92
0
              cmd_.args.pipe_stats_publish.endpoint_pair);
93
0
            break;
94
95
0
        case command_t::pipe_term:
96
0
            process_pipe_term ();
97
0
            break;
98
99
0
        case command_t::pipe_term_ack:
100
0
            process_pipe_term_ack ();
101
0
            break;
102
103
0
        case command_t::pipe_hwm:
104
0
            process_pipe_hwm (cmd_.args.pipe_hwm.inhwm,
105
0
                              cmd_.args.pipe_hwm.outhwm);
106
0
            break;
107
108
0
        case command_t::term_req:
109
0
            process_term_req (cmd_.args.term_req.object);
110
0
            break;
111
112
0
        case command_t::term:
113
0
            process_term (cmd_.args.term.linger);
114
0
            break;
115
116
0
        case command_t::term_ack:
117
0
            process_term_ack ();
118
0
            break;
119
120
0
        case command_t::term_endpoint:
121
0
            process_term_endpoint (cmd_.args.term_endpoint.endpoint);
122
0
            break;
123
124
0
        case command_t::reap:
125
0
            process_reap (cmd_.args.reap.socket);
126
0
            break;
127
128
0
        case command_t::reaped:
129
0
            process_reaped ();
130
0
            break;
131
132
0
        case command_t::inproc_connected:
133
0
            process_seqnum ();
134
0
            break;
135
136
0
        case command_t::conn_failed:
137
0
            process_conn_failed ();
138
0
            break;
139
140
0
        case command_t::done:
141
0
        default:
142
0
            zmq_assert (false);
143
0
    }
144
0
}
145
146
int zmq::object_t::register_endpoint (const char *addr_,
147
                                      const endpoint_t &endpoint_)
148
0
{
149
0
    return _ctx->register_endpoint (addr_, endpoint_);
150
0
}
151
152
int zmq::object_t::unregister_endpoint (const std::string &addr_,
153
                                        socket_base_t *socket_)
154
0
{
155
0
    return _ctx->unregister_endpoint (addr_, socket_);
156
0
}
157
158
//  Forwards the request to unregister endpoints for 'socket_' to the
//  context.
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
    _ctx->unregister_endpoints (socket_);
}
162
163
//  Looks up 'addr_' through the context and returns whatever endpoint
//  the context reports.
zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) const
{
    return this->_ctx->find_endpoint (addr_);
}
167
168
void zmq::object_t::pend_connection (const std::string &addr_,
169
                                     const endpoint_t &endpoint_,
170
                                     pipe_t **pipes_)
171
0
{
172
0
    _ctx->pend_connection (addr_, endpoint_, pipes_);
173
0
}
174
175
void zmq::object_t::connect_pending (const char *addr_,
176
                                     zmq::socket_base_t *bind_socket_)
177
0
{
178
0
    return _ctx->connect_pending (addr_, bind_socket_);
179
0
}
180
181
//  Delegates destruction of 'socket_' to the context.
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
    this->_ctx->destroy_socket (socket_);
}
185
186
//  Asks the context to pick an I/O thread for the given 'affinity_' and
//  returns the context's choice unchanged.
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) const
{
    io_thread_t *const chosen = _ctx->choose_io_thread (affinity_);
    return chosen;
}
190
191
void zmq::object_t::send_stop ()
192
0
{
193
    //  'stop' command goes always from administrative thread to
194
    //  the current object.
195
0
    command_t cmd;
196
0
    cmd.destination = this;
197
0
    cmd.type = command_t::stop;
198
0
    _ctx->send_command (_tid, cmd);
199
0
}
200
201
//  Sends a 'plug' command to 'destination_'. When 'inc_seqnum_' is true
//  the destination's sequence number is incremented before sending.
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
    if (inc_seqnum_) {
        destination_->inc_seqnum ();
    }

    command_t plug_cmd;
    plug_cmd.type = command_t::plug;
    plug_cmd.destination = destination_;
    send_command (plug_cmd);
}
211
212
//  Sends an 'own' command carrying 'object_' to 'destination_'. The
//  destination's sequence number is always incremented first.
void zmq::object_t::send_own (own_t *destination_, own_t *object_)
{
    destination_->inc_seqnum ();

    command_t own_cmd;
    own_cmd.type = command_t::own;
    own_cmd.destination = destination_;
    own_cmd.args.own.object = object_;
    send_command (own_cmd);
}
221
222
void zmq::object_t::send_attach (session_base_t *destination_,
223
                                 i_engine *engine_,
224
                                 bool inc_seqnum_)
225
0
{
226
0
    if (inc_seqnum_)
227
0
        destination_->inc_seqnum ();
228
229
0
    command_t cmd;
230
0
    cmd.destination = destination_;
231
0
    cmd.type = command_t::attach;
232
0
    cmd.args.attach.engine = engine_;
233
0
    send_command (cmd);
234
0
}
235
236
//  Sends an argument-less 'conn_failed' command to the session
//  'destination_'.
void zmq::object_t::send_conn_failed (session_base_t *destination_)
{
    command_t failed_cmd;
    failed_cmd.type = command_t::conn_failed;
    failed_cmd.destination = destination_;
    send_command (failed_cmd);
}
243
244
void zmq::object_t::send_bind (own_t *destination_,
245
                               pipe_t *pipe_,
246
                               bool inc_seqnum_)
247
0
{
248
0
    if (inc_seqnum_)
249
0
        destination_->inc_seqnum ();
250
251
0
    command_t cmd;
252
0
    cmd.destination = destination_;
253
0
    cmd.type = command_t::bind;
254
0
    cmd.args.bind.pipe = pipe_;
255
0
    send_command (cmd);
256
0
}
257
258
//  Sends an argument-less 'activate_read' command to the pipe
//  'destination_'.
void zmq::object_t::send_activate_read (pipe_t *destination_)
{
    command_t activate_cmd;
    activate_cmd.type = command_t::activate_read;
    activate_cmd.destination = destination_;
    send_command (activate_cmd);
}
265
266
void zmq::object_t::send_activate_write (pipe_t *destination_,
267
                                         uint64_t msgs_read_)
268
0
{
269
0
    command_t cmd;
270
0
    cmd.destination = destination_;
271
0
    cmd.type = command_t::activate_write;
272
0
    cmd.args.activate_write.msgs_read = msgs_read_;
273
0
    send_command (cmd);
274
0
}
275
276
//  Sends a 'hiccup' command to the pipe 'destination_', carrying the
//  opaque pointer 'pipe_' as its argument.
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
    command_t hiccup_cmd;
    hiccup_cmd.type = command_t::hiccup;
    hiccup_cmd.destination = destination_;
    hiccup_cmd.args.hiccup.pipe = pipe_;
    send_command (hiccup_cmd);
}
284
285
void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
286
                                          uint64_t queue_count_,
287
                                          own_t *socket_base_,
288
                                          endpoint_uri_pair_t *endpoint_pair_)
289
0
{
290
0
    command_t cmd;
291
0
    cmd.destination = destination_;
292
0
    cmd.type = command_t::pipe_peer_stats;
293
0
    cmd.args.pipe_peer_stats.queue_count = queue_count_;
294
0
    cmd.args.pipe_peer_stats.socket_base = socket_base_;
295
0
    cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
296
0
    send_command (cmd);
297
0
}
298
299
void zmq::object_t::send_pipe_stats_publish (
300
  own_t *destination_,
301
  uint64_t outbound_queue_count_,
302
  uint64_t inbound_queue_count_,
303
  endpoint_uri_pair_t *endpoint_pair_)
304
0
{
305
0
    command_t cmd;
306
0
    cmd.destination = destination_;
307
0
    cmd.type = command_t::pipe_stats_publish;
308
0
    cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
309
0
    cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
310
0
    cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
311
0
    send_command (cmd);
312
0
}
313
314
//  Sends an argument-less 'pipe_term' command to the pipe
//  'destination_'.
void zmq::object_t::send_pipe_term (pipe_t *destination_)
{
    command_t term_cmd;
    term_cmd.type = command_t::pipe_term;
    term_cmd.destination = destination_;
    send_command (term_cmd);
}
321
322
//  Sends an argument-less 'pipe_term_ack' command to the pipe
//  'destination_'.
void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
{
    command_t ack_cmd;
    ack_cmd.type = command_t::pipe_term_ack;
    ack_cmd.destination = destination_;
    send_command (ack_cmd);
}
329
330
void zmq::object_t::send_pipe_hwm (pipe_t *destination_,
331
                                   int inhwm_,
332
                                   int outhwm_)
333
0
{
334
0
    command_t cmd;
335
0
    cmd.destination = destination_;
336
0
    cmd.type = command_t::pipe_hwm;
337
0
    cmd.args.pipe_hwm.inhwm = inhwm_;
338
0
    cmd.args.pipe_hwm.outhwm = outhwm_;
339
0
    send_command (cmd);
340
0
}
341
342
//  Sends a 'term_req' command to 'destination_', carrying 'object_' as
//  its argument.
void zmq::object_t::send_term_req (own_t *destination_, own_t *object_)
{
    command_t request_cmd;
    request_cmd.type = command_t::term_req;
    request_cmd.destination = destination_;
    request_cmd.args.term_req.object = object_;
    send_command (request_cmd);
}
350
351
//  Sends a 'term' command to 'destination_', carrying 'linger_' as its
//  argument.
void zmq::object_t::send_term (own_t *destination_, int linger_)
{
    command_t term_cmd;
    term_cmd.type = command_t::term;
    term_cmd.destination = destination_;
    term_cmd.args.term.linger = linger_;
    send_command (term_cmd);
}
359
360
//  Sends an argument-less 'term_ack' command to 'destination_'.
void zmq::object_t::send_term_ack (own_t *destination_)
{
    command_t ack_cmd;
    ack_cmd.type = command_t::term_ack;
    ack_cmd.destination = destination_;
    send_command (ack_cmd);
}
367
368
void zmq::object_t::send_term_endpoint (own_t *destination_,
369
                                        std::string *endpoint_)
370
0
{
371
0
    command_t cmd;
372
0
    cmd.destination = destination_;
373
0
    cmd.type = command_t::term_endpoint;
374
0
    cmd.args.term_endpoint.endpoint = endpoint_;
375
0
    send_command (cmd);
376
0
}
377
378
void zmq::object_t::send_reap (class socket_base_t *socket_)
379
0
{
380
0
    command_t cmd;
381
0
    cmd.destination = _ctx->get_reaper ();
382
0
    cmd.type = command_t::reap;
383
0
    cmd.args.reap.socket = socket_;
384
0
    send_command (cmd);
385
0
}
386
387
void zmq::object_t::send_reaped ()
388
0
{
389
0
    command_t cmd;
390
0
    cmd.destination = _ctx->get_reaper ();
391
0
    cmd.type = command_t::reaped;
392
0
    send_command (cmd);
393
0
}
394
395
//  Sends an argument-less 'inproc_connected' command to 'socket_'.
void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
{
    command_t connected_cmd;
    connected_cmd.type = command_t::inproc_connected;
    connected_cmd.destination = socket_;
    send_command (connected_cmd);
}
402
403
void zmq::object_t::send_done ()
404
0
{
405
0
    command_t cmd;
406
0
    cmd.destination = NULL;
407
0
    cmd.type = command_t::done;
408
0
    _ctx->send_command (ctx_t::term_tid, cmd);
409
0
}
410
411
void zmq::object_t::process_stop ()
412
0
{
413
0
    zmq_assert (false);
414
0
}
415
416
void zmq::object_t::process_plug ()
417
0
{
418
0
    zmq_assert (false);
419
0
}
420
421
//  Handler for the 'own' command at this level: the argument is ignored
//  and the function asserts unconditionally. NOTE(review): presumably
//  overridden by classes that expect this command -- confirm in
//  object.hpp.
void zmq::object_t::process_own (own_t * /*object_*/)
{
    zmq_assert (false);
}
425
426
//  Handler for the 'attach' command at this level: the argument is
//  ignored and the function asserts unconditionally. NOTE(review):
//  presumably overridden by classes that expect this command -- confirm
//  in object.hpp.
void zmq::object_t::process_attach (i_engine * /*engine_*/)
{
    zmq_assert (false);
}
430
431
//  Handler for the 'bind' command at this level: the argument is ignored
//  and the function asserts unconditionally. NOTE(review): presumably
//  overridden by classes that expect this command -- confirm in
//  object.hpp.
void zmq::object_t::process_bind (pipe_t * /*pipe_*/)
{
    zmq_assert (false);
}
435
436
void zmq::object_t::process_activate_read ()
437
0
{
438
0
    zmq_assert (false);
439
0
}
440
441
void zmq::object_t::process_activate_write (uint64_t)
442
0
{
443
0
    zmq_assert (false);
444
0
}
445
446
void zmq::object_t::process_hiccup (void *)
447
0
{
448
0
    zmq_assert (false);
449
0
}
450
451
void zmq::object_t::process_pipe_peer_stats (uint64_t,
452
                                             own_t *,
453
                                             endpoint_uri_pair_t *)
454
0
{
455
0
    zmq_assert (false);
456
0
}
457
458
void zmq::object_t::process_pipe_stats_publish (uint64_t,
459
                                                uint64_t,
460
                                                endpoint_uri_pair_t *)
461
0
{
462
0
    zmq_assert (false);
463
0
}
464
465
void zmq::object_t::process_pipe_term ()
466
0
{
467
0
    zmq_assert (false);
468
0
}
469
470
void zmq::object_t::process_pipe_term_ack ()
471
0
{
472
0
    zmq_assert (false);
473
0
}
474
475
void zmq::object_t::process_pipe_hwm (int, int)
476
0
{
477
0
    zmq_assert (false);
478
0
}
479
480
//  Handler for the 'term_req' command at this level: the argument is
//  ignored and the function asserts unconditionally. NOTE(review):
//  presumably overridden by classes that expect this command -- confirm
//  in object.hpp.
void zmq::object_t::process_term_req (own_t * /*object_*/)
{
    zmq_assert (false);
}
484
485
void zmq::object_t::process_term (int)
486
0
{
487
0
    zmq_assert (false);
488
0
}
489
490
void zmq::object_t::process_term_ack ()
491
0
{
492
0
    zmq_assert (false);
493
0
}
494
495
void zmq::object_t::process_term_endpoint (std::string *)
496
0
{
497
0
    zmq_assert (false);
498
0
}
499
500
void zmq::object_t::process_reap (class socket_base_t *)
501
0
{
502
0
    zmq_assert (false);
503
0
}
504
505
void zmq::object_t::process_reaped ()
506
0
{
507
0
    zmq_assert (false);
508
0
}
509
510
void zmq::object_t::process_seqnum ()
511
0
{
512
0
    zmq_assert (false);
513
0
}
514
515
void zmq::object_t::process_conn_failed ()
516
0
{
517
0
    zmq_assert (false);
518
0
}
519
520
void zmq::object_t::send_command (const command_t &cmd_)
521
0
{
522
0
    _ctx->send_command (cmd_.destination->get_tid (), cmd_);
523
0
}