/src/libzmq/src/ws_connecter.cpp
Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include <new> |
5 | | #include <string> |
6 | | |
7 | | #include "macros.hpp" |
8 | | #include "ws_connecter.hpp" |
9 | | #include "io_thread.hpp" |
10 | | #include "err.hpp" |
11 | | #include "ip.hpp" |
12 | | #include "tcp.hpp" |
13 | | #include "address.hpp" |
14 | | #include "ws_address.hpp" |
15 | | #include "ws_engine.hpp" |
16 | | #include "session_base.hpp" |
17 | | |
18 | | #ifdef ZMQ_HAVE_WSS |
19 | | #include "wss_engine.hpp" |
20 | | #include "wss_address.hpp" |
21 | | #endif |
22 | | |
23 | | #if !defined ZMQ_HAVE_WINDOWS |
24 | | #include <unistd.h> |
25 | | #include <sys/types.h> |
26 | | #include <sys/socket.h> |
27 | | #include <arpa/inet.h> |
28 | | #include <netinet/tcp.h> |
29 | | #include <netinet/in.h> |
30 | | #include <netdb.h> |
31 | | #include <fcntl.h> |
32 | | #ifdef ZMQ_HAVE_VXWORKS |
33 | | #include <sockLib.h> |
34 | | #endif |
35 | | #ifdef ZMQ_HAVE_OPENVMS |
36 | | #include <ioctl.h> |
37 | | #endif |
38 | | #endif |
39 | | |
40 | | #ifdef __APPLE__ |
41 | | #include <TargetConditionals.h> |
42 | | #endif |
43 | | |
44 | | zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_, |
45 | | class session_base_t *session_, |
46 | | const options_t &options_, |
47 | | address_t *addr_, |
48 | | bool delayed_start_, |
49 | | bool wss_, |
50 | | const std::string &tls_hostname_) : |
51 | 355 | stream_connecter_base_t ( |
52 | 355 | io_thread_, session_, options_, addr_, delayed_start_), |
53 | 355 | _connect_timer_started (false), |
54 | 355 | _wss (wss_), |
55 | 355 | _hostname (tls_hostname_) |
56 | 355 | { |
57 | 355 | } |
58 | | |
59 | | zmq::ws_connecter_t::~ws_connecter_t () |
60 | 355 | { |
61 | 355 | zmq_assert (!_connect_timer_started); |
62 | 355 | } |
63 | | |
64 | | void zmq::ws_connecter_t::process_term (int linger_) |
65 | 355 | { |
66 | 355 | if (_connect_timer_started) { |
67 | 0 | cancel_timer (connect_timer_id); |
68 | 0 | _connect_timer_started = false; |
69 | 0 | } |
70 | | |
71 | 355 | stream_connecter_base_t::process_term (linger_); |
72 | 355 | } |
73 | | |
74 | | void zmq::ws_connecter_t::out_event () |
75 | 30 | { |
76 | 30 | if (_connect_timer_started) { |
77 | 0 | cancel_timer (connect_timer_id); |
78 | 0 | _connect_timer_started = false; |
79 | 0 | } |
80 | | |
81 | | // TODO this is still very similar to (t)ipc_connecter_t, maybe the |
82 | | // differences can be factored out |
83 | | |
84 | 30 | rm_handle (); |
85 | | |
86 | 30 | const fd_t fd = connect (); |
87 | | |
88 | | // Handle the error condition by attempt to reconnect. |
89 | 30 | if (fd == retired_fd || !tune_socket (fd)) { |
90 | 30 | close (); |
91 | 30 | add_reconnect_timer (); |
92 | 30 | return; |
93 | 30 | } |
94 | | |
95 | 0 | if (_wss) |
96 | | #ifdef ZMQ_HAVE_WSS |
97 | | create_engine (fd, |
98 | | get_socket_name<wss_address_t> (fd, socket_end_local)); |
99 | | #else |
100 | 0 | assert (false); |
101 | 0 | #endif |
102 | 0 | else |
103 | 0 | create_engine (fd, |
104 | 0 | get_socket_name<ws_address_t> (fd, socket_end_local)); |
105 | 0 | } |
106 | | |
107 | | void zmq::ws_connecter_t::timer_event (int id_) |
108 | 0 | { |
109 | 0 | if (id_ == connect_timer_id) { |
110 | 0 | _connect_timer_started = false; |
111 | 0 | rm_handle (); |
112 | 0 | close (); |
113 | 0 | add_reconnect_timer (); |
114 | 0 | } else |
115 | 0 | stream_connecter_base_t::timer_event (id_); |
116 | 0 | } |
117 | | |
118 | | void zmq::ws_connecter_t::start_connecting () |
119 | 355 | { |
120 | | // Open the connecting socket. |
121 | 355 | const int rc = open (); |
122 | | |
123 | | // Connect may succeed in synchronous manner. |
124 | 355 | if (rc == 0) { |
125 | 0 | _handle = add_fd (_s); |
126 | 0 | out_event (); |
127 | 0 | } |
128 | | |
129 | | // Connection establishment may be delayed. Poll for its completion. |
130 | 355 | else if (rc == -1 && errno == EINPROGRESS) { |
131 | 82 | _handle = add_fd (_s); |
132 | 82 | set_pollout (_handle); |
133 | 82 | _socket->event_connect_delayed ( |
134 | 82 | make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); |
135 | | |
136 | | // add userspace connect timeout |
137 | 82 | add_connect_timer (); |
138 | 82 | } |
139 | | |
140 | | // Handle any other error condition by eventual reconnect. |
141 | 273 | else { |
142 | 273 | if (_s != retired_fd) |
143 | 2 | close (); |
144 | 273 | add_reconnect_timer (); |
145 | 273 | } |
146 | 355 | } |
147 | | |
148 | | void zmq::ws_connecter_t::add_connect_timer () |
149 | 82 | { |
150 | 82 | if (options.connect_timeout > 0) { |
151 | 0 | add_timer (options.connect_timeout, connect_timer_id); |
152 | 0 | _connect_timer_started = true; |
153 | 0 | } |
154 | 82 | } |
155 | | |
156 | | int zmq::ws_connecter_t::open () |
157 | 355 | { |
158 | 355 | zmq_assert (_s == retired_fd); |
159 | | |
160 | 355 | tcp_address_t tcp_addr; |
161 | 355 | _s = tcp_open_socket (_addr->address.c_str (), options, false, true, |
162 | 355 | &tcp_addr); |
163 | 355 | if (_s == retired_fd) |
164 | 271 | return -1; |
165 | | |
166 | | // Set the socket to non-blocking mode so that we get async connect(). |
167 | 84 | unblock_socket (_s); |
168 | | |
169 | | // Connect to the remote peer. |
170 | | #ifdef ZMQ_HAVE_VXWORKS |
171 | | int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ()); |
172 | | #else |
173 | 84 | const int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ()); |
174 | 84 | #endif |
175 | | // Connect was successful immediately. |
176 | 84 | if (rc == 0) { |
177 | 0 | return 0; |
178 | 0 | } |
179 | | |
180 | | // Translate error codes indicating asynchronous connect has been |
181 | | // launched to a uniform EINPROGRESS. |
182 | | #ifdef ZMQ_HAVE_WINDOWS |
183 | | const int last_error = WSAGetLastError (); |
184 | | if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) |
185 | | errno = EINPROGRESS; |
186 | | else |
187 | | errno = wsa_error_to_errno (last_error); |
188 | | #else |
189 | 84 | if (errno == EINTR) |
190 | 84 | errno = EINPROGRESS; |
191 | 84 | #endif |
192 | 84 | return -1; |
193 | 84 | } |
194 | | |
195 | | zmq::fd_t zmq::ws_connecter_t::connect () |
196 | 30 | { |
197 | | // Async connect has finished. Check whether an error occurred |
198 | 30 | int err = 0; |
199 | | #if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS |
200 | | int len = sizeof err; |
201 | | #else |
202 | 30 | socklen_t len = sizeof err; |
203 | 30 | #endif |
204 | | |
205 | 30 | const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR, |
206 | 30 | reinterpret_cast<char *> (&err), &len); |
207 | | |
208 | | // Assert if the error was caused by 0MQ bug. |
209 | | // Networking problems are OK. No need to assert. |
210 | | #ifdef ZMQ_HAVE_WINDOWS |
211 | | zmq_assert (rc == 0); |
212 | | if (err != 0) { |
213 | | if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK |
214 | | || err == WSAENOBUFS) { |
215 | | wsa_assert_no (err); |
216 | | } |
217 | | return retired_fd; |
218 | | } |
219 | | #else |
220 | | // Following code should handle both Berkeley-derived socket |
221 | | // implementations and Solaris. |
222 | 30 | if (rc == -1) |
223 | 0 | err = errno; |
224 | 30 | if (err != 0) { |
225 | 30 | errno = err; |
226 | 30 | #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE |
227 | 30 | errno_assert (errno != EBADF && errno != ENOPROTOOPT |
228 | 30 | && errno != ENOTSOCK && errno != ENOBUFS); |
229 | | #else |
230 | | errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK |
231 | | && errno != ENOBUFS); |
232 | | #endif |
233 | 30 | return retired_fd; |
234 | 30 | } |
235 | 0 | #endif |
236 | | |
237 | | // Return the newly connected socket. |
238 | 0 | const fd_t result = _s; |
239 | 0 | _s = retired_fd; |
240 | 0 | return result; |
241 | 30 | } |
242 | | |
243 | | bool zmq::ws_connecter_t::tune_socket (const fd_t fd_) |
244 | 0 | { |
245 | 0 | const int rc = |
246 | 0 | tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt); |
247 | 0 | return rc == 0; |
248 | 0 | } |
249 | | |
250 | | void zmq::ws_connecter_t::create_engine (fd_t fd_, |
251 | | const std::string &local_address_) |
252 | 0 | { |
253 | 0 | const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint, |
254 | 0 | endpoint_type_connect); |
255 | | |
256 | | // Create the engine object for this connection. |
257 | 0 | i_engine *engine = NULL; |
258 | 0 | if (_wss) { |
259 | | #ifdef ZMQ_HAVE_WSS |
260 | | engine = new (std::nothrow) |
261 | | wss_engine_t (fd_, options, endpoint_pair, *_addr->resolved.ws_addr, |
262 | | true, NULL, _hostname); |
263 | | #else |
264 | 0 | LIBZMQ_UNUSED (_hostname); |
265 | 0 | assert (false); |
266 | 0 | #endif |
267 | 0 | } else |
268 | 0 | engine = new (std::nothrow) ws_engine_t ( |
269 | 0 | fd_, options, endpoint_pair, *_addr->resolved.ws_addr, true); |
270 | 0 | alloc_assert (engine); |
271 | | |
272 | | // Attach the engine to the corresponding session object. |
273 | 0 | send_attach (_session, engine); |
274 | | |
275 | | // Shut the connecter down. |
276 | 0 | terminate (); |
277 | |
|
278 | 0 | _socket->event_connected (endpoint_pair, fd_); |
279 | 0 | } |