Line data Source code
1 : #define _GNU_SOURCE /* dup3 */
2 : #include "fd_sock_tile_private.h"
3 : #include "../fd_net_common.h"
4 : #include "../../topo/fd_topo.h"
5 : #include "../../../util/net/fd_eth.h"
6 : #include "../../../util/net/fd_ip4.h"
7 : #include "../../../util/net/fd_udp.h"
8 :
9 : #include <assert.h> /* assert */
10 : #include <stdalign.h> /* alignof */
11 : #include <errno.h>
12 : #include <fcntl.h> /* fcntl */
13 : #include <unistd.h> /* dup3, close */
14 : #include <netinet/in.h> /* sockaddr_in */
15 : #include <sys/socket.h> /* socket */
16 : #include "../../metrics/fd_metrics.h"
17 :
18 : #include "generated/fd_sock_tile_seccomp.h"
19 :
20 : /* recv/sendmmsg packet count in batch and tango burst depth
21 : FIXME make configurable in the future?
22 : FIXME keep in sync with fd_net_tile_topo.c */
23 0 : #define STEM_BURST (64UL)
24 :
25 : /* Place RX socket file descriptors in contiguous integer range. */
26 0 : #define RX_SOCK_FD_MIN (128)
27 :
28 : /* Controls max ancillary data size.
29 : Must be aligned by alignof(struct cmsghdr) */
30 0 : #define FD_SOCK_CMSG_MAX (64UL)
31 :
32 : /* Value of the sock_idx for Firedancer repair intake.
33 : Used to determine whether repair packets should go to shred vs repair tile.
34 : This value is validated at startup. */
35 0 : #define REPAIR_SHRED_SOCKET_ID (4U)
36 :
/* populate_allowed_seccomp instantiates the generated seccomp-BPF
   policy for the sock tile.  The policy whitelists the log file
   descriptor, the raw TX socket, and the contiguous RX socket fd
   range [RX_SOCK_FD_MIN,RX_SOCK_FD_MIN+sock_cnt).  Returns the number
   of BPF instructions written to out. */

static ulong
populate_allowed_seccomp( fd_topo_t const *      topo,
                          fd_topo_tile_t const * tile,
                          ulong                  out_cnt,
                          struct sock_filter *   out ) {
  /* Recover the tile context from tile scratch memory to learn which
     fds were assigned during privileged_init */
  FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) );
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );

  populate_sock_filter_policy_fd_sock_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->tx_sock, RX_SOCK_FD_MIN, RX_SOCK_FD_MIN+(uint)ctx->sock_cnt );
  return sock_filter_policy_fd_sock_tile_instr_cnt;
}
48 :
49 : static ulong
50 : populate_allowed_fds( fd_topo_t const * topo,
51 : fd_topo_tile_t const * tile,
52 : ulong out_fds_cnt,
53 0 : int * out_fds ) {
54 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
55 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
56 0 : fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );
57 :
58 0 : ulong sock_cnt = ctx->sock_cnt;
59 0 : if( FD_UNLIKELY( out_fds_cnt<sock_cnt+3UL ) ) {
60 0 : FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
61 0 : }
62 :
63 0 : ulong out_cnt = 0UL;
64 :
65 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
66 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
67 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
68 0 : }
69 0 : out_fds[ out_cnt++ ] = ctx->tx_sock;
70 0 : for( ulong j=0UL; j<sock_cnt; j++ ) {
71 0 : out_fds[ out_cnt++ ] = ctx->pollfd[ j ].fd;
72 0 : }
73 0 : return out_cnt;
74 0 : }
75 :
76 : FD_FN_CONST static inline ulong
77 0 : tx_scratch_footprint( void ) {
78 0 : return STEM_BURST * fd_ulong_align_up( FD_NET_MTU, FD_CHUNK_ALIGN );
79 0 : }
80 :
/* scratch_align returns the required alignment of the tile scratch
   region (one normal page). */

FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 4096UL;
}
85 :
/* scratch_footprint returns the total size of the tile scratch region:
   the tile context followed by the per-batch iovec/cmsg/sockaddr/
   mmsghdr arrays and the TX packet staging buffer.  The layout must
   exactly mirror the FD_SCRATCH_ALLOC_APPEND sequence in
   privileged_init. */

FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );
  l = FD_LAYOUT_APPEND( l, alignof(struct iovec), STEM_BURST*sizeof(struct iovec) );
  l = FD_LAYOUT_APPEND( l, alignof(struct cmsghdr), STEM_BURST*FD_SOCK_CMSG_MAX );
  l = FD_LAYOUT_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  l = FD_LAYOUT_APPEND( l, alignof(struct mmsghdr), STEM_BURST*sizeof(struct mmsghdr) );
  l = FD_LAYOUT_APPEND( l, FD_CHUNK_ALIGN, tx_scratch_footprint() );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
