/src/libzmq/src/reaper.cpp
Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | #include "reaper.hpp" |
6 | | #include "socket_base.hpp" |
7 | | #include "err.hpp" |
8 | | |
9 | | zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : |
10 | 1.69k | object_t (ctx_, tid_), |
11 | 1.69k | _mailbox_handle (static_cast<poller_t::handle_t> (NULL)), |
12 | 1.69k | _poller (NULL), |
13 | 1.69k | _sockets (0), |
14 | 1.69k | _terminating (false) |
15 | 1.69k | { |
16 | 1.69k | if (!_mailbox.valid ()) |
17 | 0 | return; |
18 | | |
19 | 1.69k | _poller = new (std::nothrow) poller_t (*ctx_); |
20 | 1.69k | alloc_assert (_poller); |
21 | | |
22 | 1.69k | if (_mailbox.get_fd () != retired_fd) { |
23 | 1.69k | _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this); |
24 | 1.69k | _poller->set_pollin (_mailbox_handle); |
25 | 1.69k | } |
26 | | |
27 | 1.69k | #ifdef HAVE_FORK |
28 | 1.69k | _pid = getpid (); |
29 | 1.69k | #endif |
30 | 1.69k | } |
31 | | |
32 | | zmq::reaper_t::~reaper_t () |
33 | 1.69k | { |
34 | 1.69k | LIBZMQ_DELETE (_poller); |
35 | 1.69k | } |
36 | | |
37 | | zmq::mailbox_t *zmq::reaper_t::get_mailbox () |
38 | 5.08k | { |
39 | 5.08k | return &_mailbox; |
40 | 5.08k | } |
41 | | |
42 | | void zmq::reaper_t::start () |
43 | 1.69k | { |
44 | 1.69k | zmq_assert (_mailbox.valid ()); |
45 | | |
46 | | // Start the thread. |
47 | 1.69k | _poller->start ("Reaper"); |
48 | 1.69k | } |
49 | | |
50 | | void zmq::reaper_t::stop () |
51 | 1.69k | { |
52 | 1.69k | if (get_mailbox ()->valid ()) { |
53 | 1.69k | send_stop (); |
54 | 1.69k | } |
55 | 1.69k | } |
56 | | |
57 | | void zmq::reaper_t::in_event () |
58 | 1.82k | { |
59 | 6.91k | while (true) { |
60 | 6.91k | #ifdef HAVE_FORK |
61 | 6.91k | if (unlikely (_pid != getpid ())) { |
62 | | //printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid()); |
63 | 0 | return; |
64 | 0 | } |
65 | 6.91k | #endif |
66 | | |
67 | | // Get the next command. If there is none, exit. |
68 | 6.91k | command_t cmd; |
69 | 6.91k | const int rc = _mailbox.recv (&cmd, 0); |
70 | 6.91k | if (rc != 0 && errno == EINTR) |
71 | 0 | continue; |
72 | 6.91k | if (rc != 0 && errno == EAGAIN) |
73 | 1.82k | break; |
74 | 5.08k | errno_assert (rc == 0); |
75 | | |
76 | | // Process the command. |
77 | 5.08k | cmd.destination->process_command (cmd); |
78 | 5.08k | } |
79 | 1.82k | } |
80 | | |
81 | | void zmq::reaper_t::out_event () |
82 | 0 | { |
83 | 0 | zmq_assert (false); |
84 | 0 | } |
85 | | |
86 | | void zmq::reaper_t::timer_event (int) |
87 | 0 | { |
88 | 0 | zmq_assert (false); |
89 | 0 | } |
90 | | |
91 | | void zmq::reaper_t::process_stop () |
92 | 1.69k | { |
93 | 1.69k | _terminating = true; |
94 | | |
95 | | // If there are no sockets being reaped finish immediately. |
96 | 1.69k | if (!_sockets) { |
97 | 1 | send_done (); |
98 | 1 | _poller->rm_fd (_mailbox_handle); |
99 | 1 | _poller->stop (); |
100 | 1 | } |
101 | 1.69k | } |
102 | | |
103 | | void zmq::reaper_t::process_reap (socket_base_t *socket_) |
104 | 1.69k | { |
105 | | // Add the socket to the poller. |
106 | 1.69k | socket_->start_reaping (_poller); |
107 | | |
108 | 1.69k | ++_sockets; |
109 | 1.69k | } |
110 | | |
111 | | void zmq::reaper_t::process_reaped () |
112 | 1.69k | { |
113 | 1.69k | --_sockets; |
114 | | |
115 | | // If reaped was already asked to terminate and there are no more sockets, |
116 | | // finish immediately. |
117 | 1.69k | if (!_sockets && _terminating) { |
118 | 1.69k | send_done (); |
119 | 1.69k | _poller->rm_fd (_mailbox_handle); |
120 | 1.69k | _poller->stop (); |
121 | 1.69k | } |
122 | 1.69k | } |