LCOV - code coverage report
Current view: top level - app/shared_dev/rpc_client - fd_rpc_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 275 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 13 0.0 %

          Line data    Source code
       1             : #include "fd_rpc_client.h"
       2             : #include "fd_rpc_client_private.h"
       3             : 
       4             : #include "../../../waltz/http/picohttpparser.h"
       5             : #include "../../../ballet/json/cJSON.h"
       6             : #include "../../../ballet/base58/fd_base58.h"
       7             : 
       8             : #include <errno.h>
       9             : #include <stdio.h>
      10             : #include <stdlib.h>
      11             : #include <unistd.h>
      12             : #include <strings.h>
      13             : #include <sys/socket.h>
      14             : #include <sys/types.h>
      15             : #include <netinet/ip.h>
      16             : 
      17             : #define MAX_REQUEST_LEN (1024UL)
      18             : 
      19             : void *
      20             : fd_rpc_client_new( void * mem,
      21             :                    uint   rpc_addr,
      22           0 :                    ushort rpc_port ) {
      23           0 :   fd_rpc_client_t * rpc = (fd_rpc_client_t *)mem;
      24           0 :   rpc->request_id = 0UL;
      25           0 :   rpc->rpc_addr = rpc_addr;
      26           0 :   rpc->rpc_port = rpc_port;
      27           0 :   for( ulong i=0; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
      28           0 :     rpc->requests[ i ].state = FD_RPC_CLIENT_STATE_NONE;
      29           0 :     rpc->fds[ i ].fd = -1;
      30           0 :     rpc->fds[ i ].events = POLLIN | POLLOUT;
      31           0 :     rpc->fds[ i ].revents = 0;
      32           0 :   }
      33           0 :   return (void *)rpc;
      34           0 : }
      35             : 
      36             : long
      37             : fd_rpc_client_wait_ready( fd_rpc_client_t * rpc,
      38           0 :                           long              timeout_ns ) {
      39             : 
      40             : 
      41           0 :   struct sockaddr_in addr = {
      42           0 :     .sin_family = AF_INET,
      43           0 :     .sin_port   = fd_ushort_bswap( rpc->rpc_port ),
      44           0 :     .sin_addr   = { .s_addr = rpc->rpc_addr }
      45           0 :   };
      46             : 
      47           0 :   struct pollfd pfd = {
      48           0 :     .fd = 0,
      49           0 :     .events = POLLOUT,
      50           0 :     .revents = 0
      51           0 :   };
      52             : 
      53           0 :   long start = fd_log_wallclock();
      54           0 :   for(;;) {
      55           0 :     pfd.fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
      56           0 :     if( FD_UNLIKELY( pfd.fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
      57             : 
      58           0 :     if( FD_UNLIKELY( -1==connect( pfd.fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
      59           0 :       if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      60           0 :       return FD_RPC_CLIENT_ERR_NETWORK;
      61           0 :     }
      62             : 
      63           0 :     for(;;) {
      64           0 :       long now = fd_log_wallclock();
      65           0 :       if( FD_UNLIKELY( now-start>=timeout_ns ) ) {
      66           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      67           0 :         return FD_RPC_CLIENT_ERR_NETWORK;
      68           0 :       }
      69             : 
      70           0 :       int nfds = poll( &pfd, 1, (int)((now-start) / 1000000) );
      71           0 :       if( FD_UNLIKELY( 0==nfds ) ) continue;
      72           0 :       else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) continue;
      73           0 :       else if( FD_UNLIKELY( -1==nfds ) ) {
      74           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      75           0 :         return FD_RPC_CLIENT_ERR_NETWORK;
      76           0 :       } else if( FD_LIKELY( pfd.revents & (POLLERR | POLLHUP) ) ) {
      77           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      78           0 :         break;
      79           0 :       } else if( FD_LIKELY( pfd.revents & POLLOUT ) ) {
      80           0 :         if( FD_UNLIKELY( close( pfd.fd )<0 ) ) FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      81           0 :         return FD_RPC_CLIENT_SUCCESS;
      82           0 :       }
      83           0 :     }
      84           0 :   }
      85           0 : }
      86             : 
      87             : static ulong
      88           0 : fd_rpc_available_slot( fd_rpc_client_t * rpc ) {
      89           0 :   for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
      90           0 :     if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) return i;
      91           0 :   }
      92           0 :   return ULONG_MAX;
      93           0 : }
      94             : 
      95             : static ulong
      96             : fd_rpc_find_request( fd_rpc_client_t * rpc,
      97           0 :                      long              request_id ) {
      98           0 :   for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
      99           0 :     if( FD_LIKELY( rpc->requests[i].state==FD_RPC_CLIENT_STATE_NONE ) ) continue;
     100           0 :     if( FD_LIKELY( rpc->requests[i].response.request_id!=request_id ) ) continue;
     101           0 :     return i;
     102           0 :   }
     103           0 :   return ULONG_MAX;
     104           0 : }
     105             : 
     106             : static long
     107             : fd_rpc_client_request( fd_rpc_client_t * rpc,
     108             :                        ulong             method,
     109             :                        long              request_id,
     110             :                        char *            contents,
     111           0 :                        int               contents_len ) {
     112           0 :   ulong idx = fd_rpc_available_slot( rpc );
     113           0 :   if( FD_UNLIKELY( idx==ULONG_MAX) ) return FD_RPC_CLIENT_ERR_TOO_MANY;
     114             : 
     115           0 :   struct fd_rpc_client_request * request = &rpc->requests[ idx ];
     116             : 
     117           0 :   if( FD_UNLIKELY( contents_len<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     118           0 :   if( FD_UNLIKELY( (ulong)contents_len>=MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     119             : 
     120           0 :   int printed = snprintf( request->connected.request_bytes, sizeof(request->connected.request_bytes),
     121           0 :                           "POST / HTTP/1.1\r\n"
     122           0 :                           "Host: localhost:12001\r\n"
     123           0 :                           "Content-Length: %d\r\n"
     124           0 :                           "Content-Type: application/json\r\n\r\n"
     125           0 :                           "%s", contents_len, contents );
     126           0 :   if( FD_UNLIKELY( printed<0 ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     127           0 :   if( FD_UNLIKELY( (ulong)printed>=sizeof(request->connected.request_bytes) ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     128           0 :   request->connected.request_bytes_cnt = (ulong)printed;
     129             : 
     130           0 :   int fd = socket( AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0 );
     131           0 :   if( FD_UNLIKELY( fd<0 ) ) return FD_RPC_CLIENT_ERR_NETWORK;
     132             : 
     133           0 :   struct sockaddr_in addr = {
     134           0 :     .sin_family = AF_INET,
     135           0 :     .sin_port   = fd_ushort_bswap( rpc->rpc_port ),
     136           0 :     .sin_addr   = { .s_addr = rpc->rpc_addr }
     137           0 :   };
     138             : 
     139           0 :   if( FD_UNLIKELY( -1==connect( fd, fd_type_pun( &addr ), sizeof(addr) ) && errno!=EINPROGRESS ) ) {
     140           0 :     if( FD_UNLIKELY( close( fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     141           0 :     return FD_RPC_CLIENT_ERR_NETWORK;
     142           0 :   }
     143             : 
     144           0 :   rpc->request_id = request_id;
     145           0 :   rpc->fds[ idx ].fd = fd;
     146           0 :   request->response.method = method;
     147           0 :   request->response.status = FD_RPC_CLIENT_PENDING;
     148           0 :   request->response.request_id = rpc->request_id;
     149           0 :   request->connected.request_bytes_sent = 0UL;
     150           0 :   request->state = FD_RPC_CLIENT_STATE_CONNECTED;
     151           0 :   return request->response.request_id;
     152           0 : }
     153             : 
     154             : long
     155           0 : fd_rpc_client_request_latest_block_hash( fd_rpc_client_t * rpc ) {
     156           0 :   char contents[ MAX_REQUEST_LEN ];
     157           0 :   long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
     158             : 
     159           0 :   int contents_len = snprintf( contents, sizeof(contents),
     160           0 :                                "{\"jsonrpc\":\"2.0\",\"id\":%ld,\"method\":\"getLatestBlockhash\",\"params\":[ { \"commitment\": \"processed\" }]}",
     161           0 :                                request_id );
     162             : 
     163           0 :   return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH, request_id, contents, contents_len );
     164           0 : }
     165             : 
     166             : long
     167           0 : fd_rpc_client_request_transaction_count( fd_rpc_client_t * rpc ) {
     168           0 :   char contents[ MAX_REQUEST_LEN ];
     169           0 :   long request_id = fd_long_if( rpc->request_id==LONG_MAX, 0L, rpc->request_id+1L );
     170             : 
     171           0 :   int contents_len = snprintf( contents, sizeof(contents),
     172           0 :                                "{\"jsonrpc\":\"2.0\",\"id\":%ld,\"method\":\"getTransactionCount\",\"params\":[ { \"commitment\": \"processed\" } ]}",
     173           0 :                                request_id );
     174             : 
     175           0 :   return fd_rpc_client_request( rpc, FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT, request_id, contents, contents_len );
     176           0 : }
     177             : 
     178             : static void
     179             : fd_rpc_mark_error( fd_rpc_client_t * rpc,
     180             :                    ulong             idx,
     181           0 :                    long              error ) {
     182           0 :   if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
     183           0 :     if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     184           0 :     rpc->fds[ idx ].fd = -1;
     185           0 :   }
     186           0 :   rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_FINISHED;
     187           0 :   rpc->requests[ idx ].response.status = error;
     188           0 : }
     189             : 
     190             : static ulong
     191             : fd_rpc_phr_content_length( struct phr_header * headers,
     192           0 :                            ulong               num_headers ) {
     193           0 :   for( ulong i=0UL; i<num_headers; i++ ) {
     194           0 :     if( FD_LIKELY( headers[i].name_len!=14UL ) ) continue;
     195           0 :     if( FD_LIKELY( strncasecmp( headers[i].name, "Content-Length", 14UL ) ) ) continue;
     196           0 :     char * end;
     197           0 :     ulong content_length = strtoul( headers[i].value, &end, 10 );
     198           0 :     if( FD_UNLIKELY( content_length>UINT_MAX ) ) return ULONG_MAX; /* prevent overflow */
     199           0 :     if( FD_UNLIKELY( end==headers[i].value ) ) return ULONG_MAX;
     200           0 :     return content_length;
     201           0 :   }
     202           0 :   return ULONG_MAX;
     203           0 : }
     204             : 
     205             : static long
     206             : parse_response( char *                     response,
     207             :                 ulong                      response_len,
     208             :                 ulong                      last_response_len,
     209           0 :                 fd_rpc_client_response_t * result ) {
     210           0 :   int minor_version;
     211           0 :   int status;
     212           0 :   const char * message;
     213           0 :   ulong message_len;
     214           0 :   struct phr_header headers[ 32 ];
     215           0 :   ulong num_headers = 32UL;
     216           0 :   int http_len = phr_parse_response( response, response_len,
     217           0 :                                      &minor_version, &status, &message, &message_len,
     218           0 :                                      headers, &num_headers, last_response_len );
     219           0 :   if( FD_UNLIKELY( -2==http_len ) ) return FD_RPC_CLIENT_PENDING;
     220           0 :   else if( FD_UNLIKELY( -1==http_len ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     221             : 
     222           0 :   if( FD_UNLIKELY( status!=200 ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     223             : 
     224           0 :   ulong content_length = fd_rpc_phr_content_length( headers, num_headers );
     225           0 :   if( FD_UNLIKELY( content_length==ULONG_MAX ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     226           0 :   if( FD_UNLIKELY( content_length+(ulong)http_len > MAX_REQUEST_LEN ) ) return FD_RPC_CLIENT_ERR_TOO_LARGE;
     227           0 :   if( FD_LIKELY( content_length+(ulong)http_len>response_len ) ) return FD_RPC_CLIENT_PENDING;
     228             : 
     229           0 :   const char * parse_end;
     230           0 :   cJSON * json = cJSON_ParseWithLengthOpts( response + http_len, content_length, &parse_end, 0 );
     231           0 :   if( FD_UNLIKELY( !json ) ) return FD_RPC_CLIENT_ERR_MALFORMED;
     232             : 
     233           0 :   switch( result->method ) {
     234           0 :     case FD_RPC_CLIENT_METHOD_TRANSACTION_COUNT: {
     235           0 :       const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
     236           0 :       if( FD_UNLIKELY( !cJSON_IsNumber( node ) || node->valueulong==ULONG_MAX ) ) {
     237           0 :         cJSON_Delete( json );
     238           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     239           0 :       }
     240             : 
     241           0 :       result->result.transaction_count.transaction_count = node->valueulong;
     242           0 :       cJSON_Delete( json );
     243           0 :       return FD_RPC_CLIENT_SUCCESS;
     244           0 :     }
     245           0 :     case FD_RPC_CLIENT_METHOD_LATEST_BLOCK_HASH: {
     246           0 :       const cJSON * node = cJSON_GetObjectItemCaseSensitive( json, "result" );
     247           0 :       if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
     248           0 :         cJSON_Delete( json );
     249           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     250           0 :       }
     251             : 
     252           0 :       node = cJSON_GetObjectItemCaseSensitive( node, "value" );
     253           0 :       if( FD_UNLIKELY( !cJSON_IsObject( node ) ) ) {
     254           0 :         cJSON_Delete( json );
     255           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     256           0 :       }
     257             : 
     258           0 :       node = cJSON_GetObjectItemCaseSensitive( node, "blockhash" );
     259           0 :       if( FD_UNLIKELY( !cJSON_IsString( node ) ) ) {
     260           0 :         cJSON_Delete( json );
     261           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     262           0 :       }
     263             : 
     264           0 :       if( FD_UNLIKELY( strnlen( node->valuestring, 45UL )>44UL ) ) {
     265           0 :         cJSON_Delete( json );
     266           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     267           0 :       }
     268             : 
     269           0 :       if( FD_UNLIKELY( !fd_base58_decode_32( node->valuestring, result->result.latest_block_hash.block_hash ) ) ) {
     270           0 :         cJSON_Delete( json );
     271           0 :         return FD_RPC_CLIENT_ERR_MALFORMED;
     272           0 :       }
     273             : 
     274           0 :       cJSON_Delete( json );
     275           0 :       return FD_RPC_CLIENT_SUCCESS;
     276           0 :     }
     277           0 :     default:
     278           0 :       FD_TEST( 0 );
     279           0 :   }
     280           0 :   return FD_RPC_CLIENT_ERR_MALFORMED;
     281           0 : }
     282             : 
     283             : int
     284             : fd_rpc_client_service( fd_rpc_client_t * rpc,
     285           0 :                        int               wait ) {
     286           0 :   int timeout = wait ? -1 : 0;
     287           0 :   int nfds = poll( rpc->fds, FD_RPC_CLIENT_REQUEST_CNT, timeout );
     288           0 :   if( FD_UNLIKELY( 0==nfds ) ) return 0;
     289           0 :   else if( FD_UNLIKELY( -1==nfds && errno==EINTR ) ) return 0;
     290           0 :   else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
     291             : 
     292           0 :   for( ulong i=0UL; i<FD_RPC_CLIENT_REQUEST_CNT; i++ ) {
     293           0 :     struct fd_rpc_client_request * request = &rpc->requests[i];
     294             : 
     295           0 :     if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_CONNECTED && ( rpc->fds[ i ].revents & POLLOUT ) ) ) {
     296           0 :       long sent = send( rpc->fds[ i ].fd, request->connected.request_bytes+request->connected.request_bytes_sent,
     297           0 :                         request->connected.request_bytes_cnt-request->connected.request_bytes_sent, MSG_NOSIGNAL );
     298           0 :       if( FD_UNLIKELY( -1==sent && errno==EAGAIN ) ) continue;
     299           0 :       if( FD_UNLIKELY( -1==sent ) ) {
     300           0 :         fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
     301           0 :         continue;
     302           0 :       }
     303             : 
     304           0 :       request->connected.request_bytes_sent += (ulong)sent;
     305           0 :       if( FD_UNLIKELY( request->connected.request_bytes_sent==request->connected.request_bytes_cnt ) ) {
     306           0 :         request->sent.response_bytes_read = 0UL;
     307           0 :         request->state = FD_RPC_CLIENT_STATE_SENT;
     308           0 :       }
     309           0 :     }
     310             : 
     311           0 :     if( FD_LIKELY( request->state==FD_RPC_CLIENT_STATE_SENT && ( rpc->fds[ i ].revents & POLLIN ) ) ) {
     312           0 :       long read = recv( rpc->fds[ i ].fd, request->response_bytes+request->sent.response_bytes_read,
     313           0 :                         sizeof(request->response_bytes)-request->sent.response_bytes_read, 0 );
     314           0 :       if( FD_UNLIKELY( -1==read && errno==EAGAIN ) ) continue;
     315           0 :       else if( FD_UNLIKELY( -1==read ) ) {
     316           0 :         fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_NETWORK );
     317           0 :         continue;
     318           0 :       }
     319             : 
     320           0 :       request->sent.response_bytes_read += (ulong)read;
     321           0 :       if( FD_UNLIKELY( request->sent.response_bytes_read==sizeof(request->response_bytes) ) ) {
     322           0 :         fd_rpc_mark_error( rpc, i, FD_RPC_CLIENT_ERR_TOO_LARGE );
     323           0 :         continue;
     324           0 :       }
     325             : 
     326           0 :       fd_rpc_client_response_t * response = &request->response;
     327           0 :       long status = parse_response( request->response_bytes,
     328           0 :                                     request->sent.response_bytes_read,
     329           0 :                                     request->sent.response_bytes_read-(ulong)read,
     330           0 :                                     response );
     331           0 :       if( FD_LIKELY( status==FD_RPC_CLIENT_PENDING ) ) continue;
     332           0 :       else if( FD_UNLIKELY( status==FD_RPC_CLIENT_SUCCESS ) ) {
     333           0 :         if( FD_UNLIKELY( close( rpc->fds[ i ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     334           0 :         rpc->fds[ i ].fd = -1;
     335           0 :         response->status = FD_RPC_CLIENT_SUCCESS;
     336           0 :         request->state = FD_RPC_CLIENT_STATE_FINISHED;
     337           0 :         continue;
     338           0 :       } else {
     339           0 :         fd_rpc_mark_error( rpc, i, status );
     340           0 :         continue;
     341           0 :       }
     342           0 :     }
     343           0 :   }
     344             : 
     345           0 :   return 1;
     346           0 : }
     347             : 
     348             : fd_rpc_client_response_t *
     349             : fd_rpc_client_status( fd_rpc_client_t * rpc,
     350             :                       long              request_id,
     351           0 :                       int               wait ) {
     352           0 :   ulong idx = fd_rpc_find_request( rpc, request_id );
     353           0 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) return NULL;
     354             : 
     355           0 :   if( FD_LIKELY( !wait ) ) return &rpc->requests[ idx ].response;
     356             : 
     357           0 :   for(;;) {
     358           0 :     if( FD_LIKELY( rpc->requests[ idx ].state==FD_RPC_CLIENT_STATE_FINISHED ) ) return &rpc->requests[ idx ].response;
     359           0 :     fd_rpc_client_service( rpc, 1 );
     360           0 :   }
     361           0 : }
     362             : 
     363             : void
     364             : fd_rpc_client_close( fd_rpc_client_t * rpc,
     365           0 :                      long              request_id ) {
     366           0 :   ulong idx = fd_rpc_find_request( rpc, request_id );
     367           0 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) return;
     368             : 
     369           0 :   if( FD_LIKELY( rpc->fds[ idx ].fd>=0 ) ) {
     370           0 :     if( FD_UNLIKELY( close( rpc->fds[ idx ].fd )<0 ) ) FD_LOG_WARNING(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     371           0 :     rpc->fds[ idx ].fd = -1;
     372           0 :   }
     373           0 :   rpc->requests[ idx ].state = FD_RPC_CLIENT_STATE_NONE;
     374           0 : }

Generated by: LCOV version 1.14