97 :
98 : /* create_udp_socket creates and configures a new UDP socket for the
99 : sock tile at the given file descriptor ID. */
100 :
101 : static void
102 : create_udp_socket( int sock_fd,
103 : uint bind_addr,
104 : ushort udp_port,
105 0 : int so_rcvbuf ) {
106 :
107 0 : if( fcntl( sock_fd, F_GETFD, 0 )!=-1 ) {
108 0 : FD_LOG_ERR(( "file descriptor %d already exists", sock_fd ));
109 0 : } else if( errno!=EBADF ) {
110 0 : FD_LOG_ERR(( "fcntl(F_GETFD) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
111 0 : }
112 :
113 0 : int orig_fd = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
114 0 : if( FD_UNLIKELY( orig_fd<0 ) ) {
115 0 : FD_LOG_ERR(( "socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
116 0 : }
117 :
118 0 : int reuseport = 1;
119 0 : if( FD_UNLIKELY( setsockopt( orig_fd, SOL_SOCKET, SO_REUSEPORT, &reuseport, sizeof(int) )<0 ) ) {
120 0 : FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_REUSEPORT,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
121 0 : }
122 :
123 0 : int ip_pktinfo = 1;
124 0 : if( FD_UNLIKELY( setsockopt( orig_fd, IPPROTO_IP, IP_PKTINFO, &ip_pktinfo, sizeof(int) )<0 ) ) {
125 0 : FD_LOG_ERR(( "setsockopt(IPPROTO_IP,IP_PKTINFO,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
126 0 : }
127 :
128 0 : if( FD_UNLIKELY( 0!=setsockopt( orig_fd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, sizeof(int) ) ) ) {
129 0 : FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", so_rcvbuf, errno, fd_io_strerror( errno ) ));
130 0 : }
131 :
132 0 : struct sockaddr_in saddr = {
133 0 : .sin_family = AF_INET,
134 0 : .sin_addr.s_addr = bind_addr,
135 0 : .sin_port = fd_ushort_bswap( udp_port ),
136 0 : };
137 0 : if( FD_UNLIKELY( 0!=bind( orig_fd, fd_type_pun_const( &saddr ), sizeof(struct sockaddr_in) ) ) ) {
138 0 : FD_LOG_ERR(( "bind(0.0.0.0:%i) failed (%i-%s)", udp_port, errno, fd_io_strerror( errno ) ));
139 0 : }
140 :
141 0 : # if defined(__linux__)
142 0 : int dup_res = dup3( orig_fd, sock_fd, O_CLOEXEC );
143 : # else
144 : int dup_res = dup2( orig_fd, sock_fd );
145 : # endif
146 0 : if( FD_UNLIKELY( dup_res!=sock_fd ) ) {
147 0 : FD_LOG_ERR(( "dup2 returned %i (%i-%s)", sock_fd, errno, fd_io_strerror( errno ) ));
148 0 : }
149 :
150 0 : if( FD_UNLIKELY( 0!=close( orig_fd ) ) ) {
151 0 : FD_LOG_ERR(( "close(%d) failed (%i-%s)", orig_fd, errno, fd_io_strerror( errno ) ));
152 0 : }
153 :
154 0 : }
155 :
/* privileged_init runs before sandboxing: it carves the tile scratch
   region into the batch buffers used by the RX/TX paths and creates
   all kernel sockets (which may require privileges and free fd
   numbers). */

static void
privileged_init( fd_topo_t *      topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  /* Layout must exactly mirror scratch_footprint */
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );
  struct iovec * batch_iov = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct iovec), STEM_BURST*sizeof(struct iovec) );
  void * batch_cmsg = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct cmsghdr), STEM_BURST*FD_SOCK_CMSG_MAX );
  struct sockaddr_in * batch_sa = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  struct mmsghdr * batch_msg = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct mmsghdr), STEM_BURST*sizeof(struct mmsghdr) );
  uchar * tx_scratch = FD_SCRATCH_ALLOC_APPEND( l, FD_CHUNK_ALIGN, tx_scratch_footprint() );

  /* populate_allowed_seccomp/populate_allowed_fds assume the context
     sits at the very start of the scratch region */
  assert( scratch==ctx );

  fd_memset( ctx, 0, sizeof(fd_sock_tile_t) );
  fd_memset( batch_iov, 0, STEM_BURST*sizeof(struct iovec) );
  fd_memset( batch_sa, 0, STEM_BURST*sizeof(struct sockaddr_in) );
  fd_memset( batch_msg, 0, STEM_BURST*sizeof(struct mmsghdr) );

  ctx->batch_cnt = 0UL;
  ctx->batch_iov = batch_iov;
  ctx->batch_cmsg = batch_cmsg;
  ctx->batch_sa = batch_sa;
  ctx->batch_msg = batch_msg;
  ctx->tx_scratch0 = tx_scratch;
  ctx->tx_scratch1 = tx_scratch + tx_scratch_footprint();
  ctx->tx_ptr = tx_scratch;

  /* Create receive sockets.  Incrementally assign them to file
     descriptors starting at sock_fd_min. */

  int sock_fd_min = RX_SOCK_FD_MIN;
  /* Candidate listen ports; a value of 0 means "not configured" and the
     candidate is skipped (no socket, no sock_idx consumed). */
  ushort udp_port_candidates[] = {
    (ushort)tile->sock.net.legacy_transaction_listen_port,
    (ushort)tile->sock.net.quic_transaction_listen_port,
    (ushort)tile->sock.net.shred_listen_port,
    (ushort)tile->sock.net.gossip_listen_port,
    (ushort)tile->sock.net.repair_intake_listen_port,
    (ushort)tile->sock.net.repair_serve_listen_port,
    (ushort)tile->sock.net.txsend_src_port
  };
  /* Out link each candidate's received traffic is forwarded to */
  static char const * udp_port_links[] = {
    "net_quic",   /* legacy_transaction_listen_port */
    "net_quic",   /* quic_transaction_listen_port */
    "net_shred",  /* shred_listen_port (turbine) */
    "net_gossvf", /* gossip_listen_port */
    "net_shred",  /* repair_intake_listen_port (repair shreds go to the shred tile) */
    "net_repair", /* repair_serve_listen_port */
    "net_txsend"  /* txsend_src_port */
  };
  /* Protocol tag stamped into the netmux sig of forwarded frags */
  static uchar const udp_port_protos[] = {
    DST_PROTO_TPU_UDP,  /* legacy_transaction_listen_port */
    DST_PROTO_TPU_QUIC, /* quic_transaction_listen_port */
    DST_PROTO_SHRED,    /* shred_listen_port (turbine) */
    DST_PROTO_GOSSIP,   /* gossip_listen_port */
    DST_PROTO_REPAIR,   /* shred_listen_port (repair) */
    DST_PROTO_REPAIR,   /* repair_serve_listen_port */
    DST_PROTO_SEND      /* send_src_port */
  };
  for( uint candidate_idx=0U; candidate_idx<7; candidate_idx++ ) {
    if( !udp_port_candidates[ candidate_idx ] ) continue;
    uint sock_idx = ctx->sock_cnt;
    if( sock_idx>=FD_SOCK_TILE_MAX_SOCKETS ) FD_LOG_ERR(( "too many sockets" ));
    ushort port = (ushort)udp_port_candidates[ candidate_idx ];

    /* Validate value of REPAIR_SHRED_SOCKET_ID: poll_rx_socket relies
       on repair intake landing at exactly this socket index (and
       repair serve at the next one). */
    if( tile->sock.net.repair_intake_listen_port &&
        udp_port_candidates[candidate_idx]==tile->sock.net.repair_intake_listen_port )
      FD_TEST( sock_idx==REPAIR_SHRED_SOCKET_ID );
    if( tile->sock.net.repair_serve_listen_port &&
        udp_port_candidates[candidate_idx]==tile->sock.net.repair_serve_listen_port )
      FD_TEST( sock_idx==REPAIR_SHRED_SOCKET_ID+1 );

    /* Resolve the out link this socket forwards to; 0xFF marks
       "no link found" */
    char const * target_link = udp_port_links[ candidate_idx ];
    ctx->link_rx_map[ sock_idx ] = 0xFF;
    for( ulong j=0UL; j<(tile->out_cnt); j++ ) {
      if( 0==strcmp( topo->links[ tile->out_link_id[ j ] ].name, target_link ) ) {
        ctx->proto_id [ sock_idx ] = (uchar)udp_port_protos[ candidate_idx ];
        ctx->link_rx_map [ sock_idx ] = (uchar)j;
        ctx->rx_sock_port[ sock_idx ] = (ushort)port;
        break;
      }
    }
    if( ctx->link_rx_map[ sock_idx ]==0xFF ) {
      continue; /* listen port number has no associated links */
    }

    int sock_fd = sock_fd_min + (int)sock_idx;
    create_udp_socket( sock_fd, tile->sock.net.bind_address, port, tile->sock.so_rcvbuf );
    ctx->pollfd[ sock_idx ].fd = sock_fd;
    ctx->pollfd[ sock_idx ].events = POLLIN;
    ctx->sock_cnt++;
  }

  /* Create transmit socket.  A raw IPv4 socket (proto UDP) is used;
     the TX path stages UDP header + payload and the kernel prepends
     the IPv4 header. */

  int tx_sock = socket( AF_INET, SOCK_RAW|SOCK_CLOEXEC, FD_IP4_HDR_PROTOCOL_UDP );
  if( FD_UNLIKELY( tx_sock<0 ) ) {
    FD_LOG_ERR(( "socket(AF_INET,SOCK_RAW|SOCK_CLOEXEC,17) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=setsockopt( tx_sock, SOL_SOCKET, SO_SNDBUF, &tile->sock.so_sndbuf, sizeof(int) ) ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_SNDBUF,%i) failed (%i-%s)", tile->sock.so_sndbuf, errno, fd_io_strerror( errno ) ));
  }

  uchar mcast_ttl = 64;
  if( FD_UNLIKELY( 0!=setsockopt( tx_sock, IPPROTO_IP, IP_MULTICAST_TTL, &mcast_ttl, sizeof(mcast_ttl) ) ) ) {
    FD_LOG_ERR(( "setsockopt(IPPROTO_IP,IP_MULTICAST_TTL,%u) failed (%i-%s)", (uint)mcast_ttl, errno, fd_io_strerror( errno ) ));
  }

  ctx->tx_sock = tx_sock;
  ctx->bind_address = tile->sock.net.bind_address;

}
270 :
/* unprivileged_init joins the tango links after sandboxing.  Out links
   (named "net_*") carry received packets towards app tiles; in links
   (named "*_net") carry packets to transmit. */

