Coverage Report

Created: 2025-11-30 06:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/epoll.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#if defined ZMQ_IOTHREAD_POLLER_USE_EPOLL
5
#include "epoll.hpp"
6
7
#if !defined ZMQ_HAVE_WINDOWS
8
#include <unistd.h>
9
#endif
10
11
#include <stdlib.h>
12
#include <string.h>
13
#include <signal.h>
14
#include <algorithm>
15
#include <new>
16
17
#include "macros.hpp"
18
#include "err.hpp"
19
#include "config.hpp"
20
#include "i_poll_events.hpp"
21
22
#ifdef ZMQ_HAVE_WINDOWS
23
//  Windows-only definition of the value that the constructor compares a
//  freshly created epoll handle against to detect a failed creation.
const zmq::epoll_t::epoll_fd_t zmq::epoll_t::epoll_retired_fd (
  INVALID_HANDLE_VALUE);
25
#endif
26
27
//  Constructs the poller: forwards the thread context to the base class and
//  creates the epoll instance stored in _epoll_fd. Creation failure is
//  treated as fatal via errno_assert.
zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
    worker_poller_base_t (ctx_)
{
#ifndef ZMQ_IOTHREAD_POLLER_USE_EPOLL_CLOEXEC
    //  Plain creation; close-on-exec was not selected at build time.
    _epoll_fd = epoll_create (1);
#else
    //  Request close-on-exec so that the descriptor does not survive into
    //  programs started through the exec() family: old sockets get closed,
    //  TCP ports are not kept blocked and descriptors do not leak.
    _epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
#endif

    //  The epoll instance must have been created successfully.
    errno_assert (_epoll_fd != epoll_retired_fd);
}
40
41
//  Tears the poller down: stops the worker, closes the epoll handle and
//  frees every poll entry still parked on the retired list.
zmq::epoll_t::~epoll_t ()
{
    //  The worker thread has to be gone before its resources are released.
    stop_worker ();

#ifndef ZMQ_HAVE_WINDOWS
    close (_epoll_fd);
#else
    epoll_close (_epoll_fd);
#endif

    //  Entries retired by rm_fd () that loop () did not get to destroy.
    retired_t::iterator entry = _retired.begin ();
    const retired_t::iterator last = _retired.end ();
    while (entry != last) {
        LIBZMQ_DELETE (*entry);
        ++entry;
    }
}
56
57
//  Registers fd_ with the epoll instance and returns the handle that
//  identifies it in later calls. The descriptor starts with an empty event
//  mask; events_ is the sink that loop () notifies for this descriptor.
//  Allocation and epoll_ctl failures are fatal (asserted).
zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
    check_thread ();

    poll_entry_t *entry = new (std::nothrow) poll_entry_t;
    alloc_assert (entry);

    //  Zeroing is not required for correctness; it only keeps debugging
    //  tools from reporting use of uninitialised memory.
    memset (entry, 0, sizeof *entry);

    entry->events = events_;
    entry->fd = fd_;
    //  The entry is attached to the epoll event so that loop () can get
    //  back to it from the data reported by epoll_wait.
    entry->ev.data.ptr = entry;
    entry->ev.events = 0;

    const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &entry->ev);
    errno_assert (rc != -1);

    //  One more descriptor is now handled by this thread.
    adjust_load (1);

    return entry;
}
80
81
//  Unregisters the descriptor behind handle_ from the epoll instance.
//  The entry is not freed here: it is marked with retired_fd and queued on
//  the retired list, which loop () (or the destructor) destroys later.
void zmq::epoll_t::rm_fd (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);

    const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, entry->fd, &entry->ev);
    errno_assert (rc != -1);

    //  loop () skips entries whose descriptor reads as retired_fd.
    entry->fd = retired_fd;
    _retired.push_back (entry);

    //  One descriptor fewer is handled by this thread.
    adjust_load (-1);
}
93
94
//  Adds EPOLLIN to the event mask of the descriptor behind handle_ and
//  pushes the updated mask to the epoll instance. Failure is fatal.
void zmq::epoll_t::set_pollin (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);
    entry->ev.events = entry->ev.events | EPOLLIN;

    const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
