Coverage Report

Created: 2025-10-13 07:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/tcp.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
#include "ip.hpp"
6
#include "tcp.hpp"
7
#include "err.hpp"
8
#include "options.hpp"
9
10
#if !defined ZMQ_HAVE_WINDOWS
11
#include <fcntl.h>
12
#include <sys/types.h>
13
#include <sys/socket.h>
14
#include <netinet/in.h>
15
#include <netinet/tcp.h>
16
#include <unistd.h>
17
#ifdef ZMQ_HAVE_VXWORKS
18
#include <sockLib.h>
19
#endif
20
#endif
21
22
#if defined ZMQ_HAVE_OPENVMS
23
#include <ioctl.h>
24
#endif
25
26
#ifdef __APPLE__
27
#include <TargetConditionals.h>
28
#endif
29
30
//  Applies the TCP-level tuning 0MQ wants on every stream socket.
//  Returns 0 on success, otherwise the non-zero result of the setsockopt
//  call that failed (after assert_success_or_recoverable has seen it).
int zmq::tune_tcp_socket (fd_t s_)
{
    //  Disable Nagle's algorithm. We are doing data batching on 0MQ level,
    //  so using Nagle wouldn't improve throughput in any way, but it would
    //  hurt latency.
    int nodelay = 1;
    //  Deliberately not const: the OpenVMS section below stores the result
    //  of a second setsockopt call in the same variable.
    int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
                         reinterpret_cast<char *> (&nodelay), sizeof (int));
    assert_success_or_recoverable (s_, rc);
    if (rc != 0)
        return rc;

#ifdef ZMQ_HAVE_OPENVMS
    //  Disable delayed acknowledgements as they hurt latency significantly.
    int nodelack = 1;
    rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK,
                     reinterpret_cast<char *> (&nodelack), sizeof (int));
    assert_success_or_recoverable (s_, rc);
#endif
    return rc;
}
52
53
//  Sets SO_SNDBUF on sockfd_ to bufsize_.
//  Returns the setsockopt result after it has been passed through
//  assert_success_or_recoverable.
int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
{
    char *const option_value = reinterpret_cast<char *> (&bufsize_);
    const int result = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
                                   option_value, sizeof (bufsize_));
    assert_success_or_recoverable (sockfd_, result);
    return result;
}
61
62
//  Sets SO_RCVBUF on sockfd_ to bufsize_.
//  Returns the setsockopt result after it has been passed through
//  assert_success_or_recoverable.
int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
{
    char *const option_value = reinterpret_cast<char *> (&bufsize_);
    const int result = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
                                   option_value, sizeof (bufsize_));
    assert_success_or_recoverable (sockfd_, result);
    return result;
}
70
71
int zmq::tune_tcp_keepalives (fd_t s_,
72
                              int keepalive_,
73
                              int keepalive_cnt_,
74
                              int keepalive_idle_,
75
                              int keepalive_intvl_)
76
0
{
77
    // These options are used only under certain #ifdefs below.
78
0
    LIBZMQ_UNUSED (keepalive_);
79
0
    LIBZMQ_UNUSED (keepalive_cnt_);
80
0
    LIBZMQ_UNUSED (keepalive_idle_);
81
0
    LIBZMQ_UNUSED (keepalive_intvl_);
82
83
    // If none of the #ifdefs apply, then s_ is unused.
84
0
    LIBZMQ_UNUSED (s_);
85
86
    //  Tuning TCP keep-alives if platform allows it
87
    //  All values = -1 means skip and leave it for OS
88
#ifdef ZMQ_HAVE_WINDOWS
89
    if (keepalive_ != -1) {
90
        tcp_keepalive keepalive_opts;
91
        keepalive_opts.onoff = keepalive_;
92
        keepalive_opts.keepalivetime =
93
          keepalive_idle_ != -1 ? keepalive_idle_ * 1000 : 7200000;
94
        keepalive_opts.keepaliveinterval =
95
          keepalive_intvl_ != -1 ? keepalive_intvl_ * 1000 : 1000;
96
        DWORD num_bytes_returned;
97
        const int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
98
                                 sizeof (keepalive_opts), NULL, 0,
99
                                 &num_bytes_returned, NULL, NULL);
100
        assert_success_or_recoverable (s_, rc);
101
        if (rc == SOCKET_ERROR)
102
            return rc;
103
    }