static void
unprivileged_init( fd_topo_t *      topo,
                   fd_topo_tile_t * tile ) {
  fd_sock_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  if( FD_UNLIKELY( tile->out_cnt > MAX_NET_OUTS ) ) {
    FD_LOG_ERR(( "sock tile has %lu out links which exceeds the max (%lu)", tile->out_cnt, MAX_NET_OUTS ));
  }

  /* Join RX (outgoing) links */
  for( ulong i=0UL; i<(tile->out_cnt); i++ ) {
    if( 0!=strncmp( topo->links[ tile->out_link_id[ i ] ].name, "net_", 4 ) ) {
      FD_LOG_ERR(( "out link %lu is not a net RX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
    ctx->link_rx[ i ].base = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_rx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_rx[ i ].base, link->dcache );
    ctx->link_rx[ i ].wmark = fd_dcache_compact_wmark( ctx->link_rx[ i ].base, link->dcache, link->mtu );
    ctx->link_rx[ i ].chunk = ctx->link_rx[ i ].chunk0;
    /* poll_rx_socket speculatively prepares up to STEM_BURST chunks
       per call, so the dcache must support that burst depth */
    if( FD_UNLIKELY( link->burst < STEM_BURST ) ) {
      FD_LOG_ERR(( "link %lu dcache burst is too low (%lu<%lu)",
                   tile->out_link_id[ i ], link->burst, STEM_BURST ));
    }
  }

  /* Join TX (incoming) links */
  for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
    if( !strstr( topo->links[ tile->in_link_id[ i ] ].name, "_net" ) ) {
      FD_LOG_ERR(( "in link %lu is not a net TX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
    ctx->link_tx[ i ].base = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_tx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_tx[ i ].base, link->dcache );
    ctx->link_tx[ i ].wmark = fd_dcache_compact_wmark( ctx->link_tx[ i ].base, link->dcache, link->mtu );
  }

}
306 :
307 : /* RX PATH (socket->tango) ********************************************/
308 :
309 : /* FIXME Pace RX polling and interleave it with TX jobs to reduce TX
310 : tail latency */
311 :
/* poll_rx_socket does one recvmmsg batch receive on the given socket
   index.  Each received UDP payload is prepended with a synthetic
   Ethernet/IPv4/UDP header (reconstructed from the sender sockaddr and
   the IP_PKTINFO cmsg) so downstream consumers see a full frame, then
   published to the socket's mapped out link.  Returns the number of
   packets returned by recvmmsg. */

static ulong
poll_rx_socket( fd_sock_tile_t *    ctx,
                fd_stem_context_t * stem,
                uint                sock_idx,
                int                 sock_fd,
                ushort              proto ) {
  ulong hdr_sz = sizeof(fd_eth_hdr_t) + sizeof(fd_ip4_hdr_t) + sizeof(fd_udp_hdr_t);
  ulong payload_max = FD_NET_MTU-hdr_sz;
  uchar rx_link = ctx->link_rx_map[ sock_idx ];
  ushort dport = ctx->rx_sock_port[ sock_idx ];

  fd_sock_link_rx_t * link = ctx->link_rx + rx_link;
  void * const base = link->base;
  ulong const chunk0 = link->chunk0;
  ulong const wmark = link->wmark;
  ulong chunk_next = link->chunk;
  uchar * cmsg_next = ctx->batch_cmsg;

  /* Point each message's iov into a dcache chunk, leaving hdr_sz bytes
     of headroom for the synthetic headers written below */
  for( ulong j=0UL; j<STEM_BURST; j++ ) {
    ctx->batch_iov[ j ].iov_base = (uchar *)fd_chunk_to_laddr( base, chunk_next ) + hdr_sz;
    ctx->batch_iov[ j ].iov_len = payload_max;
    ctx->batch_msg[ j ].msg_hdr = (struct msghdr) {
      .msg_iov = ctx->batch_iov+j,
      .msg_iovlen = 1,
      .msg_name = ctx->batch_sa+j,
      .msg_namelen = sizeof(struct sockaddr_in),
      .msg_control = cmsg_next,
      .msg_controllen = FD_SOCK_CMSG_MAX,
    };
    cmsg_next += FD_SOCK_CMSG_MAX;
    /* Speculatively prepare all chunk indexes for a receive.
       At function exit, chunks into which a packet was received are
       committed, all others are freed. */
    chunk_next = fd_dcache_compact_next( chunk_next, FD_NET_MTU, chunk0, wmark );
  }

  int msg_cnt = recvmmsg( sock_fd, ctx->batch_msg, STEM_BURST, MSG_DONTWAIT, NULL );
  if( FD_UNLIKELY( msg_cnt<0 ) ) {
    if( FD_LIKELY( errno==EAGAIN ) ) return 0UL; /* nothing pending */
    /* unreachable if socket is in a valid state */
    FD_LOG_ERR(( "recvmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  long ts = fd_tickcount();
  ctx->metrics.sys_recvmmsg_cnt++;

  if( FD_UNLIKELY( msg_cnt==0 ) ) return 0UL;

  /* Track the chunk index of the last frag populated, so we can derive
     the chunk indexes for the next poll_rx_socket call.
     Guaranteed to be set since msg_cnt>0. */
  ulong last_chunk;

  for( ulong j=0; j<(ulong)msg_cnt; j++ ) {
    uchar * payload = ctx->batch_iov[ j ].iov_base;
    ulong payload_sz = ctx->batch_msg[ j ].msg_len;
    struct sockaddr_in * sa = ctx->batch_msg[ j ].msg_hdr.msg_name;
    ulong frame_sz = payload_sz + hdr_sz;
    ctx->metrics.rx_bytes_total += frame_sz;
    if( FD_UNLIKELY( sa->sin_family!=AF_INET ) ) {
      /* unreachable */
      FD_LOG_ERR(( "Received packet with unexpected sin_family %i", sa->sin_family ));
    }

    /* Extract the local destination address from the IP_PKTINFO cmsg
       (daddr stays -1 if no such cmsg was delivered) */
    long daddr = -1;
    struct cmsghdr * cmsg = CMSG_FIRSTHDR( &ctx->batch_msg[ j ].msg_hdr );
    if( FD_LIKELY( cmsg ) ) {
      do {
        if( FD_LIKELY( (cmsg->cmsg_level==IPPROTO_IP) &
                       (cmsg->cmsg_type ==IP_PKTINFO) ) ) {
          struct in_pktinfo const * pi = (struct in_pktinfo const *)CMSG_DATA( cmsg );
          daddr = pi->ipi_addr.s_addr;
        }
        cmsg = CMSG_NXTHDR( &ctx->batch_msg[ j ].msg_hdr, cmsg );
      } while( FD_UNLIKELY( cmsg ) ); /* optimize for 1 cmsg */
    }
    if( FD_UNLIKELY( daddr<0L ) ) {
      /* unreachable because IP_PKTINFO was set */
      FD_LOG_ERR(( "Missing IP_PKTINFO on incoming packet" ));
    }

    /* Reconstruct synthetic headers in the 42 bytes of headroom
       preceding the payload (MAC addresses zeroed, checksums unset) */
    fd_eth_hdr_t * eth_hdr = (fd_eth_hdr_t *)( payload-42UL );
    fd_ip4_hdr_t * ip_hdr = (fd_ip4_hdr_t *)( payload-28UL );
    fd_udp_hdr_t * udp_hdr = (fd_udp_hdr_t *)( payload- 8UL );
    memset( eth_hdr->dst, 0, 6 );
    memset( eth_hdr->src, 0, 6 );
    eth_hdr->net_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_IP );
    *ip_hdr = (fd_ip4_hdr_t) {
      .verihl = FD_IP4_VERIHL( 4, 5 ),
      .net_tot_len = fd_ushort_bswap( (ushort)( payload_sz+28UL ) ),
      .ttl = 1,
      .protocol = FD_IP4_HDR_PROTOCOL_UDP,
    };
    uint daddr_ = (uint)(ulong)daddr;
    memcpy( ip_hdr->saddr_c, &sa->sin_addr.s_addr, 4 );
    memcpy( ip_hdr->daddr_c, &daddr_, 4 );
    *udp_hdr = (fd_udp_hdr_t) {
      .net_sport = sa->sin_port,
      .net_dport = (ushort)fd_ushort_bswap( (ushort)dport ),
      .net_len = (ushort)fd_ushort_bswap( (ushort)( payload_sz+8UL ) ),
      .check = 0
    };

    ctx->metrics.rx_pkt_cnt++;
    ulong chunk = fd_laddr_to_chunk( base, eth_hdr );
    /* NOTE(review): the sender address is passed for both the src and
       dst address fields of the netmux sig -- confirm intended */
    ulong sig = fd_disco_netmux_sig( sa->sin_addr.s_addr, fd_ushort_bswap( sa->sin_port ), sa->sin_addr.s_addr, proto, hdr_sz );
    ulong tspub = fd_frag_meta_ts_comp( ts );

    /* default for repair intake is to send to [shreds] to shred tile.
       ping messages should be routed to the repair. */
    if( FD_UNLIKELY( sock_idx==REPAIR_SHRED_SOCKET_ID && frame_sz==REPAIR_PING_SZ ) ) {
      /* Copy the frame into the repair serve link's dcache and publish
         it there instead of the shred link */
      uchar repair_rx_link = ctx->link_rx_map[ REPAIR_SHRED_SOCKET_ID+1 ];
      fd_sock_link_rx_t * repair_link = ctx->link_rx + repair_rx_link;
      uchar * repair_buf = fd_chunk_to_laddr( repair_link->base, repair_link->chunk );
      memcpy( repair_buf, eth_hdr, frame_sz );
      fd_stem_publish( stem, repair_rx_link, sig, repair_link->chunk, frame_sz, 0UL, 0UL, tspub );
      repair_link->chunk = fd_dcache_compact_next( repair_link->chunk, FD_NET_MTU, repair_link->chunk0, repair_link->wmark );
    } else {
      fd_stem_publish( stem, rx_link, sig, chunk, frame_sz, 0UL, 0UL, tspub );
    }

    last_chunk = chunk;
  }

  /* Rewind the chunk index to the first free index. */
  link->chunk = fd_dcache_compact_next( last_chunk, FD_NET_MTU, chunk0, wmark );
  return (ulong)msg_cnt;
}
442 :
443 : static ulong
444 : poll_rx( fd_sock_tile_t * ctx,
445 0 : fd_stem_context_t * stem ) {
446 0 : ulong pkt_cnt = 0UL;
447 0 : if( FD_UNLIKELY( ctx->batch_cnt ) ) {
448 0 : FD_LOG_ERR(( "Batch is not clean" ));
449 0 : }
450 0 : ctx->tx_idle_cnt = 0; /* restart TX polling */
451 0 : if( FD_UNLIKELY( fd_syscall_poll( ctx->pollfd, ctx->sock_cnt, 0 )<0 ) ) {
452 0 : FD_LOG_ERR(( "fd_syscall_poll failed (%i-%s)", errno, fd_io_strerror( errno ) ));
453 0 : }
454 0 : for( uint j=0UL; j<ctx->sock_cnt; j++ ) {
455 0 : if( ctx->pollfd[ j ].revents & (POLLIN|POLLERR) ) {
456 0 : pkt_cnt += poll_rx_socket(
457 0 : ctx,
458 0 : stem,
459 0 : j,
460 0 : ctx->pollfd[ j ].fd,
461 0 : ctx->proto_id[ j ]
462 0 : );
463 0 : }
464 0 : ctx->pollfd[ j ].revents = 0;
465 0 : }
466 0 : return pkt_cnt;
467 0 : }
468 :
469 : /* TX PATH (tango->socket) ********************************************/
470 :
/* flush_tx_batch transmits every packet staged in the current batch
   via sendmmsg and resets the batch state.  Partial sends are handled
   by counting the failing message as a drop, skipping it, and
   resuming with the remainder of the batch. */

static void
flush_tx_batch( fd_sock_tile_t * ctx ) {
  ulong batch_cnt = ctx->batch_cnt;
  for( int j = 0; j < (int)batch_cnt; /* incremented in loop */ ) {
    int remain = (int)batch_cnt - j;
    int send_cnt = sendmmsg( ctx->tx_sock, ctx->batch_msg + j, (uint)remain, MSG_DONTWAIT );
    if( send_cnt>=0 ) {
      ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_NO_ERROR_IDX ]++;
    }
    if( FD_UNLIKELY( send_cnt < remain ) ) {
      /* Short send: at least one message failed */
      ctx->metrics.tx_drop_cnt++;
      if( FD_UNLIKELY( send_cnt < 0 ) ) {
        /* Syscall itself failed; bucket the errno for metrics */
        switch( errno ) {
        case EAGAIN:
        case ENOBUFS:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_SLOW_IDX ]++;
          break;
        case EPERM:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_PERM_IDX ]++;
          break;
        case ENETUNREACH:
        case EHOSTUNREACH:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_UNREACH_IDX ]++;
          break;
        case ENONET:
        case ENETDOWN:
        case EHOSTDOWN:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_DOWN_IDX ]++;
          break;
        default:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_OTHER_IDX ]++;
          /* log with NOTICE, since flushing has a significant negative performance impact */
          FD_LOG_NOTICE(( "sendmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
        }

        /* first message failed, so skip failing message and continue */
        j++;
      } else {
        /* send_cnt succeeded, so skip those and also the failing message */
        j += send_cnt + 1;

        /* add the successful count */
        ctx->metrics.tx_pkt_cnt += (ulong)send_cnt;
      }

      continue;
    }

    /* send_cnt == batch_cnt, so we sent everything */
    ctx->metrics.tx_pkt_cnt += (ulong)send_cnt;
    break;
  }

  /* Reset staging state for the next batch */
  ctx->tx_ptr = ctx->tx_scratch0;
  ctx->batch_cnt = 0;
}
527 :
528 : /* before_frag is called when a new frag has been detected. The sock
529 : tile can do early filtering here in the future. For example, it may
530 : want to install routing logic here to take turns with an XDP tile.
531 : (Fast path with slow fallback) */
532 :
533 : static inline int
534 : before_frag( fd_sock_tile_t * ctx FD_PARAM_UNUSED,
535 : ulong in_idx FD_PARAM_UNUSED,
536 : ulong seq FD_PARAM_UNUSED,
537 0 : ulong sig ) {
538 0 : ulong proto = fd_disco_netmux_sig_proto( sig );
539 0 : if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) return 1;
540 0 : return 0; /* continue */
541 0 : }
542 :
/* during_frag is called when a new frag passed early filtering.
   Speculatively copies data into a sendmmsg buffer.  (If all tiles
   respect backpressure could eliminate this copy)

   The incoming frag is a full frame; the Ethernet and IPv4 headers
   are stripped and only UDP header + payload are staged (the raw TX
   socket prepends the IPv4 header).  The destination address travels
   in a sockaddr_in and the source address in an IP_PKTINFO cmsg. */

