Coverage Report

Created: 2025-07-11 06:23

/src/libzmq/src/tcp_connecter.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <new>
5
#include <string>
6
7
#include "macros.hpp"
8
#include "tcp_connecter.hpp"
9
#include "io_thread.hpp"
10
#include "err.hpp"
11
#include "ip.hpp"
12
#include "tcp.hpp"
13
#include "address.hpp"
14
#include "tcp_address.hpp"
15
#include "session_base.hpp"
16
17
#if !defined ZMQ_HAVE_WINDOWS
18
#include <unistd.h>
19
#include <sys/types.h>
20
#include <sys/socket.h>
21
#include <arpa/inet.h>
22
#include <netinet/tcp.h>
23
#include <netinet/in.h>
24
#include <netdb.h>
25
#include <fcntl.h>
26
#ifdef ZMQ_HAVE_VXWORKS
27
#include <sockLib.h>
28
#endif
29
#ifdef ZMQ_HAVE_OPENVMS
30
#include <ioctl.h>
31
#endif
32
#endif
33
34
#ifdef __APPLE__
35
#include <TargetConditionals.h>
36
#endif
37
38
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
39
                                       class session_base_t *session_,
40
                                       const options_t &options_,
41
                                       address_t *addr_,
42
                                       bool delayed_start_) :
43
0
    stream_connecter_base_t (
44
0
      io_thread_, session_, options_, addr_, delayed_start_),
45
0
    _connect_timer_started (false)
46
0
{
47
0
    zmq_assert (_addr->protocol == protocol_name::tcp);
48
0
}
49
50
zmq::tcp_connecter_t::~tcp_connecter_t ()
51
0
{
52
0
    zmq_assert (!_connect_timer_started);
53
0
}
54
55
void zmq::tcp_connecter_t::process_term (int linger_)
56
0
{
57
0
    if (_connect_timer_started) {
58
0
        cancel_timer (connect_timer_id);
59
0
        _connect_timer_started = false;
60
0
    }
61
62
0
    stream_connecter_base_t::process_term (linger_);
63
0
}
64
65
void zmq::tcp_connecter_t::out_event ()
66
0
{
67
0
    if (_connect_timer_started) {
68
0
        cancel_timer (connect_timer_id);
69
0
        _connect_timer_started = false;
70
0
    }
71
72
    //  TODO this is still very similar to (t)ipc_connecter_t, maybe the
73
    //  differences can be factored out
74
75
0
    rm_handle ();
76
77
0
    const fd_t fd = connect ();
78
79
0
    if (fd == retired_fd
80
0
        && ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
81
0
            && errno == ECONNREFUSED)) {
82
0
        send_conn_failed (_session);
83
0
        close ();
84
0
        terminate ();
85
0
        return;
86
0
    }
87
88
    //  Handle the error condition by attempt to reconnect.
89
0
    if (fd == retired_fd || !tune_socket (fd)) {
90
0
        close ();
91
0
        add_reconnect_timer ();
92
0
        return;
93
0
    }
94
95
0
    create_engine (fd, get_socket_name<tcp_address_t> (fd, socket_end_local));
96
0
}
97
98
void zmq::tcp_connecter_t::timer_event (int id_)
99
0
{
100
0
    if (id_ == connect_timer_id) {
101
0
        _connect_timer_started = false;
102
0
        rm_handle ();
103
0
        close ();
104
0
        add_reconnect_timer ();
105
0
    } else
106
0
        stream_connecter_base_t::timer_event (id_);
107
0
}
108
109
void zmq::tcp_connecter_t::start_connecting ()
110
0
{
111
    //  Open the connecting socket.
112
0
    const int rc = open ();
113
114
    //  Connect may succeed in synchronous manner.
115
0
    if (rc == 0) {
116
0
        _handle = add_fd (_s);
117
0
        out_event ();
118
0
    }
119
120
    //  Connection establishment may be delayed. Poll for its completion.
121
0
    else if (rc == -1 && errno == EINPROGRESS) {
122
0
        _handle = add_fd (_s);
123
0
        set_pollout (_handle);
124
0
        _socket->event_connect_delayed (
125
0
          make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
126
127
        //  add userspace connect timeout
128
0
        add_connect_timer ();
129
0
    }
130
131
    //  Handle any other error condition by eventual reconnect.
132
0
    else {
133
0
        if (_s != retired_fd)
134
0
            close ();
135
0
        add_reconnect_timer ();
136
0
    }
137
0
}
138
139
void zmq::tcp_connecter_t::add_connect_timer ()
140
0
{
141
0
    if (options.connect_timeout > 0) {
142
0
        add_timer (options.connect_timeout, connect_timer_id);
143
0
        _connect_timer_started = true;
144
0
    }
145
0
}
146
147
int zmq::tcp_connecter_t::open ()
148
0
{
149
0
    zmq_assert (_s == retired_fd);
150
151
    //  Resolve the address
152
0
    if (_addr->resolved.tcp_addr != NULL) {
153
0
        LIBZMQ_DELETE (_addr->resolved.tcp_addr);
154
0
    }
155
156
0
    _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
157
0
    alloc_assert (_addr->resolved.tcp_addr);
158
0
    _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
159
0
                          _addr->resolved.tcp_addr);
160
0
    if (_s == retired_fd) {
161
        //  TODO we should emit some event in this case!
162
163
0
        LIBZMQ_DELETE (_addr->resolved.tcp_addr);
164
0
        return -1;
165
0
    }
166
0
    zmq_assert (_addr->resolved.tcp_addr != NULL);
167
168
    // Set the socket to non-blocking mode so that we get async connect().
169
0
    unblock_socket (_s);
170
171
0
    const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
172
173
0
    int rc;
174
175
    // Set a source address for conversations
176
0
    if (tcp_addr->has_src_addr ()) {
177
        //  Allow reusing of the address, to connect to different servers
178
        //  using the same source port on the client.
179
0
        int flag = 1;
180
#ifdef ZMQ_HAVE_WINDOWS
181
        rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR,
182
                         reinterpret_cast<const char *> (&flag), sizeof (int));
183
        wsa_assert (rc != SOCKET_ERROR);
184
#elif defined ZMQ_HAVE_VXWORKS
185
        rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
186
                         sizeof (int));