104
#else
105
0
#ifdef ZMQ_HAVE_SO_KEEPALIVE
106
0
    if (keepalive_ != -1) {
107
0
        int rc =
108
0
          setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
109
0
                      reinterpret_cast<char *> (&keepalive_), sizeof (int));
110
0
        assert_success_or_recoverable (s_, rc);
111
0
        if (rc != 0)
112
0
            return rc;
113
114
0
#ifdef ZMQ_HAVE_TCP_KEEPCNT
115
0
        if (keepalive_cnt_ != -1) {
116
0
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_,
117
0
                                 sizeof (int));
118
0
            assert_success_or_recoverable (s_, rc);
119
0
            if (rc != 0)
120
0
                return rc;
121
0
        }
122
0
#endif // ZMQ_HAVE_TCP_KEEPCNT
123
124
0
#ifdef ZMQ_HAVE_TCP_KEEPIDLE
125
0
        if (keepalive_idle_ != -1) {
126
0
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
127
0
                                 &keepalive_idle_, sizeof (int));
128
0
            assert_success_or_recoverable (s_, rc);
129
0
            if (rc != 0)
130
0
                return rc;
131
0
        }
132
#else // ZMQ_HAVE_TCP_KEEPIDLE
133
#ifdef ZMQ_HAVE_TCP_KEEPALIVE
134
        if (keepalive_idle_ != -1) {
135
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
136
                                 &keepalive_idle_, sizeof (int));
137
            assert_success_or_recoverable (s_, rc);
138
            if (rc != 0)
139
                return rc;
140
        }
141
#endif // ZMQ_HAVE_TCP_KEEPALIVE
142
#endif // ZMQ_HAVE_TCP_KEEPIDLE
143
144
0
#ifdef ZMQ_HAVE_TCP_KEEPINTVL
145
0
        if (keepalive_intvl_ != -1) {
146
0
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
147
0
                                 &keepalive_intvl_, sizeof (int));
148
0
            assert_success_or_recoverable (s_, rc);
149
0
            if (rc != 0)
150
0
                return rc;
151
0
        }
152
0
#endif // ZMQ_HAVE_TCP_KEEPINTVL
153
0
    }
154
0
#endif // ZMQ_HAVE_SO_KEEPALIVE
155
0
#endif // ZMQ_HAVE_WINDOWS
156
157
0
    return 0;
158
0
}
159
160
//  Applies the maximum retransmit timeout timeout_ to sockfd_ using whichever
//  socket option the platform provides (TCP_MAXRT on Windows, otherwise
//  TCP_USER_TIMEOUT). A non-positive timeout_, or a platform with neither
//  option, leaves the socket untouched and yields 0.
//  Returns the setsockopt result otherwise.
int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
{
    if (timeout_ <= 0)
        return 0;

    //  sockfd_ is not referenced when neither option below is available.
    LIBZMQ_UNUSED (sockfd_);

    int result = 0;
#if defined(ZMQ_HAVE_WINDOWS) && defined(TCP_MAXRT)
    // msdn says it's supported in >= Vista, >= Windows Server 2003
    timeout_ /= 1000; // in seconds
    result = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT,
                         reinterpret_cast<char *> (&timeout_),
                         sizeof (timeout_));
    assert_success_or_recoverable (sockfd_, result);
// FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
#elif defined(TCP_USER_TIMEOUT)
    result = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
                         sizeof (timeout_));
    assert_success_or_recoverable (sockfd_, result);
