Coverage Report

Created: 2025-07-18 07:00

/src/libzmq/src/ip.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "ip.hpp"
5
#include "err.hpp"
6
#include "macros.hpp"
7
#include "config.hpp"
8
#include "address.hpp"
9
10
#if !defined ZMQ_HAVE_WINDOWS
11
#include <fcntl.h>
12
#include <sys/types.h>
13
#include <sys/socket.h>
14
#include <sys/stat.h>
15
#include <netdb.h>
16
#include <netinet/in.h>
17
#include <netinet/tcp.h>
18
#include <stdlib.h>
19
#include <unistd.h>
20
21
#include <vector>
22
#else
23
#include "tcp.hpp"
24
#ifdef ZMQ_HAVE_IPC
25
#include "ipc_address.hpp"
26
// Don't try ipc if it fails once
27
namespace zmq
28
{
29
static bool try_ipc_first = true;
30
}
31
#endif
32
33
#include <direct.h>
34
35
#define rmdir rmdir_utf8
36
#define unlink unlink_utf8
37
#endif
38
39
#if defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
40
#include <ioctl.h>
41
#endif
42
43
#if defined ZMQ_HAVE_VXWORKS
44
#include <unistd.h>
45
#include <sockLib.h>
46
#include <ioLib.h>
47
#endif
48
49
#if defined ZMQ_HAVE_EVENTFD
50
#include <sys/eventfd.h>
51
#endif
52
53
#if defined ZMQ_HAVE_OPENPGM
54
#ifdef ZMQ_HAVE_WINDOWS
55
#define __PGM_WININT_H__
56
#endif
57
58
#include <pgm/pgm.h>
59
#endif
60
61
#ifdef __APPLE__
62
#include <TargetConditionals.h>
63
#endif
64
65
#ifndef ZMQ_HAVE_WINDOWS
66
// Acceptable temporary directory environment variables
67
static const char *tmp_env_vars[] = {
68
  "TMPDIR", "TEMPDIR", "TMP",
69
  0 // Sentinel
70
};
71
#endif
72
73
zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
74
0
{
75
0
    int rc;
76
77
    //  Setting this option result in sane behaviour when exec() functions
78
    //  are used. Old sockets are closed and don't block TCP ports etc.
79
0
#if defined ZMQ_HAVE_SOCK_CLOEXEC
80
0
    type_ |= SOCK_CLOEXEC;
81
0
#endif
82
83
#if defined ZMQ_HAVE_WINDOWS && defined WSA_FLAG_NO_HANDLE_INHERIT
84
    // if supported, create socket with WSA_FLAG_NO_HANDLE_INHERIT, such that
85
    // the race condition in making it non-inheritable later is avoided
86
    const fd_t s = WSASocket (domain_, type_, protocol_, NULL, 0,
87
                              WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
88
#else
89
0
    const fd_t s = socket (domain_, type_, protocol_);
90
0
#endif
91
0
    if (s == retired_fd) {
92
#ifdef ZMQ_HAVE_WINDOWS
93
        errno = wsa_error_to_errno (WSAGetLastError ());
94
#endif
95
0
        return retired_fd;
96
0
    }
97
98
0
    make_socket_noninheritable (s);
99
100
    //  Socket is not yet connected so EINVAL is not a valid networking error
101
0
    rc = zmq::set_nosigpipe (s);
102
0
    errno_assert (rc == 0);
103
104
0
    return s;
105
0
}
106
107
void zmq::unblock_socket (fd_t s_)
108
6.79k
{
109
#if defined ZMQ_HAVE_WINDOWS
110
    u_long nonblock = 1;
111
    const int rc = ioctlsocket (s_, FIONBIO, &nonblock);
112
    wsa_assert (rc != SOCKET_ERROR);
113
#elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
114
    int nonblock = 1;
115
    int rc = ioctl (s_, FIONBIO, &nonblock);
116
    errno_assert (rc != -1);
117
#else
118
6.79k
    int flags = fcntl (s_, F_GETFL, 0);
119
6.79k
    if (flags == -1)
120
0
        flags = 0;
121
6.79k
    int rc = fcntl (s_, F_SETFL, flags | O_NONBLOCK);
122
6.79k
    errno_assert (rc != -1);
123
6.79k
#endif
124
6.79k
}
125
126
void zmq::enable_ipv4_mapping (fd_t s_)
127
0
{
128
0
    LIBZMQ_UNUSED (s_);
129
130
0
#if defined IPV6_V6ONLY && !defined ZMQ_HAVE_OPENBSD                           \
131
0
  && !defined ZMQ_HAVE_DRAGONFLY
132
#ifdef ZMQ_HAVE_WINDOWS
133
    DWORD flag = 0;
134
#else
135
0
    int flag = 0;
136
0
#endif
137
0
    const int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
138
0
                               reinterpret_cast<char *> (&flag), sizeof (flag));
139
#ifdef ZMQ_HAVE_WINDOWS
140
    wsa_assert (rc != SOCKET_ERROR);
141
#else
142
0
    errno_assert (rc == 0);
143
0
#endif
144
0
#endif
145
0
}
146
147
int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
148
0
{
149
0
    struct sockaddr_storage ss;
150
151
0
    const zmq_socklen_t addrlen =
152
0
      get_socket_address (sockfd_, socket_end_remote, &ss);
153
154
0
    if (addrlen == 0) {
155
#ifdef ZMQ_HAVE_WINDOWS
156
        const int last_error = WSAGetLastError ();
157
        wsa_assert (last_error != WSANOTINITIALISED && last_error != WSAEFAULT
158
                    && last_error != WSAEINPROGRESS
159
                    && last_error != WSAENOTSOCK);
160
#elif !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
161
0
        errno_assert (errno != EBADF && errno != EFAULT && errno != ENOTSOCK);
162
#else
163
        errno_assert (errno != EFAULT && errno != ENOTSOCK);
164
#endif
165
0
        return 0;
166
0
    }
167
168
0
    char host[NI_MAXHOST];
169
0
    const int rc =
170
0
      getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen, host,
171
0
                   sizeof host, NULL, 0, NI_NUMERICHOST);
