/src/libzmq/src/signaler.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "poller.hpp" |
5 | | #include "polling_util.hpp" |
6 | | |
7 | | #if defined ZMQ_POLL_BASED_ON_POLL |
8 | | #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX |
9 | | #include <poll.h> |
10 | | #endif |
11 | | #elif defined ZMQ_POLL_BASED_ON_SELECT |
12 | | #if defined ZMQ_HAVE_WINDOWS |
13 | | #elif defined ZMQ_HAVE_HPUX |
14 | | #include <sys/param.h> |
15 | | #include <sys/types.h> |
16 | | #include <sys/time.h> |
17 | | #elif defined ZMQ_HAVE_OPENVMS |
18 | | #include <sys/types.h> |
19 | | #include <sys/time.h> |
20 | | #elif defined ZMQ_HAVE_VXWORKS |
21 | | #include <sys/types.h> |
22 | | #include <sys/time.h> |
23 | | #include <sockLib.h> |
24 | | #include <strings.h> |
25 | | #else |
26 | | #include <sys/select.h> |
27 | | #endif |
28 | | #endif |
29 | | |
30 | | #include "signaler.hpp" |
31 | | #include "likely.hpp" |
32 | | #include "stdint.hpp" |
33 | | #include "config.hpp" |
34 | | #include "err.hpp" |
35 | | #include "fd.hpp" |
36 | | #include "ip.hpp" |
37 | | #include "tcp.hpp" |
38 | | |
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#endif
45 | | |
46 | | #if !defined(ZMQ_HAVE_WINDOWS) |
//  Helper to sleep for specific number of milliseconds (or until signal).
//
//  Implemented with nanosleep() on every non-Windows platform: usleep() is
//  only specified for arguments below 1,000,000 microseconds, and computing
//  'ms_ * 1000' wraps around in unsigned arithmetic for very large ms_.
//
//  Returns 0 when ms_ is 0 or the whole interval elapsed; otherwise returns
//  the nanosleep() result (-1 with errno set, e.g. EINTR on a signal).
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;

    struct timespec interval;
    interval.tv_sec = static_cast<time_t> (ms_ / 1000);
    //  At most 999 * 1,000,000, which always fits into a long.
    interval.tv_nsec = static_cast<long> (ms_ % 1000) * 1000000L;
    return nanosleep (&interval, NULL);
}

