Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_ipecho_server.h"
3 :
4 : #include "../../util/fd_util.h"
5 : #include "../../util/net/fd_ip4.h"
6 :
7 : #include <errno.h>
8 : #include <unistd.h>
9 : #include <sys/poll.h>
10 : #include <sys/socket.h>
11 : #include <netinet/in.h>
12 :
/* Per-connection protocol state.  A freshly accepted connection starts
   in STATE_READING and moves to STATE_WRITING once a complete, valid
   request has been received and the response has been buffered. */

#define STATE_READING  (0)
#define STATE_WRITING  (1)
15 :
/* Reasons passed to close_conn.  CLOSE_OK is counted as a clean close
   in the metrics; every other value is counted as an error close. */

#define CLOSE_OK             ( 0) /* Response was written in full */
#define CLOSE_EXPECTED_EOF   (-1) /* Socket became readable while not in the reading state */
#define CLOSE_PEER_RESET     (-2) /* read/sendto failed with an expected network error */
#define CLOSE_LARGE_REQUEST  (-3) /* Request filled the entire request buffer */
#define CLOSE_BAD_HEADER     (-4) /* First four request bytes were not all zero */
#define CLOSE_BAD_TRAILER    (-5) /* Request byte 20 was not a newline */
#define CLOSE_BAD_LENGTH     (-6) /* Peer reached end of stream before the request was complete */
#define CLOSE_EVICTED        (-7) /* Closed to make room for a newly accepted connection */
24 :
25 : struct fd_ipecho_server_connection {
26 : int state;
27 :
28 : uint ipv4;
29 :
30 : ushort parent;
31 :
32 : ulong request_bytes_read;
33 : uchar request_bytes[ 22UL ];
34 : ulong response_bytes_written;
35 : uchar response_bytes[ 27UL ];
36 : };
37 :
38 : typedef struct fd_ipecho_server_connection fd_ipecho_server_connection_t;
39 :
40 : #define POOL_NAME conn_pool
41 0 : #define POOL_T fd_ipecho_server_connection_t
42 : #define POOL_IDX_T ushort
43 0 : #define POOL_NEXT parent
44 : #include "../../util/tmpl/fd_pool.c"
45 :
46 : struct fd_ipecho_server {
47 : int sockfd;
48 :
49 : ushort shred_version;
50 :
51 : ulong evict_idx;
52 : ulong max_connection_cnt;
53 :
54 : fd_ipecho_server_connection_t * pool;
55 : struct pollfd * pollfds;
56 :
57 : fd_ipecho_server_metrics_t metrics[ 1 ];
58 :
59 : ulong magic;
60 : };
61 :
62 : FD_FN_CONST ulong
63 0 : fd_ipecho_server_align( void ) {
64 0 : return 128UL;
65 0 : }
66 :
67 : FD_FN_CONST ulong
68 0 : fd_ipecho_server_footprint( ulong max_connection_cnt ) {
69 0 : ulong l = FD_LAYOUT_INIT;
70 0 : l = FD_LAYOUT_APPEND( l, fd_ipecho_server_align(), sizeof(fd_ipecho_server_t) );
71 0 : l = FD_LAYOUT_APPEND( l, conn_pool_align(), conn_pool_footprint( max_connection_cnt ) );
72 0 : l = FD_LAYOUT_APPEND( l, alignof(struct pollfd), (1UL+max_connection_cnt)*sizeof(struct pollfd) );
73 0 : return FD_LAYOUT_FINI( l, fd_ipecho_server_align() );
74 0 : }
75 :
76 : void *
77 : fd_ipecho_server_new( void * shmem,
78 0 : ulong max_connection_cnt ) {
79 0 : if( FD_UNLIKELY( !shmem ) ) {
80 0 : FD_LOG_WARNING(( "NULL shmem" ));
81 0 : return NULL;
82 0 : }
83 :
84 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ipecho_server_align() ) ) ) {
85 0 : FD_LOG_WARNING(( "misaligned shmem" ));
86 0 : return NULL;
87 0 : }
88 :
89 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
90 0 : fd_ipecho_server_t * server = FD_SCRATCH_ALLOC_APPEND( l, fd_ipecho_server_align(), sizeof(fd_ipecho_server_t) );
91 0 : void * pool = FD_SCRATCH_ALLOC_APPEND( l, conn_pool_align(), conn_pool_footprint( max_connection_cnt ) );
92 0 : server->pollfds = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct pollfd), (1UL+max_connection_cnt)*sizeof(struct pollfd) );
93 :
94 0 : server->pool = conn_pool_join( conn_pool_new( pool, max_connection_cnt ) );
95 0 : FD_TEST( server->pool );
96 :
97 0 : server->sockfd = -1;
98 :
99 0 : for( ulong i=0UL; i<max_connection_cnt; i++ ) {
100 0 : server->pollfds[ i ].fd = -1;
101 0 : server->pollfds[ i ].events = POLLIN | POLLOUT;
102 0 : }
103 0 : server->pollfds[ max_connection_cnt ].fd = -1;
104 :
105 0 : server->evict_idx = 0UL;
106 0 : server->max_connection_cnt = max_connection_cnt;
107 :
108 0 : memset( &server->metrics, 0, sizeof(server->metrics) );
109 :
110 0 : FD_COMPILER_MFENCE();
111 0 : FD_VOLATILE( server->magic ) = FD_IPECHO_SERVER_MAGIC;
112 0 : FD_COMPILER_MFENCE();
113 :
114 0 : return server;
115 0 : }
116 :
117 : fd_ipecho_server_t *
118 0 : fd_ipecho_server_join( void * shipe ) {
119 0 : if( FD_UNLIKELY( !shipe ) ) {
120 0 : FD_LOG_WARNING(( "NULL shipe" ));
121 0 : return NULL;
122 0 : }
123 :
124 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shipe, fd_ipecho_server_align() ) ) ) {
125 0 : FD_LOG_WARNING(( "misaligned shipe" ));
126 0 : return NULL;
127 0 : }
128 :
129 0 : fd_ipecho_server_t * server = (fd_ipecho_server_t *)shipe;
130 :
131 0 : if( FD_UNLIKELY( server->magic!=FD_IPECHO_SERVER_MAGIC ) ) {
132 0 : FD_LOG_WARNING(( "bad magic" ));
133 0 : return NULL;
134 0 : }
135 :
136 0 : return server;
137 0 : }
138 :
139 : void
140 : fd_ipecho_server_init( fd_ipecho_server_t * server,
141 : uint address,
142 : ushort port,
143 0 : ushort shred_version ) {
144 :
145 : /* If the shred version is 0 that means that the shred version has not
146 : been set yet. */
147 0 : server->shred_version = shred_version;
148 :
149 0 : server->sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
150 0 : if( FD_UNLIKELY( -1==server->sockfd ) ) FD_LOG_ERR(( "socket() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
151 :
152 0 : int optval = 1;
153 0 : if( FD_UNLIKELY( -1==setsockopt( server->sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof( optval ) ) ) )
154 0 : FD_LOG_ERR(( "setsockopt failed (%i-%s)", errno, strerror( errno ) ));
155 :
156 0 : struct sockaddr_in addr = {
157 0 : .sin_family = AF_INET,
158 0 : .sin_port = fd_ushort_bswap( port ),
159 0 : .sin_addr.s_addr = address,
160 0 : };
161 :
162 0 : if( FD_UNLIKELY( -1==bind( server->sockfd, fd_type_pun( &addr ), sizeof( addr ) ) ) ) {
163 0 : FD_LOG_ERR(( "bind(%i,AF_INET," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
164 0 : server->sockfd, FD_IP4_ADDR_FMT_ARGS( address ), port,
165 0 : errno, fd_io_strerror( errno ) ));
166 0 : }
167 0 : if( FD_UNLIKELY( -1==listen( server->sockfd, (int)server->max_connection_cnt ) ) ) FD_LOG_ERR(( "listen() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
168 :
169 0 : server->pollfds[ server->max_connection_cnt ] = (struct pollfd){ .fd = server->sockfd, .events = POLLIN, .revents = 0 };
170 0 : }
171 :
172 : void
173 0 : fd_ipecho_server_fini( fd_ipecho_server_t * server ) {
174 0 : fd_ipecho_server_close_conns( server );
175 0 : if( FD_UNLIKELY( -1!=server->sockfd ) ) {
176 0 : FD_TEST( -1!=close( server->sockfd ) );
177 0 : server->sockfd = -1;
178 0 : server->pollfds[ server->max_connection_cnt ].fd = -1;
179 0 : }
180 0 : }
181 :
182 : void
183 0 : fd_ipecho_server_close_conns( fd_ipecho_server_t * server ) {
184 0 : for( ulong i=0UL; i<server->max_connection_cnt; i++ ) {
185 0 : if( FD_UNLIKELY( -1!=server->pollfds[ i ].fd ) ) {
186 0 : FD_TEST( -1!=close( server->pollfds[ i ].fd ) );
187 0 : server->pollfds[ i ].fd = -1;
188 0 : conn_pool_ele_release( server->pool, &server->pool[ i ] );
189 0 : }
190 0 : }
191 0 : server->metrics->connection_cnt = 0UL;
192 0 : server->evict_idx = 0UL;
193 0 : }
194 :
195 : void
196 : fd_ipecho_server_set_shred_version( fd_ipecho_server_t * server,
197 0 : ushort shred_version ) {
198 0 : server->shred_version = shred_version;
199 0 : }
200 :
/* is_expected_network_error returns 1 if err is one of the errno
   values this server treats as a routine network failure (the caller
   then drops the connection or retries) and 0 for any other value. */

static inline int
is_expected_network_error( int err ) {
  switch( err ) {
    case ENETDOWN:
    case EPROTO:
    case ENOPROTOOPT:
    case EHOSTDOWN:
    case ENONET:
    case EHOSTUNREACH:
    case EOPNOTSUPP:
    case ENETUNREACH:
    case ETIMEDOUT:
    case ENETRESET:
    case ECONNABORTED:
    case ECONNRESET:
    case EPIPE:
      return 1;
    default:
      return 0;
  }
}
218 :
219 : static void
220 : close_conn( fd_ipecho_server_t * server,
221 : ulong conn_idx,
222 0 : int reason ) {
223 0 : (void)reason;
224 0 : FD_TEST( server->pollfds[ conn_idx ].fd!=-1 );
225 :
226 0 : if( FD_UNLIKELY( -1==close( server->pollfds[ conn_idx ].fd ) ) ) FD_LOG_ERR(( "close failed (%i-%s)", errno, strerror( errno ) ));
227 0 : server->pollfds[ conn_idx ].fd = -1;
228 0 : conn_pool_ele_release( server->pool, &server->pool[ conn_idx ] );
229 :
230 0 : FD_TEST( server->metrics->connection_cnt );
231 0 : server->metrics->connection_cnt--;
232 0 : if( FD_UNLIKELY( reason==CLOSE_OK ) ) server->metrics->connections_closed_ok++;
233 0 : else server->metrics->connections_closed_error++;
234 0 : }
235 :
236 : static void
237 0 : accept_conns( fd_ipecho_server_t * server ) {
238 0 : for(;;) {
239 0 : struct sockaddr_in addr;
240 0 : socklen_t addr_len = sizeof(addr);
241 0 : int fd = accept4( server->pollfds[ server->max_connection_cnt ].fd, fd_type_pun( &addr ), &addr_len, SOCK_NONBLOCK|SOCK_CLOEXEC );
242 :
243 0 : if( FD_UNLIKELY( -1==fd ) ) {
244 0 : if( FD_LIKELY( EAGAIN==errno ) ) break;
245 0 : else if( FD_LIKELY( is_expected_network_error( errno ) ) ) continue;
246 0 : else FD_LOG_ERR(( "accept4() failed (%i-%s)", errno, strerror( errno ) ));
247 0 : }
248 :
249 0 : if( FD_UNLIKELY( !conn_pool_free( server->pool ) ) ) {
250 0 : close_conn( server, server->evict_idx, CLOSE_EVICTED );
251 0 : server->evict_idx = (server->evict_idx+1UL) % server->max_connection_cnt;
252 0 : }
253 0 : ulong conn_id = conn_pool_idx_acquire( server->pool );
254 :
255 0 : server->pollfds[ conn_id ].fd = fd;
256 0 : server->pool[ conn_id ].ipv4 = addr.sin_addr.s_addr;
257 0 : server->pool[ conn_id ].state = STATE_READING;
258 0 : server->pool[ conn_id ].request_bytes_read = 0UL;
259 0 : server->pool[ conn_id ].response_bytes_written = 0UL;
260 :
261 0 : server->metrics->connection_cnt++;
262 0 : }
263 0 : }
264 :
265 : static void
266 : read_conn( fd_ipecho_server_t * server,
267 0 : ulong conn_idx ) {
268 0 : fd_ipecho_server_connection_t * conn = &server->pool[ conn_idx ];
269 :
270 0 : if( FD_UNLIKELY( conn->state!=STATE_READING ) ) {
271 0 : close_conn( server, conn_idx, CLOSE_EXPECTED_EOF );
272 0 : return;
273 0 : }
274 :
275 0 : long sz = read( server->pollfds[ conn_idx ].fd, conn->request_bytes+conn->request_bytes_read, sizeof(conn->request_bytes)-conn->request_bytes_read );
276 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data to read, continue. */
277 0 : else if( -1==sz && is_expected_network_error( errno ) ) {
278 0 : close_conn( server, conn_idx, CLOSE_PEER_RESET );
279 0 : return;
280 0 : }
281 0 : else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "read failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
282 :
283 0 : if( FD_UNLIKELY( !sz && conn->request_bytes_read!=21UL ) ) {
284 0 : close_conn( server, conn_idx, CLOSE_BAD_LENGTH );
285 0 : return;
286 0 : }
287 :
288 : /* New data was read... process it */
289 0 : server->metrics->bytes_read += (ulong)sz;
290 0 : conn->request_bytes_read += (ulong)sz;
291 0 : if( FD_UNLIKELY( conn->request_bytes_read==sizeof(conn->request_bytes) ) ) {
292 0 : close_conn( server, conn_idx, CLOSE_LARGE_REQUEST );
293 0 : return;
294 0 : }
295 :
296 0 : if( FD_UNLIKELY( conn->request_bytes_read<21UL ) ) return;
297 :
298 0 : if( FD_UNLIKELY( memcmp( conn->request_bytes, "\0\0\0\0", 4UL ) ) ) {
299 0 : close_conn( server, conn_idx, CLOSE_BAD_HEADER );
300 0 : return;
301 0 : }
302 :
303 0 : if( FD_UNLIKELY( conn->request_bytes[ 20UL ]!='\n' ) ) {
304 0 : close_conn( server, conn_idx, CLOSE_BAD_TRAILER );
305 0 : return;
306 0 : }
307 :
308 0 : uchar response[ 27UL ] = {
309 0 : 0, 0, 0, 0, /* Magic */
310 0 : 0, 0, 0, 0, /* IP address variant */
311 0 : 0, 0, 0, 0, /* IP address */
312 0 : 1, /* Shred version option variant */
313 0 : 0, 0, /* Shred version */
314 0 : 0, /* [...] 12 bytes of trailing garbage, as in Agave */
315 0 : };
316 :
317 0 : FD_STORE( uint, response+8UL, conn->ipv4 );
318 0 : FD_STORE( ushort, response+13UL, server->shred_version );
319 :
320 : /* Now have a complete request ... buffer response */
321 0 : conn->state = STATE_WRITING;
322 0 : conn->response_bytes_written = 0UL;
323 0 : memcpy( conn->response_bytes, response, sizeof(response) );
324 0 : }
325 :
326 : static void
327 : write_conn( fd_ipecho_server_t * server,
328 0 : ulong conn_idx ) {
329 0 : fd_ipecho_server_connection_t * conn = &server->pool[ conn_idx ];
330 :
331 0 : if( FD_LIKELY( conn->state==STATE_READING ) ) return;
332 :
333 0 : long sz = sendto( server->pollfds[ conn_idx ].fd, conn->response_bytes+conn->response_bytes_written, sizeof(conn->response_bytes)-conn->response_bytes_written, MSG_NOSIGNAL, NULL, 0 );
334 0 : if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data was written, continue. */
335 0 : if( FD_UNLIKELY( -1==sz && is_expected_network_error( errno ) ) ) {
336 0 : close_conn( server, conn_idx, CLOSE_PEER_RESET );
337 0 : return;
338 0 : }
339 0 : if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
340 :
341 0 : server->metrics->bytes_written += (ulong)sz;
342 0 : conn->response_bytes_written += (ulong)sz;
343 0 : if( FD_UNLIKELY( conn->response_bytes_written<sizeof(conn->response_bytes) ) ) return;
344 :
345 0 : close_conn( server, conn_idx, CLOSE_OK );
346 0 : }
347 :
348 : void
349 : fd_ipecho_server_poll( fd_ipecho_server_t * server,
350 : int * charge_busy,
351 0 : int timeout_ms ) {
352 :
353 : /* If the shred version is 0 that means that the shred version just
354 : has not been set yet. Don't try to accept connections yet. */
355 0 : if( FD_UNLIKELY( server->shred_version==0U ) ) return;
356 :
357 0 : int nfds = fd_syscall_poll( server->pollfds, (uint)(server->max_connection_cnt+1UL), timeout_ms );
358 0 : if( FD_UNLIKELY( 0==nfds ) ) return;
359 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return;
360 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
361 :
362 0 : *charge_busy = 1;
363 :
364 0 : for( ulong i=0UL; i<server->max_connection_cnt+1UL; i++ ) {
365 0 : if( FD_UNLIKELY( -1==server->pollfds[ i ].fd ) ) continue;
366 0 : if( FD_UNLIKELY( i==server->max_connection_cnt ) ) {
367 0 : accept_conns( server );
368 0 : } else {
369 0 : if( FD_LIKELY( server->pollfds[ i ].revents & POLLIN ) ) read_conn( server, i );
370 0 : if( FD_UNLIKELY( -1==server->pollfds[ i ].fd ) ) continue;
371 0 : if( FD_LIKELY( server->pollfds[ i ].revents & POLLOUT ) ) write_conn( server, i );
372 : /* No need to handle POLLHUP, read() will return 0 soon enough. */
373 0 : }
374 0 : }
375 0 : }
376 :
377 : fd_ipecho_server_metrics_t *
378 0 : fd_ipecho_server_metrics( fd_ipecho_server_t * server ) {
379 0 : return server->metrics;
380 0 : }
381 :
382 : int
383 0 : fd_ipecho_server_sockfd( fd_ipecho_server_t * server ) {
384 0 : return server->sockfd;
385 0 : }
|