172
0
    if (rc != 0)
173
0
        return 0;
174
175
0
    ip_addr_ = host;
176
177
0
    union
178
0
    {
179
0
        struct sockaddr sa;
180
0
        struct sockaddr_storage sa_stor;
181
0
    } u;
182
183
0
    u.sa_stor = ss;
184
0
    return static_cast<int> (u.sa.sa_family);
185
0
}
186
187
void zmq::set_ip_type_of_service (fd_t s_, int iptos_)
188
0
{
189
0
    int rc = setsockopt (s_, IPPROTO_IP, IP_TOS,
190
0
                         reinterpret_cast<char *> (&iptos_), sizeof (iptos_));
191
192
#ifdef ZMQ_HAVE_WINDOWS
193
    wsa_assert (rc != SOCKET_ERROR);
194
#else
195
0
    errno_assert (rc == 0);
196
0
#endif
197
198
    //  Windows and Hurd do not support IPV6_TCLASS
199
0
#if !defined(ZMQ_HAVE_WINDOWS) && defined(IPV6_TCLASS)
200
0
    rc = setsockopt (s_, IPPROTO_IPV6, IPV6_TCLASS,
201
0
                     reinterpret_cast<char *> (&iptos_), sizeof (iptos_));
202
203
    //  If IPv6 is not enabled ENOPROTOOPT will be returned on Linux and
204
    //  EINVAL on OSX
205
0
    if (rc == -1) {
206
0
        errno_assert (errno == ENOPROTOOPT || errno == EINVAL);
207
0
    }
208
0
#endif
209
0
}
210
211
void zmq::set_socket_priority (fd_t s_, int priority_)
212
0
{
213
0
#ifdef ZMQ_HAVE_SO_PRIORITY
214
0
    int rc =
215
0
      setsockopt (s_, SOL_SOCKET, SO_PRIORITY,
216
0
                  reinterpret_cast<char *> (&priority_), sizeof (priority_));
217
0
    errno_assert (rc == 0);
218
#else
219
    LIBZMQ_UNUSED (s_);
220
    LIBZMQ_UNUSED (priority_);
221
#endif
222
0
}
223
224
int zmq::set_nosigpipe (fd_t s_)
225
0
{
226
#ifdef SO_NOSIGPIPE
227
    //  Make sure that SIGPIPE signal is not generated when writing to a
228
    //  connection that was already closed by the peer.
229
    //  As per POSIX spec, EINVAL will be returned if the socket was valid but
230
    //  the connection has been reset by the peer. Return an error so that the
231
    //  socket can be closed and the connection retried if necessary.
232
    int set = 1;
233
    int rc = setsockopt (s_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
234
    if (rc != 0 && errno == EINVAL)
235
        return -1;
236
    errno_assert (rc == 0);
237
#else
238
0
    LIBZMQ_UNUSED (s_);
239
0
#endif
240
241
0
    return 0;
242
0
}
243
244
int zmq::bind_to_device (fd_t s_, const std::string &bound_device_)
245
0
{
246
0
#ifdef ZMQ_HAVE_SO_BINDTODEVICE
247
0
    int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE,
248
0
                         bound_device_.c_str (), bound_device_.length ());
249
0
    if (rc != 0) {
250
0
        assert_success_or_recoverable (s_, rc);
251
0
        return -1;
252
0
    }
253
0
    return 0;
254
255
#else
256
    LIBZMQ_UNUSED (s_);
257
    LIBZMQ_UNUSED (bound_device_);
258
259
    errno = ENOTSUP;
260
    return -1;
261
#endif
262
0
}
263
264
bool zmq::initialize_network ()
265
849
{
266
#if defined ZMQ_HAVE_OPENPGM
267
268
    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
269
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
270
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
271
    //  PGM_SLEEP to "USLEEP".
272
    pgm_error_t *pgm_error = NULL;
273
    const bool ok = pgm_init (&pgm_error);
274
    if (ok != TRUE) {
275
        //  Invalid parameters don't set pgm_error_t
276
        zmq_assert (pgm_error != NULL);
277
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME
278
            && (pgm_error->code == PGM_ERROR_FAILED)) {
279
            //  Failed to access RTC or HPET device.
280
            pgm_error_free (pgm_error);
281
            errno = EINVAL;
282
            return false;
283
        }
284
285
        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
286
        zmq_assert (false);
287
    }