static inline void
during_frag( fd_sock_tile_t * ctx,
             ulong            in_idx,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig,
             ulong            chunk,
             ulong            sz,
             ulong            ctl FD_PARAM_UNUSED ) {
  /* Bounds check the frag descriptor against the in link's dcache */
  if( FD_UNLIKELY( chunk<ctx->link_tx[ in_idx ].chunk0 || chunk>ctx->link_tx[ in_idx ].wmark || sz>FD_NET_MTU ) ) {
    FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->link_tx[ in_idx ].chunk0, ctx->link_tx[ in_idx ].wmark ));
  }

  ulong const hdr_min = sizeof(fd_eth_hdr_t)+sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t);
  if( FD_UNLIKELY( sz<hdr_min ) ) {
    /* FIXME support ICMP messages in the future? */
    FD_LOG_ERR(( "packet too small %lu (in_idx=%lu)", sz, in_idx ));
  }

  uchar const * frame = fd_chunk_to_laddr_const( ctx->link_tx[ in_idx ].base, chunk );
  ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
  uchar const * payload = frame+hdr_sz;
  /* hdr_sz (from the sig) must cover at least eth+ip4+udp and fit in
     the frag */
  if( FD_UNLIKELY( hdr_sz>sz || hdr_sz<hdr_min ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu corrupt: hdr_sz=%lu total_sz=%lu",
                 in_idx, hdr_sz, sz ));
  }
  ulong payload_sz = sz-hdr_sz;

  fd_ip4_hdr_t const * ip_hdr = (fd_ip4_hdr_t const *)( frame +sizeof(fd_eth_hdr_t) );
  fd_udp_hdr_t const * udp_hdr = (fd_udp_hdr_t const *)( payload-sizeof(fd_udp_hdr_t) );
  if( FD_UNLIKELY( ( FD_IP4_GET_VERSION( *ip_hdr )!=4 ) |
                   ( ip_hdr->protocol != FD_IP4_HDR_PROTOCOL_UDP ) ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu: sock tile only supports IPv4 UDP for now", in_idx ));
  }

  /* Staged message is UDP header + payload only */
  ulong msg_sz = sizeof(fd_udp_hdr_t) + payload_sz;

  ulong batch_idx = ctx->batch_cnt;
  assert( batch_idx<STEM_BURST );
  struct mmsghdr * msg = ctx->batch_msg + batch_idx;
  struct sockaddr_in * sa = ctx->batch_sa + batch_idx;
  struct iovec * iov = ctx->batch_iov + batch_idx;
  struct cmsghdr * cmsg = (void *)( (ulong)ctx->batch_cmsg + batch_idx*FD_SOCK_CMSG_MAX );
  uchar * buf = ctx->tx_ptr;

  *iov = (struct iovec) {
    .iov_base = buf,
    .iov_len = msg_sz,
  };
  /* Destination comes from the frame's IPv4 daddr; port is inside the
     staged UDP header */
  sa->sin_family = AF_INET;
  sa->sin_addr.s_addr = FD_LOAD( uint, ip_hdr->daddr_c );
  sa->sin_port = 0; /* ignored */

  /* IP_PKTINFO selects the source address for this datagram; fall
     back to the configured bind address if the frame's saddr is 0 */
  cmsg->cmsg_level = IPPROTO_IP;
  cmsg->cmsg_type = IP_PKTINFO;
  cmsg->cmsg_len = CMSG_LEN( sizeof(struct in_pktinfo) );
  struct in_pktinfo * pi = (struct in_pktinfo *)CMSG_DATA( cmsg );
  pi->ipi_ifindex = 0;
  pi->ipi_addr.s_addr = 0;
  pi->ipi_spec_dst.s_addr = fd_uint_if( !!ip_hdr->saddr, ip_hdr->saddr, ctx->bind_address );

  *msg = (struct mmsghdr) {
    .msg_hdr = {
      .msg_name = sa,
      .msg_namelen = sizeof(struct sockaddr_in),
      .msg_iov = iov,
      .msg_iovlen = 1,
      .msg_control = cmsg,
      .msg_controllen = CMSG_LEN( sizeof(struct in_pktinfo) )
    }
  };

  memcpy( buf, udp_hdr, sizeof(fd_udp_hdr_t) );
  fd_memcpy( buf+sizeof(fd_udp_hdr_t), payload, payload_sz );
  ctx->metrics.tx_bytes_total += sz;
}
622 :
623 : /* after_frag is called when a frag was copied into a sendmmsg buffer. */
624 :
625 : static void
626 : after_frag( fd_sock_tile_t * ctx,
627 : ulong in_idx FD_PARAM_UNUSED,
628 : ulong seq FD_PARAM_UNUSED,
629 : ulong sig FD_PARAM_UNUSED,
630 : ulong sz,
631 : ulong tsorig FD_PARAM_UNUSED,
632 : ulong tspub FD_PARAM_UNUSED,
633 0 : fd_stem_context_t * stem FD_PARAM_UNUSED ) {
634 : /* Commit the packet added in during_frag */
635 :
636 0 : ctx->tx_idle_cnt = 0;
637 0 : ctx->batch_cnt++;
638 : /* Technically leaves a gap. sz is always larger than the payload
639 : written to tx_ptr because Ethernet & IPv4 headers were stripped. */
640 0 : ctx->tx_ptr += fd_ulong_align_up( sz, FD_CHUNK_ALIGN );
641 :
642 0 : if( ctx->batch_cnt >= STEM_BURST ) {
643 0 : flush_tx_batch( ctx );
644 0 : }
645 0 : }
646 :
647 : /* End TX path ********************************************************/
648 :
649 : /* after_credit is called every stem iteration when there are enough
650 : flow control credits to publish a burst of fragments. */
651 :
652 : static inline void
653 : after_credit( fd_sock_tile_t * ctx,
654 : fd_stem_context_t * stem,
655 : int * poll_in FD_PARAM_UNUSED,
656 0 : int * charge_busy ) {
657 0 : if( ctx->tx_idle_cnt > 512 ) {
658 0 : if( ctx->batch_cnt ) {
659 0 : flush_tx_batch( ctx );
660 0 : }
661 0 : ulong pkt_cnt = poll_rx( ctx, stem );
662 0 : *charge_busy = pkt_cnt!=0;
663 0 : }
664 0 : ctx->tx_idle_cnt++;
665 0 : }
666 :
/* metrics_write publishes the tile's locally accumulated counters to
   the shared metrics facility. */

