Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | #include "ip.hpp" |
6 | | #include "tcp.hpp" |
7 | | #include "err.hpp" |
8 | | #include "options.hpp" |
9 | | |
10 | | #if !defined ZMQ_HAVE_WINDOWS |
11 | | #include <fcntl.h> |
12 | | #include <sys/types.h> |
13 | | #include <sys/socket.h> |
14 | | #include <netinet/in.h> |
15 | | #include <netinet/tcp.h> |
16 | | #include <unistd.h> |
17 | | #ifdef ZMQ_HAVE_VXWORKS |
18 | | #include <sockLib.h> |
19 | | #endif |
20 | | #endif |
21 | | |
22 | | #if defined ZMQ_HAVE_OPENVMS |
23 | | #include <ioctl.h> |
24 | | #endif |
25 | | |
26 | | #ifdef __APPLE__ |
27 | | #include <TargetConditionals.h> |
28 | | #endif |
29 | | |
30 | | int zmq::tune_tcp_socket (fd_t s_) |
31 | 0 | { |
32 | | // Disable Nagle's algorithm. We are doing data batching on 0MQ level, |
33 | | // so using Nagle wouldn't improve throughput in anyway, but it would |
34 | | // hurt latency. |
35 | 0 | int nodelay = 1; |
36 | 0 | const int rc = |
37 | 0 | setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, |
38 | 0 | reinterpret_cast<char *> (&nodelay), sizeof (int)); |
39 | 0 | assert_success_or_recoverable (s_, rc); |
40 | 0 | if (rc != 0) |
41 | 0 | return rc; |
42 | | |
43 | | #ifdef ZMQ_HAVE_OPENVMS |
44 | | // Disable delayed acknowledgements as they hurt latency significantly. |
45 | | int nodelack = 1; |
46 | | rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack, |
47 | | sizeof (int)); |
48 | | assert_success_or_recoverable (s_, rc); |
49 | | #endif |
50 | 0 | return rc; |
51 | 0 | } |
52 | | |
53 | | int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_) |
54 | 0 | { |
55 | 0 | const int rc = |
56 | 0 | setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF, |
57 | 0 | reinterpret_cast<char *> (&bufsize_), sizeof bufsize_); |
58 | 0 | assert_success_or_recoverable (sockfd_, rc); |
59 | 0 | return rc; |
60 | 0 | } |
61 | | |
62 | | int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) |
63 | 0 | { |
64 | 0 | const int rc = |
65 | 0 | setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF, |
66 | 0 | reinterpret_cast<char *> (&bufsize_), sizeof bufsize_); |
67 | 0 | assert_success_or_recoverable (sockfd_, rc); |
68 | 0 | return rc; |
69 | 0 | } |
70 | | |
//  Configure TCP keep-alive probing on socket s_, to the extent the
//  platform allows. Any parameter equal to -1 means "leave the OS default".
//  Returns 0 on success or when nothing was configured; returns the failing
//  setsockopt/WSAIoctl result otherwise (after asserting it is recoverable).
int zmq::tune_tcp_keepalives (fd_t s_,
                              int keepalive_,
                              int keepalive_cnt_,
                              int keepalive_idle_,
                              int keepalive_intvl_)
{
    // These options are used only under certain #ifdefs below.
    LIBZMQ_UNUSED (keepalive_);
    LIBZMQ_UNUSED (keepalive_cnt_);
    LIBZMQ_UNUSED (keepalive_idle_);
    LIBZMQ_UNUSED (keepalive_intvl_);

    // If none of the #ifdefs apply, then s_ is unused.
    LIBZMQ_UNUSED (s_);

    // Tuning TCP keep-alives if platform allows it
    // All values = -1 means skip and leave it for OS
#ifdef ZMQ_HAVE_WINDOWS
    if (keepalive_ != -1) {
        //  Windows sets onoff/time/interval in one ioctl; the probe count is
        //  not configurable here, so keepalive_cnt_ is ignored on this path.
        tcp_keepalive keepalive_opts;
        keepalive_opts.onoff = keepalive_;
        //  Fallback constants mirror the documented Windows defaults:
        //  2 hours idle, 1 second between probes (values in milliseconds).
        keepalive_opts.keepalivetime =
          keepalive_idle_ != -1 ? keepalive_idle_ * 1000 : 7200000;
        keepalive_opts.keepaliveinterval =
          keepalive_intvl_ != -1 ? keepalive_intvl_ * 1000 : 1000;
        DWORD num_bytes_returned;
        const int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
                                 sizeof (keepalive_opts), NULL, 0,
                                 &num_bytes_returned, NULL, NULL);
        assert_success_or_recoverable (s_, rc);
        if (rc == SOCKET_ERROR)
            return rc;
    }