//  Helper to wait on close(), for non-blocking sockets, until it completes.
//  If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
//  the overall timeout is reached.
//
//  fd_      descriptor to close.
//  max_ms_  overall budget, in milliseconds, for the sleeps between attempts.
//
//  Returns the result of the last close() call: 0 on success, -1 with errno
//  set otherwise (EAGAIN if the budget ran out while still being refused).
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    const unsigned int min_step_ms = 1;
    const unsigned int max_step_ms = 100;
    //  A tenth of the budget per nap, kept within [min_step_ms, max_step_ms].
    const unsigned int step_ms =
      std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);

    unsigned int ms_so_far = 0;
    int rc = close (fd_); //  do not sleep on first attempt
    while (rc == -1 && errno == EAGAIN && ms_so_far < max_ms_) {
        //  Never nap past the remaining budget. Besides honouring max_ms_
        //  exactly, this keeps ms_so_far from wrapping around when max_ms_
        //  is close to UINT_MAX.
        const unsigned int nap_ms = std::min (step_ms, max_ms_ - ms_so_far);
        sleep_ms (nap_ms);
        ms_so_far += nap_ms;
        rc = close (fd_);
    }

    return rc;
}
89 | | #endif |
90 | | |
91 | | zmq::signaler_t::signaler_t () |
92 | 0 | { |
93 | | // Create the socketpair for signaling. |
94 | 0 | if (make_fdpair (&_r, &_w) == 0) { |
95 | 0 | unblock_socket (_w); |
96 | 0 | unblock_socket (_r); |
97 | 0 | } |
98 | 0 | #ifdef HAVE_FORK |
99 | 0 | pid = getpid (); |
100 | 0 | #endif |
101 | 0 | } |
102 | | |
103 | | // This might get run after some part of construction failed, leaving one or |
104 | | // both of _r and _w retired_fd. |
105 | | zmq::signaler_t::~signaler_t () |
106 | 0 | { |
107 | 0 | #if defined ZMQ_HAVE_EVENTFD |
108 | 0 | if (_r == retired_fd) |
109 | 0 | return; |
110 | 0 | int rc = close_wait_ms (_r); |
111 | 0 | errno_assert (rc == 0); |
112 | | #elif defined ZMQ_HAVE_WINDOWS |
113 | | if (_w != retired_fd) { |
114 | | const struct linger so_linger = {1, 0}; |
115 | | int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER, |
116 | | reinterpret_cast<const char *> (&so_linger), |
117 | | sizeof so_linger); |
118 | | // Only check shutdown if WSASTARTUP was previously done |
119 | | if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) { |
120 | | wsa_assert (rc != SOCKET_ERROR); |
121 | | rc = closesocket (_w); |
122 | | wsa_assert (rc != SOCKET_ERROR); |
123 | | if (_r == retired_fd) |
124 | | return; |
125 | | rc = closesocket (_r); |
126 | | wsa_assert (rc != SOCKET_ERROR); |
127 | | } |
128 | | } |
129 | | #else |
130 | | if (_w != retired_fd) { |
131 | | int rc = close_wait_ms (_w); |
132 | | errno_assert (rc == 0); |
133 | | } |
134 | | if (_r != retired_fd) { |
135 | | int rc = close_wait_ms (_r); |
136 | | errno_assert (rc == 0); |
137 | | } |
138 | | #endif |
139 | 0 | } |
140 | | |
141 | | zmq::fd_t zmq::signaler_t::get_fd () const |
142 | 0 | { |
143 | 0 | return _r; |
144 | 0 | } |
145 | | |
146 | | void zmq::signaler_t::send () |
147 | 0 | { |
148 | 0 | #if defined HAVE_FORK |
149 | 0 | if (unlikely (pid != getpid ())) { |
150 | | //printf("Child process %d signaler_t::send returning without sending #1\n", getpid()); |
151 | 0 | return; // do not send anything in forked child context |
152 | 0 | } |
153 | 0 | #endif |
154 | 0 | #if defined ZMQ_HAVE_EVENTFD |
155 | 0 | const uint64_t inc = 1; |
156 | 0 | ssize_t sz = write (_w, &inc, sizeof (inc)); |
157 | 0 | errno_assert (sz == sizeof (inc)); |
158 | | #elif defined ZMQ_HAVE_WINDOWS |
159 | | const char dummy = 0; |
160 | | int nbytes; |
161 | | do { |
162 | | nbytes = ::send (_w, &dummy, sizeof (dummy), 0); |
163 | | wsa_assert (nbytes != SOCKET_ERROR); |
164 | | // wsa_assert does not abort on WSAEWOULDBLOCK. If we get this, we retry. |
165 | | } while (nbytes == SOCKET_ERROR); |
166 | | // Given the small size of dummy (should be 1) expect that send was able to send everything. |
167 | | zmq_assert (nbytes == sizeof (dummy)); |
168 | | #elif defined ZMQ_HAVE_VXWORKS |
169 | | unsigned char dummy = 0; |
170 | | while (true) { |
171 | | ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0); |
172 | | if (unlikely (nbytes == -1 && errno == EINTR)) |
173 | | continue; |
174 | | #if defined(HAVE_FORK) |
175 | | if (unlikely (pid != getpid ())) { |
176 | | //printf("Child process %d signaler_t::send returning without sending #2\n", getpid()); |
177 | | errno = EINTR; |
178 | | break; |
179 | | } |
180 | | #endif |
181 | | zmq_assert (nbytes == sizeof dummy); |
182 | | break; |
183 | | } |
184 | | #else |
185 | | unsigned char dummy = 0; |
186 | | while (true) { |
187 | | ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0); |
188 | | if (unlikely (nbytes == -1 && errno == EINTR)) |
189 | | continue; |
190 | | #if defined(HAVE_FORK) |
191 | | if (unlikely (pid != getpid ())) { |
192 | | //printf("Child process %d signaler_t::send returning without sending #2\n", getpid()); |
193 | | errno = EINTR; |
194 | | break; |
195 | | } |
196 | | #endif |
197 | | zmq_assert (nbytes == sizeof dummy); |
198 | | break; |
199 | | } |
200 | | #endif |
201 | 0 | } |
202 | | |
203 | | int zmq::signaler_t::wait (int timeout_) const |
204 | 0 | { |
205 | 0 | #ifdef HAVE_FORK |
206 | 0 | if (unlikely (pid != getpid ())) { |
207 | | // we have forked and the file descriptor is closed. Emulate an interrupt |
208 | | // response. |
209 | | //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid()); |
210 | 0 | errno = EINTR; |
211 | 0 | return -1; |
212 | 0 | } |
213 | 0 | #endif |
214 | | |
215 | 0 | #ifdef ZMQ_POLL_BASED_ON_POLL |
216 | 0 | struct pollfd pfd; |
217 | 0 | pfd.fd = _r; |
218 | 0 | pfd.events = POLLIN; |
219 | 0 | const int rc = poll (&pfd, 1, timeout_); |
220 | 0 | if (unlikely (rc < 0)) { |
221 | 0 | errno_assert (errno == EINTR); |
222 | 0 | return -1; |
223 | 0 | } |
224 | 0 | if (unlikely (rc == 0)) { |
225 | 0 | errno = EAGAIN; |
226 | 0 | return -1; |
227 | 0 | } |
228 | 0 | #ifdef HAVE_FORK |
229 | 0 | if (unlikely (pid != getpid ())) { |
230 | | // we have forked and the file descriptor is closed. Emulate an interrupt |
231 | | // response. |
232 | | //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid()); |
233 | 0 | errno = EINTR; |
234 | 0 | return -1; |
235 | 0 | } |
236 | 0 | #endif |
237 | 0 | zmq_assert (rc == 1); |
238 | 0 | zmq_assert (pfd.revents & POLLIN); |
239 | 0 | return 0; |
240 | |
|
241 | | #elif defined ZMQ_POLL_BASED_ON_SELECT |
242 | | |
243 | | optimized_fd_set_t fds (1); |
244 | | FD_ZERO (fds.get ()); |
245 | | FD_SET (_r, fds.get ()); |
246 | | struct timeval timeout; |
247 | | if (timeout_ >= 0) { |
248 | | timeout.tv_sec = timeout_ / 1000; |
249 | | timeout.tv_usec = timeout_ % 1000 * 1000; |
250 | | } |
251 | | #ifdef ZMQ_HAVE_WINDOWS |
252 | | int rc = |
253 | | select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL); |
254 | | wsa_assert (rc != SOCKET_ERROR); |
255 | | #else |
256 | | int rc = |
257 | | select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL); |
258 | | if (unlikely (rc < 0)) { |
259 | | errno_assert (errno == EINTR); |
260 | | return -1; |
261 | | } |
262 | | #endif |
263 | | if (unlikely (rc == 0)) { |
264 | | errno = EAGAIN; |
265 | | return -1; |
266 | | } |
267 | | zmq_assert (rc == 1); |
268 | | return 0; |
269 | | |
270 | | #else |
271 | | #error |
272 | | #endif |
273 | 0 | } |
274 | | |
275 | | void zmq::signaler_t::recv () |
276 | 0 | { |
277 | | // Attempt to read a signal. |
278 | 0 | #if defined ZMQ_HAVE_EVENTFD |
279 | 0 | uint64_t dummy; |
280 | 0 | ssize_t sz = read (_r, &dummy, sizeof (dummy)); |
281 | 0 | errno_assert (sz == sizeof (dummy)); |
282 | | |
283 | | // If we accidentally grabbed the next signal(s) along with the current |
284 | | // one, return it back to the eventfd object. |
285 | 0 | if (unlikely (dummy > 1)) { |
286 | 0 | const uint64_t inc = dummy - 1; |
287 | 0 | ssize_t sz2 = write (_w, &inc, sizeof (inc)); |
288 | 0 | errno_assert (sz2 == sizeof (inc)); |
289 | 0 | return; |
290 | 0 | } |
291 | | |
292 | 0 | zmq_assert (dummy == 1); |
293 | | #else |
294 | | unsigned char dummy; |
295 | | #if defined ZMQ_HAVE_WINDOWS |
296 | | const int nbytes = |
297 | | ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0); |
298 | | wsa_assert (nbytes != SOCKET_ERROR); |
299 | | #elif defined ZMQ_HAVE_VXWORKS |
300 | | ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0); |
301 | | errno_assert (nbytes >= 0); |
302 | | #else |
303 | | ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0); |
304 | | errno_assert (nbytes >= 0); |
305 | | #endif |
306 | | zmq_assert (nbytes == sizeof (dummy)); |
307 | | zmq_assert (dummy == 0); |
308 | | #endif |
309 | 0 | } |
310 | | |
311 | | int zmq::signaler_t::recv_failable () |
312 | 0 | { |
313 | | // Attempt to read a signal. |
314 | 0 | #if defined ZMQ_HAVE_EVENTFD |
315 | 0 | uint64_t dummy; |
316 | 0 | ssize_t sz = read (_r, &dummy, sizeof (dummy)); |
317 | 0 | if (sz == -1) { |
318 | 0 | errno_assert (errno == EAGAIN); |
319 | 0 | return -1; |
320 | 0 | } |
321 | 0 | errno_assert (sz == sizeof (dummy)); |
322 | | |
323 | | // If we accidentally grabbed the next signal(s) along with the current |
324 | | // one, return it back to the eventfd object. |
325 | 0 | if (unlikely (dummy > 1)) { |
326 | 0 | const uint64_t inc = dummy - 1; |
327 | 0 | ssize_t sz2 = write (_w, &inc, sizeof (inc)); |
328 | 0 | errno_assert (sz2 == sizeof (inc)); |
329 | 0 | return 0; |
330 | 0 | } |
331 | | |
332 | 0 | zmq_assert (dummy == 1); |
333 | |
|
334 | | #else |
335 | | unsigned char dummy; |
336 | | #if defined ZMQ_HAVE_WINDOWS |
337 | | const int nbytes = |
338 | | ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0); |
339 | | if (nbytes == SOCKET_ERROR) { |
340 | | const int last_error = WSAGetLastError (); |
341 | | if (last_error == WSAEWOULDBLOCK) { |
342 | | errno = EAGAIN; |
343 | | return -1; |
344 | | } |
345 | | wsa_assert (last_error == WSAEWOULDBLOCK); |
346 | | } |
347 | | #elif defined ZMQ_HAVE_VXWORKS |
348 | | ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0); |
349 | | if (nbytes == -1) { |
350 | | if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { |
351 | | errno = EAGAIN; |
352 | | return -1; |
353 | | } |
354 | | errno_assert (errno == EAGAIN || errno == EWOULDBLOCK |
355 | | || errno == EINTR); |
356 | | } |
357 | | #else |
358 | | ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0); |
359 | | if (nbytes == -1) { |
360 | | if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { |
361 | | errno = EAGAIN; |
362 | | return -1; |
363 | | } |
364 | | errno_assert (errno == EAGAIN || errno == EWOULDBLOCK |
365 | | || errno == EINTR); |
366 | | } |
367 | | #endif |
368 | | zmq_assert (nbytes == sizeof (dummy)); |
369 | | zmq_assert (dummy == 0); |
370 | | #endif |
371 | 0 | return 0; |
372 | 0 | } |
373 | | |
374 | | bool zmq::signaler_t::valid () const |
375 | 0 | { |
376 | 0 | return _w != retired_fd; |
377 | 0 | } |
378 | | |
379 | | #ifdef HAVE_FORK |
380 | | void zmq::signaler_t::forked () |
381 | 0 | { |
382 | | // Close file descriptors created in the parent and create new pair |
383 | 0 | close (_r); |
384 | 0 | close (_w); |
385 | 0 | make_fdpair (&_r, &_w); |
386 | 0 | } |
387 | | #endif |