Coverage Report

Created: 2026-02-09 06:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/reaper.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "reaper.hpp"
6
#include "socket_base.hpp"
7
#include "err.hpp"
8
9
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
10
1.69k
    object_t (ctx_, tid_),
11
1.69k
    _mailbox_handle (static_cast<poller_t::handle_t> (NULL)),
12
1.69k
    _poller (NULL),
13
1.69k
    _sockets (0),
14
1.69k
    _terminating (false)
15
1.69k
{
16
1.69k
    if (!_mailbox.valid ())
17
0
        return;
18
19
1.69k
    _poller = new (std::nothrow) poller_t (*ctx_);
20
1.69k
    alloc_assert (_poller);
21
22
1.69k
    if (_mailbox.get_fd () != retired_fd) {
23
1.69k
        _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
24
1.69k
        _poller->set_pollin (_mailbox_handle);
25
1.69k
    }
26
27
1.69k
#ifdef HAVE_FORK
28
1.69k
    _pid = getpid ();
29
1.69k
#endif
30
1.69k
}
31
32
zmq::reaper_t::~reaper_t ()
33
1.69k
{
34
1.69k
    LIBZMQ_DELETE (_poller);
35
1.69k
}
36
37
zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
38
5.08k
{
39
5.08k
    return &_mailbox;
40
5.08k
}
41
42
void zmq::reaper_t::start ()
43
1.69k
{
44
1.69k
    zmq_assert (_mailbox.valid ());
45
46
    //  Start the thread.
47
1.69k
    _poller->start ("Reaper");
48
1.69k
}
49
50
void zmq::reaper_t::stop ()
51
1.69k
{
52
1.69k
    if (get_mailbox ()->valid ()) {
53
1.69k
        send_stop ();
54
1.69k
    }
55
1.69k
}
56
57
void zmq::reaper_t::in_event ()
58
1.82k
{
59
6.91k
    while (true) {
60
6.91k
#ifdef HAVE_FORK
61
6.91k
        if (unlikely (_pid != getpid ())) {
62
            //printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid());
63
0
            return;
64
0
        }
65
6.91k
#endif
66
67
        //  Get the next command. If there is none, exit.
68
6.91k
        command_t cmd;
69
6.91k
        const int rc = _mailbox.recv (&cmd, 0);
70
6.91k
        if (rc != 0 && errno == EINTR)
71
0
            continue;
72
6.91k
        if (rc != 0 && errno == EAGAIN)
73
1.82k
            break;
74
5.08k
        errno_assert (rc == 0);
75
76
        //  Process the command.
77
5.08k
        cmd.destination->process_command (cmd);
78
5.08k
    }
79
1.82k
}
80
81
void zmq::reaper_t::out_event ()
82
0
{
83
0
    zmq_assert (false);
84
0
}
85
86
void zmq::reaper_t::timer_event (int)
87
0
{
88
0
    zmq_assert (false);
89
0
}
90
91
void zmq::reaper_t::process_stop ()
92
1.69k
{
93
1.69k
    _terminating = true;
94
95
    //  If there are no sockets being reaped finish immediately.
96
1.69k
    if (!_sockets) {
97
1
        send_done ();
98
1
        _poller->rm_fd (_mailbox_handle);
99
1
        _poller->stop ();
100
1
    }
101
1.69k
}
102
103
void zmq::reaper_t::process_reap (socket_base_t *socket_)
104
1.69k
{
105
    //  Add the socket to the poller.
106
1.69k
    socket_->start_reaping (_poller);
107
108
1.69k
    ++_sockets;
109
1.69k
}
110
111
void zmq::reaper_t::process_reaped ()
112
1.69k
{
113
1.69k
    --_sockets;
114
115
    //  If reaped was already asked to terminate and there are no more sockets,
116
    //  finish immediately.
117
1.69k
    if (!_sockets && _terminating) {
118
1.69k
        send_done ();
119
1.69k
        _poller->rm_fd (_mailbox_handle);
120
1.69k
        _poller->stop ();
121
1.69k
    }
122
1.69k
}