Coverage Report

Created: 2025-11-09 06:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/reaper.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "reaper.hpp"
6
#include "socket_base.hpp"
7
#include "err.hpp"
8
9
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
10
4.20k
    object_t (ctx_, tid_),
11
4.20k
    _mailbox_handle (static_cast<poller_t::handle_t> (NULL)),
12
4.20k
    _poller (NULL),
13
4.20k
    _sockets (0),
14
4.20k
    _terminating (false)
15
4.20k
{
16
4.20k
    if (!_mailbox.valid ())
17
0
        return;
18
19
4.20k
    _poller = new (std::nothrow) poller_t (*ctx_);
20
4.20k
    alloc_assert (_poller);
21
22
4.20k
    if (_mailbox.get_fd () != retired_fd) {
23
4.20k
        _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
24
4.20k
        _poller->set_pollin (_mailbox_handle);
25
4.20k
    }
26
27
4.20k
#ifdef HAVE_FORK
28
4.20k
    _pid = getpid ();
29
4.20k
#endif
30
4.20k
}
31
32
zmq::reaper_t::~reaper_t ()
33
4.20k
{
34
4.20k
    LIBZMQ_DELETE (_poller);
35
4.20k
}
36
37
zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
38
12.6k
{
39
12.6k
    return &_mailbox;
40
12.6k
}
41
42
void zmq::reaper_t::start ()
43
4.20k
{
44
4.20k
    zmq_assert (_mailbox.valid ());
45
46
    //  Start the thread.
47
4.20k
    _poller->start ("Reaper");
48
4.20k
}
49
50
void zmq::reaper_t::stop ()
51
4.20k
{
52
4.20k
    if (get_mailbox ()->valid ()) {
53
4.20k
        send_stop ();
54
4.20k
    }
55
4.20k
}
56
57
void zmq::reaper_t::in_event ()
58
4.97k
{
59
17.6k
    while (true) {
60
17.6k
#ifdef HAVE_FORK
61
17.6k
        if (unlikely (_pid != getpid ())) {
62
            //printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid());
63
0
            return;
64
0
        }
65
17.6k
#endif
66
67
        //  Get the next command. If there is none, exit.
68
17.6k
        command_t cmd;
69
17.6k
        const int rc = _mailbox.recv (&cmd, 0);
70
17.6k
        if (rc != 0 && errno == EINTR)
71
0
            continue;
72
17.6k
        if (rc != 0 && errno == EAGAIN)
73
4.97k
            break;
74
12.7k
        errno_assert (rc == 0);
75
76
        //  Process the command.
77
12.7k
        cmd.destination->process_command (cmd);
78
12.7k
    }
79
4.97k
}
80
81
void zmq::reaper_t::out_event ()
82
0
{
83
0
    zmq_assert (false);
84
0
}
85
86
void zmq::reaper_t::timer_event (int)
87
0
{
88
0
    zmq_assert (false);
89
0
}
90
91
void zmq::reaper_t::process_stop ()
92
4.20k
{
93
4.20k
    _terminating = true;
94
95
    //  If there are no sockets being reaped finish immediately.
96
4.20k
    if (!_sockets) {
97
9
        send_done ();
98
9
        _poller->rm_fd (_mailbox_handle);
99
9
        _poller->stop ();
100
9
    }
101
4.20k
}
102
103
void zmq::reaper_t::process_reap (socket_base_t *socket_)
104
4.26k
{
105
    //  Add the socket to the poller.
106
4.26k
    socket_->start_reaping (_poller);
107
108
4.26k
    ++_sockets;
109
4.26k
}
110
111
void zmq::reaper_t::process_reaped ()
112
4.26k
{
113
4.26k
    --_sockets;
114
115
    //  If reaped was already asked to terminate and there are no more sockets,
116
    //  finish immediately.
117
4.26k
    if (!_sockets && _terminating) {
118
4.19k
        send_done ();
119
4.19k
        _poller->rm_fd (_mailbox_handle);
120
4.19k
        _poller->stop ();
121
4.19k
    }
122
4.26k
}