/src/libzmq/src/io_thread.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | |
5 | | #include <new> |
6 | | |
7 | | #include "macros.hpp" |
8 | | #include "io_thread.hpp" |
9 | | #include "err.hpp" |
10 | | #include "ctx.hpp" |
11 | | |
12 | | zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : |
13 | 1.76k | object_t (ctx_, tid_), |
14 | 1.76k | _mailbox_handle (static_cast<poller_t::handle_t> (NULL)) |
15 | 1.76k | { |
16 | 1.76k | _poller = new (std::nothrow) poller_t (*ctx_); |
17 | 1.76k | alloc_assert (_poller); |
18 | | |
19 | 1.76k | if (_mailbox.get_fd () != retired_fd) { |
20 | 1.76k | _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this); |
21 | 1.76k | _poller->set_pollin (_mailbox_handle); |
22 | 1.76k | } |
23 | 1.76k | } |
24 | | |
25 | | zmq::io_thread_t::~io_thread_t () |
26 | 1.76k | { |
27 | 1.76k | LIBZMQ_DELETE (_poller); |
28 | 1.76k | } |
29 | | |
30 | | void zmq::io_thread_t::start () |
31 | 1.76k | { |
32 | 1.76k | char name[16] = ""; |
33 | 1.76k | snprintf (name, sizeof (name), "IO/%u", |
34 | 1.76k | get_tid () - zmq::ctx_t::reaper_tid - 1); |
35 | | // Start the underlying I/O thread. |
36 | 1.76k | _poller->start (name); |
37 | 1.76k | } |
38 | | |
39 | | void zmq::io_thread_t::stop () |
40 | 1.76k | { |
41 | 1.76k | send_stop (); |
42 | 1.76k | } |
43 | | |
44 | | zmq::mailbox_t *zmq::io_thread_t::get_mailbox () |
45 | 3.52k | { |
46 | 3.52k | return &_mailbox; |
47 | 3.52k | } |
48 | | |
49 | | int zmq::io_thread_t::get_load () const |
50 | 1.86k | { |
51 | 1.86k | return _poller->get_load (); |
52 | 1.86k | } |
53 | | |
54 | | void zmq::io_thread_t::in_event () |
55 | 2.95k | { |
56 | | // TODO: Do we want to limit number of commands I/O thread can |
57 | | // process in a single go? |
58 | | |
59 | 2.95k | command_t cmd; |
60 | 2.95k | int rc = _mailbox.recv (&cmd, 0); |
61 | | |
62 | 9.56k | while (rc == 0 || errno == EINTR) { |
63 | 6.61k | if (rc == 0) |
64 | 6.61k | cmd.destination->process_command (cmd); |
65 | 6.61k | rc = _mailbox.recv (&cmd, 0); |
66 | 6.61k | } |
67 | | |
68 | 2.95k | errno_assert (rc != 0 && errno == EAGAIN); |
69 | 2.95k | } |
70 | | |
71 | | void zmq::io_thread_t::out_event () |
72 | 0 | { |
73 | | // We are never polling for POLLOUT here. This function is never called. |
74 | 0 | zmq_assert (false); |
75 | 0 | } |
76 | | |
77 | | void zmq::io_thread_t::timer_event (int) |
78 | 0 | { |
79 | | // No timers here. This function is never called. |
80 | 0 | zmq_assert (false); |
81 | 0 | } |
82 | | |
83 | | zmq::poller_t *zmq::io_thread_t::get_poller () const |
84 | 1.21k | { |
85 | 1.21k | zmq_assert (_poller); |
86 | 1.21k | return _poller; |
87 | 1.21k | } |
88 | | |
89 | | void zmq::io_thread_t::process_stop () |
90 | 1.76k | { |
91 | 1.76k | zmq_assert (_mailbox_handle); |
92 | 1.76k | _poller->rm_fd (_mailbox_handle); |
93 | 1.76k | _poller->stop (); |
94 | 1.76k | } |