288
#endif
289
290
#ifdef ZMQ_HAVE_WINDOWS
291
    //  Initialise Windows sockets. Note that WSAStartup can be called multiple
292
    //  times given that WSACleanup will be called for each WSAStartup.
293
294
    const WORD version_requested = MAKEWORD (2, 2);
295
    WSADATA wsa_data;
296
    const int rc = WSAStartup (version_requested, &wsa_data);
297
    zmq_assert (rc == 0);
298
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2
299
                && HIBYTE (wsa_data.wVersion) == 2);
300
#endif
301
302
849
    return true;
303
849
}
304
305
void zmq::shutdown_network ()
306
849
{
307
#ifdef ZMQ_HAVE_WINDOWS
308
    //  On Windows, uninitialise socket layer.
309
    const int rc = WSACleanup ();
310
    wsa_assert (rc != SOCKET_ERROR);
311
#endif
312
313
#if defined ZMQ_HAVE_OPENPGM
314
    //  Shut down the OpenPGM library.
315
    if (pgm_shutdown () != TRUE)
316
        zmq_assert (false);
317
#endif
318
849
}
319
320
#if defined ZMQ_HAVE_WINDOWS
321
static void tune_socket (const SOCKET socket_)
322
{
323
    BOOL tcp_nodelay = 1;
324
    const int rc =
325
      setsockopt (socket_, IPPROTO_TCP, TCP_NODELAY,
326
                  reinterpret_cast<char *> (&tcp_nodelay), sizeof tcp_nodelay);
327
    wsa_assert (rc != SOCKET_ERROR);
328
329
    zmq::tcp_tune_loopback_fast_path (socket_);
330
}
331
332
static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
333
{
334
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
335
    //  Windows CE does not manage security attributes
336
    SECURITY_DESCRIPTOR sd;
337
    SECURITY_ATTRIBUTES sa;
338
    memset (&sd, 0, sizeof sd);
339
    memset (&sa, 0, sizeof sa);
340
341
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
342
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
343
344
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
345
    sa.lpSecurityDescriptor = &sd;
346
#endif
347
348
    //  This function has to be in a system-wide critical section so that
349
    //  two instances of the library don't accidentally create signaler
350
    //  crossing the process boundary.
351
    //  We'll use named event object to implement the critical section.
352
    //  Note that if the event object already exists, the CreateEvent requests
353
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
354
    //  the event object asking for SYNCHRONIZE access only.
355
    HANDLE sync = NULL;
356
357
    //  Create critical section only if using fixed signaler port
358
    //  Use problematic Event implementation for compatibility if using old port 5905.
359
    //  Otherwise use Mutex implementation.
360
    const int event_signaler_port = 5905;
361
362
    if (zmq::signaler_port == event_signaler_port) {
363
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
364
        sync =
365
          CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
366
#else
367
        sync =
368
          CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
369
#endif
370
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
371
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE,
372
                               L"Global\\zmq-signaler-port-sync");
373
374
        win_assert (sync != NULL);
375
    } else if (zmq::signaler_port != 0) {
376
        wchar_t mutex_name[MAX_PATH];
377
#ifdef __MINGW32__
378
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
379
                    zmq::signaler_port);