187
        errno_assert (rc == 0);
188
#else
189
0
        rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
190
0
        errno_assert (rc == 0);
191
0
#endif
192
193
#if defined ZMQ_HAVE_VXWORKS
194
        rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
195
                     tcp_addr->src_addrlen ());
196
#else
197
0
        rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
198
0
#endif
199
0
        if (rc == -1)
200
0
            return -1;
201
0
    }
202
203
    //  Connect to the remote peer.
204
#if defined ZMQ_HAVE_VXWORKS
205
    rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
206
#else
207
0
    rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
208
0
#endif
209
    //  Connect was successful immediately.
210
0
    if (rc == 0) {
211
0
        return 0;
212
0
    }
213
214
    //  Translate error codes indicating asynchronous connect has been
215
    //  launched to a uniform EINPROGRESS.
216
#ifdef ZMQ_HAVE_WINDOWS
217
    const int last_error = WSAGetLastError ();
218
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
219
        errno = EINPROGRESS;
220
    else
221
        errno = wsa_error_to_errno (last_error);
222
#else
223
0
    if (errno == EINTR)
224
0
        errno = EINPROGRESS;
225
0
#endif
226
0
    return -1;
227
0
}
228
229
zmq::fd_t zmq::tcp_connecter_t::connect ()
230
0
{
231
    //  Async connect has finished. Check whether an error occurred
232
0
    int err = 0;
233
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
234
    int len = sizeof err;
235
#else
236
0
    socklen_t len = sizeof err;
237
0
#endif
238
239
0
    const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
240
0
                               reinterpret_cast<char *> (&err), &len);
241
242
    //  Assert if the error was caused by 0MQ bug.
243
    //  Networking problems are OK. No need to assert.
244
#ifdef ZMQ_HAVE_WINDOWS
245
    zmq_assert (rc == 0);
246
    if (err != 0) {
247
        if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
248
            || err == WSAENOBUFS) {
249
            wsa_assert_no (err);
250
        }
251
        errno = wsa_error_to_errno (err);
252
        return retired_fd;
253
    }
254
#else
255
    //  Following code should handle both Berkeley-derived socket
256
    //  implementations and Solaris.
257
0
    if (rc == -1)
258
0
        err = errno;
259
0
    if (err != 0) {
260
0
        errno = err;
261
0
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
262
0
        errno_assert (errno != EBADF && errno != ENOPROTOOPT
263
0
                      && errno != ENOTSOCK && errno != ENOBUFS);
264
#else
265
        errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
266
                      && errno != ENOBUFS);
267
#endif
268
0
        return retired_fd;
269
0
    }
270
0
#endif
271
272
    //  Return the newly connected socket.
273
0
    const fd_t result = _s;
274
0
    _s = retired_fd;
275
0
    return result;
276
0
}
277
278
bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
279
0
{
280
0
    const int rc = tune_tcp_socket (fd_)
281
0
                   | tune_tcp_keepalives (
282
0
                     fd_, options.tcp_keepalive, options.tcp_keepalive_cnt,
283
0
                     options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
284
0
                   | tune_tcp_maxrt (fd_, options.tcp_maxrt);
285
0
    return rc == 0;
286
0
}