Line data Source code
1 : #include "fd_genesis_client_private.h"
2 :
3 : #include "../../waltz/http/picohttpparser.h"
4 : #include "../../util/fd_util.h"
5 : #include "../../ballet/sha256/fd_sha256.h"
6 :
7 : #include <errno.h>
8 : #include <netinet/in.h>
9 : #include <string.h>
10 : #include <strings.h>
11 : #include <sys/socket.h>
12 : #include <unistd.h>
13 : #include <stdlib.h>
14 :
15 : FD_FN_CONST ulong
16 0 : fd_genesis_client_align( void ) {
17 0 : return alignof(fd_genesis_client_t);
18 0 : }
19 :
20 : FD_FN_CONST ulong
21 0 : fd_genesis_client_footprint( void ) {
22 0 : return sizeof(fd_genesis_client_t);
23 0 : }
24 :
25 : void *
26 0 : fd_genesis_client_new( void * shmem ) {
27 0 : fd_genesis_client_t * gen = (fd_genesis_client_t *)shmem;
28 :
29 0 : if( FD_UNLIKELY( !shmem ) ) {
30 0 : FD_LOG_WARNING(( "NULL shmem" ));
31 0 : return NULL;
32 0 : }
33 :
34 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_genesis_client_align() ) ) ) {
35 0 : FD_LOG_WARNING(( "misaligned shmem" ));
36 0 : return NULL;
37 0 : }
38 :
39 0 : FD_COMPILER_MFENCE();
40 0 : FD_VOLATILE( gen->magic ) = FD_GENESIS_CLIENT_MAGIC;
41 0 : FD_COMPILER_MFENCE();
42 :
43 0 : return (void *)gen;
44 0 : }
45 :
46 : fd_genesis_client_t *
47 0 : fd_genesis_client_join( void * shgen ) {
48 0 : if( FD_UNLIKELY( !shgen ) ) {
49 0 : FD_LOG_WARNING(( "NULL shgen" ));
50 0 : return NULL;
51 0 : }
52 :
53 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgen, fd_genesis_client_align() ) ) ) {
54 0 : FD_LOG_WARNING(( "misaligned shgen" ));
55 0 : return NULL;
56 0 : }
57 :
58 0 : fd_genesis_client_t * gen = (fd_genesis_client_t *)shgen;
59 :
60 0 : if( FD_UNLIKELY( gen->magic!=FD_GENESIS_CLIENT_MAGIC ) ) {
61 0 : FD_LOG_WARNING(( "bad magic" ));
62 0 : return NULL;
63 0 : }
64 :
65 0 : return gen;
66 0 : }
67 :
68 : void
69 : fd_genesis_client_init( fd_genesis_client_t * client,
70 : fd_ip4_port_t const * servers,
71 0 : ulong servers_len ) {
72 0 : FD_TEST( servers_len<=FD_TOPO_GOSSIP_ENTRYPOINTS_MAX );
73 0 : ulong peer_cnt = 0UL;
74 :
75 0 : for( ulong i=0UL; i<servers_len; i++ ) {
76 0 : fd_ip4_port_t server = servers[ i ];
77 0 : server.port = 8899; // TODO: SPECIFY IN CONFIG
78 :
79 0 : int sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
80 0 : if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
81 :
82 0 : struct sockaddr_in addr = {
83 0 : .sin_family = AF_INET,
84 0 : .sin_port = fd_ushort_bswap( server.port ),
85 0 : .sin_addr = { .s_addr = server.addr }
86 0 : };
87 :
88 0 : if( FD_UNLIKELY( -1==connect( sockfd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
89 0 : if( FD_UNLIKELY( -1==close( sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
90 0 : continue;
91 0 : }
92 :
93 0 : client->pollfds[ peer_cnt ] = (struct pollfd){
94 0 : .fd = sockfd,
95 0 : .events = POLLIN | POLLOUT,
96 0 : .revents = 0
97 0 : };
98 0 : client->peers[ peer_cnt ].addr = server;
99 0 : client->peers[ peer_cnt ].writing = 1;
100 0 : client->peers[ peer_cnt ].request_bytes_sent = 0UL;
101 0 : client->peers[ peer_cnt ].response_bytes_read = 0UL;
102 0 : peer_cnt++;
103 0 : }
104 :
105 0 : for( ulong i=peer_cnt; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) client->pollfds[ i ].fd = -1;
106 :
107 0 : client->peer_cnt = peer_cnt;
108 0 : client->remaining_peer_cnt = peer_cnt;
109 0 : client->start_time_nanos = fd_log_wallclock();
110 0 : }
111 :
112 : static void
113 : close_one( fd_genesis_client_t * client,
114 0 : ulong idx ) {
115 0 : if( FD_UNLIKELY( -1==close( client->pollfds[ idx ].fd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
116 0 : client->pollfds[ idx ].fd = -1;
117 0 : client->remaining_peer_cnt--;
118 0 : }
119 :
120 : static void
121 0 : close_all( fd_genesis_client_t * client ) {
122 0 : for( ulong i=0UL; i<client->peer_cnt; i++ ) {
123 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
124 0 : close_one( client, i );
125 0 : }
126 0 : }
127 :
128 : static void
129 : write_conn( fd_genesis_client_t * client,
130 0 : ulong conn_idx ) {
131 0 : fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
132 :
133 0 : if( FD_LIKELY( !peer->writing ) ) return;
134 :
135 0 : char request[ 1024UL ];
136 0 : ulong request_sz = 0UL;
137 0 : FD_TEST( fd_cstr_printf_check( request, sizeof(request), &request_sz, "GET /genesis.tar.bz2 HTTP/1.1\r\n"
138 0 : "Cache-Control: no-cache\r\n"
139 0 : "Connection: keep-alive\r\n"
140 0 : "Pragma: no-cache\r\n"
141 0 : "User-Agent: Firedancer\r\n"
142 0 : "Host: " FD_IP4_ADDR_FMT ":%hu\r\n\r\n",
143 0 : FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ) );
144 0 : FD_TEST( request_sz > peer->request_bytes_sent );
145 :
146 0 : long written = sendto( client->pollfds[ conn_idx ].fd,
147 0 : request+peer->request_bytes_sent,
148 0 : request_sz-peer->request_bytes_sent,
149 0 : MSG_NOSIGNAL,
150 0 : NULL,
151 0 : 0 );
152 0 : if( FD_UNLIKELY( -1==written && errno==EAGAIN ) ) return; /* No data was written, continue. */
153 0 : else if( FD_UNLIKELY( -1==written ) ) {
154 0 : close_one( client, conn_idx );
155 0 : return;
156 0 : }
157 :
158 0 : peer->request_bytes_sent += (ulong)written;
159 0 : if( FD_UNLIKELY( peer->request_bytes_sent==request_sz ) ) {
160 0 : peer->writing = 0;
161 0 : peer->response_bytes_read = 0UL;
162 0 : }
163 0 : }
164 :
165 : static ulong
166 : rpc_parse_decimal_ulong( char const * s,
167 : ulong s_len,
168 0 : ulong * out ) {
169 0 : if( FD_UNLIKELY( !s_len ) ) return 0UL;
170 :
171 0 : ulong val = 0UL;
172 0 : for( ulong i=0UL; i<s_len; i++ ) {
173 0 : char c = s[ i ];
174 0 : if( FD_UNLIKELY( (c<'0') | (c>'9') ) ) return 0UL;
175 0 : ulong digit = (ulong)(c-'0');
176 0 : if( FD_UNLIKELY( val>(ULONG_MAX-digit)/10UL ) ) return 0UL;
177 0 : val = val*10UL + digit;
178 0 : }
179 :
180 0 : *out = val;
181 0 : return 1UL;
182 0 : }
183 :
184 : static ulong
185 : rpc_phr_content_length( struct phr_header * headers,
186 0 : ulong num_headers ) {
187 0 : for( ulong i=0UL; i<num_headers; i++ ) {
188 0 : if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
189 0 : if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
190 0 : ulong content_length = 0UL;
191 0 : if( FD_UNLIKELY( !rpc_parse_decimal_ulong( headers[i].value, (ulong)headers[i].value_len, &content_length ) ) ) return ULONG_MAX;
192 0 : if( FD_UNLIKELY( content_length>UINT_MAX ) ) return ULONG_MAX; /* prevent overflow */
193 0 : return content_length;
194 0 : }
195 0 : return ULONG_MAX;
196 0 : }
197 :
198 : static int
199 : read_conn( fd_genesis_client_t * client,
200 : ulong conn_idx,
201 : uchar ** buffer,
202 0 : ulong * buffer_sz ) {
203 0 : fd_genesis_client_peer_t * peer = &client->peers[ conn_idx ];
204 :
205 0 : if( FD_UNLIKELY( peer->writing ) ) return 1;
206 0 : long read = recvfrom( client->pollfds[ conn_idx ].fd,
207 0 : peer->response+peer->response_bytes_read,
208 0 : sizeof(peer->response)-peer->response_bytes_read,
209 0 : 0,
210 0 : NULL,
211 0 : NULL );
212 0 : if( FD_UNLIKELY( -1==read && (errno==EAGAIN || errno==EINTR) ) ) return 1;
213 0 : else if( FD_UNLIKELY( -1==read ) ) {
214 0 : close_one( client, conn_idx );
215 0 : return 1;
216 0 : }
217 :
218 0 : peer->response_bytes_read += (ulong)read;
219 :
220 0 : int minor_version;
221 0 : int status;
222 0 : const char * message;
223 0 : ulong message_len;
224 0 : struct phr_header headers[ 32 ];
225 0 : ulong num_headers = 32UL;
226 0 : int len = phr_parse_response( (char*)peer->response, peer->response_bytes_read,
227 0 : &minor_version, &status, &message, &message_len,
228 0 : headers, &num_headers, 0L );
229 0 : if( FD_UNLIKELY( -1==len ) ) {
230 0 : close_one( client, conn_idx );
231 0 : return 1;
232 0 : } else if( FD_UNLIKELY( -2==len ) ) {
233 0 : return 1;
234 0 : }
235 :
236 0 : if( FD_UNLIKELY( status!=200 ) ) {
237 0 : close_one( client, conn_idx );
238 0 : return 1;
239 0 : }
240 :
241 0 : ulong content_length = rpc_phr_content_length( headers, num_headers );
242 0 : if( FD_UNLIKELY( content_length==ULONG_MAX ) ) {
243 0 : close_one( client, conn_idx );
244 0 : return 1;
245 0 : }
246 0 : if( FD_UNLIKELY( content_length+(ulong)len>sizeof(peer->response) ) ) {
247 0 : close_one( client, conn_idx );
248 0 : return 1;
249 0 : }
250 0 : if( FD_LIKELY( content_length+(ulong)len>peer->response_bytes_read ) ) {
251 0 : return 1;
252 0 : }
253 :
254 0 : *buffer_sz = content_length;
255 0 : *buffer = peer->response + (ulong)len;
256 :
257 0 : uchar hash[ 32UL ] = {0};
258 0 : fd_sha256_hash( *buffer, *buffer_sz, hash );
259 :
260 0 : return 0;
261 0 : }
262 :
263 : int
264 : fd_genesis_client_poll( fd_genesis_client_t * client,
265 : fd_ip4_port_t * peer,
266 : uchar ** buffer,
267 : ulong * buffer_sz,
268 0 : int * charge_busy ) {
269 0 : if( FD_UNLIKELY( !client->remaining_peer_cnt ) ) return -1;
270 0 : if( FD_UNLIKELY( fd_log_wallclock()-client->start_time_nanos>20L*1000L*1000*1000L ) ) {
271 0 : close_all( client );
272 0 : return -1;
273 0 : }
274 :
275 0 : int nfds = fd_syscall_poll( client->pollfds, (uint)client->peer_cnt, 0 );
276 0 : if( FD_UNLIKELY( 0==nfds ) ) return 1;
277 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 1;
278 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll() failed (%i-%s)", errno, strerror( errno ) ));
279 :
280 0 : *charge_busy = 1;
281 :
282 0 : for( ulong i=0UL; i<FD_TOPO_GOSSIP_ENTRYPOINTS_MAX; i++ ) {
283 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
284 :
285 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLOUT ) ) write_conn( client, i );
286 0 : if( FD_UNLIKELY( -1==client->pollfds[ i ].fd ) ) continue;
287 0 : if( FD_LIKELY( client->pollfds[ i ].revents & POLLIN ) ) {
288 0 : if( FD_LIKELY( !read_conn( client, i, buffer, buffer_sz ) ) ) {
289 0 : close_all( client );
290 0 : *peer = client->peers[ i ].addr;
291 0 : return 0;
292 0 : }
293 0 : }
294 0 : }
295 :
296 0 : return 1;
297 0 : }
298 :
299 : struct pollfd const *
300 0 : fd_genesis_client_get_pollfds( fd_genesis_client_t * client ) {
301 0 : return client->pollfds;
302 0 : }
|