380
#else
381
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
382
                  zmq::signaler_port);
383
#endif
384
385
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
386
        sync = CreateMutexW (&sa, FALSE, mutex_name);
387
#else
388
        sync = CreateMutexW (NULL, FALSE, mutex_name);
389
#endif
390
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
391
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
392
393
        win_assert (sync != NULL);
394
    }
395
396
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
397
    //  handles cannot be polled on. Here we create the socketpair by hand.
398
    *w_ = INVALID_SOCKET;
399
    *r_ = INVALID_SOCKET;
400
401
    //  Create listening socket.
402
    SOCKET listener;
403
    listener = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
404
    wsa_assert (listener != INVALID_SOCKET);
405
406
    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
407
    BOOL so_reuseaddr = 1;
408
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
409
                         reinterpret_cast<char *> (&so_reuseaddr),
410
                         sizeof so_reuseaddr);
411
    wsa_assert (rc != SOCKET_ERROR);
412
413
    tune_socket (listener);
414
415
    //  Init sockaddr to signaler port.
416
    struct sockaddr_in addr;
417
    memset (&addr, 0, sizeof addr);
418
    addr.sin_family = AF_INET;
419
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
420
    addr.sin_port = htons (zmq::signaler_port);
421
422
    //  Create the writer socket.
423
    *w_ = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
424
    wsa_assert (*w_ != INVALID_SOCKET);
425
426
    if (sync != NULL) {
427
        //  Enter the critical section.
428
        const DWORD dwrc = WaitForSingleObject (sync, INFINITE);
429
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
430
    }
431
432
    //  Bind listening socket to signaler port.
433
    rc = bind (listener, reinterpret_cast<const struct sockaddr *> (&addr),
434
               sizeof addr);
435
436
    if (rc != SOCKET_ERROR && zmq::signaler_port == 0) {
437
        //  Retrieve ephemeral port number
438
        int addrlen = sizeof addr;
439
        rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&addr),
440
                          &addrlen);
441
    }
442
443
    //  Listen for incoming connections.
444
    if (rc != SOCKET_ERROR) {
445
        rc = listen (listener, 1);
446
    }
447
448
    //  Connect writer to the listener.
449
    if (rc != SOCKET_ERROR) {
450
        rc = connect (*w_, reinterpret_cast<struct sockaddr *> (&addr),
451
                      sizeof addr);
452
    }
453
454
    //  Accept connection from writer.
455
    if (rc != SOCKET_ERROR) {
456
        //  Set TCP_NODELAY on writer socket.
457
        tune_socket (*w_);
458
459
        *r_ = accept (listener, NULL, NULL);
460
    }
461
462
    //  Send/receive large chunk to work around TCP slow start
463
    //  This code is a workaround for #1608
464
    if (*r_ != INVALID_SOCKET) {
465
        const size_t dummy_size =
466
          1024 * 1024; //  1M to overload default receive buffer
467
        unsigned char *dummy =
468
          static_cast<unsigned char *> (malloc (dummy_size));
469
        wsa_assert (dummy);
470
471
        int still_to_send = static_cast<int> (dummy_size);
472
        int still_to_recv = static_cast<int> (dummy_size);
473
        while (still_to_send || still_to_recv) {
474
            int nbytes;
475
            if (still_to_send > 0) {
476
                nbytes = ::send (
477
                  *w_,
478
                  reinterpret_cast<char *> (dummy + dummy_size - still_to_send),
479
                  still_to_send, 0);
480
                wsa_assert (nbytes != SOCKET_ERROR);
481
                still_to_send -= nbytes;
482
            }
483
            nbytes = ::recv (
484
              *r_,
485
              reinterpret_cast<char *> (dummy + dummy_size - still_to_recv),
486
              still_to_recv, 0);
487
            wsa_assert (nbytes != SOCKET_ERROR);
488
            still_to_recv -= nbytes;
489
        }
490
        free (dummy);
491
    }
