Line data Source code
1 : #include "fd_rpc_client.h"
2 : #include "fd_rpc_client_private.h"
3 :
4 : #include "../../../waltz/http/picohttpparser.h"
5 : #include "../../../ballet/json/cJSON.h"
6 : #include "../../../ballet/base58/fd_base58.h"
7 :
8 : #include <errno.h>
9 : #include <stdio.h>
10 : #include <stdlib.h>
11 : #include <unistd.h>
12 : #include <strings.h>
13 : #include <sys/socket.h>
14 : #include <sys/types.h>
15 : #include <netinet/ip.h>
16 :
17 : #define MAX_REQUEST_LEN (1024UL)
18 :
19 : void *
20 : fd_rpc_client_new( void * mem,
21 : uint rpc_addr,
22 0 : ushort rpc_port ) {
23 0 : fd_rpc_client_t * rpc = (fd_rpc_client_t *)mem;
24 0 : rpc->request_id = 0UL;
25 0 : rpc->rpc_addr = rpc_addr;
26 0 : rpc->rpc_port = rpc_port;
27 0 : for( ulong i=0; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
28 0 : rpc->requests[ i ].state = FD_RPC_CLIENT_STATE_NONE;
29 0 : rpc->fds[ i ].fd = -1;
30 0 : rpc->fds[ i ].events = POLLIN | POLLOUT;
31 0 : rpc->fds[ i ].revents = 0;
32 0 : }
33 0 : return (void *)rpc;
34 0 : }
35 :
36 : long
37 : fd_rpc_client_wait_ready( fd_rpc_client_t * rpc,
38 0 : long timeout_ns ) {
39 :
40 :
41 0 : struct sockaddr_in addr = {
42 0 : .sin_family = AF_INET,
43 0 : .sin_port = fd_ushort_bswap( rpc->rpc_port ),
44 0 : .sin_addr = { .s_addr = rpc->rpc_addr }
45 0 : };
46 :
47 0 : struct pollfd pfd = {
48 0 : .fd = 0,
49 0 : .events = POLLOUT,
50 0 : .revents = 0
51 0 : };
52 :
53 0 : long start = fd_log_wallclock();
54 0 : for(;;) {
55 0 : pfd.fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
56 0 : if( FD_UNLIKELY( pfd.fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
57 :
58 0 : if( FD_UNLIKELY( -1==connect( pfd.fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
59 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
60 0 : return FD_RPC_CLIENT_ERR_NETWORK;
61 0 : }
62 :
63 0 : for(;;) {
64 0 : long now = fd_log_wallclock();
65 0 : if( FD_UNLIKELY( now-start>=timeout_ns ) ) {
66 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
67 0 : return FD_RPC_CLIENT_ERR_NETWORK;
68 0 : }
69 :
70 0 : int nfds = poll( &pfd, 1, (int)((now-start) / 1000000) );
71 0 : if( FD_UNLIKELY( 0==nfds ) ) continue;
72 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) continue;
73 0 : else if( FD_UNLIKELY( -1==nfds ) ) {
74 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
75 0 : return FD_RPC_CLIENT_ERR_NETWORK;
76 0 : } else if( FD_LIKELY( pfd.revents & (POLLERR | POLLHUP) ) ) {
77 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
78 0 : break;
79 0 : } else if( FD_LIKELY( pfd.revents & POLLOUT ) ) {
80 0 : if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
81 0 : return FD_RPC_CLIENT_SUCCESS;
82 0 : }
83 0 : }
84 0 : }
85 0 : }
86 :
87 : static ulong
88 0 : fd_rpc_available_slot( fd_rpc_client_t * rpc ) {
89 0 : for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
90 0 : if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) return i;
91 0 : }
92 0 : return ULONG_MAX;
93 0 : }
94 :
95 : static ulong
96 : fd_rpc_find_request( fd_rpc_client_t * rpc,
97 0 : long request_id ) {
98 0 : for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
99 0 : if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) continue;
100 0 : if( FD_LIKELY( rpc->requests[i].response.request_id!=request_id ) ) continue;
101 0 : return i;
102 0 : }
103 0 : return ULONG_MAX;
104 0 : }
105 :
106 : static long
107 : fd_rpc_client_request( fd_rpc_client_t * rpc,
108 : ulong method,
109 : long request_id,
110 : char * contents,
111 0 : int contents_len ) {
112 0 : ulong idx = fd_rpc_available_slot( rpc );
113 0 : if( FD_UNLIKELY( idx==ULONG_MAX) ) return FD_RPC_CLIENT_ERR_TOO_MANY;
114 :
115 0 : struct fd_rpc_client_request * request = &rpc->requests[ idx ];
116 :
117 0 : if( FD_UNLIKELY( contents_len<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
118 0 : if( FD_UNLIKELY( (ulong)contents_len>=MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
119 :
120 0 : int printed = snprintf( request->connected.request_bytes, sizeof(request->connected.request_bytes),
121 0 : "POST / HTTP/1.1\r\n"
122 0 : "Host: localhost:12001\r\n"
123 0 : "Content-Length: %d\r\n"
124 0 : "Content-Type: application/json\r\n\r\n"
125 0 : "%s", contents_len, contents );
126 0 : if( FD_UNLIKELY( printed<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
127 0 : if( FD_UNLIKELY( (ulong)printed>=sizeof(request->connected.request_bytes) ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
128 0 : request->connected.request_bytes_cnt = (ulong)printed;
129 :
130 0 : int fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
131 0 : if( FD_UNLIKELY( fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
132 :
133 0 : struct sockaddr_in addr = {
134 0 : .sin_family = AF_INET,
135 0 : .sin_port = fd_ushort_bswap( rpc->rpc_port ),
136 0 : .sin_addr = { .s_addr = rpc->rpc_addr }
137 0 : };
138 :
139 0 : if( FD_UNLIKELY( -1==connect( fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
140 0 : if( FD_UNLIKELY( close( fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
141 0 : return FD_RPC_CLIENT_ERR_NETWORK;
142 0 : }
143 :
144 0 : rpc->request_id = request_id;
145 0 : rpc->fds[ idx ].fd = fd;
146 0 : request->response.method = method;
147 0 : request->response.status = FD_RPC_CLIENT_PENDING;
148 0 : request->response.request_id = rpc->request_id;
149 0 : request->connected.request_bytes_sent = 0UL;
150 0 : request->state = FD_RPC_CLIENT_STATE_CONNECTED;
151 0 : return request->response.request_id;
152 0 : }
153 :
154 : long
155 0 : fd_rpc_client_request_latest_block_hash( fd_rpc_client_t * rpc ) {
156 0 : char contents[ MAX_REQUEST_LEN ];
157 0 : long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
158 :
159 0 : int contents_len = snprintf( contents, sizeof(contents),
160 0 : "{\"jsonrpc\":\"2.0\",\"id\":%ld,\"method\":\"getLatestBlockhash\",\"params\":[ { \"commitment\": \"processed\" }]}",
161 0 : request_id );
162 :
163 0 : return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH, request_id, contents, contents_len );
164 0 : }
165 :
166 : long
167 0 : fd_rpc_client_request_transaction_count( fd_rpc_client_t * rpc ) {
168 0 : char contents[ MAX_REQUEST_LEN ];
169 0 : long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
170 :
171 0 : int contents_len = snprintf( contents, sizeof(contents),
172 0 : "{\"jsonrpc\":\"2.0\",\"id\":%ld,\"method\":\"getTransactionCount\",\"params\":[ { \"commitment\": \"processed\" } ]}",
173 0 : request_id );
174 :
175 0 : return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT, request_id, contents, contents_len );
176 0 : }
177 :
178 : static void
179 : fd_rpc_mark_error( fd_rpc_client_t * rpc,
180 : ulong idx,
181 0 : long error ) {
182 0 : if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
183 0 : if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
184 0 : rpc->fds[ idx ].fd = -1;
185 0 : }
186 0 : rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_FINISHED;
187 0 : rpc->requests[ idx ].response.status = error;
188 0 : }
189 :
190 : static ulong
191 : fd_rpc_phr_content_length( struct phr_header * headers,
192 0 : ulong num_headers ) {
193 0 : for( ulong i=0UL; i<num_headers; i++ ) {
194 0 : if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
195 0 : if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
196 0 : char * end;
197 0 : ulong content_length = strtoul( headers[i].value, &end, 10 );
198 0 : if( FD_UNLIKELY( content_length>UINT_MAX ) ) return ULONG_MAX; /* prevent overflow */
199 0 : if( FD_UNLIKELY( end==headers[i].value ) ) return ULONG_MAX;
200 0 : return content_length;
201 0 : }
202 0 : return ULONG_MAX;
203 0 : }
204 :
205 : static long
206 : parse_response( char * response,
207 : ulong response_len,
208 : ulong last_response_len,
209 0 : fd_rpc_client_response_t * result ) {
210 0 : int minor_version;
211 0 : int status;
212 0 : const char * message;
213 0 : ulong message_len;
214 0 : struct phr_header headers[ 32 ];
215 0 : ulong num_headers = 32UL;
216 0 : int http_len = phr_parse_response( response, response_len,
217 0 : &minor_version, &status, &message, &message_len,
218 0 : headers, &num_headers, last_response_len );
219 0 : if( FD_UNLIKELY( -2==http_len ) ) return FD_RPC_CLIENT_PENDING;
220 0 : else if( FD_UNLIKELY( -1==http_len ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
221 :
222 0 : if( FD_UNLIKELY( status!=200 ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
223 :
224 0 : ulong content_length = fd_rpc_phr_content_length( headers, num_headers );
225 0 : if( FD_UNLIKELY( content_length==ULONG_MAX ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
226 0 : if( FD_UNLIKELY( content_length+(ulong)http_len > MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
227 0 : if( FD_LIKELY( content_length+(ulong)http_len>response_len ) ) return FD_RPC_CLIENT_PENDING;
228 :
229 0 : const char * parse_end;
230 0 : cJSON * json = cJSON_ParseWithLengthOpts( response + http_len, content_length, &parse_end, 0 );
231 0 : if( FD_UNLIKELY( !json ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
232 :
233 0 : switch( result->method ) {
234 0 : case FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT: {
235 0 : const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
236 0 : if( FD_UNLIKELY( !cJSON_IsNumber( node ) || node->valueulong==ULONG_MAX ) ) {
237 0 : cJSON_Delete( json );
238 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
239 0 : }
240 :
241 0 : result->result.transaction_count.transaction_count = node->valueulong;
242 0 : cJSON_Delete( json );
243 0 : return FD_RPC_CLIENT_SUCCESS;
244 0 : }
245 0 : case FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH: {
246 0 : const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
247 0 : if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
248 0 : cJSON_Delete( json );
249 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
250 0 : }
251 :
252 0 : node = cJSON_GetObjectItemCaseSensitive( node, "value" );
253 0 : if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
254 0 : cJSON_Delete( json );
255 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
256 0 : }
257 :
258 0 : node = cJSON_GetObjectItemCaseSensitive( node, "blockhash" );
259 0 : if( FD_UNLIKELY( !cJSON_IsString( node ) ) ) {
260 0 : cJSON_Delete( json );
261 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
262 0 : }
263 :
264 0 : if( FD_UNLIKELY( strnlen( node->valuestring, 45UL )>44UL ) ) {
265 0 : cJSON_Delete( json );
266 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
267 0 : }
268 :
269 0 : if( FD_UNLIKELY( !fd_base58_decode_32( node->valuestring, result->result.latest_block_hash.block_hash ) ) ) {
270 0 : cJSON_Delete( json );
271 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
272 0 : }
273 :
274 0 : cJSON_Delete( json );
275 0 : return FD_RPC_CLIENT_SUCCESS;
276 0 : }
277 0 : default:
278 0 : FD_TEST( 0 );
279 0 : }
280 0 : return FD_RPC_CLIENT_ERR_MALFORMED;
281 0 : }
282 :
283 : int
284 : fd_rpc_client_service( fd_rpc_client_t * rpc,
285 0 : int wait ) {
286 0 : int timeout = wait ? -1 : 0;
287 0 : int nfds = poll( rpc->fds, FD_RPC_CLIENT_REQUEST_CNT, timeout );
288 0 : if( FD_UNLIKELY( 0==nfds ) ) return 0;
289 0 : else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 0;
290 0 : else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
291 :
292 0 : for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
293 0 : struct fd_rpc_client_request * request = &rpc->requests[i];
294 :
295 0 : if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_CONNECTED && ( rpc->fds[ i ].revents & POLLOUT ) ) ) {
296 0 : long sent = send( rpc->fds[ i ].fd, request->connected.request_bytes+request->connected.request_bytes_sent,
297 0 : request->connected.request_bytes_cnt-request->connected.request_bytes_sent, MSG_NOSIGNAL );
298 0 : if( FD_UNLIKELY( -1==sent && errno==EAGAIN ) ) continue;
299 0 : if( FD_UNLIKELY( -1==sent ) ) {
300 0 : fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
301 0 : continue;
302 0 : }
303 :
304 0 : request->connected.request_bytes_sent += (ulong)sent;
305 0 : if( FD_UNLIKELY( request->connected.request_bytes_sent==request->connected.request_bytes_cnt ) ) {
306 0 : request->sent.response_bytes_read = 0UL;
307 0 : request->state = FD_RPC_CLIENT_STATE_SENT;
308 0 : }
309 0 : }
310 :
311 0 : if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_SENT && ( rpc->fds[ i ].revents & POLLIN ) ) ) {
312 0 : long read = recv( rpc->fds[ i ].fd, request->response_bytes+request->sent.response_bytes_read,
313 0 : sizeof(request->response_bytes)-request->sent.response_bytes_read, 0 );
314 0 : if( FD_UNLIKELY( -1==read && errno==EAGAIN ) ) continue;
315 0 : else if( FD_UNLIKELY( -1==read ) ) {
316 0 : fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
317 0 : continue;
318 0 : }
319 :
320 0 : request->sent.response_bytes_read += (ulong)read;
321 0 : if( FD_UNLIKELY( request->sent.response_bytes_read==sizeof(request->response_bytes) ) ) {
322 0 : fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_TOO_LARGE );
323 0 : continue;
324 0 : }
325 :
326 0 : fd_rpc_client_response_t * response = &request->response;
327 0 : long status = parse_response( request->response_bytes,
328 0 : request->sent.response_bytes_read,
329 0 : request->sent.response_bytes_read-(ulong)read,
330 0 : response );
331 0 : if( FD_LIKELY( status==FD_RPC_CLIENT_PENDING ) ) continue;
332 0 : else if( FD_UNLIKELY( status==FD_RPC_CLIENT_SUCCESS ) ) {
333 0 : if( FD_UNLIKELY( close( rpc->fds[ i ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
334 0 : rpc->fds[ i ].fd = -1;
335 0 : response->status = FD_RPC_CLIENT_SUCCESS;
336 0 : request->state = FD_RPC_CLIENT_STATE_FINISHED;
337 0 : continue;
338 0 : } else {
339 0 : fd_rpc_mark_error( rpc, i, status );
340 0 : continue;
341 0 : }
342 0 : }
343 0 : }
344 :
345 0 : return 1;
346 0 : }
347 :
348 : fd_rpc_client_response_t *
349 : fd_rpc_client_status( fd_rpc_client_t * rpc,
350 : long request_id,
351 0 : int wait ) {
352 0 : ulong idx = fd_rpc_find_request( rpc, request_id );
353 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return NULL;
354 :
355 0 : if( FD_LIKELY( !wait ) ) return &rpc->requests[ idx ].response;
356 :
357 0 : for(;;) {
358 0 : if( FD_LIKELY( rpc->requests[ idx ].state==FD_RPC_CLIENT_STATE_FINISHED ) ) return &rpc->requests[ idx ].response;
359 0 : fd_rpc_client_service( rpc, 1 );
360 0 : }
361 0 : }
362 :
363 : void
364 : fd_rpc_client_close( fd_rpc_client_t * rpc,
365 0 : long request_id ) {
366 0 : ulong idx = fd_rpc_find_request( rpc, request_id );
367 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return;
368 :
369 0 : if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
370 0 : if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
371 0 : rpc->fds[ idx ].fd = -1;
372 0 : }
373 0 : rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_NONE;
374 0 : }
|