Coverage Report

Created: 2026-03-21 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/reaper.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "reaper.hpp"
6
#include "socket_base.hpp"
7
#include "err.hpp"
8
9
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
10
0
    object_t (ctx_, tid_),
11
0
    _mailbox_handle (static_cast<poller_t::handle_t> (NULL)),
12
0
    _poller (NULL),
13
0
    _sockets (0),
14
0
    _terminating (false)
15
0
{
16
0
    if (!_mailbox.valid ())
17
0
        return;
18
19
0
    _poller = new (std::nothrow) poller_t (*ctx_);
20
0
    alloc_assert (_poller);
21
22
0
    if (_mailbox.get_fd () != retired_fd) {
23
0
        _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
24
0
        _poller->set_pollin (_mailbox_handle);
25
0
    }
26
27
0
#ifdef HAVE_FORK
28
0
    _pid = getpid ();
29
0
#endif
30
0
}
31
32
zmq::reaper_t::~reaper_t ()
33
0
{
34
0
    LIBZMQ_DELETE (_poller);
35
0
}
36
37
zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
38
0
{
39
0
    return &_mailbox;
40
0
}
41
42
void zmq::reaper_t::start ()
43
0
{
44
0
    zmq_assert (_mailbox.valid ());
45
46
    //  Start the thread.
47
0
    _poller->start ("Reaper");
48
0
}
49
50
void zmq::reaper_t::stop ()
51
0
{
52
0
    if (get_mailbox ()->valid ()) {
53
0
        send_stop ();
54
0
    }
55
0
}
56
57
void zmq::reaper_t::in_event ()
58
0
{
59
0
    while (true) {
60
0
#ifdef HAVE_FORK
61
0
        if (unlikely (_pid != getpid ())) {
62
            //printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid());
63
0
            return;
64
0
        }
65
0
#endif
66
67
        //  Get the next command. If there is none, exit.
68
0
        command_t cmd;
69
0
        const int rc = _mailbox.recv (&cmd, 0);
70
0
        if (rc != 0 && errno == EINTR)
71
0
            continue;
72
0
        if (rc != 0 && errno == EAGAIN)
73
0
            break;
74
0
        errno_assert (rc == 0);
75
76
        //  Process the command.
77
0
        cmd.destination->process_command (cmd);
78
0
    }
79
0
}
80
81
void zmq::reaper_t::out_event ()
82
0
{
83
0
    zmq_assert (false);
84
0
}
85
86
void zmq::reaper_t::timer_event (int)
87
0
{
88
0
    zmq_assert (false);
89
0
}
90
91
void zmq::reaper_t::process_stop ()
92
0
{
93
0
    _terminating = true;
94
95
    //  If there are no sockets being reaped finish immediately.
96
0
    if (!_sockets) {
97
0
        send_done ();
98
0
        _poller->rm_fd (_mailbox_handle);
99
0
        _poller->stop ();
100
0
    }
101
0
}
102
103
void zmq::reaper_t::process_reap (socket_base_t *socket_)
104
0
{
105
    //  Add the socket to the poller.
106
0
    socket_->start_reaping (_poller);
107
108
0
    ++_sockets;
109
0
}
110
111
void zmq::reaper_t::process_reaped ()
112
0
{
113
0
    --_sockets;
114
115
    //  If reaped was already asked to terminate and there are no more sockets,
116
    //  finish immediately.
117
0
    if (!_sockets && _terminating) {
118
0
        send_done ();
119
0
        _poller->rm_fd (_mailbox_handle);
120
0
        _poller->stop ();
121
0
    }
122
0
}