492
493
    //  Save errno if error occurred in bind/listen/connect/accept.
494
    int saved_errno = 0;
495
    if (*r_ == INVALID_SOCKET)
496
        saved_errno = WSAGetLastError ();
497
498
    //  We don't need the listening socket anymore. Close it.
499
    rc = closesocket (listener);
500
    wsa_assert (rc != SOCKET_ERROR);
501
502
    if (sync != NULL) {
503
        //  Exit the critical section.
504
        BOOL brc;
505
        if (zmq::signaler_port == event_signaler_port)
506
            brc = SetEvent (sync);
507
        else
508
            brc = ReleaseMutex (sync);
509
        win_assert (brc != 0);
510
511
        //  Release the kernel object
512
        brc = CloseHandle (sync);
513
        win_assert (brc != 0);
514
    }
515
516
    if (*r_ != INVALID_SOCKET) {
517
        zmq::make_socket_noninheritable (*r_);
518
        return 0;
519
    }
520
    //  Cleanup writer if connection failed
521
    if (*w_ != INVALID_SOCKET) {
522
        rc = closesocket (*w_);
523
        wsa_assert (rc != SOCKET_ERROR);
524
        *w_ = INVALID_SOCKET;
525
    }
526
    //  Set errno from saved value
527
    errno = zmq::wsa_error_to_errno (saved_errno);
528
    return -1;
529
}
530
#endif
531
532
int zmq::make_fdpair (fd_t *r_, fd_t *w_)
533
3.39k
{
534
3.39k
#if defined ZMQ_HAVE_EVENTFD
535
3.39k
    int flags = 0;
536
3.39k
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
537
    //  Setting this option result in sane behaviour when exec() functions
538
    //  are used. Old sockets are closed and don't block TCP ports, avoid
539
    //  leaks, etc.
540
3.39k
    flags |= EFD_CLOEXEC;
541
3.39k
#endif
542
3.39k
    fd_t fd = eventfd (0, flags);
543
3.39k
    if (fd == -1) {
544
0
        errno_assert (errno == ENFILE || errno == EMFILE);
545
0
        *w_ = *r_ = -1;
546
0
        return -1;
547
0
    }
548
3.39k
    *w_ = *r_ = fd;
549
3.39k
    return 0;
550
551
552
#elif defined ZMQ_HAVE_WINDOWS
553
#ifdef ZMQ_HAVE_IPC
554
    ipc_address_t address;
555
    std::string dirname, filename;
556
    sockaddr_un lcladdr;
557
    socklen_t lcladdr_len = sizeof lcladdr;
558
    int rc = 0;
559
    int saved_errno = 0;
560
    SOCKET listener = INVALID_SOCKET;
561
562
    // It appears that a lack of runtime AF_UNIX support
563
    // can fail in more than one way.
564
    // At least: open_socket can fail or later in bind or even in connect after bind
565
    bool ipc_fallback_on_tcpip = true;
566
567
    if (!zmq::try_ipc_first) {
568
        // a past ipc attempt failed, skip straight to try_tcpip in the future;
569
        goto try_tcpip;
570
    }
571
572
    //  Create a listening socket.
573
    listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
574
    if (listener == retired_fd) {
575
        //  This may happen if the library was built on a system supporting AF_UNIX, but the system running doesn't support it.
576
        goto try_tcpip;
577
    }
578
579
    rc = create_ipc_wildcard_address (dirname, filename);
580
    if (rc != 0) {
581
        // This may happen if tmpfile creation fails
582
        goto error_closelistener;
583
    }
584
585
    //  Initialise the address structure.
586
    rc = address.resolve (filename.c_str ());
587
    if (rc != 0) {
588
        goto error_closelistener;
589
    }
590
591
    //  Bind the socket to the file path.
592
    rc = bind (listener, const_cast<sockaddr *> (address.addr ()),
593
               address.addrlen ());
594
    if (rc != 0) {
595
        errno = wsa_error_to_errno (WSAGetLastError ());
596
        goto error_closelistener;
597
    }
598
    // if we got here, ipc should be working,
599
    // but there are at least some cases where connect can still fail
600
601
    //  Listen for incoming connections.
602
    rc = listen (listener, 1);
603
    if (rc != 0) {
604
        errno = wsa_error_to_errno (WSAGetLastError ());
605
        goto error_closelistener;
606
    }
607
608
    rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&lcladdr),