#endif
    return result;
}
185
186
//  Writes up to size_ bytes from data_ to the socket s_ with send ().
//  Returns the number of bytes written, 0 when nothing could be written
//  right now (would-block, EINTR, or WSAENOBUFS on Windows), or -1 to
//  signal peer failure. Other errors trip wsa_assert / errno_assert.
int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS

    //  Winsock's send takes a const char buffer, so constness is preserved
    //  here exactly as in the POSIX branch below.
    const int nbytes = send (s_, static_cast<const char *> (data_),
                             static_cast<int> (size_), 0);

    //  If not a single byte can be written to the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative write).
    const int last_error = WSAGetLastError ();
    if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
        return 0;

    //  Signalise peer failure.
    if (nbytes == SOCKET_ERROR
        && (last_error == WSAENETDOWN || last_error == WSAENETRESET
            || last_error == WSAEHOSTUNREACH || last_error == WSAECONNABORTED
            || last_error == WSAETIMEDOUT || last_error == WSAECONNRESET))
        return -1;

    //  Circumvent a Windows bug:
    //  See https://support.microsoft.com/en-us/kb/201213
    //  See https://zeromq.jira.com/browse/LIBZMQ-195
    if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
        return 0;

    wsa_assert (nbytes != SOCKET_ERROR);
    return nbytes;

#else
    ssize_t nbytes = send (s_, static_cast<const char *> (data_), size_, 0);

    //  Several errors are OK. When speculative write is being done we may not
    //  be able to write a single byte from the socket. Also, SIGSTOP issued
    //  by a debugging tool can result in EINTR error.
    if (nbytes == -1
        && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
        return 0;

    //  Signalise peer failure.
    if (nbytes == -1) {
        //  The errno values listed here are asserted never to occur; the
        //  iPhone build leaves EBADF out of that list.
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
        errno_assert (errno != EACCES && errno != EBADF && errno != EDESTADDRREQ
                      && errno != EFAULT && errno != EISCONN
                      && errno != EMSGSIZE && errno != ENOMEM
                      && errno != ENOTSOCK && errno != EOPNOTSUPP);
#else
        errno_assert (errno != EACCES && errno != EDESTADDRREQ
                      && errno != EFAULT && errno != EISCONN
                      && errno != EMSGSIZE && errno != ENOMEM
                      && errno != ENOTSOCK && errno != EOPNOTSUPP);
#endif
        return -1;
    }

    return static_cast<int> (nbytes);

#endif
}
244
245
//  Reads up to size_ bytes from the socket s_ into data_ with recv ().
//  Returns whatever recv returned on success, or -1 on failure with errno
//  set. Would-block conditions (and EINTR on POSIX) are reported as EAGAIN.
int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS

    const int received =
      recv (s_, static_cast<char *> (data_), static_cast<int> (size_), 0);
    if (received != SOCKET_ERROR)
        return received;

    //  If not a single byte can be read from the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative read).
    const int last_error = WSAGetLastError ();
    if (last_error == WSAEWOULDBLOCK) {
        errno = EAGAIN;
    } else {
        //  Only this set of failures is tolerated; it is then translated
        //  to an errno value for the caller.
        wsa_assert (
          last_error == WSAENETDOWN || last_error == WSAENETRESET
          || last_error == WSAECONNABORTED || last_error == WSAETIMEDOUT
          || last_error == WSAECONNRESET || last_error == WSAECONNREFUSED
          || last_error == WSAENOTCONN || last_error == WSAENOBUFS);
        errno = wsa_error_to_errno (last_error);
    }
    return -1;

#else

    const ssize_t received = recv (s_, static_cast<char *> (data_), size_, 0);
    if (received != -1)
        return static_cast<int> (received);

    //  Several errors are OK. When speculative read is being done we may not
    //  be able to read a single byte from the socket. Also, SIGSTOP issued
    //  by a debugging tool can result in EINTR error.
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
    errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
                  && errno != ENOTSOCK);
#else
    errno_assert (errno != EFAULT && errno != ENOMEM && errno != ENOTSOCK);
#endif
    if (errno == EWOULDBLOCK || errno == EINTR)
        errno = EAGAIN;
    return -1;