#else
#ifdef ZMQ_HAVE_SO_KEEPALIVE
    //  POSIX path: SO_KEEPALIVE must be enabled first; the fine-grained
    //  TCP_KEEP* options below are only applied if that succeeds.
    if (keepalive_ != -1) {
        int rc =
          setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
                      reinterpret_cast<char *> (&keepalive_), sizeof (int));
        assert_success_or_recoverable (s_, rc);
        if (rc != 0)
            return rc;

#ifdef ZMQ_HAVE_TCP_KEEPCNT
        //  Number of unacknowledged probes before the connection is dropped.
        if (keepalive_cnt_ != -1) {
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_,
                                 sizeof (int));
            assert_success_or_recoverable (s_, rc);
            if (rc != 0)
                return rc;
        }
#endif // ZMQ_HAVE_TCP_KEEPCNT

#ifdef ZMQ_HAVE_TCP_KEEPIDLE
        //  Idle time (seconds) before the first keep-alive probe is sent.
        if (keepalive_idle_ != -1) {
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
                                 &keepalive_idle_, sizeof (int));
            assert_success_or_recoverable (s_, rc);
            if (rc != 0)
                return rc;
        }
#else // ZMQ_HAVE_TCP_KEEPIDLE
#ifdef ZMQ_HAVE_TCP_KEEPALIVE
        //  Some platforms (e.g. macOS) spell TCP_KEEPIDLE as TCP_KEEPALIVE.
        if (keepalive_idle_ != -1) {
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
                                 &keepalive_idle_, sizeof (int));
            assert_success_or_recoverable (s_, rc);
            if (rc != 0)
                return rc;
        }
#endif // ZMQ_HAVE_TCP_KEEPALIVE
#endif // ZMQ_HAVE_TCP_KEEPIDLE

#ifdef ZMQ_HAVE_TCP_KEEPINTVL
        //  Interval (seconds) between successive keep-alive probes.
        if (keepalive_intvl_ != -1) {
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
                                 &keepalive_intvl_, sizeof (int));
            assert_success_or_recoverable (s_, rc);
            if (rc != 0)
                return rc;
        }
#endif // ZMQ_HAVE_TCP_KEEPINTVL
    }
#endif // ZMQ_HAVE_SO_KEEPALIVE
#endif // ZMQ_HAVE_WINDOWS

    return 0;
}
159 | | |
160 | | int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) |
161 | 0 | { |
162 | 0 | if (timeout_ <= 0) |
163 | 0 | return 0; |
164 | | |
165 | 0 | LIBZMQ_UNUSED (sockfd_); |
166 | |
|
167 | | #if defined(ZMQ_HAVE_WINDOWS) && defined(TCP_MAXRT) |
168 | | // msdn says it's supported in >= Vista, >= Windows Server 2003 |
169 | | timeout_ /= 1000; // in seconds |
170 | | const int rc = |
171 | | setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, |
172 | | reinterpret_cast<char *> (&timeout_), sizeof (timeout_)); |
173 | | assert_success_or_recoverable (sockfd_, rc); |
174 | | return rc; |
175 | | // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT |
176 | | #elif defined(TCP_USER_TIMEOUT) |
177 | 0 | int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_, |
178 | 0 | sizeof (timeout_)); |
179 | 0 | assert_success_or_recoverable (sockfd_, rc); |
180 | 0 | return rc; |
181 | | #else |
182 | | return 0; |
183 | | #endif |
184 | 0 | } |
185 | | |
//  Non-blocking write of up to size_ bytes from data_ to socket s_.
//  Returns the number of bytes actually sent, 0 when the socket would
//  block (or the send was interrupted), and -1 on peer failure.
int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS

    const int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0);

    // If not a single byte can be written to the socket in non-blocking mode
    // we'll get an error (this may happen during the speculative write).
    const int last_error = WSAGetLastError ();
    if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
        return 0;

    // Signalise peer failure.
    if (nbytes == SOCKET_ERROR
        && (last_error == WSAENETDOWN || last_error == WSAENETRESET
            || last_error == WSAEHOSTUNREACH || last_error == WSAECONNABORTED
            || last_error == WSAETIMEDOUT || last_error == WSAECONNRESET))
        return -1;

    // Circumvent a Windows bug:
    // See https://support.microsoft.com/en-us/kb/201213
    // See https://zeromq.jira.com/browse/LIBZMQ-195
    if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
        return 0;

    //  Any other error is unexpected and fatal.
    wsa_assert (nbytes != SOCKET_ERROR);
    return nbytes;