609
                      &lcladdr_len);
610
    wsa_assert (rc == 0);
611
612
    //  Create the client socket.
613
    *w_ = open_socket (AF_UNIX, SOCK_STREAM, 0);
614
    if (*w_ == retired_fd) {
615
        errno = wsa_error_to_errno (WSAGetLastError ());
616
        goto error_closelistener;
617
    }
618
619
    //  Connect to the remote peer.
620
    rc = ::connect (*w_, reinterpret_cast<const struct sockaddr *> (&lcladdr),
621
                    lcladdr_len);
622
    if (rc != 0) {
623
        errno = wsa_error_to_errno (WSAGetLastError ());
624
        goto error_closeclient;
625
    }
626
    // if we got here, ipc should be working,
627
    // so raise any remaining errors
628
    ipc_fallback_on_tcpip = false;
629
630
    *r_ = accept (listener, NULL, NULL);
631
    wsa_assert (*r_ != retired_fd);
632
633
    //  Close the listener socket, we don't need it anymore.
634
    rc = closesocket (listener);
635
    wsa_assert (rc == 0);
636
637
    //  Cleanup temporary socket file descriptor
638
    if (!filename.empty ()) {
639
        rc = ::unlink (filename.c_str ());
640
        if ((rc == 0) && !dirname.empty ()) {
641
            rc = ::rmdir (dirname.c_str ());
642
            dirname.clear ();
643
        }
644
        filename.clear ();
645
    }
646
647
    return 0;
648
649
error_closeclient:
650
    saved_errno = errno;
651
    rc = closesocket (*w_);
652
    wsa_assert (rc == 0);
653
    *w_ = retired_fd;
654
    errno = saved_errno;
655
656
error_closelistener:
657
    saved_errno = errno;
658
    rc = closesocket (listener);
659
    wsa_assert (rc == 0);
660
661
    //  Cleanup temporary socket file descriptor
662
    if (!filename.empty ()) {
663
        rc = ::unlink (filename.c_str ());
664
        if ((rc == 0) && !dirname.empty ()) {
665
            rc = ::rmdir (dirname.c_str ());
666
            dirname.clear ();
667
        }
668
        filename.clear ();
669
    }
670
671
    // ipc failed due to lack of AF_UNIX support, fallback on tcpip
672
    if (ipc_fallback_on_tcpip) {
673
        goto try_tcpip;
674
    }
675
676
    errno = saved_errno;
677
    return -1;
678
679
try_tcpip:
680
    // try to fallback to TCP/IP
681
    rc = make_fdpair_tcpip (r_, w_);
682
    if (rc == 0 && zmq::try_ipc_first) {
683
        // ipc didn't work but tcp/ip did; skip ipc in the future
684
        zmq::try_ipc_first = false;
685
    }
686
    return rc;
687
#endif // ZMQ_HAVE_IPC
688
    return make_fdpair_tcpip (r_, w_);
689
#elif defined ZMQ_HAVE_OPENVMS
690
691
    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
692
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
693
    //  can lead to performance problems.
694
    //
695
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
696
    //  create the socket pair manually.
697
    struct sockaddr_in lcladdr;
698
    memset (&lcladdr, 0, sizeof lcladdr);
699
    lcladdr.sin_family = AF_INET;
700
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
701
    lcladdr.sin_port = 0;
702
703
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
704
    errno_assert (listener != -1);
705
706
    int on = 1;
707
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
708
    errno_assert (rc != -1);
709
710
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
711
    errno_assert (rc != -1);
712
713
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
714
    errno_assert (rc != -1);
715
716
    socklen_t lcladdr_len = sizeof lcladdr;
717
718
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
719
    errno_assert (rc != -1);
720
721
    rc = listen (listener, 1);
722
    errno_assert (rc != -1);
723
724
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
725
    errno_assert (*w_ != -1);
726
727
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
728
    errno_assert (rc != -1);
729
730
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
731
    errno_assert (rc != -1);
732
733
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
734
    errno_assert (rc != -1);
735
736
    *r_ = accept (listener, NULL, NULL);
737
    errno_assert (*r_ != -1);