static void
metrics_write( fd_sock_tile_t * ctx ) {
  FD_MCNT_SET( SOCK, SYSCALLS_RECVMMSG, ctx->metrics.sys_recvmmsg_cnt );
  FD_MCNT_ENUM_COPY( SOCK, SYSCALLS_SENDMMSG, ctx->metrics.sys_sendmmsg_cnt );
  FD_MCNT_SET( SOCK, RX_PKT_CNT, ctx->metrics.rx_pkt_cnt );
  FD_MCNT_SET( SOCK, TX_PKT_CNT, ctx->metrics.tx_pkt_cnt );
  FD_MCNT_SET( SOCK, TX_DROP_CNT, ctx->metrics.tx_drop_cnt );
  FD_MCNT_SET( SOCK, TX_BYTES_TOTAL, ctx->metrics.tx_bytes_total );
  FD_MCNT_SET( SOCK, RX_BYTES_TOTAL, ctx->metrics.rx_bytes_total );
}
677 :
678 : static ulong
679 : rlimit_file_cnt( fd_topo_t const * topo,
680 0 : fd_topo_tile_t const * tile ) {
681 0 : fd_sock_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
682 0 : return RX_SOCK_FD_MIN + ctx->sock_cnt;
683 0 : }
684 :
685 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_sock_tile_t
686 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_sock_tile_t)
687 :
688 0 : #define STEM_LAZY ((long)10e6) /* 10ms */
689 :
690 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
691 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
692 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
693 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
694 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
695 :
696 : #include "../../stem/fd_stem.c"
697 :
/* fd_tile_sock is the sock tile descriptor: a network driver built on
   plain kernel sockets (recvmmsg RX, raw-socket sendmmsg TX).
   privileged_init creates the sockets before sandboxing; stem_run
   drives the RX/TX callbacks above. */

fd_topo_run_tile_t fd_tile_sock = {
  .name                     = "sock",
  .rlimit_file_cnt_fn       = rlimit_file_cnt,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};
|