Coverage Report

Created: 2025-10-13 07:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/epoll.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#if defined ZMQ_IOTHREAD_POLLER_USE_EPOLL
5
#include "epoll.hpp"
6
7
#if !defined ZMQ_HAVE_WINDOWS
8
#include <unistd.h>
9
#endif
10
11
#include <stdlib.h>
12
#include <string.h>
13
#include <signal.h>
14
#include <algorithm>
15
#include <new>
16
17
#include "macros.hpp"
18
#include "err.hpp"
19
#include "config.hpp"
20
#include "i_poll_events.hpp"
21
22
#ifdef ZMQ_HAVE_WINDOWS
//  On Windows builds the "no epoll instance" marker is the invalid handle
//  value; the constructor asserts that the created handle differs from it.
const zmq::epoll_t::epoll_fd_t
  zmq::epoll_t::epoll_retired_fd = INVALID_HANDLE_VALUE;
#endif
26
27
//  Constructs the poller and creates the epoll instance it operates on.
//  Creation failure is treated as fatal (errno_assert).
zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
    worker_poller_base_t (ctx_)
{
#ifdef ZMQ_IOTHREAD_POLLER_USE_EPOLL_CLOEXEC
    //  Request close-on-exec so that the descriptor is closed when one of
    //  the exec() functions is used: old sockets then do not keep TCP
    //  ports blocked, descriptors are not leaked, etc.
    _epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
#else
    _epoll_fd = epoll_create (1);
#endif
    errno_assert (_epoll_fd != epoll_retired_fd);
}
40
41
//  Stops the worker, closes the epoll instance and frees any poll entries
//  that were retired but not yet destroyed by loop ().
zmq::epoll_t::~epoll_t ()
{
    //  The worker thread has to be gone before its resources are released.
    stop_worker ();

#ifdef ZMQ_HAVE_WINDOWS
    epoll_close (_epoll_fd);
#else
    close (_epoll_fd);
#endif

    //  Release entries still waiting on the retired list.
    retired_t::iterator cur = _retired.begin ();
    const retired_t::iterator last = _retired.end ();
    while (cur != last) {
        LIBZMQ_DELETE (*cur);
        ++cur;
    }
}
56
57
//  Registers fd_ with the epoll instance and returns the handle to be used
//  in subsequent calls. The descriptor starts with an empty event mask;
//  events_ is the sink that loop () notifies for this descriptor.
zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
    check_thread ();

    poll_entry_t *entry = new (std::nothrow) poll_entry_t;
    alloc_assert (entry);

    //  Zeroing is not strictly required; it only keeps debugging tools
    //  from reporting use of uninitialised memory.
    memset (entry, 0, sizeof *entry);

    entry->fd = fd_;
    entry->events = events_;
    entry->ev.events = 0;
    //  The entry itself travels with the kernel event so that loop () can
    //  recover it from epoll_wait results.
    entry->ev.data.ptr = entry;

    const int rc = epoll_ctl (_epoll_fd, EPOLL_CTL_ADD, fd_, &entry->ev);
    errno_assert (rc != -1);

    //  One more descriptor is now handled by this thread.
    adjust_load (1);

    return entry;
}
80
81
//  Unregisters the descriptor behind handle_ from the epoll instance.
//  The entry is not freed here: it is marked with retired_fd and queued on
//  the retired list, so that events already fetched for it can be skipped.
void zmq::epoll_t::rm_fd (handle_t handle_)
{
    check_thread ();

    poll_entry_t *entry = static_cast<poll_entry_t *> (handle_);

    const int rc =
      epoll_ctl (_epoll_fd, EPOLL_CTL_DEL, entry->fd, &entry->ev);
    errno_assert (rc != -1);

    entry->fd = retired_fd;
    _retired.push_back (entry);

    //  One descriptor fewer is handled by this thread.
    adjust_load (-1);
}
93
94
//  Adds EPOLLIN to the event mask of the descriptor behind handle_.
void zmq::epoll_t::set_pollin (handle_t handle_)
{
    check_thread ();

    poll_entry_t *entry = static_cast<poll_entry_t *> (handle_);
    entry->ev.events = entry->ev.events | EPOLLIN;

    //  Push the updated mask to the kernel.
    const int rc =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
102
103
//  Removes EPOLLIN from the event mask of the descriptor behind handle_.
void zmq::epoll_t::reset_pollin (handle_t handle_)
{
    check_thread ();

    poll_entry_t *entry = static_cast<poll_entry_t *> (handle_);
    const uint32_t in_bit = static_cast<uint32_t> (EPOLLIN);
    entry->ev.events &= ~in_bit;

    //  Push the updated mask to the kernel.
    const int rc =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
111
112
//  Adds EPOLLOUT to the event mask of the descriptor behind handle_.
void zmq::epoll_t::set_pollout (handle_t handle_)
{
    check_thread ();

    poll_entry_t *entry = static_cast<poll_entry_t *> (handle_);
    entry->ev.events = entry->ev.events | EPOLLOUT;

    //  Push the updated mask to the kernel.
    const int rc =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
120
121
//  Removes EPOLLOUT from the event mask of the descriptor behind handle_.
void zmq::epoll_t::reset_pollout (handle_t handle_)
{
    check_thread ();

    poll_entry_t *entry = static_cast<poll_entry_t *> (handle_);
    const uint32_t out_bit = static_cast<uint32_t> (EPOLLOUT);
    entry->ev.events &= ~out_bit;

    //  Push the updated mask to the kernel.
    const int rc =
      epoll_ctl (_epoll_fd, EPOLL_CTL_MOD, entry->fd, &entry->ev);
    errno_assert (rc != -1);
}
129
130
void zmq::epoll_t::stop ()
131
0
{
132
0
    check_thread ();
133
0
}
134
135
//  Always reports -1.
//  NOTE(review): presumably -1 means "no fixed descriptor limit" for this
//  poller - confirm against the callers.
int zmq::epoll_t::max_fds ()
{
    return -1;
}
139
140
void zmq::epoll_t::loop ()
141
0
{
142
0
    epoll_event ev_buf[max_io_events];
143
144
0
    while (true) {
145
        //  Execute any due timers.
146
0
        const int timeout = static_cast<int> (execute_timers ());
147
148
0
        if (get_load () == 0) {
149
0
            if (timeout == 0)
150
0
                break;
151
152
            // TODO sleep for timeout
153
0
            continue;
154
0
        }
155
156
        //  Wait for events.
157
0
        const int n = epoll_wait (_epoll_fd, &ev_buf[0], max_io_events,
158
0
                                  timeout ? timeout : -1);
159
0
        if (n == -1) {
160
0
            errno_assert (errno == EINTR);
161
0
            continue;
162
0
        }
163
164
0
        for (int i = 0; i < n; i++) {
165
0
            const poll_entry_t *const pe =
166
0
              static_cast<const poll_entry_t *> (ev_buf[i].data.ptr);
167
168
0
            if (NULL == pe)
169
0
                continue;
170
0
            if (NULL == pe->events)
171
0
                continue;
172
0
            if (pe->fd == retired_fd)
173
0
                continue;
174
0
            if (ev_buf[i].events & (EPOLLERR | EPOLLHUP))
175
0
                pe->events->in_event ();
176
0
            if (pe->fd == retired_fd)
177
0
                continue;
178
0
            if (ev_buf[i].events & EPOLLOUT)
179
0
                pe->events->out_event ();
180
0
            if (pe->fd == retired_fd)
181
0
                continue;
182
0
            if (ev_buf[i].events & EPOLLIN)
183
0
                pe->events->in_event ();
184
0
        }
185
186
        //  Destroy retired event sources.
187
0
        for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
188
0
             it != end; ++it) {
189
0
            LIBZMQ_DELETE (*it);
190
0
        }
191
0
        _retired.clear ();
192
0
    }
193
0
}
194
195
#endif