738
739
    close (listener);
740
741
    return 0;
742
#elif defined ZMQ_HAVE_VXWORKS
743
    struct sockaddr_in lcladdr;
744
    memset (&lcladdr, 0, sizeof lcladdr);
745
    lcladdr.sin_family = AF_INET;
746
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
747
    lcladdr.sin_port = 0;
748
749
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
750
    errno_assert (listener != -1);
751
752
    int on = 1;
753
    int rc =
754
      setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
755
    errno_assert (rc != -1);
756
757
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
758
    errno_assert (rc != -1);
759
760
    socklen_t lcladdr_len = sizeof lcladdr;
761
762
    rc = getsockname (listener, (struct sockaddr *) &lcladdr,
763
                      (int *) &lcladdr_len);
764
    errno_assert (rc != -1);
765
766
    rc = listen (listener, 1);
767
    errno_assert (rc != -1);
768
769
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
770
    errno_assert (*w_ != -1);
771
772
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
773
    errno_assert (rc != -1);
774
775
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
776
    errno_assert (rc != -1);
777
778
    *r_ = accept (listener, NULL, NULL);
779
    errno_assert (*r_ != -1);
780
781
    close (listener);
782
783
    return 0;
784
#else
785
    // All other implementations support socketpair()
786
    int sv[2];
787
    int type = SOCK_STREAM;
788
    //  Setting this option result in sane behaviour when exec() functions
789
    //  are used. Old sockets are closed and don't block TCP ports, avoid
790
    //  leaks, etc.
791
#if defined ZMQ_HAVE_SOCK_CLOEXEC
792
    type |= SOCK_CLOEXEC;
793
#endif
794
    int rc = socketpair (AF_UNIX, type, 0, sv);
795
    if (rc == -1) {
796
        errno_assert (errno == ENFILE || errno == EMFILE);
797
        *w_ = *r_ = -1;
798
        return -1;
799
    } else {
800
        make_socket_noninheritable (sv[0]);
801
        make_socket_noninheritable (sv[1]);
802
803
        *w_ = sv[0];
804
        *r_ = sv[1];
805
        return 0;
806
    }
807
#endif
808
3.39k
}
809
810
void zmq::make_socket_noninheritable (fd_t sock_)
811
0
{
812
#if defined ZMQ_HAVE_WINDOWS && !defined _WIN32_WCE                            \
813
  && !defined ZMQ_HAVE_WINDOWS_UWP
814
    //  On Windows, preventing sockets to be inherited by child processes.
815
    const BOOL brc = SetHandleInformation (reinterpret_cast<HANDLE> (sock_),
816
                                           HANDLE_FLAG_INHERIT, 0);
817
    win_assert (brc);
818
#elif (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4)                \
819
  && defined FD_CLOEXEC
820
    //  If there 's no SOCK_CLOEXEC, let's try the second best option.
821
    //  Race condition can cause socket not to be closed (if fork happens
822
    //  between accept and this point).
823
    const int rc = fcntl (sock_, F_SETFD, FD_CLOEXEC);
824
    errno_assert (rc != -1);
825
#else
826
0
    LIBZMQ_UNUSED (sock_);
827
0
#endif
828
0
}
829
830
void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
831
0
{
832
#ifdef ZMQ_HAVE_WINDOWS
833
    if (rc_ != SOCKET_ERROR) {
834
        return;
835
    }
836
#else
837
0
    if (rc_ != -1) {
838
0
        return;
839
0
    }
840
0
#endif
841
842
    //  Check whether an error occurred
843
0
    int err = 0;
844
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
845
    int len = sizeof err;
846
#else
847
0
    socklen_t len = sizeof err;
848
0
#endif
849
850
0
    const int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
851
0
                               reinterpret_cast<char *> (&err), &len);
852
853
    //  Assert if the error was caused by 0MQ bug.
854
    //  Networking problems are OK. No need to assert.
855
#ifdef ZMQ_HAVE_WINDOWS
856
    zmq_assert (rc == 0);
857
    if (err != 0) {
858
        wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
859
                    || err == WSAECONNABORTED || err == WSAEINTR
860
                    || err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
861
                    || err == WSAENETUNREACH || err == WSAENETDOWN
862
                    || err == WSAENETRESET || err == WSAEACCES
863
                    || err == WSAEINVAL || err == WSAEADDRINUSE);
864
    }
