Coverage Report

Created: 2026-01-10 06:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/epoll.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#if defined ZMQ_IOTHREAD_POLLER_USE_EPOLL
5
#include "epoll.hpp"
6
7
#if !defined ZMQ_HAVE_WINDOWS
8
#include <unistd.h>
9
#endif
10
11
#include <stdlib.h>
12
#include <string.h>
13
#include <signal.h>
14
#include <algorithm>
15
#include <new>
16
17
#include "macros.hpp"
18
#include "err.hpp"
19
#include "config.hpp"
20
#include "i_poll_events.hpp"
21
22
#if defined ZMQ_HAVE_WINDOWS
//  Out-of-line definition of the static "retired" epoll handle value for
//  Windows builds. The constructor asserts that the freshly created epoll
//  handle differs from this sentinel.
const zmq::epoll_t::epoll_fd_t
  zmq::epoll_t::epoll_retired_fd = INVALID_HANDLE_VALUE;
#endif
26
27
//  Constructs the poller on top of worker_poller_base_t and creates the
//  epoll instance stored in _epoll_fd. Creation failure is fatal: the
//  errno_assert below fires if the returned descriptor equals the retired
//  sentinel.
zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
    worker_poller_base_t (ctx_)
{
#if !defined ZMQ_IOTHREAD_POLLER_USE_EPOLL_CLOEXEC
    _epoll_fd = epoll_create (1);
#else
    //  With close-on-exec the descriptor is not inherited across exec(),
    //  so it does not leak into the newly started program.
    _epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
#endif
    errno_assert (_epoll_fd != epoll_retired_fd);
}
40
41
//  Tears the poller down: stops the worker, closes the epoll descriptor
//  and frees every poll entry still parked in the retired list.
zmq::epoll_t::~epoll_t ()
{
    //  The worker thread has to be gone before its descriptor is closed.
    stop_worker ();

#if !defined ZMQ_HAVE_WINDOWS
    close (_epoll_fd);
#else
    epoll_close (_epoll_fd);
#endif

    //  Release entries that were retired by rm_fd () but never reaped
    //  by loop ().
    retired_t::iterator it = _retired.begin ();
    const retired_t::iterator end = _retired.end ();
    while (it != end) {
        LIBZMQ_DELETE (*it);
        ++it;
    }
}
56
57
//  Registers fd_ with the epoll instance and returns an opaque handle for
//  it. The descriptor is added with an empty event mask; interest in
//  input/output is enabled later through set_pollin ()/set_pollout ().
//  events_ is the sink whose in_event ()/out_event () are invoked by
//  loop (). Allocation or epoll_ctl failure trips an assertion.
zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
    check_thread ();

    poll_entry_t *entry = new (std::nothrow) poll_entry_t;
    alloc_assert (entry);

    //  Zeroing is not strictly required; it only keeps memory-checking
    //  tools from reporting uninitialised bytes.
    memset (entry, 0, sizeof *entry);

    entry->events = events_;
    entry->fd = fd_;
    //  The entry itself is the user data handed back by epoll_wait.
    entry->ev.data.ptr = entry;
    entry->ev.events = 0;

    const int result = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &entry->ev);
    errno_assert (result != -1);

    //  One more descriptor is being served by this thread.
    adjust_load (1);

    return entry;
}
80
81
//  Unregisters the descriptor behind handle_ from the epoll instance.
//  The entry is not freed here: it is flagged with retired_fd and queued
//  in _retired, so loop () can skip any events already fetched for it and
//  delete it afterwards.
void zmq::epoll_t::rm_fd (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);
    const int result =
      epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, entry->fd, &entry->ev);
    errno_assert (result != -1);

    entry->fd = retired_fd;
    _retired.push_back (entry);

    //  One descriptor fewer is being served by this thread.
    adjust_load (-1);
}
93
94
//  Starts polling the descriptor behind handle_ for input by adding
//  EPOLLIN to its event mask and pushing the new mask to the kernel.
void zmq::epoll_t::set_pollin (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);
    entry->ev.events = entry->ev.events | EPOLLIN;

    const int result =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (result != -1);
}
102
103
//  Stops polling the descriptor behind handle_ for input by clearing
//  EPOLLIN from its event mask and pushing the new mask to the kernel.
void zmq::epoll_t::reset_pollin (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);
    const uint32_t keep_mask = ~(static_cast<uint32_t> (EPOLLIN));
    entry->ev.events &= keep_mask;

    const int result =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (result != -1);
}
111
112
//  Starts polling the descriptor behind handle_ for output by adding
//  EPOLLOUT to its event mask and pushing the new mask to the kernel.
void zmq::epoll_t::set_pollout (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);
    entry->ev.events = entry->ev.events | EPOLLOUT;

    const int result =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (result != -1);
}
120
121
//  Stops polling the descriptor behind handle_ for output by clearing
//  EPOLLOUT from its event mask and pushing the new mask to the kernel.
void zmq::epoll_t::reset_pollout (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);
    const uint32_t keep_mask = ~(static_cast<uint32_t> (EPOLLOUT));
    entry->ev.events &= keep_mask;

    const int result =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (result != -1);
}
129
130
void zmq::epoll_t::stop ()
131
3.56k
{
132
3.56k
    check_thread ();
133
3.56k
}
134
135
//  Reports the maximum number of descriptors this poller can handle.
//  Always returns -1 (presumably meaning "no fixed limit" -- confirm
//  against the callers of max_fds ()).
int zmq::epoll_t::max_fds ()
{
    const int no_limit = -1;
    return no_limit;
}
139
140
void zmq::epoll_t::loop ()
141
3.56k
{
142
3.56k
    epoll_event ev_buf[max_io_events];
143
144
11.0k
    while (true) {
145
        //  Execute any due timers.
146
11.0k
        const int timeout = static_cast<int> (execute_timers ());
147
148
11.0k
        if (get_load () == 0) {
149
3.56k
            if (timeout == 0)
150
3.56k
                break;
151
152
            // TODO sleep for timeout
153
0
            continue;
154
3.56k
        }
155
156
        //  Wait for events.
157
7.48k
        const int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
158
7.48k
                                  timeout ? timeout : -1);
159
7.48k
        if (n == -1) {
160
0
            errno_assert (errno == EINTR);
161
0
            continue;
162
0
        }
163
164
15.1k
        for (int i = 0; i < n; i++) {
165
7.62k
            const poll_entry_t *const pe =
166
7.62k
              static_cast<const poll_entry_t *> (ev_buf[i].data.ptr);
167
168
7.62k
            if (NULL == pe)
169
0
                continue;
170
7.62k
            if (NULL == pe->events)
171
0
                continue;
172
7.62k
            if (pe->fd == retired_fd)
173
0
                continue;
174
7.62k
            if (ev_buf[i].events & (EPOLLERR | EPOLLHUP))
175
29
                pe->events->in_event ();
176
7.62k
            if (pe->fd == retired_fd)
177
29
                continue;
178
7.59k
            if (ev_buf[i].events & EPOLLOUT)
179
0
                pe->events->out_event ();
180
7.59k
            if (pe->fd == retired_fd)
181
0
                continue;
182
7.59k
            if (ev_buf[i].events & EPOLLIN)
183
7.59k
                pe->events->in_event ();
184
7.59k
        }
185
186
        //  Destroy retired event sources.
187
7.48k
        for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
188
12.9k
             it != end; ++it) {
189
5.49k
            LIBZMQ_DELETE (*it);
190
5.49k
        }
191
7.48k
        _retired.clear ();
192
7.48k
    }
193
3.56k
}
194
195
#endif