Coverage Report

Created: 2025-08-03 07:05

/src/libzmq/src/io_thread.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
5
#include <new>
6
7
#include "macros.hpp"
8
#include "io_thread.hpp"
9
#include "err.hpp"
10
#include "ctx.hpp"
11
12
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
13
1.76k
    object_t (ctx_, tid_),
14
1.76k
    _mailbox_handle (static_cast<poller_t::handle_t> (NULL))
15
1.76k
{
16
1.76k
    _poller = new (std::nothrow) poller_t (*ctx_);
17
1.76k
    alloc_assert (_poller);
18
19
1.76k
    if (_mailbox.get_fd () != retired_fd) {
20
1.76k
        _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
21
1.76k
        _poller->set_pollin (_mailbox_handle);
22
1.76k
    }
23
1.76k
}
24
25
zmq::io_thread_t::~io_thread_t ()
26
1.76k
{
27
1.76k
    LIBZMQ_DELETE (_poller);
28
1.76k
}
29
30
void zmq::io_thread_t::start ()
31
1.76k
{
32
1.76k
    char name[16] = "";
33
1.76k
    snprintf (name, sizeof (name), "IO/%u",
34
1.76k
              get_tid () - zmq::ctx_t::reaper_tid - 1);
35
    //  Start the underlying I/O thread.
36
1.76k
    _poller->start (name);
37
1.76k
}
38
39
void zmq::io_thread_t::stop ()
40
1.76k
{
41
1.76k
    send_stop ();
42
1.76k
}
43
44
zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
45
3.52k
{
46
3.52k
    return &_mailbox;
47
3.52k
}
48
49
int zmq::io_thread_t::get_load () const
50
1.86k
{
51
1.86k
    return _poller->get_load ();
52
1.86k
}
53
54
void zmq::io_thread_t::in_event ()
55
2.95k
{
56
    //  TODO: Do we want to limit number of commands I/O thread can
57
    //  process in a single go?
58
59
2.95k
    command_t cmd;
60
2.95k
    int rc = _mailbox.recv (&cmd, 0);
61
62
9.56k
    while (rc == 0 || errno == EINTR) {
63
6.61k
        if (rc == 0)
64
6.61k
            cmd.destination->process_command (cmd);
65
6.61k
        rc = _mailbox.recv (&cmd, 0);
66
6.61k
    }
67
68
2.95k
    errno_assert (rc != 0 && errno == EAGAIN);
69
2.95k
}
70
71
void zmq::io_thread_t::out_event ()
72
0
{
73
    //  We are never polling for POLLOUT here. This function is never called.
74
0
    zmq_assert (false);
75
0
}
76
77
void zmq::io_thread_t::timer_event (int)
78
0
{
79
    //  No timers here. This function is never called.
80
0
    zmq_assert (false);
81
0
}
82
83
zmq::poller_t *zmq::io_thread_t::get_poller () const
84
1.21k
{
85
1.21k
    zmq_assert (_poller);
86
1.21k
    return _poller;
87
1.21k
}
88
89
void zmq::io_thread_t::process_stop ()
90
1.76k
{
91
1.76k
    zmq_assert (_mailbox_handle);
92
1.76k
    _poller->rm_fd (_mailbox_handle);
93
1.76k
    _poller->stop ();
94
1.76k
}