865
#else
866
    //  Following code should handle both Berkeley-derived socket
867
    //  implementations and Solaris.
868
0
    if (rc == -1)
869
0
        err = errno;
870
0
    if (err != 0) {
871
0
        errno = err;
872
0
        errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
873
0
                      || errno == ECONNABORTED || errno == EINTR
874
0
                      || errno == ETIMEDOUT || errno == EHOSTUNREACH
875
0
                      || errno == ENETUNREACH || errno == ENETDOWN
876
0
                      || errno == ENETRESET || errno == EINVAL);
877
0
    }
878
0
#endif
879
0
}
880
881
#ifdef ZMQ_HAVE_IPC
882
883
#if defined ZMQ_HAVE_WINDOWS
884
char *widechar_to_utf8 (const wchar_t *widestring)
885
{
886
    int nch, n;
887
    char *utf8 = 0;
888
    nch = WideCharToMultiByte (CP_UTF8, 0, widestring, -1, 0, 0, NULL, NULL);
889
    if (nch > 0) {
890
        utf8 = (char *) malloc ((nch + 1) * sizeof (char));
891
        n = WideCharToMultiByte (CP_UTF8, 0, widestring, -1, utf8, nch, NULL,
892
                                 NULL);
893
        utf8[nch] = 0;
894
    }
895
    return utf8;
896
}
897
#endif
898
899
int zmq::create_ipc_wildcard_address (std::string &path_, std::string &file_)
900
0
{
901
#if defined ZMQ_HAVE_WINDOWS
902
    wchar_t buffer[MAX_PATH];
903
904
    {
905
        const errno_t rc = _wtmpnam_s (buffer);
906
        errno_assert (rc == 0);
907
    }
908
909
    // TODO or use CreateDirectoryA and specify permissions?
910
    const int rc = _wmkdir (buffer);
911
    if (rc != 0) {
912
        return -1;
913
    }
914
915
    char *tmp = widechar_to_utf8 (buffer);
916
    if (tmp == 0) {
917
        return -1;
918
    }
919
920
    path_.assign (tmp);
921
    file_ = path_ + "/socket";
922
923
    free (tmp);
924
#else
925
0
    std::string tmp_path;
926
927
    // If TMPDIR, TEMPDIR, or TMP are available and are directories, create
928
    // the socket directory there.
929
0
    const char **tmp_env = tmp_env_vars;
930
0
    while (tmp_path.empty () && *tmp_env != 0) {
931
0
        const char *const tmpdir = getenv (*tmp_env);
932
0
        struct stat statbuf;
933
934
        // Confirm it is actually a directory before trying to use
935
0
        if (tmpdir != 0 && ::stat (tmpdir, &statbuf) == 0
936
0
            && S_ISDIR (statbuf.st_mode)) {
937
0
            tmp_path.assign (tmpdir);
938
0
            if (*(tmp_path.rbegin ()) != '/') {
939
0
                tmp_path.push_back ('/');
940
0
            }
941
0
        }
942
943
        // Try the next environment variable
944
0
        ++tmp_env;
945
0
    }
946
947
    // Append a directory name
948
0
    tmp_path.append ("tmpXXXXXX");
949
950
    // We need room for tmp_path + trailing NUL
951
0
    std::vector<char> buffer (tmp_path.length () + 1);
952
0
    memcpy (&buffer[0], tmp_path.c_str (), tmp_path.length () + 1);
953
954
0
#if defined HAVE_MKDTEMP
955
    // Create the directory.  POSIX requires that mkdtemp() creates the
956
    // directory with 0700 permissions, meaning the only possible race
957
    // with socket creation could be the same user.  However, since
958
    // each socket is created in a directory created by mkdtemp(), and
959
    // mkdtemp() guarantees a unique directory name, there will be no
960
    // collision.
961
0
    if (mkdtemp (&buffer[0]) == 0) {
962
0
        return -1;
963
0
    }
964
965
0
    path_.assign (&buffer[0]);
966
0
    file_ = path_ + "/socket";
967
#else
968
    LIBZMQ_UNUSED (path_);
969
    int fd = mkstemp (&buffer[0]);
970
    if (fd == -1)
971
        return -1;
972
    ::close (fd);
973
974
    file_.assign (&buffer[0]);
975
#endif
976
0
#endif
977
978
0
    return 0;
979
0
}
980
#endif