Coverage Report

Created: 2026-01-25 06:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/ws_connecter.cpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include <new>
5
#include <string>
6
7
#include "macros.hpp"
8
#include "ws_connecter.hpp"
9
#include "io_thread.hpp"
10
#include "err.hpp"
11
#include "ip.hpp"
12
#include "tcp.hpp"
13
#include "address.hpp"
14
#include "ws_address.hpp"
15
#include "ws_engine.hpp"
16
#include "session_base.hpp"
17
18
#ifdef ZMQ_HAVE_WSS
19
#include "wss_engine.hpp"
20
#include "wss_address.hpp"
21
#endif
22
23
#if !defined ZMQ_HAVE_WINDOWS
24
#include <unistd.h>
25
#include <sys/types.h>
26
#include <sys/socket.h>
27
#include <arpa/inet.h>
28
#include <netinet/tcp.h>
29
#include <netinet/in.h>
30
#include <netdb.h>
31
#include <fcntl.h>
32
#ifdef ZMQ_HAVE_VXWORKS
33
#include <sockLib.h>
34
#endif
35
#ifdef ZMQ_HAVE_OPENVMS
36
#include <ioctl.h>
37
#endif
38
#endif
39
40
#ifdef __APPLE__
41
#include <TargetConditionals.h>
42
#endif
43
44
zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_,
45
                                     class session_base_t *session_,
46
                                     const options_t &options_,
47
                                     address_t *addr_,
48
                                     bool delayed_start_,
49
                                     bool wss_,
50
                                     const std::string &tls_hostname_) :
51
355
    stream_connecter_base_t (
52
355
      io_thread_, session_, options_, addr_, delayed_start_),
53
355
    _connect_timer_started (false),
54
355
    _wss (wss_),
55
355
    _hostname (tls_hostname_)
56
355
{
57
355
}
58
59
zmq::ws_connecter_t::~ws_connecter_t ()
60
355
{
61
355
    zmq_assert (!_connect_timer_started);
62
355
}
63
64
void zmq::ws_connecter_t::process_term (int linger_)
65
355
{
66
355
    if (_connect_timer_started) {
67
0
        cancel_timer (connect_timer_id);
68
0
        _connect_timer_started = false;
69
0
    }
70
71
355
    stream_connecter_base_t::process_term (linger_);
72
355
}
73
74
void zmq::ws_connecter_t::out_event ()
75
30
{
76
30
    if (_connect_timer_started) {
77
0
        cancel_timer (connect_timer_id);
78
0
        _connect_timer_started = false;
79
0
    }
80
81
    //  TODO this is still very similar to (t)ipc_connecter_t, maybe the
82
    //  differences can be factored out
83
84
30
    rm_handle ();
85
86
30
    const fd_t fd = connect ();
87
88
    //  Handle the error condition by attempt to reconnect.
89
30
    if (fd == retired_fd || !tune_socket (fd)) {
90
30
        close ();
91
30
        add_reconnect_timer ();
92
30
        return;
93
30
    }
94
95
0
    if (_wss)
96
#ifdef ZMQ_HAVE_WSS
97
        create_engine (fd,
98
                       get_socket_name<wss_address_t> (fd, socket_end_local));
99
#else
100
0
        assert (false);
101
0
#endif
102
0
    else
103
0
        create_engine (fd,
104
0
                       get_socket_name<ws_address_t> (fd, socket_end_local));
105
0
}
106
107
void zmq::ws_connecter_t::timer_event (int id_)
108
0
{
109
0
    if (id_ == connect_timer_id) {
110
0
        _connect_timer_started = false;
111
0
        rm_handle ();
112
0
        close ();
113
0
        add_reconnect_timer ();
114
0
    } else
115
0
        stream_connecter_base_t::timer_event (id_);
116
0
}
117
118
void zmq::ws_connecter_t::start_connecting ()
119
355
{
120
    //  Open the connecting socket.
121
355
    const int rc = open ();
122
123
    //  Connect may succeed in synchronous manner.
124
355
    if (rc == 0) {
125
0
        _handle = add_fd (_s);
126
0
        out_event ();
127
0
    }
128
129
    //  Connection establishment may be delayed. Poll for its completion.
130
355
    else if (rc == -1 && errno == EINPROGRESS) {
131
82
        _handle = add_fd (_s);
132
82
        set_pollout (_handle);
133
82
        _socket->event_connect_delayed (
134
82
          make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
135
136
        //  add userspace connect timeout
137
82
        add_connect_timer ();
138
82
    }
139
140
    //  Handle any other error condition by eventual reconnect.
141
273
    else {
142
273
        if (_s != retired_fd)
143
2
            close ();
144
273
        add_reconnect_timer ();
145
273
    }
146
355
}
147
148
void zmq::ws_connecter_t::add_connect_timer ()
149
82
{
150
82
    if (options.connect_timeout > 0) {
151
0
        add_timer (options.connect_timeout, connect_timer_id);
152
0
        _connect_timer_started = true;
153
0
    }
154
82
}
155
156
int zmq::ws_connecter_t::open ()
157
355
{
158
355
    zmq_assert (_s == retired_fd);
159
160
355
    tcp_address_t tcp_addr;
161
355
    _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
162
355
                          &tcp_addr);
163
355
    if (_s == retired_fd)
164
271
        return -1;
165
166
    // Set the socket to non-blocking mode so that we get async connect().
167
84
    unblock_socket (_s);
168
169
    //  Connect to the remote peer.
170
#ifdef ZMQ_HAVE_VXWORKS
171
    int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ());