#endif
}
292
293
void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
294
0
{
295
#if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH
296
    int sio_loopback_fastpath = 1;
297
    DWORD number_of_bytes_returned = 0;
298
299
    const int rc = WSAIoctl (
300
      socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath,
301
      sizeof sio_loopback_fastpath, NULL, 0, &number_of_bytes_returned, 0, 0);
302
303
    if (SOCKET_ERROR == rc) {
304
        const DWORD last_error = ::WSAGetLastError ();
305
306
        if (WSAEOPNOTSUPP == last_error) {
307
            // This system is not Windows 8 or Server 2012, and the call is not supported.
308
        } else {
309
            wsa_assert (false);
310
        }
311
    }
312
#else
313
0
    LIBZMQ_UNUSED (socket_);
314
0
#endif
315
0
}
316
317
//  Sets SO_BUSY_POLL on socket_ to busy_poll_ when ZMQ_HAVE_BUSY_POLL is
//  defined and busy_poll_ is positive; otherwise does nothing.
void zmq::tune_tcp_busy_poll (fd_t socket_, int busy_poll_)
{
#if defined(ZMQ_HAVE_BUSY_POLL)
    if (busy_poll_ <= 0)
        return;

    char *const option_value = reinterpret_cast<char *> (&busy_poll_);
    const int result = setsockopt (socket_, SOL_SOCKET, SO_BUSY_POLL,
                                   option_value, sizeof (int));
    assert_success_or_recoverable (socket_, result);
#else
    LIBZMQ_UNUSED (socket_);
    LIBZMQ_UNUSED (busy_poll_);
#endif
}
331
332
//  Resolves address_ into *out_tcp_addr_, opens a TCP stream socket for the
//  resolved address family and applies the socket-level settings found in
//  options_.
//  Returns the new socket, or retired_fd when address resolution, socket
//  creation or binding to the configured device fails (in the last case the
//  socket is closed first).
zmq::fd_t zmq::tcp_open_socket (const char *address_,
                                const zmq::options_t &options_,
                                bool local_,
                                bool fallback_to_ipv4_,
                                zmq::tcp_address_t *out_tcp_addr_)
{
    //  Turn the textual address into an address structure.
    int rc = out_tcp_addr_->resolve (address_, local_, options_.ipv6);
    if (rc != 0)
        return retired_fd;

    //  Open a socket matching the resolved address family.
    fd_t sock =
      open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP);

    //  IPv6 address family not supported, try automatic downgrade to IPv4:
    //  resolve again with IPv6 disabled and open an AF_INET socket instead.
    if (sock == retired_fd && fallback_to_ipv4_
        && out_tcp_addr_->family () == AF_INET6 && errno == EAFNOSUPPORT
        && options_.ipv6) {
        rc = out_tcp_addr_->resolve (address_, local_, false);
        if (rc != 0)
            return retired_fd;
        sock = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    }

    if (sock == retired_fd)
        return retired_fd;

    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
    if (out_tcp_addr_->family () == AF_INET6)
        enable_ipv4_mapping (sock);

    //  IP Type-Of-Service priority, if one was configured.
    if (options_.tos != 0)
        set_ip_type_of_service (sock, options_.tos);

    //  Protocol-defined priority, if one was configured.
    if (options_.priority != 0)
        set_socket_priority (sock, options_.priority);

    //  Loopback fastpath, if configured.
    if (options_.loopback_fastpath)
        tcp_tune_loopback_fast_path (sock);

    //  Bind the socket to a device if applicable. This is the only setting
    //  whose failure aborts the open: the socket is closed and retired_fd
    //  is returned.
    if (!options_.bound_device.empty ()
        && bind_to_device (sock, options_.bound_device) == -1) {
#ifdef ZMQ_HAVE_WINDOWS
        rc = closesocket (sock);
        wsa_assert (rc != SOCKET_ERROR);
#else
        rc = ::close (sock);
        errno_assert (rc == 0);
#endif
        return retired_fd;
    }

    //  Buffer limits for the underlying socket; negative values are skipped.
    if (options_.sndbuf >= 0)
        set_tcp_send_buffer (sock, options_.sndbuf);
    if (options_.rcvbuf >= 0)
        set_tcp_receive_buffer (sock, options_.rcvbuf);

    //  This option removes several delays caused by scheduling, interrupts
    //  and context switching.
    if (options_.busy_poll)
        tune_tcp_busy_poll (sock, options_.busy_poll);

    return sock;
}