102
103
//  Removes EPOLLIN from the event mask of the descriptor behind handle_
//  and pushes the updated mask to the epoll instance. Failure is fatal.
void zmq::epoll_t::reset_pollin (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);

    //  Clear only the EPOLLIN bit; every other flag is left untouched.
    const uint32_t without_pollin = ~(static_cast<uint32_t> (EPOLLIN));
    entry->ev.events = entry->ev.events & without_pollin;

    const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
111
112
//  Adds EPOLLOUT to the event mask of the descriptor behind handle_ and
//  pushes the updated mask to the epoll instance. Failure is fatal.
void zmq::epoll_t::set_pollout (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);
    entry->ev.events = entry->ev.events | EPOLLOUT;

    const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
120
121
//  Removes EPOLLOUT from the event mask of the descriptor behind handle_
//  and pushes the updated mask to the epoll instance. Failure is fatal.
void zmq::epoll_t::reset_pollout (handle_t handle_)
{
    check_thread ();

    poll_entry_t *const entry = static_cast<poll_entry_t *> (handle_);

    //  Clear only the EPOLLOUT bit; every other flag is left untouched.
    const uint32_t without_pollout = ~(static_cast<uint32_t> (EPOLLOUT));
    entry->ev.events = entry->ev.events & without_pollout;

    const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
129
130
void zmq::epoll_t::stop ()
131
8.37k
{
132
8.37k
    check_thread ();
133
8.37k
}
134
135
//  Reports the maximum number of descriptors for this poller. Always
//  returns -1.
//  NOTE(review): -1 presumably means "no fixed limit" - confirm with callers.
int zmq::epoll_t::max_fds ()
{
    const int fd_limit = -1;
    return fd_limit;
}
139
140
void zmq::epoll_t::loop ()
141
8.37k
{
142
8.37k
    epoll_event ev_buf[max_io_events];
143
144
20.8k
    while (true) {
145
        //  Execute any due timers.
146
20.8k
        const int timeout = static_cast<int> (execute_timers ());
147
148
20.8k
        if (get_load () == 0) {
149
8.37k
            if (timeout == 0)
150
8.37k
                break;
151
152
            // TODO sleep for timeout
153
0
            continue;
154
8.37k
        }
155
156
        //  Wait for events.
157
12.5k
        const int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
158
12.5k
                                  timeout ? timeout : -1);
159
12.5k
        if (n == -1) {
160
0
            errno_assert (errno == EINTR);
161
0
            continue;
162
0
        }
163
164
25.1k
        for (int i = 0; i < n; i++) {
165
12.6k
            const poll_entry_t *const pe =
166
12.6k
              static_cast<const poll_entry_t *> (ev_buf[i].data.ptr);
167
168
12.6k
            if (NULL == pe)
169
0
                continue;
170
12.6k
            if (NULL == pe->events)
171
0
                continue;
172
12.6k
            if (pe->fd == retired_fd)
173
0
                continue;
174
12.6k
            if (ev_buf[i].events & (EPOLLERR | EPOLLHUP))
175
31
                pe->events->in_event ();
176
12.6k
            if (pe->fd == retired_fd)
177
31
                continue;
178
12.6k
            if (ev_buf[i].events & EPOLLOUT)
179
0
                pe->events->out_event ();
180
12.6k
            if (pe->fd == retired_fd)
181
0
                continue;
182
12.6k
            if (ev_buf[i].events & EPOLLIN)
183
12.6k
                pe->events->in_event ();
184
12.6k
        }
185
186
        //  Destroy retired event sources.
187
12.5k
        for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
188
25.3k
             it != end; ++it) {
189
12.8k
            LIBZMQ_DELETE (*it);
190
12.8k
        }
191
12.5k
        _retired.clear ();
192
12.5k
    }
193
8.37k
}
194
195
#endif