#else
    ssize_t nbytes = send (s_, static_cast<const char *> (data_), size_, 0);

    // Several errors are OK. When speculative write is being done we may not
    // be able to write a single byte from the socket. Also, SIGSTOP issued
    // by a debugging tool can result in EINTR error.
    if (nbytes == -1
        && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
        return 0;

    // Signalise peer failure.
    if (nbytes == -1) {
        //  The listed errno values indicate a programming error rather than a
        //  network condition, so they abort instead of being reported.
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
        errno_assert (errno != EACCES && errno != EBADF && errno != EDESTADDRREQ
                      && errno != EFAULT && errno != EISCONN
                      && errno != EMSGSIZE && errno != ENOMEM
                      && errno != ENOTSOCK && errno != EOPNOTSUPP);
#else
        //  iOS can legitimately return EBADF here, so it is not asserted on.
        errno_assert (errno != EACCES && errno != EDESTADDRREQ
                      && errno != EFAULT && errno != EISCONN
                      && errno != EMSGSIZE && errno != ENOMEM
                      && errno != ENOTSOCK && errno != EOPNOTSUPP);
#endif
        return -1;
    }

    return static_cast<int> (nbytes);

#endif
}
244 | | |
//  Non-blocking read of up to size_ bytes from socket s_ into data_.
//  Returns the number of bytes read (0 means the peer closed the
//  connection), or -1 with errno set (EAGAIN when no data is available).
int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS

    const int rc =
      recv (s_, static_cast<char *> (data_), static_cast<int> (size_), 0);

    // If not a single byte can be read from the socket in non-blocking mode
    // we'll get an error (this may happen during the speculative read).
    if (rc == SOCKET_ERROR) {
        const int last_error = WSAGetLastError ();
        if (last_error == WSAEWOULDBLOCK) {
            errno = EAGAIN;
        } else {
            //  Anything outside the known set of connection-failure codes
            //  is unexpected and aborts; otherwise the WSA error is mapped
            //  onto the POSIX errno the caller expects.
            wsa_assert (
              last_error == WSAENETDOWN || last_error == WSAENETRESET
              || last_error == WSAECONNABORTED || last_error == WSAETIMEDOUT
              || last_error == WSAECONNRESET || last_error == WSAECONNREFUSED
              || last_error == WSAENOTCONN || last_error == WSAENOBUFS);
            errno = wsa_error_to_errno (last_error);
        }
    }

    return rc == SOCKET_ERROR ? -1 : rc;

#else

    const ssize_t rc = recv (s_, static_cast<char *> (data_), size_, 0);

    // Several errors are OK. When speculative read is being done we may not
    // be able to read a single byte from the socket. Also, SIGSTOP issued
    // by a debugging tool can result in EINTR error.
    if (rc == -1) {
        //  The listed errno values indicate a programming error rather than a
        //  network condition, so they abort instead of being reported.
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
        errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
                      && errno != ENOTSOCK);
#else
        //  iOS can legitimately return EBADF here, so it is not asserted on.
        errno_assert (errno != EFAULT && errno != ENOMEM && errno != ENOTSOCK);
#endif
        //  Normalise "no data yet" conditions to EAGAIN for the caller.
        if (errno == EWOULDBLOCK || errno == EINTR)
            errno = EAGAIN;
    }

    return static_cast<int> (rc);