172
#else
173
84
    const int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
174
84
#endif
175
    //  Connect was successful immediately.
176
84
    if (rc == 0) {
177
0
        return 0;
178
0
    }
179
180
    //  Translate error codes indicating asynchronous connect has been
181
    //  launched to a uniform EINPROGRESS.
182
#ifdef ZMQ_HAVE_WINDOWS
183
    const int last_error = WSAGetLastError ();
184
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
185
        errno = EINPROGRESS;
186
    else
187
        errno = wsa_error_to_errno (last_error);
188
#else
189
84
    if (errno == EINTR)
190
84
        errno = EINPROGRESS;
191
84
#endif
192
84
    return -1;
193
84
}
194
195
zmq::fd_t zmq::ws_connecter_t::connect ()
196
30
{
197
    //  Async connect has finished. Check whether an error occurred
198
30
    int err = 0;
199
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
200
    int len = sizeof err;
201
#else
202
30
    socklen_t len = sizeof err;
203
30
#endif
204
205
30
    const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
206
30
                               reinterpret_cast<char *> (&err), &len);
207
208
    //  Assert if the error was caused by 0MQ bug.
209
    //  Networking problems are OK. No need to assert.
210
#ifdef ZMQ_HAVE_WINDOWS
211
    zmq_assert (rc == 0);
212
    if (err != 0) {
213
        if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
214
            || err == WSAENOBUFS) {
215
            wsa_assert_no (err);
216
        }
217
        return retired_fd;
218
    }
219
#else
220
    //  Following code should handle both Berkeley-derived socket
221
    //  implementations and Solaris.
222
30
    if (rc == -1)
223
0
        err = errno;
224
30
    if (err != 0) {
225
30
        errno = err;
226
30
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
227
30
        errno_assert (errno != EBADF && errno != ENOPROTOOPT
228
30
                      && errno != ENOTSOCK && errno != ENOBUFS);
229
#else
230
        errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
231
                      && errno != ENOBUFS);
232
#endif
233
30
        return retired_fd;
234
30
    }
235
0
#endif
236
237
    //  Return the newly connected socket.
238
0
    const fd_t result = _s;
239
0
    _s = retired_fd;
240
0
    return result;
241
30
}
242
243
bool zmq::ws_connecter_t::tune_socket (const fd_t fd_)
244
0
{
245
0
    const int rc =
246
0
      tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt);
247
0
    return rc == 0;
248
0
}
249
250
void zmq::ws_connecter_t::create_engine (fd_t fd_,
251
                                         const std::string &local_address_)
252
0
{
253
0
    const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
254
0
                                             endpoint_type_connect);
255
256
    //  Create the engine object for this connection.
257
0
    i_engine *engine = NULL;
258
0
    if (_wss) {
259
#ifdef ZMQ_HAVE_WSS
260
        engine = new (std::nothrow)
261
          wss_engine_t (fd_, options, endpoint_pair, *_addr->resolved.ws_addr,
262
                        true, NULL, _hostname);
263
#else
264
0
        LIBZMQ_UNUSED (_hostname);
265
0
        assert (false);
266
0
#endif
267
0
    } else
268
0
        engine = new (std::nothrow) ws_engine_t (
269
0
          fd_, options, endpoint_pair, *_addr->resolved.ws_addr, true);
270
0
    alloc_assert (engine);
271
272
    //  Attach the engine to the corresponding session object.
273
0
    send_attach (_session, engine);
274
275
    //  Shut the connecter down.
276
0
    terminate ();
277
278
0
    _socket->event_connected (endpoint_pair, fd_);
279
0
}