#endif
}
292 | | |
293 | | void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) |
294 | 0 | { |
295 | | #if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH |
296 | | int sio_loopback_fastpath = 1; |
297 | | DWORD number_of_bytes_returned = 0; |
298 | | |
299 | | const int rc = WSAIoctl ( |
300 | | socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath, |
301 | | sizeof sio_loopback_fastpath, NULL, 0, &number_of_bytes_returned, 0, 0); |
302 | | |
303 | | if (SOCKET_ERROR == rc) { |
304 | | const DWORD last_error = ::WSAGetLastError (); |
305 | | |
306 | | if (WSAEOPNOTSUPP == last_error) { |
307 | | // This system is not Windows 8 or Server 2012, and the call is not supported. |
308 | | } else { |
309 | | wsa_assert (false); |
310 | | } |
311 | | } |
312 | | #else |
313 | 0 | LIBZMQ_UNUSED (socket_); |
314 | 0 | #endif |
315 | 0 | } |
316 | | |
317 | | void zmq::tune_tcp_busy_poll (fd_t socket_, int busy_poll_) |
318 | 0 | { |
319 | | #if defined(ZMQ_HAVE_BUSY_POLL) |
320 | | if (busy_poll_ > 0) { |
321 | | const int rc = |
322 | | setsockopt (socket_, SOL_SOCKET, SO_BUSY_POLL, |
323 | | reinterpret_cast<char *> (&busy_poll_), sizeof (int)); |
324 | | assert_success_or_recoverable (socket_, rc); |
325 | | } |
326 | | #else |
327 | 0 | LIBZMQ_UNUSED (socket_); |
328 | 0 | LIBZMQ_UNUSED (busy_poll_); |
329 | 0 | #endif |
330 | 0 | } |
331 | | |
//  Resolve address_ and open a TCP socket tuned according to options_.
//  local_ selects bind-style (vs connect-style) resolution; when
//  fallback_to_ipv4_ is set, an IPv6 address family that the host does not
//  support is transparently retried as IPv4. The resolved address is
//  written to *out_tcp_addr_. Returns the new socket, or retired_fd on
//  resolution/creation/setsockopt failure.
zmq::fd_t zmq::tcp_open_socket (const char *address_,
                                const zmq::options_t &options_,
                                bool local_,
                                bool fallback_to_ipv4_,
                                zmq::tcp_address_t *out_tcp_addr_)
{
    //  Convert the textual address into address structure.
    int rc = out_tcp_addr_->resolve (address_, local_, options_.ipv6);
    if (rc != 0)
        return retired_fd;

    //  Create the socket.
    fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP);

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
    if (s == retired_fd && fallback_to_ipv4_
        && out_tcp_addr_->family () == AF_INET6 && errno == EAFNOSUPPORT
        && options_.ipv6) {
        //  Re-resolve with ipv6 disabled so *out_tcp_addr_ matches the
        //  IPv4 socket created below.
        rc = out_tcp_addr_->resolve (address_, local_, false);
        if (rc != 0) {
            return retired_fd;
        }
        s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    }

    if (s == retired_fd) {
        return retired_fd;
    }

    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
    if (out_tcp_addr_->family () == AF_INET6)
        enable_ipv4_mapping (s);

    //  Set the IP Type-Of-Service priority for this socket
    if (options_.tos != 0)
        set_ip_type_of_service (s, options_.tos);

    //  Set the protocol-defined priority for this socket
    if (options_.priority != 0)
        set_socket_priority (s, options_.priority);

    //  Set the socket to loopback fastpath if configured.
    if (options_.loopback_fastpath)
        tcp_tune_loopback_fast_path (s);

    //  Bind the socket to a device if applicable
    //  NOTE: this is the only option failure that closes the socket; the
    //  tuning calls above are best-effort.
    if (!options_.bound_device.empty ())
        if (bind_to_device (s, options_.bound_device) == -1)
            goto setsockopt_error;

    //  Set the socket buffer limits for the underlying socket.
    if (options_.sndbuf >= 0)
        set_tcp_send_buffer (s, options_.sndbuf);
    if (options_.rcvbuf >= 0)
        set_tcp_receive_buffer (s, options_.rcvbuf);

    //  This option removes several delays caused by scheduling, interrupts and context switching.
    if (options_.busy_poll)
        tune_tcp_busy_poll (s, options_.busy_poll);
    return s;

setsockopt_error:
    //  Cleanup path: release the socket and report failure to the caller.
#ifdef ZMQ_HAVE_WINDOWS
    rc = closesocket (s);
    wsa_assert (rc != SOCKET_ERROR);
#else
    rc = ::close (s);
    errno_assert (rc == 0);
#endif
    return retired_fd;
}